Go 使用 Sentinel 实现限流和熔断重连
// Command main is a small demo of alibaba/sentinel-golang: it loads three flow
// (rate limiting) rules and then runs one of the demo scenarios below. As
// written, main runs the circuit breaker scenario; the other scenarios are kept
// as commented-out calls.
package main

import (
	"errors"
	"fmt"
	"log"
	"math/rand"
	"sync/atomic"
	"time"

	sentinel "github.com/alibaba/sentinel-golang/api"
	"github.com/alibaba/sentinel-golang/core/base"
	"github.com/alibaba/sentinel-golang/core/circuitbreaker"
	"github.com/alibaba/sentinel-golang/core/config"
	"github.com/alibaba/sentinel-golang/core/flow"
	"github.com/alibaba/sentinel-golang/logging"
	"github.com/alibaba/sentinel-golang/util"
)

// main initializes Sentinel with its default configuration, loads the flow
// rules used by the qps, throttling and warmup demos, and then runs
// breakerCount. Any initialization or rule-loading error is printed and the
// program returns.
func main() {
	err := sentinel.InitDefault()
	if err != nil {
		fmt.Println(err)
		return
	}

	_, err = flow.LoadRules([]*flow.Rule{
		{
			Resource:               "some-test",
			TokenCalculateStrategy: flow.Direct, // direct: use Threshold as-is
			ControlBehavior:        flow.Reject, // reject immediately when over the limit
			Threshold:              10,          // how many requests are allowed ...
			StatIntervalInMs:       1000,        // ... within this many milliseconds
		},
		{
			Resource:               "some-test1",
			TokenCalculateStrategy: flow.Direct,     // direct: use Threshold as-is
			ControlBehavior:        flow.Throttling, // let requests through at a uniform rate
			Threshold:              10,              // how many requests are allowed ...
			StatIntervalInMs:       1000,            // ... per 1s, i.e. roughly one request every 100ms
		},
		{
			Resource:               "some-test2",
			TokenCalculateStrategy: flow.WarmUp, // cold start: the allowed volume grows gradually
			ControlBehavior:        flow.Reject, // reject immediately when over the limit
			Threshold:              1000,        // how many requests are allowed at the peak
			WarmUpPeriodSec:        10,          // seconds taken to reach the peak
			WarmUpColdFactor:       3,           // warm-up cold factor, default 3
		},
	})
	if err != nil {
		fmt.Println(err)
		return
	}

	// qps()
	// warmup()
	// throttling()
	breakerCount()
}

// stateChangeTestListener prints every state transition of the circuit breaker.
type stateChangeTestListener struct {
}

// OnTransformToClosed reports a transition from prev to the Closed state.
func (s *stateChangeTestListener) OnTransformToClosed(prev circuitbreaker.State, rule circuitbreaker.Rule) {
	fmt.Printf("rule.steategy: %+v, From %s to Closed, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis())
}

// OnTransformToOpen reports a transition from prev to the Open state together
// with the snapshot value handed over by the breaker. The snapshot is printed
// with %d, which presumes an integer value — NOTE(review): confirm this holds
// for the error-count strategy used in this demo.
func (s *stateChangeTestListener) OnTransformToOpen(prev circuitbreaker.State, rule circuitbreaker.Rule, snapshot interface{}) {
	fmt.Printf("rule.steategy: %+v, From %s to Open, snapshot: %d, time: %d\n", rule.Strategy, prev.String(), snapshot, util.CurrentTimeMillis())
}

// OnTransformToHalfOpen reports a transition from prev to the Half-Open state.
func (s *stateChangeTestListener) OnTransformToHalfOpen(prev circuitbreaker.State, rule circuitbreaker.Rule) {
	fmt.Printf("rule.steategy: %+v, From %s to Half-Open, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis())
}

// breakerCount runs the error-count circuit breaker demo on resource "abc".
// Two goroutines call the resource in a loop; the first one records roughly
// half of its passed calls as business errors. The function never returns: it
// blocks on a channel that is never written to.
func breakerCount() {
	// NOTE(review): main has already called sentinel.InitDefault before this
	// function runs, so Sentinel is initialized a second time here — confirm
	// that re-initializing is intended.
	conf := config.NewDefaultConfig()
	// for testing, logging output to console
	conf.Sentinel.Log.Logger = logging.NewConsoleLogger()
	err := sentinel.InitWithConfig(conf)
	if err != nil {
		log.Fatal(err)
	}

	ch := make(chan struct{})

	// Register a state change listener so that we can observe the state
	// changes of the internal circuit breaker.
	circuitbreaker.RegisterStateChangeListeners(&stateChangeTestListener{})

	_, err = circuitbreaker.LoadRules([]*circuitbreaker.Rule{
		// Statistic time span=5s, recoveryTimeout=3s, maxErrorCount=50
		{
			Resource:                     "abc",
			Strategy:                     circuitbreaker.ErrorCount,
			RetryTimeoutMs:               3000, // how long the breaker stays open before a retry is allowed
			MinRequestAmount:             10,   // minimum number of requests before the breaker may trip
			StatIntervalMs:               5000, // length of the statistic window
			StatSlidingWindowBucketCount: 10,
			Threshold:                    50, // number of errors that trips the breaker
		},
	})
	if err != nil {
		log.Fatal(err)
	}

	logging.Info("[CircuitBreaker ErrorCount] Sentinel Go circuit breaking demo is running. You may see the pass/block metric in the metric log.")

	go func() {
		for {
			e, b := sentinel.Entry("abc")
			if b != nil {
				// g1 blocked
				time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)
			} else {
				if rand.Uint64()%20 > 9 {
					// Record current invocation as error.
					sentinel.TraceError(e, errors.New("biz error"))
				}
				// g1 passed
				time.Sleep(time.Duration(rand.Uint64()%80+10) * time.Millisecond)
				e.Exit()
			}
		}
	}()

	go func() {
		for {
			e, b := sentinel.Entry("abc")
			if b != nil {
				// g2 blocked
				time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)
			} else {
				// g2 passed
				time.Sleep(time.Duration(rand.Uint64()%80) * time.Millisecond)
				e.Exit()
			}
		}
	}()

	// Block forever so the two worker goroutines keep running.
	<-ch
}

// throttling issues 200 requests against "some-test1", one every 99ms, and
// prints whether each one passed or was rate limited.
func throttling() {
	for i := 0; i < 200; i++ {
		e, b := sentinel.Entry("some-test1", sentinel.WithTrafficType(base.Inbound))
		if b != nil {
			fmt.Println("限流了")
		} else {
			fmt.Println("访问成功")
			e.Exit()
		}
		time.Sleep(99 * time.Millisecond)
		// time.Sleep(100 * time.Millisecond)
	}
}

// warmup starts 100 goroutines that keep hitting "some-test2" and a reporter
// goroutine that prints, once per second, how many requests were made, blocked
// and passed since the previous report. It returns after 20 seconds; the
// goroutines are not stopped.
//
// The counters are shared between all goroutines, so they are updated with
// atomic operations; plain int increments here would be a data race.
func warmup() {
	var total, totalBlock, totalSuc int64

	for i := 0; i < 100; i++ {
		go func() {
			for {
				atomic.AddInt64(&total, 1)
				e, b := sentinel.Entry("some-test2", sentinel.WithTrafficType(base.Inbound))
				if b != nil {
					atomic.AddInt64(&totalBlock, 1)
					// fmt.Println("限流了")
				} else {
					atomic.AddInt64(&totalSuc, 1)
					// fmt.Println("访问成功")
					e.Exit()
				}
				time.Sleep(time.Duration(rand.Uint64()%20+10) * time.Millisecond)
			}
		}()
	}

	go func() {
		for {
			time.Sleep(1 * time.Second)
			// Swap reads and resets each counter in one step, so increments
			// that land between the read and the reset are not lost.
			reqs := atomic.SwapInt64(&total, 0)
			blocked := atomic.SwapInt64(&totalBlock, 0)
			passed := atomic.SwapInt64(&totalSuc, 0)
			fmt.Printf("请求数:%d,限流数:%d,通过数:%d\n\n", reqs, blocked, passed)
		}
	}()

	time.Sleep(20 * time.Second)
}

// qps fires 12 back-to-back requests against "some-test" and prints whether
// each one passed or was rate limited.
func qps() {
	for i := 0; i < 12; i++ {
		e, b := sentinel.Entry("some-test", sentinel.WithTrafficType(base.Inbound))
		if b != nil {
			fmt.Println("限流了")
		} else {
			fmt.Println("访问成功")
			e.Exit()
		}
	}
}
在 Gin 中实际投入使用:
初始化时调用此方法
// Package initialize contains the Sentinel start-up routine.
package initialize

import (
	"fmt"

	sentinel "github.com/alibaba/sentinel-golang/api"
	"github.com/alibaba/sentinel-golang/core/flow"
)

// Sentinel initializes Sentinel with its default configuration and loads the
// flow rules for the resources "some-test", "some-test1" and "some-test2".
// If either step fails, the error is printed and the function returns.
func Sentinel() {
	if initErr := sentinel.InitDefault(); initErr != nil {
		fmt.Println(initErr)
		return
	}

	// Hard limit: requests over the threshold are rejected immediately.
	rejectRule := &flow.Rule{
		Resource:               "some-test",
		TokenCalculateStrategy: flow.Direct,
		ControlBehavior:        flow.Reject,
		Threshold:              1,    // how many requests are allowed ...
		StatIntervalInMs:       2000, // ... within this many milliseconds
	}

	// Uniform rate: 10 per 1s, i.e. roughly one request every 100ms.
	throttleRule := &flow.Rule{
		Resource:               "some-test1",
		TokenCalculateStrategy: flow.Direct,
		ControlBehavior:        flow.Throttling,
		Threshold:              10,
		StatIntervalInMs:       1000,
	}

	// Cold start: the allowed volume grows gradually up to the threshold.
	warmUpRule := &flow.Rule{
		Resource:               "some-test2",
		TokenCalculateStrategy: flow.WarmUp,
		ControlBehavior:        flow.Reject,
		Threshold:              1000, // how many requests are allowed at the peak
		WarmUpPeriodSec:        10,   // seconds taken to reach the peak
		WarmUpColdFactor:       3,    // warm-up cold factor, default 3
	}

	rules := []*flow.Rule{rejectRule, throttleRule, warmUpRule}
	if _, loadErr := flow.LoadRules(rules); loadErr != nil {
		fmt.Println(loadErr)
	}
}
需要限流时这样用
// Request an entry for the "some-test" resource; b is non-nil when the
// request has been blocked.
e, b := sentinel.Entry("some-test")
if b != nil {
	// Blocked: log it and answer 429 Too Many Requests without calling the
	// remote service.
	zap.S().Info("请求被限流")
	c.JSON(http.StatusTooManyRequests, gin.H{
		"msg": "请求频繁,请稍后重试",
	})
	return
}
list, err := global.GoodsClient.GoodsList(context.WithValue(context.Background(), "ginContext", c), &req) // this is the remote request
// Release the entry once the remote call has returned.
e.Exit()
发表评论