Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 175 additions & 0 deletions cflib/utils/supervisor_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
# -*- coding: utf-8 -*-
#
# ,---------, ____ _ __
# | ,-^-, | / __ )(_) /_______________ _____ ___
# | ( O ) | / __ / / __/ ___/ ___/ __ `/_ / / _ \
# | / ,--' | / /_/ / / /_/ /__/ / / /_/ / / /_/ __/
# +------` /_____/_/\__/\___/_/ \__,_/ /___/\___/
#
# Copyright (C) 2025 Bitcraze AB
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, in version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
This class is used to easily get the state of your Crazyflie by through
the supervisor. Check the `reading_supervisor.py` example in
examples/supervisor/ to better understand how it works.
"""
import threading

from cflib.crazyflie.log import LogConfig
from cflib.crazyflie.syncCrazyflie import SyncCrazyflie


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a must but could be nice with a short explanation on how this class is intended to be used/what it is good for.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a small description that refers users to the reading_supervisor.py example

class SupervisorState:
STATES = [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe replace "can be" with "ready to" (for example ready to fly)? Matches with other places and might be a bit more clear.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted the states to match the corresponding firmware ones. Changed "Auto armed" to "Is_auto_armed"

'Can be armed',
'Is armed',
'Is auto armed',
'Can fly',
'Is flying',
'Is tumbled',
'Is locked',
'Is crashed',
'HL control is active',
'Finished HL trajectory',
'HL control is disabled'
]

def __init__(self, crazyflie):
if isinstance(crazyflie, SyncCrazyflie):
self.cf = crazyflie.cf
else:
self.cf = crazyflie

self.logconf = LogConfig(name='SupervisorInfo', period_in_ms=100)
self.logconf.add_variable('supervisor.info', 'uint16_t')
self.cf.log.add_config(self.logconf)

def close(self):
try:
self.logconf.delete()
except Exception as e:
print(f'Warning: failed to delete logconf: {e}')

def read_supervisor_state_bitfield(self):
"""
Reads 'supervisor.info' once from the Crazyflie.
Returns the value or None if timed out.
"""
value_holder = {'val': None}
event = threading.Event()

def log_callback(timestamp, data, logconf):
value_holder['val'] = data['supervisor.info']
event.set()

def log_error(logconf, msg):
print(f'Error when logging {logconf.name}: {msg}')
event.set()

self.logconf.data_received_cb.add_callback(log_callback)
self.logconf.error_cb.add_callback(log_error)
self.logconf.start()

if event.wait(2.0):
bitfield = value_holder['val']
else:
print('Timeout waiting for supervisor.info')
bitfield = None

self.logconf.stop()
self.logconf.data_received_cb.remove_callback(log_callback)
self.logconf.error_cb.remove_callback(log_error)

return bitfield

def read_supervisor_state_list(self):
"""
Reads 'supervisor.info' once from the Crazyflie.
Returns the list of all active states.
"""
bitfield = self.read_supervisor_state_bitfield()
list = self.decode_bitfield(bitfield)
return list

def decode_bitfield(self, value):
"""
Given a bitfield integer `value` and a list of `self.STATES`,
returns the names of all states whose bits are set.
Bit 0 corresponds to states[0], Bit 1 to states[1], etc.

* Bit 0 = Can be armed - the system can be armed and will accept an arming command.
* Bit 1 = Is armed - the system is armed.
* Bit 2 = Is auto armed - the system is configured to automatically arm.
* Bit 3 = Can fly - the Crazyflie is ready to fly.
* Bit 4 = Is flying - the Crazyflie is flying.
* Bit 5 = Is tumbled - the Crazyflie is up side down.
* Bit 6 = Is locked - the Crazyflie is in the locked state and must be restarted.
* Bit 7 = Is crashed - the Crazyflie has crashed.
* Bit 8 = High level control is actively flying the drone.
* Bit 9 = High level trajectory has finished.
* Bit 10 = High level control is disabled and not producing setpoints.
"""
if value < 0:
raise ValueError('value must be >= 0')

result = []
for bit_index, name in enumerate(self.STATES):
if value & (1 << bit_index):
result.append(name)
return result

# Individual state checks
def can_be_armed(self):
# Bit 0
return bool(self.read_supervisor_state_bitfield() & (1 << 0))

def is_armed(self):
# Bit 1
return bool(self.read_supervisor_state_bitfield() & (1 << 1))

def is_auto_armed(self):
# Bit 2
return bool(self.read_supervisor_state_bitfield() & (1 << 2))

def can_fly(self):
# Bit 3
return bool(self.read_supervisor_state_bitfield() & (1 << 3))

def is_flying(self):
# Bit 4
return bool(self.read_supervisor_state_bitfield() & (1 << 4))

def is_tumbled(self):
# Bit 5
return bool(self.read_supervisor_state_bitfield() & (1 << 5))

def is_locked(self):
# Bit 6
return bool(self.read_supervisor_state_bitfield() & (1 << 6))

def is_crashed(self):
# Bit 7
return bool(self.read_supervisor_state_bitfield() & (1 << 7))

def active_hl_control(self):
# Bit 8
return bool(self.read_supervisor_state_bitfield() & (1 << 8))

def finished_hl_traj(self):
# Bit 9
return bool(self.read_supervisor_state_bitfield() & (1 << 9))

def disabled_hl_control(self):
# Bit 10
return bool(self.read_supervisor_state_bitfield() & (1 << 10))
87 changes: 87 additions & 0 deletions examples/supervisor/reading_supervisor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# -*- coding: utf-8 -*-
#
# || ____ _ __
# +------+ / __ )(_) /_______________ _____ ___
# | 0xBC | / __ / / __/ ___/ ___/ __ `/_ / / _ \
# +------+ / /_/ / / /_/ /__/ / / /_/ / / /_/ __/
# || || /_____/_/\__/\___/_/ \__,_/ /___/\___/
#
# Copyright (C) 2025 Bitcraze AB
#
# Crazyflie Nano Quadcopter Client
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Simple example of reading the state of the Crazyflie through the supervisor.

Based on its state, the Crazyflie will arm (if it can be armed), take off
(if it can fly), and land (if it is flying). After each action, we call
the supervisor to check if the Crazyflie is crashed, locked or tumbled.
"""
import time

import cflib.crtp
from cflib.crazyflie import Crazyflie
from cflib.crazyflie.syncCrazyflie import SyncCrazyflie
from cflib.utils import uri_helper
from cflib.utils.reset_estimator import reset_estimator
from cflib.utils.supervisor_state import SupervisorState


# URI to the Crazyflie to connect to
uri = uri_helper.uri_from_env(default='radio://0/80/2M/E7E7E7E7E7')


def safety_check(sup: SupervisorState):
if sup.is_crashed():
raise Exception('Crazyflie crashed!')
if sup.is_locked():
raise Exception('Crazyflie locked!')
if sup.is_tumbled():
raise Exception('Crazyflie tumbled!')


def run_sequence(scf: SyncCrazyflie, sup: SupervisorState):
commander = scf.cf.high_level_commander

try:
if sup.can_be_armed():
print('The Crazyflie can be armed...arming!')
safety_check(sup)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a print similar to: print('The Crazyflie can arm...arming!') to match the other states.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! Added the print.

scf.cf.platform.send_arming_request(True)
time.sleep(1)

if sup.can_fly():
print('The Crazyflie can fly...taking off!')
commander.takeoff(1.0, 2.0)
time.sleep(3)
safety_check(sup)
if sup.is_flying():
print('The Crazyflie is flying...landing!')
commander.land(0.0, 2.0)
time.sleep(3)
safety_check(sup)

except Exception as e:
print(e)


if __name__ == '__main__':
cflib.crtp.init_drivers()

with SyncCrazyflie(uri, cf=Crazyflie(rw_cache='./cache')) as scf:
time.sleep(1)
supervisor = SupervisorState(scf)
reset_estimator(scf)
time.sleep(1)
run_sequence(scf, supervisor)