Skip to content

Make topKMetrics thread-safe in metrics limiter #1738

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ type topKMetrics struct {
metricMap map[string]*MetricData
minMetric *MetricData
sizeLimit int
mutex sync.RWMutex
}

// newTopKMetrics creates a new topKMetrics with a specified size limit.
Expand All @@ -278,6 +279,8 @@ func newTopKMetrics(sizeLimit int) *topKMetrics {

// Push adds a key-value pair to the priority queue. If the value already exists, it updates the frequency.
func (t *topKMetrics) Push(oldMetric, newMetric *MetricData) {
t.mutex.Lock()
defer t.mutex.Unlock()
hashValue := oldMetric.hashKey
if t.minMetric == nil {
t.minMetric = oldMetric
Expand All @@ -290,7 +293,7 @@ func (t *topKMetrics) Push(oldMetric, newMetric *MetricData) {
// Check if this oldMetric is the new minimum, find the new minMetric after the updates
if t.minMetric.hashKey == hashValue {
// Find the new minMetrics after update the frequency
t.minMetric = t.findMinMetric()
t.minMetric = t.findMinMetricLocked()
}
return
}
Expand All @@ -300,7 +303,7 @@ func (t *topKMetrics) Push(oldMetric, newMetric *MetricData) {
if newMetric.frequency > t.minMetric.frequency {
delete(t.metricMap, t.minMetric.hashKey)
t.metricMap[hashValue] = newMetric
t.minMetric = t.findMinMetric()
t.minMetric = t.findMinMetricLocked()
}
} else {
// Check if this newMetric is the new minimum.
Expand All @@ -311,8 +314,9 @@ func (t *topKMetrics) Push(oldMetric, newMetric *MetricData) {
}
}

// findMinMetric removes and returns the key-value pair with the minimum value.
func (t *topKMetrics) findMinMetric() *MetricData {
// findMinMetricLocked removes and returns the key-value pair with the minimum value.
// It assumes the caller already holds the read/write lock.
func (t *topKMetrics) findMinMetricLocked() *MetricData {
// Find the new minimum metric and smallest frequency.
var newMinMetric *MetricData
smallestFrequency := int(^uint(0) >> 1) // Initialize with the maximum possible integer value
Expand All @@ -327,6 +331,8 @@ func (t *topKMetrics) findMinMetric() *MetricData {
}

func (s *service) admitMetricData(metric *MetricData) bool {
s.primaryTopK.mutex.RLock()
defer s.primaryTopK.mutex.RUnlock()
_, found := s.primaryTopK.metricMap[metric.hashKey]
if len(s.primaryTopK.metricMap) < s.primaryTopK.sizeLimit || found {
return true
Expand Down
Loading