diff options
Diffstat (limited to 'freestyle_hid')
-rw-r--r-- | freestyle_hid/__init__.py | 5 | ||||
-rw-r--r-- | freestyle_hid/_exceptions.py | 18 | ||||
-rw-r--r-- | freestyle_hid/_hidwrapper.py | 66 | ||||
-rw-r--r-- | freestyle_hid/_session.py | 264 | ||||
-rw-r--r-- | freestyle_hid/py.typed | 2 | ||||
-rw-r--r-- | freestyle_hid/tools/__init__.py | 3 | ||||
-rw-r--r-- | freestyle_hid/tools/encrypted_setup_extractor.py | 162 | ||||
-rwxr-xr-x | freestyle_hid/tools/extract_chatter.py | 233 | ||||
-rwxr-xr-x | freestyle_hid/tools/hid_console.py | 80 | ||||
-rw-r--r-- | freestyle_hid/tools/py.typed | 2 |
10 files changed, 835 insertions, 0 deletions
diff --git a/freestyle_hid/__init__.py b/freestyle_hid/__init__.py new file mode 100644 index 0000000..41dce11 --- /dev/null +++ b/freestyle_hid/__init__.py @@ -0,0 +1,5 @@ +# SPDX-FileCopyrightText: © 2020 The freestyle-hid Authors +# SPDX-License-Identifier: Apache-2.0 + +from ._exceptions import * # noqa: F403,F401 +from ._session import Session # noqa: F403,F401 diff --git a/freestyle_hid/_exceptions.py b/freestyle_hid/_exceptions.py new file mode 100644 index 0000000..38a822d --- /dev/null +++ b/freestyle_hid/_exceptions.py @@ -0,0 +1,18 @@ +# SPDX-FileCopyrightText: © 2020 The freestyle-hid Authors +# SPDX-License-Identifier: Apache-2.0 + + +class HIDError(Exception): + """Errors related to the HID access process.""" + + +class ConnectionError(Exception): + """Errors related to Session establishment.""" + + +class ChecksumError(Exception): + """Errors related to the transmission checksums.""" + + +class CommandError(Exception): + """Errors related to the command stream.""" diff --git a/freestyle_hid/_hidwrapper.py b/freestyle_hid/_hidwrapper.py new file mode 100644 index 0000000..a82bd4c --- /dev/null +++ b/freestyle_hid/_hidwrapper.py @@ -0,0 +1,66 @@ +# SPDX-FileCopyrightText: © 2020 The freestyle-hid Authors +# SPDX-License-Identifier: Apache-2.0 +"""HID wrappers to access files with either hidraw or cython-hidapi.""" + +import abc +import pathlib +from typing import BinaryIO, Optional, Union + +try: + import hid +except ImportError: + hid = None + +from ._exceptions import HIDError + + +class HidWrapper(abc.ABC): + + _handle: Union[BinaryIO, "hid.device"] + + def write(self, report: bytes) -> None: + if len(report) > 65: + raise HIDError(f"Invalid report length {len(report)}.") + + written = self._handle.write(report) + if written < 0: + raise HIDError(f"Invalid write ({written}).") + + @abc.abstractmethod + def read(self, size: int = 64) -> bytes: + pass + + @staticmethod + def open( + device_path: Optional[pathlib.Path], vendor_id: int, product_id: Optional[int] + ) -> "HidWrapper": + if device_path: + return HidRaw(device_path) + else: + assert product_id is not None + return HidApi(vendor_id, product_id) + + +class HidRaw(HidWrapper): + def __init__(self, device_path: pathlib.Path) -> None: + if not device_path.exists(): + raise ValueError(f"Path {device_path} does not exists.") + + self._handle = device_path.open("w+b") + + def read(self, size: int = 64) -> bytes: + return self._handle.read(size) + + +class HidApi(HidWrapper): + _handle: "hid.device" + + def __init__(self, vendor_id: int, product_id: int) -> None: + if hid is None: + raise ValueError("cython-hidapi not found.") + + self._handle = hid.device() + self._handle.open(vendor_id, product_id) + + def read(self, size: int = 64) -> bytes: + return bytes(self._handle.read(size, timeout_ms=0)) diff --git a/freestyle_hid/_session.py b/freestyle_hid/_session.py new file mode 100644 index 0000000..7529ef3 --- /dev/null +++ b/freestyle_hid/_session.py @@ -0,0 +1,264 @@ +# SPDX-FileCopyrightText: © 2013 The freestyle-hid Authors +# SPDX-License-Identifier: Apache-2.0 + +import csv +import logging +import pathlib +import re +from typing import AnyStr, Callable, Iterator, Optional, Sequence, Tuple + +import construct + +from ._exceptions import ChecksumError, CommandError +from ._hidwrapper import HidWrapper + +ABBOTT_VENDOR_ID = 0x1A61 + +_INIT_COMMAND = 0x01 +_INIT_RESPONSE = 0x71 + +_KEEPALIVE_RESPONSE = 0x22 +_UNKNOWN_MESSAGE_RESPONSE = 0x30 + +_ENCRYPTION_SETUP_COMMAND = 0x14 +_ENCRYPTION_SETUP_RESPONSE = 0x33 + +_ALWAYS_UNENCRYPTED_MESSAGES = ( + _INIT_COMMAND, + 0x04, + 0x05, + 0x06, + 0x0C, + 0x0D, + _ENCRYPTION_SETUP_COMMAND, + 0x15, + _ENCRYPTION_SETUP_RESPONSE, + 0x34, + 0x35, + _INIT_RESPONSE, + _KEEPALIVE_RESPONSE, +) + + +def _create_matcher( + message_type: int, content: Optional[bytes] +) -> Callable[[Tuple[int, bytes]], bool]: + def _matcher(message: Tuple[int, bytes]) -> bool: + return message[0] == message_type and (content is None or content == message[1]) + + return _matcher + + +_is_init_reply = _create_matcher(_INIT_RESPONSE, b"\x01") +_is_keepalive_response = _create_matcher(_KEEPALIVE_RESPONSE, None) +_is_unknown_message_error = _create_matcher(_UNKNOWN_MESSAGE_RESPONSE, b"\x85") +_is_encryption_missing_error = _create_matcher(_ENCRYPTION_SETUP_RESPONSE, b"\x15") +_is_encryption_setup_error = _create_matcher(_ENCRYPTION_SETUP_RESPONSE, b"\x14") + +_FREESTYLE_MESSAGE = construct.Struct( + hid_report=construct.Const(0, construct.Byte), + message_type=construct.Byte, + command=construct.Padded( + 63, # command can only be up to 62 bytes, but one is used for length. + construct.Prefixed(construct.Byte, construct.GreedyBytes), + ), +) + +_FREESTYLE_ENCRYPTED_MESSAGE = construct.Struct( + hid_report=construct.Const(0, construct.Byte), + message_type=construct.Byte, + command=construct.Padded( + 63, # command can only be up to 62 bytes, but one is used for length. + construct.GreedyBytes, + ), +) + +_TEXT_COMPLETION_RE = re.compile(b"CMD (?:OK|Fail!)") +_TEXT_REPLY_FORMAT = re.compile( + b"^(?P<message>.*)CKSM:(?P<checksum>[0-9A-F]{8})\r\n" + b"CMD (?P<status>OK|Fail!)\r\n$", + re.DOTALL, +) + +_MULTIRECORDS_FORMAT = re.compile( + "^(?P<message>.+\r\n)(?P<count>[0-9]+),(?P<checksum>[0-9A-F]{8})\r\n$", re.DOTALL +) + + +def _verify_checksum(message: AnyStr, expected_checksum_hex: AnyStr) -> None: + """Calculate the simple checksum of the message and compare with expected. + + Args: + message: (str) message to calculate the checksum of. + expected_checksum_hex: hexadecimal string representing the checksum + expected to match the message. + + Raises: + InvalidChecksum: if the message checksum calculated does not match the one + received. + """ + expected_checksum = int(expected_checksum_hex, 16) + if isinstance(message, bytes): + all_bytes = (c for c in message) + else: + all_bytes = (ord(c) for c in message) + + calculated_checksum = sum(all_bytes) + + if expected_checksum != calculated_checksum: + raise ChecksumError( + f"Invalid checksum, expected {expected_checksum}, calculated {calculated_checksum}" + ) + + +class Session: + def __init__( + self, + product_id: Optional[int], + device_path: Optional[pathlib.Path], + text_message_type: int, + text_reply_message_type: int, + ) -> None: + self._handle = HidWrapper.open(device_path, ABBOTT_VENDOR_ID, product_id) + + self._text_message_type = text_message_type + self._text_reply_message_type = text_reply_message_type + + def connect(self): + """Open connection to the device, starting the knocking sequence.""" + self.send_command(_INIT_COMMAND, b"") + response = self.read_response() + if not _is_init_reply(response): + raise ConnectionError( + f"Connection error: unexpected message %{response[0]:02x}:{response[1].hex()}" + ) + + def send_command(self, message_type: int, command: bytes, encrypted: bool = False): + """Send a raw command to the device. + + Args: + message_type: The first byte sent with the report to the device. + command: The command to send out the device. + """ + if encrypted: + assert message_type not in _ALWAYS_UNENCRYPTED_MESSAGES + meta_construct = _FREESTYLE_ENCRYPTED_MESSAGE + else: + meta_construct = _FREESTYLE_MESSAGE + + usb_packet = meta_construct.build( + {"message_type": message_type, "command": command} + ) + + logging.debug(f"Sending packet: {usb_packet!r}") + self._handle.write(usb_packet) + + def read_response(self, encrypted: bool = False) -> Tuple[int, bytes]: + """Read the response from the device and extracts it.""" + usb_packet = self._handle.read() + + logging.debug(f"Read packet: {usb_packet!r}") + + assert usb_packet + message_type = usb_packet[0] + + if not encrypted or message_type in _ALWAYS_UNENCRYPTED_MESSAGES: + message_length = usb_packet[1] + message_end_idx = 2 + message_length + message_content = usb_packet[2:message_end_idx] + else: + message_content = usb_packet[1:] + + # hidapi module returns a list of bytes rather than a bytes object. + message = (message_type, bytes(message_content)) + + # There appears to be a stray number of 22 01 xx messages being returned + # by some devices after commands are sent. These do not appear to have + # meaning, so ignore them and proceed to the next. These are always sent + # unencrypted, so we need to inspect them before we decide what the + # message content is. + if _is_keepalive_response(message): + return self.read_response(encrypted=encrypted) + + if _is_unknown_message_error(message): + raise CommandError("Invalid command") + + if _is_encryption_missing_error(message): + raise CommandError("Device encryption not initialized.") + + if _is_encryption_setup_error(message): + raise CommandError("Device encryption initialization failed.") + + return message + + def send_text_command(self, command: bytes) -> str: + """Send a command to the device that expects a text reply.""" + self.send_command(self._text_message_type, command) + + # Reply can stretch multiple buffers + full_content = b"" + while True: + message_type, content = self.read_response() + + logging.debug( + f"Received message: type {message_type:02x} content {content.hex()}" + ) + + if message_type != self._text_reply_message_type: + raise CommandError( + f"Message type {message_type:02x}: content does not match expectations: {content!r}" + ) + + full_content += content + + if _TEXT_COMPLETION_RE.search(full_content): + break + + match = _TEXT_REPLY_FORMAT.search(full_content) + if not match: + raise CommandError(repr(full_content)) + + message = match.group("message") + _verify_checksum(message, match.group("checksum")) + + if match.group("status") != b"OK": + raise CommandError(repr(message) or "Command failed") + + # If there is anything in the response that is not ASCII-safe, this is + # probably in the patient name. The Windows utility does not seem to + # validate those, so just replace anything non-ASCII with the correct + # unknown codepoint. + return message.decode("ascii", "replace") + + def query_multirecord(self, command: bytes) -> Iterator[Sequence[str]]: + """Queries for, and returns, "multirecords" results. + + Multirecords are used for querying events, readings, history and similar + other data out of a FreeStyle device. These are comma-separated values, + variable-length. + + The validation includes the general HID framing parsing, as well as + validation of the record count, and of the embedded records checksum. + + Args: + command: The text command to send to the device for the query. + + Returns: + A CSV reader object that returns a record for each line in the + reply buffer. + """ + message = self.send_text_command(command) + logging.debug(f"Received multirecord message:\n{message}") + if message == "Log Empty\r\n": + return iter(()) + + match = _MULTIRECORDS_FORMAT.search(message) + if not match: + raise CommandError(message) + + records_str = match.group("message") + _verify_checksum(records_str, match.group("checksum")) + + logging.debug(f"Received multi-record string: {records_str}") + + return csv.reader(records_str.split("\r\n")) diff --git a/freestyle_hid/py.typed b/freestyle_hid/py.typed new file mode 100644 index 0000000..311e481 --- /dev/null +++ b/freestyle_hid/py.typed @@ -0,0 +1,2 @@ +# SPDX-FileCopyrightText: © 2020 The freestyle-hid Authors +# SPDX-License-Identifier: Apache-2.0 diff --git a/freestyle_hid/tools/__init__.py b/freestyle_hid/tools/__init__.py new file mode 100644 index 0000000..3e0558f --- /dev/null +++ b/freestyle_hid/tools/__init__.py @@ -0,0 +1,3 @@ +# SPDX-FileCopyrightText: 2013 The freestyle-hid Authors +# +# SPDX-License-Identifier: 0BSD diff --git a/freestyle_hid/tools/encrypted_setup_extractor.py b/freestyle_hid/tools/encrypted_setup_extractor.py new file mode 100644 index 0000000..dfe8229 --- /dev/null +++ b/freestyle_hid/tools/encrypted_setup_extractor.py @@ -0,0 +1,162 @@ +#!/usr/bin/env python3 +# +# SPDX-FileCopyrightText: © 2019 The usbmon-tools Authors +# SPDX-FileCopyrightText: © 2019 The freestyle-hid Authors +# +# SPDX-License-Identifier: Apache-2.0 + +import logging +import sys +from typing import BinaryIO, Sequence + +import click +import click_log +import construct +import usbmon +import usbmon.pcapng + +logger = logging.getLogger() +click_log.basic_config(logger) + + +_SERIAL_NUMBER_RESPONSE_TYPE = 0x06 +_ENCRYPTION_SETUP_REQ_TYPE = 0x14 +_ENCRYPTION_SETUP_RESP_TYPE = 0x33 + + +_START_AUTHORIZE_CMD = 0x11 +_CHALLENGE_CMD = 0x16 +_CHALLENGE_RESPONSE_CMD = 0x17 + + +_ABBOTT_VENDOR_ID = 0x1A61 +_LIBRE2_PRODUCT_ID = 0x3950 + +_SERIAL_NO = construct.Struct( + message_type=construct.Const(_SERIAL_NUMBER_RESPONSE_TYPE, construct.Byte), + length=construct.Const(14, construct.Byte), + serial_number=construct.PaddedString(13, "ascii"), + termination=construct.Const(0, construct.Byte), +) + +_CHALLENGE = construct.Struct( + message_type=construct.Const(_ENCRYPTION_SETUP_RESP_TYPE, construct.Byte), + length=construct.Const(16, construct.Byte), + subcmd=construct.Const(_CHALLENGE_CMD, construct.Byte), + challenge=construct.Bytes(8), + iv=construct.Bytes(7), +) + +_CHALLENGE_RESPONSE = construct.Struct( + message_type=construct.Const(_ENCRYPTION_SETUP_REQ_TYPE, construct.Byte), + length=construct.Const(26, construct.Byte), + subcmd=construct.Const(_CHALLENGE_RESPONSE_CMD, construct.Byte), + challenge_response_encrypted=construct.Bytes(16), + const=construct.Const(1, construct.Byte), + mac=construct.Bytes(8), +) + + +@click.command() +@click_log.simple_verbosity_option(logger, "--vlog") +@click.option( + "--device-address", + help=( + "Device address (busnum.devnum) of the device to extract capture" + " of. If none provided, device descriptors will be relied on." + ), +) +@click.argument( + "pcap-files", + type=click.File(mode="rb"), + nargs=None, +) +def main(*, device_address: str, pcap_files: Sequence[BinaryIO]): + if sys.version_info < (3, 7): + raise Exception("Unsupported Python version, please use at least Python 3.7.") + + for pcap_file in pcap_files: + session = usbmon.pcapng.parse_stream(pcap_file, retag_urbs=False) + + if not device_address: + for descriptor in session.device_descriptors.values(): + if ( + descriptor.vendor_id == _ABBOTT_VENDOR_ID + and descriptor.product_id == _LIBRE2_PRODUCT_ID + ): + if device_address and device_address != descriptor.address: + raise Exception( + "Multiple Libre2 devices present in capture, please" + " provide a --device-address flag." + ) + device_address = descriptor.address + else: + device_address = descriptor.address + + if device_address in session.device_descriptors: + descriptor = session.device_descriptors[device_address] + assert descriptor.vendor_id == _ABBOTT_VENDOR_ID + assert descriptor.product_id == _LIBRE2_PRODUCT_ID + + serial_number = "UNKNOWN" + challenge = "UNKNOWN" + iv = "UNKNOWN" + encrypted_challenge = "UNKNOWN" + mac = "UNKNOWN" + + for first, second in session.in_pairs(): + # Ignore stray callbacks/errors. + if not first.type == usbmon.constants.PacketType.SUBMISSION: + continue + + if not first.address.startswith(f"{device_address}."): + # No need to check second, they will be linked. + continue + + if first.xfer_type == usbmon.constants.XferType.INTERRUPT: + pass + elif ( + first.xfer_type == usbmon.constants.XferType.CONTROL + and not first.setup_packet + or first.setup_packet.type == usbmon.setup.Type.CLASS # type: ignore + ): + pass + else: + continue + + if first.direction == usbmon.constants.Direction.OUT: + packet = first + else: + assert second is not None + packet = second + + if not packet.payload: + continue + + assert len(packet.payload) >= 2 + + message_type = packet.payload[0] + + if message_type == _SERIAL_NUMBER_RESPONSE_TYPE: + obj = _SERIAL_NO.parse(packet.payload) + serial_number = obj.serial_number + elif ( + message_type == _ENCRYPTION_SETUP_RESP_TYPE + and packet.payload[2] == _CHALLENGE_CMD + ): + obj = _CHALLENGE.parse(packet.payload) + challenge = obj.challenge.hex() + iv = obj.iv.hex() + elif ( + message_type == _ENCRYPTION_SETUP_REQ_TYPE + and packet.payload[2] == _CHALLENGE_RESPONSE_CMD + ): + obj = _CHALLENGE_RESPONSE.parse(packet.payload) + encrypted_challenge = obj.challenge_response_encrypted.hex() + mac = obj.mac.hex() + + print(f"{serial_number},{challenge},{iv},{encrypted_challenge},{mac}") + + +if __name__ == "__main__": + main() diff --git a/freestyle_hid/tools/extract_chatter.py b/freestyle_hid/tools/extract_chatter.py new file mode 100755 index 0000000..a77a0ec --- /dev/null +++ b/freestyle_hid/tools/extract_chatter.py @@ -0,0 +1,233 @@ +#!/usr/bin/env python3 +# +# SPDX-FileCopyrightText: © 2019 The usbmon-tools Authors +# SPDX-FileCopyrightText: © 2020 The freestyle-hid Authors +# +# SPDX-License-Identifier: Apache-2.0 + +import logging +import sys +import textwrap +from typing import BinaryIO + +import click +import click_log +import construct +import usbmon +import usbmon.chatter +import usbmon.pcapng + +logger = logging.getLogger() +click_log.basic_config(logger) + +_KEEPALIVE_TYPE = 0x22 + +_UNENCRYPTED_TYPES = ( + 0x01, + 0x04, + 0x05, + 0x06, + 0x0C, + 0x0D, + 0x14, + 0x15, + 0x33, + 0x34, + 0x35, + 0x71, + _KEEPALIVE_TYPE, +) + +_ENCRYPTION_SETUP_TYPES = (0x14, 0x33) + +_START_AUTHORIZE_CMD = 0x11 +_CHALLENGE_CMD = 0x16 +_CHALLENGE_RESPONSE_CMD = 0x17 +_CHALLENGE_ACCEPTED_CMD = 0x18 + +_ABBOTT_VENDOR_ID = 0x1A61 +_LIBRE2_PRODUCT_ID = 0x3950 + +_ENCRYPTED_MESSAGE = construct.Struct( + message_type=construct.Byte, + encrypted_message=construct.Bytes(64 - 1 - 4 - 4), + sequence_number=construct.Int32ul, + mac=construct.Int32ul, +) + + +@click.command() +@click_log.simple_verbosity_option(logger, "--vlog") +@click.option( + "--device-address", + help=( + "Device address (busnum.devnum) of the device to extract capture" + " of. If none provided, device descriptors will be relied on." + ), +) +@click.option( + "--encrypted-protocol / --no-encrypted-protocol", + default=False, + help=( + "Whether to expect encrypted protocol in the capture." + " Ignored if the device descriptors are present in the capture." + ), +) +@click.option( + "--verbose-encryption-setup / --no-verbose-encryption-setup", + default=False, + help=( + "Whether to parse encryption setup commands and printing their component" + " together with the raw messsage." + ), +) +@click.option( + "--print-keepalive / --no-print-keepalive", + default=False, + help=( + "Whether to print the keepalive messages sent by the device. " + "Keepalive messages are usually safely ignored." + ), +) +@click.argument( + "pcap-file", + type=click.File(mode="rb"), +) +def main( + *, + device_address: str, + encrypted_protocol: bool, + verbose_encryption_setup: bool, + print_keepalive: bool, + pcap_file: BinaryIO, +) -> None: + if sys.version_info < (3, 7): + raise Exception("Unsupported Python version, please use at least Python 3.7.") + + session = usbmon.pcapng.parse_stream(pcap_file, retag_urbs=False) + + if not device_address: + for descriptor in session.device_descriptors.values(): + if descriptor.vendor_id == _ABBOTT_VENDOR_ID: + if device_address and device_address != descriptor.address: + raise Exception( + "Multiple Abbott device present in capture, please" + " provide a --device-address flag." + ) + device_address = descriptor.address + + if device_address not in session.device_descriptors: + logging.warning( + f"Unable to find device {device_address} in the capture's descriptors." + " Assuming non-encrypted protocol.", + ) + else: + descriptor = session.device_descriptors[device_address] + assert descriptor.vendor_id == _ABBOTT_VENDOR_ID + + if descriptor.product_id == _LIBRE2_PRODUCT_ID: + encrypted_protocol = True + + for first, second in session.in_pairs(): + # Ignore stray callbacks/errors. + if not first.type == usbmon.constants.PacketType.SUBMISSION: + continue + + if not first.address.startswith(f"{device_address}."): + # No need to check second, they will be linked. + continue + + if first.xfer_type == usbmon.constants.XferType.INTERRUPT: + pass + elif ( + first.xfer_type == usbmon.constants.XferType.CONTROL + and not first.setup_packet + or first.setup_packet.type == usbmon.setup.Type.CLASS # type: ignore + ): + pass + else: + continue + + if first.direction == usbmon.constants.Direction.OUT: + packet = first + else: + assert second is not None + packet = second + + if not packet.payload: + continue + + assert len(packet.payload) >= 2 + + message_type = packet.payload[0] + + if message_type == _KEEPALIVE_TYPE and not print_keepalive: + continue + + message_metadata = [] + + if encrypted_protocol and message_type not in _UNENCRYPTED_TYPES: + # With encrypted communication, the length of the message is also encrypted, + # and all the packets use the full 64 bytes. So instead, we extract what + # metadata we can. + parsed = _ENCRYPTED_MESSAGE.parse(packet.payload) + message_metadata.extend( + [f"SEQUENCE_NUMBER={parsed.sequence_number}", f"MAC={parsed.mac:04x}"] + ) + + message_type_str = f"x{message_type:02x}" + message = parsed.encrypted_message + elif verbose_encryption_setup and message_type in _ENCRYPTION_SETUP_TYPES: + message_length = packet.payload[1] + message_end_idx = 2 + message_length + message = packet.payload[2:message_end_idx] + + if message[0] == _START_AUTHORIZE_CMD: + message_metadata.append("START_AUTHORIZE") + elif message[0] == _CHALLENGE_CMD: + message_metadata.append("CHALLENGE") + challenge = message[1:9] + iv = message[9:16] + message_metadata.append(f"CHALLENGE={challenge.hex()}") + message_metadata.append(f"IV={iv.hex()}") + elif message[0] == _CHALLENGE_RESPONSE_CMD: + message_metadata.append("CHALLENGE_RESPONSE") + encrypted_challenge = message[1:17] + challenge_mac = message[18:26] + message_metadata.append( + f"ENCRYPTED_CHALLENGE={encrypted_challenge.hex()}" + ) + message_metadata.append(f"MAC={challenge_mac.hex()}") + elif message[0] == _CHALLENGE_ACCEPTED_CMD: + message_metadata.append("CHALLENGE_ACCEPTED") + + message_metadata.append(f"RAW_LENGTH={message_length}") + message_type_str = f" {message_type:02x}" + else: + message_length = packet.payload[1] + message_metadata.append(f"LENGTH={message_length}") + message_end_idx = 2 + message_length + message_type_str = f" {message_type:02x}" + message = packet.payload[2:message_end_idx] + + if message_metadata: + metadata_string = "\n".join( + textwrap.wrap( + " ".join(message_metadata), width=80, break_long_words=False + ) + ) + print(metadata_string) + + print( + usbmon.chatter.dump_bytes( + packet.direction, + message, + prefix=f"[{message_type_str}]", + print_empty=True, + ), + "\n", + ) + + +if __name__ == "__main__": + main() diff --git a/freestyle_hid/tools/hid_console.py b/freestyle_hid/tools/hid_console.py new file mode 100755 index 0000000..b3b3fee --- /dev/null +++ b/freestyle_hid/tools/hid_console.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python3 +# SPDX-FileCopyrightText: © 2019 The freestyle-hid Authors +# SPDX-License-Identifier: Apache-2.0 +"""CLI tool to send messages through FreeStyle HID protocol.""" + +import logging +import pathlib +import sys +from typing import Optional + +import click +import click_log + +import freestyle_hid + +logger = logging.getLogger() +click_log.basic_config(logger) + + +@click.command() +@click_log.simple_verbosity_option(logger, "--vlog") +@click.option( + "--text-command-type", + "-c", + type=int, + default=0x60, + help="Message type for text commands sent to the device.", +) +@click.option( + "--text-reply-type", + "-r", + type=int, + default=0x60, + help="Message type for text replies received from the device.", +) +@click.option( + "--product-id", + "-p", + type=int, + help="Optional product ID (in alternative to the device path)", +) +@click.argument( + "device-path", + type=click.Path(exists=True, dir_okay=False, writable=True, allow_dash=False), + callback=lambda ctx, param, value: pathlib.Path(value) if value else None, + required=False, +) +def main( + *, + text_command_type: int, + text_reply_type: int, + product_id: Optional[int], + device_path: Optional[pathlib.Path], +): + if not product_id and not device_path: + raise click.UsageError( + "One of --product-id or DEVICE_PATH need to be provided." + ) + + session = freestyle_hid.Session( + product_id, device_path, text_command_type, text_reply_type + ) + + session.connect() + + while True: + if sys.stdin.isatty(): + command = input(">>> ") + else: + command = input() + print(f">>> {command}") + + try: + print(session.send_text_command(bytes(command, "ascii"))) + except freestyle_hid.CommandError as error: + print(f"! {error!r}") + + +if __name__ == "__main__": + main() diff --git a/freestyle_hid/tools/py.typed b/freestyle_hid/tools/py.typed new file mode 100644 index 0000000..311e481 --- /dev/null +++ b/freestyle_hid/tools/py.typed @@ -0,0 +1,2 @@ +# SPDX-FileCopyrightText: © 2020 The freestyle-hid Authors +# SPDX-License-Identifier: Apache-2.0 |