概览
在kubernetes
的很多组件中,都会用到队列,为了方便使用,client-go
中k8s.io/client-go/util/workqueue
实现了通用的队列:
util/workqueue/doc.go
1
2
3
4
5
6
7
8
9
|
// Package workqueue provides a simple queue that supports the following
// features:
// * Fair: items processed in the order in which they are added.
// * Stingy: a single item will not be processed multiple times concurrently,
// and if an item is added multiple times before it can be processed, it
// will only be processed once.
// * Multiple consumers and producers. In particular, it is allowed for an
// item to be reenqueued while it is being processed.
// * Shutdown notifications.
|
根据注释中的文档,它具有以下特性:
- 公平:元素是按照先进先出的顺序处理的
- 吝啬:一个元素不会被处理多次,并且即使一个元素被添加多次,也只处理一次
- 支持多个生产者和消费者, 并且允许处理中的元素重新入队
- 关闭通知
利用 Go 的接口组合,延迟队列和限速队列的接口定义,都是基于最基础的通用队列 的 interface:
util/workqueue/queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
type Interface interface {
// 入队 将元素加入队尾
Add(item interface{})
// 队列长度
Len() int
// 出队 获取队列头部的元素
Get() (item interface{}, shutdown bool)
// 标记队列中该元素已经处理
Done(item interface{})
// 关闭队列
ShutDown()
// 查询队列是否关闭
ShuttingDown() bool
}
|
可以用一个简单的类图来表示他们的 interface 和实现之间的关系
实现
通用队列
通用队列是最基础的队列,支持最基本的队列功能, 它实现了workqueue.Interface
,它的结构如下
util/workqueue/queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
type Type struct {
//基于slice来保证顺序,里面的元素应该在dirty set中,不应该在processing set中
queue []t
// 保存需要被处理的元素
// set用于去重
dirty set
// 包含正在被处理的元素,他们同时也在dirty set,
// 当处理完成后,他们会从processing set中被移除
// 同时会检查是否在dirty set中,如果在,则将它加入queue中
processing set
cond *sync.Cond
// 标识是否在关闭
shuttingDown bool
// 监控指标
metrics queueMetrics
unfinishedWorkUpdatePeriod time.Duration
clock clock.Clock
}
|
首先先看下入队方法Add
的实现,逻辑比较简单,正常的逻辑就是先加入dirty set
,然后加入队列中;如果已经存在dirty set
中则忽略,这就是前文中说到的workqueue
吝啬的特性.;如果插入dirty set
后发现正在处理中,则直接返回,因为在处理完调用Done
时,会将dirty set
中的元素重新加入队列.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
func (q *Type) Add(item interface{}) {
//加锁
q.cond.L.Lock()
defer q.cond.L.Unlock()
// 如果队列关闭中,则不再添加元素
if q.shuttingDown {
return
}
// 如果在dirty set中,则忽略
if q.dirty.has(item) {
return
}
q.metrics.add(item)
//先插入dirty set中
q.dirty.insert(item)
// 如果 item正在处理,则忽略
if q.processing.has(item) {
return
}
//加入队列
q.queue = append(q.queue, item)
//发送广播,这样其他阻塞的Get协程就会继续
q.cond.Signal()
}
|
在Get
时,如果队列中没有元素且没有关闭,会一直阻塞直到有数据入队或者队列关闭;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
func (q *Type) Get() (item interface{}, shutdown bool) {
//加锁
q.cond.L.Lock()
defer q.cond.L.Unlock()
//如果队列中没有数据且没关闭,会阻塞,直到有元素加入队列
for len(q.queue) == 0 && !q.shuttingDown {
q.cond.Wait()
}
//如果没有数据,说明队列被关闭了
if len(q.queue) == 0 {
// 返回队列已经关闭
return nil, true
}
//出队
item, q.queue = q.queue[0], q.queue[1:]
q.metrics.get(item)
//将元素插入处理中
q.processing.insert(item)
//删除dirty中的元素
q.dirty.delete(item)
return item, false
}
|
Done
将元素标记为处理完成,如果处理中却还在 dirty 中,则将它重新入队.正常的逻辑是处理完一个元素后,调用Done
方法,会将processing set
中的元素删除.根据Add
的逻辑,如果在元素处理时Add
,元素会被放入 dirty set
中,这样在调用 Done
时便会重新加入队列.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
func (q *Type) Done(item interface{}) {
//加锁
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.metrics.done(item)
//将它从proccess set删除
q.processing.delete(item)
// 如果出现在dirty set中,说明在处理的过程中又被添加了,则从新放回队列中
if q.dirty.has(item) {
q.queue = append(q.queue, item)
//发送广播事件,通知关闭阻塞的Get方法
q.cond.Signal()
}
}
|
延时队列
从接口定义上看,延时队列除了通过接口组合拥有通用队列的全部方法,还多了一个 AddAfter
方法:
util/workqueue/delaying_queue.go
1
2
3
4
5
|
type DelayingInterface interface {
Interface
//在经过一段时间后添加元素
AddAfter(item interface{}, duration time.Duration)
}
|
DelayingInterface
对应的实现则是delayingType
,它的结构如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
type delayingType struct {
//包含一个通用队列
Interface
// 用于获取时间
clock clock.Clock
// 通知结束的channel
stopCh chan struct{}
// stopOnce guarantees we only signal shutdown a single time
// 确保只通知一次关闭
stopOnce sync.Once
// 心跳定时器,确保协程不会超过maxWait
heartbeat clock.Ticker
// waitingForAddCh is a buffered channel that feeds waitingForAdd
// 用来缓冲延迟添加的元素,最大容量1000,超过将会阻塞
waitingForAddCh chan *waitFor
// 重试的监控指标
metrics retryMetrics
}
|
既然延迟队列多了一个AddAfter
方法,首先看一下AddAfter
都做了什么:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
// 如果已经关闭则不再添加
if q.ShuttingDown() {
return
}
q.metrics.retry()
// immediately add things with no delay
// 如果延迟<=0则直接添加
if duration <= 0 {
q.Add(item)
return
}
select {
//当waitingForAddCh>1000时会被阻塞,所以这里读取stopChan确保在关闭时能正常退出
case <-q.stopCh:
//将数据写入缓冲channel
case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
}
}
|
既然在AddAfter
往waitingForAddCh
中发送数据,自然有地方去消费数据,那就是waitingLoop
,它在初始化的时候创建 goroutine 调用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
func (q *delayingType) waitingLoop() {
defer utilruntime.HandleCrash()
never := make(<-chan time.Time)
var nextReadyAtTimer clock.Timer
//初始化一个waitFor的优先级队列
waitingForQueue := &waitForPriorityQueue{}
heap.Init(waitingForQueue)
//使用map来避免重复添加元素,如果重复可能会更新时间
waitingEntryByData := map[t]*waitFor{}
for {
//如果队列关闭,则直接返回
if q.Interface.ShuttingDown() {
return
}
//获取当前时间
now := q.clock.Now()
// waitFor优先级队列中有数据
for waitingForQueue.Len() > 0 {
//获取优先级队列的第一个元素
entry := waitingForQueue.Peek().(*waitFor)
//如果还没到延迟的时间点,则跳出当前循环
if entry.readyAt.After(now) {
break
}
//取出第一个元素,放入队列中
entry = heap.Pop(waitingForQueue).(*waitFor)
q.Add(entry.data)
delete(waitingEntryByData, entry.data)
}
// Set up a wait for the first item's readyAt (if one exists)
nextReadyAt := never
if waitingForQueue.Len() > 0 {
if nextReadyAtTimer != nil {
nextReadyAtTimer.Stop()
}
//优先级队列中的第一个肯定是最早ready的
entry := waitingForQueue.Peek().(*waitFor)
// 这个元素ready的时间
nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
nextReadyAt = nextReadyAtTimer.C()
}
select {
case <-q.stopCh://如果队列关闭则退出循环
return
case <-q.heartbeat.C():// 定时器,每隔一段时间开始最外层循环
case <-nextReadyAt: //元素ready了,继续循环,会将元素加入队列
case waitEntry := <-q.waitingForAddCh://取出缓冲channel中的元素
//时间没到
if waitEntry.readyAt.After(q.clock.Now()) {
//将元素放入优先级队列,插入或者更新前面定义的waitingEntryByData
//这里需要注意的是如果添加的item已经重复了,使用的是最早ready的那个时间
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
//时间已经到了则直接放入队列
q.Add(waitEntry.data)
}
//waitingForAddCh是带缓冲的,可能有多个元素
//这里将他们都取出来出,处理逻辑和上面一样
drained := false
for !drained {
select {
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
default:
drained = true
}
}
}
}
}
|
这段代码中有使用到一个waitForPriorityQueue
的优先级队列,它是基于二叉堆实现的,二叉堆是用来实现优先级队列的最常见的数据结构.优先级队列最主要的性质就是父节点的值总是大于(小于)或者等于它的每一个子节点的值,并且在取出或者添加数据或能够依然维持这个性质.
限速队列
限速队列主要用在一些需要重试的场景,根据错误的次数逐渐增加等待时间.从接口定义上看,限速队列在延时队列的基础上,新增了限速相关的功能:
util/workqueue/rate_limiting_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
|
type RateLimitingInterface interface {
//延时队列
DelayingInterface
// 限速之后添加一个元素
AddRateLimited(item interface{})
// 元素已经结束重试,无论是成功还是失败都会停止限速,这个元素会被抛弃
Forget(item interface{})
// 返回元素重新入队的次数
NumRequeues(item interface{}) int
}
|
与之对应的实现则是rateLimitingType
,
1
2
3
4
5
|
type rateLimitingType struct {
DelayingInterface
rateLimiter RateLimiter
}
|
关于rateLimitingType
,最重要的就是rateLimiter
,它的定义也是个接口:
util/workqueue/default_rate_limiters.go
1
2
3
4
5
6
7
8
|
type RateLimiter interface {
// 获取元素等待的时间
When(item interface{}) time.Duration
// 元素已经结束重试,无论是成功还是失败都会停止限速,这个元素会被抛弃
Forget(item interface{})
// 返回元素重新入队的次数
NumRequeues(item interface{}) int
}
|
针对RateLimiter
,client-go 中有五种基于不同算法的实现:
- BucketRateLimiter:令牌桶算法,基于
golang.org/x/time/rate
实现
- ItemBucketRateLimiter:令牌桶算法,与 BucketRateLimiter 的区别是每个元素使用独立的限速器
- ItemExponentialFailureRateLimiter:指数退避算法
- ItemFastSlowRateLimiter:计数器算法
- MaxOfRateLimiter:混合模式算法,多种限速算法混合使用
BucketRateLimiter
令牌桶算法是限流的最常见的算法那,它的原理就是系统以一个很定的速度往桶里放令牌,每当有一个元素需要加入队列,需要从桶里获得令牌,只有拥有令牌的元素才能入队,而没有令牌的元素,则只能等待。这样就能通过控制令牌的发放速率来控制入队的速率。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
type BucketRateLimiter struct {
*rate.Limiter
}
func (r *BucketRateLimiter) When(item interface{}) time.Duration {
//返回等待的时间
return r.Limiter.Reserve().Delay()
}
func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
//不存在重新入队的情况
return 0
}
func (r *BucketRateLimiter) Forget(item interface{}) {
//没有重试机制
}
|
BucketRateLimiter
简单封装了 go 官方开发的golang.org/x/time/rate
包,调用when
方法,r.Limiter.Reserve().Delay()
返回等待时间.
ItemBucketRateLimiter
1
2
3
4
5
6
7
|
type ItemBucketRateLimiter struct {
r rate.Limit
burst int
limitersLock sync.Mutex
limiters map[interface{}]*rate.Limiter
}
|
ItemBucketRateLimiter
同样是基于golang.org/x/time/rate
,与ItemBucketRateLimiter
的区别是它是针对每个元素单独启用一个限流器.
ItemExponentialFailureRateLimiter
1
2
3
4
5
6
7
8
9
|
type ItemExponentialFailureRateLimiter struct {
failuresLock sync.Mutex
//元素入队失败的次数
failures map[interface{}]int
//最初的延迟
baseDelay time.Duration
//最大的延迟
maxDelay time.Duration
}
|
ItemExponentialFailureRateLimiter
使用元素的入队失败次数作为指数,随着失败次数的增加,重新入队的等待间隔也越来越长,但不会超过 maxDelay
.通过这种方式来限制相同元素的入队速率.其算法实现在When
函数中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
//获取当前元素的失败次数
exp := r.failures[item]
r.failures[item] = r.failures[item] + 1
//计算指数 2^exp*baseDelay
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
//如果int64溢出了使用maxDelay
if backoff > math.MaxInt64 {
return r.maxDelay
}
calculated := time.Duration(backoff)
// 超过了maxDelay也使用maxDelay
if calculated > r.maxDelay {
return r.maxDelay
}
return calculated
}
|
ItemFastSlowRateLimiter
1
2
3
4
5
6
7
8
9
10
11
12
|
type ItemFastSlowRateLimiter struct {
failuresLock sync.Mutex
//失败次数
failures map[interface{}]int
//快速重试的阈值
maxFastAttempts int
//短延迟
fastDelay time.Duration
//长延迟
slowDelay time.Duration
}
|
ItemFastSlowRateLimiter
和ItemExponentialFailureRateLimiter
有些类似,它也是根据入队失败次数使用不同的延迟,只是在达到一定阈值(maxFastAttempts)前使用较低的延迟,在超过阈值后使用较高的延迟.它的实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
//错误次数+1
r.failures[item] = r.failures[item] + 1
//低于阈值使用短延迟
if r.failures[item] <= r.maxFastAttempts {
return r.fastDelay
}
return r.slowDelay
}
|
MaxOfRateLimiter
1
2
3
4
|
type MaxOfRateLimiter struct {
limiters []RateLimiter
}
|
MaxOfRateLimiter
的结构是一个RateLimiter
的 Slice,也就是说,它实际上可以使多种RateLimiter
实现的组合,在workqueue
包中DefaultControllerRateLimiter
使用的就是MaxOfRateLimiter
,需要注意的是,在调用When
和NumRequeues
时,他都是以其中的最大值为准,如When
:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
ret := time.Duration(0)
for _, limiter := range r.limiters {
curr := limiter.When(item)
//如果等待时间比上一个大,则使用当前的等待时间
if curr > ret {
ret = curr
}
}
return ret
}
|
总结
通用队列,延迟队列和限速队列充分利用了 go 的接口和结构体组合的特性,并在实现过程中使用了队列,堆等基础的数据结构。通用队列的实现中使用sync.cond
来唤醒等待的 goroutine 也值得学习,可以说 workqueue 非常好的利用 go 语言的特性与思想。
参考
《Kubernetes 源码剖析》
WorkQueue