@@ -11,6 +11,8 @@
 import redis
 import os
 from pathlib import Path
+import redistimeseries
+from docker.models.containers import Container
 
 from redisbench_admin.run.common import (
     get_start_time_vars,
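
Note: the two new imports serve later hunks. `redistimeseries` provides the RedisTimeSeries client that replaces the plain `redis.StrictRedis` datasink connection below, and `Container` (the Docker SDK's container model) backs the reworked teardown type check. The old guard `type(container) != bytes` implies `client_containers` can also hold raw bytes (e.g. captured output), so only real `Container` objects should get `.stop()` called. A minimal sketch of that guard, where `entry` is a hypothetical list element:

    from docker.models.containers import Container

    # Only genuine Container objects expose .stop(); skip raw bytes entries.
    if type(entry) == Container:
        entry.stop()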
@@ -79,6 +81,13 @@ def main():
     topologies_map = get_topologies(topologies_files[0])
     testsuites_folder = os.path.abspath(args.test_suites_folder)
     logging.info("Using test-suites folder dir {}".format(testsuites_folder))
+    testsuite_spec_files = get_benchmark_specs(testsuites_folder)
+    logging.info(
+        "There are a total of {} test-suites in folder {}".format(
+            len(testsuite_spec_files), testsuites_folder
+        )
+    )
+
     logging.info(
         "Using redis available at: {}:{} to read the event streams".format(
             GH_REDIS_SERVER_HOST, GH_REDIS_SERVER_PORT
@@ -108,7 +117,7 @@ def main():
         )
     )
     try:
-        rts = redis.StrictRedis(
+        rts = redistimeseries.client.Client(
             host=args.datasink_redistimeseries_host,
             port=args.datasink_redistimeseries_port,
             decode_responses=True,
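
The datasink connection switches from a plain `redis.StrictRedis` to the RedisTimeSeries client so the coordinator can push time-series results natively. A minimal sketch, assuming the redistimeseries-py package and a local server (host and port are placeholders):

    from redistimeseries.client import Client

    # Accepts the usual redis-py connection kwargs, and adds TS.* helpers
    # (e.g. rts.add(key, timestamp, value) issues TS.ADD).
    rts = Client(host="localhost", port=6379, decode_responses=True)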
@@ -127,11 +136,36 @@ def main():
         exit(1)
 
     logging.info("checking build spec requirements")
+    build_runners_consumer_group_create(conn)
+    stream_id = None
+    docker_client = docker.from_env()
+    home = str(Path.home())
+    # TODO: confirm we do have enough cores to run the spec
+    # availabe_cpus = args.cpu_count
+    datasink_push_results_redistimeseries = args.datasink_push_results_redistimeseries
+    logging.info("Entering blocking read waiting for work.")
+    if stream_id is None:
+        stream_id = args.consumer_start_id
+    while True:
+        _, stream_id, _ = self_contained_coordinator_blocking_read(
+            conn,
+            datasink_push_results_redistimeseries,
+            docker_client,
+            home,
+            stream_id,
+            rts,
+            testsuite_spec_files,
+            topologies_map,
+        )
+
+
+def build_runners_consumer_group_create(conn, id="$"):
     try:
         conn.xgroup_create(
             STREAM_KEYNAME_NEW_BUILD_EVENTS,
             STREAM_GH_NEW_BUILD_RUNNERS_CG,
             mkstream=True,
+            id=id,
         )
         logging.info(
             "Created consumer group named {} to distribute work.".format(
@@ -144,36 +178,44 @@ def main():
                 STREAM_GH_NEW_BUILD_RUNNERS_CG
             )
         )
-    previous_id = None
-    docker_client = docker.from_env()
-    home = str(Path.home())
-    # TODO: confirm we do have enough cores to run the spec
-    # availabe_cpus = args.cpu_count
-    datasink_push_results_redistimeseries = args.datasink_push_results_redistimeseries
 
-    while True:
-        logging.info("Entering blocking read waiting for work.")
-        if previous_id is None:
-            previous_id = args.consumer_start_id
-        newTestInfo = conn.xreadgroup(
-            STREAM_GH_NEW_BUILD_RUNNERS_CG,
-            "{}-self-contained-proc#{}".format(STREAM_GH_NEW_BUILD_RUNNERS_CG, "1"),
-            {STREAM_KEYNAME_NEW_BUILD_EVENTS: previous_id},
-            count=1,
-            block=0,
-        )
-        if len(newTestInfo[0]) < 2 or len(newTestInfo[0][1]) < 1:
-            previous_id = ">"
-            continue
-        previous_id = process_self_contained_coordinator_stream(
+
+def self_contained_coordinator_blocking_read(
+    conn,
+    datasink_push_results_redistimeseries,
+    docker_client,
+    home,
+    stream_id,
+    rts,
+    testsuite_spec_files,
+    topologies_map,
+):
+    num_process_streams = 0
+    overall_result = False
+    consumer_name = "{}-self-contained-proc#{}".format(
+        STREAM_GH_NEW_BUILD_RUNNERS_CG, "1"
+    )
+    newTestInfo = conn.xreadgroup(
+        STREAM_GH_NEW_BUILD_RUNNERS_CG,
+        consumer_name,
+        {STREAM_KEYNAME_NEW_BUILD_EVENTS: stream_id},
+        count=1,
+        block=0,
+    )
+    if len(newTestInfo[0]) < 2 or len(newTestInfo[0][1]) < 1:
+        stream_id = ">"
+    else:
+        stream_id, overall_result = process_self_contained_coordinator_stream(
             datasink_push_results_redistimeseries,
             docker_client,
             home,
             newTestInfo,
             rts,
-            testsuites_folder,
+            testsuite_spec_files,
             topologies_map,
         )
+        num_process_streams = num_process_streams + 1
+    return overall_result, stream_id, num_process_streams
 
 
 def process_self_contained_coordinator_stream(
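
The refactored `self_contained_coordinator_blocking_read` isolates one blocking XREADGROUP call per loop iteration and reports whether the processed work item succeeded. Two start-id values matter here: an explicit id (e.g. `args.consumer_start_id`) replays this consumer's pending entries, while `">"` asks for entries never delivered to any consumer in the group, which is why the code falls back to `">"` when a read comes back empty. A sketch of the read with illustrative names:

    reply = conn.xreadgroup(
        "runners-cg",            # consumer group
        "runners-cg-proc#1",     # this consumer's name within the group
        {"builds:stream": ">"},  # stream -> start id (">" = new work only)
        count=1,                 # hand over at most one entry
        block=0,                 # 0 = block indefinitely until work arrives
    )
    # reply shape (decode_responses=False):
    # [[b"builds:stream", [(b"1526999626221-0", {b"field": b"value"})]]]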
@@ -182,12 +224,14 @@ def process_self_contained_coordinator_stream(
     home,
     newTestInfo,
     rts,
-    testsuites_folder,
+    testsuite_spec_files,
     topologies_map,
 ):
     stream_id, testDetails = newTestInfo[0][1][0]
     stream_id = stream_id.decode()
     logging.info("Received work. Stream id {}.".format(stream_id))
+    overall_result = False
+
     if b"git_hash" in testDetails:
         (
             build_artifacts,
@@ -197,9 +241,8 @@ def process_self_contained_coordinator_stream(
             run_image,
         ) = extract_build_info_from_streamdata(testDetails)
 
-        files = get_benchmark_specs(testsuites_folder)
-
-        for test_file in files:
+        overall_result = True
+        for test_file in testsuite_spec_files:
             redis_containers = []
             client_containers = []
 
@@ -213,6 +256,7 @@ def process_self_contained_coordinator_stream(
                 _,
             ) = extract_redis_dbconfig_parameters(benchmark_config, "dbconfig")
             for topology_spec_name in benchmark_config["redis-topologies"]:
+                test_result = False
                 try:
                     current_cpu_pos = 0
                     ceil_db_cpu_limit = extract_db_cpu_limit(
@@ -383,13 +427,13 @@ def process_self_contained_coordinator_stream(
                     with open(local_benchmark_output_filename, "r") as json_file:
                         results_dict = json.load(json_file)
                     logging.info("Final JSON result {}".format(results_dict))
-
+                    dataset_load_duration_seconds = 0
                     timeseries_test_sucess_flow(
                         datasink_push_results_redistimeseries,
                         git_version,
                         benchmark_config,
                         benchmark_duration_seconds,
-                        None,
+                        dataset_load_duration_seconds,
                         None,
                         topology_spec_name,
                         None,
@@ -404,6 +448,7 @@ def process_self_contained_coordinator_stream(
                         tf_triggering_env,
                         tsname_project_total_success,
                     )
+                    test_result = True
 
                 except:
                     logging.critical(
@@ -414,18 +459,38 @@ def process_self_contained_coordinator_stream(
                     print("-" * 60)
                     traceback.print_exc(file=sys.stdout)
                     print("-" * 60)
+                    test_result = False
                 # tear-down
                 logging.info("Tearing down setup")
                 for container in redis_containers:
-                    container.stop()
-                for container in client_containers:
-                    if type(container) != bytes:
+                    try:
                         container.stop()
+                    except docker.errors.NotFound:
+                        logging.info(
+                            "When trying to stop DB container with id {} and image {} it was already stopped".format(
+                                container.id, container.image
+                            )
+                        )
+                        pass
+
+                for container in client_containers:
+                    if type(container) == Container:
+                        try:
+                            container.stop()
+                        except docker.errors.NotFound:
+                            logging.info(
+                                "When trying to stop Client container with id {} and image {} it was already stopped".format(
+                                    container.id, container.image
+                                )
+                            )
+                            pass
                 shutil.rmtree(temporary_dir, ignore_errors=True)
 
+                overall_result &= test_result
+
     else:
         logging.error("Missing commit information within received message.")
-    return stream_id
+    return stream_id, overall_result
 
 
 def get_benchmark_specs(testsuites_folder):
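
The teardown now tolerates containers that disappeared on their own (for example, short-lived benchmark clients started with auto-remove), since `container.stop()` raises `docker.errors.NotFound` once the daemon has already reaped them. A self-contained sketch of the same pattern with the Docker SDK for Python (the image name is a placeholder):

    import docker

    client = docker.from_env()
    container = client.containers.run("redis:6.2", detach=True, auto_remove=True)
    try:
        container.stop()
    except docker.errors.NotFound:
        # The daemon already removed it; nothing left to stop.
        print("container {} was already gone".format(container.short_id))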