KPNG:下一代Kube-Proxy
kpng是社区新设计开发的下一代Kube-Proxy,目前仍然在积极开发中。
官方repo:https://github.com/kubernetes-sigs/kpng
kep:https://github.com/kubernetes/enhancements/pull/2094
简介
kpng主要目的是:
- 解耦Kubernetes的业务逻辑(比如serivce的externalTrafficPolicy、internalTrafficPolicy之类逻辑)与后端proxy的实现,以实现一套通用的框架。
对于当前的Kube-Proxy,如果要自己开发其他类型的proxy,需要实现自己的proxier来实现Provider 接口,而Provider接口包含了对Service、Endpoint、Node资源的处理逻辑,这部分对于所有proxier来说都是重复的代码。同时,不同proxier内部也存在某些子系统的重复逻辑,比如conntrack模块。 - 提高可扩展性,支撑更大规模的集群。目前Kube-Proxy的扩展需要增加对Kube-APIServer的watch请求,不利于规模的扩大。
kpng的主要思路如下:
[k8s API] ----> [local model] ----> [plugin] ----> [aggregator] ----> [subsystem]
1)根据k8s API的资源,构建一个本地期望状态ENLS(expected node-local state),只有当ENLS发生变化的时候,后端plugin才会触发操作。
相比Kube-Proxy直接监听Kube-APIServer的event,这种方式能减少无效触发(不改变ENLS的API event)。引用社区在1K service、1.5K pod下的测试。其中rev是ENLS发送变化的event数量,events是API events。
stats: time events rev usr cpu sys cpu tot cpu mem revs/events
stats: ms count count ms ms % MiB %
stats: 0 0 0 0 0 +Inf 1.22 NaN
stats: 1000 2134 1064 3 308 31.280 3.93 49.859
stats: 2000 2137 1064 0 1 0.131 3.94 49.789
stats: 3000 2138 1064 0 0 0.050 3.94 49.766
[...]
stats: 298000 21785 1067 0 0 0.050 4.16 4.898
stats: 299000 21787 1067 0 0 0.089 4.16 4.897
stats: 300000 21788 1067 0 0 0.061 4.17 4.897
stats: 301000 23925 1068 4 64 6.882 3.35 4.464 # GC after 2k+ events (without a proxy-related change)
stats: 302000 23926 1068 0 0 0.052 3.35 4.464
[...]
stats: 1798000 130941 1218 0 0 0.090 4.68 0.930
stats: 1799000 130942 1218 0 0 0.055 4.69 0.930
2)aggregate
聚合多个组件的对子系统的操作,最终进行统一的提交。比如calico、Kube-Proxy、hostport cni等对iptables的操作,统一由aggregate
进行操作的合并,避免相互之间的锁竞争。
KPNG的设计
由于KPNG目前仍在开发中,最终的技术方案可能会有变动,暂时使用最新版本 的实现来介绍。KPNG架构图如下。
首先是上半Server部分,核心是存储了Services、Endpoints、Nodes信息的proxystore.Store
,底层是一个B树,proxystore.Store
的输入来源可以是其他kpng的gRPC API(api2store)、Kube-APIServer(kube2store)以及本地文件(file2store),对外提供访问的方式可以是gRPC server(store2api)、本地内存(store2localdiff)、本地文件(store2file)。代码实现都在server/jobs/xxx2store
以及server/jobs/store2xxx
下。
下半Client部分,获取数据并交于backend进行处理,backend可以是简单的日志打印,或是实现iptables、ipvs、ebpf规则等。获取数据方式目前有两种,一种是gRPC,对应上半部分的store2api
;另一种是内存获取,对应store2localdiff
。两者都会调用localsink.Sink
的func Send(op *OpItem) error
,将相应的变更操作传递给后面的逻辑进行处理。
KPNG使用
官方博客里的例子 是通过gRPC的方式实现资源变更的打印,backend的实现在代码库的examples/pipe-exec/cmd/kpng-json/main.go中。
除此外,这里给出iptables模式的KPNG部署。首先在代码库的根目录下生成镜像。
docker build -t kpng:latest .
创建如下的daemonset:
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: kpng-proxy
namespace: kube-system
spec:
selector:
matchLabels:
app: kpng-proxy
template:
metadata:
labels:
app: kpng-proxy
spec:
# 偷懒,直接使用Kube-Proxy的sa
serviceAccount: kube-proxy
hostNetwork: true
priorityClassName: system-node-critical
nodeSelector:
kubernetes.io/os: linux
containers:
- name: kpng
image: kpng:latest
imagePullPolicy: IfNotPresent
command:
- kpng
- kube
- --service-proxy-name=kpng-example
- to-local
- to-iptables
securityContext:
privileged: true
接着创建带有service.kubernetes.io/service-proxy-name: kpng-example
标签的service,就能看到在iptables里生成对应的规则了。(如果不确定是Kube-Proxy生成的iptables规则还是kpng,可以在测试前,先把Kube-Proxy切换为ipvs模式或删除)。
上面例子中,kube
子命令指定了通过kube2store
监听kube-apiserver来获取proxystore.Store
,to-local
子命令指定了通过store2localdiff
将proxystore.Store
变更增量发送到后端,to-iptables
子命令指定使用后端iptables
。
KPNG代码粗读
首先,kpng代码里实现了很多种名叫store
、sink
、job
的类。总的来说:
job
实现数据的处理与传递。比如server/jobs/kube2store/kube2store.go
中的Job
实现数据从Kube-APIServer(通过List-Watch)到proxystore.Store
的传递;server/jobs/store2diff/store2diff.go
中的Job
实现数据从proxystore.Store
到store2diff.Sink
的传递。store
用于存储数据,一般底层都是B树,主要的包括存储Services、Endpoints、Nodes信息的proxystore.Store
,能提供数据变化跟踪的lightdiffstore.DiffStore
与diffstore.Store
。sink
提供一些接口,用于接受job
数据的传递,一般定义为interface
。比如上面提到的client/localsink/localsink.go
中的localsink.Sink
。sink
的实现也可能会是另一个job
。
代码结构
主要的代码包括:
api/
:grpc api的定义,Services、Endpoints、Nodes资源信息的api定义。backends/
:后端实现,都实现了client/backendcmd
中的Cmd interface
,使用client/backendcmd
中的Register()
注册,然后就能通过子命令调用对应的后端。func init() { backendcmd.Register("to-iptables", func() backendcmd.Cmd { return &Backend{} }) }
client/
:内容比较杂,包含了client用到的store
、sink
等结构。cmd/kpng/
:main程序代码from-k8s/
:从kubernetes代码库中移植过来的代码server/
:server/jobs
:包含了server端xxx2store
与store2xxx
的各种jobserver/proxystore
:定义了基本的存储结构proxystore.Store
接下来分别以gRPC server + examples/print-state
的用例和local server + iptables backend
的用例介绍代码流程,两个用例能覆盖大多数核心代码,对KPNG有个大体的了解。
gPRC Server
当执行kpng kube to-api
时,kpng会监听Kube-APIServer,并启动gRPC server。
kube子命令
kube
子命令的定义:
// cmd/kpng/k2s.go
func kube2storeCmd() *cobra.Command {
// kube to * command
k2sCmd := &cobra.Command{
Use: "kube",
Short: "watch Kubernetes API to the globalv1 state",
}
...
// setupKube2store定义了从kube-apiserver到store的逻辑
// storecmds.Commands为kube子命令添加store到local\gRPC\file的逻辑
k2sCmd.AddCommand(storecmds.Commands(setupKube2store)...)
return k2sCmd
}
func setupKube2store() (ctx context.Context, store *proxystore.Store, err error) {
...
// 创建新的proxystore.store
store = proxystore.New()
// kube2stroe job是Kube-APIServer到store主要逻辑的实现
go kube2store.Job{
Kube: kubeClient,
Store: store,
Config: k2sCfg,
}.Run(ctx)
return
}
kube2store.Job
的实现在server/jobs/kube2store
下,server/jobs
下也包含其他与store相关的job,大同小异。
// server/jobs/kube2store/kube2store.go
func (j Job) Run(ctx context.Context) {
...
// service的informer factory,list时只选择label service.kubernetes.io/service-proxy-name的值与kpng ServiceProxyName相同的service
// 由于informer factory的ListOptions是对所有资源都进行过滤的,因此后面还创建了一个coreFactory
svcFactory := informers.NewSharedInformerFactoryWithOptions(j.Kube, time.Second*30,
informers.WithTweakListOptions(func(options *metav1.ListOptions) { options.LabelSelector = labelSelector }))
...
// start watches
coreFactory := factory.Core().V1()
...
//启动service、nodes、endpointSlices的List-watch
servicesInformer.AddEventHandler(&serviceEventHandler{j.eventHandler(servicesInformer)})
go servicesInformer.Run(stopCh)
nodesInformer := coreFactory.Nodes().Informer()
nodesInformer.AddEventHandler(&nodeEventHandler{j.eventHandler(nodesInformer)})
go nodesInformer.Run(stopCh)
slicesInformer := factory.Discovery().V1().EndpointSlices().Informer()
slicesInformer.AddEventHandler(&sliceEventHandler{j.eventHandler(slicesInformer)})
go slicesInformer.Run(stopCh)
...
}
上面的各个eventHandler最终根据List-watch的事件,更新store,以add service event为例。
// server/jobs/kube2store/service-event-handler.go
func (h *serviceEventHandler) OnAdd(obj interface{}) {
h.onChange(obj)
}
func (h *serviceEventHandler) onChange(obj interface{}) {
...
// 把corev1.service转换为kpng api中定义的localv1.Service,用于保存在store中
service := &localv1.Service{
Namespace: svc.Namespace,
Name: svc.Name,
Type: string(svc.Spec.Type),
Labels: globsFilter(svc.Labels, h.config.ServiceLabelGlobs),
Annotations: globsFilter(svc.Annotations, h.config.ServiceAnnonationGlobs),
IPs: &localv1.ServiceIPs{
ClusterIPs: &localv1.IPSet{},
ExternalIPs: localv1.NewIPSet(svc.Spec.ExternalIPs...),
},
ExternalTrafficToLocal: svc.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal,
InternalTrafficToLocal: internalTrafficPolicy == v1.ServiceInternalTrafficPolicyLocal,
}
...
// 存储到store中,proxystore.Tx是在proxystore.Store上封装了一层,前者能把service、nodes、endpoint转成kv,存储到proxystore.Store中
// Update()传入闭包,操作proxystore.Tx
h.s.Update(func(tx *proxystore.Tx) {
...
tx.SetService(service)
})
}
到此,kube
子命令的功能就结束了。
to-api子命令
上面有提到,为kube子命令添加store到local、gRPC、file的逻辑,都是在storecmds.Commands
中实现的。
// cmd/kpng/storecmds/storecmds.go
func (c SetupFunc) ToAPICmd() *cobra.Command {
cmd := &cobra.Command{
Use: "to-api",
}
...
cmd.RunE = func(_ *cobra.Command, _ []string) (err error) {
...
// 从xxx2store对应的SetupFunc中获取store
ctx, store, err := c()
// store2api.Job实现将proxystore.Store暴露给gRPC server上
j := &store2api.Job{
Store: store,
Config: cfg,
}
return j.Run(ctx)
}
...
}
// server/jobs/store2api/store2api.go
func (j *Job) Run(ctx context.Context) error {
lis := server.MustListen(j.Config.BindSpec)
// 创建gRPC服务
...
srv = grpc.NewServer()
// 启动global API与endpoint API的server
if j.Config.GlobalAPI {
global.Setup(srv, j.Store)
}
if j.Config.LocalAPI {
endpoints.Setup(srv, j.Store)
}
return srv.Serve(lis)
}
以endpoints.Setup()
为例,注册的gRPC server设置如下:
// api/localv1/api_grpc.pb.go
func RegisterSetsServer(s grpc.ServiceRegistrar, srv SetsServer) {
// 当客户端请求gRPC server时,会使用srv调用_Sets_Watch_Handler()方法
s.RegisterService(&Sets_ServiceDesc, srv)
}
func _Sets_Watch_Handler(srv interface{}, stream grpc.ServerStream) error {
// 调用SetsServer的Watch方法,传入setsWatchServer,用于信息的接受与发送
return srv.(SetsServer).Watch(&setsWatchServer{stream})
}
// setsWatchServer将grpc.ServerStream封装为Send()与Recv()方法
type setsWatchServer struct {
grpc.ServerStream
}
func (x *setsWatchServer) Send(m *OpItem) error {
return x.ServerStream.SendMsg(m)
}
func (x *setsWatchServer) Recv() (*WatchReq, error) {
if err := x.ServerStream.RecvMsg(m); err != nil {
...
}
...
}
var Sets_ServiceDesc = grpc.ServiceDesc{
...
Streams: []grpc.StreamDesc{
{
...
Handler: _Sets_Watch_Handler,
},
},
}
可以看出gRPC Server接收的是WatchReq
,而发送的OpItem
,OpItem
有点像List-watch的event,但只包括set、delete、reset、sync四种,set、delete用来告诉客户端资源有哪些增量的变化,sync表示最新的增量已传输完毕,告诉客户端可以进行backend规则的同步了,reset一般用在gRPC重连的情况。SetsServer
是个interface,实现是在server/pkg/server/endpoints/server.go
里的Server
。
func (s *Server) Watch(res localv1.Sets_WatchServer) error {
...
// store2localdiff会根据Store计算资源的变化,通过Sink的Send方法将增量发送
job := &store2localdiff.Job{
Store: s.Store,
// serverSink继承了localv1.Sets_WatchServer的Send方法,也就是上面setsWatchServer的Send方法,因此最终通过gRPC发到Client端
Sink: serverSink{res, remote},
}
...
return job.Run(res.Context())
}
diff
store2localdiff.Job.Run()
内部调用了store2diff.Job.Run()
,这块是实现资源变更的增量发送逻辑,即每次只将资源变更的增量通过localsink.Sink.Send()
发送,不发送资源的全部信息。
diff这块代码较多,简言之就是:
- 资源变更的追踪是依靠
client/lightdiffstore
中的DiffStore
完成的,其本质是一个B树,在value中记录了此value在上一次Reset之后的状态(changed or unchanged)。而server/pkg/server/watchstate
中的WatchState
在DiffStore
上封装一层,可以将DiffStore
diff的结果通过localv1.OpSink.Send()
发送出去。 store2diff.Job.Run()
会先创建WatchState
。然后,在每一轮循环中,先通过store2diff.Sink.Update()
从proxystore.Store
中更新WatchState
;再通过store2diff.Sink.SendDiff()
发送diff的结果;当diff结果发送完成后,在发送sync的OpItem,表示发送结束,可以同步。// server/jobs/store2diff/store2diff.go func (j *Job) Run(ctx context.Context) (err error) { // 创建watchstate w := watchstate.New(j.Sink, j.Sets) ... for { ... for !updated { // block until the revision has been // incremented... then, we update our state from the // proxystore rev, closed = j.Store.View(rev, func(tx *proxystore.Tx) { j.Sink.Update(tx, w) }) ... // send the diff updated = j.Sink.SendDiff(w) } // signal the change set is fully sent w.SendSync() if w.Err != nil { return w.Err } }
store2localdiff.Job
实现了store2diff.Sink
接口,其中在SendDiff()
中,发送完diff结果后,会调用WatchState
的Reset()
进行资源状态的重置。
到此为止,整个gRPC Server的流程就结束了。
gRPC Client
在kpng中,要实现一个自定义的gRPC Client逻辑只要调用client.Run(func)
,将实现func
传入即可。以examples/print-state
为例,其实现了接收内容的打印。
// examples/print-state/main.go
func main() {
client.Run(printState)
}
// ServiceEndpoints结构记录了一个Service与它对应的本节点的Endpoint
func printState(items []*client.ServiceEndpoints) {
fmt.Println("# ------------------------------------------------------------------------")
fmt.Println("#", time.Now())
fmt.Println("#")
for _, item := range items {
fmt.Fprintln(os.Stdout, item)
}
}
client.Run
使用了fullstate.Sink
,fullstate.Sink
内部有个B树结构,存储gRPC客户端接受到的数据,然后每次在收到Server端发送的sync信号后,将全量的数据发给回调函数,也就是传入client.Run()
的自定义逻辑。
到此为止,整个gRPC Server + gRPC Client的模式就结束了。
local Server
除了上面的gRPC的方式外,还有就是上面iptable用例中,kpng kube to-local to-iptables
命令通过内存的方式传递数据。Server和Client会编译到一个二进制文件中。
to-local
to-local
接kube子命令,将proxystore.Store
中的资源通过store2localdiff.Job
处理后发送给下一个localsink.Sink
的实现。
// cmd/kpng/storecmds/storecmds.go
func (c SetupFunc) ToLocalCmd() (cmd *cobra.Command) {
cmd = &cobra.Command{
Use: "to-local",
}
job := &store2localdiff.Job{}
...
cmd.AddCommand(LocalCmds(func(sink localsink.Sink) error {
job.Sink = sink
return job.Run(ctx)
})...)
return
}
backend
上面有介绍,backend都是通过backendcmd.Register
方法注册,然后在LocalCmds()
创建相应的命令。
// cmd/kpng/storecmds/storecmds.go
func LocalCmds(run func(sink localsink.Sink) error) (cmds []*cobra.Command) {
// 获取所有注册的backend
for _, useCmd := range backendcmd.Registered() {
backend := useCmd.New()
cmd := &cobra.Command{
Use: useCmd.Use,
RunE: func(_ *cobra.Command, _ []string) error {
// 获取backend的Sink,用于在to-local子命令创建的store2localdiff.Job中获取diff结果
return run(backend.Sink())
},
}
...
}
}
以iptable为例,Sink()
方法如下:
// backends/iptables/sink.go
func (s *Backend) Sink() localsink.Sink {
return filterreset.New(pipe.New(decoder.New(s), decoder.New(conntrack.NewSink())))
}
Server端发送的diff结果,流向如下:
| --> decoder.sink --> iptables
|
diff --> filterreset.sink --> pipe.sink --> decoder.sink --> conntrack
可以看到kpng中真的是有各种各样的Sink:pipe.Sink
将数据发送给多个目的地;decoder.Sink
将raw数据转换成service、endpoint、node API;filterreset.Sink
是相对于上面介绍的fullstate.Sink
来说的,不会将全量数据往后发送,而是过滤掉之前发送过且无变化的资源,相当于是发送增量数据。
如上图所示,资源数据最后到达backend,由backend配置数据路径。kpng中的iptables、ipvs等实现主要是搬的Kube-Proxy代码,同时还在开发eBPF的数据路径。
最后
除了上面的介绍外,kpng项目中还有很多值得一看的亮点。比如kpng在diffstore.Store
中用到了go 1.18新支持的范型;代码库中的doc/service-proxy.md
是一篇非常值得一读的Kube-Proxy设计思想的文档。
转载请注明出处