py: support arrow_ipc format for adhoc queries #4226
* also support case sensitive view names to listen to pipelines Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
Signed-off-by: feldera-bot <feldera-bot@feldera.com>
python/feldera/pipeline.py (Outdated)
        return self.client.query_as_pyarrow(self.name, query)

    def query_pylist(self, query: str) -> List[Mapping[str, Any]]:
It seems that this return type suffers from the same problem as the JSON format (aliased columns will be removed?)
Yeah, it does. It doesn't suffer from it if we return pyarrow.Table.
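To make the problem concrete, here is a minimal sketch in plain Python (no Feldera or pyarrow calls). It assumes a result row for `SELECT T.x, S.x ...` in which both columns are named `x`. The variable names are illustrative only.

```python
# Column names and one result row for a query such as:
#   SELECT T.x, S.x FROM T, S WHERE T.y = S.y
column_names = ["x", "x"]
row = [4, 6]

# Mapping-based row: the second "x" overwrites the first, so a value is lost.
as_mapping = dict(zip(column_names, row))

# Position-preserving row: both values survive, in select-list order.
as_pairs = list(zip(column_names, row))

print(as_mapping)  # {'x': 6}
print(as_pairs)    # [('x', 4), ('x', 6)]
```

Any representation keyed only by column name has this limitation, which is why a columnar or positional structure avoids it.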
OK, probably not a good return type then. Can we remove it?
It's stupid, but unless we change our SQL to not accept this nonsense I don't see a good alternative. cc @mihaibudiu
Maybe instead of str as the key you can have a Column type that hashes using a unique identifier or its position in the SELECT statement, but displays as the column name.
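One way this suggestion could look is sketched below. `Column` and `row_to_mapping` are hypothetical names invented for illustration and are not part of the Feldera SDK. The key's identity is its (position, name) pair, while `str()` shows only the name.

```python
from dataclasses import dataclass
from typing import Any, Dict, Sequence


@dataclass(frozen=True)
class Column:
    """Hypothetical key type: hashed by (position, name), displayed as name."""
    position: int
    name: str

    def __str__(self) -> str:
        return self.name


def row_to_mapping(names: Sequence[str], values: Sequence[Any]) -> Dict[Column, Any]:
    # Each key carries its select-list position, so duplicate names stay distinct.
    return {Column(i, n): v for i, (n, v) in enumerate(zip(names, values))}


row = row_to_mapping(["x", "x"], [4, 6])
print([(str(col), value) for col, value in row.items()])  # [('x', 4), ('x', 6)]
```

The trade-off is that callers can no longer index a row with a plain string, so lookups by name need an extra helper or a positional index.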
it should be a list, not a mapping
It is mostly for convenience and works in most cases.
Should I remove it?
No need to remove it, but either use a key that has a unique hash, like the position, or return a list of lists.
Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
mihaibudiu left a comment:
I don't have any other comments
In general, the view should be treated as a list of lists instead of as a list of maps
Is this ready to merge?
* instead of a `List[dict]` it now returns a list of rows, where each row is a list of (column_name, value) tuples. Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
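The row shape described in this commit message can be sketched as follows. This is an illustration in plain Python, not the SDK's implementation. `columns_to_rows` is a hypothetical helper that takes column names plus columnar data (one sequence of values per column, as an Arrow table would hold them).

```python
from typing import Any, List, Sequence, Tuple

Row = List[Tuple[str, Any]]


def columns_to_rows(names: Sequence[str], columns: Sequence[Sequence[Any]]) -> List[Row]:
    """Turn columnar data into rows of (column_name, value) tuples."""
    # zip(*columns) transposes columns into rows; pairing each value with its
    # name by position keeps columns that share a name.
    return [list(zip(names, values)) for values in zip(*columns)]


# Two columns, both named "x", holding the values from the thread's example.
rows = columns_to_rows(["x", "x"], [[4, 1], [6, 5]])
print(rows)  # [[('x', 4), ('x', 6)], [('x', 1), ('x', 5)]]
```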
@gz There is a problem with the arrow_ipc format: it seems to fail non-deterministically.
Failure:
❯ fda query --host http://localhost:8080 test_adhoc_query_pyarrow_same_alias_name 'SELECT T.x, S.x FROM T, S WHERE T.y = S.y' --format arrow_ipc
thread 'main' panicked at crates/fda/src/adhoc.rs:208:59:
called `Result::unwrap()` on an `Err` value: ParseError("Unable to get root as message: RangeOutOfBounds { range: 1703948..1703952, error_trace: ErrorTrace([]) }")
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Success:
❯ fda query --host http://localhost:8080 test_adhoc_query_pyarrow_same_alias_name 'SELECT T.x, S.x FROM T, S WHERE T.y = S.y' --format arrow_ipc
+---+---+
| x | x |
+---+---+
| 4 | 6 |
| 1 | 5 |
+---+---+
Similarly, the Python test:
Success:
❯ pytest . -k "test_adhoc_query_pylist_same_alias" -s
=========================================================================== test session starts ===========================================================================
platform linux -- Python 3.12.10, pytest-8.3.5, pluggy-1.5.0
rootdir: /home/abhizer/Development/feldera.git/py_arrowrpc/python
configfile: pyproject.toml
plugins: timeout-2.3.1
collected 51 items / 50 deselected / 1 selected
tests/test_pipeline_builder.py .
==================================================================== 1 passed, 50 deselected in 1.35s =====================================================================
Failure:
❯ pytest . -k "test_adhoc_query_pylist_same_alias" -s
=========================================================================== test session starts ===========================================================================
platform linux -- Python 3.12.10, pytest-8.3.5, pluggy-1.5.0
rootdir: /home/abhizer/Development/feldera.git/py_arrowrpc/python
configfile: pyproject.toml
plugins: timeout-2.3.1
collected 51 items / 50 deselected / 1 selected
tests/test_pipeline_builder.py F
================================================================================ FAILURES =================================================================================
_______________________________________________________ TestPipelineBuilder.test_adhoc_query_pylist_same_alias_name _______________________________________________________
self = <tests.test_pipeline_builder.TestPipelineBuilder testMethod=test_adhoc_query_pylist_same_alias_name>
    def test_adhoc_query_pylist_same_alias_name(self):
        dataT = [{"x": 1, "y": 2}, {"x": 4, "y": 3}]
        dataS = [{"x": 5, "y": 2}, {"x": 6, "y": 3}]
        name = "test_adhoc_query_pyarrow_same_alias_name"
        sql = """
        CREATE TABLE T(x INT, y INT) with ('materialized' = 'true');
        CREATE TABLE S(x INT, y INT) with ('materialized' = 'true');
        """
        pipeline = PipelineBuilder(TEST_CLIENT, name, sql).create_or_replace()
        pipeline.start()
        pipeline.input_json("T", dataT)
        pipeline.input_json("S", dataS)
>       resp = pipeline.query_pylist("SELECT T.x, S.x FROM T, S WHERE T.y = S.y")
tests/test_pipeline_builder.py:1268:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
feldera/pipeline.py:753: in query_pylist
table = self.query_pyarrow(query)
feldera/pipeline.py:731: in query_pyarrow
return self.client.query_as_pyarrow(self.name, query)
feldera/rest/feldera_client.py:710: in query_as_pyarrow
with pyarrow.ipc.RecordBatchStreamReader(resp.raw) as reader:
.venv/lib/python3.12/site-packages/pyarrow/ipc.py:52: in __init__
self._open(source, options=options, memory_pool=memory_pool)
pyarrow/ipc.pxi:1006: in pyarrow.lib._RecordBatchStreamReader._open
???
pyarrow/error.pxi:155: in pyarrow.lib.pyarrow_internal_check_status
???
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> ???
E OSError: Invalid flatbuffers message.
pyarrow/error.pxi:92: OSError
========================================================================= short test summary info =========================================================================
FAILED tests/test_pipeline_builder.py::TestPipelineBuilder::test_adhoc_query_pylist_same_alias_name - OSError: Invalid flatbuffers message.
==================================================================== 1 failed, 50 deselected in 1.33s =====================================================================
Sometimes, it also fails with:
Signed-off-by: feldera-bot <feldera-bot@users.noreply.github.com>
Can you reproduce it reliably if you save the byte stream of a failed invocation to a file and try to parse it with, e.g., pyarrow or the Rust arrow crates? Also, can you enable
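A rough sketch of that experiment in Python follows. It assumes the raw response body of a failed query has already been captured as `bytes`; how to capture it is not shown here. `save_stream` and `try_parse` are hypothetical helpers, and the only pyarrow calls used are `pyarrow.ipc.open_stream` and the reader's `read_all`.

```python
from pathlib import Path


def save_stream(data: bytes, path: str) -> int:
    """Write the raw response bytes to disk and return the byte count."""
    return Path(path).write_bytes(data)


def try_parse(path: str) -> bool:
    """Return True if pyarrow can read the saved bytes as an Arrow IPC stream."""
    import pyarrow as pa  # imported lazily; requires pyarrow to be installed

    data = Path(path).read_bytes()
    try:
        with pa.ipc.open_stream(data) as reader:
            table = reader.read_all()
    except (OSError, pa.ArrowInvalid) as err:
        # A failure here on saved bytes would suggest the payload itself is
        # malformed, rather than something going wrong while reading the socket.
        print(f"parse failed on {len(data)} bytes: {err}")
        return False
    print(f"parsed {table.num_rows} rows, columns: {table.column_names}")
    return True
```

If the saved file parses on every attempt while the live request still fails intermittently, that would point at the transport rather than at the serialized bytes.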
The same issue, along with the pattern of the error described: #4287
Can we make progress with this issue?
We need either a backend fix for transport over HTTP, or for the Python SDK and WebConsole to switch to WebSockets.
WebSockets sounds excessive. What kind of fix is needed for transport?
There is a weird issue with serialization described here, idk if Gerd investigated it yet |
Fixes: #3923