-
Notifications
You must be signed in to change notification settings - Fork 0
/
FBQ_xcvr_epy_block_1_0.py
150 lines (125 loc) · 5.47 KB
/
FBQ_xcvr_epy_block_1_0.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
"""
Embedded Python Blocks:
Each time this file is saved, GRC will instantiate the first class it finds
to get ports and parameters of your block. The arguments to __init__ will
be the parameters. All of them are required to have default values!
"""
import threading
import typing as T
import numpy as np
import pmt
from gnuradio import gr
import time
class one_shot:
    """Retriggerable one-shot timer built on ``threading.Timer``.

    ``trigger()`` starts the timer if it is idle.  Calling ``trigger()``
    again while the timer is running does not restart it immediately;
    instead the time elapsed since the timer was started is recorded, and
    when the running timer expires it is re-armed for exactly that long.
    The net effect is that ``callback`` is invoked ``delay`` seconds after
    the most recent ``trigger()`` call.

    ``trigger()`` is called from the user's thread while ``countdown()``
    runs on the timer thread, so all shared state is guarded by a lock.
    ``callback`` is invoked on the timer thread, outside that lock.
    """

    def __init__(self, delay: float, callback: T.Callable):
        """Create an idle timer.

        :param delay: seconds between the last ``trigger()`` and the callback.
        :param callback: zero-argument callable invoked when the timer expires.
        """
        self.delay = delay
        self.callback = callback
        self.timer = None  # currently armed threading.Timer, if any
        self.last_start = None  # monotonic ns timestamp of the last arm; None when idle
        self.to_sleep = None  # pending re-arm interval in seconds; None if no retrigger
        self.thread = None  # unused; kept so existing attribute access keeps working
        self._lock = threading.Lock()

    def _arm(self, interval: float, now: int) -> None:
        """Start a fresh timer for ``interval`` seconds. Caller must hold the lock."""
        self.timer = threading.Timer(interval, self.countdown)
        self.timer.start()
        self.last_start = now
        self.to_sleep = None

    def countdown(self):
        """Timer expiry handler: re-arm if retriggered, otherwise fire the callback."""
        now = time.monotonic_ns()
        with self._lock:
            if self.to_sleep is not None:
                # A trigger arrived while we were running: sleep for the part
                # of the delay that had already elapsed at that moment.
                self._arm(self.to_sleep, now)
                return
            self.last_start = None
            if self.timer is not None:
                self.timer.cancel()
        # Run the callback without holding the lock so that it may safely
        # call trigger() itself.
        self.callback()

    def trigger(self):
        """Start the timer, or extend a running one so it expires ``delay`` from now."""
        # Monotonic clock: the value is only used to measure elapsed time, and
        # must not jump when the wall clock is adjusted.
        now = time.monotonic_ns()
        with self._lock:
            if self.last_start is None:
                self._arm(self.delay, now)
            else:
                self.to_sleep = (now - self.last_start) / 1_000_000_000
# noinspection PyPep8Naming
class blk(gr.sync_block):  # other base classes are basic_block, decim_block, interp_block
    """VOX detector - publishes a message when a signal appears and another
    some time after the signal has gone away.

    threshold is the level a sample's absolute value must exceed to count
    towards an on-air detection.
    attack is the number of consecutive samples over the threshold needed
    before the on-air message is sent.
    delay is the number of seconds after the last detection that the
    off-air message is sent.

    Messages are published on the "onair" message port.  The on-air message
    is the pair ("onair", 1) and the off-air message is the pair
    ("onair", 0).  Each is sent only on a change of state.

    The block has a single float32 input and no stream output
    (out_sig=None).  ramp_up() / ramp_down() only fill self.to_output and
    are not called by work().

    Note that the incoming stream needs to represent the volume level of the
    audio, not the audio samples themselves.
    """

    # Number of samples in the ramps built by ramp_up() / ramp_down().
    _RAMP_LEN = 128

    def __init__(self, threshold=5, attack=10, delay=0.5):  # only default arguments here
        """arguments to this function show up as parameters in GRC"""
        gr.sync_block.__init__(
            self,
            name='VOX detector',  # will show up in GRC
            in_sig=[np.float32],
            out_sig=None
        )
        # if an attribute with the same name as a parameter is found,
        # a callback is registered (properties work, too).
        self.threshold = threshold
        self.attack = attack
        self.delay = delay
        self.attack_counter = attack  # Counts down to 0
        self.one_shot = None  # created lazily on the first detection
        self.message_port_register_out(pmt.intern("onair"))
        self.onair = False  # state announced by the last published message

    def _publish_onair(self, value):
        """Publish the pair ("onair", value) on the "onair" message port."""
        self.message_port_pub(
            pmt.intern("onair"),
            pmt.cons(pmt.string_to_symbol("onair"), pmt.to_pmt(value)),
        )

    def send_onair_message(self):
        """Publish ("onair", 1) unless the block is already on air."""
        if not self.onair:
            self._publish_onair(1)
            self.onair = True

    def send_offair_message(self):
        """Publish ("onair", 0) if the block is on air; used as the one-shot callback."""
        if self.onair:
            self._publish_onair(0)
            self.onair = False

    def ramp_up(self):
        """Store a rising ramp, 0 up to just under 1, in self.to_output."""
        self.to_output = [i / self._RAMP_LEN for i in range(self._RAMP_LEN)]

    def ramp_down(self):
        """Store a falling ramp, 1 down to just above 0, in self.to_output."""
        # Same length as ramp_up so the ramp actually reaches the bottom.
        self.to_output = [1 - i / self._RAMP_LEN for i in range(self._RAMP_LEN)]

    def work(self, input_items, output_items):
        """Scan the input levels and drive the on-air / off-air state.

        Every sample whose absolute value exceeds the threshold decrements
        the attack counter; any other sample resets it.  When the counter
        reaches zero the on-air message is sent (if not already on air), the
        off-air one-shot is (re)triggered and the counter starts over.
        """
        samples = input_items[0]
        for item in samples:
            if abs(item) > self.threshold:
                self.attack_counter -= 1
                if self.attack_counter <= 0:
                    self.send_onair_message()
                    self.trigger_one_shot()
                    self.attack_counter = self.attack
            else:
                self.attack_counter = self.attack
        # Input is consumed explicitly and 0 is returned.
        # NOTE(review): sync blocks conventionally return the number of items
        # processed instead - confirm against the GNU Radio version in use
        # before changing this.
        self.consume(0, len(samples))
        return 0

    def trigger_one_shot(self):
        """(Re)start the timer that sends the off-air message after `delay` seconds."""
        if self.one_shot is None:
            self.one_shot = one_shot(self.delay, self.send_offair_message)
        else:
            # Pick up any change made to the delay parameter since creation.
            self.one_shot.delay = self.delay
        self.one_shot.trigger()