From 5bef01ce80474044d49a383ffb61e6f7a13ea9a4 Mon Sep 17 00:00:00 2001 From: Chyroc Chen Date: Sun, 22 Jan 2017 15:57:52 +0800 Subject: [PATCH 1/2] add s3 writer --- package.xml | 7 + .../unstructuredstorage/writer/Key.java | 9 + pom.xml | 1 + s3writer/doc/s3writer.md | 216 +++++++++++ s3writer/pom.xml | 88 +++++ s3writer/src/main/assembly/package.xml | 35 ++ .../datax/plugin/writer/s3writer/Key.java | 6 + .../datax/plugin/writer/s3writer/S3Util.java | 63 ++++ .../plugin/writer/s3writer/S3Writer.java | 357 ++++++++++++++++++ .../writer/s3writer/S3WriterErrorCode.java | 41 ++ s3writer/src/main/resources/plugin.json | 6 + .../main/resources/plugin_job_template.json | 15 + 12 files changed, 844 insertions(+) create mode 100644 s3writer/doc/s3writer.md create mode 100755 s3writer/pom.xml create mode 100755 s3writer/src/main/assembly/package.xml create mode 100755 s3writer/src/main/java/com/alibaba/datax/plugin/writer/s3writer/Key.java create mode 100644 s3writer/src/main/java/com/alibaba/datax/plugin/writer/s3writer/S3Util.java create mode 100755 s3writer/src/main/java/com/alibaba/datax/plugin/writer/s3writer/S3Writer.java create mode 100755 s3writer/src/main/java/com/alibaba/datax/plugin/writer/s3writer/S3WriterErrorCode.java create mode 100755 s3writer/src/main/resources/plugin.json create mode 100644 s3writer/src/main/resources/plugin_job_template.json diff --git a/package.xml b/package.xml index cbe0621ab..78e85b648 100755 --- a/package.xml +++ b/package.xml @@ -203,6 +203,13 @@ datax + + s3writer/target/datax/ + + **/*.* + + datax + ftpwriter/target/datax/ diff --git a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/Key.java b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/Key.java index 2e7fe079f..1b8f0ad17 100755 --- a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/Key.java +++ 
b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/Key.java @@ -35,4 +35,13 @@ public class Key { // writer file type suffix, like .txt .csv public static final String SUFFIX = "suffix"; + + public static final String S3_BUCKET = "s3Bucket"; + + public static final String S3_ACCESS_KEY = "s3AccessKey"; + + public static final String S3_SECRET_KEY = "s3SecretKey"; + + public static final String S3_ENDPOINT = "s3Endpoint"; + } diff --git a/pom.xml b/pom.xml index 15c5b98ca..ba3fd82e0 100755 --- a/pom.xml +++ b/pom.xml @@ -82,6 +82,7 @@ rdbmswriter hbase11xwriter hbase094xwriter + s3writer plugin-rdbms-util diff --git a/s3writer/doc/s3writer.md b/s3writer/doc/s3writer.md new file mode 100644 index 000000000..6d7615495 --- /dev/null +++ b/s3writer/doc/s3writer.md @@ -0,0 +1,216 @@ +# DataX S3Writer 说明 + + +------------ + +## 1 快速介绍 + +S3Writer提供了向S3写入类CSV格式的一个或者多个表文件。 + +**写入S3文件内容存放的是一张逻辑意义上的二维表,例如CSV格式的文本信息。** + + +## 2 功能与限制 + +S3Writer实现了从DataX协议转为S3TXT文件功能,S3文件本身是无结构化数据存储,S3Writer如下几个方面约定: + +1. 支持且仅支持写入 TXT的文件,且要求TXT中shema为一张二维表。 + +2. 支持类CSV格式文件,自定义分隔符。 + +3. 支持文本压缩,现有压缩格式为gzip、bzip2。 + +6. 支持多线程写入,每个线程写入不同子文件。 + +7. 文件支持滚动,当文件大于某个size值或者行数值,文件需要切换。 [暂不支持] + +我们不能做到: + +1. 
单个文件不能支持并发写入。 + + +## 3 功能说明 + + +### 3.1 配置样例 + +```json +{ + "setting": {}, + "job": { + "setting": { + "speed": { + "channel": 2 + } + }, + "content": [ + { + "reader": { + "name": "txtfilereader", + "parameter": { + "path": ["/home/haiwei.luo/case00/data"], + "encoding": "UTF-8", + "column": [ + { + "index": 0, + "type": "long" + }, + { + "index": 1, + "type": "boolean" + }, + { + "index": 2, + "type": "double" + }, + { + "index": 3, + "type": "string" + }, + { + "index": 4, + "type": "date", + "format": "yyyy.MM.dd" + } + ], + "fieldDelimiter": "," + } + }, + "writer": { + "name": "txtfilewriter", + "parameter": { + "path": "/home/haiwei.luo/case00/result", + "fileName": "luohw", + "writeMode": "truncate", + "dateFormat": "yyyy-MM-dd" + } + } + } + ] + } +} +``` + +### 3.2 参数说明 + +* **path** + + * 描述:S3文件系统的路径信息,S3Writer会写入Path目录下属多个文件。
+ + * 必选:是
+ + * 默认值:无
+ +* **fileName** + + * 描述:S3Writer写入的文件名,该文件名会添加随机的后缀作为每个线程写入实际文件名。
+ + * 必选:是
+ + * 默认值:无
+ +* **writeMode** + + * 描述:S3Writer写入前数据清理处理模式:
+ + * truncate,写入前清理目录下以fileName为前缀的所有文件。 + * append,写入前不做任何处理,DataX S3Writer直接使用fileName写入,并保证文件名不冲突。 + * nonConflict,如果目录下有以fileName为前缀的文件,直接报错。 + + * 必选:是
+ + * 默认值:无
+ +* **fieldDelimiter** + + * 描述:写入的字段分隔符
+ + * 必选:否
+ + * 默认值:,
+ +* **compress** + + * 描述:文本压缩类型,默认不填写意味着没有压缩。支持压缩类型为gzip、bzip2。
+ + * 必选:否
+ + * 默认值:无压缩
+ +* **encoding** + + * 描述:写入文件的编码配置。
+ + * 必选:否
+ + * 默认值:utf-8
+ + +* **nullFormat** + + * 描述:文本文件中无法使用标准字符串定义null(空指针),DataX提供nullFormat定义哪些字符串可以表示为null。
+ + 例如如果用户配置: nullFormat="\N",那么如果源头数据是null,DataX会在写出的文件中将其写为"\N"。 + + * 必选:否
+ + * 默认值:\N
+ +* **dateFormat** + + * 描述:日期类型的数据序列化到文件中时的格式,例如 "dateFormat": "yyyy-MM-dd"。
+ + * 必选:否
+ + * 默认值:无
+ +* **fileFormat** + + * 描述:文件写出的格式,包括csv (http://zh.wikipedia.org/wiki/%E9%80%97%E5%8F%B7%E5%88%86%E9%9A%94%E5%80%BC) 和text两种,csv是严格的csv格式,如果待写数据包括列分隔符,则会按照csv的转义语法转义,转义符号为双引号";text格式是用列分隔符简单分割待写数据,对于待写数据包括列分隔符情况下不做转义。
+ + * 必选:否
+ + * 默认值:text
+ +* **header** + + * 描述:txt写出时的表头,示例['id', 'name', 'age']。
+ + * 必选:否
+ + * 默认值:无
+ +### 3.3 类型转换 + + +S3文件本身不提供数据类型,该类型是DataX S3Writer定义: + +| DataX 内部类型| S3文件 数据类型 | +| -------- | ----- | +| +| Long |Long | +| Double |Double| +| String |String| +| Boolean |Boolean | +| Date |Date | + +其中: + +* S3文件 Long是指S3文件文本中使用整形的字符串表示形式,例如"19901219"。 +* S3文件 Double是指S3文件文本中使用Double的字符串表示形式,例如"3.1415"。 +* S3文件 Boolean是指S3文件文本中使用Boolean的字符串表示形式,例如"true"、"false"。不区分大小写。 +* S3文件 Date是指S3文件文本中使用Date的字符串表示形式,例如"2014-12-31",Date可以指定format格式。 + + +## 4 性能报告 + + +## 5 约束限制 + +略 + +## 6 FAQ + +略 + + diff --git a/s3writer/pom.xml b/s3writer/pom.xml new file mode 100755 index 000000000..b3e13bf54 --- /dev/null +++ b/s3writer/pom.xml @@ -0,0 +1,88 @@ + + 4.0.0 + + com.alibaba.datax + datax-all + 0.0.1-SNAPSHOT + + + s3writer + s3writer + S3Writer提供了本地写入TEXT功能,建议开发、测试环境使用。 + jar + + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + slf4j-log4j12 + org.slf4j + + + + + com.alibaba.datax + plugin-unstructured-storage-util + ${datax-project-version} + + + org.slf4j + slf4j-api + + + ch.qos.logback + logback-classic + + + com.google.guava + guava + 16.0.1 + + + com.amazonaws + aws-java-sdk-core + 1.11.52 + + + com.amazonaws + aws-java-sdk-s3 + 1.11.52 + + + + + + + + maven-compiler-plugin + + 1.6 + 1.6 + ${project-sourceEncoding} + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + diff --git a/s3writer/src/main/assembly/package.xml b/s3writer/src/main/assembly/package.xml new file mode 100755 index 000000000..d5b762bbd --- /dev/null +++ b/s3writer/src/main/assembly/package.xml @@ -0,0 +1,35 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/writer/s3writer + + + target/ + + s3writer-0.0.1-SNAPSHOT.jar + + plugin/writer/s3writer + + + + + + false + plugin/writer/s3writer/libs + runtime + + + diff --git a/s3writer/src/main/java/com/alibaba/datax/plugin/writer/s3writer/Key.java 
b/s3writer/src/main/java/com/alibaba/datax/plugin/writer/s3writer/Key.java new file mode 100755 index 000000000..f1169a841 --- /dev/null +++ b/s3writer/src/main/java/com/alibaba/datax/plugin/writer/s3writer/Key.java @@ -0,0 +1,6 @@ +package com.alibaba.datax.plugin.writer.s3writer; + +public class Key { + // must have + public static final String PATH = "path"; +} diff --git a/s3writer/src/main/java/com/alibaba/datax/plugin/writer/s3writer/S3Util.java b/s3writer/src/main/java/com/alibaba/datax/plugin/writer/s3writer/S3Util.java new file mode 100644 index 000000000..5d9eb48ae --- /dev/null +++ b/s3writer/src/main/java/com/alibaba/datax/plugin/writer/s3writer/S3Util.java @@ -0,0 +1,63 @@ +package com.alibaba.datax.plugin.writer.s3writer; + +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.model.Bucket; +import com.amazonaws.services.s3.model.CannedAccessControlList; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.services.s3.transfer.TransferManager; +import com.amazonaws.services.s3.transfer.TransferManagerConfiguration; +import com.amazonaws.services.s3.transfer.TransferProgress; +import com.amazonaws.services.s3.transfer.Upload; +import java.io.File; +import java.io.IOException; +import java.util.List; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; + +public class S3Util { + + private final String bucket; + private final String accessKey; + private final String secretKey; + private final String endpoint; + private final AmazonS3 s3Client; + + public S3Util(String bucket, String accessKey, String secretKey, String endpoint) { + this.bucket = bucket; + this.accessKey = accessKey; + this.secretKey = secretKey; + this.endpoint = endpoint; + + this.s3Client = new AmazonS3Client(new BasicAWSCredentials(this.accessKey, this.secretKey)); + this.s3Client.setEndpoint(this.endpoint); + } + + public void 
upload(String from, String to) { + TransferManager transferManager = new TransferManager(this.s3Client); + + Upload upload = transferManager.upload(this.bucket, to, new File(from)); + TransferProgress p = upload.getProgress(); + while (upload.isDone() == false) { + int percent = (int) (p.getPercentTransferred()); + System.out.print("\r" + from + " - " + "[ " + percent + "% ] " + + p.getBytesTransferred() + " / " + p.getTotalBytesToTransfer()); + try { + Thread.sleep(500); + } catch (Exception e) { + + } + } + try { + upload.waitForCompletion(); + s3Client.setObjectAcl(this.bucket, to, CannedAccessControlList.PublicRead); + } catch (Exception e) { + System.out.println(e.getMessage()); + } finally { + transferManager.shutdownNow(); + } + System.out.print("\r" + from + " - " + "[ 100% ] " + + p.getBytesTransferred() + " / " + p.getTotalBytesToTransfer()); + } +} \ No newline at end of file diff --git a/s3writer/src/main/java/com/alibaba/datax/plugin/writer/s3writer/S3Writer.java b/s3writer/src/main/java/com/alibaba/datax/plugin/writer/s3writer/S3Writer.java new file mode 100755 index 000000000..3a6afcdea --- /dev/null +++ b/s3writer/src/main/java/com/alibaba/datax/plugin/writer/s3writer/S3Writer.java @@ -0,0 +1,357 @@ +package com.alibaba.datax.plugin.writer.s3writer; + +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.spi.Writer; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.unstructuredstorage.writer.UnstructuredStorageWriterUtil; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.filefilter.PrefixFileFilter; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.FilenameFilter; +import java.io.IOException; +import java.io.OutputStream; +import 
java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +/** + * Created by haiwei.luo on 14-9-17. + */ +public class S3Writer extends Writer { + public static class Job extends Writer.Job { + private static final Logger LOG = LoggerFactory.getLogger(Job.class); + + private Configuration writerSliceConfig = null; + + @Override + public void init() { + this.writerSliceConfig = this.getPluginJobConf(); + this.validateParameter(); + String dateFormatOld = this.writerSliceConfig + .getString(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.FORMAT); + String dateFormatNew = this.writerSliceConfig + .getString(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.DATE_FORMAT); + if (null == dateFormatNew) { + this.writerSliceConfig + .set(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.DATE_FORMAT, + dateFormatOld); + } + if (null != dateFormatOld) { + LOG.warn("您使用format配置日期格式化, 这是不推荐的行为, 请优先使用dateFormat配置项, 两项同时存在则使用dateFormat."); + } + UnstructuredStorageWriterUtil + .validateParameter(this.writerSliceConfig); + } + + private void validateParameter() { + this.writerSliceConfig + .getNecessaryValue( + com.alibaba.datax.plugin.unstructuredstorage.writer.Key.FILE_NAME, + S3WriterErrorCode.REQUIRED_VALUE); + + String path = this.writerSliceConfig.getNecessaryValue(Key.PATH, + S3WriterErrorCode.REQUIRED_VALUE); + + try { + // warn: 这里用户需要配一个目录 + File dir = new File(path); + if (dir.isFile()) { + throw DataXException + .asDataXException( + S3WriterErrorCode.ILLEGAL_VALUE, + String.format( + "您配置的path: [%s] 不是一个合法的目录, 请您注意文件重名, 不合法目录名等情况.", + path)); + } + if (!dir.exists()) { + boolean createdOk = dir.mkdirs(); + if (!createdOk) { + throw DataXException + .asDataXException( + S3WriterErrorCode.CONFIG_INVALID_EXCEPTION, + String.format("您指定的文件路径 : [%s] 创建失败.", + path)); + } + } + } catch (SecurityException se) { + throw DataXException.asDataXException( + 
S3WriterErrorCode.SECURITY_NOT_ENOUGH, + String.format("您没有权限创建文件路径 : [%s] ", path), se); + } + } + + @Override + public void prepare() { + String path = this.writerSliceConfig.getString(Key.PATH); + String fileName = this.writerSliceConfig + .getString(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.FILE_NAME); + String writeMode = this.writerSliceConfig + .getString(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.WRITE_MODE); + // truncate option handler + if ("truncate".equals(writeMode)) { + LOG.info(String.format( + "由于您配置了writeMode truncate, 开始清理 [%s] 下面以 [%s] 开头的内容", + path, fileName)); + File dir = new File(path); + // warn:需要判断文件是否存在,不存在时,不能删除 + try { + if (dir.exists()) { + // warn:不要使用FileUtils.deleteQuietly(dir); + FilenameFilter filter = new PrefixFileFilter(fileName); + File[] filesWithFileNamePrefix = dir.listFiles(filter); + for (File eachFile : filesWithFileNamePrefix) { + LOG.info(String.format("delete file [%s].", + eachFile.getName())); + FileUtils.forceDelete(eachFile); + } + // FileUtils.cleanDirectory(dir); + } + } catch (NullPointerException npe) { + throw DataXException + .asDataXException( + S3WriterErrorCode.Write_FILE_ERROR, + String.format("您配置的目录清空时出现空指针异常 : [%s]", + path), npe); + } catch (IllegalArgumentException iae) { + throw DataXException.asDataXException( + S3WriterErrorCode.SECURITY_NOT_ENOUGH, + String.format("您配置的目录参数异常 : [%s]", path)); + } catch (SecurityException se) { + throw DataXException.asDataXException( + S3WriterErrorCode.SECURITY_NOT_ENOUGH, + String.format("您没有权限查看目录 : [%s]", path)); + } catch (IOException e) { + throw DataXException.asDataXException( + S3WriterErrorCode.Write_FILE_ERROR, + String.format("无法清空目录 : [%s]", path), e); + } + } else if ("append".equals(writeMode)) { + LOG.info(String + .format("由于您配置了writeMode append, 写入前不做清理工作, [%s] 目录下写入相应文件名前缀 [%s] 的文件", + path, fileName)); + } else if ("nonConflict".equals(writeMode)) { + LOG.info(String.format( + "由于您配置了writeMode nonConflict, 开始检查 
[%s] 下面的内容", path)); + // warn: check two times about exists, mkdirs + File dir = new File(path); + try { + if (dir.exists()) { + if (dir.isFile()) { + throw DataXException + .asDataXException( + S3WriterErrorCode.ILLEGAL_VALUE, + String.format( + "您配置的path: [%s] 不是一个合法的目录, 请您注意文件重名, 不合法目录名等情况.", + path)); + } + // fileName is not null + FilenameFilter filter = new PrefixFileFilter(fileName); + File[] filesWithFileNamePrefix = dir.listFiles(filter); + if (filesWithFileNamePrefix.length > 0) { + List allFiles = new ArrayList(); + for (File eachFile : filesWithFileNamePrefix) { + allFiles.add(eachFile.getName()); + } + LOG.error(String.format("冲突文件列表为: [%s]", + StringUtils.join(allFiles, ","))); + throw DataXException + .asDataXException( + S3WriterErrorCode.ILLEGAL_VALUE, + String.format( + "您配置的path: [%s] 目录不为空, 下面存在其他文件或文件夹.", + path)); + } + } else { + boolean createdOk = dir.mkdirs(); + if (!createdOk) { + throw DataXException + .asDataXException( + S3WriterErrorCode.CONFIG_INVALID_EXCEPTION, + String.format( + "您指定的文件路径 : [%s] 创建失败.", + path)); + } + } + } catch (SecurityException se) { + throw DataXException.asDataXException( + S3WriterErrorCode.SECURITY_NOT_ENOUGH, + String.format("您没有权限查看目录 : [%s]", path)); + } + } else { + throw DataXException + .asDataXException( + S3WriterErrorCode.ILLEGAL_VALUE, + String.format( + "仅支持 truncate, append, nonConflict 三种模式, 不支持您配置的 writeMode 模式 : [%s]", + writeMode)); + } + } + + @Override + public void post() { + + } + + @Override + public void destroy() { + + } + + @Override + public List split(int mandatoryNumber) { + LOG.info("begin do split..."); + List writerSplitConfigs = new ArrayList(); + String filePrefix = this.writerSliceConfig + .getString(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.FILE_NAME); + + Set allFiles = new HashSet(); + String path = null; + try { + path = this.writerSliceConfig.getString(Key.PATH); + File dir = new File(path); + allFiles.addAll(Arrays.asList(dir.list())); + } catch 
(SecurityException se) { + throw DataXException.asDataXException( + S3WriterErrorCode.SECURITY_NOT_ENOUGH, + String.format("您没有权限查看目录 : [%s]", path)); + } + + String fileSuffix; + for (int i = 0; i < mandatoryNumber; i++) { + // handle same file name + + Configuration splitedTaskConfig = this.writerSliceConfig + .clone(); + + String fullFileName = null; + fileSuffix = UUID.randomUUID().toString().replace('-', '_'); + fullFileName = String.format("%s__%s", filePrefix, fileSuffix); + while (allFiles.contains(fullFileName)) { + fileSuffix = UUID.randomUUID().toString().replace('-', '_'); + fullFileName = String.format("%s__%s", filePrefix, + fileSuffix); + } + allFiles.add(fullFileName); + + splitedTaskConfig + .set(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.FILE_NAME, + fullFileName); + + LOG.info(String.format("splited write file name:[%s]", + fullFileName)); + + writerSplitConfigs.add(splitedTaskConfig); + } + LOG.info("end do split."); + return writerSplitConfigs; + } + } + + public static class Task extends Writer.Task { + private static final Logger LOG = LoggerFactory.getLogger(Task.class); + + private Configuration writerSliceConfig; + + private String path; + + private String fileName; + + @Override + public void init() { + this.writerSliceConfig = this.getPluginJobConf(); + this.path = this.writerSliceConfig.getString(Key.PATH); + this.fileName = this.writerSliceConfig + .getString(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.FILE_NAME); + } + + @Override + public void prepare() { + + } + + @Override + public void startWrite(RecordReceiver lineReceiver) { + LOG.info("begin do write..."); + String fileFullPath = this.buildFilePath(); + LOG.info(String.format("write to file : [%s]", fileFullPath)); + + OutputStream outputStream = null; + try { + File newFile = new File(fileFullPath); + newFile.createNewFile(); + outputStream = new FileOutputStream(newFile); + UnstructuredStorageWriterUtil.writeToStream(lineReceiver, + outputStream, 
this.writerSliceConfig, this.fileName, + this.getTaskPluginCollector()); + } catch (SecurityException se) { + throw DataXException.asDataXException( + S3WriterErrorCode.SECURITY_NOT_ENOUGH, + String.format("您没有权限创建文件 : [%s]", this.fileName)); + } catch (IOException ioe) { + throw DataXException.asDataXException( + S3WriterErrorCode.Write_FILE_IO_ERROR, + String.format("无法创建待写文件 : [%s]", this.fileName), ioe); + } finally { + IOUtils.closeQuietly(outputStream); + } + + String s3Bucket = this.writerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.S3_BUCKET); + String s3AccessKey = this.writerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.S3_ACCESS_KEY); + String s3SecretKey = this.writerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.S3_SECRET_KEY); + String s3Endpoint = this.writerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.writer.Key.S3_ENDPOINT); + + S3Util s3Util = new S3Util(s3Bucket, s3AccessKey, s3SecretKey, s3Endpoint); + s3Util.upload(fileFullPath, fileFullPath); + + try{ + File file = new File(fileFullPath); + file.delete(); + }catch(Exception e){ + System.out.println("delete file failed"); + } + + LOG.info("end do write"); + } + + private String buildFilePath() { + boolean isEndWithSeparator = false; + switch (IOUtils.DIR_SEPARATOR) { + case IOUtils.DIR_SEPARATOR_UNIX: + isEndWithSeparator = this.path.endsWith(String + .valueOf(IOUtils.DIR_SEPARATOR)); + break; + case IOUtils.DIR_SEPARATOR_WINDOWS: + isEndWithSeparator = this.path.endsWith(String + .valueOf(IOUtils.DIR_SEPARATOR_WINDOWS)); + break; + default: + break; + } + if (!isEndWithSeparator) { + this.path = this.path + IOUtils.DIR_SEPARATOR; + } + return String.format("%s%s", this.path, this.fileName); + } + + @Override + public void post() { + + } + + @Override + public void destroy() { + + } + } +} diff --git 
a/s3writer/src/main/java/com/alibaba/datax/plugin/writer/s3writer/S3WriterErrorCode.java b/s3writer/src/main/java/com/alibaba/datax/plugin/writer/s3writer/S3WriterErrorCode.java new file mode 100755 index 000000000..3bd817143 --- /dev/null +++ b/s3writer/src/main/java/com/alibaba/datax/plugin/writer/s3writer/S3WriterErrorCode.java @@ -0,0 +1,41 @@ +package com.alibaba.datax.plugin.writer.s3writer; + +import com.alibaba.datax.common.spi.ErrorCode; + +/** + * Created by haiwei.luo on 14-9-17. + */ +public enum S3WriterErrorCode implements ErrorCode { + + CONFIG_INVALID_EXCEPTION("TxtFileWriter-00", "您的参数配置错误."), + REQUIRED_VALUE("TxtFileWriter-01", "您缺失了必须填写的参数值."), + ILLEGAL_VALUE("TxtFileWriter-02", "您填写的参数值不合法."), + Write_FILE_ERROR("TxtFileWriter-03", "您配置的目标文件在写入时异常."), + Write_FILE_IO_ERROR("TxtFileWriter-04", "您配置的文件在写入时出现IO异常."), + SECURITY_NOT_ENOUGH("TxtFileWriter-05", "您缺少权限执行相应的文件写入操作."); + + private final String code; + private final String description; + + private S3WriterErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return this.code; + } + + @Override + public String getDescription() { + return this.description; + } + + @Override + public String toString() { + return String.format("Code:[%s], Description:[%s].", this.code, + this.description); + } + +} diff --git a/s3writer/src/main/resources/plugin.json b/s3writer/src/main/resources/plugin.json new file mode 100755 index 000000000..5ca1cb7d9 --- /dev/null +++ b/s3writer/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "s3writer", + "class": "com.alibaba.datax.plugin.writer.s3writer.S3Writer", + "description": "upload file to aws s3", + "developer": "Chyroc" +} \ No newline at end of file diff --git a/s3writer/src/main/resources/plugin_job_template.json b/s3writer/src/main/resources/plugin_job_template.json new file mode 100644 index 000000000..142e9b386 --- /dev/null +++ 
b/s3writer/src/main/resources/plugin_job_template.json @@ -0,0 +1,15 @@ +{ + "name": "s3writer", + "parameter": { + "s3Bucket": "", + "s3AccessKey": "", + "s3SecretKey": "", + "s3Endpoint": "", + + "path": "", + "fileName": "", + "writeMode": "", + "fieldDelimiter":"", + "dateFormat": "" + } +} \ No newline at end of file From 5af843364c97aa67bada4d36b734bd22126d475a Mon Sep 17 00:00:00 2001 From: Chyroc Chen Date: Sun, 22 Jan 2017 16:31:34 +0800 Subject: [PATCH 2/2] doc --- s3writer/doc/s3writer.md | 95 ++++++++++++++++++---------------------- 1 file changed, 43 insertions(+), 52 deletions(-) diff --git a/s3writer/doc/s3writer.md b/s3writer/doc/s3writer.md index 6d7615495..231dc2c19 100644 --- a/s3writer/doc/s3writer.md +++ b/s3writer/doc/s3writer.md @@ -36,58 +36,49 @@ S3Writer实现了从DataX协议转为S3TXT文件功能,S3文件本身是无结 ```json { - "setting": {}, - "job": { - "setting": { - "speed": { - "channel": 2 - } - }, - "content": [ - { - "reader": { - "name": "txtfilereader", - "parameter": { - "path": ["/home/haiwei.luo/case00/data"], - "encoding": "UTF-8", - "column": [ - { - "index": 0, - "type": "long" - }, - { - "index": 1, - "type": "boolean" - }, - { - "index": 2, - "type": "double" - }, - { - "index": 3, - "type": "string" - }, - { - "index": 4, - "type": "date", - "format": "yyyy.MM.dd" - } - ], - "fieldDelimiter": "," - } - }, - "writer": { - "name": "txtfilewriter", - "parameter": { - "path": "/home/haiwei.luo/case00/result", - "fileName": "luohw", - "writeMode": "truncate", - "dateFormat": "yyyy-MM-dd" - } - } - } - ] - } + { + "job": { + "content": [ + { + "reader": { + "name": "mysqlreader", + "parameter": { + "column": ["*"], + "connection": [ + { + "jdbcUrl": ["jdbc:mysql://xxx:3306/xxx"], + "table": ["yyy"] + } + ], + "password": "root", + "username": "root", + "where": "" + } + }, + "writer": { + "name": "s3writer", + "parameter": { + "s3Bucket": "xxx", + "s3AccessKey": "xxx", + "s3SecretKey": "xxx+", + "s3Endpoint": "s3.cn-north-1.amazonaws.com.cn", + + 
"dateFormat": "", + "fieldDelimiter": ",", + "fileName": "yyy", + "path": "xxx/xxx", + "writeMode": "truncate" + } + } + } + ], + "setting": { + "speed": { + "channel": 10 + } + } + } + } } ```