# -*- coding: utf-8 -*-
"""
This is the :mod:`conducted_emission.ce` module.
:author: Hans Georg Krauthäuser (main author)
:license: GPLv3 or higher
"""
import os
from typing import Iterable, Protocol, TextIO, Any
from collections.abc import Callable
import logging
import csv
import pathvalidate
import time
[docs]
class limitline(Protocol):
def __call__(self, freq_or_freqs: Iterable[float] | float) -> Iterable[float] | float:
...
import numpy as np
from matplotlib import pyplot as plt
from matplotlib.ticker import EngFormatter
from mpylab.tools.aunits import AMPLITUDERATIO, VOLT, WATT
from mpylab.tools.unit_conversion import from_quantity
from mpylab.tools.mgraph import MGraph
from mpylab.tools.levenshtein import fstrcmp
from scuq.quantities import Quantity
from scuq.si import VOLT
[docs]
class Stdout_Logger:
"""
Stdout_Logger class.
Simply prints the message to stdout.
"""
def __call__(self, message: str) -> None:
"""
Prints the message to stdout.
:param message: the leg message
:return: None
"""
print(message)
[docs]
class File_Logger:
"""
File_Logger class.
A logger intended to log to file.
"""
levels = {'debug': logging.DEBUG,
'info': logging.INFO,
'warning': logging.WARNING,
'error': logging.ERROR,
'critical': logging.CRITICAL}
[docs]
def __init__(self, logger):
"""
Constructor
:param logger: Instance of the logging object
"""
self.logger = logger
def __call__(self, message, typ='info'):
"""
Prints the message to the logger.
:param message: the log message
:param typ: one of 'debug', 'info', 'warning', 'error', 'critical'; defaults to 'info'
:return: None
"""
self.logger.log(self.levels[typ.lower()], message)
[docs]
class ConductedEmission:
"""
Conducted Emission class
Used to perform conducted emission EMC measurements using an
EMC-Receiver and a Line-Impedance Stabilized Network (LISN)
"""
[docs]
def __init__(self,
description: str | None = None,
log_fn: Callable[[str], None] | None = None,
export_pre_fn: Callable[[None], None] | None = None,
export_fin_fn: Callable[[None], None] | None = None,
output_path: str | None = None,
prefix_prescan: str | None = None,
finscan_prefix: str | None = None) -> None:
"""
Constructor
:param description: str; general description of the measurement
:param log_fn: callback for logging messages
:param export_pre_fn: callback for exporting conducted emission prescan measurements
:param export_fin_fn: callback for exporting conducted emission final measurements
:param output_path: file save path if internal export functions are used
:param prefix_prescan: file prefix for prescan measurements if internal export functions are used
:param finscan_prefix: file prefix for finscan measurements if internal export functions are used
"""
self.paths = None
self.path=None
self.names = None
self.mg = None
self.searchpaths = None
self.limitline = None
self.margin = None
self.freqs = None
if output_path is None:
self.output_path = os.getcwd()
else:
self.output_path = output_path
if prefix_prescan is None:
self.prefix_prescan = 'prescan'
else:
self.prefix_prescan = prefix_prescan
if finscan_prefix is None:
self.finscan_prefix = 'finscan'
else:
self.finscan_prefix = finscan_prefix
if log_fn is None:
self.log_fn = Stdout_Logger()
else:
self.log_fn = log_fn
if export_pre_fn is None:
self.export_pre_fn = self.export_prescan
else:
self.export_pre_fn = export_pre_fn
if export_fin_fn is None:
self.export_fin_fn = self.export_finscan
else:
self.export_fin_fn = export_fin_fn
if description is None:
self.description = "Conducted Emission"
else:
self.description = description
# data structures to save scan data
# keys are description, paths and str(freqs)
self.prescan_data = {}
self.finscan_data = {}
[docs]
def prepare_scan(self,
dot: str | TextIO,
names: dict | None = None,
searchpaths: Iterable[str] | None = None,
paths: Iterable[str] | None = None,
freqs: Iterable[float] | None = None,
limitline: limitline = None,
dut_description: str | None = None) -> None:
"""
Prepare conducted emission measurement
:param dot: filename, file-like object or data; the graph dot file
:param names: dict; maps internal used names 'eut', 'lisn', 'rec' to the names in dot file
:param searchpaths: list; search paths for ini-files and data-files
:param paths: list or tuple; the different measurent paths, e.g. "('L', 'N')", or "('L1', 'L2', 'L3', 'N')"
:param freqs: list; the frequencies
:param limitline: callable or None; gives the actual limit at freq
:param dut_description: str; description of the device under test (DUT)
:return: None
"""
if dut_description is None:
self.dut_description = "DUT"
else:
self.dut_description = dut_description
if names is None:
self.names = {'rec': 'rec', 'lisn': 'lisn', 'eut': 'eut'}
else:
self.names = names
if searchpaths is None:
self.searchpaths = ['.']
else:
self.searchpaths = searchpaths
if paths is None:
self.paths = ('L', 'N')
else:
self.paths = paths
if freqs is None:
self.freqs = []
else:
self.freqs = freqs
self.limitline = limitline
# create instance of MGraph, Create device instances and Init instruments
self.mg = MGraph(dot, self.names, SearchPaths=self.searchpaths)
self.mg.CreateDevices()
self.mg.Init_Devices()
[docs]
def uniquify(self, path):
"""
Method to uniquify a path: append three digit counter until unique
filemame is sanitized first.
:param path: str: full path to uniquify
:return: str: uniquified path
"""
head, tail = os.path.split(path)
filename, extension = os.path.splitext(tail)
filename = filename.replace(' ', '_')
filename = pathvalidate.sanitize_filename(filename)
path = os.path.join(head, filename)
path = f'{path}{extension}'
counter = 1
while os.path.exists(path):
path = os.path.join(head, filename)
path = f'{path}-{counter:03d}{extension}'
counter += 1
return path
[docs]
def export_prescan(self):
"""
Method to export prescan data to csv file
:return: None
"""
descriptions = self.prescan_data.keys()
for d in descriptions:
paths = self.prescan_data[d].keys()
for p in paths:
data = self.prescan_data[d][p]
short_d = self.dut_description.splitlines()[0]
freqs = [float(f) for f in data.keys()]
freqs.sort()
filename = f'{self.prefix_prescan}-{short_d}-{p}.csv'
filename = os.path.join(self.output_path, filename)
filename = self.uniquify(filename)
with open(filename, 'a', newline='', encoding='utf-8') as csvfile:
csvwriter = csv.writer(csvfile,
delimiter=';',
quoting=csv.QUOTE_NONE)
csvwriter.writerow([f'# Conducted Emission PreScan Output File', None , None])
csvwriter.writerow([f'# Time: {time.ctime()}', None , None])
csvwriter.writerow(['# Description:', None , None])
for _l in d.splitlines():
csvwriter.writerow([f'# {_l}:', None, None])
csvwriter.writerow(['# DUT Description:', None , None])
for _l in self.dut_description.splitlines():
csvwriter.writerow([f'# {_l}', None, None])
csvwriter.writerow([f'# Measuring Path: {p}', None, None])
csvwriter.writerow(['# freq in Hz', 'value (scuq)', 'value (dBuV)'])
for f in freqs:
dat = data[str(f)]
dBuV_dat = from_quantity('dBuV', dat)
csvwriter.writerow([f]+[dat, dBuV_dat])
[docs]
def export_finscan(self):
"""
Method to export finscan data to csv file
:return: None
"""
descriptions = self.finscan_data.keys()
for d in descriptions:
short_d = self.dut_description.splitlines()[0]
paths = self.finscan_data[d].keys()
for p in paths:
data = self.finscan_data[d][p]
freqs = [float(f) for f in data.keys()]
freqs.sort()
filename = f'{self.finscan_prefix}-{short_d}-{p}.csv'
filename = os.path.join(self.output_path, filename)
filename = self.uniquify(filename)
with open(filename, 'a', newline='', encoding='utf-8') as csvfile:
csvwriter = csv.writer(csvfile,
delimiter=';',
quoting=csv.QUOTE_NONE)
csvwriter.writerow([f'# Conducted Emission FinScan Output File', None , None, None , None])
csvwriter.writerow([f'# Time: {time.ctime()}', None , None, None , None])
csvwriter.writerow(['# Description:', None , None, None , None])
for _l in d.splitlines():
csvwriter.writerow([f'# {_l}', None, None, None, None])
csvwriter.writerow(['# DUT Description:', None , None, None, None])
for _l in self.dut_description.splitlines():
csvwriter.writerow([f'# {_l}', None, None, None, None])
csvwriter.writerow([f'# Measuring Path: {p}', None, None, None , None])
csvwriter.writerow(['# freq in Hz', 'value (scuq)', 'value (dBuV)', 'limit (dBuV)', 'status'])
n_failed = 0
n_passed = 0
for f in freqs:
dat = data[str(f)]
dBuV_dat = from_quantity('dBuV', dat)
lim = self.limitline(f)
if dBuV_dat > lim:
status = 'Failed'
n_failed += 1
else:
status = 'Passed'
n_passed += 1
csvwriter.writerow([f]+[dat, dBuV_dat, lim, status])
csvwriter.writerow([f'# Freqs Measured: {n_failed + n_passed}, Freqs Failed: {n_failed}', None, None, None, None])
[docs]
def do_pre_scan(self,
paths: Iterable[str] | None = None,
detector: str = 'PEAK',
should_stop: Callable[[], bool] | None = None,
wait_if_paused: Callable[[], bool] | None = None,
progress: Callable[[object], None] | None = None):
"""
Perform a pre scan (PEAK or AVERAGE-Detector) conducted emission measurement
:param paths: list or tuple; e.g. "('L, 'N')", or "('L1', 'L2', 'L3', 'N')"
:param detector: str; either 'PEAK' or 'AVERAGE'
:return: None
"""
if should_stop is None:
should_stop = lambda: False
if wait_if_paused is None:
wait_if_paused = lambda: False
if progress is None:
progress = lambda _payload: None
if paths is None:
paths = self.paths
self.log_fn("Start Prescan....")
self.init_prescan(detector=detector)
stop_requested = False
for path in paths:
self.pre_scan_set_path(path)
for f in self.freqs:
if should_stop() or not wait_if_paused():
self.log_fn("Prescan stopped.")
stop_requested = True
break
data = self.pre_scan_do_freq(f)
self.log_fn(f'Freq: {f}, Path: {path} Data: {data}')
progress({'path': path, 'f': f, 'data': data, 'pre_or_final': 'pre'})
if stop_requested:
break
if not stop_requested:
self.log_fn("Prescan finished.")
[docs]
def init_prescan(self,
detector: str ='PEAK') -> None:
"""
Setup pre-scan
:param detector: str; either 'PEAK' or 'AVERAGE'
:return: None
"""
self.prescan_data.setdefault(self.description, {})
self.mg.rec.SetDetector(detector)
[docs]
def pre_scan_set_path(self,
path: str) -> None:
"""
Set measurement path for pre scan
:param path: str; in '('L', 'N')', or '('L1', 'L2', 'L3', 'N')
:return: None
"""
self.mg.lisn.SetPath(path)
self.prescan_data[self.description][path] = {}
self.path = path
[docs]
def pre_scan_do_freq(self,
freq: float) -> Quantity:
"""
Perform pre scan for one frequency
:param freq: float; the frequency
:return: data; scuq object with pre scan data for this frequency
"""
# set frequency for graph
self.mg.SetFreq_Devices(freq)
# read receiver
data = None
while data is None:
err, data = self.mg.rec.GetData()
time.sleep(0.1)
# calculate insertion loss EUT -> REC
c_eut_rec = self.mg.get_path_correction(self.names['eut'], self.names['rec'], AMPLITUDERATIO)
# correct for insertion loss
data = data / c_eut_rec
data = data.reduce_to(VOLT)
# save date
self.prescan_data[self.description][self.path][str(freq)] = data
# return data
return data
[docs]
def do_final_scan(self,
paths: Iterable[str] | None = None,
margin: float = 6,
detector: str | None = None,
should_stop: Callable[[], bool] | None = None,
wait_if_paused: Callable[[], bool] | None = None,
progress: Callable[[object], None] | None = None) -> None:
"""
Perform a final conducted emission measurement
QPEAK measurements are performed at frequencies and paths where prescan is larger
that limit - margin (on dB scale)
:param paths: list or tuple; e.g. ['L', 'N']
:param margin: float; dB value below limit
:return: None
"""
if should_stop is None:
should_stop = lambda : False
if wait_if_paused is None:
wait_if_paused = lambda : False
if progress is None:
progress = lambda _payload: None
if detector is None:
detector = "QPEAK"
if paths is None:
paths = self.paths
self.log_fn("Start Final Scan....")
self.init_finscan(margin=margin, detector=detector)
stop_requested = False
for path in paths:
self.finscan_set_path(path)
for f in self.freqs:
if should_stop() or not wait_if_paused():
self.log_fn("Finscan stopped.")
stop_requested = True
break
data, lim = self.finscan_do_freq(f)
if data is None:
continue
progress({'path': path, 'f': f, 'data': data, 'pre_or_final': 'fin'})
self.log_fn(f'Freq: {f}, Path: {path} Data: {data} Limit: {lim}')
if stop_requested:
break
self.log_fn("Final scan finished....")
[docs]
def init_finscan(self,
margin: float = 6,
detector: str | None = None) -> None:
"""
Setup final conducted emission measurement
:param margin: float; dB value below limit
:return: None
"""
if detector is None:
detector = 'QPEAK'
detector = fstrcmp(detector, ("QPEAK", "AVERAGE", "PEAK"))[0]
detector = detector.upper()
self.margin = margin
self.finscan_data.setdefault(self.description, {})
self.mg.rec.SetDetector(detector)
[docs]
def finscan_set_path(self,
path: str) -> None:
"""
Set measurement path for final conducted emission
:param path: str; e.g. from ['L', 'N']
:return: None
"""
self.mg.lisn.SetPath(path)
self.finscan_data[self.description][path] = {}
self.path = path
[docs]
def finscan_do_freq(self,
freq: float) -> tuple[Quantity | None, Quantity | None]:
"""
Perform final conducted emission measurement for one frequency
:param freq: float; the frequency
:return: data; scuq object with final conducted emission data for this frequency
"""
mfac = 10**(self.margin/20.)
pre = self.prescan_data[self.description][self.path][str(freq)]
lim = self.limitline(freq) # in dBuV
if lim is None:
return None, None
try:
if np.isnan(lim): # no limit
return None, None
except TypeError:
pass
lim = 10 ** (lim / 20.) * 1e-6 # in Volt
lim = Quantity(VOLT, lim)
if pre * mfac < lim: # margin holt
return None, lim
self.mg.SetFreq_Devices(freq)
data = None
while data is None:
err, data = self.mg.rec.GetData()
time.sleep(0.1)
c_eut_rec = self.mg.get_path_correction(self.names['eut'], self.names['rec'], AMPLITUDERATIO)
data = data / c_eut_rec
data = data.reduce_to(VOLT)
if not data is None:
self.finscan_data[self.description][self.path][str(freq)] = data
return data, lim
[docs]
def quit(self, log: Callable[[str], None] | None = None) -> None:
"""
Exit the conducted emission measurement
Quits all devices in Graph
:return: None
"""
if log is None:
log = lambda s: None
log("Quitting Devices")
self.mg.Quit_Devices()
[docs]
class ce_plot:
"""
Class providing plot facilities for Conducted Emission Measurement
"""
[docs]
def __init__(self,
paths: Iterable[str] | None = None,
limit_dct: dict | None = None,
limit_legend: str | None = None,
unit: str | None =None,
margin: float = 6,
fmin: float | None = None,
fmax: float | None = None) -> None:
"""
Constructor for class ce_plot
:param paths: list or tuple; e.g. ['L', 'N']
:param limit_dct: dictionary with keys 'freq' and 'limit'; both are lists (limit is a dB value)
:param limit_legend: str; provides the legend for the limit
:param unit: str; provides the unit of measurement
:param margin: float; dB value below limit
"""
if paths is None:
self.paths = ('L', 'N')
else:
self.paths = paths
if limit_legend is None:
self.limit_legend = "Limit Line"
else:
self.limit_legend = limit_legend
if unit is None:
self.unit = 'dbµV'
else:
self.unit = unit
if fmin is None:
fmin = 10e3
if fmax is None:
fmax = 100e6
# fig with two plot (pre and fin)
self.fig, (ax_p, ax_f) = plt.subplots(2,1)
# axis objects
self.ax = {'pre': ax_p, 'fin': ax_f}
# freqs
self.freqs = {'pre': {}, 'fin': {}}
# data
self.data = {'pre': {}, 'fin': {}}
# data_line object are used to update plots
self.data_line = {'pre': {}, 'fin': {}}
self.data_markers = {'pre': ',', 'fin': '.'}
self.data_linestyle = {'pre': '-', 'fin': 'None'}
# Hz. kHz, MHz, GHz on freq axis
formatter0 = EngFormatter(unit='Hz')
x_min = fmin
x_max = fmax
y_min = -20
y_max = 100
for k in ('pre', 'fin'):
self.ax[k].set_ylabel(f'Voltage in {self.unit}')
self.ax[k].set_xlim(x_min, x_max)
self.ax[k].set_ylim(y_min, y_max)
self.ax[k].grid(True, which='both')
self.ax[k].set_xscale('log')
self.ax[k].xaxis.set_major_formatter(formatter0)
self.ax[k].annotate(k, xy=(x_min,y_max), color='r')
if limit_dct:
self.ax[k].plot('freq', 'limit', data=limit_dct,
ls='-', color='r',
label=limit_legend)
mg = {'freq': limit_dct['freq'],
'margin': [v-margin for v in limit_dct['limit']]}
self.ax[k].plot('freq', 'margin', data=mg,
ls="--", color='r',
label=f"Limit - {margin} dB")
for path in self.paths:
self.freqs[k][path] = np.array([], dtype=float)
self.data[k][path] = np.array([], dtype=float)
self.data_line[k][path], = self.ax[k].plot(self.freqs[k][path],
self.data[k][path],
marker=self.data_markers[k],
linestyle=self.data_linestyle[k],
label=f'{path}')
self.ax['fin'].set_xlabel('Frequency')
self.ax['pre'].legend(bbox_to_anchor=(0,1.05),
loc='lower left',
frameon=False,
fontsize=6,
borderpad=1,
ncol=2)
plt.tight_layout()
[docs]
def update_data_plot(self,
typ: str,
path: str,
freqs: Iterable[float],
data: Iterable[float]) -> None:
"""
Update the plot with (freq, data) tuples for all freq in freqs
:param typ: str; 'pre' or 'fin' to indicate pre scan or final scan
:param path: str; e.g. 'L' or 'N'
:param freqs: iterable; the frequencies
:param data: iterable; the data (dB values)
:return: None
"""
self.freqs[typ][path] = np.append(self.freqs[typ][path], freqs)
self.data[typ][path] = np.append(self.data[typ][path], data)
self.data_line[typ][path].set_data(self.freqs[typ][path], self.data[typ][path])
self.fig.canvas.draw()
self.fig.canvas.flush_events()
[docs]
def quit(self) -> None:
"""
Quit: set interactive mode off
:return: None
"""
plt.ioff()
if __name__ == '__main__':
import sys
import io
from mpylab.tools.util import format_block
from mpylab.tools.spacing import logspace
from mpylab.limits.conducted_emission.en_55011 import LIMIT
try:
dot = sys.argv[1]
except (IndexError):
dot = format_block("""
digraph {
rec [ini="rec_rs_eshs30.ini"]
lisn [ini="vlisn_rs_env216.ini"]
eut -> lisn
lisn -> rec [dev=lisn what="S21"]}""")
dot = io.StringIO(dot)
group = '1'
classification = 'B'
detector = 'QP'
port = 'AC (≤ 20 kVA)'
margin = 6
limit = LIMIT(group=group,
classification=classification,
detector=detector,
port=port)
limitline = limit.limitline
limit_legend = f"{limit.description_title}_{group}_{classification}_{detector}_{port}"
names = {'rec': 'rec', 'lisn': 'lisn', 'eut': 'eut'}
freqs = logspace(150e3, 30e6, 1.01)
lim_dct = {'freq': freqs, 'limit': limitline(freqs)}
plt.ion()
ceplt = ce_plot(paths=('L', 'N'),
limit_dct=lim_dct,
limit_legend=limit_legend,
unit=limit.unit,
margin=margin)
# set up logging
logfile = 'conducted_emission.log'
logger = logging.getLogger(__name__)
logging.basicConfig(filename=logfile,
encoding='utf-8',
format='%(asctime)s %(levelname)s:%(message)s',
level=logging.INFO)
logger.info(f"Start Logging to {logfile}.")
log_fn = File_Logger(logger)
# log_fn = Stdout_Logger()
ce = ConductedEmission(log_fn=log_fn)
ce.prepare_scan(dot,
names=names,
searchpaths=['conf'],
freqs=freqs,
paths=('L', 'N'),
limitline=limitline)
ce.init_prescan()
ce.log_fn("Start Prescan....")
last_plot = 0
for path in ce.paths:
ce.pre_scan_set_path(path)
_f = []
_d = []
for freq in ce.freqs:
data = ce.pre_scan_do_freq(freq)
_f.append(freq)
_d.append(ce.mg.rec._get_db_from_obj(data))
ce.log_fn(f'Freq: {freq}, Path: {path} Data: {data}')
now = time.time()
if now - last_plot > 2:
ceplt.update_data_plot('pre', path, _f, _d)
_f = []
_d = []
last_plot = time.time()
ceplt.update_data_plot('pre', path, _f, _d)
ce.log_fn("Prescan finished....")
ce.export_prescan()
ce.init_finscan(margin=margin)
ce.log_fn("Start Final Scan....")
last_plot = 0
for path in ce.paths:
ce.finscan_set_path(path)
_f = []
_d = []
for freq in ce.freqs:
data, lim = ce.finscan_do_freq(freq)
if data:
_f.append(freq)
_d.append(ce.mg.rec._get_db_from_obj(data))
ce.log_fn(f'Freq: {freq}, Path: {path} Data: {data} Limit: {lim}')
now = time.time()
if now - last_plot > 2:
ceplt.update_data_plot('fin', path, _f, _d)
_f = []
_d = []
last_plot = time.time()
ceplt.update_data_plot('fin', path, _f, _d)
ce.log_fn("Final Scan finished....")
ce.export_finscan()
ce.quit()
# 'block=True' ensures that the plot stays on screen
plt.show(block=True)