Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import contextlib
from dataclasses import asdict
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import (
Any,
Expand Down Expand Up @@ -46,7 +46,7 @@
from feast.repo_config import RepoConfig
from feast.saved_dataset import SavedDatasetStorage
from feast.type_map import pg_type_code_to_arrow
from feast.utils import compute_non_entity_date_range
from feast.utils import make_tzaware

from .postgres_source import PostgreSQLSource

Expand Down Expand Up @@ -134,24 +134,55 @@ def get_historical_features(

# Handle non-entity retrieval mode
if entity_df is None:
start_date, end_date = compute_non_entity_date_range(
feature_views,
start_date=start_date,
end_date=end_date,
# Default to current time if end_date not provided
if end_date is None:
end_date = datetime.now(tz=timezone.utc)
else:
end_date = make_tzaware(end_date)
# Find the maximum TTL across all feature views to ensure we capture enough data
max_ttl_seconds = max(
(
int(fv.ttl.total_seconds())
for fv in feature_views
if fv.ttl and isinstance(fv.ttl, timedelta)
),
default=0,
)

# Calculate start_date from TTL if not provided
if start_date is None:
if max_ttl_seconds > 0:
# Start from (end_date - max_ttl) to ensure we capture all relevant features
start_date = end_date - timedelta(seconds=max_ttl_seconds)
else:
# If no TTL is set, default to 30 days before end_date
start_date = end_date - timedelta(days=30)
else:
start_date = make_tzaware(start_date)

# Compute lookback_start_date for LOCF: pull feature data
# from (start_date - max_ttl) so window functions can
# forward-fill the last observation before start_date.
lookback_start_date: Optional[datetime] = (
start_date - timedelta(seconds=max_ttl_seconds)
if max_ttl_seconds > 0
else start_date
)
Comment on lines +137 to 170
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's still better to use the compute_non_entity_date_range helper; you can do:

if entity_df is None:
    start_date, end_date = compute_non_entity_date_range(
        feature_views, start_date=start_date, end_date=end_date,
    )
    # Only extra piece needed for LOCF: lookback window
    max_ttl_seconds = max(
        (int(fv.ttl.total_seconds()) for fv in feature_views
         if fv.ttl and isinstance(fv.ttl, timedelta)),
        default=0,
    )
    lookback_start_date = (
        start_date - timedelta(seconds=max_ttl_seconds)
        if max_ttl_seconds > 0 else start_date
    )
    entity_df = pd.DataFrame({"event_timestamp": [end_date]})
    skip_entity_upload = True


# Single row with end_date
entity_df = pd.DataFrame({"event_timestamp": [end_date]})
skip_entity_upload = True
else:
lookback_start_date = None
skip_entity_upload = False

entity_schema = _get_entity_schema(entity_df, config)

entity_df_event_timestamp_col = (
offline_utils.infer_event_timestamp_from_entity_df(entity_schema)
)

# In non-entity mode, use the actual requested range so that
# min_event_timestamp (= range[0] - TTL) doesn't clip the window.
# The synthetic entity_df only has end_date, which would wrongly
# set min_event_timestamp to end_date - TTL instead of start_date - TTL.
if start_date is not None and end_date is not None:
if skip_entity_upload and start_date is not None and end_date is not None:
entity_df_event_timestamp_range = (start_date, end_date)
else:
entity_df_event_timestamp_range = _get_entity_df_event_timestamp_range(
Expand All @@ -170,7 +201,12 @@ def query_generator() -> Iterator[str]:
and config.offline_store.entity_select_mode
== EntitySelectMode.embed_query
)
if use_cte:
if skip_entity_upload:
# LOCF path never uses left_table
left_table_query_string = (
"(SELECT NULL::timestamptz AS event_timestamp LIMIT 0)"
)
elif use_cte:
left_table_query_string = entity_df
else:
left_table_query_string = table_name
Expand Down Expand Up @@ -211,11 +247,13 @@ def query_generator() -> Iterator[str]:
use_cte=use_cte,
start_date=start_date,
end_date=end_date,
lookback_start_date=lookback_start_date,
)
finally:
# Only cleanup if we created a table
# Only cleanup if we created a table (not when skip_entity_upload)
if (
config.offline_store.entity_select_mode
not skip_entity_upload
and config.offline_store.entity_select_mode
== EntitySelectMode.temp_table
):
with _get_conn(config.offline_store) as conn, conn.cursor() as cur:
Expand Down Expand Up @@ -425,6 +463,7 @@ def build_point_in_time_query(
use_cte: bool = False,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
lookback_start_date: Optional[datetime] = None,
) -> str:
"""Build point-in-time query between each feature view table and the entity dataframe for PostgreSQL"""
template = Environment(loader=BaseLoader()).from_string(source=query_template)
Expand Down Expand Up @@ -455,6 +494,7 @@ def build_point_in_time_query(
"use_cte": use_cte,
"start_date": start_date,
"end_date": end_date,
"lookback_start_date": lookback_start_date,
}

query = template.render(template_context)
Expand Down Expand Up @@ -517,89 +557,173 @@ def _get_entity_schema(
AND "{{ featureviews[0].timestamp_field }}" >= '{{ featureviews[0].min_event_timestamp }}'
{% endif %}
{% else %}
{# --- LOCF (Last Observation Carried Forward) path for multi-FV date-range --- #}

{# Collect deduplicated entity list across all FVs #}
{% set all_entities = [] %}
{% for featureview in featureviews %}
{% for entity in featureview.entities %}
{% if entity not in all_entities %}
{% set _ = all_entities.append(entity) %}
{% endif %}
{% endfor %}
{% endfor %}

WITH
{# --- Per-FV __data: pull feature rows from lookback_start_date..end_date --- #}
{% for featureview in featureviews %}
"{{ featureview.name }}__data" AS (
"{{ featureview.name }}__data_raw" AS (
SELECT
"{{ featureview.timestamp_field }}" AS event_timestamp,
{% if featureview.created_timestamp_column %}
"{{ featureview.created_timestamp_column }}" AS created_timestamp,
{% endif %}
{% for entity in featureview.entities %}
"{{ entity }}",
{% endfor %}
{% for feature in featureview.features %}
"{{ feature }}" AS {% if full_feature_names %}"{{ featureview.name }}__{{ featureview.field_mapping.get(feature, feature) }}"{% else %}"{{ featureview.field_mapping.get(feature, feature) }}"{% endif %}{% if not loop.last %},{% endif %}
"{{ feature }}" AS "{% if full_feature_names %}{{ featureview.name }}__{{ featureview.field_mapping.get(feature, feature) }}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}"{% if not loop.last %},{% endif %}
{% endfor %}
{% if featureview.created_timestamp_column %}
, "{{ featureview.created_timestamp_column }}" AS created_timestamp
{% endif %}
FROM {{ featureview.table_subquery }} AS sub
WHERE "{{ featureview.timestamp_field }}" BETWEEN '{{ start_date }}' AND '{{ end_date }}'
{% if featureview.ttl != 0 and featureview.min_event_timestamp %}
AND "{{ featureview.timestamp_field }}" >= '{{ featureview.min_event_timestamp }}'
{% endif %}
WHERE "{{ featureview.timestamp_field }}" BETWEEN '{{ lookback_start_date or start_date }}' AND '{{ end_date }}'
),
{% if featureview.created_timestamp_column %}
"{{ featureview.name }}__data" AS (
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: When there's no created_timestamp_column, this CTE is a no-op passthrough (SELECT * FROM __data_raw). Postgres will likely optimize it away, but you could simplify by using __data_raw as the alias directly:

{% if featureview.created_timestamp_column %}
"{{ featureview.name }}__data" AS (
    SELECT * FROM (
        ... dedup logic ...
    ) __dedup WHERE __rn = 1
),
{% endif %}

Then reference {{ featureview.name }}__data{% if not featureview.created_timestamp_column %}_raw{% endif %} downstream. Though I understand the current approach is simpler to maintain.

SELECT * FROM (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY {% for entity in featureview.entities %}"{{ entity }}", {% endfor %}event_timestamp
ORDER BY created_timestamp DESC
) AS __rn
FROM "{{ featureview.name }}__data_raw"
) __dedup WHERE __rn = 1
),
{% endif %}
{% endfor %}

-- Create a base query with all unique entity + timestamp combinations
base_entities AS (
{# --- Spine: prediction timeline = distinct (entity, timestamp) in [start_date, end_date] --- #}
{# Each UNION branch selects all_entities so column count/order match; NULL for entities not in this FV. #}
spine AS (
{% for featureview in featureviews %}
SELECT DISTINCT
event_timestamp,
{% for entity in featureview.entities %}
"{{ entity }}"{% if not loop.last %},{% endif %}
{% endfor %}
FROM "{{ featureview.name }}__data"
d.event_timestamp{% for entity in all_entities %},
{% if entity in featureview.entities %}d."{{ entity }}"{% else %}NULL AS "{{ entity }}"{% endif %}{% endfor %}
FROM "{{ featureview.name }}__data{% if not featureview.created_timestamp_column %}_raw{% endif %}" d
WHERE d.event_timestamp BETWEEN '{{ start_date }}' AND '{{ end_date }}'
{% if not loop.last %}
UNION
{% endif %}
{% endfor %}
)
),

SELECT
base.event_timestamp,
{% set all_entities = [] %}
{% for featureview in featureviews %}
{% for entity in featureview.entities %}
{% if entity not in all_entities %}
{% set _ = all_entities.append(entity) %}
{# --- Per-FV independent LOCF forward-fill pipelines --- #}
{# Each FV is stacked, grouped, and filled independently to prevent #}
{# cross-FV interference when multiple FVs share overlapping timestamps. #}
{% for featureview in featureviews %}
"{{ featureview.name }}__stacked" AS (
SELECT
s.event_timestamp{% for entity in all_entities %},
s."{{ entity }}"{% endfor %},
1 AS is_spine,
NULL::int AS feature_anchor
{% for feature in featureview.features %}
{% if full_feature_names %}
{% set col_name = featureview.name ~ '__' ~ featureview.field_mapping.get(feature, feature) %}
{% else %}
{% set col_name = featureview.field_mapping.get(feature, feature) %}
{% endif %}
, NULL AS "{{ col_name }}"
{% endfor %}
{% endfor %}
{% for entity in all_entities %}
base."{{ entity }}",
{% endfor %}
, NULL::timestamptz AS "{{ featureview.name }}__feature_ts"
FROM spine s

UNION ALL

SELECT
d.event_timestamp{% for entity in all_entities %},
{% if entity in featureview.entities %}d."{{ entity }}"{% else %}NULL AS "{{ entity }}"{% endif %}{% endfor %},
0 AS is_spine,
1 AS feature_anchor
{% for feature in featureview.features %}
{% if full_feature_names %}
{% set col_name = featureview.name ~ '__' ~ featureview.field_mapping.get(feature, feature) %}
{% else %}
{% set col_name = featureview.field_mapping.get(feature, feature) %}
{% endif %}
, d."{{ col_name }}"
{% endfor %}
, d.event_timestamp AS "{{ featureview.name }}__feature_ts"
FROM "{{ featureview.name }}__data{% if not featureview.created_timestamp_column %}_raw{% endif %}" d
),

"{{ featureview.name }}__grouped" AS (
SELECT *,
COUNT(feature_anchor) OVER (
PARTITION BY {% if featureview.entities %}{% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}{% else %}(SELECT NULL){% endif %}
ORDER BY event_timestamp ASC, is_spine ASC
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS value_group_id
FROM "{{ featureview.name }}__stacked"
),

"{{ featureview.name }}__filled" AS (
SELECT
event_timestamp{% for entity in all_entities %},
"{{ entity }}"{% endfor %},
is_spine
{% for feature in featureview.features %}
{% if full_feature_names %}
{% set col_name = featureview.name ~ '__' ~ featureview.field_mapping.get(feature, feature) %}
{% else %}
{% set col_name = featureview.field_mapping.get(feature, feature) %}
{% endif %}
, FIRST_VALUE("{{ col_name }}") OVER (
PARTITION BY {% if featureview.entities %}{% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, {% endif %}value_group_id
ORDER BY event_timestamp ASC, is_spine ASC
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS "{{ col_name }}"
{% endfor %}
, FIRST_VALUE("{{ featureview.name }}__feature_ts") OVER (
PARTITION BY {% if featureview.entities %}{% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, {% endif %}value_group_id
ORDER BY event_timestamp ASC, is_spine ASC
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS "{{ featureview.name }}__filled_ts"
FROM "{{ featureview.name }}__grouped"
){% if not loop.last %},{% endif %}
{% endfor %}

{# --- Final: join per-FV filled results back onto spine --- #}
SELECT
spine.event_timestamp{% for entity in all_entities %},
spine."{{ entity }}"{% endfor %},
{% set total_features = featureviews|map(attribute='features')|map('length')|sum %}
{% set feature_counter = namespace(count=0) %}
{% set feat_idx = namespace(count=0) %}
{% for featureview in featureviews %}
{% set outer_loop_index = loop.index0 %}
{% for feature in featureview.features %}
{% set feature_counter.count = feature_counter.count + 1 %}
fv_{{ outer_loop_index }}."{% if full_feature_names %}{{ featureview.name }}__{{ featureview.field_mapping.get(feature, feature) }}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}"{% if feature_counter.count < total_features %},{% endif %}
{% set feat_idx.count = feat_idx.count + 1 %}
{% if full_feature_names %}
{% set col_name = featureview.name ~ '__' ~ featureview.field_mapping.get(feature, feature) %}
{% else %}
{% set col_name = featureview.field_mapping.get(feature, feature) %}
{% endif %}
{% if featureview.ttl != 0 %}
CASE WHEN (spine.event_timestamp - "{{ featureview.name }}__f"."{{ featureview.name }}__filled_ts") <= make_interval(secs => {{ featureview.ttl }})
THEN "{{ featureview.name }}__f"."{{ col_name }}" ELSE NULL END AS "{{ col_name }}"{% if feat_idx.count < total_features %},{% endif %}
{% else %}
"{{ featureview.name }}__f"."{{ col_name }}"{% if feat_idx.count < total_features %},{% endif %}
{% endif %}
{% endfor %}
{% endfor %}
FROM base_entities base
FROM spine
{% for featureview in featureviews %}
{% set outer_loop_index = loop.index0 %}
LEFT JOIN LATERAL (
SELECT DISTINCT ON ({% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %})
event_timestamp,
{% for entity in featureview.entities %}
"{{ entity }}",
{% endfor %}
{% for feature in featureview.features %}
"{% if full_feature_names %}{{ featureview.name }}__{{ featureview.field_mapping.get(feature, feature) }}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}"{% if not loop.last %},{% endif %}
{% endfor %}
FROM "{{ featureview.name }}__data" fv_sub_{{ outer_loop_index }}
WHERE fv_sub_{{ outer_loop_index }}.event_timestamp <= base.event_timestamp
{% if featureview.ttl != 0 %}
AND fv_sub_{{ outer_loop_index }}.event_timestamp >= base.event_timestamp - {{ featureview.ttl }} * interval '1' second
{% endif %}
{% for entity in featureview.entities %}
AND fv_sub_{{ outer_loop_index }}."{{ entity }}" = base."{{ entity }}"
LEFT JOIN "{{ featureview.name }}__filled" AS "{{ featureview.name }}__f"
ON spine.event_timestamp = "{{ featureview.name }}__f".event_timestamp
{% for entity in all_entities %}
AND spine."{{ entity }}" IS NOT DISTINCT FROM "{{ featureview.name }}__f"."{{ entity }}"
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good use of IS NOT DISTINCT FROM here. This fixes a bug in the old LATERAL join path where NULL = NULL would fail the join for FVs with different entity sets. Worth calling this out in the PR description as a bug fix.

{% endfor %}
ORDER BY {% for entity in featureview.entities %}"{{ entity }}"{% if not loop.last %}, {% endif %}{% endfor %}, event_timestamp DESC
) AS fv_{{ outer_loop_index }} ON true
AND "{{ featureview.name }}__f".is_spine = 1
{% endfor %}
ORDER BY base.event_timestamp
ORDER BY spine.event_timestamp
{% endif %}
{% else %}
WITH
Expand Down Expand Up @@ -679,7 +803,7 @@ def _get_entity_schema(
FROM {{ featureview.table_subquery }} AS sub
WHERE "{{ featureview.timestamp_field }}" <= (SELECT MAX(entity_timestamp) FROM entity_dataframe)
{% if featureview.ttl == 0 %}{% else %}
AND "{{ featureview.timestamp_field }}" >= (SELECT MIN(entity_timestamp) FROM entity_dataframe) - {{ featureview.ttl }} * interval '1' second
AND "{{ featureview.timestamp_field }}" >= (SELECT MIN(entity_timestamp) FROM entity_dataframe) - make_interval(secs => {{ featureview.ttl }})
{% endif %}
),

Expand All @@ -694,7 +818,7 @@ def _get_entity_schema(
AND subquery.event_timestamp <= entity_dataframe.entity_timestamp

{% if featureview.ttl == 0 %}{% else %}
AND subquery.event_timestamp >= entity_dataframe.entity_timestamp - {{ featureview.ttl }} * interval '1' second
AND subquery.event_timestamp >= entity_dataframe.entity_timestamp - make_interval(secs => {{ featureview.ttl }})
{% endif %}

{% for entity in featureview.entities %}
Expand Down
Loading
Loading