Skip to content

Alvik lifted/dropped events #41

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Mar 19, 2025
4 changes: 2 additions & 2 deletions arduino_alvik/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

__author__ = "Lucio Rossi <[email protected]>, Giovanni Bruno <[email protected]>"
__license__ = "MPL 2.0"
__version__ = "1.1.2"
__version__ = "1.1.3"
__maintainer__ = "Lucio Rossi <[email protected]>, Giovanni Bruno <[email protected]>"
__required_firmware_version__ = "1.1.0"
__required_firmware_version__ = "1.1.1"

from .arduino_alvik import *
155 changes: 141 additions & 14 deletions arduino_alvik/arduino_alvik.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,72 @@
from .__init__ import __required_firmware_version__


def writes_uart(method):
def wrapper(*args, **kwargs):
with ArduinoAlvik._write_lock:
return method(*args, **kwargs)

return wrapper


def reads_uart(method):
def wrapper(*args, **kwargs):
with ArduinoAlvik._read_lock:
return method(*args, **kwargs)

return wrapper


class _AlvikRLock:
def __init__(self):
"""Alvik re-entrant Lock implementation"""
self._lock = _thread.allocate_lock()
self._owner = None
self._count = 0

def acquire(self):
tid = _thread.get_ident()

if self._owner == tid:
self._count += 1
return True

self._lock.acquire()
self._owner = tid
self._count = 1
return True

def release(self):
tid = _thread.get_ident()

if self._owner != tid:
raise RuntimeError("Cannot release an unowned lock")

self._count -= 1
if self._count == 0:
self._owner = None
self._lock.release()

def locked(self):
return self._lock.locked()

def __enter__(self):
self.acquire()
return self

def __exit__(self, exc_type, exc_value, traceback):
self.release()


class ArduinoAlvik:
_update_thread_running = False
_update_thread_id = None
_events_thread_running = False
_events_thread_id = None

_write_lock = _AlvikRLock()
_read_lock = _AlvikRLock()

def __new__(cls):
if not hasattr(cls, '_instance'):
cls._instance = super(ArduinoAlvik, cls).__new__(cls)
Expand Down Expand Up @@ -119,7 +179,8 @@ def _print_battery_status(percentage: float, is_charging) -> None:
word = marks_str + f" {percentage}% {charging_str} \t"
print(word, end='')

def _lenghty_op(self, iterations=10000000) -> int:
@staticmethod
def _lengthy_op(self, iterations=10000000) -> int:
result = 0
for i in range(1, iterations):
result += i * i
Expand All @@ -134,7 +195,7 @@ def _idle(self, delay_=1, check_on_thread=False, blocking=False) -> None:
self.i2c.set_single_thread(True)

if blocking:
self._lenghty_op(50000)
self._lengthy_op(50000)
else:
sleep_ms(500)
led_val = 0
Expand All @@ -157,7 +218,7 @@ def _idle(self, delay_=1, check_on_thread=False, blocking=False) -> None:
self._battery_perc = abs(soc_perc)
self._print_battery_status(round(soc_perc), self._battery_is_charging)
if blocking:
self._lenghty_op(10000)
self._lengthy_op(10000)
else:
sleep_ms(delay_)
if soc_perc > 97:
Expand All @@ -169,13 +230,13 @@ def _idle(self, delay_=1, check_on_thread=False, blocking=False) -> None:
led_val = (led_val + 1) % 2
self.i2c.set_single_thread(False)
if self.is_on():
print("********** Alvik is on **********")
print("\n********** Alvik is on **********")
except KeyboardInterrupt:
self.stop()
sys.exit()
except Exception as e:
pass
print(f'Unable to read SOC: {e}')
print(f'\nUnable to read SOC: {e}')
finally:
LEDR.value(1)
LEDG.value(1)
Expand Down Expand Up @@ -232,7 +293,7 @@ def begin(self) -> int:
self.set_behaviour(2)
self._set_color_reference()
if self._has_events_registered():
print('Starting events thread')
print('\n********** Starting events thread **********\n')
self._start_events_thread()
self.set_servo_positions(90, 90)
return 0
Expand Down Expand Up @@ -279,6 +340,7 @@ def _wait_for_fw_check(self, timeout=5) -> bool:
return False

@staticmethod
@reads_uart
def _flush_uart():
"""
Empties the UART buffer
Expand Down Expand Up @@ -313,6 +375,7 @@ def _wait_for_target(self, idle_time):
# print(self._last_ack)
sleep_ms(100)

@writes_uart
def is_target_reached(self) -> bool:
"""
Returns True if robot has sent an M or R acknowledgment.
Expand All @@ -330,6 +393,7 @@ def is_target_reached(self) -> bool:
return True
return False

@writes_uart
def set_behaviour(self, behaviour: int):
"""
Sets the behaviour of Alvik
Expand All @@ -339,6 +403,7 @@ def set_behaviour(self, behaviour: int):
self._packeter.packetC1B(ord('B'), behaviour & 0xFF)
uart.write(self._packeter.msg[0:self._packeter.msg_size])

@writes_uart
def rotate(self, angle: float, unit: str = 'deg', blocking: bool = True):
"""
Rotates the robot by given angle
Expand All @@ -355,6 +420,7 @@ def rotate(self, angle: float, unit: str = 'deg', blocking: bool = True):
if blocking:
self._wait_for_target(idle_time=(angle / MOTOR_CONTROL_DEG_S))

@writes_uart
def move(self, distance: float, unit: str = 'cm', blocking: bool = True):
"""
Moves the robot by given distance
Expand Down Expand Up @@ -408,6 +474,7 @@ def get_wheels_speed(self, unit: str = 'rpm') -> (float | None, float | None):
"""
return self.left_wheel.get_speed(unit), self.right_wheel.get_speed(unit)

@writes_uart
def set_wheels_speed(self, left_speed: float, right_speed: float, unit: str = 'rpm'):
"""
Sets left/right motor speed
Expand All @@ -427,12 +494,13 @@ def set_wheels_speed(self, left_speed: float, right_speed: float, unit: str = 'r
self._packeter.packetC2F(ord('J'), left_speed, right_speed)
uart.write(self._packeter.msg[0:self._packeter.msg_size])

@writes_uart
def set_wheels_position(self, left_angle: float, right_angle: float, unit: str = 'deg', blocking: bool = True):
"""
Sets left/right motor angle
:param left_angle:
:param right_angle:
:param unit: the speed unit of measurement (default: 'rpm')
:param unit: the speed unit of measurement (default: 'deg')
:param blocking:
:return:
"""
Expand Down Expand Up @@ -490,6 +558,7 @@ def get_line_sensors(self) -> (int | None, int | None, int | None):

return self._left_line, self._center_line, self._right_line

@writes_uart
def drive(self, linear_velocity: float, angular_velocity: float, linear_unit: str = 'cm/s',
angular_unit: str = 'deg/s'):
"""
Expand Down Expand Up @@ -530,6 +599,7 @@ def get_drive_speed(self, linear_unit: str = 'cm/s', angular_unit: str = 'deg/s'

return convert_speed(self._linear_velocity, 'mm/s', linear_unit), angular_velocity

@writes_uart
def reset_pose(self, x: float, y: float, theta: float, distance_unit: str = 'cm', angle_unit: str = 'deg'):
"""
Resets the robot pose
Expand Down Expand Up @@ -559,6 +629,7 @@ def get_pose(self, distance_unit: str = 'cm', angle_unit: str = 'deg') \
convert_distance(self._y, 'mm', distance_unit),
convert_angle(self._theta, 'deg', angle_unit))

@writes_uart
def set_servo_positions(self, a_position: int, b_position: int):
"""
Sets A/B servomotor angle
Expand Down Expand Up @@ -586,10 +657,16 @@ def get_ack(self) -> str:
"""
return self._last_ack

# def send_ack(self):
# self._packeter.packetC1B(ord('X'), ACK_)
# uart.write(self._packeter.msg[0:self._packeter.msg_size])
@writes_uart
def send_ack(self, ack: str = 'K'):
"""
Sends an ack message on UART
:return:
"""
self._packeter.packetC1B(ord('X'), ord(ack))
uart.write(self._packeter.msg[0:self._packeter.msg_size])

@writes_uart
def _set_leds(self, led_state: int):
"""
Sets the LEDs state
Expand Down Expand Up @@ -651,6 +728,7 @@ def _update(self, delay_=1):
self._read_message()
sleep_ms(delay_)

@reads_uart
def _read_message(self) -> None:
"""
Read a message from the uC
Expand Down Expand Up @@ -1259,6 +1337,24 @@ def on_shake(self, callback: callable, args: tuple = ()) -> None:
"""
self._move_events.register_callback('on_shake', callback, args)

def on_lift(self, callback: callable, args: tuple = ()) -> None:
"""
Register callback when Alvik is lifted
:param callback:
:param args:
:return:
"""
self._move_events.register_callback('on_lift', callback, args)

def on_drop(self, callback: callable, args: tuple = ()) -> None:
"""
Register callback when Alvik is dropped
:param callback:
:param args:
:return:
"""
self._move_events.register_callback('on_drop', callback, args)

def on_x_tilt(self, callback: callable, args: tuple = ()) -> None:
"""
Register callback when Alvik is tilted on X-axis
Expand Down Expand Up @@ -1521,15 +1617,15 @@ def writeto_mem(self, addr, memaddr, buf, addrsize=8) -> None:
return i2c.writeto_mem(addr, memaddr, buf, addrsize=addrsize)



class _ArduinoAlvikServo:

def __init__(self, packeter: ucPack, label: str, servo_id: int, position: list[int | None]):
self._packeter = packeter
self._label = label
self._id = servo_id
self._position = position


@writes_uart
def set_position(self, position):
"""
Sets the position of the servo
Expand Down Expand Up @@ -1558,7 +1654,8 @@ def __init__(self, packeter: ucPack, label: int, wheel_diameter_mm: float = WHEE
self._speed = None
self._position = None
self._alvik = alvik


@writes_uart
def reset(self, initial_position: float = 0.0, unit: str = 'deg'):
"""
Resets the wheel reference position
Expand All @@ -1570,6 +1667,7 @@ def reset(self, initial_position: float = 0.0, unit: str = 'deg'):
self._packeter.packetC2B1F(ord('W'), self._label & 0xFF, ord('Z'), initial_position)
uart.write(self._packeter.msg[0:self._packeter.msg_size])

@writes_uart
def set_pid_gains(self, kp: float = MOTOR_KP_DEFAULT, ki: float = MOTOR_KI_DEFAULT, kd: float = MOTOR_KD_DEFAULT):
"""
Set PID gains for Alvik wheels
Expand All @@ -1589,6 +1687,7 @@ def stop(self):
"""
self.set_speed(0)

@writes_uart
def set_speed(self, velocity: float, unit: str = 'rpm'):
"""
Sets the motor speed
Expand Down Expand Up @@ -1625,6 +1724,7 @@ def get_position(self, unit: str = 'deg') -> float | None:
"""
return convert_angle(self._position, 'deg', unit)

@writes_uart
def set_position(self, position: float, unit: str = 'deg', blocking: bool = True):
"""
Sets left/right motor speed
Expand Down Expand Up @@ -1655,6 +1755,7 @@ def __init__(self, packeter: ucPack, label: str, led_state: list[int | None], rg
self._rgb_mask = rgb_mask
self._led_state = led_state

@writes_uart
def set_color(self, red: bool, green: bool, blue: bool):
"""
Sets the LED's r,g,b state
Expand Down Expand Up @@ -1965,7 +2066,7 @@ class _ArduinoAlvikMoveEvents(_ArduinoAlvikEvents):
Event class to handle move events
"""

available_events = ['on_shake', 'on_x_tilt', 'on_y_tilt', 'on_z_tilt',
available_events = ['on_shake', 'on_lift', 'on_drop', 'on_x_tilt', 'on_y_tilt', 'on_z_tilt',
'on_nx_tilt', 'on_ny_tilt', 'on_nz_tilt']

NZ_TILT = 0x80
Expand All @@ -1992,6 +2093,26 @@ def _is_shaken(current_state, new_state) -> bool:
"""
return not bool(current_state & 0b00000001) and bool(new_state & 0b00000001)

@staticmethod
def _is_lifted(current_state, new_state) -> bool:
"""
True if Alvik was lifted
:param current_state:
:param new_state:
:return:
"""
return not bool(current_state & 0b00000010) and bool(new_state & 0b00000010)

@staticmethod
def _is_dropped(current_state, new_state) -> bool:
"""
True if Alvik was dropped
:param current_state:
:param new_state:
:return:
"""
return bool(current_state & 0b00000010) and not bool(new_state & 0b00000010)

@staticmethod
def _is_x_tilted(current_state, new_state) -> bool:
"""
Expand Down Expand Up @@ -2065,6 +2186,12 @@ def update_state(self, state: int | None):
if self._is_shaken(self._current_state, state):
self.execute_callback('on_shake')

if self._is_lifted(self._current_state, state):
self.execute_callback('on_lift')

if self._is_dropped(self._current_state, state):
self.execute_callback('on_drop')

if self._is_x_tilted(self._current_state, state):
self.execute_callback('on_x_tilt')

Expand Down
Loading