Scheduled clean-up of unused Kubernetes resources

Prepare a Go program that cleans up unused Pod and ReplicaSet resources in Kubernetes.

We can clean up Pods whose status phase is not Running, and ReplicaSets that are inactive (scaled down to zero replicas).

  • Initialize the Go project environment
mkdir k8s_res_cleanup && cd k8s_res_cleanup
go mod init example.com/cleanup/resources
go get -v -u k8s.io/client-go@v0.18.2 # install packages
go get -v -u github.com/prometheus/common@v0.1.0
  • Create the sub-package config/k8s.go, which determines how to connect to the Kubernetes cluster
package config

import (
	"flag"
	"os"
	"path/filepath"

	logs "github.com/prometheus/common/log"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

// Register the kubeconfig flag at package level so it already exists when
// main calls flag.Parse(); registering it afterwards would make the first
// Parse fail on an unknown -kubeconfig argument.
var kubeconfig = flag.String("kubeconfig", defaultKubeconfig(), "(optional) absolute path to the kubeconfig file")

type K8s struct {
	Client kubernetes.Interface
	Config *rest.Config
}

// NewK8s provides a new Kubernetes client interface.
// It resolves whether the program is running inside or outside the cluster:
// outside the cluster it tries to make use of the kubeconfig file, while
// inside the cluster (detected via the pod environment) it uses the
// in-cluster config.
func NewK8s() (*K8s, error) {
	client := K8s{}
	if _, inCluster := os.LookupEnv("KUBERNETES_SERVICE_HOST"); inCluster {
		logs.Info("Program running inside the cluster, picking the in-cluster configuration")

		config, err := rest.InClusterConfig()
		if err != nil {
			return nil, err
		}
		client.Client, err = kubernetes.NewForConfig(config)
		if err != nil {
			return nil, err
		}
		client.Config = config
		return &client, nil
	}

	logs.Info("Program running from outside of the cluster")
	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		return nil, err
	}
	client.Client, err = kubernetes.NewForConfig(config)
	if err != nil {
		return nil, err
	}
	client.Config = config
	return &client, nil
}

// defaultKubeconfig returns $HOME/.kube/config when a home directory exists.
func defaultKubeconfig() string {
	if home := homeDir(); home != "" {
		return filepath.Join(home, ".kube", "config")
	}
	return ""
}

func homeDir() string {
	if h := os.Getenv("HOME"); h != "" {
		return h
	}
	return os.Getenv("USERPROFILE") // Windows
}
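Before wiring up any clean-up logic, the package can be smoke-tested from a scratch main package (a minimal sketch; the separate cmd/smoketest location and the example.com/cleanup/resources module path from go mod init are assumptions). It simply asks the API server for its version, which is enough to verify that the resolved config works:

package main

import (
	"flag"
	"fmt"

	"example.com/cleanup/resources/config"
)

func main() {
	flag.Parse() // picks up -kubeconfig when running outside the cluster
	k8s, err := config.NewK8s()
	if err != nil {
		panic(err)
	}
	// Discovery().ServerVersion() performs a cheap round trip to the API server.
	version, err := k8s.Client.Discovery().ServerVersion()
	if err != nil {
		panic(err)
	}
	fmt.Println("connected to Kubernetes", version.GitVersion)
}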
  • Create main.go and fill in the following code
package main

import (
	"flag"
	"strings"

	logs "github.com/prometheus/common/log"

	"example.com/cleanup/resources/resources"
)

var (
	namespace      string
	dryRun         bool
	expirationDays int
	resourceType   string
)

func init() {
	const (
		DefaultNamespace      = ""
		DefaultDryRun         = false
		DefaultExpirationDays = 100
		DefaultResourceType   = "pod"
	)
	flag.StringVar(&namespace, "namespace", DefaultNamespace, "the namespace to clean up resources in (empty means all namespaces)")
	flag.BoolVar(&dryRun, "dry_run", DefaultDryRun, "perform a dry run without actually deleting anything")
	flag.IntVar(&expirationDays, "expire_day", DefaultExpirationDays, "age in days after which resources are cleaned up")
	flag.StringVar(&resourceType, "resource_type", DefaultResourceType, "comma-separated list of resource types to clean up")
}

func main() {
	flag.Parse()
	resources.InitClient(dryRun, expirationDays, namespace)
	logs.Info("The passed arguments are namespace: ", namespace, " dry_run: ", dryRun, " expire_day: ", expirationDays,
		" resource_type: ", resourceType)
	logs.Info("Start deleting expired resources:")
	resourceType = strings.ToLower(resourceType)
	resources.CleanUp(resourceType)
}
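With these flags in place the program can be exercised locally against your kubeconfig, for example with go run . --dry_run --expire_day=30 --resource_type=pod,replicaset, which logs the deletion candidates while asking the API server not to persist the deletes.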
  • Create the resources/init.go sub-package with contents like the following
package resources

import (
	"strings"
	"sync"
	"time"

	logs "github.com/prometheus/common/log"
	"k8s.io/client-go/kubernetes"

	"example.com/cleanup/resources/config"
)

var (
	client     kubernetes.Interface
	listLimit  int64 = 50
	dryRun     []string
	expireDays int
	namespace  string
	wg         sync.WaitGroup
	// cleanMap maps a resource type name to its clean-up function; each
	// function runs in its own goroutine.
	cleanMap = map[string]func(w *sync.WaitGroup){
		"pod": func(w *sync.WaitGroup) {
			logs.Info("====== Start clean up pods ======= ")
			defer w.Done()
			cleanUpExpiredPods("")
		},
		"replicaset": func(w *sync.WaitGroup) {
			logs.Info("====== Start clean up replicaset ======= ")
			defer w.Done()
			cleanUpExpiredRS("")
		},
		"airflow": func(w *sync.WaitGroup) {
			logs.Info("====== Start clean up airflow logs ======= ")
			// Placeholder, not implemented yet; Done is still required so
			// CleanUp does not block forever.
			defer w.Done()
		},
	}
)

func InitClient(testing bool, expires int, ns string) {
	k8sClient, err := config.NewK8s()
	if err != nil {
		logs.Fatal("Init kubernetes error.", err)
	}
	client = k8sClient.Client
	expireDays = expires
	namespace = ns
	if testing {
		// "All" is metav1.DryRunAll: the API server validates the delete
		// request without persisting it.
		dryRun = []string{"All"}
	}
}

func CleanUp(resourceType string) {
	rt := strings.Split(resourceType, ",")
	for _, v := range rt {
		v = strings.TrimSpace(v)
		if fn, ok := cleanMap[v]; ok {
			wg.Add(1)
			go fn(&wg)
		}
	}
	wg.Wait()
	logs.Info("CleanUp Done!")
}

// daysAgo returns the whole number of days elapsed since sourceTime.
func daysAgo(sourceTime time.Time) int {
	return int(time.Now().UTC().Sub(sourceTime).Hours() / 24)
}
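Each resource type runs in its own goroutine, and CleanUp blocks on the WaitGroup until all of them have finished, so supporting a new resource type only takes another entry in cleanMap.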
  • resources/pods.go looks like the following
package resources

import (
	"context"

	logs "github.com/prometheus/common/log"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Only pods that are not in the Running phase are candidates for deletion.
var podsFieldsOpts = "status.phase!=Running"

func cleanUpExpiredPods(goOn string) {
	lst, err := client.CoreV1().Pods(namespace).List(
		context.TODO(),
		v1.ListOptions{
			FieldSelector: podsFieldsOpts,
			Limit:         listLimit,
			Continue:      goOn,
		},
	)
	if err != nil {
		logs.Error("has error listing pods: ", err.Error())
		return
	}
	for _, item := range lst.Items {
		days := daysAgo(item.CreationTimestamp.UTC())
		if days >= expireDays {
			if err := deletePodByName(item.Name, item.Namespace); err != nil {
				logs.Error("Delete Pod ", item.Name, " has error: ", err.Error())
				continue
			}
			logs.Info("Delete Pod Name: ", item.Name, " Namespace: ", item.Namespace, " Status: ",
				item.Status.Phase, " Age: ", days, "d")
		}
	}
	// Follow the continue token until the paginated list is exhausted.
	if lst.Continue != "" {
		cleanUpExpiredPods(lst.Continue)
	}
}

func deletePodByName(name string, ns string) error {
	// Foreground propagation deletes dependents before the pod itself.
	deletePolicy := v1.DeletePropagationForeground
	return client.CoreV1().Pods(ns).Delete(context.TODO(), name, v1.DeleteOptions{
		PropagationPolicy: &deletePolicy,
		DryRun:            dryRun,
	})
}
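The clean-up path can be unit-tested without a live cluster using the fake clientset that ships with client-go. Below is a minimal sketch (the file name resources/pods_test.go is an assumption); note that the fake client may not evaluate field selectors, so treat this as exercising the age check rather than the phase filter:

package resources

import (
	"context"
	"testing"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func TestCleanUpExpiredPods(t *testing.T) {
	// A failed pod created 10 days ago, well past a 5-day expiry.
	oldPod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:              "old-failed-pod",
			Namespace:         "default",
			CreationTimestamp: metav1.NewTime(time.Now().AddDate(0, 0, -10)),
		},
		Status: corev1.PodStatus{Phase: corev1.PodFailed},
	}
	client = fake.NewSimpleClientset(oldPod)
	namespace, expireDays, dryRun = "default", 5, nil

	cleanUpExpiredPods("")

	// The expired, non-running pod should have been deleted.
	if _, err := client.CoreV1().Pods("default").Get(
		context.TODO(), "old-failed-pod", metav1.GetOptions{}); err == nil {
		t.Fatal("expected the expired pod to be deleted")
	}
}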
  • And resources/replicasets.go cleans up the unused ReplicaSet resources
package resources

import (
	"context"

	logs "github.com/prometheus/common/log"
	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func cleanUpExpiredRS(goOn string) {
	lstOps := v1.ListOptions{
		Limit:    listLimit,
		Continue: goOn,
	}
	lstRs, err := client.AppsV1().ReplicaSets(namespace).List(context.TODO(), lstOps)
	if err != nil {
		logs.Fatal("Get Resource has error: ", err.Error())
	}
	for _, item := range lstRs.Items {
		days := daysAgo(item.CreationTimestamp.UTC())
		if matchCondition(days, item) {
			if err := deleteRSByName(item.Name, item.Namespace); err != nil {
				logs.Error("Delete replicaset ", item.Name, " has error: ", err)
				continue
			}
			logs.Info("Delete the replicaset Name: ", item.Name, " Namespace: ", item.Namespace, " Replicas: ",
				item.Status.Replicas, " Ready: ", item.Status.ReadyReplicas, " Available: ",
				item.Status.AvailableReplicas, " Age: ", days, "d")
		}
	}
	// Follow the continue token until the paginated list is exhausted.
	if lstRs.Continue != "" {
		cleanUpExpiredRS(lstRs.Continue)
	}
}

// matchCondition reports whether a ReplicaSet is expired and fully scaled
// down: no desired, ready, or available replicas.
func matchCondition(days int, item appsv1.ReplicaSet) bool {
	return days >= expireDays && item.Status.Replicas == 0 &&
		item.Status.ReadyReplicas == 0 && item.Status.AvailableReplicas == 0
}

func deleteRSByName(name string, ns string) error {
	deletePolicy := v1.DeletePropagationForeground
	return client.AppsV1().ReplicaSets(ns).Delete(context.TODO(), name, v1.DeleteOptions{
		DryRun:            dryRun,
		PropagationPolicy: &deletePolicy,
	})
}
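matchCondition is a pure function, so a quick table-style check (a sketch; the file name resources/replicasets_test.go is assumed) documents the rule that only old, fully scaled-down ReplicaSets are touched:

package resources

import (
	"testing"

	appsv1 "k8s.io/api/apps/v1"
)

func TestMatchCondition(t *testing.T) {
	expireDays = 5

	idle := appsv1.ReplicaSet{} // zero Replicas/ReadyReplicas/AvailableReplicas
	if !matchCondition(10, idle) {
		t.Error("an old, fully scaled-down ReplicaSet should match")
	}
	if matchCondition(1, idle) {
		t.Error("a ReplicaSet younger than expireDays must not match")
	}

	busy := idle
	busy.Status.Replicas = 1
	if matchCondition(10, busy) {
		t.Error("a ReplicaSet with live replicas must never match")
	}
}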
  • Then create a Dockerfile to build the Docker image that the Kubernetes CronJob will run
FROM golang:alpine AS build-env
RUN mkdir /go/src/app && apk update && apk add git
ADD . /go/src/app/
WORKDIR /go/src/app
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -ldflags '-extldflags "-static"' -o cleanup .

FROM scratch
WORKDIR /app
COPY --from=build-env /go/src/app/cleanup .
CMD ["./cleanup", "--dry_run"]
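The statically linked binary needs no libc, so the scratch base image works; in-cluster TLS uses the CA certificate mounted into the pod by the service account rather than system certificates. Build and push with docker build -t localhost.reg/library/cleanup-resource:latest . followed by docker push localhost.reg/library/cleanup-resource:latest.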
  • Finally, create the Kubernetes resource YAML file that defines the CronJob and schedules the clean-up job
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: k8s-cleanup-resource
rules:
  - apiGroups: ["", "apps"]
    resources: ["pods", "replicasets"]
    verbs: ["get", "list", "delete"]
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: k8s-cleanup-resource
  namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: k8s-cleanup-resource
roleRef:
  kind: ClusterRole
  name: k8s-cleanup-resource
  apiGroup: rbac.authorization.k8s.io
subjects:
  - kind: ServiceAccount
    name: k8s-cleanup-resource
    namespace: kube-system
---
apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: cleanup-resources
  namespace: kube-system
spec:
  schedule: "0 15 * * *"
  concurrencyPolicy: Allow
  startingDeadlineSeconds: 10
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 3
  suspend: false
  jobTemplate:
    spec:
      template:
        spec:
          securityContext:
            runAsUser: 50000
            runAsGroup: 50000
            fsGroup: 65534
          containers:
            - name: cleanup-resources
              image: 'localhost.reg/library/cleanup-resource:latest'
              command:
                - "./cleanup"
              args:
                - "--expire_day"
                - "5"
                - "--resource_type"
                - "pod,replicaset"
              resources:
                limits:
                  memory: 200Mi
                requests:
                  memory: 50Mi
          restartPolicy: OnFailure
          serviceAccountName: k8s-cleanup-resource
          imagePullSecrets:
            - name: k8s-registry
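Apply everything with kubectl apply -f cleanup-cronjob.yaml (file name assumed). Note that specifying command and args replaces the image's CMD, so the --dry_run default from the Dockerfile no longer applies here; the scheduled job performs real deletions unless --dry_run is added to args.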