-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAudioSource.py
138 lines (116 loc) · 4.67 KB
/
AudioSource.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#!/usr/bin/env python3
import soundfile as sf
import time
import numpy as np
import threading
import os
import logging
# Initialize logging
logging.basicConfig(level=logging.INFO)
samplerate = None
p = None
if not p:
import pyaudio
p = pyaudio.PyAudio()
def list_audio_devices():
global p
for i in range(p.get_device_count()):
dev = p.get_device_info_by_index(i)
if dev['maxInputChannels'] < 1:
continue
print(f"Device {dev['name']} ({i})")
def RealTimeAudioSource(source):
global p, samplerate
# Initialize audio capture
os.environ['PA_ALSA_PLUGHW'] = '1'
os.environ['PYTHONWARNINGS'] = 'ignore'
bufflen = 2**16
def get_audio_device_index(name):
for i in range(p.get_device_count()):
dev = p.get_device_info_by_index(i)
if dev['maxInputChannels'] < 1:
continue
if not name:
print(f"Device {dev['name']} ({i})")
elif name in dev['name']:
return i
return None
def get_preferred_samplerate(dev_index):
dev = p.get_device_info_by_index(dev_index)
for rate in [48000, 44100, 96000, 192000]:
if dev['defaultSampleRate'] == rate:
return rate
raise ValueError('No supported sample rate found')
def _capture_audio():
nonlocal buffer, write_index, stop_flag
while not stop_flag:
try:
# Capture new audio data
data = stream.read(bufflen, exception_on_overflow=False)
new_data = np.frombuffer(data, dtype=np.int16)
# Safely update the buffer in a thread-safe manner (circular buffer)
with lock:
# Find the space available and wrap around if necessary
end_index = (write_index + len(new_data)) % buffer.size
# If no wrap-around, directly copy
if write_index < end_index:
buffer[write_index:end_index] = new_data
else:
# If wrapping, copy in two parts
buffer[write_index:] = new_data[:buffer.size - write_index]
buffer[:end_index] = new_data[buffer.size - write_index:]
# Update write_index
write_index = end_index
except OSError as e:
if e.errno == -9981:
logging.error('input overflowed: skipping buffer')
source = get_audio_device_index(source)
if source is None:
logging.error('Audio input device found')
raise RuntimeError('No audio input device found')
samplerate = get_preferred_samplerate(source)
stream = p.open(format=pyaudio.paInt16, channels=1, rate=samplerate, input=True, frames_per_buffer=1024, input_device_index=source)
# Initialize circular buffer and threading
buffer = np.zeros(bufflen, dtype=np.int16)
write_index = 0
lock = threading.Lock()
stop_flag = False
capture_thread = threading.Thread(target=_capture_audio)
capture_thread.start()
# Generator to yield the latest audio data
while True:
t0 = time.time()
# Safely access the buffer and return the last `chunksize` samples
with lock:
# Return a copy of the circular buffer, starting from the correct point
if write_index == bufflen:
chunk = buffer.copy() # No wrapping needed
else:
# Return the last `chunksize` samples in proper order (handle wrap-around)
chunk = np.concatenate((buffer[write_index:], buffer[:write_index]))
yield chunk
def FileAudioSource(testdir):
global samplerate
files = os.listdir(testdir)
while True:
for f in files:
fullpath = os.path.join(testdir, f)
if not (fullpath and fullpath.lower().endswith('.wav')):
raise ValueError('Only .wav files are supported')
with sf.SoundFile(fullpath) as audio_file:
samplerate = audio_file.samplerate
n_samples = len(audio_file)
t0 = time.time()
p0 = 0
p1 = 0
while p1 < n_samples:
now = time.time()
p0 = p1
p1 = int((now - t0) * samplerate)
audio_file.seek(p0)
chunk = audio_file.read(p1 - p0, dtype='float32')
# Convert to mono if necessary
if len(chunk.shape) == 2:
chunk = chunk.mean(axis=1)
yield chunk
files = os.listdir(testdir)