环形熔断器
本文主要是阅读微软在早些年前发表的环形熔断器的设计的文章,Circuit Breaker Pattern。该文比较详细的介绍了环形熔断器设计的背景,及解决的问题。
环形熔断器设计背景
在诸如云之类的分布式环境中,应用程序执行访问远程资源和服务的操作,这些操作可能由于诸如网络连接缓慢,超时,资源过量使用或暂时不可用之类的瞬时故障而失败。这些故障通常会在短时间后自行纠正,因此应准备使用功能强大的云应用程序来处理这些故障,例如使用重试模式所描述的策略。
但是,也可能存在由于意外事件而导致故障的情况,这种情况很难预测,因此可能需要更长的时间进行纠正。这些故障的严重性范围从部分连接中断到服务完全失败。在这些情况下,应用程序连续重试执行不太可能成功的操作可能是没有意义的,相反,应用程序应迅速接受该操作已失败并相应地处理此失败。
此外,如果服务非常繁忙,则系统某一部分的故障可能会导致级联故障。例如,可以将调用服务的操作配置为实施超时,如果该服务在此期间未能响应,则使用失败消息进行回复。但是,此策略可能导致对同一操作的许多并发请求被阻止,直到超时时间到期为止。这些被阻止的请求可能包含关键的系统资源,例如内存,线程,数据库连接等等。因此,这些资源可能会耗尽,从而导致需要使用相同资源的系统其他可能不相关的部分出现故障。在这些情况下,最好操作立即失败,并且仅在可能成功的情况下才尝试调用服务。请注意,设置较短的超时可能有助于解决此问题,但是即使对于服务的请求最终将成功,超时也不应太短以至于大多数情况下操作都会失败。
以上翻译自原文,主要的场景就是在请求超时或者响应缓慢的情况下,为了防止大量的请求去把整个服务不可用,就限制访问,用返回错误而不是继续访问服务,这样来提高局部的可用性,这也是微服务中常见的服务降级的模式,不过在这个过程中如何去设置多久超时,多少处理请求是缓慢的都需要自己根据服务状况来定义,这是一个不好把控的地方。
环形熔断器设计思路
电路断路器模式可以防止应用程序反复尝试执行可能会失败的操作,从而使其继续运行,而无需等待错误被纠正或浪费CPU周期,而该应用程序确定该错误持续时间很长。断路器模式还使应用程序能够检测故障是否已解决。如果问题似乎已得到纠正,则应用程序可以尝试调用该操作。
断路器充当可能失败的操作的代理。代理应监视最近发生的故障数,然后使用此信息来决定是允许操作继续进行,还是直接返回异常。
代理可以实现为具有以下状态的状态机,这些状态可以模拟断路器的功能:
关闭状态:来自应用程序的请求被路由到操作。 代理维护最近失败次数的计数,如果对操作的调用失败,则代理会增加该计数。 如果最近的故障数在给定的时间段内超过了指定的阈值,则代理将置于“打开”状态。 此时,代理启动一个超时计时器,并且当该计时器到期时,该代理将进入“半打开”状态。打开状态:来自应用程序的请求立即失败,并且异常返回给应用程序。半开状态:允许来自应用程序的有限数量的请求通过并调用操作。 如果这些请求成功,则假定先前引起故障的故障已修复,并且断路器切换到“闭合”状态(故障计数器已重置)。 如果任何请求失败,则断路器将认为故障仍然存在,因此它将恢复为“打开”状态,并重新启动超时计时器,以使系统有更多时间从故障中恢复。
如上为选自官网的图片,对整个流程做个描述;
关闭状态,即断路器状态为关闭,此时请求进来的时候,判断断路器状态为关闭则直接执行操作,如果操作成功则返回,否则就提高失败计数并返回失败的结果。打开状态,当积累的失败次数达到阈值的时候,则将断路器状态转为打开状态并设置一个关闭状态的定时器,在指定时间之后设置为半开状态,此时所有经过断路器的请求都是直接返回失败。半开状态,当断路器设置的关闭状态超时之后,将断路器设置为半开状态,此时经过断路器的请求就代理到对应服务,如果执行成功增加计数如果增加计数到了设置的阈值,则将断路器设置为关闭状态,如果执行失败则重新设置到打开状态,即重新进入打开状态等待超时之后再进入半开状态。
大致理清了思路之后,就来查看一下开源的实现,gobreaker
gobreaker开源实现
package gobreaker
import (
"errors"
"fmt"
"sync"
"time"
)
type State
int
const (
StateClosed State
= iota
StateHalfOpen
StateOpen
)
var (
ErrTooManyRequests
= errors
.New("too many requests")
ErrOpenState
= errors
.New("circuit breaker is open")
)
func (s State
) String() string {
switch s
{
case StateClosed
:
return "closed"
case StateHalfOpen
:
return "half-open"
case StateOpen
:
return "open"
default:
return fmt
.Sprintf("unknown state: %d", s
)
}
}
type Counts
struct {
Requests
uint32
TotalSuccesses
uint32
TotalFailures
uint32
ConsecutiveSuccesses
uint32
ConsecutiveFailures
uint32
}
func (c
*Counts
) onRequest() {
c
.Requests
++
}
func (c
*Counts
) onSuccess() {
c
.TotalSuccesses
++
c
.ConsecutiveSuccesses
++
c
.ConsecutiveFailures
= 0
}
func (c
*Counts
) onFailure() {
c
.TotalFailures
++
c
.ConsecutiveFailures
++
c
.ConsecutiveSuccesses
= 0
}
func (c
*Counts
) clear() {
c
.Requests
= 0
c
.TotalSuccesses
= 0
c
.TotalFailures
= 0
c
.ConsecutiveSuccesses
= 0
c
.ConsecutiveFailures
= 0
}
type Settings
struct {
Name
string
MaxRequests
uint32
Interval time
.Duration
Timeout time
.Duration
ReadyToTrip
func(counts Counts
) bool
OnStateChange
func(name
string, from State
, to State
)
}
type CircuitBreaker
struct {
name
string
maxRequests
uint32
interval time
.Duration
timeout time
.Duration
readyToTrip
func(counts Counts
) bool
onStateChange
func(name
string, from State
, to State
)
mutex sync
.Mutex
state State
generation
uint64
counts Counts
expiry time
.Time
}
type TwoStepCircuitBreaker
struct {
cb
*CircuitBreaker
}
func NewCircuitBreaker(st Settings
) *CircuitBreaker
{
cb
:= new(CircuitBreaker
)
cb
.name
= st
.Name
cb
.interval
= st
.Interval
cb
.onStateChange
= st
.OnStateChange
if st
.MaxRequests
== 0 {
cb
.maxRequests
= 1
} else {
cb
.maxRequests
= st
.MaxRequests
}
if st
.Timeout
== 0 {
cb
.timeout
= defaultTimeout
} else {
cb
.timeout
= st
.Timeout
}
if st
.ReadyToTrip
== nil {
cb
.readyToTrip
= defaultReadyToTrip
} else {
cb
.readyToTrip
= st
.ReadyToTrip
}
cb
.toNewGeneration(time
.Now())
return cb
}
func NewTwoStepCircuitBreaker(st Settings
) *TwoStepCircuitBreaker
{
return &TwoStepCircuitBreaker
{
cb
: NewCircuitBreaker(st
),
}
}
const defaultTimeout
= time
.Duration(60) * time
.Second
func defaultReadyToTrip(counts Counts
) bool {
return counts
.ConsecutiveFailures
> 5
}
func (cb
*CircuitBreaker
) Name() string {
return cb
.name
}
func (cb
*CircuitBreaker
) State() State
{
cb
.mutex
.Lock()
defer cb
.mutex
.Unlock()
now
:= time
.Now()
state
, _ := cb
.currentState(now
)
return state
}
func (cb
*CircuitBreaker
) Execute(req
func() (interface{}, error)) (interface{}, error) {
generation
, err
:= cb
.beforeRequest()
if err
!= nil {
return nil, err
}
defer func() {
e
:= recover()
if e
!= nil {
cb
.afterRequest(generation
, false)
panic(e
)
}
}()
result
, err
:= req()
cb
.afterRequest(generation
, err
== nil)
return result
, err
}
func (tscb
*TwoStepCircuitBreaker
) Name() string {
return tscb
.cb
.Name()
}
func (tscb
*TwoStepCircuitBreaker
) State() State
{
return tscb
.cb
.State()
}
func (tscb
*TwoStepCircuitBreaker
) Allow() (done
func(success
bool), err
error) {
generation
, err
:= tscb
.cb
.beforeRequest()
if err
!= nil {
return nil, err
}
return func(success
bool) {
tscb
.cb
.afterRequest(generation
, success
)
}, nil
}
func (cb
*CircuitBreaker
) beforeRequest() (uint64, error) {
cb
.mutex
.Lock()
defer cb
.mutex
.Unlock()
now
:= time
.Now()
state
, generation
:= cb
.currentState(now
)
if state
== StateOpen
{
return generation
, ErrOpenState
} else if state
== StateHalfOpen
&& cb
.counts
.Requests
>= cb
.maxRequests
{
return generation
, ErrTooManyRequests
}
cb
.counts
.onRequest()
return generation
, nil
}
func (cb
*CircuitBreaker
) afterRequest(before
uint64, success
bool) {
cb
.mutex
.Lock()
defer cb
.mutex
.Unlock()
now
:= time
.Now()
state
, generation
:= cb
.currentState(now
)
if generation
!= before
{
return
}
if success
{
cb
.onSuccess(state
, now
)
} else {
cb
.onFailure(state
, now
)
}
}
func (cb
*CircuitBreaker
) onSuccess(state State
, now time
.Time
) {
switch state
{
case StateClosed
:
cb
.counts
.onSuccess()
case StateHalfOpen
:
cb
.counts
.onSuccess()
if cb
.counts
.ConsecutiveSuccesses
>= cb
.maxRequests
{
cb
.setState(StateClosed
, now
)
}
}
}
func (cb
*CircuitBreaker
) onFailure(state State
, now time
.Time
) {
switch state
{
case StateClosed
:
cb
.counts
.onFailure()
if cb
.readyToTrip(cb
.counts
) {
cb
.setState(StateOpen
, now
)
}
case StateHalfOpen
:
cb
.setState(StateOpen
, now
)
}
}
func (cb
*CircuitBreaker
) currentState(now time
.Time
) (State
, uint64) {
switch cb
.state
{
case StateClosed
:
if !cb
.expiry
.IsZero() && cb
.expiry
.Before(now
) {
cb
.toNewGeneration(now
)
}
case StateOpen
:
if cb
.expiry
.Before(now
) {
cb
.setState(StateHalfOpen
, now
)
}
}
return cb
.state
, cb
.generation
}
func (cb
*CircuitBreaker
) setState(state State
, now time
.Time
) {
if cb
.state
== state
{
return
}
prev
:= cb
.state
cb
.state
= state
cb
.toNewGeneration(now
)
if cb
.onStateChange
!= nil {
cb
.onStateChange(cb
.name
, prev
, state
)
}
}
func (cb
*CircuitBreaker
) toNewGeneration(now time
.Time
) {
cb
.generation
++
cb
.counts
.clear()
var zero time
.Time
switch cb
.state
{
case StateClosed
:
if cb
.interval
== 0 {
cb
.expiry
= zero
} else {
cb
.expiry
= now
.Add(cb
.interval
)
}
case StateOpen
:
cb
.expiry
= now
.Add(cb
.timeout
)
default:
cb
.expiry
= zero
}
}
短短的三百来行代码,就将整个的环形断路器的流程实现了,代码简洁紧凑,通过代码的实现也可以看出实现的思路跟上文的思路是基本吻合的。
总结
本文简单的学习了解了有关微软有关环形熔断器的设计思路,该思路能够比较好的解决在微服务的实现过程中有关熔断的方案,学习之后发现确实很有意思,后续再继续学习有关内容。由于本人才疏学浅,如有错误请批评指正。