diff --git a/rel/src/main/java/io/dingodb/expr/rel/TupleKey.java b/rel/src/main/java/io/dingodb/expr/rel/TupleKey.java index 84e26248..f1870c3a 100644 --- a/rel/src/main/java/io/dingodb/expr/rel/TupleKey.java +++ b/rel/src/main/java/io/dingodb/expr/rel/TupleKey.java @@ -42,4 +42,9 @@ public boolean equals(Object obj) { } return false; } + + @Override + public String toString() { + return Arrays.toString(tuple); + } } diff --git a/rel/src/main/java/io/dingodb/expr/rel/op/FilterOp.java b/rel/src/main/java/io/dingodb/expr/rel/op/FilterOp.java index 6387c658..145a0c0c 100644 --- a/rel/src/main/java/io/dingodb/expr/rel/op/FilterOp.java +++ b/rel/src/main/java/io/dingodb/expr/rel/op/FilterOp.java @@ -32,6 +32,8 @@ import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; +import java.util.Arrays; + @Slf4j @EqualsAndHashCode(callSuper = true, of = {"filter"}) public final class FilterOp extends AbstractRelOp implements PipeOp { @@ -60,6 +62,9 @@ private FilterOp( public Object @Nullable [] put(Object @NonNull [] tuple) { evalContext.setTuple(tuple); Object v = filter.eval(evalContext, exprConfig); + if (log.isTraceEnabled()) { + log.trace("Input: {}, filter evaluated result: {}", Arrays.toString(tuple), v); + } return (v != null && (Boolean) v) ? tuple : null; } diff --git a/rel/src/main/java/io/dingodb/expr/rel/op/GroupedAggregateOp.java b/rel/src/main/java/io/dingodb/expr/rel/op/GroupedAggregateOp.java index a640bbe7..f88e2990 100644 --- a/rel/src/main/java/io/dingodb/expr/rel/op/GroupedAggregateOp.java +++ b/rel/src/main/java/io/dingodb/expr/rel/op/GroupedAggregateOp.java @@ -31,13 +31,16 @@ import io.dingodb.expr.runtime.type.Types; import lombok.EqualsAndHashCode; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.nullness.qual.NonNull; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import java.util.stream.Stream; +@Slf4j @EqualsAndHashCode(callSuper = true, of = {"groupIndices"}) public final class GroupedAggregateOp extends AggregateOp { public static final String NAME = "AGG"; @@ -73,29 +76,48 @@ private Object[] createVars(TupleKey tupleKey) { } @Override - public void put(Object @NonNull [] tuple) { + public synchronized void put(Object @NonNull [] tuple) { assert cacheSupplier != null && cache != null : "Cache not initialized, call `this.setCache` first."; TupleKey tupleKey = new TupleKey(ArrayUtils.map(tuple, groupIndices)); calc(cache.get(tupleKey), tuple, () -> createVars(tupleKey)); + if (log.isTraceEnabled()) { + log.trace("Input: {}", Arrays.toString(tuple)); + } } @Override - public @NonNull Stream get() { + public synchronized @NonNull Stream get() { + if (log.isTraceEnabled()) { + if (cache.isEmpty()) { + log.trace("No result in cache."); + } else { + String content = + "{" + + cache.entrySet().stream() + .map(e -> e.getKey() + ": " + Arrays.toString(e.getValue())) + .collect(Collectors.joining(", ")) + + "}"; + log.trace("Result in cache: {} items, {}", cache.size(), content); + } + } return cache.entrySet().stream() .map(e -> ArrayUtils.concat(e.getKey().getTuple(), e.getValue())); } @Override - public void reduce(Object @NonNull [] tuple) { + public synchronized void reduce(Object @NonNull [] tuple) { int length = groupIndices.length; TupleKey tupleKey = new TupleKey(Arrays.copyOf(tuple, length)); merge(cache.get(tupleKey), tuple, length, () -> createVars(tupleKey)); } @Override - public void clear() { + public synchronized void clear() { cache.clear(); + if (log.isTraceEnabled()) { + log.trace("Cache cleared."); + } } @Override diff --git a/rel/src/main/java/io/dingodb/expr/rel/op/ProjectOp.java b/rel/src/main/java/io/dingodb/expr/rel/op/ProjectOp.java index 83982e92..0726e085 100644 --- a/rel/src/main/java/io/dingodb/expr/rel/op/ProjectOp.java +++ b/rel/src/main/java/io/dingodb/expr/rel/op/ProjectOp.java @@ -30,10 +30,12 @@ import io.dingodb.expr.runtime.type.Types; import lombok.EqualsAndHashCode; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.nullness.qual.NonNull; import java.util.Arrays; +@Slf4j @EqualsAndHashCode(callSuper = true, of = {"projects"}) public final class ProjectOp extends AbstractRelOp implements PipeOp { public static final String NAME = "PROJECT"; @@ -55,15 +57,18 @@ private ProjectOp( ) { super(type, evalContext, exprConfig); this.projects = projects; - } @Override public Object @NonNull [] put(Object @NonNull [] tuple) { evalContext.setTuple(tuple); - return Arrays.stream(projects) + Object[] result = Arrays.stream(projects) .map(p -> p.eval(evalContext, exprConfig)) .toArray(Object[]::new); + if (log.isTraceEnabled()) { + log.trace("Input: {}, output: {}", Arrays.toString(tuple), Arrays.toString(result)); + } + return result; } @Override diff --git a/rel/src/main/java/io/dingodb/expr/rel/op/UngroupedAggregateOp.java b/rel/src/main/java/io/dingodb/expr/rel/op/UngroupedAggregateOp.java index 8c41bc00..6dc32d1f 100644 --- a/rel/src/main/java/io/dingodb/expr/rel/op/UngroupedAggregateOp.java +++ b/rel/src/main/java/io/dingodb/expr/rel/op/UngroupedAggregateOp.java @@ -28,12 +28,17 @@ import io.dingodb.expr.runtime.type.TupleType; import io.dingodb.expr.runtime.type.Type; import io.dingodb.expr.runtime.type.Types; +import lombok.EqualsAndHashCode; +import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.nullness.qual.NonNull; +import java.util.Arrays; import java.util.List; import java.util.stream.IntStream; import java.util.stream.Stream; +@Slf4j +@EqualsAndHashCode(callSuper = true) public final class UngroupedAggregateOp extends AggregateOp { public static final String NAME = "AGG"; @@ -63,26 +68,30 @@ private Object[] createVars() { } @Override - public void reduce(Object @NonNull [] tuple) { - merge(vars, tuple, 0, this::createVars); - } - - @Override - public void put(Object @NonNull [] tuple) { + public synchronized void put(Object @NonNull [] tuple) { assert cacheSupplier != null : "Cache not initialized, call `this.setCache` first."; calc(vars, tuple, this::createVars); + if (log.isTraceEnabled()) { + log.trace("Input: {}", Arrays.toString(tuple)); + } } @Override - public @NonNull Stream get() { + public synchronized @NonNull Stream get() { if (vars != null) { + if (log.isTraceEnabled()) { + log.trace("Result in cache: {}", Arrays.toString(vars)); + } return Stream.of( IntStream.range(0, vars.length) .mapToObj(i -> vars[i] != null ? vars[i] : ((AggExpr) aggList.get(i)).emptyValue()) .toArray() ); } + if (log.isTraceEnabled()) { + log.trace("No result in cache."); + } return Stream.of( aggList.stream() .map(agg -> ((AggExpr) agg).emptyValue()) @@ -91,8 +100,16 @@ public void put(Object @NonNull [] tuple) { } @Override - public void clear() { + public synchronized void reduce(Object @NonNull [] tuple) { + merge(vars, tuple, 0, this::createVars); + } + + @Override + public synchronized void clear() { vars = null; + if (log.isTraceEnabled()) { + log.trace("Cache cleared."); + } } @Override