-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
PIN-4123 Added max number for client keys (Recovered)
- Loading branch information
nttdata-rtorsoli
committed
Nov 8, 2023
1 parent
2917ffa
commit c2115cd
Showing
1 changed file
with
47 additions
and
99 deletions.
There are no files selected for viewing
146 changes: 47 additions & 99 deletions
146
src/main/scala/it/pagopa/interop/authorizationprocess/server/impl/Main.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,110 +1,58 @@ | ||
package it.pagopa.interop.authorizationprocess.server.impl | ||
|
||
import cats.syntax.all._ | ||
import akka.actor.typed.ActorSystem | ||
import akka.actor.typed.scaladsl.Behaviors | ||
import akka.http.scaladsl.Http | ||
import akka.management.scaladsl.AkkaManagement | ||
import buildinfo.BuildInfo | ||
import cats.syntax.all._ | ||
import com.typesafe.scalalogging.Logger | ||
import it.pagopa.interop.authorizationprocess.common.system.ApplicationConfiguration | ||
import it.pagopa.interop.authorizationprocess.server.Controller | ||
import it.pagopa.interop.commons.logging.renderBuildInfo | ||
import it.pagopa.interop.commons.utils.CORSSupport | ||
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor} | ||
import scala.util.{Failure, Success} | ||
import akka.actor.typed.DispatcherSelector | ||
|
||
import it.pagopa.interop.authorizationmanagement.model.client.PersistentClient | ||
import it.pagopa.interop.authorizationmanagement.model.persistence.JsonFormats._ | ||
import it.pagopa.interop.authorizationmanagement.model.key.PersistentKey | ||
import it.pagopa.interop.authorizationprocess.common.readmodel.model.ReadModelClientWithKeys | ||
import it.pagopa.interop.authorizationprocess.common.readmodel.model.impl._ | ||
import it.pagopa.interop.authorizationprocess.service.{AuthorizationManagementService, PartyManagementService} | ||
|
||
import org.mongodb.scala.model.Filters | ||
import it.pagopa.interop.commons.utils.CORRELATION_ID_HEADER | ||
|
||
import scala.concurrent.duration.Duration | ||
import java.util.concurrent.{Executors, ExecutorService} | ||
import scala.concurrent.{ExecutionContext, Future, Await} | ||
|
||
import java.util.UUID | ||
import scala.util.Failure | ||
|
||
object Main extends App with Dependencies { | ||
object Main extends App with CORSSupport with Dependencies { | ||
|
||
val logger: Logger = Logger(this.getClass) | ||
|
||
implicit val context: List[(String, String)] = (CORRELATION_ID_HEADER -> UUID.randomUUID().toString()) :: Nil | ||
|
||
implicit val actorSystem: ActorSystem[Nothing] = | ||
ActorSystem[Nothing](Behaviors.empty, "interop-be-authorization-process-alignment") | ||
implicit val executionContext: ExecutionContext = actorSystem.executionContext | ||
|
||
implicit val es: ExecutorService = Executors.newFixedThreadPool(1.max(Runtime.getRuntime.availableProcessors() - 1)) | ||
implicit val blockingEc = ExecutionContext.fromExecutor(es) | ||
implicit val authorizationManagementService: AuthorizationManagementService = authorizationManagementService( | ||
blockingEc | ||
ActorSystem[Nothing]( | ||
Behaviors.setup[Nothing] { context => | ||
implicit val actorSystem: ActorSystem[_] = context.system | ||
implicit val executionContext: ExecutionContext = actorSystem.executionContext | ||
val selector: DispatcherSelector = DispatcherSelector.fromConfig("futures-dispatcher") | ||
val blockingEc: ExecutionContextExecutor = actorSystem.dispatchers.lookup(selector) | ||
|
||
AkkaManagement.get(actorSystem.classicSystem).start() | ||
|
||
logger.info(renderBuildInfo(BuildInfo)) | ||
|
||
val serverBinding = for { | ||
jwtReader <- getJwtValidator() | ||
controller = new Controller( | ||
clientApi(jwtReader, blockingEc), | ||
healthApi, | ||
operatorApi(jwtReader, blockingEc), | ||
validationExceptionToRoute.some | ||
)(actorSystem.classicSystem) | ||
binding <- Http()(actorSystem.classicSystem) | ||
.newServerAt("0.0.0.0", ApplicationConfiguration.serverPort) | ||
.bind(corsHandler(controller.routes)) | ||
} yield binding | ||
|
||
serverBinding.onComplete { | ||
case Success(b) => | ||
logger.info(s"Started server at ${b.localAddress.getHostString()}:${b.localAddress.getPort()}") | ||
case Failure(e) => | ||
actorSystem.terminate() | ||
logger.error("Startup error: ", e) | ||
} | ||
|
||
Behaviors.empty[Nothing] | ||
}, | ||
BuildInfo.name | ||
) | ||
implicit val partyManagementService: PartyManagementService = partyManagementService() | ||
|
||
logger.info("Starting update") | ||
logger.info(s"Retrieving clients") | ||
Await.result( | ||
execution() | ||
.andThen { case Failure(ex) => logger.error("Houston we have a problem", ex) } | ||
.andThen { _ => | ||
es.shutdown() | ||
}, | ||
Duration.Inf | ||
): Unit | ||
|
||
logger.info("Completed update") | ||
|
||
def execution(): Future[Unit] = for { | ||
clients <- getClients() | ||
_ = logger.info(s"Start update clients ${clients.size}") | ||
_ <- clients.traverse(updateClient) | ||
_ = logger.info(s"End update clients") | ||
_ = logger.info(s"Retrieving keys") | ||
keys <- clients.traverse(getClientKeys).map(_.flatten) | ||
_ = logger.info(s"Start update keys ${keys.size}") | ||
_ <- keys | ||
.flatMap(client => | ||
client.keys.collect { case PersistentKey(Some(relationshipId), None, kid, _, _, _, _, _) => | ||
Parameter(client.id, kid, relationshipId) | ||
} | ||
) | ||
.traverse(keys => updateKeys(keys)) | ||
_ = logger.info(s"End update keys") | ||
} yield () | ||
|
||
def updateClient(client: PersistentClient): Future[Unit] = { | ||
logger.info(s"Update client ${client.id}") | ||
for { | ||
|
||
relationship <- client.relationships.toList.traverse(partyManagementService.getRelationshipById) | ||
_ <- relationship.traverse(rel => { | ||
if (client.users.exists(_ == rel.from)) Future.unit | ||
else authorizationManagementService.addUser(client.id, rel.from) | ||
}) | ||
} yield () | ||
} | ||
|
||
def getClients(): Future[Seq[PersistentClient]] = | ||
getAll(50)(readModelService.find[PersistentClient]("clients", Filters.empty(), _, _)) | ||
|
||
def getClientKeys(client: PersistentClient): Future[Option[ReadModelClientWithKeys]] = | ||
readModelService.findOne[ReadModelClientWithKeys]("clients", Filters.eq("data.id", client.id.toString)) | ||
|
||
def updateKeys(key: Parameter): Future[Unit] = { | ||
logger.info(s"Update keys for client ${key.clientId}") | ||
for { | ||
relationship <- partyManagementService.getRelationshipById(key.relationShipId) | ||
_ <- authorizationManagementService.migrateKeyRelationshipToUser(key.clientId, key.kid, relationship.from) | ||
} yield () | ||
} | ||
|
||
def getAll[T](limit: Int)(get: (Int, Int) => Future[Seq[T]]): Future[Seq[T]] = { | ||
def go(offset: Int)(acc: Seq[T]): Future[Seq[T]] = { | ||
get(offset, limit).flatMap(xs => | ||
if (xs.size < limit) Future.successful(xs ++ acc) | ||
else go(offset + xs.size)(xs ++ acc) | ||
) | ||
} | ||
go(0)(Nil) | ||
} | ||
|
||
final case class Parameter(clientId: UUID, kid: String, relationShipId: UUID) | ||
} |