diff --git a/docs/design/2022-11-23-batch-cop.md b/docs/design/2022-11-23-batch-cop.md
new file mode 100644
index 0000000000000..34f81fea82f46
--- /dev/null
+++ b/docs/design/2022-11-23-batch-cop.md
@@ -0,0 +1,149 @@
+# Proposal: Support batch coprocessor for TiKV
+
+* Authors: [cfzjywxk](https://github.com/cfzjywxk)
+* Tracking issue: [39361](https://github.com/pingcap/tidb/issues/39361)
+
+## Motivation
+
+The fanout issue in index lookup queries is one cause of increased query latency and cost. If there are
+1,000 handles and they are distributed across 1,000 regions, TiDB constructs 1,000 small tasks to retrieve
+the 1,000 related rows, even when all the region leaders are in the same store. This results in the following problems:
+1. Each task requires its own RPC request, so there could be a large number of tasks and RPC requests even
+though each request fetches only a few rows. The RPC overhead is sometimes not negligible.
+2. A larger number of tasks leads to more queueing. Tuning the related concurrency parameters or task scheduling
+policies becomes more complex, and it is difficult to get the best performance.
+
+In the current coprocessor implementation, key ranges in the same region are already batched into a single
+task (with a hard-coded upper limit of 25,000 ranges). A natural extension is to batch all the cop tasks that
+would be sent to the same store.
+
+In one user scenario, an index range scan returned 4,000,000 rows and finally 400,000 coprocessor table-lookup
+tasks were generated, which means the key ranges were scattered across many different regions.
+
+## Optimization
+
+### The IndexLookUp Execution Review
+
+The IndexLookUp executor has an index worker which reads index keys and the related row handles
+according to the index filter conditions. Each time it fetches enough row handle data, it creates a
+coprocessor table lookup task and sends it to the table workers. The handle data size limit for one task can be configured
+with the [tidb_index_lookup_size](https://docs.pingcap.com/tidb/dev/system-variables#tidb_index_lookup_size)
+system variable.
+
+When a table worker gets a coprocessor task, it splits the handle ranges according to the region
+information from the region cache. These region-aware tasks are then processed by the coprocessor client,
+whose default concurrency limit is configured by the [tidb_distsql_scan_concurrency](https://docs.pingcap.com/tidb/dev/system-variables#tidb_distsql_scan_concurrency) system
+variable.
+
+### Batching Strategy
+
+As coprocessor streaming is already deprecated, bringing it back may not be a good idea. To keep the design
+simple, we could do the batching for each coprocessor table task separately. Different coprocessor table
+tasks may still require different RPC requests, while row handle ranges within one task can be batched if
+their region leaders are in the same store. The main idea is to batch the sending of the tasks, using one
+RPC for each original `copTask` if the region leaders of its row handle ranges are located in the same TiKV store.
+
+With the batching optimization, the number of RPC requests for each table lookup task is at most the number
+of store nodes. Consider an extreme case: if the index scan returns 4,000,000 rows and each task range is one
+row, there could be as many as `4000000/25000=160` table lookup tasks, each containing 25,000 key ranges.
+The RPC number then becomes at most `160 * store_number`; for example, if `store_number` is 10, the total
+request number is 1,600, which is much less than the previous 400,000.
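+
+The claim above can be illustrated with a small self-contained sketch. The `regionTask` type and the
+`groupByStore` function below are hypothetical and not actual TiDB code; they only show how grouping the
+region-aligned tasks of one table lookup task by the address of the region leader's store reduces the RPC
+count from the number of regions to the number of distinct stores:
+
+```golang
+package main
+
+import "fmt"
+
+// regionTask stands for one region-aligned key-range task of a table lookup task.
+type regionTask struct {
+    regionID  uint64
+    storeAddr string // address of the store holding the region leader
+}
+
+// groupByStore batches the per-region tasks by store address, so one RPC per
+// store is enough instead of one RPC per region.
+func groupByStore(tasks []regionTask) map[string][]regionTask {
+    batches := make(map[string][]regionTask)
+    for _, t := range tasks {
+        batches[t.storeAddr] = append(batches[t.storeAddr], t)
+    }
+    return batches
+}
+
+func main() {
+    // Six regions whose leaders live on only two stores.
+    tasks := []regionTask{
+        {1, "store-1"}, {2, "store-1"}, {3, "store-2"},
+        {4, "store-2"}, {5, "store-1"}, {6, "store-2"},
+    }
+    batches := groupByStore(tasks)
+    fmt.Printf("regions: %d, RPCs after batching: %d\n", len(tasks), len(batches))
+    // Output: regions: 6, RPCs after batching: 2
+}
+```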
+
+### Proto Change
+
+Create new structures for the batched tasks: the request type `StoreBatchTask` and the response type `StoreBatchTaskResponse`.
+
+```protobuf
+message StoreBatchTask {
+    uint64 region_id = 1;
+    metapb.RegionEpoch region_epoch = 2;
+    metapb.Peer peer = 3;
+    repeated KeyRange ranges = 4;
+    uint64 task_id = 5;
+}
+```
+
+```protobuf
+message StoreBatchTaskResponse {
+    bytes data = 1 [(gogoproto.customtype) = "github.com/pingcap/kvproto/pkg/sharedbytes.SharedBytes", (gogoproto.nullable) = false];
+    errorpb.Error region_error = 2;
+    kvrpcpb.LockInfo locked = 3;
+    string other_error = 4;
+    uint64 task_id = 5;
+    kvrpcpb.ExecDetailsV2 exec_details_v2 = 6;
+}
+```
+
+Attach the batched tasks to the `Coprocessor` request. Reuse the `StoreBatchTask` structure defined above to
+store tasks that belong to different regions but the same store.
+```protobuf
+message Request {
+    …
+
+    // Store the batched tasks belonging to other regions.
+    repeated StoreBatchTask tasks = 11;
+}
+```
+
+Add the batched task results to `Response`. Different tasks may encounter different kinds of errors, so they
+are collected together.
+```protobuf
+message Response {
+    …
+    repeated StoreBatchTaskResponse batch_responses = 13;
+}
+```
+
+### The TiDB Side
+
+Add a flag to `kv.Request` to indicate whether the batch strategy is enabled.
+```golang
+type Request struct {
+    …
+    // EnableStoreBatch indicates if the tasks are batched.
+    EnableStoreBatch bool
+}
+```
+
+Add batch-task-related fields to `copr.copTask`. They are collected when the `copTask` is being built and
+store batching is enabled.
+```golang
+type copTask struct {
+    …
+    // batchTaskList stores the tasks batched into this one; their region leaders are in the same store.
+    batchTaskList []*coprocessor.StoreBatchTask
+}
+```
+
+When building coprocessor tasks in the `buildCopTasks` function, try to fill the `batchTaskList` if
+necessary. The steps are (see the sketch at the end of this section):
+1. Create a map recording `store address => *copTask`. If store batching is enabled, tasks are appended
+to an existing `copTask` when the store address is the same.
+2. Split the ranges according to the region information as usual. After this step, every task corresponds
+to a single region.
+3. When processing a new `KeyLocation`, try to append it as a batched task to the existing coprocessor task
+for the same store if possible.
+
+The coprocessor client sends the tasks as usual; the `Coprocessor` request is still a unary RPC
+request even though it carries batched tasks. When handling `CopResponse`, if the batch path is enabled and
+region errors or other errors occur while processing the batched tasks, the affected cop tasks are
+rescheduled or the errors are reported to the upper layer.
+
+Note that if `keepOrder` is required, the partial query result cannot be sent back until all the reads
+have succeeded.
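+
+A minimal sketch of the batching step described above is given below. It is not the actual TiDB
+implementation: the types are simplified stand-ins, only `StoreBatchTask` mirrors the proto message defined
+earlier, and the names `regionRanges` and `buildBatchedCopTasks` are hypothetical.
+
+```golang
+// Simplified stand-in types, only used to illustrate the batching step.
+type KeyRange struct{ Start, End []byte }
+
+// StoreBatchTask mirrors the proto message defined earlier.
+type StoreBatchTask struct {
+    RegionID uint64
+    Ranges   []KeyRange
+}
+
+type copTask struct {
+    regionID      uint64
+    storeAddr     string
+    ranges        []KeyRange
+    batchTaskList []*StoreBatchTask
+}
+
+// regionRanges is one region-aligned piece produced by splitting the handle ranges.
+type regionRanges struct {
+    regionID  uint64
+    storeAddr string // leader store address from the region cache
+    ranges    []KeyRange
+}
+
+// buildBatchedCopTasks keeps one copTask per store and attaches the ranges of the
+// other regions on the same store to it as batched tasks.
+func buildBatchedCopTasks(regions []regionRanges, enableStoreBatch bool) []*copTask {
+    var tasks []*copTask
+    firstTaskByStore := make(map[string]*copTask)
+    for _, r := range regions {
+        if enableStoreBatch {
+            if primary, ok := firstTaskByStore[r.storeAddr]; ok {
+                // Same store: batch this region into the existing task instead
+                // of creating a standalone RPC.
+                primary.batchTaskList = append(primary.batchTaskList,
+                    &StoreBatchTask{RegionID: r.regionID, Ranges: r.ranges})
+                continue
+            }
+        }
+        task := &copTask{regionID: r.regionID, storeAddr: r.storeAddr, ranges: r.ranges}
+        firstTaskByStore[r.storeAddr] = task
+        tasks = append(tasks, task)
+    }
+    return tasks
+}
+```
+
+Keeping one primary `copTask` per store means the existing single-region code path stays unchanged when
+store batching is disabled.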
+
+### The TiKV Side
+
+A simple way is to change the logic in `Endpoint.parse_and_handle_unary_request`: after parsing the
+original request, the builders and handlers for the batched tasks can also be generated from the
+RPC context, region information, and key ranges, as long as they are properly passed in the
+`Coprocessor` request.
+
+All the request handlers could be scheduled to the read pool at the same time, so something like
+`join_all` is needed to wait for the results of the different tasks before finishing. If any task
+returns an error, the corresponding error fields in the `Response` are filled in.
+
+For execution tracking, separate trackers are created for the batched requests, and all the execution
+details are returned to the client.
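+
+TiKV itself is written in Rust and would use something like `join_all` over futures; the following Go-style
+sketch only illustrates the intended control flow with hypothetical `taskInput`, `taskResult`, and
+`handleTask` names: every batched task is scheduled concurrently, all results are awaited, and a failure is
+recorded in the per-task error field instead of failing the whole batched request.
+
+```golang
+// Hypothetical stand-in types and names; the real TiKV code is written in Rust.
+type taskInput struct{ taskID uint64 }
+
+type taskResult struct {
+    taskID     uint64
+    data       []byte
+    otherError string // corresponds to other_error in StoreBatchTaskResponse
+}
+
+// handleTask stands for running one batched task on the read pool.
+func handleTask(in taskInput) ([]byte, error) {
+    // ... build the handler from the RPC context, region info and key ranges, then run it ...
+    return nil, nil
+}
+
+// handleBatchedTasks schedules every batched task concurrently and waits for all
+// of them, similar to join_all, before the final Response is built.
+func handleBatchedTasks(inputs []taskInput) []taskResult {
+    results := make([]taskResult, len(inputs))
+    done := make(chan struct{}, len(inputs))
+    for i := range inputs {
+        go func(i int) {
+            defer func() { done <- struct{}{} }()
+            in := inputs[i]
+            data, err := handleTask(in)
+            if err != nil {
+                // A failure only fills this task's error field; it does not fail
+                // the whole batched request.
+                results[i] = taskResult{taskID: in.taskID, otherError: err.Error()}
+                return
+            }
+            results[i] = taskResult{taskID: in.taskID, data: data}
+        }(i)
+    }
+    for range inputs {
+        <-done
+    }
+    return results
+}
+```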