Skip to content

Commit eaa3039

Browse files
authored
ClI update n' bug fix (#42)
* Added support for updating project external IDs * Fixed cluster discovery bug * Version bump * Ignore tasks without a cluster
1 parent 6a29bbc commit eaa3039

File tree

5 files changed

+32
-15
lines changed

5 files changed

+32
-15
lines changed

sync/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
"""Library for leveraging the power of Sync"""
2-
__version__ = "0.4.0"
2+
__version__ = "0.4.1"
33

44
TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

sync/_databricks.py

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ def create_prediction(
2929
compute_type: str,
3030
cluster: dict,
3131
cluster_events: dict,
32-
instances: dict,
3332
eventlog: bytes,
33+
instances: dict = None,
3434
volumes: dict = None,
3535
tasks: List[dict] = None,
3636
project_id: str = None,
@@ -49,19 +49,20 @@ def create_prediction(
4949
If the cluster is a long-running cluster, this should only include events relevant to the time window that a
5050
run occurred in.
5151
:type cluster_events: dict
52+
:param eventlog: encoded event log zip
53+
:type eventlog: bytes
5254
:param instances: All EC2 Instances that were a part of the cluster. Expects a data format as is returned by
5355
`boto3's EC2.describe_instances API <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ec2/client/describe_instances.html>`_
5456
Instances should be narrowed to just those instances relevant to the Databricks Run. This can be done by passing
5557
a `tag:ClusterId` filter to the describe_instances call like -
5658
``Filters=[{"Name": "tag:ClusterId", "Values": ["my-dbx-clusterid"]}]``
5759
If there are multiple pages of instances, all pages should be accumulated into 1 dictionary and passed to this
5860
function
59-
:param eventlog: encoded event log zip
60-
:type eventlog: bytes
61-
:param tasks: The Databricks Tasks associated with the cluster
62-
:type tasks: List[dict]
61+
:type instances: dict, optional
6362
:param volumes: The EBS volumes that were attached to this cluster
6463
:type volumes: dict, optional
64+
:param tasks: The Databricks Tasks associated with the cluster
65+
:type tasks: List[dict]
6566
:param project_id: Sync project ID, defaults to None
6667
:type project_id: str, optional
6768
:return: prediction ID
@@ -1032,7 +1033,9 @@ def _get_cluster_id_and_tasks_from_run_tasks(
10321033
project_cluster_ids = defaultdict(list)
10331034
all_cluster_tasks = defaultdict(list)
10341035
for task in run["tasks"]:
1035-
if not exclude_tasks or task["task_key"] not in exclude_tasks:
1036+
if "cluster_instance" in task and (
1037+
not exclude_tasks or task["task_key"] not in exclude_tasks
1038+
):
10361039
cluster_id = task["cluster_instance"]["cluster_id"]
10371040
all_cluster_tasks[cluster_id].append(task)
10381041

@@ -1051,7 +1054,7 @@ def _get_cluster_id_and_tasks_from_run_tasks(
10511054
if cluster_ids:
10521055
project_cluster_tasks = {
10531056
cluster_id: tasks
1054-
for cluster_id, tasks in all_cluster_tasks
1057+
for cluster_id, tasks in all_cluster_tasks.items()
10551058
if cluster_id in cluster_ids
10561059
}
10571060
else:
@@ -1062,15 +1065,17 @@ def _get_cluster_id_and_tasks_from_run_tasks(
10621065
cluster_tasks = project_cluster_tasks or all_cluster_tasks
10631066
num_clusters = len(cluster_tasks)
10641067
if num_clusters == 0:
1065-
raise Exception("No cluster found for tasks")
1068+
raise RuntimeError("No cluster found for tasks")
10661069
elif num_clusters > 1:
1067-
raise Exception("More than 1 cluster found for tasks")
1070+
raise RuntimeError("More than 1 cluster found for tasks")
10681071

10691072
return cluster_tasks.popitem()
10701073

10711074

10721075
def _get_run_spark_context_id(tasks: List[dict]) -> Response[str]:
1073-
context_ids = {task["cluster_instance"]["spark_context_id"] for task in tasks}
1076+
context_ids = {
1077+
task["cluster_instance"]["spark_context_id"] for task in tasks if "cluster_instance" in task
1078+
}
10741079
num_ids = len(context_ids)
10751080

10761081
if num_ids == 1:

sync/api/projects.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ def update_project(
102102
project_id: str,
103103
description: str = None,
104104
s3_url: str = None,
105+
app_id: str = None,
105106
prediction_preference: Preference = None,
106107
prediction_params: dict = None,
107108
) -> Response[dict]:
@@ -113,6 +114,8 @@ def update_project(
113114
:type description: str, optional
114115
:param s3_url: location of project event logs and configurations, defaults to None
115116
:type s3_url: str, optional
117+
:param app_id: external identifier, defaults to None
118+
:type app_id: str, optional
116119
:param prediction_preference: default preference for predictions, defaults to None
117120
:type prediction_preference: Preference, optional
118121
:param prediction_params: dictionary of prediction parameters, defaults to None. Valid options are documented `here <https://developers.synccomputing.com/reference/update_project_v1_projects__project_id__put>`__
@@ -125,6 +128,8 @@ def update_project(
125128
project_update["description"] = description
126129
if s3_url:
127130
project_update["s3_url"] = s3_url
131+
if app_id:
132+
project_update["app_id"] = app_id
128133
if prediction_preference:
129134
project_update["prediction_preference"] = prediction_preference
130135
if prediction_params:

sync/cli/projects.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,17 +97,24 @@ def create(
9797
@click.argument("project-id")
9898
@click.option("-d", "--description")
9999
@click.option("-l", "--location", help="S3 URL under which to store event logs and configuration")
100+
@click.option(
101+
"-i", "--app-id", help="External identifier often based on the project's target application"
102+
)
100103
@click.option(
101104
"-p",
102105
"--preference",
103106
type=click.Choice(Preference),
104107
default=CONFIG.default_prediction_preference,
105108
)
106109
def update(
107-
project_id: str, description: str = None, location: str = None, preference: Preference = None
110+
project_id: str,
111+
description: str = None,
112+
location: str = None,
113+
app_id: str = None,
114+
preference: Preference = None,
108115
):
109116
"""Update a project"""
110-
response = update_project(project_id, description, location, preference)
117+
response = update_project(project_id, description, location, app_id, preference)
111118
if response.result:
112119
click.echo("Project updated")
113120
else:

sync/clients/sync.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ def update_access_token(self, response: httpx.Response):
5050

5151

5252
class SyncClient(RetryableHTTPClient):
53-
def __init__(self, api_url, api_key):
53+
def __init__(self, api_url: str, api_key: APIKey):
5454
super().__init__(
5555
client=httpx.Client(
5656
base_url=api_url,
@@ -138,7 +138,7 @@ def _send(self, request: httpx.Request) -> dict:
138138

139139

140140
class ASyncClient(RetryableHTTPClient):
141-
def __init__(self, api_url, api_key):
141+
def __init__(self, api_url: str, api_key: APIKey):
142142
super().__init__(
143143
client=httpx.AsyncClient(
144144
base_url=api_url,

0 commit comments

Comments
 (0)