diff --git a/src/StreamDeck/DeviceManager.py b/src/StreamDeck/DeviceManager.py index e54e47a9..734df903 100644 --- a/src/StreamDeck/DeviceManager.py +++ b/src/StreamDeck/DeviceManager.py @@ -15,6 +15,7 @@ from .Devices.StreamDeckPlus import StreamDeckPlus from .Transport import Transport from .Devices.Mirabox293S import Mirabox293S +from .Devices.AjazzAKP03E import AjazzAKP03E, SoomfonCN002 from .Transport.Dummy import Dummy from .Transport.LibUSBHIDAPI import LibUSBHIDAPI from .ProductIDs import USBVendorIDs, USBProductIDs @@ -112,13 +113,19 @@ def enumerate(self) -> list[StreamDeck]: (USBVendorIDs.USB_VID_ELGATO, USBProductIDs.USB_PID_STREAMDECK_XL_V2, StreamDeckXL), (USBVendorIDs.USB_VID_ELGATO, USBProductIDs.USB_PID_STREAMDECK_XL_V2_MODULE, StreamDeckXL), (USBVendorIDs.USB_VID_ELGATO, USBProductIDs.USB_PID_STREAMDECK_PLUS, StreamDeckPlus), - (USBVendorIDs.USB_VID_MIRABOX, USBProductIDs.USB_PID_MIRABOX_STREAMDOCK_293S, Mirabox293S) + (USBVendorIDs.USB_VID_MIRABOX, USBProductIDs.USB_PID_MIRABOX_STREAMDOCK_293S, Mirabox293S), + (USBVendorIDs.USB_VID_AJAZZ, USBProductIDs.USB_PID_AJAZZ_AKP03E, AjazzAKP03E), + (USBVendorIDs.USB_VID_SOOMFON, USBProductIDs.USB_PID_SOOMFON_CN002, SoomfonCN002) ] streamdecks = list() for vid, pid, class_type in products: found_devices = self.transport.enumerate(vid=vid, pid=pid) + + # This device has a second HID interface as a keyboard + if getattr(class_type, "IGNORE_SECOND_HID_DEVICE", False): + found_devices = found_devices[::2] streamdecks.extend([class_type(d) for d in found_devices]) return streamdecks diff --git a/src/StreamDeck/Devices/AjazzAKP03E.py b/src/StreamDeck/Devices/AjazzAKP03E.py new file mode 100644 index 00000000..59cca6f8 --- /dev/null +++ b/src/StreamDeck/Devices/AjazzAKP03E.py @@ -0,0 +1,328 @@ +# Python Stream Deck Library +# Released under the MIT license +# +# dean [at] fourwalledcubicle [dot] com +# www.fourwalledcubicle.com +# +# Mirabox Stream Dock 293S non-official support +# by Renato Schmidt (github.com/rescbr) + +from .StreamDeck import StreamDeck, ControlType, DialEventType + + +class AjazzAKP03E(StreamDeck): + """ + Represents a physically attached Ajazz AKP03E or soomfon N3 device. + + Basically a Mirabox N3, but with their own quirks. + + Two major related projects helped, along with raw device captures: + + soomfon-plugin: https://github.com/PRL-Digital/soomfon-plugin/ + mirajazz: https://github.com/4ndv/mirajazz + opendeck plugin: https://github.com/4ndv/opendeck-akp03 + """ + + LAYOUT = "N3" + IGNORE_SECOND_HID_DEVICE = True + + KEY_COUNT = 9 + KEY_COLS = 3 + KEY_ROWS = 3 + + DIAL_COUNT = 3 + + KEY_PIXEL_WIDTH = 64 # TODO: check if this is the correct value + KEY_PIXEL_HEIGHT = 64 # TODO: check if this is the correct value + KEY_IMAGE_FORMAT = "JPEG" + KEY_FLIP = (False, False) + KEY_ROTATION = -90 + + DECK_TYPE = "AJAZZ AKP03E" + DECK_VISUAL = True + DECK_TOUCH = False # kind of... it could be used for the side display. + + PACKET_LENGTH = 1024 + NUM_SECONDS_KEEP_ALIVE = 10 + + # the side display uses key ids 0x10, 0x11, 0x12 with 80x80 images. + # match input { + # (0..=6) | 0x25 | 0x30 | 0x31 => read_button_press(input, state), + # 0x90 | 0x91 | 0x50 | 0x51 | 0x60 | 0x61 => read_encoder_value(input), + # 0x33..=0x35 => read_encoder_press(input, state), + # _ => Err(MirajazzError::BadData), + + NO_IMG_KEYS = [0x25, 0x30, 0x31] + + KEY_NUM_TO_DEVICE_KEY_ID = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06] + NO_IMG_KEYS + KEY_DEVICE_KEY_ID_TO_NUM = {value: index for index, value in enumerate(KEY_NUM_TO_DEVICE_KEY_ID)} + + # center, left, right + DIAL_MAPPING = { + "cw": [0x51, 0x91, 0x61], + "ccw": [0x50, 0x90, 0x60], + "push": [0x35, 0x33, 0x34] + } + + DIAL_TURN_CW_LOOKUP = {value: index for index, value in enumerate(DIAL_MAPPING["cw"])} + DIAL_TURN_CCW_LOOKUP = {value: index for index, value in enumerate(DIAL_MAPPING["ccw"])} + DIAL_PUSH_LOOKUP = {value: index for index, value in enumerate(DIAL_MAPPING["push"])} + + # see note in _read_control_states() method. + _key_triggered_last_read = False + + # 64 x 64 black JPEG + BLANK_KEY_IMAGE = [ + 0xff, 0xd8, 0xff, 0xe0, 0x00, 0x10, 0x4a, 0x46, 0x49, 0x46, 0x00, 0x01, 0x01, 0x01, 0x01, 0x2c, 0x01, 0x2c, 0x00, + 0x00, 0xff, 0xdb, 0x00, 0x43, 0x00, 0x03, 0x02, 0x02, 0x03, 0x02, 0x02, 0x03, 0x03, 0x03, 0x03, 0x04, 0x03, 0x03, 0x04, + 0x05, 0x08, 0x05, 0x05, 0x04, 0x04, 0x05, 0x0a, 0x07, 0x07, 0x06, 0x08, 0x0c, 0x0a, 0x0c, 0x0c, 0x0b, 0x0a, 0x0b, 0x0b, + 0x0d, 0x0e, 0x12, 0x10, 0x0d, 0x0e, 0x11, 0x0e, 0x0b, 0x0b, 0x10, 0x16, 0x10, 0x11, 0x13, 0x14, 0x15, 0x15, 0x15, 0x0c, + 0x0f, 0x17, 0x18, 0x16, 0x14, 0x18, 0x12, 0x14, 0x15, 0x14, 0xff, 0xdb, 0x00, 0x43, 0x01, 0x03, 0x04, 0x04, 0x05, 0x04, + 0x05, 0x09, 0x05, 0x05, 0x09, 0x14, 0x0d, 0x0b, 0x0d, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, + 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, + 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0x14, 0xff, + 0xc2, 0x00, 0x11, 0x08, 0x00, 0x40, 0x00, 0x40, 0x03, 0x01, 0x11, 0x00, 0x02, 0x11, 0x01, 0x03, 0x11, 0x01, 0xff, 0xc4, + 0x00, 0x15, 0x00, 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x08, 0xff, 0xc4, 0x00, 0x14, 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0xff, 0xda, 0x00, 0x0c, 0x03, 0x01, 0x00, 0x02, 0x10, 0x03, 0x10, 0x00, 0x00, 0x01, 0x95, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x7f, 0xff, 0xc4, 0x00, 0x14, 0x10, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x60, 0xff, 0xda, 0x00, 0x08, 0x01, 0x01, 0x00, 0x01, 0x05, 0x02, 0x01, 0xff, 0xc4, 0x00, 0x14, + 0x11, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x60, 0xff, 0xda, + 0x00, 0x08, 0x01, 0x03, 0x01, 0x01, 0x3f, 0x01, 0x01, 0xff, 0xc4, 0x00, 0x14, 0x11, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x60, 0xff, 0xda, 0x00, 0x08, 0x01, 0x02, 0x01, 0x01, 0x3f, + 0x01, 0x01, 0xff, 0xc4, 0x00, 0x14, 0x10, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x60, 0xff, 0xda, 0x00, 0x08, 0x01, 0x01, 0x00, 0x06, 0x3f, 0x02, 0x01, 0xff, 0xc4, 0x00, 0x14, 0x10, + 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x60, 0xff, 0xda, 0x00, + 0x08, 0x01, 0x01, 0x00, 0x01, 0x3f, 0x21, 0x01, 0xff, 0xda, 0x00, 0x0c, 0x03, 0x01, 0x00, 0x02, 0x00, 0x03, 0x00, 0x00, + 0x00, 0x10, 0x92, 0x49, 0x24, 0x92, 0x49, 0x24, 0x92, 0x49, 0x24, 0x92, 0x49, 0x24, 0x92, 0x49, 0x24, 0x92, 0x49, 0x24, + 0x92, 0x49, 0x24, 0x92, 0x49, 0x24, 0xff, 0xc4, 0x00, 0x14, 0x11, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x60, 0xff, 0xda, 0x00, 0x08, 0x01, 0x03, 0x01, 0x01, 0x3f, 0x10, 0x01, 0xff, + 0xc4, 0x00, 0x14, 0x11, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x60, 0xff, 0xda, 0x00, 0x08, 0x01, 0x02, 0x01, 0x01, 0x3f, 0x10, 0x01, 0xff, 0xc4, 0x00, 0x14, 0x10, 0x01, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x60, 0xff, 0xda, 0x00, 0x08, 0x01, 0x01, + 0x00, 0x01, 0x3f, 0x10, 0x01, 0xff, 0xd9 + ] + + def _convert_key_num_to_device_key_id(self, key): + return self.KEY_NUM_TO_DEVICE_KEY_ID[key] + + def _convert_device_key_id_to_key_num(self, key): + return self.KEY_DEVICE_KEY_ID_TO_NUM[key] + + def _make_payload_for_report_id(self, report_id, payload_data): + payload = bytearray(self.PACKET_LENGTH+1) + payload[0] = report_id + payload[1:len(payload_data)+1] = payload_data + return payload + + def _send_keep_alive(self): + # # connect/ping # CRT\0\0CONNECT + payload = self._make_payload_for_report_id(0x00, [0x43, 0x52, 0x54, 0x00, 0x00, 0x43, 0x4f, 0x4e, 0x4e, 0x45, 0x43, 0x54]) + self.device.write(payload) + + def _read_control_states(self): + """Requires ~10 second poll for a keep-alive of device + + Given the default polling rate of 1kHz, this would be 10k ticks + + TODO It would be better to overload _read/_read_from_suspend, as this adds duplicate code. + There needs to be some cleanup in the _read functions first though.""" + + device_input_data = self.device.read(self.PACKET_LENGTH) # Only read maximum of possible given device + + self.counter += 1 + + if self.counter >= self.read_poll_hz*self.NUM_SECONDS_KEEP_ALIVE: + # print("Sending keep alive") + self._send_keep_alive() + self.counter = 0 + + if device_input_data is None: + return None + + # Check that this is a valid message + # ACK\0\0OK\0 + if(device_input_data.startswith(bytes([0x41, 0x43, 0x4b, 0x00, 0x00, 0x4f, 0x4b, 0x00]))): + read_event = device_input_data[9] + read_value = device_input_data[10] + else: + # we don't know how to handle the response + return None + + # print(f"Received event from 0x{read_event:0x} : 0x{read_value:0x}") + + if read_event in self.KEY_NUM_TO_DEVICE_KEY_ID: # Key Event + + # Save last state to be able to build output array + self._int_key_states[self.KEY_DEVICE_KEY_ID_TO_NUM[read_event]] = bool(read_value) + + return { + ControlType.KEY: self._int_key_states + } + elif read_event in self.DIAL_MAPPING["push"]: # Dial push + + self._int_dial_key_states[self.DIAL_PUSH_LOOKUP[read_event]] = read_value + + return { + ControlType.DIAL: { + DialEventType.PUSH: self._int_dial_key_states, + } + } + elif read_event in self.DIAL_MAPPING["cw"]: # Dial turn + # print(f"Handling CW turn") + states = [0] * self.DIAL_COUNT + states[self.DIAL_TURN_CW_LOOKUP[read_event]] = 1 + return { + ControlType.DIAL: { + DialEventType.TURN: states, + } + } + elif read_event in self.DIAL_MAPPING["ccw"]: # Dial turn + # print(f"Handling CCW turn") + states = [0] * self.DIAL_COUNT + states[self.DIAL_TURN_CCW_LOOKUP[read_event]] = -1 + return { + ControlType.DIAL: { + DialEventType.TURN: states, + } + } + else: + print(f"Unknown event for key id 0x{read_event:0x} received") + return None + + def _reset_key_stream(self): + self.reset() + + def _send_disconnect(self): + # disconnect # CRT\0\0DIS + payload = self._make_payload_for_report_id(0x00, [0x43, 0x52, 0x54, 0x00, 0x00, 0x44, 0x49, 0x53]) + self.device.write(payload) + + def initialize(self): + + self.counter = 0 + self._int_key_states = [False] * self.KEY_COUNT + self._int_dial_key_states = [False] * self.DIAL_COUNT + + self._send_disconnect() + self.set_brightness(0x19) # Based on raw captures + + # Quad CMD # CRT\0\0QUCMD + # From packet capture 43 52 54 00 00 51 55 43 4d 44 11 11 00 11 00 11 + payload = self._make_payload_for_report_id(0x00, [0x43, 0x52, 0x54, 0x00, 0x00, 0x51, 0x55, 0x42, 0x4d, 0x44, 0x11, 0x11, 0x00, 0x11, 0x00, 0x11]) + self.device.write(payload) + + self._send_keep_alive() + + self.clear_all_button_images() + self.clear_button_states() + + + def clear_all_button_images(self): + self.clear_button_image(0xff) + + def clear_button_image(self, key): + + cval = 0xff if key == 0xff else key + 1 + + # clear contents # CRT\0\0CLE #0x00 0x00 0x00 + payload = self._make_payload_for_report_id(0x00, [0x43, 0x52, 0x54, 0x00, 0x00, 0x43, 0x4c, 0x45, 0x00, 0x00, 0x00, cval]) + self.device.write(payload) + + def clear_lcd_displays(self): + + # clear contents # CRT\0\0CLE\0DC + payload = self._make_payload_for_report_id(0x00, [0x43, 0x52, 0x54, 0x0, 0x0, 0x43, 0x4c, 0x45, 0x0, 0x44, 0x43]) + self.device.write(payload) + + def clear_button_states(self): + + self._int_key_states = [False] * self.KEY_COUNT + self._int_dial_key_states = [False] * self.DIAL_COUNT + + # clear contents # CRT\0\0CLB\0DC + payload = self._make_payload_for_report_id(0x00, [0x43, 0x52, 0x54, 0x0, 0x0, 0x43, 0x4c, 0x42, 0x0, 0x44, 0x43]) + self.device.write(payload) + + def halt_device(self): + # clear contents # CRT\0\0HAH + payload = self._make_payload_for_report_id(0x00, [0x43, 0x52, 0x54, 0x00, 0x00, 0x48, 0x41, 0x48]) + self.device.write(payload) + + def reset(self): + self.initialize() + + # Set brightness to full + self.set_brightness(100) + + # Clear all button images + self.clear_all_button_images() + + def set_brightness(self, percent): + if isinstance(percent, float): + percent = int(100.0 * percent) + + percent = min(max(percent, 0), 100) + + # set brightness # CRT\0\0LIG #0x00 0x00 0x00 + payload = self._make_payload_for_report_id(0x00, [0x43, 0x52, 0x54, 0x00, 0x00, 0x4c, 0x49, 0x47, 0x00, 0x00, percent, 0x00]) + self.device.write(payload) + + def get_serial_number(self): + return self.device.serial_number() + + def get_firmware_version(self): + version = self.device.read_input(0x00, self.PACKET_LENGTH + 1) + return self._extract_string(version[1:]) + + def set_key_image(self, key:int, image): + if min(max(key, 0), self.KEY_COUNT) != key: + raise IndexError("Invalid key index {}.".format(key)) + + image = bytes(image or self.BLANK_KEY_IMAGE) + image_payload_page_length = self.PACKET_LENGTH + + if key in self.NO_IMG_KEYS: + raise RuntimeWarning("Requested key index {} has no image.".format(key)) + # TODO determine proper error/warning behavior + return + + key = self._convert_key_num_to_device_key_id(key) + + image_size_uint16_be = int.to_bytes(len(image), 2, 'big', signed=False) + + # start batch # CRT\0\0BAT #0x00 0x00 + command = bytes([0x43, 0x52, 0x54, 0x00, 0x00, 0x42, 0x41, 0x54, 0x00, 0x00]) + image_size_uint16_be + bytes([key]) + payload = self._make_payload_for_report_id(0x00, command) + self.device.write(payload) + + page_number = 0 + bytes_remaining = len(image) + while bytes_remaining > 0: + this_length = min(bytes_remaining, image_payload_page_length) + bytes_sent = page_number * image_payload_page_length + + #send data + payload = self._make_payload_for_report_id(0x00, image[bytes_sent:bytes_sent + this_length]) + self.device.write(payload) + + bytes_remaining = bytes_remaining - this_length + page_number = page_number + 1 + + # stop batch # CRT\0\0STP + payload = self._make_payload_for_report_id(0x00, [0x43, 0x52, 0x54, 0x00, 0x00, 0x53, 0x54, 0x50]) + self.device.write(payload) + + + def set_touchscreen_image(self, image, x_pos=0, y_pos=0, width=0, height=0): + pass + + def set_key_color(self, key, r, g, b): + pass + + def set_screen_image(self, image): + pass + +class SoomfonCN002(AjazzAKP03E): + DECK_TYPE = "Soomfon CN002" diff --git a/src/StreamDeck/Devices/Mirabox293S.py b/src/StreamDeck/Devices/Mirabox293S.py index e82dc22e..b7e63b10 100644 --- a/src/StreamDeck/Devices/Mirabox293S.py +++ b/src/StreamDeck/Devices/Mirabox293S.py @@ -29,7 +29,7 @@ class Mirabox293S(StreamDeck): DECK_VISUAL = True DECK_TOUCH = False # kind of... it could be used for the side display. - PACKET_LENGHT = 512 + PACKET_LENGTH = 512 # the side display uses key ids 0x10, 0x11, 0x12 with 80x80 images. KEY_NUM_TO_DEVICE_KEY_ID = [0x0d, 0x0a, 0x07, 0x04, 0x01, 0x10, 0xe, 0xb, 0x08, 0x05, 0x02, 0x11, 0x0f, 0x0c, 0x09, 0x06, 0x03, 0x12] @@ -89,7 +89,7 @@ def _convert_device_key_id_to_key_num(self, key): def _make_payload_for_report_id(self, report_id, payload_data): - payload = bytearray(self.PACKET_LENGHT + 1) + payload = bytearray(self.PACKET_LENGTH + 1) payload[0] = report_id payload[1:len(payload_data)] = payload_data return payload @@ -102,7 +102,7 @@ def _read_control_states(self): # if a firmware upgrade that supports key down/up events is released, this variable can be removed from the code. if not self._key_triggered_last_read: - device_input_data = self.device.read(self.PACKET_LENGHT) + device_input_data = self.device.read(self.PACKET_LENGTH) if device_input_data is None: return None @@ -152,7 +152,7 @@ def get_serial_number(self): return self.device.serial_number() def get_firmware_version(self): - version = self.device.read_input(0x00, self.PACKET_LENGHT + 1) + version = self.device.read_input(0x00, self.PACKET_LENGTH + 1) return self._extract_string(version[1:]) def set_key_image(self, key, image): @@ -160,7 +160,7 @@ def set_key_image(self, key, image): raise IndexError("Invalid key index {}.".format(key)) image = bytes(image or self.BLANK_KEY_IMAGE) - image_payload_page_length = self.PACKET_LENGHT + image_payload_page_length = self.PACKET_LENGTH key = self._convert_key_num_to_device_key_id(key) diff --git a/src/StreamDeck/ProductIDs.py b/src/StreamDeck/ProductIDs.py index f07f672f..13372c96 100644 --- a/src/StreamDeck/ProductIDs.py +++ b/src/StreamDeck/ProductIDs.py @@ -13,6 +13,8 @@ class USBVendorIDs: USB_VID_ELGATO = 0x0fd9 USB_VID_MIRABOX = 0x5548 + USB_VID_AJAZZ = 0x0300 + USB_VID_SOOMFON = 0x1500 class USBProductIDs: @@ -38,3 +40,5 @@ class USBProductIDs: USB_PID_STREAMDECK_PLUS = 0x0084 USB_PID_STREAMDECK_XL_V2_MODULE = 0x00ba USB_PID_MIRABOX_STREAMDOCK_293S = 0x6670 + USB_PID_AJAZZ_AKP03E = 0x3002 + USB_PID_SOOMFON_CN002 = 0x3001 # Stream Controller SE diff --git a/src/example_basic_grid.py b/src/example_basic_grid.py new file mode 100644 index 00000000..842bde8f --- /dev/null +++ b/src/example_basic_grid.py @@ -0,0 +1,158 @@ +#!/usr/bin/env python3 + +# Python Stream Deck Library +# Released under the MIT license +# +# dean [at] fourwalledcubicle [dot] com +# www.fourwalledcubicle.com +# + +# Example script showing basic library usage - updating key images with new +# tiles generated at runtime, and responding to button state change events. + +import os +import threading + +from PIL import Image, ImageDraw, ImageFont +from StreamDeck.DeviceManager import DeviceManager +from StreamDeck.ImageHelpers import PILHelper +from StreamDeck.Transport.Transport import TransportError + +# Folder location of image assets used by this example. +ASSETS_PATH = os.path.join(os.path.dirname(__file__), "Assets") + + +# Generates a custom tile with run-time generated text and custom image via the +# PIL module. +def render_key_image(deck, icon_filename, font_filename, label_text): + # Resize the source image asset to best-fit the dimensions of a single key, + # leaving a margin at the bottom so that we can draw the key title + # afterwards. + icon = Image.open(icon_filename) + image = PILHelper.create_scaled_key_image(deck, icon, margins=[0, 0, 20, 0]) + + # Load a custom TrueType font and use it to overlay the key index, draw key + # label onto the image a few pixels from the bottom of the key. + draw = ImageDraw.Draw(image) + font = ImageFont.truetype(font_filename, 14) + draw.text((image.width / 2, image.height - 5), text=label_text, font=font, anchor="ms", fill="white") + draw.line([0, image.height/2, image.width, image.height / 2], width=1) + draw.line([0, image.height/4, image.width, image.height / 4], width=1) + draw.line([0, 3*image.height/4, image.width, 3*image.height / 4], width=1) + + draw.line([image.width / 2, 0, image.width/2, image.height], width=1) + draw.line([image.width / 4, 0, image.width/4, image.height], width=1) + draw.line([3*image.width / 4, 0, 3*image.width/4, image.height], width=1) + + return PILHelper.to_native_key_format(deck, image) + + +# Returns styling information for a key based on its position and state. +def get_key_style(deck, key, state): + # Last button in the example application is the exit button. + exit_key_index = deck.key_count() - 1 + + if key == exit_key_index: + name = "exit" + icon = "{}.png".format("Exit") + font = "Roboto-Regular.ttf" + label = "Bye" if state else "Exit" + else: + name = "emoji" + icon = "{}.png".format("Pressed" if state else "Released") + font = "Roboto-Regular.ttf" + label = "Pressed!" if state else "Key {}".format(key) + + return { + "name": name, + "icon": os.path.join(ASSETS_PATH, icon), + "font": os.path.join(ASSETS_PATH, font), + "label": label + } + + +# Creates a new key image based on the key index, style and current key state +# and updates the image on the StreamDeck. +def update_key_image(deck, key, state): + # Determine what icon and label to use on the generated key. + key_style = get_key_style(deck, key, state) + + # Generate the custom key with the requested image and label. + image = render_key_image(deck, key_style["icon"], key_style["font"], key_style["label"]) + + # Use a scoped-with on the deck to ensure we're the only thread using it + # right now. + with deck: + # Update requested key with the generated image. + deck.set_key_image(key, image) + + +# Prints key state change information, updates rhe key image and performs any +# associated actions when a key is pressed. +def key_change_callback(deck, key, state): + # Print new key state + print("Deck {} Key {} = {}".format(deck.id(), key, state), flush=True) + + # Don't try to draw an image on a touch button + if key >= deck.key_count(): + return + + # Update the key image based on the new key state. + update_key_image(deck, key, state) + + # Check if the key is changing to the pressed state. + if state: + key_style = get_key_style(deck, key, state) + + # When an exit button is pressed, close the application. + if key_style["name"] == "exit": + # Use a scoped-with on the deck to ensure we're the only thread + # using it right now. + with deck: + # Reset deck, clearing all button images. + deck.reset() + + # Close deck handle, terminating internal worker threads. + deck.close() + + +# Prints dial state change information, updates rhe key image and performs any +# associated actions when a key is pressed. +def dial_change_callback(deck, dial, event, amount): + print("Deck {} Dial {} {} = {}".format(deck.id(), dial, event, amount), flush=True) + +if __name__ == "__main__": + streamdecks = DeviceManager().enumerate() + + print("Found {} Stream Deck(s).\n".format(len(streamdecks))) + + for index, deck in enumerate(streamdecks): + # This example only works with devices that have screens. + if not deck.is_visual(): + continue + + deck.open() + deck.reset() + + print("Opened '{}' device (serial number: '{}', fw: '{}')".format( + deck.deck_type(), deck.get_serial_number(), deck.get_firmware_version() + )) + + # Set initial screen brightness to 30%. + deck.set_brightness(30) + + # Set initial key images. + for key in range(deck.key_count()): + update_key_image(deck, key, False) + + # Register callback function for when a key state changes. + deck.set_key_callback(key_change_callback) + deck.set_dial_callback(dial_change_callback) + + # Wait until all application threads have terminated (for this example, + # this is when all deck handles are closed). + for t in threading.enumerate(): + try: + t.join() + except (TransportError, RuntimeError): + pass