1
1
import logging
2
- from sync .databricks .integrations ._run_submit_runner import apply_sync_gradient_cluster_recommendation
2
+
3
+ from sync .databricks .integrations ._run_submit_runner import (
4
+ apply_sync_gradient_cluster_recommendation ,
5
+ )
3
6
4
7
logger = logging .getLogger (__name__ )
5
8
@@ -9,21 +12,28 @@ def airflow_gradient_pre_execute_hook(context: dict):
9
12
logger .info ("Running airflow gradient pre-execute hook!" )
10
13
logger .debug (f"Airflow operator context - context:{ context } " )
11
14
15
+ task_id = context ["task" ].task_id
12
16
gradient_app_id = context ["params" ]["gradient_app_id" ]
13
17
auto_apply = context ["params" ]["gradient_auto_apply" ]
14
18
cluster_log_url = context ["params" ]["cluster_log_url" ]
15
19
workspace_id = context ["params" ]["databricks_workspace_id" ]
16
- run_submit_task = context ["task" ].json .copy () # copy the run submit json from the task context
20
+ run_submit_task = context [
21
+ "task"
22
+ ].json .copy () # copy the run submit json from the task context
17
23
18
24
updated_task_configuration = apply_sync_gradient_cluster_recommendation (
19
25
run_submit_task = run_submit_task ,
20
- gradient_app_id = gradient_app_id ,
26
+ gradient_app_id = build_app_id ( task_id , gradient_app_id ) ,
21
27
auto_apply = auto_apply ,
22
28
cluster_log_url = cluster_log_url ,
23
- workspace_id = workspace_id
29
+ workspace_id = workspace_id ,
24
30
)
25
31
26
32
context ["task" ].json = updated_task_configuration
27
33
except Exception as e :
28
34
logger .exception (e )
29
35
logger .error ("Unable to apply gradient configuration to Databricks run submit tasks" )
36
+
37
+
38
+ def build_app_id (task_id : str , app_id : str ):
39
+ return f"{ task_id } -{ app_id } "
0 commit comments