Skip to content

Commit

Permalink
A preliminary attempt to define a new batch collector library, probably
Browse files Browse the repository at this point in the history
I'll replace it with Reactor.
  • Loading branch information
marco-brandizi committed Nov 6, 2023
1 parent 922d4d2 commit 06edc95
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package uk.ac.ebi.utils.threading.batchproc2;

/**
* TODO: comment me!
*
* @author brandizi
* <dl><dt>Date:</dt><dd>30 Oct 2023</dd></dl>
*
*/
public interface BatchCollector<B>
{
B newBatch ();
boolean isReady ( B batch );
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package uk.ac.ebi.utils.threading.batchproc2;

import java.util.function.Consumer;
import java.util.stream.Stream;

import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableObject;

/**
* TODO: comment me!
*
* @author brandizi
* <dl><dt>Date:</dt><dd>30 Oct 2023</dd></dl>
*
*/
public abstract class BatchProcessor<B>
{
private BatchCollector<B> batchCollector;
private Consumer<B> batchJob;

public abstract void process ();

public static <B1> BatchProcessor<B1> create ( Consumer<BatchSink<B1>> emitter )
{
BatchProcessor<B1> processor = new BatchProcessor<> ()
{
@Override
public void process ()
{
BatchCollector<B1> batchCollector = this.getBatchCollector ();
Consumer<B1> batchJob = this.getBatchJob ();

MutableObject<B1> currentBatch = new MutableObject<> ( batchCollector.newBatch () );

BatchSink<B1> sink = new BatchSink<> ()
{
@Override
public B1 getBatch () {
return currentBatch.getValue ();
}

@Override
public void update ()
{
if ( !batchCollector.isReady ( currentBatch.getValue () ) ) return;

// TODO: submit()
batchJob.accept ( currentBatch.getValue () );
currentBatch.setValue ( batchCollector.newBatch () );
}
}; // BatchSink

emitter.accept ( sink );
}
}; // processor

return processor;
}

public BatchCollector<B> getBatchCollector ()
{
return batchCollector;
}

public Consumer<B> getBatchJob ()
{
return batchJob;
}
}
14 changes: 14 additions & 0 deletions src/main/java/uk/ac/ebi/utils/threading/batchproc2/BatchSink.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package uk.ac.ebi.utils.threading.batchproc2;

/**
* TODO: comment me!
*
* @author brandizi
* <dl><dt>Date:</dt><dd>31 Oct 2023</dd></dl>
*
*/
public interface BatchSink<B>
{
B getBatch ();
void update ();
}

0 comments on commit 06edc95

Please sign in to comment.