Scheduled clean-up of unused Kubernetes resources
Prepare a Go program to clean up unused Pod and ReplicaSet resources in Kubernetes.
We can clean up Pods that are not in the Running phase and ReplicaSets that are no longer active.
- Initialize the Go project environment
mkdir k8s_res_cleanup && cd k8s_res_cleanup
go mod init snc.com/cleanup/resources
go get -v -u k8s.io/client-go@v0.18.2 # install packages
go get -v -u github.com/prometheus/common@v0.1.0
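After these steps the generated go.mod should look roughly like this (a sketch; the exact require list may differ once Go resolves the transitive dependencies):

module snc.com/cleanup/resources

go 1.14

require (
	github.com/prometheus/common v0.1.0
	k8s.io/api v0.18.2
	k8s.io/apimachinery v0.18.2
	k8s.io/client-go v0.18.2
)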
- Create the config sub-package
config/k8s.go
which defines how to connect to the Kubernetes cluster
package config

import (
	"flag"
	"os"
	"path/filepath"

	logs "github.com/prometheus/common/log"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

type K8s struct {
	Client kubernetes.Interface
	Config *rest.Config
}

// The kubeconfig flag is registered at package load time so that it is
// already defined by the time main calls flag.Parse().
var kubeconfig *string

func init() {
	if home := homeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
	} else {
		kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
	}
}

// NewK8s provides a new Kubernetes client interface.
// It resolves whether the program is running inside or outside the cluster:
// outside the cluster it tries to make use of the kubeconfig file, while
// inside the cluster (detected via the pod environment) it uses the in-cluster config.
func NewK8s() (*K8s, error) {
	client := K8s{}
	if _, inCluster := os.LookupEnv("KUBERNETES_SERVICE_HOST"); inCluster {
		logs.Info("Program running inside the cluster, picking the in-cluster configuration")

		config, err := rest.InClusterConfig()
		if err != nil {
			return nil, err
		}
		client.Client, err = kubernetes.NewForConfig(config)
		if err != nil {
			return nil, err
		}
		client.Config = config
		return &client, nil
	}

	logs.Info("Program running from outside of the cluster")
	if !flag.Parsed() {
		flag.Parse()
	}
	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		return nil, err
	}
	client.Client, err = kubernetes.NewForConfig(config)
	if err != nil {
		return nil, err
	}
	client.Config = config
	return &client, nil
}

// homeDir returns the home directory on Linux/macOS (HOME) or Windows (USERPROFILE).
func homeDir() string {
	if h := os.Getenv("HOME"); h != "" {
		return h
	}
	return os.Getenv("USERPROFILE")
}
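To quickly verify that NewK8s can reach the cluster, a throwaway snippet like the following can be used (an illustration only, not part of the project; it asks the API server for its version via the discovery client):

package main

import (
	logs "github.com/prometheus/common/log"

	"snc.com/cleanup/resources/config"
)

func main() {
	k8s, err := config.NewK8s()
	if err != nil {
		logs.Fatal("cannot build Kubernetes client: ", err)
	}
	// ServerVersion is a cheap round trip that proves connectivity.
	version, err := k8s.Client.Discovery().ServerVersion()
	if err != nil {
		logs.Fatal("cannot reach the API server: ", err)
	}
	logs.Info("connected to Kubernetes ", version.GitVersion)
}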
- Create and fill in the code for
main.go
package main

import (
	"flag"
	"strings"

	logs "github.com/prometheus/common/log"
	"snc.com/cleanup/resources/resources"
)

var (
	namespace      string
	dryRun         bool
	expirationDays int
	resourceType   string
)

func init() {
	const (
		DefaultNamespace      = ""
		DefaultDryRun         = false
		DefaultExpirationDays = 100
		DefaultResourceType   = "pod"
	)
	flag.StringVar(&namespace, "namespace", DefaultNamespace, "the namespace to clean up resources in (empty for all namespaces)")
	flag.BoolVar(&dryRun, "dry_run", DefaultDryRun, "indicate whether this is a dry run (nothing is actually deleted)")
	flag.IntVar(&expirationDays, "expire_day", DefaultExpirationDays, "resources older than this many days are cleaned up")
	flag.StringVar(&resourceType, "resource_type", DefaultResourceType, "comma-separated resource types to clean up")
}

func main() {
	flag.Parse()
	resources.InitClient(dryRun, expirationDays, namespace)
	logs.Info("The passed arguments are namespace: ", namespace, " dry_run: ", dryRun, " expire_day: ", expirationDays,
		" resource_type: ", resourceType)
	logs.Info("Start deleting expired resources:")
	resourceType = strings.ToLower(resourceType)
	resources.CleanUp(resourceType)
}
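The program can be exercised locally against the cluster from the current kubeconfig before containerizing it; the flag values below are just examples, and --dry_run keeps the run free of side effects:

go run . --dry_run --expire_day 7 --resource_type pod,replicaset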
- Create the
resources/init.go
file in the resources sub-package with contents like the following
package resources

import (
	"strings"
	"sync"
	"time"

	logs "github.com/prometheus/common/log"
	"k8s.io/client-go/kubernetes"
	"snc.com/cleanup/resources/config"
)

var (
	client     kubernetes.Interface
	listLimit  int64 = 50
	dryRun     []string
	expireDays int
	namespace  string
	wg         sync.WaitGroup
	cleanMap   = map[string]func(w *sync.WaitGroup){
		"pod": func(w *sync.WaitGroup) {
			logs.Info("====== Start clean up pods ======= ")
			defer w.Done()
			cleanUpExpiredPods("")
		},
		"replicaset": func(w *sync.WaitGroup) {
			logs.Info("====== Start clean up replicaset ======= ")
			defer w.Done()
			cleanUpExpiredRS("")
		},
		"airflow": func(w *sync.WaitGroup) {
			logs.Info("====== Start clean up airflow logs ======= ")
			// Not implemented yet; mark the task done so CleanUp does not
			// block forever on the WaitGroup.
			defer w.Done()
		},
	}
)

func InitClient(isDryRun bool, expires int, ns string) {
	k8sClient, err := config.NewK8s()
	if err != nil {
		logs.Fatal("Init kubernetes client error: ", err)
	}
	client = k8sClient.Client
	expireDays = expires
	namespace = ns
	if isDryRun {
		dryRun = []string{"All"}
	}
}

func CleanUp(resourceType string) {
	rt := strings.Split(resourceType, ",")
	for _, v := range rt {
		v = strings.TrimSpace(v)
		if _, ok := cleanMap[v]; ok {
			wg.Add(1)
			go cleanMap[v](&wg)
		}
	}
	wg.Wait()
	logs.Info("CleanUp Done!")
}

// daysAgo returns the whole number of days elapsed since sourceTime.
func daysAgo(sourceTime time.Time) int {
	return int(time.Now().UTC().Sub(sourceTime).Hours() / 24)
}
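A quick sanity check for daysAgo can live in resources/init_test.go (a minimal sketch; the file name and test values are my own):

package resources

import (
	"testing"
	"time"
)

func TestDaysAgo(t *testing.T) {
	// 49 hours is a little over two days; daysAgo truncates to whole days.
	src := time.Now().UTC().Add(-49 * time.Hour)
	if got := daysAgo(src); got != 2 {
		t.Errorf("daysAgo() = %d, want 2", got)
	}
}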
- The
resources/pods.go
file looks like the following
package resources

import (
	"context"

	logs "github.com/prometheus/common/log"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Only pods that are not running are candidates for clean-up.
var podsFieldsOpts = "status.phase!=Running"

func cleanUpExpiredPods(goOn string) {
	lst, err := client.CoreV1().Pods(namespace).List(
		context.TODO(),
		v1.ListOptions{
			FieldSelector: podsFieldsOpts,
			Limit:         listLimit,
			Continue:      goOn,
		},
	)
	if err != nil {
		logs.Error("has error listing pods: ", err.Error())
		return
	}
	for _, item := range lst.Items {
		days := daysAgo(item.CreationTimestamp.UTC())
		if days >= expireDays {
			if err := deletePodByName(item.Name, item.Namespace); err != nil {
				logs.Error("Delete Pod ", item.Name, " has error: ", err.Error())
				continue
			}
			logs.Info("Delete Pod Name: ", item.Name, " Namespace: ", item.Namespace, " Status: ",
				item.Status.Phase, " Age: ", days, "d")
		}
	}
	// Follow the continue token until the whole list has been paged through.
	if lst.Continue != "" {
		cleanUpExpiredPods(lst.Continue)
	}
}

func deletePodByName(name string, ns string) error {
	deletePolicy := v1.DeletePropagationForeground
	return client.CoreV1().Pods(ns).Delete(context.TODO(), name, v1.DeleteOptions{
		PropagationPolicy: &deletePolicy,
		DryRun:            dryRun,
	})
}
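The same field selector can be tested from the command line first, to preview what the program will see:

kubectl get pods --all-namespaces --field-selector=status.phase!=Running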
- And
resources/replicasets.go
cleans up the unused ReplicaSet resources
package resources

import (
	"context"

	logs "github.com/prometheus/common/log"
	v12 "k8s.io/api/apps/v1"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func cleanUpExpiredRS(goOn string) {
	lstOps := v1.ListOptions{
		Limit:    listLimit,
		Continue: goOn,
	}
	lstRs, err := client.AppsV1().ReplicaSets(namespace).List(context.TODO(), lstOps)
	if err != nil {
		logs.Error("List ReplicaSets has error: ", err.Error())
		return
	}
	for _, item := range lstRs.Items {
		days := daysAgo(item.CreationTimestamp.UTC())
		if matchCondition(days, item) {
			if err := deleteRSByName(item.Name, item.Namespace); err != nil {
				logs.Error("Delete replicaset ", item.Name, " has error: ", err)
				continue
			}
			logs.Info("Delete the replicaset Name: ", item.Name, " Namespace: ", item.Namespace, " Replicas: ",
				item.Status.Replicas, " Ready: ", item.Status.ReadyReplicas, " Available: ",
				item.Status.AvailableReplicas, " Age: ", days, "d")
		}
	}
	// Page through the rest of the list, as with pods.
	if lstRs.Continue != "" {
		cleanUpExpiredRS(lstRs.Continue)
	}
}

// matchCondition reports whether a ReplicaSet is old enough and fully scaled
// down (no current, ready, or available replicas), i.e. safe to delete.
func matchCondition(days int, item v12.ReplicaSet) bool {
	return days >= expireDays && item.Status.Replicas == 0 &&
		item.Status.ReadyReplicas == 0 && item.Status.AvailableReplicas == 0
}

func deleteRSByName(name string, ns string) error {
	deletePolicy := v1.DeletePropagationForeground
	return client.AppsV1().ReplicaSets(ns).Delete(context.TODO(), name, v1.DeleteOptions{
		DryRun:            dryRun,
		PropagationPolicy: &deletePolicy,
	})
}
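Candidates can be previewed with kubectl before the job runs for real; ReplicaSets whose CURRENT and READY columns show 0 are roughly the ones matchCondition would accept, once they are old enough:

kubectl get replicasets --all-namespaces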
- Then create a
Dockerfile
to build the Docker image that the Kubernetes CronJob scheduler will run
FROM golang:alpine AS build-env
RUN mkdir /go/src/app && apk update && apk add git
COPY . /go/src/app/
WORKDIR /go/src/app
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -ldflags '-extldflags "-static"' -o cleanup .

FROM scratch
WORKDIR /app
COPY --from=build-env /go/src/app/cleanup .
CMD ["./cleanup", "--dry_run"]
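Build and push the image under the tag that the CronJob manifest below references (replace localhost.reg with your own registry):

docker build -t localhost.reg/library/cleanup-resource:latest .
docker push localhost.reg/library/cleanup-resource:latest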
- Finally, create the Kubernetes resource YAML file that defines the CronJob and schedules the clean-up job
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: k8s-cleanup-resource
rules:
  - apiGroups: ["", "extensions", "apps"]
    resources: ["pods", "replicasets"]
    verbs: ["get", "list", "delete"]
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: k8s-cleanup-resource
  namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: k8s-cleanup-resource
roleRef:
  kind: ClusterRole
  name: k8s-cleanup-resource
  apiGroup: rbac.authorization.k8s.io
subjects:
  - kind: ServiceAccount
    name: "k8s-cleanup-resource"
    namespace: kube-system
---
apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: cleanup-resources
  namespace: kube-system
spec:
  schedule: "0 15 * * *"
  concurrencyPolicy: Allow
  startingDeadlineSeconds: 10
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 3
  suspend: false
  jobTemplate:
    spec:
      template:
        spec:
          securityContext:
            runAsUser: 50000
            runAsGroup: 50000
            fsGroup: 65534
          containers:
            - name: cleanup-resources
              image: 'localhost.reg/library/cleanup-resource:latest'
              command:
                - "./cleanup"
              args:
                - "--expire_day"
                - "5"
                - "--resource_type"
                - "pod,replicaset"
              resources:
                limits:
                  memory: 200Mi
                requests:
                  memory: 50Mi
          restartPolicy: OnFailure
          serviceAccountName: k8s-cleanup-resource
          imagePullSecrets:
            - name: k8s-registry
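Apply the manifest and, optionally, trigger a run immediately instead of waiting for the schedule (the file and job names here are examples):

kubectl apply -f cleanup-cronjob.yaml
kubectl create job --from=cronjob/cleanup-resources cleanup-manual -n kube-system
kubectl logs -n kube-system job/cleanup-manual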