Skip to content

fix: use notifyContext to manage the operator exit #2463

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 11 additions & 19 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package controller

import (
"context"
"flag"
"fmt"
"os"
Expand Down Expand Up @@ -79,8 +80,10 @@ func StartOperator(kubeconfig string) {
_ = stsv1beta1.AddToScheme(scheme.Scheme)
_ = stsv1alpha1.AddToScheme(scheme.Scheme)
klog.Info("Starting MinIO Operator")

// set up signals, so we handle the first shutdown signal gracefully
stopCh := setupSignalHandler()
ctx, cancel := setupSignalHandler(context.Background())
defer cancel()

flag.Parse()

Expand Down Expand Up @@ -170,38 +173,27 @@ func StartOperator(kubeconfig string) {
kubeInformerFactoryInOperatorNamespace,
)

go kubeInformerFactory.Start(stopCh)
go minioInformerFactory.Start(stopCh)
go kubeInformerFactoryInOperatorNamespace.Start(stopCh)
go kubeInformerFactory.Start(ctx.Done())
go minioInformerFactory.Start(ctx.Done())
go kubeInformerFactoryInOperatorNamespace.Start(ctx.Done())

if err = mainController.Start(2, stopCh); err != nil {
if err = mainController.Start(ctx, cancel, 2); err != nil {
klog.Fatalf("Error running mainController: %s", err.Error())
}

<-stopCh
<-ctx.Done()
klog.Info("Shutting down the MinIO Operator")
mainController.Stop()
}

// setupSignalHandler registers for SIGTERM and SIGINT. A Context is returned
// which is canceled on the first such signal. NOTE(review): with NotifyContext
// alone, a second signal no longer terminates the program with exit code 1 —
// confirm losing that force-exit behavior is intended.
func setupSignalHandler() (stopCh <-chan struct{}) {
func setupSignalHandler(ctx context.Context) (context.Context, context.CancelFunc) {
// panics when called twice
close(onlyOneSignalHandler)

stop := make(chan struct{})
c := make(chan os.Signal, 2)
signal.Notify(c, shutdownSignals...)
go func() {
<-c
close(stop)
<-c
// second signal. Exit directly.
os.Exit(1)
}()

return stop
return signal.NotifyContext(ctx, shutdownSignals...)
}

// Result contains the result of a sync invocation.
Expand Down
12 changes: 5 additions & 7 deletions pkg/controller/csr.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"errors"
"fmt"
"io"
"os"
"os/signal"
"syscall"
"time"
Expand Down Expand Up @@ -188,15 +187,14 @@ func (c *Controller) fetchCertificate(ctx context.Context, csrName string) ([]by

timeout := time.NewTimer(miniov2.DefaultQueryTimeout)

ch := make(chan os.Signal, 1) // should be always un-buffered SA1017
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
defer signal.Stop(ch)
ctx, cancel := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
defer cancel()

for {
select {
case s := <-ch:
klog.Infof("Signal %s received, exiting ...", s.String())
return nil, fmt.Errorf("%s", s.String())
case <-ctx.Done():
klog.Infof("Termination signal received, exiting.")
return nil, errors.New("Termination signal received, exiting.")

case <-tick.C:
if certificates.GetCertificatesAPIVersion(c.kubeClientSet) == certificates.CSRV1 {
Expand Down
42 changes: 11 additions & 31 deletions pkg/controller/main-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@ import (
"fmt"
"maps"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"

kubeinformers "k8s.io/client-go/informers"
Expand Down Expand Up @@ -376,8 +373,8 @@ func NewController(
}

// StartPodInformer runs PodInformer
func (c *Controller) StartPodInformer(stopCh <-chan struct{}) {
c.podInformer.Run(stopCh)
func (c *Controller) StartPodInformer(ctx context.Context) {
c.podInformer.Run(ctx.Done())
}

// startUpgradeServer Starts the Upgrade tenant API server and notifies the start and stop via notificationChannel returned
Expand Down Expand Up @@ -421,7 +418,7 @@ func (c *Controller) startSTSAPIServer(ctx context.Context, notificationChannel

// leaderRun start the Controller and the API's
// When a new leader is elected this function is run in the pod
func leaderRun(ctx context.Context, c *Controller, threadiness int, stopCh <-chan struct{}, notificationChannel chan *EventNotification) {
func leaderRun(ctx context.Context, c *Controller, threadiness int, notificationChannel chan *EventNotification) {
// we declare the channel used to communicate server errors
var upgradeServerChannel <-chan error

Expand All @@ -433,21 +430,21 @@ func leaderRun(ctx context.Context, c *Controller, threadiness int, stopCh <-cha

// Wait for the caches to be synced before starting workers
klog.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.statefulSetListerSynced, c.deploymentListerSynced, c.tenantsSynced, c.policyBindingListerSynced, c.secretListerSynced); !ok {
if ok := cache.WaitForCacheSync(ctx.Done(), c.statefulSetListerSynced, c.deploymentListerSynced, c.tenantsSynced, c.policyBindingListerSynced, c.secretListerSynced); !ok {
panic("failed to wait for caches to sync")
}

// Launch two workers to process Job resources
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
go wait.Until(c.runWorker, time.Second, ctx.Done())
}

// Launch a single worker for Health Check reacting to Pod Changes
go wait.Until(c.runHealthCheckWorker, time.Second, stopCh)
go wait.Until(c.runHealthCheckWorker, time.Second, ctx.Done())

// Launch a goroutine to monitor all Tenants
go c.recurrentTenantStatusMonitor(stopCh)
go c.StartPodInformer(stopCh)
go c.recurrentTenantStatusMonitor(ctx)
go c.StartPodInformer(ctx)

// 2) we need to make sure we have STS API certificates (if enabled)
if IsSTSEnabled() {
Expand Down Expand Up @@ -475,7 +472,7 @@ func leaderRun(ctx context.Context, c *Controller, threadiness int, stopCh <-cha
}
// webserver was instructed to stop, do not attempt to restart
continue
case <-stopCh:
case <-ctx.Done():
return
}
}
Expand All @@ -485,23 +482,7 @@ func leaderRun(ctx context.Context, c *Controller, threadiness int, stopCh <-cha
// as syncing informer caches and starting workers. It will block until the
// context is canceled, at which point it will shut down the workqueue and wait for
// workers to finish processing their current work items.
func (c *Controller) Start(threadiness int, stopCh <-chan struct{}) error {
// use a Go context so we can tell the leaderelection code when we
// want to step down
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// listen for interrupts or the Linux SIGTERM signal and cancel
// our context, which the leader election code will observe and
// step down
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
go func() {
<-ch
klog.Info("Received termination, signaling shutdown")
cancel()
}()

func (c *Controller) Start(ctx context.Context, cancel context.CancelFunc, threadiness int) error {
leaseLockName := "minio-operator-lock"
leaseLockNamespace := miniov2.GetNSFromFile()

Expand Down Expand Up @@ -553,7 +534,7 @@ func (c *Controller) Start(threadiness int, stopCh <-chan struct{}) error {
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
// start the controller + API code
leaderRun(ctx, c, threadiness, stopCh, notificationChannel)
leaderRun(ctx, c, threadiness, notificationChannel)
},
OnStoppedLeading: func() {
klog.Infof("leader lost, removing any leader labels that I '%s' might have", c.podName)
Expand All @@ -568,7 +549,6 @@ func (c *Controller) Start(threadiness int, stopCh <-chan struct{}) error {
} else {
c.kubeClientSet.CoreV1().Pods(leaseLockNamespace).Patch(ctx, c.podName, types.JSONPatchType, payloadBytes, metav1.PatchOptions{})
}
c.Stop()
cancel()
},
OnNewLeader: func(identity string) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const (
)

// recurrentTenantStatusMonitor loop that checks every N minutes for tenants health
func (c *Controller) recurrentTenantStatusMonitor(stopCh <-chan struct{}) {
func (c *Controller) recurrentTenantStatusMonitor(ctx context.Context) {
// How often will this function run
interval := miniov2.GetMonitoringInterval()
ticker := time.NewTicker(time.Duration(interval) * time.Minute)
Expand All @@ -53,7 +53,7 @@ func (c *Controller) recurrentTenantStatusMonitor(stopCh <-chan struct{}) {
if err := c.tenantsHealthMonitor(); err != nil {
klog.Infof("%v", err)
}
case <-stopCh:
case <-ctx.Done():
ticker.Stop()
return
}
Expand Down
Loading