Skip to content

Commit

Permalink
feat: streamjob api add in sreworks-job
Browse files Browse the repository at this point in the history
  • Loading branch information
Twwy committed Apr 11, 2023
1 parent d9247c5 commit 753bb18
Show file tree
Hide file tree
Showing 15 changed files with 582 additions and 180 deletions.
52 changes: 52 additions & 0 deletions contribute-to-sreworks.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#!/bin/bash


#
# sreworks_project_path/contribute-to-sreworks.sh "paas/appmanager"
#

SW_ROOT=$(cd `dirname $0`; pwd)

IS_GIT_ROOT=$(ls -l .git|wc -l|awk '{print $1}')

if [ "$IS_GIT_ROOT" = "0" ];then
echo ""
echo "Please run contribute-to-sreworks.sh in Git Project root path /"
exit 1
fi

# 判断目标路径是否存在
TARGET_PATH=$1
if [ ! -d ${SW_ROOT}/${TARGET_PATH} ];then
echo "Target app path not found"
exit 1
fi


# 只能拷贝到paas/saas这两个目录下
if [[ "$TARGET_PATH" =~ ^paas/.* ]] || [[ "$TARGET_PATH" =~ ^saas/.* ]] || [[ "$TARGET_PATH" =~ ^/paas/.* ]] || [[ "$TARGET_PATH" =~ ^/saas/.* ]]; then
echo "Path check ok"
else
echo "Please copy code to paas/* or saas/*"
echo ""
echo "List paas/"
ls -l ${SW_ROOT}/${TARGET_PATH}/paas/|grep -v "total "
echo ""
echo "List saas/"
ls -l ${SW_ROOT}/${TARGET_PATH}/saas/|grep -v "total "
exit 1
fi


# 将当前代码拷贝到一个临时目录,移除.git文件
rm -rf /tmp/tmp_sw_project
mkdir -p /tmp/tmp_sw_project
cp -r ./ /tmp/tmp_sw_project/
rm -rf /tmp/tmp_sw_project/.git

mv /tmp/tmp_sw_project ${SW_ROOT}/${TARGET_PATH}/../
mv ${SW_ROOT}/${TARGET_PATH}/../tmp_sw_project ${SW_ROOT}/${TARGET_PATH}.bak
rm -rf ${SW_ROOT}/${TARGET_PATH}
mv ${SW_ROOT}/${TARGET_PATH}.bak ${SW_ROOT}/${TARGET_PATH}
echo "Copy code ok"

Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,8 @@

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.sreworks.job.master.domain.DO.SreworksJob;
import com.alibaba.sreworks.job.master.domain.DTO.ElasticJobInstanceDTO;
import com.alibaba.sreworks.job.master.domain.DTO.FlinkConnectorDTO;
import com.alibaba.sreworks.job.master.domain.DTO.SreworksJobDTO;
import com.alibaba.sreworks.job.master.domain.DTO.SreworksStreamJobDTO;
import com.alibaba.sreworks.job.master.domain.repository.SreworksJobRepository;
import com.alibaba.sreworks.job.master.jobscene.JobSceneService;
import com.alibaba.sreworks.job.master.jobschedule.JobScheduleService;
import com.alibaba.sreworks.job.master.jobtrigger.JobTriggerService;
import com.alibaba.sreworks.job.master.domain.DTO.*;
import com.alibaba.sreworks.job.master.params.*;
import com.alibaba.sreworks.job.master.services.JobService;
import com.alibaba.sreworks.job.master.services.StreamJobService;
import com.alibaba.sreworks.job.master.services.VvpService;
import com.alibaba.sreworks.job.utils.JsonUtil;
Expand All @@ -22,15 +13,9 @@
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import static com.alibaba.sreworks.job.utils.PageUtil.pageable;

Expand All @@ -52,9 +37,6 @@ public class StreamJobController extends BaseController {
public TeslaBaseResult create(@RequestBody StreamJobCreateParam param) throws Exception {
param.setCreator(getUserEmployeeId());
param.setOperator(getUserEmployeeId());
if (param.getAppId() == null) {
param.setAppId(getAppId());
}
if (param.getTags() == null) {
param.setTags(new JSONArray());
}
Expand All @@ -67,14 +49,80 @@ public TeslaBaseResult create(@RequestBody StreamJobCreateParam param) throws Ex
return buildSucceedResult(streamJobService.create(param));
}

@RequestMapping(value = "{id}/source", method = RequestMethod.POST)
public TeslaBaseResult addSource(@PathVariable("id") Long streamJobId, @RequestBody StreamJobSourceCreateParam param) throws Exception {
SreworksStreamJobDTO job = streamJobService.get(streamJobId);
if(job == null){
return buildClientErrorResult("streamJob not found");
}
param.setCreator(getUserEmployeeId());
param.setOperator(getUserEmployeeId());
return buildSucceedResult(streamJobService.addSource(streamJobId, job.getAppId(), param));
}

@RequestMapping(value = "{id}/block/{blockId}", method = RequestMethod.DELETE)
public TeslaBaseResult deleteBlock(@PathVariable("id") Long streamJobId, @PathVariable("blockId") Long streamJobBlockId) throws Exception {
SreworksStreamJobDTO job = streamJobService.get(streamJobId);
if(job == null){
return buildClientErrorResult("streamJob not found");
}
streamJobService.deleteBlock(streamJobBlockId);
return buildSucceedResult(JsonUtil.map(
"streamJobId", streamJobId,
"blockId", streamJobBlockId
));
}

@RequestMapping(value = "{id}/sink", method = RequestMethod.POST)
public TeslaBaseResult addSink(@PathVariable("id") Long streamJobId, @RequestBody StreamJobSinkCreateParam param) throws Exception {
SreworksStreamJobDTO job = streamJobService.get(streamJobId);
if(job == null){
return buildClientErrorResult("streamJob not found");
}
param.setCreator(getUserEmployeeId());
param.setOperator(getUserEmployeeId());
return buildSucceedResult(streamJobService.addSink(streamJobId, job.getAppId(), param));
}

@RequestMapping(value = "{id}/python", method = RequestMethod.POST)
public TeslaBaseResult addPython(@PathVariable("id") Long streamJobId, @RequestBody StreamJobPythonCreateParam param) throws Exception {
SreworksStreamJobDTO job = streamJobService.get(streamJobId);
if(job == null){
return buildClientErrorResult("streamJob not found");
}
param.setCreator(getUserEmployeeId());
param.setOperator(getUserEmployeeId());
return buildSucceedResult(streamJobService.addPython(streamJobId, job.getAppId(), param));
}

@RequestMapping(value = "{id}/blocks", method = RequestMethod.GET)
public TeslaBaseResult getBlocks(@PathVariable("id") Long streamJobId) throws Exception {
SreworksStreamJobDTO job = streamJobService.get(streamJobId);
if(job == null){
return buildClientErrorResult("streamJob not found");
}
return buildSucceedResult(streamJobService.listBlockByStreamJobId(streamJobId));
}

@RequestMapping(value = "{id}/preview", method = RequestMethod.GET)
public TeslaBaseResult getPreview(@PathVariable("id") Long streamJobId) throws Exception {
SreworksStreamJobDTO job = streamJobService.get(streamJobId);
if(job == null){
return buildClientErrorResult("streamJob not found");
}
return buildSucceedResult(JsonUtil.map(
"content", streamJobService.generateScript(streamJobId)
));
}

@RequestMapping(value = "gets", method = RequestMethod.GET)
public TeslaBaseResult gets(Integer page, Integer pageSize) throws Exception {
Page<SreworksStreamJobDTO> jobList = streamJobService.gets(pageable(page, pageSize));
return buildSucceedResult(JsonUtil.map(
"page", jobList.getNumber(),
"pageSize", jobList.getSize(),
"total", jobList.getTotalElements(),
"items", jobList.getContent()
"items", jobList.getContent()
));

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,14 @@

import javax.persistence.*;

/**
* @author jinghua.yjh
*/
@Slf4j
@Entity
@EntityListeners(AuditingEntityListener.class)
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class SreworksStreamJobPython {
public class SreworksStreamJobBlock {

@Id
@GeneratedValue
Expand Down Expand Up @@ -47,6 +44,10 @@ public class SreworksStreamJobPython {
private String name;

@Column(columnDefinition = "longtext")
private String script;
private String data;

@Column
private String blockType;


}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.alibaba.sreworks.job.master.domain.DTO;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.sreworks.job.master.domain.DO.SreworksStreamJobBlock;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

@Data
@Slf4j
@NoArgsConstructor
@AllArgsConstructor
public class SreworksStreamJobBlockDTO {

private Long id;

private Long streamJobId;
private Long gmtCreate;

private Long gmtModified;

private String creator;

private String operator;

private String appId;

private String name;

private String blockType;

private String blockTypeDisplay;

private JSONObject data;



public SreworksStreamJobBlockDTO(SreworksStreamJobBlock jobBlock) {
id = jobBlock.getId();
gmtCreate = jobBlock.getGmtCreate();
gmtModified = jobBlock.getGmtModified();
streamJobId = jobBlock.getStreamJobId();
appId = jobBlock.getAppId();
name = jobBlock.getName();
blockType = jobBlock.getBlockType();
data = JSONObject.parseObject(jobBlock.getData());
if (StringUtils.equals(blockType, "source") && data.getString("sourceType") != null) {
blockTypeDisplay = "输入源:" + data.getString("sourceType");
} else if (StringUtils.equals(blockType, "sink") && data.getString("sinkType") != null) {
blockTypeDisplay = "输出:" + data.getString("sinkType");
} else if (StringUtils.equals(blockType, "python")){
blockTypeDisplay = "Python处理";
} else {
blockTypeDisplay = blockType;
}
}

}
Loading

0 comments on commit 753bb18

Please sign in to comment.