diff --git a/sdk/python/feast/cli/cli.py b/sdk/python/feast/cli/cli.py index 712e3905c3b..71cd518dc99 100644 --- a/sdk/python/feast/cli/cli.py +++ b/sdk/python/feast/cli/cli.py @@ -379,6 +379,7 @@ def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List "ikv", "couchbase", "milvus", + "ray", ], case_sensitive=False, ), diff --git a/sdk/python/feast/infra/compute_engines/ray/compute.py b/sdk/python/feast/infra/compute_engines/ray/compute.py index 7bf7e15dfb0..24d98cae7fb 100644 --- a/sdk/python/feast/infra/compute_engines/ray/compute.py +++ b/sdk/python/feast/infra/compute_engines/ray/compute.py @@ -24,6 +24,7 @@ RayDAGRetrievalJob, RayMaterializationJob, ) +from feast.infra.compute_engines.ray.utils import write_to_online_store from feast.infra.offline_stores.offline_store import RetrievalJob from feast.infra.registry.base_registry import BaseRegistry @@ -203,11 +204,12 @@ def _materialize_from_offline_store( arrow_table = retrieval_job.to_arrow() # Write to online store if enabled - if getattr(feature_view, "online", False): - # TODO: Implement proper online store writing with correct data format conversion - logger.debug( - "Online store writing not implemented yet for Ray compute engine" - ) + write_to_online_store( + arrow_table=arrow_table, + feature_view=feature_view, + online_store=self.online_store, + repo_config=self.repo_config, + ) # Write to offline store if enabled (this handles sink_source automatically for derived views) if getattr(feature_view, "offline", False): diff --git a/sdk/python/feast/infra/compute_engines/ray/nodes.py b/sdk/python/feast/infra/compute_engines/ray/nodes.py index 5a5f04acee3..eaf48847113 100644 --- a/sdk/python/feast/infra/compute_engines/ray/nodes.py +++ b/sdk/python/feast/infra/compute_engines/ray/nodes.py @@ -18,6 +18,10 @@ from feast.infra.compute_engines.dag.node import DAGNode from feast.infra.compute_engines.dag.value import DAGValue from feast.infra.compute_engines.ray.config import RayComputeEngineConfig +from feast.infra.compute_engines.ray.utils import ( + safe_batch_processor, + write_to_online_store, +) from feast.infra.compute_engines.utils import create_offline_store_retrieval_job from feast.infra.ray_shared_utils import ( apply_field_mapping, @@ -149,9 +153,8 @@ def execute(self, context: ExecutionContext) -> DAGValue: feature_df = feature_dataset.to_pandas() feature_ref = ray.put(feature_df) + @safe_batch_processor def join_with_aggregated_features(batch: pd.DataFrame) -> pd.DataFrame: - if batch.empty: - return batch features = ray.get(feature_ref) if join_keys: result = pd.merge( @@ -226,10 +229,9 @@ def execute(self, context: ExecutionContext) -> DAGValue: input_value.assert_format(DAGFormat.RAY) dataset: Dataset = input_value.data + @safe_batch_processor def apply_filters(batch: pd.DataFrame) -> pd.DataFrame: """Apply TTL and custom filters to the batch.""" - if batch.empty: - return batch filtered_batch = batch.copy() @@ -447,11 +449,9 @@ def execute(self, context: ExecutionContext) -> DAGValue: input_value.assert_format(DAGFormat.RAY) dataset: Dataset = input_value.data + @safe_batch_processor def deduplicate_batch(batch: pd.DataFrame) -> pd.DataFrame: """Remove duplicates from the batch.""" - if batch.empty: - return batch - # Get deduplication keys join_keys = self.column_info.join_keys timestamp_col = self.column_info.timestamp_column @@ -518,27 +518,21 @@ def execute(self, context: ExecutionContext) -> DAGValue: elif callable(self.transformation): transformation_serialized = 
dill.dumps(self.transformation) + @safe_batch_processor def apply_transformation_with_serialized_udf( batch: pd.DataFrame, ) -> pd.DataFrame: """Apply the transformation using pre-serialized UDF.""" - if batch.empty: - return batch - - try: - if transformation_serialized: - transformation_func = dill.loads(transformation_serialized) - transformed_batch = transformation_func(batch) - else: - logger.warning( - "No serialized transformation available, returning original batch" - ) - transformed_batch = batch + if transformation_serialized: + transformation_func = dill.loads(transformation_serialized) + transformed_batch = transformation_func(batch) + else: + logger.warning( + "No serialized transformation available, returning original batch" + ) + transformed_batch = batch - return transformed_batch - except Exception as e: - logger.error(f"Transformation failed: {e}") - return batch + return transformed_batch transformed_dataset = dataset.map_batches( apply_transformation_with_serialized_udf, batch_format="pandas" @@ -645,46 +639,36 @@ def execute(self, context: ExecutionContext) -> DAGValue: feature_view=self.feature_view, repo_config=context.repo_config ) + @safe_batch_processor def write_batch_with_serialized_artifacts(batch: pd.DataFrame) -> pd.DataFrame: """Write each batch using pre-serialized artifacts.""" - if batch.empty: - return batch - - try: - ( - feature_view, - online_store, - offline_store, - repo_config, - ) = serialized_artifacts.unserialize() - - arrow_table = pa.Table.from_pandas(batch) - - # Write to online store if enabled - if getattr(feature_view, "online", False): - # TODO: Implement proper online store writing with correct data format conversion - logger.debug( - "Online store writing not implemented yet for Ray compute engine" - ) - - # Write to offline store if enabled - if getattr(feature_view, "offline", False): - try: - offline_store.offline_write_batch( - config=repo_config, - feature_view=feature_view, - table=arrow_table, - progress=lambda x: None, - ) - except Exception as e: - logger.error(f"Failed to write to offline store: {e}") - raise + ( + feature_view, + online_store, + offline_store, + repo_config, + ) = serialized_artifacts.unserialize() + + arrow_table = pa.Table.from_pandas(batch) + + # Write to online store if enabled + write_to_online_store( + arrow_table=arrow_table, + feature_view=feature_view, + online_store=online_store, + repo_config=repo_config, + ) - return batch + # Write to offline store if enabled + if getattr(feature_view, "offline", False): + offline_store.offline_write_batch( + config=repo_config, + feature_view=feature_view, + table=arrow_table, + progress=lambda x: None, + ) - except Exception as e: - logger.error(f"Write operation failed: {e}") - raise + return batch written_dataset = dataset.map_batches( write_batch_with_serialized_artifacts, batch_format="pandas" diff --git a/sdk/python/feast/infra/compute_engines/ray/utils.py b/sdk/python/feast/infra/compute_engines/ray/utils.py new file mode 100644 index 00000000000..94ebbe2c643 --- /dev/null +++ b/sdk/python/feast/infra/compute_engines/ray/utils.py @@ -0,0 +1,93 @@ +""" +Utility functions for Ray compute engine. 
+""" + +import logging +from typing import Callable, Dict, Union + +import pandas as pd +import pyarrow as pa + +from feast.batch_feature_view import BatchFeatureView +from feast.feature_view import FeatureView +from feast.infra.online_stores.online_store import OnlineStore +from feast.repo_config import RepoConfig +from feast.stream_feature_view import StreamFeatureView +from feast.utils import _convert_arrow_to_proto +from feast.value_type import ValueType + +logger = logging.getLogger(__name__) + + +def write_to_online_store( + arrow_table: pa.Table, + feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView], + online_store: OnlineStore, + repo_config: RepoConfig, +) -> None: + """ + Writes Arrow table data to the online store. + + Args: + arrow_table: Arrow table containing the data to write + feature_view: Feature view being materialized + online_store: Online store instance + repo_config: Repository configuration + """ + if not getattr(feature_view, "online", False): + return + + try: + join_key_to_value_type: Dict[str, ValueType] = {} + if hasattr(feature_view, "entity_columns") and feature_view.entity_columns: + join_key_to_value_type = { + entity.name: entity.dtype.to_value_type() + for entity in feature_view.entity_columns + } + + rows_to_write = _convert_arrow_to_proto( + arrow_table, feature_view, join_key_to_value_type + ) + + if rows_to_write: + online_store.online_write_batch( + config=repo_config, + table=feature_view, + data=rows_to_write, + progress=lambda x: None, + ) + logger.debug( + f"Successfully wrote {len(rows_to_write)} rows to online store for {feature_view.name}" + ) + else: + logger.warning(f"No rows to write for {feature_view.name}") + + except Exception as e: + logger.error(f"Failed to write to online store for {feature_view.name}: {e}") + + +def safe_batch_processor( + func: Callable[[pd.DataFrame], pd.DataFrame], +) -> Callable[[pd.DataFrame], pd.DataFrame]: + """ + Decorator for batch processing functions that handles empty batches and errors gracefully. + + Args: + func: Function that processes a pandas DataFrame batch + + Returns: + Wrapped function that handles empty batches and exceptions + """ + + def wrapper(batch: pd.DataFrame) -> pd.DataFrame: + # Handle empty batches + if batch.empty: + return batch + + try: + return func(batch) + except Exception as e: + logger.error(f"Batch processing failed in {func.__name__}: {e}") + return batch + + return wrapper diff --git a/sdk/python/feast/templates/ray/README.md b/sdk/python/feast/templates/ray/README.md new file mode 100644 index 00000000000..dd2d8c9f7f1 --- /dev/null +++ b/sdk/python/feast/templates/ray/README.md @@ -0,0 +1,41 @@ +# Feast Ray Template + +This template demonstrates Feast's Ray integration, showcasing both the **Ray Offline Store** and **Ray Compute Engine** capabilities for distributed feature processing. + +## What's Included + +``` +ray_template/ +├── feature_repo/ +│ ├── feature_store.yaml # Ray offline store + compute engine config +│ ├── example_repo.py # Feature definitions with Ray optimizations +│ ├── test_workflow.py # Demo script showing Ray capabilities +│ └── data/ # Sample datasets (generated by bootstrap) +│ ├── driver_stats.parquet +│ └── customer_daily_profile.parquet +└── README.md # This file +``` + + +## Getting Started + +1. **Initialize the template**: + ```bash + feast init -t ray my_ray_project + cd my_ray_project/feature_repo + ``` + +2. **Install Ray dependencies**: + ```bash + pip install feast[ray] + ``` + +3. 
**Apply feature definitions**: + ```bash + feast apply + ``` + +4. **Run the demo**: + ```bash + python test_workflow.py + ``` diff --git a/sdk/python/feast/templates/ray/__init__.py b/sdk/python/feast/templates/ray/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/sdk/python/feast/templates/ray/bootstrap.py b/sdk/python/feast/templates/ray/bootstrap.py new file mode 100644 index 00000000000..30f5bf7dd00 --- /dev/null +++ b/sdk/python/feast/templates/ray/bootstrap.py @@ -0,0 +1,93 @@ +from feast.file_utils import replace_str_in_file + + +def bootstrap(): + import pathlib + from datetime import datetime, timedelta + + import numpy as np + import pandas as pd + + from feast.driver_test_data import create_driver_hourly_stats_df + + repo_path = pathlib.Path(__file__).parent.absolute() / "feature_repo" + project_name = pathlib.Path(__file__).parent.absolute().name + data_path = repo_path / "data" + data_path.mkdir(exist_ok=True) + + end_date = datetime.now().replace(microsecond=0, second=0, minute=0) + start_date = end_date - timedelta(days=15) + + # Generate driver data using Feast's built-in test data generator + driver_entities = [1001, 1002, 1003, 1004, 1005] + driver_df = create_driver_hourly_stats_df(driver_entities, start_date, end_date) + + if driver_df["event_timestamp"].dt.tz is None: + driver_df["event_timestamp"] = driver_df["event_timestamp"].dt.tz_localize( + "UTC" + ) + if "created" in driver_df.columns and driver_df["created"].dt.tz is None: + driver_df["created"] = driver_df["created"].dt.tz_localize("UTC") + + driver_stats_path = data_path / "driver_stats.parquet" + driver_df.to_parquet(path=str(driver_stats_path), allow_truncated_timestamps=True) + + # Generate customer data to demonstrate Ray's multi-source capabilities + customer_entities = [2001, 2002, 2003, 2004, 2005] + + # Create customer daily profile data + customer_data = [] + for customer_id in customer_entities: + for i, single_date in enumerate( + pd.date_range(start_date, end_date, freq="D", tz="UTC") + ): + stable_timestamp = single_date.replace( + hour=12, minute=0, second=0, microsecond=0 + ) + customer_data.append( + { + "customer_id": customer_id, + "event_timestamp": stable_timestamp, + "created": stable_timestamp + timedelta(minutes=10), + "current_balance": np.random.uniform(10.0, 1000.0), + "avg_passenger_count": np.random.uniform(1.0, 4.0), + "lifetime_trip_count": np.random.randint(50, 500), + } + ) + + customer_df = pd.DataFrame(customer_data) + + if customer_df["event_timestamp"].dt.tz is None: + customer_df["event_timestamp"] = customer_df["event_timestamp"].dt.tz_localize( + "UTC" + ) + if customer_df["created"].dt.tz is None: + customer_df["created"] = customer_df["created"].dt.tz_localize("UTC") + + customer_profile_path = data_path / "customer_daily_profile.parquet" + customer_df.to_parquet( + path=str(customer_profile_path), allow_truncated_timestamps=True + ) + + # Update the example_repo.py file with actual paths + example_py_file = repo_path / "example_repo.py" + replace_str_in_file(example_py_file, "%PROJECT_NAME%", str(project_name)) + replace_str_in_file( + example_py_file, "%PARQUET_PATH%", str(driver_stats_path.relative_to(repo_path)) + ) + replace_str_in_file( + example_py_file, "%LOGGING_PATH%", str(data_path.relative_to(repo_path)) + ) + + print("Ray template initialized with sample data:") + print(f" - Driver stats: {driver_stats_path}") + print(f" - Customer profiles: {customer_profile_path}") + print(f" - Ray storage will be created at: {data_path / 
'ray_storage'}") + print("\nTo get started:") + print(f" 1. cd {project_name}/feature_repo") + print(" 2. feast apply") + print(" 3. python test_workflow.py") + + +if __name__ == "__main__": + bootstrap() diff --git a/sdk/python/feast/templates/ray/feature_repo/__init__.py b/sdk/python/feast/templates/ray/feature_repo/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/sdk/python/feast/templates/ray/feature_repo/example_repo.py b/sdk/python/feast/templates/ray/feature_repo/example_repo.py new file mode 100644 index 00000000000..0b2df66de34 --- /dev/null +++ b/sdk/python/feast/templates/ray/feature_repo/example_repo.py @@ -0,0 +1,134 @@ +# # # # # # # # # # # # # # # # # # # # # # # # +# This is an example feature definition file # +# showcasing Ray offline store and compute # +# engine capabilities # +# # # # # # # # # # # # # # # # # # # # # # # # + +from datetime import timedelta +from pathlib import Path + +from feast import Entity, FeatureService, FeatureView, Field, ValueType +from feast.infra.offline_stores.file_source import FileSource +from feast.on_demand_feature_view import on_demand_feature_view +from feast.types import Float32, Float64, Int64 + +# Constants related to the generated data sets +CURRENT_DIR = Path(__file__).parent + +# Entity definitions +driver = Entity( + name="driver", + description="driver id", + value_type=ValueType.INT64, + join_keys=["driver_id"], +) + +customer = Entity( + name="customer", + description="customer id", + value_type=ValueType.INT64, + join_keys=["customer_id"], +) + +# Data sources - Ray offline store works with FileSource +# These will be processed by Ray for efficient distributed data access +driver_hourly_stats = FileSource( + name="driver_hourly_stats", + path=f"{CURRENT_DIR}/%PARQUET_PATH%", + timestamp_field="event_timestamp", + created_timestamp_column="created", +) + +customer_daily_profile = FileSource( + name="customer_daily_profile", + path=f"{CURRENT_DIR}/data/customer_daily_profile.parquet", + timestamp_field="event_timestamp", + created_timestamp_column="created", +) + +# Feature Views - These leverage Ray compute engine for distributed processing +driver_hourly_stats_view = FeatureView( + name="driver_hourly_stats", + entities=[driver], + ttl=timedelta(days=7), + schema=[ + Field(name="conv_rate", dtype=Float32), + Field(name="acc_rate", dtype=Float32), + Field(name="avg_daily_trips", dtype=Int64), + ], + online=True, + source=driver_hourly_stats, + tags={"team": "driver_performance", "processing": "ray"}, +) + +customer_daily_profile_view = FeatureView( + name="customer_daily_profile", + entities=[customer], + ttl=timedelta(days=7), + schema=[ + Field(name="current_balance", dtype=Float32), + Field(name="avg_passenger_count", dtype=Float32), + Field(name="lifetime_trip_count", dtype=Int64), + ], + online=True, + source=customer_daily_profile, + tags={"team": "customer_analytics", "processing": "ray"}, +) + + +# On-demand feature view showcasing Ray compute engine capabilities +# This demonstrates real-time feature transformations using Ray +@on_demand_feature_view( + sources=[driver_hourly_stats_view], + schema=[ + Field(name="conv_rate_plus_acc_rate", dtype=Float64), + Field(name="trips_per_day_normalized", dtype=Float64), + ], +) +def driver_activity_v2(inputs: dict): + """ + On-demand feature transformations processed by Ray compute engine. + These calculations happen in real-time and can leverage Ray's + distributed processing capabilities. 
+ """ + import pandas as pd + + conv_rate = inputs["conv_rate"] + acc_rate = inputs["acc_rate"] + avg_daily_trips = inputs["avg_daily_trips"] + + # Feature engineering using Ray's distributed processing + conv_rate_plus_acc_rate = conv_rate + acc_rate + + # Normalize trips per day (example of more complex transformation) + max_trips = avg_daily_trips.max() if len(avg_daily_trips) > 0 else 1 + trips_per_day_normalized = avg_daily_trips / max_trips + + return pd.DataFrame( + { + "conv_rate_plus_acc_rate": conv_rate_plus_acc_rate, + "trips_per_day_normalized": trips_per_day_normalized, + } + ) + + +# Feature Service - Groups related features for serving +# Ray compute engine optimizes the retrieval of these feature combinations +driver_activity_v1 = FeatureService( + name="driver_activity_v1", + features=[ + driver_hourly_stats_view, + customer_daily_profile_view, + ], + tags={"version": "v1", "compute_engine": "ray"}, +) + +driver_activity_v2_service = FeatureService( + name="driver_activity_v2", + features=[ + driver_hourly_stats_view, + customer_daily_profile_view, + driver_activity_v2, # Includes on-demand transformations + ], + tags={"version": "v2", "compute_engine": "ray", "transformations": "on_demand"}, +) diff --git a/sdk/python/feast/templates/ray/feature_repo/feature_store.yaml b/sdk/python/feast/templates/ray/feature_repo/feature_store.yaml new file mode 100644 index 00000000000..9222f9b135f --- /dev/null +++ b/sdk/python/feast/templates/ray/feature_repo/feature_store.yaml @@ -0,0 +1,30 @@ +project: my_project +registry: data/registry.db +provider: local + +# Ray offline store configuration for data I/O operations +offline_store: + type: ray + storage_path: data/ray_storage # Optional: Path for storing datasets + # Conservative settings for local development and testing + broadcast_join_threshold_mb: 25 # Broadcast join threshold (MB) + max_parallelism_multiplier: 1 # Parallelism as multiple of CPU cores + target_partition_size_mb: 16 # Target partition size (MB) + enable_ray_logging: false # Disable Ray logging for cleaner output + # ray_address: "127.0.0.1:10001" # Ray address + +# Ray compute engine configuration for distributed feature processing +batch_engine: + type: ray.engine + max_workers: 2 # Maximum number of Ray workers (conservative for local) + enable_optimization: true # Enable performance optimizations + broadcast_join_threshold_mb: 50 # Broadcast join threshold (MB) + target_partition_size_mb: 32 # Target partition size (MB) + window_size_for_joins: "1H" # Time window for distributed joins + # ray_address: "127.0.0.1:10001" # Ray address + +# Online store for serving features +online_store: + path: data/online_store.db + +entity_key_serialization_version: 3 diff --git a/sdk/python/feast/templates/ray/feature_repo/test_workflow.py b/sdk/python/feast/templates/ray/feature_repo/test_workflow.py new file mode 100644 index 00000000000..8caa4cb6380 --- /dev/null +++ b/sdk/python/feast/templates/ray/feature_repo/test_workflow.py @@ -0,0 +1,177 @@ +#!/usr/bin/env python3 + +""" +Test workflow for Ray offline store and compute engine template. + +This script demonstrates: +1. Ray offline store for efficient data I/O +2. Ray compute engine for distributed feature processing +3. Historical feature retrieval with point-in-time joins +4. Feature materialization to online store +5. 
Online feature serving + +Run this after: feast apply +""" + +import sys +from datetime import datetime, timedelta +from pathlib import Path + +import pandas as pd + +# Add the feature repo to the path +repo_path = Path(__file__).parent +sys.path.append(str(repo_path)) + +try: + from feast import FeatureStore +except ImportError: + print("Please install feast: pip install feast[ray]") + sys.exit(1) + + +def run_demo(): + print("=" * 60) + print("🚀 Ray Offline Store & Compute Engine Demo") + print("=" * 60) + + # Initialize the feature store + print("\n1. Initializing Feast with Ray configuration...") + store = FeatureStore(repo_path=".") + + print(f" ✓ Offline store: {store.config.offline_store.type}") + if hasattr(store.config, "batch_engine") and store.config.batch_engine: + print(f" ✓ Compute engine: {store.config.batch_engine.type}") + else: + print(" ⚠ No compute engine configured") + + # Create entity DataFrame for historical features + print("\n2. Creating entity DataFrame for historical feature retrieval...") + end_date = datetime.now().replace(microsecond=0, second=0, minute=0) + start_date = end_date - timedelta(days=2) + + entity_df = pd.DataFrame( + { + "driver_id": [1001, 1002, 1003], + "customer_id": [2001, 2002, 2003], + "event_timestamp": [ + pd.Timestamp(end_date - timedelta(hours=24), tz="UTC"), + pd.Timestamp(end_date - timedelta(hours=12), tz="UTC"), + pd.Timestamp(end_date - timedelta(hours=6), tz="UTC"), + ], + } + ) + + print(f" ✓ Created entity DataFrame with {len(entity_df)} rows") + print(f" ✓ Time range: {start_date} to {end_date}") + + # Retrieve historical features using Ray compute engine + print("\n3. Retrieving historical features with Ray compute engine...") + print(" (This demonstrates distributed point-in-time joins)") + + try: + # Get historical features - this uses Ray compute engine for distributed processing + historical_features = store.get_historical_features( + entity_df=entity_df, + features=[ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_hourly_stats:avg_daily_trips", + "customer_daily_profile:current_balance", + "customer_daily_profile:avg_passenger_count", + "customer_daily_profile:lifetime_trip_count", + ], + ) + + # Convert to DataFrame - Ray processes this efficiently + historical_df = historical_features.to_df() + print(f" ✓ Retrieved {len(historical_df)} historical feature rows") + print(f" ✓ Features: {list(historical_df.columns)}") + + # Show sample of the data + print("\n Sample historical features:") + print(historical_df.head(3).to_string(index=False)) + + except Exception as e: + print(f" ⚠ Historical features retrieval failed: {e}") + print(" This might be due to missing Ray dependencies or data") + + # Demonstrate on-demand feature transformations + print("\n4. 
Testing on-demand feature transformations...") + try: + # Get features including on-demand transformations + features_with_odfv = store.get_historical_features( + entity_df=entity_df.head(1), + features=[ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_hourly_stats:avg_daily_trips", + "driver_activity_v2:conv_rate_plus_acc_rate", + "driver_activity_v2:trips_per_day_normalized", + ], + ) + + odfv_df = features_with_odfv.to_df() + print(f" ✓ Retrieved {len(odfv_df)} rows with on-demand transformations") + + # Show sample with transformations + print("\n Sample with on-demand features:") + print( + odfv_df[["driver_id", "conv_rate", "acc_rate", "conv_rate_plus_acc_rate"]] + .head(3) + .to_string(index=False) + ) + + except Exception as e: + print(f" ⚠ On-demand features failed: {e}") + + # Materialize features to online store + print("\n5. Materializing features to online store...") + try: + materialize_end = end_date + + print(f" Attempting materialization up to {materialize_end}") + + # Try materialization with Ray compute engine + store.materialize_incremental(end_date=materialize_end) + print(" ✓ Ray compute engine materialization successful!") + + # Test online feature retrieval + print("\n6. Testing online feature serving...") + online_features = store.get_online_features( + features=[ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "customer_daily_profile:current_balance", + ], + entity_rows=[ + {"driver_id": 1001, "customer_id": 2001}, + {"driver_id": 1002, "customer_id": 2002}, + ], + ) + + online_df = online_features.to_df() + print(f" ✓ Retrieved {len(online_df)} online feature rows") + print("\n Sample online features:") + print(online_df.to_string(index=False)) + + except Exception as e: + print(f" ⚠ Materialization/online serving failed: {e}") + + print("\n" + "=" * 60) + print("🎉 Ray Demo Complete!") + print("=" * 60) + + print( + "\nIf you want to explore Feast with your existing ray cluster, you can configure ray_address to feature_store.yaml: \n" + ) + print(""" + offline_store: + ray_address: "127.0.0.1:10001" + batch_engine: + ray_address: "127.0.0.1:10001" + """) + + +if __name__ == "__main__": + run_demo() diff --git a/sdk/python/feast/templates/ray/gitignore b/sdk/python/feast/templates/ray/gitignore new file mode 100644 index 00000000000..a947a09f3a4 --- /dev/null +++ b/sdk/python/feast/templates/ray/gitignore @@ -0,0 +1,139 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. 
+*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# Feast specific +data/ +.feast/ +ray_storage/ + +# Ray specific +/tmp/ray/ +ray_results/ +.ray/
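
Reviewer note: the new `safe_batch_processor` decorator in `feast/infra/compute_engines/ray/utils.py` centralizes the empty-batch short-circuit and the catch-log-and-return error handling that was previously duplicated inline in each Ray DAG node. A minimal sketch of its behavior, assuming `feast[ray]` from this branch is installed so the import path added by this diff exists; the column names below are illustrative only:

```python
# Sketch of the safe_batch_processor semantics added in
# feast/infra/compute_engines/ray/utils.py (assumes feast[ray] from this branch).
import pandas as pd

from feast.infra.compute_engines.ray.utils import safe_batch_processor


@safe_batch_processor
def add_feature(batch: pd.DataFrame) -> pd.DataFrame:
    # Normal path: derive a new column from an existing one.
    out = batch.copy()
    out["conv_rate_plus_one"] = out["conv_rate"] + 1.0
    return out


# Empty batches are returned unchanged; the wrapped function never runs,
# so no KeyError is raised on the missing "conv_rate" column.
print(add_feature(pd.DataFrame()))

# A failing batch is logged and returned as-is, so one bad batch does not
# abort the surrounding Ray map_batches() pipeline.
print(add_feature(pd.DataFrame({"unrelated": [1, 2]})))
```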
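
Reviewer note: with the TODO replaced by `write_to_online_store`, materialization through the Ray compute engine now actually lands rows in the online store, which is what the template's `test_workflow.py` exercises. A minimal sketch of that end-to-end path, assuming the Ray template repo from this diff has been initialized (`feast init -t ray`) and `feast apply` has been run in its `feature_repo` directory:

```python
# Sketch mirroring feature_repo/test_workflow.py from this diff
# (assumes the Ray template has been applied with `feast apply`).
from datetime import datetime

from feast import FeatureStore

store = FeatureStore(repo_path=".")

# The Ray compute engine materializes the batch sources and, with this change,
# writes the resulting rows to the online store instead of logging a debug TODO.
store.materialize_incremental(end_date=datetime.now())

# Rows materialized above are now readable through online serving.
online = store.get_online_features(
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
    ],
    entity_rows=[{"driver_id": 1001}],
)
print(online.to_df())
```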