目录

[源码阅读] Kubernetes 1.18 client-go (一) 深入理解 Client 对象

前言

近期在做KubernetesIstio微服务治理的落地项目,随着项目的推进,working on learning,对于KubernetesIstio算是入了门。机缘巧合之下,加入了云原生社区的 Kubernetes 源码研习社,一起学习 kubernetes 源码,希望能对于 Kubernetes 有更加深入的理解。

本系列文章将参考郑东旭老师《Kubernetes 源码剖析》一书的目录结构阅读。

文中涉及代码及版本为
1
git clone -b tags/kubernetes-1.18.6 https://github.com/kubernetes/client-go.git --depth=1
如涉及到编译和 Go 语言相关
1
2
➜ go version
go version go1.14.6 linux/amd64

概览

我们在使用 Go 基于 k8s 做二次开发时,client-go是必不可少的依赖库,它封装了所有 与kube-apiserver交互的操作,可以理解成 kube-apiserver 的 sdk。

在我们拉取 kubernetes 主仓库源码时,在vendor/k8s.io 目录下有一个 client-go,这个 client-go 是通过 git subtree 方式引用 kubernetes/client-go 的,从历史提交记录来看,我猜测它应该在在go moudle诞生之前作为 vendor 依赖包用的,而目前 kubernetes 已支持 go module,go.mod 中则通过replace引用 staging/目录下的k8s.io/client-go,stage 目录下的该包会定时同步到 kubernetes/client-go。不过,这些都不重要,无论哪里的代码,只要是同一个版本,其实都是一样的代码。

我们在阅读源码之前,我们熟悉下 client-go 的目录结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
➜ tree -d  -L 1
.
├── deprecated // 要废弃的代码
├── discovery // DiscoveryClient 相关,用于发现kube-apiserver所支持的资源组,资源版本,资源信息
├── dynamic // DynamicClient 相关,用于
├── examples // client-go 的一些example
├── Godeps  // 使用 Godep 作为包管理时的相关文件
├── informers // 每种 kubernetes 资源的 Informer 的实现
├── kubernetes // ClientSet 相关,包含了所有group的client
├── kubernetes_test // 只包含有一个超时的单元测试函数,一个很奇怪的目录
├── listers // 为每个 Kubernetes 资源提供的 Lister功能,该功能对 Get 和 List 请求提供只读的缓存数据
├── metadata //GVR的interface定义
├── pkg // 包含一些导出的
├── plugin // 提供一些云厂商插件
├── rest // RestClient,封装了基础的 restful 操作
├── restmapper
├── scale // ScaleClient,用于deployment、replicaSet等资源的扩缩容
├── testing // 测试相关的包
├── third_party //第三方包,目前只是包含了从go标准库`text/template`中的部分私有代码
├── tools
├── transport //提供安全的TCP连接,支持http Stream,用于一些需要传输二进制流的场景,例如exec,attach等操作
└── util //工具包,包含 WorkQueue 工作队列,Certificate证书等操作

Client 对象

k8s.io/client-go 中有几个最重要的 Client 对象

  • RestClient
  • DiscoveryCient
  • Clientset
  • DynamicClient

他们提供了各种与kube-apiserver交互的功能,本文也将主要阅读这几个 Client 的源码

RestClient

rest/client.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
type RESTClient struct {
    // net/url对象,可以方便地处理url的scheme,host,path,query等
    base *url.URL

    // url中的vxxx,比如v1/
	versionedAPIPath string

    // 包含序列化和反序列化的配置,比如配置Content-Type来设置body时返回yaml或者json
	content ClientContentConfig

    // RestClient的重试机制,BackoffManager 是一个interface
    // 默认是不带Backoff的,可以自己实现重试机制,
    // 或者用rest包中自带的URLBackoff,它使引用flowcontrol包中
    // 基于指数退避算法的Backoff
	createBackoffMgr func() BackoffManager

    // 限流器,作用于所有使用RestClient实例化的对象发起的请求
    // 它是一个interface,可以自己写一个实现该接口的限流器,
    // 当然也可以用flowcontrol包中实现的令牌桶限流tokenBucketRateLimiter,
    // 它使用golang.org/x/time/rate实现
	rateLimiter flowcontrol.RateLimiter

    // 如果想使用自己定制的http.Client,赋值给它就行
    // 如果是nil,那么将使用默认的http.DefaultClient
	Client *http.Client
}

k8s.io/client-go/rest 包的RestClient 基于 Go 标准库封的http.Client封装了 RESTful API 相关约定的实现,所有对于 kube-apiserver 的 RESTful 请求都基于 RestClient,它实现了rest.Interface:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
type Interface interface {
	GetRateLimiter() flowcontrol.RateLimiter
	Verb(verb string) *Request
	Post() *Request
	Put() *Request
	Patch(pt types.PatchType) *Request
	Get() *Request
	Delete() *Request
	APIVersion() schema.GroupVersion
}

实际上,client-go 中基于 RestClient 的 Client,都是基于 rest.Interface 而不是 RestClient 本身,理论上你可以自己实现一个 RestClient 来替代,这也充分体现了 Go 语言提倡的面向接口编程的思想。rest.Interface的定义本身没啥难以理解的,但是其中Verb函数返回的Request却值得一看:

rest/request.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

type Request struct {
	c *RESTClient

    //限流器
    rateLimiter flowcontrol.RateLimiter
    //重试机制
    backoff     BackoffManager
    //超时
	timeout     time.Duration

	verb       string
	pathPrefix string
	subpath    string
	params     url.Values
	headers    http.Header

	namespace    string
	namespaceSet bool
	resource     string
	resourceName string
	subresource  string

	// output
	err  error
	body io.Reader
}

Request这个结构封装了大量的函数用来构造 request 来实际发起 http 请求,为了方便地使用链式调用,把不同函数的参数甚至 err 都放在了Request结构中, RestClient的一些初始化操作也放在了NewRequest的时候,比如rateLimiterbackoff。Request

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error {
	// 监控指标收集
	start := time.Now()
	defer func() {
		metrics.RequestLatency.Observe(r.verb, r.finalURLTemplate(), time.Since(start))
	}()

	if r.err != nil {
		klog.V(4).Infof("Error in request: %v", r.err)
		return r.err
	}

	if err := r.requestPreflightCheck(); err != nil {
		return err
	}

    //如果client为空,使用http.DefaultClient
	client := r.c.Client
	if client == nil {
		client = http.DefaultClient
	}


    // 避免一个超时的请求被限流
	if err := r.tryThrottle(ctx); err != nil {
		return err
	}

    //超时处理
	if r.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, r.timeout)
		defer cancel()
	}

    // 如果返回了Retry-After,重试10次
	maxRetries := 10
	retries := 0
	for {

		url := r.URL().String()
		req, err := http.NewRequest(r.verb, url, r.body)
		if err != nil {
			return err
		}
		req = req.WithContext(ctx)
		req.Header = r.headers

		r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
		if retries > 0 {
            // 重试的请求也会被限流
			if err := r.tryThrottle(ctx); err != nil {
				return err
			}
		}
		resp, err := client.Do(req)
		updateURLMetrics(r, resp, err)
		if err != nil {
			r.backoff.UpdateBackoff(r.URL(), err, 0)
		} else {
			r.backoff.UpdateBackoff(r.URL(), err, resp.StatusCode)
		}
		if err != nil {

            //重试只针对GET方法,因为其他方法不是幂等的
			if r.verb != "GET" {
				return err
			}
            // 连接错误或者apiserver宕机才会重试
			if net.IsConnectionReset(err) || net.IsProbableEOF(err) {
                // 通过返回Retry-After来控制重试
				resp = &http.Response{
					StatusCode: http.StatusInternalServerError,
					Header:     http.Header{"Retry-After": []string{"1"}},
					Body:       ioutil.NopCloser(bytes.NewReader([]byte{})),
				}
			} else {
				return err
			}
		}

		done := func() bool {
            // 确保response body 被读完且关闭,这样才能复用TCP连接
            // 参考http.Client源码https://github.com/golang/go/blob/master/src/net/http/client.go#L698
			defer func() {
				const maxBodySlurpSize = 2 << 10
				if resp.ContentLength <= maxBodySlurpSize {
					io.Copy(ioutil.Discard, &io.LimitedReader{R: resp.Body, N: maxBodySlurpSize})
				}
				resp.Body.Close()
			}()

			retries++
			if seconds, wait := checkWait(resp); wait && retries < maxRetries {
				if seeker, ok := r.body.(io.Seeker); ok && r.body != nil {
					_, err := seeker.Seek(0, 0)
					if err != nil {
						klog.V(4).Infof("Could not retry request, can't Seek() back to beginning of body for %T", r.body)
						fn(req, resp)
						return true
					}
				}

				klog.V(4).Infof("Got a Retry-After %ds response for attempt %d to %v", seconds, retries, url)
				r.backoff.Sleep(time.Duration(seconds) * time.Second)
				return false
			}
			fn(req, resp)
			return true
		}()
		if done {
			return nil
		}
	}
}

DiscoveryClient

discovery/discovery.go

1
2
3
4
5
6

type DiscoveryClient struct {
	restClient restclient.Interface

	LegacyPrefix string
}

k8s.io/client-go/discovery包中的 DiscoveryClient 是用来发现 Kubernetes 所支持的 GVR(Group,Resource,Version)的,它核心的的结构也是一个rest.Interface,同时,他也实现了DiscoveryInterface:

1
2
3
4
5
6
7
8
9
type DiscoveryInterface interface {
    //返回底层restClient
	RESTClient() restclient.Interface
	ServerGroupsInterface
	ServerResourcesInterface
	ServerVersionInterface
	OpenAPISchemaInterface
}

可以看到它是由多个接口组合而成,不难看出就是 GVR 相关的接口。文件中还有一个带缓存的 DiscoverClient 的接口定义CachedDiscoveryInterface,它基于DiscoveryInterface组合而成,具体在 client-go/discovery/diskclient-go/discovery/memory中各有一个实现,一个基于磁盘,另一个则基于内存。

discovery/cached/disk/cached_discovery.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
type CachedDiscoveryClient struct {
	delegate discovery.DiscoveryInterface

    // 缓存目录,每个host:port都是唯一的
	cacheDirectory string

	// 缓存TTL
    ttl time.Duration

    // 防止并发读写
	mutex sync.Mutex

    // ourFiles are all filenames of cache files created by this process
    // 该实例创建的所有缓存的文件名
	ourFiles map[string]struct{}
	// 如果为true则所有缓存文件失效
	invalidated bool
	// 如果都为true则所有缓存文件都是有效的
	fresh bool
}

CachedDiscoveryClient从磁盘读取缓存的逻辑在getCachedFile

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (d *CachedDiscoveryClient) getCachedFile(filename string) ([]byte, error) {
    // after invalidation ignore cache files not created by this process
    // 如果缓存失效,或则没有缓存,返回err
	d.mutex.Lock()
	_, ourFile := d.ourFiles[filename]
	if d.invalidated && !ourFile {
		d.mutex.Unlock()
		return nil, errors.New("cache invalidated")
	}
	d.mutex.Unlock()

	file, err := os.Open(filename)
	if err != nil {
		return nil, err
	}
	defer file.Close()
    //获取缓存文件的描述信息
	fileInfo, err := file.Stat()
	if err != nil {
		return nil, err
	}
    //缓存是否过期
	if time.Now().After(fileInfo.ModTime().Add(d.ttl)) {
		return nil, errors.New("cache expired")
	}

    //缓存有效则从文件读取
	cachedBytes, err := ioutil.ReadAll(file)
	if err != nil {
		return nil, err
	}

	d.mutex.Lock()
	defer d.mutex.Unlock()
	d.fresh = d.fresh && ourFile

	return cachedBytes, nil
}

discovery/cached/disk/round_tripper.go

另外,CachedDiscoveryClient 还封装了 Go 原生的http.RoundTripper,支持了符合HTTP 标准的 HTTP cache,需要在程序启动时指定 --cache-dir flag。

discovery/cached/memory/memcache.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
type memCacheClient struct {
	delegate discovery.DiscoveryInterface

    //  防止并发读写map
    lock                   sync.RWMutex
    // response缓存在map
    groupToServerResources map[string]*cacheEntry
    // 缓存group
    groupList              *metav1.APIGroupList
    // 缓存是否有效
	cacheValid             bool
}
content/posts/2020/go-ddd-micro_services.md

memCacheClient则是使用内存来缓存的,缓存的核心逻辑是refreshLocked,主要用于更新缓存

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func (d *memCacheClient) refreshLocked() error {
	//获取所有的ResourceGroup
	gl, err := d.delegate.ServerGroups()
	if err != nil || len(gl.Groups) == 0 {
		utilruntime.HandleError(fmt.Errorf("couldn't get current server API group list: %v", err))
		return err
	}

    wg := &sync.WaitGroup{}
	resultLock := &sync.Mutex{}
	rl := map[string]*cacheEntry{}
	for _, g := range gl.Groups {
		for _, v := range g.Versions {
			gv := v.GroupVersion
			wg.Add(1)
			go func() {
				defer wg.Done()
				defer utilruntime.HandleCrash()

				r, err := d.serverResourcesForGroupVersion(gv)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("couldn't get resource list for %v: %v", gv, err))
				}
                //因为是并发去更新map,所以需要加锁保护
				resultLock.Lock()
				defer resultLock.Unlock()
				rl[gv] = &cacheEntry{r, err}
			}()
		}
	}
    wg.Wait()

    //指针赋值是原子的无需加锁
	d.groupToServerResources, d.groupList = rl, gl
	d.cacheValid = true
	return nil
}

k8s.io/client-go/discovery 包中的接口和结构体的关系相对复杂,这里用一个类图可以很很好地理清楚他们之间的关系

/svgs/kubernetes-client-go-discovery.svg

ClientSet

1
2
3
4
5
6
7
8
9
type Clientset struct {
	*discovery.DiscoveryClient
	admissionregistrationV1      *admissionregistrationv1.AdmissionregistrationV1Client
	admissionregistrationV1beta1 *admissionregistrationv1beta1.AdmissionregistrationV1beta1Client
	appsV1                       *appsv1.AppsV1Client
	appsV1beta1                  *appsv1beta1.AppsV1beta1Client
	appsV1beta2                  *appsv1beta2.AppsV1beta2Client
    ...
}

Clientset 的实现在 k8s.io/client-go/kubernetes中,封装了所有 Kubernetes 内置资源的 RESTful 操作,其中每 ResourceGroup 都是基于 rest.Interface的封装,Resource 和 Version 都是以函数形式暴露的,比起直接使用RestClient要方便的多。 由于kubernetes的内置资源非常多,实际上它是通过代码生成器生成的,关于代码生成器,将会在未来的文章中涉及。需要注意的是,Clientset不支持直接访问 CRD,如果有需求,也可以通过代码生成器重新生成Clientset

DynamicClient

dynamic/simple.go

1
2
3
type dynamicClient struct {
	client *rest.RESTClient
}

dynamicClient的功能和Clientset类似,它直接使用了rest.RestClient,与Clientset不同的是,dynamicClient使用的是非结构化的数据,所有的返回结构都是 *unstructured.Unstructured,在其内部实现了序列化和反序列化的操作, 所以他直接能支持 CRD。

总结

RestClient 是最基础的客户端,它是 Clientset,DynamicClient,DiscoveryClient 的基础。Clientset 和 DynamicClient 两个客户端的作用类似,DiscoveryClient 则提供了发现 kube-apiservGVR 的功能。用一个类图可以很好的体现他们之间的关系(图片比较大,可以右键新窗口打开):

/svgs/kubernetes-client-go-client-all.svg

参考

《Kubernetes 源码剖析》