Skip to content

Commit 3e845a4

Browse files
authored
Created new grpc instrument to record stream push metrics (#6855)
Signed-off-by: Alex Le <[email protected]>
1 parent cc202c7 commit 3e845a4

File tree

3 files changed

+72
-2
lines changed

3 files changed

+72
-2
lines changed

pkg/ingester/client/client.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,11 @@ func (c *closableHealthAndIngesterClient) handlePushRequest(mainFunc func() (*co
139139

140140
// MakeIngesterClient makes a new IngesterClient
141141
func MakeIngesterClient(addr string, cfg Config, useStreamConnection bool) (HealthAndIngesterClient, error) {
142-
dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(ingesterClientRequestDuration))
142+
unaryClientInterceptor, streamClientInterceptor := grpcclient.Instrument(ingesterClientRequestDuration)
143+
if useStreamConnection {
144+
unaryClientInterceptor, streamClientInterceptor = grpcclient.InstrumentReusableStream(ingesterClientRequestDuration)
145+
}
146+
dialOpts, err := cfg.GRPCClientConfig.DialOption(unaryClientInterceptor, streamClientInterceptor)
143147
if err != nil {
144148
return nil, err
145149
}
@@ -202,7 +206,7 @@ func (c *closableHealthAndIngesterClient) Run(streamPushChan chan *streamWriteJo
202206
var workerErr error
203207
var wg sync.WaitGroup
204208
for i := 0; i < INGESTER_CLIENT_STREAM_WORKER_COUNT; i++ {
205-
workerName := fmt.Sprintf("stream-push-worker-%d", i)
209+
workerName := fmt.Sprintf("ingester-%s-stream-push-worker-%d", c.addr, i)
206210
wg.Add(1)
207211
go func() {
208212
workerCtx := user.InjectOrgID(streamCtx, workerName)

pkg/util/grpcclient/instrumentation.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,18 @@ func Instrument(requestDuration *prometheus.HistogramVec) ([]grpc.UnaryClientInt
2525
cortexmiddleware.PrometheusGRPCStreamInstrumentation(requestDuration),
2626
}
2727
}
28+
29+
func InstrumentReusableStream(requestDuration *prometheus.HistogramVec) ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) {
30+
return []grpc.UnaryClientInterceptor{
31+
grpcutil.HTTPHeaderPropagationClientInterceptor,
32+
otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
33+
middleware.ClientUserHeaderInterceptor,
34+
cortexmiddleware.PrometheusGRPCUnaryInstrumentation(requestDuration),
35+
}, []grpc.StreamClientInterceptor{
36+
grpcutil.HTTPHeaderPropagationStreamClientInterceptor,
37+
unwrapErrorStreamClientInterceptor(),
38+
otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()),
39+
middleware.StreamClientUserHeaderInterceptor,
40+
cortexmiddleware.PrometheusGRPCReusableStreamInstrumentation(requestDuration),
41+
}
42+
}

pkg/util/middleware/grpc.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,57 @@ func (s *instrumentedClientStream) Header() (metadata.MD, error) {
8484
return md, err
8585
}
8686

87+
// PrometheusGRPCReusableStreamInstrumentation records duration of reusable streaming gRPC requests client side.
88+
func PrometheusGRPCReusableStreamInstrumentation(metric *prometheus.HistogramVec) grpc.StreamClientInterceptor {
89+
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string,
90+
streamer grpc.Streamer, opts ...grpc.CallOption,
91+
) (grpc.ClientStream, error) {
92+
stream, err := streamer(ctx, desc, cc, method, opts...)
93+
return &instrumentedReusableClientStream{
94+
metric: metric,
95+
method: method,
96+
ClientStream: stream,
97+
}, err
98+
}
99+
}
100+
101+
type instrumentedReusableClientStream struct {
102+
metric *prometheus.HistogramVec
103+
method string
104+
grpc.ClientStream
105+
}
106+
107+
func (s *instrumentedReusableClientStream) SendMsg(m interface{}) error {
108+
start := time.Now()
109+
err := s.ClientStream.SendMsg(m)
110+
if err != nil && err != io.EOF {
111+
s.metric.WithLabelValues(s.method, errorCode(err)).Observe(time.Since(start).Seconds())
112+
return err
113+
}
114+
s.metric.WithLabelValues(s.method, errorCode(nil)).Observe(time.Since(start).Seconds())
115+
return err
116+
}
117+
118+
func (s *instrumentedReusableClientStream) RecvMsg(m interface{}) error {
119+
start := time.Now()
120+
err := s.ClientStream.RecvMsg(m)
121+
if err != nil && err != io.EOF {
122+
s.metric.WithLabelValues(s.method, errorCode(err)).Observe(time.Since(start).Seconds())
123+
return err
124+
}
125+
s.metric.WithLabelValues(s.method, errorCode(nil)).Observe(time.Since(start).Seconds())
126+
return err
127+
}
128+
129+
func (s *instrumentedReusableClientStream) Header() (metadata.MD, error) {
130+
start := time.Now()
131+
md, err := s.ClientStream.Header()
132+
if err != nil {
133+
s.metric.WithLabelValues(s.method, errorCode(err)).Observe(time.Since(start).Seconds())
134+
}
135+
return md, err
136+
}
137+
87138
func errorCode(err error) string {
88139
respStatus := "2xx"
89140
if err != nil {

0 commit comments

Comments
 (0)