Skip to content
Snippets Groups Projects
Select Git revision
2 results Searching

HelperFunctions.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    monitor.py 2.78 KiB
    #!/usr/bin/env python
    import rospy
    from monitoring.msg import Metric
    
    
    class Monitor:
        def __init__(self, mode, freq):
            self.log = {}  # Logging dictionary. Is used to accomplish different publishing modes.
            self.pub = rospy.Publisher('metric', Metric, queue_size=1)
            self.mode = mode
            self.metric = Metric()
    
            if not freq > 0:
                rospy.logwarn("Frequency must be greater then 0! Using 1 as frequency!")
                freq = 1
    
            self.timer = rospy.Timer(rospy.Duration(1.0 / freq), self.publish_metric)
    
        def reset_metric(self):
            self.metric = Metric()
    
        def publish_metric(self):
            self.pub.publish(self.metric)
            self.reset_metric()
    
        def update_metric(self, key, val, unit, err_lvl):
            def set_metric():
                self.metric.key = key
                self.metric.val = val
                self.metric.unit = unit
                self.metric.err_lvl = err_lvl
    
            def rst_key():  # Reset the value of a key in the logging dictionary.
                self.log[key] = {'num': 0, 'val': 0, 'sum': 0, 'dur': rospy.get_rostime()}
    
            if " " in key:  # A key cannot contain whitespace!
                rospy.logwarn("Whitespaces are not allowed in metric keys!")
            else:
                self.reset_metric()  # Reset metric.
    
                if key in self.log:
                    # Mode 1: Set the last obtained value.
                    if self.mode == 1:
                        set_metric()
    
                    # Mode 2: Remember and set the minimum value,
                    #         obtained after mode selection.
                    elif self.mode == 2:
                        if val < self.log[key]['val']:
                            self.log[key]['val'] = val
                        else:
                            val = self.log[key]['val']
    
                        set_metric()
    
                    # Mode 3: Remember and set the maximum value,
                    #         obtained after mode selection.
                    elif self.mode == 3:
                        if val > self.log[key]['val']:
                            self.log[key]['val'] = val
                        else:
                            val = self.log[key]['val']
    
                        set_metric()
    
                    # Mode 4: Set the average obtained value over five
                    #         seconds.
                    elif self.mode == 4:
                        dur = rospy.get_rostime() - self.log[key]['dur']
                        if dur < rospy.Duration(5):
                            self.log[key]['num'] += 1
                            self.log[key]['sum'] += val
                        else:
                            val = self.log[key]['sum'] / (self.log[key]['num'] + 0.001)
                            set_metric()
                            rst_key()
                else:
                    rst_key()  # Add key to the logging dictionary.
                    set_metric()