-
Notifications
You must be signed in to change notification settings - Fork 2.5k
[WIP] hitless #3447
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: ndyakov/CAE-1088-resp3-notification-handlers
Are you sure you want to change the base?
[WIP] hitless #3447
Conversation
49e5814
to
43aef14
Compare
8608f93
to
a39a23a
Compare
e88e673
to
4542e8f
Compare
9590c26
to
100c3d2
Compare
100c3d2
to
de641c5
Compare
// Enabled controls whether hitless upgrades are enabled. | ||
// Requires RESP3 protocol for push notifications. | ||
// Default: false | ||
Enabled bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be enabled by default?
// PostHandoffRelaxedDuration is how long to keep relaxed timeouts on the new connection | ||
// after a handoff completes. This provides additional resilience during cluster transitions. | ||
// Default: 10 seconds | ||
PostHandoffRelaxedDuration time.Duration |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
default should be RelaxedTimeoutDuration*2 ?
func (c *Config) applyWorkerDefaults(poolSize int) { | ||
// Calculate defaults based on pool size | ||
if poolSize <= 0 { | ||
poolSize = 100 // Default assumption if pool size unknown |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If pool size is unknown, use the same calculation for poolsize that is currently done in the pool, i.e. // default: 10 * runtime.GOMAXPROCS(0)
if isPrivate { | ||
endpointType = "internal-fqdn" | ||
} else { | ||
endpointType = "external-fqdn" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if currently fqdn is used, isPrivate
will most likely be false
|
||
// Register handlers for all hitless upgrade notifications with the client's processor | ||
if err := processor.RegisterHandler("MOVING", handler, true); err != nil { | ||
return err | ||
} | ||
if err := processor.RegisterHandler("MIGRATING", handler, true); err != nil { | ||
return err | ||
} | ||
if err := processor.RegisterHandler("MIGRATED", handler, true); err != nil { | ||
return err | ||
} | ||
if err := processor.RegisterHandler("FAILING_OVER", handler, true); err != nil { | ||
return err | ||
} | ||
if err := processor.RegisterHandler("FAILED_OVER", handler, true); err != nil { | ||
return err | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe rewrite in a loop? Try couple of different approaches and decide which works the best.
} | ||
|
||
// StartMovingOperationWithConnID starts a new MOVING operation with a specific connection ID. | ||
func (hm *HitlessManager) StartMovingOperationWithConnID(ctx context.Context, newEndpoint string, deadline time.Time, seqID int64, connID uint64) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
initially this was starting the moving operation, but right now is just inserting a record for us to track the moving operation. It is only fair to actually rename the method to something like TrackMovingOperationWithConnID
which will be more aligned with the fact that this method does actually nothing related to the actual handof.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
or RecordMovingOperationWithConnID
if _, exists := hm.activeMovingOps[key]; exists { | ||
// Duplicate MOVING notification, ignore | ||
internal.Logger.Printf(ctx, "Duplicate MOVING operation ignored: %s", key.String()) | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add more comments why this is ignored and doesn't actually return an error. this should not happen in normal operation, i think...
// Remove from active operations | ||
delete(hm.activeMovingOps, key) | ||
|
||
// Note: Atomic state updates removed (per-connection handoffs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✂️
// GetActiveMovingOperations returns active operations with composite keys. | ||
func (hm *HitlessManager) GetActiveMovingOperations() map[MovingOperationKey]*MovingOperation { | ||
hm.mu.RLock() | ||
defer hm.mu.RUnlock() | ||
|
||
result := make(map[MovingOperationKey]*MovingOperation) | ||
for key, op := range hm.activeMovingOps { | ||
result[key] = &MovingOperation{ | ||
SeqID: op.SeqID, | ||
NewEndpoint: op.NewEndpoint, | ||
StartTime: op.StartTime, | ||
Deadline: op.Deadline, | ||
} | ||
} | ||
return result | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is not used and I would vote to remove it if it won't be needed in the final implementation. I can see how it can be beneficial for observability but we can find think about another way of logging without locking and copying
// GetCurrentState returns current state based on active operations. | ||
func (hm *HitlessManager) GetCurrentState() State { | ||
hm.mu.RLock() | ||
defer hm.mu.RUnlock() | ||
|
||
if len(hm.activeMovingOps) > 0 { | ||
return StateMoving | ||
} | ||
return StateIdle | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
atomic flag instead of lock?
// IsHandoffInProgress returns true if any handoff is in progress. | ||
func (hm *HitlessManager) IsHandoffInProgress() bool { | ||
hm.mu.RLock() | ||
defer hm.mu.RUnlock() | ||
return len(hm.activeMovingOps) > 0 | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
atomic flag.
No description provided.