diff --git a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/monotonicity/InsertLimiters.java b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/monotonicity/InsertLimiters.java index 52c863db38..445a6dd6a5 100644 --- a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/monotonicity/InsertLimiters.java +++ b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/monotonicity/InsertLimiters.java @@ -108,6 +108,7 @@ import org.dbsp.util.Linq; import org.dbsp.util.Logger; import org.dbsp.util.NullableFunction; +import org.dbsp.util.NullablePredicate; import org.dbsp.util.Utilities; import javax.annotation.Nullable; @@ -159,10 +160,13 @@ public class InsertLimiters extends CircuitCloneVisitor { final List errorStreams; /** These operators use the error view as a source */ final Set reachableFromError; + /** For each output port whether it represents an append-only stream. */ + final NullablePredicate appendOnly; public InsertLimiters(DBSPCompiler compiler, DBSPCircuit expandedCircuit, Monotonicity.MonotonicityInformation expansionMonotoneValues, + NullablePredicate appendOnly, Map expandedInto, NullableFunction joinInformation, Set reachableFromError) { @@ -174,6 +178,7 @@ public InsertLimiters(DBSPCompiler compiler, this.bound = new HashMap<>(); this.errorStreams = new ArrayList<>(); this.reachableFromError = reachableFromError; + this.appendOnly = appendOnly; } void markBound(OutputPort operator, OutputPort bound) { @@ -1050,9 +1055,12 @@ public void postorder(DBSPAsofJoinOperator join) { join.getRelNode(), this.mapped(join.left()), leftDataProjection, this.createDelay(minOperator)); this.addOperator(retainLeft); - DBSPSimpleOperator retainRight = DBSPIntegrateTraceRetainValuesLastNOperator.create( - join.getRelNode(), this.mapped(join.right()), rightDataProjection, this.createDelay(minOperator), 1); - this.addOperator(retainRight); + Boolean rightIsAppendOnly = this.appendOnly.test(join.right()); + if (rightIsAppendOnly != null && rightIsAppendOnly) { + DBSPSimpleOperator retainRight = DBSPIntegrateTraceRetainValuesLastNOperator.create( + join.getRelNode(), this.mapped(join.right()), rightDataProjection, this.createDelay(minOperator), 1); + this.addOperator(retainRight); + } } super.postorder(join); diff --git a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/monotonicity/MonotoneAnalyzer.java b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/monotonicity/MonotoneAnalyzer.java index e7f79532ec..5f85f9b3bd 100644 --- a/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/monotonicity/MonotoneAnalyzer.java +++ b/sql-to-dbsp-compiler/SQL-compiler/src/main/java/org/dbsp/sqlCompiler/compiler/visitors/outer/monotonicity/MonotoneAnalyzer.java @@ -155,8 +155,8 @@ public String toString() { stream -> new MonotoneDot(compiler, stream, details, monotonicity.info)); InsertLimiters limiters = new InsertLimiters( - this.compiler, expanded, monotonicity.info, expander.expansion, - keyPropagation.joins::get, reachableFromError); + this.compiler, expanded, monotonicity.info, appendOnly.appendOnly::contains, + expander.expansion, keyPropagation.joins::get, reachableFromError); // Notice that we apply the limiters to the original circuit, not to the expanded circuit! DBSPCircuit result = limiters.apply(circuit); diff --git a/sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/simple/IncrementalRegressionTests.java b/sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/simple/IncrementalRegressionTests.java index ded97135b6..110be66b14 100644 --- a/sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/simple/IncrementalRegressionTests.java +++ b/sql-to-dbsp-compiler/SQL-compiler/src/test/java/org/dbsp/sqlCompiler/compiler/sql/simple/IncrementalRegressionTests.java @@ -277,7 +277,7 @@ public void issue2530() { create table r( id BIGINT NOT NULL, ts timestamp NOT NULL LATENESS INTERVAL 0 days - ); + ) WITH ('append_only' = 'true'); create table l ( id BIGINT NOT NULL,