Commit e43d10b

Merge pull request #46 from synccomputingcode/add-dbfs-cluster-log-support-to-projects
add support for dbfs cluster log locations
2 parents 91fba90 + cda7cbc

File tree: 12 files changed, +312 −35 lines

sync/_databricks.py

108 additions, 6 deletions
@@ -15,7 +15,11 @@
 import boto3 as boto

 from sync.api.predictions import create_prediction_with_eventlog_bytes, get_prediction
-from sync.api.projects import create_project_submission_with_eventlog_bytes, get_project
+from sync.api.projects import (
+    create_project_submission_with_eventlog_bytes,
+    get_project,
+    get_project_recommendation,
+)
 from sync.clients.databricks import get_default_client
 from sync.config import CONFIG
 from sync.models import DatabricksAPIError, DatabricksClusterReport, DatabricksError, Response
@@ -381,11 +385,11 @@ def get_cluster_report(
     if "error_code" in run:
         return Response(error=DatabricksAPIError(**run))

-    project_cluster_tasks = _get_project_cluster_tasks(run, exclude_tasks, project_id)
+    project_cluster_tasks = _get_project_cluster_tasks(run, exclude_tasks)
     cluster_tasks = project_cluster_tasks.get(project_id)
     if not cluster_tasks:
         return Response(
-            error=DatabricksError(f"Failed to locate cluster for project ID {project_id}")
+            error=DatabricksError(message=f"Failed to locate cluster for project ID {project_id}")
         )

     return _get_cluster_report(
@@ -582,6 +586,92 @@ def get_prediction_cluster(
     return prediction_response


+def get_recommendation_job(job_id: str, project_id: str, recommendation_id: str) -> Response[dict]:
+    """Apply the recommendation to the specified job.
+
+    The basis job can only have tasks that run on the same job cluster. That cluster is updated
+    with the configuration from the recommendation and returned in the result job configuration.
+    Use this function to apply a recommendation to an existing job or to test one with a one-off
+    run.
+
+    :param job_id: basis job ID
+    :type job_id: str
+    :param project_id: Sync project ID
+    :type project_id: str
+    :param recommendation_id: recommendation ID
+    :type recommendation_id: str
+    :return: job object with the recommendation applied to it
+    :rtype: Response[dict]
+    """
+    job = get_default_client().get_job(job_id)
+
+    if "error_code" in job:
+        return Response(error=DatabricksAPIError(**job))
+
+    job_settings = job["settings"]
+    tasks = job_settings.get("tasks", [])
+    if tasks:
+        cluster_response = _get_job_cluster(tasks, job_settings.get("job_clusters", []))
+        cluster = cluster_response.result
+        if cluster:
+            recommendation_cluster_response = get_recommendation_cluster(
+                cluster, project_id, recommendation_id
+            )
+            recommendation_cluster = recommendation_cluster_response.result
+            if recommendation_cluster:
+                cluster_key = tasks[0].get("job_cluster_key")
+                if cluster_key:
+                    job_settings["job_clusters"] = [
+                        j
+                        for j in job_settings["job_clusters"]
+                        if j.get("job_cluster_key") != cluster_key
+                    ] + [{"job_cluster_key": cluster_key, "new_cluster": recommendation_cluster}]
+                else:
+                    # For `new_cluster` definitions, Databricks automatically assigns the newly
+                    # created cluster a name, and rejects any run submissions where the
+                    # `cluster_name` is pre-populated
+                    if "cluster_name" in recommendation_cluster:
+                        del recommendation_cluster["cluster_name"]
+                    tasks[0]["new_cluster"] = recommendation_cluster
+                return Response(result=job)
+            return recommendation_cluster_response
+        return cluster_response
+    return Response(error=DatabricksError(message="No task found in job"))
+
+
+def get_recommendation_cluster(
+    cluster: dict, project_id: str, recommendation_id: str
+) -> Response[dict]:
+    """Apply the recommendation to the provided cluster.
+
+    The cluster is updated with configuration from the recommendation and returned in the result.
+
+    :param cluster: Databricks cluster object
+    :type cluster: dict
+    :param project_id: Sync project ID
+    :type project_id: str
+    :param recommendation_id: ID of the recommendation to fetch and apply to the given cluster
+    :type recommendation_id: str
+    :return: cluster object with the recommendation applied to it
+    :rtype: Response[dict]
+    """
+    recommendation_response = get_project_recommendation(project_id, recommendation_id)
+    recommendation = (recommendation_response.result or {}).get("recommendation")
+    if recommendation:
+        # num_workers/autoscale are mutually exclusive settings, and we rely on the
+        # recommendation to set these appropriately. Since we may recommend a static cluster
+        # (i.e. a cluster with `num_workers`) for a cluster that was originally autoscaled,
+        # make sure to remove the prior configuration
+        if "num_workers" in cluster:
+            del cluster["num_workers"]
+
+        if "autoscale" in cluster:
+            del cluster["autoscale"]
+
+        recommendation_cluster = _deep_update(cluster, recommendation["configuration"])
+
+        return Response(result=recommendation_cluster)
+    return recommendation_response


 def get_project_job(job_id: str, project_id: str, region_name: str = None) -> Response[dict]:
     """Apply project configuration to a job.
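The two new functions compose: get_recommendation_job fetches the job, resolves its single job cluster via _get_job_cluster, and hands that cluster to get_recommendation_cluster, which overlays the recommendation's configuration. A minimal usage sketch, assuming the functions are importable from sync._databricks as the file path suggests; the IDs below are hypothetical placeholders:

from sync._databricks import get_recommendation_job

# Hypothetical IDs; substitute values from your Databricks workspace and Sync project
response = get_recommendation_job(
    job_id="123",
    project_id="my-project-id",
    recommendation_id="my-recommendation-id",
)
if response.result:
    job = response.result  # job["settings"] now carries the recommended cluster config
else:
    print(response.error)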
@@ -672,20 +762,32 @@ def get_project_cluster_settings(project_id: str, region_name: str = None) -> Re
             }
         }

-        s3_url = project.get("s3_url")
-        if s3_url:
+        cluster_log_url = urlparse(project.get("cluster_log_url"))
+        if cluster_log_url.scheme == "s3":
             result.update(
                 {
                     "cluster_log_conf": {
                         "s3": {
-                            "destination": f"{s3_url}/{project_id}",
+                            "destination": f"{cluster_log_url.geturl()}/{project_id}",
                             "enable_encryption": True,
                             "region": region_name or boto.client("s3").meta.region_name,
                             "canned_acl": "bucket-owner-full-control",
                         }
                     }
                 }
             )
+        elif cluster_log_url.scheme == "dbfs":
+            result.update(
+                {
+                    "cluster_log_conf": {
+                        "dbfs": {
+                            "destination": f"{cluster_log_url.geturl()}/{project_id}",
+                        }
+                    }
+                }
+            )
+
         return Response(result=result)
     return project_response
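For reference, a sketch of the branching the updated get_project_cluster_settings performs, assuming a hypothetical project whose cluster_log_url is "dbfs:/cluster-logs"; urlparse handles both schemes uniformly:

from urllib.parse import urlparse

cluster_log_url = urlparse("dbfs:/cluster-logs")  # hypothetical project setting
project_id = "my-project-id"  # hypothetical

if cluster_log_url.scheme == "dbfs":
    cluster_log_conf = {
        "dbfs": {"destination": f"{cluster_log_url.geturl()}/{project_id}"}
    }
# cluster_log_conf == {"dbfs": {"destination": "dbfs:/cluster-logs/my-project-id"}}

Unlike the S3 branch, the DBFS destination needs no region, encryption, or ACL settings, which is why the new branch emits only the destination key.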

sync/api/projects.py

119 additions, 20 deletions
@@ -2,14 +2,22 @@
 """
 import io
 import logging
+from time import sleep
 from typing import List
 from urllib.parse import urlparse

 import httpx

 from sync.api.predictions import generate_presigned_url, get_predictions
 from sync.clients.sync import get_default_client
-from sync.models import Platform, Preference, ProjectError, Response, SubmissionError
+from sync.models import (
+    Platform,
+    Preference,
+    ProjectError,
+    RecommendationError,
+    Response,
+    SubmissionError,
+)

 logger = logging.getLogger()

@@ -45,7 +53,7 @@ def create_project(
     product_code: str,
     description: str = None,
     job_id: str = None,
-    s3_url: str = None,
+    cluster_log_url: str = None,
     prediction_preference: Preference = Preference.ECONOMY,
     prediction_params: dict = None,
     app_id: str = None,
@@ -60,8 +68,8 @@
     :type description: str, optional
     :param job_id: Databricks job ID, defaults to None
     :type job_id: str, optional
-    :param s3_url: S3 URL under which to store project configurations and logs, defaults to None
-    :type s3_url: str, optional
+    :param cluster_log_url: S3 or DBFS URL under which to store project configurations and logs, defaults to None
+    :type cluster_log_url: str, optional
     :param prediction_preference: preferred prediction solution, defaults to `Preference.ECONOMY`
     :type prediction_preference: Preference, optional
     :param prediction_params: dictionary of prediction parameters, defaults to None. Valid options are documented `here <https://developers.synccomputing.com/reference/create_project_v1_projects_post>`__
@@ -78,7 +86,7 @@
         "product_code": product_code,
         "description": description,
         "job_id": job_id,
-        "s3_url": s3_url,
+        "cluster_log_url": cluster_log_url,
         "prediction_preference": prediction_preference,
         "prediction_params": prediction_params,
         "app_id": app_id,
@@ -101,7 +109,7 @@ def get_project(project_id: str) -> Response[dict]:
 def update_project(
     project_id: str,
     description: str = None,
-    s3_url: str = None,
+    cluster_log_url: str = None,
     app_id: str = None,
     prediction_preference: Preference = None,
     prediction_params: dict = None,
@@ -112,8 +120,8 @@
     :type project_id: str
     :param description: description, defaults to None
     :type description: str, optional
-    :param s3_url: location of project event logs and configurations, defaults to None
-    :type s3_url: str, optional
+    :param cluster_log_url: location of project event logs and configurations, defaults to None
+    :type cluster_log_url: str, optional
     :param app_id: external identifier, defaults to None
     :type app_id: str, optional
     :param prediction_preference: default preference for predictions, defaults to None
@@ -126,8 +134,8 @@
     project_update = {}
     if description:
         project_update["description"] = description
-    if s3_url:
-        project_update["s3_url"] = s3_url
+    if cluster_log_url:
+        project_update["cluster_log_url"] = cluster_log_url
     if app_id:
         project_update["app_id"] = app_id
     if prediction_preference:
@@ -187,7 +195,7 @@ def delete_project(project_id: str) -> Response[str]:
 def create_project_submission(
     platform: Platform, cluster_report: dict, eventlog_url: str, project_id: str
 ) -> Response[str]:
-    """Create prediction
+    """Create a submission

     :param platform: platform, e.g. "aws-emr"
     :type platform: Platform
@@ -211,13 +219,17 @@
     else:
         return Response(error=SubmissionError(message="Unsupported event log URL scheme"))

+    payload = {
+        "product": platform,
+        "cluster_report": cluster_report,
+        "event_log_uri": eventlog_http_url,
+    }
+
+    logger.info(payload)
+
     response = get_default_client().create_project_submission(
         project_id,
-        {
-            "product": platform,
-            "cluster_report": cluster_report,
-            "event_log_uri": eventlog_http_url,
-        },
+        payload,
     )

     if response.get("error"):
@@ -226,14 +238,43 @@
     return Response(result=response["result"]["submission_id"])


+def _clear_cluster_report_errors(cluster_report_orig: dict) -> dict:
+    """Clears error messages from the cluster_events field.
+
+    This circumvents issues where certain strange characters in the error fields of Azure cluster
+    reports were causing the client to throw errors when trying to make submissions.
+
+    :param cluster_report_orig: cluster report
+    :type cluster_report_orig: dict
+    :return: cleared cluster report
+    :rtype: dict
+    """
+    cluster_report = cluster_report_orig.copy()
+
+    def clear_error(event: dict):
+        try:
+            del event["details"]["reason"]["parameters"]["azure_error_message"]
+        except KeyError:
+            pass
+        try:
+            del event["details"]["reason"]["parameters"]["databricks_error_message"]
+        except KeyError:
+            pass
+
+    try:
+        list(map(clear_error, cluster_report["cluster_events"]["events"]))
+    except KeyError:
+        pass
+    return cluster_report
+
+
 def create_project_submission_with_eventlog_bytes(
     platform: Platform,
     cluster_report: dict,
     eventlog_name: str,
     eventlog_bytes: bytes,
     project_id: str,
 ) -> Response[str]:
-    """Creates a prediction giving event log bytes instead of a URL
+    """Creates a submission given event log bytes instead of a URL

     :param platform: platform, e.g. "aws-emr"
     :type platform: Platform
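A quick sketch of what the new _clear_cluster_report_errors helper strips, using a minimal, hypothetical cluster report; only the two *_error_message keys are removed, and any other event fields survive:

from sync.api.projects import _clear_cluster_report_errors

report = {  # hypothetical, minimal cluster report
    "cluster_events": {
        "events": [
            {
                "details": {
                    "reason": {
                        "parameters": {
                            "azure_error_message": "garbled characters here",
                            "databricks_error_message": "more of the same",
                            "type": "kept",  # hypothetical extra field
                        }
                    }
                }
            }
        ]
    }
}

cleared = _clear_cluster_report_errors(report)
# cleared["cluster_events"]["events"][0]["details"]["reason"]["parameters"] == {"type": "kept"}

Note the helper takes a shallow copy, so the nested event dicts are shared with the original report.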
@@ -243,14 +284,15 @@ def create_project_submission_with_eventlog_bytes(
     :type eventlog_name: str
     :param eventlog_bytes: encoded event log
     :type eventlog_bytes: bytes
-    :param project_id: ID of project to which the prediction belongs, defaults to None
-    :type project_id: str, optional
+    :param project_id: ID of project to which the submission belongs
+    :type project_id: str
     :return: submission ID
     :rtype: Response[str]
     """
     # TODO - best way to handle "no eventlog"
+    cluster_report_clear = _clear_cluster_report_errors(cluster_report)
     response = get_default_client().create_project_submission(
-        project_id, {"product_code": platform, "cluster_report": cluster_report}
+        project_id, {"product_code": platform, "cluster_report": cluster_report_clear}
     )

     if response.get("error"):
@@ -269,3 +311,60 @@ def create_project_submission_with_eventlog_bytes(
         return Response(error=SubmissionError(message="Failed to upload event log"))

     return Response(result=response["result"]["submission_id"])
+
+
+def create_project_recommendation(project_id: str, **options) -> Response[str]:
+    """Creates a recommendation for the given project
+
+    :param project_id: ID of the project for which the recommendation is created
+    :type project_id: str
+    :return: recommendation ID
+    :rtype: Response[str]
+    """
+    response = get_default_client().create_project_recommendation(project_id, **options)
+
+    if response.get("error"):
+        return Response(**response)
+
+    return Response(result=response["result"]["id"])
+
+
+def wait_for_recommendation(project_id: str, recommendation_id: str) -> Response[dict]:
+    """Get a recommendation, waiting for it if it is not yet ready
+
+    :param project_id: project ID
+    :type project_id: str
+    :param recommendation_id: recommendation ID
+    :type recommendation_id: str
+    :return: recommendation object
+    :rtype: Response[dict]
+    """
+    response = get_project_recommendation(project_id, recommendation_id)
+    while response:
+        result = response.result
+        if result:
+            if result["state"] == "SUCCESS":
+                return Response(result=result)
+            if result["state"] == "FAILURE":
+                return Response(error=RecommendationError(message="Recommendation failed"))
+        logger.info("Waiting for recommendation")
+        sleep(10)
+        response = get_project_recommendation(project_id, recommendation_id)
+
+
+def get_project_recommendation(project_id: str, recommendation_id: str) -> Response[dict]:
+    """Get a specific recommendation for a project
+
+    :param project_id: project ID
+    :type project_id: str
+    :param recommendation_id: recommendation ID
+    :type recommendation_id: str
+    :return: recommendation object
+    :rtype: Response[dict]
+    """
+    response = get_default_client().get_project_recommendation(project_id, recommendation_id)
+
+    if response.get("error"):
+        return Response(**response)
+
+    return Response(result=response["result"])
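Taken together, the new functions support a create-then-poll recommendation workflow against a project whose logs live in DBFS. A hedged end-to-end sketch, assuming the import paths match the file locations; the project ID and log URL are hypothetical:

from sync.api.projects import (
    create_project_recommendation,
    update_project,
    wait_for_recommendation,
)

project_id = "my-project-id"  # hypothetical

# Point the project at a DBFS cluster log location (hypothetical URL)
update_project(project_id, cluster_log_url="dbfs:/cluster-logs")

# Kick off a recommendation, then block (polling every 10 seconds) until it resolves
rec_response = create_project_recommendation(project_id)
if rec_response.result:
    final = wait_for_recommendation(project_id, rec_response.result)
    if final.result:
        print(final.result["state"])  # "SUCCESS"
    else:
        print(final.error)  # the recommendation reached the FAILURE state
else:
    print(rec_response.error)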
