Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use max-message-batch-size as default in EventsByTagMigration batching, #906 #908

Merged
merged 1 commit into from
Dec 15, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ class EventsByTagMigration(
} yield Done
}

private def periodicFlushBatchSize(periodicFlushParameter: Int): Int = {
if (periodicFlushParameter == 0) eventsByTagSettings.tagWriterSettings.maxBatchSize
else periodicFlushParameter
}

// TODO might be nice to return a summary of what was done rather than just Done

/**
Expand All @@ -132,12 +137,15 @@ class EventsByTagMigration(
*
* It is recommended you use this if the `messages` table is large.
*
* Events are batched with the given `periodicFlush`. By default the value equals
* configured `events-by-tag.max-message-batch-size`.
*
* @param pids PersistenceIds to migrate
* @return A Future that completes when the migration is complete
*/
def migratePidsToTagViews(
pids: Seq[PersistenceId],
periodicFlush: Int = 1000,
periodicFlush: Int = 0,
flushTimeout: Timeout = Timeout(30.seconds)): Future[Done] = {
migrateToTagViewsInternal(Source.fromIterator(() => pids.iterator), periodicFlush, flushTimeout)
}
Expand All @@ -153,12 +161,15 @@ class EventsByTagMigration(
* the version of this method can be used where the persistenceIds are provided.
*
* Persistence ids can be excluded (e.g. useful if you know certain persistenceIds
* don't use tags
* don't use tags.
*
* Events are batched with the given `periodicFlush`. By default the value equals
* configured `events-by-tag.max-message-batch-size`.
*
* @return A Future that completes when the migration is complete.
*/
def migrateToTagViews(
periodicFlush: Int = 1000,
periodicFlush: Int = 0,
filter: String => Boolean = _ => true,
flushTimeout: Timeout = Timeout(30.seconds)): Future[Done] = {
migrateToTagViewsInternal(queries.currentPersistenceIds().filter(filter), periodicFlush, flushTimeout)
Expand Down Expand Up @@ -187,6 +198,8 @@ class EventsByTagMigration(
} yield (tp, startingSeq)
}

val flushBatchSize = periodicFlushBatchSize(periodicFlush)

// would be nice to group these up into a TagWrites message but also
// nice that this reuses the recovery code :-/
Source.futureSource {
Expand All @@ -209,7 +222,7 @@ class EventsByTagMigration(
extractor =
EventsByTagMigration.rawPayloadOldTagSchemaExtractor(eventsByTagSettings.bucketSize, system))
.map(tagRecovery.sendMissingTagWriteRaw(tp, actorRunning = false))
.grouped(periodicFlush)
.grouped(flushBatchSize)
.mapAsync(1)(_ => tagRecovery.flush(timeout))
}
}
Expand Down