Commit 35ed1ff

Use plugin dispatcher in query streams, #870 (backport)

patriknw committed Mar 9, 2021
1 parent ccc0b1d
Showing 2 changed files with 26 additions and 18 deletions.
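
Note: the theme of the commit is routing the read journal's query streams onto the
dispatcher configured for the plugin, instead of the Akka system's default dispatcher.
The mechanism used throughout is ActorAttributes.dispatcher. As a minimal standalone
sketch (not from this commit; the dispatcher name "my-app.stream-dispatcher" is
illustrative and would have to exist in application.conf):

    import akka.actor.ActorSystem
    import akka.stream.{ ActorAttributes, ActorMaterializer }
    import akka.stream.scaladsl.{ Sink, Source }

    object DispatcherAttributeExample extends App {
      implicit val system: ActorSystem = ActorSystem("example")
      implicit val mat: ActorMaterializer = ActorMaterializer()

      // The attribute must be attached before the stream is materialized;
      // the wrapped stages then run on the named dispatcher.
      Source(1 to 10)
        .map(_ * 2)
        .to(Sink.foreach(println))
        .withAttributes(ActorAttributes.dispatcher("my-app.stream-dispatcher"))
        .run()
    }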
TagViewSequenceNumberScanner.scala

@@ -20,10 +20,13 @@ import akka.stream.ActorMaterializer
 import akka.stream.scaladsl.Source
 import com.datastax.driver.core.PreparedStatement
 import com.datastax.driver.core.Row
 
 import scala.concurrent.duration.{ Deadline, FiniteDuration }
 import scala.concurrent.{ ExecutionContext, Future }
 
+import akka.stream.ActorAttributes
+import akka.stream.scaladsl.Keep
+import akka.stream.scaladsl.Sink
 
 @InternalApi
 private[akka] object TagViewSequenceNumberScanner {

@@ -41,7 +44,7 @@ private[akka] object TagViewSequenceNumberScanner {
 }
 
 @InternalApi
-private[akka] class TagViewSequenceNumberScanner(session: Session)(
+private[akka] class TagViewSequenceNumberScanner(session: Session, pluginDispatcher: String)(
     implicit materializer: ActorMaterializer,
     ec: ExecutionContext) {
   private val log = Logging(materializer.system, getClass)
@@ -93,7 +96,7 @@ private[akka] class TagViewSequenceNumberScanner(session: Session)(
           session.selectTagSequenceNrs(tag, bucket, fromOffset, toOffset)
         })
         .map(row => (row.getString("persistence_id"), row.getLong("tag_pid_sequence_nr"), row.getUUID("timestamp")))
-        .runFold(Map.empty[Tag, (TagPidSequenceNr, UUID)]) {
+        .toMat(Sink.fold(Map.empty[Tag, (TagPidSequenceNr, UUID)]) {
           case (acc, (pid, tagPidSequenceNr, timestamp)) =>
             val (newTagPidSequenceNr, newTimestamp) = acc.get(pid) match {
               case None =>
@@ -105,7 +108,9 @@ private[akka] class TagViewSequenceNumberScanner(session: Session)(
                 (currentTagPidSequenceNr, currentTimestamp)
             }
             acc + (pid -> ((newTagPidSequenceNr, newTimestamp)))
-        }
+        })(Keep.right)
+        .withAttributes(ActorAttributes.dispatcher(pluginDispatcher))
+        .run()
         .flatMap { result =>
           if (deadline.hasTimeLeft()) {
             doIt()
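
Why the runFold rewrite above: runFold is a shorthand that materializes the stream
immediately, so there is no point at which attributes can be attached. Expanding it to
toMat(Sink.fold(...))(Keep.right) keeps the blueprint around long enough to call
withAttributes before run(). A sketch of the same rewrite outside this codebase
(dispatcher name illustrative):

    import akka.actor.ActorSystem
    import akka.stream.{ ActorAttributes, ActorMaterializer }
    import akka.stream.scaladsl.{ Keep, Sink, Source }
    import scala.concurrent.Future

    object FoldOnPluginDispatcher extends App {
      implicit val system: ActorSystem = ActorSystem("example")
      implicit val mat: ActorMaterializer = ActorMaterializer()

      // Equivalent to Source(1 to 100).runFold(0)(_ + _), except the
      // dispatcher attribute is set on the whole blueprint before running.
      val sum: Future[Int] =
        Source(1 to 100)
          .toMat(Sink.fold(0)(_ + _))(Keep.right)
          .withAttributes(ActorAttributes.dispatcher("my-app.stream-dispatcher"))
          .run()
    }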
CassandraReadJournal.scala

@@ -325,7 +325,9 @@ class CassandraReadJournal(system: ExtendedActorSystem, cfg: Config)
             usingOffset,
             initialTagPidSequenceNrs,
             scanner))
-      }.via(deserializeEventsByTagRow).mapMaterializedValue(_ => NotUsed)
+      }.via(deserializeEventsByTagRow)
+        .withAttributes(ActorAttributes.dispatcher(queryPluginConfig.pluginDispatcher))
+        .mapMaterializedValue(_ => NotUsed)
 
     } catch {
       case NonFatal(e) =>
@@ -337,22 +339,21 @@ class CassandraReadJournal(system: ExtendedActorSystem, cfg: Config)

   private def deserializeEventsByTagRow: Flow[EventsByTagStage.UUIDRow, UUIDPersistentRepr, NotUsed] = {
     val deserializeEventAsync = queryPluginConfig.deserializationParallelism > 1
-    Flow[EventsByTagStage.UUIDRow]
-      .mapAsync(queryPluginConfig.deserializationParallelism) { uuidRow =>
-        val row = uuidRow.row
-        eventsByTagDeserializer.deserializeEvent(row, deserializeEventAsync).map { payload =>
-          val repr = mapEvent(PersistentRepr(
+    Flow[EventsByTagStage.UUIDRow].mapAsync(queryPluginConfig.deserializationParallelism) { uuidRow =>
+      val row = uuidRow.row
+      eventsByTagDeserializer.deserializeEvent(row, deserializeEventAsync).map { payload =>
+        val repr = mapEvent(
+          PersistentRepr(
             payload,
             sequenceNr = uuidRow.sequenceNr,
             persistenceId = uuidRow.persistenceId,
             manifest = row.getString("event_manifest"),
             deleted = false,
             sender = null,
             writerUuid = row.getString("writer_uuid")))
-          UUIDPersistentRepr(uuidRow.offset, uuidRow.tagPidSequenceNr, repr)
-        }
+        UUIDPersistentRepr(uuidRow.offset, uuidRow.tagPidSequenceNr, repr)
+      }
     }
+      .withAttributes(ActorAttributes.dispatcher(queryPluginConfig.pluginDispatcher))
   }
 
   private def eventsByTagPrereqs(tag: String, usingOffset: Boolean, fromOffset: UUID)
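
Note on the hunk above: deserializeEventsByTagRow now carries its own dispatcher
attribute. Attributes set on a Flow travel with it when it is later embedded in a
larger graph via .via(...), and attributes on an inner module take precedence over
those set further out. A small sketch of the pattern (names illustrative):

    import akka.NotUsed
    import akka.stream.ActorAttributes
    import akka.stream.scaladsl.{ Flow, Source }

    object FlowAttributeExample {
      // A reusable flow whose parsing stage always runs on the given
      // dispatcher, no matter where the flow is embedded later.
      val parse: Flow[String, Int, NotUsed] =
        Flow[String]
          .map(_.trim.toInt)
          .withAttributes(ActorAttributes.dispatcher("my-app.stream-dispatcher"))

      // The inner attribute is kept when the flow is composed downstream.
      val numbers: Source[Int, NotUsed] = Source(List("1", "2", "3")).via(parse)
    }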
@@ -377,7 +378,9 @@ class CassandraReadJournal(system: ExtendedActorSystem, cfg: Config)
    */
   @InternalApi
   private[akka] lazy val tagViewScanner: Future[TagViewSequenceNumberScanner] = preparedSelectTagSequenceNrs.map { ps =>
-    new TagViewSequenceNumberScanner(TagViewSequenceNumberScanner.Session(session, ps))
+    new TagViewSequenceNumberScanner(
+      TagViewSequenceNumberScanner.Session(session, ps),
+      queryPluginConfig.pluginDispatcher)
   }
 
   /**
@@ -497,7 +500,9 @@ class CassandraReadJournal(system: ExtendedActorSystem, cfg: Config)
             usingOffset,
             initialTagPidSequenceNrs,
             scanner))
-      }.via(deserializeEventsByTagRow).mapMaterializedValue(_ => NotUsed)
+      }.via(deserializeEventsByTagRow)
+        .withAttributes(ActorAttributes.dispatcher(queryPluginConfig.pluginDispatcher))
+        .mapMaterializedValue(_ => NotUsed)
 
     } catch {
       case NonFatal(e) =>
@@ -630,7 +635,6 @@ class CassandraReadJournal(system: ExtendedActorSystem, cfg: Config)
               customRetryPolicy),
             queryPluginConfig,
             fastForwardEnabled))
-        .withAttributes(ActorAttributes.dispatcher(queryPluginConfig.pluginDispatcher))
         .named(name)
     }.mapAsync(queryPluginConfig.deserializationParallelism) { row =>
       extractor.extract(row, deserializeEventAsync)
@@ -706,7 +710,6 @@ class CassandraReadJournal(system: ExtendedActorSystem, cfg: Config)
       (s, ps) =>
         Source
           .fromGraph(new AllPersistenceIdsStage(refreshInterval, queryPluginConfig.fetchSize, ps, s))
-          .withAttributes(ActorAttributes.dispatcher(queryPluginConfig.pluginDispatcher))
           .mapMaterializedValue(_ => NotUsed)
-          .named(name))
+          .named(name)).withAttributes(ActorAttributes.dispatcher(queryPluginConfig.pluginDispatcher))
   }
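
The last two hunks are the mirror image of the earlier ones: the withAttributes call
moves off an inner Source stage and onto the fully composed stream, so the surrounding
stages (the deserializing mapAsync, the named wrappers) run on the plugin dispatcher as
well. Placement matters because an attribute only covers the stages composed before it
is applied. A sketch of the difference (names illustrative):

    import akka.actor.ActorSystem
    import akka.stream.{ ActorAttributes, ActorMaterializer }
    import akka.stream.scaladsl.Source

    object AttributePlacementExample extends App {
      implicit val system: ActorSystem = ActorSystem("example")
      implicit val mat: ActorMaterializer = ActorMaterializer()

      // Attribute on the inner source only: the map stage is NOT covered.
      val inner = Source(1 to 3)
        .withAttributes(ActorAttributes.dispatcher("my-app.stream-dispatcher"))
        .map(_ * 2)

      // Attribute applied after composing: source and map are both covered.
      val outer = Source(1 to 3)
        .map(_ * 2)
        .withAttributes(ActorAttributes.dispatcher("my-app.stream-dispatcher"))

      outer.runForeach(println)
    }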
