From c1e2ef4a526bd5120da3c1753faaf8b4735186f7 Mon Sep 17 00:00:00 2001 From: "angeloantonio.defazio" Date: Wed, 17 Jan 2024 07:11:38 +0100 Subject: [PATCH] Use interface for DI, and else.. --- .gitignore | 6 + WebFlux/ex1/Dockerfile | 17 ++ WebFlux/ex1/jsclients/callerpage.html | 53 ++++ .../vandy/lockmanager/common/Constants.java | 15 +- .../vandy/lockmanager/common/LockManager.java | 119 ++++---- .../VirtualThreadsConfiguration.java | 37 +++ .../server/LockManagerApplication.java | 47 +-- .../server/LockManagerController.java | 249 ++++++++-------- .../service/LockManagerService.java | 22 ++ .../service/impl/LockManagerServiceImpl.java | 271 ++++++++++++++++++ .../edu/vandy/lockmanager/utils/Utils.java | 27 +- 11 files changed, 621 insertions(+), 242 deletions(-) create mode 100644 WebFlux/ex1/Dockerfile create mode 100644 WebFlux/ex1/jsclients/callerpage.html create mode 100644 WebFlux/ex1/src/main/java/edu/vandy/lockmanager/configuration/VirtualThreadsConfiguration.java create mode 100644 WebFlux/ex1/src/main/java/edu/vandy/lockmanager/service/LockManagerService.java create mode 100644 WebFlux/ex1/src/main/java/edu/vandy/lockmanager/service/impl/LockManagerServiceImpl.java diff --git a/.gitignore b/.gitignore index 048930795..e9c33558c 100644 --- a/.gitignore +++ b/.gitignore @@ -91,3 +91,9 @@ com_crashlytics_export_strings.xml crashlytics.properties crashlytics-build.properties fabric.properties +# Eclipse projects related +.project +.classpath +.settings/ + + diff --git a/WebFlux/ex1/Dockerfile b/WebFlux/ex1/Dockerfile new file mode 100644 index 000000000..2bc16106c --- /dev/null +++ b/WebFlux/ex1/Dockerfile @@ -0,0 +1,17 @@ +FROM amd64/gradle:8.0.1-jdk19-alpine as builder + +ADD ./build.gradle /home/gradle/build.gradle +ADD ./src /home/gradle/src/ + +RUN gradle build -x test + + + +FROM amazoncorretto:19-alpine-jdk + +# copy jar from builder stage +COPY --from=builder /home/gradle/build/libs/*.jar app.jar + +EXPOSE 8080 8080 + +CMD ["sh", "-c", "java --enable-preview -jar app.jar " ] \ No newline at end of file diff --git a/WebFlux/ex1/jsclients/callerpage.html b/WebFlux/ex1/jsclients/callerpage.html new file mode 100644 index 000000000..cf45d7b27 --- /dev/null +++ b/WebFlux/ex1/jsclients/callerpage.html @@ -0,0 +1,53 @@ + + + + + + Fetch API Example + + + + + + +

+ +

+ + + + + + + \ No newline at end of file diff --git a/WebFlux/ex1/src/main/java/edu/vandy/lockmanager/common/Constants.java b/WebFlux/ex1/src/main/java/edu/vandy/lockmanager/common/Constants.java index c7a3b31c3..822a25c2a 100644 --- a/WebFlux/ex1/src/main/java/edu/vandy/lockmanager/common/Constants.java +++ b/WebFlux/ex1/src/main/java/edu/vandy/lockmanager/common/Constants.java @@ -3,14 +3,11 @@ /** * Constants shared by the client and server components. */ -public class Constants { - public static final String SERVER_BASE_URL = "http://localhost:8080"; +public interface Constants { + String SERVER_BASE_URL = "http://localhost:8080"; - public static class Endpoints { - public static final String CREATE = "create"; - public static final String ACQUIRE_LOCK = "acquireLock"; - public static final String ACQUIRE_LOCKS = "acquireLocks"; - public static final String RELEASE_LOCK = "releaseLock"; - public static final String RELEASE_LOCKS = "releaseLocks"; - } + public interface Endpoints { + String CREATE = "create", ACQUIRE_LOCK = "acquireLock", ACQUIRE_LOCKS = "acquireLocks", + RELEASE_LOCK = "releaseLock", RELEASE_LOCKS = "releaseLocks", ACQUIRE_LOCKS_TEST = "acquireLocksTest"; + } } diff --git a/WebFlux/ex1/src/main/java/edu/vandy/lockmanager/common/LockManager.java b/WebFlux/ex1/src/main/java/edu/vandy/lockmanager/common/LockManager.java index 071848d74..6debb9e18 100644 --- a/WebFlux/ex1/src/main/java/edu/vandy/lockmanager/common/LockManager.java +++ b/WebFlux/ex1/src/main/java/edu/vandy/lockmanager/common/LockManager.java @@ -3,76 +3,71 @@ import java.util.Objects; /** - * This class is used to keep track of allocated {@link LockManager} - * objects. + * This class is used to keep track of allocated {@link LockManager} objects. */ public class LockManager { - /** - * The unique name of the {@link LockManager}. - */ - public String name; + /** + * The unique name of the {@link LockManager}. + */ + public String name; - /** - * The number of permits in this {@link LockManager}. - */ - public Integer permitCount; + /** + * The number of permits in this {@link LockManager}. + */ + public Integer permitCount; - /** - * @return The unique name of the {@link LockManager} - * - public String getName() { - return mName; - } - */ + /** + * @return The unique name of the {@link LockManager} + * + * public String getName() { return mName; } + */ - /** - * Set the unique name of the {@link LockManager}. - * - * @param name The unique name of the {@link LockManager} - */ - public LockManager(String name) { - this.name = name; - } + /** + * Set the unique name of the {@link LockManager}. + * + * @param name The unique name of the {@link LockManager} + */ + public LockManager(String name) { + this.name = name; + } - public LockManager(String name, - Integer permitCount) { - this.name = name + ":[" + permitCount + "]"; - } + public LockManager(String name, Integer permitCount) { + this.name = name; + this.permitCount = permitCount; + } - /** - * This class needs a default constructor. - */ - LockManager() { - name = "default"; - } + /** + * This class needs a default constructor. + */ + LockManager() { + name = "default"; + } - /** - * @return A {@link String} representation - */ - @Override - public String toString() { - return name; - } + /** + * @return A {@link String} representation + */ + @Override + public String toString() { + return name + ":[" + permitCount + "]"; + } - /** - * Overrides the {@code equals()} method to compare two {@link - * LockManager} objects based on their {@code name}. - * - * @param object The other {@link Object} to compare with this - * object - * @return true if the object names are equal, false otherwise - */ - @Override - public boolean equals(Object object) { - return object instanceof LockManager other - && this.name.equals(other.name); - } + /** + * Overrides the {@code equals()} method to compare two {@link LockManager} + * objects based on their {@code name}. + * + * @param object The other {@link Object} to compare with this object + * @return true if the object names are equal, false otherwise + */ + @Override + public boolean equals(Object object) { + return object instanceof LockManager other && this.name.equals(other.name); + } - /** - * @return A hash of the {@link LockManager} {@code name} - */ - @Override - public int hashCode() { - return Objects.hash(name); - } + /** + * @return A hash of the {@link LockManager} {@code name} + */ + @Override + public int hashCode() { + return Objects.hash(name); + } } diff --git a/WebFlux/ex1/src/main/java/edu/vandy/lockmanager/configuration/VirtualThreadsConfiguration.java b/WebFlux/ex1/src/main/java/edu/vandy/lockmanager/configuration/VirtualThreadsConfiguration.java new file mode 100644 index 000000000..7f6b68bf9 --- /dev/null +++ b/WebFlux/ex1/src/main/java/edu/vandy/lockmanager/configuration/VirtualThreadsConfiguration.java @@ -0,0 +1,37 @@ +package edu.vandy.lockmanager.configuration; + +import static org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME; + +import java.util.concurrent.Executors; + +import org.springframework.boot.web.embedded.tomcat.TomcatProtocolHandlerCustomizer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; +import org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.core.task.support.TaskExecutorAdapter; + +@Configuration +@Profile("default") +public class VirtualThreadsConfiguration { + + /** + * Configure the use of Java virtual threads to handle all incoming HTTP + * requests. + */ + @Bean(APPLICATION_TASK_EXECUTOR_BEAN_NAME) + public AsyncTaskExecutor asyncTaskExecutor() { + return new TaskExecutorAdapter(Executors.newVirtualThreadPerTaskExecutor()); + } + + /** + * Customize the ProtocolHandler on the TomCat Connector to use Java virtual + * threads to handle all incoming HTTP requests. + */ + @Bean + public TomcatProtocolHandlerCustomizer protocolHandlerVirtualThreadExecutorCustomizer() { + return protocolHandler -> protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor()); + + } + +} diff --git a/WebFlux/ex1/src/main/java/edu/vandy/lockmanager/server/LockManagerApplication.java b/WebFlux/ex1/src/main/java/edu/vandy/lockmanager/server/LockManagerApplication.java index 20c03d00b..8f8c770f7 100644 --- a/WebFlux/ex1/src/main/java/edu/vandy/lockmanager/server/LockManagerApplication.java +++ b/WebFlux/ex1/src/main/java/edu/vandy/lockmanager/server/LockManagerApplication.java @@ -2,51 +2,16 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.web.embedded.tomcat.TomcatProtocolHandlerCustomizer; -import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; -import org.springframework.core.task.AsyncTaskExecutor; -import org.springframework.core.task.support.TaskExecutorAdapter; - -import java.util.concurrent.Executors; - -import static org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME; @SpringBootApplication @ComponentScan("edu.vandy.lockmanager") public class LockManagerApplication { - /** - * The main entry point into the LockManager microservice. - */ - public static void main(String[] args) { - SpringApplication.run(LockManagerApplication.class, - args); - } + /** + * The main entry point into the LockManager microservice. + */ + public static void main(String[] args) { + SpringApplication.run(LockManagerApplication.class, args); + } - /** - * Configure the use of Java virtual threads to handle all - * incoming HTTP requests. - */ - @Bean(APPLICATION_TASK_EXECUTOR_BEAN_NAME) - public AsyncTaskExecutor asyncTaskExecutor() { - return new TaskExecutorAdapter(Executors - .newVirtualThreadPerTaskExecutor()); - } - - /** - * Customize the ProtocolHandler on the TomCat Connector to - * use Java virtual threads to handle all incoming HTTP requests. - */ - @Bean - public TomcatProtocolHandlerCustomizer protocolHandlerVirtualThreadExecutorCustomizer() { - return protocolHandler -> { - protocolHandler - .setExecutor(Executors.newVirtualThreadPerTaskExecutor()); - }; - } } - - - - - diff --git a/WebFlux/ex1/src/main/java/edu/vandy/lockmanager/server/LockManagerController.java b/WebFlux/ex1/src/main/java/edu/vandy/lockmanager/server/LockManagerController.java index cb9504189..a79653fdf 100644 --- a/WebFlux/ex1/src/main/java/edu/vandy/lockmanager/server/LockManagerController.java +++ b/WebFlux/ex1/src/main/java/edu/vandy/lockmanager/server/LockManagerController.java @@ -1,130 +1,143 @@ package edu.vandy.lockmanager.server; +import static edu.vandy.lockmanager.common.Constants.Endpoints.ACQUIRE_LOCK; +import static edu.vandy.lockmanager.common.Constants.Endpoints.ACQUIRE_LOCKS; +import static edu.vandy.lockmanager.common.Constants.Endpoints.ACQUIRE_LOCKS_TEST; +import static edu.vandy.lockmanager.common.Constants.Endpoints.CREATE; +import static edu.vandy.lockmanager.common.Constants.Endpoints.RELEASE_LOCK; +import static edu.vandy.lockmanager.common.Constants.Endpoints.RELEASE_LOCKS; +import static edu.vandy.lockmanager.utils.Utils.log; + +import java.util.List; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.CrossOrigin; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + import edu.vandy.lockmanager.common.Lock; import edu.vandy.lockmanager.common.LockManager; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.web.bind.annotation.*; +import edu.vandy.lockmanager.service.LockManagerService; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.util.List; - -import static edu.vandy.lockmanager.common.Constants.Endpoints.*; -import static edu.vandy.lockmanager.utils.Utils.log; - /** - * This Spring {@code @RestController} defines methods that provide a - * lock manager for a semaphore that can be shared amongst multiple - * asynchronous Spring WebFlux clients. + * This Spring {@code @RestController} defines methods that provide a lock + * manager for a semaphore that can be shared amongst multiple asynchronous + * Spring WebFlux clients. */ @RestController +@CrossOrigin(origins = "*") public class LockManagerController { - /** - * Auto-wire the {@link LockManagerController} to the {@link - * LockManagerService}. - */ - @Autowired - LockManagerService mService; - - /** - * Initialize the {@link Lock} manager. - * - * @param permitCount The number of {@link Lock} objects to - * manage - * @return A {@link Mono} that emits the {@link LockManager} - * associated with the state of the semaphore it manages - */ - @GetMapping(CREATE) - public Mono create - (@RequestParam Integer permitCount) { - log("LockController.create()"); - - return mService - // Forward to the service. - .create(permitCount); - } - - /** - * Acquire a {@link Lock}. - * - * @param lockManager The {@link LockManager} that is associated - * with the state of the semaphore it manages - * @return A {@link Mono} that emits an acquired {@link Lock} - */ - @GetMapping(ACQUIRE_LOCK) - public Mono acquire(@RequestParam LockManager lockManager) { - log("LockController.acquire()"); - - return mService - // Forward to the service. - .acquire(lockManager); - } - - /** - * Acquire {@code permits} number of {@link Lock} objects. - * - * @param lockManager The {@link LockManager} that is associated - * with the state of the semaphore it manages - * @param permits The number of permits to acquire - * @return A {@link Flux} that emits {@code permits} number of - * acquired {@link Lock} objects - */ - @GetMapping(ACQUIRE_LOCKS) - Flux acquire(@RequestParam LockManager lockManager, - Integer permits) { - log("LockController.acquire(" - + permits - + ")"); - - return mService - // Forward to the service. - .acquire(lockManager, permits); - } - - /** - * Release the {@link Lock} so other clients can acquire it. - * - * @param lockManager The {@link LockManager} that is associated - * with the state of the semaphore it manages - * @param lock The {@link Lock} to release - * @return A {@link Mono} that emits {@link Boolean#TRUE} if - * the {@link Lock} was released properly and - * {@link Boolean#FALSE} otherwise. - */ - @GetMapping(RELEASE_LOCK) - public Mono release(@RequestParam LockManager lockManager, - @RequestParam Lock lock) { - log("LockController.release(" - + lock - + ")"); - - return mService - // Forward to the service. - .release(lockManager, lock); - } - - /** - * Release the {@code locks} so other clients can acquire them. - * - * @param lockManager The {@link LockManager} that is associated - * with the state of the semaphore it manages - * @param locks A {@link List} that contains {@link Lock} objects - * to release - * @return A {@link Mono} that emits {@link Boolean#TRUE} if the - * {@link Lock} was released properly and {@link - * Boolean#FALSE} otherwise. - */ - @PostMapping(RELEASE_LOCKS) - public Mono release - (@RequestParam LockManager lockManager, - @RequestBody List locks) { - log("LockController.release(" - + locks - + ")"); - - return mService - // Forward to the service. - .release(lockManager, locks); - } -} + /** + * Auto-wire the {@link LockManagerController} to the + * {@link LockManagerService}. + */ + @Autowired + LockManagerService mService; + + /** + * Initialize the {@link Lock} manager. + * + * @param permitCount The number of {@link Lock} objects to manage + * @return A {@link Mono} that emits the {@link LockManager} associated with the + * state of the semaphore it manages + */ + @GetMapping(CREATE) + public Mono create(@RequestParam Integer permitCount) { + log(Thread.currentThread().getStackTrace()[1].getMethodName()); + return mService + // Forward to the service. + .create(permitCount); + } + + /** + * Acquire a {@link Lock}. + * + * @param lockManager The {@link LockManager} that is associated with the state + * of the semaphore it manages + * @return A {@link Mono} that emits an acquired {@link Lock} + */ + @GetMapping(ACQUIRE_LOCK) + public Mono acquire(@RequestParam LockManager lockManager) { + log(Thread.currentThread().getStackTrace()[1].getMethodName()); + + return mService + // Forward to the service. + .acquire(lockManager); + } + + /** + * Acquire {@code permits} number of {@link Lock} objects. + * + * @param lockManager The {@link LockManager} that is associated with the state + * of the semaphore it manages + * @param permits The number of permits to acquire + * @return A {@link Flux} that emits {@code permits} number of acquired + * {@link Lock} objects + */ + @GetMapping(ACQUIRE_LOCKS) + Flux acquire(@RequestParam LockManager lockManager, Integer permits) { + log(Thread.currentThread().getStackTrace()[1].getMethodName() + "(" + permits + ")"); + + return mService + // Forward to the service. + .acquire(lockManager, permits); + } + + @GetMapping(ACQUIRE_LOCKS_TEST) + public Flux acquireTest(@RequestParam String lockManagerName, + + @RequestParam Integer lockManagerPermitCount, + + @RequestParam Integer permits) { + log(Thread.currentThread().getStackTrace()[1].getMethodName() + "(" + lockManagerName + ")" + "(" + + lockManagerPermitCount + ")" + "(" + permits + ")"); + LockManager lockManager = new LockManager(lockManagerName, lockManagerPermitCount); + + return mService + // Forward to the service. + .acquire(lockManager, permits); + } + + /** + * Release the {@link Lock} so other clients can acquire it. + * + * @param lockManager The {@link LockManager} that is associated with the state + * of the semaphore it manages + * @param lock The {@link Lock} to release + * @return A {@link Mono} that emits {@link Boolean#TRUE} if the {@link Lock} + * was released properly and {@link Boolean#FALSE} otherwise. + */ + @GetMapping(RELEASE_LOCK) + public Mono release(@RequestParam LockManager lockManager, @RequestParam Lock lock) { + log(Thread.currentThread().getStackTrace()[1].getMethodName() + "(" + lock + ")"); + + return mService + // Forward to the service. + .release(lockManager, lock); + } + + /** + * Release the {@code locks} so other clients can acquire them. + * + * @param lockManager The {@link LockManager} that is associated with the state + * of the semaphore it manages + * @param locks A {@link List} that contains {@link Lock} objects to + * release + * @return A {@link Mono} that emits {@link Boolean#TRUE} if the {@link Lock} + * was released properly and {@link Boolean#FALSE} otherwise. + */ + @PostMapping(RELEASE_LOCKS) + public Mono release(@RequestParam LockManager lockManager, @RequestBody List locks) { + log(Thread.currentThread().getStackTrace()[1].getMethodName() + "(" + locks + ")"); + + return mService + // Forward to the service. + .release(lockManager, locks); + } +} diff --git a/WebFlux/ex1/src/main/java/edu/vandy/lockmanager/service/LockManagerService.java b/WebFlux/ex1/src/main/java/edu/vandy/lockmanager/service/LockManagerService.java new file mode 100644 index 000000000..967eef074 --- /dev/null +++ b/WebFlux/ex1/src/main/java/edu/vandy/lockmanager/service/LockManagerService.java @@ -0,0 +1,22 @@ +package edu.vandy.lockmanager.service; + +import java.util.List; + +import edu.vandy.lockmanager.common.Lock; +import edu.vandy.lockmanager.common.LockManager; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public interface LockManagerService { + + Mono create(Integer permitCount); + + Mono acquire(LockManager lockManager); + + Flux acquire(LockManager lockManager, int permits); + + Mono release(LockManager lockManager, List locks); + + Mono release(LockManager lockManager, Lock lock); + +} diff --git a/WebFlux/ex1/src/main/java/edu/vandy/lockmanager/service/impl/LockManagerServiceImpl.java b/WebFlux/ex1/src/main/java/edu/vandy/lockmanager/service/impl/LockManagerServiceImpl.java new file mode 100644 index 000000000..c3c2e0a05 --- /dev/null +++ b/WebFlux/ex1/src/main/java/edu/vandy/lockmanager/service/impl/LockManagerServiceImpl.java @@ -0,0 +1,271 @@ +package edu.vandy.lockmanager.service.impl; + +import static edu.vandy.lockmanager.utils.Utils.generateUniqueId; +import static edu.vandy.lockmanager.utils.Utils.log; +import static java.lang.Boolean.FALSE; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.IntStream; + +import org.springframework.stereotype.Service; + +import edu.vandy.lockmanager.common.Lock; +import edu.vandy.lockmanager.common.LockManager; +import edu.vandy.lockmanager.server.LockManagerController; +import edu.vandy.lockmanager.service.LockManagerService; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * This Spring {@code Service} implements the {@link LockManagerController} + * endpoint handler methods using Spring WebFlux reactive types and a + * {@link Map} of {@link LockManager} objects associated with the + * {@link ArrayBlockingQueue} objects that store the state of each semaphore. + */ +@SuppressWarnings("BlockingMethodInNonBlockingContext") +@Service +class LockManagerServiceImpl implements LockManagerService { + /** + * A {@link Map} that associates {@link LockManager} objects with the + * {@link ArrayBlockingQueue} that stores the state of the semaphore. + */ + private final Map> mLockManagerMap = new ConcurrentHashMap<>(); + + /** + * Initialize the {@link Lock} manager. + * + * @param permitCount The number of {@link Lock} objects to manage + * @return A {@link Mono} that emits a {@link LockManager} uniquely identifying + * this semaphore + */ + public Mono create(Integer permitCount) { + return Mono.fromSupplier(() -> { + var availableLocks = + // Make an ArrayBlockQueue with "fair" + // semantics that limits concurrent access to + // the fixed number of available locks. + new ArrayBlockingQueue(permitCount, true); + + // Add each Lock to the queue. + availableLocks.addAll(makeLocks(permitCount)); + + // Create a new LockManager with a unique name. + var lockManager = new LockManager(generateUniqueId(), permitCount); + + // Insert the new LockManager and the + // ArrayBlockingQueue into the Map. + mLockManagerMap.put(lockManager, availableLocks); + + log("LockService.create(" + permitCount + ") " + "- made " + lockManager + " with locks = " + + availableLocks); + + // Return the new LockManager. + return lockManager; + }); + } + + /** + * Create the requested number of {@link Lock} objects. + * + * @param count The number of {@link Lock} objects to create + */ + private List makeLocks(int count) { + return IntStream + // Iterate from 0 to count - 1. + .range(0, count) + + // Convert Integer to String. + .mapToObj(Integer::toString) + + // Create a new Lock. + .map(Lock::new) + + // Convert the Stream to a List. + .toList(); + } + + /** + * Acquire a {@link Lock}, blocking until one is available, but return a + * {@link Mono} so the caller needn't block. + * + * @param lockManager The {@link LockManager} that is associated with the state + * of the semaphore it manages + * @return A {@link Mono} that emits a {@link Lock} + */ + public Mono acquire(LockManager lockManager) { + log("LockService.acquire() on " + lockManager); + + return Mono + // Acquire an available lock, which may block. + .fromCallable(() -> { + log("LockService - requesting a Lock"); + + // Find the current state of the semaphore + // associated with lockManager. + var availableLocks = mLockManagerMap.get(lockManager); + + if (availableLocks == null) + throw new IllegalArgumentException(lockManager.name); + else { + var lock = availableLocks.poll(); + + if (lock != null) + log("LockService - obtained Lock non-blocking " + lock); + else { + // This call can block since it runs in a + // virtual thread. + lock = availableLocks.take(); + + log("LockService - obtained Lock blocking " + lock); + } + + // Return the Lock. + return lock; + } + }) + // Display any exception that might occur. + .doOnError(exception -> log("LockService error - " + exception.getMessage())) + .doOnSuccess(mono -> log("LockService - returning Mono")); + } + + /** + * Acquire {@code permits} number of {@link Lock} objects. + * + * @param lockManager The {@link LockManager} that is associated with the state + * of the semaphore it manages + * @param permits The number of permits to acquire + * @return A {@link Flux} that emits {@code permits} newly acquired {@link Lock} + * objects + */ + public Flux acquire(LockManager lockManager, int permits) { + log("LockService.acquire(" + permits + ")"); + + // Find the current state of the semaphore associated with + // lockManager. + var availableLocks = mLockManagerMap.get(lockManager); + + if (availableLocks == null) + throw new IllegalArgumentException(lockManager.name); + else { + // Create a List to hold the acquired Lock objects. + List acquiredLocks = new ArrayList<>(permits); + + var flux = Mono + // Create a Mono that executes tryAcquireLock() method + // and emits its result. + .fromSupplier(() -> tryAcquireLock(availableLocks, acquiredLocks)) + + // Repeat the Mono indefinitely. + .repeat() + + // Take elements from the stream until the number of + // acquired locks is equal to 'permits'. + .takeUntil(result -> result.equals(permits)) + + // Log the results. + .doOnNext(result -> { + if (result == permits) + log("LockService.acquire(" + permits + ") = " + result); + }) + // Transform Flux to Flux that emits + // the acquired Lock objects as individual elements. + .thenMany(Flux.fromIterable(acquiredLocks)); + + log("LockService.acquire(" + permits + ") returning Flux"); + return flux; + } + } + + /** + * This helper method tries to acquire a {@link Lock}. + * + * @param availableLocks Contains the state of the semaphore + * @param acquiredLocks The {@link List} of {@link Lock} objects we're trying + * to acquire + * @return The number of {@link Lock} objects in {@code + * acquiredLocks} + */ + private Integer tryAcquireLock(ArrayBlockingQueue availableLocks, List acquiredLocks) { + // Perform a non-blocking poll(). + var lock = availableLocks.poll(); + + if (lock != null) { + // Add the acquired lock to the List. + acquiredLocks.add(lock); + + // Return the number of acquired locks. + return acquiredLocks.size(); + } else { + // Not enough locks are available, so release the acquired + // locks. + acquiredLocks + // offer() does not block. + .forEach(availableLocks::offer); + + // Clear out the acquiredLocks List. + acquiredLocks.clear(); + + // Indicate we need to restart from the beginning. + return 0; + } + } + + /** + * Release the {@link Lock}. + * + * @param lockManager The {@link LockManager} that is associated with the state + * of the semaphore it manages + * @param lock The {@link Lock} to release + * @return A {@link Mono} that emits {@link Boolean#TRUE} if the {@link Lock} + * was released properly and {@link Boolean#FALSE} otherwise. + */ + public Mono release(LockManager lockManager, Lock lock) { + log("LockService.release([" + lock + "]) on " + lockManager); + + // Try to get the locks associated with the lockManager. + var availableLocks = mLockManagerMap.get(lockManager); + + if (availableLocks == null) + return Mono.just(FALSE); + return Mono + // Put the lock back into mAvailableQueue w/out blocking. + .just(availableLocks.offer(lock)); + } + + /** + * Release the {@code locks}. + * + * @param lockManager The {@link LockManager} that is associated with the state + * of the semaphore it manages + * @param locks A {@link List} that contains {@link Lock} objects to + * release + * @return A {@link Mono} that emits {@link Boolean#TRUE} if the {@link Lock} + * was released properly and {@link Boolean#FALSE} otherwise. + */ + public Mono release(LockManager lockManager, List locks) { + log("LockService.release(" + locks.size() + ") " + locks + " on " + lockManager); + + // Try to get the locks associated with lockManager. + var availableLocks = mLockManagerMap.get(lockManager); + + if (availableLocks == null) + return Mono.just(FALSE); + else { + boolean allReleased = locks + // Convert List to a Stream. + .stream() + + // Return true if all locks are put back into + // mAvailableLocks successfully (does not block). + .allMatch(availableLocks::offer); + + return Mono + // Return the result, either true or false. + .just(allReleased); + } + } +} diff --git a/WebFlux/ex1/src/main/java/edu/vandy/lockmanager/utils/Utils.java b/WebFlux/ex1/src/main/java/edu/vandy/lockmanager/utils/Utils.java index 122bf0cfb..4b1e83cbb 100644 --- a/WebFlux/ex1/src/main/java/edu/vandy/lockmanager/utils/Utils.java +++ b/WebFlux/ex1/src/main/java/edu/vandy/lockmanager/utils/Utils.java @@ -1,18 +1,21 @@ package edu.vandy.lockmanager.utils; import java.util.UUID; +import java.util.logging.Logger; -public class Utils { - public static void log(String text) { - var thread = Thread.currentThread(); //.threadId(); - System.out.println(text - + " [" + thread + "]: "); - } +public interface Utils { - /** - * @return A unique {@link String} id - */ - public static String generateUniqueId() { - return UUID.randomUUID().toString(); - } + Logger logger = Logger.getLogger(Utils.class.getName()); + + static void log(String text) { + var thread = Thread.currentThread(); // .threadId(); + logger.info(text + " [" + thread + "]: "); + } + + /** + * @return A unique {@link String} id + */ + static String generateUniqueId() { + return UUID.randomUUID().toString(); + } }