-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathsound_decoder.py
181 lines (153 loc) · 4.83 KB
/
sound_decoder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
from __future__ import division # float division of integers
from collections import deque
import pyaudio
import wave
import struct
import math
import numpy
import sys
import threading
from constants import *
# Mathy things
TWOPI = 2 * math.pi
WINDOW = numpy.hamming(CHUNK_SIZE)
class Decoder:
def __init__(self, debug):
self.win_len = 2 * int(BIT_DURATION * RATE / CHUNK_SIZE / 2)
self.win_fudge = int(self.win_len / 2)
self.buffer = deque()
self.buf_len = self.win_len + self.win_fudge
self.byte = []
self.idlecount = 0
self.do_quit = False
self.character_callback = None
self.idle_callback = None
self.debug = debug
self.p = pyaudio.PyAudio()
self.stream = self.p.open(format = pyaudio.paInt16,
channels = 1,
rate = RATE,
input = True,
frames_per_buffer = AUDIOBUF_SIZE)
listen_thread = threading.Thread(target = self.listen)
listen_thread.start()
def listen(self):
self.do_listen = True
while (True):
if self.do_quit == True:
break
audiostr = self.stream.read(CHUNK_SIZE)
if self.do_listen == False:
continue
self.audio = list(struct.unpack("%dh" % CHUNK_SIZE, audiostr))
self.window()
power0 = self.goertzel(ZERO)
power1 = self.goertzel(ONE)
powerC = self.goertzel(CHARSTART)
base = self.goertzel(BASELINE)
self.update_state(power0, power1, powerC, base)
self.signal_to_bits()
self.process_byte()
self.stream.stop_stream()
self.stream.close()
self.p.terminate()
def attach_character_callback(self, func):
self.character_callback = func
def attach_idle_callback(self, func):
self.idle_callback = func
def start_listening(self):
self.do_listen = True
def stop_listening(self):
self.do_listen = False
def quit(self):
self.do_quit = True
def printbuf(self, buf):
newbuf = ['-' if x is -1 else x for x in buf]
print repr(newbuf).replace(', ', '').replace('\'', '')
# Takes the raw noisy samples of -1/0/1 and finds the bitstream from it
def signal_to_bits(self):
if len(self.buffer) < self.buf_len:
return
buf = list(self.buffer)
if self.debug:
self.printbuf(buf)
costs = [[] for i in range(4)]
for i in range(self.win_fudge):
win = buf[i : self.win_len + i]
costs[0].append(sum(x != 0 for x in win))
costs[1].append(sum(x != 1 for x in win))
costs[2].append(sum(x != 2 for x in win))
costs[3].append(sum(x != -1 for x in win))
min_costs = [min(costs[i]) for i in range(4)]
min_cost = min(min_costs)
signal = min_costs.index(min_cost)
fudge = costs[signal].index(min_cost)
for i in range(self.win_len + fudge):
self.buffer.popleft()
# If we got a signal, put it in the byte!
if signal < 2:
self.byte.append(signal)
# If we get a charstart signal, reset byte!
elif signal == 2:
self.byte = []
# If we get no signal, increment idlecount if we are idling
if signal == 3:
self.idlecount += 1
else:
self.idlecount = 0
if self.idlecount > IDLE_LIMIT and self.idle_callback:
self.idlecount = 0
self.idle_callback()
if self.debug:
if signal == 3:
signal = '-'
sys.stdout.write('')
sys.stdout.write('|{}|\n'.format(signal))
sys.stdout.flush()
# For now, just print out the characters as we go.
def process_byte(self):
if len(self.byte) != 8:
return
ascii = 0
for bit in self.byte:
ascii = (ascii << 1) | bit
char = chr(ascii)
if self.character_callback:
self.character_callback(char)
else:
sys.stdout.write(char)
sys.stdout.flush()
self.byte = []
# Determine the raw input signal of silences, 0s, and 1s. Insert into sliding window.
def update_state(self, power0, power1, powerC, base):
state = -1
if power1 / base > ONE_THRESH:
state = 1
elif power0 / base > ZERO_THRESH:
state = 0
elif powerC / base > CHARSTART_THRESH:
state = 2
# print int(power0 / base), int(power1 / base), int(powerC / base)
if len(self.buffer) >= self.buf_len:
self.buffer.popleft()
self.buffer.append(state)
def goertzel(self, frequency):
prev1 = 0.0
prev2 = 0.0
norm_freq = frequency / RATE
coeff = 2 * math.cos(TWOPI * norm_freq)
for sample in self.audio:
s = sample + (coeff * prev1) - prev2
prev2 = prev1
prev1 = s
power = (prev2 * prev2) + (prev1 * prev1) - (coeff * prev1 * prev2)
return int(power) + 1 # prevents division by zero problems
def window(self):
self.audio = [aud * win for aud, win in zip(self.audio, WINDOW)]
def main():
if len(sys.argv) == 2 and sys.argv[1] == 'debug':
dec = Decoder(1)
else:
dec = Decoder(0)
if __name__ == "__main__":
main()