forked from brianhouse/housepy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmidi.py
117 lines (101 loc) · 4.13 KB
/
midi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#!/usr/bin/env python3
import sys, time, threading, atexit, queue, rtmidi
from rtmidi.midiconstants import NOTE_ON, NOTE_OFF, CONTROLLER_CHANGE
from . import log
from .util import num_args
# When true, MidiOut.run logs every outgoing control and note message via log.info before sending it.
log_midi = True
class MidiOut(threading.Thread):
    """Daemon thread that writes queued MIDI messages to one rtmidi output.

    Callers enqueue messages with `send_control` / `send_note`; `run` pops
    them one at a time and sends them, optionally sleeping `throttle`
    seconds after each message. When rtmidi reports no output ports, a
    virtual port named "Python" is opened instead.
    """

    def __init__(self, port=0, throttle=0):
        """Open the output port and start the sender thread.

        port: index into the list returned by the rtmidi output's get_ports().
        throttle: seconds to sleep after each message; values <= 0 disable it.

        Raises SystemExit if ports exist but `port` is not a valid index.
        """
        super().__init__()
        self.daemon = True
        self.port = port
        self.throttle = throttle
        self.queue = queue.Queue()
        self.midi = rtmidi.MidiOut()
        available_ports = self.midi.get_ports()
        if available_ports:
            log.info("MIDI outputs available: %s" % available_ports)
            # Reject negative indexes as well as ones past the end of the list.
            if not 0 <= self.port < len(available_ports):
                log.info("Port index %s not available" % self.port)
                sys.exit()
            log.info("MIDI OUT opening %s" % available_ports[self.port])
            self.midi.open_port(self.port)
        else:
            log.info("No MIDI outputs available")
            log.info("MIDI OUT opening virtual output (\"Python\")...")
            self.midi.open_virtual_port("Python")
        self.start()
        # Short pause after starting the thread, kept from the original
        # (presumably to let the port settle -- TODO confirm).
        time.sleep(0.5)

    def send_control(self, channel, control, value):
        """Queue a control-change message; `channel` is 1-based, `value` may be a bool."""
        self.queue.put((channel, (control, value), None))

    def send_note(self, channel, pitch, velocity):
        """Queue a note message; `channel` is 1-based, a falsy velocity sends note-off."""
        self.queue.put((channel, None, (pitch, velocity)))

    def run(self):
        """Pop queued messages forever and send each one to the MIDI output."""
        while True:
            channel, control, note = self.queue.get()
            # Callers use 1-based channels; the status byte carries channel - 1
            # in its low four bits.
            wire_channel = (channel - 1) & 0xF
            if control is not None:
                number, value = control
                if isinstance(value, bool):
                    value = 127 if value else 0
                if log_midi:
                    log.info("MIDI ctrl %s %s %s" % (channel, number, value))
                # Mask both data bytes to 7 bits, exactly as the note path does,
                # so an out-of-range value cannot set the status bit of a data byte.
                self.midi.send_message([CONTROLLER_CHANGE | wire_channel, number & 0x7F, value & 0x7F])
            if note is not None:
                pitch, velocity = note
                if log_midi:
                    log.info("MIDI note %s %s %s" % (channel, pitch, velocity))
                if velocity:
                    self.midi.send_message([NOTE_ON | wire_channel, pitch & 0x7F, velocity & 0x7F])
                else:
                    self.midi.send_message([NOTE_OFF | wire_channel, pitch & 0x7F, 0])
            if self.throttle > 0:
                time.sleep(self.throttle)
class MidiIn(threading.Thread):
    """Daemon thread that collects incoming three-byte MIDI messages in a queue.

    Each received message is stored as ``(control, value / 127.0)``, where
    `control` and `value` are the second and third bytes of the message.
    `perform_callbacks` later drains the queue and invokes the functions
    registered with `callback`.
    """

    def __init__(self, port=0):
        """Open the input port and start the listener thread.

        port: index into the list returned by the rtmidi input's get_ports().

        If no inputs are available the thread is not started and the
        instance stays inert. Raises SystemExit if inputs exist but `port`
        is not a valid index.
        """
        super().__init__()
        self.daemon = True
        self.port = port
        self.queue = queue.Queue()
        self.midi = rtmidi.MidiIn()
        self.callbacks = {}
        available_ports = self.midi.get_ports()
        if not available_ports:
            log.info("No MIDI inputs available")
            return
        # Reject negative indexes as well as ones past the end of the list.
        if not 0 <= self.port < len(available_ports):
            log.info("Port index %s not available" % self.port)
            sys.exit()
        log.info("MIDI IN opening %s" % available_ports[self.port])
        self.midi.open_port(self.port)
        self.start()
        # Short pause after starting the thread, kept from the original.
        time.sleep(0.5)

    def run(self):
        """Register the receive callback with rtmidi, then idle in a sleep loop."""
        def receive_control(event, data=None):
            message, deltatime = event
            # Only three-byte messages can be unpacked below; anything shorter
            # or longer used to raise ValueError inside the callback.
            if len(message) != 3:
                return
            _status, control, value = message
            self.queue.put((control, value / 127.0))

        self.midi.set_callback(receive_control)
        while True:
            time.sleep(1)

    def perform_callbacks(self):
        """Drain the queue without blocking, invoking the callback registered for each control.

        A callback is called with the normalised value when `num_args`
        reports that it takes at least one argument, otherwise with none.
        Messages for controls with no registered callback are discarded.
        """
        while True:
            try:
                control, value = self.queue.get_nowait()
            except queue.Empty:
                return
            if control not in self.callbacks:
                continue
            handler = self.callbacks[control]
            if num_args(handler) > 0:
                handler(value)
            else:
                handler()

    def callback(self, control, f):
        """For a given control message, call a function"""
        self.callbacks[control] = f
def _port_from_argv(position):
    """Return sys.argv[position] as an integer port index, or 0 if it is absent or not an integer."""
    try:
        return int(sys.argv[position])
    except IndexError:
        return 0
    except ValueError:
        # This module is instantiated at import time, so the host script's own
        # non-numeric arguments must not abort the import.
        log.info("Ignoring non-integer MIDI port argument %r" % sys.argv[position])
        return 0


# Module-level instances created on import: the output port index is read from
# the first command-line argument and the input port index from the second,
# each defaulting to 0.
midi_out = MidiOut(_port_from_argv(1))
midi_in = MidiIn(_port_from_argv(2))
time.sleep(1)