From a914eb87550ed4a7676878b1a569b95f2d15b83a Mon Sep 17 00:00:00 2001 From: Marco Brandizi Date: Tue, 2 Jul 2024 14:22:54 +0100 Subject: [PATCH] ReactorUtils, refinements. --- .../utils/opt/runcontrol/ReactorUtils.java | 70 ++++++++++++------- 1 file changed, 45 insertions(+), 25 deletions(-) diff --git a/src/main/java/uk/ac/ebi/utils/opt/runcontrol/ReactorUtils.java b/src/main/java/uk/ac/ebi/utils/opt/runcontrol/ReactorUtils.java index 5d03a8bba..8cba5b2bc 100644 --- a/src/main/java/uk/ac/ebi/utils/opt/runcontrol/ReactorUtils.java +++ b/src/main/java/uk/ac/ebi/utils/opt/runcontrol/ReactorUtils.java @@ -23,23 +23,6 @@ */ public class ReactorUtils { - /** - * {@link Schedulers#newBoundedElastic(int, int, String)} with the {@link Schedulers#DEFAULT_BOUNDED_ELASTIC_SIZE default threadCap} - * and a low limit for queuedTaskCap. This is suitable for cases where the source is - * much faster than the downstream processing and hence there is little point with queueing - * too much stuff. - * - */ - public static final Scheduler DEFAULT_FLUX_SCHEDULER = newBoundedElastic ( - DEFAULT_BOUNDED_ELASTIC_SIZE, 100, - "jutils.batchSched" - ); - - /** - * This has been tested in tasks like saving data on a database. - */ - public static final int DEFAULT_BATCH_SIZE = 2500; - /** * Little helper to build a common {@link ParallelFlux} to process a source of items * in parallel batches. @@ -50,6 +33,24 @@ public class ReactorUtils */ public static class ParallelBatchFluxBuilder> { + /** + * {@link Schedulers#newBoundedElastic(int, int, String)} with the {@link Schedulers#DEFAULT_BOUNDED_ELASTIC_SIZE default threadCap} + * and a low limit for queuedTaskCap. This is suitable for cases where the source is + * much faster than the downstream processing and hence there is little point with queueing + * too much stuff. + * + */ + public static final Scheduler DEFAULT_FLUX_SCHEDULER = newBoundedElastic ( + DEFAULT_BOUNDED_ELASTIC_SIZE, 100, + "jutils.batchSched" + ); + + /** + * This has been tested in tasks like saving data on a database. + */ + public static final int DEFAULT_BATCH_SIZE = 2500; + + private Flux flux; private Scheduler scheduler = DEFAULT_FLUX_SCHEDULER; private int batchSize = DEFAULT_BATCH_SIZE; @@ -83,7 +84,7 @@ public ParallelFlux build () } /** - * Default is {@link ReactorUtils#DEFAULT_FLUX_SCHEDULER}. + * Default is {@link #DEFAULT_FLUX_SCHEDULER}. */ public ParallelBatchFluxBuilder withScheduler ( Scheduler scheduler ) { @@ -92,7 +93,7 @@ public ParallelBatchFluxBuilder withScheduler ( Scheduler scheduler ) } /** - * Default it {@link ReactorUtils#DEFAULT_BATCH_SIZE}. + * Default it {@link #DEFAULT_BATCH_SIZE}. */ public ParallelBatchFluxBuilder withBatchSize ( int batchSize ) { @@ -112,6 +113,28 @@ public ParallelBatchFluxBuilder withBatchSupplier ( Supplier ParallelFlux> parallelBatchFlux ( Flux flux ) { + return new ParallelBatchFluxBuilder> ( flux ).build (); + } + + /** + * Just uses {@link ParallelBatchFluxBuilder} with its defaults. + */ + public static ParallelFlux> parallelBatchFlux ( Stream stream ) { + return new ParallelBatchFluxBuilder> ( stream ).build (); + } + + /** + * Just uses {@link ParallelBatchFluxBuilder} with its defaults. + */ + public static ParallelFlux> parallelBatchFlux ( Collection collection ) { + return new ParallelBatchFluxBuilder> ( collection ).build (); + } + + /** * Uses {@link ParallelBatchFluxBuilder} to process a source of batches. */ @@ -134,8 +157,7 @@ public static void batchProcessing ( Flux flux, Consumer> task ) { - ParallelFlux> parFlux = new ParallelBatchFluxBuilder> ( flux ).build (); - batchProcessing ( parFlux, task ); + batchProcessing ( parallelBatchFlux ( flux ), task ); } /** @@ -145,8 +167,7 @@ public static void batchProcessing ( Stream stream, Consumer> task ) { - ParallelFlux> parFlux = new ParallelBatchFluxBuilder> ( stream ).build (); - batchProcessing ( parFlux, task ); + batchProcessing ( parallelBatchFlux ( stream ), task ); } /** @@ -156,8 +177,7 @@ public static void batchProcessing ( Collection collection, Consumer> task ) { - ParallelFlux> parFlux = new ParallelBatchFluxBuilder> ( collection ).build (); - batchProcessing ( parFlux, task ); + batchProcessing ( parallelBatchFlux ( collection ), task ); } }