diff --git a/src/main/scala/it/pagopa/interop/authorizationmanagement/model/persistence/projection/ClientCqrsProjection.scala b/src/main/scala/it/pagopa/interop/authorizationmanagement/model/persistence/projection/ClientCqrsProjection.scala deleted file mode 100644 index f0dffeb..0000000 --- a/src/main/scala/it/pagopa/interop/authorizationmanagement/model/persistence/projection/ClientCqrsProjection.scala +++ /dev/null @@ -1,170 +0,0 @@ -package it.pagopa.interop.authorizationmanagement.model.persistence.projection - -import akka.actor.typed.ActorSystem -import it.pagopa.interop.authorizationmanagement.model.client.PersistentClient -import it.pagopa.interop.authorizationmanagement.model.persistence.JsonFormats._ -import it.pagopa.interop.authorizationmanagement.model.persistence._ -import it.pagopa.interop.commons.cqrs.model._ -import it.pagopa.interop.commons.cqrs.service.CqrsProjection -import it.pagopa.interop.commons.cqrs.service.DocumentConversions._ -import org.mongodb.scala.bson.BsonArray -import org.mongodb.scala.model._ -import org.mongodb.scala.{MongoCollection, _} -import slick.basic.DatabaseConfig -import slick.jdbc.JdbcProfile -import spray.json._ - -import scala.concurrent.ExecutionContext -import scala.jdk.CollectionConverters._ - -object ClientCqrsProjection { - - private final val emptyKeys: Map[String, JsValue] = Map("keys" -> JsArray.empty) - def projection(offsetDbConfig: DatabaseConfig[JdbcProfile], mongoDbConfig: MongoDbConfig, projectionId: String)( - implicit - system: ActorSystem[_], - ec: ExecutionContext - ): CqrsProjection[Event] = - CqrsProjection[Event](offsetDbConfig, mongoDbConfig, projectionId, eventHandler) - - private def eventHandler(collection: MongoCollection[Document], event: Event): PartialMongoAction = event match { - case ClientAdded(c) => - val data = JsObject(c.toJson.asJsObject.fields ++ emptyKeys) - ActionWithDocument(collection.insertOne, Document(s"{ data: ${data.compactPrint} }")) - case ClientDeleted(cId) => Action(collection.deleteOne(Filters.eq("data.id", cId))) - case KeysAdded(cId, keys) => - val updates = keys.map { case (_, key) => Updates.push(s"data.keys", key.toDocument) } - ActionWithBson(collection.updateOne(Filters.eq("data.id", cId), _), Updates.combine(updates.toList: _*)) - case KeyRelationshipToUserMigrated(cId, kId, uId) => - ActionWithBson( - collection.updateOne( - Filters.eq("data.id", cId), - _, - UpdateOptions().arrayFilters(List(Filters.eq("key.kid", kId)).asJava) - ), - Updates.set("data.keys.$[key].userId", uId.toString) - ) - case KeyDeleted(cId, kId, _) => - ActionWithBson( - collection.updateOne(Filters.eq("data.id", cId), _), - Updates.pull("data.keys", Document(s"{ kid : \"$kId\" }")) - ) - case RelationshipAdded(c, rId) => - ActionWithBson( - collection.updateOne(Filters.eq("data.id", c.id.toString), _), - Updates.push("data.relationships", rId.toString) - ) - case RelationshipRemoved(cId, rId) => - ActionWithBson(collection.updateOne(Filters.eq("data.id", cId), _), Updates.pull("data.relationships", rId)) - case UserAdded(c, rId) => - ActionWithBson( - collection.updateOne(Filters.eq("data.id", c.id.toString), _), - Updates.push("data.users", rId.toString) - ) - case UserRemoved(c, rId) => - ActionWithBson( - collection.updateOne(Filters.eq("data.id", c.id.toString), _), - Updates.pull("data.users", rId.toString) - ) - case ClientPurposeAdded(cId, states) => - // Added as array instead of map because it is not possible to update objects without knowing their key - ActionWithBson( - collection.updateOne(Filters.eq("data.id", cId), _), - Updates.push(s"data.purposes", states.toDocument) - ) - case ClientPurposeRemoved(cId, pId) => - // Note: Due to DocumentDB limitations, it is not possible, in a single instruction, to pull - // data from an array of objects filtering by a nested field of the object. - // e.g: { bar: [ { id: 1, v: { vv: "foo" } } ] } - // It is not possible to remove the element with id = 1 filtering by v.vv = "foo" - - val command = for { - document <- collection.find(Filters.eq("data.id", cId)) - client = Utils.extractData[PersistentClient](document) - updatedPurposes = client.purposes.filter(_.purpose.purposeId.toString != pId) - } yield Updates.set("data.purposes", BsonArray.fromIterable(updatedPurposes.map(_.toDocument.toBsonDocument()))) - - ActionWithObservable(collection.updateOne(Filters.eq("data.id", cId), _), command) - case EServiceStateUpdated(eServiceId, descriptorId, state, audience, voucherLifespan) => - // Updates all purposes states of all clients matching criteria - ActionWithBson( - collection.updateMany( - Filters.empty(), - _, - UpdateOptions().arrayFilters(List(Filters.eq("elem.eService.eServiceId", eServiceId)).asJava) - ), - Updates.combine( - Updates.set("data.purposes.$[elem].eService.state", state.toString), - Updates.set("data.purposes.$[elem].eService.descriptorId", descriptorId.toString), - Updates.set("data.purposes.$[elem].eService.audience", audience), - Updates.set("data.purposes.$[elem].eService.voucherLifespan", voucherLifespan) - ) - ) - case AgreementStateUpdated(eServiceId, consumerId, agreementId, state) => - // Updates all purposes states of all clients matching criteria - ActionWithBson( - collection.updateMany( - Filters.empty(), - _, - UpdateOptions().arrayFilters( - List( - Filters.and( - Filters.eq("elem.agreement.eServiceId", eServiceId), - Filters.eq("elem.agreement.consumerId", consumerId) - ) - ).asJava - ) - ), - Updates.combine( - Updates.set("data.purposes.$[elem].agreement.state", state.toString), - Updates.set("data.purposes.$[elem].agreement.agreementId", agreementId.toString) - ) - ) - case AgreementAndEServiceStatesUpdated( - eServiceId, - descriptorId, - consumerId, - agreementId, - agreementState, - eServiceState, - audience, - voucherLifespan - ) => - ActionWithBson( - collection.updateMany( - Filters.empty(), - _, - UpdateOptions().arrayFilters( - List( - Filters.and( - Filters.eq("elem.agreement.eServiceId", eServiceId), - Filters.eq("elem.agreement.consumerId", consumerId) - ) - ).asJava - ) - ), - Updates.combine( - Updates.set("data.purposes.$[elem].agreement.state", agreementState.toString), - Updates.set("data.purposes.$[elem].agreement.agreementId", agreementId.toString), - Updates.set("data.purposes.$[elem].eService.descriptorId", descriptorId.toString), - Updates.set("data.purposes.$[elem].eService.audience", audience), - Updates.set("data.purposes.$[elem].eService.voucherLifespan", voucherLifespan), - Updates.set("data.purposes.$[elem].eService.state", eServiceState.toString) - ) - ) - case PurposeStateUpdated(purposeId, versionId, state) => - // Updates all purposes states of all clients matching criteria - ActionWithBson( - collection.updateMany( - Filters.empty(), - _, - UpdateOptions().arrayFilters(List(Filters.eq("elem.purpose.purposeId", purposeId)).asJava) - ), - Updates.combine( - Updates.set("data.purposes.$[elem].purpose.state", state.toString), - Updates.set("data.purposes.$[elem].purpose.versionId", versionId.toString) - ) - ) - } - -} diff --git a/src/main/scala/it/pagopa/interop/authorizationmanagement/model/persistence/projection/ClientNotificationProjection.scala b/src/main/scala/it/pagopa/interop/authorizationmanagement/model/persistence/projection/ClientNotificationProjection.scala deleted file mode 100644 index 328852b..0000000 --- a/src/main/scala/it/pagopa/interop/authorizationmanagement/model/persistence/projection/ClientNotificationProjection.scala +++ /dev/null @@ -1,70 +0,0 @@ -package it.pagopa.interop.authorizationmanagement.model.persistence.projection - -import akka.Done -import akka.actor.typed.ActorSystem -import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal -import it.pagopa.interop.authorizationmanagement.model.persistence._ -import akka.persistence.query.Offset -import akka.projection.ProjectionId -import akka.projection.eventsourced.EventEnvelope -import akka.projection.eventsourced.scaladsl.EventSourcedProvider -import akka.projection.scaladsl.{ExactlyOnceProjection, SourceProvider} -import akka.projection.slick.{SlickHandler, SlickProjection} -import cats.syntax.all._ -import com.typesafe.scalalogging.Logger -import it.pagopa.interop.commons.queue.QueueWriter -import it.pagopa.interop.commons.queue.message.{Message, ProjectableEvent} -import slick.basic.DatabaseConfig -import slick.dbio._ -import slick.jdbc.JdbcProfile -import java.util.UUID -import scala.concurrent.ExecutionContext -import scala.util.{Failure, Success} - -class ClientNotificationProjection( - dbConfig: DatabaseConfig[JdbcProfile], - queueWriter: QueueWriter, - projectionId: String -)(implicit system: ActorSystem[_], ec: ExecutionContext) { - - def sourceProvider(tag: String): SourceProvider[Offset, EventEnvelope[Event]] = - EventSourcedProvider.eventsByTag[Event](system, readJournalPluginId = JdbcReadJournal.Identifier, tag = tag) - - def projection(tag: String): ExactlyOnceProjection[Offset, EventEnvelope[Event]] = SlickProjection.exactlyOnce( - projectionId = ProjectionId(projectionId, tag), - sourceProvider = sourceProvider(tag), - handler = () => new ProjectionHandler(queueWriter), - databaseConfig = dbConfig - ) -} - -class ProjectionHandler(queueWriter: QueueWriter)(implicit ec: ExecutionContext) - extends SlickHandler[EventEnvelope[Event]] { - - private val logger: Logger = Logger(this.getClass) - - def innerSend(message: Message): DBIO[Done] = DBIOAction.from { - def show(m: Message): String = { - val (persId, persNr, time) = (m.eventJournalPersistenceId, m.eventJournalSequenceNumber, m.eventTimestamp) - s"message with persistenceId $persId, sequenceNr $persNr and timestamp $time" - } - - val future = queueWriter.send(message) - future.onComplete { - case Failure(e) => logger.error(s"Error sending ${show(message)}", e) - case Success(_) => logger.debug(s"Wrote on queue ${show(message)}") - } - future.as(Done) - } - - val message: EventEnvelope[Event] => (String, ProjectableEvent) => Message = envelope => { case (kind, event) => - Message(UUID.randomUUID(), envelope.persistenceId, envelope.sequenceNr, envelope.timestamp, kind, event) - } - - override def process(envelope: EventEnvelope[Event]): DBIO[Done] = { - val kind: String = AuthorizationEventsSerde.getKind(envelope.event) - def send(kind: String, x: ProjectableEvent) = innerSend(message(envelope)(kind, x)) - send(kind, envelope.event) - } - -} diff --git a/src/main/scala/it/pagopa/interop/authorizationmanagement/model/persistence/projection/KeyCqrsProjection.scala b/src/main/scala/it/pagopa/interop/authorizationmanagement/model/persistence/projection/KeyCqrsProjection.scala deleted file mode 100644 index 1d0446a..0000000 --- a/src/main/scala/it/pagopa/interop/authorizationmanagement/model/persistence/projection/KeyCqrsProjection.scala +++ /dev/null @@ -1,46 +0,0 @@ -package it.pagopa.interop.authorizationmanagement.model.persistence.projection - -import akka.actor.typed.ActorSystem -import cats.implicits.toTraverseOps -import it.pagopa.interop.authorizationmanagement.api.impl.jwkKeyFormat -import it.pagopa.interop.authorizationmanagement.model.persistence._ -import it.pagopa.interop.authorizationmanagement.model.persistence.KeyAdapters._ -import it.pagopa.interop.authorizationmanagement.jwk.converter.KeyConverter -import it.pagopa.interop.commons.cqrs.model._ -import it.pagopa.interop.commons.cqrs.service.CqrsProjection -import org.mongodb.scala.model._ -import org.mongodb.scala.{MongoCollection, _} -import slick.basic.DatabaseConfig -import slick.jdbc.JdbcProfile -import spray.json.DefaultJsonProtocol.StringJsonFormat -import spray.json._ - -import scala.concurrent.ExecutionContext - -object KeyCqrsProjection { - - def projection(offsetDbConfig: DatabaseConfig[JdbcProfile], mongoDbConfig: MongoDbConfig, projectionId: String)( - implicit - system: ActorSystem[_], - ec: ExecutionContext - ): CqrsProjection[Event] = - CqrsProjection[Event](offsetDbConfig, mongoDbConfig, projectionId, eventHandler) - - private def eventHandler(collection: MongoCollection[Document], event: Event): PartialMongoAction = event match { - case KeysAdded(clientId, keys) => - val clientIdField: Map[String, JsValue] = Map("clientId" -> clientId.toJson) - val updates: Either[Throwable, Seq[ActionWithDocument]] = keys.values.toSeq.traverse(key => - KeyConverter - .fromBase64encodedPEMToAPIKey(key.kid, key.encodedPem, key.use.toJwk, key.algorithm) - .map { jwk => - val data: JsObject = JsObject(jwk.toApi.toJson.asJsObject.fields ++ clientIdField) - ActionWithDocument(collection.insertOne, Document(s"{ data: ${data.compactPrint} }")) - } - ) - updates.fold(ErrorAction, MultiAction) - case KeyDeleted(_, kid, _) => - Action(collection.deleteOne(Filters.eq("data.kid", kid))) - case ClientDeleted(cId) => Action(collection.deleteMany(Filters.eq("data.clientId", cId))) - case _ => NoOpAction - } -} diff --git a/src/main/scala/it/pagopa/interop/authorizationmanagement/model/persistence/projection/Utils.scala b/src/main/scala/it/pagopa/interop/authorizationmanagement/model/persistence/projection/Utils.scala deleted file mode 100644 index 9b5ecc7..0000000 --- a/src/main/scala/it/pagopa/interop/authorizationmanagement/model/persistence/projection/Utils.scala +++ /dev/null @@ -1,16 +0,0 @@ -package it.pagopa.interop.authorizationmanagement.model.persistence.projection - -import org.mongodb.scala.Document -import spray.json._ - -object Utils { - - def extractData[T: JsonReader](document: Document): T = { - val fields = document.toJson().parseJson.asJsObject.getFields("data") - fields match { - // Failures are not handled to stop the projection and avoid the consumption of further events - case data :: Nil => data.convertTo[T] - case _ => throw new Exception(s"Unexpected number of fields ${fields.size}. Content: $fields") - } - } -} diff --git a/src/main/scala/it/pagopa/interop/authorizationmanagement/server/impl/Dependencies.scala b/src/main/scala/it/pagopa/interop/authorizationmanagement/server/impl/Dependencies.scala index 671df37..96db40b 100644 --- a/src/main/scala/it/pagopa/interop/authorizationmanagement/server/impl/Dependencies.scala +++ b/src/main/scala/it/pagopa/interop/authorizationmanagement/server/impl/Dependencies.scala @@ -2,61 +2,49 @@ package it.pagopa.interop.authorizationmanagement.server.impl import akka.actor.typed.{ActorSystem, Behavior} import akka.cluster.sharding.typed.ShardingEnvelope -import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityContext, ShardedDaemonProcess} +import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityContext} import akka.http.scaladsl.model.StatusCodes import akka.http.scaladsl.server.Directives.complete import akka.http.scaladsl.server.Route import akka.http.scaladsl.server.directives.SecurityDirectives import akka.persistence.typed.PersistenceId -import akka.projection.ProjectionBehavior import com.atlassian.oai.validator.report.ValidationReport import com.nimbusds.jose.proc.SecurityContext import com.nimbusds.jwt.proc.DefaultJWTClaimsVerifier import com.typesafe.scalalogging.{Logger, LoggerTakingImplicit} import it.pagopa.interop.authorizationmanagement.api._ import it.pagopa.interop.authorizationmanagement.api.impl.{ - MigrateApiMarshallerImpl, ClientApiMarshallerImpl, ClientApiServiceImpl, HealthApiMarshallerImpl, HealthServiceApiImpl, KeyApiMarshallerImpl, + KeyApiServiceImpl, + MigrateApiMarshallerImpl, + MigrateApiServiceImpl, PurposeApiMarshallerImpl, PurposeApiServiceImpl, TokenGenerationApiMarshallerImpl, TokenGenerationApiServiceImpl, serviceCode } -import it.pagopa.interop.authorizationmanagement.api.impl.{MigrateApiServiceImpl, KeyApiServiceImpl} import it.pagopa.interop.authorizationmanagement.common.system.ApplicationConfiguration import it.pagopa.interop.authorizationmanagement.common.system.ApplicationConfiguration.{ numberOfProjectionTags, projectionTag } -import it.pagopa.interop.authorizationmanagement.model.persistence.projection.{ - ClientCqrsProjection, - ClientNotificationProjection, - KeyCqrsProjection -} -import it.pagopa.interop.authorizationmanagement.model.persistence.{ - AuthorizationEventsSerde, - Command, - KeyPersistentBehavior -} +import it.pagopa.interop.authorizationmanagement.model.persistence.{Command, KeyPersistentBehavior} import it.pagopa.interop.commons.jwt.service.JWTReader import it.pagopa.interop.commons.jwt.service.impl.{DefaultJWTReader, getClaimsVerifier} import it.pagopa.interop.commons.jwt.{JWTConfiguration, KID, PublicKeysHolder, SerializedKey} import it.pagopa.interop.commons.logging.{CanLogContextFields, ContextFieldsToLog} -import it.pagopa.interop.commons.queue.QueueWriter import it.pagopa.interop.commons.utils.AkkaUtils.PassThroughAuthenticator import it.pagopa.interop.commons.utils.OpenapiUtils import it.pagopa.interop.commons.utils.TypeConversions._ import it.pagopa.interop.commons.utils.errors.{Problem => CommonProblem} import it.pagopa.interop.commons.utils.service.{OffsetDateTimeSupplier, UUIDSupplier} -import slick.basic.DatabaseConfig -import slick.jdbc.JdbcProfile -import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future} +import scala.concurrent.{ExecutionContext, Future} trait Dependencies { @@ -87,67 +75,6 @@ trait Dependencies { complete(error.status, error) } - def initProjections( - blockingEc: ExecutionContextExecutor - )(implicit actorSystem: ActorSystem[_], ec: ExecutionContext): Unit = { - initClientCqrsProjection() - initKeyCqrsProjection() - initNotificationProjection(blockingEc) - } - - def initClientCqrsProjection()(implicit actorSystem: ActorSystem[_], ec: ExecutionContext): Unit = { - val dbConfig: DatabaseConfig[JdbcProfile] = - DatabaseConfig.forConfig("akka-persistence-jdbc.shared-databases.slick") - - val projectionId = "client-cqrs-projections" - val cqrsProjection = - ClientCqrsProjection.projection(dbConfig, ApplicationConfiguration.clientsMongoDB, projectionId) - - ShardedDaemonProcess(actorSystem).init[ProjectionBehavior.Command]( - name = projectionId, - numberOfInstances = numberOfProjectionTags, - behaviorFactory = (i: Int) => ProjectionBehavior(cqrsProjection.projection(projectionTag(i))), - stopMessage = ProjectionBehavior.Stop - ) - } - - def initKeyCqrsProjection()(implicit actorSystem: ActorSystem[_], ec: ExecutionContext): Unit = { - val dbConfig: DatabaseConfig[JdbcProfile] = - DatabaseConfig.forConfig("akka-persistence-jdbc.shared-databases.slick") - - val projectionId = "key-cqrs-projections" - val cqrsProjection = KeyCqrsProjection.projection(dbConfig, ApplicationConfiguration.keysMongoDB, projectionId) - - ShardedDaemonProcess(actorSystem).init[ProjectionBehavior.Command]( - name = projectionId, - numberOfInstances = numberOfProjectionTags, - behaviorFactory = (i: Int) => ProjectionBehavior(cqrsProjection.projection(projectionTag(i))), - stopMessage = ProjectionBehavior.Stop - ) - } - - def initNotificationProjection( - blockingEc: ExecutionContextExecutor - )(implicit actorSystem: ActorSystem[_], ec: ExecutionContext): Unit = { - val queueWriter: QueueWriter = - QueueWriter.get(ApplicationConfiguration.queueUrl)(AuthorizationEventsSerde.authToJson)(blockingEc) - - val dbConfig: DatabaseConfig[JdbcProfile] = - DatabaseConfig.forConfig("akka-persistence-jdbc.shared-databases.slick") - - val notificationProjectionId: String = "authorization-notification-projections" - - val clientNotificationProjection: ClientNotificationProjection = - new ClientNotificationProjection(dbConfig, queueWriter, notificationProjectionId) - - ShardedDaemonProcess(actorSystem).init[ProjectionBehavior.Command]( - name = notificationProjectionId, - numberOfInstances = numberOfProjectionTags, - behaviorFactory = (i: Int) => ProjectionBehavior(clientNotificationProjection.projection(projectionTag(i))), - stopMessage = ProjectionBehavior.Stop - ) - } - def migrateApi(jwtReader: JWTReader, sharding: ClusterSharding)(implicit actorSystem: ActorSystem[_]) = new MigrateApi( MigrateApiServiceImpl(actorSystem, sharding, keyPersistentEntity), diff --git a/src/main/scala/it/pagopa/interop/authorizationmanagement/server/impl/Main.scala b/src/main/scala/it/pagopa/interop/authorizationmanagement/server/impl/Main.scala index d853b37..a13b8c6 100644 --- a/src/main/scala/it/pagopa/interop/authorizationmanagement/server/impl/Main.scala +++ b/src/main/scala/it/pagopa/interop/authorizationmanagement/server/impl/Main.scala @@ -1,9 +1,7 @@ package it.pagopa.interop.authorizationmanagement.server.impl -import cats.syntax.all._ -import buildinfo.BuildInfo -import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.ActorSystem +import akka.actor.typed.scaladsl.Behaviors import akka.cluster.ClusterEvent import akka.cluster.sharding.typed.scaladsl.ClusterSharding import akka.cluster.typed.{Cluster, Subscribe} @@ -11,15 +9,15 @@ import akka.http.scaladsl.Http import akka.http.scaladsl.Http.ServerBinding import akka.management.cluster.bootstrap.ClusterBootstrap import akka.management.scaladsl.AkkaManagement +import buildinfo.BuildInfo +import cats.syntax.all._ +import com.typesafe.scalalogging.Logger import it.pagopa.interop.authorizationmanagement.common.system.ApplicationConfiguration import it.pagopa.interop.authorizationmanagement.server.Controller import it.pagopa.interop.commons.logging.renderBuildInfo -import scala.concurrent.ExecutionContext -import com.typesafe.scalalogging.Logger -import scala.concurrent.Future -import scala.util.{Success, Failure} -import akka.actor.typed.DispatcherSelector -import scala.concurrent.ExecutionContextExecutor + +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success} object Main extends App with Dependencies { @@ -30,8 +28,6 @@ object Main extends App with Dependencies { implicit val actorSystem: ActorSystem[_] = context.system implicit val executionContext: ExecutionContext = actorSystem.executionContext // Let's keep it here in the case we'll need to call any external service - val selector: DispatcherSelector = DispatcherSelector.fromConfig("futures-dispatcher") - val blockingEc: ExecutionContextExecutor = actorSystem.dispatchers.lookup(selector) AkkaManagement.get(actorSystem).start() @@ -51,8 +47,6 @@ object Main extends App with Dependencies { cluster.subscriptions ! Subscribe(listener, classOf[ClusterEvent.MemberEvent]) - if (ApplicationConfiguration.projectionsEnabled) initProjections(blockingEc) - logger.info(renderBuildInfo(BuildInfo)) logger.info(s"Started cluster at ${cluster.selfMember.address}")