KPNG:下一代Kube-Proxy

kpng是社区新设计开发的下一代Kube-Proxy,目前仍然在积极开发中。
官方repo:https://github.com/kubernetes-sigs/kpng
kep:https://github.com/kubernetes/enhancements/pull/2094

简介

kpng主要目的是:

  • 解耦Kubernetes的业务逻辑(比如serivce的externalTrafficPolicy、internalTrafficPolicy之类逻辑)与后端proxy的实现,以实现一套通用的框架。
    对于当前的Kube-Proxy,如果要自己开发其他类型的proxy,需要实现自己的proxier来实现Provider 接口,而Provider接口包含了对Service、Endpoint、Node资源的处理逻辑,这部分对于所有proxier来说都是重复的代码。同时,不同proxier内部也存在某些子系统的重复逻辑,比如conntrack模块。
  • 提高可扩展性,支撑更大规模的集群。目前Kube-Proxy的扩展需要增加对Kube-APIServer的watch请求,不利于规模的扩大。

kpng的主要思路如下:

[k8s API] ----> [local model] ----> [plugin] ----> [aggregator] ----> [subsystem]

1)根据k8s API的资源,构建一个本地期望状态ENLS(expected node-local state),只有当ENLS发生变化的时候,后端plugin才会触发操作。
相比Kube-Proxy直接监听Kube-APIServer的event,这种方式能减少无效触发(不改变ENLS的API event)。引用社区在1K service、1.5K pod下的测试。其中rev是ENLS发送变化的event数量,events是API events。

stats:	time	events	rev	usr cpu	sys cpu	tot cpu	mem	revs/events
stats:	ms	count	count	ms	ms	%	MiB	%
stats:	0	0	0	0	0	+Inf	1.22	NaN
stats:	1000	2134	1064	3	308	31.280	3.93	49.859
stats:	2000	2137	1064	0	1	0.131	3.94	49.789
stats:	3000	2138	1064	0	0	0.050	3.94	49.766
[...]
stats:	298000	21785	1067	0	0	0.050	4.16	4.898
stats:	299000	21787	1067	0	0	0.089	4.16	4.897
stats:	300000	21788	1067	0	0	0.061	4.17	4.897
stats:	301000	23925	1068	4	64	6.882	3.35	4.464 # GC after 2k+ events (without a proxy-related change)
stats:	302000	23926	1068	0	0	0.052	3.35	4.464
[...]
stats:	1798000	130941	1218	0	0	0.090	4.68	0.930
stats:	1799000	130942	1218	0	0	0.055	4.69	0.930


2)aggregate聚合多个组件的对子系统的操作,最终进行统一的提交。比如calico、Kube-Proxy、hostport cni等对iptables的操作,统一由aggregate进行操作的合并,避免相互之间的锁竞争。

KPNG的设计

由于KPNG目前仍在开发中,最终的技术方案可能会有变动,暂时使用最新版本 的实现来介绍。KPNG架构图如下。

首先是上半Server部分,核心是存储了Services、Endpoints、Nodes信息的proxystore.Store,底层是一个B树,proxystore.Store的输入来源可以是其他kpng的gRPC API(api2store)、Kube-APIServer(kube2store)以及本地文件(file2store),对外提供访问的方式可以是gRPC server(store2api)、本地内存(store2localdiff)、本地文件(store2file)。代码实现都在server/jobs/xxx2store以及server/jobs/store2xxx下。

下半Client部分,获取数据并交于backend进行处理,backend可以是简单的日志打印,或是实现iptables、ipvs、ebpf规则等。获取数据方式目前有两种,一种是gRPC,对应上半部分的store2api;另一种是内存获取,对应store2localdiff。两者都会调用localsink.Sinkfunc Send(op *OpItem) error,将相应的变更操作传递给后面的逻辑进行处理。

KPNG使用

官方博客里的例子 是通过gRPC的方式实现资源变更的打印,backend的实现在代码库的examples/pipe-exec/cmd/kpng-json/main.go中。

除此外,这里给出iptables模式的KPNG部署。首先在代码库的根目录下生成镜像。

docker build -t kpng:latest .

创建如下的daemonset:

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: kpng-proxy
  namespace: kube-system
spec:
  selector:
    matchLabels:
      app: kpng-proxy
  template:
    metadata:
      labels:
        app: kpng-proxy
    spec:
      # 偷懒,直接使用Kube-Proxy的sa
      serviceAccount: kube-proxy
      hostNetwork: true
      priorityClassName: system-node-critical
      nodeSelector:
        kubernetes.io/os: linux
      containers:
      - name: kpng
        image: kpng:latest
        imagePullPolicy: IfNotPresent
        command:
        - kpng
        - kube
        - --service-proxy-name=kpng-example
        - to-local
        - to-iptables
        securityContext:
          privileged: true

接着创建带有service.kubernetes.io/service-proxy-name: kpng-example标签的service,就能看到在iptables里生成对应的规则了。(如果不确定是Kube-Proxy生成的iptables规则还是kpng,可以在测试前,先把Kube-Proxy切换为ipvs模式或删除)。
上面例子中,kube子命令指定了通过kube2store监听kube-apiserver来获取proxystore.Storeto-local子命令指定了通过store2localdiffproxystore.Store变更增量发送到后端,to-iptables子命令指定使用后端iptables

KPNG代码粗读

首先,kpng代码里实现了很多种名叫storesinkjob的类。总的来说:

  • job实现数据的处理与传递。比如server/jobs/kube2store/kube2store.go中的Job实现数据从Kube-APIServer(通过List-Watch)到proxystore.Store的传递;server/jobs/store2diff/store2diff.go中的Job实现数据从proxystore.Storestore2diff.Sink的传递。
  • store用于存储数据,一般底层都是B树,主要的包括存储Services、Endpoints、Nodes信息的proxystore.Store,能提供数据变化跟踪的lightdiffstore.DiffStorediffstore.Store
  • sink提供一些接口,用于接受job数据的传递,一般定义为interface。比如上面提到的client/localsink/localsink.go中的localsink.Sinksink的实现也可能会是另一个job

代码结构

主要的代码包括:

  • api/:grpc api的定义,Services、Endpoints、Nodes资源信息的api定义。
  • backends/:后端实现,都实现了client/backendcmd中的Cmd interface,使用client/backendcmd中的Register()注册,然后就能通过子命令调用对应的后端。
    func init() {
    	backendcmd.Register("to-iptables", func() backendcmd.Cmd { return &Backend{} })
    }
  • client/:内容比较杂,包含了client用到的storesink等结构。
  • cmd/kpng/:main程序代码
  • from-k8s/:从kubernetes代码库中移植过来的代码
  • server/
    • server/jobs:包含了server端xxx2storestore2xxx的各种job
    • server/proxystore:定义了基本的存储结构proxystore.Store

接下来分别以gRPC server + examples/print-state的用例和local server + iptables backend的用例介绍代码流程,两个用例能覆盖大多数核心代码,对KPNG有个大体的了解。

gPRC Server

当执行kpng kube to-api时,kpng会监听Kube-APIServer,并启动gRPC server。

kube子命令

kube子命令的定义:

// cmd/kpng/k2s.go
func kube2storeCmd() *cobra.Command {
	// kube to * command
	k2sCmd := &cobra.Command{
		Use:   "kube",
		Short: "watch Kubernetes API to the globalv1 state",
	}
	...

	// setupKube2store定义了从kube-apiserver到store的逻辑
	// storecmds.Commands为kube子命令添加store到local\gRPC\file的逻辑
	k2sCmd.AddCommand(storecmds.Commands(setupKube2store)...)

	return k2sCmd
}

func setupKube2store() (ctx context.Context, store *proxystore.Store, err error) {
	...
	// 创建新的proxystore.store
	store = proxystore.New()

	// kube2stroe job是Kube-APIServer到store主要逻辑的实现
	go kube2store.Job{
		Kube:   kubeClient,
		Store:  store,
		Config: k2sCfg,
	}.Run(ctx)

	return
}

kube2store.Job的实现在server/jobs/kube2store下,server/jobs下也包含其他与store相关的job,大同小异。

// server/jobs/kube2store/kube2store.go
func (j Job) Run(ctx context.Context) {
	...
	// service的informer factory,list时只选择label service.kubernetes.io/service-proxy-name的值与kpng ServiceProxyName相同的service
	// 由于informer factory的ListOptions是对所有资源都进行过滤的,因此后面还创建了一个coreFactory
	svcFactory := informers.NewSharedInformerFactoryWithOptions(j.Kube, time.Second*30,
		informers.WithTweakListOptions(func(options *metav1.ListOptions) { options.LabelSelector = labelSelector }))
	...
	// start watches
	coreFactory := factory.Core().V1()
	
	...
	//启动service、nodes、endpointSlices的List-watch
	servicesInformer.AddEventHandler(&serviceEventHandler{j.eventHandler(servicesInformer)})
	go servicesInformer.Run(stopCh)

	nodesInformer := coreFactory.Nodes().Informer()
	nodesInformer.AddEventHandler(&nodeEventHandler{j.eventHandler(nodesInformer)})
	go nodesInformer.Run(stopCh)

	slicesInformer := factory.Discovery().V1().EndpointSlices().Informer()
	slicesInformer.AddEventHandler(&sliceEventHandler{j.eventHandler(slicesInformer)})
	go slicesInformer.Run(stopCh)
	...
}

上面的各个eventHandler最终根据List-watch的事件,更新store,以add service event为例。

// server/jobs/kube2store/service-event-handler.go
func (h *serviceEventHandler) OnAdd(obj interface{}) {
	h.onChange(obj)
}

func (h *serviceEventHandler) onChange(obj interface{}) {
	...
	// 把corev1.service转换为kpng api中定义的localv1.Service,用于保存在store中
	service := &localv1.Service{
		Namespace:   svc.Namespace,
		Name:        svc.Name,
		Type:        string(svc.Spec.Type),
		Labels:      globsFilter(svc.Labels, h.config.ServiceLabelGlobs),
		Annotations: globsFilter(svc.Annotations, h.config.ServiceAnnonationGlobs),
		IPs: &localv1.ServiceIPs{
			ClusterIPs:  &localv1.IPSet{},
			ExternalIPs: localv1.NewIPSet(svc.Spec.ExternalIPs...),
		},
		ExternalTrafficToLocal: svc.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal,
		InternalTrafficToLocal: internalTrafficPolicy == v1.ServiceInternalTrafficPolicyLocal,
	}
	...
	// 存储到store中,proxystore.Tx是在proxystore.Store上封装了一层,前者能把service、nodes、endpoint转成kv,存储到proxystore.Store中
	// Update()传入闭包,操作proxystore.Tx
	h.s.Update(func(tx *proxystore.Tx) {
		...
		tx.SetService(service)
	})
}

到此,kube子命令的功能就结束了。

to-api子命令

上面有提到,为kube子命令添加store到local、gRPC、file的逻辑,都是在storecmds.Commands中实现的。

// cmd/kpng/storecmds/storecmds.go
func (c SetupFunc) ToAPICmd() *cobra.Command {
	cmd := &cobra.Command{
		Use: "to-api",
	}
	...
	cmd.RunE = func(_ *cobra.Command, _ []string) (err error) {
		...
		// 从xxx2store对应的SetupFunc中获取store
		ctx, store, err := c()
		// store2api.Job实现将proxystore.Store暴露给gRPC server上
		j := &store2api.Job{
			Store:  store,
			Config: cfg,
		}
		return j.Run(ctx)
	}
	...
}

// server/jobs/store2api/store2api.go
func (j *Job) Run(ctx context.Context) error {
	lis := server.MustListen(j.Config.BindSpec)

	// 创建gRPC服务
	...
		srv = grpc.NewServer()
		
	// 启动global API与endpoint API的server
	if j.Config.GlobalAPI {
		global.Setup(srv, j.Store)
	}
	if j.Config.LocalAPI {
		endpoints.Setup(srv, j.Store)
	}
	
	return srv.Serve(lis)
}

endpoints.Setup()为例,注册的gRPC server设置如下:

// api/localv1/api_grpc.pb.go

func RegisterSetsServer(s grpc.ServiceRegistrar, srv SetsServer) {
	// 当客户端请求gRPC server时,会使用srv调用_Sets_Watch_Handler()方法
	s.RegisterService(&Sets_ServiceDesc, srv)
}

func _Sets_Watch_Handler(srv interface{}, stream grpc.ServerStream) error {
	// 调用SetsServer的Watch方法,传入setsWatchServer,用于信息的接受与发送
	return srv.(SetsServer).Watch(&setsWatchServer{stream})
}

// setsWatchServer将grpc.ServerStream封装为Send()与Recv()方法
type setsWatchServer struct {
	grpc.ServerStream
}

func (x *setsWatchServer) Send(m *OpItem) error {
	return x.ServerStream.SendMsg(m)
}

func (x *setsWatchServer) Recv() (*WatchReq, error) {
	if err := x.ServerStream.RecvMsg(m); err != nil {
		...
	}
	...
}

var Sets_ServiceDesc = grpc.ServiceDesc{
	...
	Streams: []grpc.StreamDesc{
		{
		...
			Handler:       _Sets_Watch_Handler,
		},
	},
}

可以看出gRPC Server接收的是WatchReq,而发送的OpItemOpItem有点像List-watch的event,但只包括set、delete、reset、sync四种,set、delete用来告诉客户端资源有哪些增量的变化,sync表示最新的增量已传输完毕,告诉客户端可以进行backend规则的同步了,reset一般用在gRPC重连的情况。
SetsServer是个interface,实现是在server/pkg/server/endpoints/server.go里的Server

func (s *Server) Watch(res localv1.Sets_WatchServer) error {
	...
	// store2localdiff会根据Store计算资源的变化,通过Sink的Send方法将增量发送
	job := &store2localdiff.Job{
		Store: s.Store,
		// serverSink继承了localv1.Sets_WatchServer的Send方法,也就是上面setsWatchServer的Send方法,因此最终通过gRPC发到Client端
		Sink:  serverSink{res, remote},
	}
	...
	return job.Run(res.Context())
}

diff

store2localdiff.Job.Run()内部调用了store2diff.Job.Run(),这块是实现资源变更的增量发送逻辑,即每次只将资源变更的增量通过localsink.Sink.Send()发送,不发送资源的全部信息。

diff这块代码较多,简言之就是:

  • 资源变更的追踪是依靠client/lightdiffstore中的DiffStore完成的,其本质是一个B树,在value中记录了此value在上一次Reset之后的状态(changed or unchanged)。而server/pkg/server/watchstate中的WatchStateDiffStore上封装一层,可以将DiffStore diff的结果通过localv1.OpSink.Send()发送出去。
  • store2diff.Job.Run()会先创建WatchState。然后,在每一轮循环中,先通过store2diff.Sink.Update()proxystore.Store中更新WatchState;再通过store2diff.Sink.SendDiff()发送diff的结果;当diff结果发送完成后,在发送sync的OpItem,表示发送结束,可以同步。
    // server/jobs/store2diff/store2diff.go
    func (j *Job) Run(ctx context.Context) (err error) {
    	// 创建watchstate
    	w := watchstate.New(j.Sink, j.Sets)
    	...
    	for {
    		...
    		for !updated {
    			// block until the revision has been
    			// incremented... then, we update our state from the
    			// proxystore
    			rev, closed = j.Store.View(rev, func(tx *proxystore.Tx) {
    				j.Sink.Update(tx, w)
    			})
    			...
    			// send the diff
    			updated = j.Sink.SendDiff(w)
    		}
    
    		// signal the change set is fully sent
    		w.SendSync()
    
    		if w.Err != nil {
    			return w.Err
    		}
    	}
  • store2localdiff.Job实现了store2diff.Sink接口,其中在SendDiff()中,发送完diff结果后,会调用WatchStateReset()进行资源状态的重置。

到此为止,整个gRPC Server的流程就结束了。

gRPC Client

在kpng中,要实现一个自定义的gRPC Client逻辑只要调用client.Run(func),将实现func传入即可。以examples/print-state为例,其实现了接收内容的打印。

// examples/print-state/main.go
func main() {
	client.Run(printState)
}

// ServiceEndpoints结构记录了一个Service与它对应的本节点的Endpoint
func printState(items []*client.ServiceEndpoints) {
	fmt.Println("# ------------------------------------------------------------------------")
	fmt.Println("#", time.Now())
	fmt.Println("#")
	for _, item := range items {
		fmt.Fprintln(os.Stdout, item)
	}
}

client.Run使用了fullstate.Sinkfullstate.Sink内部有个B树结构,存储gRPC客户端接受到的数据,然后每次在收到Server端发送的sync信号后,将全量的数据发给回调函数,也就是传入client.Run()的自定义逻辑。

到此为止,整个gRPC Server + gRPC Client的模式就结束了。

local Server

除了上面的gRPC的方式外,还有就是上面iptable用例中,kpng kube to-local to-iptables命令通过内存的方式传递数据。Server和Client会编译到一个二进制文件中。

to-local

to-local接kube子命令,将proxystore.Store中的资源通过store2localdiff.Job处理后发送给下一个localsink.Sink的实现。

// cmd/kpng/storecmds/storecmds.go
func (c SetupFunc) ToLocalCmd() (cmd *cobra.Command) {
	cmd = &cobra.Command{
		Use: "to-local",
	}

	job := &store2localdiff.Job{}
	...
	cmd.AddCommand(LocalCmds(func(sink localsink.Sink) error {
		job.Sink = sink
		return job.Run(ctx)
	})...)

	return
}

backend

上面有介绍,backend都是通过backendcmd.Register方法注册,然后在LocalCmds()创建相应的命令。

// cmd/kpng/storecmds/storecmds.go
func LocalCmds(run func(sink localsink.Sink) error) (cmds []*cobra.Command) {
	// 获取所有注册的backend
	for _, useCmd := range backendcmd.Registered() {
		backend := useCmd.New()

		cmd := &cobra.Command{
			Use: useCmd.Use,
			RunE: func(_ *cobra.Command, _ []string) error {
			// 获取backend的Sink,用于在to-local子命令创建的store2localdiff.Job中获取diff结果
				return run(backend.Sink())
			},
		}
		...
	}
}

以iptable为例,Sink()方法如下:

// backends/iptables/sink.go
func (s *Backend) Sink() localsink.Sink {
	return filterreset.New(pipe.New(decoder.New(s), decoder.New(conntrack.NewSink())))
}

Server端发送的diff结果,流向如下:

                                    | --> decoder.sink --> iptables
                                    |
diff --> filterreset.sink --> pipe.sink --> decoder.sink --> conntrack

可以看到kpng中真的是有各种各样的Sink:pipe.Sink将数据发送给多个目的地;decoder.Sink将raw数据转换成service、endpoint、node API;filterreset.Sink是相对于上面介绍的fullstate.Sink来说的,不会将全量数据往后发送,而是过滤掉之前发送过且无变化的资源,相当于是发送增量数据。

如上图所示,资源数据最后到达backend,由backend配置数据路径。kpng中的iptables、ipvs等实现主要是搬的Kube-Proxy代码,同时还在开发eBPF的数据路径。

最后

除了上面的介绍外,kpng项目中还有很多值得一看的亮点。比如kpng在diffstore.Store中用到了go 1.18新支持的范型;代码库中的doc/service-proxy.md是一篇非常值得一读的Kube-Proxy设计思想的文档。