对接云商资源,以腾讯云cvm为例。并做到可以分页和限速
获取access key
https://console.cloud.tencent.com/cam
在访问管理→用户→用户列表→新建用户。然后再赋予权限。得到SecretId和SecretKey可以用于api的访问。
封装与腾讯云交互的客户端
provider/tx/region/region.go
查询不同region对应的代码。用于做实验的主机位于上海,所以代码是ap-shanghai
package region
import "github.com/infraboard/cmdb/utils"
var Regions = []utils.EnumDescribe{
{Name: "Bangkok", Value: "ap-bangkok", Describe: "曼谷"},
{Name: "Beijing", Value: "ap-beijing", Describe: "北京"},
{Name: "Chengdu", Value: "ap-chengdu", Describe: "成都"},
{Name: "Chongqing", Value: "ap-chongqing", Describe: "重庆"},
{Name: "GuangzhouOpen", Value: "ap-guangzhou-open", Describe: "广州Open"},
{Name: "Guangzhou", Value: "ap-guangzhou", Describe: "广州"},
{Name: "HongKong", Value: "ap-hongkong", Describe: "中国香港"},
{Name: "Mumbai", Value: "ap-mumbai", Describe: "孟买"},
{Name: "Seoul", Value: "ap-seoul", Describe: "首尔"},
{Name: "Shanghai", Value: "ap-shanghai", Describe: "上海"},
{Name: "Nanjing", Value: "ap-nanjing", Describe: "南京"},
{Name: "ShanghaiFSI", Value: "ap-shanghai-fsi", Describe: "上海金融"},
{Name: "ShenzhenFSI", Value: "ap-shenzhen-fsi", Describe: "深圳金融"},
{Name: "Singapore", Value: "ap-singapore", Describe: "新加坡"},
{Name: "Tokyo", Value: "ap-tokyo", Describe: "东京"},
{Name: "Frankfurt", Value: "eu-frankfurt", Describe: "法兰克福"},
{Name: "Moscow", Value: "eu-moscow", Describe: "莫斯科"},
{Name: "Ashburn", Value: "na-ashburn", Describe: "阿什本"},
{Name: "SiliconValley", Value: "na-siliconvalley", Describe: "硅谷"},
{Name: "Toronto", Value: "na-toronto", Describe: "多伦多"},
}
provider/tx/connectivity/client.go
package connectivity
import (
"fmt"
billing "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/billing/v20180709"
"github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common"
"github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common/profile"
cvm "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/cvm/v20170312"
sts "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/sts/v20180813"
)
type TencentCloudClientSecret struct {
Region string `env:"TX_CLOUD_REGION"`
SecretID string `env:"TX_CLOUD_SECRET_ID"`
SecretKey string `env:"TX_CLOUD_SECRET_KEY"`
}
func NewTencentCloudClientSecret() *TencentCloudClientSecret {
return &TencentCloudClientSecret{
Region: "ap-shanghai",
SecretID: "",//id和key写上自己的
SecretKey: "",
}
}
type TencentCloudClient struct {
Secret *TencentCloudClientSecret
cvmConn *cvm.Client
}
// UseCvmClient cvm
func (me *TencentCloudClient) CvmClient() *cvm.Client {
if me.cvmConn != nil {
return me.cvmConn
}
Secret := NewTencentCloudClientSecret()
credential := common.NewCredential(
Secret.SecretID,
Secret.SecretKey,
)
cpf := profile.NewClientProfile()
cpf.HttpProfile.ReqMethod = "POST"
cpf.HttpProfile.ReqTimeout = 300
cpf.Language = "en-US"
cvmConn, _ := cvm.NewClient(credential, Secret.Region, cpf)
me.cvmConn = cvmConn
return me.cvmConn
}
// 获取客户端账号ID
func (me *TencentCloudClient) Account() (string, error) {
Secret := NewTencentCloudClientSecret()
credential := common.NewCredential(
Secret.SecretID,
Secret.SecretKey,
)
cpf := profile.NewClientProfile()
cpf.HttpProfile.ReqMethod = "POST"
cpf.HttpProfile.ReqTimeout = 300
cpf.Language = "en-US"
stsConn, _ := sts.NewClient(credential, Secret.Region, cpf)
req := sts.NewGetCallerIdentityRequest()
resp, err := stsConn.GetCallerIdentity(req)
if err != nil {
return "", fmt.Errorf("unable to initialize the STS client: %#v", err)
}
return *resp.Response.AccountId, nil
}
测试构造的客户端能否正常连接
provider/tx/connectivity/client_test.go
package connectivity_test
import (
"testing"
"gitee.com/wendao365/mycmdb/provider/tx/connectivity"
)
func TestClient(t *testing.T) {
req := new(connectivity.TencentCloudClient)
rps, err := req.Account()
if err != nil {
panic(err)
}
t.Log(rps)
}
测试成功,可以正常返回账号ID
=== RUN TestClient
client_test.go:15: 100009615835
--- PASS: TestClient (0.20s)
PASS
腾讯云cvm资源同步—单页查询
provider/tx/cvm/operator.go
初始化操作器
package cvm
import (
"github.com/infraboard/mcube/logger"
"github.com/infraboard/mcube/logger/zap"
cvm "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/cvm/v20170312"
)
// 与云商交换的操作器
type CVMOperator struct {
Client *cvm.Client
log logger.Logger
account string
}
func NewCVMOperator(client *cvm.Client, account string) *CVMOperator {
return &CVMOperator{
Client: client,
log: zap.L().Named("provider.txyun.cvm"),
account: account,
}
}
provider/tx/cvm/cvm.go
获取的数据是json的格式,需要进行装换
func (o *CVMOperator) transferSet(items []*cvm.Instance) *host.HostSet {
set := host.NewHostSet()
for i := range items {
set.Add(o.transferOne(items[i]))
}
return set
}
func (o *CVMOperator) transferOne(ins *cvm.Instance) *host.Host {
r := host.NewDefaultHost()
b := r.Resource.Meta
b.CreateAt = utils.ParseDefaultSecondTime(utils.PtrStrV(ins.CreatedTime))
b.ResourceId = utils.PtrStrV(ins.InstanceId)
b.SerialNumber = utils.PtrStrV(ins.Uuid)
i := r.Resource.Spec
i.Vendor = resource.VENDOR_TENCENT
i.Region = o.client.GetRegion()
i.Zone = utils.PtrStrV(ins.Placement.Zone)
i.Owner = o.account
i.ExpireAt = utils.ParseDefaultSecondTime(utils.PtrStrV(ins.ExpiredTime))
i.Type = utils.PtrStrV(ins.InstanceType)
i.Name = utils.PtrStrV(ins.InstanceName)
if ins.InternetAccessible != nil {
i.BandWidth = int32(tea.Int64Value(ins.InternetAccessible.InternetMaxBandwidthOut))
}
i.Cpu = int32(utils.PtrInt64(ins.CPU))
i.Memory = int32(utils.PtrInt64(ins.Memory))
pm := mapping.PrasePayMode(tea.StringValue(ins.InstanceChargeType))
r.Resource.Cost.PayMode = &pm
r.Resource.Status.PublicIp = strings.Join(utils.SlicePtrStrv(ins.PublicIpAddresses), ",")
r.Resource.Status.PrivateIp = strings.Join(utils.SlicePtrStrv(ins.PrivateIpAddresses), ",")
r.Resource.Status.Phase = utils.PtrStrV(ins.InstanceState)
r.Resource.Tags = transferTags(ins.Tags)
r.Spec.OsName = utils.PtrStrV(ins.OsName)
r.Spec.ImageId = utils.PtrStrV(ins.ImageId)
return r
}
// 对transferOne方法中的tag进行转换
func transferTags(tags []*cvm.Tag) (ret []*resource.Tag) {
for i := range tags {
ret = append(ret, resource.NewGroupTag(
utils.PtrStrV(tags[i].Key),
utils.PtrStrV(tags[i].Value)),
)
}
return
}
转换过程中,需要对指针值,时间格式及付费模式进行处理
utils/ptr_value.go
补充工具,读取指针的值,对transferOne中转换的云商数据进行检验
package utils
func PtrStrV(v *string) string {
if v == nil {
return ""
}
return *v
}
func PtrInt64(v *int64) int64 {
if v == nil {
return 0
}
return *v
}
func PtrInt32(v *int32) int32 {
if v == nil {
return 0
}
return *v
}
func PtrFloat64(v *float64) float64 {
if v == nil {
return 0
}
return *v
}
func SlicePtrStrv(items []*string) []string {
vs := []string{}
for i := range items {
v := PtrStrV(items[i])
if v != "" {
vs = append(vs, v)
}
}
return vs
}
utils/time.go
对时间进行格式转换
import (
"strings"
"time"
"github.com/infraboard/mcube/logger/zap"
)
const (
ISO8601_FORMAT = "2006-01-02T15:04:05Z"
DEFAULT_TIME_MINITE_FORMAT = "2006-01-02T15:04Z"
TIME_SECOND_FORMAT_MOD1 = "2006-01-02 15:04:05"
)
func ParseDefaultSecondTime(t string) int64 {
return ParseTime(ISO8601_FORMAT, t)
}
func ParseDefaultMiniteTime(t string) int64 {
return ParseTime(DEFAULT_TIME_MINITE_FORMAT, t)
}
func ParseSecondMod1Time(t string) int64 {
if t == "0000-00-00 00:00:00" {
return 0
}
return ParseTime(TIME_SECOND_FORMAT_MOD1, t)
}
func ParseTime(format, t string) int64 {
t = strings.TrimSpace(t)
if t == "" {
return 0
}
ts, err := time.Parse(format, t)
if err != nil {
zap.L().Errorf("parse time %s error, %s", t, err)
return 0
}
return ts.Unix()
}
provider/tx/mapping/paymod.go
对付费模式进行转换
var (
// CDB付费类型,可能的返回值:0-包年包月;1-按量计费
// 负载均衡实例的计费类型,PREPAID:包年包月,POSTPAID_BY_HOUR:按量计费
// 实例计费模式。取值范围:<br><li>`PREPAID`:表示预付费,即包年包月<br><li>`POSTPAID_BY_HOUR`:表示后付费,即按量计费<br><li>`CDHPAID`:`CDH`付费,即只对`CDH`计费,不对`CDH`上的实例计费。<br><li>`SPOTPAID`:表示竞价实例付费。
// 磁盘付费模式。取值范围:<br><li>PREPAID:预付费,即包年包月<br><li>POSTPAID_BY_HOUR:后付费,即按量计费。
// 弹性公网IP的网络计费模式。注意,传统账户类型账户的弹性公网IP没有网络计费模式属性,值为空。
// 注意:此字段可能返回 null,表示取不到有效值。
// 包括:
// <li><strong>BANDWIDTH_PREPAID_BY_MONTH</strong></li>
// <p style="padding-left: 30px;">表示包月带宽预付费。</p>
// <li><strong>TRAFFIC_POSTPAID_BY_HOUR</strong></li>
// <p style="padding-left: 30px;">表示按小时流量后付费。</p>
// <li><strong>BANDWIDTH_POSTPAID_BY_HOUR</strong></li>
// <p style="padding-left: 30px;">表示按小时带宽后付费。</p>
// <li><strong>BANDWIDTH_PACKAGE</strong></li>
// <p style="padding-left: 30px;">表示共享带宽包。</p>
// 注意:此字段可能返回 null,表示取不到有效值。
// 订单付费模式:prePay 预付费 postPay后付费 riPay预留实例
PAY_TYPE_STATUS_MAP = map[string]resource.PayMode{
"包年包月": resource.PayMode_PRE_PAY,
"0": resource.PayMode_PRE_PAY,
"PREPAID": resource.PayMode_PRE_PAY,
"prePay": resource.PayMode_PRE_PAY,
"1": resource.PayMode_POST_PAY,
"POSTPAID_BY_HOUR": resource.PayMode_POST_PAY,
"postPay": resource.PayMode_POST_PAY,
"SPOTPAID": resource.PayMode_POST_PAY,
"按量计费": resource.PayMode_POST_PAY,
"riPay": resource.PayMode_RESERVED_PAY,
}
)
func PrasePayMode(s string) resource.PayMode {
if v, ok := PAY_TYPE_STATUS_MAP[s]; ok {
return v
}
return resource.PayMode_PRE_PAY
}
provider/tx/cvm/cvm.go
补充查询云商资源的具体实现
// 查询cvm列表(只拉去一页), 查询完成后转换为标准的Host对象
// https://console.cloud.tencent.com/api/explorer?Product=cvm&Version=2017-03-12&Action=DescribeInstances
func (o *CVMOperator) QueryCvm(ctx context.Context, req *cvm.DescribeInstancesRequest) (*host.HostSet, error) {
// 1.调用cvm客户端 查询cvm实例列表
// 返回的resp是一个DescribeInstancesResponse的实例,与请求对象对应
response, err := o.client.DescribeInstances(req)
if _, ok := err.(*errors.TencentCloudSDKError); ok {
return nil, err
}
// 2.转换为cdmb Host对象
set := o.transferSet(response.Response.InstanceSet)
return set, nil
}
provider/tx/cvm/operator_test.go
单页cvm资源查询,先加载依赖
package cvm_test
import (
"context"
"fmt"
"gitee.com/wendao365/mycmdb/apps/host"
"gitee.com/wendao365/mycmdb/provider/tx/connectivity"
"gitee.com/wendao365/mycmdb/provider/tx/cvm"
"github.com/infraboard/mcube/logger/zap"
"testing"
)
var (
op *cvm.CVMOperator
ctx = context.Background()
)
func init() {
zap.DevelopmentSetup()
req := new(connectivity.TencentCloudClient)
ac, err := req.Account()
if err != nil {
panic(err)
}
op = cvm.NewCVMOperator(req.CvmClient(), ac)
}
func TestQueryCvm(t *testing.T) {
req := cvm.NewDescribeInstancesRequest()
set, err := op.QueryCvm(ctx, req)
if err != nil {
t.Fatal(err)
}
t.Log(set)
}
测试结果
=== RUN TestQueryCvm
operator_test.go:36: items:{resource:{meta:{resource_id:"ins-rbcz77vt" create_at:1604505186 serial_number:"5332f8eb-65c4-4612-ae94-3e2b8b6591be"} spec:{vendor:TENCENT region:"ap-shanghai" zone:"ap-shanghai-2" owner:"100009615835" name:"zhiqi_cloud" type:"S4.SMALL2" expire_at:1699545186 cpu:1 memory:2 band_width:1} cost:{pay_mode:PRE_PAY} status:{phase:"RUNNING" public_ip:"182.254.217.98" private_ip:"172.17.0.6"}} spec:{os_name:"CentOS 7.8 64bit" image_id:"img-3la7wgnt"}}
--- PASS: TestQueryCvm (0.81s)
腾讯云cvm资源同步—分页控速查询
分页判断采取是否有下一页的方式来判断, 比如请求的当前页是20条,表示还有下一页, 0-19条代表没有下一页。
provider/tx/cvm/cvm.go
构建分页请求,带上默认的分页参数
// 带分页功能的查询
// 要把所有资源全部同步下来, 需要客户端拉起所有页面的实例数据
// 我们需要实现一个带分页查询功能的一个客户端, 这个客户端能把所有的页面都查询完
// 云商由速率限制? 比如每秒请求10页数据, 可以超过速率限制
// pager里面是需要控制 云商接口的访问频率
func (o *CVMOperator) PageCvmQuery() *CvmPager {
return NewCvmPager(o, &CvmPagerRequest{PageSize: 20, PageNumber: 1})
}
provider/tx/cvm/cvm_pagger.go
使用令牌桶进行限流,算法描述:
- 假如用户配置的平均发送速率为r,则每隔1/r秒一个令牌被加入到桶中(每秒会有r个令牌放入桶中);
- 假设桶中最多可以存放b个令牌。如果令牌到达时令牌桶已经满了,那么这个令牌会被丢弃;
- 服务请求时,尝试从桶里面获取一个token, 可以里面返回失败,或者直到有可用token放入
具体实现可以直接调包
type CvmPagerRequest struct {
PageSize int64
PageNumber int64
}
func (req *CvmPagerRequest) Offset() int64 {
return (req.PageNumber - 1) * req.PageSize
}
// 跟web界面逻辑一页
// 1. 从第一页开始查询, 100
// 2. 一直点Next(下一页)? 什么时候就结束了,没有下一页了喃?
// 2.1 req.page_number+1
//
// 3.
//
// 3.1. 根据Total 和当前已经的offset进行比较, 需要云商返回的Total要准确
// 3.2. 根据当前页是否满了, 比如一页20 0-19(代表没有下一页)
type CvmPager struct {
op *CVMOperator
req *cvm.DescribeInstancesRequest
log logger.Logger
tb *tokenbucket.Bucket
hasNext bool
}
func NewCvmPager(op *CVMOperator, args *CvmPagerRequest) *CvmPager {
req := cvm.NewDescribeInstancesRequest()
req.Limit = &args.PageSize
offset := args.Offset()
req.Offset = &offset
return &CvmPager{
// 默认是由下一页的
hasNext: true,
op: op,
req: req,
log: zap.L().Named("pager.cvm"),
tb: tokenbucket.NewBucketWithRate(0.1, 1),
}
}
// 判断是否由下一页
func (c *CvmPager) Next() bool {
if c.hasNext {
c.tb.Wait(1)
return true
}
// 等待知道有多个令牌可用,直接取出
// time.Sleep(1 * time.Second)
return false
}
// 请求该页的数据
func (c *CvmPager) Scan(ctx context.Context, set *host.HostSet) error {
resp, err := c.op.QueryCvm(ctx, c.req)
if err != nil {
return err
}
set.Items = resp.Items
// 修正Next的结果,tea是把指针读成值
c.log.Debugf("resp length: %d, page size: %d", resp.Length(), tea.Int64Value(c.req.Limit))
if resp.Length() < int(tea.Int64Value(c.req.Limit)) {
c.hasNext = false
} else {
// 调整到下一页
offset := tea.Int64Value(c.req.Offset) + tea.Int64Value(c.req.Limit)
c.req.Offset = &offset
c.hasNext = true
}
return nil
}
分页查询单元测试
provider/tx/cvm/operator_test.go
// 带分页功能的查询
func TestPageCvmQuery(t *testing.T) {
// req := cvm.NewDescribeInstancesRequest()
pager := op.PageCvmQuery()
for pager.Next() {
set := host.NewHostSet()
if err := pager.Scan(ctx, set); err != nil {
t.Fatal(err)
}
fmt.Println(set)
}
}
测试结果,测试成功,会显示查询多少页,多少条
=== RUN TestPageCvmQuery
2023-01-26 18:21:12 DEBUG [pager.cvm] cvm/cvm_pagger.go:77 resp length: 1, page size: 20
items:{resource:{meta:{resource_id:"ins-rbcz77vt" create_at:1604505186 serial_number:"5332f8eb-65c4-4612-ae94-3e2b8b6591be"} spec:{vendor:TENCENT region:"ap-shanghai" zone:"ap-shanghai-2" owner:"100009615835" name:"zhiqi_cloud" type:"S4.SMALL2" expire_at:1699545186 cpu:1 memory:2 band_width:1} cost:{pay_mode:PRE_PAY} status:{phase:"RUNNING" public_ip:"182.254.217.98" private_ip:"172.17.0.6"}} spec:{os_name:"CentOS 7.8 64bit" image_id:"img-3la7wgnt"}}
--- PASS: TestPageCvmQuery (0.51s)
PASS
评论区