summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--freestyle_hid/_exceptions.py8
-rw-r--r--freestyle_hid/_session.py151
-rw-r--r--freestyle_hid/tests/test_freestyle.py4
3 files changed, 119 insertions, 44 deletions
diff --git a/freestyle_hid/_exceptions.py b/freestyle_hid/_exceptions.py
index 2b803f8..aa71e7c 100644
--- a/freestyle_hid/_exceptions.py
+++ b/freestyle_hid/_exceptions.py
@@ -10,6 +10,10 @@ class ConnectionError(Exception):
"""Errors related to Session establishment."""
+class EncryptionHandshakeError(ConnectionError):
+ """Errors related to encryption handshake."""
+
+
class ChecksumError(Exception):
"""Errors related to the transmission checksums."""
@@ -18,6 +22,10 @@ class CommandError(Exception):
"""Errors related to the command stream."""
+class EncryptionNotInitialized(CommandError):
+ """Device needs encryption handshake."""
+
+
class MissingFreeStyleKeys(Exception):
"""The freestyle-hid-keys package is missing."""
diff --git a/freestyle_hid/_session.py b/freestyle_hid/_session.py
index a2c9195..03e8c54 100644
--- a/freestyle_hid/_session.py
+++ b/freestyle_hid/_session.py
@@ -10,7 +10,13 @@ from typing import AnyStr, Callable, Iterator, Optional, Sequence, Tuple
import construct
-from ._exceptions import ChecksumError, CommandError, MissingFreeStyleKeys
+from ._exceptions import (
+ ChecksumError,
+ CommandError,
+ EncryptionHandshakeError,
+ EncryptionNotInitialized,
+ MissingFreeStyleKeys,
+)
from ._freestyle_encryption import SpeckCMAC, SpeckEncrypt
from ._hidwrapper import HidWrapper
@@ -64,13 +70,44 @@ _is_unknown_message_error = _create_matcher(_UNKNOWN_MESSAGE_RESPONSE, b"\x85")
_is_encryption_missing_error = _create_matcher(_ENCRYPTION_SETUP_RESPONSE, b"\x15")
_is_encryption_setup_error = _create_matcher(_ENCRYPTION_SETUP_RESPONSE, b"\x14")
+_HID_REPORT = construct.Struct(
+ number=construct.Byte, content=construct.Padded(64, construct.GreedyBytes)
+)
+
_FREESTYLE_MESSAGE = construct.Struct(
- hid_report=construct.Const(0, construct.Byte),
message_type=construct.Byte,
command=construct.Padded(
- 63, # command can only be up to 62 bytes, but one is used for length.
+ 55, # command can only be up to 54 bytes, but one is used for length.
construct.Prefixed(construct.Byte, construct.GreedyBytes),
),
+ iv_counter=construct.Padding(4),
+ mac=construct.Int32ul,
+)
+
+_CHALLENGE_MESSAGE = construct.Struct(
+ subcmd=construct.Const(0x16, construct.Byte),
+ reader_nonce=construct.Bytes(8),
+ iv=construct.BytesInteger(7, signed=False, swapped=False),
+)
+
+_CHALLENGE_RESPONSE_NOMAC_RAW = construct.Struct(
+ message_type=construct.Const(_ENCRYPTION_SETUP_COMMAND, construct.Byte),
+ length=construct.Const(0x1A, construct.Byte),
+ response_subcmd=construct.Const(0x17, construct.Byte),
+ response=construct.Bytes(16),
+ const1=construct.Const(0x01, construct.Byte),
+)
+
+_CHALLENGE_RESPONSE_RAW = construct.Struct(
+ response=_CHALLENGE_RESPONSE_NOMAC_RAW,
+ mac=construct.Int64ul,
+)
+
+_CHALLENGE_ACCEPTED_MESSAGE = construct.Struct(
+ subcmd=construct.Const(0x18, construct.Byte),
+ encrypted_nonces=construct.Bytes(16),
+ iv=construct.BytesInteger(7, signed=False, swapped=False),
+ mac=construct.Int64ul,
)
_TEXT_COMPLETION_RE = re.compile(b"CMD (?:OK|Fail!)")
@@ -144,40 +181,66 @@ class Session:
auth_mac = SpeckCMAC(auth_mac_key)
self.send_command(_ENCRYPTION_SETUP_COMMAND, b"\x11")
- response = self.read_response()
- assert response[0] == _ENCRYPTION_SETUP_RESPONSE
- assert response[1][0] == 0x16
- reader_rand = response[1][1:9]
- iv = int.from_bytes(response[1][9:16], "big", signed=False)
- driver_rand = random.randbytes(8)
- resp_enc = auth_enc.encrypt(iv, reader_rand + driver_rand)
- resp_mac = auth_mac.sign(b"\x14\x1a\x17" + resp_enc + b"\x01")
- resp_mac = int.to_bytes(resp_mac, 8, byteorder="little", signed=False)
- self.send_command(
- _ENCRYPTION_SETUP_COMMAND, b"\x17" + resp_enc + b"\x01" + resp_mac
+ (response_type, response_bytes) = self.read_response()
+
+ if response_type != _ENCRYPTION_SETUP_RESPONSE:
+ raise EncryptionHandshakeError(
+ f"Unexpected response type: {response_type:02x}"
+ )
+
+ challenge_response = _CHALLENGE_MESSAGE.parse(response_bytes)
+ host_nonce = random.randbytes(8)
+
+ encrypted_challenge_response = auth_enc.encrypt(
+ challenge_response.iv, challenge_response.reader_nonce + host_nonce
)
- response = self.read_response()
- assert response[0] == _ENCRYPTION_SETUP_RESPONSE
- assert response[1][0] == 0x18
- mac = auth_mac.sign(b"\x33\x22" + response[1][:24])
- mac = int.to_bytes(mac, 8, byteorder="little", signed=False)
- assert mac == response[1][24:32]
- iv = int.from_bytes(response[1][17:24], "big", signed=False)
- resp_dec = auth_enc.decrypt(iv, response[1][1:17])
- assert resp_dec[:8] == driver_rand
- assert resp_dec[8:] == reader_rand
- crypt = SpeckCMAC(libre2_keys.SESSION_ENCRYPTION_KEY)
- ses_enc_key = crypt.derive(
- "SessnEnc".encode(), serial + reader_rand + driver_rand
+ raw_response_nomac = _CHALLENGE_RESPONSE_NOMAC_RAW.build(
+ {"response": encrypted_challenge_response}
)
- crypt = SpeckCMAC(libre2_keys.SESSION_MAC_KEY)
- ses_mac_key = crypt.derive(
- "SessnMAC".encode(), serial + reader_rand + driver_rand
+ response_mac = auth_mac.sign(raw_response_nomac)
+ raw_response = _CHALLENGE_RESPONSE_RAW.build(
+ {
+ "response": {"response": encrypted_challenge_response},
+ "mac": response_mac,
+ }
)
+
+ self._write_hid(raw_response)
+ (response_type, response_bytes) = self.read_response()
+
+ if response_type != _ENCRYPTION_SETUP_RESPONSE:
+ raise EncryptionHandshakeError(
+ f"Unexpected response type: {response_type:02x}"
+ )
+
+ acceptance_response = _CHALLENGE_ACCEPTED_MESSAGE.parse(response_bytes)
+
+ # We need to reconstruct the raw message, so we include the expected type and size.
+ mac = auth_mac.sign(b"\x33\x22" + response_bytes[:24])
+
+ if mac != acceptance_response.mac:
+ raise EncryptionHandshakeError(
+ f"Challenge acceptance has incorrect MAC! Expected {mac:016x} received {acceptance_response.mac:016x}."
+ )
+
+ decoded_nonces = auth_enc.decrypt(
+ acceptance_response.iv, acceptance_response.encrypted_nonces
+ )
+
+ if decoded_nonces != host_nonce + challenge_response.reader_nonce:
+ raise EncryptionHandshakeError("Decrypted nonces do not match expectation.")
+
+ context_key = serial + challenge_response.reader_nonce + host_nonce
+
+ logging.debug(f"Context key established: {context_key.hex()}")
+
+ crypt = SpeckCMAC(libre2_keys.SESSION_ENCRYPTION_KEY)
+ ses_enc_key = crypt.derive("SessnEnc".encode(), context_key)
+ crypt = SpeckCMAC(libre2_keys.SESSION_MAC_KEY)
+ ses_mac_key = crypt.derive("SessnMAC".encode(), context_key)
self.crypt_enc = SpeckEncrypt(ses_enc_key)
self.crypt_mac = SpeckCMAC(ses_mac_key)
- # print("HANDSHAKE SUCCESSFUL!")
def connect(self):
"""Open connection to the device, starting the knocking sequence."""
@@ -193,12 +256,12 @@ class Session:
def encrypt_message(self, packet: bytes):
output = bytearray(packet)
# 0xFF IV is actually 0, because of some weird padding
- encrypted = self.crypt_enc.encrypt(0xFF, packet[2:57])
- output[2:57] = encrypted
+ encrypted = self.crypt_enc.encrypt(0xFF, packet[1:56])
+ output[1:56] = encrypted
# Not giving a f**k about the IV counter for now
- output[57:61] = bytes(4)
- mac = self.crypt_mac.sign(output[1:61])
- output[61:65] = int.to_bytes(mac, 8, byteorder="little", signed=False)[4:]
+ output[56:60] = bytes(4)
+ mac = self.crypt_mac.sign(output[0:60])
+ output[60:64] = int.to_bytes(mac, 8, byteorder="little", signed=False)[4:]
return bytes(output)
def decrypt_message(self, packet: bytes):
@@ -210,6 +273,11 @@ class Session:
output[1:56] = self.crypt_enc.decrypt(iv, packet[1:56])
return bytes(output)
+ def _write_hid(self, packet: bytes, hid_report: int = 0) -> None:
+ usb_packet = _HID_REPORT.build({"number": hid_report, "content": packet})
+ logging.debug(f"Sending packet: {usb_packet!r}")
+ self._handle.write(usb_packet)
+
def send_command(self, message_type: int, command: bytes, encrypted: bool = False):
"""Send a raw command to the device.
@@ -218,18 +286,17 @@ class Session:
command: The command to send out the device.
"""
- usb_packet = _FREESTYLE_MESSAGE.build(
- {"message_type": message_type, "command": command}
+ message = _FREESTYLE_MESSAGE.build(
+ {"message_type": message_type, "command": command, "mac": 0}
)
if (
self._encrypted_protocol
and message_type not in _ALWAYS_UNENCRYPTED_MESSAGES
):
- usb_packet = self.encrypt_message(usb_packet)
+ message = self.encrypt_message(message)
- logging.debug(f"Sending packet: {usb_packet!r}")
- self._handle.write(usb_packet)
+ self._write_hid(message)
def read_response(self, encrypted: bool = False) -> Tuple[int, bytes]:
"""Read the response from the device and extracts it."""
@@ -265,7 +332,7 @@ class Session:
raise CommandError("Invalid command")
if _is_encryption_missing_error(message):
- raise CommandError("Device encryption not initialized.")
+ raise EncryptionNotInitialized("Device encryption not initialized.")
if _is_encryption_setup_error(message):
raise CommandError("Device encryption initialization failed.")
diff --git a/freestyle_hid/tests/test_freestyle.py b/freestyle_hid/tests/test_freestyle.py
index feb7c15..ebf6b80 100644
--- a/freestyle_hid/tests/test_freestyle.py
+++ b/freestyle_hid/tests/test_freestyle.py
@@ -14,9 +14,9 @@ class TestFreeStyle(unittest.TestCase):
"""Test the generation of a new outgoing message."""
self.assertEqual(
- b"\0\x17\7command\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"
+ b"\x17\7command\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"
b"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",
_session._FREESTYLE_MESSAGE.build(
- {"message_type": 23, "command": b"command"}
+ {"message_type": 23, "command": b"command", "mac": 0}
),
)