Skip to content

Commit b00cc53

Browse files
committed
Add support for alertmanager alert observer
Signed-off-by: Emmanuel Lodovice <[email protected]>
1 parent 32911bb commit b00cc53

File tree

8 files changed

+725
-52
lines changed

8 files changed

+725
-52
lines changed

pkg/alertmanager/alert_observer.go

Lines changed: 286 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,286 @@
1+
package alertmanager
2+
3+
import (
4+
"context"
5+
6+
"github.com/go-kit/log"
7+
"github.com/go-kit/log/level"
8+
"github.com/prometheus/alertmanager/alertobserver"
9+
"github.com/prometheus/alertmanager/notify"
10+
"github.com/prometheus/alertmanager/types"
11+
)
12+
13+
// ObserverLimits supplies the per-tenant level that controls how much the
// alert lifecycle observer logs.
type ObserverLimits interface {
	// AlertmanagerAlertLifeCycleObserverLevel returns the observer level
	// configured for tenant. LogAlertLifeCycleObserver.Observe logs nothing
	// when the level is zero or negative.
	AlertmanagerAlertLifeCycleObserverLevel(tenant string) int
}
16+
17+
type AlertLifeCycleObserverLimiter struct {
18+
limits ObserverLimits
19+
tenant string
20+
}
21+
22+
func NewAlertLifeCycleObserverLimiter(tenant string, limits ObserverLimits) *AlertLifeCycleObserverLimiter {
23+
return &AlertLifeCycleObserverLimiter{
24+
tenant: tenant,
25+
limits: limits,
26+
}
27+
}
28+
29+
func (a *AlertLifeCycleObserverLimiter) Level() int {
30+
return a.limits.AlertmanagerAlertLifeCycleObserverLevel(a.tenant)
31+
}
32+
33+
type LogAlertLifeCycleObserver struct {
34+
logger log.Logger
35+
limiter *AlertLifeCycleObserverLimiter
36+
}
37+
38+
func NewLogAlertLifeCycleObserver(logger log.Logger, user string, limiter *AlertLifeCycleObserverLimiter) *LogAlertLifeCycleObserver {
39+
logger = log.With(logger, "user", user)
40+
logger = log.With(logger, "component", "observer")
41+
return &LogAlertLifeCycleObserver{
42+
logger: logger,
43+
limiter: limiter,
44+
}
45+
}
46+
47+
// Observe implements LifeCycleObserver
48+
func (o *LogAlertLifeCycleObserver) Observe(event string, alerts []*types.Alert, meta alertobserver.AlertEventMeta) {
49+
if alerts == nil || o.limiter == nil || o.limiter.Level() <= 0 {
50+
return
51+
}
52+
// The general idea of having levels in the limiter is to control the volume of logs AM is producing. The observer
53+
// logs many types of events and some events are more important than others. By configuring the level we can
54+
// continue to have observability on the alerts at lower granularity if we see that the volume of logs is getting
55+
// too expensive.
56+
// What is log per level is as follows:
57+
//* 1
58+
// * Alert is rejected because of validation error
59+
// * Alert joins an aggregation group
60+
// * Alert is muted
61+
// * Aggregation Group notification Sent / Failed
62+
//* 2
63+
// * Alert is rejected because of validation error
64+
// * Alert joins an aggregation group
65+
// * Alert is muted
66+
// * Aggregation Group pipeline start
67+
// * Aggregation Group notification Sent / Failed
68+
//* 3
69+
// * Alert is rejected because of validation error
70+
// * Alert joins an aggregation group
71+
// * Alert is muted
72+
// * Aggregation Group pipeline start
73+
// * Aggregation Group passed a stage in the pipeline
74+
// * Aggregation Group notification Sent / Failed
75+
//* 4
76+
// * Alert is rejected because of validation error
77+
// * Alert joins an aggregation group
78+
// * Alert is muted
79+
// * Aggregation group pipeline start
80+
// * Aggregation Group passed a stage in the pipeline
81+
// * Alert in aggregation group is Sent / Failed
82+
//* 5
83+
// * Alert is rejected because of validation error
84+
// * Alert is received
85+
// * Alert joins an aggregation group
86+
// * Alert is muted
87+
// * Alert in aggregation group pipeline start
88+
// * Aggregation Group passed a stage in the pipeline
89+
// * Alert in aggregation group is Sent / Failed
90+
91+
switch event {
92+
case alertobserver.EventAlertReceived:
93+
o.Received(alerts)
94+
case alertobserver.EventAlertRejected:
95+
o.Rejected(alerts, meta)
96+
case alertobserver.EventAlertAddedToAggrGroup:
97+
o.AddedAggrGroup(alerts, meta)
98+
case alertobserver.EventAlertFailedAddToAggrGroup:
99+
o.FailedAddToAggrGroup(alerts, meta)
100+
case alertobserver.EventAlertPipelineStart:
101+
o.PipelineStart(alerts, meta)
102+
case alertobserver.EventAlertPipelinePassStage:
103+
o.PipelinePassStage(alerts, meta)
104+
case alertobserver.EventAlertSent:
105+
o.Sent(alerts, meta)
106+
case alertobserver.EventAlertSendFailed:
107+
o.SendFailed(alerts, meta)
108+
case alertobserver.EventAlertMuted:
109+
o.Muted(alerts, meta)
110+
}
111+
}
112+
113+
func (o *LogAlertLifeCycleObserver) Received(alerts []*types.Alert) {
114+
if o.limiter.Level() < 5 {
115+
return
116+
}
117+
for _, a := range alerts {
118+
o.logWithAlert(a, true, "msg", "Received")
119+
}
120+
}
121+
122+
func (o *LogAlertLifeCycleObserver) Rejected(alerts []*types.Alert, meta alertobserver.AlertEventMeta) {
123+
reason, ok := meta["msg"]
124+
if !ok {
125+
reason = "Unknown"
126+
}
127+
for _, a := range alerts {
128+
o.logWithAlert(a, true, "msg", "Rejected", "reason", reason)
129+
}
130+
}
131+
132+
func (o *LogAlertLifeCycleObserver) AddedAggrGroup(alerts []*types.Alert, meta alertobserver.AlertEventMeta) {
133+
groupKey, ok := meta["groupKey"]
134+
if !ok {
135+
return
136+
}
137+
for _, a := range alerts {
138+
o.logWithAlert(a, true, "msg", "Added to aggregation group", "groupKey", groupKey)
139+
}
140+
}
141+
142+
func (o *LogAlertLifeCycleObserver) FailedAddToAggrGroup(alerts []*types.Alert, meta alertobserver.AlertEventMeta) {
143+
reason, ok := meta["msg"]
144+
if !ok {
145+
reason = "Unknown"
146+
}
147+
for _, a := range alerts {
148+
o.logWithAlert(a, true, "msg", "Failed to add aggregation group", "reason", reason)
149+
}
150+
}
151+
152+
func (o *LogAlertLifeCycleObserver) PipelineStart(alerts []*types.Alert, meta alertobserver.AlertEventMeta) {
153+
logLvl := o.limiter.Level()
154+
if logLvl < 2 {
155+
return
156+
}
157+
ctx, ok := meta["ctx"]
158+
if !ok {
159+
return
160+
}
161+
receiver, ok := notify.ReceiverName(ctx.(context.Context))
162+
if !ok {
163+
return
164+
}
165+
groupKey, ok := notify.GroupKey(ctx.(context.Context))
166+
if !ok {
167+
return
168+
}
169+
if logLvl < 5 {
170+
level.Info(o.logger).Log("msg", "Entered the pipeline", "groupKey", groupKey, "receiver", receiver, "alertsCount", len(alerts))
171+
} else {
172+
for _, a := range alerts {
173+
o.logWithAlert(a, false, "msg", "Entered the pipeline", "groupKey", groupKey, "receiver", receiver)
174+
}
175+
}
176+
}
177+
178+
func (o *LogAlertLifeCycleObserver) PipelinePassStage(alerts []*types.Alert, meta alertobserver.AlertEventMeta) {
179+
if o.limiter.Level() < 3 {
180+
return
181+
}
182+
stageName, ok := meta["stageName"]
183+
if !ok {
184+
return
185+
}
186+
if stageName == "FanoutStage" {
187+
// Fanout stage is just a collection of stages, so we don't really need to log it. We know if the pipeline
188+
// enters the Fanout stage based on the logs of its substages
189+
return
190+
}
191+
ctx, ok := meta["ctx"]
192+
if !ok {
193+
return
194+
}
195+
receiver, ok := notify.ReceiverName(ctx.(context.Context))
196+
if !ok {
197+
return
198+
}
199+
groupKey, ok := notify.GroupKey(ctx.(context.Context))
200+
if !ok {
201+
return
202+
}
203+
level.Info(o.logger).Log("msg", "Passed stage", "groupKey", groupKey, "receiver", receiver, "stage", stageName, "alertsCount", len(alerts))
204+
}
205+
206+
func (o *LogAlertLifeCycleObserver) Sent(alerts []*types.Alert, meta alertobserver.AlertEventMeta) {
207+
ctx, ok := meta["ctx"]
208+
if !ok {
209+
return
210+
}
211+
integration, ok := meta["integration"]
212+
if !ok {
213+
return
214+
}
215+
receiver, ok := notify.ReceiverName(ctx.(context.Context))
216+
if !ok {
217+
return
218+
}
219+
groupKey, ok := notify.GroupKey(ctx.(context.Context))
220+
if !ok {
221+
return
222+
}
223+
if o.limiter.Level() < 4 {
224+
level.Info(o.logger).Log("msg", "Sent", "groupKey", groupKey, "receiver", receiver, "integration", integration, "alertsCount", len(alerts))
225+
} else {
226+
for _, a := range alerts {
227+
o.logWithAlert(a, false, "msg", "Sent", "groupKey", groupKey, "receiver", receiver, "integration", integration)
228+
}
229+
}
230+
}
231+
232+
func (o *LogAlertLifeCycleObserver) SendFailed(alerts []*types.Alert, meta alertobserver.AlertEventMeta) {
233+
ctx, ok := meta["ctx"]
234+
if !ok {
235+
return
236+
}
237+
integration, ok := meta["integration"]
238+
if !ok {
239+
return
240+
}
241+
receiver, ok := notify.ReceiverName(ctx.(context.Context))
242+
if !ok {
243+
return
244+
}
245+
groupKey, ok := notify.GroupKey(ctx.(context.Context))
246+
if !ok {
247+
return
248+
}
249+
if o.limiter.Level() < 4 {
250+
level.Info(o.logger).Log("msg", "Send failed", "groupKey", groupKey, "receiver", receiver, "integration", integration, "alertsCount", len(alerts))
251+
} else {
252+
for _, a := range alerts {
253+
o.logWithAlert(a, false, "msg", "Send failed", "groupKey", groupKey, "receiver", receiver, "integration", integration)
254+
}
255+
}
256+
}
257+
258+
func (o *LogAlertLifeCycleObserver) Muted(alerts []*types.Alert, meta alertobserver.AlertEventMeta) {
259+
ctx, ok := meta["ctx"]
260+
if !ok {
261+
return
262+
}
263+
groupKey, ok := notify.GroupKey(ctx.(context.Context))
264+
if !ok {
265+
return
266+
}
267+
for _, a := range alerts {
268+
o.logWithAlert(a, false, "msg", "Muted", "groupKey", groupKey)
269+
}
270+
}
271+
272+
func (o *LogAlertLifeCycleObserver) logWithAlert(alert *types.Alert, addLabels bool, keyvals ...interface{}) {
273+
keyvals = append(
274+
keyvals,
275+
"fingerprint",
276+
alert.Fingerprint().String(),
277+
"start",
278+
alert.StartsAt.Unix(),
279+
"end",
280+
alert.EndsAt.Unix(),
281+
)
282+
if addLabels {
283+
keyvals = append(keyvals, "labels", alert.Labels.String())
284+
}
285+
level.Info(o.logger).Log(keyvals...)
286+
}

0 commit comments

Comments
 (0)