# KrakenSDR Receiver # Copyright (C) 2018-2021 Carl Laufer, Tamás Pető # # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # -*- coding: utf-8 -*- # Import built-in modules import sys import os import time from struct import pack, unpack import socket import _thread from threading import Lock import queue import logging #import copy # Import third party modules import numpy as np from scipy import signal from iq_header import IQHeader from shmemIface import inShmemIface class ReceiverRTLSDR(): def __init__(self, data_que, data_interface = "eth", logging_level=10): """ Parameter: ---------- :param: data_que: Que to communicate with the UI (web iface/Qt GUI) :param: data_interface: This field is configured by the GUI during instantiation. Valid values are the followings: "eth" : The module will receiver IQ frames through an Ethernet connection "shmem": The module will receiver IQ frames through a shared memory interface :type : data_interface: string """ self.logger = logging.getLogger(__name__) self.logger.setLevel(logging_level) # DAQ parameters # These values are used by default to configure the DAQ through the configuration interface # Values are configured externally upon configuration request self.daq_center_freq = 100 # MHz self.daq_rx_gain = [0] * 100 # [dB] self.daq_squelch_th_dB = 0 # UI interface self.data_que = data_que # IQ data interface self.data_interface = data_interface # -> Ethernet self.receiver_connection_status = False self.port = 5000 self.rec_ip_addr = "127.0.0.1" # Configured by the GUI prior to connection request self.socket_inst = socket.socket() self.receiverBufferSize = 2 ** 18 # Size of the Ethernet receiver buffer measured in bytes # -> Shared memory root_path = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) daq_path = os.path.join(os.path.dirname(root_path),"heimdall_daq_fw") self.daq_shmem_control_path = os.path.join(os.path.join(daq_path,"Firmware"),"_data_control/") self.init_data_iface() # Control interface self.ctr_iface_socket = socket.socket() self.ctr_iface_port = 5001 self.ctr_iface_thread_lock = Lock() # Used to synchronize the operation of the ctr_iface thread self.iq_frame_bytes = None self.iq_samples = None self.iq_header = IQHeader() self.M = 0 # Number of receiver channels, updated after establishing connection def init_data_iface(self): if self.data_interface == "shmem": # Open shared memory interface to capture the DAQ firmware output self.in_shmem_iface = inShmemIface("delay_sync_iq", self.daq_shmem_control_path) if not self.in_shmem_iface.init_ok: self.logger.critical("Shared memory initialization failed") self.in_shmem_iface.destory_sm_buffer() return -1 return 0 def eth_connect(self): """ Compatible only with DAQ firmwares that has the IQ streaming mode. HeIMDALL DAQ Firmware version: 1.0 or later """ try: if not self.receiver_connection_status: if self.data_interface == "eth": # Establlish IQ data interface connection self.socket_inst.connect((self.rec_ip_addr, self.port)) self.socket_inst.sendall(str.encode('streaming')) test_iq = self.receive_iq_frame() self.M = self.iq_header.active_ant_chs # Establish control interface connection self.ctr_iface_socket.connect((self.rec_ip_addr, self.ctr_iface_port)) self.receiver_connection_status = True self.ctr_iface_init() self.logger.info("CTR INIT Center freq: {0}".format(self.daq_center_freq)) self.set_center_freq(self.daq_center_freq) self.set_if_gain(self.daq_rx_gain) self.set_squelch_threshold(self.daq_squelch_th_dB) except: errorMsg = sys.exc_info()[0] self.logger.error("Error message: "+str(errorMsg)) self.receiver_connection_status = False self.logger.error("Unexpected error: {0}".format(sys.exc_info()[0])) # Re-instantiating sockets self.socket_inst = socket.socket() self.ctr_iface_socket = socket.socket() return -1 self.logger.info("Connection established") que_data_packet = [] que_data_packet.append(['conn-ok',]) self.data_que.put(que_data_packet) def eth_close(self): """ Close Ethernet conenctions including the IQ data and the control interfaces """ try: if self.receiver_connection_status: if self.data_interface == "eth": self.socket_inst.sendall(str.encode('q')) # Send exit message self.socket_inst.close() self.socket_inst = socket.socket() # Re-instantiating socket # Close control interface connection exit_message_bytes=("EXIT".encode()+bytearray(124)) self.ctr_iface_socket.send(exit_message_bytes) self.ctr_iface_socket.close() self.ctr_iface_socket = socket.socket() self.receiver_connection_status = False que_data_packet = [] que_data_packet.append(['disconn-ok',]) self.data_que.put(que_data_packet) except: errorMsg = sys.exc_info()[0] self.logger.error("Error message: {0}".format(errorMsg)) return -1 if self.data_interface == "shmem": self.in_shmem_iface.destory_sm_buffer() return 0 def get_iq_online(self): """ This function obtains a new IQ data frame through the Ethernet IQ data or the shared memory interface """ # Check connection if not self.receiver_connection_status: fail = self.eth_connect() if fail: return -1 if self.data_interface == "eth": self.socket_inst.sendall(str.encode("IQDownload")) # Send iq request command self.iq_samples = self.receive_iq_frame() elif self.data_interface == "shmem": active_buff_index = self.in_shmem_iface.wait_buff_free() if active_buff_index < 0 or active_buff_index > 1: self.logger.info("Terminating.., signal: {:d}".format(active_buff_index)) return -1 buffer = self.in_shmem_iface.buffers[active_buff_index] iq_header_bytes = buffer[0:1024].tobytes() self.iq_header.decode_header(iq_header_bytes) # Inititalization from header - Set channel numbers if self.M == 0: self.M = self.iq_header.active_ant_chs self.daq_rx_gain = [0] * self.M incoming_payload_size = self.iq_header.cpi_length*self.iq_header.active_ant_chs*2*int(self.iq_header.sample_bit_depth/8) if incoming_payload_size > 0: iq_samples_in = (buffer[1024:1024 + incoming_payload_size].view(dtype=np.complex64))\ .reshape(self.iq_header.active_ant_chs, self.iq_header.cpi_length) self.iq_samples = iq_samples_in.copy() # Must be .copy self.in_shmem_iface.send_ctr_buff_ready(active_buff_index) def receive_iq_frame(self): """ Called by the get_iq_online function. Receives IQ samples over the establed Ethernet connection """ total_received_bytes = 0 recv_bytes_count = 0 iq_header_bytes = bytearray(self.iq_header.header_size) # allocate array view = memoryview(iq_header_bytes) # Get buffer self.logger.debug("Starting IQ header reception") while total_received_bytes < self.iq_header.header_size: # Receive into buffer recv_bytes_count = self.socket_inst.recv_into(view, self.iq_header.header_size-total_received_bytes) view = view[recv_bytes_count:] # reset memory region total_received_bytes += recv_bytes_count self.iq_header.decode_header(iq_header_bytes) # Uncomment to check the content of the IQ header #self.iq_header.dump_header() incoming_payload_size = self.iq_header.cpi_length*self.iq_header.active_ant_chs*2*int(self.iq_header.sample_bit_depth/8) if incoming_payload_size > 0: # Calculate total bytes to receive from the iq header data total_bytes_to_receive = incoming_payload_size receiver_buffer_size = 2**18 self.logger.debug("Total bytes to receive: {:d}".format(total_bytes_to_receive)) total_received_bytes = 0 recv_bytes_count = 0 iq_data_bytes = bytearray(total_bytes_to_receive + receiver_buffer_size) # allocate array view = memoryview(iq_data_bytes) # Get buffer while total_received_bytes < total_bytes_to_receive: # Receive into buffer recv_bytes_count = self.socket_inst.recv_into(view, receiver_buffer_size) view = view[recv_bytes_count:] # reset memory region total_received_bytes += recv_bytes_count self.logger.debug(" IQ data succesfully received") # Convert raw bytes to Complex float64 IQ samples self.iq_samples = np.frombuffer(iq_data_bytes[0:total_bytes_to_receive], dtype=np.complex64).reshape(self.iq_header.active_ant_chs, self.iq_header.cpi_length) self.iq_frame_bytes = bytearray()+iq_header_bytes+iq_data_bytes return self.iq_samples else: return 0 def set_squelch_threshold(self, threshold_dB): """ Configures the threshold level of the squelch module in the DAQ FW through the control interface """ if self.receiver_connection_status: # Check connection self.daq_squelch_th_dB = threshold_dB if threshold_dB == -80: threshold = 0 else: threshold = 10**(threshold_dB/20) # Assembling message cmd="STHU" th_bytes=pack("f",threshold) msg_bytes=(cmd.encode()+th_bytes+bytearray(120)) try: _thread.start_new_thread(self.ctr_iface_communication, (msg_bytes,)) except: errorMsg = sys.exc_info()[0] self.logger.error("Unable to start communication thread") self.logger.error("Error message: {:s}".format(errorMsg)) def ctr_iface_init(self): """ Initialize connection with the DAQ FW through the control interface """ if self.receiver_connection_status: # Check connection # Assembling message cmd="INIT" msg_bytes=(cmd.encode()+bytearray(124)) try: _thread.start_new_thread(self.ctr_iface_communication, (msg_bytes,)) except: errorMsg = sys.exc_info()[0] self.logger.error("Unable to start communication thread") self.logger.error("Error message: {:s}".format(errorMsg)) def ctr_iface_communication(self, msg_bytes): """ Handles communication on the control interface with the DAQ FW Parameters: ----------- :param: msg: Message bytes, that will be sent ont the control interface :type: msg: Byte array """ self.ctr_iface_thread_lock.acquire() self.logger.debug("Sending control message") self.ctr_iface_socket.send(msg_bytes) # Waiting for the command to take effect reply_msg_bytes = self.ctr_iface_socket.recv(128) self.logger.debug("Control interface communication finished") self.ctr_iface_thread_lock.release() status = reply_msg_bytes[0:4].decode() if status == "FNSD": self.logger.info("Reconfiguration succesfully finished") que_data_packet = [] que_data_packet.append(['config-ok',]) self.data_que.put(que_data_packet) else: self.logger.error("Failed to set the requested parameter, reply: {0}".format(status)) def set_center_freq(self, center_freq): """ Configures the RF center frequency of the receiver through the control interface Paramters: ---------- :param: center_freq: Required center frequency to set [Hz] :type: center_freq: float """ if self.receiver_connection_status: # Check connection self.daq_center_freq = int(center_freq) # Set center frequency cmd="FREQ" freq_bytes=pack("Q",int(center_freq)) msg_bytes=(cmd.encode()+freq_bytes+bytearray(116)) try: _thread.start_new_thread(self.ctr_iface_communication, (msg_bytes,)) except: errorMsg = sys.exc_info()[0] self.logger.error("Unable to start communication thread") self.logger.error("Error message: {:s}".format(errorMsg)) def set_if_gain(self, gain): """ Configures the IF gain of the receiver through the control interface Paramters: ---------- :param: gain: IF gain value [dB] :type: gain: int """ if self.receiver_connection_status: # Check connection self.daq_rx_gain = gain # Set center frequency cmd="GAIN" gain_list = [] for i in range(0, self.M): gain_list.append(int(gain[i]*10)) #gain_list=[297, 37] #[int(gain*10)]*self.M gain_bytes=pack("I"*self.M, *gain_list) msg_bytes=(cmd.encode()+gain_bytes+bytearray(128-(self.M+1)*4)) try: _thread.start_new_thread(self.ctr_iface_communication, (msg_bytes,)) except: errorMsg = sys.exc_info()[0] self.logger.error("Unable to start communication thread") self.logger.error("Error message: {:s}".format(errorMsg)) def close(self): """ Disconnet the receiver module and the DAQ FW """ self.eth_close()