aboutsummaryrefslogtreecommitdiffstats
path: root/cir_measure.py
diff options
context:
space:
mode:
Diffstat (limited to 'cir_measure.py')
-rwxr-xr-xcir_measure.py157
1 files changed, 132 insertions, 25 deletions
diff --git a/cir_measure.py b/cir_measure.py
index 5ec3f12..8b7a3b6 100755
--- a/cir_measure.py
+++ b/cir_measure.py
@@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
#
# This is the main program that
-# - runs rtl_sdr to record files containing samples
+# - runs rtl_tcp to record files containing samples
# - runs correlate_with_ref to calculate the CIR
# - runs a webserver to present the information
#
@@ -17,43 +17,165 @@ import subprocess
import time
import datetime
import multiprocessing as mp
+import threading
+import socket
import correlate_with_ref
import shlex
import argparse
+import collections
+import numpy as np
# The record and correlate tasks run in alternance.
# Maybe later we want to run them simultaneously in a small
# pipeline.
+class RTLSDR_Receiver(threading.Thread):
+ """Connection between the rtlsdr and our script is done using a TCP socket. This
+ class handles running the rtl_tcp tool, and reads the incoming data stream into
+ a local buffer. The buffer size is capped, and works as a FIFO, because analysis
+ of the data is slower than capturing it. We therefore want to lose some data"""
+
+ def __init__(self, options):
+ threading.Thread.__init__(self)
+
+ self.freq = float(options.freq)
+ self.rate = int(options.rate)
+ self.samps = int(options.samps)
+ self.gain = float(options.gain)
+
+ # We want to keep twice the amount of samples
+ # in the queue to have some margin. Samples are
+ # two bytes because they are I/Q interleaved u8
+ self.max_num_bytes = self.samps * 2 * 2
+
+ self.event_stop = threading.Event()
+
+ self.rtl_tcp_port = 59152 # chosen randomly
+
+ self.data_queue = collections.deque()
+
+ # While the data_queue is itself thread-safe, we need to make sure
+ # the consumer cannot preeempt the little housekeeping we do in run()
+ # to keep the maximum queue length.
+ self.data_lock = threading.Lock()
+
+ self.rtlsdr_proc = None
+
+ def run(self):
+ rtl_tcp_cmdline = shlex.split("rtl_tcp -f {} -s {} -g {} -p {}".format(self.freq, self.rate, self.gain, self.rtl_tcp_port))
+ self.rtlsdr_proc = subprocess.Popen(rtl_tcp_cmdline)
+
+ time.sleep(1.5)
+
+ self.sock = socket.socket()
+ self.sock.connect(("localhost", self.rtl_tcp_port))
+
+ while not self.event_stop.is_set():
+ try:
+ samples = self.sock.recv(1024)
+
+ self.data_queue.extend(samples)
+ except:
+ print('Socket error')
+ break
+
+ self.data_lock.acquire()
+
+ # try/catch/except to make sure we release the lock, and
+ # re-raise any exception up
+ try:
+ n_bytes = len(self.data_queue)
+
+ if n_bytes > self.max_num_bytes:
+ num_to_delete = n_bytes - self.max_num_bytes
+ for i in range(num_to_delete):
+ self.data_queue.popleft()
+ except:
+ raise
+ finally:
+ self.data_lock.release()
+ print("Receiver leaving")
+
+ self.sock.close()
+
+ self.rtlsdr_proc.terminate()
+
+ self.rtlsdr_proc.wait()
+
+ print("Receiver thread ends")
+
+ def stop(self):
+ self.event_stop.set()
+ self.join()
+
+ def get_samples(self, num_samples):
+ """Return a string containing num_bytes if that is available,
+ or return None if not enough data available"""
+ ret = None
+
+ num_bytes = num_samples * 2
+
+ self.data_lock.acquire()
+
+ try:
+ n_bytes = len(self.data_queue)
+
+ if n_bytes > num_bytes:
+ ret = "".join(
+ self.data_queue.popleft()
+ for i in range(num_bytes))
+ except:
+ raise
+ finally:
+ self.data_lock.release()
+
+ return ret
+
+
class RTLSDR_CIR_Runner(mp.Process):
def __init__(self, options, iq_file, fig_file):
- """Initialise a new runner, which runs rtl_sdr
+ """Initialise a new runner, which runs rtl_tcp
that will save to iq_file, and run the CIR analysis
that will save to fig_file.
options must contain freq, rate and samps fields"""
mp.Process.__init__(self)
- self.events = mp.Queue()
-
self.freq = float(options.freq)
- self.rate = int(options.rate)
self.samps = int(options.samps)
- self.gain = float(options.gain)
+
+ self.receiver = RTLSDR_Receiver(options)
+
+ self.events = mp.Queue()
self.iq_file = iq_file
self.fig_file = fig_file
def stop(self):
self.events.put("quit")
+ self.join()
def run(self):
+
+ self.receiver.start()
+
while True:
time.sleep(1)
try:
- self.do_one_cir_run()
+ samps = self.receiver.get_samples(self.samps)
+ if samps:
+ print("Got {} samples".format(len(samps)))
+ # The RTLSDR outputs u8 format
+ iq_data = np.array( [ord(c) for c in samps], np.uint8 )
+ self.do_one_cir_run(iq_data)
+ else:
+ print("Got 0 samples")
+
except Exception as e:
print("Exception occurred: {}".format(e))
+ except KeyboardInterrupt:
+ print("Keyhoard Interrupt")
+ break
try:
ev = self.events.get_nowait()
@@ -62,25 +184,11 @@ class RTLSDR_CIR_Runner(mp.Process):
except mp.queues.Empty:
pass
- def do_one_cir_run(self):
- # Build the rtl_sdr command line from the settings in config
- rtl_sdr_cmdline = shlex.split("rtl_sdr -f {} -s {} -g {} -S -".format(self.freq, self.rate, self.gain))
- dd_cmdline = shlex.split("dd of={} bs=2 count={}".format(self.iq_file, self.samps))
-
- # To avoid calling the shell, we do the pipe between rtlsdr and dd using Popen
- rtlsdr_proc = subprocess.Popen(rtl_sdr_cmdline, stdout=subprocess.PIPE)
- dd_proc = subprocess.Popen(dd_cmdline, stdin=rtlsdr_proc.stdout)
-
- # close our connection to the pipe so that rtlsdr gets the SIGPIPE
- rtlsdr_proc.stdout.close()
-
- dd_proc.communicate()
- dd_proc.wait()
- rtlsdr_proc.wait()
+ self.receiver.stop()
- # The RTLSDR outputs u8 format
+ def do_one_cir_run(self, iq_data):
print("Starting correlation")
- cir_corr = correlate_with_ref.CIR_Correlate(self.iq_file, "u8")
+ cir_corr = correlate_with_ref.CIR_Correlate(iq_data=iq_data, iq_format="u8")
title = "Correlation on {}kHz done at {}".format(
int(self.freq / 1000),
@@ -131,4 +239,3 @@ if __name__ == '__main__':
run(host=cli_args.host, port=int(cli_args.port), debug=True, reloader=False)
finally:
rtlsdr_cir.stop()
- rtlsdr_cir.join()