6 changes: 6 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -544,6 +544,12 @@ sharding_ring:
# CLI flag: -alertmanager.sharding-ring.detailed-metrics-enabled
[detailed_metrics_enabled: <boolean> | default = true]

# Disable extending the replica set when instances are unhealthy. This limits
# blast radius during config corruption incidents but reduces availability
# during normal failures.
# CLI flag: -alertmanager.sharding-ring.disable-replica-set-extension
[disable_replica_set_extension: <boolean> | default = false]

# The sleep seconds when alertmanager is shutting down. Need to be close to or
# larger than KV Store information propagation delay
# CLI flag: -alertmanager.sharding-ring.final-sleep
42 changes: 35 additions & 7 deletions pkg/alertmanager/alertmanager_ring.go
@@ -38,18 +38,45 @@ var SyncRingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE, ring.JOINING}, fun
return s != ring.ACTIVE
})

// Ring operations that never extend the replica set, used to limit blast radius during config corruption incidents.
var RingOpNoExtension = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, func(s ring.InstanceState) bool {
// Never extend replica set to limit blast radius during config corruption incidents
return false
})

var SyncRingOpNoExtension = ring.NewOp([]ring.InstanceState{ring.ACTIVE, ring.JOINING}, func(s ring.InstanceState) bool {
// Never extend replica set during sync to limit blast radius during config corruption incidents
return false
})

// Helper functions to select the appropriate ring operation based on config
func getRingOp(disableExtension bool) ring.Operation {
if disableExtension {
return RingOpNoExtension
}
return RingOp
}

func getSyncRingOp(disableExtension bool) ring.Operation {
if disableExtension {
return SyncRingOpNoExtension
}
return SyncRingOp
}
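
For illustration, a minimal sketch (not part of this change) of a test that pins down where the two operation families actually differ. The test name `TestRingOpReplicaSetExtension` is hypothetical, the import path for the ring package is assumed to match the repository layout, and `ring.Operation` is assumed to expose the dskit-style `ShouldExtendReplicaSetOnState` accessor for the callback passed to `ring.NewOp`:

```go
package alertmanager

import (
	"testing"

	"github.com/cortexproject/cortex/pkg/ring" // assumed import path
	"github.com/stretchr/testify/assert"
)

// Sketch: the helpers select the op based on the config flag, and only the
// "no extension" variants refuse to extend the replica set past a non-ACTIVE instance.
func TestRingOpReplicaSetExtension(t *testing.T) {
	// With extension disabled, the selectors return the NoExtension ops.
	assert.Equal(t, RingOpNoExtension, getRingOp(true))
	assert.Equal(t, SyncRingOpNoExtension, getSyncRingOp(true))

	// With extension enabled (the default), the original ops are returned.
	assert.Equal(t, RingOp, getRingOp(false))
	assert.Equal(t, SyncRingOp, getSyncRingOp(false))

	// Assumption: ShouldExtendReplicaSetOnState reports the callback given to
	// ring.NewOp. SyncRingOp extends past any non-ACTIVE state, while the
	// NoExtension variants never extend, which is what limits blast radius.
	assert.True(t, SyncRingOp.ShouldExtendReplicaSetOnState(ring.LEAVING))
	assert.False(t, SyncRingOpNoExtension.ShouldExtendReplicaSetOnState(ring.LEAVING))
	assert.False(t, RingOpNoExtension.ShouldExtendReplicaSetOnState(ring.LEAVING))
}
```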

// RingConfig masks the ring lifecycler config which contains
// many options not really required by the alertmanager ring. This config
// is used to strip down the config to the minimum, and avoid confusion
// to the user.
type RingConfig struct {
KVStore kv.Config `yaml:"kvstore" doc:"description=The key-value store used to share the hash ring across multiple instances."`
HeartbeatPeriod time.Duration `yaml:"heartbeat_period"`
HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`
ReplicationFactor int `yaml:"replication_factor"`
ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"`
TokensFilePath string `yaml:"tokens_file_path"`
DetailedMetricsEnabled bool `yaml:"detailed_metrics_enabled"`
KVStore kv.Config `yaml:"kvstore" doc:"description=The key-value store used to share the hash ring across multiple instances."`
HeartbeatPeriod time.Duration `yaml:"heartbeat_period"`
HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`
ReplicationFactor int `yaml:"replication_factor"`
ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"`
TokensFilePath string `yaml:"tokens_file_path"`
DetailedMetricsEnabled bool `yaml:"detailed_metrics_enabled"`
DisableReplicaSetExtension bool `yaml:"disable_replica_set_extension"`

FinalSleep time.Duration `yaml:"final_sleep"`
WaitInstanceStateTimeout time.Duration `yaml:"wait_instance_state_timeout"`
@@ -90,6 +117,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.ZoneAwarenessEnabled, rfprefix+"zone-awareness-enabled", false, "True to enable zone-awareness and replicate alerts across different availability zones.")
f.StringVar(&cfg.TokensFilePath, rfprefix+"tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.")
f.BoolVar(&cfg.DetailedMetricsEnabled, rfprefix+"detailed-metrics-enabled", true, "Set to true to enable ring detailed metrics. These metrics provide detailed information, such as token count and ownership per tenant. Disabling them can significantly decrease the number of metrics emitted.")
f.BoolVar(&cfg.DisableReplicaSetExtension, rfprefix+"disable-replica-set-extension", false, "Disable extending the replica set when instances are unhealthy. This limits blast radius during config corruption incidents but reduces availability during normal failures.")

// Instance flags
cfg.InstanceInterfaceNames = []string{"eth0", "en0"}
49 changes: 49 additions & 0 deletions pkg/alertmanager/alertmanager_ring_test.go
@@ -52,3 +52,52 @@ func TestIsHealthyForAlertmanagerOperations(t *testing.T) {
})
}
}

func TestBlastRadiusProtection(t *testing.T) {
t.Parallel()
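// Note: InstanceDesc.IsHealthy only checks the heartbeat and whether the state is one
// of the operation's healthy states; the replica-set extension decision is made
// separately by the ring, so both op variants behave the same in these cases.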

tests := map[string]struct {
operation ring.Operation
instance *ring.InstanceDesc
timeout time.Duration
expected bool
}{
"RingOp extends to unhealthy ACTIVE instance": {
operation: RingOp,
instance: &ring.InstanceDesc{State: ring.ACTIVE, Timestamp: time.Now().Add(-90 * time.Second).Unix()},
timeout: time.Minute,
expected: false,
},
"RingOpNoExtension excludes unhealthy ACTIVE instance": {
operation: RingOpNoExtension,
instance: &ring.InstanceDesc{State: ring.ACTIVE, Timestamp: time.Now().Add(-90 * time.Second).Unix()},
timeout: time.Minute,
expected: false,
},
"RingOp extends to LEAVING instance": {
operation: RingOp,
instance: &ring.InstanceDesc{State: ring.LEAVING, Timestamp: time.Now().Add(-30 * time.Second).Unix()},
timeout: time.Minute,
expected: false,
},
"RingOpNoExtension excludes LEAVING instance": {
operation: RingOpNoExtension,
instance: &ring.InstanceDesc{State: ring.LEAVING, Timestamp: time.Now().Add(-30 * time.Second).Unix()},
timeout: time.Minute,
expected: false,
},
"Both operations include healthy ACTIVE instance": {
operation: RingOp,
instance: &ring.InstanceDesc{State: ring.ACTIVE, Timestamp: time.Now().Add(-30 * time.Second).Unix()},
timeout: time.Minute,
expected: true,
},
}

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
actual := testData.instance.IsHealthy(testData.operation, testData.timeout, time.Now())
assert.Equal(t, testData.expected, actual)
})
}
}
11 changes: 6 additions & 5 deletions pkg/alertmanager/distributor.go
@@ -36,12 +36,12 @@ type Distributor struct {

alertmanagerRing ring.ReadRing
alertmanagerClientsPool ClientsPool

logger log.Logger
ringConfig RingConfig
logger log.Logger
}

// NewDistributor constructs a new Distributor
func NewDistributor(cfg ClientConfig, maxRecvMsgSize int64, alertmanagersRing *ring.Ring, alertmanagerClientsPool ClientsPool, logger log.Logger, reg prometheus.Registerer) (d *Distributor, err error) {
func NewDistributor(cfg ClientConfig, maxRecvMsgSize int64, alertmanagersRing *ring.Ring, alertmanagerClientsPool ClientsPool, ringConfig RingConfig, logger log.Logger, reg prometheus.Registerer) (d *Distributor, err error) {
if alertmanagerClientsPool == nil {
alertmanagerClientsPool = newAlertmanagerClientsPool(client.NewRingServiceDiscovery(alertmanagersRing), cfg, logger, reg)
}
@@ -52,6 +52,7 @@ func NewDistributor(cfg ClientConfig, maxRecvMsgSize int64, alertmanagersRing *r
maxRecvMsgSize: maxRecvMsgSize,
alertmanagerRing: alertmanagersRing,
alertmanagerClientsPool: alertmanagerClientsPool,
ringConfig: ringConfig,
}

d.Service = services.NewBasicService(nil, d.running, nil)
@@ -160,7 +161,7 @@ func (d *Distributor) doQuorum(userID string, w http.ResponseWriter, r *http.Req
var responses []*httpgrpc.HTTPResponse
var responsesMtx sync.Mutex
grpcHeaders := httpToHttpgrpcHeaders(r.Header)
err = ring.DoBatch(r.Context(), RingOp, d.alertmanagerRing, nil, []uint32{users.ShardByUser(userID)}, func(am ring.InstanceDesc, _ []int) error {
err = ring.DoBatch(r.Context(), getRingOp(d.ringConfig.DisableReplicaSetExtension), d.alertmanagerRing, nil, []uint32{users.ShardByUser(userID)}, func(am ring.InstanceDesc, _ []int) error {
// Use a background context to make sure all alertmanagers get the request even if we return early.
localCtx := opentracing.ContextWithSpan(user.InjectOrgID(context.Background(), userID), opentracing.SpanFromContext(r.Context()))
sp, localCtx := opentracing.StartSpanFromContext(localCtx, "Distributor.doQuorum")
@@ -207,7 +208,7 @@ func (d *Distributor) doQuorum(userID string, w http.ResponseWriter, r *http.Req

func (d *Distributor) doUnary(userID string, w http.ResponseWriter, r *http.Request, logger log.Logger) {
key := users.ShardByUser(userID)
replicationSet, err := d.alertmanagerRing.Get(key, RingOp, nil, nil, nil)
replicationSet, err := d.alertmanagerRing.Get(key, getRingOp(d.ringConfig.DisableReplicaSetExtension), nil, nil, nil)
if err != nil {
level.Error(logger).Log("msg", "failed to get replication set from the ring", "err", err)
w.WriteHeader(http.StatusInternalServerError)
3 changes: 2 additions & 1 deletion pkg/alertmanager/distributor_test.go
@@ -352,8 +352,9 @@ func prepare(t *testing.T, numAM, numHappyAM, replicationFactor int, responseBod

cfg := &MultitenantAlertmanagerConfig{}
flagext.DefaultValues(cfg)
cfg.ShardingRing.DisableReplicaSetExtension = false

d, err := NewDistributor(cfg.AlertmanagerClient, cfg.MaxRecvMsgSize, amRing, newMockAlertmanagerClientFactory(amByAddr), util_log.Logger, prometheus.NewRegistry())
d, err := NewDistributor(cfg.AlertmanagerClient, cfg.MaxRecvMsgSize, amRing, newMockAlertmanagerClientFactory(amByAddr), cfg.ShardingRing, util_log.Logger, prometheus.NewRegistry())
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), d))

14 changes: 7 additions & 7 deletions pkg/alertmanager/multitenant.go
@@ -435,7 +435,7 @@ func createMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, fallbackC
am.grpcServer = server.NewServer(&handlerForGRPCServer{am: am})

am.alertmanagerClientsPool = newAlertmanagerClientsPool(client.NewRingServiceDiscovery(am.ring), cfg.AlertmanagerClient, logger, am.registry)
am.distributor, err = NewDistributor(cfg.AlertmanagerClient, cfg.MaxRecvMsgSize, am.ring, am.alertmanagerClientsPool, log.With(logger, "component", "AlertmanagerDistributor"), am.registry)
am.distributor, err = NewDistributor(cfg.AlertmanagerClient, cfg.MaxRecvMsgSize, am.ring, am.alertmanagerClientsPool, cfg.ShardingRing, log.With(logger, "component", "AlertmanagerDistributor"), am.registry)
if err != nil {
return nil, errors.Wrap(err, "create distributor")
}
@@ -515,7 +515,7 @@ func (am *MultitenantAlertmanager) starting(ctx context.Context) (err error) {
if am.cfg.ShardingEnabled {
// Store the ring state after the initial Alertmanager configs sync has been done and before we do change
// our state in the ring.
am.ringLastState, _ = am.ring.GetAllHealthy(RingOp)
am.ringLastState, _ = am.ring.GetAllHealthy(getRingOp(am.cfg.ShardingRing.DisableReplicaSetExtension))

// Make sure that all the alertmanagers we were initially configured with have
// fetched state from the replicas, before advertising as ACTIVE. This will
@@ -688,7 +688,7 @@ func (am *MultitenantAlertmanager) run(ctx context.Context) error {
case <-ringTickerChan:
// We ignore the error because in case of error it will return an empty
// replication set which we use to compare with the previous state.
currRingState, _ := am.ring.GetAllHealthy(RingOp)
currRingState, _ := am.ring.GetAllHealthy(getRingOp(am.cfg.ShardingRing.DisableReplicaSetExtension))

if ring.HasReplicationSetChanged(am.ringLastState, currRingState) {
am.ringLastState = currRingState
@@ -828,7 +828,7 @@ func (am *MultitenantAlertmanager) isUserOwned(userID string) bool {
return true
}

alertmanagers, err := am.ring.Get(users.ShardByUser(userID), SyncRingOp, nil, nil, nil)
alertmanagers, err := am.ring.Get(users.ShardByUser(userID), getSyncRingOp(am.cfg.ShardingRing.DisableReplicaSetExtension), nil, nil, nil)
if err != nil {
am.ringCheckErrors.Inc()
level.Error(am.logger).Log("msg", "failed to load alertmanager configuration", "user", userID, "err", err)
@@ -1038,7 +1038,7 @@ func (am *MultitenantAlertmanager) GetPositionForUser(userID string) int {
return 0
}

set, err := am.ring.Get(users.ShardByUser(userID), RingOp, nil, nil, nil)
set, err := am.ring.Get(users.ShardByUser(userID), getRingOp(am.cfg.ShardingRing.DisableReplicaSetExtension), nil, nil, nil)
if err != nil {
level.Error(am.logger).Log("msg", "unable to read the ring while trying to determine the alertmanager position", "err", err)
// If we're unable to determine the position, we don't want a tenant to miss out on the notification - instead,
@@ -1139,7 +1139,7 @@ func (am *MultitenantAlertmanager) ReplicateStateForUser(ctx context.Context, us
level.Debug(am.logger).Log("msg", "message received for replication", "user", userID, "key", part.Key)

selfAddress := am.ringLifecycler.GetInstanceAddr()
err := ring.DoBatch(ctx, RingOp, am.ring, nil, []uint32{users.ShardByUser(userID)}, func(desc ring.InstanceDesc, _ []int) error {
err := ring.DoBatch(ctx, getRingOp(am.cfg.ShardingRing.DisableReplicaSetExtension), am.ring, nil, []uint32{users.ShardByUser(userID)}, func(desc ring.InstanceDesc, _ []int) error {
if desc.GetAddr() == selfAddress {
return nil
}
@@ -1171,7 +1171,7 @@ func (am *MultitenantAlertmanager) ReadFullStateForUser(ctx context.Context, userID string) ([]*clusterpb.FullState, error) {
func (am *MultitenantAlertmanager) ReadFullStateForUser(ctx context.Context, userID string) ([]*clusterpb.FullState, error) {
// Only get the set of replicas which contain the specified user.
key := users.ShardByUser(userID)
replicationSet, err := am.ring.Get(key, RingOp, nil, nil, nil)
replicationSet, err := am.ring.Get(key, getRingOp(am.cfg.ShardingRing.DisableReplicaSetExtension), nil, nil, nil)
if err != nil {
return nil, err
}
6 changes: 6 additions & 0 deletions schemas/cortex-config-schema.json
@@ -347,6 +347,12 @@
"type": "boolean",
"x-cli-flag": "alertmanager.sharding-ring.detailed-metrics-enabled"
},
"disable_replica_set_extension": {
"default": false,
"description": "Disable extending the replica set when instances are unhealthy. This limits blast radius during config corruption incidents but reduces availability during normal failures.",
"type": "boolean",
"x-cli-flag": "alertmanager.sharding-ring.disable-replica-set-extension"
},
"final_sleep": {
"default": "0s",
"description": "The sleep seconds when alertmanager is shutting down. Need to be close to or larger than KV Store information propagation delay",