编写集群模块,命名为cluster,用于录入进群信息,修改集群信息。
cluster模块的proto文件
apps/cluster/pb/cluster.proto
syntax = "proto3";
package infraboard.mpaas.cluster;
option go_package = "github.com/infraboard/mpaas/apps/cluster";
// Cluster todo
message Cluster {
// 唯一ID
// @gotags: json:"id" bson:"_id"
string id = 1;
// 录入时间
// @gotags: json:"create_at" bson:"create_at"
int64 create_at = 2;
// 更新时间
// @gotags: json:"update_at" bson:"update_at"
int64 update_at = 3;
// 更新人
// @gotags: json:"update_by" bson:"update_by"
string update_by = 4;
// 集群相关信息
// @gotags: json:"server_info" bson:"server_info"
ServerInfo server_info = 8;
// 基础信息
// @gotags: json:"data" bson:"data"
CreateClusterRequest data = 9;
// 集群状态
// @gotags: json:"status" bson:"status"
Status status = 10;
}
message ServerInfo {
// k8s的地址
// @gotags: json:"server" bson:"server"
string server = 1;
// k8s版本
// @gotags: json:"version" bson:"version"
string version = 2;
// 连接用户
// @gotags: json:"auth_user" bson:"auth_user"
string auth_user = 3;
}
message Status {
// 检查时间
// @gotags: json:"check_at" bson:"check_at"
int64 check_at = 1;
// API Server是否正常
// @gotags: json:"is_alive" bson:"is_alive"
bool is_alive = 2;
// 异常消息
// @gotags: json:"message" bson:"message"
string message = 10;
}
message CreateClusterRequest {
// 集群所属域
// @gotags: json:"domain" form:"domain" bson:"domain"
string domain = 1;
// 集群所属空间
// @gotags: json:"namespace" form:"namespace" bson:"namespace"
string namespace = 2;
// 创建人
// @gotags: json:"create_by" form:"create_by" bson:"create_by"
string create_by = 3;
// 集群提供商
// @gotags: json:"vendor" bson:"vendor" form:"vendor" validate:"required"
string vendor = 4;
// 集群所处地域
// @gotags: json:"region" bson:"region" form:"region" validate:"required"
string region = 5;
// 名称
// @gotags: json:"name" bson:"name" form:"name" validate:"required"
string name = 6;
// 集群客户端访问凭证
// @gotags: json:"kube_config" bson:"kube_config" form:"kube_config" validate:"required"
string kube_config = 7;
// 集群描述
// @gotags: json:"description" form:"description" bson:"description"
string description = 8;
// 集群标签, env=prod
// @gotags: json:"lables" form:"lables" bson:"lables"
map<string, string> lables = 9;
}
// ClusterSet todo
message ClusterSet {
// 分页时,返回总数量
// @gotags: json:"total"
int64 total = 1;
// 一页的数据
// @gotags: json:"items"
repeated Cluster items = 2;
}
apps/cluster/pb/rpc.proto
syntax = "proto3";
package infraboard.mpaas.cluster;
option go_package = "github.com/infraboard/mpaas/apps/cluster";
// Cluster todo
message Cluster {
// 唯一ID
// @gotags: json:"id" bson:"_id"
string id = 1;
// 录入时间
// @gotags: json:"create_at" bson:"create_at"
int64 create_at = 2;
// 更新时间
// @gotags: json:"update_at" bson:"update_at"
int64 update_at = 3;
// 更新人
// @gotags: json:"update_by" bson:"update_by"
string update_by = 4;
// 集群相关信息
// @gotags: json:"server_info" bson:"server_info"
ServerInfo server_info = 8;
// 基础信息
// @gotags: json:"data" bson:"data"
CreateClusterRequest data = 9;
// 集群状态
// @gotags: json:"status" bson:"status"
Status status = 10;
}
message ServerInfo {
// k8s的地址
// @gotags: json:"server" bson:"server"
string server = 1;
// k8s版本
// @gotags: json:"version" bson:"version"
string version = 2;
// 连接用户
// @gotags: json:"auth_user" bson:"auth_user"
string auth_user = 3;
}
message Status {
// 检查时间
// @gotags: json:"check_at" bson:"check_at"
int64 check_at = 1;
// API Server是否正常
// @gotags: json:"is_alive" bson:"is_alive"
bool is_alive = 2;
// 异常消息
// @gotags: json:"message" bson:"message"
string message = 10;
}
message CreateClusterRequest {
// 集群所属域
// @gotags: json:"domain" form:"domain" bson:"domain"
string domain = 1;
// 集群所属空间
// @gotags: json:"namespace" form:"namespace" bson:"namespace"
string namespace = 2;
// 创建人
// @gotags: json:"create_by" form:"create_by" bson:"create_by"
string create_by = 3;
// 集群提供商
// @gotags: json:"vendor" bson:"vendor" form:"vendor" validate:"required"
string vendor = 4;
// 集群所处地域
// @gotags: json:"region" bson:"region" form:"region" validate:"required"
string region = 5;
// 名称
// @gotags: json:"name" bson:"name" form:"name" validate:"required"
string name = 6;
// 集群客户端访问凭证
// @gotags: json:"kube_config" bson:"kube_config" form:"kube_config" validate:"required"
string kube_config = 7;
// 集群描述
// @gotags: json:"description" form:"description" bson:"description"
string description = 8;
// 集群标签, env=prod
// @gotags: json:"lables" form:"lables" bson:"lables"
map<string, string> lables = 9;
}
// ClusterSet todo
message ClusterSet {
// 分页时,返回总数量
// @gotags: json:"total"
int64 total = 1;
// 一页的数据
// @gotags: json:"items"
repeated Cluster items = 2;
}
make gen生成go文件
apps/cluster/cluster.pb.go
apps/cluster/rpc.pb.go
apps/cluster/rpc_grpc.pb.go
cluster模块的全局app
apps/cluster/app.go
设置resource模块对应的模块名,校验以及请求的构造函数
const (
AppName = "clusters"
)
var (
validate = validator.New()
)
func NewCreateClusterRequest() *CreateClusterRequest {
return &CreateClusterRequest{
Domain: "default",
Namespace: "default",
}
}
func NewCluster(req *CreateClusterRequest) (*Cluster, error) {
if err := req.Validate(); err != nil {
return nil, err
}
return &Cluster{
Id: xid.New().String(),
CreateAt: time.Now().UnixMicro(),
Data: req,
ServerInfo: &ServerInfo{},
Status: &Status{},
}, nil
}
func (req *CreateClusterRequest) Validate() error {
return validate.Struct(req)
}
func (req *CreateClusterRequest) UpdateOwner() {
req.CreateBy = "default"
req.Domain = "default"
req.Namespace = "default"
}
func NewClusterSet() *ClusterSet {
return &ClusterSet{
Items: []*Cluster{},
}
}
func (s *ClusterSet) Add(item *Cluster) {
s.Items = append(s.Items, item)
}
func (s *ClusterSet) Desense() {
for i := range s.Items {
s.Items[i].Desense()
}
}
func (s *ClusterSet) DecryptKubeConf(key string) error {
errs := []string{}
for i := range s.Items {
err := s.Items[i].DecryptKubeConf(key)
if err != nil {
errs = append(errs, fmt.Sprintf(
"decrypt %s kubeconf error, %s",
s.Items[i].Data.Name,
err))
}
}
if len(errs) > 0 {
return fmt.Errorf("%s", strings.Join(errs, ","))
}
return nil
}
func NewDefaultCluster() *Cluster {
return &Cluster{
Data: &CreateClusterRequest{},
}
}
func (i *Cluster) IsAlive() error {
if i.Status == nil {
return fmt.Errorf("status is nil")
}
if !i.Status.IsAlive {
return fmt.Errorf(i.Status.Message)
}
return nil
}
func (i *Cluster) Update(req *UpdateClusterRequest) {
i.UpdateAt = time.Now().UnixMicro()
i.UpdateBy = req.UpdateBy
i.Data = req.Data
}
func (i *Cluster) Patch(req *UpdateClusterRequest) error {
i.UpdateAt = time.Now().UnixMicro()
i.UpdateBy = req.UpdateBy
return mergo.MergeWithOverwrite(i.Data, req.Data)
}
func (i *Cluster) EncryptKubeConf(key string) error {
// 判断文本是否已经加密
if strings.HasPrefix(i.Data.KubeConfig, conf.CIPHER_TEXT_PREFIX) {
return fmt.Errorf("text has ciphered")
}
cipherText, err := cbc.Encrypt([]byte(i.Data.KubeConfig), []byte(key))
if err != nil {
return err
}
base64Str := base64.StdEncoding.EncodeToString(cipherText)
i.Data.KubeConfig = fmt.Sprintf("%s%s", conf.CIPHER_TEXT_PREFIX, base64Str)
return nil
}
func (i *Cluster) DecryptKubeConf(key string) error {
// 判断文本是否已经是明文
if !strings.HasPrefix(i.Data.KubeConfig, conf.CIPHER_TEXT_PREFIX) {
return nil
}
base64CipherText := strings.TrimPrefix(i.Data.KubeConfig, conf.CIPHER_TEXT_PREFIX)
cipherText, err := base64.StdEncoding.DecodeString(base64CipherText)
if err != nil {
return err
}
planText, err := cbc.Decrypt([]byte(cipherText), []byte(key))
if err != nil {
return err
}
i.Data.KubeConfig = string(planText)
return nil
}
func (i *Cluster) Desense() {
if i.Data.KubeConfig != "" {
i.Data.KubeConfig = "****"
}
}
func NewDescribeClusterRequest(id string) *DescribeClusterRequest {
return &DescribeClusterRequest{
Id: id,
}
}
func NewQueryClusterRequest() *QueryClusterRequest {
return &QueryClusterRequest{
Page: request.NewDefaultPageRequest(),
}
}
func NewQueryClusterRequestFromHTTP(r *http.Request) *QueryClusterRequest {
qs := r.URL.Query()
return &QueryClusterRequest{
Page: request.NewPageRequestFromHTTP(r),
Keywords: qs.Get("keywords"),
Vendor: qs.Get("vendor"),
Region: qs.Get("region"),
}
}
func (req *QueryClusterRequest) UpdateNamespace() {
req.Domain = "default"
req.Namespace = "default"
}
func NewPutClusterRequest(id string) *UpdateClusterRequest {
return &UpdateClusterRequest{
Id: id,
UpdateMode: pb_request.UpdateMode_PUT,
UpdateAt: time.Now().UnixMicro(),
Data: NewCreateClusterRequest(),
}
}
func NewPatchClusterRequest(id string) *UpdateClusterRequest {
return &UpdateClusterRequest{
Id: id,
UpdateMode: pb_request.UpdateMode_PATCH,
UpdateAt: time.Now().UnixMicro(),
Data: NewCreateClusterRequest(),
}
}
func NewDeleteClusterRequestWithID(id string) *DeleteClusterRequest {
return &DeleteClusterRequest{
Id: id,
}
}
cluster模块暴露的接口
apps/cluster/interface.go
package cluster
import context "context"
type Service interface {
RPCServer
CreateCluster(context.Context, *CreateClusterRequest) (*Cluster, error)
UpdateCluster(context.Context, *UpdateClusterRequest) (*Cluster, error)
DeleteCluster(context.Context, *DeleteClusterRequest) (*Cluster, error)
}
cluster的逻辑实现模块
apps/cluster/impl/impl.go
将模块实现IOC
package impl
import (
"go.mongodb.org/mongo-driver/mongo"
"github.com/infraboard/mcube/app"
"github.com/infraboard/mcube/logger"
"github.com/infraboard/mcube/logger/zap"
"google.golang.org/grpc"
"github.com/infraboard/mpaas/apps/cluster"
"github.com/infraboard/mpaas/conf"
)
var (
// Service 服务实例
svr = &service{}
)
type service struct {
col *mongo.Collection
log logger.Logger
cluster cluster.Service
cluster.UnimplementedRPCServer
encryptoKey string
}
func (s *service) Config() error {
db, err := conf.C().Mongo.GetDB()
if err != nil {
return err
}
s.col = db.Collection(s.Name())
s.encryptoKey = conf.C().App.EncryptKey
s.log = zap.L().Named(s.Name())
s.cluster = app.GetGrpcApp(cluster.AppName).(cluster.Service)
return nil
}
func (s *service) Name() string {
return cluster.AppName
}
func (s *service) Registry(server *grpc.Server) {
cluster.RegisterRPCServer(server, svr)
}
func init() {
app.RegistryInternalApp(svr)
app.RegistryGrpcApp(svr)
}
apps/cluster/impl/dao.go
cluster与数据库的交互
package impl
import (
"context"
"fmt"
"github.com/infraboard/mpaas/apps/cluster"
"github.com/infraboard/mcube/exception"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
func (s *service) save(ctx context.Context, ins *cluster.Cluster) error {
if _, err := s.col.InsertOne(ctx, ins); err != nil {
return exception.NewInternalServerError("inserted cluster(%s) document error, %s",
ins.Data.Name, err)
}
return nil
}
func (s *service) get(ctx context.Context, id string) (*cluster.Cluster, error) {
filter := bson.M{"_id": id}
ins := cluster.NewDefaultCluster()
if err := s.col.FindOne(ctx, filter).Decode(ins); err != nil {
if err == mongo.ErrNoDocuments {
return nil, exception.NewNotFound("cluster %s not found", id)
}
return nil, exception.NewInternalServerError("find cluster %s error, %s", id, err)
}
return ins, nil
}
func newQueryclusterRequest(r *cluster.QueryClusterRequest) *queryclusterRequest {
return &queryclusterRequest{
r,
}
}
type queryclusterRequest struct {
*cluster.QueryClusterRequest
}
func (r *queryclusterRequest) FindOptions() *options.FindOptions {
pageSize := int64(r.Page.PageSize)
skip := int64(r.Page.PageSize) * int64(r.Page.PageNumber-1)
opt := &options.FindOptions{
Sort: bson.D{
{Key: "create_at", Value: -1},
},
Limit: &pageSize,
Skip: &skip,
}
return opt
}
func (r *queryclusterRequest) FindFilter() bson.M {
filter := bson.M{}
if r.Domain != "" {
filter["data.domain"] = r.Domain
}
if r.Namespace != "" {
filter["data.namespace"] = r.Namespace
}
if r.Vendor != "" {
filter["data.vendor"] = r.Vendor
}
if r.Region != "" {
filter["data.region"] = r.Region
}
if r.Keywords != "" {
filter["$or"] = bson.A{
bson.M{"data.name": bson.M{"$regex": r.Keywords, "$options": "im"}},
}
}
return filter
}
func (s *service) query(ctx context.Context, req *queryclusterRequest) (*cluster.ClusterSet, error) {
s.log.Debugf("find filter: %s", req.FindFilter())
resp, err := s.col.Find(ctx, req.FindFilter(), req.FindOptions())
if err != nil {
return nil, exception.NewInternalServerError("find cluster error, error is %s", err)
}
set := cluster.NewClusterSet()
// 循环
for resp.Next(ctx) {
ins := cluster.NewDefaultCluster()
if err := resp.Decode(ins); err != nil {
return nil, exception.NewInternalServerError("decode cluster error, error is %s", err)
}
ins.Desense()
set.Add(ins)
}
// count
count, err := s.col.CountDocuments(ctx, req.FindFilter())
if err != nil {
return nil, exception.NewInternalServerError("get cluster count error, error is %s", err)
}
set.Total = count
return set, nil
}
func (s *service) update(ctx context.Context, ins *cluster.Cluster) error {
if _, err := s.col.UpdateByID(ctx, ins.Id, ins); err != nil {
return exception.NewInternalServerError("inserted cluster(%s) document error, %s",
ins.Data.Name, err)
}
return nil
}
func (s *service) deletecluster(ctx context.Context, ins *cluster.Cluster) error {
if ins == nil || ins.Id == "" {
return fmt.Errorf("cluster is nil")
}
result, err := s.col.DeleteOne(ctx, bson.M{"_id": ins.Id})
if err != nil {
return exception.NewInternalServerError("delete cluster(%s) error, %s", ins.Id, err)
}
if result.DeletedCount == 0 {
return exception.NewNotFound("cluster %s not found", ins.Id)
}
return nil
}
apps/cluster/impl/cluster.go
集群的操作逻辑
package impl
import (
"context"
"time"
"github.com/infraboard/mcube/exception"
"github.com/infraboard/mcube/pb/request"
"github.com/infraboard/mpaas/apps/cluster"
"github.com/infraboard/mpaas/provider/k8s"
)
// 集群录入
func (s *service) CreateCluster(ctx context.Context, req *cluster.CreateClusterRequest) (
*cluster.Cluster, error) {
ins, err := cluster.NewCluster(req)
if err != nil {
return nil, exception.NewBadRequest("validate create cluster error, %s", err)
}
// 连接集群检查状态
s.checkStatus(ins)
if err := ins.IsAlive(); err != nil {
return nil, err
}
// 加密
err = ins.EncryptKubeConf(s.encryptoKey)
if err != nil {
return nil, err
}
ins.Id = xid.New().String()
if err := s.save(ctx, ins); err != nil {
return nil, err
}
return ins, nil
}
func (s *service) checkStatus(ins *cluster.Cluster) {
client, err := k8s.NewClient(ins.Data.KubeConfig)
if err != nil {
ins.Status.Message = err.Error()
return
}
if ctx := client.CurrentContext(); ctx != nil {
ins.Id = ctx.Cluster
ins.ServerInfo.AuthUser = ctx.AuthInfo
}
if k := client.CurrentCluster(); k != nil {
ins.ServerInfo.Server = k.Server
}
// 检查凭证是否可用
ins.Status.CheckAt = time.Now().UnixMilli()
v, err := client.ServerVersion()
if err != nil {
ins.Status.IsAlive = false
ins.Status.Message = err.Error()
} else {
ins.Status.IsAlive = true
ins.ServerInfo.Version = v
}
}
// 查询集群列表
func (s *service) QueryCluster(ctx context.Context, req *cluster.QueryClusterRequest) (
*cluster.ClusterSet, error) {
query := newQueryclusterRequest(req)
set, err := s.query(ctx, query)
if err != nil {
return nil, err
}
return set, nil
}
// 查询集群详情, 查询出来后进行解密
func (s *service) DescribeCluster(ctx context.Context, req *cluster.DescribeClusterRequest) (
*cluster.Cluster, error) {
ins, err := s.get(ctx, req.Id)
if err != nil {
return nil, err
}
if err := ins.DecryptKubeConf(s.encryptoKey); err != nil {
return nil, err
}
return ins, nil
}
// 集群更新
func (s *service) UpdateCluster(ctx context.Context, req *cluster.UpdateClusterRequest) (
*cluster.Cluster, error) {
ins, err := s.DescribeCluster(ctx, cluster.NewDescribeClusterRequest(req.Id))
if err != nil {
return nil, err
}
// 配置kubeconfig是否有变更
isKubeConfigChanged := req.Data.KubeConfig == ins.Data.KubeConfig
switch req.UpdateMode {
case request.UpdateMode_PUT:
ins.Update(req)
case request.UpdateMode_PATCH:
err := ins.Patch(req)
if err != nil {
return nil, err
}
}
// 校验更新后数据合法性
if err := ins.Data.Validate(); err != nil {
return nil, err
}
// 如果有变更检查集群状态
if isKubeConfigChanged {
s.checkStatus(ins)
}
// 加密
err = ins.EncryptKubeConf(s.encryptoKey)
if err != nil {
return nil, err
}
if err := s.update(ctx, ins); err != nil {
return nil, err
}
return ins, nil
}
// 集群的删除
func (s *service) DeleteCluster(ctx context.Context, req *cluster.DeleteClusterRequest) (
*cluster.Cluster, error) {
ins, err := s.DescribeCluster(ctx, cluster.NewDescribeClusterRequest(req.Id))
if err != nil {
return nil, err
}
if err := s.deletecluster(ctx, ins); err != nil {
return nil, err
}
return ins, nil
}
集群操作方法单元测试
apps/cluster/impl/impl_test.go
加载依赖
var (
impl cluster.Service
ctx = context.Background()
)
func init() {
tools.DevelopmentSetup()
impl = app.GetInternalApp(cluster.AppName).(cluster.Service)
}
测试创建集群信息
func TestCreateCluster(t *testing.T) {
req := cluster.NewCreateClusterRequest()
req.Vendor = "腾讯云"
req.Region = "上海"
req.Name = "生产环境"
kubeConf, err := tools.ReadFile("F:\\go\\project\\0-shizhanxiangmu\\mpaas\\test\\kube_config.yml")
if err != nil {
t.Fatal(err)
}
req.KubeConfig = string(kubeConf)
ins, err := impl.CreateCluster(ctx, req)
if err != nil {
t.Fatal(err)
}
t.Log(ins)
}
测试结果
=== RUN TestCreateCluster
impl_test.go:32: id:"default" create_at:1675498691027224 server_info:{server:"[https://43.143.26.245:6443](https://43.143.26.245:6443)" version:"v1.25.4+k3s1" auth_user:"default"} data:{domain:"default" namespace:"default" vendor:"腾讯云" region:"上海" name:"生产环境" kube_config:"j0re5uxJZgceXwzbk6LGs0dHcYopuSzFCqKhNjM8exUNrKl270uJzhwjqyv9XABkdbG8jZrLtlGNorN0pAsIHfty+"} status:{check_at:1675498691029 is_alive:true}
--- PASS: TestCreateCluster (0.39s)
PASS
测试查询集群
func TestQueryCluster(t *testing.T) {
req := cluster.NewQueryClusterRequest()
set, err := impl.QueryCluster(ctx, req)
if err != nil {
t.Fatal(err)
}
t.Log(set)
}
测试结果
2023-02-04 16:33:04 DEBUG [clusters] impl/dao.go:88 find filter: map[]
impl_test.go:41: total:1 items:{id:"default" create_at:1675498691027224 server_info:{server:"https://43.143.26.245:6443" version:"v1.25.4+k3s1" auth_user:"default"} data:{domain:"default" namespace:"default" vendor:"腾讯云" region:"上海" name:"生产环境" kube_config:"* ***"} status:{check_at:1675498691029 is_alive:true}}
--- PASS: TestQueryCluster (0.02s)
PASS
测试删除集群
func TestDeleteCluster(t *testing.T) {
req := cluster.NewDeleteClusterRequestWithID("cff1mcjha7hkf13qnl10")
set, err := impl.DeleteCluster(ctx, req)
if err != nil {
t.Fatal(err)
}
t.Log(set)
}
测试结果
=== RUN TestDeleteCluster
impl_test.go:50: id:"cff1mcjha7hkf13qnl10" create_at:1675500338781822 server_info:{server:"https://43.143.26.245:6443" version:"v1.25.4+k3s1" auth_user:"default"} data:{domain:"default" namespace:"default" vendor:"腾讯云" region:"上海" name:"生产环境" kube_config:"apiVersion: v 1\r\nclusters:\r\n - cluster:\r\n certificate-authority-data: \r\n server: https://43.143.26.245:6443\r\n name: default\r\ncontexts:\r\n - context:\r\n cluster: default\r\n user: default\r\n name: default\r\ncurrent-context: default\r\nkind: Config\r\npreferences: {}\r\nusers:\r\n - name: default\r\n user:\r\n client-certificate-data: ==\r\n client-key-data: =\r\n"} status:{check_at:1675500338783 is_alive:true}
--- PASS: TestDeleteCluster (0.02s)
PASS
评论区