Skip to content

feat: enable firehose keda autoscaler #140

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions modules/firehose/autoscaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package firehose

import (
"encoding/json"

"github.com/goto/entropy/pkg/errors"
)

type AutoscalerType string

const (
KEDA AutoscalerType = "keda"
)

type AutoscalerSpec interface {
ReadConfig(cfg Config, driverConf driverConf) error
Pause(replica ...int)
Resume()
GetHelmValues(cfg Config) (map[string]any, error)
}

type Autoscaler struct {
Enabled bool `json:"enabled"`
Type AutoscalerType `json:"type,omitempty"`
Spec AutoscalerSpec `json:"spec,omitempty"`
}

type FirehoseAutoscaler struct {
Keda Keda `json:"keda,omitempty"`
}

func (autoscaler *Autoscaler) GetHelmValues(cfg Config) (map[string]any, error) {
values := map[string]any{
"enabled": autoscaler.Enabled,
"type": autoscaler.Type,
}

typeValues, err := autoscaler.Spec.GetHelmValues(cfg)
if err != nil {
return nil, err
}
values[string(autoscaler.Type)] = typeValues

return values, nil
}

func (autoscaler *Autoscaler) UnmarshalJSON(data []byte) error {
type BaseAutoscaler Autoscaler
autoscalerTemp := &struct {
Spec json.RawMessage `json:"spec"`
*BaseAutoscaler
}{
BaseAutoscaler: (*BaseAutoscaler)(autoscaler),
}

if err := json.Unmarshal(data, &autoscalerTemp); err != nil {
return errors.ErrInvalid.WithMsgf("invalid autoscaler config").WithCausef(err.Error())
}

switch autoscalerTemp.Type {
case KEDA:
var kedaSpec *Keda
if err := json.Unmarshal(autoscalerTemp.Spec, &kedaSpec); err != nil {
return errors.ErrInvalid.WithMsgf("invalid keda config").WithCausef(err.Error())
}
autoscaler.Spec = kedaSpec
default:
return errors.ErrInvalid.WithMsgf("unsupported autoscaler type: %s", autoscaler.Type)
}
return nil
}
8 changes: 8 additions & 0 deletions modules/firehose/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const (
confSinkType = "SINK_TYPE"
confKeyConsumerID = "SOURCE_KAFKA_CONSUMER_GROUP_ID"
confKeyKafkaBrokers = "SOURCE_KAFKA_BROKERS"
confKeyKafkaTopic = "SOURCE_KAFKA_TOPIC"
)

const helmReleaseNameMaxLength = 53
Expand Down Expand Up @@ -65,6 +66,7 @@ type Config struct {
Telegraf *Telegraf `json:"telegraf,omitempty"`
ChartValues *ChartValues `json:"chart_values,omitempty"`
InitContainer InitContainer `json:"init_container,omitempty"`
Autoscaler *Autoscaler `json:"autoscaler,omitempty"`
}

type Telegraf struct {
Expand Down Expand Up @@ -131,5 +133,11 @@ func readConfig(r resource.Resource, confJSON json.RawMessage, dc driverConf) (*
cfg.Namespace = ns
}

if cfg.Autoscaler != nil && cfg.Autoscaler.Enabled {
if err := cfg.Autoscaler.Spec.ReadConfig(cfg, dc); err != nil {
return nil, err
}
}

return &cfg, nil
}
34 changes: 23 additions & 11 deletions modules/firehose/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,19 @@ var defaultDriverConf = driverConf{
}

type firehoseDriver struct {
timeNow func() time.Time
conf driverConf
kubeDeploy kubeDeployFn
kubeGetPod kubeGetPodFn
consumerReset consumerResetFn
timeNow func() time.Time
conf driverConf
kubeDeploy kubeDeployFn
kubeGetPod kubeGetPodFn
kubeGetDeployment kubeGetDeploymentFn
consumerReset consumerResetFn
}

type (
kubeDeployFn func(ctx context.Context, isCreate bool, conf kube.Config, hc helm.ReleaseConfig) error
kubeGetPodFn func(ctx context.Context, conf kube.Config, ns string, labels map[string]string) ([]kube.Pod, error)
consumerResetFn func(ctx context.Context, conf Config, out kubernetes.Output, resetTo string, offsetResetDelaySeconds int) error
kubeDeployFn func(ctx context.Context, isCreate bool, conf kube.Config, hc helm.ReleaseConfig) error
kubeGetPodFn func(ctx context.Context, conf kube.Config, ns string, labels map[string]string) ([]kube.Pod, error)
kubeGetDeploymentFn func(ctx context.Context, conf kube.Config, ns string, name string) (kube.Deployment, error)
consumerResetFn func(ctx context.Context, conf Config, out kubernetes.Output, resetTo string, offsetResetDelaySeconds int) error
)

type driverConf struct {
Expand Down Expand Up @@ -130,6 +132,8 @@ type driverConf struct {

// timeout value for a kube deployment run
KubeDeployTimeout int `json:"kube_deploy_timeout_seconds"`

Autoscaler FirehoseAutoscaler `json:"autoscaler,omitempty"`
}

type RequestsAndLimits struct {
Expand Down Expand Up @@ -170,9 +174,10 @@ type UsageSpec struct {
}

type Output struct {
Pods []kube.Pod `json:"pods,omitempty"`
Namespace string `json:"namespace,omitempty"`
ReleaseName string `json:"release_name,omitempty"`
Pods []kube.Pod `json:"pods,omitempty"`
Namespace string `json:"namespace,omitempty"`
ReleaseName string `json:"release_name,omitempty"`
Deployment *kube.Deployment `json:"deployment,omitempty"`
}

type transientData struct {
Expand Down Expand Up @@ -361,6 +366,13 @@ func (fd *firehoseDriver) getHelmRelease(res resource.Resource, conf Config,
"mountSecrets": mountSecrets,
}

if conf.Autoscaler != nil {
rc.Values["autoscaler"], err = conf.Autoscaler.GetHelmValues(conf)
if err != nil {
return nil, err
}
}

return rc, nil
}

Expand Down
6 changes: 6 additions & 0 deletions modules/firehose/driver_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,11 @@ func (fd *firehoseDriver) refreshOutput(ctx context.Context, r resource.Resource
output.Pods = pods
output.Namespace = conf.Namespace

deployment, err := fd.kubeGetDeployment(ctx, kubeOut.Configs, rc.Namespace, conf.DeploymentID)
if err != nil {
return nil, errors.ErrInternal.WithCausef(err.Error())
}
output.Deployment = &deployment

return modules.MustJSON(output), nil
}
147 changes: 142 additions & 5 deletions modules/firehose/driver_output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ func TestFirehoseDriver_Output(t *testing.T) {
t.Parallel()

table := []struct {
title string
kubeGetPod func(t *testing.T) kubeGetPodFn
exr module.ExpandedResource
want json.RawMessage
wantErr error
title string
kubeGetPod func(t *testing.T) kubeGetPodFn
kubeGetDeployment func(t *testing.T) kubeGetDeploymentFn
exr module.ExpandedResource
want json.RawMessage
wantErr error
}{
{
title: "InvalidModuleData",
Expand Down Expand Up @@ -109,13 +110,123 @@ func TestFirehoseDriver_Output(t *testing.T) {
}, nil
}
},
kubeGetDeployment: func(t *testing.T) kubeGetDeploymentFn {
t.Helper()
return func(ctx context.Context, conf kube.Config, ns string, name string) (kube.Deployment, error) {
assert.Equal(t, ns, "firehose")
return kube.Deployment{
Name: "foo-bar",
Paused: false,
ReadyReplicas: 1,
AvailableReplicas: 1,
UnavailableReplicas: 2,
Conditions: []map[string]string{},
}, nil
}
},
want: modules.MustJSON(Output{
Pods: []kube.Pod{
{
Name: "foo-1",
Containers: []string{"firehose"},
},
},
Deployment: &kube.Deployment{
Name: "foo-bar",
Paused: false,
ReadyReplicas: 1,
AvailableReplicas: 1,
UnavailableReplicas: 2,
Conditions: []map[string]string{},
},
Namespace: "firehose",
ReleaseName: "foo-bar",
}),
},
{
title: "GetDeployment_Failure",
exr: sampleResourceWithState(resource.State{
Status: resource.StatusCompleted,
Output: modules.MustJSON(Output{
Pods: nil,
Namespace: "firehose",
ReleaseName: "foo-bar",
}),
}, "LOG", "firehose"),
kubeGetPod: func(t *testing.T) kubeGetPodFn {
t.Helper()
return func(ctx context.Context, conf kube.Config, ns string, labels map[string]string) ([]kube.Pod, error) {
assert.Equal(t, ns, "firehose")
assert.Equal(t, labels["app"], "firehose-foo-fh1")
return []kube.Pod{
{
Name: "foo-1",
Containers: []string{"firehose"},
},
}, nil
}
},
kubeGetDeployment: func(t *testing.T) kubeGetDeploymentFn {
t.Helper()
return func(ctx context.Context, conf kube.Config, ns string, name string) (kube.Deployment, error) {
assert.Equal(t, ns, "firehose")
return kube.Deployment{}, errors.New("failed")
}
},
wantErr: errors.ErrInternal,
},
{
title: "GetDeployment_Success",
exr: sampleResourceWithState(resource.State{
Status: resource.StatusCompleted,
Output: modules.MustJSON(Output{
Pods: nil,
Namespace: "firehose",
ReleaseName: "foo-bar",
}),
}, "LOG", "firehose"),
kubeGetPod: func(t *testing.T) kubeGetPodFn {
t.Helper()
return func(ctx context.Context, conf kube.Config, ns string, labels map[string]string) ([]kube.Pod, error) {
assert.Equal(t, ns, "firehose")
assert.Equal(t, labels["app"], "firehose-foo-fh1")
return []kube.Pod{
{
Name: "foo-1",
Containers: []string{"firehose"},
},
}, nil
}
},
kubeGetDeployment: func(t *testing.T) kubeGetDeploymentFn {
t.Helper()
return func(ctx context.Context, conf kube.Config, ns string, name string) (kube.Deployment, error) {
assert.Equal(t, ns, "firehose")
return kube.Deployment{
Name: "foo-bar",
Paused: false,
ReadyReplicas: 1,
AvailableReplicas: 1,
UnavailableReplicas: 2,
Conditions: []map[string]string{},
}, nil
}
},
want: modules.MustJSON(Output{
Pods: []kube.Pod{
{
Name: "foo-1",
Containers: []string{"firehose"},
},
},
Deployment: &kube.Deployment{
Name: "foo-bar",
Paused: false,
ReadyReplicas: 1,
AvailableReplicas: 1,
UnavailableReplicas: 2,
Conditions: []map[string]string{},
},
Namespace: "firehose",
ReleaseName: "foo-bar",
}),
Expand Down Expand Up @@ -143,13 +254,35 @@ func TestFirehoseDriver_Output(t *testing.T) {
}, nil
}
},
kubeGetDeployment: func(t *testing.T) kubeGetDeploymentFn {
t.Helper()
return func(ctx context.Context, conf kube.Config, ns string, name string) (kube.Deployment, error) {
assert.Equal(t, ns, "bigquery-firehose")
return kube.Deployment{
Name: "foo-bar",
Paused: false,
ReadyReplicas: 1,
AvailableReplicas: 1,
UnavailableReplicas: 2,
Conditions: []map[string]string{},
}, nil
}
},
want: modules.MustJSON(Output{
Pods: []kube.Pod{
{
Name: "foo-1",
Containers: []string{"firehose"},
},
},
Deployment: &kube.Deployment{
Name: "foo-bar",
Paused: false,
ReadyReplicas: 1,
AvailableReplicas: 1,
UnavailableReplicas: 2,
Conditions: []map[string]string{},
},
Namespace: "bigquery-firehose",
ReleaseName: "foo-bar",
}),
Expand All @@ -172,6 +305,10 @@ func TestFirehoseDriver_Output(t *testing.T) {
fd.kubeGetPod = tt.kubeGetPod(t)
}

if tt.kubeGetDeployment != nil {
fd.kubeGetDeployment = tt.kubeGetDeployment(t)
}

got, err := fd.Output(context.Background(), tt.exr)
if tt.wantErr != nil {
require.Error(t, err)
Expand Down
Loading
Loading