Skip to content

Commit

Permalink
Yyin/QTDI-653 add LastGroup annotation to Processor's afterGroup meth…
Browse files Browse the repository at this point in the history
…od. (#966)

* feat(QTDI-653): add annotation LastGroup for AfterGroup
https://qlik-dev.atlassian.net/browse/QTDI-653
---------

Co-authored-by: undx <[email protected]>
Co-authored-by: Yves Piel <[email protected]>
  • Loading branch information
3 people authored Jan 16, 2025
1 parent 422cfed commit 83cd8b3
Show file tree
Hide file tree
Showing 17 changed files with 1,070 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Copyright (C) 2006-2024 Talend Inc. - www.talend.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.talend.sdk.component.api.processor;

import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

@Target(PARAMETER)
@Retention(RUNTIME)
public @interface LastGroup {
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,11 @@ public interface Processor extends Lifecycle {
// since it will never work in the studio with current generation logic
void afterGroup(OutputFactory output);

default void afterGroup(OutputFactory output, boolean last) {
afterGroup(output);
}

default boolean isLastGroupUsed() { return false; }

void onNext(InputFactory input, OutputFactory output);
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Stream;
Expand All @@ -53,6 +55,7 @@
import org.talend.sdk.component.api.processor.BeforeGroup;
import org.talend.sdk.component.api.processor.ElementListener;
import org.talend.sdk.component.api.processor.Input;
import org.talend.sdk.component.api.processor.LastGroup;
import org.talend.sdk.component.api.processor.Output;
import org.talend.sdk.component.api.service.record.RecordBuilderFactory;
import org.talend.sdk.component.runtime.base.Delegated;
Expand Down Expand Up @@ -122,16 +125,18 @@ public void beforeGroup() {
: Stream.of(process.getParameters()).map(this::buildProcessParamBuilder).collect(toList());
parameterBuilderAfterGroup = afterGroup
.stream()
.map(after -> new AbstractMap.SimpleEntry<>(after, Stream.of(after.getParameters()).map(param -> {
if (isGroupBuffer(param.getParameterizedType())) {
expectedRecordType = Class.class
.cast(ParameterizedType.class
.cast(param.getParameterizedType())
.getActualTypeArguments()[0]);
return (Function<OutputFactory, Object>) o -> records;
}
return toOutputParamBuilder(param);
}).collect(toList())))
.map(after -> new AbstractMap.SimpleEntry<>(after, Stream.of(after.getParameters())
.map(param -> {
if (isGroupBuffer(param.getParameterizedType())) {
expectedRecordType = Class.class
.cast(ParameterizedType.class
.cast(param.getParameterizedType())
.getActualTypeArguments()[0]);
return (Function<OutputFactory, Object>) o -> records;
}
return toOutputParamBuilder(param);
})
.collect(toList())))
.collect(toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue));
forwardReturn = process != null && process.getReturnType() != void.class;

Expand Down Expand Up @@ -162,6 +167,9 @@ private BiFunction<InputFactory, OutputFactory, Object> buildProcessParamBuilder

private Function<OutputFactory, Object> toOutputParamBuilder(final Parameter parameter) {
return outputs -> {
if (parameter.isAnnotationPresent(LastGroup.class)) {
return false;
}
final String name = parameter.getAnnotation(Output.class).value();
return outputs.create(name);
};
Expand Down Expand Up @@ -239,13 +247,44 @@ private JsonProvider jsonProvider() {

@Override
public void afterGroup(final OutputFactory output) {
afterGroup
.forEach(after -> doInvoke(after,
parameterBuilderAfterGroup
.get(after)
.stream()
.map(b -> b.apply(output))
.toArray(Object[]::new)));
afterGroup.forEach(after -> {
Object[] params = parameterBuilderAfterGroup.get(after)
.stream()
.map(b -> b.apply(output))
.toArray(Object[]::new);
doInvoke(after, params);
});
if (records != null) {
records = null;
}
}

@Override
public boolean isLastGroupUsed() {
AtomicReference<Boolean> hasLastGroup = new AtomicReference<>(false);
Optional.ofNullable(afterGroup)
.orElse(new ArrayList<>())
.forEach(after -> {
for (Parameter param : after.getParameters()) {
if (param.isAnnotationPresent(LastGroup.class)) {
hasLastGroup.set(true);
}
}
});
return hasLastGroup.get();
}

@Override
public void afterGroup(final OutputFactory output, final boolean last) {
afterGroup.forEach(after -> {
Object[] params = Stream.concat(
parameterBuilderAfterGroup.get(after)
.stream()
.map(b -> b.apply(output))
.filter(b -> !b.equals(false)),
Stream.of(last)).toArray(Object[]::new);
doInvoke(after, params);
});
if (records != null) {
records = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.talend.sdk.component.api.processor.AfterGroup;
import org.talend.sdk.component.api.processor.BeforeGroup;
import org.talend.sdk.component.api.processor.ElementListener;
import org.talend.sdk.component.api.processor.LastGroup;
import org.talend.sdk.component.api.processor.Output;
import org.talend.sdk.component.api.processor.OutputEmitter;
import org.talend.sdk.component.api.processor.Processor;
Expand All @@ -58,6 +59,8 @@ public class ModelVisitor {
.asList(Boolean.class, Byte.class, byte[].class, Character.class, Date.class, Double.class, Float.class,
BigDecimal.class, Integer.class, Long.class, Object.class, Short.class, String.class, List.class));

public static final String MUST_NOT_HAVE_ANY_PARAMETER = " must not have any parameter";

public void visit(final Class<?> type, final ModelListener listener, final boolean validate) {
if (getSupportedComponentTypes().noneMatch(type::isAnnotationPresent)) { // unlikely but just in case
return;
Expand Down Expand Up @@ -112,7 +115,7 @@ private void validatePartitionMapper(final Class<?> type) {
if (!infinite) {
Stream.of(type.getMethods()).filter(m -> m.isAnnotationPresent(Assessor.class)).forEach(m -> {
if (m.getParameterCount() > 0) {
throw new IllegalArgumentException(m + " must not have any parameter");
throw new IllegalArgumentException(m + MUST_NOT_HAVE_ANY_PARAMETER);
}
});
}
Expand Down Expand Up @@ -149,7 +152,7 @@ private void validatePartitionMapper(final Class<?> type) {
// for now we don't support injection propagation since the mapper should
// already own all the config
if (m.getParameterCount() > 0) {
throw new IllegalArgumentException(m + " must not have any parameter");
throw new IllegalArgumentException(m + MUST_NOT_HAVE_ANY_PARAMETER);
}
});

Expand All @@ -165,7 +168,7 @@ private void validateEmitter(final Class<?> input) {
}

if (producers.get(0).getParameterCount() > 0) {
throw new IllegalArgumentException(producers.get(0) + " must not have any parameter");
throw new IllegalArgumentException(producers.get(0) + MUST_NOT_HAVE_ANY_PARAMETER);
}

validateAfterVariableAnnotationDeclaration(input);
Expand All @@ -182,7 +185,7 @@ private void validateDriverRunner(final Class<?> standalone) {
}

if (driverRunners.get(0).getParameterCount() > 0) {
throw new IllegalArgumentException(driverRunners.get(0) + " must not have any parameter");
throw new IllegalArgumentException(driverRunners.get(0) + MUST_NOT_HAVE_ANY_PARAMETER);
}

validateAfterVariableAnnotationDeclaration(standalone);
Expand All @@ -199,13 +202,34 @@ private void validateProcessor(final Class<?> input) {
}
})
.filter(p -> !p.isAnnotationPresent(Output.class))
.filter(p -> !p.isAnnotationPresent(LastGroup.class))
.filter(p -> !Parameters.isGroupBuffer(p.getParameterizedType()))
.collect(toList());
if (!invalidParams.isEmpty()) {
throw new IllegalArgumentException("Parameter of AfterGroup method need to be annotated with Output");
}
});
if (afterGroups
.stream()
.anyMatch(m -> Stream.of(m.getParameters()).anyMatch(p -> p.isAnnotationPresent(LastGroup.class)))
&& afterGroups.size() > 1) {
throw new IllegalArgumentException(input
+ " must have a single @AfterGroup method with @LastGroup parameter");
}

validateProducer(input, afterGroups);

Stream.of(input.getMethods()).filter(m -> m.isAnnotationPresent(BeforeGroup.class)).forEach(m -> {
if (m.getParameterCount() > 0) {
throw new IllegalArgumentException(m + MUST_NOT_HAVE_ANY_PARAMETER);
}
});

validateAfterVariableAnnotationDeclaration(input);
validateAfterVariableContainer(input);
}

private void validateProducer(final Class<?> input, final List<Method> afterGroups) {
final List<Method> producers = Stream
.of(input.getMethods())
.filter(m -> m.isAnnotationPresent(ElementListener.class))
Expand All @@ -227,15 +251,6 @@ private void validateProcessor(final Class<?> input) {
}).filter(p -> !p.isAnnotationPresent(Output.class)).count() < 1) {
throw new IllegalArgumentException(input + " doesn't have the input parameter on its producer method");
}

Stream.of(input.getMethods()).filter(m -> m.isAnnotationPresent(BeforeGroup.class)).forEach(m -> {
if (m.getParameterCount() > 0) {
throw new IllegalArgumentException(m + " must not have any parameter");
}
});

validateAfterVariableAnnotationDeclaration(input);
validateAfterVariableContainer(input);
}

private boolean validOutputParam(final Parameter p) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.toList;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.io.Serializable;
Expand All @@ -37,6 +39,9 @@
import org.talend.sdk.component.api.processor.AfterGroup;
import org.talend.sdk.component.api.processor.BeforeGroup;
import org.talend.sdk.component.api.processor.ElementListener;
import org.talend.sdk.component.api.processor.LastGroup;
import org.talend.sdk.component.api.processor.Output;
import org.talend.sdk.component.api.processor.OutputEmitter;
import org.talend.sdk.component.api.record.Record;
import org.talend.sdk.component.runtime.record.RecordImpl;
import org.talend.sdk.component.runtime.serialization.Serializer;
Expand Down Expand Up @@ -64,12 +69,24 @@ void bulkGroup() {
data.forEach(it -> processor.onNext(n -> it, null));
assertNull(Bufferized.RECORDS);
processor.afterGroup(null);
assertFalse(processor.isLastGroupUsed());
assertEquals(data, Bufferized.RECORDS);
Bufferized.RECORDS = null;
}
processor.stop();
}

@Test
void bulkGroupWithLastGroup() {
final Processor processor = new ProcessorImpl("Root", "Test", "Plugin", emptyMap(), new SampleLastGroupOutput());
processor.start();
processor.beforeGroup();
assertTrue(processor.isLastGroupUsed());
processor.afterGroup(NO_OUTPUT, true);
assertTrue(SampleLastGroupOutput.isCalled);
processor.stop();
}

@Test
void lifecycle() {
assertLifecycle(new SampleProcessor());
Expand Down Expand Up @@ -186,4 +203,19 @@ public static class Sample {

private int data;
}

public static class SampleLastGroupOutput implements Serializable {
private static boolean isCalled = false;

@ElementListener
public void onNext(final Sample sample) {

}

@AfterGroup
public void afterGroup(@Output("REJECT") final OutputEmitter<Record> records, @LastGroup final boolean isLast) {
isCalled = isLast;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
import org.talend.sdk.component.api.input.Split;
import org.talend.sdk.component.api.processor.AfterGroup;
import org.talend.sdk.component.api.processor.ElementListener;
import org.talend.sdk.component.api.processor.LastGroup;
import org.talend.sdk.component.api.processor.Output;
import org.talend.sdk.component.api.processor.OutputEmitter;
import org.talend.sdk.component.api.processor.Processor;
import org.talend.sdk.component.api.record.Record;
import org.talend.sdk.component.api.standalone.DriverRunner;
Expand All @@ -65,6 +68,19 @@ void validBulk() {
assertEquals(singletonList(
"@Processor(org.talend.sdk.component.runtime.visitor.visitor.ModelVisitorTest$ProcessorBulk$Out)"),
visit(ProcessorBulk.class));

}

@Test
void validAfterGroupWithLastGroup() {
assertEquals(singletonList(
"@Processor(org.talend.sdk.component.runtime.visitor.visitor.ModelVisitorTest$ProcessorOneAfterGroup$Out)"),
visit(ProcessorOneAfterGroup.class));
}

@Test
void afterGroupWithLastGroupMoreThanOne() {
assertThrows(IllegalArgumentException.class, () -> visit(ProcessorTwoAfterGroup.class));
}

@Test
Expand Down Expand Up @@ -589,6 +605,42 @@ public void commit(final Collection<Record> records) {
}
}

public static class ProcessorOneAfterGroup {

@Processor(family = "comp", name = "Bulk")
public static class Out {

@ElementListener
public void onNext(final In in) {
// no-op
}

@AfterGroup
public void afterGroup(@Output("REJECT") final OutputEmitter<Record> rejected,
@LastGroup final boolean isLast) {
// no-op
}
}
}

public static class ProcessorTwoAfterGroup {

@Processor(family = "comp", name = "Bulk")
public static class Out {

@AfterGroup
public void commit(final Collection<Record> records) {
// no-op
}

@AfterGroup
public void afterGroup(@Output("REJECT") final OutputEmitter<Record> rejected,
@LastGroup final boolean isLast) {
// no-op
}
}
}

public static class EmitterNoProduces {

@Emitter(family = "comp", name = "Input")
Expand Down
Loading

0 comments on commit 83cd8b3

Please sign in to comment.