
[RFC] [adapters] Concurrent encoders. #5340

@ryzhyk

Description

We currently encode pipeline outputs in the output connector thread. While multiple output endpoints can encode in parallel, an individual endpoint runs the encoder in a single thread, which can become a bottleneck when sending a lot of data. Can we have a generic implementation (i.e., one that works for all encoders) to split the work among a pool of worker threads (either regular or tokio threads)?

Background

Here is the current encoder API:

pub trait Encoder: Send {
    /// Returns a reference to the consumer that the encoder is connected to.
    fn consumer(&mut self) -> &mut dyn OutputConsumer;

    /// Encode a batch of updates, push encoded buffers to the consumer
    /// using [`OutputConsumer::push_buffer`].
    fn encode(&mut self, batch: &dyn SerBatchReader) -> AnyResult<()>;
}

The encode method encodes the input batch into raw bytes and pushes them to an OutputConsumer. The batch can potentially be very large, e.g., containing millions of records when committing a large transaction.

The encode method is invoked by a dedicated thread associated with the output connector. This thread either dequeues batches from the per-connector output queue (one batch per step) or removes them from the output buffer, if one is configured for the connector.
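To make the single-threaded baseline concrete, here is a hedged sketch of what a trivial encoder looks like today. `VecConsumer`, `CsvEncoder`, and the CSV-ish output format are stand-ins invented for illustration, not the real feldera types; only the shape (walk the batch, serialize, push buffers to the consumer) mirrors the API above.

```rust
type AnyResult<T> = Result<T, Box<dyn std::error::Error>>;

// Stand-in for the real OutputConsumer trait.
trait OutputConsumer {
    fn push_buffer(&mut self, buf: &[u8]);
}

// Test consumer that just collects the pushed buffers.
struct VecConsumer(Vec<Vec<u8>>);

impl OutputConsumer for VecConsumer {
    fn push_buffer(&mut self, buf: &[u8]) {
        self.0.push(buf.to_vec());
    }
}

// Hypothetical single-threaded encoder: one thread walks the whole batch.
struct CsvEncoder<C: OutputConsumer> {
    consumer: C,
}

impl<C: OutputConsumer> CsvEncoder<C> {
    // `&[(String, i64)]` (key, weight) pairs stand in for SerBatchReader.
    fn encode(&mut self, batch: &[(String, i64)]) -> AnyResult<()> {
        for (key, weight) in batch {
            let line = format!("{key},{weight}\n");
            self.consumer.push_buffer(line.as_bytes());
        }
        Ok(())
    }
}

fn main() -> AnyResult<()> {
    let mut enc = CsvEncoder { consumer: VecConsumer(Vec::new()) };
    enc.encode(&[("a".to_string(), 1), ("b".to_string(), -1)])?;
    assert_eq!(enc.consumer.0.len(), 2);
    Ok(())
}
```

The rest of this RFC is about distributing the loop inside `encode` across multiple worker threads.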

Design

In order to parallelize encode, we need to split the input batch into multiple chunks and distribute these chunks across multiple encoder threads. The Batch API doesn't support splitting a batch into chunks directly, e.g., one cannot take a slice of a batch and send it to a worker thread. It also doesn't support computing the number of unique keys in a batch efficiently (in O(1)).

However, it is possible to partition the batch into N approximately equal chunks using sampling. Given N encoder threads:

  • Use BatchReader::sample_keys to pick N^2 random keys from the batch.
  • Split the sampled keys into N consecutive chunks of N keys each: [0, N), [N, 2N), ..., [(N-1)·N, N²). The first key of each chunk becomes a partition boundary, so each partition covers roughly 1/N of the batch.

Now that we have established partition boundaries, we can create N cursors, one per partition, by seeking to the first key in the partition.
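The boundary computation can be sketched as follows. This is a hedged illustration, not the real implementation: a sorted `Vec<u64>` stands in for a batch, and `sample_keys` uses a tiny deterministic LCG where the real `BatchReader::sample_keys` uses a proper RNG.

```rust
/// Sampling stand-in for `BatchReader::sample_keys`: pick `count` keys
/// pseudo-randomly (a small deterministic LCG instead of a real RNG).
fn sample_keys(keys: &[u64], count: usize, seed: &mut u64) -> Vec<u64> {
    let mut sample = Vec::with_capacity(count);
    for _ in 0..count {
        *seed = seed
            .wrapping_mul(6364136223846793005)
            .wrapping_add(1442695040888963407);
        sample.push(keys[(*seed >> 33) as usize % keys.len()]);
    }
    sample.sort_unstable();
    sample
}

/// Derive N-1 partition boundaries from N^2 sampled keys: boundary i is
/// sampled key i*N, so each partition covers roughly 1/N of the batch.
fn partition_boundaries(batch: &[u64], n: usize, seed: &mut u64) -> Vec<u64> {
    let sample = sample_keys(batch, n * n, seed);
    (1..n).map(|i| sample[i * n]).collect()
}

fn main() {
    let batch: Vec<u64> = (0..10_000).collect();
    let bounds = partition_boundaries(&batch, 4, &mut 42);
    // Worker 0 starts at the beginning of the batch; worker i > 0 seeks
    // (via the proposed seek_key) to bounds[i - 1] and stops at bounds[i].
    println!("partition boundaries: {bounds:?}");
    assert_eq!(bounds.len(), 3);
}
```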

To make this work, we'll need to extend the SerBatchReader API:

  • Expose the sample_keys method (available in BatchReader, but not SerBatchReader).
  • Add a seek_key method to SerCursor, which currently only supports sequential iteration over all keys.
  • Add a Sync bound to SerBatchReader, so that we can wrap a batch in an Arc and send a clone of the Arc to each worker thread.
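The three extensions above can be sketched together. The trait shapes below are assumptions that mirror the proposal, not the actual feldera definitions, and `VecBatch`/`VecCursor` are toy stand-ins (sorted byte-string keys, deterministic evenly spaced "sampling"):

```rust
use std::sync::Arc;
use std::thread;

trait SerCursor {
    fn key_valid(&self) -> bool;
    fn step_key(&mut self);
    /// NEW: position the cursor at the first key >= `key`.
    fn seek_key(&mut self, key: &[u8]);
}

trait SerBatchReader: Send + Sync /* NEW: Sync bound */ {
    fn cursor(&self) -> Box<dyn SerCursor + '_>;
    /// NEW: sample up to `count` keys from the batch.
    fn sample_keys(&self, count: usize) -> Vec<Vec<u8>>;
}

// Minimal stand-in implementation over a sorted list of keys.
struct VecBatch(Vec<Vec<u8>>);

struct VecCursor<'a> {
    keys: &'a [Vec<u8>],
    pos: usize,
}

impl<'a> SerCursor for VecCursor<'a> {
    fn key_valid(&self) -> bool {
        self.pos < self.keys.len()
    }
    fn step_key(&mut self) {
        self.pos += 1;
    }
    fn seek_key(&mut self, key: &[u8]) {
        self.pos = self.keys.partition_point(|k| k.as_slice() < key);
    }
}

impl SerBatchReader for VecBatch {
    fn cursor(&self) -> Box<dyn SerCursor + '_> {
        Box::new(VecCursor { keys: &self.0, pos: 0 })
    }
    fn sample_keys(&self, count: usize) -> Vec<Vec<u8>> {
        // Deterministic stand-in: evenly spaced keys instead of random ones.
        let step = (self.0.len() / count).max(1);
        self.0.iter().step_by(step).take(count).cloned().collect()
    }
}

fn main() {
    let batch: Arc<dyn SerBatchReader> =
        Arc::new(VecBatch((0u8..100).map(|k| vec![k]).collect()));
    // The Sync bound is what lets every worker share the same batch via Arc.
    let handles: Vec<_> = batch
        .sample_keys(4)
        .into_iter()
        .map(|start| {
            let batch = Arc::clone(&batch);
            thread::spawn(move || {
                let mut cursor = batch.cursor();
                cursor.seek_key(&start);
                let mut count = 0;
                while cursor.key_valid() {
                    count += 1;
                    cursor.step_key();
                }
                count
            })
        })
        .collect();
    let counts: Vec<usize> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    // Each worker sees the suffix starting at its partition boundary.
    assert_eq!(counts, vec![100, 75, 50, 25]);
}
```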

The next question is how to collect encoded data from the multiple encoder threads. One option is to have them all push encoded buffers to the OutputConsumer directly; however, this would require making the OutputConsumer API thread-safe. Alternatively, the workers can send encoded buffers back to the connector thread and have it push them to the OutputConsumer sequentially.
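The second option (workers send buffers back to the connector thread) can be sketched with a channel. This is a minimal illustration, assuming an invented `encode_parallel` helper; the `format!` call stands in for encoding a real chunk, and the sort stands in for whatever ordering policy the connector needs:

```rust
use std::sync::mpsc;
use std::thread;

/// Encode `n` chunks in parallel; workers send (partition, buffer) pairs back
/// over a channel, and the caller (the connector thread) reorders them so the
/// single consumer sees buffers in partition order.
fn encode_parallel(n: usize) -> Vec<(usize, Vec<u8>)> {
    let (tx, rx) = mpsc::channel::<(usize, Vec<u8>)>();
    for worker in 0..n {
        let tx = tx.clone();
        thread::spawn(move || {
            // Stand-in for "encode my chunk of the batch".
            let encoded = format!("chunk-{worker}").into_bytes();
            tx.send((worker, encoded)).unwrap();
        });
    }
    drop(tx); // close the channel once only the workers hold senders
    // Buffers arrive in completion order; sort by partition index so the
    // push to the consumer stays sequential and deterministic.
    let mut buffers: Vec<_> = rx.iter().collect();
    buffers.sort_by_key(|(i, _)| *i);
    buffers
}

fn main() {
    for (i, buf) in encode_parallel(4) {
        // Stand-in for OutputConsumer::push_buffer.
        println!("push_buffer(partition {i}, {} bytes)", buf.len());
    }
}
```

This keeps OutputConsumer single-threaded at the cost of buffering encoded output until all partitions arrive; a streaming variant could instead push each partition's buffers as soon as all lower-numbered partitions have completed.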

CursorWithPolarity

The above design works for indexed output streams. Non-indexed output streams require special care to ensure that deletions are output before insertions. We currently enforce this using CursorWithPolarity, which iterates over deletions before insertions. We need to preserve this invariant with parallel encoders. One approach is to split encoding into two phases, deletions and insertions, and have all encoder threads synchronize on a barrier before transitioning from the deletion phase to the insertion phase.
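The two-phase approach maps directly onto std::sync::Barrier. A hedged sketch, with log entries standing in for actually encoding deletions and insertions (`run_two_phase` is a name invented here):

```rust
use std::sync::{Arc, Barrier, Mutex};
use std::thread;

/// Run `n` encoder workers; each encodes its share of deletions, waits on a
/// barrier, then encodes its insertions. Returns the event log in execution
/// order, so the deletes-before-inserts invariant can be checked.
fn run_two_phase(n: usize) -> Vec<String> {
    let barrier = Arc::new(Barrier::new(n));
    let log = Arc::new(Mutex::new(Vec::new()));
    let handles: Vec<_> = (0..n)
        .map(|w| {
            let (barrier, log) = (Arc::clone(&barrier), Arc::clone(&log));
            thread::spawn(move || {
                // Phase 1: encode this worker's share of deletions.
                log.lock().unwrap().push(format!("worker {w}: deletions"));
                // No worker proceeds until every worker finished phase 1.
                barrier.wait();
                // Phase 2: encode this worker's share of insertions.
                log.lock().unwrap().push(format!("worker {w}: insertions"));
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    Arc::try_unwrap(log).unwrap().into_inner().unwrap()
}

fn main() {
    let log = run_two_phase(4);
    // Every deletion entry precedes every insertion entry.
    assert!(log[..4].iter().all(|e| e.ends_with("deletions")));
    assert!(log[4..].iter().all(|e| e.ends_with("insertions")));
    println!("{log:#?}");
}
```

Note that this costs one barrier synchronization per batch and may require each worker to traverse its partition twice (once per polarity).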

Implementation reuse

The extensions outlined above will be implemented as reusable libraries, but per-connector work will be required to take advantage of them: e.g., we'll need separate implementations for Avro, JSON, and other formats, as well as for integrated connectors like Delta, Postgres, etc.
