Kubernetes Custom Controller 구현

Kubernetes는 기본적으로 Deployment, Daemon, Service와 같은 core resource와 인터렉션할 수 있는 client나 core resource의 변화를 들을 수 있는 informer는 Kubernetes 라이브러리에서 제공된다.


import (
    appslisters "k8s.io/client-go/listers/apps/v1"
    nodelisters "k8s.io/client-go/listers/core/v1"
    appsinformers "k8s.io/client-go/informers/apps/v1"
    nodeinformers "k8s.io/client-go/informers/core/v1"
)

type Controller struct {
    kubeclient        kubernetes.Interface
    deploymentsLister appslisters.DeploymentLister
    deploymentsSynced cache.InformerSynced
}

c := &Controller{
    kubeclient:        kubeclient,
    deploymentsLister: appsinformers.DeploymentInformer.Lister(),
    deploymentsSynced: deploymentInformer.Informer().HasSynced,
}

// Get the deployment with the name 
deployment, err := c.deploymentsLister.Deployments(NAMESPACE).Get(NAME)


하지만


우리가 만들 Custom Controller 는 Kubernetes 에 정의되어 있지 않은 Resource 이고 이와 인터렉션할 수 있는 client나 informer는 존재하지 않는다. 그렇기 때문에 custom controller를 생성하기 위해서는 우선 우리가 만들 CRD에 대한 변화를 포착할 수 있는 informer와 client를 생성해야 한다.


다행히도 Kubernetes에서는 CRD에 대한 기본적인 자료구조를 생성했을 때 해당 CRD에 대해서 informer와 client를 생성해주는 CRD informer, client auto-gen 라이브러리가 존재한다.


Customer Controller 에서 사용할 구조체(CRD:Custom Resource Definition) 정의

lister 는 Customer Resource 들에 대해 조회하는 책임을 함
informer는 watch 하고 있는 Resource 에 변화가 생길 때마다 이를 핸들링할 수 있도록 변화에 대한 타입마다 이벤트 핸들러를 등록하여 관리함
client 는 CRD 에 정의된 자료 구조에 대해서 Kubernetes 를 통해 사용자가 인터랙션할 수 있도록 하는 모듈임
  • pkg/apis/logger/v1/register.go : Schema Group 정의

package v1

import (
 "github.com/NLX-SeokHwanKong/NLXK8SController/pkg/apis/logger"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 "k8s.io/apimachinery/pkg/runtime"
 "k8s.io/apimachinery/pkg/runtime/schema"
)

// GroupVersion is the identifier for the API which includes
// the name of the group and the version of the API
var SchemeGroupVersion = schema.GroupVersion{
    Group:   logger.GroupName,
    Version: "v1",
}

// AddToScheme creates a SchemeBuilder which uses functions to add types
// to the scheme
var (
 SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
 AddToScheme   = SchemeBuilder.AddToScheme
)

func Resource(resource string) schema.GroupResource {
 return SchemeGroupVersion.WithResource(resource).GroupResource()
}

func addKnownTypes(scheme *runtime.Scheme) error {
    scheme.AddKnownTypes(
        SchemeGroupVersion,
        &Logger{},
        &LoggerList{},
    )
    metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
 return nil
}

  • pkg/apis/logger/v1/types.go

그리고 // +<tag_name>[=value] 형태의 주석들을 살펴볼 수 있는데 이것들은 code generator을 위한 주석이다. code generator는 해당 주석들을 만났을 때 어떻게 동작할지에 대해서 알고 있다.


+genclient: 현재 패키지에 정의된 자료 구조에 대해서 Kubernetes Client를 생성하라. 사용자는 해당 client를 통해 Kubernetes와 인터렉션할 수 있다.

+k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object: 다음 코드에 대해서 deep copy 로직을 생성하라.


package v1

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type Logger struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata"`
    Status            LoggerStatus `json:"status,omitempty"`
    Spec              LoggerSpec   `json:"spec,omitempty"`
}

type LoggerStatus struct {
    Value StatusValue `json:"state"`
}

type StatusValue string

const (
    Available   StatusValue = "Available"
    Unavailable StatusValue = "Unavailable"
)

type LoggerSpec struct {
    Name         string `json:"name"`
    TimeInterval int `json:"timeInterval"`
    Replicas     *int32 `json:"replicas"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type LoggerList struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ListMeta `son:"metadata,omitempty"`
    Items           []*Logger `json:"items"`
}


  • pkg/apis/logger/v1/doc.go

// +k8s:deepcopy-gen=package
// +groupName=example.com

package v1


예를 들어 v1 패키지에 속한 모든 타입들에 대해 deep copy 함수가 생성될 것이고 Kuberetes API group명은 example.com이 될 것이다.


Code Generator 를 통한 lister, informer, client 생성

code generator는 https://github.com/kubernetes/code-generator 여기에서 살펴볼 수 있다.

code generator를 사용하기 위해서 우선 $GOPATH/k8s.io에 해당 레포지토리를 클론하자.

사용 방법은 다음과 같다.

cd $GOPATH/src/github.com/zeroFruit/operator-demo

$GOPATH/src/k8s.io/code-generator/generate-groups.sh all \
github.com/zeroFruit/operator-demo/pkg/client \
github.com/zeroFruit/operator-demo/pkg/apis \"logger:v1"
  • $GOPATH/src/k8s.io/code-generator/generate-groups.sh all: generate-groups.sh를 실행시키고 all subcommand를 통해 lister, informer, client를 모두 생성한다.

  • github.com/zeroFruit/operator-demo/pkg/client: 현재 데모 코드를 github.com/zeroFruit/operator-demo에서 작성하고 있고 client를 생성할 디렉토리 위치를 가리킨다. 이 때 $GOPATH/src에서 relative한 경로를 써준다.

  • github.com/zeroFruit/operator-demo/pkg/apis: CRD 자료구조를 작성한 디렉토리를 가리킨다. deep copy 함수가 생성된다. 이 때 $GOPATH/src에서 relative한 경로를 써준다.

  • "logger:v1": <resource_name>:<version> 형태로 입력해준다.


Customer Controller 구현

  1. Kubernetes Controller 인스턴스 생성

 c := &Controller{
        kubeclient:        kubeclient,
        loggerclient:      loggerclient,
        deploymentsLister: deploymentInformer.Lister(),
        deploymentsSynced: deploymentInformer.Informer().HasSynced,
        loggerLister:      loggerInformer.Lister(),
        loggerSynced:      loggerInformer.Informer().HasSynced,
        nodeLister:        nodeInformer.Lister(),
        nodeSynced:        nodeInformer.Informer().HasSynced,
        queue:             workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Workers"),
        recorder:          recorder,
    }
  • kubeclient: kubeclient는 Kubernetes의 Core resource를 관리하기위해 필요하다. 조금 더 구체적으로는 우리가 처음에 정의한 Logger 커스텀 리소스가 필요로 하는 Deployment를 관리하기 위해 필요하다. Kubernetes API와 통신한다.

  • loggerclient: loggerclient는 Logger 커스텀 리소스 자체를 관리하기 위해서 필요하다. Code generator로 만든 Logger custom clientset과 통신한다.

  • deploymentLister: deploymentLister는 클러스터 내에 있는 Deployment resource들을 조회하는 책임을 가진다.

  • deploymentSynced: deploymentSynced는 현재 클러스터 내부에 떠있는 Deployment resource와 etcd에 기록되어있는 desired state가 같은지 조회할 수 있는 함수이다.

  • nodeLister: nodeLister는 클러스터 내에 있는 Node resource들을 조회하는 책임을 가진다.

  • nodeSynced: nodeSynced는 현재 클러스터 내부에 떠있는 Node resource와 etcd에 기록되어있는 desired state가 같은지 조회할 수 있는 함수이다.

  • loggerSynced: loggerSynced는 deploymentSynced와 마찬가지로 Logger 리소스가 desired state와 같은지 조회할 수 있는 함수이다.

  • queue: 타입을 보면 추측할 수 있겠지만 rate limiting 기능이 들어있는 queue이다.

  • recorder: recorder는 우리가 정의한 custom resource에서 발생하는 이벤트들을 기록하는 역할을 함


2. Node Event 핸들러 등록

// Setup an event handler for when Node resources change. 
 // If New node added, this controller send this node informations to external API Server
    nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
 node := obj.(*corev1.Node)
 nodeIP, err := utils.GetNodeIP(node)
 if err != nil {
                klog.Errorf("New node received, but we were unable to add it as we were couldn't find it's node IP: %v", err)
 return
            }
            klog.Infof("Received node %s added update from watch API so peer with new node", nodeIP)

 values := map[string]string{"devType": "linux", "nodeUrl": nodeIP.String(), "namespace": "linux", "ssh_key_file": ""}
 json_data, err := json.Marshal(values)

 if err != nil {
                klog.Errorf("Recieved JSON Marshal error : %s", err)
 return
            }

 k8sNodeAddUrl := k8sBaseUrl + "/api/k8s/nodes"
 resp, err := http.Post(k8sNodeAddUrl, "application/json", bytes.NewBuffer(json_data))

 if err != nil {
                klog.Errorf("HTTP Post request error : %s", err)
 return
            }

 var res map[string]interface{}
            json.NewDecoder(resp.Body).Decode(&res)
            klog.Infof("HTTP Resp: %s ", res["json"])
 return
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
 // we are only interested in node add/delete, so skip update
        },
        DeleteFunc: func(obj interface{}) {
 node, ok := obj.(*corev1.Node)
 if !ok {
 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
 if !ok {
                    klog.Errorf("unexpected object type: %v", obj)
 return
                }
 if node, ok = tombstone.Obj.(*corev1.Node); !ok {
                    klog.Errorf("unexpected object type: %v", obj)
 return
                }
            }
 nodeIP, err := utils.GetNodeIP(node)
 // In this case even if we can't get the NodeIP that's alright as the node is being removed anyway and
 // future node lister operations that happen in OnNodeUpdate won't be affected as the node won't be returned
 if err == nil {
                klog.Infof("Received node %s removed update from watch API, so remove node from peer", nodeIP)
            } else {
                klog.Infof("Received node (IP unavailable) removed update from watch API, so remove node from peer")
 return
            }

 k8sNodeDelUrl := k8sBaseUrl + "/api/k8s/nodes/" + string(nodeIP)
 // Create client
 client := &http.Client{}

 // Create request
 req, err := http.NewRequest("DELETE", k8sNodeDelUrl, nil)
 if err != nil {
                klog.Errorf("unexpected object type: %v", obj)
            }

 // Fetch Request
 resp, err := client.Do(req)
 if err != nil {
                klog.Errorf("HTTP DELETE request error: %s", err)
 return
            }
 defer resp.Body.Close()

 // Read Response Body
 respBody, err := ioutil.ReadAll(resp.Body)
 if err != nil {
                klog.Errorf("HTTP DELETE Error : %s, %s", err, respBody)
 return
            }
 return
        },
    })

 return c
}

개변 Informer는 각자 watch하고 있는 resource에 변화가 생길 때마다 이를 핸들링할 수 있도록 변화에 대한 타입마다 이벤트 핸들러를 등록할 수 있다.

NodeInformer의 경우 새로운 Node resource가 생성되거나 업데이트 될 때 외부 REST API 를 호출하고 있다.


CRD 생성

이것은 Kubernetes 클러스터에 Logger custom resource 정의에 대해 알려줘야하는데 이를 통해 클러스터는 해당 Logger resource를 어떻게 생성해야하는지 알게 된다.


logger.yaml

apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
 name: loggers.example.com
spec:
 group: example.com
 versions:
  - name: v1
 served: true
 storage: true
 names:
 kind: Logger
 plural: loggers
 scope: Namespaced
 validation:
 openAPIV3Schema:
 required: ["spec"]
 properties:
 spec:
 required: ["name", "timeInterval"]
 properties:
 name:
 type: "string"
 minimum: 1
 timeInterval:
 type: "integer"
 minimum: 1

custom resource 정의는 Kubernetes에서 제공해주는 CustomResourceDefinition를 통해서 하게 되는데 custom resource에 대한 정의도 Kubernetes에서는 또 하나의 resource이다.


spec.group, spec.versions 의 경우 처음에 code auto-gen을 할 때 명시해주었던 그룹과 버전을 써주도록한다.


그리고 validation 필드에서 Logger resource를 생성하기 위해 필수적으로 명시해야할 필드들을 나열할 수 있다. validation 필드에 들어간 필드들은 resource를 생성할 때 명시해주지 않으면 생성할 수 없다는 에러 메세지를 보여준다.


이 경우는 name과 timeInterval 모두 필수적으로 액터가 명시해주어야하는 필드라 생각하고 Replicas는 그렇지 않다고 생각하여 Replicas를 제외한 필드들을 validation 필드에 적어주었다.

kubectl apply -f artifacts/crd/logger.yaml

custom resource에 대한 정의를 마쳤다면 해당 CustomResourceDefinition resource를 클러스터에 생성하자.


Custom Controller 실행

go run ./cmd/main.go

(자료 출처) https://getoutsidedoor.com/2020/05/09/kubernetes-controller-%ea%b5%ac%ed%98%84%ed%95%b4%eb%b3%b4%ea%b8%b0/