1
1
"""Library to handle connection with Switchbot"""
2
- import time
3
-
4
2
import binascii
5
3
import logging
4
+ import time
6
5
7
6
import bluepy
8
7
33
32
34
33
class SwitchbotDevice :
35
34
# pylint: disable=too-few-public-methods
35
+ # pylint: disable=too-many-instance-attributes
36
36
"""Base Representation of a Switchbot Device."""
37
37
38
38
def __init__ (self , mac , password = None , interface = None , ** kwargs ) -> None :
39
39
self ._interface = interface
40
40
self ._mac = mac
41
41
self ._device = None
42
+ self ._battery_percent = 0
42
43
self ._retry_count = kwargs .pop ("retry_count" , DEFAULT_RETRY_COUNT )
43
- self ._time_between_update_command = kwargs .pop ("time_between_update_command" ,
44
- DEFAULT_TIME_BETWEEN_UPDATE_COMMAND )
44
+ self ._time_between_update_command = kwargs .pop (
45
+ "time_between_update_command" , DEFAULT_TIME_BETWEEN_UPDATE_COMMAND
46
+ )
45
47
self ._last_time_command_send = time .time ()
46
48
if password is None or password == "" :
47
49
self ._password_encoded = None
48
50
else :
49
- self ._password_encoded = '%x' % (binascii .crc32 (password .encode ('ascii' )) & 0xffffffff )
51
+ self ._password_encoded = "%x" % (
52
+ binascii .crc32 (password .encode ("ascii" )) & 0xFFFFFFFF
53
+ )
50
54
51
55
def _connect (self ) -> None :
52
56
if self ._device is not None :
53
57
return
54
58
try :
55
59
_LOGGER .debug ("Connecting to Switchbot..." )
56
- self ._device = bluepy .btle .Peripheral (self . _mac ,
57
- bluepy .btle .ADDR_TYPE_RANDOM ,
58
- self . _interface )
60
+ self ._device = bluepy .btle .Peripheral (
61
+ self . _mac , bluepy .btle .ADDR_TYPE_RANDOM , self . _interface
62
+ )
59
63
_LOGGER .debug ("Connected to Switchbot." )
60
64
except bluepy .btle .BTLEException :
61
65
_LOGGER .debug ("Failed connecting to Switchbot." , exc_info = True )
@@ -91,8 +95,10 @@ def _writekey(self, key) -> bool:
91
95
write_result = hand .write (binascii .a2b_hex (key ), withResponse = True )
92
96
self ._last_time_command_send = time .time ()
93
97
if not write_result :
94
- _LOGGER .error ("Sent command but didn't get a response from Switchbot confirming command was sent. "
95
- "Please check the Switchbot." )
98
+ _LOGGER .error (
99
+ "Sent command but didn't get a response from Switchbot confirming command was sent."
100
+ " Please check the Switchbot."
101
+ )
96
102
else :
97
103
_LOGGER .info ("Successfully sent command to Switchbot (MAC: %s)." , self ._mac )
98
104
return write_result
@@ -111,12 +117,54 @@ def _sendcommand(self, key, retry) -> bool:
111
117
if send_success :
112
118
return True
113
119
if retry < 1 :
114
- _LOGGER .error ("Switchbot communication failed. Stopping trying." , exc_info = True )
120
+ _LOGGER .error (
121
+ "Switchbot communication failed. Stopping trying." , exc_info = True
122
+ )
115
123
return False
116
- _LOGGER .warning ("Cannot connect to Switchbot. Retrying (remaining: %d)..." , retry )
124
+ _LOGGER .warning (
125
+ "Cannot connect to Switchbot. Retrying (remaining: %d)..." , retry
126
+ )
117
127
time .sleep (DEFAULT_RETRY_TIMEOUT )
118
128
return self ._sendcommand (key , retry - 1 )
119
129
130
+ def get_servicedata (self , retry = DEFAULT_RETRY_COUNT , scan_timeout = 5 ) -> bytearray :
131
+ """Get BTLE 16b Service Data,
132
+ returns after the given timeout period in seconds."""
133
+ devices = None
134
+
135
+ waiting_time = self ._time_between_update_command - time .time ()
136
+ if waiting_time > 0 :
137
+ time .sleep (waiting_time )
138
+ try :
139
+ devices = bluepy .btle .Scanner ().scan (scan_timeout )
140
+
141
+ except bluepy .btle .BTLEManagementError :
142
+ _LOGGER .warning ("Error updating Switchbot." , exc_info = True )
143
+
144
+ if devices is None :
145
+ if retry < 1 :
146
+ _LOGGER .error (
147
+ "Switchbot update failed. Stopping trying." , exc_info = True
148
+ )
149
+ return None
150
+
151
+ _LOGGER .warning (
152
+ "Cannot update Switchbot. Retrying (remaining: %d)..." , retry
153
+ )
154
+ time .sleep (DEFAULT_RETRY_TIMEOUT )
155
+ return self .get_servicedata (retry - 1 , scan_timeout )
156
+
157
+ for device in devices :
158
+ if self ._mac .lower () == device .addr .lower ():
159
+ for (adtype , _ , value ) in device .getScanData ():
160
+ if adtype == 22 :
161
+ service_data = value [4 :].encode ()
162
+ service_data = binascii .unhexlify (service_data )
163
+
164
+ return service_data
165
+
166
+ return None
167
+
120
168
def get_mac (self ) -> str :
121
169
"""Returns the mac address of the device."""
122
170
return self ._mac
@@ -125,10 +173,40 @@ def get_min_time_update(self):
125
173
"""Returns the first time an update can be executed."""
126
174
return self ._last_time_command_send + self ._time_between_update_command
127
175
176
+ def get_battery_percent (self ) -> int :
177
+ """Returns device battery level in percent."""
178
+ return self ._battery_percent
179
+
128
180
129
181
class Switchbot (SwitchbotDevice ):
130
182
"""Representation of a Switchbot."""
131
183
184
+ def __init__ (self , * args , ** kwargs ) -> None :
185
+ self ._is_on = None
186
+ self ._mode = None
187
+ super ().__init__ (* args , ** kwargs )
188
+
189
+ def update (self , scan_timeout = 5 ) -> None :
190
+ """Updates the mode, battery percent and state of the device."""
191
+ barray = self .get_servicedata (scan_timeout = scan_timeout )
192
+
193
+ if barray is None :
194
+ return
195
+
196
+ _mode = barray [1 ] & 0b10000000 # 128 switch or 0 toggle
197
+ if _mode != 0 :
198
+ self ._mode = "switch"
199
+ else :
200
+ self ._mode = "toggle"
201
+
202
+ _is_on = barray [1 ] & 0b01000000 # 64 on or 0 for off
203
+ if _is_on == 0 and self ._mode == "switch" :
204
+ self ._is_on = True
205
+ else :
206
+ self ._is_on = False
207
+
208
+ self ._battery_percent = barray [2 ] & 0b01111111
209
+
132
210
def turn_on (self ) -> bool :
133
211
"""Turn device on."""
134
212
return self ._sendcommand (ON_KEY , self ._retry_count )
@@ -141,6 +219,16 @@ def press(self) -> bool:
141
219
"""Press command to device."""
142
220
return self ._sendcommand (PRESS_KEY , self ._retry_count )
143
221
222
+ def switch_mode (self ) -> str :
223
+ """Return Toggle or Switch from cache.
224
+ Run update first."""
225
+ return self ._mode
226
+
227
+ def is_on (self ) -> bool :
228
+ """Return switch state from cache.
229
+ Run update first."""
230
+ return self ._is_on
231
+
144
232
145
233
class SwitchbotCurtain (SwitchbotDevice ):
146
234
"""Representation of a Switchbot Curtain."""
@@ -150,23 +238,24 @@ def __init__(self, *args, **kwargs) -> None:
150
238
The position of the curtain is saved in self._pos with 0 = open and 100 = closed.
151
239
This is independent of the calibration of the curtain bot (Open left to right/
152
240
Open right to left/Open from the middle).
153
- The parameter 'reverse_mode' reverse these values, if 'reverse_mode' = True, position = 0 equals close
241
+ The parameter 'reverse_mode' reverse these values,
242
+ if 'reverse_mode' = True, position = 0 equals close
154
243
and position = 100 equals open. The parameter is default set to True so that
155
244
the definition of position is the same as in Home Assistant."""
156
- self ._reverse = kwargs .pop (' reverse_mode' , True )
245
+ self ._reverse = kwargs .pop (" reverse_mode" , True )
157
246
self ._pos = 0
158
247
self ._light_level = 0
159
- self ._battery_percent = 0
248
+ self ._is_calibrated = 0
160
249
super ().__init__ (* args , ** kwargs )
161
250
162
251
def open (self ) -> bool :
163
252
"""Send open command."""
164
- self ._pos = ( 100 if self ._reverse else 0 )
253
+ self ._pos = 100 if self ._reverse else 0
165
254
return self ._sendcommand (OPEN_KEY , self ._retry_count )
166
255
167
256
def close (self ) -> bool :
168
257
"""Send close command."""
169
- self ._pos = ( 0 if self ._reverse else 100 )
258
+ self ._pos = 0 if self ._reverse else 100
170
259
return self ._sendcommand (CLOSE_KEY , self ._retry_count )
171
260
172
261
def stop (self ) -> bool :
@@ -175,39 +264,29 @@ def stop(self) -> bool:
175
264
176
265
def set_position (self , position : int ) -> bool :
177
266
"""Send position command (0-100) to device."""
178
- position = (( 100 - position ) if self ._reverse else position )
267
+ position = (100 - position ) if self ._reverse else position
179
268
self ._pos = position
180
269
hex_position = "%0.2X" % position
181
270
return self ._sendcommand (POSITION_KEY + hex_position , self ._retry_count )
182
271
183
272
def update (self , scan_timeout = 5 ) -> None :
184
- """Updates the current position, battery percent and light level of the device.
185
- Returns after the given timeout period in seconds."""
186
- waiting_time = self .get_min_time_update () - time .time ()
187
- if waiting_time > 0 :
188
- time .sleep (waiting_time )
189
- devices = bluepy .btle .Scanner ().scan (scan_timeout )
273
+ """Updates the current position, battery percent and light level of the device."""
274
+ barray = self .get_servicedata (scan_timeout = scan_timeout )
190
275
191
- for device in devices :
192
- if self .get_mac ().lower () == device .addr .lower ():
193
- for (adtype , _ , value ) in device .getScanData ():
194
- if adtype == 22 :
195
- barray = bytearray (value , 'ascii' )
196
- self ._battery_percent = int (barray [- 6 :- 4 ], 16 )
197
- position = max (min (int (barray [- 4 :- 2 ], 16 ), 100 ), 0 )
198
- self ._pos = ((100 - position ) if self ._reverse else position )
199
- self ._light_level = int (barray [- 2 :], 16 )
276
+ if barray is None :
277
+ return
278
+
279
+ self ._is_calibrated = barray [1 ] & 0b01000000
280
+ self ._battery_percent = barray [2 ] & 0b01111111
281
+ position = max (min (barray [3 ] & 0b01111111 , 100 ), 0 )
282
+ self ._pos = (100 - position ) if self ._reverse else position
283
+ self ._light_level = (barray [4 ] >> 4 ) & 0b00001111 # light sensor level (1-10)
200
284
201
285
def get_position (self ) -> int :
202
286
"""Returns the current cached position (0-100), the actual position could vary.
203
287
To get the actual position call update() first."""
204
288
return self ._pos
205
289
206
- def get_battery_percent (self ) -> int :
207
- """Returns the current cached battery percent (0-100), the actual battery percent could vary.
208
- To get the actual battery percent call update() first."""
209
- return self ._battery_percent
210
-
211
290
def get_light_level (self ) -> int :
212
291
"""Returns the current cached light level, the actual light level could vary.
213
292
To get the actual light level call update() first."""
0 commit comments