Skip to content
Snippets Groups Projects
Commit 9e332661 authored by bav6096's avatar bav6096
Browse files

preparation for developed expressions

parent b10656bb
Branches
No related tags found
No related merge requests found
<launch>
<node pkg="visualising" type="visualiser" name="visualiser">
<param name="/wdg_freq" value="2.0" type="double" />
<param name="/vis_freq" value="2.0" type="double" />
<param name="/arduino_port" value="/ttyUSB0" type="str" />
<param name="/arduino_baud" value="57600" type="int" />
<!-- set visualisation strategy -->
<param name="/visualisation_strategy" value="1" type="int" />
<!-- example of how to set an aggregation strategy for a metric -->
<rosparam param="aggregation_strategy_metrics">
example_1: 10
example_2: 10
test_domain_1: 0
test_domain_2: 0
</rosparam>
<!-- set the aggregation strategy for domains -->
<param name="aggregation_strategy_domains" value="0" type="int" />
</node>
</launch>
<launch>
<include file="$(find visualising)/launch/visualiser.launch" />
</launch>
\ No newline at end of file
#!/usr/bin/env python #!/usr/bin/env python
import rospy import rospy
from visualising.visualiser import Visualiser
def visualiser():
rospy.init_node("visualiser")
node = Visualiser()
rate = rospy.Rate(10)
while not rospy.is_shutdown():
rate.sleep()
from visualising.visualiser import Visualiser
if __name__ == '__main__': if __name__ == '__main__':
try: try:
visualiser() rospy.init_node("visualiser")
Visualiser()
rospy.spin()
except rospy.ROSInterruptException: except rospy.ROSInterruptException:
pass pass
import numpy as np
class Emotion:
def __init__(self, arduino):
self.arduino = arduino
@staticmethod
def state_to_color(state):
red = np.array([255, 0, 0])
white = np.array([255, 255, 255])
return (1 - state) * white + state * red
def react(self, state):
color = Emotion.state_to_color(state)
if state > 0.7:
self.happy(color)
else:
if state > 0.3:
self.ok(color)
else:
self.angry(color)
def happy(self, color):
def ok(self, color):
def angry(self, color):
#!/usr/bin/env python
import rospy
from monitoring.msg import Monitoring
class Watchdog:
def __init__(self):
self.sub = rospy.Subscriber("/monitoring", Monitoring, self.update_log)
# Stores the most current values of each metric.
self.log = {}
# Strategies that specify how the critical level of each metric should be aggregated in a domain.
# It is possible to specify a different aggregation strategy for every single domain.
self.aggregation_strategy_metrics = rospy.get_param("/visualiser/aggregation_strategy_metrics")
# Dictionary that stores the aggregated critical levels of the metrics of a domain.
self.aggregated_metrics = {}
# Strategy that specifies how the critical levels of all domains should be aggregated.
self.aggregation_strategy_domains = rospy.get_param("/visualiser/aggregation_strategy_domains")
# Variable that stores the aggregated critical level of all domains.
self.aggregated_domains = None
# Frequency at which the metrics in the log are aggregated.
wdg_freq = rospy.get_param("/visualiser/wdg_freq", 1.0)
if not wdg_freq > 0.0:
rospy.logwarn(
"Watchdog: The frequency at which the metrics of the robot are aggregated must be greater then 0! "
"Using 1 as frequency!")
wdg_freq = 1.0
self.timer = rospy.Timer(rospy.Duration(1.0 / wdg_freq), self.update_condition)
# TODO: Think about efficiency!
def update_log(self, monitoring):
origin = monitoring.origin
metric = monitoring.metric
domain = metric.domain
label = metric.label
value = metric.value
unit = metric.unit
error = metric.error
if domain in self.log:
if origin in self.log[domain]:
self.log[domain][origin].update({label: {"value": value, "unit": unit, "error": error}})
else:
self.log[domain][origin] = {}
self.log[domain][origin][label] = {"value": value, "unit": unit, "error": error}
else:
self.log[domain] = {}
self.log[domain][origin] = {}
self.log[domain][origin][label] = {"value": value, "unit": unit, "error": error}
def update_condition(self, event):
self.aggregate_metrics()
self.aggregate_domains()
# TODO: Add more modes!
def aggregate_metrics(self):
for domain in self.log:
if domain in self.aggregation_strategy_metrics and self.aggregation_strategy_metrics[domain] != 0:
# Strategy 1: Take the highest error level of any metric in a domain.
if self.aggregation_strategy_metrics[domain] == 1:
highest_error = 0.0
for origin in self.log[domain]:
for label in self.log[domain][origin]:
local_error = self.log[domain][origin][label]["error"]
if local_error > highest_error:
highest_error = local_error
self.aggregated_metrics.update({domain: highest_error})
# Strategy 2: Take the lowest error level of any metric in a domain.
elif self.aggregation_strategy_metrics[domain] == 2:
lowest_error = 1.0
for origin in self.log[domain]:
for label in self.log[domain][origin]:
local_error = self.log[domain][origin][label]["error"]
if local_error < lowest_error:
lowest_error = local_error
self.aggregated_metrics.update({domain: lowest_error})
else:
rospy.logwarn("Watchdog: The specified method for aggregating metrics is not recognised!")
# Default strategy: Take the average error level of every metric in a domain.
else:
count = 0.001
aggregated_error = 0.0
for origin in self.log[domain]:
for label in self.log[domain][origin]:
count = count + 1
aggregated_error = aggregated_error + self.log[domain][origin][label]["error"]
quotient = aggregated_error / count
self.aggregated_metrics.update({domain: quotient})
# TODO: Add more modes!
def aggregate_domains(self):
# Strategy 1: Take the highest error level of any domain.
if self.aggregation_strategy_domains == 1:
highest_error = 0.0
for domain in self.aggregated_metrics:
local_error = self.aggregated_metrics[domain]
if local_error > highest_error:
highest_error = local_error
self.aggregated_domains = highest_error
# Strategy 2: Take the lowes error level of any domain.
elif self.aggregation_strategy_domains == 2:
lowest_error = 1.0
for domain in self.aggregated_metrics:
local_error = self.aggregated_metrics[domain]
if local_error < lowest_error:
lowest_error = local_error
self.aggregated_domains = lowest_error
# Default strategy: Take the average error level of every domain.
else:
count = 0.001
aggregated_error = 0.0
for domain in self.aggregated_metrics:
count = count + 1
aggregated_error = aggregated_error + self.aggregated_metrics[domain]
quotient = aggregated_error / count
self.aggregated_domains = quotient
...@@ -2,31 +2,33 @@ ...@@ -2,31 +2,33 @@
import rospy import rospy
from monitoring.watchdog import Watchdog from visualising.monitoring.watchdog import Watchdog
from visualising.arduino import Arduino from visualising.communication.arduino import Arduino
from visualising.connection import Connection, ArduinoException from visualising.communication.channel.connection import Connection
from visualising.animation import Animation
class Visualiser: class Visualiser:
def __init__(self): def __init__(self):
port = rospy.get_param("/arduino_port", "/dev/ttyUSB0") port = rospy.get_param("/visualiser/arduino_port")
baud = rospy.get_param("/arduino_baud", 57600) baud = rospy.get_param("/visualiser/arduino_baud")
self.visualisation_strategy = rospy.get_param("/visualisation_strategy", 1) freq = rospy.get_param("/visualiser/vis_freq")
self.arduino = Arduino(Connection(port, baud)) # self.arduino = Arduino(Connection(port, baud))
self.watchdog = Watchdog() self.watchdog = Watchdog()
def visualise(self): if rospy.get_param("/visualiser/visualisation_strategy") == 1:
if self.visualisation_strategy == 1: self.expression = None
self.mood_mode()
elif self.visualisation_strategy == 2:
self.info_mode()
else: else:
rospy.logwarn("Visualiser: The specified strategy is not recognised by the visualiser!") self.expression = None
def info_mode(self): if not freq > 0.0:
print("Hallo!") rospy.logwarn(
"Visualiser: The frequency at which the visualisation of the robot state is updated must be greater "
"than 0! Using 1 as frequency!")
freq = 1.0
def stream_animation(self, animation): self.timer = rospy.Timer(rospy.Duration(1.0 / freq), self.visualise)
self.arduino.stream_animation(animation)
def visualise(self, event):
# self.expression.react()
rospy.logwarn(self.watchdog.aggregated_domains)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment