From c6f8868610e835ee0e101b4ff09449058fc5f15d Mon Sep 17 00:00:00 2001 From: lucas-phillips28 Date: Tue, 4 Feb 2025 11:35:02 +0000 Subject: [PATCH 1/3] add extra logging --- .../uk/gov/hmcts/reform/preapi/tasks/StartLiveEvents.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/main/java/uk/gov/hmcts/reform/preapi/tasks/StartLiveEvents.java b/src/main/java/uk/gov/hmcts/reform/preapi/tasks/StartLiveEvents.java index 5471f6ae6..18c6a01bc 100644 --- a/src/main/java/uk/gov/hmcts/reform/preapi/tasks/StartLiveEvents.java +++ b/src/main/java/uk/gov/hmcts/reform/preapi/tasks/StartLiveEvents.java @@ -130,7 +130,13 @@ private void awaitIngestAddresses(List captureSessionIds) { Thread.sleep(2000); var liveEvents = mediaService.getLiveEvents(); startingCaptureSessions = startingCaptureSessions.stream() - .filter(id -> !tryGetIngestAddress(id, liveEvents)) + .filter(id -> { + var result = tryGetIngestAddress(id, liveEvents); + if (result) { + log.info("Ingest address obtained for capture session {}", id); + } + return !result; + }) .toList(); } while (!startingCaptureSessions.isEmpty()); } catch (Exception e) { From 0e19b5c6c37ec42b5ef2589fbd8aa225e727bded Mon Sep 17 00:00:00 2001 From: lucas-phillips28 Date: Wed, 5 Feb 2025 16:52:26 +0000 Subject: [PATCH 2/3] add batching for cleanup task --- .env.local | 2 + charts/pre-api/values.yaml | 2 + .../hmcts/reform/preapi/entities/Booking.java | 2 +- .../reform/preapi/entities/Participant.java | 2 +- .../reform/preapi/media/IMediaService.java | 8 + .../hmcts/reform/preapi/media/MediaKind.java | 353 +++++++------ .../preapi/tasks/CleanupLiveEvents.java | 445 +++++++++++----- src/main/resources/application.yaml | 4 + .../reform/preapi/media/MediaKindTest.java | 8 +- .../preapi/tasks/CleanupLiveEventsTest.java | 494 +++++------------- 10 files changed, 666 insertions(+), 654 deletions(-) diff --git a/.env.local b/.env.local index 579a901c2..4daa93380 100644 --- a/.env.local +++ b/.env.local @@ -4,6 +4,8 @@ TESTING_SUPPORT_ENDPOINTS_ENABLED=false # CRONS TASK_START_LIVE_EVENTS_BATCH_SIZE=20 +TASK_CLEANUP_LIVE_EVENTS_BATCH_SIZE=3 +TASK_CLEANUP_LIVE_EVENTS_COOLDOWN=60000 # MEDIA SERVICES MEDIA_SERVICE= diff --git a/charts/pre-api/values.yaml b/charts/pre-api/values.yaml index 246245485..fc176786d 100644 --- a/charts/pre-api/values.yaml +++ b/charts/pre-api/values.yaml @@ -59,6 +59,8 @@ java: &javavalues #This is an anchor to reuse settings in Job mode. CRON_USER_EMAIL: "dts-pre-app-{{ .Values.global.environment }}@hmcts.net" ISSUER: https://sts.windows.net/531ff96d-0ae9-462a-8d2d-bec7c0b42082/ TASK_START_LIVE_EVENTS_BATCH_SIZE: 20 + TASK_CLEANUP_LIVE_EVENTS_BATCH_SIZE: 3 + TASK_CLEANUP_LIVE_EVENTS_COOLDOWN: 60000 job: #DO NOT DELETE, Below line is aliasing all values from Java settings above to job to avoid duplication. <<: *javavalues diff --git a/src/main/java/uk/gov/hmcts/reform/preapi/entities/Booking.java b/src/main/java/uk/gov/hmcts/reform/preapi/entities/Booking.java index 53034817b..1aca7bb9b 100644 --- a/src/main/java/uk/gov/hmcts/reform/preapi/entities/Booking.java +++ b/src/main/java/uk/gov/hmcts/reform/preapi/entities/Booking.java @@ -24,7 +24,7 @@ @Entity @Table(name = "bookings") public class Booking extends CreatedModifiedAtEntity implements ISoftDeletable { - @ManyToOne(fetch = FetchType.LAZY) + @ManyToOne(fetch = FetchType.EAGER) @JoinColumn(name = "case_id", referencedColumnName = "id") private Case caseId; diff --git a/src/main/java/uk/gov/hmcts/reform/preapi/entities/Participant.java b/src/main/java/uk/gov/hmcts/reform/preapi/entities/Participant.java index 2f73bde8b..48613dae8 100644 --- a/src/main/java/uk/gov/hmcts/reform/preapi/entities/Participant.java +++ b/src/main/java/uk/gov/hmcts/reform/preapi/entities/Participant.java @@ -25,7 +25,7 @@ @Entity @Table(name = "participants") public class Participant extends CreatedModifiedAtEntity { - @ManyToOne(fetch = FetchType.LAZY) + @ManyToOne(fetch = FetchType.EAGER) @JoinColumn(name = "case_id", referencedColumnName = "id") private Case caseId; diff --git a/src/main/java/uk/gov/hmcts/reform/preapi/media/IMediaService.java b/src/main/java/uk/gov/hmcts/reform/preapi/media/IMediaService.java index 6fc4c1466..419efd32d 100644 --- a/src/main/java/uk/gov/hmcts/reform/preapi/media/IMediaService.java +++ b/src/main/java/uk/gov/hmcts/reform/preapi/media/IMediaService.java @@ -33,4 +33,12 @@ public interface IMediaService { void cleanupStoppedLiveEvent(String liveEventId); void deleteAllStreamingLocatorsAndContentKeyPolicies(); + + String triggerProcessingStep1(CaptureSessionDTO captureSession, String captureSessionNoHyphen, UUID recordingId); + + String triggerProcessingStep2(UUID recordingId); + + RecordingStatus verifyFinalAssetExists(UUID recordingId); + + RecordingStatus hasJobCompleted(String transformName, String jobName); } diff --git a/src/main/java/uk/gov/hmcts/reform/preapi/media/MediaKind.java b/src/main/java/uk/gov/hmcts/reform/preapi/media/MediaKind.java index 0ae32b3a8..e787dba3c 100644 --- a/src/main/java/uk/gov/hmcts/reform/preapi/media/MediaKind.java +++ b/src/main/java/uk/gov/hmcts/reform/preapi/media/MediaKind.java @@ -160,103 +160,6 @@ public PlaybackDTO playAsset(String assetName, String userId) throws Interrupted ); } - private String refreshStreamingLocatorForUser(String userId, String assetName) { - var now = OffsetDateTime.now(); - var streamingLocatorName = userId + "_" + assetName; - - // check streaming locator is still valid - try { - var locator = mediaKindClient.getStreamingLocator(streamingLocatorName); - if (locator.getProperties().getEndTime().toInstant().isAfter(now.toInstant())) { - return streamingLocatorName; - } - mediaKindClient.deleteStreamingLocator(streamingLocatorName); - } catch (NotFoundException e) { - // ignore - } - - mediaKindClient.createStreamingLocator( - streamingLocatorName, - MkStreamingLocator.builder() - .properties( - MkStreamingLocatorProperties.builder() - .assetName(assetName) - .streamingPolicyName(STREAMING_POLICY_CLEAR_KEY) - .defaultContentKeyPolicyName(userId) - // set end time to midnight tonight - .endTime(Timestamp.from( - now.toLocalDate() - .atTime(LocalTime.MAX) - .atZone(now.getOffset()) - .toInstant() - )) - .build()) - .build() - ); - - return streamingLocatorName; - } - - private void assertStreamingPolicyExists(String defaultContentKeyPolicy) { - try { - mediaKindClient.getStreamingPolicy(MediaKind.STREAMING_POLICY_CLEAR_KEY); - } catch (NotFoundException e) { - log.info("Streaming policy {} was not found. Creating streaming policy.", - MediaKind.STREAMING_POLICY_CLEAR_KEY - ); - mediaKindClient.putStreamingPolicy( - STREAMING_POLICY_CLEAR_KEY, - MkStreamingPolicy.builder() - .properties( - MkStreamingPolicyProperties.builder() - .defaultContentKeyPolicyName(defaultContentKeyPolicy) - .envelopeEncryption( - new EnvelopeEncryption() - .withEnabledProtocols( - new EnabledProtocols() - .withDash(true) - .withHls(true) - .withSmoothStreaming(false) - .withDownload(false)) - .withContentKeys( - new StreamingPolicyContentKeys() - .withDefaultKey(new DefaultKey() - .withLabel("ContentKey_AES") - .withPolicyName(defaultContentKeyPolicy))) - ) - .build() - ) - .build() - ); - } - } - - private void createContentKeyPolicy(String userId, String key) { - try { - mediaKindClient.getContentKeyPolicy(userId); - } catch (NotFoundException e) { - mediaKindClient.putContentKeyPolicy(userId, MkContentKeyPolicy.builder() - .properties(MkContentKeyPolicyProperties.builder() - .description("Content key policy for user: " + userId) - .options( - List.of(MkContentKeyPolicyOptions.builder() - .name("key") - .restriction( - new ContentKeyPolicyTokenRestriction() - .withIssuer(issuer) - .withAudience(userId) - .withRestrictionTokenType( - ContentKeyPolicyRestrictionTokenType.JWT) - .withPrimaryVerificationKey( - new ContentKeyPolicySymmetricTokenKey() - .withKeyValue(key.getBytes()))) - .configuration(new ContentKeyPolicyClearKeyConfiguration()) - .build())) - .build()) - .build()); - } - } - @Override public GenerateAssetResponseDTO importAsset(GenerateAssetDTO generateAssetDTO) throws InterruptedException { createAsset(generateAssetDTO.getTempAsset(), @@ -309,18 +212,12 @@ public String playLiveEvent(UUID liveEventId) throws InterruptedException { return parseLiveOutputUrlFromStreamingLocatorPaths(DEFAULT_LIVE_STREAMING_ENDPOINT, paths); } + @Override public LiveEventDTO getLiveEvent(String liveEventName) { return new LiveEventDTO(getLiveEventMk(liveEventName)); } - private MkLiveEvent getLiveEventMk(String liveEventName) { - try { - return mediaKindClient.getLiveEvent(liveEventName); - } catch (NotFoundException e) { - throw new NotFoundException("Live Event: " + liveEventName); - } - } - + @Override public List getLiveEvents() { return getAllMkList(mediaKindClient::getLiveEvents) .map(LiveEventDTO::new) @@ -336,44 +233,25 @@ public RecordingStatus stopLiveEvent(CaptureSessionDTO captureSession, UUID reco cleanupStoppedLiveEvent(captureSessionNoHyphen); - if (!azureIngestStorageService.doesValidAssetExist(captureSession.getBookingId().toString())) { - log.info("No valid asset files found for capture session [{}] in container named [{}]", - captureSession.getId(), - captureSession.getBookingId().toString() - ); + var jobName = triggerProcessingStep1(captureSession, captureSessionNoHyphen, recordingId); + if (jobName == null) { return RecordingStatus.NO_RECORDING; } - - var recordingNoHyphen = getSanitisedLiveEventId(recordingId); - var recordingTempAssetName = recordingNoHyphen + "_temp"; - var recordingAssetName = recordingNoHyphen + "_output"; - - createAsset(recordingTempAssetName, captureSession, recordingId.toString(), false); - createAsset(recordingAssetName, captureSession, recordingId.toString(), true); - - var jobName = encodeFromIngest(captureSessionNoHyphen, recordingTempAssetName); var encodeFromIngestJobState = waitEncodeComplete(jobName, ENCODE_FROM_INGEST_TRANSFORM); if (encodeFromIngestJobState != JobState.FINISHED) { return RecordingStatus.FAILURE; } - var filename = azureIngestStorageService.tryGetMp4FileName(recordingId.toString()); - if (filename == null) { - log.error("Output file from {} transform not found", ENCODE_FROM_INGEST_TRANSFORM); + var jobName2 = triggerProcessingStep2(recordingId); + if (jobName2 == null) { return RecordingStatus.FAILURE; } - - var jobName2 = encodeFromMp4(recordingTempAssetName, recordingAssetName, filename); var encodeFromMp4JobState = waitEncodeComplete(jobName2, ENCODE_FROM_MP4_TRANSFORM); if (encodeFromMp4JobState != JobState.FINISHED) { return RecordingStatus.FAILURE; } - if (!azureFinalStorageService.doesIsmFileExist(recordingId.toString())) { - log.error("Final asset .ism file not found for asset [{}] in container [{}]", - recordingAssetName, recordingId); - return RecordingStatus.FAILURE; - } - return RecordingStatus.RECORDING_AVAILABLE; + + return verifyFinalAssetExists(recordingId); } @Override @@ -427,6 +305,198 @@ private void startLiveEvent(String liveEventName) { } } + @Override + public String triggerProcessingStep1(CaptureSessionDTO captureSession, String captureSessionNoHyphen, + UUID recordingId) { + if (!azureIngestStorageService.doesValidAssetExist(captureSession.getBookingId().toString())) { + log.info("No valid asset files found for capture session [{}] in container named [{}]", + captureSession.getId(), + captureSession.getBookingId().toString() + ); + return null; + } + + var recordingNoHyphen = getSanitisedLiveEventId(recordingId); + var recordingTempAssetName = recordingNoHyphen + "_temp"; + var recordingAssetName = recordingNoHyphen + "_output"; + + createAsset(recordingTempAssetName, captureSession, recordingId.toString(), false); + createAsset(recordingAssetName, captureSession, recordingId.toString(), true); + + return encodeFromIngest(captureSessionNoHyphen, recordingTempAssetName); + } + + @Override + public String triggerProcessingStep2(UUID recordingId) { + var filename = azureIngestStorageService.tryGetMp4FileName(recordingId.toString()); + if (filename == null) { + log.error("Output file from {} transform not found", ENCODE_FROM_INGEST_TRANSFORM); + return null; + } + + var recordingNoHyphen = getSanitisedLiveEventId(recordingId); + var recordingTempAssetName = recordingNoHyphen + "_temp"; + var recordingAssetName = recordingNoHyphen + "_output"; + + return encodeFromMp4(recordingTempAssetName, recordingAssetName, filename); + } + + @Override + public RecordingStatus verifyFinalAssetExists(UUID recordingId) { + var recordingAssetName = getSanitisedLiveEventId(recordingId) + "_output"; + + if (!azureFinalStorageService.doesIsmFileExist(recordingId.toString())) { + log.error("Final asset .ism file not found for asset [{}] in container [{}]", + recordingAssetName, recordingId); + return RecordingStatus.FAILURE; + } + return RecordingStatus.RECORDING_AVAILABLE; + } + + @Override + public RecordingStatus hasJobCompleted(String transformName, String jobName) { + var job = mediaKindClient.getJob(transformName, jobName); + return hasJobCompleted(job) && job.getProperties().getState() == JobState.FINISHED + ? RecordingStatus.RECORDING_AVAILABLE + : (job.getProperties().getState() == JobState.ERROR || job.getProperties().getState() == JobState.CANCELED + ? RecordingStatus.FAILURE + : RecordingStatus.PROCESSING); + } + + private boolean hasJobCompleted(MkJob job) { + var state = job.getProperties().getState(); + var jobName = job.getName(); + + if (state.equals(JobState.ERROR)) { + log.error("Job [{}] failed with error [{}]", + jobName, + job.getProperties().getOutputs().getLast().error().message()); + } else if (state.equals(JobState.CANCELED)) { + log.error("Job [{}] was cancelled", jobName); + } + + return state.equals(JobState.FINISHED) + || state.equals(JobState.ERROR) + || state.equals(JobState.CANCELED); + } + + private JobState waitEncodeComplete(String jobName, String transformName) throws InterruptedException { + log.info("Waiting for job [{}] to complete", jobName); + MkJob job = null; + do { + if (job != null) { + TimeUnit.MILLISECONDS.sleep(10000); + } + job = mediaKindClient.getJob(transformName, jobName); + } while (!hasJobCompleted(job)); + return job.getProperties().getState(); + } + + private String refreshStreamingLocatorForUser(String userId, String assetName) { + var now = OffsetDateTime.now(); + var streamingLocatorName = userId + "_" + assetName; + + // check streaming locator is still valid + try { + var locator = mediaKindClient.getStreamingLocator(streamingLocatorName); + if (locator.getProperties().getEndTime().toInstant().isAfter(now.toInstant())) { + return streamingLocatorName; + } + mediaKindClient.deleteStreamingLocator(streamingLocatorName); + } catch (NotFoundException e) { + // ignore + } + + mediaKindClient.createStreamingLocator( + streamingLocatorName, + MkStreamingLocator.builder() + .properties( + MkStreamingLocatorProperties.builder() + .assetName(assetName) + .streamingPolicyName(STREAMING_POLICY_CLEAR_KEY) + .defaultContentKeyPolicyName(userId) + // set end time to midnight tonight + .endTime(Timestamp.from( + now.toLocalDate() + .atTime(LocalTime.MAX) + .atZone(now.getOffset()) + .toInstant() + )) + .build()) + .build() + ); + + return streamingLocatorName; + } + + private void assertStreamingPolicyExists(String defaultContentKeyPolicy) { + try { + mediaKindClient.getStreamingPolicy(MediaKind.STREAMING_POLICY_CLEAR_KEY); + } catch (NotFoundException e) { + log.info("Streaming policy {} was not found. Creating streaming policy.", + MediaKind.STREAMING_POLICY_CLEAR_KEY + ); + mediaKindClient.putStreamingPolicy( + STREAMING_POLICY_CLEAR_KEY, + MkStreamingPolicy.builder() + .properties( + MkStreamingPolicyProperties.builder() + .defaultContentKeyPolicyName(defaultContentKeyPolicy) + .envelopeEncryption( + new EnvelopeEncryption() + .withEnabledProtocols( + new EnabledProtocols() + .withDash(true) + .withHls(true) + .withSmoothStreaming(false) + .withDownload(false)) + .withContentKeys( + new StreamingPolicyContentKeys() + .withDefaultKey(new DefaultKey() + .withLabel("ContentKey_AES") + .withPolicyName(defaultContentKeyPolicy))) + ) + .build() + ) + .build() + ); + } + } + + private void createContentKeyPolicy(String userId, String key) { + try { + mediaKindClient.getContentKeyPolicy(userId); + } catch (NotFoundException e) { + mediaKindClient.putContentKeyPolicy(userId, MkContentKeyPolicy.builder() + .properties(MkContentKeyPolicyProperties.builder() + .description("Content key policy for user: " + userId) + .options( + List.of(MkContentKeyPolicyOptions.builder() + .name("key") + .restriction( + new ContentKeyPolicyTokenRestriction() + .withIssuer(issuer) + .withAudience(userId) + .withRestrictionTokenType( + ContentKeyPolicyRestrictionTokenType.JWT) + .withPrimaryVerificationKey( + new ContentKeyPolicySymmetricTokenKey() + .withKeyValue(key.getBytes()))) + .configuration(new ContentKeyPolicyClearKeyConfiguration()) + .build())) + .build()) + .build()); + } + } + + private MkLiveEvent getLiveEventMk(String liveEventName) { + try { + return mediaKindClient.getLiveEvent(liveEventName); + } catch (NotFoundException e) { + throw new NotFoundException("Live Event: " + liveEventName); + } + } + private void stopAndDeleteLiveEvent(String liveEventName) { try { mediaKindClient.stopLiveEvent(liveEventName); @@ -522,29 +592,6 @@ private String runEncodeTransform(String inputAssetName, return jobName; } - private JobState waitEncodeComplete(String jobName, String transformName) throws InterruptedException { - log.info("Waiting for job [{}] to complete", jobName); - MkJob job = null; - do { - if (job != null) { - TimeUnit.MILLISECONDS.sleep(10000); - } - job = mediaKindClient.getJob(transformName, jobName); - } while (!job.getProperties().getState().equals(JobState.FINISHED) - && !job.getProperties().getState().equals(JobState.ERROR) - && !job.getProperties().getState().equals(JobState.CANCELED)); - var state = job.getProperties().getState(); - if (state.equals(JobState.ERROR)) { - log.error("Job [{}] failed with error [{}]", - jobName, - job.getProperties().getOutputs().getLast().error().message()); - } else if (state.equals(JobState.CANCELED)) { - log.error("Job [{}] was cancelled", jobName); - } - - return job.getProperties().getState(); - } - private MkLiveEvent checkStreamReady(String liveEventName) throws InterruptedException { MkLiveEvent liveEvent; do { diff --git a/src/main/java/uk/gov/hmcts/reform/preapi/tasks/CleanupLiveEvents.java b/src/main/java/uk/gov/hmcts/reform/preapi/tasks/CleanupLiveEvents.java index c67da2bc1..f882f13c0 100644 --- a/src/main/java/uk/gov/hmcts/reform/preapi/tasks/CleanupLiveEvents.java +++ b/src/main/java/uk/gov/hmcts/reform/preapi/tasks/CleanupLiveEvents.java @@ -1,56 +1,72 @@ package uk.gov.hmcts.reform.preapi.tasks; import com.azure.resourcemanager.mediaservices.models.LiveEventResourceState; +import lombok.AllArgsConstructor; +import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import org.springframework.data.domain.Pageable; import org.springframework.stereotype.Component; -import uk.gov.hmcts.reform.preapi.controllers.params.SearchRecordings; import uk.gov.hmcts.reform.preapi.dto.CaptureSessionDTO; import uk.gov.hmcts.reform.preapi.dto.flow.StoppedLiveEventsNotificationDTO; +import uk.gov.hmcts.reform.preapi.dto.media.LiveEventDTO; import uk.gov.hmcts.reform.preapi.email.StopLiveEventNotifierFlowClient; import uk.gov.hmcts.reform.preapi.enums.RecordingStatus; import uk.gov.hmcts.reform.preapi.exception.NotFoundException; import uk.gov.hmcts.reform.preapi.media.IMediaService; +import uk.gov.hmcts.reform.preapi.media.MediaKind; import uk.gov.hmcts.reform.preapi.media.MediaServiceBroker; import uk.gov.hmcts.reform.preapi.security.service.UserAuthenticationService; import uk.gov.hmcts.reform.preapi.services.BookingService; import uk.gov.hmcts.reform.preapi.services.CaptureSessionService; -import uk.gov.hmcts.reform.preapi.services.RecordingService; import uk.gov.hmcts.reform.preapi.services.UserService; +import uk.gov.hmcts.reform.preapi.util.Batcher; +import java.util.List; +import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +import static uk.gov.hmcts.reform.preapi.media.MediaResourcesHelper.getSanitisedLiveEventId; -@Component @Slf4j +@Component public class CleanupLiveEvents extends RobotUserTask { + private final BookingService bookingService; private final MediaServiceBroker mediaServiceBroker; private final CaptureSessionService captureSessionService; - private final BookingService bookingService; - private final RecordingService recordingService; private final StopLiveEventNotifierFlowClient stopLiveEventNotifierFlowClient; private final String platformEnv; + private final int batchSize; + private final int batchCooldownTime; + private final int jobPollingInterval; + + private final ConcurrentHashMap liveEventCleanupMap = new ConcurrentHashMap<>(); + @Autowired CleanupLiveEvents(MediaServiceBroker mediaServiceBroker, CaptureSessionService captureSessionService, BookingService bookingService, - RecordingService recordingService, UserService userService, UserAuthenticationService userAuthenticationService, + StopLiveEventNotifierFlowClient stopLiveEventNotifierFlowClient, @Value("${cron-user-email}") String cronUserEmail, @Value("${platform-env}") String platformEnv, - StopLiveEventNotifierFlowClient stopLiveEventNotifierFlowClient) { + @Value("${tasks.cleanup-live-events.batch-size}") int batchSize, + @Value("${tasks.cleanup-live-events.cooldown}") int batchCooldownTime, + @Value("${tasks.cleanup-live-events.job-poll-interval}") int jobPollingInterval) { super(userService, userAuthenticationService, cronUserEmail); this.mediaServiceBroker = mediaServiceBroker; this.captureSessionService = captureSessionService; this.bookingService = bookingService; - this.recordingService = recordingService; this.platformEnv = platformEnv; this.stopLiveEventNotifierFlowClient = stopLiveEventNotifierFlowClient; + this.batchSize = batchSize; + this.batchCooldownTime = batchCooldownTime; + this.jobPollingInterval = jobPollingInterval; } @Override @@ -60,134 +76,294 @@ public void run() throws RuntimeException { var mediaService = mediaServiceBroker.getEnabledMediaService(); - // Find all Live events currently running and stop and delete them along with their streaming endpoints and - // locators - var liveEvents = mediaService.getLiveEvents(); + // Find all Live events currently running + var liveEvents = mediaService.getLiveEvents() + .stream() + .filter(liveEventDTO -> liveEventDTO.getResourceState().equals(LiveEventResourceState.RUNNING.toString())) + .toList(); + + // handle invalid named live events (no capture session) + var invalidNamedEvents = liveEvents.stream() + .filter(event -> { + try { + captureSessionService.findByLiveEventId(event.getName()); + return false; + } catch (NotFoundException e) { + return true; + } + }) + .toList(); + cleanupLiveEventsInBatches(invalidNamedEvents); + + // Set up map liveEvents.stream() - .filter(liveEventDTO -> liveEventDTO - .getResourceState().equals(LiveEventResourceState.RUNNING.toString()) - ) - .forEach(liveEventDTO -> { - log.info("Finding capture session by live event id {}", liveEventDTO.getName()); - var sendNotification = false; - try { - var captureSession = captureSessionService.findByLiveEventId(liveEventDTO.getName()); - var search = new SearchRecordings(); - log.info("Finding recordings by capture session {}", captureSession.getId()); - search.setCaptureSessionId(captureSession.getId()); - var recordings = recordingService.findAll(search, false, Pageable.unpaged()); - - if (recordings.isEmpty()) { - log.info("No recordings found for capture session {}", captureSession.getId()); - sendNotification = stopLiveEvent( - mediaService, - captureSession, - UUID.randomUUID(), - liveEventDTO.getName() - ); - } else { - sendNotification = !recordings.map(recording -> { - log.info( - "{} recordings found for capture session {}", - recordings.getSize(), - captureSession.getId() - ); - return stopLiveEvent( - mediaService, - captureSession, - recording.getId(), - liveEventDTO.getName() - ); - }) - .filter(b -> b) - .toList() - .isEmpty(); - } - - if (sendNotification) { - try { - var booking = bookingService.findById(captureSession.getBookingId()); - - var toNotify = booking.getShares().stream() - .map(shareBooking -> userService.findById( - shareBooking.getSharedWithUser().getId()) - ) - .map(u -> StoppedLiveEventsNotificationDTO - .builder() - .email(u.getEmail()) - .firstName(u.getFirstName()) - .caseReference(booking.getCaseDTO().getReference()) - .courtName(booking.getCaseDTO().getCourt().getName()) - .build()) - .toList(); - if (!toNotify.isEmpty()) { - log.info("Sending email notifications to {} user(s)", toNotify.size()); - stopLiveEventNotifierFlowClient.emailAfterStoppingLiveEvents(toNotify); - } else { - log.info("No users to notify for capture session {}", captureSession.getId()); - } - } catch (NotFoundException e) { - log.error(e.getMessage()); - } - } - } catch (NotFoundException e) { - if (platformEnv.equals("Production")) { - log.error("Error stopping live event {}", liveEventDTO.getName(), e); - return; - } - log.info("Stopping live event without associated capture session: {}", - liveEventDTO.getName()); - mediaService.cleanupStoppedLiveEvent(liveEventDTO.getName()); - } catch (Exception e) { - log.error("Error stopping live event {}", liveEventDTO.getName(), e); - } - }); + .filter(event -> { + try { + captureSessionService.findByLiveEventId(event.getName()); + return true; + } catch (NotFoundException e) { + return false; + } + }) + .forEach(liveEventDTO -> { + liveEventCleanupMap.put( + generateUuidFromLiveEventName(liveEventDTO.getName()), + new CleanupTask(CleanupTaskStatus.QUEUED, null, UUID.randomUUID()) + ); + }); + + // Thread 1 - Cleanup Live Events in batches + var threadCleanupLiveEvents = new Thread(() -> cleanupLiveEventsInBatches(liveEvents)); + + // Thread 2 - Trigger and await processing of live event assets + var threadProcessLiveEventAssets = new Thread(this::processLiveEvents); + + threadCleanupLiveEvents.start(); + threadProcessLiveEventAssets.start(); + try { + threadCleanupLiveEvents.join(); + threadProcessLiveEventAssets.join(); + } catch (InterruptedException e) { + log.error("Clean up live event thread interrupted", e); + throw new RuntimeException(e); + } + + // Notify + log.info("Notifying shares for new recordings"); + liveEventCleanupMap.entrySet().stream() + .filter(entry -> entry.getValue().getStatus() == CleanupTaskStatus.RECORDING_AVAILABLE) + .map(Map.Entry::getKey) + .map(captureSessionService::findById) + .forEach(this::notify); log.info("Completed CleanupLiveEvents task"); } - private boolean stopLiveEvent(IMediaService mediaService, - CaptureSessionDTO captureSession, - UUID recordingId, - String liveEventId) { - try { - // This shouldn't happen but as this is the cleanup cron we do want to ensure the Live Events in AMS/MK - // are terminated and cleaned up. - // A manual process will be needed to investigate why the CaptureSession is in an unexpected state. - if (captureSession.getStatus() != RecordingStatus.STANDBY - && captureSession.getStatus() != RecordingStatus.RECORDING - && captureSession.getStatus() != RecordingStatus.PROCESSING) { - log.error( - "CaptureSession {} is in an unexpected state: {}", - captureSession.getId(), - captureSession.getStatus() + private void processLiveEvents() { + signInRobotUser(); + var mediaService = mediaServiceBroker.getEnabledMediaService(); + + do { + try { + Thread.sleep(jobPollingInterval); + } catch (InterruptedException e) { + // Do nothing + } + + liveEventCleanupMap.entrySet() + .stream() + .filter(entry -> + entry.getValue().getStatus() != CleanupTaskStatus.QUEUED + && entry.getValue().getStatus() != CleanupTaskStatus.NO_RECORDING + && entry.getValue().getStatus() != CleanupTaskStatus.RECORDING_AVAILABLE) + .forEach(entry -> { + switch (entry.getValue().getStatus()) { + case READY -> { + onCaptureSessionReady(entry, mediaService); + } + case PROCESSING_1 -> { + onCaptureSessionProcessingStep1(entry, mediaService); + } + case PROCESSING_2 -> { + onCaptureSessionProcessingStep2(entry, mediaService); + } + default -> { + // Do nothing + } + } + log.info( + "Live event cleanup task status after check: Capture Session: {}, Status: {}, Current Job: {}", + entry.getKey(), + entry.getValue().getStatus(), + entry.getValue().getCurrentJobName() + ); + }); + + } while (!liveEventCleanupMap.entrySet() + .stream() + .allMatch(entry -> + entry.getValue().getStatus() == CleanupTaskStatus.NO_RECORDING + || entry.getValue().getStatus() == CleanupTaskStatus.RECORDING_AVAILABLE)); + } + + private void onCaptureSessionReady(Map.Entry task, IMediaService mediaService) { + var captureSessionId = task.getKey(); + var currentTask = task.getValue(); + + var captureSession = captureSessionService.findById(captureSessionId); + var jobName = mediaService.triggerProcessingStep1( + captureSession, + getSanitisedLiveEventId(captureSessionId), + currentTask.getRecordingId() + ); + + if (jobName == null) { + currentTask.setStatus(CleanupTaskStatus.NO_RECORDING); + captureSessionService.stopCaptureSession( + captureSessionId, + RecordingStatus.NO_RECORDING, + currentTask.getRecordingId() + ); + return; + } + + currentTask.setCurrentJobName(jobName); + currentTask.setStatus(CleanupTaskStatus.PROCESSING_1); + } + + private void onCaptureSessionProcessingStep1(Map.Entry task, IMediaService mediaService) { + var captureSessionId = task.getKey(); + var currentTask = task.getValue(); + + switch (mediaService.hasJobCompleted(MediaKind.ENCODE_FROM_INGEST_TRANSFORM, currentTask.getCurrentJobName())) { + case RECORDING_AVAILABLE -> { + // trigger processing step 2 + var jobName = mediaService.triggerProcessingStep2(currentTask.getRecordingId()); + + if (jobName == null) { + log.error("Failed to trigger processing step 2 for capture session {}", captureSessionId); + captureSessionService.stopCaptureSession( + captureSessionId, + RecordingStatus.FAILURE, + currentTask.getRecordingId() + ); + currentTask.setStatus(CleanupTaskStatus.NO_RECORDING); + return; + } + currentTask.setCurrentJobName(jobName); + currentTask.setStatus(CleanupTaskStatus.PROCESSING_2); + } + case FAILURE -> { + log.error("Processing job {} failed for capture session {}", + currentTask.getCurrentJobName(), + captureSessionId); + captureSessionService.stopCaptureSession( + captureSessionId, + RecordingStatus.FAILURE, + currentTask.getRecordingId() + ); + currentTask.setStatus(CleanupTaskStatus.NO_RECORDING); + } + default -> { + // still processing + } + } + } + + private void onCaptureSessionProcessingStep2(Map.Entry task, IMediaService mediaService) { + var captureSessionId = task.getKey(); + var currentTask = task.getValue(); + + switch (mediaService.hasJobCompleted(MediaKind.ENCODE_FROM_MP4_TRANSFORM, currentTask.getCurrentJobName())) { + case RECORDING_AVAILABLE -> { + // verify final asset exists + if (mediaService.verifyFinalAssetExists(currentTask.getRecordingId()).equals(RecordingStatus.FAILURE)) { + log.error("Final asset not found for capture session {}", captureSessionId); + captureSessionService.stopCaptureSession( + captureSessionId, + RecordingStatus.FAILURE, + currentTask.getRecordingId() + ); + currentTask.setStatus(CleanupTaskStatus.NO_RECORDING); + return; + } + log.info("Final asset found for capture session {}", captureSessionId); + captureSessionService.stopCaptureSession( + captureSessionId, + RecordingStatus.RECORDING_AVAILABLE, + currentTask.getRecordingId() ); - mediaService.cleanupStoppedLiveEvent(liveEventId); - return false; + currentTask.setStatus(CleanupTaskStatus.RECORDING_AVAILABLE); } - var updatedCaptureSession = captureSessionService.stopCaptureSession( + case FAILURE -> { + log.error("Processing job {} failed for capture session {}", + currentTask.getCurrentJobName(), + captureSessionId); + captureSessionService.stopCaptureSession( + captureSessionId, + RecordingStatus.FAILURE, + currentTask.getRecordingId() + ); + currentTask.setStatus(CleanupTaskStatus.NO_RECORDING); + } + default -> { + // still processing + } + } + } + + private void cleanupLiveEventsInBatches(List liveEvents) { + signInRobotUser(); + Batcher.batchProcess( + liveEvents, + batchSize, + liveEventDTO -> { + var liveEventName = liveEventDTO.getName(); + log.info("Cleaning up live event {}", liveEventName); + cleanupLiveEvent(mediaServiceBroker.getEnabledMediaService(), liveEventName); + }, + batch -> { + try { + Thread.sleep(batchCooldownTime); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + ); + } + + private void cleanupLiveEvent(IMediaService mediaService, String liveEventName) { + try { + var captureSession = captureSessionService.findByLiveEventId(liveEventName); + log.info("Capture session {} is processing", captureSession.getId()); + captureSession = captureSessionService.stopCaptureSession( captureSession.getId(), RecordingStatus.PROCESSING, - recordingId + null ); - log.info("Stopping live event {}", liveEventId); - var status = mediaService.stopLiveEvent(updatedCaptureSession, recordingId); - var stoppedCaptureSession = captureSessionService.stopCaptureSession( - updatedCaptureSession.getId(), - status, - recordingId - ); - log.info("Stopped live event {}", liveEventId); - return stoppedCaptureSession.getStatus().equals(RecordingStatus.RECORDING_AVAILABLE); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - log.error("Failed to stop live event for capture session {}", captureSession.getId(), e); - captureSessionService.stopCaptureSession(captureSession.getId(), RecordingStatus.FAILURE, recordingId); - return false; + mediaService.cleanupStoppedLiveEvent(liveEventName); + var currentTask = liveEventCleanupMap.get(captureSession.getId()); + currentTask.setStatus(CleanupTaskStatus.READY); + } catch (NotFoundException e) { + if (platformEnv.equals("Production")) { + log.error("Error stopping live event {}", liveEventName, e); + return; + } + log.info("Stopping live event without associated capture session: {}", liveEventName); + try { + mediaService.cleanupStoppedLiveEvent(liveEventName); + } catch (Exception ex) { + // Do nothing + } } catch (Exception e) { - log.error("Failed to stop live event for capture session {}", captureSession.getId(), e); - captureSessionService.stopCaptureSession(captureSession.getId(), RecordingStatus.FAILURE, recordingId); - return false; + log.error("Error stopping live event {}", liveEventName, e); + } + } + + private void notify(CaptureSessionDTO captureSession) { + try { + var booking = bookingService.findById(captureSession.getBookingId()); + + var toNotify = booking.getShares().stream() + .map(shareBooking -> userService.findById( + shareBooking.getSharedWithUser().getId()) + ) + .map(u -> StoppedLiveEventsNotificationDTO + .builder() + .email(u.getEmail()) + .firstName(u.getFirstName()) + .caseReference(booking.getCaseDTO().getReference()) + .courtName(booking.getCaseDTO().getCourt().getName()) + .build()) + .toList(); + if (!toNotify.isEmpty()) { + log.info("Sending email notifications to {} user(s)", toNotify.size()); + stopLiveEventNotifierFlowClient.emailAfterStoppingLiveEvents(toNotify); + } else { + log.info("No users to notify for capture session {}", captureSession.getId()); + } + } catch (NotFoundException e) { + log.error(e.getMessage()); } } @@ -197,4 +373,21 @@ private UUID generateUuidFromLiveEventName(String liveEventId) { Long.parseUnsignedLong(liveEventId.substring(16), 16) ); } + + @Data + @AllArgsConstructor + private static class CleanupTask { + private CleanupTaskStatus status; + private String currentJobName; + private UUID recordingId; + } + + private enum CleanupTaskStatus { + QUEUED, + READY, + PROCESSING_1, + PROCESSING_2, + NO_RECORDING, + RECORDING_AVAILABLE, + } } diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index 47ee564d7..f10edd25b 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -107,6 +107,10 @@ flow: tasks: start-live-event: batch-size: ${TASK_START_LIVE_EVENTS_BATCH_SIZE:20} + cleanup-live-events: + batch-size: ${TASK_CLEANUP_LIVE_EVENTS_BATCH_SIZE:3} + cooldown: ${TASK_CLEANUP_LIVE_EVENTS_COOLDOWN:60000} # one minute + job-poll-interval: 10000 # 10 seconds #logging: # level: diff --git a/src/test/java/uk/gov/hmcts/reform/preapi/media/MediaKindTest.java b/src/test/java/uk/gov/hmcts/reform/preapi/media/MediaKindTest.java index db3658855..023420dfb 100644 --- a/src/test/java/uk/gov/hmcts/reform/preapi/media/MediaKindTest.java +++ b/src/test/java/uk/gov/hmcts/reform/preapi/media/MediaKindTest.java @@ -519,11 +519,11 @@ void stopLiveEventRecordingAvailable() throws InterruptedException { var jobArgument3 = ArgumentCaptor.forClass(String.class); var jobArgument4 = ArgumentCaptor.forClass(String.class); verify(mockClient, times(1)).putJob(eq(ENCODE_FROM_INGEST_TRANSFORM), jobArgument.capture(), any(MkJob.class)); - verify(mockClient, times(2)).getJob(eq(ENCODE_FROM_INGEST_TRANSFORM), jobArgument2.capture()); + verify(mockClient, times(3)).getJob(eq(ENCODE_FROM_INGEST_TRANSFORM), jobArgument2.capture()); assertThat(jobArgument.getValue()).startsWith(liveEventName); assertThat(jobArgument2.getValue()).startsWith(liveEventName); verify(mockClient, times(1)).putJob(eq(ENCODE_FROM_MP4_TRANSFORM), jobArgument3.capture(), any(MkJob.class)); - verify(mockClient, times(2)).getJob(eq(ENCODE_FROM_MP4_TRANSFORM), jobArgument4.capture()); + verify(mockClient, times(3)).getJob(eq(ENCODE_FROM_MP4_TRANSFORM), jobArgument4.capture()); assertThat(jobArgument3.getValue()).startsWith(tempName); assertThat(jobArgument4.getValue()).startsWith(tempName); } @@ -610,7 +610,7 @@ void stopLiveEventRecordingFoundRunEncodeTransform() throws InterruptedException var jobArgument = ArgumentCaptor.forClass(String.class); var jobArgument2 = ArgumentCaptor.forClass(String.class); verify(mockClient, times(1)).putJob(eq(ENCODE_FROM_INGEST_TRANSFORM), jobArgument.capture(), any(MkJob.class)); - verify(mockClient, times(2)).getJob(eq(ENCODE_FROM_INGEST_TRANSFORM), jobArgument2.capture()); + verify(mockClient, times(3)).getJob(eq(ENCODE_FROM_INGEST_TRANSFORM), jobArgument2.capture()); assertThat(jobArgument.getValue()).startsWith(liveEventName); assertThat(jobArgument2.getValue()).startsWith(liveEventName); verify(mockClient, times(1)).stopLiveEvent(liveEventName); @@ -656,7 +656,7 @@ void stopLiveEventRecordingFoundRunEncodeTransform2() throws InterruptedExceptio var jobArgument = ArgumentCaptor.forClass(String.class); var jobArgument2 = ArgumentCaptor.forClass(String.class); verify(mockClient, times(1)).putJob(eq(ENCODE_FROM_INGEST_TRANSFORM), jobArgument.capture(), any(MkJob.class)); - verify(mockClient, times(2)).getJob(eq(ENCODE_FROM_INGEST_TRANSFORM), jobArgument2.capture()); + verify(mockClient, times(3)).getJob(eq(ENCODE_FROM_INGEST_TRANSFORM), jobArgument2.capture()); assertThat(jobArgument.getValue()).startsWith(liveEventName); assertThat(jobArgument2.getValue()).startsWith(liveEventName); verify(mockClient, times(1)).stopLiveEvent(liveEventName); diff --git a/src/test/java/uk/gov/hmcts/reform/preapi/tasks/CleanupLiveEventsTest.java b/src/test/java/uk/gov/hmcts/reform/preapi/tasks/CleanupLiveEventsTest.java index 1e48ab052..29b357281 100644 --- a/src/test/java/uk/gov/hmcts/reform/preapi/tasks/CleanupLiveEventsTest.java +++ b/src/test/java/uk/gov/hmcts/reform/preapi/tasks/CleanupLiveEventsTest.java @@ -1,27 +1,16 @@ package uk.gov.hmcts.reform.preapi.tasks; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; -import org.mockito.ArgumentCaptor; -import org.springframework.data.domain.PageImpl; -import org.springframework.data.domain.Pageable; -import uk.gov.hmcts.reform.preapi.controllers.params.SearchRecordings; import uk.gov.hmcts.reform.preapi.dto.AccessDTO; import uk.gov.hmcts.reform.preapi.dto.BookingDTO; import uk.gov.hmcts.reform.preapi.dto.CaptureSessionDTO; import uk.gov.hmcts.reform.preapi.dto.CaseDTO; import uk.gov.hmcts.reform.preapi.dto.CourtDTO; -import uk.gov.hmcts.reform.preapi.dto.RecordingDTO; import uk.gov.hmcts.reform.preapi.dto.ShareBookingDTO; import uk.gov.hmcts.reform.preapi.dto.UserDTO; import uk.gov.hmcts.reform.preapi.dto.base.BaseAppAccessDTO; -import uk.gov.hmcts.reform.preapi.dto.base.BaseUserDTO; -import uk.gov.hmcts.reform.preapi.dto.flow.StoppedLiveEventsNotificationDTO; -import uk.gov.hmcts.reform.preapi.dto.media.AssetDTO; import uk.gov.hmcts.reform.preapi.dto.media.LiveEventDTO; import uk.gov.hmcts.reform.preapi.email.StopLiveEventNotifierFlowClient; import uk.gov.hmcts.reform.preapi.enums.RecordingStatus; @@ -32,15 +21,14 @@ import uk.gov.hmcts.reform.preapi.security.service.UserAuthenticationService; import uk.gov.hmcts.reform.preapi.services.BookingService; import uk.gov.hmcts.reform.preapi.services.CaptureSessionService; -import uk.gov.hmcts.reform.preapi.services.RecordingService; import uk.gov.hmcts.reform.preapi.services.UserService; -import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.Set; import java.util.UUID; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doThrow; @@ -51,11 +39,9 @@ import static org.mockito.Mockito.when; public class CleanupLiveEventsTest { - private static MediaServiceBroker mediaServiceBroker; private static CaptureSessionService captureSessionService; private static BookingService bookingService; - private static RecordingService recordingService; private static MediaKind mediaService; private static UserService userService; private static UserAuthenticationService userAuthenticationService; @@ -63,18 +49,22 @@ public class CleanupLiveEventsTest { private static final String CRON_USER_EMAIL = "test@test.com"; private static final String CRON_PLATFORM_ENV = "Staging"; + private static final int BATCH_SIZE = 3; + private static final int BATCH_COOLDOWN = 100; + private static final int POLLING_INTERVAL = 100; @BeforeEach void beforeEach() { mediaServiceBroker = mock(MediaServiceBroker.class); captureSessionService = mock(CaptureSessionService.class); - recordingService = mock(RecordingService.class); mediaService = mock(MediaKind.class); userService = mock(UserService.class); userAuthenticationService = mock(UserAuthenticationService.class); bookingService = mock(BookingService.class); stopLiveEventNotifierFlowClient = mock(StopLiveEventNotifierFlowClient.class); + when(mediaServiceBroker.getEnabledMediaService()).thenReturn(mediaService); + var accessDto = mock(AccessDTO.class); var baseAppAccessDTO = mock(BaseAppAccessDTO.class); when(baseAppAccessDTO.getId()).thenReturn(UUID.randomUUID()); @@ -86,397 +76,163 @@ void beforeEach() { when(userAuthenticationService.validateUser(any())).thenReturn(Optional.ofNullable(userAuth)); } - @DisplayName("Test CleanupLiveEvents run method") @Test - @SuppressWarnings({"checkstyle:VariableDeclarationUsageDistance", "unchecked", "checkstyle:LineLength"}) - public void testRun() throws InterruptedException, JsonProcessingException { + @DisplayName("CleanupLiveEvents when no live events are running") + public void runNoLiveEvents() { + var cleanupLiveEvents = createCleanupLiveEventsTask(); + when(mediaService.getLiveEvents()).thenReturn(List.of()); - var captureSessionId = UUID.randomUUID(); - var liveEventDTO = new LiveEventDTO(); - liveEventDTO.setId(captureSessionId.toString().replace("-", "")); - liveEventDTO.setName(liveEventDTO.getId()); - liveEventDTO.setResourceState("Running"); - List liveEventDTOList = new ArrayList<>(); - liveEventDTOList.add(liveEventDTO); - when(mediaServiceBroker.getEnabledMediaService()).thenReturn(mediaService); - when(mediaService.getLiveEvents()).thenReturn(liveEventDTOList); - - var bookingId = UUID.randomUUID(); - - var mockCaptureSession = new CaptureSessionDTO(); - mockCaptureSession.setId(captureSessionId); - mockCaptureSession.setBookingId(bookingId); - mockCaptureSession.setStatus(RecordingStatus.RECORDING); - - var mockRecording = new RecordingDTO(); - mockRecording.setId(UUID.randomUUID()); - - var mockRecording2 = new RecordingDTO(); - mockRecording2.setId(UUID.randomUUID()); - - var mockBaseUser = new BaseUserDTO(); - mockBaseUser.setId(UUID.randomUUID()); - mockBaseUser.setFirstName("Foo"); - mockBaseUser.setEmail("foo@bar.org"); - - var mockUser = new UserDTO(); - mockUser.setId(UUID.randomUUID()); - mockUser.setFirstName("Foo"); - mockUser.setEmail("foo@bar.org"); - - var mockShareBooking = new ShareBookingDTO(); - mockShareBooking.setId(UUID.randomUUID()); - mockShareBooking.setSharedWithUser(mockBaseUser); - - var mockCourt = new CourtDTO(); - mockCourt.setName("Test Court"); - - var mockCaseDTO = new CaseDTO(); - mockCaseDTO.setReference("123456"); - mockCaseDTO.setCourt(mockCourt); - - var mockBooking = new BookingDTO(); - mockBooking.setId(bookingId); - mockBooking.setShares(List.of( - mockShareBooking - )); - mockBooking.setCaseDTO(mockCaseDTO); - - var mockCaptureSessionProcessing = new CaptureSessionDTO(); - mockCaptureSessionProcessing.setId(captureSessionId); - mockCaptureSessionProcessing.setBookingId(bookingId); - mockCaptureSessionProcessing.setStatus(RecordingStatus.PROCESSING); - - var mockCaptureSessionRecordingAvailable = new CaptureSessionDTO(); - mockCaptureSessionProcessing.setId(captureSessionId); - mockCaptureSessionProcessing.setBookingId(bookingId); - mockCaptureSessionProcessing.setStatus(RecordingStatus.RECORDING_AVAILABLE); - - when(captureSessionService.findByLiveEventId(liveEventDTO.getName())) - .thenReturn(mockCaptureSession, mockCaptureSessionRecordingAvailable); - when(recordingService.findAll(any(SearchRecordings.class), eq(false), eq(Pageable.unpaged()))) - .thenReturn(new PageImpl<>(List.of(mockRecording, mockRecording2))); - - when(captureSessionService.stopCaptureSession(captureSessionId, - RecordingStatus.PROCESSING, - mockRecording.getId())) - .thenReturn(mockCaptureSessionProcessing); - when(captureSessionService.stopCaptureSession(captureSessionId, - RecordingStatus.PROCESSING, - mockRecording2.getId())) - .thenReturn(mockCaptureSessionProcessing); - - when(mediaService.stopLiveEvent(mockCaptureSessionProcessing, mockRecording.getId())) - .thenReturn(RecordingStatus.RECORDING_AVAILABLE); - when(mediaService.stopLiveEvent(mockCaptureSessionProcessing, mockRecording2.getId())) - .thenReturn(RecordingStatus.RECORDING_AVAILABLE); - - when(captureSessionService.stopCaptureSession(captureSessionId, - RecordingStatus.RECORDING_AVAILABLE, - mockRecording.getId())) - .thenReturn(mockCaptureSessionProcessing); - when(captureSessionService.stopCaptureSession(captureSessionId, - RecordingStatus.RECORDING_AVAILABLE, - mockRecording2.getId())) - .thenReturn(mockCaptureSessionProcessing); - - when(bookingService.findById(bookingId)).thenReturn(mockBooking); - - when(userService.findById(mockShareBooking.getSharedWithUser().getId())).thenReturn(mockUser); - - CleanupLiveEvents cleanupLiveEvents = new CleanupLiveEvents(mediaServiceBroker, - captureSessionService, - bookingService, - recordingService, - userService, - userAuthenticationService, - CRON_USER_EMAIL, - CRON_PLATFORM_ENV, - stopLiveEventNotifierFlowClient); - - cleanupLiveEvents.run(); - - verify(mediaServiceBroker, times(1)).getEnabledMediaService(); - verify(mediaService, times(1)).getLiveEvents(); + assertDoesNotThrow(cleanupLiveEvents::run); - ArgumentCaptor captureSessionCaptor = ArgumentCaptor.forClass(CaptureSessionDTO.class); - ArgumentCaptor captureSessionCaptor2 = ArgumentCaptor.forClass(CaptureSessionDTO.class); - - verify(mediaService, times(1)).stopLiveEvent(captureSessionCaptor.capture(), eq(mockRecording.getId())); - verify(mediaService, times(1)).stopLiveEvent(captureSessionCaptor2.capture(), eq(mockRecording2.getId())); + verify(mediaService, times(1)).getLiveEvents(); + } - Assertions.assertEquals(captureSessionId, captureSessionCaptor.getValue().getId()); - Assertions.assertEquals(captureSessionId, captureSessionCaptor2.getValue().getId()); + @Test + @DisplayName("Should clean up live events that are capture sessions in non-production environments") + public void runMissingCaptureSessionInNonProd() { + var liveEvent = new LiveEventDTO(); + liveEvent.setName("something"); + liveEvent.setResourceState("Running"); - Class> listClass = - (Class>)(Class)List.class; - ArgumentCaptor> captor = ArgumentCaptor.forClass(listClass); + when(mediaService.getLiveEvents()).thenReturn(List.of(liveEvent)); + doThrow(NotFoundException.class).when(captureSessionService).findByLiveEventId(liveEvent.getName()); - verify(stopLiveEventNotifierFlowClient, times(1)).emailAfterStoppingLiveEvents(captor.capture()); + var cleanupLiveEvents = createCleanupLiveEventsTask(); - Assertions.assertEquals(mockUser.getFirstName(), captor.getValue().getFirst().getFirstName()); + assertDoesNotThrow(cleanupLiveEvents::run); - var om = new ObjectMapper(); - Assertions.assertEquals( - "[{\"email\":\"foo@bar.org\",\"first_name\":\"Foo\",\"case_reference\":\"123456\",\"court_name\":\"Test Court\"}]", - om.writeValueAsString(captor.getValue())); + verify(mediaService, times(1)).getLiveEvents(); + verify(captureSessionService, times(4)).findByLiveEventId(liveEvent.getName()); + verify(mediaService, times(2)).cleanupStoppedLiveEvent(liveEvent.getName()); } - @DisplayName("Test CleanupLiveEvents with Capture Session in wrong state to encode") @Test - @SuppressWarnings({"checkstyle:VariableDeclarationUsageDistance"}) - public void testCaptureSessionInUnexpectedState() { - var captureSessionId = UUID.randomUUID(); - var liveEventDTO = new LiveEventDTO(); - liveEventDTO.setId(captureSessionId.toString().replace("-", "")); - liveEventDTO.setName(liveEventDTO.getId()); - liveEventDTO.setResourceState("Running"); - List liveEventDTOList = new ArrayList<>(); - liveEventDTOList.add(liveEventDTO); - when(mediaServiceBroker.getEnabledMediaService()).thenReturn(mediaService); - when(mediaService.getLiveEvents()).thenReturn(liveEventDTOList); - - var bookingId = UUID.randomUUID(); - - var mockCaptureSession = new CaptureSessionDTO(); - mockCaptureSession.setId(captureSessionId); - mockCaptureSession.setBookingId(bookingId); - mockCaptureSession.setStatus(RecordingStatus.RECORDING_AVAILABLE); - - var mockRecording = new RecordingDTO(); - mockRecording.setId(UUID.randomUUID()); - - var mockRecording2 = new RecordingDTO(); - mockRecording2.setId(UUID.randomUUID()); - - var mockBaseUser = new BaseUserDTO(); - mockBaseUser.setId(UUID.randomUUID()); - mockBaseUser.setFirstName("Foo"); - mockBaseUser.setEmail("foo@bar.org"); - - var mockUser = new UserDTO(); - mockUser.setId(UUID.randomUUID()); - mockUser.setFirstName("Foo"); - mockUser.setEmail("foo@bar.org"); - - var mockShareBooking = new ShareBookingDTO(); - mockShareBooking.setId(UUID.randomUUID()); - mockShareBooking.setSharedWithUser(mockBaseUser); - - var mockCourt = new CourtDTO(); - mockCourt.setName("Test Court"); - - var mockCaseDTO = new CaseDTO(); - mockCaseDTO.setReference("123456"); - mockCaseDTO.setCourt(mockCourt); - - var mockBooking = new BookingDTO(); - mockBooking.setId(bookingId); - mockBooking.setShares(List.of( - mockShareBooking - )); - mockBooking.setCaseDTO(mockCaseDTO); - - var mockCaptureSessionProcessing = new CaptureSessionDTO(); - mockCaptureSessionProcessing.setId(captureSessionId); - mockCaptureSessionProcessing.setBookingId(bookingId); - mockCaptureSessionProcessing.setStatus(RecordingStatus.PROCESSING); - - var mockCaptureSessionRecordingAvailable = new CaptureSessionDTO(); - mockCaptureSessionProcessing.setId(captureSessionId); - mockCaptureSessionProcessing.setBookingId(bookingId); - mockCaptureSessionProcessing.setStatus(RecordingStatus.RECORDING_AVAILABLE); - - when(captureSessionService.findByLiveEventId(liveEventDTO.getName())) - .thenReturn(mockCaptureSession, mockCaptureSessionRecordingAvailable); - when(recordingService.findAll(any(SearchRecordings.class), eq(false), eq(Pageable.unpaged()))) - .thenReturn(new PageImpl<>(List.of(mockRecording, mockRecording2))); - - when(captureSessionService.stopCaptureSession(captureSessionId, - RecordingStatus.PROCESSING, - mockRecording.getId())) - .thenReturn(mockCaptureSessionProcessing); - when(captureSessionService.stopCaptureSession(captureSessionId, - RecordingStatus.PROCESSING, - mockRecording2.getId())) - .thenReturn(mockCaptureSessionProcessing); - - when(bookingService.findById(bookingId)).thenReturn(mockBooking); - - when(userService.findById(mockShareBooking.getSharedWithUser().getId())).thenReturn(mockUser); - - CleanupLiveEvents cleanupLiveEvents = new CleanupLiveEvents(mediaServiceBroker, - captureSessionService, - bookingService, - recordingService, - userService, - userAuthenticationService, - CRON_USER_EMAIL, - CRON_PLATFORM_ENV, - stopLiveEventNotifierFlowClient); - - cleanupLiveEvents.run(); - - verify(stopLiveEventNotifierFlowClient, times(0)).emailAfterStoppingLiveEvents(any()); + @DisplayName("Should not clean up live events that are capture sessions in production environments") + public void runMissingCaptureSessionInProd() { + var liveEvent = new LiveEventDTO(); + liveEvent.setName("something"); + liveEvent.setResourceState("Running"); + + when(mediaService.getLiveEvents()).thenReturn(List.of(liveEvent)); + doThrow(NotFoundException.class).when(captureSessionService).findByLiveEventId(liveEvent.getName()); + + var cleanupLiveEvents = new CleanupLiveEvents( + mediaServiceBroker, + captureSessionService, + bookingService, + userService, + userAuthenticationService, + stopLiveEventNotifierFlowClient, + CRON_USER_EMAIL, + "Production", + BATCH_SIZE, + BATCH_COOLDOWN, + POLLING_INTERVAL + ); + + assertDoesNotThrow(cleanupLiveEvents::run); + verify(mediaService, times(1)).getLiveEvents(); + verify(captureSessionService, times(4)).findByLiveEventId(liveEvent.getName()); + verify(mediaService, never()).cleanupStoppedLiveEvent(liveEvent.getName()); } - @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance") - @DisplayName("Test CleanupLiveEvents run method when InterruptedException is thrown") @Test - public void runInterruptedExceptionTest() throws InterruptedException { - + void shouldProcessLiveEventAndTriggerNotifications() { var captureSessionId = UUID.randomUUID(); var liveEventDTO = new LiveEventDTO(); - liveEventDTO.setId(captureSessionId.toString().replace("-", "")); - liveEventDTO.setName(liveEventDTO.getId()); + liveEventDTO.setName(captureSessionId.toString().replace("-", "")); liveEventDTO.setResourceState("Running"); - List liveEventDTOList = new ArrayList<>(); - liveEventDTOList.add(liveEventDTO); var captureSession = new CaptureSessionDTO(); captureSession.setId(captureSessionId); - captureSession.setStatus(RecordingStatus.STANDBY); - - when(mediaServiceBroker.getEnabledMediaService()).thenReturn(mediaService); - when(mediaService.getLiveEvents()).thenReturn(liveEventDTOList); - when(captureSessionService.findByLiveEventId(any())).thenReturn(captureSession); - - var mockRecording = new RecordingDTO(); - mockRecording.setId(UUID.randomUUID()); + captureSession.setBookingId(UUID.randomUUID()); + + when(captureSessionService.findByLiveEventId(liveEventDTO.getName())).thenReturn(captureSession); + when(captureSessionService.findById(captureSessionId)).thenReturn(captureSession); + when(captureSessionService.stopCaptureSession(captureSessionId, RecordingStatus.PROCESSING, null)) + .thenReturn(captureSession); + + var court = new CourtDTO(); + court.setName("Test Court"); + var aCase = new CaseDTO(); + aCase.setReference("123456"); + aCase.setCourt(court); + var booking = new BookingDTO(); + booking.setId(captureSession.getBookingId()); + booking.setCaseDTO(aCase); + + var user = new UserDTO(); + user.setId(UUID.randomUUID()); + user.setEmail("test@example.com"); + user.setFirstName("Test"); + when(userService.findById(user.getId())).thenReturn(user); + var share = new ShareBookingDTO(); + share.setSharedWithUser(user); + booking.setShares(List.of(share)); + + when(bookingService.findById(booking.getId())).thenReturn(booking); - var mockRecording2 = new RecordingDTO(); - mockRecording2.setId(UUID.randomUUID()); - - when(mediaService.stopLiveEvent(captureSession, mockRecording.getId())) - .thenThrow(InterruptedException.class); - - when(recordingService.findAll(any(SearchRecordings.class), eq(false), eq(Pageable.unpaged()))) - .thenReturn(new PageImpl<>(List.of(mockRecording, mockRecording2))); + when(mediaService.getLiveEvents()).thenReturn(List.of(liveEventDTO)); + when(captureSessionService.findByLiveEventId(liveEventDTO.getName())).thenReturn(captureSession); + when(mediaService.triggerProcessingStep1(any(), any(), any())).thenReturn("job1"); + when(mediaService.hasJobCompleted(any(), eq("job1"))).thenReturn(RecordingStatus.RECORDING_AVAILABLE); + when(mediaService.triggerProcessingStep2(any())).thenReturn("job2"); + when(mediaService.hasJobCompleted(any(), eq("job2"))).thenReturn(RecordingStatus.RECORDING_AVAILABLE); + when(mediaService.verifyFinalAssetExists(any())).thenReturn(RecordingStatus.RECORDING_AVAILABLE); - CleanupLiveEvents cleanupLiveEvents = new CleanupLiveEvents(mediaServiceBroker, - captureSessionService, - bookingService, - recordingService, - userService, - userAuthenticationService, - CRON_USER_EMAIL, - CRON_PLATFORM_ENV, - stopLiveEventNotifierFlowClient); + var cleanupLiveEvents = createCleanupLiveEventsTask(); - cleanupLiveEvents.run(); + assertDoesNotThrow(cleanupLiveEvents::run); - verify(mediaServiceBroker, times(1)).getEnabledMediaService(); - verify(mediaService, times(1)).getLiveEvents(); verify(captureSessionService, times(1)) - .stopCaptureSession(captureSession.getId(), RecordingStatus.FAILURE, mockRecording.getId()); + .stopCaptureSession(eq(captureSessionId), eq(RecordingStatus.PROCESSING), any()); + verify(captureSessionService, times(1)) + .stopCaptureSession(eq(captureSessionId), eq(RecordingStatus.RECORDING_AVAILABLE), any()); + verify(stopLiveEventNotifierFlowClient, times(1)).emailAfterStoppingLiveEvents(any()); } - @DisplayName("Should stop live event when capture session cannot be found (only in non-prod)") @Test - void runStopLiveEventForMissingCaptureSession() { + void shouldHandleNoFileInIngestStorage() { var captureSessionId = UUID.randomUUID(); var liveEventDTO = new LiveEventDTO(); - liveEventDTO.setId(captureSessionId.toString().replace("-", "")); - liveEventDTO.setName(liveEventDTO.getId()); + liveEventDTO.setName(captureSessionId.toString().replace("-", "")); liveEventDTO.setResourceState("Running"); - when(mediaServiceBroker.getEnabledMediaService()).thenReturn(mediaService); - when(mediaService.getLiveEvents()).thenReturn(List.of(liveEventDTO)); - var mockBaseUser = new BaseUserDTO(); - mockBaseUser.setId(UUID.randomUUID()); - mockBaseUser.setFirstName("Foo"); - mockBaseUser.setEmail("foo@bar.org"); - - var mockUser = new UserDTO(); - mockUser.setId(UUID.randomUUID()); - mockUser.setFirstName("Foo"); - mockUser.setEmail("foo@bar.org"); - - - var mockCourt = new CourtDTO(); - mockCourt.setName("Test Court"); - - var mockCaseDTO = new CaseDTO(); - mockCaseDTO.setReference("123456"); - mockCaseDTO.setCourt(mockCourt); - - var bookingId = UUID.randomUUID(); - - var mockAsset = new AssetDTO(); - mockAsset.setName(captureSessionId.toString()); - mockAsset.setDescription(bookingId.toString()); - - when(mediaService.getAsset(captureSessionId.toString())).thenReturn(mockAsset); - - doThrow(NotFoundException.class).when(captureSessionService).findByLiveEventId(liveEventDTO.getName()); - - var cleanupLiveEvents = new CleanupLiveEvents(mediaServiceBroker, - captureSessionService, - bookingService, - recordingService, - userService, - userAuthenticationService, - CRON_USER_EMAIL, - CRON_PLATFORM_ENV, - stopLiveEventNotifierFlowClient); - - cleanupLiveEvents.run(); - - verify(mediaServiceBroker, times(1)).getEnabledMediaService(); - verify(mediaService, times(1)).getLiveEvents(); - verify(captureSessionService, times(1)).findByLiveEventId(liveEventDTO.getName()); - verify(recordingService, never()).findAll(any(), eq(false), any()); - verify(mediaService, times(1)).cleanupStoppedLiveEvent(liveEventDTO.getName()); - } + var captureSession = new CaptureSessionDTO(); + captureSession.setId(captureSessionId); + captureSession.setBookingId(UUID.randomUUID()); - @DisplayName("Should not stop live event when capture session cannot be found (in prod)") - @Test - void runStopLiveEventForMissingCaptureSessionInProduction() throws InterruptedException { - var captureSessionId = UUID.randomUUID(); - var liveEventDTO = new LiveEventDTO(); - liveEventDTO.setId(captureSessionId.toString().replace("-", "")); - liveEventDTO.setName(liveEventDTO.getId()); - liveEventDTO.setResourceState("Running"); - when(mediaServiceBroker.getEnabledMediaService()).thenReturn(mediaService); when(mediaService.getLiveEvents()).thenReturn(List.of(liveEventDTO)); + when(captureSessionService.findByLiveEventId(liveEventDTO.getName())).thenReturn(captureSession); + when(captureSessionService.findById(captureSessionId)).thenReturn(captureSession); + when(captureSessionService.stopCaptureSession(captureSessionId, RecordingStatus.PROCESSING, null)) + .thenReturn(captureSession); + when(mediaService.triggerProcessingStep1(any(), any(), any())).thenReturn(null); - var mockBaseUser = new BaseUserDTO(); - mockBaseUser.setId(UUID.randomUUID()); - mockBaseUser.setFirstName("Foo"); - mockBaseUser.setEmail("foo@bar.org"); - - var mockUser = new UserDTO(); - mockUser.setId(UUID.randomUUID()); - mockUser.setFirstName("Foo"); - mockUser.setEmail("foo@bar.org"); + var cleanupLiveEvents = createCleanupLiveEventsTask(); - var mockCourt = new CourtDTO(); - mockCourt.setName("Test Court"); + assertDoesNotThrow(cleanupLiveEvents::run); - var mockCaseDTO = new CaseDTO(); - mockCaseDTO.setReference("123456"); - mockCaseDTO.setCourt(mockCourt); - - doThrow(NotFoundException.class).when(captureSessionService).findByLiveEventId(liveEventDTO.getName()); - - var cleanupLiveEvents = new CleanupLiveEvents(mediaServiceBroker, - captureSessionService, - bookingService, - recordingService, - userService, - userAuthenticationService, - CRON_USER_EMAIL, - "Production", - stopLiveEventNotifierFlowClient); - - cleanupLiveEvents.run(); + verify(captureSessionService, times(1)) + .stopCaptureSession(eq(captureSessionId), eq(RecordingStatus.PROCESSING), any()); + verify(captureSessionService, times(1)) + .stopCaptureSession(eq(captureSessionId), eq(RecordingStatus.NO_RECORDING), any()); + } - verify(mediaServiceBroker, times(1)).getEnabledMediaService(); - verify(mediaService, times(1)).getLiveEvents(); - verify(captureSessionService, times(1)).findByLiveEventId(liveEventDTO.getName()); - verify(recordingService, never()).findAll(any(), eq(false), any()); - verify(mediaService, never()).getAsset(any()); - verify(mediaService, never()).stopLiveEvent(any(), any()); + private CleanupLiveEvents createCleanupLiveEventsTask() { + return new CleanupLiveEvents( + mediaServiceBroker, + captureSessionService, + bookingService, + userService, + userAuthenticationService, + stopLiveEventNotifierFlowClient, + CRON_USER_EMAIL, + CRON_PLATFORM_ENV, + BATCH_SIZE, + BATCH_COOLDOWN, + POLLING_INTERVAL + ); } } From 03816cf0b4bf6a8d010de67a5256891255269cbb Mon Sep 17 00:00:00 2001 From: lucas-phillips28 Date: Wed, 5 Feb 2025 17:04:29 +0000 Subject: [PATCH 3/3] fix sonar issues --- .../uk/gov/hmcts/reform/preapi/tasks/CleanupLiveEvents.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/uk/gov/hmcts/reform/preapi/tasks/CleanupLiveEvents.java b/src/main/java/uk/gov/hmcts/reform/preapi/tasks/CleanupLiveEvents.java index f882f13c0..dc9ad7214 100644 --- a/src/main/java/uk/gov/hmcts/reform/preapi/tasks/CleanupLiveEvents.java +++ b/src/main/java/uk/gov/hmcts/reform/preapi/tasks/CleanupLiveEvents.java @@ -125,6 +125,7 @@ public void run() throws RuntimeException { threadProcessLiveEventAssets.join(); } catch (InterruptedException e) { log.error("Clean up live event thread interrupted", e); + Thread.currentThread().interrupt(); throw new RuntimeException(e); } @@ -148,6 +149,7 @@ private void processLiveEvents() { Thread.sleep(jobPollingInterval); } catch (InterruptedException e) { // Do nothing + Thread.currentThread().interrupt(); } liveEventCleanupMap.entrySet()