Skip to content

Commit 7c71815

Browse files
author
Brandon Kaplan
authored
Prod-1746-Hosted-Monitor-Idempotency (#115)
* add lambda option to monitor * tests * bump version * lint * reduce complexity
1 parent 7dea788 commit 7c71815

File tree

4 files changed

+45
-14
lines changed

4 files changed

+45
-14
lines changed

sync/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
"""Library for leveraging the power of Sync"""
22

3-
__version__ = "1.6.0"
3+
__version__ = "1.7.0"
44

55
TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

sync/awsdatabricks.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import sync._databricks
1313
from sync._databricks import (
1414
_cluster_log_destination,
15-
get_all_cluster_events,
1615
_get_cluster_instances_from_dbfs,
1716
_update_monitored_timelines,
1817
_wait_for_cluster_termination,
@@ -23,6 +22,7 @@
2322
create_run,
2423
create_submission_for_run,
2524
create_submission_with_cluster_info,
25+
get_all_cluster_events,
2626
get_cluster,
2727
get_cluster_report,
2828
get_project_cluster,
@@ -48,9 +48,9 @@
4848
AccessReportLine,
4949
AccessStatusCode,
5050
AWSDatabricksClusterReport,
51+
DatabricksComputeType,
5152
DatabricksError,
5253
DatabricksPlanType,
53-
DatabricksComputeType,
5454
Response,
5555
)
5656
from sync.utils.dbfs import format_dbfs_filepath, write_dbfs_file
@@ -367,9 +367,11 @@ def monitor_cluster(
367367
spark_context_id = cluster.get("spark_context_id")
368368

369369
(log_url, filesystem, bucket, base_prefix) = _cluster_log_destination(cluster)
370+
write_function = None
370371
if cluster_report_destination_override:
371372
filesystem = cluster_report_destination_override.get("filesystem", filesystem)
372373
base_prefix = cluster_report_destination_override.get("base_prefix", base_prefix)
374+
write_function = cluster_report_destination_override.get("write_function")
373375

374376
if log_url or cluster_report_destination_override:
375377
_monitor_cluster(
@@ -378,6 +380,7 @@ def monitor_cluster(
378380
spark_context_id,
379381
polling_period,
380382
kill_on_termination,
383+
write_function,
381384
)
382385
else:
383386
logger.warning("Unable to monitor cluster due to missing cluster log destination - exiting")
@@ -389,6 +392,7 @@ def _monitor_cluster(
389392
spark_context_id: int,
390393
polling_period: int,
391394
kill_on_termination: bool = False,
395+
write_function=None,
392396
) -> None:
393397

394398
(log_url, filesystem, bucket, base_prefix) = cluster_log_destination
@@ -400,7 +404,7 @@ def _monitor_cluster(
400404
aws_region_name = DB_CONFIG.aws_region_name
401405
ec2 = boto.client("ec2", region_name=aws_region_name)
402406

403-
write_file = _define_write_file(file_key, filesystem, bucket)
407+
write_file = _define_write_file(file_key, filesystem, bucket, write_function)
404408

405409
all_inst_by_id = {}
406410
active_timelines_by_id = {}
@@ -454,8 +458,14 @@ def _monitor_cluster(
454458
sleep(polling_period)
455459

456460

457-
def _define_write_file(file_key, filesystem, bucket):
458-
if filesystem == "file":
461+
def _define_write_file(file_key, filesystem, bucket, write_function):
462+
if filesystem == "lambda":
463+
464+
def write_file(body: bytes):
465+
logger.info("Using custom lambda function to write data")
466+
write_function(body)
467+
468+
elif filesystem == "file":
459469
file_path = Path(file_key)
460470

461471
def ensure_path_exists(report_path: Path):

sync/azuredatabricks.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,13 @@
99

1010
from azure.common.credentials import get_cli_profile
1111
from azure.core.exceptions import ClientAuthenticationError
12-
from azure.identity import DefaultAzureCredential, ClientSecretCredential
12+
from azure.identity import ClientSecretCredential, DefaultAzureCredential
1313
from azure.mgmt.compute import ComputeManagementClient
1414
from azure.mgmt.resource import ResourceManagementClient
1515

1616
import sync._databricks
1717
from sync._databricks import (
1818
_cluster_log_destination,
19-
get_all_cluster_events,
2019
_get_cluster_instances_from_dbfs,
2120
_update_monitored_timelines,
2221
_wait_for_cluster_termination,
@@ -27,6 +26,7 @@
2726
create_run,
2827
create_submission_for_run,
2928
create_submission_with_cluster_info,
29+
get_all_cluster_events,
3030
get_cluster,
3131
get_cluster_report,
3232
get_project_cluster,
@@ -51,9 +51,9 @@
5151
AccessReportLine,
5252
AccessStatusCode,
5353
AzureDatabricksClusterReport,
54+
DatabricksComputeType,
5455
DatabricksError,
5556
DatabricksPlanType,
56-
DatabricksComputeType,
5757
Response,
5858
)
5959
from sync.utils.dbfs import format_dbfs_filepath, write_dbfs_file
@@ -338,9 +338,11 @@ def monitor_cluster(
338338
spark_context_id = cluster.get("spark_context_id")
339339

340340
(log_url, filesystem, bucket, base_prefix) = _cluster_log_destination(cluster)
341+
write_function = None
341342
if cluster_report_destination_override:
342343
filesystem = cluster_report_destination_override.get("filesystem", filesystem)
343344
base_prefix = cluster_report_destination_override.get("base_prefix", base_prefix)
345+
write_function = cluster_report_destination_override.get("write_function")
344346

345347
if log_url:
346348
_monitor_cluster(
@@ -349,6 +351,7 @@ def monitor_cluster(
349351
spark_context_id,
350352
polling_period,
351353
kill_on_termination,
354+
write_function,
352355
)
353356
else:
354357
logger.warning("Unable to monitor cluster due to missing cluster log destination - exiting")
@@ -360,6 +363,7 @@ def _monitor_cluster(
360363
spark_context_id: int,
361364
polling_period: int,
362365
kill_on_termination: bool = False,
366+
write_function=None,
363367
) -> None:
364368
(log_url, filesystem, bucket, base_prefix) = cluster_log_destination
365369
# If the event log destination is just a *bucket* without any sub-path, then we don't want to include
@@ -370,7 +374,7 @@ def _monitor_cluster(
370374
azure_logger = logging.getLogger("azure.core.pipeline.policies.http_logging_policy")
371375
azure_logger.setLevel(logging.WARNING)
372376

373-
write_file = _define_write_file(file_key, filesystem)
377+
write_file = _define_write_file(file_key, filesystem, write_function)
374378

375379
resource_group_name = _get_databricks_resource_group_name()
376380
if not resource_group_name:
@@ -418,8 +422,14 @@ def _monitor_cluster(
418422
sleep(polling_period)
419423

420424

421-
def _define_write_file(file_key, filesystem):
422-
if filesystem == "file":
425+
def _define_write_file(file_key, filesystem, write_function):
426+
if filesystem == "lambda":
427+
428+
def write_file(body: bytes):
429+
logger.info("Using custom lambda function to write data")
430+
write_function(body)
431+
432+
elif filesystem == "file":
423433
file_path = Path(file_key)
424434

425435
def ensure_path_exists(report_path: Path):

tests/test_awsdatabricks.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,24 @@ def test_monitor_cluster_with_override(
3939

4040
expected_log_destination_override = ("s3://bucket/path", "file", "bucket", "test_file_path")
4141
mock_monitor_cluster.assert_called_with(
42-
expected_log_destination_override, "0101-214342-tpi6qdp2", 1443449481634833945, 1, True
42+
expected_log_destination_override,
43+
"0101-214342-tpi6qdp2",
44+
1443449481634833945,
45+
1,
46+
True,
47+
None,
4348
)
4449

4550
mock_cluster_log_destination.return_value = (None, "s3", None, "path")
4651
monitor_cluster("0101-214342-tpi6qdp2", 1, cluster_report_destination_override, True)
4752
expected_log_destination_override = (None, "file", None, "test_file_path")
4853
mock_monitor_cluster.assert_called_with(
49-
expected_log_destination_override, "0101-214342-tpi6qdp2", 1443449481634833945, 1, True
54+
expected_log_destination_override,
55+
"0101-214342-tpi6qdp2",
56+
1443449481634833945,
57+
1,
58+
True,
59+
None,
5060
)
5161

5262
def test_monitor_cluster_without_override(
@@ -70,4 +80,5 @@ def test_monitor_cluster_without_override(
7080
1443449481634833945,
7181
1,
7282
False,
83+
None,
7384
)

0 commit comments

Comments (0)