@abhizer abhizer commented Jun 25, 2025

  • also support case sensitive view names to listen to pipelines

Fixes: #3923

* also support case sensitive view names to listen to pipelines

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
@abhizer abhizer requested review from gz and mihaibudiu June 25, 2025 07:52
Signed-off-by: feldera-bot <feldera-bot@feldera.com>

return self.client.query_as_pyarrow(self.name, query)

def query_pylist(self, query: str) -> List[Mapping[str, Any]]:
Contributor

It seems this return type suffers from the same problem as the JSON format (aliased columns will be removed?)

Contributor Author

Yeah, it does. It doesn't suffer from it if we return pyarrow.Table.
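
A minimal sketch of the collision under discussion, in plain Python only (no Feldera or pyarrow calls). The column names and values are borrowed from the `SELECT T.x, S.x` example later in this thread; the variable names are illustrative, not part of the SDK.

```python
# Column names and one row of values for a query such as:
#   SELECT T.x, S.x FROM T, S WHERE T.y = S.y
column_names = ["x", "x"]
row_values = [4, 6]

# Keying a row by column name keeps only the last value for a repeated name.
as_mapping = dict(zip(column_names, row_values))

print(len(row_values))  # 2 columns were selected
print(as_mapping)       # {'x': 6}  (the value 4 from T.x is gone)
```

A positional container does not have this problem because it never uses the column name as an identity.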

Contributor

OK, probably not a good return type then. Can we remove it?

It's stupid, but unless we change our SQL to not accept this nonsense, I don't see a good alternative. cc @mihaibudiu

Contributor

@gz gz Jun 25, 2025

Maybe, instead of str as the key, you can have a Column type that hashes using a unique identifier/position in the SELECT statement but displays as the column name.
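
One way this suggestion could look, as a rough sketch: a hypothetical `Column` key class (not an existing SDK type) whose equality and hash depend only on the position in the SELECT list, while `str()` shows the column name. The sample names and values come from the `SELECT T.x, S.x` example in this thread.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Column:
    """Hypothetical dict key: identity is the position, display is the name."""

    position: int
    # compare=False excludes the name from the generated __eq__ and __hash__.
    name: str = field(compare=False)

    def __str__(self) -> str:
        return self.name


names = ["x", "x"]
values = [4, 6]

# Both columns survive because the keys differ by position.
row = {Column(i, n): v for i, (n, v) in enumerate(zip(names, values))}

print([(str(col), val) for col, val in row.items()])  # [('x', 4), ('x', 6)]
```

The trade-off is that callers can no longer look up a value with a bare string such as `row["x"]`, which removes much of the convenience of a mapping.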

Contributor

it should be a list, not a mapping

Contributor Author

It is mostly for convenience and works in most cases.
Should I remove it?

Contributor

No need to remove it, but either use a key that has a unique hash (such as the column position) or return a list of lists.

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
@abhizer abhizer requested a review from gz June 25, 2025 11:46
Contributor

@mihaibudiu mihaibudiu left a comment

I don't have any other comments
In general, the view should be treated as a list of lists instead of as a list of maps

@mihaibudiu
Contributor

Is this ready to merge?
Have @gz's comments been addressed?

* instead of a `List[dict]` it now returns a list of rows,
  where each row is a list of (column_name, value) tuples.

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
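
The return shape described in the commit message above can be illustrated as follows. This is only a sketch of the data shape: `rows_as_pairs` is a hypothetical helper, not the SDK's actual implementation of `query_pylist`, and the sample values are the two result rows shown later in this thread.

```python
from typing import Any, List, Sequence, Tuple

# One row: an ordered list of (column_name, value) tuples.
Row = List[Tuple[str, Any]]


def rows_as_pairs(
    column_names: Sequence[str], rows: Sequence[Sequence[Any]]
) -> List[Row]:
    """Pair each value with its column name, preserving column order."""
    return [list(zip(column_names, row)) for row in rows]


result = rows_as_pairs(["x", "x"], [[4, 6], [1, 5]])
print(result)  # [[('x', 4), ('x', 6)], [('x', 1), ('x', 5)]]
```

Because each row is a list rather than a mapping, columns that share a name are all retained, and a caller who knows the names are unique can still write `dict(row)`.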
@abhizer
Contributor Author

abhizer commented Jul 3, 2025

@gz There is a problem with the arrow_ipc format: it seems to fail nondeterministically.

Failure:

❯ fda query --host http://localhost:8080 test_adhoc_query_pyarrow_same_alias_name 'SELECT T.x, S.x FROM T, S WHERE T.y = S.y' --format arrow_ipc

thread 'main' panicked at crates/fda/src/adhoc.rs:208:59:
called `Result::unwrap()` on an `Err` value: ParseError("Unable to get root as message: RangeOutOfBounds { range: 1703948..1703952, error_trace: ErrorTrace([]) }")
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Success:

❯ fda query --host http://localhost:8080 test_adhoc_query_pyarrow_same_alias_name 'SELECT T.x, S.x FROM T, S WHERE T.y = S.y' --format arrow_ipc
+---+---+
| x | x |
+---+---+
| 4 | 6 |
| 1 | 5 |
+---+---+

Similarly, the Python test test_adhoc_query_pylist_same_alias_name also fails intermittently.

Success:

❯ pytest . -k "test_adhoc_query_pylist_same_alias" -s
=========================================================================== test session starts ===========================================================================
platform linux -- Python 3.12.10, pytest-8.3.5, pluggy-1.5.0
rootdir: /home/abhizer/Development/feldera.git/py_arrowrpc/python
configfile: pyproject.toml
plugins: timeout-2.3.1
collected 51 items / 50 deselected / 1 selected

tests/test_pipeline_builder.py .

==================================================================== 1 passed, 50 deselected in 1.35s =====================================================================

Failure:

❯ pytest . -k "test_adhoc_query_pylist_same_alias" -s
=========================================================================== test session starts ===========================================================================
platform linux -- Python 3.12.10, pytest-8.3.5, pluggy-1.5.0
rootdir: /home/abhizer/Development/feldera.git/py_arrowrpc/python
configfile: pyproject.toml
plugins: timeout-2.3.1
collected 51 items / 50 deselected / 1 selected

tests/test_pipeline_builder.py F

================================================================================ FAILURES =================================================================================
_______________________________________________________ TestPipelineBuilder.test_adhoc_query_pylist_same_alias_name _______________________________________________________

self = <tests.test_pipeline_builder.TestPipelineBuilder testMethod=test_adhoc_query_pylist_same_alias_name>

    def test_adhoc_query_pylist_same_alias_name(self):
        dataT = [{"x": 1, "y": 2}, {"x": 4, "y": 3}]
        dataS = [{"x": 5, "y": 2}, {"x": 6, "y": 3}]
        name = "test_adhoc_query_pyarrow_same_alias_name"

        sql = """
        CREATE TABLE T(x INT, y INT) with ('materialized' = 'true');
        CREATE TABLE S(x INT, y INT) with ('materialized' = 'true');
        """

        pipeline = PipelineBuilder(TEST_CLIENT, name, sql).create_or_replace()

        pipeline.start()

        pipeline.input_json("T", dataT)
        pipeline.input_json("S", dataS)

>       resp = pipeline.query_pylist("SELECT T.x, S.x FROM T, S WHERE T.y = S.y")

tests/test_pipeline_builder.py:1268:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
feldera/pipeline.py:753: in query_pylist
    table = self.query_pyarrow(query)
feldera/pipeline.py:731: in query_pyarrow
    return self.client.query_as_pyarrow(self.name, query)
feldera/rest/feldera_client.py:710: in query_as_pyarrow
    with pyarrow.ipc.RecordBatchStreamReader(resp.raw) as reader:
.venv/lib/python3.12/site-packages/pyarrow/ipc.py:52: in __init__
    self._open(source, options=options, memory_pool=memory_pool)
pyarrow/ipc.pxi:1006: in pyarrow.lib._RecordBatchStreamReader._open
    ???
pyarrow/error.pxi:155: in pyarrow.lib.pyarrow_internal_check_status
    ???
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

>   ???
E   OSError: Invalid flatbuffers message.

pyarrow/error.pxi:92: OSError
========================================================================= short test summary info =========================================================================
FAILED tests/test_pipeline_builder.py::TestPipelineBuilder::test_adhoc_query_pylist_same_alias_name - OSError: Invalid flatbuffers message.
==================================================================== 1 failed, 50 deselected in 1.33s =====================================================================

Sometimes it also fails in one of these ways:

  • OSError: Invalid IPC message: negative metadata length
  • An incorrect result is returned

Signed-off-by: feldera-bot <feldera-bot@users.noreply.github.com>
@gz
Contributor

gz commented Jul 3, 2025

Can you reproduce it reliably if you save the bytestream of a failed invocation to a file and try to parse it with, e.g., pyarrow or the Rust arrow crates?

Also, can you enable RUST_BACKTRACE=1 so we can see in which crate it fails?
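
A rough sketch of the capture-and-replay approach suggested above. The helper names `save_stream` and `parse_saved_stream` are hypothetical. How the raw chunks are obtained from the HTTP response is left to the caller and is an assumption here, as is the idea that the saved bytes are a plain Arrow IPC stream. `pyarrow.ipc.open_stream` is the only pyarrow call used.

```python
from pathlib import Path
from typing import Iterable


def save_stream(chunks: Iterable[bytes], path: Path) -> int:
    """Write the raw response chunks to `path` and return the byte count."""
    total = 0
    with path.open("wb") as out:
        for chunk in chunks:
            out.write(chunk)
            total += len(chunk)
    return total


def parse_saved_stream(path: Path):
    """Parse a saved Arrow IPC stream; raises if the bytes are malformed."""
    # Imported lazily so that capturing bytes works without pyarrow installed.
    import pyarrow.ipc as ipc

    with path.open("rb") as source:
        with ipc.open_stream(source) as reader:
            return reader.read_all()
```

If a saved capture fails to parse every time, the bytes on the wire were already corrupt, which points at the server or the transport. If it parses cleanly, the problem is more likely in how the client reads the response.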

@Karakatiza666
Contributor

The same issue, along with the pattern of the error described: #4287

@mihaibudiu
Contributor

Can we make progress with this issue?
What is the current status?

@Karakatiza666
Contributor

We need either a backend fix for transport over HTTP, or for the Python SDK and WebConsole to switch to WebSockets.

@mihaibudiu
Contributor

websockets sounds excessive. what kind of fix is needed for transport?

@Karakatiza666
Contributor

There is a weird issue with serialization described here, idk if Gerd investigated it yet

Development

Successfully merging this pull request may close these issues.

Switch python-sdk use the arrow-ipc format instead of JSON

6 participants