From 35ed1ffe888eca6e0e1164d872e97580d4cdd3b3 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 9 Mar 2021 14:16:42 +0100 Subject: [PATCH] Use plugin dispatcher in query streams, #870 (backport) --- .../query/TagViewSequenceNumberScanner.scala | 13 +++++--- .../query/scaladsl/CassandraReadJournal.scala | 31 ++++++++++--------- 2 files changed, 26 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/akka/persistence/cassandra/query/TagViewSequenceNumberScanner.scala b/core/src/main/scala/akka/persistence/cassandra/query/TagViewSequenceNumberScanner.scala index d9639d2c2..4d7f26a7d 100644 --- a/core/src/main/scala/akka/persistence/cassandra/query/TagViewSequenceNumberScanner.scala +++ b/core/src/main/scala/akka/persistence/cassandra/query/TagViewSequenceNumberScanner.scala @@ -20,10 +20,13 @@ import akka.stream.ActorMaterializer import akka.stream.scaladsl.Source import com.datastax.driver.core.PreparedStatement import com.datastax.driver.core.Row - import scala.concurrent.duration.{ Deadline, FiniteDuration } import scala.concurrent.{ ExecutionContext, Future } +import akka.stream.ActorAttributes +import akka.stream.scaladsl.Keep +import akka.stream.scaladsl.Sink + @InternalApi private[akka] object TagViewSequenceNumberScanner { @@ -41,7 +44,7 @@ private[akka] object TagViewSequenceNumberScanner { } @InternalApi -private[akka] class TagViewSequenceNumberScanner(session: Session)( +private[akka] class TagViewSequenceNumberScanner(session: Session, pluginDispatcher: String)( implicit materializer: ActorMaterializer, ec: ExecutionContext) { private val log = Logging(materializer.system, getClass) @@ -93,7 +96,7 @@ private[akka] class TagViewSequenceNumberScanner(session: Session)( session.selectTagSequenceNrs(tag, bucket, fromOffset, toOffset) }) .map(row => (row.getString("persistence_id"), row.getLong("tag_pid_sequence_nr"), row.getUUID("timestamp"))) - .runFold(Map.empty[Tag, (TagPidSequenceNr, UUID)]) { + .toMat(Sink.fold(Map.empty[Tag, (TagPidSequenceNr, UUID)]) { case (acc, (pid, tagPidSequenceNr, timestamp)) => val (newTagPidSequenceNr, newTimestamp) = acc.get(pid) match { case None => @@ -105,7 +108,9 @@ private[akka] class TagViewSequenceNumberScanner(session: Session)( (currentTagPidSequenceNr, currentTimestamp) } acc + (pid -> ((newTagPidSequenceNr, newTimestamp))) - } + })(Keep.right) + .withAttributes(ActorAttributes.dispatcher(pluginDispatcher)) + .run() .flatMap { result => if (deadline.hasTimeLeft()) { doIt() diff --git a/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala b/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala index 7d33ca084..4f4b218ff 100644 --- a/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala +++ b/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala @@ -325,7 +325,9 @@ class CassandraReadJournal(system: ExtendedActorSystem, cfg: Config) usingOffset, initialTagPidSequenceNrs, scanner)) - }.via(deserializeEventsByTagRow).mapMaterializedValue(_ => NotUsed) + }.via(deserializeEventsByTagRow) + .withAttributes(ActorAttributes.dispatcher(queryPluginConfig.pluginDispatcher)) + .mapMaterializedValue(_ => NotUsed) } catch { case NonFatal(e) => @@ -337,11 +339,11 @@ class CassandraReadJournal(system: ExtendedActorSystem, cfg: Config) private def deserializeEventsByTagRow: Flow[EventsByTagStage.UUIDRow, UUIDPersistentRepr, NotUsed] = { val deserializeEventAsync = queryPluginConfig.deserializationParallelism > 1 - Flow[EventsByTagStage.UUIDRow] - .mapAsync(queryPluginConfig.deserializationParallelism) { uuidRow => - val row = uuidRow.row - eventsByTagDeserializer.deserializeEvent(row, deserializeEventAsync).map { payload => - val repr = mapEvent(PersistentRepr( + Flow[EventsByTagStage.UUIDRow].mapAsync(queryPluginConfig.deserializationParallelism) { uuidRow => + val row = uuidRow.row + eventsByTagDeserializer.deserializeEvent(row, deserializeEventAsync).map { payload => + val repr = mapEvent( + PersistentRepr( payload, sequenceNr = uuidRow.sequenceNr, persistenceId = uuidRow.persistenceId, @@ -349,10 +351,9 @@ class CassandraReadJournal(system: ExtendedActorSystem, cfg: Config) deleted = false, sender = null, writerUuid = row.getString("writer_uuid"))) - UUIDPersistentRepr(uuidRow.offset, uuidRow.tagPidSequenceNr, repr) - } + UUIDPersistentRepr(uuidRow.offset, uuidRow.tagPidSequenceNr, repr) } - .withAttributes(ActorAttributes.dispatcher(queryPluginConfig.pluginDispatcher)) + } } private def eventsByTagPrereqs(tag: String, usingOffset: Boolean, fromOffset: UUID) @@ -377,7 +378,9 @@ class CassandraReadJournal(system: ExtendedActorSystem, cfg: Config) */ @InternalApi private[akka] lazy val tagViewScanner: Future[TagViewSequenceNumberScanner] = preparedSelectTagSequenceNrs.map { ps => - new TagViewSequenceNumberScanner(TagViewSequenceNumberScanner.Session(session, ps)) + new TagViewSequenceNumberScanner( + TagViewSequenceNumberScanner.Session(session, ps), + queryPluginConfig.pluginDispatcher) } /** @@ -497,7 +500,9 @@ class CassandraReadJournal(system: ExtendedActorSystem, cfg: Config) usingOffset, initialTagPidSequenceNrs, scanner)) - }.via(deserializeEventsByTagRow).mapMaterializedValue(_ => NotUsed) + }.via(deserializeEventsByTagRow) + .withAttributes(ActorAttributes.dispatcher(queryPluginConfig.pluginDispatcher)) + .mapMaterializedValue(_ => NotUsed) } catch { case NonFatal(e) => @@ -630,7 +635,6 @@ class CassandraReadJournal(system: ExtendedActorSystem, cfg: Config) customRetryPolicy), queryPluginConfig, fastForwardEnabled)) - .withAttributes(ActorAttributes.dispatcher(queryPluginConfig.pluginDispatcher)) .named(name) }.mapAsync(queryPluginConfig.deserializationParallelism) { row => extractor.extract(row, deserializeEventAsync) @@ -706,7 +710,6 @@ class CassandraReadJournal(system: ExtendedActorSystem, cfg: Config) (s, ps) => Source .fromGraph(new AllPersistenceIdsStage(refreshInterval, queryPluginConfig.fetchSize, ps, s)) - .withAttributes(ActorAttributes.dispatcher(queryPluginConfig.pluginDispatcher)) .mapMaterializedValue(_ => NotUsed) - .named(name)) + .named(name)).withAttributes(ActorAttributes.dispatcher(queryPluginConfig.pluginDispatcher)) }