Source code for mpylab.tools.mgraph

# -*- coding: utf-8 -*-
"""This is :mod:`mpylab.tools.mgraph`.

   Provides the MGraph class (mainly)

   :author: Hans Georg Krauthäuser (main author)

   :license: GPL-3 or higher
"""
from __future__ import annotations
import os
#import importlib.machinery
import importlib.util
import inspect
from io import StringIO
from typing import Any
from pathlib import Path
import ast
import operator as op

import pydot
import configparser
from numpy import bool_, sqrt, absolute
from numpy import bool_
from scipy.interpolate import interp1d

#import mpylab.device.device as device
from scuq.ucomponents import Context
from scuq.quantities import Quantity
from scuq.si import WATT
from scuq.units import ONE
from mpylab.tools.aunits import AMPLITUDERATIO, POWERRATIO
from mpylab.tools.configuration import fstrcmp
from mpylab.tools.util import extrap1d, locate, format_block


_ALLOWED_BINOPS = {
    ast.Add: op.add,
    ast.Sub: op.sub,
    ast.Mult: op.mul,
    ast.Div: op.truediv,
    ast.FloorDiv: op.floordiv,
    ast.Mod: op.mod,
    ast.Pow: op.pow,
}

_ALLOWED_UNARYOPS = {
    ast.UAdd: op.pos,
    ast.USub: op.neg,
    ast.Not: op.not_,
}

_ALLOWED_CMPOPS = {
    ast.Lt: op.lt,
    ast.LtE: op.le,
    ast.Gt: op.gt,
    ast.GtE: op.ge,
    ast.Eq: op.eq,
    ast.NotEq: op.ne,
}

_ALLOWED_BOOLOPS = {
    ast.And: all,
    ast.Or: any,
}


def safe_action_exec(expr, names):
    """Safely execute one method-call action expression with literal arguments."""
    tree = ast.parse(expr, mode="exec")

    if len(tree.body) != 1 or not isinstance(tree.body[0], ast.Expr):
        raise ValueError("Only one action expression is allowed")

    call = tree.body[0].value
    if not isinstance(call, ast.Call):
        raise ValueError("Only calls are allowed")

    if not isinstance(call.func, ast.Attribute):
        raise ValueError("Only object method calls are allowed")

    if not isinstance(call.func.value, ast.Name):
        raise ValueError("Only simple object names are allowed")

    obj_name = call.func.value.id
    method_name = call.func.attr

    obj = names[obj_name]
    method = getattr(obj, method_name)

    args = [ast.literal_eval(arg) for arg in call.args]
    kwargs = {kw.arg: ast.literal_eval(kw.value) for kw in call.keywords}

    return method(*args, **kwargs)

def safe_condition_eval(expr, names):
    """
    Safe evaluator for graph conditions.

    Supported:
    - numeric constants, strings, booleans, None
    - names from `names`
    - arithmetic: + - * / // % **
    - unary: + - not
    - comparisons: < <= > >= == !=
    - chained comparisons: 10e3 < f <= 1e6
    - boolean operators: and, or
    """
    tree = ast.parse(expr, mode="eval")

    def _eval(node):
        if isinstance(node, ast.Expression):
            return _eval(node.body)

        if isinstance(node, ast.Constant):
            return node.value

        if isinstance(node, ast.Name):
            if node.id in names:
                return names[node.id]
            raise NameError(f"Unknown name in condition: {node.id}")

        if isinstance(node, ast.BinOp):
            fn = _ALLOWED_BINOPS.get(type(node.op))
            if fn is None:
                raise ValueError(f"Operator not allowed: {type(node.op).__name__}")
            return fn(_eval(node.left), _eval(node.right))

        if isinstance(node, ast.UnaryOp):
            fn = _ALLOWED_UNARYOPS.get(type(node.op))
            if fn is None:
                raise ValueError(f"Unary operator not allowed: {type(node.op).__name__}")
            return fn(_eval(node.operand))

        if isinstance(node, ast.BoolOp):
            fn = _ALLOWED_BOOLOPS.get(type(node.op))
            if fn is None:
                raise ValueError(f"Boolean operator not allowed: {type(node.op).__name__}")
            return fn(_eval(v) for v in node.values)

        if isinstance(node, ast.Compare):
            left = _eval(node.left)
            for op_node, comparator in zip(node.ops, node.comparators):
                right = _eval(comparator)
                fn = _ALLOWED_CMPOPS.get(type(op_node))
                if fn is None:
                    raise ValueError(f"Comparison operator not allowed: {type(op_node).__name__}")
                if not fn(left, right):
                    return False
                left = right
            return True

        raise ValueError(f"Expression element not allowed: {type(node).__name__}")

    return _eval(tree)

def _stripquotes(s: str) -> str:
    """
    Strip quotes from a string.
    """
    return s.strip("'\"")


class DictObj(dict):
    """
    A dict with object-like attributes.

    Instead of `dct['name']` you can do `dct.name`
    """
    def __getattr__(self, name):
        try:
            return self.__getitem__(name)
        except KeyError:
            return super(DictObj, self).__getattr__(name)


class GName:
    """
    Helper class for :class:`MGraph`

    if *mg* is instance of :class:`MGraph` yo can access the 'physical' instrument name (that in the dot-file) with the
    'logical' device mame (that in your code) by mg.name.logical_name
    """
    def __init__(self, mginst: MGraph) -> None:
        self.mg = mginst     # the instance of the mgraph

    def __getattribute__(self, name: str) -> Any:
        try:
            attr = super(GName, self).__getattribute__(name)   # ensure other attributes still work
        except AttributeError:
            attr = self.mg.get_gname(name)   # the 'real' name
            if attr is None:
                raise AttributeError
        return attr


class Graph():
    """Graph class based on :mod:`pydot`.

       The graph is created using the methods (called in this order)

         - ``pydot.graph_from_dot_file``
         - ``pydot.graph_from_dot_data``
         - ``pydot.graph_from_edges``
         - ``pydot.graph_from_adjacency_matrix``
         - ``pydot.graph_from_incidence_matrix``

       with the argument of the ``__init__`` method.
    """
    def __init__(self, fname_or_data=None, SearchPaths=None):
        if SearchPaths is None:
            SearchPaths = [os.getcwd()]
        self.SearchPaths = SearchPaths

        methods = ('graph_from_dot_file',
                   'graph_from_dot_data',
                   'graph_from_edges',
                   'graph_from_adjacency_matrix',
                   'graph_from_incidence_matrix')
        dotgraph = None
        # self.dotcontents=None
        for m in methods:
            meth = getattr(pydot, m)
            try:
                if m == 'graph_from_dot_file':
                    if isinstance(fname_or_data, StringIO):
                        self.dotcontents = fname_or_data.read()  #
                        dotgraph = pydot.graph_from_dot_data(self.dotcontents)
                        break
                    # print self.SearchPaths, fname_or_data
                    try:
                        # print "Hey", self.instance_from_pickle
                        fname_or_data = next(locate(fname_or_data, paths=self.SearchPaths))  # first hit
                        """
                        if file was found update dotcontets from this file.
                        if not we maybe come from a pickle file and haven't found the graph
                        in this case we restore the graph from self.dotcontets -> except clause
                        """
                        self.dotcontents = (open(fname_or_data, 'r')).read()  #
                        dotgraph = meth(fname_or_data)
                    except StopIteration:  # not found
                        # print "Hey", self.instance_from_pickle
                        if hasattr(self, 'instance_from_pickle') and self.instance_from_pickle:
                            self.graph = dotgraph = pydot.graph_from_dot_data(self.dotcontents)  # TODO
                            return
                        else:
                            raise  # reraise
                else:
                    dotgraph = meth(fname_or_data)
            except (IOError, IndexError, StopIteration):
                continue
            else:
                break
        if dotgraph:
            self.graph = dotgraph[0]
            self.edges = self.graph.get_edges()
        else:
            raise RuntimeError("Graph could no be created")

    def __str__(self):
        return self.graph.to_string()

    def find_path(self, start, end, path=None):
        """Returns a path from *start* to *end*.
           Ignores edges with attribute `active==False`.
        """
        if path is None:
            path = []
        try:
            return self.find_all_paths(start, end, path)[0]   # first in the list
        except IndexError:   # no path
            return None

    def _active(self, edge_or_gnode):
        try:
            att = edge_or_gnode.get_attributes()
            act = att['active']
            return act
        except (AttributeError, KeyError):
            return True

    # def find_all_paths(self, start, end, path=None, edge=None):
    #     """
    #     Find all all_paths in graph from *start* to *end* (without circles).
    #     Ignores edges with attribute `active==False`.
    #     As user: allways call with `path=None` and `edges=None`; these are only used within recursion
    #     """
    #     # print 'enter:', start, end, path
    #     # path = path + [start]
    #     if path is None:
    #         path = []   # initialize list
    #     if edge:
    #         path.append(edge)  # edge given -> append to path
    #         # path = path + [edge]
    #         # print "added edge to path:", edge.get_source(), edge.get_destination(), path
    #     if start == end:  # end node reached
    #         # print "start==end: returning", [path]
    #         return [path]  # this is the end of the recursion
    #     all_paths = []   # initialize list of all paths
    #     # list of all edges with source==start
    #     start_edges = [e for e in self.edges if e.get_source() == start]
    #     for edge in start_edges:
    #         next_node = edge.get_destination()    # end of this edge
    #         gnode = self.graph.get_node(next_node)  # next node in underlying graph
    #         eact = self._active(edge)   # active flag of edge
    #         gact = self._active(gnode)  # active flag of next node
    #         is_active = eact and gact   # both active?
    #         if is_active and edge not in path:   # new active edge
    #             newpaths = self.find_all_paths(next_node, end, path, edge)   # start recursion
    #             # print "newpaths returned:", newpaths
    #             for newpath in newpaths:
    #                 all_paths.append(newpath)   # append to list of all paths
    #     # print 'exit:', all_paths
    #     return all_paths

    def find_all_paths(self, start, end, path=None, edge=None):
        """
        Find all all_paths in graph from *start* to *end* (without circles).
        Ignores edges with attribute `active==False`.
        As user: allways call with `path=None` and `edges=None`; these are only used within recursion
        """
        if path is None:
            path = []
        if edge:
            path.append(edge)

        if start == end:
            res = [path.copy()]  # Kopie, sonst später wieder kaputt
            if edge:
                path.pop()
            return res

        all_paths = []
        start_edges = [e for e in self.edges if e.get_source() == start]

        for edge2 in start_edges:
            next_node = edge2.get_destination()
            gnode = self.graph.get_node(next_node)
            is_active = self._active(edge2) and self._active(gnode)

            if is_active and edge2 not in path:
                all_paths.extend(self.find_all_paths(next_node, end, path, edge2))

        if edge:
            path.pop()
        return all_paths

    def get_common_parent(self, n1, n2):
        """
        Tries to find a node that is parent of *n1* and *n2*.
        Only active paths are valid.
        Returns that node or *None*.
        """
        # trivial cases
        if self.find_path(n1, n2):   # direct path n1->n2 => n1 is parent of both
            return n1
        if self.find_path(n2, n1):   # other direction but same case
            return n2
        # find for 'real' parent
        edges = [e for e in self.edges if e.get_destination() == n1]   # all edges ending in n1
        for edge in edges:
            parent_node = edge.get_source()
            gnode = self.graph.get_node(parent_node)
            eact = self._active(edge)
            gact = self._active(gnode)
            is_active = eact and gact
            if is_active:
                return self.get_common_parent(parent_node, n2)  # recursion; stops when direct path exists
        return None

    def find_shortest_path(self, start, end, path=None):
        """
        Returns the shortest (in terms number of elements in path) path from *start* to *end*.
        Ignores edges with attribute `active==False`.
        Returns shortest path from *start* to *end* or `None` if no path is found.
        As user: don't use the `path=` argument; used in recursion only
        """
        if path is None:
            path = []
        allpaths = self.find_all_paths(start, end, path)
        if allpaths:
            #return sorted(allpaths)[0]
            return min(allpaths, key=len)
        else:
            return None


[docs] class MGraph(Graph): """ Measurement graph class based of :class:`Graph`. See there for the argument of the :meth:`__init__` method. """
[docs] def __init__(self, fname_or_data=None, themap=None, SearchPaths=None): self.fname_or_data = fname_or_data self.map = themap if SearchPaths is None: SearchPaths = [os.getcwd()] super().__init__(fname_or_data, SearchPaths=SearchPaths) self.name = GName(self) try: self.graph = self.graph[0] # new version of pydot except TypeError: self.graph = self.graph # old version of pydot self.gnodes = self.graph.get_nodes() self.gedges = self.graph.get_edges() self.nodes = dict([[n.get_name(), {}] for n in self.gnodes]) nametonode = dict([[n.get_name(), n] for n in self.gnodes]) for n, dct in list(self.nodes.items()): dct['gnode'] = nametonode[n] self.activenodes = list(self.nodes.keys()) if themap is None: themap = {} self.map = themap # make map bijective self.bimap = dict(self.map) for k, v in list(themap.items()): try: self.bimap[v] = k except TypeError: # this happens if v is a list for _v in v: self.bimap[_v] = k self.instrumentation = None
def __setstate__(self, dct): """ used instead of __init__ when instance is created from pickle file """ self.instance_from_pickle = True if 'dotcontents' not in dct: dct['dotcontents'] = "digraph {sg->ant}" self.dotcontents = dct['dotcontents'] self.__init__(fname_or_data=dct['fname_or_data'], themap=dct['map'], SearchPaths=dct['SearchPaths']) def __getstate__(self): """ prepare a dict for pickling """ odict = {'fname_or_data': self.fname_or_data, 'map': self.map, 'SearchPaths': self.SearchPaths, 'dotcontents': self.dotcontents} return odict # def __getattribute__(self, name): # try: # attr=Graph.__getattribute__(self, name) # except AttributeError: # attr=self.get_gname(name) # if attr is None: # raise AttributeError # return attr
[docs] def get_gname(self, name: str) -> str | None: """ Tries to get the name of the device *name* in the dot file. Returns self.map[name] if *name* is key. Returns *name* if *name* is in self.bimap. Else returns None. """ if name in self.map: return self.map[name] elif name in self.bimap: return name else: return None
@staticmethod def _pr2ar(pr): """ Convert a Quantity with unit POWERRATIO (Dimensionless) to a Quantity with Unit AMPLITUDERATIO """ assert pr._unit == POWERRATIO pr._unit = ONE # yes, we know what we are doing ar = sqrt(pr) ar._unit = AMPLITUDERATIO return ar @staticmethod def _ar2pr(ar): """ Convert a Quantity with unit AMPLITUDERATIO (Dimensionless) to a Quantity with Unit POWERRATIO """ assert ar._unit == AMPLITUDERATIO return ar * ar
[docs] def get_path_correction(self, start, end, unit=None): """ Get the correction (S21) from *start* to *end*. If *unit* is None, an AMPLITUDERATIO is returned. *unit* has to be AMPLITUDERATIO or POWERRATIO (from mpylab.tools.aunits) """ if unit is None: unit = AMPLITUDERATIO assert unit in (AMPLITUDERATIO, POWERRATIO) if start == end: corr = Quantity(unit, 1) # trivial case return corr # start ne end... parent = self.get_common_parent(start, end) # find the parent node of start AND end. CAN be start... CAN be end .. if parent == start: # simple case corr = self.get_path_corrections(start, end, unit=unit)['total'] elif parent == end: # second-simplest case, but wrong direction -> inverted corr = 1.0 / self.get_path_corrections(end, start, unit=unit)['total'] else: # more complicated because no direct path between start and end OR end and start corr = self.get_path_corrections(parent, end, unit=unit)['total'] / \ self.get_path_corrections(parent, start, unit=unit)['total'] return corr
[docs] def get_path_corrections(self, start, end, unit=None): """ Returns a dict with the corrections for all edges from *start* to *end*. *unit* can be :data:`mpylab.tools.aunits.AMPLITUDERATIO` or :data:`mpylab.tools.aunits.POWERRATIO`. If *unit* is `None`, :data:`mpylab.tools.aunits.AMPLITUDERATIO` is used. The key 'total' gives the total correction. All corrections are :class:`scuq.quantities.Quantity` objects. """ if unit is None: unit = AMPLITUDERATIO assert unit in (AMPLITUDERATIO, POWERRATIO) result = {} all_paths = self.find_all_paths(start, end) # returns a list of (list of edges) # print all_paths ctx = Context() Total = Quantity(unit, 0.0) # init total path correction with 0 for p in all_paths: # p is a list of edges # totals in that path TotalPath = Quantity(unit, 1.0) # init total correction for this path for n in p: # for all edges in this path # print n n_attr = n.get_attributes() # dict with edge attributes if 'dev' in n_attr: # it's a real device # the edge device dev = str(n_attr['dev']) # edge device instance inst = self.nodes[dev]['inst'] # this is the device instance try: what = _stripquotes(str(n_attr['what'])) # has it a what attribute except KeyError: continue # no what -> nothing to do with this device try: stat = -1 for cmd in ['getData', 'GetData']: # print cmd if hasattr(inst, cmd): # print "Vor getattr", getattr(inst,cmd) stat, result[dev] = getattr(inst, cmd)(what) # try to read value of 'what' # print "Nach getattr", stat break if stat < 0: # has the attribute but getData failed raise UserWarning('Failed to getData: %s, %s' % (dev, what)) except AttributeError: # function not callable # raise UserWarning('Failed to getData %s, %s' % (dev, what)) # store the values unconverted # print dev, result[dev] r = result[dev] # .get_value(unit) # we have a result from the devive if unit == AMPLITUDERATIO and r._unit == POWERRATIO: r = self._pr2ar(r) # convert in this direction elif unit == POWERRATIO and r._unit == AMPLITUDERATIO: r = self._ar2pr(r) # or convert in the other direction elif (unit == POWERRATIO and r._unit == POWERRATIO) or \ (unit == AMPLITUDERATIO and r._unit == AMPLITUDERATIO): # nothing to convert pass else: raise RuntimeError("Unit Error") # should not happen TotalPath *= r # multiply the S21 .... # for different paths between two points, s parameters have # to be summed. # print TotalPath # for k,v in result.items(): # print k,v TotalPath = TotalPath.eval() TotalPath = TotalPath.reduce_to(unit) Total += TotalPath # in case of multiple paths: add it together ; 2 paths with S21=0.25 -> total = 0.5 # print start, end, Total try: result['total'] = Total.eval() except AttributeError: result['total'] = Total return result
[docs] def EvaluateConditions(self, doAction=True): """Set key *isActice* in nodes argument depending on the condition given in the graph. If *doAction* is `True` an action *act* defined in the edge attributed is executed using `exec str(act)` if the condition evaluates to `True`. The condition may refer to variables in the callers namespace, e.g. 'f'. """ __frame = inspect.currentframe() __outerframes = inspect.getouterframes(__frame) __caller = __outerframes[1][0] # loop all nodes for name, act_dct in list(self.nodes.items()): node = act_dct['gnode'] cond_dct = node.get_attributes() if 'condition' in cond_dct: stmt = _stripquotes(str(cond_dct['condition'])) names = {} names.update(__caller.f_globals) names.update(__caller.f_locals) cond = safe_condition_eval(stmt, names) if (cond is True) or (cond is bool_(True)): act_dct['active'] = True if doAction and 'action' in cond_dct: act = cond_dct['action'] safe_action_exec(act, names) else: act_dct['active'] = False else: act_dct['active'] = True # cond_dct = node.get_attributes() # dict with node or edge atributs # if 'condition' in cond_dct: # stmt = "(%s)" % _stripquotes(str(cond_dct['condition'])) # # print " Cond:", stmt, " = ", # cond = eval(stmt, __caller.f_globals, __caller.f_locals) # # print cond # if (cond is True) or (cond is bool_(True)): # act_dct['active'] = True # if doAction and 'action' in cond_dct: # act = cond_dct['action'] # # print str(act) # # print self.CallerLocals['f'] # # print act # exec(str(act)) # in self.CallerGlobals, self.CallerLocals # else: # act_dct['active'] = False # else: # act_dct['active'] = True self.activenodes = [name for name, dct in list(self.nodes.items()) if dct['active']] # loop all edges for edge in self.edges: act_dct = cond_dct = edge.get_attributes() if 'condition' in cond_dct: stmt = _stripquotes(str(cond_dct['condition'])) cond = safe_condition_eval(stmt, names) if (cond is True) or (cond is bool_(True)): act_dct['active'] = True if doAction and 'action' in cond_dct: act = cond_dct['action'] safe_action_exec(act, names) else: act_dct['active'] = False else: act_dct['active'] = True # # loop all edges # for edge in self.edges: # act_dct = cond_dct = edge.get_attributes() # if 'condition' in cond_dct: # stmt = "(%s)" % _stripquotes(str(cond_dct['condition'])) # # print " Cond:", stmt, " = ", # cond = eval(stmt, __caller.f_globals, __caller.f_locals) # # print cond # if (cond is True) or (cond is bool_(True)): # act_dct['active'] = True # if doAction and 'action' in cond_dct: # act = cond_dct['action'] # # print str(act) # # print self.CallerLocals['f'] # # print act # exec(str(act)) # in self.CallerGlobals, self.CallerLocals # else: # act_dct['active'] = False # else: # act_dct['active'] = True del __caller del __outerframes del __frame
[docs] def CreateDevices(self): """ Create instances of the devices found in the graph. Should be called once after creating the graph instance. - Sets attribute `active = True` for all nodes and edges - Reads the ini-file (if ini atrib is present) - Creates the device instances of all nodes and save the variable in the nodes dict (`nodes[key]['inst']`) Returns a dict with keys from the graphs nodes names and val are the device instances Can be used to create local references like so:: for k,v in ddict.items(): globals()['k']=v """ dev_map = {'signalgenerator': 'Signalgenerator', 'powermeter': 'Powermeter', 'switch': 'Switch', 'fieldprobe': 'Fieldprobe', 'cable': 'Cable', 'motorcontroller': 'Motorcontroller', 'tuner': 'Tuner', 'antenna': 'Antenna', 'nport': 'NPort', 'amplifier': 'Amplifier', 'step2port': 'SwitchedTwoPort', 'spectrumanalyzer': 'Spectrumanalyzer', 'vectornetworkanalyser': 'NetworkAnalyser', 'receiver': 'Receiver', 'vlisn': 'VLISN', 'custom': 'Custom'} devs = list(dev_map.keys()) ddict = DictObj() for name, dct in list(self.nodes.items()): obj = dct['gnode'] attribs = obj.get_attributes() for n, v in list(attribs.items()): attribs[n] = _stripquotes(v) # strip ' and " dct['active'] = True try: ini = dct['ini'] = next(locate(attribs['ini'], paths=self.SearchPaths)) # the ini file name # print ini except KeyError: ini = dct['ini'] = dct['inst'] = None # no ini file, no device continue # print "ini:", self.nodes dct['inidic'] = self.__parse_ini(ini) # parse the ini file and save it as dict in the attributes try: typetxt = dct['inidic']['description']['type'] except: raise UserWarning("No type found for node '%s'." % obj.get_name()) # create device instances d = None try: # fuzzy type matching... best_type_guess = fstrcmp(typetxt, devs, cutoff=0, ignorecase=True)[0] except IndexError: raise IndexError( 'Instrument type %s from file %s not in list of valid instrument types: %r' % (typetxt, ini, devs)) dtype = dev_map[best_type_guess] driver = dct['inidic']['description']['driver'] if dtype == 'Custom': cls = dct['inidic']['description']['class'] drvfile = next(locate(driver, self.SearchPaths)) # m = imp.load_source('m', drvfile) # Spezifikation aus der Datei erstellen spec = importlib.util.spec_from_file_location(driver, drvfile) # Modul aus der Spezifikation erstellen module = importlib.util.module_from_spec(spec) # Modul ausführen spec.loader.exec_module(module) # module = importlib.machinery.SourceFileLoader(driver, drvfile).load_module() d = getattr(module, cls)() else: modname = Path(driver) modname = modname.with_suffix('') # remove suffix modname = modname.name.lower() m = importlib.import_module(f'mpylab.device.{modname}') d = getattr(m, dtype.upper())(SearchPaths=self.SearchPaths) # d = getattr(device, dtype)(SearchPaths=self.SearchPaths) ddict[name] = dct['inst'] = d # save instances in nodes dict and in return value # self.CallerGlobals['d']=d # exec str(key)+'=d' in self.CallerGlobals # valiable in caller context # exec 'self.'+str(key)+'=d' # as member variable self.__dict__.update(ddict) for k, v in list(ddict.items()): if k in self.bimap: try: ddict[self.bimap[k]] = v except TypeError: # this happens if v is a list for _k in self.bimap[k]: ddict[_k] = v self.instrumentation = ddict return ddict
[docs] def NBTrigger(self, lst): """ Triggers all devices in list if possible (node exists, has dev instance, is active, and has Trigger method). Returns dict: keys->list items, vals->None or return val from Trigger method """ devices = [l for l in lst if l in self.activenodes] # intersept of list and activenodes result = {} for name in devices: attribs = self.nodes[name] if not attribs['active']: continue try: stat = attribs['inst'].Trigger() result[name] = stat except (KeyError, AttributeError): continue return result
def _Read(self, lst, result=None): """ Read the measurement results from devices in lst (list or string). Mode is blocking if result is None and NonBlocking else. Non blocking is finished when `len(result) = len(list)`. A dict is returned with keys from list and values from the device reading or `None`. """ if result is None: # blocking cmds = ('GetData', 'getData', 'ReadData') result = {} NB = False else: # none blocking cmds = ('GetData', 'getDataNB', 'ReadDataNB') NB = True if isinstance(lst, str): lst = [lst] devices = [l for l in lst if l in self.activenodes] # intersept of list and activenodes for n in devices: if NB and n in result: continue try: nattr = self.nodes[n] dev = nattr['inst'] except KeyError: result[n] = None else: c = -1 for cmd in cmds: try: c, val = getattr(dev, cmd)() # print "DEBUG:", dev, cmd, val except AttributeError: continue # try other command(s) else: break if c == 0: result[n] = val return result
[docs] def NBRead(self, lst, result): """ Non Blocking read. See :meth:`_Read`. """ return self._Read(lst, result)
[docs] def Read(self, lst): """ Blocking read. See :meth:`_Read`. """ return self._Read(lst)
[docs] def CmdDevices(self, IgnoreInactive, cmd, *args): """ Tries to send `cmd(*arg)` to all devices in graph. If *IgnoreInactice* is `True`, only active devices are used. Returns the sum of all status codes returned from the called methods, i.e. a return value of zero indicates success. Return error codes for all devices are stored in `self.nodes[str(n)]['ret']` and `self.nodes[str(n)]['err']`. """ devices = [name for name in list(self.nodes.keys()) if IgnoreInactive or name in self.activenodes] # intersept of list and activenodes cmd = str(cmd) serr = 0 for n in devices: attribs = self.nodes[n] if attribs['inst'] is None: # not a real device continue err = 0 stat = 0 dev = attribs['inst'] try: ans = getattr(dev, cmd)(*args) if isinstance(ans, tuple): stat = ans[0] else: stat = ans if (stat < 0): err = attribs['inst'].GetLastError() except AttributeError: pass self.nodes[str(n)]['ret'] = stat self.nodes[str(n)]['err'] = err serr += stat return serr
[docs] def Init_Devices(self, IgnoreInactive=False): """ Initialize all device. Raises :exc:`UserWarning` if a device fails to initialize. If `IgnoreInactive = False` (default), all devices are initialized, else only active devices are initialized. """ devices = [name for name in self.nodes if IgnoreInactive or name in self.activenodes] # intersept serr = 0 for n in devices: # print("Init %s ..."%str(n)) attribs = self.nodes[n] if attribs['inst'] is None: continue err = 0 stat = 0 ini = attribs['ini'] gattr = attribs['gnode'].get_attributes() ch = 1 for c in ('ch', 'channel'): try: ch = int(gattr[c]) except KeyError: continue dev = attribs['inst'] if (hasattr(dev, 'Init')): # print n #print("vor Init") stat = dev.Init(ini, ch) #print("nach Init") if (stat < 0): # print ini, ch err = dev.GetLastError() attribs['ret'] = stat attribs['err'] = err if stat < 0: raise UserWarning('Error while init of %s, err: %s' % (str(n), err)) serr += stat return serr
[docs] def Quit_Devices(self, IgnoreInactive=False): """ Quit all devices using :meth:`CmdDevices`. Input: `IgnoreInactive=False` Return: return val of :meth:`CmdDevices` """ return self.CmdDevices(IgnoreInactive, "Quit")
[docs] def SetFreq_Devices(self, freq, IgnoreInactive=True): """Set frequency on all (optionally active) devices and return min/max.""" minfreq = 1e100 maxfreq = -1e100 devices = [name for name in self.nodes if IgnoreInactive or name in self.activenodes] # intersept for n in devices: attribs = self.nodes[n] if attribs['inst'] is None: continue err = 0 dev = attribs['inst'] if (hasattr(dev, 'SetFreq')): err, f = dev.SetFreq(freq) minfreq = min(minfreq, f) maxfreq = max(maxfreq, f) attribs['ret'] = f attribs['err'] = err return (minfreq, maxfreq)
[docs] def ConfReceivers(self, conf, IgnoreInactive=True): """ Configures all SA/Receivers in Graph. Input: - *conf*: a dict with keys from `('rbw', 'vbw', 'att', 'preamp', 'reflevel', 'detector', 'tracemode', 'sweeptime', 'sweepcount', 'span')` and values for these parameters. If a key, val pair exists in *conf*, we try to set this parameter. If the a key is not in *conf*, or if the value is missing (`None`), we try to read the val from the instrument. - *IgnoreInactive*: flag to ignore devices marked as inactive Return: - `rdict`: a dict of dicts with `rdict[node][key] = val` mapping """ parlist = ('rbw', 'vbw', 'att', 'preamp', 'reflevel', 'detector', 'tracemode', 'sweeptime', 'sweepcount', 'span') set_names = ('SetRBW', 'SetVBW', 'SetAtt', 'SetPreAmp', 'SetRefLevel', 'SetDetector', 'SetTraceMode', 'SetSweepTime', 'SetSweepCount', 'SetSpan') get_names = ('GetRBW', 'GetVBW', 'GetAtt', 'GetPreAmp', 'GetRefLevel', 'GetDetector', 'GetTraceMode', 'GetSweepTime', 'GetSweepCount', 'GetSpan') rdict = {} devices = [name for name in self.nodes if IgnoreInactive or name in self.activenodes] # intersept for n in devices: attribs = self.nodes[n] if attribs['inst'] is None: continue # not a device err = 0 dev = attribs['inst'] if not hasattr(dev, set_names[0]): continue # not a spectrumanalyzer # ok, a spec analyzer rdict[str(n)] = {} for index, par in enumerate(parlist): if par in conf: val = conf[par] else: val = None if hasattr(dev, set_names[index]) and val is not None: try: err, val = getattr(dev, set_names[index])(val) except TypeError: err, val = getattr(dev, set_names[index])(*val) rdict[str(n)][par] = val elif hasattr(dev, get_names[index]): err, val = getattr(dev, get_names[index])() rdict[str(n)][par] = val return rdict
[docs] def Zero_Devices(self, IgnoreInactive=True): """ Zero all devices using CmdDevices Input: IgnoreInactive=True Return: return val of CmdDevices """ return self.CmdDevices(IgnoreInactive, "Zero", 1)
[docs] def RFOn_Devices(self, IgnoreInactive=True): """ RFOn all devices using CmdDevices Input: IgnoreInactive=True Return: return val of CmdDevices """ return self.CmdDevices(IgnoreInactive, "RFOn")
[docs] def RFOff_Devices(self, IgnoreInactive=False): """ RFOff all devices using CmdDevices Input: IgnoreInactive=False Return: return val of CmdDevices """ return self.CmdDevices(IgnoreInactive, "RFOff")
[docs] def Trigger_Devices(self, IgnoreInactive=True): """ Trigger all devices using CmdDevices Input: IgnoreInactive=True Return: return val of CmdDevices """ return self.CmdDevices(IgnoreInactive, "Trigger")
[docs] def getBatteryLow_Devices(self, IgnoreInactive=True): """ Get a list of all devices in the graph with a low battery state Input: IgnoreInactive=True Return: list of nodes with low battery state """ lowBatList = [] # print self.nodes.items() devices = [name for name in self.nodes if IgnoreInactive or name in self.activenodes] # intersept for n in devices: attribs = self.nodes[n] if attribs['inst'] is None: continue err = 0 dev = attribs['inst'] if hasattr(dev, 'getBatteryState'): # print "check bat state for node ", n stat, bat = dev.getBatteryState() if stat < 0: err = dev.GetLastError() elif bat < 0: # Low lowBatList.append(n) attribs['ret'] = bat attribs['err'] = err return lowBatList
[docs] def GetAntennaEfficiency(self, node): """ Get the antenna efficiency of an antenna connected to node. Input: node, the node to which the antenna is connected. Typically this is a 'virtual' node in the graph, e.g. 'ant' to which the real antennas are connected. Return: antenna efficiency of the first active , real antenna connected to 'node' None is returned if no antenna is found """ eta = None cmds = ('getData', 'GetData') # look for an antenna connected to 'node' ... devices = [n for n in self.nodes if n in self.activenodes] for n in devices: attribs = self.nodes[n] if attribs['inst'] is None: continue # not a real device if not attribs['inidic']['description']['type'] in ('antenna', 'ANTENNA'): continue # n is not an antenna # a real, active antenna if self.find_path(n, node) or self.find_path(node, n): # ok, there is a coonection to our node try: stat = -1 inst = attribs['inst'] for cmd in cmds: if hasattr(inst, cmd): stat, result = getattr(inst, cmd)('EFF') break if stat == 0: eta = result break except AttributeError: # function not callable pass return eta
[docs] def AmplifierProtect(self, start, end, startlevel, sg_unit=WATT, typ='save'): """Check amplifier safety margins along all active paths.""" isSafe = True msg = '' if not isinstance(startlevel, Quantity): startlevel = Quantity(sg_unit, startlevel) allpaths = self.find_all_paths(start, end) for path in allpaths: # path is a list of edges edges = [] for p in path: left = p.get_source() right = p.get_destination() edges.append((left, right, p)) for left, right, edge in edges: try: edge_dev = edge.get_attributes()['dev'] attribs = self.nodes[edge_dev] except KeyError: continue if attribs['inst'] is None: continue err = 0 if attribs['active']: dev = attribs['inst'] cmds = ['getData', 'GetData'] stat = -1 for cmd in cmds: # print hasattr(dev, cmd) if hasattr(dev, cmd): # at the moment, we only check for MAXIN what = ['MAXIN'] # ['MAXIN', 'MAXFWD', 'MAXBWD'] for w in what: stat = 0 try: stat, result = getattr(dev, cmd)(w) except AttributeError: # function not callable # print "attrErr" continue if stat != 0: # print stat continue # ok we have a value that can be checked corr = self.get_path_correction(start, left, POWERRATIO) # for _k,_v in corr.items(): # print "corr[%s]:"%_k, _v # print "Startlevel:", startlevel level = corr * startlevel level = level.reduce_to(result._unit) # print "Level:", level # print "What = '%s', Level = %s, Max = %s\n"%(w, str(level), str(result)) # if typ=='lasy': notsafe = (abs(level.get_expectation_value()) > abs(result.get_expectation_value())) # elif typ=='save': # condition=level.get_u() > result.get_l() #be safe: errorbars overlap # else: # condition=level.get_u() > result.get_l() #be safe: errorbars overlap if notsafe: isSafe = False msg += "Amplifier Pretection failed for node '%s'. What = '%s', Level = %s, Max = %s, Startlevel = %s, Corr = %s\n" % ( edge_dev, w, level, result, startlevel, corr) break return isSafe, msg
[docs] def MaxSafeLevel(self, start, end, typ='save'): """Return the minimum safe start level across all active paths.""" levels = [] allpaths = self.find_all_paths(start, end) for path in allpaths: # path is a list of edges edges = [] for p in path: left = p.get_source() right = p.get_destination() edges.append((left, right, p)) for left, right, edge in edges: try: edge_dev = edge.get_attributes()['dev'] attribs = self.nodes[edge_dev] except KeyError: continue if attribs['inst'] is None: continue err = 0 if attribs['active']: dev = attribs['inst'] cmds = ['getData', 'GetData'] stat = -1 for cmd in cmds: # print hasattr(dev, cmd) if hasattr(dev, cmd): # at the moment, we only check for MAXIN what = ['MAXIN'] # ['MAXIN', 'MAXFWD', 'MAXBWD'] for w in what: stat = 0 try: stat, result = getattr(dev, cmd)(w) except AttributeError: # function not callable # print "attrErr" continue if stat != 0: # print stat continue # ok we have a value that can be checked corr = self.get_path_correction(start, left, POWERRATIO) level = result / corr # level at start level = level.reduce_to(result._unit) levels.append(level) if not len(levels): return None else: return min(absolute(levels))
[docs] def CalcLevelFrom(self, sg, limiter, what): """Validate nodes/path for level calculation and return status placeholder.""" if sg not in self.nodes: raise UserWarning('Node not in nodes: %s' % sg) if limiter not in self.nodes: raise UserWarning('Node not in nodes: %s' % limiter) if not len(self.find_all_paths(sg, limiter)): raise UserWarning('Nodes not connected') il = self.get_path_correction(sg, limiter, POWERRATIO) return 0
def __parse_ini(self, ini): def readConfig(filename): """ Read config data *** Here add for doc the format of the data *** Return configVals """ configVals = configparser.ConfigParser() if hasattr(filename, 'readline'): # file like object configVals.read_file(filename) else: configVals.read(filename) # filename return (configVals) def makeDict(configData): """ create a dict from a Config file """ d = {} for section in configData.sections(): s = section.lower() d[s] = {} for option in configData.options(section): o = option.lower() d[s][o] = configData.get(section, option) return (d) # umdpath = getUMDPath() # _ini = GetFileFromPath (ini, umdpath) # if _ini is None: # raise "Ini file '%s' not found. Path is '%s'"%(ini,umdpath) v = readConfig(ini) return makeDict(v)
class Leveler(object): """Iterative level control helper based on measurement-graph corrections.""" def __init__(self, mg, actor, output, lpoint, observer, pin=None, datafunc=None, min_actor=None): """ mg: MGraph instance actor: name of device in mg. device instance has to have SetLevel method output: name of output device: path from actor to output is checked for MaxSafeLevel lpoint: name of point where a specific value has to be reached observer: name of device where lpoint is observed """ if min_actor is None: self.min_actor = Quantity(WATT, 1e-13) # -100 dBm else: if isinstance(min_actor, Quantity): self.min_actor = min_actor else: self.min_actor = Quantity(WATT, min_actor) self.mg = mg self.actor = actor self.sg = getattr(mg, actor) self.pm = getattr(mg, observer) self.lpoint = lpoint self.output = output self.observer = observer self.MaxSafe = abs(mg.MaxSafeLevel(actor, output).get_expectation_value()) self.actorunit = self.MaxSafe._unit self.lpointunit = None self.corr = None self.samples = {} if pin is None: pin = [fac * self.MaxSafe for fac in (0.001, 0.01, 0.1)] # [Quantity(WATT, 1e-6),Quantity(WATT, 1e-4)]# if datafunc is None: self.datafunc = lambda x: x else: self.datafunc = datafunc self.add_samples(pin) self.update_interpol() def add_samples(self, pin): """Add one or more actor-level samples and measured observation values.""" if not hasattr(pin, '__iter__'): pin = [pin] pinr = [] for pi in pin: if pi > self.MaxSafe: continue if pi < self.min_actor: pi = self.min_actor pinr.append(pi) self.sg.SetLevel(pi) pikey = abs(pi) pikey = pikey.get_expectation_value_as_float() self.pm.Trigger() err, obs = self.pm.GetData() obs = self.datafunc(obs) self.corr = self.mg.get_path_correction(self.observer, self.lpoint, POWERRATIO) lpoint = obs * self.corr lpoint = lpoint.reduce_to(obs._unit) lpoint = abs(lpoint) if not self.lpointunit: self.lpointunit = lpoint._unit assert (self.lpointunit == lpoint._unit) self.samples[pikey] = lpoint.get_expectation_value_as_float() self.update_interpol() return pinr def update_interpol(self): """Rebuild forward and inverse interpolators from current sample set.""" x = sorted(self.samples) y = [self.samples[xi] for xi in x] self.interp = interp1d(x, y) self.extrap = extrap1d(self.interp) self.i_interp = interp1d(y, x) self.i_extrap = extrap1d(self.i_interp) def adjust_level(self, soll, maxiter=10, relerr=0.01): """Iteratively adjust actor level to reach requested target quantity.""" # self.add_samples(soll/self.amp.g) sf = soll.get_value(self.lpointunit) sf = float(abs(sf)) safemax = self.MaxSafe.get_expectation_value_as_float() # self.x=[] # self.y=[] for i in range(maxiter): inval = self.i_extrap(sf)[0] pin = Quantity(self.actorunit, min(inval, safemax)) pin = self.add_samples(pin)[0] pout = Quantity(self.lpointunit, self.samples[pin.get_expectation_value_as_float()]) re = abs(pout - soll) / soll re = re.reduce_to(ONE) # self.x.append(pin) # self.y.append(pout) # print i, pin, pout, soll, re if re.get_expectation_value_as_float() <= relerr: break if (pin >= self.MaxSafe) and (pout <= soll): break return pin, pout if __name__ == '__main__': dotstr = """digraph { a -> b b -> d a -> c c -> e e -> d }""" names = {v: v for v in 'abcde'} mg = MGraph(fname_or_data=dotstr, themap=names) print("all paths a->d:", mg.find_all_paths('a', 'd')) try: sp = mg.find_shortest_path('a', 'd') print("shortest path a->d:", sp) except Exception as e: print("find_shortest_path failed:", e)