Skip to content
Snippets Groups Projects
Commit a8040a53 authored by bav6096's avatar bav6096
Browse files

refactoring

parent 18964bde
Branches main
No related tags found
No related merge requests found
Showing
with 215 additions and 187 deletions
<launch>
<node pkg="visualising" type="visualiser" name="visualiser">
<!-- frequency at which the watchdog aggregates metrics and domains -->
<param name="/wdg_freq" value="0.2" type="double" />
<!--
frequency at which the visualisation of the robot state is updated,
must be lesser than 0.2 and only is relevant for the emotion
visualisation
-->
<param name="/vis_freq" value="0.2" type="double" />
<param name="/vis_freq" value="0.1" type="double" />
<param name="/arduino_port" value="/dev/ttyUSB0" type="str" />
<param name="/arduino_baud" value="57600" type="int" />
......@@ -15,7 +8,7 @@
<!-- set visualisation strategy -->
<!-- 1: emotion -->
<!-- 2: abstract -->
<param name="/visualisation_strategy" value="1" type="int" />
<param name="/visualisation_strategy" value="2" type="int" />
<!-- example of how to set an aggregation strategy for a metric -->
<!-- strategy 1 : Take the highest error level of any metric in a domain. -->
......@@ -34,6 +27,6 @@
<!-- strategy 1 : Take the highest error level of any domain. -->
<!-- strategy 2 : Take the lowes error level of any domain. -->
<!-- strategy 0 | default strategy : Take the average error level of every domain. -->
<param name="aggregation_strategy_domains" value="2" type="int" />
<param name="aggregation_strategy_domains" value="0" type="int" />
</node>
</launch>
#!/usr/bin/env python
import argparse
import time
from visualising.expression.tool import Tool
from visualising.expression.abstract import Abstract
from visualising.communication.animation.animation import Animation
from visualising.communication.connection import Connection
from visualising.communication.arduino import Arduino
from visualising.communication.illustration.animation import Animation
from visualising.communication.channel.connection import Connection
from visualising.expression.abstract import Abstract
parser = argparse.ArgumentParser(description="script to play an animation")
parser.add_argument("-p", "--port", help="port to which the Arduino is connected", type=str, default="/dev/ttyUSB0")
......@@ -20,17 +17,13 @@ port = args["port"]
baud = args["baud"]
delay = args["time"]
color = [10, 10, 10]
arduino = Arduino(Connection(port, baud))
states = [0, 1.0, 0.10, 0.15, 0.20, 0.85, 0.30, 0.35, 0.40, 0.45, 0.50, 0.55, 0.60, 0.65,
0.70, 0.75, 0.80, 0.25, 0.90, 0.05, 0.95]
text = ["A", "CPU", "A", "A", "A", "ARM", "A", "A", "A", "A", "A", "A", "A", "A",
name = ["A", "CPU", "A", "A", "A", "ARM", "A", "A", "A", "A", "A", "A", "A", "A",
"A", "A", "A", "A", "RAM", "A", "CAMERA"]
ensemble = Abstract.cycle(states)
time.sleep(10)
error = [0, 1.0, 0.10, 0.15, 0.20, 0.85, 0.30, 0.35, 0.40, 0.45, 0.50, 0.55, 0.60, 0.65,
0.70, 0.75, 0.80, 0.25, 0.90, 0.05, 0.95]
arduino.stream_animations(Abstract.highlight(ensemble, states, text, 4))
# arduino.stream_animation(Animation([ensemble], 50, 2))
ensemble = Abstract.cycle(error)
arduino.stream_animations(Abstract.highlight(ensemble, name, error, 4))
......@@ -2,16 +2,17 @@
import argparse
from visualising.expression.tool import Tool
from visualising.expression.emotion import Emotion
from visualising.communication.animation.color.rgb import RGB
from visualising.communication.animation.animation import Animation
from visualising.communication.connection import Connection
from visualising.communication.arduino import Arduino
from visualising.communication.channel.connection import Connection
from visualising.communication.illustration.color.rgb import RGB
from visualising.expression.library.raw import Raw
from visualising.expression.emotion import Emotion
parser = argparse.ArgumentParser(description="script to play an animation")
parser.add_argument("-p", "--port", help="port to which the Arduino is connected", type=str, default="/dev/ttyUSB0")
parser.add_argument("-b", "--baud", help="baud rate of the connection", type=int, default=57600)
parser.add_argument("-f", "--file", help="file in the library directory to be played", type=str, required=True)
parser.add_argument("-f", "--file", help="emotion to be played", type=str, required=True)
parser.add_argument("-t", "--time", help="time between ensembles", type=int, default=100)
args = vars(parser.parse_args())
......@@ -20,16 +21,10 @@ baud = args["baud"]
file = args["file"]
time = args["time"]
# 50, 10, 10
color = RGB(10, 0, 0)
print(color.r)
print(color.g)
print(color.b)
arduino = Arduino(Connection(port, baud))
ensembles = Tool.create_ensembles(file, "emotion")
arduino.stream_animation(Emotion.build_emotion_parallel(ensembles[0], time, color))
ensembles = Raw.to_ensemble_list(file, "emotion")
animation = Animation(Emotion.build_parallel(ensembles[0], color), time, 1)
arduino.stream_animation(animation)
......@@ -2,25 +2,26 @@
import argparse
from visualising.expression.tool import Tool
from visualising.communication.connection import Connection
from visualising.communication.arduino import Arduino
from visualising.communication.channel.connection import Connection
from visualising.expression.library.raw import Raw
parser = argparse.ArgumentParser(description="script to play an animation")
parser.add_argument("-p", "--port", help="port to which the Arduino is connected", type=str, default="/dev/ttyUSB0")
parser.add_argument("-b", "--baud", help="baud rate of the connection", type=int, default=57600)
parser.add_argument("-f", "--file", help="file in the library directory to be played", type=str, required=True)
parser.add_argument("-f", "--file", help="file to be played", type=str, required=True)
parser.add_argument("-d", "--dict", help="folder containing the file to be played", type=str, required=True)
parser.add_argument("-t", "--time", help="time between ensembles", type=int, default=100)
parser.add_argument("-i", "--iterations", help="number of iterations", type=int, default=1)
parser.add_argument("-i", "--iter", help="number of iterations", type=int, default=1)
args = vars(parser.parse_args())
port = args["port"]
baud = args["baud"]
file = args["file"]
dict = args["dict"]
time = args["time"]
iter = args["iterations"]
iter = args["iter"]
arduino = Arduino(Connection(port, baud))
animation = Tool.create_animation(file, "alphabet", time, iter)
animation = Raw.to_animation(file, dict, time, iter)
arduino.stream_animation(animation)
......@@ -2,8 +2,8 @@
import argparse
from visualising.communication.connection import Connection
from visualising.communication.arduino import Arduino
from visualising.communication.channel.connection import Connection
parser = argparse.ArgumentParser(description="script to reset the NeoPixel rings")
parser.add_argument("-p", "--port", help="port to which the Arduino is connected", type=str, default="/dev/ttyUSB0")
......@@ -14,5 +14,4 @@ port = args["port"]
baud = args["baud"]
arduino = Arduino(Connection(port, baud))
arduino.reset_display()
......@@ -2,23 +2,25 @@
import argparse
from visualising.expression.tool import Tool
from visualising.communication.animation.animation import Animation
from visualising.communication.connection import Connection
from visualising.communication.arduino import Arduino
from visualising.communication.illustration.animation import Animation
from visualising.communication.channel.connection import Connection
from visualising.expression.abstract import Abstract
parser = argparse.ArgumentParser(description="script to play an animation")
parser.add_argument("-p", "--port", help="port to which the Arduino is connected", type=str, default="/dev/ttyUSB0")
parser.add_argument("-b", "--baud", help="baud rate of the connection", type=int, default=57600)
parser.add_argument("-t", "--time", help="time between ensembles", type=int, default=100)
parser.add_argument("-t", "--time", help="time between ensembles", type=int, default=1000)
parser.add_argument("-w", "--word", help="word to be displayed", type=str, required=True)
parser.add_argument("-r", "--ring", help="ring that is supposed to represent the word", type=str, default="r")
args = vars(parser.parse_args())
port = args["port"]
baud = args["baud"]
time = args["time"]
word = args["word"]
ring = args["ring"]
arduino = Arduino(Connection(port, baud))
animation = Tool.write("HALLONORMAN")
animation = Abstract.write(word, ring)
arduino.stream_animation(Animation(animation, time, 1))
#!/usr/bin/env python
class Animation:
# The ensemble time is given in milliseconds.
def __init__(self, ensembles, ensemble_time=0, num_iter=1):
def __init__(self, ensembles, ensemble_time, num_iter):
self.ensembles = ensembles
self._ensemble_time = ensemble_time
self._num_iter = num_iter
......
#!/usr/bin/env python
from visualising.communication.illustration.color.color import Color
from visualising.communication.animation.color.color import Color
class RGB(Color):
......
#!/usr/bin/env python
import re
import importlib.resources
from visualising.illustration.animation.color.rgb import RGB
from visualising.illustration.animation.pixel import Pixel
from visualising.illustration.animation.frame import Frame
from visualising.communication.animation.color.rgb import RGB
from visualising.communication.animation.pixel import Pixel
from visualising.communication.animation.frame import Frame
class Ensemble:
......@@ -13,6 +10,14 @@ class Ensemble:
self.l_frame = l_frame
self.r_frame = r_frame
def color(self, index):
if not 0 <= index <= 31:
raise ValueError("The parameter index must be an integer between 0 and 31!")
if index < 16:
return self.l_frame.color(index)
else:
return self.r_frame.color(index - 16)
def illuminated(self, index):
if not 0 <= index <= 31:
raise ValueError("The parameter index must be an integer between 0 and 31!")
......
......@@ -14,6 +14,11 @@ class Frame:
raise ValueError("The parameter pixels must be a list of exactly 16 objects of the Pixel class!")
self._pixels = pixels
def color(self, index):
if not 0 <= index <= 15:
raise ValueError("The parameter index must be an integer between 0 and 15!")
return self.pixels[index].color
def illuminated(self, index):
if not 0 <= index <= 15:
raise ValueError("The parameter index must be an integer between 0 and 15!")
......
......@@ -2,10 +2,10 @@
import time
from visualising.communication.channel.message.msg_frame import MsgFrame
from visualising.communication.channel.message.msg_instr import MsgInstr
from visualising.communication.channel.message.response import Response
from visualising.communication.illustration.animation import Animation
from visualising.communication.animation.animation import Animation
from visualising.communication.message.msg_frame import MsgFrame
from visualising.communication.message.msg_instr import MsgInstr
from visualising.communication.message.response import Response
class Arduino:
......@@ -52,9 +52,6 @@ class Arduino:
# animation is then started. In addition, it is checked whether the
# animation was played successfully.
def play_animation(self, animation):
if not isinstance(animation, Animation):
raise TypeError("The parameter animation must be an object of the Animation class!")
self.load_animation(animation)
self.start_playback()
......@@ -73,14 +70,13 @@ class Arduino:
# the received ensemble and then confirms the playback, whereupon a new
# ensemble is sent.
def stream_animation(self, animation):
if not isinstance(animation, Animation):
raise TypeError("The parameter animation must be an object of the Animation class!")
for _ in range(animation.num_iter):
for ensemble in animation.ensembles:
self.play_animation(Animation([ensemble], animation.ensemble_time, 1))
# Streams multiple animations in succession to the Arduino.
# Streams multiple animations in succession to the Arduino. This functionality
# makes it possible to send individual ensembles as animations and thus to play
# back individual ensembles with individual ensemble time.
def stream_animations(self, animations):
for animation in animations:
self.stream_animation(animation)
......@@ -3,7 +3,7 @@
import time
import serial
from visualising.communication.channel.message.response import Response
from visualising.communication.message.response import Response
class ArduinoException(Exception):
......@@ -55,10 +55,10 @@ class Connection:
raise ValueError("The parameter resends must be an integer greater 0!")
def evaluate():
bool = resends > 0
boolean = resends > 0
for expectation in expectations:
bool = bool and not response.compare(expectation)
return bool
boolean = boolean and not response.compare(expectation)
return boolean
while True:
self.send_msg(msg)
......
#!/usr/bin/env python
from visualising.communication.channel.message.message import Message
from visualising.communication.message.message import Message
class MsgFrame(Message):
......
#!/usr/bin/env python
from visualising.communication.channel.message.message import Message
from visualising.communication.message.message import Message
class MsgInstr(Message):
......
......@@ -4,7 +4,16 @@ class Response:
# Generates an Arduino message, which is composed of a message byte from the Arduino
# and a translation of the message byte.
def __init__(self, byte):
self.byte = byte
self._byte = byte
self.desc = self.translate()
@property
def byte(self):
return self._byte
@byte.setter
def byte(self, byte):
self._byte = byte
def translate(self):
translation = {
......
#!/usr/bin/env python
import time
import rospy
import numpy as np
from visualising.expression.tool import Tool
from visualising.communication.animation.color.rgb import RGB
from visualising.communication.animation.pixel import Pixel
from visualising.communication.animation.ensemble import Ensemble
from visualising.communication.animation.animation import Animation
from visualising.expression.library.raw import Raw
from visualising.expression.expression import Expression
from visualising.communication.illustration.animation import Animation
from visualising.communication.illustration.color.rgb import RGB
from visualising.expression.tool import Tool
class Abstract(Expression):
def __init__(self, arduino):
self.arduino = arduino
self.playback_start = 0
self._arduino = arduino
self.displayed = True
@property
def arduino(self):
return self._arduino
@arduino.setter
def arduino(self, arduino):
self._arduino = arduino
def react(self, state):
if len(state[1]) > 32:
rospy.logwarn("Only displaying the first 32 metric aggregations!")
state = state[:32]
def react(self, condition):
if self.displayed:
self.displayed = False
animations = Abstract.highlight(Abstract.cycle(condition.error()), condition.name(), condition.error(), 5)
self.arduino.stream_animations(animations)
self.displayed = True
@staticmethod
def cycle(state_vector):
ensemble = Tool.create_empty_ensemble()
def cycle(error_vector):
empty = Ensemble.empty()
index = 0
for state in state_vector:
color = Tool.state_to_color(state, RGB(0, 50, 0), RGB(50, 0, 0))
ensemble = Tool.set_pixel_color(ensemble, index, color)
for error_value in error_vector:
color = Tool.value_to_color(error_value, RGB(0, 50, 0), RGB(50, 0, 0))
empty.replace_pixel(index, Pixel(color))
index = index + 1
return empty
return ensemble
@staticmethod
def write(text, ring, index=None, color=None):
if not text.isalpha():
raise ValueError("The parameter text must be a string that only contains alphabetic characters!")
text = text.upper()
ensembles = []
for char in text:
if ring == "l":
filename = "l_" + char + ".txt"
elif ring == "r":
filename = "r_" + char + ".txt"
else:
raise ValueError("The parameter ring must be a character that is either l or r!")
letter_ensemble = Raw.to_ensemble_list(filename, "alphabet")
if index is not None:
letter_ensemble[0].replace_pixel(index, Pixel(color))
ensembles = ensembles + letter_ensemble
if index is not None:
empty = Ensemble.empty()
empty.replace_pixel(index, Pixel(color))
ensembles.append(empty)
else:
ensembles.append(Ensemble.empty())
return ensembles
@staticmethod
def highlight(cycle, states, text, number):
number = min(len(states), number)
highlight = sorted(zip(states, range(len(states))), reverse=True)[:number]
def highlight(generated_cycle, name_vector, error_vector, number):
number = min(len(name_vector), number)
zipped = sorted(zip(error_vector, range(len(error_vector))), reverse=True)[:number]
cycle_animation = Animation([cycle], 5000, 1)
cycle_animation = Animation([generated_cycle], 5000, 1)
animations = [cycle_animation]
for tuple in zipped:
highlighted_ring = ""
highlighted_color = None
highlighted_index = None
for tuple in highlight:
empty = Ensemble.empty()
ensembles = []
highlight = Tool.create_empty_ensemble()
ring = ""
highlight_pixel = None
color = None
for i in range(32):
if tuple[1] != i:
highlight = Tool.set_pixel_color(highlight, i, RGB(0, 0, 0))
empty.replace_pixel(i, Pixel(RGB(0, 0, 0)))
else:
highlight_pixel = i
highlighted_index = i
if i < 16:
ring = "l"
color = cycle.l_frame.pixels[i].color
highlighted_ring = "l"
else:
ring = "r"
color = cycle.r_frame.pixels[i % 16].color
highlighted_ring = "r"
highlight = Tool.set_pixel_color(highlight, i, color)
highlighted_color = generated_cycle.color(i)
empty.replace_pixel(i, Pixel(highlighted_color))
ensembles.append(highlight)
ensembles = ensembles + Abstract.write(text[highlight_pixel], ring, highlight_pixel, color)
ensembles.append(empty)
ensembles = ensembles + Abstract.write(name_vector[highlighted_index], highlighted_ring, highlighted_index,
highlighted_color)
animations.append(Animation(ensembles, 1000, 1))
animations.append(cycle_animation)
animations.append(Animation([Tool.create_empty_ensemble()], 1000, 1))
animations.append(Animation([Ensemble.empty()], 1000, 1))
return animations
@staticmethod
def write(text, ring, highlight=None, color=None):
if not isinstance(text, str) or not text.isalpha():
raise ValueError("The parameter text must be a string that only contains alphabetic characters!")
text = text.upper()
ensembles = []
for char in text:
if ring == "l":
file = "l_" + char + ".txt"
elif ring == "r":
file = "r_" + char + ".txt"
else:
raise ValueError("The parameter index must be a string that is either l or r!")
letter = Tool.create_ensembles(file, "alphabet")
if highlight is not None:
letter = [Tool.set_pixel_color(letter[0], highlight, color)]
ensembles = ensembles + letter
if highlight is not None:
ensembles.append(Tool.set_pixel_color(Tool.create_empty_ensemble(), highlight, color))
else:
ensembles.append(Tool.create_empty_ensemble())
return ensembles
#!/usr/bin/env python
import time
from visualising.communication.animation.color.rgb import RGB
from visualising.communication.animation.pixel import Pixel
from visualising.communication.animation.ensemble import Ensemble
from visualising.communication.animation.animation import Animation
from visualising.expression.library.raw import Raw
from visualising.communication.illustration.ensemble import Ensemble
from visualising.communication.illustration.color.rgb import RGB
from visualising.expression.expression import Expression
from visualising.expression.tool import Tool
class Emotion(Expression):
def __init__(self, arduino):
self.arduino = arduino
self.playback_start = 0
self.refresh = 5
self._arduino = arduino
self.displayed = True
self.happy = "emotion_happy.txt"
self.ok = "emotion_ok.txt"
self.angry = "emotion_angry.txt"
self.happy = "expression_happy.txt"
self.ok = "expression_ok.txt"
self.angry = "expression_angry.txt"
@property
def arduino(self):
return self._arduino
def react(self, state):
if time.time() - self.playback_start > self.refresh:
color = Tool.state_to_color(state[0], RGB(25, 25, 25), RGB(25, 0, 0))
if state[0] > 0.7:
@arduino.setter
def arduino(self, arduino):
self._arduino = arduino
def react(self, condition):
aggregated_domains = condition.aggregated_domains
if self.displayed:
self.displayed = False
color = Tool.value_to_color(aggregated_domains, RGB(25, 25, 25), RGB(25, 0, 0))
if aggregated_domains > 0.7:
self.visualise(self.angry, color)
else:
if state[0] > 0.3:
if aggregated_domains > 0.3:
self.visualise(self.ok, color)
else:
self.visualise(self.happy, color)
def visualise(self, filepath, color):
still = Raw.to_ensemble_list(filepath, "emotion")
buildup = Tool.build_emotion(still[0], color)
buildup = Emotion.build_parallel(Tool.color_ensemble(still[0], color), color)
animation = Animation(buildup, 50, 1)
self.arduino.stream_animation(animation)
self.playback_start = time.time()
self.displayed = True
# The function is given an ensemble. Then an animation is generated that builds
# up the given ensemble.
@staticmethod
def build_emotion(still, color):
def build(still, color):
ensembles = []
for i in range(32 + 1):
empty = Ensemble.empty()
if i < 32:
for j in range(i + 1):
if still.illuminated(j) or (j == i and i != 32):
if still.illuminated(j) or j == i:
empty.replace_pixel(j, Pixel(color))
ensembles.append(empty)
else:
ensembles.append(still)
ensembles.append(still)
return ensembles
# The function is given an ensemble. Then an animation is generated that builds
# up the given ensemble. The generated animation builds up the given ensemble
# on both NeoPixel rings in parallel.
@staticmethod
def build_emotion_parallel(still, color):
def build_parallel(still, color):
ensembles = []
for i in range(16 + 1):
empty = Ensemble.empty()
if i < 16:
for j in range(i + 1):
if still.l_frame.illuminated(j):
empty.replace_pixel(j, Pixel(color))
if still.r_frame.illuminated(j):
if still.illuminated(j):
empty.replace_pixel(j, Pixel(color))
if still.illuminated(j + 16):
empty.replace_pixel(j + 16, Pixel(color))
if j == i:
empty.replace_pixel(j, Pixel(color))
empty.replace_pixel(16 + j, Pixel(color))
ensembles.append(ensemble)
ensemble = Tool.create_empty_ensemble()
for k in range(32):
if k < 16:
illuminated = blueprint.l_frame.illuminated(k)
empty.replace_pixel(j + 16, Pixel(color))
ensembles.append(empty)
else:
illuminated = blueprint.r_frame.illuminated(k % 16)
if illuminated:
ensemble = Tool.set_pixel_color(ensemble, k, color)
ensembles.append(ensemble)
return Animation(ensembles, time, 1)
ensembles.append(still)
ensembles.append(still)
return ensembles
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment