diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 715661e3..83342f1b 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -10,10 +10,10 @@ jobs: steps: - uses: actions/checkout@v2 - - name: Set up Python 3.7 + - name: Set up Python 3.9 uses: actions/setup-python@v2 with: - python-version: '3.7' + python-version: '3.9' - name: Install dependencies run: | diff --git a/.github/workflows/testlint.yml b/.github/workflows/testlint.yml index ddb6505e..0e61a748 100644 --- a/.github/workflows/testlint.yml +++ b/.github/workflows/testlint.yml @@ -8,7 +8,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [3.7] + python-version: [3.9] steps: - uses: actions/checkout@v2 @@ -20,6 +20,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip + pip install -r requirements-test.txt pip install -r requirements.txt - name: Lint with flake8 diff --git a/CHANGELOG.md b/CHANGELOG.md index 579a1808..5e53c303 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # What's New in Hologram Python SDK +## v0.10.0 +2023-09-05 Hologram +* targets python version 3.9 +* Allow setting a specific modem when initializing a network. This can be done by passing the modem into the `HologramCloud` intializing method for example: `HologramCloud({}, authentication_type='totp', network='cellular', modem=modem)`. Initialize a modem using one of the following methods: + 1. Initialize a modem object with a known good port using a supported modem class in `Hologram.Network.Modem` for example: `EC21(device_name="/dev/ttyUSB4")` This initializes a Quectel EC21 on port `/dev/ttyUSB4` + 2. Scan for all available modems through the new `Cellular.scan_for_all_usable_modems()` method at `Hologram.Network.Cellular`. This returns a list of accessible intialized modem objects. Just pass one of these in as a modem. +* Allow modems to send SMS messages through the modem interface. For example: `hologram.network.modem.send_sms_message("+80112", "Hi dashboard!")`. *Note: Extra charges for sending SMS with this method may apply* + ## v0.9.1 2021-04-30 Hologram includes the following bug fixes diff --git a/Hologram/Cloud.py b/Hologram/Cloud.py index 7fe82fcc..7ee80e11 100644 --- a/Hologram/Cloud.py +++ b/Hologram/Cloud.py @@ -10,6 +10,8 @@ import logging from logging import NullHandler from Hologram.Event import Event +from typing import Union +from Hologram.Network.Modem.Modem import Modem from Hologram.Network import NetworkManager from Hologram.Authentication import * @@ -21,7 +23,7 @@ def __repr__(self): return type(self).__name__ def __init__(self, credentials, send_host = '', send_port = 0, - receive_host = '', receive_port = 0, network = ''): + receive_host = '', receive_port = 0, network = '', modem: Union[None, Modem] = None): # Logging setup. self.logger = logging.getLogger(__name__) @@ -33,7 +35,7 @@ def __init__(self, credentials, send_host = '', send_port = 0, self.__initialize_host_and_port(send_host, send_port, receive_host, receive_port) - self.initializeNetwork(network) + self.initializeNetwork(network, modem) def __initialize_host_and_port(self, send_host, send_port, receive_host, receive_port): self.send_host = send_host @@ -41,13 +43,13 @@ def __initialize_host_and_port(self, send_host, send_port, receive_host, receive self.receive_host = receive_host self.receive_port = receive_port - def initializeNetwork(self, network): + def initializeNetwork(self, network, modem): self.event = Event() self.__message_buffer = [] # Network Configuration - self._networkManager = NetworkManager.NetworkManager(self.event, network) + self._networkManager = NetworkManager.NetworkManager(self.event, network, modem=modem) # This registers the message buffering feature based on network availability. self.event.subscribe('network.connected', self.__clear_payload_buffer) diff --git a/Hologram/CustomCloud.py b/Hologram/CustomCloud.py index 9cd3e269..a3b7fb9b 100644 --- a/Hologram/CustomCloud.py +++ b/Hologram/CustomCloud.py @@ -12,6 +12,8 @@ import sys import threading import time +from typing import Union +from Hologram.Network.Modem.Modem import Modem from Hologram.Cloud import Cloud from Exceptions.HologramError import HologramError @@ -25,14 +27,15 @@ class CustomCloud(Cloud): def __init__(self, credentials, send_host='', send_port=0, receive_host='', receive_port=0, enable_inbound=False, - network=''): + network='', modem: Union[None, Modem] = None): super().__init__(credentials, send_host=send_host, send_port=send_port, receive_host=receive_host, receive_port=receive_port, - network=network) + network=network, + modem=modem) # Enforce that the send and receive configs are set before using the class. if enable_inbound and (receive_host == '' or receive_port == 0): diff --git a/Hologram/HologramCloud.py b/Hologram/HologramCloud.py index 6c730363..b8f9f0cf 100755 --- a/Hologram/HologramCloud.py +++ b/Hologram/HologramCloud.py @@ -11,10 +11,12 @@ import binascii import json import sys +from typing import Union +from Hologram.Network.Modem.Modem import Modem from Hologram.CustomCloud import CustomCloud from HologramAuth import TOTPAuthentication, SIMOTPAuthentication from Hologram.Authentication import CSRPSKAuthentication -from Exceptions.HologramError import HologramError +from Exceptions.HologramError import HologramError, AuthenticationError DEFAULT_SEND_MESSAGE_TIMEOUT = 5 HOLOGRAM_HOST_SEND = 'cloudsocket.hologram.io' @@ -60,14 +62,16 @@ class HologramCloud(CustomCloud): } def __init__(self, credentials, enable_inbound=False, network='', - authentication_type='totp'): + authentication_type='totp', modem: Union[None, Modem] = None): super().__init__(credentials, send_host=HOLOGRAM_HOST_SEND, send_port=HOLOGRAM_PORT_SEND, receive_host=HOLOGRAM_HOST_RECEIVE, receive_port=HOLOGRAM_PORT_RECEIVE, enable_inbound=enable_inbound, - network=network) + network=network, + modem=modem + ) self.setAuthenticationType(credentials, authentication_type=authentication_type) @@ -125,6 +129,7 @@ def __populate_totp_credentials(self): self.authentication.credentials['private_key'] = self.network.imsi except Exception as e: self.logger.error('Unable to fetch device id or private key') + raise AuthenticationError('Unable to fetch device id or private key for TOTP authenication') def __populate_sim_otp_credentials(self): nonce = self.request_hex_nonce() diff --git a/Hologram/Network/Cellular.py b/Hologram/Network/Cellular.py index 928c69ff..b9c1d96e 100644 --- a/Hologram/Network/Cellular.py +++ b/Hologram/Network/Cellular.py @@ -11,9 +11,10 @@ from Hologram.Event import Event from Exceptions.HologramError import NetworkError from Hologram.Network.Route import Route -from Hologram.Network.Modem import Modem, E303, MS2131, E372, BG96, Nova_U201, NovaM, DriverLoader +from Hologram.Network.Modem import Modem, E303, MS2131, E372, BG96, EC21, Nova_U201, NovaM, DriverLoader from Hologram.Network import Network, NetworkScope import time +from typing import Union from serial.tools import list_ports # Cellular return codes. @@ -32,6 +33,7 @@ class Cellular(Network): 'ms2131': MS2131.MS2131, 'e372': E372.E372, 'bg96': BG96.BG96, + 'ec21': EC21.EC21, 'nova': Nova_U201.Nova_U201, 'novam': NovaM.NovaM, '': Modem @@ -47,10 +49,10 @@ def __init__(self, event=Event()): def autodetect_modem(self): # scan for a modem and set it if found - dev_devices = self._scan_for_modems() - if dev_devices is None: + first_modem_handler = Cellular._scan_and_select_first_supported_modem() + if first_modem_handler is None: raise NetworkError('Modem not detected') - self.modem = dev_devices[0] + self.modem = first_modem_handler(event=self.event) def load_modem_drivers(self): self._load_modem_drivers() @@ -207,27 +209,45 @@ def _load_modem_drivers(self): dl.force_driver_for_device(syspath, vid_pid[0], vid_pid[1]) - - def _scan_for_modems(self): - res = None - for (modemName, modemHandler) in self._modemHandlers.items(): - if self._scan_for_modem(modemHandler): - res = (modemName, modemHandler) - break - return res + @staticmethod + def _scan_and_select_first_supported_modem() -> Union[Modem, None]: + for (_, modemHandler) in Cellular._modemHandlers.items(): + modem_exists = Cellular._does_modem_exist_for_handler(modemHandler) + if modem_exists: + return modemHandler + return None - def _scan_for_modem(self, modemHandler): + @staticmethod + def _does_modem_exist_for_handler(modemHandler): usb_ids = modemHandler.usb_ids for vid_pid in usb_ids: if not vid_pid: continue - self.logger.debug('checking for vid_pid: %s', str(vid_pid)) - for dev in list_ports.grep("{0}:{1}".format(vid_pid[0], vid_pid[1])): - self.logger.info('Detected modem %s', modemHandler.__name__) + for _ in list_ports.grep("{0}:{1}".format(vid_pid[0], vid_pid[1])): return True return False + @staticmethod + def scan_for_all_usable_modems() -> list[Modem]: + modems = [] + unique_imeis = set() + for (_, modemHandler) in Cellular._modemHandlers.items(): + modem_exists = Cellular._does_modem_exist_for_handler(modemHandler) + if modem_exists: + try: + test_handler = modemHandler() + usable_ports = test_handler.detect_usable_serial_port(stop_on_first=False) + for port in usable_ports: + modem = modemHandler(device_name=port) + imei = modem.imei + if imei not in unique_imeis: + unique_imeis.add(imei) + modems.append(modem) + except Exception: + # Any exception already logged up the chain + pass + return modems @@ -236,11 +256,8 @@ def modem(self): return self._modem @modem.setter - def modem(self, modem): - if modem not in self._modemHandlers: - raise NetworkError('Invalid modem type: %s' % modem) - else: - self._modem = self._modemHandlers[modem](event=self.event) + def modem(self, modem: Union[None, Modem] = None): + self._modem = modem @property def localIPAddress(self): diff --git a/Hologram/Network/Modem/BG96.py b/Hologram/Network/Modem/BG96.py index 23ec9bf9..fc1b4edf 100644 --- a/Hologram/Network/Modem/BG96.py +++ b/Hologram/Network/Modem/BG96.py @@ -7,182 +7,26 @@ # # LICENSE: Distributed under the terms of the MIT License # -import binascii -import time -from serial.serialutil import Timeout - -from Hologram.Network.Modem import Modem -from Hologram.Event import Event +from Hologram.Network.Modem.Quectel import Quectel from UtilClasses import ModemResult -from Exceptions.HologramError import SerialError, NetworkError DEFAULT_BG96_TIMEOUT = 200 -class BG96(Modem): +class BG96(Quectel): usb_ids = [('2c7c', '0296')] - - def __init__(self, device_name=None, baud_rate='9600', - chatscript_file=None, event=Event()): - - super().__init__(device_name=device_name, baud_rate=baud_rate, - chatscript_file=chatscript_file, event=event) - self._at_sockets_available = True - self.urc_response = '' - + def connect(self, timeout=DEFAULT_BG96_TIMEOUT): + return super().connect(timeout) - success = super().connect(timeout) - - # put serial mode on other port - # if success is True: - # # detect another open serial port to use for PPP - # devices = self.detect_usable_serial_port() - # if not devices: - # raise SerialError('Not enough serial ports detected for Nova') - # self.logger.debug('Moving connection to port %s', devices[0]) - # self.device_name = devices[0] - # super().initialize_serial_interface() - - return success - - def send_message(self, data, timeout=Modem.DEFAULT_SEND_TIMEOUT): - # Waiting for the open socket urc - while self.urc_state != Modem.SOCKET_WRITE_STATE: - self.checkURC() - - self.write_socket(data) - - loop_timeout = Timeout(timeout) - while self.urc_state != Modem.SOCKET_SEND_READ: - self.checkURC() - if self.urc_state != Modem.SOCKET_SEND_READ: - if loop_timeout.expired(): - raise SerialError('Timeout occurred waiting for message status') - time.sleep(self._RETRY_DELAY) - elif self.urc_state == Modem.SOCKET_CLOSED: - return '[1,0]' #this is connection closed for hologram cloud response - - return self.urc_response - - def create_socket(self): - self._set_up_pdp_context() - - def connect_socket(self, host, port): - self.command('+QIOPEN', '1,0,\"TCP\",\"%s\",%d,0,1' % (host, port)) - # According to the BG96 Docs - # Have to wait for URC response “+QIOPEN: ,” - - def close_socket(self, socket_identifier=None): - ok, _ = self.command('+QICLOSE', self.socket_identifier) - if ok != ModemResult.OK: - self.logger.error('Failed to close socket') - self.urc_state = Modem.SOCKET_CLOSED - - def write_socket(self, data): - hexdata = binascii.hexlify(data) - # We have to do it in chunks of 510 since 512 is actually too long (CMEE error) - # and we need 2n chars for hexified data - for chunk in self._chunks(hexdata, 510): - value = '%d,\"%s\"' % (self.socket_identifier, chunk.decode()) - ok, _ = self.set('+QISENDEX', value, timeout=10) - if ok != ModemResult.OK: - self.logger.error('Failed to write to socket') - raise NetworkError('Failed to write to socket') - - def read_socket(self, socket_identifier=None, payload_length=None): - - if socket_identifier is None: - socket_identifier = self.socket_identifier - - if payload_length is None: - payload_length = self.last_read_payload_length - - ok, resp = self.set('+QIRD', '%d,%d' % (socket_identifier, payload_length)) - if ok == ModemResult.OK: - resp = resp.lstrip('+QIRD: ') - if resp is not None: - resp = resp.strip('"') - try: - resp = resp.decode() - except: - # This is some sort of binary data that can't be decoded so just - # return the bytes. We might want to make this happen via parameter - # in the future so it is more deterministic - self.logger.debug('Could not decode recieved data') - - return resp - - def is_registered(self): - return self.check_registered('+CREG') or self.check_registered('+CGREG') - - # EFFECTS: Handles URC related AT command responses. - def handleURC(self, urc): - if urc.startswith('+QIOPEN: '): - response_list = urc.lstrip('+QIOPEN: ').split(',') - socket_identifier = int(response_list[0]) - err = int(response_list[-1]) - if err == 0: - self.urc_state = Modem.SOCKET_WRITE_STATE - self.socket_identifier = socket_identifier - else: - self.logger.error('Failed to open socket') - raise NetworkError('Failed to open socket') - return - if urc.startswith('+QIURC: '): - response_list = urc.lstrip('+QIURC: ').split(',') - urctype = response_list[0] - if urctype == '\"recv\"': - self.urc_state = Modem.SOCKET_SEND_READ - self.socket_identifier = int(response_list[1]) - self.last_read_payload_length = int(response_list[2]) - self.urc_response = self._readline_from_serial_port(5) - if urctype == '\"closed\"': - self.urc_state = Modem.SOCKET_CLOSED - self.socket_identifier = int(response_list[-1]) - return - super().handleURC(urc) - - def _is_pdp_context_active(self): - if not self.is_registered(): - return False - - ok, r = self.command('+QIACT?') - if ok == ModemResult.OK: - try: - pdpstatus = int(r.lstrip('+QIACT: ').split(',')[1]) - # 1: PDP active - return pdpstatus == 1 - except (IndexError, ValueError) as e: - self.logger.error(repr(e)) - except AttributeError as e: - self.logger.error(repr(e)) - return False - - def init_serial_commands(self): - self.command("E0") #echo off - self.command("+CMEE", "2") #set verbose error codes - self.command("+CPIN?") - self.set_timezone_configs() - #self.command("+CPIN", "") #set SIM PIN - self.command("+CPMS", "\"ME\",\"ME\",\"ME\"") - self.set_sms_configs() - self.set_network_registration_status() - - def set_network_registration_status(self): - self.command("+CREG", "2") - self.command("+CGREG", "2") - - def _set_up_pdp_context(self): - if self._is_pdp_context_active(): return True - self.logger.info('Setting up PDP context') - self.set('+QICSGP', f'1,1,\"{self._apn}\",\"\",\"\",1') - ok, _ = self.set('+QIACT', '1', timeout=30) + def _tear_down_pdp_context(self): + if not self._is_pdp_context_active(): return True + self.logger.info('Tearing down PDP context') + ok, _ = self.set('+QIACT', '0', timeout=30) if ok != ModemResult.OK: - self.logger.error('PDP Context setup failed') - raise NetworkError('Failed PDP context setup') + self.logger.error('PDP Context tear down failed') else: - self.logger.info('PDP context active') + self.logger.info('PDP context deactivated') @property def description(self): diff --git a/Hologram/Network/Modem/E303.py b/Hologram/Network/Modem/E303.py index 2f066e9f..48e7f458 100644 --- a/Hologram/Network/Modem/E303.py +++ b/Hologram/Network/Modem/E303.py @@ -25,15 +25,7 @@ def __init__(self, device_name=None, baud_rate='9600', def connect(self, timeout = DEFAULT_E303_TIMEOUT): return super().connect(timeout) - def init_serial_commands(self): - self.command("E0") #echo off - self.command("+CMEE", "2") #set verbose error codes - self.command("+CPIN?") - self.command("+CTZU", "1") #time/zone sync - self.command("+CTZR", "1") #time/zone URC - #self.command("+CPIN", "") #set SIM PIN - self.command("+CPMS", "\"ME\",\"ME\",\"ME\"") - self.set_sms_configs() + def set_network_registration_status(self): self.command("+CREG", "2") self.command("+CGREG", "2") diff --git a/Hologram/Network/Modem/E372.py b/Hologram/Network/Modem/E372.py index cfed7a31..195df12d 100644 --- a/Hologram/Network/Modem/E372.py +++ b/Hologram/Network/Modem/E372.py @@ -25,15 +25,7 @@ def __init__(self, device_name=None, baud_rate='9600', def connect(self, timeout = DEFAULT_E372_TIMEOUT): return super().connect(timeout) - def init_serial_commands(self): - self.command("E0") #echo off - self.command("+CMEE", "2") #set verbose error codes - self.command("+CPIN?") - self.command("+CTZU", "1") #time/zone sync - self.command("+CTZR", "1") #time/zone URC - #self.command("+CPIN", "") #set SIM PIN - self.command("+CPMS", "\"ME\",\"ME\",\"ME\"") - self.set_sms_configs() + def set_network_registration_status(self): self.command("+CREG", "2") self.command("+CGREG", "2") diff --git a/Hologram/Network/Modem/EC21.py b/Hologram/Network/Modem/EC21.py new file mode 100644 index 00000000..45963636 --- /dev/null +++ b/Hologram/Network/Modem/EC21.py @@ -0,0 +1,34 @@ +# EC21.py - Hologram Python SDK Quectel EC21 modem interface +# +# Author: Hologram +# +# Copyright 2016 - Hologram (Konekt, Inc.) +# +# +# LICENSE: Distributed under the terms of the MIT License +# + +from Hologram.Network.Modem.Quectel import Quectel +from UtilClasses import ModemResult + +DEFAULT_EC21_TIMEOUT = 200 + +class EC21(Quectel): + usb_ids = [('2c7c', '0121')] + + def connect(self, timeout=DEFAULT_EC21_TIMEOUT): + success = super().connect(timeout) + return success + + def _tear_down_pdp_context(self): + if not self._is_pdp_context_active(): return True + self.logger.info('Tearing down PDP context') + ok, _ = self.set('+QIDEACT', '1', timeout=30) + if ok != ModemResult.OK: + self.logger.error('PDP Context tear down failed') + else: + self.logger.info('PDP context deactivated') + + @property + def description(self): + return 'Quectel EC21' diff --git a/Hologram/Network/Modem/MS2131.py b/Hologram/Network/Modem/MS2131.py index ecc80e28..a738f314 100644 --- a/Hologram/Network/Modem/MS2131.py +++ b/Hologram/Network/Modem/MS2131.py @@ -26,15 +26,7 @@ def __init__(self, device_name=None, baud_rate='9600', def connect(self, timeout = DEFAULT_MS2131_TIMEOUT): return super().connect(timeout) - def init_serial_commands(self): - self.command("E0") #echo off - self.command("+CMEE", "2") #set verbose error codes - self.command("+CPIN?") - self.command("+CTZU", "1") #time/zone sync - self.command("+CTZR", "1") #time/zone URC - #self.command("+CPIN", "") #set SIM PIN - self.command("+CPMS", "\"ME\",\"ME\",\"ME\"") - self.set_sms_configs() + def set_network_registration_status(self): self.command("+CREG", "2") self.command("+CGREG", "2") diff --git a/Hologram/Network/Modem/Modem.py b/Hologram/Network/Modem/Modem.py index a116bd8a..43a194ee 100644 --- a/Hologram/Network/Modem/Modem.py +++ b/Hologram/Network/Modem/Modem.py @@ -13,13 +13,12 @@ from UtilClasses import ModemResult from UtilClasses import SMS from Hologram.Event import Event -from Exceptions.HologramError import SerialError, HologramError, NetworkError, PPPError +from Exceptions.HologramError import SerialError, NetworkError, PPPError from collections import deque import binascii import datetime -import logging import os import serial from serial.tools import list_ports @@ -35,6 +34,7 @@ class Modem(IModem): DEFAULT_SERIAL_TIMEOUT = 1 DEFAULT_SERIAL_RETRIES = 0 DEFAULT_SEND_TIMEOUT = 10 + DEFAULT_PDP_CONTEXT = 1 _RETRY_DELAY = 0.05 # 50 millisecond delay to avoid spinning loops @@ -57,8 +57,9 @@ class Modem(IModem): 0x2F: u'\\', } - def __init__(self, device_name=None, baud_rate='9600', - chatscript_file=None, event=Event()): + # The device_name is the same as the serial port, only provide a device_name if you dont want it to be autodectected + def __init__(self, device_name=None, baud_rate='9600', chatscript_file=None, + event=Event(), apn='hologram', pdp_context=1): super().__init__(device_name=device_name, baud_rate=baud_rate, event=event) @@ -74,7 +75,8 @@ def __init__(self, device_name=None, baud_rate='9600', self.result = ModemResult.OK self.debug_out = '' self.in_ext = False - self._apn = 'hologram' + self._apn = apn + self._pdp_context = pdp_context self._initialize_device_name(device_name) @@ -199,20 +201,23 @@ def __detect_all_serial_ports(self, stop_on_first=False, include_all_ports=True) # since our usable serial devices usually start at 0. udevices = [x for x in list_ports.grep("{0}:{1}".format(vid, pid))] for udevice in reversed(udevices): - if include_all_ports == False: - self.logger.debug('checking port %s', udevice.name) - port_opened = self.openSerialPort(udevice.device) - if not port_opened: - continue - - res = self.command('', timeout=1) - if res[0] != ModemResult.OK: - continue - self.logger.info('found working port at %s', udevice.name) - - device_names.append(udevice.device) - if stop_on_first: - break + try: + if include_all_ports == False: + self.logger.debug('checking port %s', udevice.name) + port_opened = self.openSerialPort(udevice.device) + if not port_opened: + continue + + res = self.command('', timeout=1) + if res[0] != ModemResult.OK: + continue + self.logger.info('found working port at %s', udevice.name) + + device_names.append(udevice.device) + if stop_on_first: + break + except Exception as e: + self.logger.warning(f"Error attempting to connect to serial port: {e}") if stop_on_first and device_names: break return device_names @@ -226,7 +231,13 @@ def initialize_serial_interface(self): self.init_serial_commands() def init_serial_commands(self): - pass + self.command("E0") #echo off + self.command("+CMEE", "2") #set verbose error codes + self.command("+CPIN?") + self.set_timezone_configs() + self.command("+CPMS", "\"ME\",\"ME\",\"ME\"") + self.set_sms_configs() + self.set_network_registration_status() def set_sms_configs(self): self.command("+CMGF", "0") #SMS PDU format @@ -265,6 +276,23 @@ def send_message(self, data, timeout=DEFAULT_SEND_TIMEOUT): return self.read_socket() + def send_sms_message(self, phonenumber, message, timeout=DEFAULT_SEND_TIMEOUT): + self.command("+CMGF", "1") + + ctrl_z = chr(26).encode('utf-8') + ok, r = self.command( + "+CMGS", + f"\"{phonenumber}\"", + prompt=b">", + data=f"{message}\r", + commit_cmd=ctrl_z, + timeout=timeout + ) + + self.command("+CMGF", "0") + return ok == ModemResult.OK + + def pop_received_message(self): self.checkURC() data = None @@ -491,7 +519,7 @@ def _command_result(self): def __command_helper(self, cmd='', value=None, expected=None, timeout=None, retries=DEFAULT_SERIAL_RETRIES, seteq=False, read=False, - prompt=None, data=None, hide=False): + prompt=None, data=None, hide=False, commit_cmd=None): self.result = ModemResult.Timeout if cmd.endswith('?'): @@ -522,6 +550,8 @@ def __command_helper(self, cmd='', value=None, expected=None, timeout=None, if prompt in p: time.sleep(1) self._write_to_serial_port_and_flush(data) + if commit_cmd: + self.debugwrite(commit_cmd, hide=True) self.result = self.process_response(cmd, timeout, hide=hide) if self.result == ModemResult.OK: @@ -712,6 +742,10 @@ def _is_pdp_context_active(self): def _set_up_pdp_context(self): if self._is_pdp_context_active(): return True self.logger.info('Setting up PDP context') + + if self._pdp_context != Modem.DEFAULT_PDP_CONTEXT: + self.set('+UPSD', f'0,100,{self._pdp_context}') + self.set('+UPSD', f'0,1,\"{self._apn}\"') self.set('+UPSD', '0,7,\"0.0.0.0\"') ok, _ = self.set('+UPSDA', '0,3', timeout=30) @@ -721,6 +755,15 @@ def _set_up_pdp_context(self): else: self.logger.info('PDP context active') + def _tear_down_pdp_context(self): + if not self._is_pdp_context_active(): return True + self.logger.info('Tearing down PDP context') + ok, _ = self.set('+UPSDA', '0,4', timeout=30) + if ok != ModemResult.OK: + self.logger.error('PDP Context tear down failed') + else: + self.logger.info('PDP context deactivated') + def __enforce_serial_port_open(self): if not (self.serial_port and self.serial_port.isOpen()): @@ -770,10 +813,10 @@ def _basic_set(self, cmd, value, strip_val=True): def command(self, cmd='', value=None, expected=None, timeout=None, retries=DEFAULT_SERIAL_RETRIES, seteq=False, read=False, - prompt=None, data=None, hide=False): + prompt=None, data=None, hide=False, commit_cmd=None): try: return self.__command_helper(cmd, value, expected, timeout, - retries, seteq, read, prompt, data, hide) + retries, seteq, read, prompt, data, hide, commit_cmd) except serial.serialutil.SerialTimeoutException as e: self.logger.debug('unable to write to port') self.result = ModemResult.Error @@ -829,6 +872,10 @@ def disable_hex_mode(self): def __set_hex_mode(self, enable_hex_mode): self.command('+UDCONF', '1,%d' % enable_hex_mode) + + @property + def details(self): + return f"{self.description} at port: {self.device_name}" @property def serial_port(self): @@ -856,14 +903,16 @@ def modem_id(self): @property def iccid(self): - return self._basic_command('+CCID') + return self._basic_command('+CCID').rstrip('F') @property def operator(self): - op = self._basic_set('+UDOPN','12') - if op is not None: - return op.strip('"') - return op + ret = self._basic_command('+COPS?') + if ret is not None: + parts = ret.split(',') + if len(parts) >= 3: + return parts[2].strip('"') + return None @property def location(self): @@ -929,4 +978,12 @@ def apn(self): @apn.setter def apn(self, apn): self._apn = apn - return self.set('+CGDCONT', f'1,"IP","{self._apn}"') + return self.set('+CGDCONT', f'{self._pdp_context},"IP","{self._apn}"') + + @property + def pdp_context(self): + return self._pdp_context + + @pdp_context.setter + def pdp_context(self, pdp_context): + self._pdp_context = pdp_context diff --git a/Hologram/Network/Modem/NovaM.py b/Hologram/Network/Modem/NovaM.py index b57c3a5e..65e6750d 100644 --- a/Hologram/Network/Modem/NovaM.py +++ b/Hologram/Network/Modem/NovaM.py @@ -33,30 +33,12 @@ def __init__(self, device_name=None, baud_rate='9600', else: self.is_r410 = True - - def init_serial_commands(self): - self.command("E0") #echo off - self.command("+CMEE", "2") #set verbose error codes - self.command("+CPIN?") - self.command("+CPMS", "\"ME\",\"ME\",\"ME\"") - self.set_sms_configs() - self.set_network_registration_status() - def set_network_registration_status(self): self.command("+CEREG", "2") def is_registered(self): return self.check_registered('+CEREG') - def close_socket(self, socket_identifier=None): - - if socket_identifier is None: - socket_identifier = self.socket_identifier - - ok, r = self.set('+USOCL', "%s" % socket_identifier, timeout=40) - if ok != ModemResult.OK: - self.logger.error('Failed to close socket') - @property def description(self): modemtype = '(R410)' if self.is_r410 else '(R404)' @@ -66,16 +48,6 @@ def description(self): def location(self): raise NotImplementedError('The R404 and R410 do not support Cell Locate at this time') - @property - def operator(self): - # R4 series doesn't have UDOPN so need to override - ret = self._basic_command('+COPS?') - parts = ret.split(',') - if len(parts) >= 3: - return parts[2].strip('"') - return None - - # same as Modem::connect_socket except with longer timeout def connect_socket(self, host, port): at_command_val = "%d,\"%s\",%s" % (self.socket_identifier, host, port) diff --git a/Hologram/Network/Modem/Nova_U201.py b/Hologram/Network/Modem/Nova_U201.py index ad21c049..d6a23d3c 100644 --- a/Hologram/Network/Modem/Nova_U201.py +++ b/Hologram/Network/Modem/Nova_U201.py @@ -50,6 +50,10 @@ def create_socket(self): self._set_up_pdp_context() super().create_socket() + def close_socket(self, socket_identifier=None): + super().close_socket(socket_identifier) + self._tear_down_pdp_context() + def is_registered(self): return self.check_registered('+CREG') or self.check_registered('+CGREG') @@ -67,16 +71,6 @@ def enforce_nova_modem_mode(self): self.device_name = devices[0] super().initialize_serial_interface() - def init_serial_commands(self): - self.command("E0") #echo off - self.command("+CMEE", "2") #set verbose error codes - self.command("+CPIN?") - self.set_timezone_configs() - #self.command("+CPIN", "") #set SIM PIN - self.command("+CPMS", "\"ME\",\"ME\",\"ME\"") - self.set_sms_configs() - self.set_network_registration_status() - def set_network_registration_status(self): self.command("+CREG", "2") self.command("+CGREG", "2") @@ -130,3 +124,10 @@ def location(self): @property def description(self): return 'Hologram Nova Global 3G/2G Cellular USB Modem (U201)' + + @property + def operator(self): + op = self._basic_set('+UDOPN','12') + if op is not None: + return op.strip('"') + return op \ No newline at end of file diff --git a/Hologram/Network/Modem/Quectel.py b/Hologram/Network/Modem/Quectel.py new file mode 100644 index 00000000..7b0f4c78 --- /dev/null +++ b/Hologram/Network/Modem/Quectel.py @@ -0,0 +1,161 @@ +# Quectel.py - Hologram Python SDK Quectel modem interface +# +# Author: Hologram +# +# Copyright 2016 - Hologram (Konekt, Inc.) +# +# +# LICENSE: Distributed under the terms of the MIT License +# +import time +import binascii + +from serial.serialutil import Timeout + +from Hologram.Network.Modem import Modem +from Hologram.Event import Event +from UtilClasses import ModemResult +from Exceptions.HologramError import SerialError, NetworkError + +class Quectel(Modem): + + def __init__(self, device_name=None, baud_rate='9600', chatscript_file=None, + event=Event(), apn='hologram', pdp_context=1): + + super().__init__(device_name=device_name, baud_rate=baud_rate, chatscript_file=chatscript_file, + event=event, apn=apn, pdp_context=pdp_context) + self._at_sockets_available = True + self.urc_response = '' + + def send_message(self, data, timeout=Modem.DEFAULT_SEND_TIMEOUT): + # Waiting for the open socket urc + while self.urc_state != Modem.SOCKET_WRITE_STATE: + self.checkURC() + + self.write_socket(data) + + loop_timeout = Timeout(timeout) + while self.urc_state != Modem.SOCKET_SEND_READ: + self.checkURC() + if self.urc_state != Modem.SOCKET_SEND_READ: + if loop_timeout.expired(): + raise SerialError('Timeout occurred waiting for message status') + time.sleep(self._RETRY_DELAY) + elif self.urc_state == Modem.SOCKET_CLOSED: + return '[1,0]' #this is connection closed for hologram cloud response + + return self.urc_response.rstrip('\r\n') + + def create_socket(self): + self._set_up_pdp_context() + + def connect_socket(self, host, port): + self.command('+QIOPEN', '1,0,\"TCP\",\"%s\",%d,0,1' % (host, port)) + # According to the Quectel Docs + # Have to wait for URC response “+QIOPEN: ,” + + def close_socket(self, socket_identifier=None): + ok, _ = self.command('+QICLOSE', self.socket_identifier) + if ok != ModemResult.OK: + self.logger.error('Failed to close socket') + self.urc_state = Modem.SOCKET_CLOSED + self._tear_down_pdp_context() + + def write_socket(self, data): + hexdata = binascii.hexlify(data) + # We have to do it in chunks of 510 since 512 is actually too long (CMEE error) + # and we need 2n chars for hexified data + for chunk in self._chunks(hexdata, 510): + value = '%d,\"%s\"' % (self.socket_identifier, chunk.decode()) + ok, _ = self.set('+QISENDEX', value, timeout=10) + if ok != ModemResult.OK: + self.logger.error('Failed to write to socket') + raise NetworkError('Failed to write to socket') + + def read_socket(self, socket_identifier=None, payload_length=None): + + if socket_identifier is None: + socket_identifier = self.socket_identifier + + if payload_length is None: + payload_length = self.last_read_payload_length + + ok, resp = self.set('+QIRD', '%d,%d' % (socket_identifier, payload_length)) + if ok == ModemResult.OK: + resp = resp.lstrip('+QIRD: ') + if resp is not None: + resp = resp.strip('"') + try: + resp = resp.decode() + except: + # This is some sort of binary data that can't be decoded so just + # return the bytes. We might want to make this happen via parameter + # in the future so it is more deterministic + self.logger.debug('Could not decode recieved data') + + return resp + + def listen_socket(self, port): + # No equivilent exists for quectel modems + pass + + def is_registered(self): + return self.check_registered('+CREG') or self.check_registered('+CEREG') + + # EFFECTS: Handles URC related AT command responses. + def handleURC(self, urc): + if urc.startswith('+QIOPEN: '): + response_list = urc.lstrip('+QIOPEN: ').split(',') + socket_identifier = int(response_list[0]) + err = int(response_list[-1]) + if err == 0: + self.urc_state = Modem.SOCKET_WRITE_STATE + self.socket_identifier = socket_identifier + else: + self.logger.error('Failed to open socket') + raise NetworkError('Failed to open socket') + return + if urc.startswith('+QIURC: '): + response_list = urc.lstrip('+QIURC: ').split(',') + urctype = response_list[0] + if urctype == '\"recv\"': + self.urc_state = Modem.SOCKET_SEND_READ + self.socket_identifier = int(response_list[1]) + self.last_read_payload_length = int(response_list[2]) + self.urc_response = self._readline_from_serial_port(5) + if urctype == '\"closed\"': + self.urc_state = Modem.SOCKET_CLOSED + self.socket_identifier = int(response_list[-1]) + return + super().handleURC(urc) + + def _is_pdp_context_active(self): + if not self.is_registered(): + return False + + ok, r = self.command('+QIACT?') + if ok == ModemResult.OK: + try: + pdpstatus = int(r.lstrip('+QIACT: ').split(',')[1]) + # 1: PDP active + return pdpstatus == 1 + except (IndexError, ValueError) as e: + self.logger.error(repr(e)) + except AttributeError as e: + self.logger.error(repr(e)) + return False + + def set_network_registration_status(self): + self.command("+CREG", "2") + self.command("+CEREG", "2") + + def _set_up_pdp_context(self): + if self._is_pdp_context_active(): return True + self.logger.info('Setting up PDP context') + self.set('+QICSGP', f'{self._pdp_context},1,\"{self._apn}\",\"\",\"\",1') + ok, _ = self.set('+QIACT', f'{self._pdp_context}', timeout=30) + if ok != ModemResult.OK: + self.logger.error('PDP Context setup failed') + raise NetworkError('Failed PDP context setup') + else: + self.logger.info('PDP context active') \ No newline at end of file diff --git a/Hologram/Network/Modem/__init__.py b/Hologram/Network/Modem/__init__.py index fe44d41c..1e795a17 100644 --- a/Hologram/Network/Modem/__init__.py +++ b/Hologram/Network/Modem/__init__.py @@ -1,4 +1,4 @@ -__all__ = ['Modem', 'MockModem', 'MS2131', 'Nova', 'E303', 'E372'] +__all__ = ['Modem', 'MockModem', 'MS2131', 'Nova', 'E303', 'E372', 'Quectel'] from .IModem import IModem from .Modem import Modem diff --git a/Hologram/Network/NetworkManager.py b/Hologram/Network/NetworkManager.py index e0866471..18f79818 100644 --- a/Hologram/Network/NetworkManager.py +++ b/Hologram/Network/NetworkManager.py @@ -10,6 +10,8 @@ # from Hologram.Network import Wifi, Ethernet, BLE, Cellular +from typing import Union +from Hologram.Network.Modem.Modem import Modem from Exceptions.HologramError import NetworkError import logging from logging import NullHandler @@ -26,7 +28,7 @@ class NetworkManager: 'ethernet' : Ethernet.Ethernet, } - def __init__(self, event, network): + def __init__(self, event, network_name, modem: Union[None, Modem] = None): # Logging setup. self.logger = logging.getLogger(__name__) @@ -34,7 +36,7 @@ def __init__(self, event, network): self.event = event self.networkActive = False - self.network = network + self.init_network(network_name, modem) # EFFECTS: Event handler function that sets the network disconnect flag. def networkDisconnected(self): @@ -50,8 +52,7 @@ def listAvailableInterfaces(self): def network(self): return self._network - @network.setter - def network(self, network, modem=None): + def init_network(self, network, modem: Union[None, Modem] = None): if not network: # non-network mode self.networkConnected() self._network = None diff --git a/README.md b/README.md index c97b8275..9933e326 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ in the spirit of bringing connectivity to your devices. ### Requirements: -You will need `ppp` and Python 3.7 installed on your system for the SDK to work. +You will need `ppp` and Python 3.9 installed on your system for the SDK to work. We wrote scripts to ease the installation process. diff --git a/install.sh b/install.sh index 191387ca..771a8e82 100755 --- a/install.sh +++ b/install.sh @@ -13,7 +13,7 @@ set -euo pipefail # This script will install the Hologram SDK and the necessary software dependencies # for it to work. -required_programs=('python3' 'pip3' 'ps' 'kill' 'libpython3.7-dev') +required_programs=('python3' 'pip3' 'ps' 'kill' 'libpython3.9-dev') OS='' # Check OS. @@ -61,8 +61,8 @@ function install_software() { } function check_python_version() { - if ! python3 -V | grep '3.[7-9].[0-9]' > /dev/null 2>&1; then - echo "An unsupported version of python 3 is installed. Must have python 3.7+ installed to use the Hologram SDK" + if ! python3 -V | grep -E '3.(9|1[012]).[0-9]' > /dev/null 2>&1; then + echo "An unsupported version of python 3 is installed. Must have python 3.9+ installed to use the Hologram SDK" exit 1 fi } @@ -124,7 +124,7 @@ do pause "Installing $program. Press [Enter] key to continue..."; install_software 'python3-pip' fi - if ! pip3 -V | grep '3.[7-9]' >/dev/null 2>&1; then + if ! pip3 -V | grep -E '3.(9|1[012])' >/dev/null 2>&1; then echo "pip3 is installed for an unsupported version of python." exit 1 fi diff --git a/requirements-test.txt b/requirements-test.txt new file mode 100644 index 00000000..82800846 --- /dev/null +++ b/requirements-test.txt @@ -0,0 +1 @@ +mock~=4.0.3 diff --git a/requirements.txt b/requirements.txt index f36e43a4..8229d290 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,8 @@ -hjson~=3.0.1 -mock~=3.0.5 pyroute2==0.5.* pyserial~=3.5 -python-pppd==1.0.3 -python-sdk-auth~=0.3.0 +python-pppd==1.0.4 +python-sdk-auth==0.4.0 pyudev~=0.22.0 -pyusb~=1.1.1 +pyusb~=1.2.1 psutil~=5.8.0 -requests~=2.25.1 +requests>=2.25.1 diff --git a/setup.py b/setup.py index 2d27cc9e..4367192e 100755 --- a/setup.py +++ b/setup.py @@ -33,6 +33,7 @@ url = 'https://github.com/hologram-io/hologram-python/', packages = find_packages(), include_package_data = True, + tests_require = open('requirements-test.txt').read().split(), install_requires = open('requirements.txt').read().split(), scripts = ['scripts/hologram'], license = 'MIT', @@ -45,7 +46,7 @@ 'Topic :: Internet', 'Topic :: Security :: Cryptography', 'Programming Language :: Python', - 'Programming Language :: Python :: 3.7', + 'Programming Language :: Python :: 3.9', ], **kw ) diff --git a/tests/API/test_API.py b/tests/API/test_API.py new file mode 100644 index 00000000..6d7c9896 --- /dev/null +++ b/tests/API/test_API.py @@ -0,0 +1,82 @@ +import sys +import pytest +from unittest.mock import Mock, patch +sys.path.append(".") +sys.path.append("..") +sys.path.append("../..") +from Hologram.Api import Api +from Exceptions.HologramError import ApiError + +class TestHologramAPI: + + def test_create_no_creds(self): + with pytest.raises(ApiError, match = 'Must specify valid HTTP authentication credentials'): + Api() + + def test_create_missing_password(self): + with pytest.raises(ApiError, match = 'Must specify valid HTTP authentication credentials'): + Api(username='user') + + def test_create_missing_username(self): + with pytest.raises(ApiError, match = 'Must specify valid HTTP authentication credentials'): + Api(password='password') + + @patch('requests.post') + def test_activate(self, r_post): + api = Api(apikey='123apikey') + + r_post.return_value = Mock(status_code=200) + r_post.return_value.json = Mock(return_value={"success": True, 'order_data': {}}) + + success, response = api.activateSIM('iccid') + + assert success == True + assert response == {} + + @patch('requests.post') + def test_activate_failed(self, r_post): + api = Api(apikey='123apikey') + + r_post.return_value = Mock(status_code=200) + r_post.return_value.json = Mock(return_value={"success": False, 'data': {'iccid': 'Activation failed'}}) + + success, response = api.activateSIM('iccid') + + assert success == False + assert response == 'Activation failed' + + @patch('requests.post') + def test_activate_bad_status_code(self, r_post): + api = Api(apikey='123apikey') + + r_post.return_value = Mock( + status_code=429, + text = 'Too many requests') + + success, response = api.activateSIM('iccid') + + assert success == False + assert response == 'Too many requests' + + @patch('requests.get') + def test_get_plans(self, r_post): + api = Api(apikey='123apikey') + + r_post.return_value.json = Mock(return_value={"success": True, 'data': {'id': 1, 'orgid': 1}}) + + success, response = api.getPlans() + + assert success == True + assert response == {'id': 1, 'orgid': 1} + + @patch('requests.get') + def test_get_sim_state(self, r_post): + api = Api(apikey='123apikey') + + r_post.return_value.json = Mock(return_value={"success": True, 'data': [{'state': 'LIVE'}]}) + + success, response = api.getSIMState('iccid') + + assert success == True + assert response == 'LIVE' + diff --git a/tests/MessageMode/test_HologramCloud.py b/tests/MessageMode/test_HologramCloud.py index e2d328d8..89345c05 100644 --- a/tests/MessageMode/test_HologramCloud.py +++ b/tests/MessageMode/test_HologramCloud.py @@ -13,19 +13,24 @@ sys.path.append("../..") from Hologram.Authentication import * from Hologram.HologramCloud import HologramCloud +from Exceptions.HologramError import AuthenticationError credentials = {'devicekey':'12345678'} class TestHologramCloud: def test_create(self): - hologram = HologramCloud(credentials, enable_inbound = False) + hologram = HologramCloud(credentials, authentication_type='csrpsk', enable_inbound = False) assert hologram.send_host == 'cloudsocket.hologram.io' assert hologram.send_port == 9999 assert hologram.receive_host == '0.0.0.0' assert hologram.receive_port == 4010 + def test_create_bad_totp_keys(self): + with pytest.raises(AuthenticationError, match = 'Unable to fetch device id or private key for TOTP authenication'): + HologramCloud(credentials, enable_inbound = False) + def test_invalid_sms_length(self): hologram = HologramCloud(credentials, authentication_type='csrpsk', enable_inbound = False) @@ -36,7 +41,7 @@ def test_invalid_sms_length(self): def test_get_result_string(self): - hologram = HologramCloud(credentials, enable_inbound = False) + hologram = HologramCloud(credentials, authentication_type='csrpsk', enable_inbound = False) assert hologram.getResultString(-1) == 'Unknown error' assert hologram.getResultString(0) == 'Message sent successfully' diff --git a/tests/Modem/test_BG96.py b/tests/Modem/test_BG96.py new file mode 100644 index 00000000..bc61a641 --- /dev/null +++ b/tests/Modem/test_BG96.py @@ -0,0 +1,96 @@ +# Author: Hologram +# +# Copyright 2017 - Hologram (Konekt, Inc.) +# +# LICENSE: Distributed under the terms of the MIT License +# +# test_BG96.py - This file implements unit tests for the BG96 modem interface. + +from unittest.mock import patch, call +import pytest +import sys + +from Hologram.Network.Modem.BG96 import BG96 +from UtilClasses import ModemResult + +sys.path.append(".") +sys.path.append("..") +sys.path.append("../..") + + +def mock_write(modem, message): + return True + + +def mock_read(modem): + return True + + +def mock_readline(modem, timeout=None, hide=False): + return "" + + +def mock_open_serial_port(modem, device_name=None): + return True + + +def mock_close_serial_port(modem): + return True + + +def mock_detect_usable_serial_port(modem, stop_on_first=True): + return "/dev/ttyUSB0" + + +@pytest.fixture +def no_serial_port(monkeypatch): + monkeypatch.setattr(BG96, "_read_from_serial_port", mock_read) + monkeypatch.setattr(BG96, "_readline_from_serial_port", mock_readline) + monkeypatch.setattr(BG96, "_write_to_serial_port_and_flush", mock_write) + monkeypatch.setattr(BG96, "openSerialPort", mock_open_serial_port) + monkeypatch.setattr(BG96, "closeSerialPort", mock_close_serial_port) + monkeypatch.setattr(BG96, "detect_usable_serial_port", mock_detect_usable_serial_port) + + +def test_init_BG96_no_args(no_serial_port): + modem = BG96() + assert modem.timeout == 1 + assert modem.socket_identifier == 0 + assert modem.chatscript_file.endswith("/chatscripts/default-script") + assert modem._at_sockets_available + + +@patch.object(BG96, "set") +@patch.object(BG96, "command") +@patch.object(BG96, "_is_pdp_context_active") +def test_close_socket(mock_pdp, mock_command, mock_set, no_serial_port): + modem = BG96() + modem.socket_identifier = 1 + mock_set.return_value = (ModemResult.OK, None) + mock_command.return_value = (ModemResult.OK, None) + mock_pdp.return_value = True + modem.close_socket() + mock_set.assert_called_with("+QIACT", "0", timeout=30) + mock_command.assert_called_with("+QICLOSE", 1) + +@patch.object(BG96, "set") +def test_set_up_pdp_context_default(mock_set, no_serial_port): + modem = BG96() + mock_set.return_value = (ModemResult.OK, None) + + modem._set_up_pdp_context() + + expected_calls = [call('+QICSGP', '1,1,\"hologram\",\"\",\"\",1'), + call('+QIACT', '1', timeout=30)] + mock_set.assert_has_calls(expected_calls, any_order=True) + +@patch.object(BG96, "set") +def test_set_up_pdp_context_custom_apn_and_pdp_context(mock_set, no_serial_port): + modem = BG96(apn='hologram2', pdp_context=3) + mock_set.return_value = (ModemResult.OK, None) + + modem._set_up_pdp_context() + + expected_calls = [call('+QICSGP', '3,1,\"hologram2\",\"\",\"\",1'), + call('+QIACT', '3', timeout=30)] + mock_set.assert_has_calls(expected_calls, any_order=True) diff --git a/tests/Modem/test_EC21.py b/tests/Modem/test_EC21.py new file mode 100644 index 00000000..b36e24e0 --- /dev/null +++ b/tests/Modem/test_EC21.py @@ -0,0 +1,74 @@ +# Author: Hologram +# +# Copyright 2017 - Hologram (Konekt, Inc.) +# +# LICENSE: Distributed under the terms of the MIT License +# +# test_EC21.py - This file implements unit tests for the EC21 modem interface. + +from unittest.mock import patch +import pytest +import sys + +from Hologram.Network.Modem.EC21 import EC21 +from UtilClasses import ModemResult + +sys.path.append(".") +sys.path.append("..") +sys.path.append("../..") + + +def mock_write(modem, message): + return True + + +def mock_read(modem): + return True + + +def mock_readline(modem, timeout=None, hide=False): + return "" + + +def mock_open_serial_port(modem, device_name=None): + return True + + +def mock_close_serial_port(modem): + return True + + +def mock_detect_usable_serial_port(modem, stop_on_first=True): + return "/dev/ttyUSB0" + + +@pytest.fixture +def no_serial_port(monkeypatch): + monkeypatch.setattr(EC21, "_read_from_serial_port", mock_read) + monkeypatch.setattr(EC21, "_readline_from_serial_port", mock_readline) + monkeypatch.setattr(EC21, "_write_to_serial_port_and_flush", mock_write) + monkeypatch.setattr(EC21, "openSerialPort", mock_open_serial_port) + monkeypatch.setattr(EC21, "closeSerialPort", mock_close_serial_port) + monkeypatch.setattr(EC21, "detect_usable_serial_port", mock_detect_usable_serial_port) + + +def test_init_EC21_no_args(no_serial_port): + modem = EC21() + assert modem.timeout == 1 + assert modem.socket_identifier == 0 + assert modem.chatscript_file.endswith("/chatscripts/default-script") + assert modem._at_sockets_available + + +@patch.object(EC21, "set") +@patch.object(EC21, "command") +@patch.object(EC21, "_is_pdp_context_active") +def test_close_socket(mock_pdp, mock_command, mock_set, no_serial_port): + modem = EC21() + modem.socket_identifier = 1 + mock_set.return_value = (ModemResult.OK, None) + mock_command.return_value = (ModemResult.OK, None) + mock_pdp.return_value = True + modem.close_socket() + mock_set.assert_called_with("+QIDEACT", "1", timeout=30) + mock_command.assert_called_with("+QICLOSE", 1) diff --git a/tests/Modem/test_Modem.py b/tests/Modem/test_Modem.py index cbf71f40..6c83b576 100644 --- a/tests/Modem/test_Modem.py +++ b/tests/Modem/test_Modem.py @@ -6,6 +6,7 @@ # # test_Modem.py - This file implements unit tests for the Modem class. +from unittest.mock import patch, call import pytest import sys from datetime import datetime @@ -23,6 +24,9 @@ def mock_write(modem, message): def mock_read(modem): return True +def mock_init_commands(modem): + return True + def mock_readline(modem, timeout=None, hide=False): return '' @@ -44,14 +48,19 @@ def mock_command_sms(modem, at_command): def mock_set_sms(modem, at_command, val): return None +def mock_inactive_pdp_context(modem): + return False + @pytest.fixture def no_serial_port(monkeypatch): monkeypatch.setattr(Modem, '_read_from_serial_port', mock_read) monkeypatch.setattr(Modem, '_readline_from_serial_port', mock_readline) monkeypatch.setattr(Modem, '_write_to_serial_port_and_flush', mock_write) + monkeypatch.setattr(Modem, 'init_serial_commands', mock_init_commands) monkeypatch.setattr(Modem, 'openSerialPort', mock_open_serial_port) monkeypatch.setattr(Modem, 'closeSerialPort', mock_close_serial_port) monkeypatch.setattr(Modem, 'detect_usable_serial_port', mock_detect_usable_serial_port) + monkeypatch.setattr(Modem, '_is_pdp_context_active', mock_inactive_pdp_context) @pytest.fixture def get_sms(monkeypatch): @@ -71,6 +80,8 @@ def test_init_modem_no_args(no_serial_port): assert(modem.chatscript_file.endswith('/chatscripts/default-script')) assert(modem._at_sockets_available == False) assert(modem.description == 'Modem') + assert(modem.apn == 'hologram') + assert(modem.pdp_context == 1) def test_init_modem_chatscriptfileoverride(no_serial_port): modem = Modem(chatscript_file='test-chatscript') @@ -78,6 +89,14 @@ def test_init_modem_chatscriptfileoverride(no_serial_port): assert(modem.socket_identifier == 0) assert(modem.chatscript_file == 'test-chatscript') +def test_init_modem_apn(no_serial_port): + modem = Modem(apn='hologram2') + assert(modem.apn == 'hologram2') + +def test_init_modem_pdp_context(no_serial_port): + modem = Modem(pdp_context=3) + assert(modem.pdp_context == 3) + def test_get_result_string(no_serial_port): modem = Modem() assert(modem.getResultString(0) == 'Modem returned OK') @@ -94,6 +113,31 @@ def test_get_location(no_serial_port): assert(modem.location == 'test location') assert('This modem does not support this property' in str(e)) +@patch.object(Modem, "set") +def test_set_up_pdp_context_default(mock_set, no_serial_port): + modem = Modem() + mock_set.return_value = (ModemResult.OK, None) + + modem._set_up_pdp_context() + + expected_calls = [call('+UPSD', '0,1,\"hologram\"'), + call('+UPSD', '0,7,\"0.0.0.0\"'), + call('+UPSDA', '0,3', timeout=30)] + mock_set.assert_has_calls(expected_calls, any_order=True) + +@patch.object(Modem, "set") +def test_set_up_pdp_context_custom_apn_and_pdp_context(mock_set, no_serial_port): + modem = Modem(apn='hologram2', pdp_context=3) + mock_set.return_value = (ModemResult.OK, None) + + modem._set_up_pdp_context() + + expected_calls = [call('+UPSD', '0,100,3'), + call('+UPSD', '0,1,\"hologram2\"'), + call('+UPSD', '0,7,\"0.0.0.0\"'), + call('+UPSDA', '0,3', timeout=30)] + mock_set.assert_has_calls(expected_calls, any_order=True) + # SMS def test_get_sms(no_serial_port, get_sms): diff --git a/tests/Modem/test_NovaM.py b/tests/Modem/test_NovaM.py index 41215325..bd1bb16d 100644 --- a/tests/Modem/test_NovaM.py +++ b/tests/Modem/test_NovaM.py @@ -73,7 +73,7 @@ def test_close_socket_no_args(mock_set, no_serial_port): mock_set.return_value = (0,0) mock_set.reset_mock() modem.close_socket() - mock_set.assert_called_once_with('+USOCL', '0', timeout=40) + mock_set.assert_called_once_with('+USOCL', '0') @patch.object(NovaM, 'set') def test_close_socket_with_socket_identifier(mock_set, no_serial_port): @@ -81,7 +81,7 @@ def test_close_socket_with_socket_identifier(mock_set, no_serial_port): mock_set.return_value = (0,0) mock_set.reset_mock() modem.close_socket(5) - mock_set.assert_called_once_with('+USOCL', '5', timeout=40) + mock_set.assert_called_once_with('+USOCL', '5') @patch.object(NovaM, 'command') def test_set_network_registration_status(mock_command, no_serial_port): diff --git a/tests/Modem/test_Quectel.py b/tests/Modem/test_Quectel.py new file mode 100644 index 00000000..d7e956dd --- /dev/null +++ b/tests/Modem/test_Quectel.py @@ -0,0 +1,150 @@ +# Author: Hologram +# +# Copyright 2017 - Hologram (Konekt, Inc.) +# +# LICENSE: Distributed under the terms of the MIT License +# +# test_Quectel.py - This file implements unit tests for the Quectel modem interface. + +from unittest.mock import patch, call +import pytest +import sys + +from Hologram.Network.Modem.Quectel import Quectel +from Hologram.Network.Modem.Modem import Modem +from UtilClasses import ModemResult + +sys.path.append(".") +sys.path.append("..") +sys.path.append("../..") + + +def mock_write(modem, message): + return True + + +def mock_read(modem): + return True + + +def mock_readline(modem, timeout=None, hide=False): + return "" + + +def mock_open_serial_port(modem, device_name=None): + return True + + +def mock_close_serial_port(modem): + return True + + +def mock_detect_usable_serial_port(modem, stop_on_first=True): + return "/dev/ttyUSB0" + + +@pytest.fixture +def no_serial_port(monkeypatch): + monkeypatch.setattr(Quectel, "_read_from_serial_port", mock_read) + monkeypatch.setattr(Quectel, "_readline_from_serial_port", mock_readline) + monkeypatch.setattr(Quectel, "_write_to_serial_port_and_flush", mock_write) + monkeypatch.setattr(Quectel, "openSerialPort", mock_open_serial_port) + monkeypatch.setattr(Quectel, "closeSerialPort", mock_close_serial_port) + monkeypatch.setattr(Quectel, "detect_usable_serial_port", mock_detect_usable_serial_port) + + +def test_init_Quectel_no_args(no_serial_port): + modem = Quectel() + assert modem.timeout == 1 + assert modem.socket_identifier == 0 + assert modem.chatscript_file.endswith("/chatscripts/default-script") + assert modem._at_sockets_available + +@patch.object(Quectel, "check_registered") +@patch.object(Quectel, "set") +@patch.object(Quectel, "command") +def test_create_socket(mock_command, mock_set, mock_check, no_serial_port): + modem = Quectel() + modem.apn = 'test' + mock_check.return_value = True + # The PDP context is not active + mock_command.return_value = (ModemResult.OK, '+QIACT: 0,0') + mock_set.return_value = (ModemResult.OK, None) + modem.create_socket() + mock_command.assert_called_with("+QIACT?") + mock_set.assert_has_calls( + [ + call("+QICSGP", '1,1,\"test\",\"\",\"\",1'), + call("+QIACT", '1', timeout=30) + ], + any_order=True + ) + +@patch.object(Quectel, "command") +def test_connect_socket(mock_command, no_serial_port): + modem = Quectel() + modem.socket_identifier = 1 + host = "hologram.io" + port = 9999 + modem.connect_socket(host, port) + mock_command.assert_called_with("+QIOPEN", '1,0,"TCP","%s",%d,0,1' % (host, port)) + + +@patch.object(Quectel, "set") +def test_write_socket_small(mock_command, no_serial_port): + modem = Quectel() + modem.socket_identifier = 1 + data = b"Message smaller than 510 bytes" + mock_command.return_value = (ModemResult.OK, None) + modem.write_socket(data) + mock_command.assert_called_with( + "+QISENDEX", + '1,"4d65737361676520736d616c6c6572207468616e20353130206279746573"', + timeout=10, + ) + + +@patch.object(Quectel, "set") +def test_write_socket_large(mock_command, no_serial_port): + modem = Quectel() + modem.socket_identifier = 1 + data = b"a" * 300 + mock_command.return_value = (ModemResult.OK, None) + modem.write_socket(data) + mock_command.assert_has_calls( + [ + call("+QISENDEX", '1,"%s"' % ("61" * 255), timeout=10), + call("+QISENDEX", '1,"%s"' % ("61" * 45), timeout=10), + ], + any_order=True, + ) + +@patch.object(Quectel, "set") +def test_read_socket(mock_command, no_serial_port): + modem = Quectel() + modem.socket_identifier = 1 + mock_command.return_value = (ModemResult.OK, '+QIRD: "Some val"') + # Double quotes should be stripped from the reutrn value + assert (modem.read_socket(payload_length=10) == 'Some val') + mock_command.assert_called_with("+QIRD", '1,10') + +def test_handle_open_urc(no_serial_port): + modem = Quectel() + modem.handleURC('+QIOPEN: 1,0') + assert modem.urc_state == Modem.SOCKET_WRITE_STATE + assert modem.socket_identifier == 1 + +def test_handle_received_data_urc(no_serial_port): + modem = Quectel() + modem.handleURC('+QIURC: \"recv\",1,25') + assert modem.urc_state == Modem.SOCKET_SEND_READ + assert modem.socket_identifier == 1 + assert modem.last_read_payload_length == 25 + assert modem.urc_response == "" + +def test_handle_socket_closed_urc(no_serial_port): + modem = Quectel() + modem.handleURC('+QIURC: \"closed\",1') + assert modem.urc_state == Modem.SOCKET_CLOSED + assert modem.socket_identifier == 1 + diff --git a/update.sh b/update.sh index 7b602f2b..23d4c5dd 100755 --- a/update.sh +++ b/update.sh @@ -10,7 +10,7 @@ set -euo pipefail -required_programs=('python3' 'pip3' 'ps' 'kill' 'libpython3.7-dev') +required_programs=('python3' 'pip3' 'ps' 'kill' 'libpython3.9-dev') OS='' # Check OS. @@ -58,8 +58,8 @@ function install_software() { } function check_python_version() { - if ! python3 -V | grep '3.[7-9].[0-9]' > /dev/null 2>&1; then - echo "An unsupported version of python 3 is installed. Must have python 3.7+ installed to use the Hologram SDK" + if ! python3 -V | grep -E '3.(9|1[012]).[0-9]' > /dev/null 2>&1; then + echo "An unsupported version of python 3 is installed. Must have python 3.9+ installed to use the Hologram SDK" exit 1 fi } @@ -132,7 +132,7 @@ do pause "Installing $program. Press [Enter] key to continue..."; install_software 'python3-pip' fi - if ! pip3 -V | grep '3.[7-9]' >/dev/null 2>&1; then + if ! pip3 -V | grep -E '3.(9|1[012])' >/dev/null 2>&1; then echo "pip3 is installed for an unsupported version of python." exit 1 fi diff --git a/version.txt b/version.txt index f374f666..57121573 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -0.9.1 +0.10.1