-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathairspy_rx.py
executable file
·112 lines (89 loc) · 2.89 KB
/
airspy_rx.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#!/usr/bin/python3
import os
from ctypes import *
import time
import sys
import wave
import struct
import argparse
try:
from airspy import *
except ImportError:
print("Probably unsuported version of libairspy")
sys.exit(1)
print("Check airspy version")
AIRSPY_MAX_DEVICES=32
devices = [POINTER(airspy_device_t)() for i in range(0,AIRSPY_MAX_DEVICES+1)]
#device_ = airspy_device_t()
#print(f" device so={sizeof(device_)}")
#device = byref(device_)
print(f" airspy_transfer_t ={sizeof(airspy_transfer_t)}")
parser = argparse.ArgumentParser()
parser.add_argument("-f","--frequency")
parser.add_argument("-s","--samplerate")
parser.add_argument("-o","--outputfile")
args = parser.parse_args()
p = airspy_lib_version_t()
libairspy.airspy_lib_version(byref(p))
print("Libairspy version %d.%d.%d"%(p.major_version, p.minor_version, p.revision))
result = libairspy.airspy_init()
if (result != airspy_error.AIRSPY_SUCCESS):
print("Airspy init failed %s (%d)"%(libairspy.airspy_error_name(result), result))
sys.exit(1)
count = 0
for i in range(0,AIRSPY_MAX_DEVICES):
ref = devices[i]
res = libairspy.airspy_open(devices[i])
if (res != 0):
break
count += 1
if (count<1):
print("No airspy devices found")
exit(1)
device = devices[0]
#ret = libairspy.airspy_open(device)
#if ret != airspy_error.AIRSPY_SUCCESS:
# print("Couldnt open device")
# sys.exit(0)
res = libairspy.airspy_set_sample_type(device,airspy_sample_type.AIRSPY_SAMPLE_INT16_IQ)
if (res != airspy_error.AIRSPY_SUCCESS):
print("Couldnt set samplerate")
sys.exit(1)
sample_rate_count = c_uint32(0)
libairspy.airspy_get_samplerates(device,byref(sample_rate_count),c_uint32(0))
print("Sample rate number %d"%(sample_rate_count.value))
if sample_rate_count.value<1:
print("Found 0 samplerates")
sys.exit(1)
supported_samplerates = (c_uint32 * sample_rate_count.value)(0)
libairspy.airspy_get_samplerates(device,supported_samplerates,sample_rate_count)
print("Supported samplerates")
for i in range(0,count+1):
print("%d"%(supported_samplerates[i]))
ret = libairspy.airspy_set_samplerate(device,supported_samplerates[0])
if ret != airspy_error.AIRSPY_SUCCESS:
print("Couldn't set samplerate")
sys.exit(1)
#libairspy.airspy_board_partid_serialno_read()
#libairspy.airspy_set_rf_bias()
def rx_callback(transfer):
print("RX callback")
print(f"Sample count {transfer.sample_count}")
return 0
print("Here 1")
ret = libairspy.airspy_start_rx(device, airspy_sample_block_cb_fn(rx_callback), None)
if ret != airspy_error.AIRSPY_SUCCESS:
print("Cant start RX")
sys.exit(1)
print("Here 2")
libairspy.airspy_set_freq(device, 100000000) #100M
print("Here 3")
time.sleep(5)
print("Here 4")
count = 10
while (count>0) and (libairspy.airspy_is_streaming(device) == airspy_error.AIRSPY_TRUE):
time.sleep(1)
count -= 1
print("Here 4")
libairspy.airspy_close(device)
libairspy.airspy_exit()