Skip to content

Commit d731692

Browse files
author
Cody Lee
committed
merges upstream fixes for gradient2 warmup and drift
1 parent f8fe453 commit d731692

File tree

9 files changed

+127
-15
lines changed

9 files changed

+127
-15
lines changed

core/limit.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ type MeasurementInterface interface {
1616
// Reset the internal state as if no samples were ever added.
1717
Reset()
1818

19-
// OnSample will update the value given an operation function
19+
// Update will update the value given an operation function
2020
Update(operation func(value float64) float64)
2121
}
2222

limit/gradient2.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ func NewDefaultGradient2Limit(
5454
5,
5555
10,
5656
100,
57+
nil,
5758
logger,
5859
registry,
5960
tags...,
@@ -84,6 +85,7 @@ func NewGradient2Limit(
8485
driftMultiplier float64,
8586
shortWindow int,
8687
longWindow int,
88+
longWindowWarmupFunc measurements.ExponentialWarmUpFunction,
8789
logger Logger,
8890
registry core.MetricRegistry,
8991
tags ...string,
@@ -130,8 +132,8 @@ func NewGradient2Limit(
130132
minLimit: minLimit,
131133
queueSizeFunc: queueSizeFunc,
132134
smoothing: smoothing,
133-
shortRTT: measurements.NewExponentialAverageMeasurement(shortWindow, 10),
134-
longRTT: measurements.NewExponentialAverageMeasurement(longWindow, 10),
135+
shortRTT: &measurements.SingleMeasurement{},
136+
longRTT: measurements.NewExponentialAverageMeasurement(longWindow, 10, longWindowWarmupFunc),
135137
maxDriftIntervals: int(float64(shortWindow) * driftMultiplier),
136138
longRTTSampleListener: registry.RegisterDistribution(core.PrefixMetricWithName(core.MetricMinRTT, name), tags...),
137139
shortRTTSampleListener: registry.RegisterDistribution(core.PrefixMetricWithName(core.MetricWindowMinRTT, name), tags...),
@@ -199,6 +201,11 @@ func (l *Gradient2Limit) OnSample(startTime int64, rtt int64, inFlight int, didD
199201
}
200202
} else {
201203
l.intervalsAbove = 0
204+
currentLongRTT := l.longRTT.Get()
205+
currentShortRTT := l.shortRTT.Get()
206+
l.longRTT.Update(func(_ float64) float64 {
207+
return (currentLongRTT + currentShortRTT) / 2
208+
})
202209
}
203210

204211
l.shortRTTSampleListener.AddSample(shortRTT)

limit/gradient2_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ func TestGradient2Limit(t *testing.T) {
3434
-1,
3535
-1,
3636
-1,
37+
nil,
3738
NoopLimitLogger{},
3839
core.EmptyMetricRegistryInstance,
3940
)
@@ -46,7 +47,7 @@ func TestGradient2Limit(t *testing.T) {
4647
l.OnSample(0, 10, 1, false)
4748
asrt.Equal(50, l.EstimatedLimit())
4849

49-
for i := 0; i < 59; i++ {
50+
for i := 0; i < 51; i++ {
5051
l.OnSample(int64(i), 100, 1, false)
5152
asrt.Equal(50, l.EstimatedLimit())
5253
}
@@ -64,12 +65,12 @@ func TestGradient2Limit(t *testing.T) {
6465
for i := 0; i < 100; i++ {
6566
l.OnSample(int64(i*10+30), 10, 1, true)
6667
}
67-
asrt.Equal(4, l.EstimatedLimit())
68+
asrt.Equal(6, l.EstimatedLimit())
6869

6970
// slowly grow back up
7071
for i := 0; i < 100; i++ {
7172
l.OnSample(int64(i*10+3030), 1, 5, false)
7273
}
73-
asrt.Equal(12, l.EstimatedLimit())
74+
asrt.Equal(13, l.EstimatedLimit())
7475
})
7576
}

measurements/exponential_average.go

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
11
package measurements
22

33
import (
4+
"fmt"
5+
"math"
46
"sync"
57
)
68

9+
// ExponentialWarmUpFunction describes a warmup function
10+
type ExponentialWarmUpFunction func(currentValue, newSampleValue float64) float64
11+
712
// ExponentialAverageMeasurement is an exponential average measurement implementation.
813
type ExponentialAverageMeasurement struct {
914
value float64
10-
sum float64
1115
window int
1216
warmupWindow int
17+
warmupFunc ExponentialWarmUpFunction
1318
count int
1419

1520
mu sync.RWMutex
@@ -19,10 +24,18 @@ type ExponentialAverageMeasurement struct {
1924
func NewExponentialAverageMeasurement(
2025
window int,
2126
warmupWindow int,
27+
warmupFunc func(currentValue, newSampleValue float64) float64,
2228
) *ExponentialAverageMeasurement {
29+
if warmupFunc == nil {
30+
warmupFunc = ExponentialWarmUpFunction(func(currentValue, newSampleValue float64) float64 {
31+
return math.Min(currentValue, newSampleValue)
32+
})
33+
}
34+
2335
return &ExponentialAverageMeasurement{
2436
window: window,
2537
warmupWindow: warmupWindow,
38+
warmupFunc: warmupFunc,
2639
}
2740
}
2841

@@ -32,12 +45,10 @@ func (m *ExponentialAverageMeasurement) Add(value float64) (float64, bool) {
3245
defer m.mu.Unlock()
3346
if m.count == 0 {
3447
m.count++
35-
m.sum = value
3648
m.value = value
3749
} else if m.count < m.warmupWindow {
3850
m.count++
39-
m.sum += value
40-
m.value = m.sum / float64(m.count)
51+
m.value = m.warmupFunc(m.value, value)
4152
} else {
4253
f := factor(m.window)
4354
m.value = m.value*(1-f) + value*f
@@ -58,7 +69,6 @@ func (m *ExponentialAverageMeasurement) Reset() {
5869
defer m.mu.Unlock()
5970
m.value = 0
6071
m.count = 0
61-
m.sum = 0
6272
}
6373

6474
// Update will update the value given an operation function
@@ -71,3 +81,12 @@ func (m *ExponentialAverageMeasurement) Update(operation func(value float64) flo
7181
func factor(n int) float64 {
7282
return 2.0 / float64(n+1)
7383
}
84+
85+
func (m *ExponentialAverageMeasurement) String() string {
86+
m.mu.RLock()
87+
defer m.mu.RUnlock()
88+
return fmt.Sprintf(
89+
"ExponentialAverageMeasurement{value=%0.5f, count=%d, window=%d, warmupWindow=%d}",
90+
m.value, m.count, m.window, m.warmupWindow,
91+
)
92+
}

measurements/exponential_average_test.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,22 +9,27 @@ import (
99
func TestExponentialAverageMeasurement(t *testing.T) {
1010
t.Parallel()
1111
asrt := assert.New(t)
12-
m := NewExponentialAverageMeasurement(100, 10)
12+
m := NewExponentialAverageMeasurement(100, 10, nil)
1313

14-
expected := []float64{10, 10.5, 11, 11.5, 12, 12.5, 13, 13.5, 14, 14.5}
14+
expected := []float64{10, 10, 10, 10, 10, 10, 10, 10, 10, 10}
1515
for i := 0; i < 10; i++ {
1616
result, _ := m.Add(float64(i + 10))
1717
asrt.Equal(expected[i], result)
1818
}
1919

2020
m.Add(100)
21-
asrt.InDelta(float64(16.19), m.Get(), 0.01)
21+
asrt.InDelta(float64(11.78), m.Get(), 0.01)
2222

2323
m.Update(func(value float64) float64 {
2424
return value - 1
2525
})
26-
asrt.InDelta(float64(15.19), m.Get(), 0.01)
26+
asrt.InDelta(float64(10.78), m.Get(), 0.01)
2727

2828
m.Reset()
2929
asrt.Equal(float64(0), m.Get())
30+
31+
asrt.Equal(
32+
"ExponentialAverageMeasurement{value=0.00000, count=0, window=100, warmupWindow=10}",
33+
m.String(),
34+
)
3035
}

measurements/minimum.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package measurements
22

33
import (
4+
"fmt"
45
"sync"
56
)
67

@@ -42,3 +43,9 @@ func (m *MinimumMeasurement) Update(operation func(value float64) float64) {
4243
m.mu.RUnlock()
4344
m.Add(operation(current))
4445
}
46+
47+
func (m *MinimumMeasurement) String() string {
48+
m.mu.RLock()
49+
defer m.mu.RUnlock()
50+
return fmt.Sprintf("MinimumMeasurement{value=%0.5f}", m.value)
51+
}

measurements/minimum_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,6 @@ func TestMinimumMeasurement(t *testing.T) {
2121
return value - 1
2222
})
2323
asrt.Equal(float64(-1.0), m.Get())
24+
25+
asrt.Equal("MinimumMeasurement{value=-1.00000}", m.String())
2426
}

measurements/single.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package measurements
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
)
7+
8+
// SingleMeasurement only keeps the latest value used.
9+
type SingleMeasurement struct {
10+
value float64
11+
mu sync.RWMutex
12+
}
13+
14+
// Add a single sample and update the internal state.
15+
func (m *SingleMeasurement) Add(value float64) (float64, bool) {
16+
m.mu.Lock()
17+
oldValue := float64(m.value)
18+
m.value = value
19+
m.mu.Unlock()
20+
return oldValue, true
21+
}
22+
23+
// Get the current value.
24+
func (m *SingleMeasurement) Get() float64 {
25+
m.mu.RLock()
26+
defer m.mu.RUnlock()
27+
return m.value
28+
}
29+
30+
// Reset the internal state as if no samples were ever added.
31+
func (m *SingleMeasurement) Reset() {
32+
m.mu.Lock()
33+
m.value = 0
34+
m.mu.Unlock()
35+
}
36+
37+
// Update will update the value given an operation function
38+
func (m *SingleMeasurement) Update(operation func(value float64) float64) {
39+
m.mu.Lock()
40+
m.value = operation(m.value)
41+
m.mu.Unlock()
42+
}
43+
44+
func (m *SingleMeasurement) String() string {
45+
m.mu.RLock()
46+
defer m.mu.RUnlock()
47+
return fmt.Sprintf("SingleMeasurement{value=%0.5f}", m.value)
48+
}

measurements/single_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package measurements
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestSingleMeasurement(t *testing.T) {
10+
asrt := assert.New(t)
11+
m := SingleMeasurement{}
12+
asrt.Equal(float64(0.0), m.Get())
13+
14+
m.Add(1)
15+
m.Add(2)
16+
asrt.Equal(float64(2.0), m.Get())
17+
18+
m.Update(func(value float64) float64 {
19+
return value * 2
20+
})
21+
asrt.Equal(float64(4.0), m.Get())
22+
asrt.Equal("SingleMeasurement{value=4.00000}", m.String())
23+
}

0 commit comments

Comments
 (0)