Skip to content

Commit

Permalink
Merge pull request #566 from anarkiwi/sig
Browse files Browse the repository at this point in the history
catch/cleanup from interrupt
  • Loading branch information
anarkiwi authored Mar 2, 2023
2 parents 1a928e2 + cbe7d2e commit 0457c9b
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 36 deletions.
76 changes: 50 additions & 26 deletions gamutrf/sigfinder.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import json
import logging
import os
import pathlib
import subprocess
import sys
import tempfile
import threading
import time

Expand Down Expand Up @@ -328,7 +330,12 @@ def zstd_file(uncompressed_file):


def process_fft_lines(
args, prom_vars, buff_file, executor, proxy_result, runonce=False
args,
prom_vars,
buff_file,
executor,
proxy_result,
live_file,
):
lastfreq = 0
fftbuffer = []
Expand All @@ -354,11 +361,13 @@ def process_fft_lines(
openlogts = int(time.time())
with open(args.log, mode=mode, encoding="utf-8") as l:
while True:
if not live_file.exists():
return
if not proxy_result.running():
logging.error(
"FFT proxy stopped running: %s", proxy_result.result()
)
sys.exit(-1)
return
now = int(time.time())
if now - last_fft_report > FFT_BUFFER_TIME * 2:
logging.info(
Expand Down Expand Up @@ -448,8 +457,6 @@ def process_fft_lines(
rotate_age = now - openlogts
if rotate_age > args.rotatesecs:
rotatelognow = True
if runonce:
return
fftbuffer.append((row.ts, row.freq, row.pw))
if rotatelognow:
break
Expand All @@ -459,11 +466,13 @@ def process_fft_lines(
executor.submit(zstd_file, new_log)


def fft_proxy(args, buff_file, buffer_time=FFT_BUFFER_TIME, shutdown_str=None):
def fft_proxy(
args, buff_file, buffer_time=FFT_BUFFER_TIME, live_file=None, poll_timeout=1
):
zmq_addr = f"tcp://{args.logaddr}:{args.logport}"
logging.info("connecting to %s", zmq_addr)
context = zmq.Context()
socket = context.socket(zmq.SUB)
zmq_context = zmq.Context()
socket = zmq_context.socket(zmq.SUB)
socket.connect(zmq_addr)
socket.setsockopt_string(zmq.SUBSCRIBE, "")
packets_sent = 0
Expand All @@ -472,17 +481,19 @@ def fft_proxy(args, buff_file, buffer_time=FFT_BUFFER_TIME, shutdown_str=None):
tmp_buff_file = buff_file.replace(tmp_buff_file, "." + tmp_buff_file)
if os.path.exists(tmp_buff_file):
os.remove(tmp_buff_file)
shutdown = False
context = zstandard.ZstdCompressor()
shutdown = False
while not shutdown:
with open(tmp_buff_file, "wb") as zbf:
with context.stream_writer(zbf) as bf:
while not shutdown:
sock_txt = socket.recv()
shutdown = live_file is not None and not live_file.exists()
try:
sock_txt = socket.recv(flags=zmq.NOBLOCK)
except zmq.error.Again:
time.sleep(poll_timeout)
continue
bf.write(sock_txt)
shutdown = (
shutdown_str is not None and sock_txt.find(shutdown_str) != -1
)
now = time.time()
if (
shutdown or now - last_packet_sent_time > buffer_time
Expand All @@ -495,13 +506,12 @@ def fft_proxy(args, buff_file, buffer_time=FFT_BUFFER_TIME, shutdown_str=None):
os.rename(tmp_buff_file, buff_file)


def find_signals(args, prom_vars):
def find_signals(args, prom_vars, executor, live_file):
buff_file = os.path.join(args.buff_path, BUFF_FILE)
if os.path.exists(buff_file):
os.remove(buff_file)
with concurrent.futures.ProcessPoolExecutor(2) as executor:
proxy_result = executor.submit(fft_proxy, args, buff_file)
process_fft_lines(args, prom_vars, buff_file, executor, proxy_result)
proxy_result = executor.submit(fft_proxy, args, buff_file, live_file=live_file)
process_fft_lines(args, prom_vars, buff_file, executor, proxy_result, live_file)


def argument_parser():
Expand Down Expand Up @@ -645,20 +655,34 @@ def main():

logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(message)s")
prom_vars = init_prom_vars()
start_http_server(args.promport)
x = threading.Thread(
target=find_signals,
args=(
args,
prom_vars,
),
)
x.start()
app = falcon.App()
scanner_form = ScannerForm()
result = Result()
active_requests = ActiveRequests()
app.add_route("/", scanner_form)
app.add_route("/result", result)
app.add_route("/requests", active_requests)
bjoern.run(app, "0.0.0.0", args.port) # nosec
start_http_server(args.promport)

with tempfile.TemporaryDirectory() as tmpdir:
live_file = pathlib.Path(os.path.join(tmpdir, "live_file"))
live_file.touch()

with concurrent.futures.ProcessPoolExecutor(2) as executor:
x = threading.Thread(
target=find_signals,
args=(
args,
prom_vars,
executor,
live_file,
),
)
x.start()
try:
bjoern.run(app, "0.0.0.0", args.port) # nosec
except KeyboardInterrupt:
logging.info("interrupt!")
live_file.unlink()
executor.shutdown()
x.join()
36 changes: 26 additions & 10 deletions tests/test_sigfinder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import json
import os
import pathlib
import tempfile
import threading
import time
import unittest
import concurrent.futures
Expand Down Expand Up @@ -145,13 +147,14 @@ def test_argument_parser(self):
argument_parser()

def test_process_fft_lines(self):
with concurrent.futures.ProcessPoolExecutor(1) as executor:
with concurrent.futures.ProcessPoolExecutor() as executor:
proxy_result = executor.submit(null_proxy)
with tempfile.TemporaryDirectory() as tempdir:
test_log = os.path.join(str(tempdir), "test.csv")
test_fftlog = os.path.join(str(tempdir), "fft.csv")
test_fftgraph = os.path.join(str(tempdir), "fft.png")
buff_file = os.path.join(str(tempdir), "buff_file")
live_file = pathlib.Path(os.path.join(str(tempdir), "live_file"))
args = FakeArgs(
test_log,
60,
Expand Down Expand Up @@ -197,14 +200,25 @@ def test_process_fft_lines(self):
output["buckets"][str(freq)] = -50
freq += 1e5
bf.write(bytes(json.dumps(output) + "\n", encoding="utf8"))
process_fft_lines(
args,
prom_vars,
buff_file,
executor,
proxy_result,
runonce=True,
live_file.touch()
process_thread = threading.Thread(
target=process_fft_lines,
args=(
args,
prom_vars,
buff_file,
executor,
proxy_result,
live_file,
),
)
process_thread.start()
for i in range(10):
if os.path.exists(test_fftlog) and os.path.exists(test_fftgraph):
break
time.sleep(1)
live_file.unlink()
process_thread.join()
self.assertTrue(os.path.exists(test_fftlog))
self.assertTrue(os.path.exists(test_fftgraph))

Expand Down Expand Up @@ -233,6 +247,8 @@ def test_fft_proxy(self):
zstd_context = zstandard.ZstdDecompressor()

with tempfile.TemporaryDirectory() as tempdir:
live_file = pathlib.Path(os.path.join(tempdir, "live_file"))
live_file.touch()
buff_file = os.path.join(tempdir, "buff_file")
test_bytes = b"1, 2, 3\n4, 5, 6\n"
shutdown_str = b"shutdown\n"
Expand All @@ -241,13 +257,13 @@ def test_fft_proxy(self):
socket.bind(f"tcp://{args.logaddr}:{args.logport}")

with concurrent.futures.ProcessPoolExecutor(1) as executor:
executor.submit(fft_proxy, args, buff_file, 1, shutdown_str)
executor.submit(fft_proxy, args, buff_file, 1, live_file=live_file)
for _ in range(5):
socket.send(test_bytes)
if os.path.exists(buff_file):
break
time.sleep(1)
socket.send(shutdown_str)
live_file.unlink()
with open(buff_file, "rb") as zbf:
with zstd_context.stream_reader(zbf) as bf:
content = bf.read()
Expand Down

0 comments on commit 0457c9b

Please sign in to comment.