From 32c1976ad4281ecc4066307507fb59b6c533edc3 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Fri, 27 Mar 2026 16:59:12 -0400 Subject: [PATCH 1/3] Add Chronon online and offline store integrations --- .../pr_chronon_integration_tests.yml | 113 ++++++ .../chronon/start-local-chronon-service.sh | 146 +++++++ .../chronon/stop-local-chronon-service.sh | 21 + sdk/python/feast/__init__.py | 4 + sdk/python/feast/infra/chronon_provider.py | 5 + .../contrib/chronon_offline_store/__init__.py | 1 + .../contrib/chronon_offline_store/chronon.py | 363 ++++++++++++++++++ .../chronon_offline_store/chronon_source.py | 161 ++++++++ .../chronon_online_store/__init__.py | 1 + .../chronon_online_store/chronon.py | 154 ++++++++ sdk/python/feast/infra/provider.py | 1 + sdk/python/feast/repo_config.py | 2 + .../test_chronon_offline_store.py | 115 ++++++ .../online_store/test_chronon_online_store.py | 100 +++++ .../test_chronon_online_store_real_service.py | 67 ++++ .../test_chronon_source.py | 34 ++ .../test_chronon_online_store.py | 107 ++++++ 17 files changed, 1395 insertions(+) create mode 100644 .github/workflows/pr_chronon_integration_tests.yml create mode 100755 infra/scripts/chronon/start-local-chronon-service.sh create mode 100755 infra/scripts/chronon/stop-local-chronon-service.sh create mode 100644 sdk/python/feast/infra/chronon_provider.py create mode 100644 sdk/python/feast/infra/offline_stores/contrib/chronon_offline_store/__init__.py create mode 100644 sdk/python/feast/infra/offline_stores/contrib/chronon_offline_store/chronon.py create mode 100644 sdk/python/feast/infra/offline_stores/contrib/chronon_offline_store/chronon_source.py create mode 100644 sdk/python/feast/infra/online_stores/chronon_online_store/__init__.py create mode 100644 sdk/python/feast/infra/online_stores/chronon_online_store/chronon.py create mode 100644 sdk/python/tests/integration/offline_store/test_chronon_offline_store.py create mode 100644 sdk/python/tests/integration/online_store/test_chronon_online_store.py create mode 100644 sdk/python/tests/integration/online_store/test_chronon_online_store_real_service.py create mode 100644 sdk/python/tests/unit/infra/offline_stores/contrib/chronon_offline_store/test_chronon_source.py create mode 100644 sdk/python/tests/unit/infra/online_stores/chronon_online_store/test_chronon_online_store.py diff --git a/.github/workflows/pr_chronon_integration_tests.yml b/.github/workflows/pr_chronon_integration_tests.yml new file mode 100644 index 00000000000..4a87d978133 --- /dev/null +++ b/.github/workflows/pr_chronon_integration_tests.yml @@ -0,0 +1,113 @@ +name: pr-chronon-integration-tests + +on: + pull_request: + types: + - opened + - synchronize + - labeled + paths: + - "sdk/python/feast/infra/online_stores/chronon_online_store/**" + - "sdk/python/feast/infra/offline_stores/contrib/chronon_offline_store/**" + - "sdk/python/feast/infra/chronon_provider.py" + - "sdk/python/tests/unit/infra/online_stores/chronon_online_store/**" + - "sdk/python/tests/unit/infra/offline_stores/contrib/chronon_offline_store/**" + - "sdk/python/tests/integration/online_store/test_chronon_online_store.py" + - "sdk/python/tests/integration/online_store/test_chronon_online_store_real_service.py" + - "sdk/python/tests/integration/offline_store/test_chronon_offline_store.py" + - "infra/scripts/chronon/**" + - "chronon/**" + - ".github/workflows/pr_chronon_integration_tests.yml" + +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + +jobs: + chronon-python-tests: + if: + ((github.event.action == 'labeled' && (github.event.label.name == 'approved' || github.event.label.name == 'lgtm' || github.event.label.name == 'ok-to-test')) || + (github.event.action != 'labeled' && (contains(github.event.pull_request.labels.*.name, 'ok-to-test') || contains(github.event.pull_request.labels.*.name, 'approved') || contains(github.event.pull_request.labels.*.name, 'lgtm')))) && + github.event.pull_request.base.repo.full_name == 'feast-dev/feast' + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + repository: ${{ github.event.repository.full_name }} + ref: ${{ github.ref }} + token: ${{ secrets.GITHUB_TOKEN }} + submodules: recursive + - name: Setup Python + uses: actions/setup-python@v5 + with: + python-version: "3.11" + architecture: x64 + - name: Install uv + uses: astral-sh/setup-uv@v5 + with: + enable-cache: true + - name: Setup Java + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: "8" + - name: Install dependencies + run: make install-python-dependencies-ci + - name: Install Chronon build dependencies + run: | + sudo apt-get update + sudo apt-get install -y \ + apt-transport-https \ + autoconf \ + automake \ + bison \ + build-essential \ + ca-certificates \ + curl \ + flex \ + g++ \ + gnupg \ + libtool \ + pkg-config \ + python3-pip \ + python3-venv + echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | sudo tee /etc/apt/sources.list.d/sbt.list + curl -fsSL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x99E82A75642AC823" | sudo apt-key add - + sudo apt-get update + sudo apt-get install -y sbt + curl -sSL "http://archive.apache.org/dist/thrift/0.13.0/thrift-0.13.0.tar.gz" -o /tmp/thrift.tar.gz + sudo rm -rf /usr/src/thrift + sudo mkdir -p /usr/src/thrift + sudo tar zxf /tmp/thrift.tar.gz -C /usr/src/thrift --strip-components=1 + cd /usr/src/thrift + sudo ./configure --without-python --without-cpp + sudo make -j2 + sudo make install + python3 -m pip install --break-system-packages build + thrift -version + - name: Build Chronon quickstart and service jars + run: | + cd chronon/quickstart/mongo-online-impl + sbt assembly + cd "${GITHUB_WORKSPACE}/chronon" + sbt "project service" assembly + - name: Run Chronon unit and stub integration tests + run: | + uv run pytest -c sdk/python/pytest.ini \ + sdk/python/tests/unit/infra/online_stores/chronon_online_store \ + sdk/python/tests/unit/infra/offline_stores/contrib/chronon_offline_store \ + sdk/python/tests/integration/online_store/test_chronon_online_store.py \ + sdk/python/tests/integration/offline_store/test_chronon_offline_store.py \ + --integration + - name: Start live Chronon service + run: infra/scripts/chronon/start-local-chronon-service.sh + - name: Run Chronon live-service integration test + env: + CHRONON_SERVICE_URL: http://127.0.0.1:9000 + run: | + uv run pytest -c sdk/python/pytest.ini \ + sdk/python/tests/integration/online_store/test_chronon_online_store_real_service.py \ + --integration + - name: Stop live Chronon service + if: always() + run: infra/scripts/chronon/stop-local-chronon-service.sh diff --git a/infra/scripts/chronon/start-local-chronon-service.sh b/infra/scripts/chronon/start-local-chronon-service.sh new file mode 100755 index 00000000000..d0a30a366db --- /dev/null +++ b/infra/scripts/chronon/start-local-chronon-service.sh @@ -0,0 +1,146 @@ +#!/usr/bin/env bash + +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/../../.." && pwd)" +CHRONON_DIR="${ROOT_DIR}/chronon" + +MONGO_CONTAINER="${CHRONON_MONGO_CONTAINER:-chronon-mongo}" +MAIN_CONTAINER="${CHRONON_MAIN_CONTAINER:-chronon-main}" +NETWORK_NAME="${CHRONON_NETWORK:-chronon-net}" +SERVICE_PORT="${CHRONON_SERVICE_PORT:-9000}" +SERVICE_HOST="${CHRONON_SERVICE_HOST:-127.0.0.1}" +SERVICE_PID_FILE="${CHRONON_SERVICE_PID_FILE:-/tmp/chronon-service.pid}" +SERVICE_LOG_FILE="${CHRONON_SERVICE_LOG_FILE:-/tmp/chronon-service.log}" +JAVA_BIN="${JAVA_BIN:-java}" + +if [[ ! -d "${CHRONON_DIR}" ]]; then + echo "Chronon repo not found at ${CHRONON_DIR}" >&2 + exit 1 +fi + +if [[ ! -f "${CHRONON_DIR}/quickstart/mongo-online-impl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar" ]]; then + echo "Missing quickstart Mongo implementation jar. Build chronon/quickstart/mongo-online-impl first." >&2 + exit 1 +fi + +if [[ ! -f "${CHRONON_DIR}/service/target/scala-2.12/service-0.0.111-SNAPSHOT.jar" ]]; then + echo "Missing Chronon service jar. Build chronon service first." >&2 + exit 1 +fi + +cleanup_stale_service() { + if [[ -f "${SERVICE_PID_FILE}" ]]; then + local pid + pid="$(cat "${SERVICE_PID_FILE}")" + if kill -0 "${pid}" >/dev/null 2>&1; then + kill "${pid}" >/dev/null 2>&1 || true + wait "${pid}" 2>/dev/null || true + fi + rm -f "${SERVICE_PID_FILE}" + fi +} + +wait_for_mongo() { + local attempts=60 + until docker exec "${MONGO_CONTAINER}" mongosh --quiet --eval 'db.runCommand({ ping: 1 }).ok' >/dev/null 2>&1; do + attempts=$((attempts - 1)) + if [[ "${attempts}" -le 0 ]]; then + echo "Mongo did not become ready in time." >&2 + exit 1 + fi + sleep 2 + done +} + +wait_for_data_load() { + local attempts=90 + until docker logs "${MAIN_CONTAINER}" 2>&1 | grep -q "Spark session available as 'spark'"; do + attempts=$((attempts - 1)) + if [[ "${attempts}" -le 0 ]]; then + echo "Chronon quickstart data loader did not initialize Spark in time." >&2 + exit 1 + fi + sleep 2 + done +} + +wait_for_http() { + local url="$1" + local attempts=60 + until curl --fail --silent "${url}" >/dev/null; do + attempts=$((attempts - 1)) + if [[ "${attempts}" -le 0 ]]; then + echo "Chronon service did not become ready at ${url}." >&2 + exit 1 + fi + sleep 2 + done +} + +run_quickstart_online_prep() { + docker exec "${MAIN_CONTAINER}" bash -lc ' + set -euo pipefail + cd /srv/chronon + run.py --conf production/group_bys/quickstart/purchases.v1 --mode upload --ds 2023-12-01 + run.py --conf production/group_bys/quickstart/returns.v1 --mode upload --ds 2023-12-01 + # pragma: allowlist secret + /opt/spark/bin/spark-submit --class ai.chronon.quickstart.online.Spark2MongoLoader --master local[*] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default.quickstart_purchases_v1_upload mongodb://admin:admin@'"${MONGO_CONTAINER}"':27017/?authSource=admin + # pragma: allowlist secret + /opt/spark/bin/spark-submit --class ai.chronon.quickstart.online.Spark2MongoLoader --master local[*] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default.quickstart_returns_v1_upload mongodb://admin:admin@'"${MONGO_CONTAINER}"':27017/?authSource=admin + run.py --mode metadata-upload --conf production/joins/quickstart/training_set.v2 --ds 2023-12-01 + run.py --mode fetch --type join --name quickstart/training_set.v2 -k "{\"user_id\":\"5\"}" + ' +} + +cleanup_stale_service +(docker network inspect "${NETWORK_NAME}" >/dev/null 2>&1) || docker network create "${NETWORK_NAME}" >/dev/null +(docker rm -f "${MONGO_CONTAINER}" >/dev/null 2>&1) || true +(docker rm -f "${MAIN_CONTAINER}" >/dev/null 2>&1) || true + +docker run -d \ + --name "${MONGO_CONTAINER}" \ + --network "${NETWORK_NAME}" \ + -p 27017:27017 \ + -e MONGO_INITDB_ROOT_USERNAME=admin \ + -e MONGO_INITDB_ROOT_PASSWORD=admin \ + mongo:latest >/dev/null + +wait_for_mongo + +docker run -d \ + --name "${MAIN_CONTAINER}" \ + --network "${NETWORK_NAME}" \ + -p 4040:4040 \ + -e USER=root \ + -e SPARK_SUBMIT_PATH=/opt/spark/bin/spark-submit \ + -e PYTHONPATH=/srv/chronon \ + -e SPARK_VERSION=3.1.1 \ + -e JOB_MODE='local[*]' \ + -e PARALLELISM=2 \ + -e EXECUTOR_MEMORY=2G \ + -e EXECUTOR_CORES=4 \ + -e DRIVER_MEMORY=1G \ + -e CHRONON_LOG_TABLE=default.chronon_log_table \ + -e CHRONON_ONLINE_CLASS=ai.chronon.quickstart.online.ChrononMongoOnlineImpl \ + -e "CHRONON_ONLINE_ARGS=-Zuser=admin -Zpassword=admin -Zhost=${MONGO_CONTAINER} -Zport=27017 -Zdatabase=admin" \ + -v "${CHRONON_DIR}/quickstart/mongo-online-impl:/srv/onlineImpl" \ + ezvz/chronon \ + bash -lc '/opt/spark/bin/spark-shell -i scripts/data-loader.scala && tail -f /dev/null' >/dev/null + +wait_for_data_load +run_quickstart_online_prep + +: > "${SERVICE_LOG_FILE}" +( + cd "${CHRONON_DIR}" + exec "${JAVA_BIN}" -jar service/target/scala-2.12/service-0.0.111-SNAPSHOT.jar \ + run ai.chronon.service.WebServiceVerticle \ + -Dserver.port="${SERVICE_PORT}" \ + -conf service/src/main/resources/example_config.json +) >"${SERVICE_LOG_FILE}" 2>&1 & + +echo "$!" > "${SERVICE_PID_FILE}" +wait_for_http "http://${SERVICE_HOST}:${SERVICE_PORT}/ping" + +echo "CHRONON_SERVICE_URL=http://${SERVICE_HOST}:${SERVICE_PORT}" diff --git a/infra/scripts/chronon/stop-local-chronon-service.sh b/infra/scripts/chronon/stop-local-chronon-service.sh new file mode 100755 index 00000000000..4aceed23659 --- /dev/null +++ b/infra/scripts/chronon/stop-local-chronon-service.sh @@ -0,0 +1,21 @@ +#!/usr/bin/env bash + +set -euo pipefail + +MONGO_CONTAINER="${CHRONON_MONGO_CONTAINER:-chronon-mongo}" +MAIN_CONTAINER="${CHRONON_MAIN_CONTAINER:-chronon-main}" +NETWORK_NAME="${CHRONON_NETWORK:-chronon-net}" +SERVICE_PID_FILE="${CHRONON_SERVICE_PID_FILE:-/tmp/chronon-service.pid}" + +if [[ -f "${SERVICE_PID_FILE}" ]]; then + pid="$(cat "${SERVICE_PID_FILE}")" + if kill -0 "${pid}" >/dev/null 2>&1; then + kill "${pid}" >/dev/null 2>&1 || true + wait "${pid}" 2>/dev/null || true + fi + rm -f "${SERVICE_PID_FILE}" +fi + +(docker rm -f "${MAIN_CONTAINER}" >/dev/null 2>&1) || true +(docker rm -f "${MONGO_CONTAINER}" >/dev/null 2>&1) || true +(docker network rm "${NETWORK_NAME}" >/dev/null 2>&1) || true diff --git a/sdk/python/feast/__init__.py b/sdk/python/feast/__init__.py index c91e6b4c3ec..d75257265cb 100644 --- a/sdk/python/feast/__init__.py +++ b/sdk/python/feast/__init__.py @@ -5,6 +5,9 @@ from feast.infra.offline_stores.contrib.athena_offline_store.athena_source import ( AthenaSource, ) +from feast.infra.offline_stores.contrib.chronon_offline_store.chronon_source import ( + ChrononSource, +) from feast.infra.offline_stores.contrib.oracle_offline_store.oracle_source import ( OracleSource, ) @@ -63,6 +66,7 @@ "RequestSource", "AthenaSource", "OracleSource", + "ChrononSource", "Project", "FeastVectorStore", "DocEmbedder", diff --git a/sdk/python/feast/infra/chronon_provider.py b/sdk/python/feast/infra/chronon_provider.py new file mode 100644 index 00000000000..6df8881e387 --- /dev/null +++ b/sdk/python/feast/infra/chronon_provider.py @@ -0,0 +1,5 @@ +from feast.infra.passthrough_provider import PassthroughProvider + + +class ChrononProvider(PassthroughProvider): + """Optional provider wrapper for Chronon-backed Feast configurations.""" diff --git a/sdk/python/feast/infra/offline_stores/contrib/chronon_offline_store/__init__.py b/sdk/python/feast/infra/offline_stores/contrib/chronon_offline_store/__init__.py new file mode 100644 index 00000000000..89b3ae12bf6 --- /dev/null +++ b/sdk/python/feast/infra/offline_stores/contrib/chronon_offline_store/__init__.py @@ -0,0 +1 @@ +__all__ = ["ChrononOfflineStore", "ChrononOfflineStoreConfig", "ChrononSource"] diff --git a/sdk/python/feast/infra/offline_stores/contrib/chronon_offline_store/chronon.py b/sdk/python/feast/infra/offline_stores/contrib/chronon_offline_store/chronon.py new file mode 100644 index 00000000000..073efac26e3 --- /dev/null +++ b/sdk/python/feast/infra/offline_stores/contrib/chronon_offline_store/chronon.py @@ -0,0 +1,363 @@ +from datetime import datetime +from pathlib import Path +from typing import Any, Callable, List, Literal, Optional, Union + +import pandas as pd +import pyarrow as pa +from pydantic import StrictStr + +from feast import utils +from feast.data_source import DataSource +from feast.feature_logging import LoggingConfig, LoggingSource +from feast.feature_view import FeatureView +from feast.infra.offline_stores.file_source import FileSource +from feast.infra.offline_stores.offline_store import ( + OfflineStore, + RetrievalJob, + RetrievalMetadata, +) +from feast.infra.offline_stores.offline_utils import ( + DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL, + assert_expected_columns_in_entity_df, + get_expected_join_keys, +) +from feast.infra.registry.base_registry import BaseRegistry +from feast.on_demand_feature_view import OnDemandFeatureView +from feast.repo_config import FeastConfigBaseModel, RepoConfig +from feast.saved_dataset import SavedDatasetStorage + + +class ChrononOfflineStoreConfig(FeastConfigBaseModel): + type: Literal["chronon"] = "chronon" + path: Optional[StrictStr] = None + + +class ChrononRetrievalJob(RetrievalJob): + def __init__( + self, + evaluation_function: Callable[[], pd.DataFrame], + full_feature_names: bool, + metadata: Optional[RetrievalMetadata] = None, + ): + self.evaluation_function = evaluation_function + self._full_feature_names = full_feature_names + self._metadata = metadata + + @property + def full_feature_names(self) -> bool: + return self._full_feature_names + + @property + def on_demand_feature_views(self) -> List[OnDemandFeatureView]: + return [] + + def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame: + return self.evaluation_function().reset_index(drop=True) + + def _to_arrow_internal(self, timeout: Optional[int] = None) -> pa.Table: + return pa.Table.from_pandas(self._to_df_internal(timeout=timeout)) + + def persist( + self, + storage: SavedDatasetStorage, + allow_overwrite: bool = False, + timeout: Optional[int] = None, + ): + raise NotImplementedError( + "ChrononRetrievalJob does not currently support persisted saved datasets." + ) + + @property + def metadata(self) -> Optional[RetrievalMetadata]: + return self._metadata + + +def _get_chronon_source(feature_view: FeatureView): + source = feature_view.batch_source + if source is None or source.__class__.__name__ != "ChrononSource": + raise ValueError( + f"Feature view '{feature_view.name}' is not backed by a ChrononSource." + ) + return source + + +def _load_materialized_dataframe( + config: RepoConfig, feature_view: FeatureView +) -> pd.DataFrame: + source = _get_chronon_source(feature_view) + resolved_path = FileSource.get_uri_for_file_path( + repo_path=config.repo_path, uri=source.materialization_path + ) + dataframe = pd.read_parquet(resolved_path) + if source.field_mapping: + dataframe = dataframe.rename(columns=source.field_mapping) + return utils.make_df_tzaware(dataframe) + + +def _output_feature_name( + feature_view: FeatureView, feature_name: str, full_feature_names: bool +) -> str: + if full_feature_names: + return f"{feature_view.projection.name_to_use()}__{feature_name}" + return feature_name + + +class ChrononOfflineStore(OfflineStore): + @staticmethod + def get_historical_features( + config: RepoConfig, + feature_views: List[FeatureView], + feature_refs: List[str], + entity_df: Optional[Union[pd.DataFrame, str]], + registry: BaseRegistry, + project: str, + full_feature_names: bool = False, + **kwargs, + ) -> RetrievalJob: + assert isinstance(config.offline_store, ChrononOfflineStoreConfig) + + requested_features, requested_odfvs = ( + utils._get_requested_feature_views_to_features_dict( + feature_refs, + feature_views, + registry.list_on_demand_feature_views(project), + ) + ) + if requested_odfvs: + raise ValueError( + "ChrononOfflineStore does not support on-demand feature views." + ) + if entity_df is not None and not isinstance(entity_df, pd.DataFrame): + raise ValueError( + "ChrononOfflineStore currently supports pandas entity_df inputs or " + "time-range retrieval without entity_df." + ) + + metadata = RetrievalMetadata( + features=feature_refs, + keys=( + list(entity_df.columns) + if isinstance(entity_df, pd.DataFrame) + else [DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL] + ), + min_event_timestamp=kwargs.get("start_date"), + max_event_timestamp=kwargs.get("end_date"), + ) + + if entity_df is None: + if len(requested_features) != 1: + raise ValueError( + "ChrononOfflineStore non-entity retrieval currently supports " + "exactly one feature view at a time." + ) + + def evaluate_non_entity_retrieval() -> pd.DataFrame: + feature_view, selected_features = next(iter(requested_features.items())) + dataframe = _load_materialized_dataframe(config, feature_view) + timestamp_col = feature_view.batch_source.timestamp_field # type: ignore[union-attr] + if timestamp_col: + start_date = kwargs.get("start_date") + end_date = kwargs.get("end_date") + if start_date is not None: + dataframe = dataframe[ + dataframe[timestamp_col] >= utils.make_tzaware(start_date) + ] + if end_date is not None: + dataframe = dataframe[ + dataframe[timestamp_col] <= utils.make_tzaware(end_date) + ] + keep_columns = list( + { + *(entity.name for entity in feature_view.entity_columns), + timestamp_col, + *(selected_features), + } + ) + dataframe = dataframe.loc[ + :, [c for c in keep_columns if c in dataframe.columns] + ] + rename_map = { + feature: _output_feature_name( + feature_view, feature, full_feature_names + ) + for feature in selected_features + } + return dataframe.rename(columns=rename_map) + + return ChrononRetrievalJob( + evaluation_function=evaluate_non_entity_retrieval, + full_feature_names=full_feature_names, + metadata=metadata, + ) + + entity_df = utils.make_df_tzaware(entity_df) + entity_event_timestamp_col = DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL + expected_join_keys = get_expected_join_keys(project, feature_views, registry) + assert_expected_columns_in_entity_df( + entity_df.dtypes.to_dict(), + expected_join_keys, + entity_event_timestamp_col, + ) + + def evaluate_historical_retrieval() -> pd.DataFrame: + result = entity_df.copy() + for feature_view, selected_features in requested_features.items(): + source_df = _load_materialized_dataframe(config, feature_view) + timestamp_col = feature_view.batch_source.timestamp_field # type: ignore[union-attr] + created_col = feature_view.batch_source.created_timestamp_column # type: ignore[union-attr] + left_keys = [] + right_keys = [] + for entity_column in feature_view.entity_columns: + left_keys.append( + feature_view.projection.join_key_map.get( + entity_column.name, entity_column.name + ) + ) + right_keys.append(entity_column.name) + + if created_col and created_col in source_df.columns: + source_df = source_df.sort_values(created_col).drop_duplicates( + subset=right_keys + [timestamp_col], keep="last" + ) + + merge_columns = right_keys + [timestamp_col] + list(selected_features) + merge_frame = source_df.loc[ + :, [col for col in merge_columns if col in source_df.columns] + ].copy() + merge_frame = merge_frame.rename( + columns={ + feature: _output_feature_name( + feature_view, feature, full_feature_names + ) + for feature in selected_features + } + ) + + result = result.merge( + merge_frame, + how="left", + left_on=left_keys + [entity_event_timestamp_col], + right_on=right_keys + [timestamp_col], + sort=False, + ) + + drop_columns = [ + col + for col in right_keys + [timestamp_col] + if col in result.columns + and col not in left_keys + [entity_event_timestamp_col] + ] + if drop_columns: + result = result.drop(columns=drop_columns) + return result + + return ChrononRetrievalJob( + evaluation_function=evaluate_historical_retrieval, + full_feature_names=full_feature_names, + metadata=metadata, + ) + + @staticmethod + def pull_latest_from_table_or_query( + config: RepoConfig, + data_source: DataSource, + join_key_columns: List[str], + feature_name_columns: List[str], + timestamp_field: str, + created_timestamp_column: Optional[str], + start_date: datetime, + end_date: datetime, + ) -> RetrievalJob: + if data_source.__class__.__name__ != "ChrononSource": + raise ValueError("ChrononOfflineStore only supports ChrononSource inputs.") + + def evaluate_pull_latest() -> pd.DataFrame: + feature_view = FeatureView( + name=data_source.name, + entities=[], + schema=[], + source=data_source, + ) + dataframe = _load_materialized_dataframe(config, feature_view) + dataframe = dataframe[ + (dataframe[timestamp_field] >= utils.make_tzaware(start_date)) + & (dataframe[timestamp_field] <= utils.make_tzaware(end_date)) + ] + sort_columns = [timestamp_field] + if ( + created_timestamp_column + and created_timestamp_column in dataframe.columns + ): + sort_columns.append(created_timestamp_column) + dataframe = dataframe.sort_values(sort_columns).drop_duplicates( + subset=join_key_columns, keep="last" + ) + return dataframe[ + join_key_columns + feature_name_columns + [timestamp_field] + ] + + return ChrononRetrievalJob( + evaluation_function=evaluate_pull_latest, + full_feature_names=False, + ) + + @staticmethod + def pull_all_from_table_or_query( + config: RepoConfig, + data_source: DataSource, + join_key_columns: List[str], + feature_name_columns: List[str], + timestamp_field: str, + created_timestamp_column: Optional[str] = None, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, + ) -> RetrievalJob: + if data_source.__class__.__name__ != "ChrononSource": + raise ValueError("ChrononOfflineStore only supports ChrononSource inputs.") + + def evaluate_pull_all() -> pd.DataFrame: + feature_view = FeatureView( + name=data_source.name, + entities=[], + schema=[], + source=data_source, + ) + dataframe = _load_materialized_dataframe(config, feature_view) + if start_date is not None: + dataframe = dataframe[ + dataframe[timestamp_field] >= utils.make_tzaware(start_date) + ] + if end_date is not None: + dataframe = dataframe[ + dataframe[timestamp_field] <= utils.make_tzaware(end_date) + ] + keep_columns = join_key_columns + feature_name_columns + [timestamp_field] + return dataframe.loc[:, [c for c in keep_columns if c in dataframe.columns]] + + return ChrononRetrievalJob( + evaluation_function=evaluate_pull_all, + full_feature_names=False, + ) + + @staticmethod + def write_logged_features( + config: RepoConfig, + data: Union[pa.Table, Path], + source: LoggingSource, + logging_config: LoggingConfig, + registry: BaseRegistry, + ): + raise NotImplementedError( + "ChrononOfflineStore does not support Feast-managed logged feature writes." + ) + + @staticmethod + def offline_write_batch( + config: RepoConfig, + feature_view: FeatureView, + table: pa.Table, + progress: Optional[Callable[[int], Any]], + ): + raise NotImplementedError( + "ChrononOfflineStore does not support Feast-managed offline writes." + ) diff --git a/sdk/python/feast/infra/offline_stores/contrib/chronon_offline_store/chronon_source.py b/sdk/python/feast/infra/offline_stores/contrib/chronon_offline_store/chronon_source.py new file mode 100644 index 00000000000..95a2cee267f --- /dev/null +++ b/sdk/python/feast/infra/offline_stores/contrib/chronon_offline_store/chronon_source.py @@ -0,0 +1,161 @@ +import json +from pathlib import Path +from typing import Any, Callable, Dict, Iterable, Optional, Tuple + +import pyarrow.dataset as ds + +from feast import type_map +from feast.data_source import DataSource +from feast.errors import DataSourceNoNameException +from feast.infra.offline_stores.file_source import FileSource +from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto +from feast.repo_config import RepoConfig +from feast.value_type import ValueType + + +class ChrononOptions: + def __init__( + self, + *, + materialization_path: str, + chronon_group_by: Optional[str] = None, + chronon_join: Optional[str] = None, + online_endpoint: Optional[str] = None, + ): + self.materialization_path = materialization_path + self.chronon_group_by = chronon_group_by or "" + self.chronon_join = chronon_join or "" + self.online_endpoint = online_endpoint or "" + + @classmethod + def from_proto(cls, proto: DataSourceProto.CustomSourceOptions): + config = json.loads(proto.configuration.decode("utf8")) + return cls( + materialization_path=config["materialization_path"], + chronon_group_by=config.get("chronon_group_by"), + chronon_join=config.get("chronon_join"), + online_endpoint=config.get("online_endpoint"), + ) + + def to_proto(self) -> DataSourceProto.CustomSourceOptions: + return DataSourceProto.CustomSourceOptions( + configuration=json.dumps( + { + "materialization_path": self.materialization_path, + "chronon_group_by": self.chronon_group_by, + "chronon_join": self.chronon_join, + "online_endpoint": self.online_endpoint, + } + ).encode() + ) + + +class ChrononSource(DataSource): + def source_type(self) -> DataSourceProto.SourceType.ValueType: + return DataSourceProto.CUSTOM_SOURCE + + def __init__( + self, + *, + materialization_path: str, + name: Optional[str] = None, + chronon_group_by: Optional[str] = None, + chronon_join: Optional[str] = None, + online_endpoint: Optional[str] = None, + timestamp_field: Optional[str] = "", + created_timestamp_column: Optional[str] = "", + field_mapping: Optional[Dict[str, str]] = None, + description: Optional[str] = "", + tags: Optional[Dict[str, str]] = None, + owner: Optional[str] = "", + ): + if not name and not materialization_path: + raise DataSourceNoNameException() + + self._chronon_options = ChrononOptions( + materialization_path=materialization_path, + chronon_group_by=chronon_group_by, + chronon_join=chronon_join, + online_endpoint=online_endpoint, + ) + + super().__init__( + name=name or materialization_path, + timestamp_field=timestamp_field, + created_timestamp_column=created_timestamp_column, + field_mapping=field_mapping, + description=description, + tags=tags, + owner=owner, + ) + + @property + def materialization_path(self) -> str: + return self._chronon_options.materialization_path + + @property + def chronon_group_by(self) -> str: + return self._chronon_options.chronon_group_by + + @property + def chronon_join(self) -> str: + return self._chronon_options.chronon_join + + @property + def online_endpoint(self) -> str: + return self._chronon_options.online_endpoint + + @staticmethod + def from_proto(data_source: DataSourceProto) -> Any: + assert data_source.HasField("custom_options") + options = ChrononOptions.from_proto(data_source.custom_options) + return ChrononSource( + name=data_source.name, + materialization_path=options.materialization_path, + chronon_group_by=options.chronon_group_by, + chronon_join=options.chronon_join, + online_endpoint=options.online_endpoint, + field_mapping=dict(data_source.field_mapping), + timestamp_field=data_source.timestamp_field, + created_timestamp_column=data_source.created_timestamp_column, + description=data_source.description, + tags=dict(data_source.tags), + owner=data_source.owner, + ) + + def _to_proto_impl(self) -> DataSourceProto: + proto = DataSourceProto( + name=self.name, + type=DataSourceProto.CUSTOM_SOURCE, + data_source_class_type="feast.infra.offline_stores.contrib.chronon_offline_store.chronon_source.ChrononSource", + field_mapping=self.field_mapping, + custom_options=self._chronon_options.to_proto(), + description=self.description, + tags=self.tags, + owner=self.owner, + ) + proto.timestamp_field = self.timestamp_field + proto.created_timestamp_column = self.created_timestamp_column + return proto + + def validate(self, config: RepoConfig): + resolved_path = FileSource.get_uri_for_file_path( + repo_path=config.repo_path, uri=self.materialization_path + ) + if not Path(resolved_path).exists(): + raise FileNotFoundError( + f"Chronon materialization path does not exist: {resolved_path}" + ) + + @staticmethod + def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]: + return type_map.pa_to_feast_value_type + + def get_table_column_names_and_types( + self, config: RepoConfig + ) -> Iterable[Tuple[str, str]]: + resolved_path = FileSource.get_uri_for_file_path( + repo_path=config.repo_path, uri=self.materialization_path + ) + schema = ds.dataset(resolved_path, format="parquet").schema + return [(field.name, str(field.type)) for field in schema] diff --git a/sdk/python/feast/infra/online_stores/chronon_online_store/__init__.py b/sdk/python/feast/infra/online_stores/chronon_online_store/__init__.py new file mode 100644 index 00000000000..fd29e9d0b0a --- /dev/null +++ b/sdk/python/feast/infra/online_stores/chronon_online_store/__init__.py @@ -0,0 +1 @@ +__all__ = ["ChrononOnlineStore", "ChrononOnlineStoreConfig"] diff --git a/sdk/python/feast/infra/online_stores/chronon_online_store/chronon.py b/sdk/python/feast/infra/online_stores/chronon_online_store/chronon.py new file mode 100644 index 00000000000..68cec202975 --- /dev/null +++ b/sdk/python/feast/infra/online_stores/chronon_online_store/chronon.py @@ -0,0 +1,154 @@ +from datetime import datetime +from typing import Any, Callable, Dict, List, Literal, Optional, Tuple +from urllib.parse import quote + +from pydantic import StrictInt, StrictStr + +from feast.data_source import DataSource +from feast.feature_view import FeatureView +from feast.infra.online_stores.online_store import OnlineStore +from feast.permissions.client.http_auth_requests_wrapper import HttpSessionManager +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import FeastConfigBaseModel, RepoConfig +from feast.type_map import ( + feast_value_type_to_python_type, + python_values_to_proto_values, +) +from feast.value_type import ValueType + + +class ChrononOnlineStoreConfig(FeastConfigBaseModel): + type: Literal["chronon"] = "chronon" + path: StrictStr = "http://localhost:8080" + timeout: StrictInt = 30 + verify_ssl: bool = True + + +class ChrononOnlineStore(OnlineStore): + @staticmethod + def _get_chronon_source(table: FeatureView): + source: Optional[DataSource] = getattr(table, "batch_source", None) + if source is None or source.__class__.__name__ != "ChrononSource": + raise ValueError( + f"Feature view '{table.name}' is not backed by a ChrononSource." + ) + return source + + @staticmethod + def _build_url(config: RepoConfig, table: FeatureView) -> str: + source = ChrononOnlineStore._get_chronon_source(table) + base_url = getattr(source, "online_endpoint", "") or config.online_store.path + object_name = getattr(source, "chronon_join", "") or getattr( + source, "chronon_group_by", "" + ) + if not object_name: + raise ValueError( + f"ChrononSource for feature view '{table.name}' must define either " + "`chronon_join` or `chronon_group_by`." + ) + object_type = "join" if getattr(source, "chronon_join", "") else "groupby" + return ( + f"{base_url.rstrip('/')}/v1/features/" + f"{object_type}/{quote(object_name, safe='')}" + ) + + @staticmethod + def _entity_key_to_request_row(entity_key: EntityKeyProto) -> Dict[str, Any]: + return { + join_key: feast_value_type_to_python_type(value) + for join_key, value in zip(entity_key.join_keys, entity_key.entity_values) + } + + @staticmethod + def _feature_values_to_proto( + response_features: Dict[str, Any], requested_features: Optional[List[str]] + ) -> Dict[str, ValueProto]: + if requested_features is not None: + response_features = { + key: value + for key, value in response_features.items() + if key in requested_features + } + result: Dict[str, ValueProto] = {} + for feature_name, value in response_features.items(): + result[feature_name] = python_values_to_proto_values( + [value], ValueType.UNKNOWN + )[0] + return result + + def online_write_batch( + self, + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], + ) -> None: + raise NotImplementedError( + "ChrononOnlineStore does not support Feast-managed online writes. " + "Chronon-backed features must be populated by Chronon." + ) + + def online_read( + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + assert isinstance(config.online_store, ChrononOnlineStoreConfig) + + url = self._build_url(config, table) + payload = [ + self._entity_key_to_request_row(entity_key) for entity_key in entity_keys + ] + session = HttpSessionManager.get_session( + config.auth_config, + max_retries=0, + ) + response = session.post( + url, + json=payload, + timeout=config.online_store.timeout, + verify=config.online_store.verify_ssl, + ) + response.raise_for_status() + + parsed = response.json() + results = parsed.get("results", []) + if len(results) != len(entity_keys): + raise RuntimeError( + f"Chronon returned {len(results)} rows for {len(entity_keys)} entity keys." + ) + + output: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] + for row in results: + if row.get("status") != "Success": + output.append((None, None)) + continue + feature_values = self._feature_values_to_proto( + row.get("features", {}), requested_features + ) + output.append((None, feature_values)) + return output + + def update( + self, + config: RepoConfig, + tables_to_delete, + tables_to_keep, + entities_to_delete, + entities_to_keep, + partial: bool, + ): + return None + + def teardown( + self, + config: RepoConfig, + tables, + entities, + ): + return None diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 9bdf681fb69..a2323c9fae1 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -43,6 +43,7 @@ "aws": "feast.infra.passthrough_provider.PassthroughProvider", "local": "feast.infra.passthrough_provider.PassthroughProvider", "azure": "feast.infra.passthrough_provider.PassthroughProvider", + "chronon": "feast.infra.chronon_provider.ChrononProvider", } diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 93fb2070cfd..27502f8bc16 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -84,6 +84,7 @@ "milvus": "feast.infra.online_stores.milvus_online_store.milvus.MilvusOnlineStore", "mongodb": "feast.infra.online_stores.mongodb_online_store.MongoDBOnlineStore", "hybrid": "feast.infra.online_stores.hybrid_online_store.hybrid_online_store.HybridOnlineStore", + "chronon": "feast.infra.online_stores.chronon_online_store.chronon.ChrononOnlineStore", **LEGACY_ONLINE_STORE_CLASS_FOR_TYPE, } @@ -104,6 +105,7 @@ "clickhouse": "feast.infra.offline_stores.contrib.clickhouse_offline_store.clickhouse.ClickhouseOfflineStore", "ray": "feast.infra.offline_stores.contrib.ray_offline_store.ray.RayOfflineStore", "oracle": "feast.infra.offline_stores.contrib.oracle_offline_store.oracle.OracleOfflineStore", + "chronon": "feast.infra.offline_stores.contrib.chronon_offline_store.chronon.ChrononOfflineStore", } FEATURE_SERVER_CONFIG_CLASS_FOR_TYPE = { diff --git a/sdk/python/tests/integration/offline_store/test_chronon_offline_store.py b/sdk/python/tests/integration/offline_store/test_chronon_offline_store.py new file mode 100644 index 00000000000..9ada9e07e99 --- /dev/null +++ b/sdk/python/tests/integration/offline_store/test_chronon_offline_store.py @@ -0,0 +1,115 @@ +from pathlib import Path + +import pandas as pd +import pytest + +from feast import Entity, FeatureStore, FeatureView, Field, RepoConfig +from feast.infra.offline_stores.contrib.chronon_offline_store.chronon_source import ( + ChrononSource, +) +from feast.repo_config import RegistryConfig +from feast.types import Float32 +from feast.value_type import ValueType + + +@pytest.mark.integration +def test_chronon_offline_store_historical_retrieval(tmp_path: Path): + data_path = tmp_path / "chronon.parquet" + pd.DataFrame( + { + "user_id": [1, 2], + "event_timestamp": [ + pd.Timestamp("2024-01-01T00:00:00Z"), + pd.Timestamp("2024-01-02T00:00:00Z"), + ], + "feature_a": [2.0, 4.0], + } + ).to_parquet(data_path) + + config = RepoConfig( + project="test", + registry=RegistryConfig(path=str(tmp_path / "registry.db")), + provider="chronon", + offline_store={"type": "chronon"}, + online_store={"type": "sqlite", "path": str(tmp_path / "online.db")}, + ) + config.repo_path = tmp_path + + store = FeatureStore(config=config) + user = Entity(name="user", join_keys=["user_id"], value_type=ValueType.INT64) + fv = FeatureView( + name="fraud_profile", + entities=[user], + schema=[Field(name="feature_a", dtype=Float32)], + source=ChrononSource( + materialization_path=str(data_path), + chronon_join="team/training_set.v1", + timestamp_field="event_timestamp", + ), + offline=True, + ) + store.apply([user, fv]) + + entity_df = pd.DataFrame( + { + "user_id": [1, 2], + "event_timestamp": [ + pd.Timestamp("2024-01-01T00:00:00Z"), + pd.Timestamp("2024-01-02T00:00:00Z"), + ], + } + ) + + result = store.get_historical_features( + entity_df=entity_df, + features=["fraud_profile:feature_a"], + ).to_df() + + assert result["feature_a"].tolist() == [2.0, 4.0] + + +@pytest.mark.integration +def test_chronon_offline_store_non_entity_retrieval(tmp_path: Path): + data_path = tmp_path / "chronon.parquet" + pd.DataFrame( + { + "user_id": [1, 2], + "event_timestamp": [ + pd.Timestamp("2024-01-01T00:00:00Z"), + pd.Timestamp("2024-01-02T00:00:00Z"), + ], + "feature_a": [2.0, 4.0], + } + ).to_parquet(data_path) + + config = RepoConfig( + project="test", + registry=RegistryConfig(path=str(tmp_path / "registry.db")), + provider="chronon", + offline_store={"type": "chronon"}, + online_store={"type": "sqlite", "path": str(tmp_path / "online.db")}, + ) + config.repo_path = tmp_path + + store = FeatureStore(config=config) + user = Entity(name="user", join_keys=["user_id"], value_type=ValueType.INT64) + fv = FeatureView( + name="fraud_profile", + entities=[user], + schema=[Field(name="feature_a", dtype=Float32)], + source=ChrononSource( + materialization_path=str(data_path), + chronon_join="team/training_set.v1", + timestamp_field="event_timestamp", + ), + offline=True, + ) + store.apply([user, fv]) + + result = store.get_historical_features( + features=["fraud_profile:feature_a"], + start_date=pd.Timestamp("2024-01-01T00:00:00Z").to_pydatetime(), + end_date=pd.Timestamp("2024-01-01T23:59:59Z").to_pydatetime(), + ).to_df() + + assert result["feature_a"].tolist() == [2.0] diff --git a/sdk/python/tests/integration/online_store/test_chronon_online_store.py b/sdk/python/tests/integration/online_store/test_chronon_online_store.py new file mode 100644 index 00000000000..831a1c48e4a --- /dev/null +++ b/sdk/python/tests/integration/online_store/test_chronon_online_store.py @@ -0,0 +1,100 @@ +import json +import threading +from http.server import BaseHTTPRequestHandler, HTTPServer +from pathlib import Path + +import pandas as pd +import pytest + +from feast import Entity, FeatureStore, FeatureView, Field, RepoConfig +from feast.infra.offline_stores.contrib.chronon_offline_store.chronon_source import ( + ChrononSource, +) +from feast.repo_config import RegistryConfig +from feast.types import Float32 +from feast.value_type import ValueType +from tests.utils.http_server import free_port + + +class _ChrononHandler(BaseHTTPRequestHandler): + def do_POST(self): + body = self.rfile.read(int(self.headers["Content-Length"])) + rows = json.loads(body.decode("utf8")) + response = { + "results": [ + { + "status": "Success", + "entityKeys": row, + "features": {"feature_a": float(row["user_id"]) * 2.0}, + } + for row in rows + ] + } + payload = json.dumps(response).encode("utf8") + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(payload))) + self.end_headers() + self.wfile.write(payload) + + def log_message(self, format, *args): + return + + +@pytest.fixture +def chronon_server(): + port = free_port() + server = HTTPServer(("127.0.0.1", port), _ChrononHandler) + thread = threading.Thread(target=server.serve_forever, daemon=True) + thread.start() + yield f"http://127.0.0.1:{port}" + server.shutdown() + thread.join(timeout=5) + + +@pytest.mark.integration +def test_chronon_online_store_feature_store_integration( + tmp_path: Path, chronon_server: str +): + data_path = tmp_path / "chronon.parquet" + pd.DataFrame( + { + "user_id": [1, 2], + "event_timestamp": [ + pd.Timestamp("2024-01-01T00:00:00Z"), + pd.Timestamp("2024-01-02T00:00:00Z"), + ], + "feature_a": [2.0, 4.0], + } + ).to_parquet(data_path) + + config = RepoConfig( + project="test", + registry=RegistryConfig(path=str(tmp_path / "registry.db")), + provider="chronon", + offline_store={"type": "chronon"}, + online_store={"type": "chronon", "path": chronon_server}, + ) + config.repo_path = tmp_path + + store = FeatureStore(config=config) + user = Entity(name="user", join_keys=["user_id"], value_type=ValueType.INT64) + fv = FeatureView( + name="fraud_profile", + entities=[user], + schema=[Field(name="feature_a", dtype=Float32)], + source=ChrononSource( + materialization_path=str(data_path), + chronon_join="team/training_set.v1", + timestamp_field="event_timestamp", + ), + online=True, + ) + store.apply([user, fv]) + + response = store.get_online_features( + features=["fraud_profile:feature_a"], + entity_rows=[{"user_id": 1}, {"user_id": 2}], + ).to_dict() + + assert response["feature_a"] == [2.0, 4.0] diff --git a/sdk/python/tests/integration/online_store/test_chronon_online_store_real_service.py b/sdk/python/tests/integration/online_store/test_chronon_online_store_real_service.py new file mode 100644 index 00000000000..3256934b99a --- /dev/null +++ b/sdk/python/tests/integration/online_store/test_chronon_online_store_real_service.py @@ -0,0 +1,67 @@ +import os +from pathlib import Path + +import pandas as pd +import pytest + +from feast import Entity, FeatureStore, FeatureView, Field, RepoConfig +from feast.infra.offline_stores.contrib.chronon_offline_store.chronon_source import ( + ChrononSource, +) +from feast.repo_config import RegistryConfig +from feast.types import Int64 +from feast.value_type import ValueType + + +@pytest.mark.integration +def test_chronon_online_store_real_service(tmp_path: Path): + service_url = os.environ.get("CHRONON_SERVICE_URL") + if not service_url: + pytest.skip("Set CHRONON_SERVICE_URL to run against a live Chronon service.") + + data_path = tmp_path / "placeholder.parquet" + pd.DataFrame( + { + "event_timestamp": [pd.Timestamp("2024-01-01T00:00:00Z")], + } + ).to_parquet(data_path) + + config = RepoConfig( + project="test", + registry=RegistryConfig(path=str(tmp_path / "registry.db")), + provider="chronon", + offline_store={"type": "chronon"}, + online_store={"type": "chronon", "path": service_url}, + ) + config.repo_path = tmp_path + + store = FeatureStore(config=config) + user = Entity(name="user", join_keys=["user_id"], value_type=ValueType.STRING) + fv = FeatureView( + name="training_set", + entities=[user], + schema=[ + Field(name="quickstart_purchases_v1_purchase_price_sum_30d", dtype=Int64), + Field(name="quickstart_returns_v1_refund_amt_sum_30d", dtype=Int64), + ], + source=ChrononSource( + materialization_path=str(data_path), + chronon_join="quickstart/training_set.v2", + timestamp_field="event_timestamp", + online_endpoint=service_url, + ), + online=True, + offline=False, + ) + store.apply([user, fv]) + + response = store.get_online_features( + features=[ + "training_set:quickstart_purchases_v1_purchase_price_sum_30d", + "training_set:quickstart_returns_v1_refund_amt_sum_30d", + ], + entity_rows=[{"user_id": "5"}], + ).to_dict() + + assert response["quickstart_purchases_v1_purchase_price_sum_30d"] == [1253] + assert response["quickstart_returns_v1_refund_amt_sum_30d"] == [1269] diff --git a/sdk/python/tests/unit/infra/offline_stores/contrib/chronon_offline_store/test_chronon_source.py b/sdk/python/tests/unit/infra/offline_stores/contrib/chronon_offline_store/test_chronon_source.py new file mode 100644 index 00000000000..a70d72d0572 --- /dev/null +++ b/sdk/python/tests/unit/infra/offline_stores/contrib/chronon_offline_store/test_chronon_source.py @@ -0,0 +1,34 @@ +from pathlib import Path + +import pandas as pd + +from feast.infra.offline_stores.contrib.chronon_offline_store.chronon_source import ( + ChrononSource, +) + + +def test_chronon_source_proto_round_trip(tmp_path: Path): + data_path = tmp_path / "chronon.parquet" + pd.DataFrame( + { + "user_id": [1], + "event_timestamp": [pd.Timestamp("2024-01-01T00:00:00Z")], + "feature_a": [0.5], + } + ).to_parquet(data_path) + + source = ChrononSource( + name="chronon_source", + materialization_path=str(data_path), + chronon_join="team/training_set.v1", + online_endpoint="http://localhost:8080", + timestamp_field="event_timestamp", + ) + + restored = ChrononSource.from_proto(source.to_proto()) + + assert restored.name == source.name + assert restored.materialization_path == source.materialization_path + assert restored.chronon_join == source.chronon_join + assert restored.online_endpoint == source.online_endpoint + assert restored.timestamp_field == source.timestamp_field diff --git a/sdk/python/tests/unit/infra/online_stores/chronon_online_store/test_chronon_online_store.py b/sdk/python/tests/unit/infra/online_stores/chronon_online_store/test_chronon_online_store.py new file mode 100644 index 00000000000..fea418b2af8 --- /dev/null +++ b/sdk/python/tests/unit/infra/online_stores/chronon_online_store/test_chronon_online_store.py @@ -0,0 +1,107 @@ +from pathlib import Path + +import pandas as pd + +from feast import Entity, FeatureView, Field +from feast.infra.offline_stores.contrib.chronon_offline_store.chronon_source import ( + ChrononSource, +) +from feast.infra.online_stores.chronon_online_store.chronon import ( + ChrononOnlineStore, + ChrononOnlineStoreConfig, +) +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import RegistryConfig, RepoConfig +from feast.types import Float32, Int64 +from feast.value_type import ValueType + + +class _Response: + def __init__(self, payload): + self._payload = payload + + def raise_for_status(self): + return None + + def json(self): + return self._payload + + +def test_chronon_online_store_maps_success_and_failure(monkeypatch, tmp_path: Path): + data_path = tmp_path / "chronon.parquet" + pd.DataFrame( + { + "user_id": [1], + "event_timestamp": [pd.Timestamp("2024-01-01T00:00:00Z")], + "feature_a": [0.5], + } + ).to_parquet(data_path) + + config = RepoConfig( + project="test", + registry=RegistryConfig(path=str(tmp_path / "registry.db")), + provider="chronon", + offline_store={"type": "chronon"}, + online_store=ChrononOnlineStoreConfig(path="http://chronon.test"), + ) + config.repo_path = tmp_path + + source = ChrononSource( + materialization_path=str(data_path), + chronon_join="team/training_set.v1", + timestamp_field="event_timestamp", + ) + feature_view = FeatureView( + name="fraud_profile", + entities=[ + Entity(name="user", join_keys=["user_id"], value_type=ValueType.INT64) + ], + schema=[ + Field(name="user_id", dtype=Int64), + Field(name="feature_a", dtype=Float32), + ], + source=source, + ) + + captured = {} + + class _Session: + def post(self, url, json, timeout, verify): + captured["url"] = url + captured["json"] = json + return _Response( + { + "results": [ + {"status": "Success", "features": {"feature_a": 0.5}}, + {"status": "Failure", "error": "boom"}, + ] + } + ) + + monkeypatch.setattr( + "feast.infra.online_stores.chronon_online_store.chronon.HttpSessionManager.get_session", + lambda config, **kwargs: _Session(), + ) + + store = ChrononOnlineStore() + entity_keys = [ + EntityKeyProto( + join_keys=["user_id"], + entity_values=[ValueProto(int64_val=1)], + ), + EntityKeyProto( + join_keys=["user_id"], + entity_values=[ValueProto(int64_val=2)], + ), + ] + + rows = store.online_read( + config, feature_view, entity_keys, requested_features=["feature_a"] + ) + + assert captured["url"].endswith("/v1/features/join/team%2Ftraining_set.v1") + assert captured["json"] == [{"user_id": 1}, {"user_id": 2}] + assert rows[0][1] is not None + assert abs(rows[0][1]["feature_a"].double_val - 0.5) < 1e-6 + assert rows[1] == (None, None) From c7a21e9c65c723d9319a13faae2b83afeae98b7d Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Fri, 27 Mar 2026 20:46:30 -0400 Subject: [PATCH 2/3] Format python utilities for lint --- sdk/python/feast/utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/python/feast/utils.py b/sdk/python/feast/utils.py index ce45bb0862e..719539583af 100644 --- a/sdk/python/feast/utils.py +++ b/sdk/python/feast/utils.py @@ -127,7 +127,6 @@ def compute_non_entity_date_range( end_date: Optional[datetime] = None, default_window_days: int = 30, ) -> Tuple[datetime, datetime]: - if end_date is None: end_date = datetime.now(tz=timezone.utc) else: From b15df790156723caed7d57cf97e864d95a640b87 Mon Sep 17 00:00:00 2001 From: Francisco Javier Arceo Date: Fri, 27 Mar 2026 20:52:15 -0400 Subject: [PATCH 3/3] Allowlist Chronon quickstart Mongo URI in bootstrap script --- infra/scripts/chronon/start-local-chronon-service.sh | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/infra/scripts/chronon/start-local-chronon-service.sh b/infra/scripts/chronon/start-local-chronon-service.sh index d0a30a366db..3522ce8f8db 100755 --- a/infra/scripts/chronon/start-local-chronon-service.sh +++ b/infra/scripts/chronon/start-local-chronon-service.sh @@ -84,10 +84,8 @@ run_quickstart_online_prep() { cd /srv/chronon run.py --conf production/group_bys/quickstart/purchases.v1 --mode upload --ds 2023-12-01 run.py --conf production/group_bys/quickstart/returns.v1 --mode upload --ds 2023-12-01 - # pragma: allowlist secret - /opt/spark/bin/spark-submit --class ai.chronon.quickstart.online.Spark2MongoLoader --master local[*] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default.quickstart_purchases_v1_upload mongodb://admin:admin@'"${MONGO_CONTAINER}"':27017/?authSource=admin - # pragma: allowlist secret - /opt/spark/bin/spark-submit --class ai.chronon.quickstart.online.Spark2MongoLoader --master local[*] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default.quickstart_returns_v1_upload mongodb://admin:admin@'"${MONGO_CONTAINER}"':27017/?authSource=admin + /opt/spark/bin/spark-submit --class ai.chronon.quickstart.online.Spark2MongoLoader --master local[*] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default.quickstart_purchases_v1_upload mongodb://admin:admin@'"${MONGO_CONTAINER}"':27017/?authSource=admin # pragma: allowlist secret + /opt/spark/bin/spark-submit --class ai.chronon.quickstart.online.Spark2MongoLoader --master local[*] /srv/onlineImpl/target/scala-2.12/mongo-online-impl-assembly-0.1.0-SNAPSHOT.jar default.quickstart_returns_v1_upload mongodb://admin:admin@'"${MONGO_CONTAINER}"':27017/?authSource=admin # pragma: allowlist secret run.py --mode metadata-upload --conf production/joins/quickstart/training_set.v2 --ds 2023-12-01 run.py --mode fetch --type join --name quickstart/training_set.v2 -k "{\"user_id\":\"5\"}" '