这篇文章会从一些源代码的角度来分析alertmanager告警的这个流程,基于版本0.16.1。由于本人水平有限,分析可能有一些不到位的地方,欢迎大家指出。由于涉及的阶段比较多,我打算分2篇写完。先上一张官方的架构图。

Alertmanager架构

alertmanager的架构非常清晰:图中左上角的API层包括从prometheus接收到告警以及silence的增删改查。接收到alert和silence之后,各自保存在一个provider对象中存储起来。后续,真正的告警流程由Dispatcher对象来进行处理。Dispatcher对象面会对告警进行分组和聚合,然后每一个告警组(Group)会依次执行Notification Pipeline,这就是基本的告警流程。

Dispatcher

Dispatcher对象是告警流程的执行者,在main中调用NewDispatcher初始化

disp = dispatch.NewDispatcher(alerts, dispatch.NewRoute(conf.Route, nil), pipeline, marker, timeoutFunc, logger)

下面是Dispatcher结构体的主要定义

type Dispatcher struct {
	route  *Route
	alerts provider.Alerts
	stage  notify.Stage

	marker  types.Marker
	timeout func(time.Duration) time.Duration

	aggrGroups map[*Route]map[model.Fingerprint]*aggrGroup
	mtx        sync.RWMutex

	done   chan struct{}
	ctx    context.Context
	cancel func()

	logger log.Logger
}

结构体中,首先是route。route是路由信息,在alertmanager的配置中配置,当收到告警后,匹配路由(其实也就是匹配label),然后执行指定的操作。接下来是alerts。alerts是alertmanager中待发送的告警信息store,prometheus发出的告警就是进入这个store保存下来,然后在Dispatcher中被消费。stage是一个接口,主要实现了exec方法,即流水线的执行动作。在Dispatcher中使用的stage结构是pipelines,pipelines里面包含了多个阶段,也就是架构图右侧的notification pipelines。其次是marker接口,marker接口实现了以下方法,相当于它是一个告警的处理者的抽象,对告警状态进行更新。后面是aggrGroups,在alertmanager中有对告警根据labels进行分组的概念,这就是保存每个组每条告警的map缓存。

type Marker interface {
	SetActive(alert model.Fingerprint)
	SetInhibited(alert model.Fingerprint, ids ...string)
	SetSilenced(alert model.Fingerprint, ids ...string)

	Count(...AlertState) int

	Status(model.Fingerprint) AlertStatus
	Delete(model.Fingerprint)

	Unprocessed(model.Fingerprint) bool
	Active(model.Fingerprint) bool
	Silenced(model.Fingerprint) ([]string, bool)
	Inhibited(model.Fingerprint) ([]string, bool)
}

下面看下Dispatcher的 run 方法

func (d *Dispatcher) run(it provider.AlertIterator) {
	cleanup := time.NewTicker(30 * time.Second)
	defer cleanup.Stop()

	defer it.Close()

	for {
		select {
		case alert, ok := <-it.Next():
			if !ok {
				// Iterator exhausted for some reason.
				if err := it.Err(); err != nil {
					level.Error(d.logger).Log("msg", "Error on alert update", "err", err)
				}
				return
			}

			level.Debug(d.logger).Log("msg", "Received alert", "alert", alert)

			// Log errors but keep trying.
			if err := it.Err(); err != nil {
				level.Error(d.logger).Log("msg", "Error on alert update", "err", err)
				continue
			}

			for _, r := range d.route.Match(alert.Labels) {
				d.processAlert(alert, r)
			}

		case <-cleanup.C:
			d.mtx.Lock()

			for _, groups := range d.aggrGroups {
				for _, ag := range groups {
					if ag.empty() {
						ag.stop()
						delete(groups, ag.fingerprint())
					}
				}
			}

			d.mtx.Unlock()

		case <-d.ctx.Done():
			return
		}
	}
}

it对象是一个alerts的迭代器,它的Next()方法是一个channel,从里面消费到alert。整个run方法的核心逻辑很简单,从it.Next()这个channel里面消费到alert之后,查看是否与所配置的路由匹配,如果匹配的话,进入alert的处理方法processAlert。

func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
	groupLabels := getGroupLabels(alert, route)

	fp := groupLabels.Fingerprint()

	d.mtx.Lock()
	defer d.mtx.Unlock()

	group, ok := d.aggrGroups[route]
	if !ok {
		group = map[model.Fingerprint]*aggrGroup{}
		d.aggrGroups[route] = group
	}

	// If the group does not exist, create it.
	ag, ok := group[fp]
	if !ok {
		ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
		group[fp] = ag

		go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
			_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
			if err != nil {
				level.Error(d.logger).Log("msg", "Notify for alerts failed", "num_alerts", len(alerts), "err", err)
			}
			return err == nil
		})
	}

	ag.insert(alert)
}

这个方法的逻辑也是非常的清楚。在这里就是先拿出groupLabel,查看这个labels的hash是否存在,不存在则新建一个aggrGroup。接下来,执行stage的Exec方法。至此,Dispatcher的处理流程结束,已经进入了notification pipelines的阶段。

pipelines

先看看pipelines的创建,pipelines就是RoutingStage对象,RoutingStage是一个map,它的key是每一个receiver的名字,value是一个multistage对象。在每个multistage中,可以看到都会包括这么几个阶段,GossipSettleStage、InhibitStage、SilenceStage,后面的阶段由createStage函数创建,createStage返回一个FanoutStage,FanoutStage中为每一个receiver都创建了WaitStage、DedupStage、RetryStage、SetNotifierStage,可以 发现这都和上面的架构图是一一对应的。

func BuildPipeline(
	confs []*config.Receiver,
	tmpl *template.Template,
	wait func() time.Duration,
	muter types.Muter,
	silences *silence.Silences,
	notificationLog NotificationL执行时都会跑一个Goroutine。og,
	marker types.Marker,
	peer *cluster.Peer,
	logger log.Logger,
) RoutingStage {
	rs := RoutingStage{}

	ms := NewGossipSettleStage(peer)
	is := NewInhibitStage(muter)
	ss := NewSilenceStage(silences, marker)

	for _, rc := range confs {
		rs[rc.Name] = MultiStage{ms, is, ss, createStage(rc, tmpl, wait, notificationLog, logger)}
	}
	return rs
}

func createStage(rc *config.Receiver, tmpl *template.Template, wait func() time.Duration, notificationLog NotificationLog, logger log.Logger) Stage {
	var fs FanoutStage
	for _, i := range BuildReceiverIntegrations(rc, tmpl, logger) {
		recv := &nflogpb.Receiver{
			GroupName:   rc.Name,
			Integration: i.name,
			Idx:         uint32(i.idx),
		}
		var s MultiStage
		s = append(s, NewWaitStage(wait))
		s = append(s, NewDedupStage(i, notificationLog, recv))
		s = append(s, NewRetryStage(i, rc.Name))
		s = append(s, NewSetNotifiesStage(notificationLog, recv))

		fs = append(fs, s)
	}
	return fs
}

所有的小阶段都被包装在MultiStage和FanoutStage中,先来看看它们的exec方法是怎么实现的。在MultiStage中,exec很简单,遍历自身的Stage,依次执行每个阶段各自的exec方法,根据之前所说pipelines中的MultiStage中,保存了4个阶段,GossipSettle、Inhibit、Silence、FanoutStage,这四个阶段会依次调用exec方法。后面看看FanoutStage,它的类型其实与MultiStage相同,不同的是它的每个阶段都是通过一个Goroutine来执行。其实也比较好理解,GossipSettle、Inhibit、Silence这几个阶段属于发送前的准备工作,开启Gossip,禁止某些警报以及沉默某些警报,后面的几个阶段才是真正发送警报的流程。包括了Wait、Dedup、Retry以及SetNotify。

type MultiStage []Stage

func (ms MultiStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	var err error
	for _, s := range ms {
		if len(alerts) == 0 {
			return ctx, nil, nil
		}

		ctx, alerts, err = s.Exec(ctx, l, alerts...)
		if err != nil {
			return ctx, nil, err
		}
	}
	return ctx, alerts, nil
}

type FanoutStage []Stage

func (fs FanoutStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	var (
		wg sync.WaitGroup
		me types.MultiError
	)
	wg.Add(len(fs))

	for _, s := range fs {
		go func(s Stage) {
			if _, _, err := s.Exec(ctx, l, alerts...); err != nil {
				me.Add(err)
				level.Error(l).Log("msg", "Error on notify", "err", err)
			}
			wg.Done()
		}(s)
	}
	wg.Wait()

	if me.Len() > 0 {
		return ctx, alerts, &me
	}
	return ctx, alerts, nil
}

GossipSettleStage

GossipSettle是第一个阶段,很简单,这个阶段的主要作用就是执行 n.peer.WaitReady方法,这个方法与alertmanager的高可用有关,对于每一个peer有一个readyChannel,当某一个peer处于unReady状态,就一直阻塞,ready之后就进入下一个阶段

type GossipSettleStage struct {
	peer *cluster.Peer
}

func NewGossipSettleStage(p *cluster.Peer) *GossipSettleStage {
	return &GossipSettleStage{peer: p}
}

func (n *GossipSettleStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	if n.peer != nil {
		n.peer.WaitReady()
	}
	return ctx, alerts, nil
}

InhibitStage

下一个阶段是InhibitStage,顾名思义,对告警执行抑制。抑制在alertmanager中的配置文件中配置,满足某些label的条件则抑制该告警。下面的实现中是对alerts进行遍历,如果不满足抑制的匹配规则,则加入filtered ,进入下一个阶段;而匹配上的相当于被丢弃,不被处理

// InhibitStage filters alerts through an inhibition muter.
type InhibitStage struct {
	muter types.Muter
}

func NewInhibitStage(m types.Muter) *InhibitStage {
	return &InhibitStage{muter: m}
}

func (n *InhibitStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	var filtered []*types.Alert
	for _, a := range alerts {
		if !n.muter.Mutes(a.Labels) {
			filtered = append(filtered, a)
		}
	}

	return ctx, filtered, nil
}

SilenceStage

Silence阶段是判断是否需要对告警执行静默处理。silence这个东西比较复杂,与inhibit不同,inhibit是在alertmanager中的配置文件进行配置,在3个AM组成的集群中,是允许不同的AM有不同的inhibit配置。而silence则是通过AM的API进行创建删除。在高可用的情况下,总不能某个AM的某条告警被silence掉了,而另外两个AM由于本身没有创建silence,就把告警发送出去了,这就无法起到高可用的作用。所以silence信息一样会通过gossip的消息在AM集群之间进行同步。由于gossip是一个最终一致性协议,所以肯定在中间会出现数据一致性的问题,让我们看看在发送silence的时候是怎么处理的。

// SilenceStage filters alerts through a silence muter.
type SilenceStage struct {
	silences *silence.Silences
	marker   types.Marker
}

func NewSilenceStage(s *silence.Silences, mk types.Marker) *SilenceStage {
	return &SilenceStage{
		silences: s,
		marker:   mk,
	}
}

func (n *SilenceStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	var filtered []*types.Alert
	for _, a := range alerts {
		sils, err := n.silences.Query(
			silence.QState(types.SilenceStateActive),
			silence.QMatches(a.Labels),
		)
		if err != nil {
			level.Error(l).Log("msg", "Querying silences failed", "err", err)
		}

		if len(sils) == 0 {
			filtered = append(filtered, a)
			n.marker.SetSilenced(a.Labels.Fingerprint())
		} else {
			ids := make([]string, len(sils))
			for i, s := range sils {
				ids[i] = s.Id
			}
			n.marker.SetSilenced(a.Labels.Fingerprint(), ids...)
		}
	}

	return ctx, filtered, nil
}

首先遍历所有的alerts,根据每条告警,查询silences的store中是否有目前状态是active(即没有过期的),且labels与该告警匹配的silence,如果有,那么通过marker的SetSilenced方法将指定labels的告警设置为silenced状态。方法的关键就在于silences的查询过程,那就看看silences.Query这个方法

func (s *Silences) Query(params ...QueryParam) ([]*pb.Silence, error) {
	start := time.Now()
	s.metrics.queriesTotal.Inc()

	sils, err := func() ([]*pb.Silence, error) {
		q := &query{}
		for _, p := range params {
			if err := p(q); err != nil {
				return nil, err
			}
		}
		return s.query(q, s.now())
	}()
	if err != nil {
		s.metrics.queryErrorsTotal.Inc()
	}
	s.metrics.queryDuration.Observe(time.Since(start).Seconds())
	return sils, err
}

func (s *Silences) query(q *query, now time.Time) ([]*pb.Silence, error) {
	// If we have an ID constraint, all silences are our base set.
	// This and the use of post-filter functions is the
	// the trivial solution for now.
	var res []*pb.Silence

	s.mtx.Lock()
	defer s.mtx.Unlock()

	if q.ids != nil {
		for _, id := range q.ids {
			if s, ok := s.st[id]; ok {
				res = append(res, s.Silence)
			}
		}
	} else {
		for _, sil := range s.st {
			res = append(res, sil.Silence)
		}
	}

	var resf []*pb.Silence
	for _, sil := range res {
		remove := false
		for _, f := range q.filters {
			ok, err := f(sil, s, now)
			if err != nil {
				return nil, err
			}
			if !ok {
				remove = true
				break
			}
		}
		if !remove {
			resf = append(resf, cloneSilence(sil))
		}
	}

	return resf, nil
}

这两个方法比较长,silences.Query这个方法实际调用的是query方法,所以我们直接看query方法。query方法中,首先看参数的query里面有没有带上id限制,如果有指定id,就从s.st这个保存了当前所有silence的map找出指定id的silence。如果没有指定,则将map里面的所有silence加入res这个切片中,在这个方法中肯定是没有指定id,所以会先得到所有的silence。之后对res这个切片进行遍历,执行filter进行过滤。所谓的过滤即是比较之前调用Query时候指定的两个参数:状态为active,且能匹配上警报的标签。通过filter过滤,如果不满足,则remove标志为true,并break退出;如果一直都满足,则将这个silence对象加入到resf这个切片中,返回回去。可以看出,在上述silence的查询过程中,是有可能出现多个AM实例中silence状态不同的情况的,可能会出现数据一致性问题。

sils, err := n.silences.Query(
			silence.QState(types.SilenceStateActive),
			silence.QMatches(a.Labels),
		)