Skip to content
Draft
119 changes: 109 additions & 10 deletions dask_kubernetes/operator/operator.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import asyncio
import copy

from distributed.core import rpc
import aiohttp
from contextlib import suppress

Expand All @@ -15,36 +18,59 @@
)


def build_scheduler_pod_spec(name, spec):
return {
def build_scheduler_pod_spec(cluster_name, spec):
scheduler_name = f"{cluster_name}-scheduler"
pod_spec = {
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"name": f"{name}-scheduler",
"name": scheduler_name,
"labels": {
"dask.org/cluster-name": name,
"dask.org/cluster-name": cluster_name,
"dask.org/component": "scheduler",
"sidecar.istio.io/inject": "false",
"app": "scheduler",
"version": "v1",
},
},
"spec": spec,
}

pod_spec["spec"]["serviceAccountName"] = f"{scheduler_name}-service"

return pod_spec

def build_scheduler_service_spec(name, spec):

def build_scheduler_service_spec(cluster_name, spec):
return {
"apiVersion": "v1",
"kind": "Service",
"metadata": {
"name": f"{name}-service",
"name": f"{cluster_name}-scheduler-service",
"labels": {
"dask.org/cluster-name": name,
"dask.org/cluster-name": cluster_name,
"app": "scheduler",
"service": "scheduler",
},
},
"spec": spec,
}


def build_scheduler_service_account_spec(cluster_name):
scheduler_service_name = f"{cluster_name}-scheduler-service"
return {
"apiVersion": "v1",
"kind": "ServiceAccount",
"metadata": {
"name": scheduler_service_name,
"labels": {
"dask.org/cluster-name": cluster_name,
"account": scheduler_service_name,
},
},
}


def build_worker_pod_spec(worker_group_name, namespace, cluster_name, uuid, spec):
worker_name = f"{worker_group_name}-worker-{uuid}"
pod_spec = {
Expand All @@ -56,10 +82,10 @@ def build_worker_pod_spec(worker_group_name, namespace, cluster_name, uuid, spec
"dask.org/cluster-name": cluster_name,
"dask.org/workergroup-name": worker_group_name,
"dask.org/component": "worker",
"sidecar.istio.io/inject": "false",
"dask.org/worker-name": worker_name,
},
},
"spec": spec,
"spec": copy.copy(spec),
}
env = [
{
Expand Down Expand Up @@ -119,6 +145,54 @@ def build_worker_group_spec(name, spec):
}


def build_worker_service_spec(cluster_name, worker_name):
return {
"apiVersion": "v1",
"kind": "Service",
"metadata": {
"name": f"{worker_name}-service",
"labels": {
"dask.org/cluster-name": cluster_name,
},
},
"spec": {
"type": "ClusterIP",
"selector": {
"dask.org/cluster-name": cluster_name,
"dask.org/worker-name": worker_name,
},
"ports": [
{
"name": "tcp-comm",
"protocol": "TCP",
"port": 8788,
"targetPort": "tcp-comm",
},
{
"name": "http-dashboard",
"protocol": "TCP",
"port": 8787,
"targetPort": "http-dashboard",
},
],
},
}


def build_worker_service_account_spec(cluster_name, worker_name):
return {
"apiVersion": "v1",
"kind": "ServiceAccount",
"metadata": {
"name": f"{worker_name}-service",
"labels": {
"dask.org/cluster-name": cluster_name,
"account": f"{worker_name}-service",
},
},
}


def build_cluster_spec(name, worker_spec, scheduler_spec):
return {
"apiVersion": "kubernetes.dask.org/v1",
Expand Down Expand Up @@ -151,6 +225,13 @@ async def daskcluster_create(spec, name, namespace, logger, **kwargs):
async with kubernetes.client.api_client.ApiClient() as api_client:
api = kubernetes.client.CoreV1Api(api_client)

scheduler_service_account_spec = build_scheduler_service_account_spec(name)
kopf.adopt(scheduler_service_account_spec)
await api.create_namespaced_service_account(
namespace=namespace,
body=scheduler_service_account_spec,
)

# TODO Check for existing scheduler pod
scheduler_spec = spec.get("scheduler", {})
data = build_scheduler_pod_spec(name, scheduler_spec.get("spec"))
Expand Down Expand Up @@ -288,6 +369,24 @@ async def daskworkergroup_update(spec, name, namespace, logger, **kwargs):

if workers_needed > 0:
for _ in range(workers_needed):
worker_name = f"{name}-worker-{uuid4().hex[:10]}"

worker_service_account_spec = build_worker_service_account_spec(
spec["cluster"], worker_name
)
kopf.adopt(worker_service_account_spec)
await api.create_namespaced_service_account(
namespace=namespace,
body=worker_service_account_spec,
)

data = build_worker_service_spec(spec["cluster"], worker_name)
kopf.adopt(data)
await api.create_namespaced_service(
namespace=namespace,
body=data,
)
await wait_for_service(api, data["metadata"]["name"], namespace)
data = build_worker_pod_spec(
worker_group_name=name,
namespace=namespace,
Expand Down
15 changes: 15 additions & 0 deletions dask_kubernetes/operator/tests/resources/simplecluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,23 @@ spec:
imagePullPolicy: "IfNotPresent"
args:
- dask-worker
- tcp://simple-cluster-scheduler-service.default.svc.cluster.local:8786
- --no-nanny
- --name
- $(DASK_WORKER_NAME)
- --dashboard-address
- 0.0.0.0:8787
- --listen-address
- tcp://0.0.0.0:8788
- --contact-address
- tcp://$(DASK_WORKER_NAME)-service.default.svc.cluster.local:8788
ports:
- name: tcp-comm
containerPort: 8788
protocol: TCP
- name: http-dashboard
containerPort: 8787
protocol: TCP
env:
- name: WORKER_ENV
value: hello-world # We dont test the value, just the name
Expand Down
2 changes: 1 addition & 1 deletion dask_kubernetes/operator/tests/test_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ async def test_scalesimplecluster(k8s_cluster, kopf_runner, gen_cluster):
await client.wait_for_workers(3)


@pytest.mark.timeout(180)
@pytest.mark.timeout(300)
@pytest.mark.asyncio
async def test_simplecluster(k8s_cluster, kopf_runner, gen_cluster):
with kopf_runner as runner:
Expand Down