苹果|OpenYurt 之 Yurthub 数据过滤框架解析( 二 )


; ok { nodePool err := fh.nodePoolLister.Get(nodePoolName) if err != nil { klog.Infof(\"skip reassemble endpointSlice failed to get nodepool %s err: %v\" nodePoolName err) return endpointSlicefor i := range endpointSlice.Endpoints { if inSameNodePool(endpointSlice.Endpoints[i
.Topology[v1.LabelHostname
nodePool.Status.Nodes) { newEps = append(newEps endpointSlice.Endpoints[i
)endpointSlice.Endpoints = newEpsreturn endpointSlice EndpointsFilter
针对 endpoints 资源进行相应的数据过滤 , 首先判断 endpoint 是否存在对应的 service , 通过 node 的 label: apps.openyurt.io/nodepool 获取节点池 , 之后获取节点池下的所有节点 , 遍历 endpoints.Subsets 下的资源找出同一个节点池的 Ready pod address 以及 NotReady pod address 重组成新的 endpoints 之后返回给 addons 。
func (fh *endpointsFilterHandler) reassembleEndpoint(endpoints *v1.Endpoints) *v1.Endpoints { svcName := endpoints.Name _ err := fh.serviceLister.Services(endpoints.Namespace).Get(svcName) if err != nil { klog.Infof(\"skip reassemble endpoints failed to get service %s/%s err: %v\" endpoints.Namespace svcName err) return endpoints// filter the endpoints on the node which is in the same nodepool with current node currentNode err := fh.nodeGetter(fh.nodeName) if err != nil { klog.Infof(\"skip reassemble endpoints failed to get current node %s err: %v\" fh.nodeName err) return endpointsif nodePoolName ok := currentNode.Labels[nodepoolv1alpha1.LabelCurrentNodePool
; ok { nodePool err := fh.nodePoolLister.Get(nodePoolName) if err != nil { klog.Infof(\"skip reassemble endpoints failed to get nodepool %s err: %v\" nodePoolName err) return endpointsvar newEpSubsets [
v1.EndpointSubset for i := range endpoints.Subsets { endpoints.Subsets[i
.Addresses = filterValidEndpointsAddr(endpoints.Subsets[i
.Addresses nodePool) endpoints.Subsets[i
.NotReadyAddresses = filterValidEndpointsAddr(endpoints.Subsets[i
.NotReadyAddresses nodePool) if endpoints.Subsets[i
.Addresses != nil || endpoints.Subsets[i
.NotReadyAddresses != nil { newEpSubsets = append(newEpSubsets endpoints.Subsets[i
)endpoints.Subsets = newEpSubsets if len(endpoints.Subsets) == 0 { // this endpoints has no nodepool valid addresses for ingress controller return nil to ignore it return nilreturn endpoints MasterServiceFilter
针对 services 下的域名进行 ip 以及端口替换 , 这个过滤器的场景主要在于边缘端的 pod 无缝使用 InClusterConfig 访问集群资源 。
func (fh *masterServiceFilterHandler) ObjectResponseFilter(b [
byte) ([
byte error) { list err := fh.serializer.Decode(b) if err != nil || list == nil { klog.Errorf(\"skip filter failed to decode response in ObjectResponseFilter of masterServiceFilterHandler %v\" err) return b nil// return data un-mutated if not ServiceList serviceList ok := list.(*v1.ServiceList) if !ok { return b nil// mutate master service for i := range serviceList.Items { if serviceList.Items[i
.Namespace == MasterServiceNamespaceserviceList.Items[i
.Name == MasterServiceName { serviceList.Items[i
.Spec.ClusterIP = fh.host for j := range serviceList.Items[i
.Spec.Ports { if serviceList.Items[i
.Spec.Ports[j
.Name == MasterServicePortName { serviceList.Items[i
.Spec.Ports[j
.Port = fh.port breakklog.V(2).Infof(\"mutate master service into ClusterIP:Port=%s:%d for request %s\" fh.host fh.port util.ReqString(fh.req)) break// return the mutated serviceList return fh.serializer.Encode(serviceList) DiscardCloudService
该过滤器针对两种 service 其中的一种类型是 LoadBalancer , 因为边缘端无法访问 LoadBalancer 类型的资源 , 所以该过滤器会将这种类型的资源直接过滤掉 。 另外一种是针对 kube-system 名称空间下的 x-tunnel-server-internal-svc , 这个 services 主要存在 cloud 节点用于访问 yurt-tunnel-server , 对于 edge 节点会直接过滤掉该 service 。
func (fh *discardCloudServiceFilterHandler) ObjectResponseFilter(b [

相关经验推荐