Description
We currently encode pipeline outputs in the output connector thread. While multiple output endpoints can encode in parallel, an individual endpoint runs the encoder in a single thread, which can become a bottleneck when sending a lot of data. Can we have a generic implementation (i.e., one that works for all encoders) to split the work among a pool of worker threads (either regular or tokio threads)?
Background
Here is the current encoder API:

```rust
pub trait Encoder: Send {
    /// Returns a reference to the consumer that the encoder is connected to.
    fn consumer(&mut self) -> &mut dyn OutputConsumer;

    /// Encode a batch of updates, push encoded buffers to the consumer
    /// using [`OutputConsumer::push_buffer`].
    fn encode(&mut self, batch: &dyn SerBatchReader) -> AnyResult<()>;
}
```

The `encode` method encodes the input batch into raw bytes and pushes it to an `OutputConsumer`. The batch can potentially be very large, e.g., containing millions of records when committing a large transaction.
The encode method is invoked by a dedicated thread associated with the output connector. This thread either dequeues batches from the per-connector output queue (one batch per step) or removes them from the output buffer, if one is configured for the connector.
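For reference, the current single-threaded flow looks roughly like the following sketch; the queue type, function name, and error handling are illustrative assumptions, not the actual connector code:

```rust
use std::sync::mpsc::Receiver;

// Hypothetical sketch of today's per-connector encoder loop.
fn encoder_thread(
    mut encoder: Box<dyn Encoder>,
    batches: Receiver<Box<dyn SerBatchReader>>,
) {
    // One batch per step: each batch is encoded in full on this single
    // thread, which is the bottleneck this issue is about.
    while let Ok(batch) = batches.recv() {
        if let Err(error) = encoder.encode(batch.as_ref()) {
            // Error handling elided; shown only to keep the sketch complete.
            eprintln!("encoder error: {error}");
        }
    }
}
```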
Design
In order to parallelize `encode`, we need to split the input batch into multiple chunks and distribute these chunks across multiple encoder threads. The `Batch` API doesn't support splitting a batch into chunks directly, e.g., one cannot take a slice of a batch and send it to a worker thread. It also doesn't support computing the number of unique keys in a batch efficiently (in O(1)).
However, it is possible to partition the batch into N approximately equal chunks using sampling. Given N encoder threads:

- Use `BatchReader::sample_keys` to pick N^2 random keys from the batch.
- Sort the samples and split the batch into N chunks: [0, N), [N, 2N), [2N, 3N), ..., [(N-1)N, N^2), where each interval denotes a range of indices into the sorted sample array.
Now that we have established partition boundaries, we can create N cursors, one per partition, by seeking to the first key in the partition.
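A minimal sketch of the boundary computation, assuming `sample_keys` hands back the sampled keys as a `Vec<K>` with `K: Ord + Clone`; the function below is illustrative, not existing API:

```rust
/// Compute N-1 partition boundaries from ~N^2 sampled keys.
fn partition_boundaries<K: Ord + Clone>(mut samples: Vec<K>, n: usize) -> Vec<K> {
    // Sort the samples so that every N-th key approximates a quantile
    // of the batch's key distribution.
    samples.sort();
    // Boundaries at sample indices N, 2N, ..., (N-1)N split the key
    // space into N roughly equal ranges; e.g., with N = 4 threads we
    // sample 16 keys and extract 3 boundaries.
    (1..n)
        .map(|i| samples[i * samples.len() / n].clone())
        .collect()
}
```

Each worker then seeks to its lower boundary (worker 0 starts at the beginning of the batch) and encodes until its cursor reaches the next boundary.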
To make this work, we'll need to extend the `SerBatchReader` API:

- Expose the `sample_keys` method (available in `BatchReader`, but not `SerBatchReader`).
- Add a `seek_key` method to `SerCursor`, which currently only supports iterating over all keys.
- Add a `Sync` bound to `SerBatchReader`, so we can wrap it in an `Arc` and send a copy to each worker thread.
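Roughly, the extended traits might look as follows; all signatures are assumptions (in particular, representing keys as serialized bytes is an open design question, not the actual Feldera API):

```rust
// Sketch of the proposed extensions; signatures are illustrative.
pub trait SerBatchReader: Send + Sync {
    // ...existing methods elided...

    /// Sample up to `sample_size` keys uniformly at random, mirroring
    /// `BatchReader::sample_keys`; each sampled key is appended to `dst`.
    fn sample_keys(&self, sample_size: usize, dst: &mut Vec<Vec<u8>>) -> AnyResult<()>;
}

pub trait SerCursor {
    // ...existing iteration methods elided...

    /// Position the cursor at the first key >= `key`.
    fn seek_key(&mut self, key: &[u8]) -> AnyResult<()>;
}
```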
The next question is how to collect encoded data from the multiple encoder threads. One option is to have all of them push encoded buffers directly to the `OutputConsumer`; however, this would require making the `OutputConsumer` API thread-safe. Alternatively, we can send encoded buffers back to the connector thread and have it push them to the `OutputConsumer` sequentially, as in the sketch below.
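The channel-based option keeps `OutputConsumer` single-threaded. A sketch, where `EncodedBuffer` is a hypothetical type standing in for whatever the workers emit, and the `push_buffer` signature is assumed:

```rust
use std::sync::mpsc::Receiver;

// Hypothetical buffer type produced by an encoder worker.
struct EncodedBuffer {
    bytes: Vec<u8>,
    num_records: usize,
}

// The connector thread stays the sole caller of `push_buffer`, so the
// `OutputConsumer` API does not need to become thread-safe.
fn forward_buffers(rx: Receiver<EncodedBuffer>, consumer: &mut dyn OutputConsumer) {
    // Iterating the receiver blocks until all worker senders disconnect.
    for buf in rx {
        // `push_buffer` signature assumed for illustration.
        consumer.push_buffer(&buf.bytes, buf.num_records);
    }
}
```

Note that a channel interleaves buffers from different workers nondeterministically; if output ordering matters for a given format, the connector thread would additionally need to reorder buffers, e.g., by chunk index.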
CursorWithPolarity
The above design works for indexed output streams. Non-indexed output streams require special care to ensure that deletions are output before insertions. We do this using `CursorWithPolarity`, which iterates over deletes before inserts, and we need to figure out how to preserve this invariant with parallel encoders. One approach is to split encoding into two phases, deletions and insertions, and have all encoder threads synchronize on a global barrier before transitioning from phase 1 to phase 2, as sketched below.
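A toy, self-contained illustration of the barrier scheme using `std::sync::Barrier`; the chunk representation is hypothetical, and real workers would drive `SerCursor`s over their partitions instead:

```rust
use std::sync::{Arc, Barrier};
use std::thread;

fn main() {
    // Each worker owns one chunk of (record, weight) pairs; negative
    // weights are deletions, positive weights are insertions.
    let chunks: Vec<Vec<(&str, i64)>> = vec![
        vec![("a", -1), ("b", 2)],
        vec![("c", 1), ("d", -3)],
    ];
    let barrier = Arc::new(Barrier::new(chunks.len()));

    let handles: Vec<_> = chunks
        .into_iter()
        .enumerate()
        .map(|(worker, chunk)| {
            let barrier = Arc::clone(&barrier);
            thread::spawn(move || {
                // Phase 1: emit only this chunk's deletions.
                for (rec, wt) in chunk.iter().filter(|(_, wt)| *wt < 0) {
                    println!("worker {worker}: delete {rec} x {}", -wt);
                }
                // No insertion is emitted anywhere until every worker
                // has finished emitting its deletions.
                barrier.wait();
                // Phase 2: emit the insertions.
                for (rec, wt) in chunk.iter().filter(|(_, wt)| *wt > 0) {
                    println!("worker {worker}: insert {rec} x {wt}");
                }
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }
}
```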
Implementation reuse
The extensions outlined above will be implemented as reusable libraries, but per-connector work will be required to take advantage of them: e.g., we'll need separate implementations for Avro, JSON, and other formats, as well as for integrated connectors like Delta, Postgres, etc.