
Commit 2c8c1cb

pstibrany and pracucci authored
Merge timeseries and chunkseries from ingesters (#3013)
* Merge incoming chunks and timeseries.
* Added CHANGELOG.md
* Update CHANGELOG.md
* Remove obsolete comments.
* Mention that disabling streaming is only necessary in Cortex 1.3.0.
* Added integration test, and fixed panic when mixing chunk-based series and other series.
* Fix lint warnings.

Signed-off-by: Peter Štibraný <[email protected]> (each commit)
Co-authored-by: Marco Pracucci <[email protected]>
1 parent: 821630b · commit: 2c8c1cb

File tree: 6 files changed (+319 −59 lines)


CHANGELOG.md

Lines changed: 1 addition & 0 deletions
```diff
@@ -19,6 +19,7 @@
 * [ENHANCEMENT] Added `cortex_request_message_bytes` and `cortex_response_message_bytes` histograms to track received and sent gRPC message and HTTP request/response sizes. Added `cortex_inflight_requests` gauge to track number of inflight gRPC and HTTP requests. #3064
 * [ENHANCEMENT] Add config validation to the experimental Alertmanager API. Invalid configs are no longer accepted. #3053
 * [BUGFIX] Query-frontend: Fixed rounding for incoming query timestamps, to be 100% Prometheus compatible. #2990
+* [BUGFIX] Querier: Merge results from chunks and blocks ingesters when using streaming of results. #3013
 * [BUGFIX] Querier: query /series from ingesters regardless the `-querier.query-ingesters-within` setting. #3035
 * [BUGFIX] Experimental blocks storage: Ingester is less likely to hit gRPC message size limit when streaming data to queriers. #3015
 * [BUGFIX] Fix configuration for TLS server validation, TLS skip verify was hardcoded to true for all TLS configurations and prevented validation of server certificates. #3030
```

docs/blocks-storage/migrate-from-chunks-to-blocks.md

Lines changed: 5 additions & 4 deletions
```diff
@@ -18,6 +18,7 @@ This article **assumes** that:
 - Cortex cluster is managed by Kubernetes
 - Cortex is using chunks storage
 - Ingesters are using WAL
+- Cortex version 1.3.0 or later.

 _If your ingesters are not using WAL, the documented procedure will still apply, but the presented migration script will not work properly without changes, as it assumes that ingesters are managed via StatefulSet._

@@ -59,9 +60,9 @@ and need to use storage instead.

 #### `-querier.ingester-streaming=false`

-Querier (and ruler) has a [bug](https://github.com/cortexproject/cortex/issues/2935) and doesn't properly
-merge streamed results from chunks and blocks-based ingesters. Instead it only returns data from blocks- instesters.
-To avoid this problem, we need to temporarily disable this feature by setting `-querier.ingester-streaming=false`.
+Querier (and ruler) in Cortex version 1.3.0 has a [bug](https://github.com/cortexproject/cortex/issues/2935) and doesn't properly
+merge streamed results from chunks and blocks-based ingesters. Instead it only returns data from blocks instesters.
+To avoid this problem we can use newer Cortex release, or temporarily disable this feature by setting `-querier.ingester-streaming=false`.
 After migration is complete (i.e. all ingesters are running blocks only), this can be turned back to true, which is the default value.

 ### Query-frontend

@@ -132,7 +133,7 @@ This flag can be set to a timestamp when migration has finished, and it avoids q

 #### `-querier.ingester-streaming=true`

-Querier can be configured to make use of streamed responses from ingester at this point (`-querier.ingester-streaming=true`).
+If querier was configured to disable ingester streaming during migration (required for Cortex 1.3.0), Querier can be configured to make use of streamed responses from ingester at this point (`-querier.ingester-streaming=true`).

 ## Rollback
```
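As a concrete illustration of the flag the guide toggles — a hypothetical excerpt from a Kubernetes-managed querier Deployment (the image tag and surrounding structure are assumptions for illustration, not taken from this commit):

```yaml
# Hypothetical querier container spec; only the relevant argument matters here.
containers:
  - name: querier
    image: quay.io/cortexproject/cortex:v1.3.0  # assumed tag; use your own
    args:
      - -target=querier
      - -querier.ingester-streaming=false  # only needed while migrating on 1.3.0
```

Once all ingesters run blocks (or the cluster runs a release containing this fix), the flag can be set back to `true`, its default.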

New file (integration test; the file path is not captured in this view)

Lines changed: 154 additions & 0 deletions
@@ -0,0 +1,154 @@
```go
// +build requires_docker

package main

import (
	"context"
	"flag"
	"strings"
	"testing"
	"time"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/stretchr/testify/require"
	"github.com/weaveworks/common/user"

	"github.com/cortexproject/cortex/integration/e2e"
	e2edb "github.com/cortexproject/cortex/integration/e2e/db"
	"github.com/cortexproject/cortex/integration/e2ecortex"
	client2 "github.com/cortexproject/cortex/pkg/ingester/client"
)

func TestQuerierWithStreamingBlocksAndChunksIngesters(t *testing.T) {
	s, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer s.Close()

	require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml)))
	chunksFlags := mergeFlags(ChunksStorageFlags, map[string]string{})

	blockFlags := mergeFlags(BlocksStorageFlags, map[string]string{
		"-experimental.blocks-storage.tsdb.block-ranges-period":      "1h",
		"-experimental.blocks-storage.tsdb.head-compaction-interval": "1m",
		"-experimental.store-gateway.sharding-enabled":               "false",
		"-querier.ingester-streaming":                                "true",
	})

	// Start dependencies.
	consul := e2edb.NewConsul()
	minio := e2edb.NewMinio(9000, blockFlags["-experimental.blocks-storage.s3.bucket-name"])
	require.NoError(t, s.StartAndWaitReady(consul, minio))

	// Start Cortex components.
	ingesterBlocks := e2ecortex.NewIngester("ingester-blocks", consul.NetworkHTTPEndpoint(), blockFlags, "")
	ingesterChunks := e2ecortex.NewIngester("ingester-chunks", consul.NetworkHTTPEndpoint(), chunksFlags, "")
	storeGateway := e2ecortex.NewStoreGateway("store-gateway", consul.NetworkHTTPEndpoint(), blockFlags, "")
	require.NoError(t, s.StartAndWaitReady(ingesterBlocks, ingesterChunks, storeGateway))

	// Sharding is disabled, pass gateway address.
	querierFlags := mergeFlags(blockFlags, map[string]string{
		"-experimental.querier.store-gateway-addresses": strings.Join([]string{storeGateway.NetworkGRPCEndpoint()}, ","),
		"-distributor.shard-by-all-labels":              "true",
	})
	querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), querierFlags, "")
	require.NoError(t, s.StartAndWaitReady(querier))

	require.NoError(t, querier.WaitSumMetrics(e2e.Equals(1024), "cortex_ring_tokens_total"))

	s1 := []client2.Sample{
		{Value: 1, TimestampMs: 1000},
		{Value: 2, TimestampMs: 2000},
		{Value: 3, TimestampMs: 3000},
		{Value: 4, TimestampMs: 4000},
		{Value: 5, TimestampMs: 5000},
	}

	s2 := []client2.Sample{
		{Value: 1, TimestampMs: 1000},
		{Value: 2.5, TimestampMs: 2500},
		{Value: 3, TimestampMs: 3000},
		{Value: 5.5, TimestampMs: 5500},
	}

	clientConfig := client2.Config{}
	clientConfig.RegisterFlags(flag.NewFlagSet("unused", flag.ContinueOnError)) // registers default values

	// Push data to chunks ingester.
	{
		ingesterChunksClient, err := client2.MakeIngesterClient(ingesterChunks.GRPCEndpoint(), clientConfig)
		require.NoError(t, err)
		defer ingesterChunksClient.Close()

		_, err = ingesterChunksClient.Push(user.InjectOrgID(context.Background(), "user"), &client2.WriteRequest{
			Timeseries: []client2.PreallocTimeseries{
				{TimeSeries: &client2.TimeSeries{Labels: []client2.LabelAdapter{{Name: labels.MetricName, Value: "s"}, {Name: "l", Value: "1"}}, Samples: s1}},
				{TimeSeries: &client2.TimeSeries{Labels: []client2.LabelAdapter{{Name: labels.MetricName, Value: "s"}, {Name: "l", Value: "2"}}, Samples: s1}}},
			Source: client2.API,
		})
		require.NoError(t, err)
	}

	// Push data to blocks ingester.
	{
		ingesterBlocksClient, err := client2.MakeIngesterClient(ingesterBlocks.GRPCEndpoint(), clientConfig)
		require.NoError(t, err)
		defer ingesterBlocksClient.Close()

		_, err = ingesterBlocksClient.Push(user.InjectOrgID(context.Background(), "user"), &client2.WriteRequest{
			Timeseries: []client2.PreallocTimeseries{
				{TimeSeries: &client2.TimeSeries{Labels: []client2.LabelAdapter{{Name: labels.MetricName, Value: "s"}, {Name: "l", Value: "2"}}, Samples: s2}},
				{TimeSeries: &client2.TimeSeries{Labels: []client2.LabelAdapter{{Name: labels.MetricName, Value: "s"}, {Name: "l", Value: "3"}}, Samples: s1}}},
			Source: client2.API,
		})
		require.NoError(t, err)
	}

	c, err := e2ecortex.NewClient("", querier.HTTPEndpoint(), "", "", "user")
	require.NoError(t, err)

	// Query back the series (one only in the chunks ingester, one only in the blocks ingester, one in both).
	result, err := c.Query("s[1m]", time.Unix(10, 0))
	require.NoError(t, err)

	s1Values := []model.SamplePair{
		{Value: 1, Timestamp: 1000},
		{Value: 2, Timestamp: 2000},
		{Value: 3, Timestamp: 3000},
		{Value: 4, Timestamp: 4000},
		{Value: 5, Timestamp: 5000},
	}

	s1AndS2ValuesMerged := []model.SamplePair{
		{Value: 1, Timestamp: 1000},
		{Value: 2, Timestamp: 2000},
		{Value: 2.5, Timestamp: 2500},
		{Value: 3, Timestamp: 3000},
		{Value: 4, Timestamp: 4000},
		{Value: 5, Timestamp: 5000},
		{Value: 5.5, Timestamp: 5500},
	}

	expectedMatrix := model.Matrix{
		// From chunks ingester only.
		&model.SampleStream{
			Metric: model.Metric{labels.MetricName: "s", "l": "1"},
			Values: s1Values,
		},

		// From blocks ingester only.
		&model.SampleStream{
			Metric: model.Metric{labels.MetricName: "s", "l": "3"},
			Values: s1Values,
		},

		// Merged from both ingesters.
		&model.SampleStream{
			Metric: model.Metric{labels.MetricName: "s", "l": "2"},
			Values: s1AndS2ValuesMerged,
		},
	}

	require.Equal(t, model.ValMatrix, result.Type())
	require.ElementsMatch(t, expectedMatrix, result.(model.Matrix))
}
```
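A note on running this test: the `requires_docker` build tag means a plain `go test` skips the file. Enabling the tag uses the standard Go mechanism, e.g. `go test -tags requires_docker -run TestQuerierWithStreamingBlocksAndChunksIngesters ./integration/...` (the exact invocation is an assumption, not something specified in this commit).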

pkg/querier/distributor_queryable.go

Lines changed: 19 additions & 6 deletions
```diff
@@ -137,8 +137,9 @@ func (q *distributorQuerier) streamingSelect(minT, maxT int64, matchers []*label
 		return storage.ErrSeriesSet(err)
 	}

-	if len(results.Timeseries) != 0 {
-		return newTimeSeriesSeriesSet(results.Timeseries)
+	sets := []storage.SeriesSet(nil)
+	if len(results.Timeseries) > 0 {
+		sets = append(sets, newTimeSeriesSeriesSet(results.Timeseries))
 	}

 	serieses := make([]storage.Series, 0, len(results.Chunkseries))
@@ -156,15 +157,27 @@ func (q *distributorQuerier) streamingSelect(minT, maxT int64, matchers []*label
 			return storage.ErrSeriesSet(err)
 		}

-		series := &chunkSeries{
+		serieses = append(serieses, &chunkSeries{
 			labels:            ls,
 			chunks:            chunks,
 			chunkIteratorFunc: q.chunkIterFn,
-		}
-		serieses = append(serieses, series)
+			mint:              minT,
+			maxt:              maxT,
+		})
 	}

-	return series.NewConcreteSeriesSet(serieses)
+	if len(serieses) > 0 {
+		sets = append(sets, series.NewConcreteSeriesSet(serieses))
+	}
+
+	if len(sets) == 0 {
+		return storage.EmptySeriesSet()
+	}
+	if len(sets) == 1 {
+		return sets[0]
+	}
+	// Sets need to be sorted. Both series.NewConcreteSeriesSet and newTimeSeriesSeriesSet take care of that.
+	return storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
 }

 func (q *distributorQuerier) LabelValues(name string) ([]string, storage.Warnings, error) {
```
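This is the core of the fix: previously `streamingSelect` returned early with only the time-series results whenever `results.Timeseries` was non-empty, silently dropping the chunk-based series from chunks ingesters; now both result kinds are collected into `sets` and merged. A minimal, self-contained sketch (plain Go, not Cortex code) of the sample-level merge semantics that `storage.ChainedSeriesMerge` provides for a series returned by both ingesters:

```go
// Standalone illustration: samples from two timestamp-sorted inputs are
// interleaved by timestamp, and when both inputs have a sample at the same
// timestamp only one is kept (ChainedSeriesMerge likewise collapses
// duplicates at equal timestamps).
package main

import "fmt"

type sample struct {
	ts int64
	v  float64
}

func mergeSamples(a, b []sample) []sample {
	out := make([]sample, 0, len(a)+len(b))
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		switch {
		case a[i].ts < b[j].ts:
			out = append(out, a[i])
			i++
		case a[i].ts > b[j].ts:
			out = append(out, b[j])
			j++
		default: // equal timestamps: keep a single sample, drop the duplicate
			out = append(out, a[i])
			i++
			j++
		}
	}
	out = append(out, a[i:]...) // at most one of these two appends adds anything
	out = append(out, b[j:]...)
	return out
}

func main() {
	// Same sample values as s1 and s2 in the tests of this commit.
	s1 := []sample{{1000, 1}, {2000, 2}, {3000, 3}, {4000, 4}, {5000, 5}}
	s2 := []sample{{1000, 1}, {2500, 2.5}, {3000, 3}, {5500, 5.5}}
	fmt.Println(mergeSamples(s1, s2))
	// [{1000 1} {2000 2} {2500 2.5} {3000 3} {4000 4} {5000 5} {5500 5.5}]
}
```

Run as-is, it prints the same seven merged samples the new tests expect for the series present in both ingesters.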

pkg/querier/distributor_queryable_test.go

Lines changed: 112 additions & 0 deletions
@@ -214,6 +214,118 @@ func TestIngesterStreaming(t *testing.T) {

All 112 changed lines are additions. The new test and helpers, inserted after `TestIngesterStreaming` and before the existing `mockDistributor` declaration, are:

```go
func TestIngesterStreamingMixedResults(t *testing.T) {
	const (
		mint = 0
		maxt = 10000
	)
	s1 := []client.Sample{
		{Value: 1, TimestampMs: 1000},
		{Value: 2, TimestampMs: 2000},
		{Value: 3, TimestampMs: 3000},
		{Value: 4, TimestampMs: 4000},
		{Value: 5, TimestampMs: 5000},
	}
	s2 := []client.Sample{
		{Value: 1, TimestampMs: 1000},
		{Value: 2.5, TimestampMs: 2500},
		{Value: 3, TimestampMs: 3000},
		{Value: 5.5, TimestampMs: 5500},
	}

	mergedSamplesS1S2 := []client.Sample{
		{Value: 1, TimestampMs: 1000},
		{Value: 2, TimestampMs: 2000},
		{Value: 2.5, TimestampMs: 2500},
		{Value: 3, TimestampMs: 3000},
		{Value: 4, TimestampMs: 4000},
		{Value: 5, TimestampMs: 5000},
		{Value: 5.5, TimestampMs: 5500},
	}

	d := &mockDistributor{}
	d.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
		&client.QueryStreamResponse{
			Chunkseries: []client.TimeSeriesChunk{
				{
					Labels: []client.LabelAdapter{{Name: labels.MetricName, Value: "one"}},
					Chunks: convertToChunks(t, s1),
				},
				{
					Labels: []client.LabelAdapter{{Name: labels.MetricName, Value: "two"}},
					Chunks: convertToChunks(t, s1),
				},
			},

			Timeseries: []client.TimeSeries{
				{
					Labels:  []client.LabelAdapter{{Name: labels.MetricName, Value: "two"}},
					Samples: s2,
				},
				{
					Labels:  []client.LabelAdapter{{Name: labels.MetricName, Value: "three"}},
					Samples: s1,
				},
			},
		},
		nil)

	ctx := user.InjectOrgID(context.Background(), "0")
	queryable := newDistributorQueryable(d, true, mergeChunks, 0)
	querier, err := queryable.Querier(ctx, mint, maxt)
	require.NoError(t, err)

	seriesSet := querier.Select(true, &storage.SelectHints{Start: mint, End: maxt}, labels.MustNewMatcher(labels.MatchRegexp, labels.MetricName, ".*"))
	require.NoError(t, seriesSet.Err())

	require.True(t, seriesSet.Next())
	verifySeries(t, seriesSet.At(), labels.Labels{{Name: labels.MetricName, Value: "one"}}, s1)

	require.True(t, seriesSet.Next())
	verifySeries(t, seriesSet.At(), labels.Labels{{Name: labels.MetricName, Value: "three"}}, s1)

	require.True(t, seriesSet.Next())
	verifySeries(t, seriesSet.At(), labels.Labels{{Name: labels.MetricName, Value: "two"}}, mergedSamplesS1S2)

	require.False(t, seriesSet.Next())
	require.NoError(t, seriesSet.Err())
}

func verifySeries(t *testing.T, series storage.Series, l labels.Labels, samples []client.Sample) {
	require.Equal(t, l, series.Labels())

	it := series.Iterator()
	for _, s := range samples {
		require.True(t, it.Next())
		require.Nil(t, it.Err())
		ts, v := it.At()
		require.Equal(t, s.Value, v)
		require.Equal(t, s.TimestampMs, ts)
	}
	require.False(t, it.Next())
	require.Nil(t, it.Err())
}

func convertToChunks(t *testing.T, samples []client.Sample) []client.Chunk {
	// We need to make sure that there is at least one chunk present,
	// else no series will be selected.
	promChunk, err := encoding.NewForEncoding(encoding.Bigchunk)
	require.NoError(t, err)

	for _, s := range samples {
		c, err := promChunk.Add(model.SamplePair{Value: model.SampleValue(s.Value), Timestamp: model.Time(s.TimestampMs)})
		require.NoError(t, err)
		require.Nil(t, c)
	}

	clientChunks, err := chunkcompat.ToChunks([]chunk.Chunk{
		chunk.NewChunk("", 0, nil, promChunk, model.Time(samples[0].TimestampMs), model.Time(samples[len(samples)-1].TimestampMs)),
	})
	require.NoError(t, err)

	return clientChunks
}
```

The hunk's trailing context is the existing `mockDistributor` declaration (`type mockDistributor struct { mock.Mock }`).
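A note on the assertion order in `TestIngesterStreamingMixedResults`: merged series sets iterate in label-sorted order, and lexically `one` < `three` < `two` (since `h` sorts before `w`), so the chunk-only series comes first, then the series present only as a time series, then the series merged from both sources.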
