环形熔断器设计与gobreaker源码分析

it2025-09-29  4

环形熔断器

本文主要是阅读微软在早些年前发表的环形熔断器的设计的文章,Circuit Breaker Pattern。该文比较详细的介绍了环形熔断器设计的背景,及解决的问题。

环形熔断器设计背景

在诸如云之类的分布式环境中,应用程序执行访问远程资源和服务的操作,这些操作可能由于诸如网络连接缓慢,超时,资源过量使用或暂时不可用之类的瞬时故障而失败。这些故障通常会在短时间后自行纠正,因此应准备使用功能强大的云应用程序来处理这些故障,例如使用重试模式所描述的策略。

但是,也可能存在由于意外事件而导致故障的情况,这种情况很难预测,因此可能需要更长的时间进行纠正。这些故障的严重性范围从部分连接中断到服务完全失败。在这些情况下,应用程序连续重试执行不太可能成功的操作可能是没有意义的,相反,应用程序应迅速接受该操作已失败并相应地处理此失败。

此外,如果服务非常繁忙,则系统某一部分的故障可能会导致级联故障。例如,可以将调用服务的操作配置为实施超时,如果该服务在此期间未能响应,则使用失败消息进行回复。但是,此策略可能导致对同一操作的许多并发请求被阻止,直到超时时间到期为止。这些被阻止的请求可能包含关键的系统资源,例如内存,线程,数据库连接等等。因此,这些资源可能会耗尽,从而导致需要使用相同资源的系统其他可能不相关的部分出现故障。在这些情况下,最好操作立即失败,并且仅在可能成功的情况下才尝试调用服务。请注意,设置较短的超时可能有助于解决此问题,但是即使对于服务的请求最终将成功,超时也不应太短以至于大多数情况下操作都会失败。

以上翻译自原文,主要的场景就是在请求超时或者响应缓慢的情况下,为了防止大量的请求去把整个服务不可用,就限制访问,用返回错误而不是继续访问服务,这样来提高局部的可用性,这也是微服务中常见的服务降级的模式,不过在这个过程中如何去设置多久超时,多少处理请求是缓慢的都需要自己根据服务状况来定义,这是一个不好把控的地方。

环形熔断器设计思路

电路断路器模式可以防止应用程序反复尝试执行可能会失败的操作,从而使其继续运行,而无需等待错误被纠正或浪费CPU周期,而该应用程序确定该错误持续时间很长。断路器模式还使应用程序能够检测故障是否已解决。如果问题似乎已得到纠正,则应用程序可以尝试调用该操作。

断路器充当可能失败的操作的代理。代理应监视最近发生的故障数,然后使用此信息来决定是允许操作继续进行,还是直接返回异常。

代理可以实现为具有以下状态的状态机,这些状态可以模拟断路器的功能:

关闭状态:来自应用程序的请求被路由到操作。 代理维护最近失败次数的计数,如果对操作的调用失败,则代理会增加该计数。 如果最近的故障数在给定的时间段内超过了指定的阈值,则代理将置于“打开”状态。 此时,代理启动一个超时计时器,并且当该计时器到期时,该代理将进入“半打开”状态。打开状态:来自应用程序的请求立即失败,并且异常返回给应用程序。半开状态:允许来自应用程序的有限数量的请求通过并调用操作。 如果这些请求成功,则假定先前引起故障的故障已修复,并且断路器切换到“闭合”状态(故障计数器已重置)。 如果任何请求失败,则断路器将认为故障仍然存在,因此它将恢复为“打开”状态,并重新启动超时计时器,以使系统有更多时间从故障中恢复。

如上为选自官网的图片,对整个流程做个描述;

关闭状态,即断路器状态为关闭,此时请求进来的时候,判断断路器状态为关闭则直接执行操作,如果操作成功则返回,否则就提高失败计数并返回失败的结果。打开状态,当积累的失败次数达到阈值的时候,则将断路器状态转为打开状态并设置一个关闭状态的定时器,在指定时间之后设置为半开状态,此时所有经过断路器的请求都是直接返回失败。半开状态,当断路器设置的关闭状态超时之后,将断路器设置为半开状态,此时经过断路器的请求就代理到对应服务,如果执行成功增加计数如果增加计数到了设置的阈值,则将断路器设置为关闭状态,如果执行失败则重新设置到打开状态,即重新进入打开状态等待超时之后再进入半开状态。

大致理清了思路之后,就来查看一下开源的实现,gobreaker

gobreaker开源实现
// Package gobreaker implements the Circuit Breaker pattern. // See https://msdn.microsoft.com/en-us/library/dn589784.aspx. package gobreaker import ( "errors" "fmt" "sync" "time" ) // State is a type that represents a state of CircuitBreaker. type State int // These constants are states of CircuitBreaker. const ( StateClosed State = iota StateHalfOpen StateOpen ) var ( // ErrTooManyRequests is returned when the CB state is half open and the requests count is over the cb maxRequests ErrTooManyRequests = errors.New("too many requests") // ErrOpenState is returned when the CB state is open ErrOpenState = errors.New("circuit breaker is open") ) // String implements stringer interface. func (s State) String() string { switch s { case StateClosed: return "closed" case StateHalfOpen: return "half-open" case StateOpen: return "open" default: return fmt.Sprintf("unknown state: %d", s) } } // Counts holds the numbers of requests and their successes/failures. // CircuitBreaker clears the internal Counts either // on the change of the state or at the closed-state intervals. // Counts ignores the results of the requests sent before clearing. type Counts struct { Requests uint32 TotalSuccesses uint32 TotalFailures uint32 ConsecutiveSuccesses uint32 ConsecutiveFailures uint32 } func (c *Counts) onRequest() { c.Requests++ } func (c *Counts) onSuccess() { // 统计成功次数 c.TotalSuccesses++ c.ConsecutiveSuccesses++ c.ConsecutiveFailures = 0 } func (c *Counts) onFailure() { // 统计失败次数 c.TotalFailures++ c.ConsecutiveFailures++ c.ConsecutiveSuccesses = 0 } func (c *Counts) clear() { // 重置清零 c.Requests = 0 c.TotalSuccesses = 0 c.TotalFailures = 0 c.ConsecutiveSuccesses = 0 c.ConsecutiveFailures = 0 } // Settings configures CircuitBreaker: // // Name is the name of the CircuitBreaker. // // MaxRequests is the maximum number of requests allowed to pass through // when the CircuitBreaker is half-open. // If MaxRequests is 0, the CircuitBreaker allows only 1 request. // // Interval is the cyclic period of the closed state // for the CircuitBreaker to clear the internal Counts. // If Interval is 0, the CircuitBreaker doesn't clear internal Counts during the closed state. // // Timeout is the period of the open state, // after which the state of the CircuitBreaker becomes half-open. // If Timeout is 0, the timeout value of the CircuitBreaker is set to 60 seconds. // // ReadyToTrip is called with a copy of Counts whenever a request fails in the closed state. // If ReadyToTrip returns true, the CircuitBreaker will be placed into the open state. // If ReadyToTrip is nil, default ReadyToTrip is used. // Default ReadyToTrip returns true when the number of consecutive failures is more than 5. // // OnStateChange is called whenever the state of the CircuitBreaker changes. type Settings struct { Name string MaxRequests uint32 Interval time.Duration Timeout time.Duration ReadyToTrip func(counts Counts) bool OnStateChange func(name string, from State, to State) } // CircuitBreaker is a state machine to prevent sending requests that are likely to fail. type CircuitBreaker struct { name string maxRequests uint32 interval time.Duration timeout time.Duration readyToTrip func(counts Counts) bool onStateChange func(name string, from State, to State) mutex sync.Mutex state State generation uint64 counts Counts expiry time.Time } // TwoStepCircuitBreaker is like CircuitBreaker but instead of surrounding a function // with the breaker functionality, it only checks whether a request can proceed and // expects the caller to report the outcome in a separate step using a callback. type TwoStepCircuitBreaker struct { cb *CircuitBreaker } // NewCircuitBreaker returns a new CircuitBreaker configured with the given Settings. func NewCircuitBreaker(st Settings) *CircuitBreaker { // 重新初始化一个breaker cb := new(CircuitBreaker) cb.name = st.Name cb.interval = st.Interval cb.onStateChange = st.OnStateChange // 设置状态改变函数 if st.MaxRequests == 0 { // 设置同时处理的最大请求数 cb.maxRequests = 1 } else { cb.maxRequests = st.MaxRequests } if st.Timeout == 0 { // 设置断路器定时器超时时间 cb.timeout = defaultTimeout } else { cb.timeout = st.Timeout } if st.ReadyToTrip == nil { // 设置连续失败的判断条件函数 cb.readyToTrip = defaultReadyToTrip } else { cb.readyToTrip = st.ReadyToTrip } cb.toNewGeneration(time.Now()) // 通过分代来管理每一个状态周期 return cb } // NewTwoStepCircuitBreaker returns a new TwoStepCircuitBreaker configured with the given Settings. func NewTwoStepCircuitBreaker(st Settings) *TwoStepCircuitBreaker { return &TwoStepCircuitBreaker{ cb: NewCircuitBreaker(st), } } const defaultTimeout = time.Duration(60) * time.Second func defaultReadyToTrip(counts Counts) bool { // 默认的失败判断条件 return counts.ConsecutiveFailures > 5 } // Name returns the name of the CircuitBreaker. func (cb *CircuitBreaker) Name() string { return cb.name } // State returns the current state of the CircuitBreaker. func (cb *CircuitBreaker) State() State { // 返回断路器当前状态 cb.mutex.Lock() defer cb.mutex.Unlock() now := time.Now() state, _ := cb.currentState(now) return state } // Execute runs the given request if the CircuitBreaker accepts it. // Execute returns an error instantly if the CircuitBreaker rejects the request. // Otherwise, Execute returns the result of the request. // If a panic occurs in the request, the CircuitBreaker handles it as an error // and causes the same panic again. func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error) { generation, err := cb.beforeRequest() // 提供给代理调用的执行函数 if err != nil { return nil, err } defer func() { e := recover() if e != nil { cb.afterRequest(generation, false) panic(e) } }() result, err := req() cb.afterRequest(generation, err == nil) return result, err } // Name returns the name of the TwoStepCircuitBreaker. func (tscb *TwoStepCircuitBreaker) Name() string { return tscb.cb.Name() } // State returns the current state of the TwoStepCircuitBreaker. func (tscb *TwoStepCircuitBreaker) State() State { return tscb.cb.State() } // Allow checks if a new request can proceed. It returns a callback that should be used to // register the success or failure in a separate step. If the circuit breaker doesn't allow // requests, it returns an error. func (tscb *TwoStepCircuitBreaker) Allow() (done func(success bool), err error) { generation, err := tscb.cb.beforeRequest() if err != nil { return nil, err } return func(success bool) { tscb.cb.afterRequest(generation, success) }, nil } func (cb *CircuitBreaker) beforeRequest() (uint64, error) { cb.mutex.Lock() defer cb.mutex.Unlock() // 加锁 now := time.Now() state, generation := cb.currentState(now) // 获取当前到了第几代和状态 if state == StateOpen { // 如果是打开状态则直接返回错误 return generation, ErrOpenState } else if state == StateHalfOpen && cb.counts.Requests >= cb.maxRequests { return generation, ErrTooManyRequests // 如果当前是半开状态并且当前处理的请求数大于最大请求数则返回错误 } cb.counts.onRequest() // 请求数加一 return generation, nil } func (cb *CircuitBreaker) afterRequest(before uint64, success bool) { cb.mutex.Lock() defer cb.mutex.Unlock() // 加锁 now := time.Now() state, generation := cb.currentState(now) // 获取当前状态 如果当前的这一代和下一代不同则返回 if generation != before { return } if success { cb.onSuccess(state, now) // 如果执行成功则设置成功的计数 } else { cb.onFailure(state, now) // 如果执行失败则设置失败的计数 } } func (cb *CircuitBreaker) onSuccess(state State, now time.Time) { switch state { case StateClosed: // 如果当前状态为关闭则增加成功计数 cb.counts.onSuccess() case StateHalfOpen: // 如果当前状态是半开则增加成功计数 cb.counts.onSuccess() if cb.counts.ConsecutiveSuccesses >= cb.maxRequests { // 如果连续成功次数大于最大请求数则设置断路器为关闭状态 cb.setState(StateClosed, now) } } } func (cb *CircuitBreaker) onFailure(state State, now time.Time) { switch state { case StateClosed: // 如果状态为关闭 cb.counts.onFailure() // 增加失败计数 if cb.readyToTrip(cb.counts) { // 如果当前的失败计数满足设置的失败阈值条件则设置断路器为打开状态 cb.setState(StateOpen, now) } case StateHalfOpen: // 如果当前断路器为半开状态则直接设置断路器为打开状态 cb.setState(StateOpen, now) } } func (cb *CircuitBreaker) currentState(now time.Time) (State, uint64) { switch cb.state { case StateClosed: // 如果当前超时时间不等于0并且已经过了超时时间则初始化一个新的周期 if !cb.expiry.IsZero() && cb.expiry.Before(now) { cb.toNewGeneration(now) } case StateOpen: // 如果当前状态为打开 if cb.expiry.Before(now) { // 如果已经超时则设置为半开状态 cb.setState(StateHalfOpen, now) } } return cb.state, cb.generation } func (cb *CircuitBreaker) setState(state State, now time.Time) { if cb.state == state { // 如果状态相同则不设置 return } prev := cb.state cb.state = state cb.toNewGeneration(now) if cb.onStateChange != nil { // 调用状态改变的回调函数 cb.onStateChange(cb.name, prev, state) } } func (cb *CircuitBreaker) toNewGeneration(now time.Time) { // 生成新的一代 cb.generation++ cb.counts.clear() // 重置清零 var zero time.Time switch cb.state { case StateClosed: // 如果状态为关闭 if cb.interval == 0 { // 是否有超时时间没有则为0 cb.expiry = zero } else { cb.expiry = now.Add(cb.interval) // 设置超时的时间 } case StateOpen: cb.expiry = now.Add(cb.timeout) // 如果状态为打开则设置过期时间 default: // StateHalfOpen cb.expiry = zero // 状态为半开则超时时间设置为关闭 } }

短短的三百来行代码,就将整个的环形断路器的流程实现了,代码简洁紧凑,通过代码的实现也可以看出实现的思路跟上文的思路是基本吻合的。

总结

本文简单的学习了解了有关微软有关环形熔断器的设计思路,该思路能够比较好的解决在微服务的实现过程中有关熔断的方案,学习之后发现确实很有意思,后续再继续学习有关内容。由于本人才疏学浅,如有错误请批评指正。

最新回复(0)