From e6ba344dd2f3176d0c7edd6489cfc5f405b624ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Diego=20Elio=20Petten=C3=B2?= Date: Sat, 27 Mar 2021 18:03:11 +0000 Subject: glucomenareo: new driver added. This driver supports GlucoMen areo by A. Menarini Diagnostics. It possibly works with the GlucoMen Areo 2K but it is untested with it, and does not support reporting Ketone readings. --- README.md | 4 + glucometerutils/drivers/glucomenareo.py | 222 ++++++++++++++++++++++++++++++++ mypy.ini | 3 + setup.py | 1 + 4 files changed, 230 insertions(+) create mode 100644 glucometerutils/drivers/glucomenareo.py diff --git a/README.md b/README.md index 7f7c097..bff6b7b 100644 --- a/README.md +++ b/README.md @@ -68,6 +68,7 @@ supported. | Menarini | GlucoMen Nexus | `td42xx` | [construct] [pyserial]² [hidapi] | | Aktivmed | GlucoCheck XL | `td42xx` | [construct] [pyserial]² [hidapi] | | Ascensia | ContourUSB | `contourusb` | [construct] [hidapi]‡ | +| Menarini | GlucoMen areo³ | `glucomenareo` | [pyserial] [crcmod] | † Untested. @@ -78,6 +79,8 @@ supported. ² Requires a version of pyserial supporting CP2110 bridges. Supported starting from version 3.5. +³ Serial cable only, NFC not supported. + To identify the supported features for each of the driver, query the `help` action: @@ -92,6 +95,7 @@ it to https://protocols.glucometers.tech/ . [pyserial]: https://pythonhosted.org/pyserial/ [python-scsi]: https://pypi.org/project/PYSCSI/ [hidapi]: https://pypi.python.org/pypi/hidapi +[crcmod]: https://pypi.org/project/crcmod/ ## Dump format diff --git a/glucometerutils/drivers/glucomenareo.py b/glucometerutils/drivers/glucomenareo.py new file mode 100644 index 0000000..ca903e0 --- /dev/null +++ b/glucometerutils/drivers/glucomenareo.py @@ -0,0 +1,222 @@ +# SPDX-FileCopyrightText: © 2021 The glucometerutils Authors +# SPDX-License-Identifier: MIT + +"""Driver for GlucoMen Areo devices. + +Supported features: + - get readings, including pre-/post-meal notes and comments; + - set date and time. + +Expected device path: /dev/ttyUSB0 or similar serial port device. +""" + +import dataclasses +import datetime +import logging +from typing import Generator, Iterator, List, Mapping, NoReturn, Sequence, Union + +import crcmod.predefined +import serial as pyserial + +from glucometerutils import common, driver, exceptions +from glucometerutils.support import serial + +_crc8_maxim = crcmod.predefined.mkPredefinedCrcFun("crc-8-maxim") + +_CMD_GET_INFO = b"\xa2" + +_CMD_SET_DATETIME = b"\xc2\xa1" + +_CMD_GET_READINGS = b"\x80" + +_UNITS_MAPPING = { + "mmol/L": common.Unit.MMOL_L, + "mg/dL": common.Unit.MG_DL, +} + +_MARKINGS_MAPPING: Mapping[str, Union[str, common.Meal]] = { + "00": "", + "01": "Check Mark", + "02": common.Meal.BEFORE, + "04": common.Meal.AFTER, + "08": "Exercise", +} + + +@dataclasses.dataclass(frozen=True) +class _Reading: + reading_type: str + value_string: str + unit_string: str + marking_string: str + date: str + time: str + + @property + def value(self) -> float: + return float(self.value_string) + + @property + def unit(self) -> common.Unit: + return _UNITS_MAPPING[self.unit_string] + + @property + def _marking(self) -> Union[str, common.Meal]: + return _MARKINGS_MAPPING[self.marking_string] + + @property + def meal(self) -> common.Meal: + if isinstance(self._marking, common.Meal): + return self._marking + else: + return common.Meal.NONE + + @property + def comment(self) -> str: + if not isinstance(self._marking, common.Meal): + return self._marking + else: + return "" + + @property + def timestamp(self) -> datetime.datetime: + return datetime.datetime.strptime(f"{self.date},{self.time}", "%y%m%d,%H%M") + + +class Device(serial.SerialDevice, driver.GlucometerDevice): + BAUDRATE = 9600 + PARITY = pyserial.PARITY_ODD + DEFAULT_CABLE_ID = "10c4:ea60" # Generic cable. + + def connect(self) -> None: # pylint: disable=no-self-use + pass + + def disconnect(self) -> None: # pylint: disable=no-self-use + pass + + def _readline(self) -> bytes: + line = self.serial_.readline() + logging.debug(f"Read line: {line!r}") + return line + + def _read_text_response(self) -> Sequence[bytes]: + all_lines: List[bytes] = [] + + while True: + line = self._readline() + if not line.endswith(b"\r\n"): + raise exceptions.InvalidResponse(f"Corrupted response line: {line!r}") + all_lines.append(line) + + if line == b"]\r\n": + break + + if all_lines[0] != b"[\r\n": + raise exceptions.InvalidResponse( + f"Unexpected first response line: {all_lines!r}" + ) + + wire_checksum = int(all_lines[-2][:-2], base=16) + calculated_checksum = _crc8_maxim(b"".join(all_lines[:-2])) + + if wire_checksum != calculated_checksum: + raise exceptions.InvalidChecksum(wire_checksum, calculated_checksum) + + return [line[:-2] for line in all_lines[1:-2]] + + def _send_command(self, command: bytes) -> None: + logging.debug(f"sending command: {command!r}") + self.serial_.write(command) + + def _get_meter_info(self) -> Sequence[str]: + self._send_command(_CMD_GET_INFO) + get_info_response = list(self._read_text_response()) + if len(get_info_response) != 1: + raise exceptions.InvalidResponse( + f"Multiple lines returned, when one expected: {get_info_response!r}" + ) + info = get_info_response[0].split(b",") + if len(info) != 5: + raise exceptions.InvalidResponse( + f"Incomplete information response received: {get_info_response!r}" + ) + + return [component.decode("ascii") for component in info] + + def get_serial_number(self) -> str: + return self._get_meter_info()[3].strip() + + def get_version_info(self) -> Sequence[str]: + info = self._get_meter_info() + return (info[4].strip(),) + + def get_meter_info(self) -> common.MeterInfo: + return common.MeterInfo( + "GlucoMen areo", + serial_number=self.get_serial_number(), + version_info=self.get_version_info(), + native_unit=self.get_glucose_unit(), + ) + + def get_datetime(self) -> NoReturn: # pylint: disable=no-self-use + raise NotImplementedError + + def zero_log(self) -> NoReturn: + raise NotImplementedError + + def _set_device_datetime(self, date: datetime.datetime) -> datetime.datetime: + datetime_representation = date.strftime("%y%m%d%H%M").encode("ascii") + command_string = b"[\r\n" + datetime_representation + b"\r\n" + + checksum = _crc8_maxim(command_string) + assert 0 <= checksum <= 255 + + command_string += f"{checksum:02X}".encode("ascii") + b"\r\n]\r\n" + + command = _CMD_SET_DATETIME + command_string + self._send_command(command) + response = self.serial_.read() + if response == b"P": + return date + else: + raise exceptions.InvalidResponse(f"Unexpected response {response!r}.") + + def _get_raw_readings(self) -> Iterator[_Reading]: + self._send_command(_CMD_GET_READINGS) + response = list(self._read_text_response()) + if response[0] == b"\x90\x3d": + logging.debug("No readings available on the meter.") + return + + for reading in response: + yield _Reading(*reading.decode("ascii").split(",")) + + def get_glucose_unit(self) -> common.Unit: + for reading in self._get_raw_readings(): + if reading.reading_type != "Glu": + continue + return reading.unit + else: + logging.debug("No readings in the device, cannot guess glucose unit.") + return common.Unit.MG_DL + + def get_readings(self) -> Generator[common.AnyReading, None, None]: + for reading in self._get_raw_readings(): + if reading.reading_type != "Glu": + logging.warning( + f"Unsupported reading type {reading.reading_type!r}. Please file an issue at https://github.com/glucometers-tech/glucometerutils/issues" + ) + continue + + mgdl_value = common.convert_glucose_unit( + reading.value, + from_unit=reading.unit, + to_unit=common.Unit.MG_DL, + ) + + yield common.GlucoseReading( + reading.timestamp, + mgdl_value, + meal=reading.meal, + comment=reading.comment, + ) diff --git a/mypy.ini b/mypy.ini index cf851bc..9ff4178 100644 --- a/mypy.ini +++ b/mypy.ini @@ -14,6 +14,9 @@ ignore_missing_imports = True [mypy-construct] ignore_missing_imports = True +[mypy-crcmod.*] +ignore_missing_imports = True + [mypy-hid] ignore_missing_imports = True diff --git a/setup.py b/setup.py index e629ae0..96a2379 100644 --- a/setup.py +++ b/setup.py @@ -16,6 +16,7 @@ extras_require = { "fslibre": ["freestyle-hid>=1.0.2"], "fsoptium": ["pyserial"], "fsprecisionneo": ["freestyle-hid>=1.0.2"], + "glucomenareo": ["pyserial", "crcmod"], "otultra2": ["pyserial"], "otultraeasy": ["construct", "pyserial"], "otverio2015": ["construct", "PYSCSI[sgio]>=2.0.1"], -- cgit v1.2.3