diff --git a/Cargo.lock b/Cargo.lock index 440469dec0..8b97c17978 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4947,8 +4947,10 @@ dependencies = [ name = "feldera-macros" version = "0.221.0" dependencies = [ + "dbsp", "derive_more 1.0.0", "feldera-size-of", + "feldera-sqllib", "prettyplease", "proc-macro2", "quote", @@ -10905,6 +10907,21 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "storage-test-compat" +version = "0.221.0" +dependencies = [ + "dbsp", + "derive_more 1.0.0", + "feldera-macros", + "feldera-size-of", + "feldera-sqllib", + "feldera-types", + "rkyv", + "serde", + "uuid", +] + [[package]] name = "str_stack" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 062e9e4d4e..b94dfc5bd9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ members = [ "crates/rest-api", "crates/ir", "crates/fxp", + "crates/storage-test-compat", ] exclude = [ "sql-to-dbsp-compiler/temp", diff --git a/crates/dbsp/src/dynamic/pair.rs b/crates/dbsp/src/dynamic/pair.rs index 5e494632a6..a36d081dae 100644 --- a/crates/dbsp/src/dynamic/pair.rs +++ b/crates/dbsp/src/dynamic/pair.rs @@ -1,14 +1,12 @@ use std::mem::take; use crate::{ - DBData, declare_trait_object_with_archived, derive_comparison_traits, dynamic::erase::Erase, + DBData, declare_trait_object, + dynamic::erase::Erase, utils::Tup2, }; -use super::{Data, DataTrait, DowncastTrait}; -use crate::utils::ArchivedTup2; - -use crate::dynamic::rkyv::{ArchivedDBData, DeserializeDyn, DeserializeImpl}; +use super::{Data, DataTrait}; /// A dynamically typed interface to `Tup2`. pub trait Pair: Data { @@ -86,80 +84,8 @@ where } } -pub trait ArchivedPair { - fn fst(&self) -> &T1::Archived; - fn snd(&self) -> &T2::Archived; - fn split(&self) -> (&T1::Archived, &T2::Archived); -} - -impl ArchivedPair for ArchivedTup2 -where - T1: DBData + Erase, - T2: DBData + Erase, - Trait1: DataTrait + ?Sized, - Trait2: DataTrait + ?Sized, -{ - fn fst(&self) -> &Trait1::Archived { - >::erase_archived(&self.0) - } - - fn snd(&self) -> &Trait2::Archived { - >::erase_archived(&self.1) - } - - fn split(&self) -> (&Trait1::Archived, &Trait2::Archived) { - ( - >::erase_archived(&self.0), - >::erase_archived(&self.1), - ) - } -} - -impl ArchivedPair for DeserializeImpl +declare_trait_object!(DynPair = dyn Pair where - T: ArchivedDBData + 'static, - T::Archived: ArchivedPair + Ord + Eq, - Trait: Pair + DowncastTrait + ?Sized + 'static, - Trait1: DataTrait + ?Sized, - Trait2: DataTrait + ?Sized, -{ - fn fst(&self) -> &Trait1::Archived { - self.archived.fst() - } - - fn snd(&self) -> &Trait2::Archived { - self.archived.snd() - } - - fn split(&self) -> (&Trait1::Archived, &Trait2::Archived) { - (self.archived.fst(), self.archived.snd()) - } -} - -pub trait ArchivedPairTrait: - ArchivedPair + DeserializeDyn> -{ -} - -impl ArchivedPairTrait for Trait -where - Trait: ArchivedPair + DeserializeDyn> + ?Sized, - T1: DataTrait + ?Sized, - T2: DataTrait + ?Sized, -{ -} - -type DynArchivedPair = dyn ArchivedPairTrait; - -derive_comparison_traits!(DynArchivedPair where Trait1: DataTrait + ?Sized + 'static, Trait2: DataTrait + ?Sized + 'static); -impl DowncastTrait - for DynArchivedPair -{ -} - -declare_trait_object_with_archived!(DynPair = dyn Pair - { type Archived = dyn ArchivedPairTrait} - where T1: DataTrait + ?Sized, T2: DataTrait + ?Sized, ); diff --git a/crates/dbsp/src/dynamic/rkyv.rs b/crates/dbsp/src/dynamic/rkyv.rs index 57b29496ca..aa6fe75a87 100644 --- a/crates/dbsp/src/dynamic/rkyv.rs +++ b/crates/dbsp/src/dynamic/rkyv.rs @@ -3,10 +3,7 @@ use crate::{ derive_comparison_traits, storage::file::{Deserializer, Serializer}, }; -use rkyv::{ - Archive, Archived, Deserialize, Fallible, Serialize, archived_value, - de::deserializers::SharedDeserializeMap, -}; +use rkyv::{Archive, Archived, Deserialize, Fallible, Serialize, archived_value}; use std::{cmp::Ordering, marker::PhantomData, mem::transmute}; /// Trait for DBData that can be deserialized with [`rkyv`]. @@ -42,7 +39,24 @@ pub trait DeserializableDyn { /// /// The offset must store a valid serialized value of the /// concrete type that `self` points to. - unsafe fn deserialize_from_bytes(&mut self, bytes: &[u8], pos: usize); + unsafe fn deserialize_from_bytes_with( + &mut self, + bytes: &[u8], + pos: usize, + deserializer: &mut Deserializer, + ); + + /// Deserialize `self` from the given slice and offset using the default + /// deserializer configuration. + /// + /// # Safety + /// + /// The offset must store a valid serialized value of the + /// concrete type that `self` points to. + unsafe fn deserialize_from_bytes(&mut self, bytes: &[u8], pos: usize) { + let mut deserializer = Deserializer::default(); + unsafe { self.deserialize_from_bytes_with(bytes, pos, &mut deserializer) }; + } } impl SerializeDyn for T @@ -60,20 +74,28 @@ impl DeserializableDyn for T where T: ArchivedDBData, { - unsafe fn deserialize_from_bytes(&mut self, bytes: &[u8], pos: usize) { + unsafe fn deserialize_from_bytes_with( + &mut self, + bytes: &[u8], + pos: usize, + deserializer: &mut Deserializer, + ) { unsafe { let archived: &::Archived = archived_value::(bytes, pos); - *self = archived - .deserialize(&mut SharedDeserializeMap::new()) - .unwrap(); + *self = archived.deserialize(deserializer).unwrap(); } } } /// Object-safe version of the `Deserialize` trait. pub trait DeserializeDyn: AsAny + Comparable { - fn deserialize(&self, target: &mut Trait); + fn deserialize_with(&self, target: &mut Trait, deserializer: &mut Deserializer); + + fn deserialize(&self, target: &mut Trait) { + let mut deserializer = Deserializer::default(); + self.deserialize_with(target, &mut deserializer); + } fn eq_target(&self, target: &Trait) -> bool; fn cmp_target(&self, target: &Trait) -> Option; @@ -143,23 +165,25 @@ where T: ArchivedDBData + Eq + Ord + 'static, Trait: DowncastTrait + ?Sized + 'static, { - fn deserialize(&self, target: &mut Trait) { + fn deserialize_with(&self, target: &mut Trait, deserializer: &mut Deserializer) { *unsafe { target.downcast_mut::() } = self .archived - .deserialize(&mut SharedDeserializeMap::new()) + .deserialize(deserializer) .unwrap() } fn eq_target(&self, other: &Trait) -> bool { + let mut deserializer = Deserializer::default(); self.archived - .deserialize(&mut SharedDeserializeMap::new()) + .deserialize(&mut deserializer) .unwrap() .eq(unsafe { other.downcast::() }) } fn cmp_target(&self, other: &Trait) -> Option { + let mut deserializer = Deserializer::default(); self.archived - .deserialize(&mut SharedDeserializeMap::new()) + .deserialize(&mut deserializer) .unwrap() .partial_cmp(unsafe { other.downcast::() }) } diff --git a/crates/dbsp/src/operator/group/custom_ord.rs b/crates/dbsp/src/operator/group/custom_ord.rs index e8c31ba9d2..285f264ea7 100644 --- a/crates/dbsp/src/operator/group/custom_ord.rs +++ b/crates/dbsp/src/operator/group/custom_ord.rs @@ -1,6 +1,6 @@ -use crate::trace::Deserializable; +use crate::trace::{Deserializable, Deserializer}; use feldera_macros::IsNone; -use rkyv::{Archive, Deserialize, Serialize, de::deserializers::SharedDeserializeMap}; +use rkyv::{Archive, Deserialize, Serialize}; use size_of::SizeOf; use std::{ cmp::Ordering, @@ -162,11 +162,11 @@ where // the Archived (we already know that T::Archived implements Ord). let real_self: T = self .val - .deserialize(&mut SharedDeserializeMap::new()) + .deserialize(&mut Deserializer::default()) .unwrap(); let real_other: T = other .val - .deserialize(&mut SharedDeserializeMap::new()) + .deserialize(&mut Deserializer::default()) .unwrap(); F::cmp(&real_self, &real_other) } diff --git a/crates/dbsp/src/storage/bin/bench.py b/crates/dbsp/src/storage/bin/bench.py deleted file mode 100644 index 07ba1f4559..0000000000 --- a/crates/dbsp/src/storage/bin/bench.py +++ /dev/null @@ -1,133 +0,0 @@ -import subprocess -import plotnine as p9 -import pandas as pd -import humanize as hm - -from io import StringIO - -BACKEND = "Posix" -PATH = "/tmp/feldera-storage-bench" -MEASURE = True - -ONE_MIB = 1024 * 1024 -ONE_GIB = 1024 * 1024 * 1024 - -if True: - # Big config - TOTAL_SIZE = str(256 * ONE_GIB) - THREAD_SHIFT_RANGE = range(0, 6) - BUFFER_SHIFT_RANGE = range(12, 22) -else: - # Small config - TOTAL_SIZE = str(32 * ONE_MIB) - THREAD_SHIFT_RANGE = range(0, 2) - BUFFER_SHIFT_RANGE = range(12, 14) - - -def plot(results): - results["per_thread_file_size"] = results["per_thread_file_size"].map( - lambda x: int(x) - ) - results["threads"] = results["threads"].map(lambda x: int(x)) - results["read_time"] = results["read_time"].map(lambda x: float(x)) - results["write_time"] = results["write_time"].map(lambda x: float(x)) - - results["buffer_size_str"] = results["buffer_size"].map( - lambda x: hm.naturalsize(x, binary=True) - ) - results["buffer_size_str"] = pd.Categorical( - results["buffer_size_str"], - categories=[hm.naturalsize(1 << x, binary=True) for x in BUFFER_SHIFT_RANGE], - ) - - results["Read"] = ( - (results["per_thread_file_size"] * results["threads"]) / ONE_MIB - ) / results["read_time"] - results["Write"] = ( - (results["per_thread_file_size"] * results["threads"]) / ONE_MIB - ) / results["write_time"] - - # Melt the DataFrame to long format - df_long = pd.melt( - results, - id_vars=["buffer_size", "threads", "buffer_size_str"], - value_vars=["Read", "Write"], - var_name="operation", - value_name="tput", - ) - - print(df_long) - - plot = ( - p9.ggplot( - data=df_long, - mapping=p9.aes( - x="threads", - y="tput", - group="buffer_size_str", - color="buffer_size_str", - ), - ) - + p9.labs(y="Throughput [MiB/s]") - + p9.scale_x_continuous( - breaks=[1 << x for x in THREAD_SHIFT_RANGE], name="# Threads" - ) - + p9.theme_538() - + p9.theme( - legend_position="top", - legend_title=p9.element_blank(), - subplots_adjust={"wspace": 0.25}, - ) - + p9.scale_color_brewer(type="qual", palette="Set2") - + p9.geom_point() - + p9.geom_line() - + p9.facet_wrap("~operation", scales="free_y") - ) - plot.save("disk_throughput_plot.png", width=12, height=5, dpi=300, verbose=False) - - -if __name__ == "__main__": - if MEASURE: - results = pd.DataFrame() - for thread_shift in THREAD_SHIFT_RANGE: - for buf_shift in BUFFER_SHIFT_RANGE: - thread_cnt = 1 << thread_shift - buf_size = 1 << buf_shift - per_thread_size = int(int(TOTAL_SIZE) / thread_cnt) - - print( - f"thread_cnt={thread_cnt} buf_size={buf_size} per_thread_size={per_thread_size} backend={BACKEND} path={PATH}" - ) - cmd = [ - "cargo", - "run", - "--release", - "--bin", - "bench", - "--", - "--backend", - BACKEND, - "--per-thread-file-size", - str(per_thread_size), - "--threads", - str(thread_cnt), - "--buffer-size", - str(buf_size), - "--path", - PATH, - "--csv", - ] - result = subprocess.run(cmd, capture_output=True, text=True) - - if result.returncode != 0: - print("Error in subprocess:", result.stderr) - else: - # Parse CSV data into pandas DataFrame - csv_data = pd.read_csv(StringIO(result.stdout)) - results = pd.concat([results, csv_data], ignore_index=True) - - results.to_csv("results.csv", index=False) - else: - results = pd.read_csv("results.csv") - - plot(results) diff --git a/crates/dbsp/src/storage/bin/bench.rs b/crates/dbsp/src/storage/bin/bench.rs deleted file mode 100644 index 947ae70948..0000000000 --- a/crates/dbsp/src/storage/bin/bench.rs +++ /dev/null @@ -1,368 +0,0 @@ -//! A simple CLI app to benchmark different storage backends/scenarios. -//! -//! An example invocation: -//! -//! ```shell -//! cargo run --release --bin bench -- --cache --threads 2 --total-size 4294967296 --path /path/to/disk -//! ``` -//! -//! Run `metrics-observer` in another terminal to see the metrics. -//! -//! There are still some issues with this benchmark to make it useful: -//! - Threads indicate they're done writing but are still writing. - -use libc::timespec; -use std::fs::create_dir_all; -use std::sync::{Arc, Barrier}; -use std::thread; -use std::time::{Duration, Instant}; - -use clap::Parser; - -use feldera_storage::backend::posixio_impl::PosixBackend; -use feldera_storage::backend::{AtomicIncrementOnlyI64, Storage}; -use feldera_storage::buffer_cache::FBuf; - -#[derive(Debug, Clone, Default)] -struct ThreadBenchResult { - read_time: Duration, - write_time: Duration, - cpu_time: Duration, -} - -#[derive(Debug, Clone, Default)] -struct BenchResult { - times: Vec, -} - -fn mean(data: &[f64]) -> Option { - let sum = data.iter().sum::(); - let count = data.len(); - - match count { - positive if positive > 0 => Some(sum / count as f64), - _ => None, - } -} - -fn std_deviation(data: &[f64]) -> Option { - match (mean(data), data.len()) { - (Some(data_mean), count) if count > 0 => { - let variance = data - .iter() - .map(|value| { - let diff = data_mean - *value; - - diff * diff - }) - .sum::() - / count as f64; - - Some(variance.sqrt()) - } - _ => None, - } -} - -impl BenchResult { - fn validate(&self) -> Result<(), String> { - if self.times.is_empty() { - return Err("No results found.".to_string()); - } - assert!(!self.times.is_empty()); - - if self.read_time_std() >= 2.0 { - return Err("Read times are not stable.".to_string()); - } - if self.write_time_std() >= 5.0 { - return Err("Write times are not stable.".to_string()); - } - Ok(()) - } - - fn read_time_std(&self) -> f64 { - std_deviation( - &self - .times - .iter() - .map(|t| t.read_time.as_secs_f64()) - .collect::>(), - ) - .unwrap() - } - - fn write_time_std(&self) -> f64 { - std_deviation( - &self - .times - .iter() - .map(|t| t.write_time.as_secs_f64()) - .collect::>(), - ) - .unwrap() - } - - fn read_time_mean(&self) -> f64 { - mean( - &self - .times - .iter() - .map(|t| t.read_time.as_secs_f64()) - .collect::>(), - ) - .unwrap() - } - - fn write_time_mean(&self) -> f64 { - mean( - &self - .times - .iter() - .map(|t| t.write_time.as_secs_f64()) - .collect::>(), - ) - .unwrap() - } - - fn cpu_time_mean(&self) -> f64 { - mean( - &self - .times - .iter() - .map(|t| t.cpu_time.as_secs_f64()) - .collect::>(), - ) - .unwrap() - } - - fn display(&self, args: Args) { - let read_time = self.read_time_mean(); - let write_time = self.write_time_mean(); - let cpu_time = self.cpu_time_mean(); - const ONE_MIB: f64 = 1024f64 * 1024f64; - - if !args.csv { - if !args.write_only { - println!( - "read: {} MiB/s (mean: {}s, std: {}s)", - ((args.per_thread_file_size * args.threads) as f64 / ONE_MIB) / read_time, - read_time, - self.read_time_std() - ); - } - println!( - "write: {} MiB/s (mean: {}s, std: {}s)", - ((args.per_thread_file_size * args.threads) as f64 / ONE_MIB) / write_time, - write_time, - self.write_time_std() - ); - println!("cpu: {}s (mean))", cpu_time,); - } else { - println!( - "backend,cache,per_thread_file_size,threads,buffer_size,read_time,read_time_std,write_time,write_time_std", - ); - println!( - "{:?},{:?},{},{},{},{},{},{},{}", - args.backend, - args.cache, - args.per_thread_file_size, - args.threads, - args.buffer_size, - read_time, - self.read_time_std(), - write_time, - self.write_time_std(), - ) - } - } -} - -#[derive(Debug, Clone)] -enum Backend { - Posix, -} - -impl From for Backend { - fn from(s: String) -> Self { - match s.as_str() { - "Posix" => Backend::Posix, - _ => panic!("invalid backend"), - } - } -} - -/// Simple program to benchmark files. -/// -/// Spawns multiple threads, each thread writes one file sequentially -/// and then reads it back. -/// -/// The program prints read and write throughput, and the CPU time used by the -/// benchmark threads, which includes system and user time for those threads -/// (but not for other user or kernel threads spawned by them for I/O, if any). -#[derive(Parser, Debug, Clone)] -#[command(author, version)] -struct Args { - /// Path to a file or directory - #[clap(short, long, default_value = "/tmp/feldera-storage")] - path: std::path::PathBuf, - - /// Which backend to use. - #[clap(long, default_value = "Posix")] - backend: Backend, - - /// Number of threads to use - #[clap(long, default_value = "1")] - threads: usize, - - /// Buffer size - #[clap(long, default_value = "4096")] - buffer_size: usize, - - /// Size that is to be written (per-thread) - #[clap(long, default_value = "1073741824")] - per_thread_file_size: usize, - - /// Verify file-operations are performed correctly. - #[clap(long, default_value = "false")] - verify: bool, - - /// Adds a buffer cache with given bytes of capacity. - #[clap(long)] - cache: Option, - - /// Write without reading back? - #[clap(long, default_value = "false")] - write_only: bool, - - /// Print data as CSV. - #[clap(long, default_value = "false")] - csv: bool, -} - -fn allocate_buffer(sz: usize) -> FBuf { - FBuf::with_capacity(sz) -} - -/// Returns the amount of CPU time (user + system) used by the current thread. -/// -/// It was difficult to determine that the result includes both user and system -/// time, so for future reference, see [the original commit] that added support, -/// which includes: -/// -/// ```patch -/// +static inline unsigned long thread_ticks(task_t *p) { -/// + return p->utime + current->stime; -/// +} -/// ``` -/// -/// [the original commit]: https://git.kernel.org/pub/scm/linux/kernel/git/tglx/history.git/commit/?id=bb82e8a53042a91688fd819d0c475a1c9a2b982a -fn thread_cpu_time() -> Duration { - let mut tp = timespec { - tv_sec: 0, - tv_nsec: 0, - }; - unsafe { libc::clock_gettime(libc::CLOCK_THREAD_CPUTIME_ID, &mut tp as *mut timespec) }; - Duration::new(tp.tv_sec as u64, tp.tv_nsec as u32) -} - -fn benchmark(backend: &T, barrier: Arc) -> ThreadBenchResult { - let args = Args::parse(); - let file = backend.create().unwrap(); - - barrier.wait(); - let start_write = Instant::now(); - for i in 0..args.per_thread_file_size / args.buffer_size { - let mut wb = allocate_buffer(args.buffer_size); - wb.resize(args.buffer_size, 0xff); - - debug_assert!(i * args.buffer_size < args.per_thread_file_size); - debug_assert!(wb.len() == args.buffer_size); - backend - .write_block(&file, (i * args.buffer_size) as u64, wb) - .expect("write failed"); - } - let (ih, _path) = backend.complete(file).expect("complete failed"); - ih.commit().unwrap(); - let write_time = start_write.elapsed(); - - barrier.wait(); - let start_read = Instant::now(); - if !args.write_only { - for i in 0..args.per_thread_file_size / args.buffer_size { - let rr = backend - .read_block(&ih, (i * args.buffer_size) as u64, args.buffer_size) - .expect("read failed"); - if args.verify { - assert_eq!(rr.len(), args.buffer_size); - assert_eq!( - rr.iter().as_slice(), - vec![0xffu8; args.buffer_size].as_slice() - ); - } - } - } - let read_time = start_read.elapsed(); - - backend.delete(ih).expect("delete failed"); - ThreadBenchResult { - write_time, - read_time, - cpu_time: thread_cpu_time(), - } -} - -fn posixio_main(args: Args) -> BenchResult { - let counter: Arc = Default::default(); - let barrier = Arc::new(Barrier::new(args.threads)); - // spawn n-1 threads - let threads: Vec<_> = (1..args.threads) - .map(|_| { - let args = args.clone(); - let barrier = barrier.clone(); - let counter = counter.clone(); - thread::spawn(move || { - let barrier = barrier.clone(); - let posixio_backend = PosixBackend::new(args.path.clone(), counter); - benchmark(&posixio_backend, barrier) - }) - }) - .collect(); - - // Run on main thread - let posixio_backend = PosixBackend::new(args.path.clone(), counter); - - let mut br = BenchResult::default(); - let main_res = benchmark(&posixio_backend, barrier); - br.times.push(main_res); - - // Wait for other n-1 threads - threads.into_iter().for_each(|t| { - let tres = t.join().expect("thread panicked"); - br.times.push(tres); - }); - - br -} - -fn main() { - let args = Args::parse(); - assert!(args.per_thread_file_size > 0); - assert!(args.buffer_size > 0); - assert!(args.per_thread_file_size >= args.buffer_size); - assert!(args.threads > 0); - if !args.path.exists() { - create_dir_all(&args.path).expect("failed to create directory"); - } - - let br = match args.backend { - Backend::Posix => posixio_main(args.clone()), - }; - - br.display(args.clone()); - if !args.csv { - if let Err(e) = br.validate() { - println!("Result validation failed: {}", e); - std::process::exit(1); - } - } -} diff --git a/crates/dbsp/src/storage/file.rs b/crates/dbsp/src/storage/file.rs index 08155c5ce2..0e13f2cc28 100644 --- a/crates/dbsp/src/storage/file.rs +++ b/crates/dbsp/src/storage/file.rs @@ -73,6 +73,7 @@ use crate::{ storage::buffer_cache::{FBuf, FBufSerializer}, }; use rkyv::de::deserializers::SharedDeserializeMap; +use rkyv::de::{SharedDeserializeRegistry, SharedPointer}; use rkyv::{ Archive, Archived, Deserialize, Fallible, Serialize, ser::{ @@ -164,7 +165,7 @@ where /// are unknown. /// /// See documentation of [`AnyFactories`]. - pub(crate) fn any_factories(&self) -> AnyFactories { + pub fn any_factories(&self) -> AnyFactories { AnyFactories { key_factory: Arc::new(self.key_factory), item_factory: Arc::new(self.item_factory), @@ -279,7 +280,58 @@ pub type Serializer = CompositeSerializer, DbspScratch, Sha pub type DbspScratch = FallbackScratch, AllocScratch>; /// The particular [`rkyv`] deserializer that we use. -pub type Deserializer = SharedDeserializeMap; +#[derive(Debug)] +pub struct Deserializer { + version: u32, + inner: SharedDeserializeMap, +} + +impl Deserializer { + /// Create a deserializer configured for the given file format version. + pub fn new(version: u32) -> Self { + Self { + version, + inner: SharedDeserializeMap::new(), + } + } + + /// Create a deserializer with a preallocated shared pointer map. + pub fn with_capacity(version: u32, capacity: usize) -> Self { + Self { + version, + inner: SharedDeserializeMap::with_capacity(capacity), + } + } + + /// Return the file format version this deserializer targets. + pub fn version(&self) -> u32 { + self.version + } +} + +impl Default for Deserializer { + fn default() -> Self { + Self::new(format::VERSION_NUMBER) + } +} + +impl Fallible for Deserializer { + type Error = ::Error; +} + +impl SharedDeserializeRegistry for Deserializer { + fn get_shared_ptr(&mut self, ptr: *const u8) -> Option<&dyn SharedPointer> { + self.inner.get_shared_ptr(ptr) + } + + fn add_shared_ptr( + &mut self, + ptr: *const u8, + shared: Box, + ) -> Result<(), Self::Error> { + self.inner.add_shared_ptr(ptr, shared) + } +} /// Creates an instance of [Serializer] that will serialize to `serializer` and /// passes it to `f`. Returns a tuple of the `FBuf` from the [Serializer] and diff --git a/crates/dbsp/src/storage/file/format.rs b/crates/dbsp/src/storage/file/format.rs index 1b35838c8a..556e5a1145 100644 --- a/crates/dbsp/src/storage/file/format.rs +++ b/crates/dbsp/src/storage/file/format.rs @@ -87,7 +87,11 @@ use num_traits::FromPrimitive; use size_of::SizeOf; /// Increment this on each incompatible change. -pub const VERSION_NUMBER: u32 = 3; +/// +/// When a new version is created, make sure to generate new golden +/// files for it in crate `storage-test-compat` to check for +/// backwards compatibility. +pub const VERSION_NUMBER: u32 = 4; /// Magic number for data blocks. pub const DATA_BLOCK_MAGIC: [u8; 4] = *b"LFDB"; diff --git a/crates/dbsp/src/storage/file/reader.rs b/crates/dbsp/src/storage/file/reader.rs index 31f5cafccf..f8e203f7c9 100644 --- a/crates/dbsp/src/storage/file/reader.rs +++ b/crates/dbsp/src/storage/file/reader.rs @@ -3,7 +3,7 @@ //! [`Reader`] is the top-level interface for reading layer files. use super::format::{Compression, FileTrailer}; -use super::{AnyFactories, Factories}; +use super::{AnyFactories, Deserializer, Factories}; use crate::dynamic::{DynVec, WeightTrait}; use crate::storage::buffer_cache::{CacheAccess, CacheEntry}; use crate::storage::file::format::FilterBlock; @@ -463,6 +463,7 @@ where value_map: ValueMapReader, row_groups: Option, first_row: u64, + version: u32, _phantom: PhantomData, } @@ -485,6 +486,7 @@ where raw: Arc, location: BlockLocation, first_row: u64, + version: u32, ) -> Result { let header = DataBlockHeader::read_le(&mut io::Cursor::new(raw.as_slice())).map_err(|e| { @@ -510,6 +512,7 @@ where )?, raw, first_row, + version, _phantom: PhantomData, }) } @@ -519,8 +522,14 @@ where node: &TreeNode, cache: &BufferCache, file_id: FileId, + version: u32, ) -> Result, Error> { - let block = Arc::new(Self::from_raw(raw, node.location, node.rows.start)?); + let block = Arc::new(Self::from_raw( + raw, + node.location, + node.rows.start, + version, + )?); cache.insert(file_id, node.location.offset, block.clone()); Ok(block) } @@ -545,8 +554,13 @@ where ), None => { let block = file.read_block(node.location)?; - let entry = - Self::from_raw_with_cache(block, node, &cache, file.file_handle.file_id())?; + let entry = Self::from_raw_with_cache( + block, + node, + &cache, + file.file_handle.file_id(), + file.version, + )?; (CacheAccess::Miss, entry) } }; @@ -620,8 +634,10 @@ where unsafe fn item(&self, factories: &Factories, index: usize, item: (&mut K, &mut A)) { unsafe { let archived_item = self.archived_item(factories, index); - DeserializeDyn::deserialize(archived_item.fst(), item.0); - DeserializeDyn::deserialize(archived_item.snd(), item.1); + let mut deserializer = Deserializer::new(self.version); + DeserializeDyn::deserialize_with(archived_item.fst(), item.0, &mut deserializer); + let mut deserializer = Deserializer::new(self.version); + DeserializeDyn::deserialize_with(archived_item.snd(), item.1, &mut deserializer); } } unsafe fn item_for_row(&self, factories: &Factories, row: u64, item: (&mut K, &mut A)) { @@ -633,13 +649,15 @@ where unsafe fn key(&self, factories: &Factories, index: usize, key: &mut K) { unsafe { let item = self.archived_item(factories, index); - DeserializeDyn::deserialize(item.fst(), key) + let mut deserializer = Deserializer::new(self.version); + DeserializeDyn::deserialize_with(item.fst(), key, &mut deserializer) } } unsafe fn aux(&self, factories: &Factories, index: usize, aux: &mut A) { unsafe { let item = self.archived_item(factories, index); - DeserializeDyn::deserialize(item.snd(), aux) + let mut deserializer = Deserializer::new(self.version); + DeserializeDyn::deserialize_with(item.snd(), aux, &mut deserializer) } } unsafe fn key_for_row(&self, factories: &Factories, row: u64, key: &mut K) { @@ -824,13 +842,14 @@ where node: &TreeNode, cache: &BufferCache, file_id: FileId, + version: u32, ) -> Result { match node.node_type { NodeType::Data => Ok(Self::Data(DataBlock::from_raw_with_cache( - raw, node, cache, file_id, + raw, node, cache, file_id, version, )?)), NodeType::Index => Ok(Self::Index(IndexBlock::from_raw_with_cache( - raw, node, cache, file_id, + raw, node, cache, file_id, version, )?)), } } @@ -865,6 +884,7 @@ where child_offsets: VarintReader, child_sizes: VarintReader, first_row: u64, + version: u32, _phantom: PhantomData, } @@ -885,6 +905,7 @@ where raw: Arc, location: BlockLocation, first_row: u64, + version: u32, ) -> Result { let header = IndexBlockHeader::read_le(&mut io::Cursor::new(raw.as_slice())).map_err(|e| { @@ -941,6 +962,7 @@ where )?, raw, first_row, + version, _phantom: PhantomData, }) } @@ -950,8 +972,14 @@ where node: &TreeNode, cache: &BufferCache, file_id: FileId, + version: u32, ) -> Result, Error> { - let block = Arc::new(Self::from_raw(raw, node.location, node.rows.start)?); + let block = Arc::new(Self::from_raw( + raw, + node.location, + node.rows.start, + version, + )?); cache.insert(file_id, node.location.offset, block.clone()); Ok(block) } @@ -982,8 +1010,13 @@ where } None => { let block = file.read_block(node.location)?; - let entry = - Self::from_raw_with_cache(block, node, &cache, file.file_handle.file_id())?; + let entry = Self::from_raw_with_cache( + block, + node, + &cache, + file.file_handle.file_id(), + file.version, + )?; (CacheAccess::Miss, entry) } }; @@ -1072,7 +1105,8 @@ where unsafe fn get_bound(&self, index: usize, bound: &mut K) { unsafe { let offset = self.bounds.get(&self.raw, index) as usize; - bound.deserialize_from_bytes(&self.raw, offset) + let mut deserializer = Deserializer::new(self.version); + bound.deserialize_from_bytes_with(&self.raw, offset, &mut deserializer) } } @@ -1318,6 +1352,7 @@ struct ImmutableFileRef { file_handle: Arc, compression: Option, stats: AtomicCacheStats, + version: u32, } impl Debug for ImmutableFileRef { @@ -1339,12 +1374,14 @@ impl ImmutableFileRef { file_handle: Arc, compression: Option, stats: AtomicCacheStats, + version: u32, ) -> Self { Self { cache, file_handle, compression, stats, + version, } } @@ -1511,7 +1548,7 @@ where warn!("{}: reading old format storage file, performance may be reduced due to incompatible Bloom filters", file.path()); Some(false) } - VERSION_NUMBER => Some(true), + x if x >= 3 => Some(true), _ => None, } .ok_or_else(|| CorruptionError::InvalidVersion { @@ -1583,7 +1620,13 @@ where }; Ok(Self { - file: ImmutableFileRef::new(cache, file, file_trailer.compression, stats), + file: ImmutableFileRef::new( + cache, + file, + file_trailer.compression, + stats, + file_trailer.version, + ), columns, bloom_filter, _phantom: PhantomData, diff --git a/crates/dbsp/src/storage/file/reader/bulk_rows.rs b/crates/dbsp/src/storage/file/reader/bulk_rows.rs index 858dace8e9..e317a5f865 100644 --- a/crates/dbsp/src/storage/file/reader/bulk_rows.rs +++ b/crates/dbsp/src/storage/file/reader/bulk_rows.rs @@ -216,7 +216,13 @@ where { let raw = decompress(self.reader.file.compression, node.location, result?)?; let file_id = self.reader.file.file_handle.file_id(); - let tree_block = TreeBlock::from_raw_with_cache(raw, &node, &self.cache, file_id)?; + let tree_block = TreeBlock::from_raw_with_cache( + raw, + &node, + &self.cache, + file_id, + self.reader.file.version, + )?; match tree_block { TreeBlock::Data(data_block) => self.received_data(data_block), TreeBlock::Index(index_block) => self.indexes[level].received(index_block), diff --git a/crates/dbsp/src/storage/file/reader/fetch_indexed_zset.rs b/crates/dbsp/src/storage/file/reader/fetch_indexed_zset.rs index e6c0bf0dca..94f0f4ae35 100644 --- a/crates/dbsp/src/storage/file/reader/fetch_indexed_zset.rs +++ b/crates/dbsp/src/storage/file/reader/fetch_indexed_zset.rs @@ -266,6 +266,7 @@ where &read.node, &self.cache, self.reader.file_handle().file_id(), + self.reader.file.version, ) .unwrap(); self.process_read(&read.keys, tree_block, reads)?; @@ -581,6 +582,7 @@ where &read.node, &self.cache, self.reader.file_handle().file_id(), + self.reader.file.version, ) .unwrap(); self.process_read(read.keys, tree_block, reads)?; diff --git a/crates/dbsp/src/storage/file/reader/fetch_zset.rs b/crates/dbsp/src/storage/file/reader/fetch_zset.rs index da9261960c..a8853bdafe 100644 --- a/crates/dbsp/src/storage/file/reader/fetch_zset.rs +++ b/crates/dbsp/src/storage/file/reader/fetch_zset.rs @@ -193,6 +193,7 @@ where &read.node, &self.cache, self.reader.file_handle().file_id(), + self.reader.file.version, ) .unwrap(); self.process_read(&read.keys, tree_block, reads)?; diff --git a/crates/dbsp/src/storage/file/test.rs b/crates/dbsp/src/storage/file/test.rs index c77463f1cf..b966f65c55 100644 --- a/crates/dbsp/src/storage/file/test.rs +++ b/crates/dbsp/src/storage/file/test.rs @@ -32,6 +32,112 @@ use feldera_types::config::{StorageConfig, StorageOptions}; use rand::{Rng, seq::SliceRandom, thread_rng}; use tempfile::tempdir; +feldera_macros::declare_tuple! { + Tup65< + T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, + T19, T20, T21, T22, T23, T24, T25, T26, T27, T28, T29, T30, T31, T32, T33, T34, T35, + T36, T37, T38, T39, T40, T41, T42, T43, T44, T45, T46, T47, T48, T49, T50, T51, T52, + T53, T54, T55, T56, T57, T58, T59, T60, T61, T62, T63, T64 + > +} + +type OptString = Option; +type Tup65OptString = Tup65< + OptString, OptString, OptString, OptString, OptString, OptString, OptString, OptString, + OptString, OptString, OptString, OptString, OptString, OptString, OptString, OptString, + OptString, OptString, OptString, OptString, OptString, OptString, OptString, OptString, + OptString, OptString, OptString, OptString, OptString, OptString, OptString, OptString, + OptString, OptString, OptString, OptString, OptString, OptString, OptString, OptString, + OptString, OptString, OptString, OptString, OptString, OptString, OptString, OptString, + OptString, OptString, OptString, OptString, OptString, OptString, OptString, OptString, + OptString, OptString, OptString, OptString, OptString, OptString, OptString, OptString, + OptString +>; + +// Map bits to fields in MSB->LSB order so row indices remain lexicographically sorted. +fn bit_set(bits: u128, idx: usize) -> bool { + debug_assert!(idx < 65); + ((bits >> (64 - idx)) & 1) != 0 +} + +fn opt_str(bit: bool) -> Option { + if bit { + Some("abc".to_string()) + } else { + None + } +} + +fn tup65_from_bits(bits: u128) -> Tup65OptString { + Tup65( + opt_str(bit_set(bits, 0)), + opt_str(bit_set(bits, 1)), + opt_str(bit_set(bits, 2)), + opt_str(bit_set(bits, 3)), + opt_str(bit_set(bits, 4)), + opt_str(bit_set(bits, 5)), + opt_str(bit_set(bits, 6)), + opt_str(bit_set(bits, 7)), + opt_str(bit_set(bits, 8)), + opt_str(bit_set(bits, 9)), + opt_str(bit_set(bits, 10)), + opt_str(bit_set(bits, 11)), + opt_str(bit_set(bits, 12)), + opt_str(bit_set(bits, 13)), + opt_str(bit_set(bits, 14)), + opt_str(bit_set(bits, 15)), + opt_str(bit_set(bits, 16)), + opt_str(bit_set(bits, 17)), + opt_str(bit_set(bits, 18)), + opt_str(bit_set(bits, 19)), + opt_str(bit_set(bits, 20)), + opt_str(bit_set(bits, 21)), + opt_str(bit_set(bits, 22)), + opt_str(bit_set(bits, 23)), + opt_str(bit_set(bits, 24)), + opt_str(bit_set(bits, 25)), + opt_str(bit_set(bits, 26)), + opt_str(bit_set(bits, 27)), + opt_str(bit_set(bits, 28)), + opt_str(bit_set(bits, 29)), + opt_str(bit_set(bits, 30)), + opt_str(bit_set(bits, 31)), + opt_str(bit_set(bits, 32)), + opt_str(bit_set(bits, 33)), + opt_str(bit_set(bits, 34)), + opt_str(bit_set(bits, 35)), + opt_str(bit_set(bits, 36)), + opt_str(bit_set(bits, 37)), + opt_str(bit_set(bits, 38)), + opt_str(bit_set(bits, 39)), + opt_str(bit_set(bits, 40)), + opt_str(bit_set(bits, 41)), + opt_str(bit_set(bits, 42)), + opt_str(bit_set(bits, 43)), + opt_str(bit_set(bits, 44)), + opt_str(bit_set(bits, 45)), + opt_str(bit_set(bits, 46)), + opt_str(bit_set(bits, 47)), + opt_str(bit_set(bits, 48)), + opt_str(bit_set(bits, 49)), + opt_str(bit_set(bits, 50)), + opt_str(bit_set(bits, 51)), + opt_str(bit_set(bits, 52)), + opt_str(bit_set(bits, 53)), + opt_str(bit_set(bits, 54)), + opt_str(bit_set(bits, 55)), + opt_str(bit_set(bits, 56)), + opt_str(bit_set(bits, 57)), + opt_str(bit_set(bits, 58)), + opt_str(bit_set(bits, 59)), + opt_str(bit_set(bits, 60)), + opt_str(bit_set(bits, 61)), + opt_str(bit_set(bits, 62)), + opt_str(bit_set(bits, 63)), + opt_str(bit_set(bits, 64)), + ) +} + fn test_buffer_cache() -> Arc { thread_local! { static BUFFER_CACHE: Arc = Arc::new(BufferCache::new(1024 * 1024)); @@ -929,6 +1035,20 @@ fn test_tuple() { }); } +#[test] +fn test_tup65_option_string() { + init_test_logger(); + for_each_compression_type(Parameters::default(), |parameters| { + test_one_column(1_000usize, |row| { + let bits = row as u128 * 2 + 1; + let before = tup65_from_bits(bits - 1); + let key = tup65_from_bits(bits); + let after = tup65_from_bits(bits + 1); + (before, key, after, ()) + }, parameters); + }); +} + #[test] fn test_big_values() { fn v(row: usize) -> Vec { diff --git a/crates/dbsp/src/storage/file/writer.rs b/crates/dbsp/src/storage/file/writer.rs index 437500c4f0..82c395f304 100644 --- a/crates/dbsp/src/storage/file/writer.rs +++ b/crates/dbsp/src/storage/file/writer.rs @@ -311,6 +311,7 @@ impl ColumnWriter { }, &block_writer.cache, block_writer.file_handle.file_id(), + VERSION_NUMBER, ) .unwrap(); @@ -347,6 +348,7 @@ impl ColumnWriter { }, &block_writer.cache, block_writer.file_handle.file_id(), + VERSION_NUMBER, ) .unwrap(); diff --git a/crates/dbsp/src/trace.rs b/crates/dbsp/src/trace.rs index 9367bf4244..8dd01de85a 100644 --- a/crates/dbsp/src/trace.rs +++ b/crates/dbsp/src/trace.rs @@ -69,7 +69,7 @@ pub use ord::{ VecWSetFactories, }; -use rkyv::{Deserialize, archived_root, de::deserializers::SharedDeserializeMap}; +use rkyv::{Deserialize, archived_root}; use crate::{ Error, NumEntries, Timestamp, @@ -89,7 +89,18 @@ pub use layers::Trie; /// `DBData` as a trait bound on types. Conversely, a trait bound of the form /// `B: BatchReader` implies `B::Key: DBData` and `B::Val: DBData`. pub trait DBData: - Default + Clone + Eq + Ord + Hash + SizeOf + Send + Sync + Debug + ArchivedDBData + IsNone + 'static + Default + + Clone + + Eq + + Ord + + Hash + + SizeOf + + Send + + Sync + + Debug + + ArchivedDBData + + IsNone + + 'static { } @@ -105,8 +116,8 @@ impl DBData for T where + Sync + Debug + ArchivedDBData - + IsNone - + 'static /* as ArchivedDBData>::Repr: Ord + PartialOrd, */ + + IsNone + + 'static, { } @@ -129,7 +140,7 @@ pub fn unaligned_deserialize(bytes: &[u8]) -> T { let mut aligned_bytes = FBuf::new(); aligned_bytes.extend_from_slice(bytes); unsafe { archived_root::(&aligned_bytes[..]) } - .deserialize(&mut SharedDeserializeMap::new()) + .deserialize(&mut Deserializer::default()) .unwrap() } diff --git a/crates/dbsp/src/trace/spine_async.rs b/crates/dbsp/src/trace/spine_async.rs index 6726124089..e0548f66dc 100644 --- a/crates/dbsp/src/trace/spine_async.rs +++ b/crates/dbsp/src/trace/spine_async.rs @@ -26,17 +26,14 @@ use crate::{ }, }; -use crate::storage::file::to_bytes; +use crate::storage::file::{Deserializer, to_bytes}; use crate::trace::CommittedSpine; use enum_map::EnumMap; use feldera_storage::{FileCommitter, StoragePath}; use feldera_types::checkpoint::PSpineBatches; use ouroboros::self_referencing; use rand::Rng; -use rkyv::{ - Archive, Archived, Deserialize, Fallible, Serialize, de::deserializers::SharedDeserializeMap, - ser::Serializer, -}; +use rkyv::{Archive, Archived, Deserialize, Fallible, Serialize, ser::Serializer}; use size_of::{Context, SizeOf}; use std::sync::{Arc, MutexGuard}; use std::time::{Duration, Instant}; @@ -1508,7 +1505,7 @@ where let archived = unsafe { rkyv::archived_root::(&content) }; let committed: CommittedSpine = archived - .deserialize(&mut SharedDeserializeMap::new()) + .deserialize(&mut Deserializer::default()) .unwrap(); self.dirty = committed.dirty; self.key_filter = None; diff --git a/crates/dbsp/src/typed_batch.rs b/crates/dbsp/src/typed_batch.rs index 3a534f8a2f..01a633282f 100644 --- a/crates/dbsp/src/typed_batch.rs +++ b/crates/dbsp/src/typed_batch.rs @@ -703,7 +703,7 @@ where #[cfg(test)] #[test] fn test_typedbox_rkyv() { - use rkyv::{archived_value, de::deserializers::SharedDeserializeMap}; + use rkyv::archived_value; let tbox = TypedBox::::new(12345u64); @@ -715,9 +715,7 @@ fn test_typedbox_rkyv() { let archived: & as Archive>::Archived = unsafe { archived_value::>(bytes.as_slice(), 0) }; - let tbox2 = archived - .deserialize(&mut SharedDeserializeMap::new()) - .unwrap(); + let tbox2 = archived.deserialize(&mut Deserializer::default()).unwrap(); assert_eq!(tbox, tbox2); } diff --git a/crates/dbsp/src/utils.rs b/crates/dbsp/src/utils.rs index 65843a978d..c9b8df2d18 100644 --- a/crates/dbsp/src/utils.rs +++ b/crates/dbsp/src/utils.rs @@ -5,7 +5,7 @@ mod consolidation; mod graph; mod is_none; mod sort; -mod tuple; +pub mod tuple; #[cfg(test)] mod vec_ext; @@ -42,7 +42,7 @@ pub use sort::{stable_sort, stable_sort_by}; pub use tuple::{ ArchivedTup0, ArchivedTup1, ArchivedTup2, ArchivedTup3, ArchivedTup4, ArchivedTup5, ArchivedTup6, ArchivedTup7, ArchivedTup8, ArchivedTup9, ArchivedTup10, Tup0, Tup1, Tup2, Tup3, - Tup4, Tup5, Tup6, Tup7, Tup8, Tup9, Tup10, + Tup4, Tup5, Tup6, Tup7, Tup8, Tup9, Tup10, TupleBitmap, TupleFormat, }; // mod unstable_sort; diff --git a/crates/dbsp/src/utils/is_none.rs b/crates/dbsp/src/utils/is_none.rs index e64bb78fd7..ba3db072ba 100644 --- a/crates/dbsp/src/utils/is_none.rs +++ b/crates/dbsp/src/utils/is_none.rs @@ -9,13 +9,30 @@ use std::sync::Arc; use uuid::Uuid; pub trait IsNone { + type Inner; + fn is_none(&self) -> bool; + + fn unwrap_or_self(&self) -> &Self::Inner; + + fn from_inner(inner: Self::Inner) -> Self; } impl IsNone for Option { + type Inner = T; + fn is_none(&self) -> bool { self.is_none() } + + fn unwrap_or_self(&self) -> &Self::Inner { + self.as_ref() + .expect("IsNone::unwrap_or_self called on None") + } + + fn from_inner(inner: Self::Inner) -> Self { + Some(inner) + } } #[macro_export] @@ -23,7 +40,13 @@ macro_rules! never_none { ($($ty:ty),* $(,)?) => { $( impl $crate::utils::IsNone for $ty { + type Inner = $ty; + fn is_none(&self) -> bool { false } + + fn unwrap_or_self(&self) -> &Self::Inner { self } + + fn from_inner(inner: Self::Inner) -> Self { inner } } )* }; @@ -57,7 +80,13 @@ macro_rules! never_none_1 { ($($wrapper:ident),* $(,)?) => { $( impl $crate::utils::IsNone for $wrapper { + type Inner = $wrapper; + fn is_none(&self) -> bool { false } + + fn unwrap_or_self(&self) -> &Self::Inner { self } + + fn from_inner(inner: Self::Inner) -> Self { inner } } )* }; @@ -70,11 +99,17 @@ macro_rules! delegate_is_none { ($($wrapper:ident),* $(,)?) => { $( impl $crate::utils::IsNone for $wrapper { + type Inner = $wrapper; + fn is_none(&self) -> bool { //self.as_ref().is_none() // but for simplicity lets just start by making this always false false } + + fn unwrap_or_self(&self) -> &Self::Inner { self } + + fn from_inner(inner: Self::Inner) -> Self { inner } } )* }; @@ -87,9 +122,15 @@ macro_rules! never_none_tuples { // Entry point: generate up to N elements ($($name:ident),+) => { impl<$($name),+> IsNone for ($($name,)+) { + type Inner = ($($name,)+); + fn is_none(&self) -> bool { false } + + fn unwrap_or_self(&self) -> &Self::Inner { self } + + fn from_inner(inner: Self::Inner) -> Self { inner } } }; } @@ -102,9 +143,15 @@ never_none_tuples!(A, B, C, D, E); never_none_tuples!(A, B, C, D, E, F); impl IsNone for BTreeMap { + type Inner = BTreeMap; + fn is_none(&self) -> bool { false } + + fn unwrap_or_self(&self) -> &Self::Inner { self } + + fn from_inner(inner: Self::Inner) -> Self { inner } } #[cfg(test)] diff --git a/crates/dbsp/src/utils/tuple.rs b/crates/dbsp/src/utils/tuple.rs index d3adcf2d43..990ca7c2fa 100644 --- a/crates/dbsp/src/utils/tuple.rs +++ b/crates/dbsp/src/utils/tuple.rs @@ -7,6 +7,71 @@ use feldera_types::deserialize_without_context; +#[repr(transparent)] +#[derive(Copy, Clone)] +pub struct TupleBitmap { + bytes: [u8; N], +} + +impl Default for TupleBitmap { + fn default() -> Self { + Self { bytes: [0u8; N] } + } +} + +impl TupleBitmap { + #[inline] + pub fn new() -> Self { + Self::default() + } + + #[inline] + pub fn set_none(&mut self, idx: usize) { + debug_assert!(idx < N * 8); + let byte = idx / 8; + let bit = idx % 8; + self.bytes[byte] |= 1u8 << bit; + } + + #[inline] + pub fn is_none(&self, idx: usize) -> bool { + debug_assert!(idx < N * 8); + let byte = idx / 8; + let bit = idx % 8; + (self.bytes[byte] & (1u8 << bit)) != 0 + } + + #[inline] + pub fn count_none(&self, fields: usize) -> usize { + debug_assert!(fields <= N * 8); + let full_bytes = fields / 8; + let rem_bits = fields % 8; + let mut count = 0usize; + let mut i = 0usize; + while i < full_bytes { + count += self.bytes[i].count_ones() as usize; + i += 1; + } + if rem_bits != 0 { + let mask = (1u8 << rem_bits) - 1; + count += (self.bytes[full_bytes] & mask).count_ones() as usize; + } + count + } + + #[inline] + pub fn count_none_before(&self, field_idx: usize) -> usize { + self.count_none(field_idx) + } +} + +#[repr(u8)] +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub enum TupleFormat { + Sparse = 0, + Dense = 1, +} + // Make sure to also call `dbsp_adapters::deserialize_without_context!` // and `sltsqlvalue::to_sql_row_impl!` for each new tuple type. // Also the compiler currently generates Tup11..Tup* if necessary, diff --git a/crates/feldera-macros/Cargo.toml b/crates/feldera-macros/Cargo.toml index cfa45da1c9..d6acb3a795 100644 --- a/crates/feldera-macros/Cargo.toml +++ b/crates/feldera-macros/Cargo.toml @@ -24,6 +24,8 @@ derive_more = { version = "1.0", features = ["not"] } rkyv = { workspace = true, features = ["std", "size_64", "validation"] } serde = { workspace = true, features = ["derive"] } size-of = { workspace = true } +dbsp = { workspace = true } +feldera-sqllib = { workspace = true } [lib] proc-macro = true diff --git a/crates/feldera-macros/src/lib.rs b/crates/feldera-macros/src/lib.rs index 59e881c29f..2c716e2f39 100644 --- a/crates/feldera-macros/src/lib.rs +++ b/crates/feldera-macros/src/lib.rs @@ -29,10 +29,22 @@ pub fn derive_not_none(item: TokenStream) -> TokenStream { let expanded = quote! { impl #impl_generics ::dbsp::utils::IsNone for #ident #ty_generics #where_clause { + type Inner = Self; + #[inline] fn is_none(&self) -> bool { false } + + #[inline] + fn unwrap_or_self(&self) -> &Self::Inner { + self + } + + #[inline] + fn from_inner(inner: Self::Inner) -> Self { + inner + } } }; diff --git a/crates/feldera-macros/src/tuples.rs b/crates/feldera-macros/src/tuples.rs index 1ee055188f..8f2f981996 100644 --- a/crates/feldera-macros/src/tuples.rs +++ b/crates/feldera-macros/src/tuples.rs @@ -1,4 +1,4 @@ -use proc_macro2::TokenStream as TokenStream2; +use proc_macro2::{Literal, TokenStream as TokenStream2}; use quote::{format_ident, quote}; use syn::{punctuated::Punctuated, Ident, Index, Token}; @@ -18,33 +18,49 @@ pub(super) fn declare_tuple_impl(tuple: TupleDef) -> TokenStream2 { .enumerate() .map(|(idx, _e)| format_ident!("other_t{}", idx)) // Generate lowercase t0, t1, ... .collect::>(); - let archive_bounds = elements - .iter() - .map(|e| quote!(#e: rkyv::Archive, <#e as rkyv::Archive>::Archived: Ord,)) - .map(|e| e.to_string()) - .fold(String::new(), |a, b| format!("{} {}", a, b)); let self_indexes = elements .iter() .enumerate() .map(|(idx, _e)| Index::from(idx)) .collect::>(); let num_elements = elements.len(); - + let use_legacy = num_elements < 8; + let bitmap_bytes = num_elements.div_ceil(8); + let field_ptr_name = format_ident!("{}FieldPtr", name); + let bitmap_ty = quote!(::dbsp::utils::tuple::TupleBitmap<#bitmap_bytes>); + let bitmap_new = quote!(::dbsp::utils::tuple::TupleBitmap::<#bitmap_bytes>::new()); + let format_ty = quote!(::dbsp::utils::tuple::TupleFormat); + let archived_name = format_ident!("Archived{}", name); + let archived_sparse_name = format_ident!("Archived{}Sparse", name); + let archived_dense_name = format_ident!("Archived{}Dense", name); + let archived_v3_name = format_ident!("Archived{}V3", name); + let resolver_name = format_ident!("{}Resolver", name); + let sparse_data_name = format_ident!("{}SparseData", name); + let sparse_resolver_name = format_ident!("{}SparseResolver", name); + let dense_data_name = format_ident!("{}DenseData", name); + let dense_resolver_name = format_ident!("{}DenseResolver", name); // Struct definition - let struct_def = quote! { - #[derive( - Default, Eq, Ord, Clone, Hash, PartialEq, PartialOrd, - derive_more::Neg, - serde::Serialize, serde::Deserialize, - size_of::SizeOf, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize - )] - #[archive_attr( - derive(Ord, Eq, PartialEq, PartialOrd), - )] - #[archive( - bound(archive = #archive_bounds) - )] - pub struct #name<#(#generics),*>( #(pub #generics),* ); + let struct_def = if use_legacy { + quote! { + #[derive( + Default, Eq, Ord, Clone, Hash, PartialEq, PartialOrd, + derive_more::Neg, + serde::Serialize, serde::Deserialize, + size_of::SizeOf, + rkyv::Archive, rkyv::Serialize, rkyv::Deserialize + )] + pub struct #name<#(#generics),*>( #(pub #generics),* ); + } + } else { + quote! { + #[derive( + Default, Eq, Ord, Clone, Hash, PartialEq, PartialOrd, + derive_more::Neg, + serde::Serialize, serde::Deserialize, + size_of::SizeOf + )] + pub struct #name<#(#generics),*>( #(pub #generics),* ); + } }; // Constructor @@ -78,6 +94,12 @@ pub(super) fn declare_tuple_impl(tuple: TupleDef) -> TokenStream2 { }); } + let num_elements_const = quote! { + impl<#(#generics),*> #name<#(#generics),*> { + pub const NUM_ELEMENTS: usize = #num_elements; + } + }; + // Trait implementations let algebra_traits = quote! { impl<#(#generics),*, W> ::dbsp::algebra::MulByRef for #name<#(#generics),*> @@ -184,7 +206,7 @@ pub(super) fn declare_tuple_impl(tuple: TupleDef) -> TokenStream2 { const CONST_NUM_ENTRIES: Option = None;//Some(#num_elements); fn num_entries_shallow(&self) -> usize { - #num_elements + Self::NUM_ELEMENTS } fn num_entries_deep(&self) -> usize { @@ -200,7 +222,735 @@ pub(super) fn declare_tuple_impl(tuple: TupleDef) -> TokenStream2 { let not_an_option = quote! { impl<#(#generics),*> ::dbsp::utils::IsNone for #name<#(#generics),*> { + type Inner = Self; + fn is_none(&self) -> bool { false } + + fn unwrap_or_self(&self) -> &Self::Inner { self } + + fn from_inner(inner: Self::Inner) -> Self { inner } + } + }; + + let v3_archived_struct = quote! { + pub struct #archived_v3_name<#(#generics),*> + where + #(#generics: ::rkyv::Archive,)* + { + #(pub #fields: ::rkyv::Archived<#generics>),* + } + }; + + let v3_deserialize_impl = quote! { + impl ::rkyv::Deserialize<#name<#(#generics),*>, D> + for #archived_v3_name<#(#generics),*> + where + D: ::rkyv::Fallible + ?Sized, + #(#generics: ::rkyv::Archive,)* + #(::rkyv::Archived<#generics>: ::rkyv::Deserialize<#generics, D>,)* + { + #[inline] + fn deserialize( + &self, + deserializer: &mut D, + ) -> Result<#name<#(#generics),*>, D::Error> { + Ok(#name( #(self.#fields.deserialize(deserializer)?),* )) + } + } + }; + + let sparse_get_methods = fields + .iter() + .enumerate() + .zip(generics.iter()) + .map(|((idx, _field), ty)| { + let get_name = format_ident!("get_t{}", idx); + let idx_lit = Index::from(idx); + quote! { + #[inline] + pub fn #get_name( + &self, + ) -> Option<&::rkyv::Archived<<#ty as ::dbsp::utils::IsNone>::Inner>> { + if self.none_bit_set(#idx_lit) { + None + } else { + let ptr_idx = self.idx_for_field(#idx_lit); + debug_assert!(ptr_idx < self.ptrs.len()); + // SAFETY: `ptrs[ptr_idx]` points at the archived field when the bit is clear. + Some(unsafe { + &*self + .ptrs + .as_slice() + .get_unchecked(ptr_idx) + .as_ptr() + .cast::<::rkyv::Archived<<#ty as ::dbsp::utils::IsNone>::Inner>>() + }) + } + } + } + }); + + let dense_get_methods = + fields + .iter() + .enumerate() + .zip(generics.iter()) + .map(|((idx, _field), ty)| { + let get_name = format_ident!("get_t{}", idx); + let idx_lit = Index::from(idx); + let field = format_ident!("t{}", idx); + quote! { + #[inline] + pub fn #get_name( + &self, + ) -> Option<&::rkyv::Archived<<#ty as ::dbsp::utils::IsNone>::Inner>> { + if self.none_bit_set(#idx_lit) { + None + } else { + Some(unsafe { &*self.#field.as_ptr() }) + } + } + } + }); + + let archived_get_methods = + fields + .iter() + .enumerate() + .zip(generics.iter()) + .map(|((idx, _field), ty)| { + let get_name = format_ident!("get_t{}", idx); + quote! { + #[inline] + pub fn #get_name( + &self, + ) -> Option<&::rkyv::Archived<<#ty as ::dbsp::utils::IsNone>::Inner>> { + match self.format { + #format_ty::Sparse => self.sparse().#get_name(), + #format_ty::Dense => self.dense().#get_name(), + } + } + } + }); + + let eq_checks = fields.iter().enumerate().map(|(idx, _)| { + let get_name = format_ident!("get_t{}", idx); + quote!(self.#get_name() == other.#get_name()) + }); + + let cmp_checks = fields.iter().enumerate().map(|(idx, _)| { + let get_name = format_ident!("get_t{}", idx); + quote! { + let cmp = self.#get_name().cmp(&other.#get_name()); + if cmp != core::cmp::Ordering::Equal { + return cmp; + } + } + }); + + let legacy_eq_checks = self_indexes + .iter() + .map(|idx| quote!(self.#idx == other.#idx)); + + let legacy_cmp_checks = self_indexes.iter().map(|idx| { + quote! { + let cmp = self.#idx.cmp(&other.#idx); + if cmp != core::cmp::Ordering::Equal { + return cmp; + } + } + }); + + let legacy_archived_ord_impls = quote! { + impl<#(#generics),*> core::cmp::PartialEq for #archived_name<#(#generics),*> + where + #(#generics: ::rkyv::Archive,)* + #(::rkyv::Archived<#generics>: core::cmp::PartialEq,)* + { + #[inline] + fn eq(&self, other: &Self) -> bool { + true #(&& #legacy_eq_checks)* + } + } + + impl<#(#generics),*> core::cmp::Eq for #archived_name<#(#generics),*> + where + #(#generics: ::rkyv::Archive,)* + #(::rkyv::Archived<#generics>: core::cmp::Eq,)* + {} + + impl<#(#generics),*> core::cmp::PartialOrd for #archived_name<#(#generics),*> + where + #(#generics: ::rkyv::Archive,)* + #(::rkyv::Archived<#generics>: core::cmp::Ord,)* + { + #[inline] + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } + } + + impl<#(#generics),*> core::cmp::Ord for #archived_name<#(#generics),*> + where + #(#generics: ::rkyv::Archive,)* + #(::rkyv::Archived<#generics>: core::cmp::Ord,)* + { + #[inline] + fn cmp(&self, other: &Self) -> core::cmp::Ordering { + #(#legacy_cmp_checks)* + core::cmp::Ordering::Equal + } + } + }; + + let choose_format_fields = + fields + .iter() + .enumerate() + .zip(self_indexes.iter()) + .map(|((idx, _), self_idx)| { + let idx_lit = Index::from(idx); + quote! { + if ::dbsp::utils::IsNone::is_none(&self.#self_idx) { + bitmap.set_none(#idx_lit); + } + } + }); + + let sparse_serialize_fields = + fields + .iter() + .enumerate() + .zip(self_indexes.iter()) + .map(|((idx, _), self_idx)| { + let idx_lit = Index::from(idx); + quote! { + if !self.bitmap.is_none(#idx_lit) { + let pos = serializer.serialize_value( + ::dbsp::utils::IsNone::unwrap_or_self(&self.value.#self_idx), + )?; + ptrs[ptrs_len] = #field_ptr_name { pos }; + ptrs_len += 1; + } + } + }); + + let dense_serialize_fields = + fields + .iter() + .enumerate() + .zip(self_indexes.iter()) + .map(|((idx, _), self_idx)| { + let idx_lit = Index::from(idx); + let field_name = format_ident!("t{}", idx); + quote! { + let #field_name = if self.bitmap.is_none(#idx_lit) { + None + } else { + Some(::rkyv::Serialize::serialize( + ::dbsp::utils::IsNone::unwrap_or_self(&self.value.#self_idx), + serializer, + )?) + }; + } + }); + + let dense_resolve_fields = + fields + .iter() + .enumerate() + .zip(self_indexes.iter()) + .zip(generics.iter()) + .map(|(((idx, _), self_idx), ty)| { + let field_name = format_ident!("t{}", idx); + let field_out = format_ident!("{}_out", field_name); + quote! { + let (fp, fo) = ::rkyv::out_field!(out.#field_name); + let #field_out = fo + .cast::::Inner>>>(); + if let Some(resolver) = resolver.#field_name { + let inner = ::dbsp::utils::IsNone::unwrap_or_self(&self.value.#self_idx); + <<#ty as ::dbsp::utils::IsNone>::Inner as ::rkyv::Archive>::resolve( + inner, + pos + fp, + resolver, + #field_out + .cast::<::rkyv::Archived<<#ty as ::dbsp::utils::IsNone>::Inner>>(), + ); + } else { + core::ptr::write(#field_out, core::mem::MaybeUninit::zeroed()); + } + } + }); + + let sparse_deserialize_fields = + fields + .iter() + .enumerate() + .zip(generics.iter()) + .map(|((idx, _), ty)| { + let field_name = format_ident!("t{}", idx); + let idx_lit = Index::from(idx); + quote! { + let #field_name = if sparse.none_bit_set(#idx_lit) { + #ty::default() + } else { + let archived: &::rkyv::Archived<<#ty as ::dbsp::utils::IsNone>::Inner> = unsafe { + &*sparse + .ptrs + .as_slice() + .get_unchecked(ptr_idx) + .as_ptr() + .cast::<::rkyv::Archived<<#ty as ::dbsp::utils::IsNone>::Inner>>() + }; + ptr_idx += 1; + let inner = archived.deserialize(deserializer)?; + <#ty as ::dbsp::utils::IsNone>::from_inner(inner) + }; + } + }); + + let dense_deserialize_fields = + fields + .iter() + .enumerate() + .zip(generics.iter()) + .map(|((idx, _), ty)| { + let field_name = format_ident!("t{}", idx); + let idx_lit = Index::from(idx); + let get_name = format_ident!("get_t{}", idx); + let expect_msg = + Literal::string(&format!("{}: missing field {}", archived_dense_name, idx)); + quote! { + let #field_name = if dense.none_bit_set(#idx_lit) { + #ty::default() + } else { + let archived = dense.#get_name().expect(#expect_msg); + let inner = archived.deserialize(deserializer)?; + <#ty as ::dbsp::utils::IsNone>::from_inner(inner) + }; + } + }); + + let dense_resolver_fields = + fields + .iter() + .enumerate() + .zip(generics.iter()) + .map(|((idx, _), ty)| { + let field_name = format_ident!("t{}", idx); + quote! { + #field_name: Option<<<#ty as ::dbsp::utils::IsNone>::Inner as ::rkyv::Archive>::Resolver>, + } + }); + + let dense_resolver_inits = fields.iter().enumerate().map(|(idx, _)| { + let field_name = format_ident!("t{}", idx); + quote!(#field_name) + }); + + let choose_format_impl = quote! { + impl<#(#generics),*> #name<#(#generics),*> + where + #(#generics: ::rkyv::Archive + ::dbsp::utils::IsNone,)* + #(<#generics as ::dbsp::utils::IsNone>::Inner: ::rkyv::Archive,)* + { + #[inline] + fn choose_format(&self) -> (#bitmap_ty, #format_ty) { + let mut bitmap = #bitmap_new; + #(#choose_format_fields)* + + let none_bits = bitmap.count_none(Self::NUM_ELEMENTS); + if none_bits * 3 > Self::NUM_ELEMENTS { + (bitmap, #format_ty::Sparse) + } else { + (bitmap, #format_ty::Dense) + } + } + } + }; + + let rkyv_impls = quote! { + #[derive(Copy, Clone)] + struct #field_ptr_name { + pos: usize, + } + + impl ::rkyv::Archive for #field_ptr_name { + type Archived = ::rkyv::rel_ptr::RawRelPtrI32; + type Resolver = usize; + + #[inline] + unsafe fn resolve(&self, pos: usize, resolver: Self::Resolver, out: *mut Self::Archived) { + ::rkyv::rel_ptr::RawRelPtrI32::emplace(pos, resolver, out); + } + } + + impl ::rkyv::Serialize for #field_ptr_name + where + S: ::rkyv::ser::Serializer + ?Sized, + { + #[inline] + fn serialize(&self, _serializer: &mut S) -> Result { + Ok(self.pos) + } + } + + #[repr(C)] + pub struct #archived_sparse_name<#(#generics),*> + where + #(#generics: ::rkyv::Archive + ::dbsp::utils::IsNone,)* + #(<#generics as ::dbsp::utils::IsNone>::Inner: ::rkyv::Archive,)* + { + bitmap: #bitmap_ty, + ptrs: ::rkyv::vec::ArchivedVec<::rkyv::rel_ptr::RawRelPtrI32>, + _phantom: core::marker::PhantomData (#(#generics),*)>, + } + + #[repr(C)] + pub struct #archived_dense_name<#(#generics),*> + where + #(#generics: ::rkyv::Archive + ::dbsp::utils::IsNone,)* + #(<#generics as ::dbsp::utils::IsNone>::Inner: ::rkyv::Archive,)* + { + bitmap: #bitmap_ty, + #( + #fields: core::mem::MaybeUninit<::rkyv::Archived<<#generics as ::dbsp::utils::IsNone>::Inner>>, + )* + _phantom: core::marker::PhantomData (#(#generics),*)>, + } + + #[repr(C)] + pub struct #archived_name<#(#generics),*> + where + #(#generics: ::rkyv::Archive + ::dbsp::utils::IsNone,)* + #(<#generics as ::dbsp::utils::IsNone>::Inner: ::rkyv::Archive,)* + { + format: #format_ty, + data: ::rkyv::rel_ptr::RawRelPtrI32, + _phantom: core::marker::PhantomData (#(#generics),*)>, + } + + impl<#(#generics),*> #archived_sparse_name<#(#generics),*> + where + #(#generics: ::rkyv::Archive + ::dbsp::utils::IsNone,)* + #(<#generics as ::dbsp::utils::IsNone>::Inner: ::rkyv::Archive,)* + { + #[inline] + fn none_bit_set(&self, idx: usize) -> bool { + debug_assert!(idx < #num_elements); + self.bitmap.is_none(idx) + } + + #[inline] + fn idx_for_field(&self, field_idx: usize) -> usize { + debug_assert!(field_idx < #num_elements); + field_idx - self.bitmap.count_none_before(field_idx) + } + + #(#sparse_get_methods)* + } + + impl<#(#generics),*> #archived_dense_name<#(#generics),*> + where + #(#generics: ::rkyv::Archive + ::dbsp::utils::IsNone,)* + #(<#generics as ::dbsp::utils::IsNone>::Inner: ::rkyv::Archive,)* + { + #[inline] + fn none_bit_set(&self, idx: usize) -> bool { + debug_assert!(idx < #num_elements); + self.bitmap.is_none(idx) + } + + #(#dense_get_methods)* + } + + impl<#(#generics),*> #archived_name<#(#generics),*> + where + #(#generics: ::rkyv::Archive + ::dbsp::utils::IsNone,)* + #(<#generics as ::dbsp::utils::IsNone>::Inner: ::rkyv::Archive,)* + { + #[inline] + fn sparse(&self) -> &#archived_sparse_name<#(#generics),*> { + unsafe { &*self.data.as_ptr().cast::<#archived_sparse_name<#(#generics),*>>() } + } + + #[inline] + fn dense(&self) -> &#archived_dense_name<#(#generics),*> { + unsafe { &*self.data.as_ptr().cast::<#archived_dense_name<#(#generics),*>>() } + } + + #(#archived_get_methods)* + } + + impl<#(#generics),*> core::cmp::PartialEq for #archived_name<#(#generics),*> + where + #(#generics: ::rkyv::Archive + ::dbsp::utils::IsNone,)* + #(<#generics as ::dbsp::utils::IsNone>::Inner: ::rkyv::Archive,)* + #(::rkyv::Archived<<#generics as ::dbsp::utils::IsNone>::Inner>: core::cmp::PartialEq,)* + { + #[inline] + fn eq(&self, other: &Self) -> bool { + true #(&& #eq_checks)* + } + } + + impl<#(#generics),*> core::cmp::Eq for #archived_name<#(#generics),*> + where + #(#generics: ::rkyv::Archive + ::dbsp::utils::IsNone,)* + #(<#generics as ::dbsp::utils::IsNone>::Inner: ::rkyv::Archive,)* + #(::rkyv::Archived<<#generics as ::dbsp::utils::IsNone>::Inner>: core::cmp::Eq,)* + {} + + impl<#(#generics),*> core::cmp::PartialOrd for #archived_name<#(#generics),*> + where + #(#generics: ::rkyv::Archive + ::dbsp::utils::IsNone,)* + #(<#generics as ::dbsp::utils::IsNone>::Inner: ::rkyv::Archive,)* + #(::rkyv::Archived<<#generics as ::dbsp::utils::IsNone>::Inner>: core::cmp::Ord,)* + { + #[inline] + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } + } + + impl<#(#generics),*> core::cmp::Ord for #archived_name<#(#generics),*> + where + #(#generics: ::rkyv::Archive + ::dbsp::utils::IsNone,)* + #(<#generics as ::dbsp::utils::IsNone>::Inner: ::rkyv::Archive,)* + #(::rkyv::Archived<<#generics as ::dbsp::utils::IsNone>::Inner>: core::cmp::Ord,)* + { + #[inline] + fn cmp(&self, other: &Self) -> core::cmp::Ordering { + #(#cmp_checks)* + core::cmp::Ordering::Equal + } + } + + struct #sparse_data_name<'a, #(#generics),*> { + bitmap: #bitmap_ty, + value: &'a #name<#(#generics),*>, + } + + struct #sparse_resolver_name { + bitmap: #bitmap_ty, + ptrs_resolver: ::rkyv::vec::VecResolver, + ptrs_len: usize, + } + + impl<'a, #(#generics),*> ::rkyv::Archive for #sparse_data_name<'a, #(#generics),*> + where + #(#generics: ::rkyv::Archive + ::dbsp::utils::IsNone,)* + #(<#generics as ::dbsp::utils::IsNone>::Inner: ::rkyv::Archive,)* + { + type Archived = #archived_sparse_name<#(#generics),*>; + type Resolver = #sparse_resolver_name; + + #[inline] + unsafe fn resolve(&self, pos: usize, resolver: Self::Resolver, out: *mut Self::Archived) { + let (_fp, fo) = ::rkyv::out_field!(out.bitmap); + fo.write(resolver.bitmap); + + let (fp, fo) = ::rkyv::out_field!(out.ptrs); + let vec_pos = pos + fp; + ::rkyv::vec::ArchivedVec::<::rkyv::rel_ptr::RawRelPtrI32>::resolve_from_len( + resolver.ptrs_len, + vec_pos, + resolver.ptrs_resolver, + fo, + ); + + let (_fp, fo) = ::rkyv::out_field!(out._phantom); + fo.write(core::marker::PhantomData); + } + } + + impl<'a, S, #(#generics),*> ::rkyv::Serialize for #sparse_data_name<'a, #(#generics),*> + where + S: ::rkyv::ser::Serializer + ::rkyv::ser::ScratchSpace + ?Sized, + #(#generics: ::rkyv::Archive + ::dbsp::utils::IsNone,)* + #(<#generics as ::dbsp::utils::IsNone>::Inner: ::rkyv::Archive + ::rkyv::Serialize,)* + { + #[inline] + fn serialize(&self, serializer: &mut S) -> Result { + let mut ptrs: [#field_ptr_name; #num_elements] = + [#field_ptr_name { pos: 0 }; #num_elements]; + let mut ptrs_len = 0usize; + + #(#sparse_serialize_fields)* + + let ptrs_resolver = + ::rkyv::vec::ArchivedVec::<::rkyv::rel_ptr::RawRelPtrI32>::serialize_from_slice( + &ptrs[..ptrs_len], + serializer, + )?; + + Ok(#sparse_resolver_name { + bitmap: self.bitmap, + ptrs_resolver, + ptrs_len, + }) + } + } + + struct #dense_data_name<'a, #(#generics),*> { + bitmap: #bitmap_ty, + value: &'a #name<#(#generics),*>, + } + + struct #dense_resolver_name<#(#generics),*> + where + #(#generics: ::rkyv::Archive + ::dbsp::utils::IsNone,)* + #(<#generics as ::dbsp::utils::IsNone>::Inner: ::rkyv::Archive,)* + { + bitmap: #bitmap_ty, + #(#dense_resolver_fields)* + } + + impl<'a, #(#generics),*> ::rkyv::Archive for #dense_data_name<'a, #(#generics),*> + where + #(#generics: ::rkyv::Archive + ::dbsp::utils::IsNone,)* + #(<#generics as ::dbsp::utils::IsNone>::Inner: ::rkyv::Archive,)* + { + type Archived = #archived_dense_name<#(#generics),*>; + type Resolver = #dense_resolver_name<#(#generics),*>; + + #[inline] + unsafe fn resolve(&self, pos: usize, resolver: Self::Resolver, out: *mut Self::Archived) { + let (_fp, fo) = ::rkyv::out_field!(out.bitmap); + fo.write(resolver.bitmap); + + #(#dense_resolve_fields)* + + let (_fp, fo) = ::rkyv::out_field!(out._phantom); + fo.write(core::marker::PhantomData); + } + } + + impl<'a, S, #(#generics),*> ::rkyv::Serialize for #dense_data_name<'a, #(#generics),*> + where + S: ::rkyv::ser::Serializer + ::rkyv::ser::ScratchSpace + ?Sized, + #(#generics: ::rkyv::Archive + ::dbsp::utils::IsNone,)* + #(<#generics as ::dbsp::utils::IsNone>::Inner: ::rkyv::Archive + ::rkyv::Serialize,)* + { + #[inline] + fn serialize(&self, serializer: &mut S) -> Result { + #(#dense_serialize_fields)* + Ok(#dense_resolver_name { + bitmap: self.bitmap, + #(#dense_resolver_inits),* + }) + } + } + + pub enum #resolver_name<#(#generics),*> { + Sparse { data_pos: usize }, + Dense { data_pos: usize }, + _Phantom(core::marker::PhantomData<(#(#generics),*)>), + } + + impl<#(#generics),*> ::rkyv::Archive for #name<#(#generics),*> + where + #(#generics: ::rkyv::Archive + ::dbsp::utils::IsNone,)* + #(<#generics as ::dbsp::utils::IsNone>::Inner: ::rkyv::Archive,)* + { + type Archived = #archived_name<#(#generics),*>; + type Resolver = #resolver_name<#(#generics),*>; + + #[inline] + unsafe fn resolve(&self, pos: usize, resolver: Self::Resolver, out: *mut Self::Archived) { + let (_fp, format_out) = ::rkyv::out_field!(out.format); + let (fp, data_out) = ::rkyv::out_field!(out.data); + let data_out = data_out.cast::<::rkyv::rel_ptr::RawRelPtrI32>(); + + match resolver { + #resolver_name::Sparse { data_pos } => { + format_out.write(#format_ty::Sparse); + ::rkyv::rel_ptr::RawRelPtrI32::emplace(pos + fp, data_pos, data_out); + } + #resolver_name::Dense { data_pos } => { + format_out.write(#format_ty::Dense); + ::rkyv::rel_ptr::RawRelPtrI32::emplace(pos + fp, data_pos, data_out); + } + #resolver_name::_Phantom(_) => unreachable!(), + } + + let (_fp, fo) = ::rkyv::out_field!(out._phantom); + fo.write(core::marker::PhantomData); + } + } + + impl ::rkyv::Serialize for #name<#(#generics),*> + where + S: ::rkyv::ser::Serializer + ::rkyv::ser::ScratchSpace + ?Sized, + #(#generics: ::rkyv::Archive + ::dbsp::utils::IsNone,)* + #(<#generics as ::dbsp::utils::IsNone>::Inner: ::rkyv::Archive + ::rkyv::Serialize,)* + { + #[inline] + fn serialize(&self, serializer: &mut S) -> Result { + let (bitmap, format) = self.choose_format(); + match format { + #format_ty::Dense => { + let data = #dense_data_name { + bitmap, + value: self, + }; + let data_pos = serializer.serialize_value(&data)?; + Ok(#resolver_name::Dense { data_pos }) + } + #format_ty::Sparse => { + let data = #sparse_data_name { + bitmap, + value: self, + }; + let data_pos = serializer.serialize_value(&data)?; + Ok(#resolver_name::Sparse { data_pos }) + } + } + } + } + + impl ::rkyv::Deserialize<#name<#(#generics),*>, D> + for #archived_name<#(#generics),*> + where + D: ::rkyv::Fallible + ::core::any::Any, + #(#generics: ::rkyv::Archive + Default + ::dbsp::utils::IsNone,)* + #(<#generics as ::dbsp::utils::IsNone>::Inner: ::rkyv::Archive,)* + #(::rkyv::Archived<#generics>: ::rkyv::Deserialize<#generics, D>,)* + #(::rkyv::Archived<<#generics as ::dbsp::utils::IsNone>::Inner>: + ::rkyv::Deserialize<<#generics as ::dbsp::utils::IsNone>::Inner, D>,)* + { + #[inline] + fn deserialize(&self, deserializer: &mut D) -> Result<#name<#(#generics),*>, D::Error> { + let version = (deserializer as &mut dyn ::core::any::Any) + .downcast_mut::<::dbsp::storage::file::Deserializer>() + .map(|deserializer| deserializer.version()) + .expect("passed wrong deserializer"); + if version <= 3 { + // SAFETY: Before V4 files store tuples in the naive (standard rkyv) form that + // does not have a bitfield to optimize None values. + let legacy = unsafe { + &*(self as *const _ as *const #archived_v3_name<#(#generics),*>) + }; + return <#archived_v3_name<#(#generics),*> as ::rkyv::Deserialize< + #name<#(#generics),*>, + D, + >>::deserialize(legacy, deserializer); + } + match self.format { + #format_ty::Sparse => { + let sparse = self.sparse(); + let mut ptr_idx = 0usize; + #(#sparse_deserialize_fields)* + Ok(#name( #(#fields),* )) + } + #format_ty::Dense => { + let dense = self.dense(); + #(#dense_deserialize_fields)* + Ok(#name( #(#fields),* )) + } + } + } } }; @@ -224,10 +974,25 @@ pub(super) fn declare_tuple_impl(tuple: TupleDef) -> TokenStream2 { } }; + let rkyv_blocks = if use_legacy { + quote! { + #legacy_archived_ord_impls + } + } else { + quote! { + #v3_archived_struct + #v3_deserialize_impl + #choose_format_impl + #rkyv_impls + } + }; + expanded.extend(quote! { #struct_def #constructor #getter_setter + #num_elements_const + #rkyv_blocks #algebra_traits #conversion_traits #num_entries_impl @@ -280,9 +1045,14 @@ mod tests { let expanded = declare_tuple_impl(tuple); let parsed_file: syn::File = syn::parse2(expanded).expect("Failed to parse output"); let formatted = prettyplease::unparse(&parsed_file); - println!("{formatted}"); - assert!(formatted.contains("pub struct Tup1")); + + let tuple: TupleDef = syn::parse2(quote!(Tup2)).expect("Failed to parse TupleDef"); + let expanded = declare_tuple_impl(tuple); + let parsed_file: syn::File = syn::parse2(expanded).expect("Failed to parse output"); + let formatted = prettyplease::unparse(&parsed_file); + println!("{formatted}"); + assert!(formatted.contains("pub struct Tup2")); } } diff --git a/crates/feldera-macros/tests/compile.rs b/crates/feldera-macros/tests/compile.rs new file mode 100644 index 0000000000..b23cbdbb6c --- /dev/null +++ b/crates/feldera-macros/tests/compile.rs @@ -0,0 +1,1639 @@ +#![allow(unused)] + +use dbsp::utils::IsNone; +use feldera_macros::IsNone; +use rkyv::{out_field, Archived}; +use std::sync::OnceLock; + +#[derive( + Default, + Eq, + Ord, + Clone, + Hash, + PartialEq, + PartialOrd, + derive_more::Neg, + serde::Serialize, + serde::Deserialize, + size_of::SizeOf, +)] +pub struct Tup1(pub T0); +impl Tup1 { + #[allow(clippy::too_many_arguments)] + pub fn new(t0: T0) -> Self { + Self(t0) + } +} +impl Tup1 { + pub const NUM_ELEMENTS: usize = 1; + + #[inline] + pub fn get_0(&self) -> &T0 { + &self.0 + } + #[inline] + pub fn get_0_mut(&mut self) -> &mut T0 { + &mut self.0 + } +} + +type TupBitmap = ::dbsp::utils::tuple::TupleBitmap; +type TupFormat = ::dbsp::utils::tuple::TupleFormat; +impl Tup1 +where + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, +{ + #[inline] + fn choose_format(&self) -> (TupBitmap<1>, TupFormat) { + let mut bitmap = TupBitmap::<1>::new(); + if ::dbsp::utils::IsNone::is_none(&self.0) { + bitmap.set_none(0); + } + + let none_bits = bitmap.count_none(Self::NUM_ELEMENTS); + if none_bits * 3 > Self::NUM_ELEMENTS { + (bitmap, TupFormat::Sparse) + } else { + (bitmap, TupFormat::Dense) + } + } +} +pub struct ArchivedTup1V3 +where + T0: ::rkyv::Archive, +{ + pub t0: ::rkyv::Archived, +} +impl ::rkyv::Deserialize, D> for ArchivedTup1V3 +where + D: ::rkyv::Fallible + ?Sized, + T0: ::rkyv::Archive, + ::rkyv::Archived: ::rkyv::Deserialize, +{ + #[inline] + fn deserialize(&self, deserializer: &mut D) -> Result, D::Error> { + Ok(Tup1(self.t0.deserialize(deserializer)?)) + } +} +#[derive(Copy, Clone)] +struct Tup1FieldPtr { + pos: usize, +} +impl ::rkyv::Archive for Tup1FieldPtr { + type Archived = ::rkyv::rel_ptr::RawRelPtrI32; + type Resolver = usize; + #[inline] + unsafe fn resolve(&self, pos: usize, resolver: Self::Resolver, out: *mut Self::Archived) { + ::rkyv::rel_ptr::RawRelPtrI32::emplace(pos, resolver, out); + } +} +impl ::rkyv::Serialize for Tup1FieldPtr +where + S: ::rkyv::ser::Serializer + ?Sized, +{ + #[inline] + fn serialize(&self, _serializer: &mut S) -> Result { + Ok(self.pos) + } +} +#[repr(C)] +pub struct ArchivedTup1Sparse +where + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, +{ + bitmap: TupBitmap<1>, + ptrs: ::rkyv::vec::ArchivedVec<::rkyv::rel_ptr::RawRelPtrI32>, + _phantom: core::marker::PhantomData (T0)>, +} + +#[repr(C)] +pub struct ArchivedTup1Dense +where + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, +{ + bitmap: TupBitmap<1>, + t0: core::mem::MaybeUninit<::rkyv::Archived<::Inner>>, + _phantom: core::marker::PhantomData (T0)>, +} + +#[repr(C)] +pub struct ArchivedTup1 +where + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, +{ + format: TupFormat, + data: ::rkyv::rel_ptr::RawRelPtrI32, + _phantom: core::marker::PhantomData (T0)>, +} + +impl ArchivedTup1Sparse +where + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, +{ + #[inline] + fn none_bit_set(&self, idx: usize) -> bool { + debug_assert!(idx < Tup1::::NUM_ELEMENTS); + self.bitmap.is_none(idx) + } + + #[inline] + fn idx_for_field(&self, field_idx: usize) -> usize { + debug_assert!(field_idx < Tup1::::NUM_ELEMENTS); + field_idx - self.bitmap.count_none_before(field_idx) + } + + #[inline] + pub fn get_t0(&self) -> Option<&::rkyv::Archived<::Inner>> { + if self.none_bit_set(0) { + None + } else { + let ptr_idx = self.idx_for_field(0); + debug_assert!(ptr_idx < self.ptrs.len()); + Some(unsafe { + &*self + .ptrs + .as_slice() + .get_unchecked(ptr_idx) + .as_ptr() + .cast::<::rkyv::Archived<::Inner>>() + }) + } + } +} + +impl ArchivedTup1Dense +where + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, +{ + #[inline] + fn none_bit_set(&self, idx: usize) -> bool { + debug_assert!(idx < Tup1::::NUM_ELEMENTS); + self.bitmap.is_none(idx) + } + + #[inline] + pub fn get_t0(&self) -> Option<&::rkyv::Archived<::Inner>> { + if self.none_bit_set(0) { + None + } else { + Some(unsafe { &*self.t0.as_ptr() }) + } + } +} + +impl ArchivedTup1 +where + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, +{ + #[inline] + fn sparse(&self) -> &ArchivedTup1Sparse { + unsafe { &*self.data.as_ptr().cast::>() } + } + + #[inline] + fn dense(&self) -> &ArchivedTup1Dense { + unsafe { &*self.data.as_ptr().cast::>() } + } + + #[inline] + pub fn get_t0(&self) -> Option<&::rkyv::Archived<::Inner>> { + match self.format { + TupFormat::Sparse => self.sparse().get_t0(), + TupFormat::Dense => self.dense().get_t0(), + } + } +} +impl core::cmp::PartialEq for ArchivedTup1 +where + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, + ::rkyv::Archived<::Inner>: core::cmp::PartialEq, +{ + #[inline] + fn eq(&self, other: &Self) -> bool { + true && self.get_t0() == other.get_t0() + } +} +impl core::cmp::Eq for ArchivedTup1 +where + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, + ::rkyv::Archived<::Inner>: core::cmp::Eq, +{ +} +impl core::cmp::PartialOrd for ArchivedTup1 +where + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, + ::rkyv::Archived<::Inner>: core::cmp::Ord, +{ + #[inline] + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} +impl core::cmp::Ord for ArchivedTup1 +where + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, + ::rkyv::Archived<::Inner>: core::cmp::Ord, +{ + #[inline] + fn cmp(&self, other: &Self) -> core::cmp::Ordering { + let cmp = self.get_t0().cmp(&other.get_t0()); + if cmp != core::cmp::Ordering::Equal { + return cmp; + } + core::cmp::Ordering::Equal + } +} +struct Tup1SparseData<'a, T0> { + bitmap: TupBitmap<1>, + value: &'a Tup1, +} + +struct Tup1SparseResolver { + bitmap: TupBitmap<1>, + ptrs_resolver: ::rkyv::vec::VecResolver, + ptrs_len: usize, +} + +impl<'a, T0> ::rkyv::Archive for Tup1SparseData<'a, T0> +where + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, +{ + type Archived = ArchivedTup1Sparse; + type Resolver = Tup1SparseResolver; + #[inline] + unsafe fn resolve(&self, pos: usize, resolver: Self::Resolver, out: *mut Self::Archived) { + let (_fp, fo) = ::rkyv::out_field!(out.bitmap); + fo.write(resolver.bitmap); + let (fp, fo) = ::rkyv::out_field!(out.ptrs); + let vec_pos = pos + fp; + ::rkyv::vec::ArchivedVec::<::rkyv::rel_ptr::RawRelPtrI32>::resolve_from_len( + resolver.ptrs_len, + vec_pos, + resolver.ptrs_resolver, + fo, + ); + let (_fp, fo) = ::rkyv::out_field!(out._phantom); + fo.write(core::marker::PhantomData); + } +} + +impl<'a, S, T0> ::rkyv::Serialize for Tup1SparseData<'a, T0> +where + S: ::rkyv::ser::Serializer + ::rkyv::ser::ScratchSpace + ?Sized, + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive + ::rkyv::Serialize, +{ + #[inline] + fn serialize(&self, serializer: &mut S) -> Result { + let t0_is_none = self.bitmap.is_none(0); + let mut ptrs: [Tup1FieldPtr; 1usize] = [Tup1FieldPtr { pos: 0 }; 1usize]; + let mut ptrs_len = 0usize; + if !t0_is_none { + let pos = + serializer.serialize_value(::dbsp::utils::IsNone::unwrap_or_self(&self.value.0))?; + ptrs[ptrs_len] = Tup1FieldPtr { pos }; + ptrs_len += 1; + } + let ptrs_resolver = + ::rkyv::vec::ArchivedVec::<::rkyv::rel_ptr::RawRelPtrI32>::serialize_from_slice( + &ptrs[..ptrs_len], + serializer, + )?; + Ok(Tup1SparseResolver { + bitmap: self.bitmap, + ptrs_resolver, + ptrs_len, + }) + } +} + +struct Tup1DenseData<'a, T0> { + bitmap: TupBitmap<1>, + value: &'a Tup1, +} + +struct Tup1DenseResolver +where + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, +{ + bitmap: TupBitmap<1>, + t0: Option<<::Inner as ::rkyv::Archive>::Resolver>, +} + +impl<'a, T0> ::rkyv::Archive for Tup1DenseData<'a, T0> +where + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, +{ + type Archived = ArchivedTup1Dense; + type Resolver = Tup1DenseResolver; + #[inline] + unsafe fn resolve(&self, pos: usize, resolver: Self::Resolver, out: *mut Self::Archived) { + let (_fp, fo) = ::rkyv::out_field!(out.bitmap); + fo.write(resolver.bitmap); + let (fp, fo) = ::rkyv::out_field!(out.t0); + let t0_out = fo + .cast::::Inner>>>( + ); + if let Some(resolver) = resolver.t0 { + let inner = ::dbsp::utils::IsNone::unwrap_or_self(&self.value.0); + <::Inner as ::rkyv::Archive>::resolve( + inner, + pos + fp, + resolver, + t0_out.cast::<::rkyv::Archived<::Inner>>(), + ); + } else { + core::ptr::write(t0_out, core::mem::MaybeUninit::zeroed()); + } + let (_fp, fo) = ::rkyv::out_field!(out._phantom); + fo.write(core::marker::PhantomData); + } +} + +impl<'a, S, T0> ::rkyv::Serialize for Tup1DenseData<'a, T0> +where + S: ::rkyv::ser::Serializer + ::rkyv::ser::ScratchSpace + ?Sized, + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive + ::rkyv::Serialize, +{ + #[inline] + fn serialize(&self, serializer: &mut S) -> Result { + let t0_is_none = self.bitmap.is_none(0); + let t0 = if t0_is_none { + None + } else { + Some(::rkyv::Serialize::serialize( + ::dbsp::utils::IsNone::unwrap_or_self(&self.value.0), + serializer, + )?) + }; + Ok(Tup1DenseResolver { + bitmap: self.bitmap, + t0, + }) + } +} + +pub enum Tup1Resolver { + Sparse { data_pos: usize }, + Dense { data_pos: usize }, + _Phantom(core::marker::PhantomData), +} + +impl ::rkyv::Archive for Tup1 +where + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, +{ + type Archived = ArchivedTup1; + type Resolver = Tup1Resolver; + #[inline] + unsafe fn resolve(&self, pos: usize, resolver: Self::Resolver, out: *mut Self::Archived) { + let (_fp, format_out) = ::rkyv::out_field!(out.format); + let (fp, data_out) = ::rkyv::out_field!(out.data); + let data_out = data_out.cast::<::rkyv::rel_ptr::RawRelPtrI32>(); + + match resolver { + Tup1Resolver::Sparse { data_pos } => { + format_out.write(TupFormat::Sparse); + ::rkyv::rel_ptr::RawRelPtrI32::emplace(pos + fp, data_pos, data_out); + } + Tup1Resolver::Dense { data_pos } => { + format_out.write(TupFormat::Dense); + ::rkyv::rel_ptr::RawRelPtrI32::emplace(pos + fp, data_pos, data_out); + } + Tup1Resolver::_Phantom(_) => unreachable!(), + } + let (_fp, fo) = ::rkyv::out_field!(out._phantom); + fo.write(core::marker::PhantomData); + } +} + +impl ::rkyv::Serialize for Tup1 +where + S: ::rkyv::ser::Serializer + ::rkyv::ser::ScratchSpace + ?Sized, + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive + ::rkyv::Serialize, +{ + #[inline] + fn serialize(&self, serializer: &mut S) -> Result { + let (bitmap, format) = self.choose_format(); + match format { + TupFormat::Dense => { + let data = Tup1DenseData { + bitmap, + value: self, + }; + let data_pos = serializer.serialize_value(&data)?; + Ok(Tup1Resolver::Dense { data_pos }) + } + TupFormat::Sparse => { + let data = Tup1SparseData { + bitmap, + value: self, + }; + let data_pos = serializer.serialize_value(&data)?; + Ok(Tup1Resolver::Sparse { data_pos }) + } + } + } +} +impl ::rkyv::Deserialize, D> for ArchivedTup1 +where + D: ::rkyv::Fallible + ::core::any::Any, + T0: ::rkyv::Archive + Default + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, + ::rkyv::Archived: ::rkyv::Deserialize, + ::rkyv::Archived<::Inner>: + ::rkyv::Deserialize<::Inner, D>, +{ + #[inline] + fn deserialize(&self, deserializer: &mut D) -> Result, D::Error> { + let version = (deserializer as &mut dyn ::core::any::Any) + .downcast_mut::<::dbsp::storage::file::Deserializer>() + .map(|deserializer| deserializer.version()) + .expect("passed wrong deserializer"); + if version <= 3 { + let legacy = unsafe { &*(self as *const _ as *const ArchivedTup1V3) }; + return as ::rkyv::Deserialize, D>>::deserialize( + legacy, + deserializer, + ); + } + match self.format { + TupFormat::Sparse => { + let sparse = self.sparse(); + let mut ptr_idx = 0usize; + let t0 = if sparse.none_bit_set(0) { + T0::default() + } else { + let archived: &::rkyv::Archived<::Inner> = unsafe { + &*sparse + .ptrs + .as_slice() + .get_unchecked(ptr_idx) + .as_ptr() + .cast::<::rkyv::Archived<::Inner>>() + }; + ptr_idx += 1; + let inner = archived.deserialize(deserializer)?; + ::from_inner(inner) + }; + Ok(Tup1(t0)) + } + TupFormat::Dense => { + let dense = self.dense(); + let t0 = if dense.none_bit_set(0) { + T0::default() + } else { + let archived = dense.get_t0().expect("ArchivedTup1Dense: missing field 0"); + let inner = archived.deserialize(deserializer)?; + ::from_inner(inner) + }; + Ok(Tup1(t0)) + } + } + } +} +impl ::dbsp::algebra::MulByRef for Tup1 +where + T0: ::dbsp::algebra::MulByRef, + W: ::dbsp::algebra::ZRingValue, +{ + type Output = Self; + fn mul_by_ref(&self, other: &W) -> Self::Output { + let Tup1(t0) = self; + Tup1(t0.mul_by_ref(other)) + } +} +impl ::dbsp::algebra::HasZero for Tup1 +where + T0: ::dbsp::algebra::HasZero, +{ + fn zero() -> Self { + Tup1(T0::zero()) + } + fn is_zero(&self) -> bool { + let mut result = true; + let Tup1(t0) = self; + result = result && t0.is_zero(); + result + } +} +impl ::dbsp::algebra::AddByRef for Tup1 +where + T0: ::dbsp::algebra::AddByRef, +{ + fn add_by_ref(&self, other: &Self) -> Self { + let Tup1(t0) = self; + let Tup1(other_t0) = other; + Tup1(t0.add_by_ref(other_t0)) + } +} +impl ::dbsp::algebra::AddAssignByRef for Tup1 +where + T0: ::dbsp::algebra::AddAssignByRef, +{ + fn add_assign_by_ref(&mut self, other: &Self) { + let Tup1(ref mut t0) = self; + let Tup1(ref other_t0) = other; + t0.add_assign_by_ref(other_t0); + } +} +impl ::dbsp::algebra::NegByRef for Tup1 +where + T0: ::dbsp::algebra::NegByRef, +{ + fn neg_by_ref(&self) -> Self { + let Tup1(t0) = self; + Tup1(t0.neg_by_ref()) + } +} +impl From<(T0)> for Tup1 { + fn from((t0): (T0)) -> Self { + Self(t0) + } +} +impl<'a, T0> Into<(&'a T0,)> for &'a Tup1 { + #[allow(clippy::from_over_into)] + fn into(self) -> (&'a T0,) { + let Tup1(t0) = &self; + (t0,) + } +} +impl Into<(T0,)> for Tup1 { + #[allow(clippy::from_over_into)] + fn into(self) -> (T0,) { + let Tup1(t0) = self; + (t0,) + } +} +impl ::dbsp::NumEntries for Tup1 +where + T0: ::dbsp::NumEntries, +{ + const CONST_NUM_ENTRIES: Option = None; + fn num_entries_shallow(&self) -> usize { + Self::NUM_ELEMENTS + } + fn num_entries_deep(&self) -> usize { + let Tup1(t0) = self; + 0 + (t0).num_entries_deep() + } +} +impl core::fmt::Debug for Tup1 { + fn fmt(&self, f: &mut core::fmt::Formatter) -> core::result::Result<(), core::fmt::Error> { + let Tup1(t0) = self; + f.debug_tuple("").field(&t0).finish() + } +} +impl Copy for Tup1 {} +impl ::dbsp::circuit::checkpointer::Checkpoint for Tup1 +where + Tup1: ::rkyv::Serialize<::dbsp::storage::file::Serializer> + + ::dbsp::storage::file::Deserializable, +{ + fn checkpoint(&self) -> Result, ::dbsp::Error> { + let mut s = ::dbsp::storage::file::Serializer::default(); + let _offset = ::rkyv::ser::Serializer::serialize_value(&mut s, self).unwrap(); + let data = s.into_serializer().into_inner().into_vec(); + Ok(data) + } + fn restore(&mut self, data: &[u8]) -> Result<(), ::dbsp::Error> { + *self = ::dbsp::trace::unaligned_deserialize(data); + Ok(()) + } +} +impl ::dbsp::utils::IsNone for Tup1 { + type Inner = Self; + fn is_none(&self) -> bool { + false + } + fn unwrap_or_self(&self) -> &Self::Inner { + self + } + fn from_inner(inner: Self::Inner) -> Self { + inner + } +} + +#[derive( + Default, + Eq, + Ord, + Clone, + Hash, + PartialEq, + PartialOrd, + derive_more::Neg, + serde::Serialize, + serde::Deserialize, + size_of::SizeOf, +)] +pub struct Tup2(pub T0, pub T1); +impl Tup2 { + #[allow(clippy::too_many_arguments)] + pub fn new(t0: T0, t1: T1) -> Self { + Self(t0, t1) + } +} +impl Tup2 { + pub const NUM_ELEMENTS: usize = 2; + + #[inline] + pub fn get_0(&self) -> &T0 { + &self.0 + } + #[inline] + pub fn get_0_mut(&mut self) -> &mut T0 { + &mut self.0 + } +} +impl Tup2 { + #[inline] + pub fn get_1(&self) -> &T1 { + &self.1 + } + #[inline] + pub fn get_1_mut(&mut self) -> &mut T1 { + &mut self.1 + } +} +impl Tup2 +where + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + T1: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, + ::Inner: ::rkyv::Archive, +{ + #[inline] + fn choose_format(&self) -> (TupBitmap<1>, TupFormat) { + let mut bitmap = TupBitmap::<1>::new(); + if ::dbsp::utils::IsNone::is_none(&self.0) { + bitmap.set_none(0); + } + if ::dbsp::utils::IsNone::is_none(&self.1) { + bitmap.set_none(1); + } + + let none_bits = bitmap.count_none(Self::NUM_ELEMENTS); + if none_bits * 3 > Self::NUM_ELEMENTS { + eprintln!("choosing TupFormat::Sparse..."); + (bitmap, TupFormat::Sparse) + } else { + eprintln!("choosing TupFormat::Dense"); + (bitmap, TupFormat::Dense) + } + } +} +pub struct ArchivedTup2V3 +where + T0: ::rkyv::Archive, + T1: ::rkyv::Archive, +{ + pub t0: ::rkyv::Archived, + pub t1: ::rkyv::Archived, +} +impl ::rkyv::Deserialize, D> for ArchivedTup2V3 +where + D: ::rkyv::Fallible + ?Sized, + T0: ::rkyv::Archive, + T1: ::rkyv::Archive, + ::rkyv::Archived: ::rkyv::Deserialize, + ::rkyv::Archived: ::rkyv::Deserialize, +{ + #[inline] + fn deserialize(&self, deserializer: &mut D) -> Result, D::Error> { + Ok(Tup2( + self.t0.deserialize(deserializer)?, + self.t1.deserialize(deserializer)?, + )) + } +} +#[derive(Copy, Clone)] +struct Tup2FieldPtr { + pos: usize, +} +impl ::rkyv::Archive for Tup2FieldPtr { + type Archived = ::rkyv::rel_ptr::RawRelPtrI32; + type Resolver = usize; + #[inline] + unsafe fn resolve(&self, pos: usize, resolver: Self::Resolver, out: *mut Self::Archived) { + ::rkyv::rel_ptr::RawRelPtrI32::emplace(pos, resolver, out); + } +} +impl ::rkyv::Serialize for Tup2FieldPtr +where + S: ::rkyv::ser::Serializer + ?Sized, +{ + #[inline] + fn serialize(&self, _serializer: &mut S) -> Result { + Ok(self.pos) + } +} +#[repr(C)] +pub struct ArchivedTup2Sparse +where + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + T1: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, + ::Inner: ::rkyv::Archive, +{ + bitmap: TupBitmap<1>, + ptrs: ::rkyv::vec::ArchivedVec<::rkyv::rel_ptr::RawRelPtrI32>, + _phantom: core::marker::PhantomData (T0, T1)>, +} + +#[repr(C)] +pub struct ArchivedTup2Dense +where + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + T1: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, + ::Inner: ::rkyv::Archive, +{ + bitmap: TupBitmap<1>, + t0: core::mem::MaybeUninit<::rkyv::Archived<::Inner>>, + t1: core::mem::MaybeUninit<::rkyv::Archived<::Inner>>, + _phantom: core::marker::PhantomData (T0, T1)>, +} + +#[repr(C)] +pub struct ArchivedTup2 +where + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + T1: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, + ::Inner: ::rkyv::Archive, +{ + format: TupFormat, + data: ::rkyv::rel_ptr::RawRelPtrI32, + _phantom: core::marker::PhantomData (T0, T1)>, +} + +impl ArchivedTup2Sparse +where + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + T1: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, + ::Inner: ::rkyv::Archive, +{ + #[inline] + fn none_bit_set(&self, idx: usize) -> bool { + debug_assert!(idx < Tup2::::NUM_ELEMENTS); + self.bitmap.is_none(idx) + } + + #[inline] + fn idx_for_field(&self, field_idx: usize) -> usize { + debug_assert!(field_idx < Tup2::::NUM_ELEMENTS); + field_idx - self.bitmap.count_none_before(field_idx) + } + + #[inline] + pub fn get_t0(&self) -> Option<&::rkyv::Archived<::Inner>> { + if self.none_bit_set(0) { + None + } else { + let ptr_idx = self.idx_for_field(0); + debug_assert!(ptr_idx < self.ptrs.len()); + Some(unsafe { + &*self + .ptrs + .as_slice() + .get_unchecked(ptr_idx) + .as_ptr() + .cast::<::rkyv::Archived<::Inner>>() + }) + } + } + + #[inline] + pub fn get_t1(&self) -> Option<&::rkyv::Archived<::Inner>> { + if self.none_bit_set(1) { + None + } else { + let ptr_idx = self.idx_for_field(1); + debug_assert!(ptr_idx < self.ptrs.len()); + Some(unsafe { + &*self + .ptrs + .as_slice() + .get_unchecked(ptr_idx) + .as_ptr() + .cast::<::rkyv::Archived<::Inner>>() + }) + } + } +} + +impl ArchivedTup2Dense +where + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + T1: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, + ::Inner: ::rkyv::Archive, +{ + #[inline] + fn none_bit_set(&self, idx: usize) -> bool { + debug_assert!(idx < Tup2::::NUM_ELEMENTS); + self.bitmap.is_none(idx) + } + + #[inline] + pub fn get_t0(&self) -> Option<&::rkyv::Archived<::Inner>> { + if self.none_bit_set(0) { + None + } else { + Some(unsafe { &*self.t0.as_ptr() }) + } + } + + #[inline] + pub fn get_t1(&self) -> Option<&::rkyv::Archived<::Inner>> { + if self.none_bit_set(1) { + None + } else { + Some(unsafe { &*self.t1.as_ptr() }) + } + } +} + +impl ArchivedTup2 +where + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + T1: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, + ::Inner: ::rkyv::Archive, +{ + #[inline] + fn sparse(&self) -> &ArchivedTup2Sparse { + unsafe { &*self.data.as_ptr().cast::>() } + } + + #[inline] + fn dense(&self) -> &ArchivedTup2Dense { + unsafe { &*self.data.as_ptr().cast::>() } + } + + #[inline] + pub fn get_t0(&self) -> Option<&::rkyv::Archived<::Inner>> { + match self.format { + TupFormat::Sparse => self.sparse().get_t0(), + TupFormat::Dense => self.dense().get_t0(), + } + } + + #[inline] + pub fn get_t1(&self) -> Option<&::rkyv::Archived<::Inner>> { + match self.format { + TupFormat::Sparse => self.sparse().get_t1(), + TupFormat::Dense => self.dense().get_t1(), + } + } +} +impl core::cmp::PartialEq for ArchivedTup2 +where + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + T1: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, + ::Inner: ::rkyv::Archive, + ::rkyv::Archived<::Inner>: core::cmp::PartialEq, + ::rkyv::Archived<::Inner>: core::cmp::PartialEq, +{ + #[inline] + fn eq(&self, other: &Self) -> bool { + true && self.get_t0() == other.get_t0() && self.get_t1() == other.get_t1() + } +} +impl core::cmp::Eq for ArchivedTup2 +where + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + T1: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, + ::Inner: ::rkyv::Archive, + ::rkyv::Archived<::Inner>: core::cmp::Eq, + ::rkyv::Archived<::Inner>: core::cmp::Eq, +{ +} +impl core::cmp::PartialOrd for ArchivedTup2 +where + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + T1: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, + ::Inner: ::rkyv::Archive, + ::rkyv::Archived<::Inner>: core::cmp::Ord, + ::rkyv::Archived<::Inner>: core::cmp::Ord, +{ + #[inline] + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} +impl core::cmp::Ord for ArchivedTup2 +where + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + T1: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, + ::Inner: ::rkyv::Archive, + ::rkyv::Archived<::Inner>: core::cmp::Ord, + ::rkyv::Archived<::Inner>: core::cmp::Ord, +{ + #[inline] + fn cmp(&self, other: &Self) -> core::cmp::Ordering { + let cmp = self.get_t0().cmp(&other.get_t0()); + if cmp != core::cmp::Ordering::Equal { + return cmp; + } + let cmp = self.get_t1().cmp(&other.get_t1()); + if cmp != core::cmp::Ordering::Equal { + return cmp; + } + core::cmp::Ordering::Equal + } +} +struct Tup2SparseData<'a, T0, T1> { + bitmap: TupBitmap<1>, + value: &'a Tup2, +} + +struct Tup2SparseResolver { + bitmap: TupBitmap<1>, + ptrs_resolver: ::rkyv::vec::VecResolver, + ptrs_len: usize, +} + +impl<'a, T0, T1> ::rkyv::Archive for Tup2SparseData<'a, T0, T1> +where + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + T1: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, + ::Inner: ::rkyv::Archive, +{ + type Archived = ArchivedTup2Sparse; + type Resolver = Tup2SparseResolver; + #[inline] + unsafe fn resolve(&self, pos: usize, resolver: Self::Resolver, out: *mut Self::Archived) { + let (_fp, fo) = ::rkyv::out_field!(out.bitmap); + fo.write(resolver.bitmap); + let (fp, fo) = ::rkyv::out_field!(out.ptrs); + let vec_pos = pos + fp; + ::rkyv::vec::ArchivedVec::<::rkyv::rel_ptr::RawRelPtrI32>::resolve_from_len( + resolver.ptrs_len, + vec_pos, + resolver.ptrs_resolver, + fo, + ); + let (_fp, fo) = ::rkyv::out_field!(out._phantom); + fo.write(core::marker::PhantomData); + } +} + +impl<'a, S, T0, T1> ::rkyv::Serialize for Tup2SparseData<'a, T0, T1> +where + S: ::rkyv::ser::Serializer + ::rkyv::ser::ScratchSpace + ?Sized, + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + T1: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive + ::rkyv::Serialize, + ::Inner: ::rkyv::Archive + ::rkyv::Serialize, +{ + #[inline] + fn serialize(&self, serializer: &mut S) -> Result { + let t0_is_none = self.bitmap.is_none(0); + let t1_is_none = self.bitmap.is_none(1); + let mut ptrs: [Tup2FieldPtr; 2usize] = [Tup2FieldPtr { pos: 0 }; 2usize]; + let mut ptrs_len = 0usize; + if !t0_is_none { + let pos = + serializer.serialize_value(::dbsp::utils::IsNone::unwrap_or_self(&self.value.0))?; + ptrs[ptrs_len] = Tup2FieldPtr { pos }; + ptrs_len += 1; + } + if !t1_is_none { + let pos = + serializer.serialize_value(::dbsp::utils::IsNone::unwrap_or_self(&self.value.1))?; + ptrs[ptrs_len] = Tup2FieldPtr { pos }; + ptrs_len += 1; + } + let ptrs_resolver = + ::rkyv::vec::ArchivedVec::<::rkyv::rel_ptr::RawRelPtrI32>::serialize_from_slice( + &ptrs[..ptrs_len], + serializer, + )?; + Ok(Tup2SparseResolver { + bitmap: self.bitmap, + ptrs_resolver, + ptrs_len, + }) + } +} + +struct Tup2DenseData<'a, T0, T1> { + bitmap: TupBitmap<1>, + value: &'a Tup2, +} + +struct Tup2DenseResolver +where + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + T1: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, + ::Inner: ::rkyv::Archive, +{ + bitmap: TupBitmap<1>, + t0: Option<<::Inner as ::rkyv::Archive>::Resolver>, + t1: Option<<::Inner as ::rkyv::Archive>::Resolver>, +} + +impl<'a, T0, T1> ::rkyv::Archive for Tup2DenseData<'a, T0, T1> +where + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + T1: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, + ::Inner: ::rkyv::Archive, +{ + type Archived = ArchivedTup2Dense; + type Resolver = Tup2DenseResolver; + #[inline] + unsafe fn resolve(&self, pos: usize, resolver: Self::Resolver, out: *mut Self::Archived) { + let (_fp, fo) = ::rkyv::out_field!(out.bitmap); + fo.write(resolver.bitmap); + let (fp, fo) = ::rkyv::out_field!(out.t0); + let t0_out = fo + .cast::::Inner>>>( + ); + if let Some(resolver) = resolver.t0 { + let inner = ::dbsp::utils::IsNone::unwrap_or_self(&self.value.0); + <::Inner as ::rkyv::Archive>::resolve( + inner, + pos + fp, + resolver, + t0_out.cast::<::rkyv::Archived<::Inner>>(), + ); + } else { + core::ptr::write(t0_out, core::mem::MaybeUninit::zeroed()); + } + let (fp, fo) = ::rkyv::out_field!(out.t1); + let t1_out = fo + .cast::::Inner>>>( + ); + if let Some(resolver) = resolver.t1 { + let inner = ::dbsp::utils::IsNone::unwrap_or_self(&self.value.1); + <::Inner as ::rkyv::Archive>::resolve( + inner, + pos + fp, + resolver, + t1_out.cast::<::rkyv::Archived<::Inner>>(), + ); + } else { + core::ptr::write(t1_out, core::mem::MaybeUninit::zeroed()); + } + let (_fp, fo) = ::rkyv::out_field!(out._phantom); + fo.write(core::marker::PhantomData); + } +} + +impl<'a, S, T0, T1> ::rkyv::Serialize for Tup2DenseData<'a, T0, T1> +where + S: ::rkyv::ser::Serializer + ::rkyv::ser::ScratchSpace + ?Sized, + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + T1: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive + ::rkyv::Serialize, + ::Inner: ::rkyv::Archive + ::rkyv::Serialize, +{ + #[inline] + fn serialize(&self, serializer: &mut S) -> Result { + let t0_is_none = self.bitmap.is_none(0); + let t1_is_none = self.bitmap.is_none(1); + let t0 = if t0_is_none { + None + } else { + Some(::rkyv::Serialize::serialize( + ::dbsp::utils::IsNone::unwrap_or_self(&self.value.0), + serializer, + )?) + }; + let t1 = if t1_is_none { + None + } else { + Some(::rkyv::Serialize::serialize( + ::dbsp::utils::IsNone::unwrap_or_self(&self.value.1), + serializer, + )?) + }; + Ok(Tup2DenseResolver { + bitmap: self.bitmap, + t0, + t1, + }) + } +} + +pub enum Tup2Resolver { + Sparse { data_pos: usize }, + Dense { data_pos: usize }, + _Phantom(core::marker::PhantomData<(T0, T1)>), +} + +impl ::rkyv::Archive for Tup2 +where + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + T1: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, + ::Inner: ::rkyv::Archive, +{ + type Archived = ArchivedTup2; + type Resolver = Tup2Resolver; + #[inline] + unsafe fn resolve(&self, pos: usize, resolver: Self::Resolver, out: *mut Self::Archived) { + let (_fp, format_out) = ::rkyv::out_field!(out.format); + let (fp, data_out) = ::rkyv::out_field!(out.data); + let data_out = data_out.cast::<::rkyv::rel_ptr::RawRelPtrI32>(); + + match resolver { + Tup2Resolver::Sparse { data_pos } => { + format_out.write(TupFormat::Sparse); + ::rkyv::rel_ptr::RawRelPtrI32::emplace(pos + fp, data_pos, data_out); + } + Tup2Resolver::Dense { data_pos } => { + format_out.write(TupFormat::Dense); + ::rkyv::rel_ptr::RawRelPtrI32::emplace(pos + fp, data_pos, data_out); + } + Tup2Resolver::_Phantom(_) => unreachable!(), + } + let (_fp, fo) = ::rkyv::out_field!(out._phantom); + fo.write(core::marker::PhantomData); + } +} + +impl ::rkyv::Serialize for Tup2 +where + S: ::rkyv::ser::Serializer + ::rkyv::ser::ScratchSpace + ?Sized, + T0: ::rkyv::Archive + ::dbsp::utils::IsNone, + T1: ::rkyv::Archive + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive + ::rkyv::Serialize, + ::Inner: ::rkyv::Archive + ::rkyv::Serialize, +{ + #[inline] + fn serialize(&self, serializer: &mut S) -> Result { + let (bitmap, format) = self.choose_format(); + match format { + TupFormat::Dense => { + let data = Tup2DenseData { + bitmap, + value: self, + }; + let data_pos = serializer.serialize_value(&data)?; + Ok(Tup2Resolver::Dense { data_pos }) + } + TupFormat::Sparse => { + let data = Tup2SparseData { + bitmap, + value: self, + }; + let data_pos = serializer.serialize_value(&data)?; + Ok(Tup2Resolver::Sparse { data_pos }) + } + } + } +} +impl ::rkyv::Deserialize, D> for ArchivedTup2 +where + D: ::rkyv::Fallible + ::core::any::Any, + T0: ::rkyv::Archive + Default + ::dbsp::utils::IsNone, + T1: ::rkyv::Archive + Default + ::dbsp::utils::IsNone, + ::Inner: ::rkyv::Archive, + ::Inner: ::rkyv::Archive, + ::rkyv::Archived: ::rkyv::Deserialize, + ::rkyv::Archived: ::rkyv::Deserialize, + ::rkyv::Archived<::Inner>: + ::rkyv::Deserialize<::Inner, D>, + ::rkyv::Archived<::Inner>: + ::rkyv::Deserialize<::Inner, D>, +{ + #[inline] + fn deserialize(&self, deserializer: &mut D) -> Result, D::Error> { + let version = (deserializer as &mut dyn ::core::any::Any) + .downcast_mut::<::dbsp::storage::file::Deserializer>() + .map(|deserializer| deserializer.version()) + .expect("passed wrong deserializer"); + if version <= 3 { + let legacy = unsafe { &*(self as *const _ as *const ArchivedTup2V3) }; + return as ::rkyv::Deserialize, D>>::deserialize( + legacy, + deserializer, + ); + } + match self.format { + TupFormat::Sparse => { + let sparse = self.sparse(); + let mut ptr_idx = 0usize; + let t0 = if sparse.none_bit_set(0) { + T0::default() + } else { + let archived: &::rkyv::Archived<::Inner> = unsafe { + &*sparse + .ptrs + .as_slice() + .get_unchecked(ptr_idx) + .as_ptr() + .cast::<::rkyv::Archived<::Inner>>() + }; + ptr_idx += 1; + let inner = archived.deserialize(deserializer)?; + ::from_inner(inner) + }; + let t1 = if sparse.none_bit_set(1) { + T1::default() + } else { + let archived: &::rkyv::Archived<::Inner> = unsafe { + &*sparse + .ptrs + .as_slice() + .get_unchecked(ptr_idx) + .as_ptr() + .cast::<::rkyv::Archived<::Inner>>() + }; + ptr_idx += 1; + let inner = archived.deserialize(deserializer)?; + ::from_inner(inner) + }; + Ok(Tup2(t0, t1)) + } + TupFormat::Dense => { + let dense = self.dense(); + let t0 = if dense.none_bit_set(0) { + T0::default() + } else { + let archived = dense.get_t0().expect("ArchivedTup2Dense: missing field 0"); + let inner = archived.deserialize(deserializer)?; + ::from_inner(inner) + }; + let t1 = if dense.none_bit_set(1) { + T1::default() + } else { + let archived = dense.get_t1().expect("ArchivedTup2Dense: missing field 1"); + let inner = archived.deserialize(deserializer)?; + ::from_inner(inner) + }; + Ok(Tup2(t0, t1)) + } + } + } +} +impl ::dbsp::algebra::MulByRef for Tup2 +where + T0: ::dbsp::algebra::MulByRef, + T1: ::dbsp::algebra::MulByRef, + W: ::dbsp::algebra::ZRingValue, +{ + type Output = Self; + fn mul_by_ref(&self, other: &W) -> Self::Output { + let Tup2(t0, t1) = self; + Tup2(t0.mul_by_ref(other), t1.mul_by_ref(other)) + } +} +impl ::dbsp::algebra::HasZero for Tup2 +where + T0: ::dbsp::algebra::HasZero, + T1: ::dbsp::algebra::HasZero, +{ + fn zero() -> Self { + Tup2(T0::zero(), T1::zero()) + } + fn is_zero(&self) -> bool { + let mut result = true; + let Tup2(t0, t1) = self; + result = result && t0.is_zero(); + result = result && t1.is_zero(); + result + } +} +impl ::dbsp::algebra::AddByRef for Tup2 +where + T0: ::dbsp::algebra::AddByRef, + T1: ::dbsp::algebra::AddByRef, +{ + fn add_by_ref(&self, other: &Self) -> Self { + let Tup2(t0, t1) = self; + let Tup2(other_t0, other_t1) = other; + Tup2(t0.add_by_ref(other_t0), t1.add_by_ref(other_t1)) + } +} +impl ::dbsp::algebra::AddAssignByRef for Tup2 +where + T0: ::dbsp::algebra::AddAssignByRef, + T1: ::dbsp::algebra::AddAssignByRef, +{ + fn add_assign_by_ref(&mut self, other: &Self) { + let Tup2(ref mut t0, ref mut t1) = self; + let Tup2(ref other_t0, ref other_t1) = other; + t0.add_assign_by_ref(other_t0); + t1.add_assign_by_ref(other_t1); + } +} +impl ::dbsp::algebra::NegByRef for Tup2 +where + T0: ::dbsp::algebra::NegByRef, + T1: ::dbsp::algebra::NegByRef, +{ + fn neg_by_ref(&self) -> Self { + let Tup2(t0, t1) = self; + Tup2(t0.neg_by_ref(), t1.neg_by_ref()) + } +} +impl From<(T0, T1)> for Tup2 { + fn from((t0, t1): (T0, T1)) -> Self { + Self(t0, t1) + } +} +impl<'a, T0, T1> Into<(&'a T0, &'a T1)> for &'a Tup2 { + #[allow(clippy::from_over_into)] + fn into(self) -> (&'a T0, &'a T1) { + let Tup2(t0, t1) = &self; + (t0, t1) + } +} +impl Into<(T0, T1)> for Tup2 { + #[allow(clippy::from_over_into)] + fn into(self) -> (T0, T1) { + let Tup2(t0, t1) = self; + (t0, t1) + } +} +impl ::dbsp::NumEntries for Tup2 +where + T0: ::dbsp::NumEntries, + T1: ::dbsp::NumEntries, +{ + const CONST_NUM_ENTRIES: Option = None; + fn num_entries_shallow(&self) -> usize { + Self::NUM_ELEMENTS + } + fn num_entries_deep(&self) -> usize { + let Tup2(t0, t1) = self; + 0 + (t0).num_entries_deep() + (t1).num_entries_deep() + } +} +impl core::fmt::Debug for Tup2 { + fn fmt(&self, f: &mut core::fmt::Formatter) -> core::result::Result<(), core::fmt::Error> { + let Tup2(t0, t1) = self; + f.debug_tuple("").field(&t0).field(&t1).finish() + } +} +impl Copy for Tup2 {} +impl ::dbsp::circuit::checkpointer::Checkpoint for Tup2 +where + Tup2: ::rkyv::Serialize<::dbsp::storage::file::Serializer> + + ::dbsp::storage::file::Deserializable, +{ + fn checkpoint(&self) -> Result, ::dbsp::Error> { + let mut s = ::dbsp::storage::file::Serializer::default(); + let _offset = ::rkyv::ser::Serializer::serialize_value(&mut s, self).unwrap(); + let data = s.into_serializer().into_inner().into_vec(); + Ok(data) + } + fn restore(&mut self, data: &[u8]) -> Result<(), ::dbsp::Error> { + *self = ::dbsp::trace::unaligned_deserialize(data); + Ok(()) + } +} +impl ::dbsp::utils::IsNone for Tup2 { + type Inner = Self; + fn is_none(&self) -> bool { + false + } + fn unwrap_or_self(&self) -> &Self::Inner { + self + } + fn from_inner(inner: Self::Inner) -> Self { + inner + } +} + +#[test] +fn compile_tup1() { + let t1 = Tup1::new(Some(32i32)); +} + +#[test] +fn compile_tup2() { + let t2 = Tup2::new(Some(32i32), Some(64i32)); +} + +#[test] +fn rkyv_compact_tup1_option_roundtrip_bool() { + let tup_some = Tup1::new(Some(true)); + let bytes_some = dbsp::storage::file::to_bytes(&tup_some).unwrap(); + let restored_some: Tup1> = dbsp::trace::unaligned_deserialize(&bytes_some[..]); + assert_eq!(restored_some, tup_some); + + let tup_none: Tup1> = Tup1::new(None); + let bytes_none = dbsp::storage::file::to_bytes(&tup_none).unwrap(); + let restored_none: Tup1> = dbsp::trace::unaligned_deserialize(&bytes_none[..]); + assert_eq!(restored_none, tup_none); + assert!( + bytes_none.len() >= core::mem::size_of::>>(), + "expected serialized bytes to include archived data, got bytes={} archived={}", + bytes_none.len(), + core::mem::size_of::>>() + ); + + eprintln!( + "Tup1>: None={} Some={}", + bytes_none.len(), + bytes_some.len() + ); + + let _ = (bytes_none, bytes_some); +} + +#[test] +fn rkyv_compact_tup1_option_roundtrip() { + use feldera_sqllib::SqlString; + + let tup_some = Tup1::new(()); + let bytes_some = dbsp::storage::file::to_bytes(&tup_some).unwrap(); + let restored_some: Tup1<()> = dbsp::trace::unaligned_deserialize(&bytes_some[..]); + assert_eq!(restored_some, tup_some); + + eprintln!("Tup1<()>: {}", bytes_some.len()); + + let tup_some = Tup1::new(0u8); + let bytes_some = dbsp::storage::file::to_bytes(&tup_some).unwrap(); + let restored_some: Tup1 = dbsp::trace::unaligned_deserialize(&bytes_some[..]); + assert_eq!(restored_some, tup_some); + + eprintln!("Tup1: {}", bytes_some.len()); + + let tup_some = Tup1::new(Some(0u8)); + let bytes_some = dbsp::storage::file::to_bytes(&tup_some).unwrap(); + let restored_some: Tup1> = dbsp::trace::unaligned_deserialize(&bytes_some[..]); + assert_eq!(restored_some, tup_some); + + eprintln!("Tup1>: {}", bytes_some.len()); +} + +#[test] +fn rkyv_compact_tup2_option_sizes() { + use feldera_sqllib::SqlString; + + let tup_some = Tup2::new( + Some(SqlString::from_ref("hello")), + Some(SqlString::from_ref("hello")), + ); + let bytes_some = dbsp::storage::file::to_bytes(&tup_some).unwrap(); + let restored_some: Tup2, Option> = + dbsp::trace::unaligned_deserialize(&bytes_some[..]); + assert_eq!(restored_some, tup_some); + + let tup_none: Tup2, Option> = Tup2::new(None, None); + let bytes_none = dbsp::storage::file::to_bytes(&tup_none).unwrap(); + let restored_none: Tup2, Option> = + dbsp::trace::unaligned_deserialize(&bytes_none[..]); + assert_eq!(restored_none, tup_none); + + eprintln!( + "Tup2, Option>: None={} Some={}", + bytes_none.len(), + bytes_some.len() + ); +} + +#[test] +fn rkyv_compact_tup1_archived_get_t0() { + use feldera_sqllib::SqlString; + + let tup_none: Tup1> = Tup1::new(None); + let bytes_none = dbsp::storage::file::to_bytes(&tup_none).unwrap(); + let archived_none = unsafe { rkyv::archived_root::>>(&bytes_none[..]) }; + let archived_t0_none = archived_none.get_t0(); + assert!(archived_t0_none.is_none()); + + let tup_some = Tup1::new(Some(SqlString::from_ref("hello"))); + let bytes_some = dbsp::storage::file::to_bytes(&tup_some).unwrap(); + let archived_some = unsafe { rkyv::archived_root::>>(&bytes_some[..]) }; + let archived_t0_some = archived_some.get_t0(); + assert!(archived_t0_some.is_some()); + assert_eq!(archived_t0_some.unwrap().as_str(), "hello"); +} + +#[test] +fn rkyv_compact_tup1_option_checkpoint_roundtrip() { + use dbsp::circuit::checkpointer::Checkpoint; + use feldera_sqllib::SqlString; + + let tup_some = Tup1::new(Some(SqlString::from_ref("hello"))); + let bytes_some = tup_some.checkpoint().unwrap(); + let mut restored_some: Tup1> = Tup1::new(None); + restored_some.restore(&bytes_some).unwrap(); + assert_eq!(restored_some, tup_some); + + let tup_none: Tup1> = Tup1::new(None); + let bytes_none = tup_none.checkpoint().unwrap(); + let mut restored_none: Tup1> = Tup1::new(Some(SqlString::from_ref("x"))); + restored_none.restore(&bytes_none).unwrap(); + assert_eq!(restored_none, tup_none); + + assert!( + bytes_none.len() >= core::mem::size_of::>>(), + "expected serialized bytes to include archived data, got bytes={} archived={}", + bytes_none.len(), + core::mem::size_of::>>() + ); +} + +#[test] +fn rkyv_compact_tup2_roundtrip() { + use feldera_sqllib::SqlString; + + let tup_vals: Tup2 = Tup2::new(SqlString::from_ref("hi"), 8i32); + let bytes_vals = dbsp::storage::file::to_bytes(&tup_vals).unwrap(); + let restored_vals: Tup2 = dbsp::trace::unaligned_deserialize(&bytes_vals[..]); + assert_eq!(restored_vals, tup_vals); + assert!( + bytes_vals.len() >= core::mem::size_of::>(), + "expected serialized bytes to include archived data, got bytes={} archived={}", + bytes_vals.len(), + core::mem::size_of::>() + ); +} + +#[test] +fn rkyv_compact_tup2_option_roundtrip() { + use feldera_sqllib::SqlString; + + let tup_none: Tup2, Option> = Tup2::new(None, None); + let bytes_none = dbsp::storage::file::to_bytes(&tup_none).unwrap(); + let restored_none: Tup2, Option> = + dbsp::trace::unaligned_deserialize(&bytes_none[..]); + assert_eq!(restored_none, tup_none); + eprintln!("bytes_none.len() = {}", bytes_none.len()); + assert!( + bytes_none.len() >= core::mem::size_of::, Option>>(), + "expected serialized bytes to include archived data, got bytes={} archived={}", + bytes_none.len(), + core::mem::size_of::, Option>>() + ); + + let tup_10: Tup2, Option> = + Tup2::new(Some(SqlString::from_ref("hi")), None); + let bytes_10 = dbsp::storage::file::to_bytes(&tup_10).unwrap(); + let restored_10: Tup2, Option> = + dbsp::trace::unaligned_deserialize(&bytes_10[..]); + assert_eq!(restored_10, tup_10); + + let tup_01: Tup2, Option> = Tup2::new(None, Some(42)); + let bytes_01 = dbsp::storage::file::to_bytes(&tup_01).unwrap(); + let restored_01: Tup2, Option> = + dbsp::trace::unaligned_deserialize(&bytes_01[..]); + assert_eq!(restored_01, tup_01); + + let tup_11: Tup2, Option> = + Tup2::new(Some(SqlString::from_ref("hello")), Some(42)); + let bytes_11 = dbsp::storage::file::to_bytes(&tup_11).unwrap(); + let restored_11: Tup2, Option> = + dbsp::trace::unaligned_deserialize(&bytes_11[..]); + assert_eq!(restored_11, tup_11); +} + +#[test] +fn rkyv_compact_tup2_archived_get_t0_t1() { + use feldera_sqllib::SqlString; + + let tup_10: Tup2, Option> = + Tup2::new(Some(SqlString::from_ref("hi")), None); + let bytes_10 = dbsp::storage::file::to_bytes(&tup_10).unwrap(); + let archived_10 = + unsafe { rkyv::archived_root::, Option>>(&bytes_10[..]) }; + let t0_10 = archived_10.get_t0(); + let t1_10 = archived_10.get_t1(); + assert!(t0_10.is_some()); + assert!(t1_10.is_none()); + assert_eq!(t0_10.unwrap().as_str(), "hi"); + + let tup_01: Tup2, Option> = Tup2::new(None, Some(42)); + let bytes_01 = dbsp::storage::file::to_bytes(&tup_01).unwrap(); + let archived_01 = + unsafe { rkyv::archived_root::, Option>>(&bytes_01[..]) }; + let t0_01 = archived_01.get_t0(); + let t1_01 = archived_01.get_t1(); + assert!(t0_01.is_none()); + assert!(t1_01.is_some()); + assert_eq!(*t1_01.unwrap(), 42); +} diff --git a/crates/feldera-macros/tests/declare_tuple_rkyv.rs b/crates/feldera-macros/tests/declare_tuple_rkyv.rs new file mode 100644 index 0000000000..ba8ec3288b --- /dev/null +++ b/crates/feldera-macros/tests/declare_tuple_rkyv.rs @@ -0,0 +1,106 @@ +use feldera_sqllib::SqlString; + +feldera_macros::declare_tuple! { Tup1 } +feldera_macros::declare_tuple! { Tup2 } +feldera_macros::declare_tuple! { Tup12 } + +type OptStr = Option; +type Tup12Opt = Tup12< + OptStr, + OptStr, + OptStr, + OptStr, + OptStr, + OptStr, + OptStr, + OptStr, + OptStr, + OptStr, + OptStr, + OptStr, +>; + +#[test] +fn rkyv_roundtrip_with_option_sqlstring() { + let tup1_some = Tup1::new(Some(SqlString::from_ref("hello"))); + assert_eq!(tup1_some.get_0().as_ref().cloned().unwrap().str(), "hello"); + + let bytes1_some = dbsp::storage::file::to_bytes(&tup1_some).unwrap(); + let restored1_some: Tup1> = + dbsp::trace::unaligned_deserialize(&bytes1_some[..]); + assert_eq!(restored1_some, tup1_some); + + let tup1_none: Tup1> = Tup1::new(None); + let bytes1_none = dbsp::storage::file::to_bytes(&tup1_none).unwrap(); + let restored1_none: Tup1> = + dbsp::trace::unaligned_deserialize(&bytes1_none[..]); + assert_eq!(restored1_none, tup1_none); + assert!( + bytes1_none.len() >= core::mem::size_of::>>(), + "expected archived root to fit, got {} bytes", + bytes1_none.len() + ); + + let tup2_some = Tup2::new( + Some(SqlString::from_ref("hello")), + Some(SqlString::from_ref("world")), + ); + assert_eq!(tup2_some.get_0().as_ref().cloned().unwrap().str(), "hello"); + assert_eq!(tup2_some.get_1().as_ref().cloned().unwrap().str(), "world"); + + let bytes2_some = dbsp::storage::file::to_bytes(&tup2_some).unwrap(); + let restored2_some: Tup2, Option> = + dbsp::trace::unaligned_deserialize(&bytes2_some[..]); + assert_eq!(restored2_some, tup2_some); + + let tup2_none: Tup2, Option> = Tup2::new(None, None); + let bytes2_none = dbsp::storage::file::to_bytes(&tup2_none).unwrap(); + let restored2_none: Tup2, Option> = + dbsp::trace::unaligned_deserialize(&bytes2_none[..]); + assert_eq!(restored2_none, tup2_none); + assert!( + bytes2_none.len() >= core::mem::size_of::, Option>>(), + "expected archived root to fit, got {} bytes", + bytes2_none.len() + ); +} + +#[test] +fn rkyv_archived_getters() { + let tup1_none: Tup1> = Tup1::new(None); + let bytes1_none = dbsp::storage::file::to_bytes(&tup1_none).unwrap(); + let archived1_none = + unsafe { rkyv::archived_root::>>(&bytes1_none[..]) }; + let t0_none = archived1_none.0.as_ref(); + assert!(t0_none.is_none()); + + let tup2_10: Tup2, Option> = + Tup2::new(Some(SqlString::from_ref("hi")), None); + let bytes2_10 = dbsp::storage::file::to_bytes(&tup2_10).unwrap(); + let archived2_10 = + unsafe { rkyv::archived_root::, Option>>(&bytes2_10[..]) }; + let t0_10 = archived2_10.0.as_ref(); + let t1_10 = archived2_10.1.as_ref(); + assert!(t0_10.is_some()); + assert!(t1_10.is_none()); + assert_eq!(t0_10.unwrap().as_str(), "hi"); +} + +#[test] +fn rkyv_tup12_all_none_size() { + let tup: Tup12Opt = Tup12Opt { + 11: Some(SqlString::from("12345678")), + ..Default::default() + }; + let bytes = dbsp::storage::file::to_bytes(&tup).unwrap(); + assert!( + bytes.len() >= core::mem::size_of::>(), + "expected archived root to fit, got {} bytes", + bytes.len() + ); + let tup_archived = unsafe { rkyv::archived_root::(&bytes[..]) }; + assert_eq!( + tup_archived.get_t11().as_ref().unwrap().as_str(), + "12345678" + ); +} diff --git a/crates/fxp/src/dbsp_impl.rs b/crates/fxp/src/dbsp_impl.rs index 3ac33df44b..4292d6f36c 100644 --- a/crates/fxp/src/dbsp_impl.rs +++ b/crates/fxp/src/dbsp_impl.rs @@ -10,9 +10,19 @@ use std::fmt::Write; use crate::{DynamicDecimal, Fixed, FixedInteger}; impl IsNone for Fixed { + type Inner = Self; + fn is_none(&self) -> bool { false } + + fn unwrap_or_self(&self) -> &Self::Inner { + self + } + + fn from_inner(inner: Self::Inner) -> Self { + inner + } } impl OptionWeightType for Fixed {} diff --git a/crates/sqllib/src/string.rs b/crates/sqllib/src/string.rs index 6bde29aaba..3968cebcc7 100644 --- a/crates/sqllib/src/string.rs +++ b/crates/sqllib/src/string.rs @@ -1024,3 +1024,24 @@ pub fn unintern(id: Option) -> Option { }, } } + +#[cfg(test)] +mod test { + use crate::SqlString; + use rkyv::{option::ArchivedOption, string::ArchivedString}; + + #[test] + fn sql_string_size() { + // This is the same, the None is optimized away + assert_eq!( + std::mem::size_of::(), + std::mem::size_of::>() + ); + + // This None isn't optimized away here, ideally it would be + assert!( + std::mem::size_of::() + < std::mem::size_of::>() + ) + } +} diff --git a/crates/storage-test-compat/Cargo.toml b/crates/storage-test-compat/Cargo.toml new file mode 100644 index 0000000000..8c8205818f --- /dev/null +++ b/crates/storage-test-compat/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "storage-test-compat" +authors.workspace = true +version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +keywords.workspace = true +readme.workspace = true +rust-version.workspace = true +edition.workspace = true + +[dependencies] +dbsp = { workspace = true } +feldera-sqllib = { workspace = true } +feldera-macros = { workspace = true } +uuid = { workspace = true, features = ["v4", "std"] } +serde = { workspace = true, features = ["derive"] } +size-of = { workspace = true } +rkyv = { workspace = true, features = ["std", "size_64", "validation", "smallvec"] } +derive_more = { version = "1.0", features = ["add", "not", "from", "debug"] } +feldera-types = { workspace = true } diff --git a/crates/storage-test-compat/golden-files/golden-batch-v3-snappy.feldera b/crates/storage-test-compat/golden-files/golden-batch-v3-snappy.feldera new file mode 100644 index 0000000000..b4ec9ee746 Binary files /dev/null and b/crates/storage-test-compat/golden-files/golden-batch-v3-snappy.feldera differ diff --git a/crates/storage-test-compat/golden-files/golden-batch-v3.feldera b/crates/storage-test-compat/golden-files/golden-batch-v3.feldera new file mode 100644 index 0000000000..af75d9df3a Binary files /dev/null and b/crates/storage-test-compat/golden-files/golden-batch-v3.feldera differ diff --git a/crates/storage-test-compat/golden-files/golden-batch-v4-snappy.feldera b/crates/storage-test-compat/golden-files/golden-batch-v4-snappy.feldera new file mode 100644 index 0000000000..4b1b8adff8 Binary files /dev/null and b/crates/storage-test-compat/golden-files/golden-batch-v4-snappy.feldera differ diff --git a/crates/storage-test-compat/golden-files/golden-batch-v4.feldera b/crates/storage-test-compat/golden-files/golden-batch-v4.feldera new file mode 100644 index 0000000000..fdf424e457 Binary files /dev/null and b/crates/storage-test-compat/golden-files/golden-batch-v4.feldera differ diff --git a/crates/storage-test-compat/src/bin/golden-writer.rs b/crates/storage-test-compat/src/bin/golden-writer.rs new file mode 100644 index 0000000000..d2147a6742 --- /dev/null +++ b/crates/storage-test-compat/src/bin/golden-writer.rs @@ -0,0 +1,97 @@ +/// Generate new golden files by running: +/// `cargo run -p feldera-sqllib-test --bin golden-writer` +/// +/// whenever you increment `VERSION_NUMBER` +use std::path::PathBuf; + +use dbsp::dynamic::DynData; +use dbsp::storage::backend::StorageBackend; +use dbsp::storage::file::Factories; +use dbsp::storage::file::format::Compression; +use dbsp::storage::file::format::VERSION_NUMBER; +use dbsp::storage::file::writer::{Parameters, Writer1}; +use feldera_types::config::{StorageConfig, StorageOptions}; + +use storage_test_compat::{ + DEFAULT_ROWS, GoldenRow, buffer_cache, golden_row, storage_base_and_path, +}; + +struct Config { + rows: usize, + compression: Option, +} + +impl Config { + fn output(&self) -> PathBuf { + let mut file_name = format!("golden-batch-v{VERSION_NUMBER}"); + match self.compression { + Some(Compression::Snappy) => { + file_name += "-snappy"; + } + None => (), + } + file_name += ".feldera"; + + PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("golden-files") + .join(file_name) + } +} + +impl Default for Config { + fn default() -> Self { + let rows = DEFAULT_ROWS; + let compression = Some(Compression::Snappy); + + Config { rows, compression } + } +} + +fn main() -> Result<(), Box> { + let mut config = Config::default(); + for compression in [None, Some(Compression::Snappy)] { + config.compression = compression; + + let output = if config.output().is_absolute() { + config.output() + } else { + std::env::current_dir()?.join(config.output()) + }; + let (base_dir, output_storage_path) = storage_base_and_path(&output); + std::fs::create_dir_all(&base_dir)?; + + let storage_backend = ::new( + &StorageConfig { + path: base_dir.to_string_lossy().to_string(), + cache: Default::default(), + }, + &StorageOptions::default(), + )?; + + let factories = Factories::::new::(); + let parameters = Parameters::default().with_compression(config.compression); + let mut writer = Writer1::new( + &factories, + buffer_cache, + &*storage_backend, + parameters, + config.rows, + )?; + + for row in 0..config.rows { + let key = golden_row(row); + let aux = (); + writer.write0((&key, &aux))?; + } + + let tmp_path = writer.path().clone(); + let (_file_handle, _bloom_filter) = writer.close()?; + let content = storage_backend.read(&tmp_path)?; + storage_backend.write(&output_storage_path, (*content).clone())?; + storage_backend.delete(&tmp_path)?; + + println!("wrote {} rows to {}", config.rows, output.display()); + } + + Ok(()) +} diff --git a/crates/storage-test-compat/src/lib.rs b/crates/storage-test-compat/src/lib.rs new file mode 100644 index 0000000000..2f7eb013a1 --- /dev/null +++ b/crates/storage-test-compat/src/lib.rs @@ -0,0 +1,419 @@ +use std::collections::BTreeMap; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use dbsp::algebra::{F32, F64}; +use dbsp::storage::backend::StoragePath; +use dbsp::storage::buffer_cache::BufferCache; +use uuid::Uuid as RawUuid; + +use feldera_sqllib::{ + to_array, to_map, Array, ByteArray, Date, GeoPoint, LongInterval, Map, ShortInterval, + SqlDecimal, SqlString, Time, Timestamp, Uuid, Variant, +}; + +type Opt = Option; +type Dec12_2 = SqlDecimal<12, 2>; +type Dec10_0 = SqlDecimal<10, 0>; +type Dec18_4 = SqlDecimal<18, 4>; + +feldera_macros::declare_tuple! { + Tup65< + T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, + T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, + T20, T21, T22, T23, T24, T25, T26, T27, T28, T29, + T30, T31, T32, T33, T34, T35, T36, T37, T38, T39, + T40, T41, T42, T43, T44, T45, T46, T47, T48, T49, + T50, T51, T52, T53, T54, T55, T56, T57, T58, T59, + T60, T61, T62, T63, T64 + > +} + +pub type GoldenRow = Tup65< + u64, + bool, + i8, + i16, + i32, + i64, + i128, + u8, + u16, + u32, + u128, + isize, + usize, + F32, + F64, + char, + String, + Opt, + Opt, + Opt, + Opt, + Opt, + Opt, + Opt, + Opt, + Opt, + Opt, + Opt, + Opt, + Opt, + Opt, + Opt, + Opt, + Opt, + SqlString, + ByteArray, + GeoPoint, + ShortInterval, + LongInterval, + Timestamp, + Date, + Time, + Uuid, + Variant, + Dec12_2, + Opt, + Opt, + Opt, + Opt, + Opt, + Opt, + Opt, + Opt