Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIN-3516 Bulk update Client and Keys (One shot) #229

Merged
merged 9 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/main/resources/interface-specification.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1041,7 +1041,6 @@ components:
type: string
format: date-time
required:
- relationshipId
- kid
- name
- encodedPem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ object Adapters {
description = p.description,
consumerId = p.consumerId,
purposes = p.purposes.map(_.toApi),
relationshipsIds = if (showRelationShips) p.relationships else Set.empty,
relationshipsIds = if (showRelationShips) p.users else Set.empty,
kind = p.kind.toApi,
createdAt = p.createdAt
)
Expand Down Expand Up @@ -140,7 +140,7 @@ object Adapters {
implicit class KeySeedWrapper(private val keySeed: KeySeed) extends AnyVal {
def toDependency(relationshipId: UUID, createdAt: OffsetDateTime): AuthorizationManagementDependency.KeySeed =
AuthorizationManagementDependency.KeySeed(
relationshipId = relationshipId,
userId = relationshipId,
key = keySeed.key,
use = keySeed.use.toDependency,
alg = keySeed.alg,
Expand Down Expand Up @@ -179,7 +179,7 @@ object Adapters {
use = key.use.toApi,
name = key.name,
createdAt = key.createdAt,
relationshipId = key.relationshipId
relationshipId = Some(key.userId)
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import com.nimbusds.jwt.proc.DefaultJWTClaimsVerifier
import it.pagopa.interop.authorizationmanagement.client.api.{
ClientApi => AuthorizationClientApi,
KeyApi => AuthorizationKeyApi,
MigrateApi,
PurposeApi => AuthorizationPurposeApi
}
import it.pagopa.interop.authorizationprocess.api.impl.{
Expand Down Expand Up @@ -79,6 +80,7 @@ trait Dependencies {
AuthorizationManagementInvoker(blockingEc)(actorSystem.classicSystem),
AuthorizationClientApi(ApplicationConfiguration.getAuthorizationManagementURL),
AuthorizationKeyApi(ApplicationConfiguration.getAuthorizationManagementURL),
MigrateApi(ApplicationConfiguration.getAuthorizationManagementURL),
AuthorizationPurposeApi(ApplicationConfiguration.getAuthorizationManagementURL)
)(blockingEc)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,58 +1,110 @@
package it.pagopa.interop.authorizationprocess.server.impl

import cats.syntax.all._
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.http.scaladsl.Http
import akka.management.scaladsl.AkkaManagement
import buildinfo.BuildInfo
import cats.syntax.all._
import com.typesafe.scalalogging.Logger
import it.pagopa.interop.authorizationprocess.common.system.ApplicationConfiguration
import it.pagopa.interop.authorizationprocess.server.Controller
import it.pagopa.interop.commons.logging.renderBuildInfo
import it.pagopa.interop.commons.utils.CORSSupport
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.util.{Failure, Success}
import akka.actor.typed.DispatcherSelector

object Main extends App with CORSSupport with Dependencies {
import it.pagopa.interop.authorizationmanagement.model.client.PersistentClient
import it.pagopa.interop.authorizationmanagement.model.persistence.JsonFormats._
import it.pagopa.interop.authorizationmanagement.model.key.PersistentKey
import it.pagopa.interop.authorizationprocess.common.readmodel.model.ReadModelClientWithKeys
import it.pagopa.interop.authorizationprocess.common.readmodel.model.impl._
import it.pagopa.interop.authorizationprocess.service.{AuthorizationManagementService, PartyManagementService}

import org.mongodb.scala.model.Filters
import it.pagopa.interop.commons.utils.CORRELATION_ID_HEADER

import scala.concurrent.duration.Duration
import java.util.concurrent.{Executors, ExecutorService}
import scala.concurrent.{ExecutionContext, Future, Await}

import java.util.UUID
import scala.util.Failure

object Main extends App with Dependencies {

val logger: Logger = Logger(this.getClass)

ActorSystem[Nothing](
Behaviors.setup[Nothing] { context =>
implicit val actorSystem: ActorSystem[_] = context.system
implicit val executionContext: ExecutionContext = actorSystem.executionContext
val selector: DispatcherSelector = DispatcherSelector.fromConfig("futures-dispatcher")
val blockingEc: ExecutionContextExecutor = actorSystem.dispatchers.lookup(selector)

AkkaManagement.get(actorSystem.classicSystem).start()

logger.info(renderBuildInfo(BuildInfo))

val serverBinding = for {
jwtReader <- getJwtValidator()
controller = new Controller(
clientApi(jwtReader, blockingEc),
healthApi,
operatorApi(jwtReader, blockingEc),
validationExceptionToRoute.some
)(actorSystem.classicSystem)
binding <- Http()(actorSystem.classicSystem)
.newServerAt("0.0.0.0", ApplicationConfiguration.serverPort)
.bind(corsHandler(controller.routes))
} yield binding

serverBinding.onComplete {
case Success(b) =>
logger.info(s"Started server at ${b.localAddress.getHostString()}:${b.localAddress.getPort()}")
case Failure(e) =>
actorSystem.terminate()
logger.error("Startup error: ", e)
}

Behaviors.empty[Nothing]
},
BuildInfo.name
implicit val context: List[(String, String)] = (CORRELATION_ID_HEADER -> UUID.randomUUID().toString()) :: Nil

implicit val actorSystem: ActorSystem[Nothing] =
ActorSystem[Nothing](Behaviors.empty, "interop-be-authorization-process-alignment")
implicit val executionContext: ExecutionContext = actorSystem.executionContext

implicit val es: ExecutorService = Executors.newFixedThreadPool(1.max(Runtime.getRuntime.availableProcessors() - 1))
implicit val blockingEc = ExecutionContext.fromExecutor(es)
implicit val authorizationManagementService: AuthorizationManagementService = authorizationManagementService(
blockingEc
)
implicit val partyManagementService: PartyManagementService = partyManagementService()

logger.info("Starting update")
logger.info(s"Retrieving clients")
Await.result(
execution()
.andThen { case Failure(ex) => logger.error("Houston we have a problem", ex) }
.andThen { _ =>
es.shutdown()
},
Duration.Inf
): Unit

logger.info("Completed update")

def execution(): Future[Unit] = for {
clients <- getClients()
_ = logger.info(s"Start update clients ${clients.size}")
_ <- clients.traverse(updateClient)
_ = logger.info(s"End update clients")
_ = logger.info(s"Retrieving keys")
keys <- clients.traverse(getClientKeys).map(_.flatten)
_ = logger.info(s"Start update keys ${keys.size}")
_ <- keys
.flatMap(client =>
client.keys.collect { case PersistentKey(Some(relationshipId), None, kid, _, _, _, _, _) =>
Parameter(client.id, kid, relationshipId)
}
)
.traverse(keys => updateKeys(keys))
_ = logger.info(s"End update keys")
} yield ()

def updateClient(client: PersistentClient): Future[Unit] = {
logger.info(s"Update client ${client.id}")
for {

relationship <- client.relationships.toList.traverse(partyManagementService.getRelationshipById)
_ <- relationship.traverse(rel => {
if (client.users.exists(_ == rel.from)) Future.unit
else authorizationManagementService.addUser(client.id, rel.from)
})
} yield ()
}

def getClients(): Future[Seq[PersistentClient]] =
getAll(50)(readModelService.find[PersistentClient]("clients", Filters.empty(), _, _))

def getClientKeys(client: PersistentClient): Future[Option[ReadModelClientWithKeys]] =
readModelService.findOne[ReadModelClientWithKeys]("clients", Filters.eq("data.id", client.id.toString))

def updateKeys(key: Parameter): Future[Unit] = {
logger.info(s"Update keys for client ${key.clientId}")
for {
relationship <- partyManagementService.getRelationshipById(key.relationShipId)
_ <- authorizationManagementService.migrateKeyRelationshipToUser(key.clientId, key.kid, relationship.from)
} yield ()
}

def getAll[T](limit: Int)(get: (Int, Int) => Future[Seq[T]]): Future[Seq[T]] = {
def go(offset: Int)(acc: Seq[T]): Future[Seq[T]] = {
get(offset, limit).flatMap(xs =>
if (xs.size < limit) Future.successful(xs ++ acc)
else go(offset + xs.size)(xs ++ acc)
)
}
go(0)(Nil)
}

final case class Parameter(clientId: UUID, kid: String, relationShipId: UUID)
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ trait AuthorizationManagementService {

def deleteClient(clientId: UUID)(implicit contexts: Seq[(String, String)]): Future[Unit]

def addUser(clientId: UUID, userId: UUID)(implicit contexts: Seq[(String, String)]): Future[ManagementClient]

def migrateKeyRelationshipToUser(clientId: UUID, keyId: String, userId: UUID)(implicit
contexts: Seq[(String, String)]
): Future[Unit]

def addRelationship(clientId: UUID, relationshipId: UUID)(implicit
contexts: Seq[(String, String)]
): Future[ManagementClient]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package it.pagopa.interop.authorizationprocess.service.impl

import com.typesafe.scalalogging.{Logger, LoggerTakingImplicit}
import it.pagopa.interop.authorizationmanagement.client.api.{ClientApi, KeyApi, PurposeApi}
import it.pagopa.interop.authorizationmanagement.client.api.{ClientApi, KeyApi, PurposeApi, MigrateApi}
import it.pagopa.interop.authorizationmanagement.client.invoker.{ApiError, ApiRequest, BearerToken}
import it.pagopa.interop.authorizationmanagement.client.model._
import it.pagopa.interop.authorizationmanagement.model.client.{PersistentClient, PersistentClientKind}
Expand All @@ -25,11 +25,12 @@ final case class AuthorizationManagementServiceImpl(
invoker: AuthorizationManagementInvoker,
clientApi: ClientApi,
keyApi: KeyApi,
migrateApi: MigrateApi,
purposeApi: PurposeApi
)(implicit ec: ExecutionContext)
extends AuthorizationManagementService {

implicit val logger: LoggerTakingImplicit[ContextFieldsToLog] =
implicit val logger: LoggerTakingImplicit[ContextFieldsToLog] =
Logger.takingImplicit[ContextFieldsToLog](this.getClass)
override def createClient(
consumerId: UUID,
Expand All @@ -48,7 +49,7 @@ final case class AuthorizationManagementServiceImpl(
description = description,
kind = kind,
createdAt = createdAt,
members = members
users = members
),
xForwardedFor = ip
)(BearerToken(bearerToken))
Expand All @@ -58,7 +59,7 @@ final case class AuthorizationManagementServiceImpl(
clientId: UUID
)(implicit ec: ExecutionContext, readModel: ReadModelService): Future[PersistentClient] =
ReadModelAuthorizationQueries.getClientById(clientId).flatMap(_.toFuture(ClientNotFound(clientId)))
override def deleteClient(clientId: UUID)(implicit contexts: Seq[(String, String)]): Future[Unit] =
override def deleteClient(clientId: UUID)(implicit contexts: Seq[(String, String)]): Future[Unit] =
withHeaders[Unit] { (bearerToken, correlationId, ip) =>
val request: ApiRequest[Unit] =
clientApi.deleteClient(xCorrelationId = correlationId, clientId.toString, xForwardedFor = ip)(
Expand All @@ -70,22 +71,43 @@ final case class AuthorizationManagementServiceImpl(
case err: ApiError[_] if err.code == 404 => Future.failed(ClientNotFound(clientId))
}
}

override def migrateKeyRelationshipToUser(clientId: UUID, keyId: String, userId: UUID)(implicit
contexts: Seq[(String, String)]
): Future[Unit] = withHeaders[Unit] { (bearerToken, correlationId, ip) =>
val request: ApiRequest[Unit] =
migrateApi.migrateKeyRelationshipToUser(
xCorrelationId = correlationId,
clientId,
keyId,
UserSeed(userId),
xForwardedFor = ip
)(BearerToken(bearerToken))
invoker.invoke(request, "Key user migration")
}

override def addUser(clientId: UUID, userId: UUID)(implicit contexts: Seq[(String, String)]): Future[Client] =
withHeaders[Client] { (bearerToken, correlationId, ip) =>
val request: ApiRequest[Client] =
clientApi.addUser(xCorrelationId = correlationId, clientId, UserSeed(userId), xForwardedFor = ip)(
BearerToken(bearerToken)
)
invoker.invoke(request, "Operator addition to client")
}
override def addRelationship(clientId: UUID, relationshipId: UUID)(implicit
contexts: Seq[(String, String)]
): Future[Client] = withHeaders[Client] { (bearerToken, correlationId, ip) =>
val request: ApiRequest[Client] = clientApi.addRelationship(
xCorrelationId = correlationId,
clientId,
PartyRelationshipSeed(relationshipId),
xForwardedFor = ip
)(BearerToken(bearerToken))
val request: ApiRequest[Client] =
clientApi.addUser(xCorrelationId = correlationId, clientId, UserSeed(relationshipId), xForwardedFor = ip)(
BearerToken(bearerToken)
)
invoker.invoke(request, "Operator addition to client")
}
override def removeClientRelationship(clientId: UUID, relationshipId: UUID)(implicit
contexts: Seq[(String, String)]
): Future[Unit] = withHeaders[Unit] { (bearerToken, correlationId, ip) =>
val request: ApiRequest[Unit] =
clientApi.removeClientRelationship(xCorrelationId = correlationId, clientId, relationshipId, xForwardedFor = ip)(
clientApi.removeClientUser(xCorrelationId = correlationId, clientId, relationshipId, xForwardedFor = ip)(
BearerToken(bearerToken)
)
invoker
Expand All @@ -103,7 +125,7 @@ final case class AuthorizationManagementServiceImpl(
.flatMap(_.map(_.keys).toFuture(ClientKeyNotFound(clientId, kid)))
key <- keys.find(_.kid == kid).toFuture(ClientKeyNotFound(clientId, kid))
} yield key
override def deleteKey(clientId: UUID, kid: String)(implicit contexts: Seq[(String, String)]): Future[Unit] =
override def deleteKey(clientId: UUID, kid: String)(implicit contexts: Seq[(String, String)]): Future[Unit] =
withHeaders[Unit] { (bearerToken, correlationId, ip) =>
val request: ApiRequest[Unit] =
keyApi.deleteClientKeyById(xCorrelationId = correlationId, clientId, kid, xForwardedFor = ip)(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package it.pagopa.interop.authorizationprocess
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.testkit.ScalatestRouteTest
import it.pagopa.interop.authorizationmanagement
import it.pagopa.interop.authorizationmanagement.client.api.{ClientApi, KeyApi, PurposeApi}
import it.pagopa.interop.authorizationmanagement.client.api.{ClientApi, KeyApi, PurposeApi, MigrateApi}
import it.pagopa.interop.authorizationmanagement.model.client.{Api, PersistentClient, PersistentClientKind}
import it.pagopa.interop.authorizationprocess.api.impl.ClientApiServiceImpl
import it.pagopa.interop.authorizationprocess.common.readmodel.PaginatedResult
Expand Down Expand Up @@ -88,6 +88,7 @@ class ClientOperationSpec extends AnyWordSpecLike with MockFactory with SpecUtil
AuthorizationManagementInvoker(ExecutionContext.global),
ClientApi(),
KeyApi(),
MigrateApi(),
PurposeApi()
),
mockAgreementManagementService,
Expand Down Expand Up @@ -186,6 +187,7 @@ class ClientOperationSpec extends AnyWordSpecLike with MockFactory with SpecUtil
purposes = Seq.empty,
description = None,
relationships = Set.empty,
users = Set.empty,
kind = Api,
createdAt = timestamp
)
Expand Down Expand Up @@ -240,6 +242,7 @@ class ClientOperationSpec extends AnyWordSpecLike with MockFactory with SpecUtil
purposes = Seq.empty,
description = None,
relationships = Set.empty,
users = Set.empty,
kind = Api,
createdAt = timestamp
)
Expand Down
Loading