Skip to content

Commit

Permalink
修正RecordFile的header和trailer的生成
Browse files Browse the repository at this point in the history
  • Loading branch information
entropy-cloud committed Dec 5, 2024
1 parent 761f3b6 commit 9f85129
Show file tree
Hide file tree
Showing 22 changed files with 286 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package io.nop.batch.core;

import io.nop.api.core.beans.IntRangeBean;
import io.nop.commons.cache.ICache;
import io.nop.core.context.IExecutionContext;
import io.nop.core.context.IServiceContext;
import io.nop.core.utils.IVarSet;
Expand All @@ -23,6 +24,8 @@
public interface IBatchTaskContext extends IExecutionContext {
IServiceContext getServiceContext();

ICache<Object,Object> getCache();

String getTaskName();

void setTaskName(String taskName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package io.nop.batch.core.consumer;

import io.nop.api.core.exceptions.NopException;
import io.nop.batch.core.BatchConstants;
import io.nop.batch.core.IBatchAggregator;
import io.nop.batch.core.IBatchConsumerProvider;
import io.nop.batch.core.IBatchMetaProvider;
Expand All @@ -19,6 +20,7 @@
import io.nop.core.resource.record.IResourceRecordOutputProvider;
import io.nop.dataset.record.IRecordOutput;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -96,7 +98,14 @@ ConsumerState<R> newConsumerState(IBatchTaskContext context) {
if (metaProvider != null) {
// 写入header
header = metaProvider.getMeta(context);
} else {
header = new HashMap<>();
header.put(BatchConstants.VAR_BATCH_TASK_CTX, context);
}
try {
state.output.beginWrite(header);
} catch (Exception e) {
throw NopException.adapt(e);
}

// 用于汇总计算trailer
Expand All @@ -105,18 +114,28 @@ ConsumerState<R> newConsumerState(IBatchTaskContext context) {

context.onBeforeComplete(() -> {
Map<String, Object> trailer = aggregator.complete(null, state.combinedValue);
state.output.endWrite(trailer);
try {
state.output.endWrite(trailer);
state.output.flush();
} catch (Exception e) {
throw NopException.adapt(e);
}
});

context.onAfterComplete(err -> {
IoHelper.safeCloseObject(state.output);
} else {
Map<String, Object> finalHeader = header;
context.onBeforeComplete(() -> {
try {
state.output.endWrite(finalHeader);
state.output.flush();
} catch (Exception e) {
throw NopException.adapt(e);
}
});
}

context.onAfterComplete(err -> {
IoHelper.safeCloseObject(state.output);
});
return state;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import io.nop.batch.core.IBatchChunkContext;
import io.nop.batch.core.IBatchTaskContext;
import io.nop.batch.core.IBatchTaskMetrics;
import io.nop.commons.cache.ICache;
import io.nop.commons.cache.MapCache;
import io.nop.core.CoreConstants;
import io.nop.core.context.ExecutionContextImpl;
import io.nop.core.context.IServiceContext;
Expand All @@ -33,6 +35,7 @@ public class BatchTaskContextImpl extends ExecutionContextImpl implements IBatch
static final Logger LOG = LoggerFactory.getLogger(BatchTaskContextImpl.class);

private final IServiceContext serviceContext;
private ICache<Object, Object> cache;

private String taskName;
private Long taskVersion;
Expand Down Expand Up @@ -84,6 +87,18 @@ public IServiceContext getServiceContext() {
return serviceContext;
}

@Override
public synchronized ICache<Object, Object> getCache() {
ICache<Object, Object> cache = this.cache;
if (cache == null) {
cache = serviceContext == null ? null : serviceContext.getCache();
if (cache == null)
cache = new MapCache<>("batch-task-cache", true);
this.cache = cache;
}
return cache;
}

@Override
public String getTaskName() {
return taskName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
*/
package io.nop.batch.jdbc.consumer;

import io.nop.api.core.exceptions.NopException;
import io.nop.commons.util.StringHelper;
import io.nop.core.resource.IResource;
import io.nop.core.resource.record.IResourceRecordIO;
Expand Down Expand Up @@ -68,13 +67,10 @@ public long getWriteCount() {
}

@Override
public void write(Map<String, Object> record) {
public void write(Map<String, Object> record) throws IOException {
count++;
try {
out.write(buildInsertSql(record));
} catch (Exception e) {
throw NopException.adapt(e);
}

out.write(buildInsertSql(record));
}

String buildInsertSql(Map<String, Object> record) {
Expand Down Expand Up @@ -133,12 +129,8 @@ private String encode(Object value) {
}

@Override
public void flush() {
try {
out.flush();
} catch (IOException e) {
throw NopException.adapt(e);
}
public void flush() throws IOException {
out.flush();
}

@Override
Expand Down
Binary file modified nop-cli/demo/_vfs/batch/txn.record-file.xlsx
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/
package io.nop.core.resource.record.csv;

import io.nop.api.core.exceptions.NopException;
import io.nop.commons.util.IoHelper;
import io.nop.core.reflect.ReflectionManager;
import io.nop.core.resource.IResource;
Expand All @@ -20,7 +21,7 @@

public class CsvHelper {

public static <T> List<T> readCsv(IResource resource, Type type, CSVFormat format){
public static <T> List<T> readCsv(IResource resource, Type type, CSVFormat format) {
return readCsv(resource, type, format, null);
}

Expand Down Expand Up @@ -51,6 +52,9 @@ public static <T> void writeCsv(IResource resource, CSVFormat format, List<Strin
CsvRecordOutput<T> output = new CsvRecordOutput<>(resource, null, format, headers, true);
try {
output.writeBatch(data);
output.flush();
} catch (IOException e) {
throw NopException.adapt(e);
} finally {
IoHelper.safeCloseObject(output);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public long getWriteCount() {
}

@Override
public void writeBatch(Collection<? extends T> records) {
public void writeBatch(Collection<? extends T> records) throws IOException {
if (records == null || records.isEmpty()) {
if (!headersWritten) {
if (!CollectionHelper.isEmpty(headers)) {
Expand All @@ -66,45 +66,37 @@ public void writeBatch(Collection<? extends T> records) {
return;
}

try {
if (!headersWritten) {
writeHeaders(CollectionHelper.first(records));
headersWritten = true;
}
if (!headersWritten) {
writeHeaders(CollectionHelper.first(records));
headersWritten = true;
}

for (T record : records) {
write(record);
}
} catch (IOException e) {
throw NopException.adapt(e);
for (T record : records) {
write(record);
}
}

@Override
public void write(T record) {
try {
if (!headersWritten) {
writeHeaders(record);
headersWritten = true;
}

if (headers == null) {
writer.printRecord((Iterable<?>) record);
writeCount++;
return;
}
public void write(T record) throws IOException {
if (!headersWritten) {
writeHeaders(record);
headersWritten = true;
}

Object[] row = new String[headers.size()];
int index = 0;
for (String header : headers) {
Object value = BeanTool.getComplexProperty(record, header);
row[index++] = toString(value);
}
writer.printRecord(row);
if (headers == null) {
writer.printRecord((Iterable<?>) record);
writeCount++;
} catch (IOException e) {
throw NopException.adapt(e);
return;
}

Object[] row = new String[headers.size()];
int index = 0;
for (String header : headers) {
Object value = BeanTool.getComplexProperty(record, header);
row[index++] = toString(value);
}
writer.printRecord(row);
writeCount++;
}

private String toString(Object value) {
Expand All @@ -124,12 +116,8 @@ private void writeHeaders(T record) throws IOException {
}

@Override
public void flush() {
try {
writer.flush();
} catch (IOException e) {
throw NopException.adapt(e);
}
public void flush() throws IOException {
writer.flush();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,23 @@
*/
public interface IRecordOutput<T> extends Closeable, Flushable {

default void beginWrite(Map<String, Object> headerMeta) {
default void beginWrite(Map<String, Object> headerMeta) throws IOException {

}

default void setHeaders(List<String> headers) {

}

default void endWrite(Map<String, Object> trailerMeta) {
default void endWrite(Map<String, Object> trailerMeta) throws IOException {

}

long getWriteCount();

void write(T record);
void write(T record) throws IOException;

default void writeBatch(Collection<? extends T> records) {
default void writeBatch(Collection<? extends T> records) throws IOException {
if (records != null) {
for (T record : records) {
write(record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.nop.ooxml.xlsx.model.WorkbookPart;
import io.nop.ooxml.xlsx.model.XSSFSheetRef;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -87,7 +88,11 @@ public void startRow(int rowNum, Double height, boolean hidden) {

@Override
public void endRow(int rowNum) {
output.write(row);
try {
output.write(row);
} catch (IOException e) {
throw NopException.adapt(e);
}
}

@Override
Expand Down
3 changes: 0 additions & 3 deletions nop-record/src/main/java/io/nop/record/RecordConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ public interface RecordConstants {

String PAGE_HEADER_NAME = "pageHeader";

String VAR_WRITE_COUNT = "$writeCount";

String VAR_INDEX_IN_PAGE = "$indexInPage";

String VAR_AGG_STATE = "aggState";

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,20 @@
package io.nop.record.codec;

import io.nop.api.core.util.IVariableScope;
import io.nop.core.context.IEvalContext;
import io.nop.record.model.RecordTypeMeta;

public interface IFieldCodecContext extends IEvalContext {
public interface IFieldCodecContext extends IEvalContext, IVariableScope {

@Override
default Object getValueByPropPath(String propPath) {
return getEvalScope().getValueByPropPath(propPath);
}

@Override
default Object getValue(String name) {
return getEvalScope().getValue(name);
}

String getFieldPath();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

public interface IFieldCodecFactory {
default IFieldBinaryCodec newBinaryCodec(IFieldConfig config) {
throw new UnsupportedOperationException();
throw new UnsupportedOperationException("not binary codec");
}

default IFieldTextCodec newTextCodec(IFieldConfig config) {
throw new UnsupportedOperationException();
throw new UnsupportedOperationException("not text codec");
}
}
Loading

0 comments on commit 9f85129

Please sign in to comment.