Skip to content

Commit

Permalink
hotfix-#1795][hdfs] fix sink hdfs bug
Browse files Browse the repository at this point in the history
  • Loading branch information
zhourui committed Aug 22, 2023
1 parent 74c1591 commit 8f347c9
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 24 deletions.
33 changes: 15 additions & 18 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class HdfsOrcSyncConverter extends AbstractRowConverter<RowData, RowData,

private static final long serialVersionUID = 4254984437380862131L;

private List<String> ColumnNameList;
private List<String> columnNameList;
private transient Map<String, ColumnTypeUtil.DecimalInfo> decimalColInfo;

public HdfsOrcSyncConverter(List<FieldConfig> fieldConfigList, HdfsConfig hdfsConfig) {
Expand Down Expand Up @@ -107,7 +107,7 @@ public RowData toInternal(RowData input) throws Exception {
@Override
@SuppressWarnings("unchecked")
public Object[] toExternal(RowData rowData, Object[] data) throws Exception {
for (int index = 0; index < fieldTypes.length; index++) {
for (int index = 0; index < columnNameList.size(); index++) {
toExternalConverters.get(index).serialize(rowData, index, data);
}
return data;
Expand Down Expand Up @@ -195,7 +195,7 @@ protected ISerializationConverter<Object[]> createExternalConverter(String type)
case "DECIMAL":
return (rowData, index, data) -> {
ColumnTypeUtil.DecimalInfo decimalInfo =
decimalColInfo.get(ColumnNameList.get(index));
decimalColInfo.get(columnNameList.get(index));
HiveDecimal hiveDecimal =
HiveDecimal.create(new BigDecimal(rowData.getString(index).toString()));
hiveDecimal =
Expand Down Expand Up @@ -245,7 +245,7 @@ protected ISerializationConverter<Object[]> createExternalConverter(String type)
}

public void setColumnNameList(List<String> columnNameList) {
this.ColumnNameList = columnNameList;
this.columnNameList = columnNameList;
}

public void setDecimalColInfo(Map<String, ColumnTypeUtil.DecimalInfo> decimalColInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public RowData toInternal(RowData input) throws Exception {
@Override
@SuppressWarnings("unchecked")
public Group toExternal(RowData rowData, Group group) throws Exception {
for (int index = 0; index < fieldTypes.length; index++) {
for (int index = 0; index < columnNameList.size(); index++) {
toExternalConverters.get(index).serialize(rowData, index, group);
}
return group;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,22 @@
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

@Slf4j
public class HdfsTextSyncConverter
extends AbstractRowConverter<RowData, RowData, String[], String> {

private static final long serialVersionUID = 1191849197062775272L;

private List<String> columnNameList;

public HdfsTextSyncConverter(List<FieldConfig> fieldConfigList, HdfsConfig hdfsConfig) {
super(fieldConfigList.size(), hdfsConfig);
columnNameList =
hdfsConfig.getColumn().stream()
.map(FieldConfig::getName)
.collect(Collectors.toList());
for (FieldConfig fieldConfig : fieldConfigList) {
String type = fieldConfig.getType().getType();
int left = type.indexOf(ConstantValue.LEFT_PARENTHESIS_SYMBOL);
Expand Down Expand Up @@ -103,7 +110,7 @@ public RowData toInternal(RowData input) throws Exception {
@Override
@SuppressWarnings("unchecked")
public String[] toExternal(RowData rowData, String[] data) throws Exception {
for (int index = 0; index < fieldTypes.length; index++) {
for (int index = 0; index < columnNameList.size(); index++) {
toExternalConverters.get(index).serialize(rowData, index, data);
}
return data;
Expand Down

0 comments on commit 8f347c9

Please sign in to comment.