Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/getting-started/concepts/batch-feature-view.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class BatchFeatureView(FeatureView):
def __init__(
*,
name: str,
source: Union[DataSource, FeatureView, List[FeatureView]],
source: Optional[Union[DataSource, FeatureView, List[FeatureView]]] = None,
sink_source: Optional[DataSource] = None,
schema: Optional[List[Field]] = None,
entities: Optional[List[Entity]] = None,
Expand Down Expand Up @@ -142,6 +142,7 @@ See:
## 🛑 Gotchas

- `sink_source` is **required** when chaining views (i.e., `source` is another FeatureView or list of them).
- `source` is optional; if omitted (`None`), the feature view has no associated batch data source.
- Schema fields must be consistent with `sink_source`, and with `batch_source.field_mapping` if field mappings exist.
- Aggregation logic must reference columns present in the raw source or transformed inputs.

Expand Down
2 changes: 2 additions & 0 deletions protos/feast/core/FeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,15 @@ message FeatureViewSpec {
google.protobuf.Duration ttl = 6;

// Batch/Offline DataSource where this view can retrieve offline feature data.
// Optional: if not set, the feature view has no associated batch data source (e.g. purely derived views).
DataSource batch_source = 7;

// Whether these features should be served online or not
// This is also used to determine whether the features should be written to the online store
bool online = 8;

// Streaming DataSource from where this view can consume "online" feature data.
// Optional: only required for streaming feature views.
DataSource stream_source = 9;

// Description of the feature view.
Expand Down
3 changes: 1 addition & 2 deletions sdk/python/feast/base_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ def __init__(
self.created_timestamp = None
self.last_updated_timestamp = None

if source:
self.source = source
self.source = source

@property
@abstractmethod
Expand Down
19 changes: 18 additions & 1 deletion sdk/python/feast/batch_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ def __init__(
*,
name: str,
mode: Union[TransformationMode, str] = TransformationMode.PYTHON,
source: Union[DataSource, "BatchFeatureView", List["BatchFeatureView"]],
source: Optional[
Union[DataSource, "BatchFeatureView", List["BatchFeatureView"]]
] = None,
sink_source: Optional[DataSource] = None,
entities: Optional[List[Entity]] = None,
ttl: Optional[timedelta] = None,
Expand Down Expand Up @@ -115,6 +117,21 @@ def __init__(
f"or CUSTOM_SOURCE, got {type(source).__name__}: {source.name} instead "
)

if source is None and aggregations:
raise ValueError(
"BatchFeatureView with aggregations requires a source to aggregate from."
)

if (
source is None
and not udf
and not feature_transformation
and not aggregations
):
raise ValueError(
"BatchFeatureView requires at least one of: source, udf, feature_transformation, or aggregations."
)

self.mode = mode
self.udf = udf
self.udf_string = udf_string
Expand Down
16 changes: 14 additions & 2 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -702,11 +702,21 @@ def _make_inferences(
)

update_data_sources_with_inferred_event_timestamp_col(
[view.batch_source for view in views_to_update], self.config
[
view.batch_source
for view in views_to_update
if view.batch_source is not None
],
self.config,
)

update_data_sources_with_inferred_event_timestamp_col(
[view.batch_source for view in sfvs_to_update], self.config
[
view.batch_source
for view in sfvs_to_update
if view.batch_source is not None
],
self.config,
)

# New feature views may reference previously applied entities.
Expand Down Expand Up @@ -2416,6 +2426,8 @@ def write_to_offline_store(

provider = self._get_provider()
# Get columns of the batch source and the input dataframe.
if feature_view.batch_source is None:
raise ValueError(f"Feature view '{feature_view.name}' has no batch_source.")
column_names_and_types = (
provider.get_table_column_names_and_types_from_data_source(
self.config, feature_view.batch_source
Expand Down
23 changes: 14 additions & 9 deletions sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,8 @@ class FeatureView(BaseFeatureView):
ttl: The amount of time this group of features lives. A ttl of 0 indicates that
this group of features lives forever. Note that large ttl's or a ttl of 0
can result in extremely computationally intensive queries.
batch_source: The batch source of data where this group of features
is stored. This is optional ONLY if a push source is specified as the
stream_source, since push sources contain their own batch sources.
batch_source: Optional batch source of data where this group of features
is stored. If no source is provided, this will be None.
stream_source: The stream source of data where this group of features is stored.
schema: The schema of the feature view, including feature, timestamp, and entity
columns. If not specified, can be inferred from the underlying data source.
Expand All @@ -97,7 +96,7 @@ class FeatureView(BaseFeatureView):
name: str
entities: List[str]
ttl: Optional[timedelta]
batch_source: DataSource
batch_source: Optional[DataSource]
stream_source: Optional[DataSource]
source_views: Optional[List["FeatureView"]]
entity_columns: List[Field]
Expand All @@ -115,7 +114,7 @@ def __init__(
self,
*,
name: str,
source: Union[DataSource, "FeatureView", List["FeatureView"]],
source: Optional[Union[DataSource, "FeatureView", List["FeatureView"]]] = None,
sink_source: Optional[DataSource] = None,
schema: Optional[List[Field]] = None,
entities: Optional[List[Entity]] = None,
Expand All @@ -133,8 +132,9 @@ def __init__(

Args:
name: The unique name of the feature view.
source: The source of data for this group of features. May be a stream source, or a batch source.
If a stream source, the source should contain a batch_source for backfills & batch materialization.
source (optional): The source of data for this group of features. May be a stream source,
a batch source, a FeatureView, or a list of FeatureViews. If None, the feature view
has no associated data source.
schema (optional): The schema of the feature view, including feature, timestamp,
and entity columns.
# TODO: clarify that schema is only useful here...
Expand Down Expand Up @@ -170,7 +170,9 @@ def __init__(
self.data_source: Optional[DataSource] = None
self.source_views: List[FeatureView] = []

if isinstance(source, DataSource):
if source is None:
pass # data_source remains None, source_views remains []
elif isinstance(source, DataSource):
self.data_source = source
elif isinstance(source, FeatureView):
self.source_views = [source]
Expand Down Expand Up @@ -199,11 +201,14 @@ def __init__(
elif self.data_source:
# Batch source definition
self.batch_source = self.data_source
else:
elif self.source_views:
# Derived view source definition
if not sink_source:
raise ValueError("Derived FeatureView must specify `sink_source`.")
self.batch_source = sink_source
else:
# source=None - no batch source
self.batch_source = None

# Initialize features and entity columns.
features: List[Field] = []
Expand Down
14 changes: 6 additions & 8 deletions sdk/python/feast/feature_view_projection.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,19 +98,17 @@ def from_proto(proto: FeatureViewProjectionProto) -> "FeatureViewProjection":
@staticmethod
def from_feature_view_definition(feature_view: "FeatureView"):
# TODO need to implement this for StreamFeatureViews
if getattr(feature_view, "batch_source", None):
batch_source = getattr(feature_view, "batch_source", None)
if batch_source:
return FeatureViewProjection(
name=feature_view.name,
name_alias=None,
features=feature_view.features,
desired_features=[],
timestamp_field=feature_view.batch_source.created_timestamp_column
or None,
created_timestamp_column=feature_view.batch_source.created_timestamp_column
or None,
date_partition_column=feature_view.batch_source.date_partition_column
or None,
batch_source=feature_view.batch_source or None,
timestamp_field=batch_source.created_timestamp_column or None,
created_timestamp_column=batch_source.created_timestamp_column or None,
date_partition_column=batch_source.date_partition_column or None,
batch_source=batch_source or None,
)
else:
return FeatureViewProjection(
Expand Down
16 changes: 14 additions & 2 deletions sdk/python/feast/feature_view_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ def resolve_feature_view_source(

if not is_derived_view:
# Regular feature view - use its batch_source directly
if feature_view.batch_source is None:
raise ValueError(f"Feature view '{feature_view.name}' has no batch_source.")
return FeatureViewSourceInfo(
data_source=feature_view.batch_source,
source_type="batch_source",
Expand Down Expand Up @@ -178,8 +180,13 @@ def resolve_feature_view_source(
if hasattr(parent_view, "source_views") and parent_view.source_views:
# Parent is also a derived view - recursively find original source
original_source_view = find_original_source_view(parent_view)
original_batch_source = original_source_view.batch_source
if original_batch_source is None:
raise ValueError(
f"Original source view '{original_source_view.name}' has no batch_source."
)
return FeatureViewSourceInfo(
data_source=original_source_view.batch_source,
data_source=original_batch_source,
source_type="original_source",
has_transformation=view_has_transformation,
transformation_func=transformation_func,
Expand Down Expand Up @@ -229,8 +236,13 @@ def resolve_feature_view_source_with_fallback(
elif hasattr(feature_view, "source_views") and feature_view.source_views:
# Try the original source view as last resort
original_view = find_original_source_view(feature_view)
original_view_batch_source = original_view.batch_source
if original_view_batch_source is None:
raise ValueError(
f"Original source view '{original_view.name}' has no batch_source."
)
return FeatureViewSourceInfo(
data_source=original_view.batch_source,
data_source=original_view_batch_source,
source_type="fallback_original_source",
has_transformation=has_transformation(feature_view),
transformation_func=get_transformation_function(feature_view),
Expand Down
3 changes: 3 additions & 0 deletions sdk/python/feast/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ def _infer_features_and_entities(
fv, join_keys, run_inference_for_features, config
)

if fv.batch_source is None:
return

entity_columns: List[Field] = fv.entity_columns if fv.entity_columns else []
columns_to_exclude = {
fv.batch_source.timestamp_field,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def _materialize_one(

offline_job = self.offline_store.pull_latest_from_table_or_query(
config=self.repo_config,
data_source=feature_view.batch_source,
data_source=feature_view.batch_source, # type: ignore[arg-type]
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
timestamp_field=timestamp_field,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def _materialize_one(

offline_job = self.offline_store.pull_latest_from_table_or_query(
config=self.repo_config,
data_source=feature_view.batch_source,
data_source=feature_view.batch_source, # type: ignore[arg-type]
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
timestamp_field=timestamp_field,
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/compute_engines/ray/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def _materialize_from_offline_store(
# Pull data from offline store
retrieval_job = self.offline_store.pull_latest_from_table_or_query(
config=self.repo_config,
data_source=feature_view.batch_source,
data_source=feature_view.batch_source, # type: ignore[arg-type]
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
timestamp_field=timestamp_field,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,13 +226,14 @@ def _materialize_one(
timestamp_field,
created_timestamp_column,
) = _get_column_names(feature_view, entities)
assert feature_view.batch_source is not None # guaranteed by _get_column_names

job_id = f"{feature_view.name}-{start_date}-{end_date}"

try:
offline_job = self.offline_store.pull_latest_from_table_or_query(
config=self.repo_config,
data_source=feature_view.batch_source,
data_source=feature_view.batch_source, # type: ignore[arg-type]
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
timestamp_field=timestamp_field,
Expand Down Expand Up @@ -341,6 +342,7 @@ def generate_snowflake_materialization_query(
feature_batch: list,
project: str,
) -> str:
assert feature_view.batch_source is not None
if feature_view.batch_source.created_timestamp_column:
fv_created_str = f',"{feature_view.batch_source.created_timestamp_column}"'
else:
Expand Down Expand Up @@ -406,6 +408,7 @@ def materialize_to_snowflake_online_store(
project: str,
) -> None:
assert_snowflake_feature_names(feature_view)
assert feature_view.batch_source is not None

feature_names_str = '", "'.join(
[feature.name for feature in feature_view.features]
Expand Down Expand Up @@ -467,6 +470,7 @@ def materialize_to_external_online_store(
feature_view: Union[StreamFeatureView, FeatureView],
pbar: tqdm,
) -> None:
assert feature_view.batch_source is not None
feature_names = [feature.name for feature in feature_view.features]

with GetSnowflakeConnection(repo_config.batch_engine) as conn:
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/compute_engines/spark/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def _materialize_from_offline_store(
SparkRetrievalJob,
self.offline_store.pull_latest_from_table_or_query(
config=self.repo_config,
data_source=feature_view.batch_source,
data_source=feature_view.batch_source, # type: ignore[arg-type]
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
timestamp_field=timestamp_field,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2168,7 +2168,7 @@ def get_historical_features(

# Build reverse field mapping to get actual source column names
reverse_field_mapping = {}
if fv.batch_source.field_mapping:
if fv.batch_source is not None and fv.batch_source.field_mapping:
reverse_field_mapping = {
v: k for k, v in fv.batch_source.field_mapping.items()
}
Expand Down
5 changes: 4 additions & 1 deletion sdk/python/feast/infra/offline_stores/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,10 @@ def _field_mapping(
full_feature_names: bool,
) -> Tuple[dd.DataFrame, str]:
# Rename columns by the field mapping dictionary if it exists
if feature_view.batch_source.field_mapping:
if (
feature_view.batch_source is not None
and feature_view.batch_source.field_mapping
):
df_to_join = _run_dask_field_mapping(
df_to_join, feature_view.batch_source.field_mapping
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ def _get_offline_store_for_feature_view(
self, feature_view: FeatureView, config: RepoConfig
) -> OfflineStore:
self._initialize_offline_stores(config)
if feature_view.batch_source is None:
raise ValueError(f"Feature view '{feature_view.name}' has no batch_source.")
source_type = feature_view.batch_source.source_type()
store_key = self.get_source_key_from_type(source_type)
if store_key is None:
Expand Down
6 changes: 6 additions & 0 deletions sdk/python/feast/infra/offline_stores/ibis.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ def get_historical_features_ibis(
def read_fv(
feature_view: FeatureView, feature_refs: List[str], full_feature_names: bool
) -> Tuple:
if feature_view.batch_source is None:
raise ValueError(
f"Feature view '{feature_view.name}' has no batch_source and cannot be queried."
)
fv_table: Table = data_source_reader(
feature_view.batch_source, str(config.repo_path)
)
Expand Down Expand Up @@ -335,6 +339,8 @@ def offline_write_batch_ibis(
progress: Optional[Callable[[int], Any]],
data_source_writer: Callable[[pyarrow.Table, DataSource, str], None],
):
if feature_view.batch_source is None:
raise ValueError(f"Feature view '{feature_view.name}' has no batch_source.")
pa_schema, column_names = get_pyarrow_schema_from_batch_source(
config, feature_view.batch_source
)
Expand Down
4 changes: 4 additions & 0 deletions sdk/python/feast/infra/offline_stores/offline_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ def get_feature_view_query_context(

query_context = []
for feature_view, features in feature_views_to_feature_map.items():
if feature_view.batch_source is None:
raise ValueError(
f"Feature view '{feature_view.name}' has no batch_source and cannot be queried."
)
reverse_field_mapping = {
v: k for k, v in feature_view.batch_source.field_mapping.items()
}
Expand Down
Loading
Loading