From 5a7350b87eb51c543b97f949d62e6f62c812bbea Mon Sep 17 00:00:00 2001 From: TomSteenbergen Date: Thu, 4 Jul 2024 09:30:46 +0200 Subject: [PATCH 1/4] Remove batching and fix tqdm progress bar Signed-off-by: TomSteenbergen --- .../feast/infra/online_stores/contrib/postgres.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/postgres.py b/sdk/python/feast/infra/online_stores/contrib/postgres.py index 8715f0f65bb..6b1d1f3808b 100644 --- a/sdk/python/feast/infra/online_stores/contrib/postgres.py +++ b/sdk/python/feast/infra/online_stores/contrib/postgres.py @@ -75,7 +75,6 @@ def online_write_batch( Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] ], progress: Optional[Callable[[int], Any]], - batch_size: int = 5000, ) -> None: # Format insert values insert_values = [] @@ -120,13 +119,13 @@ def online_write_batch( # Push data in batches to online store with self._get_conn(config) as conn, conn.cursor() as cur: - for i in range(0, len(insert_values), batch_size): - cur_batch = insert_values[i : i + batch_size] - cur.executemany(sql_query, cur_batch) - conn.commit() + # XXX: Instead try to loop `execute` commands with prepared statements in a + # pipeline, since `executemany` seems to be slow according to docs. + cur.executemany(sql_query, insert_values) + conn.commit() - if progress: - progress(len(cur_batch)) + if progress: + progress(len(data)) def online_read( self, From 13040cf741f4766e19bd27d656ca91221d171bf8 Mon Sep 17 00:00:00 2001 From: TomSteenbergen Date: Thu, 4 Jul 2024 09:53:21 +0200 Subject: [PATCH 2/4] Comment Signed-off-by: TomSteenbergen --- .../feast/infra/online_stores/contrib/postgres.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/postgres.py b/sdk/python/feast/infra/online_stores/contrib/postgres.py index 6b1d1f3808b..ea0d1be3bf3 100644 --- a/sdk/python/feast/infra/online_stores/contrib/postgres.py +++ b/sdk/python/feast/infra/online_stores/contrib/postgres.py @@ -124,8 +124,18 @@ def online_write_batch( cur.executemany(sql_query, insert_values) conn.commit() - if progress: - progress(len(data)) + # At the moment, looping over many execute statements is still faster than a + # single executemany statement. Use a pipeline and a prepared statement to + # speed things up further. + # with self._get_conn(config) as conn: + # with conn.pipeline(): + # with conn.cursor() as cur: + # for value in insert_values: + # cur.execute(sql_query, value, prepare=True) + # conn.commit() + + if progress: + progress(len(data)) def online_read( self, From b0bb799f71d8f6eaaae1d6fde691f90c80205f7c Mon Sep 17 00:00:00 2001 From: TomSteenbergen Date: Thu, 4 Jul 2024 10:59:44 +0200 Subject: [PATCH 3/4] Remove test changes Signed-off-by: TomSteenbergen --- .../feast/infra/online_stores/contrib/postgres.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/postgres.py b/sdk/python/feast/infra/online_stores/contrib/postgres.py index ea0d1be3bf3..3a251f62448 100644 --- a/sdk/python/feast/infra/online_stores/contrib/postgres.py +++ b/sdk/python/feast/infra/online_stores/contrib/postgres.py @@ -119,21 +119,9 @@ def online_write_batch( # Push data in batches to online store with self._get_conn(config) as conn, conn.cursor() as cur: - # XXX: Instead try to loop `execute` commands with prepared statements in a - # pipeline, since `executemany` seems to be slow according to docs. cur.executemany(sql_query, insert_values) conn.commit() - # At the moment, looping over many execute statements is still faster than a - # single executemany statement. Use a pipeline and a prepared statement to - # speed things up further. - # with self._get_conn(config) as conn: - # with conn.pipeline(): - # with conn.cursor() as cur: - # for value in insert_values: - # cur.execute(sql_query, value, prepare=True) - # conn.commit() - if progress: progress(len(data)) From 24551f4d99f076154c93181305e367aa5d1f752a Mon Sep 17 00:00:00 2001 From: TomSteenbergen Date: Thu, 4 Jul 2024 11:12:21 +0200 Subject: [PATCH 4/4] Update comment Signed-off-by: TomSteenbergen --- sdk/python/feast/infra/online_stores/contrib/postgres.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/online_stores/contrib/postgres.py b/sdk/python/feast/infra/online_stores/contrib/postgres.py index 3a251f62448..330b50bc785 100644 --- a/sdk/python/feast/infra/online_stores/contrib/postgres.py +++ b/sdk/python/feast/infra/online_stores/contrib/postgres.py @@ -117,7 +117,7 @@ def online_write_batch( """ ).format(sql.Identifier(_table_id(config.project, table))) - # Push data in batches to online store + # Push data into the online store with self._get_conn(config) as conn, conn.cursor() as cur: cur.executemany(sql_query, insert_values) conn.commit()