Source code for conducted_emission_mpylab.ce

# -*- coding: utf-8 -*-
"""
This is the :mod:`conducted_emission.ce` module.

   :author: Hans Georg Krauthäuser (main author)

   :license: GPLv3 or higher
"""
import os
from typing import Iterable, Protocol, TextIO, Any
from collections.abc import Callable
import logging
import csv
import pathvalidate
import time

[docs] class limitline(Protocol): def __call__(self, freq_or_freqs: Iterable[float] | float) -> Iterable[float] | float: ...
import numpy as np from matplotlib import pyplot as plt from matplotlib.ticker import EngFormatter from mpylab.tools.aunits import AMPLITUDERATIO, VOLT, WATT from mpylab.tools.unit_conversion import from_quantity from mpylab.tools.mgraph import MGraph from mpylab.tools.levenshtein import fstrcmp from scuq.quantities import Quantity from scuq.si import VOLT
class Stdout_Logger:
    """
    Stdout_Logger class.

    Simply prints the message to stdout.
    """

    def __call__(self, message: str, typ: str = 'info') -> None:
        """
        Prints the message to stdout.

        :param message: the log message
        :param typ: log level name; accepted so that this logger can be called
            with the same arguments as :class:`File_Logger`, otherwise ignored
        :return: None
        """
        print(message)
class File_Logger:
    """
    File_Logger class.

    Callable wrapper around a :mod:`logging` logger; intended for logging to
    a file.
    """

    # accepted level names (lower case) and the numeric logging level of each
    levels = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
        'critical': logging.CRITICAL,
    }

    def __init__(self, logger):
        """
        Constructor

        :param logger: the logging object all messages are forwarded to
        """
        self.logger = logger

    def __call__(self, message, typ='info'):
        """
        Hand the message over to the wrapped logger.

        :param message: the log message
        :param typ: one of 'debug', 'info', 'warning', 'error', 'critical'
            (case-insensitive); defaults to 'info'
        :return: None
        :raises KeyError: if typ is not one of the names listed above
        """
        level_name = typ.lower()
        numeric_level = self.levels[level_name]
        self.logger.log(numeric_level, message)
class ConductedEmission:
    """
    Conducted Emission class

    Used to perform conducted emission EMC measurements using an EMC-Receiver
    and a Line-Impedance Stabilized Network (LISN).

    The measurement is split into a pre scan and a final scan; the results of
    both are kept in dictionaries on the instance and can be exported to csv
    files.
    """
def __init__(self,
             description: str | None = None,
             log_fn: Callable[[str], None] | None = None,
             export_pre_fn: Callable[[], None] | None = None,
             export_fin_fn: Callable[[], None] | None = None,
             output_path: str | None = None,
             prefix_prescan: str | None = None,
             finscan_prefix: str | None = None) -> None:
    """
    Constructor

    :param description: str; general description of the measurement
                        (defaults to "Conducted Emission")
    :param log_fn: callback for logging messages (defaults to a Stdout_Logger instance)
    :param export_pre_fn: callback (no arguments) for exporting conducted emission
                          prescan measurements (defaults to self.export_prescan)
    :param export_fin_fn: callback (no arguments) for exporting conducted emission
                          final measurements (defaults to self.export_finscan)
    :param output_path: file save path if internal export functions are used
                        (defaults to the current working directory)
    :param prefix_prescan: file prefix for prescan measurements if internal export
                           functions are used (defaults to 'prescan')
    :param finscan_prefix: file prefix for finscan measurements if internal export
                           functions are used (defaults to 'finscan')
    """
    # measurement state; filled in by prepare_scan() and the scan methods
    self.paths = None
    self.path = None
    self.names = None
    self.mg = None
    self.searchpaths = None
    self.limitline = None
    self.margin = None
    self.freqs = None
    # The export methods read this attribute, so give it the same default
    # that prepare_scan() uses instead of leaving it undefined until then.
    self.dut_description = "DUT"

    self.output_path = os.getcwd() if output_path is None else output_path
    self.prefix_prescan = 'prescan' if prefix_prescan is None else prefix_prescan
    self.finscan_prefix = 'finscan' if finscan_prefix is None else finscan_prefix
    self.log_fn = Stdout_Logger() if log_fn is None else log_fn
    self.export_pre_fn = self.export_prescan if export_pre_fn is None else export_pre_fn
    self.export_fin_fn = self.export_finscan if export_fin_fn is None else export_fin_fn
    self.description = "Conducted Emission" if description is None else description

    # data structures to save scan data
    # keys are description, paths and str(freqs)
    self.prescan_data = {}
    self.finscan_data = {}
def prepare_scan(self,
                 dot: str | TextIO,
                 names: dict | None = None,
                 searchpaths: Iterable[str] | None = None,
                 paths: Iterable[str] | None = None,
                 freqs: Iterable[float] | None = None,
                 limitline: limitline = None,
                 dut_description: str | None = None) -> None:
    """
    Prepare conducted emission measurement

    Stores the measurement settings on the instance, then builds the
    measurement graph, creates its devices and initialises them.

    :param dot: filename, file-like object or data; the graph dot file
    :param names: dict; maps internal used names 'eut', 'lisn', 'rec' to the names in dot file
                  (defaults to the identity mapping of these three names)
    :param searchpaths: list; search paths for ini-files and data-files (defaults to ['.'])
    :param paths: list or tuple; the different measurement paths, e.g. "('L', 'N')",
                  or "('L1', 'L2', 'L3', 'N')" (defaults to ('L', 'N'))
    :param freqs: list; the frequencies (defaults to an empty list)
    :param limitline: callable or None; gives the actual limit at freq
    :param dut_description: str; description of the device under test (DUT)
                            (defaults to "DUT")
    :return: None
    """
    self.dut_description = "DUT" if dut_description is None else dut_description
    self.names = {'rec': 'rec', 'lisn': 'lisn', 'eut': 'eut'} if names is None else names
    self.searchpaths = ['.'] if searchpaths is None else searchpaths
    self.paths = ('L', 'N') if paths is None else paths
    self.freqs = [] if freqs is None else freqs
    self.limitline = limitline

    # build the measurement graph, then create and initialise its devices
    graph = MGraph(dot, self.names, SearchPaths=self.searchpaths)
    self.mg = graph
    graph.CreateDevices()
    graph.Init_Devices()
def uniquify(self, path):
    """
    Turn a path into one that does not exist yet.

    The file name (without extension) is sanitized first: blanks become
    underscores and the result is passed through
    pathvalidate.sanitize_filename. If a file with the resulting name already
    exists, a three digit counter ('-001', '-002', ...) is appended in front
    of the extension until the path is unused.

    :param path: str: full path to uniquify
    :return: str: uniquified path
    """
    directory, basename = os.path.split(path)
    stem, suffix = os.path.splitext(basename)
    stem = pathvalidate.sanitize_filename(stem.replace(' ', '_'))

    candidate = os.path.join(directory, f'{stem}{suffix}')
    attempt = 0
    while os.path.exists(candidate):
        attempt += 1
        candidate = os.path.join(directory, f'{stem}-{attempt:03d}{suffix}')
    return candidate
def export_prescan(self):
    """
    Method to export prescan data to csv file

    One file is written for every description / measuring path combination
    found in self.prescan_data. The file name is built from the prescan
    prefix, the first line of the DUT description and the path, and is made
    unique with self.uniquify(). Each file starts with '#' header rows
    followed by one row per frequency (sorted ascending) holding the
    frequency, the stored value and the value converted to dBuV.

    :return: None
    :raises csv.Error: if a written field contains the delimiter ';', a double
        quote or a line break (fields are neither quoted nor escaped)
    """
    for d, per_path in self.prescan_data.items():
        for p, data in per_path.items():
            # only the first line of the DUT description goes into the file
            # name; an empty description contributes an empty name part
            dut_lines = self.dut_description.splitlines()
            short_d = dut_lines[0] if dut_lines else ''
            filename = f'{self.prefix_prescan}-{short_d}-{p}.csv'
            filename = os.path.join(self.output_path, filename)
            filename = self.uniquify(filename)
            with open(filename, 'a', newline='', encoding='utf-8') as csvfile:
                csvwriter = csv.writer(csvfile, delimiter=';', quoting=csv.QUOTE_NONE)
                csvwriter.writerow(['# Conducted Emission PreScan Output File', None, None])
                csvwriter.writerow([f'# Time: {time.ctime()}', None, None])
                csvwriter.writerow(['# Description:', None, None])
                for _l in d.splitlines():
                    # no trailing colon, same layout as the finscan export
                    csvwriter.writerow([f'# {_l}', None, None])
                csvwriter.writerow(['# DUT Description:', None, None])
                for _l in self.dut_description.splitlines():
                    csvwriter.writerow([f'# {_l}', None, None])
                csvwriter.writerow([f'# Measuring Path: {p}', None, None])
                csvwriter.writerow(['# freq in Hz', 'value (scuq)', 'value (dBuV)'])
                # Sort by numeric frequency but keep each value paired with its
                # stored key: rebuilding the key via str(float(key)) fails for
                # keys such as '150000' (frequency given as int).
                for key, dat in sorted(data.items(), key=lambda item: float(item[0])):
                    dBuV_dat = from_quantity('dBuV', dat)
                    csvwriter.writerow([float(key), dat, dBuV_dat])
def export_finscan(self):
    """
    Method to export finscan data to csv file

    One file is written for every description / measuring path combination
    found in self.finscan_data. The file name is built from the finscan
    prefix, the first line of the DUT description and the path, and is made
    unique with self.uniquify(). Each file starts with '#' header rows,
    followed by one row per frequency (sorted ascending) holding the
    frequency, the stored value, the value in dBuV, the limit returned by
    self.limitline and a 'Passed'/'Failed' status, and ends with a summary
    row. A frequency counts as failed when its dBuV value is larger than the
    limit.

    :return: None
    :raises csv.Error: if a written field contains the delimiter ';', a double
        quote or a line break (fields are neither quoted nor escaped)
    """
    for d, per_path in self.finscan_data.items():
        # only the first line of the DUT description goes into the file name;
        # an empty description contributes an empty name part
        dut_lines = self.dut_description.splitlines()
        short_d = dut_lines[0] if dut_lines else ''
        for p, data in per_path.items():
            filename = f'{self.finscan_prefix}-{short_d}-{p}.csv'
            filename = os.path.join(self.output_path, filename)
            filename = self.uniquify(filename)
            with open(filename, 'a', newline='', encoding='utf-8') as csvfile:
                csvwriter = csv.writer(csvfile, delimiter=';', quoting=csv.QUOTE_NONE)
                csvwriter.writerow(['# Conducted Emission FinScan Output File', None, None, None, None])
                csvwriter.writerow([f'# Time: {time.ctime()}', None, None, None, None])
                csvwriter.writerow(['# Description:', None, None, None, None])
                for _l in d.splitlines():
                    csvwriter.writerow([f'# {_l}', None, None, None, None])
                csvwriter.writerow(['# DUT Description:', None, None, None, None])
                for _l in self.dut_description.splitlines():
                    csvwriter.writerow([f'# {_l}', None, None, None, None])
                csvwriter.writerow([f'# Measuring Path: {p}', None, None, None, None])
                csvwriter.writerow(['# freq in Hz', 'value (scuq)', 'value (dBuV)', 'limit (dBuV)', 'status'])
                n_failed = 0
                n_passed = 0
                # Sort by numeric frequency but keep each value paired with its
                # stored key: rebuilding the key via str(float(key)) fails for
                # keys such as '150000' (frequency given as int).
                for key, dat in sorted(data.items(), key=lambda item: float(item[0])):
                    f = float(key)
                    dBuV_dat = from_quantity('dBuV', dat)
                    lim = self.limitline(f)
                    if dBuV_dat > lim:
                        status = 'Failed'
                        n_failed += 1
                    else:
                        status = 'Passed'
                        n_passed += 1
                    csvwriter.writerow([f, dat, dBuV_dat, lim, status])
                csvwriter.writerow([f'# Freqs Measured: {n_failed + n_passed}, Freqs Failed: {n_failed}',
                                    None, None, None, None])
[docs] def do_pre_scan(self, paths: Iterable[str] | None = None, detector: str = 'PEAK', should_stop: Callable[[], bool] | None = None, wait_if_paused: Callable[[], bool] | None = None, progress: Callable[[object], None] | None = None): """ Perform a pre scan (PEAK or AVERAGE-Detector) conducted emission measurement :param paths: list or tuple; e.g. "('L, 'N')", or "('L1', 'L2', 'L3', 'N')" :param detector: str; either 'PEAK' or 'AVERAGE' :return: None """ if should_stop is None: should_stop = lambda: False if wait_if_paused is None: wait_if_paused = lambda: False if progress is None: progress = lambda _payload: None if paths is None: paths = self.paths self.log_fn("Start Prescan....") self.init_prescan(detector=detector) stop_requested = False for path in paths: self.pre_scan_set_path(path) for f in self.freqs: if should_stop() or not wait_if_paused(): self.log_fn("Prescan stopped.") stop_requested = True break data = self.pre_scan_do_freq(f) self.log_fn(f'Freq: {f}, Path: {path} Data: {data}') progress({'path': path, 'f': f, 'data': data, 'pre_or_final': 'pre'}) if stop_requested: break if not stop_requested: self.log_fn("Prescan finished.")
def init_prescan(self, detector: str = 'PEAK') -> None:
    """
    Setup pre-scan

    Makes sure a prescan data entry exists for the current description
    (an existing entry is kept) and selects the detector on the receiver.

    :param detector: str; either 'PEAK' or 'AVERAGE'
    :return: None
    """
    if self.description not in self.prescan_data:
        self.prescan_data[self.description] = {}
    receiver = self.mg.rec
    receiver.SetDetector(detector)
def pre_scan_set_path(self, path: str) -> None:
    """
    Set measurement path for pre scan

    Switches the LISN to the path, starts an empty prescan record for it
    (a previous record for the same path is dropped) and remembers it as
    the current path.

    :param path: str; one of e.g. ('L', 'N') or ('L1', 'L2', 'L3', 'N')
    :return: None
    """
    lisn = self.mg.lisn
    lisn.SetPath(path)
    records = self.prescan_data[self.description]
    records[path] = {}
    self.path = path
def pre_scan_do_freq(self, freq: float) -> Quantity:
    """
    Perform pre scan for one frequency

    Sets the frequency on the devices of the graph, polls the receiver until
    it delivers data, divides the reading by the path correction EUT -> REC,
    reduces it to VOLT and stores it under str(freq) for the current
    description and path.

    :param freq: float; the frequency
    :return: data; scuq object with pre scan data for this frequency
    """
    # tune all devices of the graph
    self.mg.SetFreq_Devices(freq)

    # poll the receiver; every read is followed by a 0.1 s pause
    receiver = self.mg.rec
    while True:
        _err, reading = receiver.GetData()
        time.sleep(0.1)
        if reading is not None:
            break

    # insertion loss EUT -> REC
    correction = self.mg.get_path_correction(self.names['eut'],
                                             self.names['rec'],
                                             AMPLITUDERATIO)
    corrected = (reading / correction).reduce_to(VOLT)

    # keep the result and hand it back
    self.prescan_data[self.description][self.path][str(freq)] = corrected
    return corrected
[docs] def do_final_scan(self, paths: Iterable[str] | None = None, margin: float = 6, detector: str | None = None, should_stop: Callable[[], bool] | None = None, wait_if_paused: Callable[[], bool] | None = None, progress: Callable[[object], None] | None = None) -> None: """ Perform a final conducted emission measurement QPEAK measurements are performed at frequencies and paths where prescan is larger that limit - margin (on dB scale) :param paths: list or tuple; e.g. ['L', 'N'] :param margin: float; dB value below limit :return: None """ if should_stop is None: should_stop = lambda : False if wait_if_paused is None: wait_if_paused = lambda : False if progress is None: progress = lambda _payload: None if detector is None: detector = "QPEAK" if paths is None: paths = self.paths self.log_fn("Start Final Scan....") self.init_finscan(margin=margin, detector=detector) stop_requested = False for path in paths: self.finscan_set_path(path) for f in self.freqs: if should_stop() or not wait_if_paused(): self.log_fn("Finscan stopped.") stop_requested = True break data, lim = self.finscan_do_freq(f) if data is None: continue progress({'path': path, 'f': f, 'data': data, 'pre_or_final': 'fin'}) self.log_fn(f'Freq: {f}, Path: {path} Data: {data} Limit: {lim}') if stop_requested: break self.log_fn("Final scan finished....")
def init_finscan(self, margin: float = 6, detector: str | None = None) -> None:
    """
    Setup final conducted emission measurement

    The detector name is passed through fstrcmp together with the names
    "QPEAK", "AVERAGE" and "PEAK"; the first element of its result is used in
    upper case (presumably the closest match -- confirm against fstrcmp).
    Afterwards the margin is stored, a finscan data entry for the current
    description is created if missing and the detector is set on the receiver.

    :param margin: float; dB value below limit
    :param detector: str or None; requested detector, defaults to 'QPEAK'
    :return: None
    """
    requested = 'QPEAK' if detector is None else detector
    candidates = ("QPEAK", "AVERAGE", "PEAK")
    chosen = fstrcmp(requested, candidates)[0].upper()

    self.margin = margin
    if self.description not in self.finscan_data:
        self.finscan_data[self.description] = {}
    receiver = self.mg.rec
    receiver.SetDetector(chosen)
def finscan_set_path(self, path: str) -> None:
    """
    Set measurement path for final conducted emission

    Switches the LISN to the path, starts an empty finscan record for it
    (a previous record for the same path is dropped) and remembers it as
    the current path.

    :param path: str; e.g. from ['L', 'N']
    :return: None
    """
    lisn = self.mg.lisn
    lisn.SetPath(path)
    records = self.finscan_data[self.description]
    records[path] = {}
    self.path = path
def finscan_do_freq(self, freq: float) -> tuple[Quantity | None, Quantity | None]:
    """
    Perform final conducted emission measurement for one frequency

    Nothing is measured when the limit line yields None or NaN for the
    frequency, or when the prescan value multiplied by the margin factor is
    still below the limit.

    :param freq: float; the frequency
    :return: tuple (data, lim); data is the scuq object with final conducted
             emission data for this frequency or None if nothing was measured,
             lim is the limit as Quantity in VOLT or None if there is no limit
    """
    margin_factor = 10**(self.margin/20.)
    prescan_value = self.prescan_data[self.description][self.path][str(freq)]
    limit_db = self.limitline(freq)  # in dBuV
    if limit_db is None:
        return None, None
    try:
        no_limit = bool(np.isnan(limit_db))
    except TypeError:
        # value cannot be checked for NaN; treat it as a regular limit
        no_limit = False
    if no_limit:
        return None, None

    # dBuV -> Volt
    limit = Quantity(VOLT, 10 ** (limit_db / 20.) * 1e-6)
    if prescan_value * margin_factor < limit:
        # margin holds: prescan is far enough below the limit
        return None, limit

    self.mg.SetFreq_Devices(freq)
    # poll the receiver; every read is followed by a 0.1 s pause
    receiver = self.mg.rec
    while True:
        _err, reading = receiver.GetData()
        time.sleep(0.1)
        if reading is not None:
            break

    correction = self.mg.get_path_correction(self.names['eut'],
                                             self.names['rec'],
                                             AMPLITUDERATIO)
    result = (reading / correction).reduce_to(VOLT)
    if result is not None:
        self.finscan_data[self.description][self.path][str(freq)] = result
    return result, limit
[docs] def quit(self, log: Callable[[str], None] | None = None) -> None: """ Exit the conducted emission measurement Quits all devices in Graph :return: None """ if log is None: log = lambda s: None log("Quitting Devices") self.mg.Quit_Devices()
class ce_plot:
    """
    Class providing plot facilities for Conducted Emission Measurement

    One figure with two axes is used: one for the pre scan ('pre') and one
    for the final scan ('fin').
    """
def __init__(self,
             paths: Iterable[str] | None = None,
             limit_dct: dict | None = None,
             limit_legend: str | None = None,
             unit: str | None = None,
             margin: float = 6,
             fmin: float | None = None,
             fmax: float | None = None) -> None:
    """
    Constructor for class ce_plot

    :param paths: list or tuple; e.g. ['L', 'N'] (defaults to ('L', 'N'))
    :param limit_dct: dictionary with keys 'freq' and 'limit'; both are lists (limit is a dB value);
                      if empty or None no limit curves are drawn
    :param limit_legend: str; provides the legend for the limit (defaults to "Limit Line")
    :param unit: str; provides the unit of measurement (defaults to 'dbµV')
    :param margin: float; dB value below limit
    :param fmin: float; lower end of the frequency axis (defaults to 10e3)
    :param fmax: float; upper end of the frequency axis (defaults to 100e6)
    """
    self.paths = ('L', 'N') if paths is None else paths
    self.limit_legend = "Limit Line" if limit_legend is None else limit_legend
    self.unit = 'dbµV' if unit is None else unit
    if fmin is None:
        fmin = 10e3
    if fmax is None:
        fmax = 100e6

    # fig with two plots (pre and fin)
    self.fig, (ax_p, ax_f) = plt.subplots(2, 1)
    # axis objects
    self.ax = {'pre': ax_p, 'fin': ax_f}
    # freqs
    self.freqs = {'pre': {}, 'fin': {}}
    # data
    self.data = {'pre': {}, 'fin': {}}
    # data_line objects are used to update plots
    self.data_line = {'pre': {}, 'fin': {}}
    self.data_markers = {'pre': ',', 'fin': '.'}
    self.data_linestyle = {'pre': '-', 'fin': 'None'}

    # Hz, kHz, MHz, GHz on freq axis
    formatter0 = EngFormatter(unit='Hz')
    x_min, x_max = fmin, fmax
    y_min, y_max = -20, 100
    for k in ('pre', 'fin'):
        ax = self.ax[k]
        ax.set_ylabel(f'Voltage in {self.unit}')
        ax.set_xlim(x_min, x_max)
        ax.set_ylim(y_min, y_max)
        ax.grid(True, which='both')
        ax.set_xscale('log')
        ax.xaxis.set_major_formatter(formatter0)
        ax.annotate(k, xy=(x_min, y_max), color='r')
        if limit_dct:
            # Label with self.limit_legend, not the raw argument: otherwise the
            # "Limit Line" default never reaches the legend.
            ax.plot('freq', 'limit', data=limit_dct, ls='-', color='r',
                    label=self.limit_legend)
            margin_dct = {'freq': limit_dct['freq'],
                          'margin': [v - margin for v in limit_dct['limit']]}
            ax.plot('freq', 'margin', data=margin_dct, ls="--", color='r',
                    label=f"Limit - {margin} dB")
        for path in self.paths:
            # start every path with an empty line that is filled later on
            self.freqs[k][path] = np.array([], dtype=float)
            self.data[k][path] = np.array([], dtype=float)
            self.data_line[k][path], = ax.plot(self.freqs[k][path],
                                               self.data[k][path],
                                               marker=self.data_markers[k],
                                               linestyle=self.data_linestyle[k],
                                               label=f'{path}')
    self.ax['fin'].set_xlabel('Frequency')
    self.ax['pre'].legend(bbox_to_anchor=(0, 1.05), loc='lower left', frameon=False,
                          fontsize=6, borderpad=1, ncol=2)
    plt.tight_layout()
def update_data_plot(self, typ: str, path: str, freqs: Iterable[float], data: Iterable[float]) -> None:
    """
    Append (freq, data) points to one curve and redraw the figure

    :param typ: str; 'pre' or 'fin' to indicate pre scan or final scan
    :param path: str; e.g. 'L' or 'N'
    :param freqs: iterable; the frequencies
    :param data: iterable; the data (dB values)
    :return: None
    """
    all_freqs = np.append(self.freqs[typ][path], freqs)
    self.freqs[typ][path] = all_freqs
    all_data = np.append(self.data[typ][path], data)
    self.data[typ][path] = all_data

    curve = self.data_line[typ][path]
    curve.set_data(all_freqs, all_data)

    canvas = self.fig.canvas
    canvas.draw()
    canvas.flush_events()
def quit(self) -> None:
    """
    Quit plotting: switch pyplot's interactive mode off

    :return: None
    """
    plt.ioff()
if __name__ == '__main__':
    # Example run: pre scan and final scan against the EN 55011 limit with
    # live plotting, logging to file and csv export.
    import sys
    import io
    from mpylab.tools.util import format_block
    from mpylab.tools.spacing import logspace
    from mpylab.limits.conducted_emission.en_55011 import LIMIT

    try:
        # graph description given on the command line
        dot = sys.argv[1]
    except IndexError:
        # fall back to a built-in graph: receiver and LISN
        dot = format_block("""
                digraph {
                    rec [ini="rec_rs_eshs30.ini"]
                    lisn [ini="vlisn_rs_env216.ini"]
                    eut -> lisn
                    lisn -> rec [dev=lisn what="S21"]}""")
        dot = io.StringIO(dot)

    group = '1'
    classification = 'B'
    detector = 'QP'
    port = 'AC (≤ 20 kVA)'
    margin = 6
    limit = LIMIT(group=group, classification=classification, detector=detector, port=port)
    limitline = limit.limitline
    limit_legend = f"{limit.description_title}_{group}_{classification}_{detector}_{port}"

    names = {'rec': 'rec', 'lisn': 'lisn', 'eut': 'eut'}
    freqs = logspace(150e3, 30e6, 1.01)
    lim_dct = {'freq': freqs, 'limit': limitline(freqs)}

    plt.ion()
    ceplt = ce_plot(paths=('L', 'N'), limit_dct=lim_dct, limit_legend=limit_legend,
                    unit=limit.unit, margin=margin)

    # set up logging
    logfile = 'conducted_emission.log'
    logger = logging.getLogger(__name__)
    logging.basicConfig(filename=logfile, encoding='utf-8',
                        format='%(asctime)s %(levelname)s:%(message)s',
                        level=logging.INFO)
    logger.info(f"Start Logging to {logfile}.")
    log_fn = File_Logger(logger)
    # log_fn = Stdout_Logger()

    ce = ConductedEmission(log_fn=log_fn)
    ce.prepare_scan(dot, names=names, searchpaths=['conf'], freqs=freqs,
                    paths=('L', 'N'), limitline=limitline)
    try:
        ce.init_prescan()
        ce.log_fn("Start Prescan....")
        last_plot = 0
        for path in ce.paths:
            ce.pre_scan_set_path(path)
            # points collected since the last plot update
            _f = []
            _d = []
            for freq in ce.freqs:
                data = ce.pre_scan_do_freq(freq)
                _f.append(freq)
                _d.append(ce.mg.rec._get_db_from_obj(data))
                ce.log_fn(f'Freq: {freq}, Path: {path} Data: {data}')
                now = time.time()
                if now - last_plot > 2:
                    # redraw at most every two seconds
                    ceplt.update_data_plot('pre', path, _f, _d)
                    _f = []
                    _d = []
                    last_plot = time.time()
            # flush the points that are still pending for this path
            ceplt.update_data_plot('pre', path, _f, _d)
        ce.log_fn("Prescan finished....")
        ce.export_prescan()

        ce.init_finscan(margin=margin)
        ce.log_fn("Start Final Scan....")
        last_plot = 0
        for path in ce.paths:
            ce.finscan_set_path(path)
            _f = []
            _d = []
            for freq in ce.freqs:
                data, lim = ce.finscan_do_freq(freq)
                # finscan_do_freq returns None when nothing was measured;
                # test for that explicitly instead of relying on truthiness
                if data is not None:
                    _f.append(freq)
                    _d.append(ce.mg.rec._get_db_from_obj(data))
                    ce.log_fn(f'Freq: {freq}, Path: {path} Data: {data} Limit: {lim}')
                now = time.time()
                if now - last_plot > 2:
                    ceplt.update_data_plot('fin', path, _f, _d)
                    _f = []
                    _d = []
                    last_plot = time.time()
            ceplt.update_data_plot('fin', path, _f, _d)
        ce.log_fn("Final Scan finished....")
        ce.export_finscan()
    finally:
        # release the instruments even if a scan or an export failed
        ce.quit()

    # 'block=True' ensures that the plot stays on screen
    plt.show(block=True)