Skip to content

Commit 7131ecc

Browse files
igorbernstein2pongad
authored andcommitted
Bigtable 9b. Implement ReadRows row merging logic. (googleapis#2914)
This PR sets the stage for ReadRows by implementing the row merging in a RowMergingCallable. In a future commit, this callable will be part of a chain that will implement the ReadRowsCallable chain. For now its disconnected. The implementation relies on ReframingResponseObserver to handle flow control and integrates with it by implementing the Reframer interface in RowMerger. The RowMerger, simply glues the Reframer api to the StateMachine. The StateMachine contains all of the logic of merging rows.
1 parent 3fd597b commit 7131ecc

File tree

10 files changed

+2244
-3
lines changed

10 files changed

+2244
-3
lines changed
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2018 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.internal;
17+
18+
import com.google.api.core.InternalApi;
19+
import com.google.protobuf.ByteString;
20+
import java.util.Comparator;
21+
22+
/** Compares {@link ByteString}s as unsigned byte arrays. */
23+
@InternalApi
24+
public class ByteStringComparator implements Comparator<ByteString> {
25+
public static final ByteStringComparator INSTANCE = new ByteStringComparator();
26+
27+
@Override
28+
public int compare(ByteString o1, ByteString o2) {
29+
int sizeA = o1.size();
30+
int sizeB = o2.size();
31+
int shortestSize = Math.min(sizeA, sizeB);
32+
for (int i = 0; i < shortestSize; i++) {
33+
int byteA = o1.byteAt(i) & 0xff;
34+
int byteB = o2.byteAt(i) & 0xff;
35+
36+
if (byteA != byteB) {
37+
return byteA < byteB ? -1 : 1;
38+
}
39+
}
40+
if (sizeA == sizeB) {
41+
return 0;
42+
}
43+
return sizeA < sizeB ? -1 : 1;
44+
}
45+
}
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Copyright 2018 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.stub.readrows;
17+
18+
import com.google.api.core.InternalApi;
19+
import com.google.bigtable.v2.ReadRowsResponse;
20+
import com.google.cloud.bigtable.data.v2.wrappers.RowAdapter.RowBuilder;
21+
import com.google.cloud.bigtable.gaxx.reframing.Reframer;
22+
import com.google.common.base.Preconditions;
23+
24+
/**
25+
* An implementation of a {@link Reframer} that feeds the row merging {@link StateMachine}.
26+
*
27+
* <p>{@link com.google.cloud.bigtable.gaxx.reframing.ReframingResponseObserver} pushes {@link
28+
* ReadRowsResponse.CellChunk}s into this class and pops fully merged logical rows. Example usage:
29+
*
30+
* <pre>{@code
31+
* RowMerger<Row> rowMerger = new RowMerger<>(myRowBuilder);
32+
*
33+
* while(responseIterator.hasNext()) {
34+
* ReadRowsResponse response = responseIterator.next();
35+
*
36+
* if (rowMerger.hasFullFrame()) {
37+
* Row row = rowMerger.pop();
38+
* // Do something with row.
39+
* } else {
40+
* rowMerger.push(response);
41+
* }
42+
* }
43+
*
44+
* if (rowMerger.hasPartialFrame()) {
45+
* throw new RuntimeException("Incomplete stream");
46+
* }
47+
*
48+
* }</pre>
49+
*
50+
* <p>This class is considered an internal implementation detail and not meant to be used by
51+
* applications.
52+
*
53+
* <p>Package-private for internal use.
54+
*
55+
* @see com.google.cloud.bigtable.gaxx.reframing.ReframingResponseObserver for more details
56+
*/
57+
@InternalApi
58+
public class RowMerger<RowT> implements Reframer<RowT, ReadRowsResponse> {
59+
private final StateMachine<RowT> stateMachine;
60+
private ReadRowsResponse buffer;
61+
private int nextChunk;
62+
private RowT nextRow;
63+
64+
public RowMerger(RowBuilder<RowT> rowBuilder) {
65+
stateMachine = new StateMachine<>(rowBuilder);
66+
67+
nextChunk = 0;
68+
buffer = ReadRowsResponse.getDefaultInstance();
69+
}
70+
71+
@Override
72+
public void push(ReadRowsResponse response) {
73+
Preconditions.checkState(
74+
buffer.getChunksCount() <= nextChunk, "Previous response not fully consumed");
75+
76+
buffer = response;
77+
nextChunk = 0;
78+
79+
// If the server sends a scan heartbeat, notify the StateMachine. It will generate a synthetic
80+
// row marker. See RowAdapter for more info.
81+
if (!response.getLastScannedRowKey().isEmpty()) {
82+
stateMachine.handleLastScannedRow(response.getLastScannedRowKey());
83+
}
84+
}
85+
86+
@Override
87+
public boolean hasFullFrame() {
88+
// Check if there an assembled row to consume
89+
if (nextRow != null) {
90+
return true;
91+
}
92+
93+
// Otherwise try to assemble a new row (readNextRow will set nextRow)
94+
boolean newRowCompleted = readNextRow();
95+
return newRowCompleted;
96+
}
97+
98+
@Override
99+
public boolean hasPartialFrame() {
100+
// Check if any of the buffers in this class contain data.
101+
// `hasFullFrame()` will check if `nextRow` has a row ready to go or if chunks in `buffer` can
102+
// be used to create a new `nextRow`
103+
if (hasFullFrame()) {
104+
return true;
105+
}
106+
107+
// If an assembled is still not available, then that means `buffer` has been fully consumed.
108+
// The last place to check is the StateMachine buffer, to see if its holding on to an incomplete
109+
// row.
110+
return stateMachine.isRowInProgress();
111+
}
112+
113+
@Override
114+
public RowT pop() {
115+
RowT row = nextRow;
116+
nextRow = null;
117+
return row;
118+
}
119+
120+
private boolean readNextRow() {
121+
// StateMachine might have a complete row already from receiving a scan marker.
122+
if (stateMachine.hasCompleteRow()) {
123+
nextRow = stateMachine.consumeRow();
124+
return true;
125+
}
126+
127+
while (nextChunk < buffer.getChunksCount()) {
128+
stateMachine.handleChunk(buffer.getChunks(nextChunk++));
129+
130+
if (stateMachine.hasCompleteRow()) {
131+
nextRow = stateMachine.consumeRow();
132+
return true;
133+
}
134+
}
135+
return false;
136+
}
137+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2018 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.stub.readrows;
17+
18+
import com.google.api.core.InternalApi;
19+
import com.google.api.gax.rpc.ApiCallContext;
20+
import com.google.api.gax.rpc.ResponseObserver;
21+
import com.google.api.gax.rpc.ServerStreamingCallable;
22+
import com.google.bigtable.v2.ReadRowsRequest;
23+
import com.google.bigtable.v2.ReadRowsResponse;
24+
import com.google.cloud.bigtable.data.v2.wrappers.RowAdapter;
25+
import com.google.cloud.bigtable.data.v2.wrappers.RowAdapter.RowBuilder;
26+
import com.google.cloud.bigtable.gaxx.reframing.ReframingResponseObserver;
27+
28+
/**
29+
* A ServerStreamingCallable that will merge {@link
30+
* com.google.bigtable.v2.ReadRowsResponse.CellChunk}s into logical rows. This class delegates all
31+
* of the work to gax's {@link ReframingResponseObserver} and the logic to {@link RowMerger}.
32+
*
33+
* <p>This class is considered an internal implementation detail and not meant to be used by
34+
* applications.
35+
*/
36+
@InternalApi
37+
public class RowMergingCallable<RowT> extends ServerStreamingCallable<ReadRowsRequest, RowT> {
38+
private final ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> inner;
39+
private final RowAdapter<RowT> rowAdapter;
40+
41+
public RowMergingCallable(
42+
ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> inner,
43+
RowAdapter<RowT> rowAdapter) {
44+
this.inner = inner;
45+
this.rowAdapter = rowAdapter;
46+
}
47+
48+
@Override
49+
public void call(
50+
ReadRowsRequest request, ResponseObserver<RowT> responseObserver, ApiCallContext context) {
51+
RowBuilder<RowT> rowBuilder = rowAdapter.createRowBuilder();
52+
RowMerger<RowT> merger = new RowMerger<>(rowBuilder);
53+
ReframingResponseObserver<ReadRowsResponse, RowT> innerObserver =
54+
new ReframingResponseObserver<>(responseObserver, merger);
55+
inner.call(request, innerObserver, context);
56+
}
57+
}

0 commit comments

Comments
 (0)