侧边栏壁纸
博主头像
问道

问道的小花园,总能给你带来惊喜

  • 累计撰写 68 篇文章
  • 累计创建 35 个标签
  • 累计收到 3 条评论

混合云资产管理项目(五)

问道
2023-01-26 / 0 评论 / 0 点赞 / 317 阅读 / 14,003 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2023-01-26,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

对接云商资源,以腾讯云cvm为例。并做到可以分页和限速

获取access key

https://console.cloud.tencent.com/cam

在访问管理→用户→用户列表→新建用户。然后再赋予权限。得到SecretId和SecretKey可以用于api的访问。

封装与腾讯云交互的客户端

provider/tx/region/region.go

查询不同region对应的代码。用于做实验的主机位于上海,所以代码是ap-shanghai

package region

import "github.com/infraboard/cmdb/utils"

var Regions = []utils.EnumDescribe{
  {Name: "Bangkok", Value: "ap-bangkok", Describe: "曼谷"},
  {Name: "Beijing", Value: "ap-beijing", Describe: "北京"},
  {Name: "Chengdu", Value: "ap-chengdu", Describe: "成都"},
  {Name: "Chongqing", Value: "ap-chongqing", Describe: "重庆"},
  {Name: "GuangzhouOpen", Value: "ap-guangzhou-open", Describe: "广州Open"},
  {Name: "Guangzhou", Value: "ap-guangzhou", Describe: "广州"},
  {Name: "HongKong", Value: "ap-hongkong", Describe: "中国香港"},
  {Name: "Mumbai", Value: "ap-mumbai", Describe: "孟买"},
  {Name: "Seoul", Value: "ap-seoul", Describe: "首尔"},
  {Name: "Shanghai", Value: "ap-shanghai", Describe: "上海"},
  {Name: "Nanjing", Value: "ap-nanjing", Describe: "南京"},
  {Name: "ShanghaiFSI", Value: "ap-shanghai-fsi", Describe: "上海金融"},
  {Name: "ShenzhenFSI", Value: "ap-shenzhen-fsi", Describe: "深圳金融"},
  {Name: "Singapore", Value: "ap-singapore", Describe: "新加坡"},
  {Name: "Tokyo", Value: "ap-tokyo", Describe: "东京"},
  {Name: "Frankfurt", Value: "eu-frankfurt", Describe: "法兰克福"},
  {Name: "Moscow", Value: "eu-moscow", Describe: "莫斯科"},
  {Name: "Ashburn", Value: "na-ashburn", Describe: "阿什本"},
  {Name: "SiliconValley", Value: "na-siliconvalley", Describe: "硅谷"},
  {Name: "Toronto", Value: "na-toronto", Describe: "多伦多"},
}

provider/tx/connectivity/client.go

package connectivity

import (
  "fmt"
  billing "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/billing/v20180709"
  "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common"
  "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common/profile"
  cvm "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/cvm/v20170312"
  sts "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/sts/v20180813"
)


type TencentCloudClientSecret struct {
  Region    string `env:"TX_CLOUD_REGION"`
  SecretID  string `env:"TX_CLOUD_SECRET_ID"`
  SecretKey string `env:"TX_CLOUD_SECRET_KEY"`
}

func NewTencentCloudClientSecret() *TencentCloudClientSecret {
  return &TencentCloudClientSecret{
    Region:    "ap-shanghai",
    SecretID:  "",//id和key写上自己的
    SecretKey: "",
  }
}

type TencentCloudClient struct {
  Secret   *TencentCloudClientSecret
  cvmConn  *cvm.Client
}

// UseCvmClient cvm
func (me *TencentCloudClient) CvmClient() *cvm.Client {
  if me.cvmConn != nil {
    return me.cvmConn
  }
  Secret := NewTencentCloudClientSecret()

  credential := common.NewCredential(
    Secret.SecretID,
    Secret.SecretKey,
  )

  cpf := profile.NewClientProfile()
  cpf.HttpProfile.ReqMethod = "POST"
  cpf.HttpProfile.ReqTimeout = 300
  cpf.Language = "en-US"

  cvmConn, _ := cvm.NewClient(credential, Secret.Region, cpf)
  me.cvmConn = cvmConn
  return me.cvmConn
}

// 获取客户端账号ID
func (me *TencentCloudClient) Account() (string, error) {
  Secret := NewTencentCloudClientSecret()
  credential := common.NewCredential(
    Secret.SecretID,
    Secret.SecretKey,
  )

  cpf := profile.NewClientProfile()
  cpf.HttpProfile.ReqMethod = "POST"
  cpf.HttpProfile.ReqTimeout = 300
  cpf.Language = "en-US"

  stsConn, _ := sts.NewClient(credential, Secret.Region, cpf)

  req := sts.NewGetCallerIdentityRequest()

  resp, err := stsConn.GetCallerIdentity(req)
  if err != nil {
    return "", fmt.Errorf("unable to initialize the STS client: %#v", err)
  }

  return *resp.Response.AccountId, nil
}

测试构造的客户端能否正常连接

provider/tx/connectivity/client_test.go

package connectivity_test

import (
  "testing"

  "gitee.com/wendao365/mycmdb/provider/tx/connectivity"
)

func TestClient(t *testing.T) {
  req := new(connectivity.TencentCloudClient)
  rps, err := req.Account()
  if err != nil {
    panic(err)
  }
  t.Log(rps)
}

测试成功,可以正常返回账号ID

=== RUN   TestClient
    client_test.go:15: 100009615835
--- PASS: TestClient (0.20s)
PASS

腾讯云cvm资源同步—单页查询

provider/tx/cvm/operator.go

初始化操作器

package cvm

import (
  "github.com/infraboard/mcube/logger"
  "github.com/infraboard/mcube/logger/zap"
  cvm "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/cvm/v20170312"
)

// 与云商交换的操作器
type CVMOperator struct {
  Client  *cvm.Client
  log     logger.Logger
  account string
}

func NewCVMOperator(client *cvm.Client, account string) *CVMOperator {
  return &CVMOperator{
    Client:  client,
    log:     zap.L().Named("provider.txyun.cvm"),
    account: account,
  }
}

provider/tx/cvm/cvm.go

获取的数据是json的格式,需要进行装换

func (o *CVMOperator) transferSet(items []*cvm.Instance) *host.HostSet {
  set := host.NewHostSet()
  for i := range items {
    set.Add(o.transferOne(items[i]))
  }
  return set
}

func (o *CVMOperator) transferOne(ins *cvm.Instance) *host.Host {
  r := host.NewDefaultHost()
  b := r.Resource.Meta
  b.CreateAt = utils.ParseDefaultSecondTime(utils.PtrStrV(ins.CreatedTime))
  b.ResourceId = utils.PtrStrV(ins.InstanceId)
  b.SerialNumber = utils.PtrStrV(ins.Uuid)

  i := r.Resource.Spec
  i.Vendor = resource.VENDOR_TENCENT
  i.Region = o.client.GetRegion()
  i.Zone = utils.PtrStrV(ins.Placement.Zone)
  i.Owner = o.account
  i.ExpireAt = utils.ParseDefaultSecondTime(utils.PtrStrV(ins.ExpiredTime))
  i.Type = utils.PtrStrV(ins.InstanceType)
  i.Name = utils.PtrStrV(ins.InstanceName)

  if ins.InternetAccessible != nil {
    i.BandWidth = int32(tea.Int64Value(ins.InternetAccessible.InternetMaxBandwidthOut))
  }
  i.Cpu = int32(utils.PtrInt64(ins.CPU))
  i.Memory = int32(utils.PtrInt64(ins.Memory))

  pm := mapping.PrasePayMode(tea.StringValue(ins.InstanceChargeType))
  r.Resource.Cost.PayMode = &pm
  r.Resource.Status.PublicIp = strings.Join(utils.SlicePtrStrv(ins.PublicIpAddresses), ",")
  r.Resource.Status.PrivateIp = strings.Join(utils.SlicePtrStrv(ins.PrivateIpAddresses), ",")
  r.Resource.Status.Phase = utils.PtrStrV(ins.InstanceState)

  r.Resource.Tags = transferTags(ins.Tags)

  r.Spec.OsName = utils.PtrStrV(ins.OsName)

  r.Spec.ImageId = utils.PtrStrV(ins.ImageId)
  return r
}

// 对transferOne方法中的tag进行转换
func transferTags(tags []*cvm.Tag) (ret []*resource.Tag) {
  for i := range tags {
    ret = append(ret, resource.NewGroupTag(
      utils.PtrStrV(tags[i].Key),
      utils.PtrStrV(tags[i].Value)),
    )
  }
  return
}

转换过程中,需要对指针值,时间格式及付费模式进行处理

utils/ptr_value.go

补充工具,读取指针的值,对transferOne中转换的云商数据进行检验

package utils

func PtrStrV(v *string) string {
  if v == nil {
    return ""
  }

  return *v
}

func PtrInt64(v *int64) int64 {
  if v == nil {
    return 0
  }

  return *v
}

func PtrInt32(v *int32) int32 {
  if v == nil {
    return 0
  }

  return *v
}

func PtrFloat64(v *float64) float64 {
  if v == nil {
    return 0
  }

  return *v
}

func SlicePtrStrv(items []*string) []string {
  vs := []string{}
  for i := range items {
    v := PtrStrV(items[i])
    if v != "" {
      vs = append(vs, v)
    }
  }

  return vs
}

utils/time.go

对时间进行格式转换

import (
  "strings"
  "time"

  "github.com/infraboard/mcube/logger/zap"
)

const (
  ISO8601_FORMAT             = "2006-01-02T15:04:05Z"
  DEFAULT_TIME_MINITE_FORMAT = "2006-01-02T15:04Z"
  TIME_SECOND_FORMAT_MOD1    = "2006-01-02 15:04:05"
)

func ParseDefaultSecondTime(t string) int64 {
  return ParseTime(ISO8601_FORMAT, t)
}

func ParseDefaultMiniteTime(t string) int64 {
  return ParseTime(DEFAULT_TIME_MINITE_FORMAT, t)
}

func ParseSecondMod1Time(t string) int64 {
  if t == "0000-00-00 00:00:00" {
    return 0
  }
  return ParseTime(TIME_SECOND_FORMAT_MOD1, t)
}

func ParseTime(format, t string) int64 {
  t = strings.TrimSpace(t)
  if t == "" {
    return 0
  }

  ts, err := time.Parse(format, t)
  if err != nil {
    zap.L().Errorf("parse time %s error, %s", t, err)
    return 0
  }

  return ts.Unix()
}

provider/tx/mapping/paymod.go

对付费模式进行转换

var (
  // CDB付费类型,可能的返回值:0-包年包月;1-按量计费
  // 负载均衡实例的计费类型,PREPAID:包年包月,POSTPAID_BY_HOUR:按量计费
  // 实例计费模式。取值范围:<br><li>`PREPAID`:表示预付费,即包年包月<br><li>`POSTPAID_BY_HOUR`:表示后付费,即按量计费<br><li>`CDHPAID`:`CDH`付费,即只对`CDH`计费,不对`CDH`上的实例计费。<br><li>`SPOTPAID`:表示竞价实例付费。
  // 磁盘付费模式。取值范围:<br><li>PREPAID:预付费,即包年包月<br><li>POSTPAID_BY_HOUR:后付费,即按量计费。
  // 弹性公网IP的网络计费模式。注意,传统账户类型账户的弹性公网IP没有网络计费模式属性,值为空。
  // 注意:此字段可能返回 null,表示取不到有效值。
  // 包括:
  // <li><strong>BANDWIDTH_PREPAID_BY_MONTH</strong></li>
  // <p style="padding-left: 30px;">表示包月带宽预付费。</p>
  // <li><strong>TRAFFIC_POSTPAID_BY_HOUR</strong></li>
  // <p style="padding-left: 30px;">表示按小时流量后付费。</p>
  // <li><strong>BANDWIDTH_POSTPAID_BY_HOUR</strong></li>
  // <p style="padding-left: 30px;">表示按小时带宽后付费。</p>
  // <li><strong>BANDWIDTH_PACKAGE</strong></li>
  // <p style="padding-left: 30px;">表示共享带宽包。</p>
  // 注意:此字段可能返回 null,表示取不到有效值。

  // 订单付费模式:prePay 预付费 postPay后付费 riPay预留实例
  PAY_TYPE_STATUS_MAP = map[string]resource.PayMode{
    "包年包月":             resource.PayMode_PRE_PAY,
    "0":                resource.PayMode_PRE_PAY,
    "PREPAID":          resource.PayMode_PRE_PAY,
    "prePay":           resource.PayMode_PRE_PAY,
    "1":                resource.PayMode_POST_PAY,
    "POSTPAID_BY_HOUR": resource.PayMode_POST_PAY,
    "postPay":          resource.PayMode_POST_PAY,
    "SPOTPAID":         resource.PayMode_POST_PAY,
    "按量计费":             resource.PayMode_POST_PAY,
    "riPay":            resource.PayMode_RESERVED_PAY,
  }
)

func PrasePayMode(s string) resource.PayMode {
  if v, ok := PAY_TYPE_STATUS_MAP[s]; ok {
    return v
  }

  return resource.PayMode_PRE_PAY
}

provider/tx/cvm/cvm.go

补充查询云商资源的具体实现

// 查询cvm列表(只拉去一页), 查询完成后转换为标准的Host对象
// https://console.cloud.tencent.com/api/explorer?Product=cvm&Version=2017-03-12&Action=DescribeInstances
func (o *CVMOperator) QueryCvm(ctx context.Context, req *cvm.DescribeInstancesRequest) (*host.HostSet, error) {
  // 1.调用cvm客户端 查询cvm实例列表
  // 返回的resp是一个DescribeInstancesResponse的实例,与请求对象对应
  response, err := o.client.DescribeInstances(req)
  if _, ok := err.(*errors.TencentCloudSDKError); ok {
    return nil, err
  }

  // 2.转换为cdmb Host对象
  set := o.transferSet(response.Response.InstanceSet)
  return set, nil
}

provider/tx/cvm/operator_test.go

单页cvm资源查询,先加载依赖

package cvm_test

import (
  "context"
  "fmt"
  "gitee.com/wendao365/mycmdb/apps/host"
  "gitee.com/wendao365/mycmdb/provider/tx/connectivity"
  "gitee.com/wendao365/mycmdb/provider/tx/cvm"
  "github.com/infraboard/mcube/logger/zap"
  "testing"
)

var (
  op  *cvm.CVMOperator
  ctx = context.Background()
)

func init() {
  zap.DevelopmentSetup()

  req := new(connectivity.TencentCloudClient)
  ac, err := req.Account()
  if err != nil {
    panic(err)
  }

  op = cvm.NewCVMOperator(req.CvmClient(), ac)
}

func TestQueryCvm(t *testing.T) {
  req := cvm.NewDescribeInstancesRequest()
  set, err := op.QueryCvm(ctx, req)
  if err != nil {
    t.Fatal(err)
  }
  t.Log(set)
}

测试结果

=== RUN   TestQueryCvm
    operator_test.go:36: items:{resource:{meta:{resource_id:"ins-rbcz77vt" create_at:1604505186 serial_number:"5332f8eb-65c4-4612-ae94-3e2b8b6591be"} spec:{vendor:TENCENT region:"ap-shanghai" zone:"ap-shanghai-2" owner:"100009615835" name:"zhiqi_cloud" type:"S4.SMALL2" expire_at:1699545186 cpu:1 memory:2 band_width:1} cost:{pay_mode:PRE_PAY} status:{phase:"RUNNING" public_ip:"182.254.217.98" private_ip:"172.17.0.6"}} spec:{os_name:"CentOS 7.8 64bit" image_id:"img-3la7wgnt"}}
--- PASS: TestQueryCvm (0.81s)

腾讯云cvm资源同步—分页控速查询

分页判断采取是否有下一页的方式来判断, 比如请求的当前页是20条,表示还有下一页, 0-19条代表没有下一页。

provider/tx/cvm/cvm.go

构建分页请求,带上默认的分页参数

// 带分页功能的查询
// 要把所有资源全部同步下来, 需要客户端拉起所有页面的实例数据
// 我们需要实现一个带分页查询功能的一个客户端, 这个客户端能把所有的页面都查询完
// 云商由速率限制? 比如每秒请求10页数据, 可以超过速率限制
// pager里面是需要控制 云商接口的访问频率

func (o *CVMOperator) PageCvmQuery() *CvmPager {
  return NewCvmPager(o, &CvmPagerRequest{PageSize: 20, PageNumber: 1})
}

provider/tx/cvm/cvm_pagger.go

使用令牌桶进行限流,算法描述:

  • 假如用户配置的平均发送速率为r,则每隔1/r秒一个令牌被加入到桶中(每秒会有r个令牌放入桶中);
  • 假设桶中最多可以存放b个令牌。如果令牌到达时令牌桶已经满了,那么这个令牌会被丢弃;
  • 服务请求时,尝试从桶里面获取一个token, 可以里面返回失败,或者直到有可用token放入

具体实现可以直接调包

type CvmPagerRequest struct {
  PageSize   int64
  PageNumber int64
}

func (req *CvmPagerRequest) Offset() int64 {
  return (req.PageNumber - 1) * req.PageSize
}

// 跟web界面逻辑一页
//  1. 从第一页开始查询, 100
//  2. 一直点Next(下一页)? 什么时候就结束了,没有下一页了喃?
//     2.1  req.page_number+1
//
// 3.
//
//    3.1. 根据Total 和当前已经的offset进行比较, 需要云商返回的Total要准确
//   3.2. 根据当前页是否满了, 比如一页20  0-19(代表没有下一页)
type CvmPager struct {
  op      *CVMOperator
  req     *cvm.DescribeInstancesRequest
  log     logger.Logger
  tb      *tokenbucket.Bucket
  hasNext bool
}

func NewCvmPager(op *CVMOperator, args *CvmPagerRequest) *CvmPager {
  req := cvm.NewDescribeInstancesRequest()
  req.Limit = &args.PageSize

  offset := args.Offset()
  req.Offset = &offset

  return &CvmPager{
    // 默认是由下一页的
    hasNext: true,
    op:      op,
    req:     req,
    log:     zap.L().Named("pager.cvm"),
    tb:      tokenbucket.NewBucketWithRate(0.1, 1),
  }
}

// 判断是否由下一页
func (c *CvmPager) Next() bool {

  if c.hasNext {
    c.tb.Wait(1)
    return true
  }
  // 等待知道有多个令牌可用,直接取出
  // time.Sleep(1 * time.Second)
  return false
}

// 请求该页的数据
func (c *CvmPager) Scan(ctx context.Context, set *host.HostSet) error {
  resp, err := c.op.QueryCvm(ctx, c.req)
  if err != nil {
    return err
  }
  set.Items = resp.Items

  // 修正Next的结果,tea是把指针读成值
  c.log.Debugf("resp length: %d, page size: %d", resp.Length(), tea.Int64Value(c.req.Limit))
  if resp.Length() < int(tea.Int64Value(c.req.Limit)) {
    c.hasNext = false
  } else {
    // 调整到下一页
    offset := tea.Int64Value(c.req.Offset) + tea.Int64Value(c.req.Limit)
    c.req.Offset = &offset
    c.hasNext = true
  }

  return nil
}

分页查询单元测试

provider/tx/cvm/operator_test.go

// 带分页功能的查询
func TestPageCvmQuery(t *testing.T) {
  // req := cvm.NewDescribeInstancesRequest()
  pager := op.PageCvmQuery()
  for pager.Next() {
    set := host.NewHostSet()
    if err := pager.Scan(ctx, set); err != nil {
      t.Fatal(err)
    }
    fmt.Println(set)
  }
}

测试结果,测试成功,会显示查询多少页,多少条

=== RUN   TestPageCvmQuery
2023-01-26 18:21:12  DEBUG  [pager.cvm]  cvm/cvm_pagger.go:77  resp length: 1, page size: 20
items:{resource:{meta:{resource_id:"ins-rbcz77vt" create_at:1604505186 serial_number:"5332f8eb-65c4-4612-ae94-3e2b8b6591be"} spec:{vendor:TENCENT region:"ap-shanghai" zone:"ap-shanghai-2" owner:"100009615835" name:"zhiqi_cloud" type:"S4.SMALL2" expire_at:1699545186 cpu:1 memory:2 band_width:1} cost:{pay_mode:PRE_PAY} status:{phase:"RUNNING" public_ip:"182.254.217.98" private_ip:"172.17.0.6"}} spec:{os_name:"CentOS 7.8 64bit" image_id:"img-3la7wgnt"}}
--- PASS: TestPageCvmQuery (0.51s)
PASS
0

评论区