diff --git a/apps/hci_bridge.py b/apps/hci_bridge.py index e50d4051..4114bf44 100644 --- a/apps/hci_bridge.py +++ b/apps/hci_bridge.py @@ -81,7 +81,9 @@ def host_to_controller_filter(hci_packet): response = hci.HCI_Command_Complete_Event( num_hci_command_packets=1, command_opcode=hci_packet.op_code, - return_parameters=bytes([hci.HCI_SUCCESS]), + return_parameters=hci.HCI_StatusReturnParameters( + status=hci.HCI_ErrorCode.SUCCESS + ), ) # Return a packet with 'respond to sender' set to True return (bytes(response), True) diff --git a/bumble/bridge.py b/bumble/bridge.py index 27ff3abd..5e42df7d 100644 --- a/bumble/bridge.py +++ b/bumble/bridge.py @@ -37,7 +37,12 @@ def __init__(self, hci_sink, sender_hci_sink, packet_filter, trace): def on_packet(self, packet): # Convert the packet bytes to an object - hci_packet = HCI_Packet.from_bytes(packet) + try: + hci_packet = HCI_Packet.from_bytes(packet) + except Exception: + logger.warning('forwarding unparsed packet as-is') + self.hci_sink.on_packet(packet) + return # Filter the packet if self.packet_filter is not None: @@ -50,7 +55,10 @@ def on_packet(self, packet): return # Analyze the packet - self.trace(hci_packet) + try: + self.trace(hci_packet) + except Exception: + logger.exception('Exception while tracing packet') # Bridge the packet self.hci_sink.on_packet(packet) diff --git a/bumble/device.py b/bumble/device.py index ffaaa7b1..f2aee7eb 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -1177,7 +1177,7 @@ class ChannelSoundingCapabilities: rtt_capability: int rtt_aa_only_n: int rtt_sounding_n: int - rtt_random_payload_n: int + rtt_random_sequence_n: int nadm_sounding_capability: int nadm_random_capability: int cs_sync_phys_supported: int @@ -2763,24 +2763,39 @@ async def send_command( logger.warning(f'!!! Command {command.name} timed out') raise CommandTimeoutError() from error - async def send_sync_command( - self, command: hci.HCI_SyncCommand[_RP], check_status: bool = True - ) -> _RP: + async def send_sync_command(self, command: hci.HCI_SyncCommand[_RP]) -> _RP: ''' Send a synchronous command via the host. + If the `status` field of the response's `return_parameters` is not equal to + `SUCCESS` an exception is raised. + Params: command: the command to send. - check_status: If `True`, check the `status` field of the response's - `return_parameters` and raise and exception if not equal to `SUCCESS`. Returns: An instance of the return parameters class associated with the command class. ''' try: - return await self.host.send_sync_command( - command, check_status, self.command_timeout - ) + return await self.host.send_sync_command(command, self.command_timeout) + except asyncio.TimeoutError as error: + logger.warning(f'!!! Command {command.name} timed out') + raise CommandTimeoutError() from error + + async def send_sync_command_raw( + self, command: hci.HCI_SyncCommand[_RP] + ) -> hci.HCI_Command_Complete_Event[_RP]: + ''' + Send a synchronous command via the host without checking the response. + + Params: + command: the command to send. + + Returns: + An HCI_Command_Complete_Event instance. + ''' + try: + return await self.host.send_sync_command_raw(command, self.command_timeout) except asyncio.TimeoutError as error: logger.warning(f'!!! Command {command.name} timed out') raise CommandTimeoutError() from error @@ -2812,12 +2827,13 @@ async def power_on(self) -> None: await self.host.reset() # Try to get the public address from the controller - response = await self.host.send_sync_command( - hci.HCI_Read_BD_ADDR_Command(), check_status=False - ) - if response.status == hci.HCI_SUCCESS: + try: + response = await self.host.send_sync_command(hci.HCI_Read_BD_ADDR_Command()) logger.debug(color(f'BD_ADDR: {response.bd_addr}', 'yellow')) self.public_address = response.bd_addr + except hci.HCI_Error: + logger.debug('Controller has no public address') + pass # Instantiate the Key Store (we do this here rather than at __init__ time # because some Key Store implementations use the public address as a namespace) @@ -2926,7 +2942,7 @@ async def power_on(self) -> None: rtt_capability=result.rtt_capability, rtt_aa_only_n=result.rtt_aa_only_n, rtt_sounding_n=result.rtt_sounding_n, - rtt_random_payload_n=result.rtt_random_payload_n, + rtt_random_sequence_n=result.rtt_random_sequence_n, nadm_sounding_capability=result.nadm_sounding_capability, nadm_random_capability=result.nadm_random_capability, cs_sync_phys_supported=result.cs_sync_phys_supported, @@ -2954,27 +2970,23 @@ async def power_on(self) -> None: ) if self.classic_enabled: - await self.send_sync_command( - hci.HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8')), - check_status=False, + await self.send_sync_command_raw( + hci.HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8')) ) - await self.send_sync_command( + await self.send_sync_command_raw( hci.HCI_Write_Class_Of_Device_Command( class_of_device=self.class_of_device - ), - check_status=False, + ) ) - await self.send_sync_command( + await self.send_sync_command_raw( hci.HCI_Write_Simple_Pairing_Mode_Command( simple_pairing_mode=int(self.classic_ssp_enabled) - ), - check_status=False, + ) ) - await self.send_sync_command( + await self.send_sync_command_raw( hci.HCI_Write_Secure_Connections_Host_Support_Command( secure_connections_host_support=int(self.classic_sc_enabled) - ), - check_status=False, + ) ) await self.set_connectable(self.connectable) await self.set_discoverable(self.discoverable) @@ -6673,7 +6685,7 @@ def on_cs_remote_supported_capabilities( rtt_capability=event.rtt_capability, rtt_aa_only_n=event.rtt_aa_only_n, rtt_sounding_n=event.rtt_sounding_n, - rtt_random_payload_n=event.rtt_random_payload_n, + rtt_random_sequence_n=event.rtt_random_sequence_n, nadm_sounding_capability=event.nadm_sounding_capability, nadm_random_capability=event.nadm_random_capability, cs_sync_phys_supported=event.cs_sync_phys_supported, diff --git a/bumble/drivers/intel.py b/bumble/drivers/intel.py index 71888b80..ccddd4f6 100644 --- a/bumble/drivers/intel.py +++ b/bumble/drivers/intel.py @@ -663,10 +663,13 @@ async def reboot_bootloader(self) -> None: async def read_device_info(self) -> dict[ValueType, Any]: self.host.ready = True - response1 = await self.host.send_sync_command( - hci.HCI_Reset_Command(), check_status=False - ) - if response1.status not in (hci.HCI_UNKNOWN_HCI_COMMAND_ERROR, hci.HCI_SUCCESS): + response1 = await self.host.send_sync_command_raw(hci.HCI_Reset_Command()) + if not isinstance( + response1.return_parameters, hci.HCI_StatusReturnParameters + ) or response1.return_parameters.status not in ( + hci.HCI_UNKNOWN_HCI_COMMAND_ERROR, + hci.HCI_SUCCESS, + ): # When the controller is in operational mode, the response is a # successful response. # When the controller is in bootloader mode, @@ -676,13 +679,18 @@ async def read_device_info(self) -> dict[ValueType, Any]: raise DriverError("unexpected HCI response") # Read the firmware version. - response2 = await self.host.send_sync_command( - HCI_Intel_Read_Version_Command(param0=0xFF), check_status=False + response2 = await self.host.send_sync_command_raw( + HCI_Intel_Read_Version_Command(param0=0xFF) ) - if response2.status != 0: # type: ignore + if ( + not isinstance( + response2.return_parameters, HCI_Intel_Read_Version_ReturnParameters + ) + or response2.return_parameters.status != 0 + ): raise DriverError("HCI_Intel_Read_Version_Command error") - tlvs = _parse_tlv(response2.tlv) # type: ignore + tlvs = _parse_tlv(response2.return_parameters.tlv) # type: ignore # Convert the list to a dict. That's Ok here because we only expect each type # to appear just once. diff --git a/bumble/drivers/rtk.py b/bumble/drivers/rtk.py index 03d03b72..24bb78b3 100644 --- a/bumble/drivers/rtk.py +++ b/bumble/drivers/rtk.py @@ -534,11 +534,13 @@ def check(host: Host) -> bool: @staticmethod async def get_loaded_firmware_version(host: Host) -> int | None: - response1 = await host.send_sync_command( - HCI_RTK_Read_ROM_Version_Command(), check_status=False - ) - - if response1.status != hci.HCI_SUCCESS: + response1 = await host.send_sync_command_raw(HCI_RTK_Read_ROM_Version_Command()) + if ( + not isinstance( + response1.return_parameters, HCI_RTK_Read_ROM_Version_ReturnParameters + ) + or response1.return_parameters.status != hci.HCI_SUCCESS + ): return None response2 = await host.send_sync_command( @@ -559,13 +561,20 @@ async def driver_info_for_host(cls, host: Host) -> DriverInfo | None: await host.send_sync_command(hci.HCI_Reset_Command()) host.ready = True - command = hci.HCI_Read_Local_Version_Information_Command() - response = await host.send_sync_command(command, check_status=False) - if response.status != hci.HCI_SUCCESS: + response = await host.send_sync_command_raw( + hci.HCI_Read_Local_Version_Information_Command() + ) + if ( + not isinstance( + response.return_parameters, + hci.HCI_Read_Local_Version_Information_ReturnParameters, + ) + or response.return_parameters.status != hci.HCI_SUCCESS + ): logger.error("failed to probe local version information") return None - local_version = response + local_version = response.return_parameters logger.debug( f"looking for a driver: 0x{local_version.lmp_subversion:04X} " @@ -641,15 +650,21 @@ async def download_for_rtl8723a(self): # TODO: load the firmware - async def download_for_rtl8723b(self): + async def download_for_rtl8723b(self) -> int | None: if self.driver_info.has_rom_version: - response1 = await self.host.send_sync_command( - HCI_RTK_Read_ROM_Version_Command(), check_status=False + response1 = await self.host.send_sync_command_raw( + HCI_RTK_Read_ROM_Version_Command() ) - if response1.status != hci.HCI_SUCCESS: + if ( + not isinstance( + response1.return_parameters, + HCI_RTK_Read_ROM_Version_ReturnParameters, + ) + or response1.return_parameters.status != hci.HCI_SUCCESS + ): logger.warning("can't get ROM version") return None - rom_version = response1.version + rom_version = response1.return_parameters.version logger.debug(f"ROM version before download: {rom_version:04X}") else: rom_version = 0 @@ -691,13 +706,18 @@ async def download_for_rtl8723b(self): logger.debug("download complete!") # Read the version again - response2 = await self.host.send_sync_command( - HCI_RTK_Read_ROM_Version_Command(), check_status=False + response2 = await self.host.send_sync_command_raw( + HCI_RTK_Read_ROM_Version_Command() ) - if response2.status != hci.HCI_SUCCESS: + if ( + not isinstance( + response2.return_parameters, HCI_RTK_Read_ROM_Version_ReturnParameters + ) + or response2.return_parameters.status != hci.HCI_SUCCESS + ): logger.warning("can't get ROM version") else: - rom_version = response2.version + rom_version = response2.return_parameters.version logger.debug(f"ROM version after download: {rom_version:02X}") return firmware.version diff --git a/bumble/hci.py b/bumble/hci.py index 474c1a80..d6696e0b 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -2407,24 +2407,28 @@ class HCI_Packet: @classmethod def from_bytes(cls, packet: bytes) -> HCI_Packet: - packet_type = packet[0] + try: + packet_type = packet[0] - if packet_type == HCI_COMMAND_PACKET: - return HCI_Command.from_bytes(packet) + if packet_type == HCI_COMMAND_PACKET: + return HCI_Command.from_bytes(packet) - if packet_type == HCI_ACL_DATA_PACKET: - return HCI_AclDataPacket.from_bytes(packet) + if packet_type == HCI_ACL_DATA_PACKET: + return HCI_AclDataPacket.from_bytes(packet) - if packet_type == HCI_SYNCHRONOUS_DATA_PACKET: - return HCI_SynchronousDataPacket.from_bytes(packet) + if packet_type == HCI_SYNCHRONOUS_DATA_PACKET: + return HCI_SynchronousDataPacket.from_bytes(packet) - if packet_type == HCI_EVENT_PACKET: - return HCI_Event.from_bytes(packet) + if packet_type == HCI_EVENT_PACKET: + return HCI_Event.from_bytes(packet) - if packet_type == HCI_ISO_DATA_PACKET: - return HCI_IsoDataPacket.from_bytes(packet) + if packet_type == HCI_ISO_DATA_PACKET: + return HCI_IsoDataPacket.from_bytes(packet) - return HCI_CustomPacket(packet) + return HCI_CustomPacket(packet) + except Exception as e: + logger.error(f'error parsing HCI packet [{packet.hex()}]: {e}') + raise def __init__(self, name: str) -> None: self.name = name @@ -2597,6 +2601,21 @@ class HCI_GenericReturnParameters(HCI_ReturnParameters): class HCI_StatusReturnParameters(HCI_ReturnParameters): status: HCI_ErrorCode = field(metadata=HCI_ErrorCode.type_metadata(1)) + @classmethod + def from_parameters(cls, parameters: bytes) -> Self | HCI_StatusReturnParameters: + status = HCI_ErrorCode(parameters[0]) + + if status != HCI_ErrorCode.SUCCESS: + # Don't parse further, just return the status. + return HCI_StatusReturnParameters(status=status) + + return cls(**HCI_Object.dict_from_bytes(parameters, 0, cls.fields)) + + +@dataclasses.dataclass +class HCI_GenericStatusReturnParameters(HCI_StatusReturnParameters): + data: bytes = field(metadata=metadata('*')) + @dataclasses.dataclass class HCI_StatusAndAddressReturnParameters(HCI_StatusReturnParameters): @@ -5854,7 +5873,7 @@ class HCI_LE_CS_Read_Local_Supported_Capabilities_ReturnParameters( rtt_capability: int = field(metadata=metadata(1)) rtt_aa_only_n: int = field(metadata=metadata(1)) rtt_sounding_n: int = field(metadata=metadata(1)) - rtt_random_payload_n: int = field(metadata=metadata(1)) + rtt_random_sequence_n: int = field(metadata=metadata(1)) nadm_sounding_capability: int = field(metadata=metadata(2)) nadm_random_capability: int = field(metadata=metadata(2)) cs_sync_phys_supported: int = field(metadata=metadata(CS_SYNC_PHY_SUPPORTED_SPEC)) @@ -5910,7 +5929,7 @@ class HCI_LE_CS_Write_Cached_Remote_Supported_Capabilities_Command( rtt_capability: int = field(metadata=metadata(1)) rtt_aa_only_n: int = field(metadata=metadata(1)) rtt_sounding_n: int = field(metadata=metadata(1)) - rtt_random_payload_n: int = field(metadata=metadata(1)) + rtt_random_sequence_n: int = field(metadata=metadata(1)) nadm_sounding_capability: int = field(metadata=metadata(2)) nadm_random_capability: int = field(metadata=metadata(2)) cs_sync_phys_supported: int = field(metadata=metadata(CS_SYNC_PHY_SUPPORTED_SPEC)) @@ -7118,7 +7137,7 @@ class HCI_LE_CS_Read_Remote_Supported_Capabilities_Complete_Event(HCI_LE_Meta_Ev rtt_capability: int = field(metadata=metadata(1)) rtt_aa_only_n: int = field(metadata=metadata(1)) rtt_sounding_n: int = field(metadata=metadata(1)) - rtt_random_payload_n: int = field(metadata=metadata(1)) + rtt_random_sequence_n: int = field(metadata=metadata(1)) nadm_sounding_capability: int = field(metadata=metadata(2)) nadm_random_capability: int = field(metadata=metadata(2)) cs_sync_phys_supported: int = field(metadata=metadata(CS_SYNC_PHY_SUPPORTED_SPEC)) @@ -7494,6 +7513,7 @@ class HCI_Command_Complete_Event(HCI_Event, Generic[_RP]): def from_parameters(cls, parameters: bytes) -> Self: event = cls(**HCI_Object.dict_from_bytes(parameters, 0, cls.fields)) event.parameters = parameters + return_parameters_bytes = parameters[3:] # Find the class for the matching command. subclass = HCI_Command.command_classes.get(event.command_opcode) @@ -7506,16 +7526,16 @@ def from_parameters(cls, parameters: bytes) -> Self: 'HCI Command Complete event with opcode for a class that is not' ' an HCI_SyncCommand subclass: ' f'opcode={event.command_opcode:#04x}, ' - f'type={type(subclass).__name__}' + f'type={subclass.__name__}' ) event.return_parameters = HCI_GenericReturnParameters( - data=event.return_parameters # type: ignore[arg-type] + data=return_parameters_bytes ) # type: ignore[assignment] return event # Parse the return parameters bytes into an object. event.return_parameters = subclass.parse_return_parameters( - event.return_parameters # type: ignore[arg-type] + return_parameters_bytes ) # type: ignore[assignment] return event diff --git a/bumble/host.py b/bumble/host.py index ed8ec7eb..54f30bf3 100644 --- a/bumble/host.py +++ b/bumble/host.py @@ -270,7 +270,12 @@ def __init__( self.sco_links = {} # SCO links, by connection handle self.bigs = {} # BIG Handle to BIS Handles self.pending_command: hci.HCI_SyncCommand | hci.HCI_AsyncCommand | None = None - self.pending_response: asyncio.Future[Any] | None = None + self.pending_response: ( + asyncio.Future[ + hci.HCI_Command_Complete_Event | hci.HCI_Command_Status_Event + ] + | None + ) = None self.number_of_supported_advertising_sets = 0 self.maximum_advertising_data_length = 31 self.local_version: ( @@ -658,25 +663,35 @@ async def _send_command( response_timeout: float | None = None, ) -> hci.HCI_Command_Complete_Event | hci.HCI_Command_Status_Event: # Wait until we can send (only one pending command at a time) - async with self.command_semaphore: - assert self.pending_command is None - assert self.pending_response is None - - # Create a future value to hold the eventual response - self.pending_response = asyncio.get_running_loop().create_future() - self.pending_command = command - - try: - self.send_hci_packet(command) - return await asyncio.wait_for( - self.pending_response, timeout=response_timeout - ) - except Exception: - logger.exception(color("!!! Exception while sending command:", "red")) - raise - finally: - self.pending_command = None - self.pending_response = None + await self.command_semaphore.acquire() + + # Create a future value to hold the eventual response + assert self.pending_command is None + assert self.pending_response is None + self.pending_response = asyncio.get_running_loop().create_future() + self.pending_command = command + + response: ( + hci.HCI_Command_Complete_Event | hci.HCI_Command_Status_Event | None + ) = None + try: + self.send_hci_packet(command) + response = await asyncio.wait_for( + self.pending_response, timeout=response_timeout + ) + return response + except Exception: + logger.exception(color("!!! Exception while sending command:", "red")) + raise + finally: + self.pending_command = None + self.pending_response = None + if ( + response is not None + and response.num_hci_command_packets + and self.command_semaphore.locked() + ): + self.command_semaphore.release() @overload async def send_command( @@ -729,30 +744,42 @@ async def send_command( return response async def send_sync_command( + self, command: hci.HCI_SyncCommand[_RP], response_timeout: float | None = None + ) -> _RP: + response = await self.send_sync_command_raw(command, response_timeout) + return_parameters = response.return_parameters + + # Check the return parameters's status + if isinstance(return_parameters, hci.HCI_StatusReturnParameters): + status = return_parameters.status + elif isinstance(return_parameters, hci.HCI_GenericReturnParameters): + # if the payload has at least one byte, assume the first byte is the status + if not return_parameters.data: + raise RuntimeError('no status byte in return parameters') + status = hci.HCI_ErrorCode(return_parameters.data[0]) + else: + raise RuntimeError( + f'unexpected return parameters type ({type(return_parameters)})' + ) + if status != hci.HCI_ErrorCode.SUCCESS: + logger.warning( + f'{command.name} failed ' f'({hci.HCI_Constant.error_name(status)})' + ) + raise hci.HCI_Error(status) + + return return_parameters + + async def send_sync_command_raw( self, command: hci.HCI_SyncCommand[_RP], - check_status: bool = True, response_timeout: float | None = None, - ) -> _RP: + ) -> hci.HCI_Command_Complete_Event[_RP]: response = await self._send_command(command, response_timeout) # Check that the response is of the expected type assert isinstance(response, hci.HCI_Command_Complete_Event) - return_parameters: _RP = response.return_parameters - assert isinstance(return_parameters, command.return_parameters_class) - - # Check the return parameters if required - if check_status: - if isinstance(return_parameters, hci.HCI_StatusReturnParameters): - status = return_parameters.status - if status != hci.HCI_SUCCESS: - logger.warning( - f'{command.name} failed ' - f'({hci.HCI_Constant.error_name(status)})' - ) - raise hci.HCI_Error(status) - return return_parameters + return response async def send_async_command( self, @@ -1001,6 +1028,8 @@ def on_command_processed( self.pending_response.set_result(event) else: logger.warning('!!! no pending response future to set') + if event.num_hci_command_packets and self.command_semaphore.locked(): + self.command_semaphore.release() ############################################################ # HCI handlers @@ -1012,7 +1041,13 @@ def on_hci_command_complete_event(self, event: hci.HCI_Command_Complete_Event): if event.command_opcode == 0: # This is used just for the Num_HCI_Command_Packets field, not related to # an actual command - logger.debug('no-command event') + logger.debug('no-command event for flow control') + + # Release the command semaphore if needed + if event.num_hci_command_packets and self.command_semaphore.locked(): + logger.debug('command complete event releasing semaphore') + self.command_semaphore.release() + return return self.on_command_processed(event) diff --git a/bumble/transport/android_netsim.py b/bumble/transport/android_netsim.py index 904c5767..e53bd602 100644 --- a/bumble/transport/android_netsim.py +++ b/bumble/transport/android_netsim.py @@ -194,7 +194,7 @@ async def pump_loop(self): # We only accept BLUETOOTH if request.initial_info.chip.kind != ChipKind.BLUETOOTH: - logger.warning('Unsupported chip type') + logger.debug('Request for unsupported chip type') error = PacketResponse(error='Unsupported chip type') await self.context.write(error) # return diff --git a/docs/mkdocs/src/platforms/zephyr.md b/docs/mkdocs/src/platforms/zephyr.md index dea70644..f003d6f1 100644 --- a/docs/mkdocs/src/platforms/zephyr.md +++ b/docs/mkdocs/src/platforms/zephyr.md @@ -42,8 +42,7 @@ response = await host.send_sync_command( handle_type=HCI_Write_Tx_Power_Level_Command.TX_POWER_HANDLE_TYPE_ADV, connection_handle=0, tx_power_level=-4, - ), - check_status=False + ) ) if response.status == HCI_SUCCESS: diff --git a/tests/hci_test.py b/tests/hci_test.py index 6014a765..1ff45169 100644 --- a/tests/hci_test.py +++ b/tests/hci_test.py @@ -218,9 +218,9 @@ def test_return_parameters() -> None: assert isinstance(params.status, utils.OpenIntEnum) params = hci.HCI_Read_BD_ADDR_Command.parse_return_parameters( - bytes.fromhex('3C001122334455') + bytes.fromhex('00001122334455') ) - assert params.status == hci.HCI_ErrorCode.ADVERTISING_TIMEOUT_ERROR + assert params.status == hci.HCI_ErrorCode.SUCCESS assert isinstance(params.status, utils.OpenIntEnum) assert isinstance(params.bd_addr, hci.Address) diff --git a/tests/host_test.py b/tests/host_test.py index 895e4ada..3926232f 100644 --- a/tests/host_test.py +++ b/tests/host_test.py @@ -26,9 +26,11 @@ from bumble.hci import ( HCI_AclDataPacket, HCI_Command_Complete_Event, + HCI_Disconnect_Command, HCI_Error, HCI_ErrorCode, HCI_Event, + HCI_GenericReturnParameters, HCI_Reset_Command, HCI_StatusReturnParameters, ) @@ -195,6 +197,7 @@ async def test_send_sync_command() -> None: ) host = Host(source, sink) + host.ready = True # Sync command with success response1 = await host.send_sync_command(HCI_Reset_Command()) @@ -212,6 +215,17 @@ async def test_send_sync_command() -> None: assert excinfo.value.error_code == error_response.return_parameters.status - # Sync command with error status should not raise when `check_status` is False - response2 = await host.send_sync_command(HCI_Reset_Command(), check_status=False) - assert response2.status == HCI_ErrorCode.COMMAND_DISALLOWED_ERROR + # Sync command with raw result + response2 = await host.send_sync_command_raw(HCI_Reset_Command()) + assert response2.return_parameters.status == HCI_ErrorCode.COMMAND_DISALLOWED_ERROR + + # Sync command with a command that's not an HCI_SyncCommand + # (here, for convenience, we use an HCI_AsyncCommand instance) + command = HCI_Disconnect_Command(connection_handle=0x1234, reason=0x13) + sink.response = HCI_Command_Complete_Event( + 1, + command.op_code, + HCI_GenericReturnParameters(data=bytes.fromhex("00112233")), + ) + response3 = await host.send_sync_command_raw(command) # type: ignore + assert isinstance(response3.return_parameters, HCI_GenericReturnParameters)