From ec4452e9c73bbad9c048dd9c6c58fbcb92d5ce41 Mon Sep 17 00:00:00 2001 From: Femi Novia Lina Date: Tue, 29 Jul 2025 15:24:23 +0700 Subject: [PATCH 1/3] feat: enable firehose keda autoscaler --- modules/firehose/autoscaler.go | 71 +++++++ modules/firehose/config.go | 8 + modules/firehose/driver.go | 34 ++-- modules/firehose/driver_output.go | 6 + modules/firehose/driver_output_test.go | 147 +++++++++++++- modules/firehose/driver_plan.go | 22 +++ modules/firehose/driver_sync.go | 4 + modules/firehose/driver_sync_test.go | 33 +++- modules/firehose/keda.go | 258 +++++++++++++++++++++++++ modules/firehose/module.go | 7 + modules/firehose/schema/config.json | 18 ++ pkg/kube/client.go | 53 +++++ 12 files changed, 642 insertions(+), 19 deletions(-) create mode 100644 modules/firehose/autoscaler.go create mode 100644 modules/firehose/keda.go diff --git a/modules/firehose/autoscaler.go b/modules/firehose/autoscaler.go new file mode 100644 index 00000000..fc54d495 --- /dev/null +++ b/modules/firehose/autoscaler.go @@ -0,0 +1,71 @@ +package firehose + +import ( + "encoding/json" + + "github.com/goto/entropy/pkg/errors" +) + +type AutoscalerType string + +const ( + KEDA AutoscalerType = "keda" +) + +type AutoscalerSpec interface { + ReadConfig(cfg Config, driverConf driverConf) error + Pause(replica ...int) + Resume() + GetHelmValues(cfg Config) (map[string]any, error) +} + +type Autoscaler struct { + Enabled bool `json:"enabled"` + Type AutoscalerType `json:"type,omitempty"` + Spec AutoscalerSpec `json:"spec,omitempty"` +} + +type FirehoseAutoscaler struct { + Keda Keda `json:"keda,omitempty"` +} + +func (autoscaler *Autoscaler) GetHelmValues(cfg Config) (map[string]any, error) { + values := map[string]any{ + "enabled": autoscaler.Enabled, + "type": autoscaler.Type, + } + + typeValues, err := autoscaler.Spec.GetHelmValues(cfg) + if err != nil { + return nil, err + } + values[string(autoscaler.Type)] = typeValues + + return values, nil +} + +func (autoscaler *Autoscaler) 
UnmarshalJSON(data []byte) error { + type BaseAutoscaler Autoscaler + autoscalerTemp := &struct { + Spec json.RawMessage `json:"spec"` + *BaseAutoscaler + }{ + BaseAutoscaler: (*BaseAutoscaler)(autoscaler), + } + + if err := json.Unmarshal(data, &autoscalerTemp); err != nil { + return errors.ErrInvalid.WithMsgf("invalid autoscaler config").WithCausef(err.Error()) + } + + switch autoscalerTemp.Type { + case KEDA: + var kedaSpec *Keda + if err := json.Unmarshal(autoscalerTemp.Spec, &kedaSpec); err != nil { + return errors.ErrInvalid.WithMsgf("invalid keda config").WithCausef(err.Error()) + } + autoscaler.Spec = kedaSpec + default: + return errors.ErrInvalid.WithMsgf("unsupported autoscaler type: %s", autoscaler.Type) + } + return nil +} diff --git a/modules/firehose/config.go b/modules/firehose/config.go index 8119ec4e..f1b20005 100644 --- a/modules/firehose/config.go +++ b/modules/firehose/config.go @@ -16,6 +16,7 @@ const ( confSinkType = "SINK_TYPE" confKeyConsumerID = "SOURCE_KAFKA_CONSUMER_GROUP_ID" confKeyKafkaBrokers = "SOURCE_KAFKA_BROKERS" + confKeyKafkaTopic = "SOURCE_KAFKA_TOPIC" ) const helmReleaseNameMaxLength = 53 @@ -65,6 +66,7 @@ type Config struct { Telegraf *Telegraf `json:"telegraf,omitempty"` ChartValues *ChartValues `json:"chart_values,omitempty"` InitContainer InitContainer `json:"init_container,omitempty"` + Autoscaler *Autoscaler `json:"autoscaler,omitempty"` } type Telegraf struct { @@ -131,5 +133,11 @@ func readConfig(r resource.Resource, confJSON json.RawMessage, dc driverConf) (* cfg.Namespace = ns } + if cfg.Autoscaler != nil && cfg.Autoscaler.Enabled { + if err := cfg.Autoscaler.Spec.ReadConfig(cfg, dc); err != nil { + return nil, err + } + } + return &cfg, nil } diff --git a/modules/firehose/driver.go b/modules/firehose/driver.go index d4d07d7b..de1899c4 100644 --- a/modules/firehose/driver.go +++ b/modules/firehose/driver.go @@ -71,17 +71,19 @@ var defaultDriverConf = driverConf{ } type firehoseDriver struct { - timeNow func() 
time.Time - conf driverConf - kubeDeploy kubeDeployFn - kubeGetPod kubeGetPodFn - consumerReset consumerResetFn + timeNow func() time.Time + conf driverConf + kubeDeploy kubeDeployFn + kubeGetPod kubeGetPodFn + kubeGetDeployment kubeGetDeploymentFn + consumerReset consumerResetFn } type ( - kubeDeployFn func(ctx context.Context, isCreate bool, conf kube.Config, hc helm.ReleaseConfig) error - kubeGetPodFn func(ctx context.Context, conf kube.Config, ns string, labels map[string]string) ([]kube.Pod, error) - consumerResetFn func(ctx context.Context, conf Config, out kubernetes.Output, resetTo string, offsetResetDelaySeconds int) error + kubeDeployFn func(ctx context.Context, isCreate bool, conf kube.Config, hc helm.ReleaseConfig) error + kubeGetPodFn func(ctx context.Context, conf kube.Config, ns string, labels map[string]string) ([]kube.Pod, error) + kubeGetDeploymentFn func(ctx context.Context, conf kube.Config, ns string, name string) (kube.Deployment, error) + consumerResetFn func(ctx context.Context, conf Config, out kubernetes.Output, resetTo string, offsetResetDelaySeconds int) error ) type driverConf struct { @@ -130,6 +132,8 @@ type driverConf struct { // timeout value for a kube deployment run KubeDeployTimeout int `json:"kube_deploy_timeout_seconds"` + + Autoscaler FirehoseAutoscaler `json:"autoscaler,omitempty"` } type RequestsAndLimits struct { @@ -170,9 +174,10 @@ type UsageSpec struct { } type Output struct { - Pods []kube.Pod `json:"pods,omitempty"` - Namespace string `json:"namespace,omitempty"` - ReleaseName string `json:"release_name,omitempty"` + Pods []kube.Pod `json:"pods,omitempty"` + Namespace string `json:"namespace,omitempty"` + ReleaseName string `json:"release_name,omitempty"` + Deployment *kube.Deployment `json:"deployment,omitempty"` } type transientData struct { @@ -361,6 +366,13 @@ func (fd *firehoseDriver) getHelmRelease(res resource.Resource, conf Config, "mountSecrets": mountSecrets, } + if conf.Autoscaler != nil { + 
rc.Values["autoscaler"], err = conf.Autoscaler.GetHelmValues(conf) + if err != nil { + return nil, err + } + } + return rc, nil } diff --git a/modules/firehose/driver_output.go b/modules/firehose/driver_output.go index da4b73ff..1101adf4 100644 --- a/modules/firehose/driver_output.go +++ b/modules/firehose/driver_output.go @@ -45,5 +45,11 @@ func (fd *firehoseDriver) refreshOutput(ctx context.Context, r resource.Resource output.Pods = pods output.Namespace = conf.Namespace + deployment, err := fd.kubeGetDeployment(ctx, kubeOut.Configs, rc.Namespace, conf.DeploymentID) + if err != nil { + return nil, errors.ErrInternal.WithCausef(err.Error()) + } + output.Deployment = &deployment + return modules.MustJSON(output), nil } diff --git a/modules/firehose/driver_output_test.go b/modules/firehose/driver_output_test.go index 03229c8d..7e8f1a50 100644 --- a/modules/firehose/driver_output_test.go +++ b/modules/firehose/driver_output_test.go @@ -20,11 +20,12 @@ func TestFirehoseDriver_Output(t *testing.T) { t.Parallel() table := []struct { - title string - kubeGetPod func(t *testing.T) kubeGetPodFn - exr module.ExpandedResource - want json.RawMessage - wantErr error + title string + kubeGetPod func(t *testing.T) kubeGetPodFn + kubeGetDeployment func(t *testing.T) kubeGetDeploymentFn + exr module.ExpandedResource + want json.RawMessage + wantErr error }{ { title: "InvalidModuleData", @@ -109,6 +110,20 @@ func TestFirehoseDriver_Output(t *testing.T) { }, nil } }, + kubeGetDeployment: func(t *testing.T) kubeGetDeploymentFn { + t.Helper() + return func(ctx context.Context, conf kube.Config, ns string, name string) (kube.Deployment, error) { + assert.Equal(t, ns, "firehose") + return kube.Deployment{ + Name: "foo-bar", + Paused: false, + ReadyReplicas: 1, + AvailableReplicas: 1, + UnavailableReplicas: 2, + Conditions: []map[string]string{}, + }, nil + } + }, want: modules.MustJSON(Output{ Pods: []kube.Pod{ { @@ -116,6 +131,102 @@ func TestFirehoseDriver_Output(t *testing.T) { 
Containers: []string{"firehose"}, }, }, + Deployment: &kube.Deployment{ + Name: "foo-bar", + Paused: false, + ReadyReplicas: 1, + AvailableReplicas: 1, + UnavailableReplicas: 2, + Conditions: []map[string]string{}, + }, + Namespace: "firehose", + ReleaseName: "foo-bar", + }), + }, + { + title: "GetDeployment_Failure", + exr: sampleResourceWithState(resource.State{ + Status: resource.StatusCompleted, + Output: modules.MustJSON(Output{ + Pods: nil, + Namespace: "firehose", + ReleaseName: "foo-bar", + }), + }, "LOG", "firehose"), + kubeGetPod: func(t *testing.T) kubeGetPodFn { + t.Helper() + return func(ctx context.Context, conf kube.Config, ns string, labels map[string]string) ([]kube.Pod, error) { + assert.Equal(t, ns, "firehose") + assert.Equal(t, labels["app"], "firehose-foo-fh1") + return []kube.Pod{ + { + Name: "foo-1", + Containers: []string{"firehose"}, + }, + }, nil + } + }, + kubeGetDeployment: func(t *testing.T) kubeGetDeploymentFn { + t.Helper() + return func(ctx context.Context, conf kube.Config, ns string, name string) (kube.Deployment, error) { + assert.Equal(t, ns, "firehose") + return kube.Deployment{}, errors.New("failed") + } + }, + wantErr: errors.ErrInternal, + }, + { + title: "GetDeployment_Success", + exr: sampleResourceWithState(resource.State{ + Status: resource.StatusCompleted, + Output: modules.MustJSON(Output{ + Pods: nil, + Namespace: "firehose", + ReleaseName: "foo-bar", + }), + }, "LOG", "firehose"), + kubeGetPod: func(t *testing.T) kubeGetPodFn { + t.Helper() + return func(ctx context.Context, conf kube.Config, ns string, labels map[string]string) ([]kube.Pod, error) { + assert.Equal(t, ns, "firehose") + assert.Equal(t, labels["app"], "firehose-foo-fh1") + return []kube.Pod{ + { + Name: "foo-1", + Containers: []string{"firehose"}, + }, + }, nil + } + }, + kubeGetDeployment: func(t *testing.T) kubeGetDeploymentFn { + t.Helper() + return func(ctx context.Context, conf kube.Config, ns string, name string) (kube.Deployment, error) { + 
assert.Equal(t, ns, "firehose") + return kube.Deployment{ + Name: "foo-bar", + Paused: false, + ReadyReplicas: 1, + AvailableReplicas: 1, + UnavailableReplicas: 2, + Conditions: []map[string]string{}, + }, nil + } + }, + want: modules.MustJSON(Output{ + Pods: []kube.Pod{ + { + Name: "foo-1", + Containers: []string{"firehose"}, + }, + }, + Deployment: &kube.Deployment{ + Name: "foo-bar", + Paused: false, + ReadyReplicas: 1, + AvailableReplicas: 1, + UnavailableReplicas: 2, + Conditions: []map[string]string{}, + }, Namespace: "firehose", ReleaseName: "foo-bar", }), @@ -143,6 +254,20 @@ func TestFirehoseDriver_Output(t *testing.T) { }, nil } }, + kubeGetDeployment: func(t *testing.T) kubeGetDeploymentFn { + t.Helper() + return func(ctx context.Context, conf kube.Config, ns string, name string) (kube.Deployment, error) { + assert.Equal(t, ns, "bigquery-firehose") + return kube.Deployment{ + Name: "foo-bar", + Paused: false, + ReadyReplicas: 1, + AvailableReplicas: 1, + UnavailableReplicas: 2, + Conditions: []map[string]string{}, + }, nil + } + }, want: modules.MustJSON(Output{ Pods: []kube.Pod{ { @@ -150,6 +275,14 @@ func TestFirehoseDriver_Output(t *testing.T) { Containers: []string{"firehose"}, }, }, + Deployment: &kube.Deployment{ + Name: "foo-bar", + Paused: false, + ReadyReplicas: 1, + AvailableReplicas: 1, + UnavailableReplicas: 2, + Conditions: []map[string]string{}, + }, Namespace: "bigquery-firehose", ReleaseName: "foo-bar", }), @@ -172,6 +305,10 @@ func TestFirehoseDriver_Output(t *testing.T) { fd.kubeGetPod = tt.kubeGetPod(t) } + if tt.kubeGetDeployment != nil { + fd.kubeGetDeployment = tt.kubeGetDeployment(t) + } + got, err := fd.Output(context.Background(), tt.exr) if tt.wantErr != nil { require.Error(t, err) diff --git a/modules/firehose/driver_plan.go b/modules/firehose/driver_plan.go index 91b6f620..4f0a83db 100644 --- a/modules/firehose/driver_plan.go +++ b/modules/firehose/driver_plan.go @@ -106,9 +106,17 @@ func (fd *firehoseDriver) planChange(exr 
module.ExpandedResource, act module.Act
 		curConf.StopTime = startParams.StopTime
 	}
 
+		if curConf.Autoscaler != nil {
+			curConf.Autoscaler.Spec.Resume()
+		}
+
 	case StopAction:
 		curConf.Stopped = true
 
+		if curConf.Autoscaler != nil {
+			curConf.Autoscaler.Spec.Pause(0)
+		}
+
 	case UpgradeAction:
 		// upgrade the chart values to the latest project-level config.
 		// Note: upgrade/downgrade will happen based on module-level configs.
@@ -237,6 +245,20 @@ func (fd *firehoseDriver) planReset(exr module.ExpandedResource, act module.Acti
 		return nil, err
 	}
 
+	// if keda autoscaler enabled, update scaler metadata value
+	if curConf.Autoscaler != nil && curConf.Autoscaler.Type == KEDA {
+		kedaSpec, ok := curConf.Autoscaler.Spec.(*Keda)
+		if !ok {
+			return nil, errors.ErrInternal.WithMsgf("autoscaler spec is not a valid keda spec")
+		}
+
+		err = kedaSpec.updateTriggersMetadata(curConf.EnvVariables)
+		if err != nil {
+			return nil, err
+		}
+		curConf.Autoscaler.Spec = kedaSpec
+	}
+
 	exr.Resource.Spec.Configs = modules.MustJSON(curConf)
 	exr.Resource.State = resource.State{
 		Status: resource.StatusPending,
diff --git a/modules/firehose/driver_sync.go b/modules/firehose/driver_sync.go
index de3a8758..75e846b5 100644
--- a/modules/firehose/driver_sync.go
+++ b/modules/firehose/driver_sync.go
@@ -49,6 +49,10 @@ func (fd *firehoseDriver) Sync(ctx context.Context, exr module.ExpandedResource)
 	// config changes during Sync() are not saved.
if pendingStep == stepReleaseStop || conf.Stopped { conf.Replicas = 0 + + if conf.Autoscaler != nil { + conf.Autoscaler.Spec.Pause(0) + } } isCreate := pendingStep == stepReleaseCreate diff --git a/modules/firehose/driver_sync_test.go b/modules/firehose/driver_sync_test.go index b82020ac..c774e647 100644 --- a/modules/firehose/driver_sync_test.go +++ b/modules/firehose/driver_sync_test.go @@ -21,9 +21,10 @@ func TestFirehoseDriver_Sync(t *testing.T) { t.Parallel() table := []struct { - title string - kubeDeploy func(t *testing.T) kubeDeployFn - kubeGetPod func(t *testing.T) kubeGetPodFn + title string + kubeDeploy func(t *testing.T) kubeDeployFn + kubeGetPod func(t *testing.T) kubeGetPodFn + kubeGetDeployment func(t *testing.T) kubeGetDeploymentFn exr module.ExpandedResource want *resource.State @@ -95,6 +96,20 @@ func TestFirehoseDriver_Sync(t *testing.T) { }, nil } }, + kubeGetDeployment: func(t *testing.T) kubeGetDeploymentFn { + t.Helper() + return func(ctx context.Context, conf kube.Config, ns string, name string) (kube.Deployment, error) { + assert.Equal(t, ns, "firehose") + return kube.Deployment{ + Name: "foo-1", + Paused: false, + ReadyReplicas: 1, + AvailableReplicas: 1, + UnavailableReplicas: 2, + Conditions: []map[string]string{}, + }, nil + } + }, want: &resource.State{ Status: resource.StatusCompleted, Output: modules.MustJSON(Output{ @@ -105,6 +120,14 @@ func TestFirehoseDriver_Sync(t *testing.T) { Containers: []string{"firehose"}, }, }, + Deployment: &kube.Deployment{ + Name: "foo-1", + Paused: false, + ReadyReplicas: 1, + AvailableReplicas: 1, + UnavailableReplicas: 2, + Conditions: []map[string]string{}, + }, }), ModuleData: nil, }, @@ -248,6 +271,10 @@ func TestFirehoseDriver_Sync(t *testing.T) { fd.kubeDeploy = tt.kubeDeploy(t) } + if tt.kubeGetDeployment != nil { + fd.kubeGetDeployment = tt.kubeGetDeployment(t) + } + got, err := fd.Sync(context.Background(), tt.exr) if tt.wantErr != nil { require.Error(t, err) diff --git 
a/modules/firehose/keda.go b/modules/firehose/keda.go new file mode 100644 index 00000000..ed3cb824 --- /dev/null +++ b/modules/firehose/keda.go @@ -0,0 +1,258 @@ +package firehose + +import ( + "fmt" + "maps" + + "github.com/goto/entropy/pkg/errors" +) + +type Scaler string + +const ( + KAFKA Scaler = "kafka" + PROMETHEUS Scaler = "prometheus" +) + +const ( + KedaPausedAnnotationKey = "autoscaling.keda.sh/paused" + KedaPausedReplicaAnnotationKey = "autoscaling.keda.sh/paused-replicas" +) + +type Keda struct { + Paused bool `json:"paused,omitempty"` + PausedWithReplica bool `json:"paused_with_replica,omitempty"` + PausedReplica int `json:"paused_replica,omitempty"` + MinReplicas int `json:"min_replicas,omitempty"` + MaxReplicas int `json:"max_replicas,omitempty"` + PollingInterval int `json:"polling_interval,omitempty"` + CooldownPeriod int `json:"cooldown_period,omitempty"` + Triggers map[string]Trigger `json:"triggers,omitempty"` + RestoreToOriginalReplica bool `json:"restore_to_original_replica_count,omitempty"` + Fallback *Fallback `json:"fallback,omitempty"` + HPA *HorizontalPodAutoscaler `json:"hpa,omitempty"` +} + +type Trigger struct { + Type Scaler `json:"type,omitempty"` + Metadata map[string]string `json:"metadata,omitempty"` + AuthenticationRef *AuthenticationRef `json:"authentication_ref,omitempty"` +} + +type AuthenticationRef struct { + Name string `json:"name,omitempty" validate:"required"` + Kind string `json:"kind,omitempty"` +} + +type Fallback struct { + Behavior string `json:"behavior,omitempty"` + Replicas int `json:"replicas,omitempty"` + FailureThreshold int `json:"failure_threshold,omitempty"` +} + +type HorizontalPodAutoscaler struct { + ScaleDown ScaleBehaviour `json:"scale_down,omitempty"` + ScaleUp ScaleBehaviour `json:"scale_up,omitempty"` +} + +type ScaleBehaviour struct { + Policies []Policy `json:"policies,omitempty"` + StabilizationWindowSeconds *int `json:"stabilization_window_seconds,omitempty"` + Tolerance *float32 
`json:"tolerance,omitempty"` +} + +type Policy struct { + Type string `json:"type,omitempty"` + Value float32 `json:"value,omitempty"` + PeriodSeconds int `json:"period_seconds,omitempty"` +} + +func (keda *Keda) ReadConfig(cfg Config, driverCfg driverConf) error { + kedaConfig := driverCfg.Autoscaler.Keda + + mergedTriggers := deepCopyTriggers(driverCfg.Autoscaler.Keda.Triggers) + for key, trigger := range keda.Triggers { + if existingTrigger, exists := mergedTriggers[key]; exists { + maps.Copy(existingTrigger.Metadata, trigger.Metadata) + if trigger.AuthenticationRef != nil && trigger.AuthenticationRef.Name != "" { + existingTrigger.AuthenticationRef = trigger.AuthenticationRef + } + existingTrigger.Type = trigger.Type + mergedTriggers[key] = existingTrigger + } + + if _, exists := mergedTriggers[key]; !exists { + mergedTriggers[key] = trigger + } + } + kedaConfig.Triggers = mergedTriggers + kedaConfig.updateTriggersMetadata(cfg.EnvVariables) + + if keda.MinReplicas > 0 || keda.MaxReplicas > 0 { + if keda.MinReplicas < 0 { + return errors.ErrInvalid.WithMsgf("min_replicas must be greater than or equal to 0") + } + + if keda.MaxReplicas < 1 { + return errors.ErrInvalid.WithMsgf("max_replicas must be greater than or equal to 1") + } + + if keda.MinReplicas > keda.MaxReplicas { + return errors.ErrInvalid.WithMsgf("min_replicas must be less than or equal to max_replicas") + } + + kedaConfig.MinReplicas = keda.MinReplicas + kedaConfig.MaxReplicas = keda.MaxReplicas + } + + if keda.Fallback != nil && keda.Fallback.Behavior != "" { + kedaConfig.Fallback = keda.Fallback + } + + if keda.HPA != nil { + kedaConfig.HPA = keda.HPA + } + + kedaConfig.Paused = keda.Paused + kedaConfig.PausedWithReplica = keda.PausedWithReplica + kedaConfig.PausedReplica = keda.PausedReplica + + *keda = kedaConfig + return nil +} + +func (keda *Keda) Pause(replica ...int) { + if len(replica) == 0 { + keda.Paused = true + } + if len(replica) > 0 { + keda.PausedWithReplica = true + 
keda.PausedReplica = replica[0] + } +} + +func (keda *Keda) Resume() { + keda.Paused = false + keda.PausedWithReplica = false +} + +func (keda *Keda) GetHelmValues(cfg Config) (map[string]any, error) { + annotations := make(map[string]string) + if keda.Paused { + annotations[KedaPausedAnnotationKey] = "true" + } + if keda.PausedWithReplica { + annotations[KedaPausedReplicaAnnotationKey] = fmt.Sprint(keda.PausedReplica) + } + + var triggers []map[string]any + for _, trigger := range keda.Triggers { + renderedMetadata, err := renderTpl(trigger.Metadata, cfg.EnvVariables) + if err != nil { + return nil, err + } + trigger.Metadata = renderedMetadata + triggers = append(triggers, map[string]any{ + "type": trigger.Type, + "metadata": trigger.Metadata, + "authenticationRef": trigger.AuthenticationRef, + }) + } + + var hpa map[string]any + if keda.HPA != nil { + var scaleUpPolicy []map[string]any + for _, policy := range keda.HPA.ScaleUp.Policies { + scaleUpPolicy = append(scaleUpPolicy, map[string]any{ + "type": policy.Type, + "value": policy.Value, + "periodSeconds": policy.PeriodSeconds, + }) + } + + var scaleDownPolicy []map[string]any + for _, policy := range keda.HPA.ScaleUp.Policies { + scaleDownPolicy = append(scaleDownPolicy, map[string]any{ + "type": policy.Type, + "value": policy.Value, + "periodSeconds": policy.PeriodSeconds, + }) + } + + hpa = map[string]any{ + "scaleUp": map[string]any{ + "policies": scaleUpPolicy, + "stabilizationWindowSeconds": keda.HPA.ScaleUp.StabilizationWindowSeconds, + "tolerance": keda.HPA.ScaleUp.Tolerance, + }, + "scaleDown": map[string]any{ + "policies": scaleDownPolicy, + "stabilizationWindowSeconds": keda.HPA.ScaleDown.StabilizationWindowSeconds, + "tolerance": keda.HPA.ScaleDown.Tolerance, + }, + } + } + + var fallback map[string]any + if keda.Fallback != nil { + fallback = map[string]any{ + "behavior": keda.Fallback.Behavior, + "failureThreshold": keda.Fallback.FailureThreshold, + "replicas": keda.Fallback.Replicas, + } + } + + 
return map[string]any{ + "annotations": annotations, + "maxReplicaCount": keda.MaxReplicas, + "minReplicaCount": keda.MinReplicas, + "pollingInterval": keda.PollingInterval, + "cooldownPeriod": keda.CooldownPeriod, + "restoreToOriginalReplicaCount": keda.RestoreToOriginalReplica, + "fallback": fallback, + "triggers": triggers, + "hpa": hpa, + }, nil +} + +func (keda *Keda) updateTriggersMetadata(cfg map[string]string) error { + for key, trigger := range keda.Triggers { + switch trigger.Type { + case KAFKA: + if _, ok := cfg[confKeyConsumerID]; ok { + trigger.Metadata["consumerGroup"] = cfg[confKeyConsumerID] + } + if _, ok := cfg[confKeyKafkaTopic]; ok { + trigger.Metadata["topic"] = cfg[confKeyKafkaTopic] + } + if _, ok := cfg[confKeyKafkaBrokers]; ok { + trigger.Metadata["brokers"] = cfg[confKeyKafkaBrokers] + } + } + keda.Triggers[key] = trigger + } + return nil +} + +func deepCopyTriggers(src map[string]Trigger) map[string]Trigger { + dst := make(map[string]Trigger, len(src)) + for k, v := range src { + newMetadata := make(map[string]string, len(v.Metadata)) + for mk, mv := range v.Metadata { + newMetadata[mk] = mv + } + var newAuthRef *AuthenticationRef + if v.AuthenticationRef != nil { + newAuthRef = &AuthenticationRef{ + Name: v.AuthenticationRef.Name, + Kind: v.AuthenticationRef.Kind, + } + } + dst[k] = Trigger{ + Type: v.Type, + Metadata: newMetadata, + AuthenticationRef: newAuthRef, + } + } + return dst +} diff --git a/modules/firehose/module.go b/modules/firehose/module.go index 77eec61e..7233ccd7 100644 --- a/modules/firehose/module.go +++ b/modules/firehose/module.go @@ -117,6 +117,13 @@ var Module = module.Descriptor{ return pod.Status.Phase == v1.PodRunning && pod.DeletionTimestamp == nil }) }, + kubeGetDeployment: func(ctx context.Context, conf kube.Config, ns, name string) (kube.Deployment, error) { + kubeCl, err := kube.NewClient(ctx, conf) + if err != nil { + return kube.Deployment{}, errors.ErrInternal.WithMsgf("failed to create new kube client 
on firehose driver kube get deployment").WithCausef(err.Error()) + } + return kubeCl.GetDeploymentDetails(ctx, ns, name) + }, consumerReset: consumerReset, }, nil }, diff --git a/modules/firehose/schema/config.json b/modules/firehose/schema/config.json index 510f4a82..2d844fc5 100644 --- a/modules/firehose/schema/config.json +++ b/modules/firehose/schema/config.json @@ -65,6 +65,24 @@ "type": "string" } } + }, + "autoscaler": { + "type": "object", + "properties": { + "enabled": { + "type": "boolean", + "default": false + }, + "type": { + "type": "string", + "enum": [ + "keda" + ] + }, + "spec": { + "type": "object" + } + } } } } diff --git a/pkg/kube/client.go b/pkg/kube/client.go index b4c15b2e..7420be04 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -14,6 +14,7 @@ import ( "github.com/mitchellh/mapstructure" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + k8s_errors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/fields" @@ -76,6 +77,15 @@ type LogOptions struct { TailLines string `mapstructure:"tail_lines"` } +type Deployment struct { + Name string `json:"name"` + Paused bool `json:"paused"` + ReadyReplicas int `json:"ready_replicas"` + AvailableReplicas int `json:"available_replicas"` + UnavailableReplicas int `json:"unavailable_replicas"` + Conditions []map[string]string `json:"conditions"` +} + func (l LogOptions) getPodListOptions() (metav1.ListOptions, error) { labelSelector := labels.NewSelector() fieldSelector := fields.Everything() @@ -390,3 +400,46 @@ func streamContainerLogs(ctx context.Context, ns, podName string, logCh chan<- L } } } + +func (c Client) GetDeploymentDetails(ctx context.Context, namespace string, name string) (Deployment, error) { + clientSet, err := kubernetes.NewForConfig(&c.restConfig) + if err != nil { + return Deployment{}, err + } + + deployment, err := 
clientSet.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{}) + if k8s_errors.IsNotFound(err) { + return Deployment{}, nil + } + + if err != nil { + return Deployment{}, err + } + + d := Deployment{ + Name: deployment.Name, + Paused: deployment.Spec.Paused, + ReadyReplicas: int(deployment.Status.ReadyReplicas), + AvailableReplicas: int(deployment.Status.AvailableReplicas), + UnavailableReplicas: int(deployment.Status.UnavailableReplicas), + } + + if deployment.Status.Conditions != nil { + d.Conditions = make([]map[string]string, 0, len(deployment.Status.Conditions)) + for _, condition := range deployment.Status.Conditions { + if condition.Status == corev1.ConditionUnknown || condition.Status == corev1.ConditionFalse { + continue + } + + condMap := map[string]string{ + "type": string(condition.Type), + "status": string(condition.Status), + "reason": condition.Reason, + "message": condition.Message, + } + d.Conditions = append(d.Conditions, condMap) + } + } + + return d, nil +} From 84425bda1d36b43f1cdcb7fa805863e4dbe7d703 Mon Sep 17 00:00:00 2001 From: Femi Novia Lina Date: Mon, 11 Aug 2025 14:33:15 +0700 Subject: [PATCH 2/3] fix: firehose keda autoscaler config --- modules/firehose/keda.go | 38 ++++++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/modules/firehose/keda.go b/modules/firehose/keda.go index ed3cb824..e630f54b 100644 --- a/modules/firehose/keda.go +++ b/modules/firehose/keda.go @@ -17,6 +17,10 @@ const ( const ( KedaPausedAnnotationKey = "autoscaling.keda.sh/paused" KedaPausedReplicaAnnotationKey = "autoscaling.keda.sh/paused-replicas" + + KedaKafkaMetadataBootstrapServersKey = "bootstrapServers" + KedaKafkaMetadataTopicKey = "topic" + KedaKafkaMetadataConsumerGroupKey = "consumerGroup" ) type Keda struct { @@ -34,9 +38,9 @@ type Keda struct { } type Trigger struct { - Type Scaler `json:"type,omitempty"` - Metadata map[string]string `json:"metadata,omitempty"` - AuthenticationRef 
*AuthenticationRef `json:"authentication_ref,omitempty"` + Type Scaler `json:"type,omitempty"` + Metadata map[string]string `json:"metadata,omitempty"` + AuthenticationRef AuthenticationRef `json:"authentication_ref,omitempty"` } type AuthenticationRef struct { @@ -74,7 +78,7 @@ func (keda *Keda) ReadConfig(cfg Config, driverCfg driverConf) error { for key, trigger := range keda.Triggers { if existingTrigger, exists := mergedTriggers[key]; exists { maps.Copy(existingTrigger.Metadata, trigger.Metadata) - if trigger.AuthenticationRef != nil && trigger.AuthenticationRef.Name != "" { + if trigger.AuthenticationRef.Name != "" { existingTrigger.AuthenticationRef = trigger.AuthenticationRef } existingTrigger.Type = trigger.Type @@ -153,9 +157,12 @@ func (keda *Keda) GetHelmValues(cfg Config) (map[string]any, error) { } trigger.Metadata = renderedMetadata triggers = append(triggers, map[string]any{ - "type": trigger.Type, - "metadata": trigger.Metadata, - "authenticationRef": trigger.AuthenticationRef, + "type": trigger.Type, + "metadata": trigger.Metadata, + "authenticationRef": map[string]any{ + "name": trigger.AuthenticationRef.Name, + "kind": trigger.AuthenticationRef.Kind, + }, }) } @@ -220,13 +227,13 @@ func (keda *Keda) updateTriggersMetadata(cfg map[string]string) error { switch trigger.Type { case KAFKA: if _, ok := cfg[confKeyConsumerID]; ok { - trigger.Metadata["consumerGroup"] = cfg[confKeyConsumerID] + trigger.Metadata[KedaKafkaMetadataConsumerGroupKey] = cfg[confKeyConsumerID] } if _, ok := cfg[confKeyKafkaTopic]; ok { - trigger.Metadata["topic"] = cfg[confKeyKafkaTopic] + trigger.Metadata[KedaKafkaMetadataTopicKey] = cfg[confKeyKafkaTopic] } if _, ok := cfg[confKeyKafkaBrokers]; ok { - trigger.Metadata["brokers"] = cfg[confKeyKafkaBrokers] + trigger.Metadata[KedaKafkaMetadataBootstrapServersKey] = cfg[confKeyKafkaBrokers] } } keda.Triggers[key] = trigger @@ -241,13 +248,12 @@ func deepCopyTriggers(src map[string]Trigger) map[string]Trigger { for mk, mv := 
range v.Metadata { newMetadata[mk] = mv } - var newAuthRef *AuthenticationRef - if v.AuthenticationRef != nil { - newAuthRef = &AuthenticationRef{ - Name: v.AuthenticationRef.Name, - Kind: v.AuthenticationRef.Kind, - } + + newAuthRef := AuthenticationRef{ + Name: v.AuthenticationRef.Name, + Kind: v.AuthenticationRef.Kind, } + dst[k] = Trigger{ Type: v.Type, Metadata: newMetadata, From fd97ba4c982c7f7798dfe6db4562b7e9a2194006 Mon Sep 17 00:00:00 2001 From: Femi Novia Lina Date: Tue, 12 Aug 2025 15:28:12 +0700 Subject: [PATCH 3/3] fix: firehose keda autoscaler config --- modules/firehose/keda.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/firehose/keda.go b/modules/firehose/keda.go index e630f54b..7a02cec9 100644 --- a/modules/firehose/keda.go +++ b/modules/firehose/keda.go @@ -178,7 +178,7 @@ func (keda *Keda) GetHelmValues(cfg Config) (map[string]any, error) { } var scaleDownPolicy []map[string]any - for _, policy := range keda.HPA.ScaleUp.Policies { + for _, policy := range keda.HPA.ScaleDown.Policies { scaleDownPolicy = append(scaleDownPolicy, map[string]any{ "type": policy.Type, "value": policy.Value,