Skip to content

Commit

Permalink
Use plugin dispatcher in query streams, #870
Browse files Browse the repository at this point in the history
* eventsByTag and TagViewSequenceNumberScanner were running on default-dispatcher
  instead of the plugin dispatcher
  • Loading branch information
patriknw committed Mar 9, 2021
1 parent f0ce92e commit 22670b4
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ import scala.concurrent.duration._
}

def onPull(): Unit = {
// FIXME remove
if (Thread.currentThread().getName.contains("akka.actor.default-dispatcher"))
throw new RuntimeException("Wrong akka.actor.default-dispatcher")

flush()
if (buffer.isEmpty && isAvailable(out)) {
maybeResultSet match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,10 @@ import akka.persistence.cassandra.PluginSettings

@tailrec private def tryPushOne(): Unit = {

// FIXME remove
if (Thread.currentThread().getName.contains("akka.actor.default-dispatcher"))
throw new RuntimeException("Wrong akka.actor.default-dispatcher")

queryState match {
case QueryResult(rs, empty, switchPartition) if isAvailable(out) =>
def afterExhausted(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,9 @@ import scala.compat.java8.FutureConverters._
@tailrec def tryPushOne(): Unit =
stageState.state match {
case QueryResult(rs) if isAvailable(out) =>
// FIXME remove
if (Thread.currentThread().getName.contains("akka.actor.default-dispatcher"))
throw new RuntimeException("Wrong akka.actor.default-dispatcher")
if (isExhausted(rs)) {
queryExhausted()
} else if (rs.remaining() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.concurrent.{ ExecutionContext, Future }

import akka.persistence.cassandra.BucketSize
import akka.stream.ActorAttributes
import akka.stream.alpakka.cassandra.scaladsl.CassandraSession
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink

/**
* INTERNAL API
Expand All @@ -46,7 +49,7 @@ import akka.stream.alpakka.cassandra.scaladsl.CassandraSession
/**
* INTERNAL API
*/
@InternalApi private[akka] class TagViewSequenceNumberScanner(session: Session)(
@InternalApi private[akka] class TagViewSequenceNumberScanner(session: Session, pluginDispatcher: String)(
implicit materializer: Materializer,
@nowarn("msg=never used") ec: ExecutionContext) {
private val log = Logging(materializer.system, getClass)
Expand Down Expand Up @@ -94,11 +97,20 @@ import akka.stream.alpakka.cassandra.scaladsl.CassandraSession
})
.flatMapConcat(bucket => {
log.debug("Scanning bucket {}", bucket)

// FIXME remove
if (Thread.currentThread().getName.contains("akka.actor.default-dispatcher"))
throw new RuntimeException("Wrong akka.actor.default-dispatcher")

session.selectTagSequenceNrs(tag, bucket, fromOffset, toOffset)
})
.map(row => (row.getString("persistence_id"), row.getLong("tag_pid_sequence_nr"), row.getUuid("timestamp")))
.runFold(Map.empty[Tag, (TagPidSequenceNr, UUID)]) {
.toMat(Sink.fold(Map.empty[Tag, (TagPidSequenceNr, UUID)]) {
case (acc, (pid, tagPidSequenceNr, timestamp)) =>
// FIXME remove
if (Thread.currentThread().getName.contains("akka.actor.default-dispatcher"))
throw new RuntimeException("Wrong akka.actor.default-dispatcher")

val (newTagPidSequenceNr, newTimestamp) = acc.get(pid) match {
case None =>
(tagPidSequenceNr, timestamp)
Expand All @@ -109,7 +121,9 @@ import akka.stream.alpakka.cassandra.scaladsl.CassandraSession
(currentTagPidSequenceNr, currentTimestamp)
}
acc + (pid -> ((newTagPidSequenceNr, newTimestamp)))
}
})(Keep.right)
.withAttributes(ActorAttributes.dispatcher(pluginDispatcher))
.run()
}

if (scanningPeriod > Duration.Zero) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,9 @@ class CassandraReadJournal protected (
usingOffset,
initialTagPidSequenceNrs,
scanner))
}.via(deserializeEventsByTagRow).mapMaterializedValue(_ => NotUsed)
}.via(deserializeEventsByTagRow)
.withAttributes(ActorAttributes.dispatcher(querySettings.pluginDispatcher))
.mapMaterializedValue(_ => NotUsed)

} catch {
case NonFatal(e) =>
Expand All @@ -345,27 +347,29 @@ class CassandraReadJournal protected (

private def deserializeEventsByTagRow: Flow[EventsByTagStage.UUIDRow, UUIDPersistentRepr, NotUsed] = {
val deserializeEventAsync = querySettings.deserializationParallelism > 1
Flow[EventsByTagStage.UUIDRow]
.mapAsync(querySettings.deserializationParallelism) { uuidRow =>
val row = uuidRow.row
eventsByTagDeserializer.deserializeEvent(row, deserializeEventAsync).map {
case DeserializedEvent(payload, metadata) =>
val repr = mapEvent(PersistentRepr(
Flow[EventsByTagStage.UUIDRow].mapAsync(querySettings.deserializationParallelism) { uuidRow =>
val row = uuidRow.row
// FIXME remove
if (Thread.currentThread().getName.contains("akka.actor.default-dispatcher"))
throw new RuntimeException("Wrong akka.actor.default-dispatcher")
eventsByTagDeserializer.deserializeEvent(row, deserializeEventAsync).map {
case DeserializedEvent(payload, metadata) =>
val repr = mapEvent(
PersistentRepr(
payload,
sequenceNr = uuidRow.sequenceNr,
persistenceId = uuidRow.persistenceId,
manifest = row.getString("event_manifest"),
deleted = false,
sender = null,
writerUuid = row.getString("writer_uuid")))
val reprWithMeta = metadata match {
case OptionVal.None => repr
case OptionVal.Some(metadata) => repr.withMetadata(metadata)
}
UUIDPersistentRepr(uuidRow.offset, uuidRow.tagPidSequenceNr, reprWithMeta)
}
val reprWithMeta = metadata match {
case OptionVal.None => repr
case OptionVal.Some(metadata) => repr.withMetadata(metadata)
}
UUIDPersistentRepr(uuidRow.offset, uuidRow.tagPidSequenceNr, reprWithMeta)
}
.withAttributes(ActorAttributes.dispatcher(querySettings.pluginDispatcher))
}
}

private def eventsByTagPrereqs(tag: String, usingOffset: Boolean, fromOffset: UUID)
Expand All @@ -390,7 +394,9 @@ class CassandraReadJournal protected (
*/
@InternalApi
private[akka] val tagViewScanner: Future[TagViewSequenceNumberScanner] = preparedSelectTagSequenceNrs.map { ps =>
new TagViewSequenceNumberScanner(TagViewSequenceNumberScanner.Session(session, ps, querySettings.readProfile))
new TagViewSequenceNumberScanner(
TagViewSequenceNumberScanner.Session(session, ps, querySettings.readProfile),
querySettings.pluginDispatcher)
}

/**
Expand Down Expand Up @@ -513,7 +519,9 @@ class CassandraReadJournal protected (
usingOffset,
initialTagPidSequenceNrs,
scanner))
}.via(deserializeEventsByTagRow).mapMaterializedValue(_ => NotUsed)
}.via(deserializeEventsByTagRow)
.withAttributes(ActorAttributes.dispatcher(querySettings.pluginDispatcher))
.mapMaterializedValue(_ => NotUsed)

} catch {
case NonFatal(e) =>
Expand Down Expand Up @@ -647,9 +655,12 @@ class CassandraReadJournal protected (
querySettings.readProfile),
settings,
fastForwardEnabled))
.withAttributes(ActorAttributes.dispatcher(querySettings.pluginDispatcher))
.named(name)
}.mapAsync(querySettings.deserializationParallelism) { row =>
// FIXME remove
if (Thread.currentThread().getName.contains("akka.actor.default-dispatcher"))
throw new RuntimeException("Wrong akka.actor.default-dispatcher")

extractor.extract(row, deserializeEventAsync)
}
.withAttributes(ActorAttributes.dispatcher(querySettings.pluginDispatcher))
Expand Down Expand Up @@ -724,9 +735,8 @@ class CassandraReadJournal protected (
(s, ps) =>
Source
.fromGraph(new AllPersistenceIdsStage(refreshInterval, ps, s, querySettings.readProfile))
.withAttributes(ActorAttributes.dispatcher(querySettings.pluginDispatcher))
.mapMaterializedValue(_ => NotUsed)
.named(name))
.named(name)).withAttributes(ActorAttributes.dispatcher(querySettings.pluginDispatcher))
}

/**
Expand All @@ -738,8 +748,8 @@ class CassandraReadJournal protected (
(s, ps) =>
Source
.fromGraph(new AllPersistenceIdsStage(None, ps, s, querySettings.readProfile))
.withAttributes(ActorAttributes.dispatcher(querySettings.pluginDispatcher))
.mapMaterializedValue(_ => NotUsed)
.named("currentPersistenceIdsFromMessages"))
.withAttributes(ActorAttributes.dispatcher(querySettings.pluginDispatcher))

}

0 comments on commit 22670b4

Please sign in to comment.