diff --git a/core/src/main/scala/akka/persistence/cassandra/query/AllPersistenceIdsStage.scala b/core/src/main/scala/akka/persistence/cassandra/query/AllPersistenceIdsStage.scala index 798c5238f..ca1adfd5b 100644 --- a/core/src/main/scala/akka/persistence/cassandra/query/AllPersistenceIdsStage.scala +++ b/core/src/main/scala/akka/persistence/cassandra/query/AllPersistenceIdsStage.scala @@ -121,6 +121,10 @@ import scala.concurrent.duration._ } def onPull(): Unit = { + // FIXME remove + if (Thread.currentThread().getName.contains("akka.actor.default-dispatcher")) + throw new RuntimeException("Wrong akka.actor.default-dispatcher") + flush() if (buffer.isEmpty && isAvailable(out)) { maybeResultSet match { diff --git a/core/src/main/scala/akka/persistence/cassandra/query/EventsByPersistenceIdStage.scala b/core/src/main/scala/akka/persistence/cassandra/query/EventsByPersistenceIdStage.scala index f37bdfb78..710bbdb3f 100644 --- a/core/src/main/scala/akka/persistence/cassandra/query/EventsByPersistenceIdStage.scala +++ b/core/src/main/scala/akka/persistence/cassandra/query/EventsByPersistenceIdStage.scala @@ -345,6 +345,10 @@ import akka.persistence.cassandra.PluginSettings @tailrec private def tryPushOne(): Unit = { + // FIXME remove + if (Thread.currentThread().getName.contains("akka.actor.default-dispatcher")) + throw new RuntimeException("Wrong akka.actor.default-dispatcher") + queryState match { case QueryResult(rs, empty, switchPartition) if isAvailable(out) => def afterExhausted(): Unit = { diff --git a/core/src/main/scala/akka/persistence/cassandra/query/EventsByTagStage.scala b/core/src/main/scala/akka/persistence/cassandra/query/EventsByTagStage.scala index 853cf45f5..77634cd50 100644 --- a/core/src/main/scala/akka/persistence/cassandra/query/EventsByTagStage.scala +++ b/core/src/main/scala/akka/persistence/cassandra/query/EventsByTagStage.scala @@ -794,6 +794,9 @@ import scala.compat.java8.FutureConverters._ @tailrec def tryPushOne(): Unit = stageState.state match { case QueryResult(rs) if isAvailable(out) => + // FIXME remove + if (Thread.currentThread().getName.contains("akka.actor.default-dispatcher")) + throw new RuntimeException("Wrong akka.actor.default-dispatcher") if (isExhausted(rs)) { queryExhausted() } else if (rs.remaining() == 0) { diff --git a/core/src/main/scala/akka/persistence/cassandra/query/TagViewSequenceNumberScanner.scala b/core/src/main/scala/akka/persistence/cassandra/query/TagViewSequenceNumberScanner.scala index d7d3aa9e9..f0b35a5b5 100644 --- a/core/src/main/scala/akka/persistence/cassandra/query/TagViewSequenceNumberScanner.scala +++ b/core/src/main/scala/akka/persistence/cassandra/query/TagViewSequenceNumberScanner.scala @@ -23,7 +23,10 @@ import scala.concurrent.duration.{ Duration, FiniteDuration } import scala.concurrent.{ ExecutionContext, Future } import akka.persistence.cassandra.BucketSize +import akka.stream.ActorAttributes import akka.stream.alpakka.cassandra.scaladsl.CassandraSession +import akka.stream.scaladsl.Keep +import akka.stream.scaladsl.Sink /** * INTERNAL API @@ -46,7 +49,7 @@ import akka.stream.alpakka.cassandra.scaladsl.CassandraSession /** * INTERNAL API */ -@InternalApi private[akka] class TagViewSequenceNumberScanner(session: Session)( +@InternalApi private[akka] class TagViewSequenceNumberScanner(session: Session, pluginDispatcher: String)( implicit materializer: Materializer, @nowarn("msg=never used") ec: ExecutionContext) { private val log = Logging(materializer.system, getClass) @@ -94,11 +97,20 @@ import akka.stream.alpakka.cassandra.scaladsl.CassandraSession }) .flatMapConcat(bucket => { log.debug("Scanning bucket {}", bucket) + + // FIXME remove + if (Thread.currentThread().getName.contains("akka.actor.default-dispatcher")) + throw new RuntimeException("Wrong akka.actor.default-dispatcher") + session.selectTagSequenceNrs(tag, bucket, fromOffset, toOffset) }) .map(row => (row.getString("persistence_id"), row.getLong("tag_pid_sequence_nr"), row.getUuid("timestamp"))) - .runFold(Map.empty[Tag, (TagPidSequenceNr, UUID)]) { + .toMat(Sink.fold(Map.empty[Tag, (TagPidSequenceNr, UUID)]) { case (acc, (pid, tagPidSequenceNr, timestamp)) => + // FIXME remove + if (Thread.currentThread().getName.contains("akka.actor.default-dispatcher")) + throw new RuntimeException("Wrong akka.actor.default-dispatcher") + val (newTagPidSequenceNr, newTimestamp) = acc.get(pid) match { case None => (tagPidSequenceNr, timestamp) @@ -109,7 +121,9 @@ import akka.stream.alpakka.cassandra.scaladsl.CassandraSession (currentTagPidSequenceNr, currentTimestamp) } acc + (pid -> ((newTagPidSequenceNr, newTimestamp))) - } + })(Keep.right) + .withAttributes(ActorAttributes.dispatcher(pluginDispatcher)) + .run() } if (scanningPeriod > Duration.Zero) { diff --git a/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala b/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala index 2f4bc9777..2763e49c1 100644 --- a/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala +++ b/core/src/main/scala/akka/persistence/cassandra/query/scaladsl/CassandraReadJournal.scala @@ -333,7 +333,9 @@ class CassandraReadJournal protected ( usingOffset, initialTagPidSequenceNrs, scanner)) - }.via(deserializeEventsByTagRow).mapMaterializedValue(_ => NotUsed) + }.via(deserializeEventsByTagRow) + .withAttributes(ActorAttributes.dispatcher(querySettings.pluginDispatcher)) + .mapMaterializedValue(_ => NotUsed) } catch { case NonFatal(e) => @@ -345,12 +347,15 @@ class CassandraReadJournal protected ( private def deserializeEventsByTagRow: Flow[EventsByTagStage.UUIDRow, UUIDPersistentRepr, NotUsed] = { val deserializeEventAsync = querySettings.deserializationParallelism > 1 - Flow[EventsByTagStage.UUIDRow] - .mapAsync(querySettings.deserializationParallelism) { uuidRow => - val row = uuidRow.row - eventsByTagDeserializer.deserializeEvent(row, deserializeEventAsync).map { - case DeserializedEvent(payload, metadata) => - val repr = mapEvent(PersistentRepr( + Flow[EventsByTagStage.UUIDRow].mapAsync(querySettings.deserializationParallelism) { uuidRow => + val row = uuidRow.row + // FIXME remove + if (Thread.currentThread().getName.contains("akka.actor.default-dispatcher")) + throw new RuntimeException("Wrong akka.actor.default-dispatcher") + eventsByTagDeserializer.deserializeEvent(row, deserializeEventAsync).map { + case DeserializedEvent(payload, metadata) => + val repr = mapEvent( + PersistentRepr( payload, sequenceNr = uuidRow.sequenceNr, persistenceId = uuidRow.persistenceId, @@ -358,14 +363,13 @@ class CassandraReadJournal protected ( deleted = false, sender = null, writerUuid = row.getString("writer_uuid"))) - val reprWithMeta = metadata match { - case OptionVal.None => repr - case OptionVal.Some(metadata) => repr.withMetadata(metadata) - } - UUIDPersistentRepr(uuidRow.offset, uuidRow.tagPidSequenceNr, reprWithMeta) - } + val reprWithMeta = metadata match { + case OptionVal.None => repr + case OptionVal.Some(metadata) => repr.withMetadata(metadata) + } + UUIDPersistentRepr(uuidRow.offset, uuidRow.tagPidSequenceNr, reprWithMeta) } - .withAttributes(ActorAttributes.dispatcher(querySettings.pluginDispatcher)) + } } private def eventsByTagPrereqs(tag: String, usingOffset: Boolean, fromOffset: UUID) @@ -390,7 +394,9 @@ class CassandraReadJournal protected ( */ @InternalApi private[akka] val tagViewScanner: Future[TagViewSequenceNumberScanner] = preparedSelectTagSequenceNrs.map { ps => - new TagViewSequenceNumberScanner(TagViewSequenceNumberScanner.Session(session, ps, querySettings.readProfile)) + new TagViewSequenceNumberScanner( + TagViewSequenceNumberScanner.Session(session, ps, querySettings.readProfile), + querySettings.pluginDispatcher) } /** @@ -513,7 +519,9 @@ class CassandraReadJournal protected ( usingOffset, initialTagPidSequenceNrs, scanner)) - }.via(deserializeEventsByTagRow).mapMaterializedValue(_ => NotUsed) + }.via(deserializeEventsByTagRow) + .withAttributes(ActorAttributes.dispatcher(querySettings.pluginDispatcher)) + .mapMaterializedValue(_ => NotUsed) } catch { case NonFatal(e) => @@ -647,9 +655,12 @@ class CassandraReadJournal protected ( querySettings.readProfile), settings, fastForwardEnabled)) - .withAttributes(ActorAttributes.dispatcher(querySettings.pluginDispatcher)) .named(name) }.mapAsync(querySettings.deserializationParallelism) { row => + // FIXME remove + if (Thread.currentThread().getName.contains("akka.actor.default-dispatcher")) + throw new RuntimeException("Wrong akka.actor.default-dispatcher") + extractor.extract(row, deserializeEventAsync) } .withAttributes(ActorAttributes.dispatcher(querySettings.pluginDispatcher)) @@ -724,9 +735,8 @@ class CassandraReadJournal protected ( (s, ps) => Source .fromGraph(new AllPersistenceIdsStage(refreshInterval, ps, s, querySettings.readProfile)) - .withAttributes(ActorAttributes.dispatcher(querySettings.pluginDispatcher)) .mapMaterializedValue(_ => NotUsed) - .named(name)) + .named(name)).withAttributes(ActorAttributes.dispatcher(querySettings.pluginDispatcher)) } /** @@ -738,8 +748,8 @@ class CassandraReadJournal protected ( (s, ps) => Source .fromGraph(new AllPersistenceIdsStage(None, ps, s, querySettings.readProfile)) - .withAttributes(ActorAttributes.dispatcher(querySettings.pluginDispatcher)) .mapMaterializedValue(_ => NotUsed) .named("currentPersistenceIdsFromMessages")) + .withAttributes(ActorAttributes.dispatcher(querySettings.pluginDispatcher)) }