29
29
import six .moves .urllib .parse as urlparse
30
30
31
31
from .auth import JWT , Auth , AccessRequest
32
- from .error import UnavailableError , ClientError , RequestError , ServerError
32
+ from .error import BatchError , UnavailableError , ClientError , RequestError , ServerError
33
33
from .version import __version__ as umapi_version
34
34
35
35
@@ -285,13 +285,18 @@ def execute_multiple(self, actions, immediate=True):
285
285
286
286
NOTE: This is where we throttle the number of commands per action. So the number
287
287
of actions we were given may not be the same as the number we queue or send to the server.
288
+
289
+ NOTE: If the server gives us a response we don't understand, we note that and continue
290
+ processing as usual. Then, at the end of the batch, we throw in order to warn the client
291
+ that we had a problem understanding the server.
288
292
289
293
:param actions: the list of Action objects to be executed
290
294
:param immediate: whether to immediately send them to the server
291
295
:return: tuple: the number of actions in the queue, that got sent, and that executed successfully.
292
296
"""
293
297
# throttling part 1: split up each action into smaller actions, as needed
294
298
split_actions = []
299
+ exceptions = []
295
300
for a in actions :
296
301
if len (a .commands ) == 0 :
297
302
if self .logger : self .logger .warning ("Sending action with no commands: %s" , a .frame )
@@ -303,29 +308,24 @@ def execute_multiple(self, actions, immediate=True):
303
308
split_actions .append (a )
304
309
actions = self .action_queue + split_actions
305
310
# throttling part 2: execute the action list in batches, as needed
306
- sent = completed = last_batch_sent = last_batch_completed = 0
307
- try :
308
- while len (actions ) >= self .throttle_actions :
309
- batch , actions = actions [0 :self .throttle_actions ], actions [self .throttle_actions :]
310
- if self .logger : self .logger .debug ("Executing %d actions (%d remaining)." , len (batch ), len (actions ))
311
- sent += len (batch )
312
- completed += self ._execute_batch (batch )
313
- finally :
314
- self .action_queue = actions
315
- self .local_status ["actions-queued" ] = len (actions )
316
- self .local_status ["actions-sent" ] += sent
317
- self .local_status ["actions-completed" ] += completed
318
- # there may be actions left over
319
- if actions and immediate :
311
+ sent = completed = 0
312
+ batch_size = self .throttle_actions
313
+ min_size = 1 if immediate else batch_size
314
+ while len (actions ) >= min_size :
315
+ batch , actions = actions [0 :batch_size ], actions [batch_size :]
316
+ if self .logger : self .logger .debug ("Executing %d actions (%d remaining)." , len (batch ), len (actions ))
317
+ sent += len (batch )
320
318
try :
321
- last_batch_sent = len (actions )
322
- last_batch_completed += self ._execute_batch (actions )
323
- finally :
324
- self .action_queue = []
325
- self .local_status ["actions-queued" ] = 0
326
- self .local_status ["actions-sent" ] += last_batch_sent
327
- self .local_status ["actions-completed" ] += last_batch_completed
328
- return len (self .action_queue ), sent + last_batch_sent , completed + last_batch_completed
319
+ completed += self ._execute_batch (batch )
320
+ except Exception as e :
321
+ exceptions .append (e )
322
+ self .action_queue = actions
323
+ self .local_status ["actions-queued" ] = queued = len (actions )
324
+ self .local_status ["actions-sent" ] += sent
325
+ self .local_status ["actions-completed" ] += completed
326
+ if exceptions :
327
+ raise BatchError (exceptions , queued , sent , completed )
328
+ return queued , sent , completed
329
329
330
330
def _execute_batch (self , actions ):
331
331
"""
0 commit comments