From 5cdc6a76019ae9999850106967b8d7e2d023687d Mon Sep 17 00:00:00 2001 From: dominik dienlin Date: Sat, 10 Nov 2018 17:03:08 +0100 Subject: [PATCH 01/16] cleanups and refactoring --- pytuya/__init__.py | 212 ++++++++++++++++++--------------------------- tests.py | 79 +++++++++-------- 2 files changed, 127 insertions(+), 164 deletions(-) diff --git a/pytuya/__init__.py b/pytuya/__init__.py index 737b420..e5db863 100644 --- a/pytuya/__init__.py +++ b/pytuya/__init__.py @@ -8,7 +8,6 @@ # # Tested with Python 2.7 and Python 3.6.1 only - import base64 from hashlib import md5 import json @@ -19,21 +18,20 @@ import colorsys try: - #raise ImportError + # raise ImportError import Crypto from Crypto.Cipher import AES # PyCrypto except ImportError: Crypto = AES = None import pyaes # https://github.com/ricmoo/pyaes - version_tuple = (7, 0, 3) version = version_string = __version__ = '%d.%d.%d' % version_tuple __author__ = 'clach04' log = logging.getLogger(__name__) logging.basicConfig() # TODO include function name/line numbers in log -#log.setLevel(level=logging.DEBUG) # Debug hack! +# log.setLevel(level=logging.DEBUG) # Debug hack! log.info('Python %s on %s', sys.version, sys.platform) if Crypto is None: @@ -49,11 +47,12 @@ IS_PY2 = sys.version_info[0] == 2 + class AESCipher(object): def __init__(self, key): - #self.bs = 32 # 32 work fines for ON, does not work for OFF. Padding different compared to js version https://github.com/codetheweb/tuyapi/ self.bs = 16 self.key = key + def encrypt(self, raw): if Crypto: raw = self._pad(raw) @@ -64,46 +63,36 @@ def encrypt(self, raw): cipher = pyaes.blockfeeder.Encrypter(pyaes.AESModeOfOperationECB(self.key)) # no IV, auto pads to 16 crypted_text = cipher.feed(raw) crypted_text += cipher.feed() # flush final block - #print('crypted_text %r' % crypted_text) - #print('crypted_text (%d) %r' % (len(crypted_text), crypted_text)) crypted_text_b64 = base64.b64encode(crypted_text) - #print('crypted_text_b64 (%d) %r' % (len(crypted_text_b64), crypted_text_b64)) return crypted_text_b64 + def decrypt(self, enc): enc = base64.b64decode(enc) - #print('enc (%d) %r' % (len(enc), enc)) - #enc = self._unpad(enc) - #enc = self._pad(enc) - #print('upadenc (%d) %r' % (len(enc), enc)) if Crypto: cipher = AES.new(self.key, AES.MODE_ECB) raw = cipher.decrypt(enc) - #print('raw (%d) %r' % (len(raw), raw)) return self._unpad(raw).decode('utf-8') - #return self._unpad(cipher.decrypt(enc)).decode('utf-8') else: cipher = pyaes.blockfeeder.Decrypter(pyaes.AESModeOfOperationECB(self.key)) # no IV, auto pads to 16 plain_text = cipher.feed(enc) plain_text += cipher.feed() # flush final block return plain_text + def _pad(self, s): padnum = self.bs - len(s) % self.bs return s + padnum * chr(padnum).encode() + @staticmethod def _unpad(s): - return s[:-ord(s[len(s)-1:])] + return s[:-ord(s[len(s) - 1:])] def bin2hex(x, pretty=False): - if pretty: - space = ' ' - else: - space = '' + space = ' ' if pretty else '' if IS_PY2: - result = ''.join('%02X%s' % (ord(y), space) for y in x) - else: - result = ''.join('%02X%s' % (y, space) for y in x) - return result + x = [ord(xi) for xi in x] + + return ''.join('%02X%s' % (y, space) for y in x) def hex2bin(x): @@ -112,26 +101,29 @@ def hex2bin(x): else: return bytes.fromhex(x) + # This is intended to match requests.json payload at https://github.com/codetheweb/tuyapi payload_dict = { - "device": { - "status": { - "hexByte": "0a", - "command": {"gwId": "", "devId": ""} - }, - "set": { - "hexByte": "07", - "command": {"devId": "", "uid": "", "t": ""} - }, - "prefix": "000055aa00000000000000", # Next byte is command byte ("hexByte") some zero padding, then length of remaining payload, i.e. command + suffix (unclear if multiple bytes used for length, zero padding implies could be more than one byte) - "suffix": "000000000000aa55" - } + "device": { + "status": { + "hexByte": "0a", + "command": {"gwId": "", "devId": ""} + }, + "set": { + "hexByte": "07", + "command": {"devId": "", "uid": "", "t": ""} + }, + "prefix": "000055aa00000000000000", + # Next byte is command byte ("hexByte") some zero padding, then length of remaining payload, i.e. command + # + suffix (unclear if multiple bytes used for length, zero padding implies could be more than one byte) + "suffix": "000000000000aa55" + } } + class XenonDevice(object): def __init__(self, dev_id, address, local_key=None, dev_type=None, connection_timeout=10): - """ - Represents a Tuya device. + """ Represents a Tuya device. Args: dev_id (str): The device id. @@ -146,19 +138,17 @@ def __init__(self, dev_id, address, local_key=None, dev_type=None, connection_ti """ self.id = dev_id self.address = address - self.local_key = local_key self.local_key = local_key.encode('latin1') self.dev_type = dev_type self.connection_timeout = connection_timeout - + self.cipher = None self.port = 6668 # default - do not expect caller to pass in def __repr__(self): return '%r' % ((self.id, self.address),) # FIXME can do better than this def _send_receive(self, payload): - """ - Send single buffer `payload` and receive a single buffer. + """ Send single buffer `payload` and receive a single buffer. Args: payload(bytes): Data to send. @@ -173,16 +163,13 @@ def _send_receive(self, payload): return data def generate_payload(self, command, data=None): - """ - Generate the payload to send. + """ Generate the payload to send. Args: - command(str): The type of command. - This is one of the entries from payload_dict - data(dict, optional): The data to be send. - This is what will be passed via the 'dps' entry + command(str): The type of command. This is one of the entries from payload_dict + data(dict, optional): The data to be send. This is what will be passed via the 'dps' entry """ - json_data = payload_dict[self.dev_type][command]['command'] + json_data = payload_dict[self.dev_type][command]['command'].copy() if 'gwId' in json_data: json_data['gwId'] = self.id @@ -198,59 +185,34 @@ def generate_payload(self, command, data=None): # Create byte buffer from hex data json_payload = json.dumps(json_data) - #print(json_payload) json_payload = json_payload.replace(' ', '') # if spaces are not removed device does not respond! json_payload = json_payload.encode('utf-8') log.debug('json_payload=%r', json_payload) if command == SET: # need to encrypt - #print('json_payload %r' % json_payload) self.cipher = AESCipher(self.local_key) # expect to connect and then disconnect to set new json_payload = self.cipher.encrypt(json_payload) - #print('crypted json_payload %r' % json_payload) - preMd5String = b'data=' + json_payload + b'||lpv=' + PROTOCOL_VERSION_BYTES + b'||' + self.local_key - #print('preMd5String %r' % preMd5String) + pre_md5_str = b'data=' + json_payload + b'||lpv=' + PROTOCOL_VERSION_BYTES + b'||' + self.local_key m = md5() - m.update(preMd5String) - #print(repr(m.digest())) + m.update(pre_md5_str) hexdigest = m.hexdigest() - #print(hexdigest) - #print(hexdigest[8:][:16]) json_payload = PROTOCOL_VERSION_BYTES + hexdigest[8:][:16].encode('latin1') + json_payload - #print('data_to_send') - #print(json_payload) - #print('crypted json_payload (%d) %r' % (len(json_payload), json_payload)) - #print('json_payload %r' % repr(json_payload)) - #print('json_payload len %r' % len(json_payload)) - #print(bin2hex(json_payload)) self.cipher = None # expect to connect and then disconnect to set new - postfix_payload = hex2bin(bin2hex(json_payload) + payload_dict[self.dev_type]['suffix']) - #print('postfix_payload %r' % postfix_payload) - #print('postfix_payload %r' % len(postfix_payload)) - #print('postfix_payload %x' % len(postfix_payload)) - #print('postfix_payload %r' % hex(len(postfix_payload))) + assert len(postfix_payload) <= 0xff postfix_payload_hex_len = '%x' % len(postfix_payload) # TODO this assumes a single byte 0-255 (0x00-0xff) - buffer = hex2bin( payload_dict[self.dev_type]['prefix'] + - payload_dict[self.dev_type][command]['hexByte'] + - '000000' + - postfix_payload_hex_len ) + postfix_payload - #print('command', command) - #print('prefix') - #print(payload_dict[self.dev_type][command]['prefix']) - #print(repr(buffer)) - #print(bin2hex(buffer, pretty=True)) - #print(bin2hex(buffer, pretty=False)) - #print('full buffer(%d) %r' % (len(buffer), buffer)) - return buffer - + + return hex2bin(payload_dict[self.dev_type]['prefix'] + payload_dict[self.dev_type][command]['hexByte'] + + '000000' + postfix_payload_hex_len) + postfix_payload + + class Device(XenonDevice): def __init__(self, dev_id, address, local_key=None, dev_type=None): super(Device, self).__init__(dev_id, address, local_key, dev_type) - + def status(self): log.debug('status() entry') # open device, send request, then close connection @@ -261,8 +223,6 @@ def status(self): result = data[20:-8] # hard coded offsets log.debug('result=%r', result) - #result = data[data.find('{'):data.rfind('}')+1] # naive marker search, hope neither { nor } occur in header/footer - #print('result %r' % result) if result.startswith(b'{'): # this is the regular expected code path if not isinstance(result, str): @@ -270,10 +230,10 @@ def status(self): result = json.loads(result) elif result.startswith(PROTOCOL_VERSION_BYTES): # got an encrypted payload, happens occasionally - # expect resulting json to look similar to:: {"devId":"ID","dps":{"1":true,"2":0},"t":EPOCH_SECS,"s":3_DIGIT_NUM} + # expect json to look similar to:: {"devId":"ID","dps":{"1":true,"2":0},"t":EPOCH_SECS,"s":3_DIGIT_NUM} # NOTE dps.2 may or may not be present result = result[len(PROTOCOL_VERSION_BYTES):] # remove version header - result = result[16:] # remove (what I'm guessing, but not confirmed is) 16-bytes of MD5 hexdigest of payload + result = result[16:] # remove first 16-bytes - MD5 hexdigest of payload (guess, unconfirmed) cipher = AESCipher(self.local_key) result = cipher.decrypt(result) log.debug('decrypted result=%r', result) @@ -286,8 +246,7 @@ def status(self): return result def set_status(self, on, switch=1): - """ - Set status of the device to 'on' or 'off'. + """ Set status of the device to 'on' or 'off'. Args: on(bool): True for 'on', False for 'off'. @@ -296,8 +255,8 @@ def set_status(self, on, switch=1): # open device, send request, then close connection if isinstance(switch, int): switch = str(switch) # index and payload is a string - payload = self.generate_payload(SET, {switch:on}) - #print('payload %r' % payload) + payload = self.generate_payload(SET, {switch: on}) + # print('payload %r' % payload) data = self._send_receive(payload) log.debug('set_status received data=%r', data) @@ -305,16 +264,15 @@ def set_status(self, on, switch=1): return data def turn_on(self, switch=1): - """Turn the device on""" + """ Turn the device on """ self.set_status(True, switch) def turn_off(self, switch=1): - """Turn the device off""" + """ Turn the device off """ self.set_status(False, switch) def set_timer(self, num_secs): - """ - Set a timer. + """ Set a timer. Args: num_secs(int): Number of seconds @@ -328,27 +286,29 @@ def set_timer(self, num_secs): devices_numbers.sort() dps_id = devices_numbers[-1] - payload = self.generate_payload(SET, {dps_id:num_secs}) + payload = self.generate_payload(SET, {dps_id: num_secs}) data = self._send_receive(payload) log.debug('set_timer received data=%r', data) return data + class OutletDevice(Device): def __init__(self, dev_id, address, local_key=None): dev_type = 'device' super(OutletDevice, self).__init__(dev_id, address, local_key, dev_type) + class BulbDevice(Device): - DPS_INDEX_ON = '1' - DPS_INDEX_MODE = '2' + DPS_INDEX_ON = '1' + DPS_INDEX_MODE = '2' DPS_INDEX_BRIGHTNESS = '3' DPS_INDEX_COLOURTEMP = '4' - DPS_INDEX_COLOUR = '5' + DPS_INDEX_COLOUR = '5' - DPS = 'dps' + DPS = 'dps' DPS_MODE_COLOUR = 'colour' - DPS_MODE_WHITE = 'white' + DPS_MODE_WHITE = 'white' def __init__(self, dev_id, address, local_key=None): dev_type = 'device' @@ -356,8 +316,7 @@ def __init__(self, dev_id, address, local_key=None): @staticmethod def _rgb_to_hexvalue(r, g, b): - """ - Convert an RGB value to the hex representation expected by tuya. + """ Convert an RGB value to the hex representation expected by tuya. Index '5' (DPS_INDEX_COLOUR) is assumed to be in the format: rrggbb0hhhssvv @@ -371,12 +330,12 @@ def _rgb_to_hexvalue(r, g, b): g(int): Value for the colour green as int from 0-255. b(int): Value for the colour blue as int from 0-255. """ - rgb = [r,g,b] - hsv = colorsys.rgb_to_hsv(rgb[0]/255, rgb[1]/255, rgb[2]/255) + rgb = [r, g, b] + hsv = colorsys.rgb_to_hsv(rgb[0] / 255, rgb[1] / 255, rgb[2] / 255) hexvalue = "" for value in rgb: - temp = str(hex(int(value))).replace("0x","") + temp = str(hex(int(value))).replace("0x", "") if len(temp) == 1: temp = "0" + temp hexvalue = hexvalue + temp @@ -384,7 +343,7 @@ def _rgb_to_hexvalue(r, g, b): hsvarray = [int(hsv[0] * 360), int(hsv[1] * 255), int(hsv[2] * 255)] hexvalue_hsv = "" for value in hsvarray: - temp = str(hex(int(value))).replace("0x","") + temp = str(hex(int(value))).replace("0x", "") if len(temp) == 1: temp = "0" + temp hexvalue_hsv = hexvalue_hsv + temp @@ -398,8 +357,7 @@ def _rgb_to_hexvalue(r, g, b): @staticmethod def _hexvalue_to_rgb(hexvalue): """ - Converts the hexvalue used by tuya for colour representation into - an RGB value. + Converts the hexvalue used by tuya for colour representation into an RGB value. Args: hexvalue(string): The hex representation generated by BulbDevice._rgb_to_hexvalue() @@ -407,14 +365,12 @@ def _hexvalue_to_rgb(hexvalue): r = int(hexvalue[0:2], 16) g = int(hexvalue[2:4], 16) b = int(hexvalue[4:6], 16) - - return (r, g, b) + return r, g, b @staticmethod def _hexvalue_to_hsv(hexvalue): """ - Converts the hexvalue used by tuya for colour representation into - an HSV value. + Converts the hexvalue used by tuya for colour representation into an HSV value. Args: hexvalue(string): The hex representation generated by BulbDevice._rgb_to_hexvalue() @@ -426,8 +382,7 @@ def _hexvalue_to_hsv(hexvalue): return (h, s, v) def set_colour(self, r, g, b): - """ - Set colour of an rgb bulb. + """ Set colour of an rgb bulb. Args: r(int): Value for the colour red as int from 0-255. @@ -451,8 +406,7 @@ def set_colour(self, r, g, b): return data def set_white(self, brightness, colourtemp): - """ - Set white coloured theme of an rgb bulb. + """ Set white coloured theme of an rgb bulb. Args: brightness(int): Value for the brightness (25-255). @@ -472,8 +426,7 @@ def set_white(self, brightness, colourtemp): return data def set_brightness(self, brightness): - """ - Set the brightness value of an rgb bulb. + """ Set the brightness value of an rgb bulb. Args: brightness(int): Value for the brightness (25-255). @@ -486,8 +439,7 @@ def set_brightness(self, brightness): return data def set_colourtemp(self, colourtemp): - """ - Set the colour temperature of an rgb bulb. + """ Set the colour temperature of an rgb bulb. Args: colourtemp(int): Value for the colour temperature (0-255). @@ -500,30 +452,30 @@ def set_colourtemp(self, colourtemp): return data def brightness(self): - """Return brightness value""" + """ Return brightness value """ return self.status()[self.DPS][self.DPS_INDEX_BRIGHTNESS] def colourtemp(self): - """Return colour temperature""" + """ Return colour temperature """ return self.status()[self.DPS][self.DPS_INDEX_COLOURTEMP] def colour_rgb(self): - """Return colour as RGB value""" + """ Return colour as RGB value """ hexvalue = self.status()[self.DPS][self.DPS_INDEX_COLOUR] return BulbDevice._hexvalue_to_rgb(hexvalue) def colour_hsv(self): - """Return colour as HSV value""" + """ Return colour as HSV value """ hexvalue = self.status()[self.DPS][self.DPS_INDEX_COLOUR] return BulbDevice._hexvalue_to_hsv(hexvalue) def state(self): status = self.status() state = { - 'is_on' : status[self.DPS][self.DPS_INDEX_ON], - 'mode' : status[self.DPS][self.DPS_INDEX_MODE], - 'brightness' : status[self.DPS][self.DPS_INDEX_BRIGHTNESS], - 'colourtemp' : status[self.DPS][self.DPS_INDEX_COLOURTEMP], - 'colour' : status[self.DPS][self.DPS_INDEX_COLOUR], - } + 'is_on': status[self.DPS][self.DPS_INDEX_ON], + 'mode': status[self.DPS][self.DPS_INDEX_MODE], + 'brightness': status[self.DPS][self.DPS_INDEX_BRIGHTNESS], + 'colourtemp': status[self.DPS][self.DPS_INDEX_COLOURTEMP], + 'colour': status[self.DPS][self.DPS_INDEX_COLOUR], + } return state diff --git a/tests.py b/tests.py index 826b067..33f43d0 100755 --- a/tests.py +++ b/tests.py @@ -1,28 +1,26 @@ #!/usr/bin/env python3 +import json import logging - +import struct +import pytuya import unittest + try: from unittest.mock import MagicMock # Python 3 except ImportError: from mock import MagicMock # py2 use https://pypi.python.org/pypi/mock -from hashlib import md5 -import json -import logging -import struct + # Enable info logging to see version information log = logging.getLogger('pytuya') log.setLevel(level=logging.INFO) -#log.setLevel(level=logging.DEBUG) # Debug hack! - -import pytuya +# log.setLevel(level=logging.DEBUG) # Debug hack! LOCAL_KEY = '0123456789abcdef' - mock_byte_encoding = 'utf-8' + def compare_json_strings(json1, json2, ignoring_keys=None): json1 = json.loads(json1) json2 = json.loads(json2) @@ -33,20 +31,20 @@ def compare_json_strings(json1, json2, ignoring_keys=None): return json.dumps(json1, sort_keys=True) == json.dumps(json2, sort_keys=True) + def check_data_frame(data, expected_prefix, encrypted=True): prefix = data[:15] suffix = data[-8:] - + if encrypted: - payload_len = struct.unpack(">B",data[15:16])[0] # big-endian, unsigned char + payload_len = struct.unpack(">B", data[15:16])[0] # big-endian, unsigned char version = data[16:19] checksum = data[19:35] encrypted_json = data[35:-8] - json_data = pytuya.AESCipher(LOCAL_KEY.encode(mock_byte_encoding)).decrypt(encrypted_json) else: json_data = data[16:-8].decode(mock_byte_encoding) - + frame_ok = True if prefix != pytuya.hex2bin(expected_prefix): frame_ok = False @@ -57,27 +55,33 @@ def check_data_frame(data, expected_prefix, encrypted=True): frame_ok = False elif version != b"3.1": frame_ok = False - + return json_data, frame_ok - + + def mock_send_receive_set_timer(data): + if not hasattr(mock_send_receive_set_timer, "call_counter"): + mock_send_receive_set_timer.call_counter = 0 if mock_send_receive_set_timer.call_counter == 0: - ret = 20*chr(0x0) + '{"devId":"DEVICE_ID","dps":{"1":false,"2":0}}' + 8*chr(0x0) + ret = 20 * chr(0x0) + '{"devId":"DEVICE_ID","dps":{"1":false,"2":0}}' + 8 * chr(0x0) elif mock_send_receive_set_timer.call_counter == 1: - expected = '{"uid":"DEVICE_ID_HERE","devId":"DEVICE_ID_HERE","t":"","dps":{"2":6666}}' + expected = '{"uid":"ID","devId":"ID","t":"","dps":{"2":6666}}' json_data, frame_ok = check_data_frame(data, "000055aa0000000000000007000000") - + if frame_ok and compare_json_strings(json_data, expected, ['t']): ret = '{"test_result":"SUCCESS"}' else: ret = '{"test_result":"FAIL"}' + else: + raise RuntimeError("unexpected counter of > 1") ret = ret.encode(mock_byte_encoding) mock_send_receive_set_timer.call_counter += 1 return ret - + + def mock_send_receive_set_status(data): - expected = '{"dps":{"1":true},"uid":"DEVICE_ID_HERE","t":"1516117564","devId":"DEVICE_ID_HERE"}' + expected = '{"dps":{"1":true},"uid":"ID","t":"1516117564","devId":"ID"}' json_data, frame_ok = check_data_frame(data, "000055aa0000000000000007000000") if frame_ok and compare_json_strings(json_data, expected, ['t']): @@ -89,8 +93,9 @@ def mock_send_receive_set_status(data): ret = ret.encode(mock_byte_encoding) return ret + def mock_send_receive_status(data): - expected = '{"devId":"DEVICE_ID_HERE","gwId":"DEVICE_ID_HERE"}' + expected = '{"devId":"ID","gwId":"ID"}' json_data, frame_ok = check_data_frame(data, "000055aa000000000000000a000000", False) # FIXME dead code block @@ -100,12 +105,13 @@ def mock_send_receive_status(data): logging.error("json data not the same: {} != {}".format(json_data, expected)) ret = '{"test_result":"FAIL"}' - ret = 20*chr(0) + ret + 8*chr(0) + ret = 20 * chr(0) + ret + 8 * chr(0) ret = ret.encode(mock_byte_encoding) return ret + def mock_send_receive_set_colour(data): - expected = '{"dps":{"2":"colour", "5":"ffffff000000ff"}, "devId":"DEVICE_ID_HERE","uid":"DEVICE_ID_HERE", "t":"1516117564"}' + expected = '{"dps":{"2":"colour", "5":"ffffff000000ff"}, "devId":"ID","uid":"ID", "t":"1516117564"}' json_data, frame_ok = check_data_frame(data, "000055aa0000000000000007000000") @@ -118,8 +124,9 @@ def mock_send_receive_set_colour(data): ret = ret.encode(mock_byte_encoding) return ret + def mock_send_receive_set_white(data): - expected = '{"dps":{"2":"white", "3":255, "4":255}, "devId":"DEVICE_ID_HERE","uid":"DEVICE_ID_HERE", "t":"1516117564"}' + expected = '{"dps":{"2":"white", "3":255, "4":255}, "devId":"ID","uid":"ID", "t":"1516117564"}' json_data, frame_ok = check_data_frame(data, "000055aa0000000000000007000000") if frame_ok and compare_json_strings(json_data, expected, ['t']): @@ -131,53 +138,56 @@ def mock_send_receive_set_white(data): ret = ret.encode(mock_byte_encoding) return ret + class TestXenonDevice(unittest.TestCase): def test_set_timer(self): - d = pytuya.OutletDevice('DEVICE_ID_HERE', 'IP_ADDRESS_HERE', LOCAL_KEY) + d = pytuya.OutletDevice('ID', 'IP_ADDRESS_HERE', LOCAL_KEY) d._send_receive = MagicMock(side_effect=mock_send_receive_set_timer) # Reset call_counter and start test mock_send_receive_set_timer.call_counter = 0 result = d.set_timer(6666) - result = result[result.find(b'{'):result.rfind(b'}')+1] - result = result.decode(mock_byte_encoding) # Python 3 (3.5.4 and earlier) workaround to json stdlib "behavior" https://docs.python.org/3/whatsnew/3.6.html#json + result = result[result.find(b'{'):result.rfind(b'}') + 1] + result = result.decode( + mock_byte_encoding) # Python 3 (3.5.4 and earlier) workaround to json stdlib "behavior" https://docs.python.org/3/whatsnew/3.6.html#json result = json.loads(result) # Make sure mock_send_receive_set_timer() has been called twice with correct parameters self.assertEqual(result['test_result'], "SUCCESS") def test_set_status(self): - d = pytuya.OutletDevice('DEVICE_ID_HERE', 'IP_ADDRESS_HERE', LOCAL_KEY) + d = pytuya.OutletDevice('ID', 'IP_ADDRESS_HERE', LOCAL_KEY) d._send_receive = MagicMock(side_effect=mock_send_receive_set_status) result = d.set_status(True, 1) - result = result.decode(mock_byte_encoding) # Python 3 (3.5.4 and earlier) workaround to json stdlib "behavior" https://docs.python.org/3/whatsnew/3.6.html#json + result = result.decode( + mock_byte_encoding) # Python 3 (3.5.4 and earlier) workaround to json stdlib "behavior" https://docs.python.org/3/whatsnew/3.6.html#json result = json.loads(result) # Make sure mock_send_receive_set_timer() has been called twice with correct parameters self.assertEqual(result['test_result'], "SUCCESS") def test_status(self): - d = pytuya.OutletDevice('DEVICE_ID_HERE', 'IP_ADDRESS_HERE', LOCAL_KEY) + d = pytuya.OutletDevice('ID', 'IP_ADDRESS_HERE', LOCAL_KEY) d._send_receive = MagicMock(side_effect=mock_send_receive_status) result = d.status() # Make sure mock_send_receive_set_timer() has been called twice with correct parameters self.assertEqual(result['test_result'], "SUCCESS") - + def test_set_colour(self): - d = pytuya.BulbDevice('DEVICE_ID_HERE', 'IP_ADDRESS_HERE', LOCAL_KEY) + d = pytuya.BulbDevice('ID', 'IP_ADDRESS_HERE', LOCAL_KEY) d._send_receive = MagicMock(side_effect=mock_send_receive_set_colour) - result = d.set_colour(255,255,255) + result = d.set_colour(255, 255, 255) result = result.decode(mock_byte_encoding) result = json.loads(result) self.assertEqual(result['test_result'], "SUCCESS") def test_set_white(self): - d = pytuya.BulbDevice('DEVICE_ID_HERE', 'IP_ADDRESS_HERE', LOCAL_KEY) + d = pytuya.BulbDevice('ID', 'IP_ADDRESS_HERE', LOCAL_KEY) d._send_receive = MagicMock(side_effect=mock_send_receive_set_white) result = d.set_white(255, 255) @@ -186,5 +196,6 @@ def test_set_white(self): self.assertEqual(result['test_result'], "SUCCESS") + if __name__ == '__main__': unittest.main() From a70dfa16ed63decca8dfde7986d3a0f4cc3f8e51 Mon Sep 17 00:00:00 2001 From: dominik dienlin Date: Sun, 11 Nov 2018 12:33:09 +0100 Subject: [PATCH 02/16] further refactoring, moving devices into separate module --- pytuya/{__init__.py => devices.py} | 204 ++++------------------------- 1 file changed, 26 insertions(+), 178 deletions(-) rename pytuya/{__init__.py => devices.py} (64%) diff --git a/pytuya/__init__.py b/pytuya/devices.py similarity index 64% rename from pytuya/__init__.py rename to pytuya/devices.py index e5db863..048d870 100644 --- a/pytuya/__init__.py +++ b/pytuya/devices.py @@ -8,110 +8,28 @@ # # Tested with Python 2.7 and Python 3.6.1 only -import base64 from hashlib import md5 import json import logging import socket -import sys import time -import colorsys - -try: - # raise ImportError - import Crypto - from Crypto.Cipher import AES # PyCrypto -except ImportError: - Crypto = AES = None - import pyaes # https://github.com/ricmoo/pyaes - -version_tuple = (7, 0, 3) -version = version_string = __version__ = '%d.%d.%d' % version_tuple -__author__ = 'clach04' +from pytuya.utils import hex2bin, bin2hex, AESCipher, Colour log = logging.getLogger(__name__) logging.basicConfig() # TODO include function name/line numbers in log -# log.setLevel(level=logging.DEBUG) # Debug hack! - -log.info('Python %s on %s', sys.version, sys.platform) -if Crypto is None: - log.info('Using pyaes version %r', pyaes.VERSION) - log.info('Using pyaes from %r', pyaes.__file__) -else: - log.info('Using PyCrypto %r', Crypto.version_info) - log.info('Using PyCrypto from %r', Crypto.__file__) SET = 'set' - PROTOCOL_VERSION_BYTES = b'3.1' -IS_PY2 = sys.version_info[0] == 2 - - -class AESCipher(object): - def __init__(self, key): - self.bs = 16 - self.key = key - - def encrypt(self, raw): - if Crypto: - raw = self._pad(raw) - cipher = AES.new(self.key, mode=AES.MODE_ECB) - crypted_text = cipher.encrypt(raw) - else: - _ = self._pad(raw) - cipher = pyaes.blockfeeder.Encrypter(pyaes.AESModeOfOperationECB(self.key)) # no IV, auto pads to 16 - crypted_text = cipher.feed(raw) - crypted_text += cipher.feed() # flush final block - crypted_text_b64 = base64.b64encode(crypted_text) - return crypted_text_b64 - - def decrypt(self, enc): - enc = base64.b64decode(enc) - if Crypto: - cipher = AES.new(self.key, AES.MODE_ECB) - raw = cipher.decrypt(enc) - return self._unpad(raw).decode('utf-8') - else: - cipher = pyaes.blockfeeder.Decrypter(pyaes.AESModeOfOperationECB(self.key)) # no IV, auto pads to 16 - plain_text = cipher.feed(enc) - plain_text += cipher.feed() # flush final block - return plain_text - - def _pad(self, s): - padnum = self.bs - len(s) % self.bs - return s + padnum * chr(padnum).encode() - - @staticmethod - def _unpad(s): - return s[:-ord(s[len(s) - 1:])] - - -def bin2hex(x, pretty=False): - space = ' ' if pretty else '' - if IS_PY2: - x = [ord(xi) for xi in x] - - return ''.join('%02X%s' % (y, space) for y in x) - - -def hex2bin(x): - if IS_PY2: - return x.decode('hex') - else: - return bytes.fromhex(x) - # This is intended to match requests.json payload at https://github.com/codetheweb/tuyapi payload_dict = { "device": { "status": { - "hexByte": "0a", - "command": {"gwId": "", "devId": ""} + "hexByte": "0a", "command": {"gwId": "", "devId": ""} }, "set": { - "hexByte": "07", - "command": {"devId": "", "uid": "", "t": ""} + "hexByte": "07", "command": {"devId": "", "uid": "", "t": ""} }, "prefix": "000055aa00000000000000", # Next byte is command byte ("hexByte") some zero padding, then length of remaining payload, i.e. command @@ -196,8 +114,7 @@ def generate_payload(self, command, data=None): pre_md5_str = b'data=' + json_payload + b'||lpv=' + PROTOCOL_VERSION_BYTES + b'||' + self.local_key m = md5() m.update(pre_md5_str) - hexdigest = m.hexdigest() - json_payload = PROTOCOL_VERSION_BYTES + hexdigest[8:][:16].encode('latin1') + json_payload + json_payload = PROTOCOL_VERSION_BYTES + m.hexdigest()[8:][:16].encode('latin1') + json_payload self.cipher = None # expect to connect and then disconnect to set new postfix_payload = hex2bin(bin2hex(json_payload) + payload_dict[self.dev_type]['suffix']) @@ -314,73 +231,6 @@ def __init__(self, dev_id, address, local_key=None): dev_type = 'device' super(BulbDevice, self).__init__(dev_id, address, local_key, dev_type) - @staticmethod - def _rgb_to_hexvalue(r, g, b): - """ Convert an RGB value to the hex representation expected by tuya. - - Index '5' (DPS_INDEX_COLOUR) is assumed to be in the format: - rrggbb0hhhssvv - - While r, g and b are just hexadecimal values of the corresponding - Red, Green and Blue values, the h, s and v values (which are values - between 0 and 1) are scaled to 360 (h) and 255 (s and v) respectively. - - Args: - r(int): Value for the colour red as int from 0-255. - g(int): Value for the colour green as int from 0-255. - b(int): Value for the colour blue as int from 0-255. - """ - rgb = [r, g, b] - hsv = colorsys.rgb_to_hsv(rgb[0] / 255, rgb[1] / 255, rgb[2] / 255) - - hexvalue = "" - for value in rgb: - temp = str(hex(int(value))).replace("0x", "") - if len(temp) == 1: - temp = "0" + temp - hexvalue = hexvalue + temp - - hsvarray = [int(hsv[0] * 360), int(hsv[1] * 255), int(hsv[2] * 255)] - hexvalue_hsv = "" - for value in hsvarray: - temp = str(hex(int(value))).replace("0x", "") - if len(temp) == 1: - temp = "0" + temp - hexvalue_hsv = hexvalue_hsv + temp - if len(hexvalue_hsv) == 7: - hexvalue = hexvalue + "0" + hexvalue_hsv - else: - hexvalue = hexvalue + "00" + hexvalue_hsv - - return hexvalue - - @staticmethod - def _hexvalue_to_rgb(hexvalue): - """ - Converts the hexvalue used by tuya for colour representation into an RGB value. - - Args: - hexvalue(string): The hex representation generated by BulbDevice._rgb_to_hexvalue() - """ - r = int(hexvalue[0:2], 16) - g = int(hexvalue[2:4], 16) - b = int(hexvalue[4:6], 16) - return r, g, b - - @staticmethod - def _hexvalue_to_hsv(hexvalue): - """ - Converts the hexvalue used by tuya for colour representation into an HSV value. - - Args: - hexvalue(string): The hex representation generated by BulbDevice._rgb_to_hexvalue() - """ - h = int(hexvalue[7:10], 16) / 360 - s = int(hexvalue[10:12], 16) / 255 - v = int(hexvalue[12:14], 16) / 255 - - return (h, s, v) - def set_colour(self, r, g, b): """ Set colour of an rgb bulb. @@ -397,30 +247,30 @@ def set_colour(self, r, g, b): raise ValueError("The value for blue needs to be between 0 and 255.") print(BulbDevice) - hexvalue = BulbDevice._rgb_to_hexvalue(r, g, b) + hex_value = Colour.rgb_to_hex_value(r, g, b) payload = self.generate_payload(SET, { self.DPS_INDEX_MODE: self.DPS_MODE_COLOUR, - self.DPS_INDEX_COLOUR: hexvalue}) + self.DPS_INDEX_COLOUR: hex_value}) data = self._send_receive(payload) return data - def set_white(self, brightness, colourtemp): + def set_white(self, brightness, colour_temp): """ Set white coloured theme of an rgb bulb. Args: brightness(int): Value for the brightness (25-255). - colourtemp(int): Value for the colour temperature (0-255). + colour_temp(int): Value for the colour temperature (0-255). """ if not 25 <= brightness <= 255: raise ValueError("The brightness needs to be between 25 and 255.") - if not 0 <= colourtemp <= 255: + if not 0 <= colour_temp <= 255: raise ValueError("The colour temperature needs to be between 0 and 255.") payload = self.generate_payload(SET, { self.DPS_INDEX_MODE: self.DPS_MODE_WHITE, self.DPS_INDEX_BRIGHTNESS: brightness, - self.DPS_INDEX_COLOURTEMP: colourtemp}) + self.DPS_INDEX_COLOURTEMP: colour_temp}) data = self._send_receive(payload) return data @@ -438,16 +288,16 @@ def set_brightness(self, brightness): data = self._send_receive(payload) return data - def set_colourtemp(self, colourtemp): + def set_colour_temp(self, colour_temp): """ Set the colour temperature of an rgb bulb. Args: - colourtemp(int): Value for the colour temperature (0-255). + colour_temp(int): Value for the colour temperature (0-255). """ - if not 0 <= colourtemp <= 255: + if not 0 <= colour_temp <= 255: raise ValueError("The colour temperature needs to be between 0 and 255.") - payload = self.generate_payload(SET, {self.DPS_INDEX_COLOURTEMP: colourtemp}) + payload = self.generate_payload(SET, {self.DPS_INDEX_COLOURTEMP: colour_temp}) data = self._send_receive(payload) return data @@ -455,27 +305,25 @@ def brightness(self): """ Return brightness value """ return self.status()[self.DPS][self.DPS_INDEX_BRIGHTNESS] - def colourtemp(self): + def colour_temp(self): """ Return colour temperature """ return self.status()[self.DPS][self.DPS_INDEX_COLOURTEMP] def colour_rgb(self): """ Return colour as RGB value """ - hexvalue = self.status()[self.DPS][self.DPS_INDEX_COLOUR] - return BulbDevice._hexvalue_to_rgb(hexvalue) + hex_value = self.status()[self.DPS][self.DPS_INDEX_COLOUR] + return Colour.hex_value_to_rgb(hex_value) def colour_hsv(self): """ Return colour as HSV value """ - hexvalue = self.status()[self.DPS][self.DPS_INDEX_COLOUR] - return BulbDevice._hexvalue_to_hsv(hexvalue) + hex_value = self.status()[self.DPS][self.DPS_INDEX_COLOUR] + return Colour.hex_value_to_hsv(hex_value) def state(self): - status = self.status() - state = { - 'is_on': status[self.DPS][self.DPS_INDEX_ON], - 'mode': status[self.DPS][self.DPS_INDEX_MODE], - 'brightness': status[self.DPS][self.DPS_INDEX_BRIGHTNESS], - 'colourtemp': status[self.DPS][self.DPS_INDEX_COLOURTEMP], - 'colour': status[self.DPS][self.DPS_INDEX_COLOUR], - } - return state + dps = self.status()[self.DPS] + return dict(is_on=dps[self.DPS_INDEX_ON], + mode=dps[self.DPS_INDEX_MODE], + brightness=dps[self.DPS_INDEX_BRIGHTNESS], + colourtemp=dps[self.DPS_INDEX_COLOURTEMP], + colour=dps[self.DPS_INDEX_COLOUR]) + From 708b1382ae41bb0705703da60fc0eaae03834f09 Mon Sep 17 00:00:00 2001 From: dominik dienlin Date: Sun, 11 Nov 2018 16:39:16 +0100 Subject: [PATCH 03/16] further simplications for BulbDevice, refactoring with separate utils.py, minor changes --- pytuya/devices.py | 123 +++++++++++---------- pytuya/utils.py | 267 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 336 insertions(+), 54 deletions(-) create mode 100644 pytuya/utils.py diff --git a/pytuya/devices.py b/pytuya/devices.py index 048d870..f049aeb 100644 --- a/pytuya/devices.py +++ b/pytuya/devices.py @@ -40,7 +40,7 @@ class XenonDevice(object): - def __init__(self, dev_id, address, local_key=None, dev_type=None, connection_timeout=10): + def __init__(self, dev_id, address, local_key=None, dev_type='device', connection_timeout=10): """ Represents a Tuya device. Args: @@ -71,14 +71,25 @@ def _send_receive(self, payload): Args: payload(bytes): Data to send. """ - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - s.settimeout(self.connection_timeout) - s.connect((self.address, self.port)) - s.send(payload) - data = s.recv(1024) - s.close() - return data + + success, tries = False, 0 + while not success and tries < 3: + try: + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + s.settimeout(self.connection_timeout) + s.connect((self.address, self.port)) + s.send(payload) + data = s.recv(1024) + s.close() + success = True + except ConnectionResetError as e: + tries += 1 + + if not success: + logging.warning("failed to communicate %s" % e) + else: + return data def generate_payload(self, command, data=None): """ Generate the payload to send. @@ -127,9 +138,6 @@ def generate_payload(self, command, data=None): class Device(XenonDevice): - def __init__(self, dev_id, address, local_key=None, dev_type=None): - super(Device, self).__init__(dev_id, address, local_key, dev_type) - def status(self): log.debug('status() entry') # open device, send request, then close connection @@ -212,15 +220,14 @@ def set_timer(self, num_secs): class OutletDevice(Device): def __init__(self, dev_id, address, local_key=None): - dev_type = 'device' - super(OutletDevice, self).__init__(dev_id, address, local_key, dev_type) + super(OutletDevice, self).__init__(dev_id, address, local_key, dev_type='device') class BulbDevice(Device): DPS_INDEX_ON = '1' DPS_INDEX_MODE = '2' DPS_INDEX_BRIGHTNESS = '3' - DPS_INDEX_COLOURTEMP = '4' + DPS_INDEX_COLOUR_TEMP = '4' DPS_INDEX_COLOUR = '5' DPS = 'dps' @@ -231,56 +238,43 @@ def __init__(self, dev_id, address, local_key=None): dev_type = 'device' super(BulbDevice, self).__init__(dev_id, address, local_key, dev_type) + def _send(self, mode=None, colour=None, brightness=None, colour_temp=None): + payload_data = {self.DPS_INDEX_MODE: mode, + self.DPS_INDEX_COLOUR: colour, + self.DPS_INDEX_BRIGHTNESS: brightness, + self.DPS_INDEX_COLOUR_TEMP: colour_temp} + payload = self.generate_payload(SET, {k: v for k, v in payload_data.items() if v is not None}) + return self._send_receive(payload) + def set_colour(self, r, g, b): """ Set colour of an rgb bulb. - Args: r(int): Value for the colour red as int from 0-255. g(int): Value for the colour green as int from 0-255. - b(int): Value for the colour blue as int from 0-255. - """ - if not 0 <= r <= 255: - raise ValueError("The value for red needs to be between 0 and 255.") - if not 0 <= g <= 255: - raise ValueError("The value for green needs to be between 0 and 255.") - if not 0 <= b <= 255: - raise ValueError("The value for blue needs to be between 0 and 255.") - - print(BulbDevice) - hex_value = Colour.rgb_to_hex_value(r, g, b) - - payload = self.generate_payload(SET, { - self.DPS_INDEX_MODE: self.DPS_MODE_COLOUR, - self.DPS_INDEX_COLOUR: hex_value}) - data = self._send_receive(payload) - return data + b(int): Value for the colour blue as int from 0-255. """ + + for value, name in ((r, "red"), (b, "blue"), (g, "green")): + if not 0 <= value <= 255: + raise ValueError("The %s for red needs to be between 0 and 255." % name) + + return self._send(self.DPS_INDEX_MODE, colour=Colour.rgb_to_hex_value(r, g, b)) def set_white(self, brightness, colour_temp): """ Set white coloured theme of an rgb bulb. - Args: brightness(int): Value for the brightness (25-255). - colour_temp(int): Value for the colour temperature (0-255). - """ + colour_temp(int): Value for the colour temperature (0-255). """ if not 25 <= brightness <= 255: raise ValueError("The brightness needs to be between 25 and 255.") if not 0 <= colour_temp <= 255: raise ValueError("The colour temperature needs to be between 0 and 255.") - payload = self.generate_payload(SET, { - self.DPS_INDEX_MODE: self.DPS_MODE_WHITE, - self.DPS_INDEX_BRIGHTNESS: brightness, - self.DPS_INDEX_COLOURTEMP: colour_temp}) - - data = self._send_receive(payload) - return data + return self._send(self.DPS_MODE_WHITE, brightness=brightness, colour_temp=colour_temp) def set_brightness(self, brightness): """ Set the brightness value of an rgb bulb. - Args: - brightness(int): Value for the brightness (25-255). - """ + brightness(int): Value for the brightness (25-255). """ if not 25 <= brightness <= 255: raise ValueError("The brightness needs to be between 25 and 255.") @@ -290,16 +284,12 @@ def set_brightness(self, brightness): def set_colour_temp(self, colour_temp): """ Set the colour temperature of an rgb bulb. - Args: - colour_temp(int): Value for the colour temperature (0-255). - """ + colour_temp(int): Value for the colour temperature (0-255). """ if not 0 <= colour_temp <= 255: raise ValueError("The colour temperature needs to be between 0 and 255.") - payload = self.generate_payload(SET, {self.DPS_INDEX_COLOURTEMP: colour_temp}) - data = self._send_receive(payload) - return data + return self._send(colour_temp=colour_temp) def brightness(self): """ Return brightness value """ @@ -307,7 +297,7 @@ def brightness(self): def colour_temp(self): """ Return colour temperature """ - return self.status()[self.DPS][self.DPS_INDEX_COLOURTEMP] + return self.status()[self.DPS][self.DPS_INDEX_COLOUR_TEMP] def colour_rgb(self): """ Return colour as RGB value """ @@ -324,6 +314,31 @@ def state(self): return dict(is_on=dps[self.DPS_INDEX_ON], mode=dps[self.DPS_INDEX_MODE], brightness=dps[self.DPS_INDEX_BRIGHTNESS], - colourtemp=dps[self.DPS_INDEX_COLOURTEMP], + colourtemp=dps[self.DPS_INDEX_COLOUR_TEMP], colour=dps[self.DPS_INDEX_COLOUR]) + +class CoverDevice(Device): + action_open = {'2': '1'} + action_close = {'2': '2'} + action_stop = {'2': '3'} + + def state(self): + status = self.status() + if type(status) is bytes: + return str(status) + return {'1': "opening or open", '2': "closing or closed", '3': "stopped"}.get(status.get('dps').get('1')) + + def send_action(self, action): + payload = self.generate_payload(command=SET, data=action) + self._send_receive(payload) + return + + def open(self): + self.send_action(self.action_open) + + def close(self): + self.send_action(self.action_close) + + def stop(self): + self.send_action(self.action_stop) diff --git a/pytuya/utils.py b/pytuya/utils.py new file mode 100644 index 0000000..7802dfe --- /dev/null +++ b/pytuya/utils.py @@ -0,0 +1,267 @@ +import base64 +import sys +import logging +import time +import json +import colorsys +import re + +try: + # raise ImportError + import Crypto + from Crypto.Cipher import AES # PyCrypto +except ImportError: + Crypto = AES = None + import pyaes # https://github.com/ricmoo/pyaes + +log = logging.getLogger(__name__) +logging.basicConfig() # TODO include function name/line numbers in log +# log.setLevel(level=logging.DEBUG) # Debug hack! + +log.info('Python %s on %s', sys.version, sys.platform) +if Crypto is None: + log.info('Using pyaes version %r', pyaes.VERSION) + log.info('Using pyaes from %r', pyaes.__file__) +else: + log.info('Using PyCrypto %r', Crypto.version_info) + log.info('Using PyCrypto from %r', Crypto.__file__) + +SET = 'set' + +PROTOCOL_VERSION_BYTES = b'3.1' + +IS_PY2 = sys.version_info[0] == 2 + + +class AESCipher(object): + def __init__(self, key): + self.bs = 16 + self.key = key + + def encrypt(self, raw): + if Crypto: + raw = self._pad(raw) + cipher = AES.new(self.key, mode=AES.MODE_ECB) + crypted_text = cipher.encrypt(raw) + else: + _ = self._pad(raw) + cipher = pyaes.blockfeeder.Encrypter(pyaes.AESModeOfOperationECB(self.key)) # no IV, auto pads to 16 + crypted_text = cipher.feed(raw) + crypted_text += cipher.feed() # flush final block + crypted_text_b64 = base64.b64encode(crypted_text) + return crypted_text_b64 + + def decrypt(self, enc): + enc = base64.b64decode(enc) + if Crypto: + cipher = AES.new(self.key, AES.MODE_ECB) + raw = cipher.decrypt(enc) + return self._unpad(raw).decode('utf-8') + else: + cipher = pyaes.blockfeeder.Decrypter(pyaes.AESModeOfOperationECB(self.key)) # no IV, auto pads to 16 + plain_text = cipher.feed(enc) + plain_text += cipher.feed() # flush final block + return plain_text + + def _pad(self, s): + padnum = self.bs - len(s) % self.bs + return s + padnum * chr(padnum).encode() + + @staticmethod + def _unpad(s): + return s[:-ord(s[len(s) - 1:])] + + +def bin2hex(x, pretty=False): + space = ' ' if pretty else '' + if IS_PY2: + x = [ord(xi) for xi in x] + + return ''.join('%02X%s' % (y, space) for y in x) + + +def hex2bin(x): + if IS_PY2: + return x.decode('hex') + else: + return bytes.fromhex(x) + + +class Colour: + @staticmethod + def rgb_to_hex_value(r, g, b): + """ Convert an RGB value to the hex representation expected by tuya. + + Index '5' (DPS_INDEX_COLOUR) is assumed to be in the format: + rrggbb0hhhssvv + + While r, g and b are just hexadecimal values of the corresponding + Red, Green and Blue values, the h, s and v values (which are values + between 0 and 1) are scaled to 360 (h) and 255 (s and v) respectively. + + Args: + r(int): Value for the colour red as int from 0-255. + g(int): Value for the colour green as int from 0-255. + b(int): Value for the colour blue as int from 0-255. + """ + rgb = [r, g, b] + hsv = colorsys.rgb_to_hsv(rgb[0] / 255, rgb[1] / 255, rgb[2] / 255) + + hex_value = "" + for value in rgb: + temp = str(hex(int(value))).replace("0x", "") + if len(temp) == 1: + temp = "0" + temp + hex_value = hex_value + temp + + hsvarray = [int(hsv[0] * 360), int(hsv[1] * 255), int(hsv[2] * 255)] + hex_value_hsv = "" + for value in hsvarray: + temp = str(hex(int(value))).replace("0x", "") + if len(temp) == 1: + temp = "0" + temp + hex_value_hsv = hex_value_hsv + temp + if len(hex_value_hsv) == 7: + hex_value = hex_value + "0" + hex_value_hsv + else: + hex_value = hex_value + "00" + hex_value_hsv + + return hex_value + + @staticmethod + def hex_value_to_rgb(hex_value): + """ + Converts the hex_value used by tuya for colour representation into an RGB value. + + Args: + hex_value(string): The hex representation generated by Color.rgb_to_hex_value() + """ + r = int(hex_value[0:2], 16) + g = int(hex_value[2:4], 16) + b = int(hex_value[4:6], 16) + return r, g, b + + @staticmethod + def hex_value_to_hsv(hex_value): + """ + Converts the hex_value used by tuya for colour representation into an HSV value. + + Args: + hex_value(string): The hex representation generated by rgb_to_hex_value() + """ + h = int(hex_value[7:10], 16) / 360 + s = int(hex_value[10:12], 16) / 255 + v = int(hex_value[12:14], 16) / 255 + + return h, s, v + + +def query_devices(timeout_in_s=3.1, max_count=None): + from socket import socket, AF_INET, SOCK_DGRAM + + def decode_message(message): + json_msg = message[message.index(b"{"):-message[::-1].index(b"}")] + try: + return json.loads(json_msg) + except json.JSONDecodeError as e: + logging.warning("Error occurred while trying to decode json message: %s", e) + return {} + + s = socket(AF_INET, SOCK_DGRAM) + s.bind(('', 6666)) + s.settimeout(timeout_in_s) + + guids = {} + t_start = time.time() + while time.time() < t_start + timeout_in_s: + message = s.recv(1024) + data = decode_message(message) + if data.get("gwId") not in guids: + guids[data.get("gwId")] = data + if max_count and len(guids) == max_count: + break + + return guids + + +class KeyExtractor: + # extracts device ids and corresponding local keys from a server api response + # which can be obtained by sniffing app traffic (e.g. using ssl capture app). see howto for details. + # local keys are necessary to send commands to devices + + @staticmethod + def get_device_keys_hacky(api_response): + api_response = str(api_response) + + def get_json_value(key, json_str): + matches = re.findall("\"%s\":\"([a-zA-Z0-9\ -]*)\"" % key, json_str, re.DOTALL) + return matches[0] if len(matches) > 0 else None + + keys = {} + + # split api result by devId entry and extract next found localKey entry as corresponding key + # CAREFULLY NOTE: this assumes the devId entry always comes BEFORE the localKey entry in the dictionary + for dev_result in api_response.split("devId")[1:]: + dev_id = dev_result[dev_result.index(":") + 2:dev_result.index("\",")] + key = get_json_value("localKey", dev_result) + if key is None: + logging.warning("no key found for entry with id %s" % dev_id) + continue + + name = get_json_value("name", dev_result) + keys[dev_id] = dict(key=key, name=name, id=dev_id) + + return keys + + @staticmethod + def get_device_keys_json(api_response): + data = json.loads(api_response) + keys = {} + for entry in data.get("result", []): + if entry.get('a') == 'tuya.m.my.group.device.list': + for dev in entry.get('result'): + if 'devId' in dev: + keys[dev['devId']] = dict(key=dev['localKey'], name=dev.get("name", ""), id=dev['devId']) + if 'devId' in entry: + dev = entry + keys[dev['devId']] = dict(key=dev['localKey'], name=dev.get("name", ""), id=dev['devId']) + return keys + + @staticmethod + def parse_device_keys_from_api_response(api_response): + try: + keys = KeyExtractor.get_device_keys_json(api_response) + except Exception as e: + logging.info("getting device keys using json method failed: \n\t%s\ntrying hacky method instead.." % e) + keys = KeyExtractor.get_device_keys_hacky(api_response) + return keys + + +if __name__ == "__main__": + import pytuya + _dev = pytuya.CoverDevice(dev_id="55870625b4e62d4b2ff7", address="192.168.137.80", + local_key="385943cd1379f098", dev_type="device") + + + def get_status_descr(status): + if type(status) is bytes: + return str(status) + return {'1': "opening or open", '2': "closing or closed", '3': "stopped"}.get(status.get('dps').get('1')) + + + print(get_status_descr(dev.status())) + + action_open = {'2': '1'} + action_close = {'2': '2'} + action_stop = {'2': '3'} + + + def send_action(action): + payload = _dev.generate_payload(command=pytuya.SET, data=action) + _dev._send_receive(payload) + return + + + send_action(action_stop) + + print(get_status_descr(_dev.status())) From 8dff0e5493c2042e77ed55a40386f53368f873df Mon Sep 17 00:00:00 2001 From: dominik dienlin Date: Mon, 12 Nov 2018 17:04:02 +0100 Subject: [PATCH 04/16] updated readme with instructions for extracting device keys --- README.md | 38 +++++++++++++++++++++++++++++++------- 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 7c29df6..7ebd069 100644 --- a/README.md +++ b/README.md @@ -3,20 +3,44 @@ [![Build Status](https://travis-ci.org/clach04/python-tuya.svg?branch=master)](https://travis-ci.org/clach04/python-tuya) Python 2.7 and Python 3.6.1 interface to ESP8266MOD WiFi smart devices from Shenzhen Xenon. -If you are using the Jinvoo Smart App, this allows local control over the LAN. -NOTE requires the devices to have already been **activated** by Jinvoo Smart App (or similar). +If you are using the Jinvoo Smart app, this allows local control over the LAN. +NOTE requires the devices to have already been **activated** by Jinvoo Smart app (or similar). -## Key extraction - -https://github.com/clach04/python-tuya/wiki has background information for how to get device id and local key. -(the device id can be seen in Jinvoo Smart App, under "Device Info"). Known to work with: * SKYROKU SM-PW701U Wi-Fi Plug Smart Plug - see https://wikidevi.com/wiki/Xenon_SM-PW701U * Wuudi SM-S0301-US - WIFI Smart Power Socket Multi Plug with 4 AC Outlets and 4 USB Charging + * Jinvoo WiFi Curtain / Roller Shutter Switch -Demo: +## Key extraction + +- background knowledge + - The ``local-key`` is used for AES-based encryption of the messages sent between device and client. + - Key changes every time the device is reset and paired to a new account (e.g. using app) + - Key and further meta-data is regularly requested by the app from the cloud-server via HTTPS requests + - The request responses can be recorded by apps to extract the key and further data (such as name, ip, state, etc.) +- how to extract key + - Android + - Install your app used for setup and paring (tested with TuyaSmart) + - Install [SSL Capture](https://play.google.com/store/apps/details?id=com.minhui.networkcapture) + - Change app settings to only record your pairing app and start recording + - Go inside app and do something with one of your devices + - Go back inside the SSL Capture app, stop the recording + - Find the package with the longest response by the server + - Copy all information from the response body to your computer (e.g. via email) + - Use pytuya to extract key from the response stored inside a file: + ``pytuya extract response.txt`` + - IOS + - > TODO + +- further resources: + - https://github.com/clach04/python-tuya/wiki has further information for how to get device id and local key. +(the device id can be seen in Jinvoo Smart app, under "Device Info"). + +## CLI-Demo + +## API-Demo: import pytuya From 59e0e40dd757bf5f1ec05e8e4c1a06f6db21530f Mon Sep 17 00:00:00 2001 From: dominik dienlin Date: Mon, 12 Nov 2018 23:06:55 +0100 Subject: [PATCH 05/16] updated readme, added cli interface, modified setup.py for pytuya script support --- README.md | 70 ++++++++++++++++++++++- pytuya/__init__.py | 7 +++ pytuya/cli/__init__.py | 7 +++ pytuya/cli/bulb.py | 71 +++++++++++++++++++++++ pytuya/cli/cover.py | 50 +++++++++++++++++ pytuya/cli/main.py | 125 +++++++++++++++++++++++++++++++++++++++++ pytuya/cli/outlet.py | 43 ++++++++++++++ pytuya/cli/utils.py | 46 +++++++++++++++ requirements.txt | 2 + setup.py | 5 +- 10 files changed, 423 insertions(+), 3 deletions(-) create mode 100644 pytuya/__init__.py create mode 100644 pytuya/cli/__init__.py create mode 100644 pytuya/cli/bulb.py create mode 100644 pytuya/cli/cover.py create mode 100644 pytuya/cli/main.py create mode 100644 pytuya/cli/outlet.py create mode 100644 pytuya/cli/utils.py diff --git a/README.md b/README.md index 7ebd069..c080b59 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,7 @@ Known to work with: - Find the package with the longest response by the server - Copy all information from the response body to your computer (e.g. via email) - Use pytuya to extract key from the response stored inside a file: - ``pytuya extract response.txt`` + ``pytuya utils extract_keys response.txt`` - IOS - > TODO @@ -38,7 +38,73 @@ Known to work with: - https://github.com/clach04/python-tuya/wiki has further information for how to get device id and local key. (the device id can be seen in Jinvoo Smart app, under "Device Info"). -## CLI-Demo +## CLI - Commandline Interface +The command line tool ``pytuya`` can be used to send actions to devices. Simply executing ``pytuya`` after +installing displays the following options: + + >pytuya + Usage: pytuya [OPTIONS] COMMAND [ARGS]... + + Options: + -l, --debug / --no-debug + -c, --config_path PATH + --help Show this message and exit. + + Commands: + bulb + cover + outlet + update_config + utils + +In order to use this client interface, it is first necessary to update the configuration, which should +a name, ip, id and local key used for encryption for each device. Given a recorded API response extracted +from the app using SSL Capture (see above), the configuration can be automatically built as following: + + > pytuya update_config example_response.txt + + INFO:root:Querying devices + WARNING:root:wrote config at C:\Users\username\pytuya.yaml with content: + + Bedroom Blinds: + id: 51870625b4e62e4b2fc4 + ip: 192.168.1.116 + key: afab3d41b839c54c + Bedroom Lights: + id: 5517064584f3ec2e4095 + ip: 192.168.1.210 + key: bfa4804827714672 + +Once this config file exists, actions can be sent to the corresponding devices by referencing them via name: + + > pytuya cover close "bedroom blinds" + + INFO:root:sending close to device bedroom blinds at 192.168.1.116 + +In order to get help, simply use the ``--help`` flag, e.g.: + + > pytuya bulb --help + + Usage: pytuya bulb [OPTIONS] COMMAND [ARGS]... + + Options: + --help Show this message and exit. + + Commands: + brightness set brightness of device + colour set colour of device using provided R, G, B... + off sends turn off action to device + on sends turn on action to device + state sends turn off action to device + + > pytuya bulb colour --help + Usage: pytuya bulb colour [OPTIONS] NAME [R] [G] [B] + + set colour of device using provided R, G, B (red green and blue) + + Options: + --help Show this message and exit. + ## API-Demo: diff --git a/pytuya/__init__.py b/pytuya/__init__.py new file mode 100644 index 0000000..e9d8f12 --- /dev/null +++ b/pytuya/__init__.py @@ -0,0 +1,7 @@ +from .devices import XenonDevice, Device, BulbDevice, OutletDevice, CoverDevice, SET +from .utils import query_devices, KeyExtractor +from pytuya.cli import cli_root + +version_tuple = (7, 0, 4) +version = version_string = __version__ = '%d.%d.%d' % version_tuple +__author__ = 'clach04' diff --git a/pytuya/cli/__init__.py b/pytuya/cli/__init__.py new file mode 100644 index 0000000..4b3c64b --- /dev/null +++ b/pytuya/cli/__init__.py @@ -0,0 +1,7 @@ +from pytuya.cli.main import cli_root, config, get_device_from_config +from pytuya.cli import bulb, cover, outlet, utils + + +def main(): + cli_root() + diff --git a/pytuya/cli/bulb.py b/pytuya/cli/bulb.py new file mode 100644 index 0000000..d96cc9d --- /dev/null +++ b/pytuya/cli/bulb.py @@ -0,0 +1,71 @@ +import yaml +import click +from pytuya.cli import cli_root, get_device_from_config, config +from pytuya.devices import BulbDevice + + +@cli_root.group("bulb") +def bulb(): + pass + + +@bulb.command() +@click.argument('name', default=None) +def on(name): + """ sends turn on action to device """ + dev_props = get_device_from_config(config, name) + dev = BulbDevice(dev_props["id"], dev_props["ip"], dev_props["key"]) + dev.turn_on() + + +@bulb.command() +@click.argument('name', default=None) +def off(name): + """ sends turn off action to device """ + dev_props = get_device_from_config(config, name) + dev = BulbDevice(dev_props["id"], dev_props["ip"], dev_props["key"]) + dev.turn_off() + + +@bulb.command() +@click.argument('name', default=None) +@click.argument('brightness', default=255, type=click.types.IntRange(min=25, max=255, clamp=True)) +@click.option('-t', '--colour_temp', default=None, type=click.types.IntRange(min=25, max=255, clamp=True), + help="colour temperature") +def brightness(name, brightness, colour_temp): + """ set brightness of device""" + dev_props = get_device_from_config(config, name) + dev = BulbDevice(dev_props["id"], dev_props["ip"], dev_props["key"]) + if colour_temp is None: + dev.set_brightness(brightness) + else: + dev.set_white(brightness=brightness, colour_temp=colour_temp) + + +@bulb.command() +@click.argument('name', default=None) +@click.argument('r', default=255, type=click.types.IntRange(min=0, max=255, clamp=True)) +@click.argument('g', default=255, type=click.types.IntRange(min=0, max=255, clamp=True)) +@click.argument('b', default=255, type=click.types.IntRange(min=0, max=255, clamp=True)) +def colour(name, r, g, b): + """ set colour of device using provided R, G, B (red green and blue)""" + dev_props = get_device_from_config(config, name) + dev = BulbDevice(dev_props["id"], dev_props["ip"], dev_props["key"]) + dev.set_colour(r, g, b) + + +@bulb.command() +@click.argument('name', default=None) +def state(name): + """ sends turn off action to device""" + dev_props = get_device_from_config(config, name) + dev = BulbDevice(dev_props["id"], dev_props["ip"], dev_props["key"]) + dev.state() + print(yaml.dump({name: dev.state()}, default_flow_style=False)) + + +if __name__ == "__main__": + import sys + sys.argv = list(sys.argv) + ["bulb", "state", "study"] + from pytuya import cli_root + cli_root() diff --git a/pytuya/cli/cover.py b/pytuya/cli/cover.py new file mode 100644 index 0000000..3a14062 --- /dev/null +++ b/pytuya/cli/cover.py @@ -0,0 +1,50 @@ +import logging +import click +import yaml +from pytuya.devices import CoverDevice +from pytuya.cli.main import cli_root, config, get_device_from_config + + +@cli_root.group("cover") +def cover(): + pass + + +def exec_cover_action(name, action_name): + actions = dict(close=CoverDevice.action_close, open=CoverDevice.action_open, + stop=CoverDevice.action_stop) + dev_props = get_device_from_config(config, name) + logging.info("sending %s to device %s at %s" % (action_name, name, dev_props["ip"])) + dev = CoverDevice(dev_props["id"], dev_props["ip"], dev_props["key"]) + dev.send_action(action=actions.get(action_name)) + + +def add_cover_command(action_name): + cmd = cover.command(action_name)( + click.argument('name', default=None)( + lambda name: exec_cover_action(name, action_name))) + return cmd + + +for action in "open", "close", "stop": + add_cover_command(action) + + +@cover.command() +@click.argument('name', default=None) +def state(name): + """ sends turn off action to device specified via NAME """ + dev_props = get_device_from_config(config, name) + dev = CoverDevice(dev_props["id"], dev_props["ip"], dev_props["key"]) + print(yaml.dump({name: dev.state()}, default_flow_style=False)) + + +if __name__ == "__main__": + import sys + + name = "study blinds" + sys.argv = list(sys.argv) + ["cover", "state", name.lower()] + + print("\nexecuting test: " + " ".join(sys.argv[1:])) + + cli_root() diff --git a/pytuya/cli/main.py b/pytuya/cli/main.py new file mode 100644 index 0000000..d0f6e32 --- /dev/null +++ b/pytuya/cli/main.py @@ -0,0 +1,125 @@ +import logging +import click +import yaml +import os + + +def_config_path = os.path.join(os.path.expanduser("~"), "pytuya.yaml") + + +class Config(dict): + _path = None + + @property + def path(self): + return self._path + + @path.setter + def path(self, value): + self._path = value + with open(self._path, "r") as f: + content = yaml.load(f.read()) + + if content is None: + raise RuntimeError("Invalid Config: %s " % self.path) + + super().update(content) + + def __str__(self): + return yaml.dump(dict(**self), default_flow_style=False) + + def update(self, new_config, **kwargs): + if os.path.isfile(self._path): + try: + old_config = dict(**self) + except Exception as e: + logging.warning("%s: generating new config" % e) + old_config = {} + else: + old_config = {} + + old_config.update(new_config) + with open(self.path, "w") as f: + config_yaml = yaml.dump(old_config, default_flow_style=False) + f.write(config_yaml) + + super().update(new_config) + + logging.warning("wrote config at %s with content:\n%s" % (self._path, config_yaml)) + + +config = Config() + + +def get_keys_from_file(api_response_path): + # extracts a local key from recorded api responses stored in a file + from pytuya.utils import KeyExtractor + + with open(api_response_path, "rb") as f: + api_response = f.read() + + return KeyExtractor.parse_device_keys_from_api_response(api_response) + + +def build_config(api_response_path): + from pytuya.utils import query_devices + keys = get_keys_from_file(api_response_path) + logging.info("Querying devices") + devices = query_devices() + res = {} + for dev_id, props in keys.items(): + if dev_id not in devices or "ip" not in devices[dev_id]: + logging.warning("device %s with id %s not found" % (props["name"], dev_id)) + continue + res[props["name"]] = dict(key=props["key"], ip=devices[dev_id]["ip"], id=dev_id) + return res + + +def get_device_from_config(config, name): + dev_props = config.get(name) + if dev_props is None: + dev_props = {k.lower().replace(" ", ""): v for k, v in config.items()}.get(name.lower().replace(" ", "")) + if dev_props is None: + raise RuntimeError("Device %s not found in config:\n%s" % (name, config)) + return dev_props + + +@click.group() +@click.option('-l', '--debug/--no-debug', default=False) +@click.option("-c", "--config_path", default=def_config_path, type=click.Path(file_okay=True, dir_okay=False)) +def cli_root(debug, config_path): + log_level = logging.DEBUG if debug else logging.INFO + logging.getLogger().setLevel(log_level) + logging.basicConfig( + level=log_level, + format='%(asctime)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + ) + config.path = config_path + + +@cli_root.command("update_config") +@click.argument('api_response_path', default=None, type=click.Path(file_okay=True, dir_okay=False, readable=True)) +def update_config(api_response_path): + # updates a config file using info extracted from api response and queried devices + new_config = build_config(api_response_path) + config.update(new_config) + + + + +if __name__ == "__main__": + import sys + + if len(sys.argv) > 1: + cli_root() + + test_extract = list(sys.argv) + ["extract", "../../example_response.txt"] + test_query = list(sys.argv) + ["query"] + test_update_config = list(sys.argv) + ["update_config", "../../example_response.txt"] + + test = test_update_config + print("\nexecuting test: pytuya %s\n" % test) + sys.argv = test + + cli_root() diff --git a/pytuya/cli/outlet.py b/pytuya/cli/outlet.py new file mode 100644 index 0000000..9012dac --- /dev/null +++ b/pytuya/cli/outlet.py @@ -0,0 +1,43 @@ +import click +import yaml +from pytuya.cli import cli_root, get_device_from_config, config +from pytuya.devices import OutletDevice + + +@cli_root.group("outlet") +def outlet(): + pass + + +@outlet.command() +@click.argument('name', default=None) +def on(name): + """ sends turn on action to device specified via NAME """ + dev_props = get_device_from_config(config, name) + dev = OutletDevice(dev_props["id"], dev_props["ip"], dev_props["key"]) + dev.turn_on() + + +@outlet.command() +@click.argument('name', default=None) +def off(name): + """ sends turn off action to device specified via NAME """ + dev_props = get_device_from_config(config, name) + dev = OutletDevice(dev_props["id"], dev_props["ip"], dev_props["key"]) + dev.turn_off() + + +@outlet.command() +@click.argument('name', default=None) +def state(name): + """ sends turn off action to device specified via NAME """ + dev_props = get_device_from_config(config, name) + dev = OutletDevice(dev_props["id"], dev_props["ip"], dev_props["key"]) + print(yaml.dump({name: dev.status()}, default_flow_style=False)) + + +if __name__ == "__main__": + import sys + sys.argv = list(sys.argv) + ["outlet", "state", "study"] + from pytuya import cli_root + cli_root() diff --git a/pytuya/cli/utils.py b/pytuya/cli/utils.py new file mode 100644 index 0000000..a3eeec0 --- /dev/null +++ b/pytuya/cli/utils.py @@ -0,0 +1,46 @@ +import yaml +import click +from pytuya.cli.main import cli_root, get_keys_from_file, build_config, config +from pytuya.utils import query_devices + + +@cli_root.group("utils") +def utils(): + pass + + +@utils.command("extract_keys") +@click.argument('api_response_path', default=None, type=click.Path(file_okay=True, dir_okay=False, readable=True)) +def extract_keys(api_response_path): + """ Extracts local keys from a recorded api response. API_RESPONSE_PATH is a file containing the response """ + result = get_keys_from_file(api_response_path) + pretty = yaml.dump({el["name"]: {k: v for k, v in el.items() if k != "name"} for el in result.values()}, + default_flow_style=False) + print(pretty) + + +@utils.command() +@click.option('-t', '--timeout', default=3.1, help="time spent for listening for device broadcasts") +def discover(timeout): + """ discovers tuya devices available on the network """ + result = query_devices(timeout_in_s=timeout) + pretty = yaml.dump(result, default_flow_style=False) + print(pretty) + + +if __name__ == "__main__": + import sys + + if len(sys.argv) > 1: + cli_root() + + test_extract = list(sys.argv) + ["utils", "extract_keys", "../../example_response.txt"] + test_query = list(sys.argv) + ["utils", "discover"] + + for test in (test_extract, test_query): + print("\nexecuting test: pytuya %s\n" % test) + sys.argv = test + try: + cli_root() + except SystemExit: + pass diff --git a/requirements.txt b/requirements.txt index f7d3e23..609b1bd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,3 @@ pycrypto==2.6.1 +pyyaml +click \ No newline at end of file diff --git a/setup.py b/setup.py index cd02ba5..9d62482 100644 --- a/setup.py +++ b/setup.py @@ -55,9 +55,12 @@ 'Topic :: Home Automation', ], keywords='home automation', - packages=['pytuya'], + packages=['pytuya', 'pytuya.cli'], platforms='any', install_requires=[ 'pyaes', # NOTE this is optional, AES can be provided via PyCrypto or PyCryptodome ], + entry_points={ + 'console_scripts': ['pytuya=pytuya.cli:main'], + } ) From 963d733b8bfbf9cec89a50a2ac0c7f7b5847a1ea Mon Sep 17 00:00:00 2001 From: dominik dienlin Date: Mon, 12 Nov 2018 23:32:17 +0100 Subject: [PATCH 06/16] bug fixes, minor changes --- pytuya/cli/main.py | 15 +++++++-------- setup.py | 2 ++ 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/pytuya/cli/main.py b/pytuya/cli/main.py index d0f6e32..2ad0970 100644 --- a/pytuya/cli/main.py +++ b/pytuya/cli/main.py @@ -4,7 +4,7 @@ import os -def_config_path = os.path.join(os.path.expanduser("~"), "pytuya.yaml") +def_config_path = os.path.join(os.path.expanduser("~"), ".pytuya.yaml") class Config(dict): @@ -17,13 +17,14 @@ def path(self): @path.setter def path(self, value): self._path = value - with open(self._path, "r") as f: - content = yaml.load(f.read()) + if os.path.isfile(value): + with open(self._path, "r") as f: + content = yaml.load(f.read()) - if content is None: - raise RuntimeError("Invalid Config: %s " % self.path) + if content is None: + raise RuntimeError("Invalid Config: %s " % self.path) - super().update(content) + super().update(content) def __str__(self): return yaml.dump(dict(**self), default_flow_style=False) @@ -106,8 +107,6 @@ def update_config(api_response_path): config.update(new_config) - - if __name__ == "__main__": import sys diff --git a/setup.py b/setup.py index 9d62482..a5cff34 100644 --- a/setup.py +++ b/setup.py @@ -59,6 +59,8 @@ platforms='any', install_requires=[ 'pyaes', # NOTE this is optional, AES can be provided via PyCrypto or PyCryptodome + 'pyyaml', + 'click' ], entry_points={ 'console_scripts': ['pytuya=pytuya.cli:main'], From 04018a30610a243512d358c689a0743dc47d95cc Mon Sep 17 00:00:00 2001 From: dominik dienlin Date: Mon, 12 Nov 2018 23:51:03 +0100 Subject: [PATCH 07/16] updated readme --- README.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/README.md b/README.md index c080b59..1f82fb6 100644 --- a/README.md +++ b/README.md @@ -105,6 +105,22 @@ In order to get help, simply use the ``--help`` flag, e.g.: Options: --help Show this message and exit. +### HomeAssistant Integration +HomeAssistant does already support tuya devices via the official cloud API. However, the direct communication +of the pytuya package may be preferable to relying on cloud services (pytuya works regardless of internet connectivity). +To use pytuya in HomeAssistant, simply use the commandline device components. +Here's an example configuration for a cover device with the name "bedroom blinds": + +```yaml +cover: + - platform: command_line + covers: + bedroom: + command_open: pytuya cover open "bedroom blinds" + command_close: pytuya cover close "bedroom blinds" + command_stop: pytuya cover stop "bedroom blinds" +``` +Note that pytuya needs to be set up first before these commands can work (see ``pytuya update_config``) ## API-Demo: From 23f4864ae42fbe8dc2133240def893c5580b2295 Mon Sep 17 00:00:00 2001 From: dominik dienlin Date: Tue, 13 Nov 2018 09:07:33 +0100 Subject: [PATCH 08/16] bugfix and updated tests --- pytuya/devices.py | 6 ++---- tests.py | 6 +++--- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/pytuya/devices.py b/pytuya/devices.py index f049aeb..372c76c 100644 --- a/pytuya/devices.py +++ b/pytuya/devices.py @@ -257,7 +257,7 @@ def set_colour(self, r, g, b): if not 0 <= value <= 255: raise ValueError("The %s for red needs to be between 0 and 255." % name) - return self._send(self.DPS_INDEX_MODE, colour=Colour.rgb_to_hex_value(r, g, b)) + return self._send(self.DPS_MODE_COLOUR, colour=Colour.rgb_to_hex_value(r, g, b)) def set_white(self, brightness, colour_temp): """ Set white coloured theme of an rgb bulb. @@ -278,9 +278,7 @@ def set_brightness(self, brightness): if not 25 <= brightness <= 255: raise ValueError("The brightness needs to be between 25 and 255.") - payload = self.generate_payload(SET, {self.DPS_INDEX_BRIGHTNESS: brightness}) - data = self._send_receive(payload) - return data + return self._send(brightness=brightness) def set_colour_temp(self, colour_temp): """ Set the colour temperature of an rgb bulb. diff --git a/tests.py b/tests.py index 33f43d0..69b6f56 100755 --- a/tests.py +++ b/tests.py @@ -41,14 +41,14 @@ def check_data_frame(data, expected_prefix, encrypted=True): version = data[16:19] checksum = data[19:35] encrypted_json = data[35:-8] - json_data = pytuya.AESCipher(LOCAL_KEY.encode(mock_byte_encoding)).decrypt(encrypted_json) + json_data = pytuya.utils.AESCipher(LOCAL_KEY.encode(mock_byte_encoding)).decrypt(encrypted_json) else: json_data = data[16:-8].decode(mock_byte_encoding) frame_ok = True - if prefix != pytuya.hex2bin(expected_prefix): + if prefix != pytuya.utils.hex2bin(expected_prefix): frame_ok = False - elif suffix != pytuya.hex2bin("000000000000aa55"): + elif suffix != pytuya.utils.hex2bin("000000000000aa55"): frame_ok = False elif encrypted: if payload_len != len(version) + len(checksum) + len(encrypted_json) + len(suffix): From ed3b82c3d6ea7205ed702c06c866c0742a2c3def Mon Sep 17 00:00:00 2001 From: dominik dienlin Date: Fri, 7 Dec 2018 19:03:37 +0100 Subject: [PATCH 09/16] removed cli import from pytuya.__init__ (pytuya doesn't depend on cli) --- pytuya/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pytuya/__init__.py b/pytuya/__init__.py index e9d8f12..737b14c 100644 --- a/pytuya/__init__.py +++ b/pytuya/__init__.py @@ -1,7 +1,8 @@ from .devices import XenonDevice, Device, BulbDevice, OutletDevice, CoverDevice, SET from .utils import query_devices, KeyExtractor -from pytuya.cli import cli_root version_tuple = (7, 0, 4) version = version_string = __version__ = '%d.%d.%d' % version_tuple __author__ = 'clach04' + + From ac263c02ee3d643ca790b0f74a189696c24774e2 Mon Sep 17 00:00:00 2001 From: dominik dienlin Date: Fri, 7 Dec 2018 19:05:17 +0100 Subject: [PATCH 10/16] ensure that return type is always json in device.get_state() --- pytuya/devices.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pytuya/devices.py b/pytuya/devices.py index 372c76c..849c3bb 100644 --- a/pytuya/devices.py +++ b/pytuya/devices.py @@ -167,6 +167,7 @@ def status(self): result = json.loads(result) else: log.error('Unexpected status() payload=%r', result) + result = dict(error=result) return result From 2a586126da142a62dc99598b2396916081ec903e Mon Sep 17 00:00:00 2001 From: dominik dienlin Date: Fri, 7 Dec 2018 19:07:25 +0100 Subject: [PATCH 11/16] cli: return formatted json for state, other minor changes and testing --- pytuya/cli/__init__.py | 5 +++++ pytuya/cli/bulb.py | 16 +++++++++------- pytuya/cli/cover.py | 11 +++++++---- pytuya/cli/main.py | 7 ++++--- pytuya/cli/outlet.py | 26 ++++++++++++++++---------- pytuya/cli/utils.py | 2 +- 6 files changed, 42 insertions(+), 25 deletions(-) diff --git a/pytuya/cli/__init__.py b/pytuya/cli/__init__.py index 4b3c64b..9d30908 100644 --- a/pytuya/cli/__init__.py +++ b/pytuya/cli/__init__.py @@ -5,3 +5,8 @@ def main(): cli_root() + +if __name__ == "__main__": + import sys, os + sys.argv = list(sys.argv) + ["update_config", os.getcwd()+"/../example_response.json"] + main() \ No newline at end of file diff --git a/pytuya/cli/bulb.py b/pytuya/cli/bulb.py index d96cc9d..ad66b36 100644 --- a/pytuya/cli/bulb.py +++ b/pytuya/cli/bulb.py @@ -1,4 +1,4 @@ -import yaml +import json import click from pytuya.cli import cli_root, get_device_from_config, config from pytuya.devices import BulbDevice @@ -54,18 +54,20 @@ def colour(name, r, g, b): dev.set_colour(r, g, b) +def get_json_state(dev_props): + return BulbDevice(dev_props["id"], dev_props["ip"], dev_props["key"]).status() + + @bulb.command() @click.argument('name', default=None) def state(name): - """ sends turn off action to device""" + """ prints the current state of device specified via NAME """ dev_props = get_device_from_config(config, name) - dev = BulbDevice(dev_props["id"], dev_props["ip"], dev_props["key"]) - dev.state() - print(yaml.dump({name: dev.state()}, default_flow_style=False)) + print(json.dumps(get_json_state(dev_props))) if __name__ == "__main__": import sys - sys.argv = list(sys.argv) + ["bulb", "state", "study"] - from pytuya import cli_root + sys.argv = list(sys.argv) + ["bulb", "state", "garden lights"] + from pytuya.cli import cli_root cli_root() diff --git a/pytuya/cli/cover.py b/pytuya/cli/cover.py index 3a14062..f245bb6 100644 --- a/pytuya/cli/cover.py +++ b/pytuya/cli/cover.py @@ -1,6 +1,6 @@ import logging import click -import yaml +import json from pytuya.devices import CoverDevice from pytuya.cli.main import cli_root, config, get_device_from_config @@ -30,19 +30,22 @@ def add_cover_command(action_name): add_cover_command(action) +def get_json_state(dev_props): + return CoverDevice(dev_props["id"], dev_props["ip"], dev_props["key"]).status() + + @cover.command() @click.argument('name', default=None) def state(name): """ sends turn off action to device specified via NAME """ dev_props = get_device_from_config(config, name) - dev = CoverDevice(dev_props["id"], dev_props["ip"], dev_props["key"]) - print(yaml.dump({name: dev.state()}, default_flow_style=False)) + print(json.dumps(get_json_state(dev_props))) if __name__ == "__main__": import sys - name = "study blinds" + name = "study_blinds" sys.argv = list(sys.argv) + ["cover", "state", name.lower()] print("\nexecuting test: " + " ".join(sys.argv[1:])) diff --git a/pytuya/cli/main.py b/pytuya/cli/main.py index 2ad0970..11d94a6 100644 --- a/pytuya/cli/main.py +++ b/pytuya/cli/main.py @@ -79,7 +79,8 @@ def build_config(api_response_path): def get_device_from_config(config, name): dev_props = config.get(name) if dev_props is None: - dev_props = {k.lower().replace(" ", ""): v for k, v in config.items()}.get(name.lower().replace(" ", "")) + name_map = lambda name: name.lower().replace(" ", "").replace("_", "") + dev_props = {name_map(k): v for k, v in config.items()}.get(name_map(name)) if dev_props is None: raise RuntimeError("Device %s not found in config:\n%s" % (name, config)) return dev_props @@ -113,9 +114,9 @@ def update_config(api_response_path): if len(sys.argv) > 1: cli_root() - test_extract = list(sys.argv) + ["extract", "../../example_response.txt"] + test_extract = list(sys.argv) + ["extract", "../../example_response.json"] test_query = list(sys.argv) + ["query"] - test_update_config = list(sys.argv) + ["update_config", "../../example_response.txt"] + test_update_config = list(sys.argv) + ["update_config", "../../example_response.json"] test = test_update_config print("\nexecuting test: pytuya %s\n" % test) diff --git a/pytuya/cli/outlet.py b/pytuya/cli/outlet.py index 9012dac..77e792d 100644 --- a/pytuya/cli/outlet.py +++ b/pytuya/cli/outlet.py @@ -1,5 +1,5 @@ import click -import yaml +import json from pytuya.cli import cli_root, get_device_from_config, config from pytuya.devices import OutletDevice @@ -11,33 +11,39 @@ def outlet(): @outlet.command() @click.argument('name', default=None) -def on(name): +@click.argument('switch', default=1) +def on(name, switch): """ sends turn on action to device specified via NAME """ dev_props = get_device_from_config(config, name) dev = OutletDevice(dev_props["id"], dev_props["ip"], dev_props["key"]) - dev.turn_on() + dev.turn_on(switch=switch) @outlet.command() @click.argument('name', default=None) -def off(name): +@click.argument('switch', default=1, type=click.types.IntRange(0, 3)) +def off(name, switch): """ sends turn off action to device specified via NAME """ dev_props = get_device_from_config(config, name) dev = OutletDevice(dev_props["id"], dev_props["ip"], dev_props["key"]) - dev.turn_off() + dev.turn_off(switch=switch) + + +def get_json_state(dev_props): + return OutletDevice(dev_props["id"], dev_props["ip"], dev_props["key"]).status() @outlet.command() @click.argument('name', default=None) def state(name): - """ sends turn off action to device specified via NAME """ + """ prints the current state of device specified via NAME """ dev_props = get_device_from_config(config, name) - dev = OutletDevice(dev_props["id"], dev_props["ip"], dev_props["key"]) - print(yaml.dump({name: dev.status()}, default_flow_style=False)) + print(json.dumps(get_json_state(dev_props))) if __name__ == "__main__": import sys - sys.argv = list(sys.argv) + ["outlet", "state", "study"] - from pytuya import cli_root + # sys.argv = list(sys.argv) + ["outlet", "off", "garden lights", "2"] + sys.argv = list(sys.argv) + ["outlet", "state", "garden lights"] + from pytuya.cli import cli_root cli_root() diff --git a/pytuya/cli/utils.py b/pytuya/cli/utils.py index a3eeec0..7ea1c34 100644 --- a/pytuya/cli/utils.py +++ b/pytuya/cli/utils.py @@ -34,7 +34,7 @@ def discover(timeout): if len(sys.argv) > 1: cli_root() - test_extract = list(sys.argv) + ["utils", "extract_keys", "../../example_response.txt"] + test_extract = list(sys.argv) + ["utils", "extract_keys", "../../example_response.json"] test_query = list(sys.argv) + ["utils", "discover"] for test in (test_extract, test_query): From 2a0b15a7be81f89148e4dfedc9d71ea3c3b24362 Mon Sep 17 00:00:00 2001 From: dominik dienlin Date: Fri, 7 Dec 2018 22:38:18 +0100 Subject: [PATCH 12/16] cli: updated status description for cover device --- pytuya/cli/cover.py | 10 +++++++++- pytuya/devices.py | 20 ++++++++++---------- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/pytuya/cli/cover.py b/pytuya/cli/cover.py index f245bb6..9c37aa9 100644 --- a/pytuya/cli/cover.py +++ b/pytuya/cli/cover.py @@ -34,12 +34,20 @@ def get_json_state(dev_props): return CoverDevice(dev_props["id"], dev_props["ip"], dev_props["key"]).status() +def get_status_descr(status): + if type(status) is bytes: + return str(status) + return {'1': "open", '2': "closed", '3': "stopped"}.get(status.get('dps').get('1')) + + @cover.command() @click.argument('name', default=None) def state(name): """ sends turn off action to device specified via NAME """ dev_props = get_device_from_config(config, name) - print(json.dumps(get_json_state(dev_props))) + state = get_json_state(dev_props) + state["descr"] = get_status_descr(state) + print(json.dumps(state)) if __name__ == "__main__": diff --git a/pytuya/devices.py b/pytuya/devices.py index 849c3bb..cece613 100644 --- a/pytuya/devices.py +++ b/pytuya/devices.py @@ -72,19 +72,19 @@ def _send_receive(self, payload): payload(bytes): Data to send. """ - success, tries = False, 0 - while not success and tries < 3: + success, e, data = False, "", "" + for tries in range(3): try: - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - s.settimeout(self.connection_timeout) - s.connect((self.address, self.port)) - s.send(payload) - data = s.recv(1024) - s.close() + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + s.settimeout(self.connection_timeout) + s.connect((self.address, self.port)) + s.send(payload) + data = s.recv(1024) success = True + break except ConnectionResetError as e: - tries += 1 + pass if not success: logging.warning("failed to communicate %s" % e) From f7f9f711234b38727d6c8accc78e0a1e6a655919 Mon Sep 17 00:00:00 2001 From: dominik dienlin Date: Fri, 7 Dec 2018 22:40:30 +0100 Subject: [PATCH 13/16] updated send_receive error, timeout handling --- pytuya/devices.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/pytuya/devices.py b/pytuya/devices.py index cece613..1d92fa3 100644 --- a/pytuya/devices.py +++ b/pytuya/devices.py @@ -58,7 +58,8 @@ def __init__(self, dev_id, address, local_key=None, dev_type='device', connectio self.address = address self.local_key = local_key.encode('latin1') self.dev_type = dev_type - self.connection_timeout = connection_timeout + self.send_receive_max_tries = 3 + self.socket_timeout = connection_timeout / self.send_receive_max_tries self.cipher = None self.port = 6668 # default - do not expect caller to pass in @@ -72,22 +73,24 @@ def _send_receive(self, payload): payload(bytes): Data to send. """ - success, e, data = False, "", "" - for tries in range(3): + success, data = False, "" + for tries in range(1, self.send_receive_max_tries+1): try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - s.settimeout(self.connection_timeout) + s.settimeout(self.socket_timeout) s.connect((self.address, self.port)) s.send(payload) data = s.recv(1024) success = True break except ConnectionResetError as e: - pass + logging.warning("Connection attempt %i/%i: %s" % (tries, self.send_receive_max_tries, e)) + except socket.timeout as e: + logging.warning("Connection attempt %i/%i: %s" % (tries, self.send_receive_max_tries, e)) if not success: - logging.warning("failed to communicate %s" % e) + raise RuntimeError("Unable to communicate with device") else: return data From 6104f9d5d5d0b824d5317e42754fdbc0dfde1f4c Mon Sep 17 00:00:00 2001 From: dominik dienlin Date: Sun, 30 Dec 2018 14:58:38 +0100 Subject: [PATCH 14/16] added payload crc32 support as mentioned in https://github.com/clach04/python-tuya/issues/40 --- pytuya/devices.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pytuya/devices.py b/pytuya/devices.py index 1d92fa3..0529ce0 100644 --- a/pytuya/devices.py +++ b/pytuya/devices.py @@ -13,6 +13,7 @@ import logging import socket import time +import binascii from pytuya.utils import hex2bin, bin2hex, AESCipher, Colour log = logging.getLogger(__name__) @@ -131,11 +132,12 @@ def generate_payload(self, command, data=None): json_payload = PROTOCOL_VERSION_BYTES + m.hexdigest()[8:][:16].encode('latin1') + json_payload self.cipher = None # expect to connect and then disconnect to set new - postfix_payload = hex2bin(bin2hex(json_payload) + payload_dict[self.dev_type]['suffix']) - - assert len(postfix_payload) <= 0xff - postfix_payload_hex_len = '%x' % len(postfix_payload) # TODO this assumes a single byte 0-255 (0x00-0xff) - + suffix = payload_dict[self.dev_type]['suffix'] + payload = bin2hex(json_payload) + crc32 = "%.8x" % binascii.crc32(bytearray(payload.encode())) + suffix = crc32 + suffix[-8:] + postfix_payload = hex2bin(payload + suffix) + postfix_payload_hex_len = '%x' % len(postfix_payload) return hex2bin(payload_dict[self.dev_type]['prefix'] + payload_dict[self.dev_type][command]['hexByte'] + '000000' + postfix_payload_hex_len) + postfix_payload From 9767298eabb1328a00066f9aaeb6de23653c1f06 Mon Sep 17 00:00:00 2001 From: dominik dienlin Date: Sun, 30 Dec 2018 14:59:23 +0100 Subject: [PATCH 15/16] cleanup --- pytuya/cli/__init__.py | 1 - pytuya/utils.py | 30 ------------------------------ 2 files changed, 31 deletions(-) diff --git a/pytuya/cli/__init__.py b/pytuya/cli/__init__.py index 9d30908..36fce55 100644 --- a/pytuya/cli/__init__.py +++ b/pytuya/cli/__init__.py @@ -8,5 +8,4 @@ def main(): if __name__ == "__main__": import sys, os - sys.argv = list(sys.argv) + ["update_config", os.getcwd()+"/../example_response.json"] main() \ No newline at end of file diff --git a/pytuya/utils.py b/pytuya/utils.py index 7802dfe..7394158 100644 --- a/pytuya/utils.py +++ b/pytuya/utils.py @@ -235,33 +235,3 @@ def parse_device_keys_from_api_response(api_response): logging.info("getting device keys using json method failed: \n\t%s\ntrying hacky method instead.." % e) keys = KeyExtractor.get_device_keys_hacky(api_response) return keys - - -if __name__ == "__main__": - import pytuya - _dev = pytuya.CoverDevice(dev_id="55870625b4e62d4b2ff7", address="192.168.137.80", - local_key="385943cd1379f098", dev_type="device") - - - def get_status_descr(status): - if type(status) is bytes: - return str(status) - return {'1': "opening or open", '2': "closing or closed", '3': "stopped"}.get(status.get('dps').get('1')) - - - print(get_status_descr(dev.status())) - - action_open = {'2': '1'} - action_close = {'2': '2'} - action_stop = {'2': '3'} - - - def send_action(action): - payload = _dev.generate_payload(command=pytuya.SET, data=action) - _dev._send_receive(payload) - return - - - send_action(action_stop) - - print(get_status_descr(_dev.status())) From 79721ddfac49c9ee99634ed9baf99ae728800b3c Mon Sep 17 00:00:00 2001 From: dominik dienlin Date: Sun, 30 Dec 2018 17:45:31 +0100 Subject: [PATCH 16/16] improve resilience to missing status attributes --- pytuya/devices.py | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/pytuya/devices.py b/pytuya/devices.py index 0529ce0..f87d07c 100644 --- a/pytuya/devices.py +++ b/pytuya/devices.py @@ -17,12 +17,10 @@ from pytuya.utils import hex2bin, bin2hex, AESCipher, Colour log = logging.getLogger(__name__) -logging.basicConfig() # TODO include function name/line numbers in log SET = 'set' PROTOCOL_VERSION_BYTES = b'3.1' - # This is intended to match requests.json payload at https://github.com/codetheweb/tuyapi payload_dict = { "device": { @@ -75,7 +73,7 @@ def _send_receive(self, payload): """ success, data = False, "" - for tries in range(1, self.send_receive_max_tries+1): + for tries in range(1, self.send_receive_max_tries + 1): try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) @@ -235,9 +233,11 @@ class BulbDevice(Device): DPS_INDEX_BRIGHTNESS = '3' DPS_INDEX_COLOUR_TEMP = '4' DPS_INDEX_COLOUR = '5' + DPS_INDEX_COLOUR_SCENE = '6' DPS = 'dps' DPS_MODE_COLOUR = 'colour' + DPS_MODE_COLOUR_SCENE = 'scene' DPS_MODE_WHITE = 'white' def __init__(self, dev_id, address, local_key=None): @@ -297,29 +297,30 @@ def set_colour_temp(self, colour_temp): def brightness(self): """ Return brightness value """ - return self.status()[self.DPS][self.DPS_INDEX_BRIGHTNESS] + return self.status().get(self.DPS, {}).get(self.DPS_INDEX_BRIGHTNESS, 0) def colour_temp(self): """ Return colour temperature """ - return self.status()[self.DPS][self.DPS_INDEX_COLOUR_TEMP] + return self.status().get(self.DPS, {}).get(self.DPS_INDEX_COLOUR_TEMP, 0) def colour_rgb(self): """ Return colour as RGB value """ - hex_value = self.status()[self.DPS][self.DPS_INDEX_COLOUR] + hex_value = self.status().get(self.DPS, {}).get(self.DPS_INDEX_COLOUR, "0"*6) return Colour.hex_value_to_rgb(hex_value) def colour_hsv(self): """ Return colour as HSV value """ - hex_value = self.status()[self.DPS][self.DPS_INDEX_COLOUR] + hex_value = self.status().get(self.DPS, {}).get(self.DPS_INDEX_COLOUR, "0"*14) return Colour.hex_value_to_hsv(hex_value) def state(self): - dps = self.status()[self.DPS] - return dict(is_on=dps[self.DPS_INDEX_ON], - mode=dps[self.DPS_INDEX_MODE], - brightness=dps[self.DPS_INDEX_BRIGHTNESS], - colourtemp=dps[self.DPS_INDEX_COLOUR_TEMP], - colour=dps[self.DPS_INDEX_COLOUR]) + dps = self.status().get(self.DPS, {}) + return {k: v for k, v in + dict(is_on=dps.get(self.DPS_INDEX_ON), + mode=dps.get(self.DPS_INDEX_MODE), + brightness=dps.get(self.DPS_INDEX_BRIGHTNESS), + colourtemp=dps.get(self.DPS_INDEX_COLOUR_TEMP), + colour=dps.get(self.DPS_INDEX_COLOUR)).items() if v is not None} class CoverDevice(Device):