diff --git a/client_etl_job.go b/client_etl_job.go new file mode 100644 index 00000000..e3691ac4 --- /dev/null +++ b/client_etl_job.go @@ -0,0 +1,213 @@ +package sls + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "time" +) + +type ETL struct { + Configuration ETLConfiguration `json:"configuration"` + Description string `json:"description"` + DisplayName string `json:"displayName"` + Name string `json:"name"` + Schedule ETLSchedule `json:"schedule"` + Type string `json:"type"` + Status string `json:"status"` + CreateTime int32 `json:"createTime,omitempty"` + LastModifiedTime int32 `json:"lastModifiedTime,omitempty"` +} + +type ETLConfiguration struct { + AccessKeyId string `json:"accessKeyId"` + AccessKeySecret string `json:"accessKeySecret"` + FromTime int64 `json:"fromTime"` + Logstore string `json:"logstore"` + Parameters map[string]string `json:"parameters"` + RoleArn string `json:"roleArn,omitempty"` + Script string `json:"script"` + ToTime int32 `json:"toTime"` + Version int8 `json:"version"` + ETLSinks []ETLSink `json:"sinks"` +} + +type ETLSchedule struct { + Type string `json:"type"` +} + +type ETLSink struct { + AccessKeyId string `json:"accessKeyId"` + AccessKeySecret string `json:"accessKeySecret"` + Endpoint string `json:"endpoint"` + Logstore string `json:"logstore"` + Name string `json:"name"` + Project string `json:"project"` + RoleArn string `json:"roleArn,omitempty"` +} + +type ListETLResponse struct { + Total int `json:"total"` + Count int `json:"count"` + Results []*ETL `json:"results"` +} + + +func NewETL(endpoint, accessKeyId, accessKeySecret, logstore, name, project string) ETL { + sink := ETLSink{ + AccessKeyId:accessKeyId, + AccessKeySecret:accessKeySecret, + Endpoint:endpoint, + Logstore:logstore, + Name:name, + Project:project, + } + config := ETLConfiguration { + AccessKeyId:accessKeyId, + AccessKeySecret:accessKeySecret, + FromTime: time.Now().Unix(), + Script: "e_set('new','aliyun')", + Version:2, + Logstore:logstore, + ETLSinks:[]ETLSink{sink}, + Parameters: map[string]string{}, + + } + schedule := ETLSchedule{ + Type:"Resident", + } + etljob := ETL { + Configuration:config, + DisplayName:"displayname", + Description:"go sdk case", + Name:name, + Schedule:schedule, + Type:"ETL", + + } + return etljob +} + + + +func (c *Client) CreateETL(project string, etljob ETL) error { + body, err := json.Marshal(etljob) + if err != nil { + return NewClientError(err) + } + h := map[string]string{ + "x-log-bodyrawsize": fmt.Sprintf("%v", len(body)), + "Content-Type": "application/json", + } + uri := "/jobs" + + r, err := c.request(project, "POST", uri, h, body) + if err != nil { + return err + } + r.Body.Close() + return nil +} + +func (c *Client) GetETL(project string, etlName string) (ETLJob *ETL, err error) { + h := map[string]string{ + "x-log-bodyrawsize": "0", + "Content-Type": "application/json", + } + uri := "/jobs/" + etlName + r, err := c.request(project, "GET", uri, h, nil) + if err != nil { + return nil, err + } + defer r.Body.Close() + buf, _ := ioutil.ReadAll(r.Body) + etlJob := &ETL{} + if err = json.Unmarshal(buf, etlJob); err != nil { + err = NewClientError(err) + } + return etlJob, nil +} + +func (c *Client) UpdateETL(project string, etljob ETL) error { + body, err := json.Marshal(etljob) + if err != nil { + return NewClientError(err) + } + h := map[string]string{ + "x-log-bodyrawsize": fmt.Sprintf("%v", len(body)), + "Content-Type": "application/json", + } + uri := "/jobs/" + etljob.Name + r, err := c.request(project, "PUT", uri, h, body) + if err != nil { + return err + } + r.Body.Close() + return nil +} + +func (c *Client) DeleteETL(project string, etlName string) error { + h := map[string]string{ + "x-log-bodyrawsize": "0", + "Content-Type": "application/json", + } + uri := "/jobs/" + etlName + r, err := c.request(project, "DELETE", uri, h, nil) + if err != nil { + return err + } + r.Body.Close() + return nil +} + +func (c *Client) ListETL(project string, offset int, size int) (*ListETLResponse, error) { + h := map[string]string{ + "x-log-bodyrawsize": "0", + "Content-Type": "application/json", + } + + uri := fmt.Sprintf("/jobs?offset=%d&size=%d", offset, size) + r, err := c.request(project, "GET", uri, h, nil) + if err != nil { + return nil, err + } + defer r.Body.Close() + buf, _ := ioutil.ReadAll(r.Body) + + listETLResponse := &ListETLResponse{} + if err = json.Unmarshal(buf, listETLResponse); err != nil { + err = NewClientError(err) + } + return listETLResponse, err +} + +func (c *Client) StartETL(project, name string) error { + h := map[string]string{ + "x-log-bodyrawsize": "0", + "Content-Type": "application/json", + } + + uri := fmt.Sprintf("/jobs/%s?action=START", name) + r, err := c.request(project, "PUT", uri, h, nil) + if err != nil { + return err + } + r.Body.Close() + return nil +} + +func (c *Client) StopETL(project, name string) error { + h := map[string]string{ + "x-log-bodyrawsize": "0", + "Content-Type": "application/json", + } + + uri := fmt.Sprintf("/jobs/%s?action=STOP", name) + fmt.Println(uri) + r, err := c.request(project, "PUT", uri, h, nil) + if err != nil { + return err + } + r.Body.Close() + return nil +} diff --git a/client_etl_job_test.go b/client_etl_job_test.go new file mode 100644 index 00000000..5f6218b9 --- /dev/null +++ b/client_etl_job_test.go @@ -0,0 +1,148 @@ +package sls + +import ( + "github.com/stretchr/testify/suite" + "os" + "testing" + "time" +) + +func TestETLJobV2(t *testing.T) { + suite.Run(t, new(ETLJobTestV2Suite)) +} + +type ETLJobTestV2Suite struct { + suite.Suite + endpoint string + projectName string + logstoreName string + accessKeyID string + accessKeySecret string + targetLogstoreName string + etlName string + client *Client +} + +func (s *ETLJobTestV2Suite) SetupTest() { + s.endpoint = os.Getenv("LOG_TEST_ENDPOINT") + s.projectName = os.Getenv("LOG_TEST_PROJECT") + s.logstoreName = os.Getenv("LOG_TEST_LOGSTORE") + s.targetLogstoreName = os.Getenv("LOG_TEST_TARGET_LOGSTORE") + s.accessKeyID = os.Getenv("LOG_TEST_ACCESS_KEY_ID") + s.accessKeySecret = os.Getenv("LOG_TEST_ACCESS_KEY_SECRET") + s.client = &Client{ + AccessKeyID: s.accessKeyID, + AccessKeySecret: s.accessKeySecret, + Endpoint: s.endpoint, + } +} + +func (s *ETLJobTestV2Suite) createETLJobV2() error { + sink := ETLSink{ + AccessKeyId: s.accessKeyID, + AccessKeySecret: s.accessKeySecret, + Endpoint: s.endpoint, + Logstore: s.logstoreName, + Name: "aliyun-etl-test", + Project: s.projectName, + } + config := ETLConfiguration{ + AccessKeyId: s.accessKeyID, + AccessKeySecret: s.accessKeySecret, + FromTime: time.Now().Unix(), + Script: "e_set('aliyun','new')", + Version: 2, + Logstore: s.logstoreName, + ETLSinks: []ETLSink{sink}, + Parameters: map[string]string{}, + } + schedule := ETLSchedule{ + Type: "Resident", + } + etljob := ETL{ + Configuration: config, + DisplayName: "displayName", + Description: "go sdk case", + Name: s.etlName, + Schedule: schedule, + Type: "ETL", + } + return s.client.CreateETL(s.projectName, etljob) +} + +func (s *ETLJobTestV2Suite) TestClient_UpdateETLJobV2() { + err := s.createETLJobV2() + s.Require().Nil(err) + etljob, err := s.client.GetETL(s.projectName, s.etlName) + s.Require().Nil(err) + etljob.DisplayName = "update" + etljob.Description = "update description" + etljob.Configuration.Script = "e_set('update','update')" + err = s.client.UpdateETL(s.projectName, *etljob) + s.Require().Nil(err) + etljob, err = s.client.GetETL(s.projectName, s.etlName) + s.Require().Nil(err) + s.Require().Equal("update", etljob.DisplayName) + s.Require().Equal("update description", etljob.Description) + err = s.client.DeleteETL(s.projectName, s.etlName) + s.Require().Nil(err) +} + +func (s *ETLJobTestV2Suite) TestClient_DeleteETLJobV2() { + err := s.createETLJobV2() + s.Require().Nil(err) + _, err = s.client.GetETL(s.projectName, s.etlName) + s.Require().Nil(err) + err = s.client.DeleteETL(s.projectName, s.etlName) + s.Require().Nil(err) + time.Sleep(time.Second * 100) + _, err = s.client.GetETL(s.projectName, s.etlName) + s.Require().NotNil(err) + +} + +func (s *ETLJobTestV2Suite) TestClient_ListETLJobV2() { + err := s.createETLJobV2() + s.Require().Nil(err) + etljobList, err := s.client.ListETL(s.projectName, 0, 100) + s.Require().Nil(err) + s.Require().Equal(1, etljobList.Total) + s.Require().Equal(1, etljobList.Count) + err = s.client.DeleteETL(s.projectName, s.etlName) + s.Require().Nil(err) + +} + +func (s *ETLJobTestV2Suite) TestClient_StartStopETLJobV2() { + err := s.createETLJobV2() + s.Require().Nil(err) + for { + etljob, err := s.client.GetETL(s.projectName, s.etlName) + s.Require().Nil(err) + time.Sleep(10 * time.Second) + if etljob.Status == "RUNNING" { + break + } + } + + err = s.client.StopETL(s.projectName, s.etlName) + for { + etljob, err := s.client.GetETL(s.projectName, s.etlName) + s.Require().Nil(err) + time.Sleep(10 * time.Second) + if etljob.Status == "STOPPED" { + break + } + } + err = s.client.StartETL(s.projectName, s.etlName) + for { + etljob, err := s.client.GetETL(s.projectName, s.etlName) + s.Require().Nil(err) + time.Sleep(10 * time.Second) + if etljob.Status == "RUNNING" { + break + } + } + + +}