Fix: Delete Search Portal Entries (#269)
* [WIP]
  Modify Entity
  DB Changeset
  Add Service Method

* Split Scheduler Services for Cancellation
  Add Delete of SearchPortal Entries

* Add Unit Test

* Checkstyle

* Fix Search getRootGroupByName
f11h authored Nov 9, 2022
1 parent d665f57 commit b6ae09a
Showing 13 changed files with 392 additions and 181 deletions.
Cancellation.java
@@ -92,6 +92,9 @@ public class Cancellation {
@JsonIgnore
private String dataExportError;

@Column(name = "search_portal_deleted")
private ZonedDateTime searchPortalDeleted;

@Transient()
private ZonedDateTime finalDeletion;

CancellationRepository.java
@@ -32,6 +32,9 @@ public interface CancellationRepository extends JpaRepository<Cancellation, String> {
List<Cancellation> findByMovedToLongtermArchiveIsNullAndCancellationDateBefore(
ZonedDateTime expiryDate, Pageable pageable);

List<Cancellation> findBySearchPortalDeletedIsNullAndCancellationDateBefore(
ZonedDateTime expiryDate, Pageable pageable);

List<Cancellation> findByMovedToLongtermArchiveNotNullAndCsvCreatedIsNull(Pageable pageable);

List<Cancellation> findByCancellationDateBeforeAndDataDeletedIsNull(ZonedDateTime expiryDate);
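The new finder above relies on Spring Data's method-name query derivation. As a reading aid only, the sketch below shows an explicit @Query declaration that should behave equivalently; the interface name, package, and the method name findPendingSearchPortalDeletions are illustrative and are not part of this commit.

package app.coronawarn.quicktest.repository; // package assumed, not shown in this diff

import app.coronawarn.quicktest.domain.Cancellation;
import java.time.ZonedDateTime;
import java.util.List;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;

// Sketch only: an explicit JPQL query equivalent to the derived finder
// findBySearchPortalDeletedIsNullAndCancellationDateBefore added in this commit.
public interface CancellationRepositoryQuerySketch extends JpaRepository<Cancellation, String> {

    @Query("SELECT c FROM Cancellation c"
        + " WHERE c.searchPortalDeleted IS NULL"
        + " AND c.cancellationDate < :expiryDate")
    List<Cancellation> findPendingSearchPortalDeletions(
        @Param("expiryDate") ZonedDateTime expiryDate, Pageable pageable);
}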
ArchiveSchedulingService.java
@@ -1,25 +1,9 @@
package app.coronawarn.quicktest.service;

import app.coronawarn.quicktest.archive.domain.ArchiveCipherDtoV1;
import app.coronawarn.quicktest.config.CsvUploadConfig;
import app.coronawarn.quicktest.domain.Cancellation;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.opencsv.CSVWriter;
import com.opencsv.bean.StatefulBeanToCsv;
import com.opencsv.bean.StatefulBeanToCsvBuilder;
import java.io.ByteArrayInputStream;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.ZonedDateTime;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.security.crypto.codec.Hex;
import org.springframework.stereotype.Service;

@Slf4j
@@ -28,10 +12,6 @@
public class ArchiveSchedulingService {

private final ArchiveService archiveService;
private final CancellationService cancellationService;

private final CsvUploadConfig s3Config;
private final AmazonS3 s3Client;

/**
* Scheduler used for moving quicktests from qt archive to longterm.
@@ -44,97 +24,4 @@ public void moveToArchiveJob() {
archiveService.moveToArchive();
log.info("Completed Job: moveToArchiveJob");
}

/**
* Scheduler used for moving quicktests from qt archive to longterm when a cancellation was triggered.
*/
@Scheduled(cron = "${archive.cancellationArchiveJob.cron}")
@SchedulerLock(name = "CancellationArchiveJob", lockAtLeastFor = "PT0S",
lockAtMostFor = "${archive.cancellationArchiveJob.locklimit}")
public void cancellationArchiveJob() {
log.info("Starting Job: cancellationArchiveJob");
processCancellationArchiveBatchRecursion(cancellationService.getReadyToArchiveBatch());
log.info("Completed Job: cancellationArchiveJob");
}

private void processCancellationArchiveBatchRecursion(List<Cancellation> cancellations) {
log.info("Process Cancellation Archive Batch with size of {}", cancellations.size());
for (Cancellation cancellation : cancellations) {
String partnerId = cancellation.getPartnerId();
archiveService.moveToArchiveByTenantId(partnerId);
cancellationService.updateMovedToLongterm(cancellation, ZonedDateTime.now());
}

List<Cancellation> nextBatch = cancellationService.getReadyToArchiveBatch();
if (!nextBatch.isEmpty()) {
processCancellationArchiveBatchRecursion(nextBatch);
}
}

/**
* Scheduler used for moving longterm archives to bucket as a csv.
*/
@Scheduled(cron = "${archive.csvUploadJob.cron}")
@SchedulerLock(name = "CsvUploadJob", lockAtLeastFor = "PT0S",
lockAtMostFor = "${archive.csvUploadJob.locklimit}")
public void csvUploadJob() {
log.info("Starting Job: csvUploadJob");
processCsvUploadBatchRecursion(cancellationService.getReadyToUploadBatch());
log.info("Completed Job: csvUploadJob");
}

private void processCsvUploadBatchRecursion(List<Cancellation> cancellations) {
log.info("Process CSV Upload Batch with size of {}", cancellations.size());
for (Cancellation cancellation : cancellations) {
try {
List<ArchiveCipherDtoV1> quicktests =
archiveService.getQuicktestsFromLongtermByTenantId(cancellation.getPartnerId());

StringWriter stringWriter = new StringWriter();
CSVWriter csvWriter =
new CSVWriter(stringWriter, '\t', CSVWriter.NO_QUOTE_CHARACTER,
CSVWriter.DEFAULT_ESCAPE_CHARACTER, CSVWriter.DEFAULT_LINE_END);
StatefulBeanToCsv<ArchiveCipherDtoV1> beanToCsv =
new StatefulBeanToCsvBuilder<ArchiveCipherDtoV1>(csvWriter)
.build();
beanToCsv.write(quicktests);
byte[] csvBytes = stringWriter.toString().getBytes(StandardCharsets.UTF_8);

String objectId = cancellation.getPartnerId() + ".csv";

ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(csvBytes.length);

s3Client.putObject(
s3Config.getBucketName(),
objectId,
new ByteArrayInputStream(csvBytes), metadata);

log.info("File stored to S3 with id {}", objectId);

cancellationService.updateCsvCreated(cancellation, ZonedDateTime.now(), objectId,
getHash(csvBytes), quicktests.size(), csvBytes.length);
} catch (Exception e) {
String errorMessage = e.getClass().getName() + ": " + e.getMessage();

log.error("Could not convert Quicktest to CSV: " + errorMessage);
cancellationService.updateDataExportError(cancellation, errorMessage);
}
}

List<Cancellation> nextBatch = cancellationService.getReadyToUploadBatch();
if (!nextBatch.isEmpty()) {
processCsvUploadBatchRecursion(nextBatch);
}
}

private String getHash(byte[] bytes) {
try {
MessageDigest sha256 = MessageDigest.getInstance("SHA-256");
byte[] hashBytes = sha256.digest(bytes);
return String.valueOf(Hex.encode(hashBytes));
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("Failed to load SHA-256 Message Digest");
}
}
}
CancellationSchedulingService.java (new file)
@@ -0,0 +1,183 @@
/*-
* ---license-start
* Corona-Warn-App / cwa-quick-test-backend
* ---
* Copyright (C) 2021 T-Systems International GmbH and all other contributors
* ---
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ---license-end
*/

package app.coronawarn.quicktest.service;

import app.coronawarn.quicktest.archive.domain.ArchiveCipherDtoV1;
import app.coronawarn.quicktest.config.CsvUploadConfig;
import app.coronawarn.quicktest.domain.Cancellation;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.opencsv.CSVWriter;
import com.opencsv.bean.StatefulBeanToCsv;
import com.opencsv.bean.StatefulBeanToCsvBuilder;
import java.io.ByteArrayInputStream;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.ZonedDateTime;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
import org.keycloak.representations.idm.GroupRepresentation;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.security.crypto.codec.Hex;
import org.springframework.stereotype.Service;

@Slf4j
@RequiredArgsConstructor
@Service
public class CancellationSchedulingService {

private final ArchiveService archiveService;
private final CancellationService cancellationService;

private final KeycloakService keycloakService;

private final CsvUploadConfig s3Config;
private final AmazonS3 s3Client;

/**
* Scheduler used for moving quicktests from qt archive to longterm when a cancellation was triggered.
*/
@Scheduled(cron = "${archive.cancellationArchiveJob.cron}")
@SchedulerLock(name = "CancellationArchiveJob", lockAtLeastFor = "PT0S",
lockAtMostFor = "${archive.cancellationArchiveJob.locklimit}")
public void cancellationArchiveJob() {
log.info("Starting Job: cancellationArchiveJob");
processCancellationArchiveBatchRecursion(cancellationService.getReadyToArchiveBatch());
log.info("Completed Job: cancellationArchiveJob");
}

private void processCancellationArchiveBatchRecursion(List<Cancellation> cancellations) {
log.info("Process Cancellation Archive Batch with size of {}", cancellations.size());
for (Cancellation cancellation : cancellations) {
String partnerId = cancellation.getPartnerId();
archiveService.moveToArchiveByTenantId(partnerId);
cancellationService.updateMovedToLongterm(cancellation, ZonedDateTime.now());
}

List<Cancellation> nextBatch = cancellationService.getReadyToArchiveBatch();
if (!nextBatch.isEmpty()) {
processCancellationArchiveBatchRecursion(nextBatch);
}
}

/**
* Scheduler used for moving longterm archives to bucket as a csv.
*/
@Scheduled(cron = "${archive.csvUploadJob.cron}")
@SchedulerLock(name = "CsvUploadJob", lockAtLeastFor = "PT0S",
lockAtMostFor = "${archive.csvUploadJob.locklimit}")
public void csvUploadJob() {
log.info("Starting Job: csvUploadJob");
processCsvUploadBatchRecursion(cancellationService.getReadyToUploadBatch());
log.info("Completed Job: csvUploadJob");
}

private void processCsvUploadBatchRecursion(List<Cancellation> cancellations) {
log.info("Process CSV Upload Batch with size of {}", cancellations.size());
for (Cancellation cancellation : cancellations) {
try {
List<ArchiveCipherDtoV1> quicktests =
archiveService.getQuicktestsFromLongtermByTenantId(cancellation.getPartnerId());

StringWriter stringWriter = new StringWriter();
CSVWriter csvWriter =
new CSVWriter(stringWriter, '\t', CSVWriter.NO_QUOTE_CHARACTER,
CSVWriter.DEFAULT_ESCAPE_CHARACTER, CSVWriter.DEFAULT_LINE_END);
StatefulBeanToCsv<ArchiveCipherDtoV1> beanToCsv =
new StatefulBeanToCsvBuilder<ArchiveCipherDtoV1>(csvWriter)
.build();
beanToCsv.write(quicktests);
byte[] csvBytes = stringWriter.toString().getBytes(StandardCharsets.UTF_8);

String objectId = cancellation.getPartnerId() + ".csv";

ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(csvBytes.length);

s3Client.putObject(
s3Config.getBucketName(),
objectId,
new ByteArrayInputStream(csvBytes), metadata);

log.info("File stored to S3 with id {}", objectId);

cancellationService.updateCsvCreated(cancellation, ZonedDateTime.now(), objectId,
getHash(csvBytes), quicktests.size(), csvBytes.length);
} catch (Exception e) {
String errorMessage = e.getClass().getName() + ": " + e.getMessage();

log.error("Could not convert Quicktest to CSV: " + errorMessage);
cancellationService.updateDataExportError(cancellation, errorMessage);
}
}

List<Cancellation> nextBatch = cancellationService.getReadyToUploadBatch();
if (!nextBatch.isEmpty()) {
processCsvUploadBatchRecursion(nextBatch);
}
}

/**
* Scheduler used for deleting SearchPortal entries.
*/
@Scheduled(cron = "${archive.cancellationSearchPortalDeleteJob.cron}")
@SchedulerLock(name = "CancellationSearchPortalDeleteJob", lockAtLeastFor = "PT0S",
lockAtMostFor = "${archive.cancellationSearchPortalDeleteJob.locklimit}")
public void cancellationSearchPortalDeleteJob() {
log.info("Starting Job: cancellationSearchPortalDeleteJob");
processCancellationDeleteSearchPortalBatch(cancellationService.getReadyToDeleteSearchPortal());
log.info("Completed Job: cancellationSearchPortalDeleteJob");
}

private void processCancellationDeleteSearchPortalBatch(List<Cancellation> cancellations) {
log.info("Process Cancellation DeleteSearchPortal Batch with size of {}", cancellations.size());
for (Cancellation cancellation : cancellations) {
GroupRepresentation rootGroup = keycloakService.getRootGroupByName(cancellation.getPartnerId());

if (rootGroup == null) {
log.error("Could not find RootGroup for Partner {}", cancellation.getPartnerId());
} else {
keycloakService.deleteSubGroupsFromMapService(rootGroup);
}

cancellationService.updateSearchPortalDeleted(cancellation, ZonedDateTime.now());
}

List<Cancellation> nextBatch = cancellationService.getReadyToDeleteSearchPortal();
if (!nextBatch.isEmpty()) {
processCancellationDeleteSearchPortalBatch(nextBatch);
}
}

private String getHash(byte[] bytes) {
try {
MessageDigest sha256 = MessageDigest.getInstance("SHA-256");
byte[] hashBytes = sha256.digest(bytes);
return String.valueOf(Hex.encode(hashBytes));
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("Failed to load SHA-256 Message Digest");
}
}
}
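The commit message also mentions an added unit test; that test class is among the 13 changed files but is not expanded in this view. As a rough illustration of how the new delete job could be exercised, a Mockito-based sketch might look like the following — the test class name, stubbing choices, and the partnerId setter usage are assumptions, not the commit's actual test.

package app.coronawarn.quicktest.service;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import app.coronawarn.quicktest.config.CsvUploadConfig;
import app.coronawarn.quicktest.domain.Cancellation;
import com.amazonaws.services.s3.AmazonS3;
import java.util.Collections;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.keycloak.representations.idm.GroupRepresentation;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

// Illustrative sketch only; not the test added by this commit.
@ExtendWith(MockitoExtension.class)
class CancellationSchedulingServiceTest {

    @Mock
    private ArchiveService archiveService;

    @Mock
    private CancellationService cancellationService;

    @Mock
    private KeycloakService keycloakService;

    @Mock
    private CsvUploadConfig s3Config;

    @Mock
    private AmazonS3 s3Client;

    @InjectMocks
    private CancellationSchedulingService underTest;

    @Test
    void searchPortalDeleteJobShouldDeleteGroupsAndStampCancellation() {
        // Assumes Cancellation exposes a partnerId setter (Lombok-style entity).
        Cancellation cancellation = new Cancellation();
        cancellation.setPartnerId("partner-1");

        GroupRepresentation rootGroup = new GroupRepresentation();
        when(keycloakService.getRootGroupByName("partner-1")).thenReturn(rootGroup);
        when(cancellationService.getReadyToDeleteSearchPortal())
            .thenReturn(List.of(cancellation))      // first batch
            .thenReturn(Collections.emptyList());   // stop the batch loop

        underTest.cancellationSearchPortalDeleteJob();

        verify(keycloakService).deleteSubGroupsFromMapService(rootGroup);
        verify(cancellationService).updateSearchPortalDeleted(any(Cancellation.class), any());
    }
}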
CancellationService.java
@@ -138,7 +138,7 @@ public void updateCsvCreated(Cancellation cancellation, ZonedDateTime csvCreated
* @param requester Username of the user who requested the download link
*/
public void updateDownloadLinkRequested(
Cancellation cancellation, ZonedDateTime downloadLinkRequested, String requester) {
Cancellation cancellation, ZonedDateTime downloadLinkRequested, String requester) {
cancellation.setDownloadLinkRequested(downloadLinkRequested);
cancellation.setDownloadLinkRequestedBy(requester);
cancellationRepository.save(cancellation);
@@ -170,6 +170,17 @@ public void updateDataExportError(Cancellation cancellation, String errorMessage
cancellationRepository.save(cancellation);
}

/**
* Set SearchPortalDeleted Flag/Timestamp and persist entity.
*
* @param cancellation Cancellation Entity
* @param dataDeleted timestamp of search portal deletion
*/
public void updateSearchPortalDeleted(Cancellation cancellation, ZonedDateTime dataDeleted) {
cancellation.setSearchPortalDeleted(dataDeleted);
cancellationRepository.save(cancellation);
}

/**
Searches the DB for cancellation entities whose download request is older than 48h and which are not yet
moved_to_longterm_archive.
@@ -185,6 +196,20 @@ public List<Cancellation> getReadyToArchiveBatch() {
ldt, PageRequest.of(0, archiveProperties.getCancellationArchiveJob().getChunkSize()));
}

/**
* Searches the DB for cancellation entities whose searchPortalDeleted is null and whose
* cancellation_date lies in the past.
* Returns only one batch of entities. Batch size depends on configuration.
*
* @return List holding all entities found.
*/
public List<Cancellation> getReadyToDeleteSearchPortal() {
ZonedDateTime ldt = ZonedDateTime.now();

return cancellationRepository.findBySearchPortalDeletedIsNullAndCancellationDateBefore(
ldt, PageRequest.of(0, archiveProperties.getCancellationArchiveJob().getChunkSize()));
}

/**
* Searches the DB for cancellation entities whose moved_to_longterm_archive is not null but whose
* csv_created is null.
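The @Scheduled and @SchedulerLock annotations on the new job reference archive.cancellationSearchPortalDeleteJob.cron and archive.cancellationSearchPortalDeleteJob.locklimit; the configuration class and application properties that define them are among the changed files not expanded in this view. Purely as a hedged sketch of how such a nested property block is commonly modeled, it might look roughly like this — the class and field names are inferred from the archiveProperties.getCancellationArchiveJob().getChunkSize() call shown above and are not taken from the commit itself.

package app.coronawarn.quicktest.config;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

// Hypothetical sketch of the properties backing the scheduler jobs; the real
// configuration class changed by this commit is not shown in this diff view.
@Getter
@Setter
@Configuration
@ConfigurationProperties("archive")
public class ArchivePropertiesSketch {

    private JobConfig cancellationArchiveJob = new JobConfig();
    private JobConfig csvUploadJob = new JobConfig();
    private JobConfig cancellationSearchPortalDeleteJob = new JobConfig();

    @Getter
    @Setter
    public static class JobConfig {

        // Cron expression read by @Scheduled, e.g. "0 0 3 * * *" (assumed value).
        private String cron;

        // Maximum ShedLock hold time read by @SchedulerLock, e.g. "PT30M" (assumed value).
        private String locklimit;

        // Batch size used by the getReadyTo...Batch queries (assumed default).
        private int chunkSize = 500;
    }
}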
