前言
近期在做Kubernetes
和Istio
微服务治理的落地项目,随着项目的推进,working on learning,对于Kubernetes
和Istio
算是入了门。机缘巧合之下,加入了云原生社区的 Kubernetes 源码研习社,一起学习 kubernetes 源码,希望能对于 Kubernetes 有更加深入的理解。
本系列文章将参考郑东旭老师《Kubernetes 源码剖析》一书的目录结构阅读。
文中涉及代码及版本为
1
|
git clone -b tags/kubernetes-1.18.6 https://github.com/kubernetes/client-go.git --depth=1
|
如涉及到编译和 Go 语言相关
1
2
|
➜ go version
go version go1.14.6 linux/amd64
|
概览
我们在使用 Go 基于 k8s 做二次开发时,client-go
是必不可少的依赖库,它封装了所有 与kube-apiserver
交互的操作,可以理解成 kube-apiserver
的 sdk。
在我们拉取 kubernetes 主仓库源码时,在vendor/k8s.io
目录下有一个 client-go,这个 client-go
是通过 git subtree
方式引用 kubernetes/client-go 的,从历史提交记录来看,我猜测它应该在在go moudle
诞生之前作为 vendor 依赖包用的,而目前 kubernetes 已支持 go module
,go.mod
中则通过replace
引用 staging/
目录下的k8s.io/client-go
,stage 目录下的该包会定时同步到 kubernetes/client-go。不过,这些都不重要,无论哪里的代码,只要是同一个版本,其实都是一样的代码。
我们在阅读源码之前,我们熟悉下 client-go 的目录结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
➜ tree -d -L 1
.
├── deprecated // 要废弃的代码
├── discovery // DiscoveryClient 相关,用于发现kube-apiserver所支持的资源组,资源版本,资源信息
├── dynamic // DynamicClient 相关,用于
├── examples // client-go 的一些example
├── Godeps // 使用 Godep 作为包管理时的相关文件
├── informers // 每种 kubernetes 资源的 Informer 的实现
├── kubernetes // ClientSet 相关,包含了所有group的client
├── kubernetes_test // 只包含有一个超时的单元测试函数,一个很奇怪的目录
├── listers // 为每个 Kubernetes 资源提供的 Lister功能,该功能对 Get 和 List 请求提供只读的缓存数据
├── metadata //GVR的interface定义
├── pkg // 包含一些导出的
├── plugin // 提供一些云厂商插件
├── rest // RestClient,封装了基础的 restful 操作
├── restmapper
├── scale // ScaleClient,用于deployment、replicaSet等资源的扩缩容
├── testing // 测试相关的包
├── third_party //第三方包,目前只是包含了从go标准库`text/template`中的部分私有代码
├── tools
├── transport //提供安全的TCP连接,支持http Stream,用于一些需要传输二进制流的场景,例如exec,attach等操作
└── util //工具包,包含 WorkQueue 工作队列,Certificate证书等操作
|
Client 对象
k8s.io/client-go
中有几个最重要的 Client 对象
- RestClient
- DiscoveryCient
- Clientset
- DynamicClient
他们提供了各种与kube-apiserver
交互的功能,本文也将主要阅读这几个 Client 的源码
RestClient
rest/client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
type RESTClient struct {
// net/url对象,可以方便地处理url的scheme,host,path,query等
base *url.URL
// url中的vxxx,比如v1/
versionedAPIPath string
// 包含序列化和反序列化的配置,比如配置Content-Type来设置body时返回yaml或者json
content ClientContentConfig
// RestClient的重试机制,BackoffManager 是一个interface
// 默认是不带Backoff的,可以自己实现重试机制,
// 或者用rest包中自带的URLBackoff,它使引用flowcontrol包中
// 基于指数退避算法的Backoff
createBackoffMgr func() BackoffManager
// 限流器,作用于所有使用RestClient实例化的对象发起的请求
// 它是一个interface,可以自己写一个实现该接口的限流器,
// 当然也可以用flowcontrol包中实现的令牌桶限流tokenBucketRateLimiter,
// 它使用golang.org/x/time/rate实现
rateLimiter flowcontrol.RateLimiter
// 如果想使用自己定制的http.Client,赋值给它就行
// 如果是nil,那么将使用默认的http.DefaultClient
Client *http.Client
}
|
k8s.io/client-go/rest
包的RestClient
基于 Go 标准库封的http.Client
封装了 RESTful API 相关约定的实现,所有对于 kube-apiserver
的 RESTful 请求都基于 RestClient
,它实现了rest.Interface
:
1
2
3
4
5
6
7
8
9
10
|
type Interface interface {
GetRateLimiter() flowcontrol.RateLimiter
Verb(verb string) *Request
Post() *Request
Put() *Request
Patch(pt types.PatchType) *Request
Get() *Request
Delete() *Request
APIVersion() schema.GroupVersion
}
|
实际上,client-go 中基于 RestClient
的 Client,都是基于 rest.Interface
而不是 RestClient 本身,理论上你可以自己实现一个 RestClient 来替代,这也充分体现了 Go 语言提倡的面向接口编程的思想。rest.Interface
的定义本身没啥难以理解的,但是其中Verb
函数返回的Request
却值得一看:
rest/request.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
type Request struct {
c *RESTClient
//限流器
rateLimiter flowcontrol.RateLimiter
//重试机制
backoff BackoffManager
//超时
timeout time.Duration
verb string
pathPrefix string
subpath string
params url.Values
headers http.Header
namespace string
namespaceSet bool
resource string
resourceName string
subresource string
// output
err error
body io.Reader
}
|
Request
这个结构封装了大量的函数用来构造 request 来实际发起 http 请求,为了方便地使用链式调用,把不同函数的参数甚至 err 都放在了Request
结构中, RestClient
的一些初始化操作也放在了NewRequest
的时候,比如rateLimiter
和backoff
。Request
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error {
// 监控指标收集
start := time.Now()
defer func() {
metrics.RequestLatency.Observe(r.verb, r.finalURLTemplate(), time.Since(start))
}()
if r.err != nil {
klog.V(4).Infof("Error in request: %v", r.err)
return r.err
}
if err := r.requestPreflightCheck(); err != nil {
return err
}
//如果client为空,使用http.DefaultClient
client := r.c.Client
if client == nil {
client = http.DefaultClient
}
// 避免一个超时的请求被限流
if err := r.tryThrottle(ctx); err != nil {
return err
}
//超时处理
if r.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, r.timeout)
defer cancel()
}
// 如果返回了Retry-After,重试10次
maxRetries := 10
retries := 0
for {
url := r.URL().String()
req, err := http.NewRequest(r.verb, url, r.body)
if err != nil {
return err
}
req = req.WithContext(ctx)
req.Header = r.headers
r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
if retries > 0 {
// 重试的请求也会被限流
if err := r.tryThrottle(ctx); err != nil {
return err
}
}
resp, err := client.Do(req)
updateURLMetrics(r, resp, err)
if err != nil {
r.backoff.UpdateBackoff(r.URL(), err, 0)
} else {
r.backoff.UpdateBackoff(r.URL(), err, resp.StatusCode)
}
if err != nil {
//重试只针对GET方法,因为其他方法不是幂等的
if r.verb != "GET" {
return err
}
// 连接错误或者apiserver宕机才会重试
if net.IsConnectionReset(err) || net.IsProbableEOF(err) {
// 通过返回Retry-After来控制重试
resp = &http.Response{
StatusCode: http.StatusInternalServerError,
Header: http.Header{"Retry-After": []string{"1"}},
Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
}
} else {
return err
}
}
done := func() bool {
// 确保response body 被读完且关闭,这样才能复用TCP连接
// 参考http.Client源码https://github.com/golang/go/blob/master/src/net/http/client.go#L698
defer func() {
const maxBodySlurpSize = 2 << 10
if resp.ContentLength <= maxBodySlurpSize {
io.Copy(ioutil.Discard, &io.LimitedReader{R: resp.Body, N: maxBodySlurpSize})
}
resp.Body.Close()
}()
retries++
if seconds, wait := checkWait(resp); wait && retries < maxRetries {
if seeker, ok := r.body.(io.Seeker); ok && r.body != nil {
_, err := seeker.Seek(0, 0)
if err != nil {
klog.V(4).Infof("Could not retry request, can't Seek() back to beginning of body for %T", r.body)
fn(req, resp)
return true
}
}
klog.V(4).Infof("Got a Retry-After %ds response for attempt %d to %v", seconds, retries, url)
r.backoff.Sleep(time.Duration(seconds) * time.Second)
return false
}
fn(req, resp)
return true
}()
if done {
return nil
}
}
}
|
DiscoveryClient
discovery/discovery.go
1
2
3
4
5
6
|
type DiscoveryClient struct {
restClient restclient.Interface
LegacyPrefix string
}
|
k8s.io/client-go/discovery
包中的 DiscoveryClient
是用来发现 Kubernetes 所支持的 GVR(Group,Resource,Version)的,它核心的的结构也是一个rest.Interface
,同时,他也实现了DiscoveryInterface
:
1
2
3
4
5
6
7
8
9
|
type DiscoveryInterface interface {
//返回底层restClient
RESTClient() restclient.Interface
ServerGroupsInterface
ServerResourcesInterface
ServerVersionInterface
OpenAPISchemaInterface
}
|
可以看到它是由多个接口组合
而成,不难看出就是 GVR 相关的接口。文件中还有一个带缓存的 DiscoverClient 的接口定义CachedDiscoveryInterface
,它基于DiscoveryInterface
组合而成,具体在 client-go/discovery/disk
和client-go/discovery/memory
中各有一个实现,一个基于磁盘,另一个则基于内存。
discovery/cached/disk/cached_discovery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
type CachedDiscoveryClient struct {
delegate discovery.DiscoveryInterface
// 缓存目录,每个host:port都是唯一的
cacheDirectory string
// 缓存TTL
ttl time.Duration
// 防止并发读写
mutex sync.Mutex
// ourFiles are all filenames of cache files created by this process
// 该实例创建的所有缓存的文件名
ourFiles map[string]struct{}
// 如果为true则所有缓存文件失效
invalidated bool
// 如果都为true则所有缓存文件都是有效的
fresh bool
}
|
CachedDiscoveryClient
从磁盘读取缓存的逻辑在getCachedFile
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
func (d *CachedDiscoveryClient) getCachedFile(filename string) ([]byte, error) {
// after invalidation ignore cache files not created by this process
// 如果缓存失效,或则没有缓存,返回err
d.mutex.Lock()
_, ourFile := d.ourFiles[filename]
if d.invalidated && !ourFile {
d.mutex.Unlock()
return nil, errors.New("cache invalidated")
}
d.mutex.Unlock()
file, err := os.Open(filename)
if err != nil {
return nil, err
}
defer file.Close()
//获取缓存文件的描述信息
fileInfo, err := file.Stat()
if err != nil {
return nil, err
}
//缓存是否过期
if time.Now().After(fileInfo.ModTime().Add(d.ttl)) {
return nil, errors.New("cache expired")
}
//缓存有效则从文件读取
cachedBytes, err := ioutil.ReadAll(file)
if err != nil {
return nil, err
}
d.mutex.Lock()
defer d.mutex.Unlock()
d.fresh = d.fresh && ourFile
return cachedBytes, nil
}
|
discovery/cached/disk/round_tripper.go
另外,CachedDiscoveryClient
还封装了 Go 原生的http.RoundTripper
,支持了符合HTTP 标准的 HTTP cache,需要在程序启动时指定 --cache-dir
flag。
discovery/cached/memory/memcache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
|
type memCacheClient struct {
delegate discovery.DiscoveryInterface
// 防止并发读写map
lock sync.RWMutex
// response缓存在map
groupToServerResources map[string]*cacheEntry
// 缓存group
groupList *metav1.APIGroupList
// 缓存是否有效
cacheValid bool
}
content/posts/2020/go-ddd-micro_services.md
|
memCacheClient
则是使用内存来缓存的,缓存的核心逻辑是refreshLocked
,主要用于更新缓存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
func (d *memCacheClient) refreshLocked() error {
//获取所有的ResourceGroup
gl, err := d.delegate.ServerGroups()
if err != nil || len(gl.Groups) == 0 {
utilruntime.HandleError(fmt.Errorf("couldn't get current server API group list: %v", err))
return err
}
wg := &sync.WaitGroup{}
resultLock := &sync.Mutex{}
rl := map[string]*cacheEntry{}
for _, g := range gl.Groups {
for _, v := range g.Versions {
gv := v.GroupVersion
wg.Add(1)
go func() {
defer wg.Done()
defer utilruntime.HandleCrash()
r, err := d.serverResourcesForGroupVersion(gv)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get resource list for %v: %v", gv, err))
}
//因为是并发去更新map,所以需要加锁保护
resultLock.Lock()
defer resultLock.Unlock()
rl[gv] = &cacheEntry{r, err}
}()
}
}
wg.Wait()
//指针赋值是原子的无需加锁
d.groupToServerResources, d.groupList = rl, gl
d.cacheValid = true
return nil
}
|
k8s.io/client-go/discovery
包中的接口和结构体的关系相对复杂,这里用一个类图可以很很好地理清楚他们之间的关系
ClientSet
1
2
3
4
5
6
7
8
9
|
type Clientset struct {
*discovery.DiscoveryClient
admissionregistrationV1 *admissionregistrationv1.AdmissionregistrationV1Client
admissionregistrationV1beta1 *admissionregistrationv1beta1.AdmissionregistrationV1beta1Client
appsV1 *appsv1.AppsV1Client
appsV1beta1 *appsv1beta1.AppsV1beta1Client
appsV1beta2 *appsv1beta2.AppsV1beta2Client
...
}
|
Clientset
的实现在 k8s.io/client-go/kubernetes
中,封装了所有 Kubernetes
内置资源的 RESTful 操作,其中每 ResourceGroup 都是基于 rest.Interface
的封装,Resource 和 Version 都是以函数形式暴露的,比起直接使用RestClient
要方便的多。 由于kubernetes
的内置资源非常多,实际上它是通过代码生成器生成的,关于代码生成器,将会在未来的文章中涉及。需要注意的是,Clientset
不支持直接访问 CRD,如果有需求,也可以通过代码生成器重新生成Clientset
。
DynamicClient
dynamic/simple.go
1
2
3
|
type dynamicClient struct {
client *rest.RESTClient
}
|
dynamicClient
的功能和Clientset
类似,它直接使用了rest.RestClient
,与Clientset
不同的是,dynamicClient
使用的是非结构化的数据,所有的返回结构都是 *unstructured.Unstructured
,在其内部实现了序列化和反序列化的操作, 所以他直接能支持 CRD。
总结
RestClient
是最基础的客户端,它是 Clientset
,DynamicClient
,DiscoveryClient
的基础。Clientset 和 DynamicClient 两个客户端的作用类似,DiscoveryClient 则提供了发现 kube-apiserv
GVR 的功能。用一个类图可以很好的体现他们之间的关系(图片比较大,可以右键新窗口打开):
参考
《Kubernetes 源码剖析》