From 381755eae44e0c170b5036b8869b06f8931fd33c Mon Sep 17 00:00:00 2001 From: ntkathole Date: Mon, 31 Mar 2025 13:03:30 +0530 Subject: [PATCH] feat: Improved CLI file structuring Signed-off-by: ntkathole --- .github/workflows/smoke_tests.yml | 2 +- pyproject.toml | 2 +- sdk/python/docs/source/conf.py | 3 - sdk/python/feast/cli.py | 1651 ----------------- sdk/python/feast/cli/__init__.py | 0 sdk/python/feast/cli/cli.py | 464 +++++ sdk/python/feast/cli/cli_options.py | 8 + sdk/python/feast/{ => cli}/cli_utils.py | 0 sdk/python/feast/cli/data_sources.py | 55 + sdk/python/feast/cli/entities.py | 55 + sdk/python/feast/cli/feature_services.py | 62 + sdk/python/feast/cli/feature_views.py | 72 + sdk/python/feast/cli/features.py | 176 ++ .../feast/cli/on_demand_feature_views.py | 57 + sdk/python/feast/cli/permissions.py | 271 +++ sdk/python/feast/cli/projects.py | 80 + sdk/python/feast/cli/saved_datasets.py | 57 + sdk/python/feast/cli/serve.py | 234 +++ sdk/python/feast/cli/stream_feature_views.py | 57 + sdk/python/feast/cli/ui.py | 82 + sdk/python/feast/cli/validation_references.py | 57 + sdk/python/tests/utils/cli_repo_creator.py | 2 +- setup.py | 2 +- 23 files changed, 1791 insertions(+), 1658 deletions(-) delete mode 100644 sdk/python/feast/cli.py create mode 100644 sdk/python/feast/cli/__init__.py create mode 100644 sdk/python/feast/cli/cli.py create mode 100644 sdk/python/feast/cli/cli_options.py rename sdk/python/feast/{ => cli}/cli_utils.py (100%) create mode 100644 sdk/python/feast/cli/data_sources.py create mode 100644 sdk/python/feast/cli/entities.py create mode 100644 sdk/python/feast/cli/feature_services.py create mode 100644 sdk/python/feast/cli/feature_views.py create mode 100644 sdk/python/feast/cli/features.py create mode 100644 sdk/python/feast/cli/on_demand_feature_views.py create mode 100644 sdk/python/feast/cli/permissions.py create mode 100644 sdk/python/feast/cli/projects.py create mode 100644 sdk/python/feast/cli/saved_datasets.py create mode 100644 sdk/python/feast/cli/serve.py create mode 100644 sdk/python/feast/cli/stream_feature_views.py create mode 100644 sdk/python/feast/cli/ui.py create mode 100644 sdk/python/feast/cli/validation_references.py diff --git a/.github/workflows/smoke_tests.yml b/.github/workflows/smoke_tests.yml index a7eb1966269..e9048d4b33d 100644 --- a/.github/workflows/smoke_tests.yml +++ b/.github/workflows/smoke_tests.yml @@ -42,4 +42,4 @@ jobs: uv pip sync --system sdk/python/requirements/py${{ matrix.python-version }}-requirements.txt uv pip install --system --no-deps . - name: Test Imports - run: python -c "from feast import cli" + run: python -c "from feast.cli import cli" diff --git a/pyproject.toml b/pyproject.toml index f24235176b7..42641832fe5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -164,7 +164,7 @@ Homepage = "https://github.com/feast-dev/feast" name = "Feast" [project.scripts] -feast = "feast.cli:cli" +feast = "feast.cli.cli:cli" [build-system] requires = [ diff --git a/sdk/python/docs/source/conf.py b/sdk/python/docs/source/conf.py index 5e8fd11d161..fccbb2b09ca 100644 --- a/sdk/python/docs/source/conf.py +++ b/sdk/python/docs/source/conf.py @@ -19,8 +19,6 @@ import os import sys -import sphinx_rtd_theme - sys.path.insert(0, os.path.abspath("../../feast")) sys.path.insert(0, os.path.abspath("../..")) @@ -32,7 +30,6 @@ # example where the Python protos did not build, which subsequently broke # the RTD build. In order to fix this, we manually compile the protos. import subprocess - from pathlib import Path # cwd will be feast/sdk/python/docs/source diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py deleted file mode 100644 index 81dac70a9fb..00000000000 --- a/sdk/python/feast/cli.py +++ /dev/null @@ -1,1651 +0,0 @@ -# Copyright 2019 The Feast Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import json -import logging -from datetime import datetime -from importlib.metadata import version as importlib_version -from pathlib import Path -from typing import Any, List, Optional - -import click -import pandas as pd -import yaml -from bigtree import Node -from colorama import Fore, Style -from dateutil import parser -from pygments import formatters, highlight, lexers - -import feast.cli_utils as cli_utils -from feast import BatchFeatureView, Entity, FeatureService, StreamFeatureView, utils -from feast.constants import ( - DEFAULT_FEATURE_TRANSFORMATION_SERVER_PORT, - DEFAULT_OFFLINE_SERVER_PORT, - DEFAULT_REGISTRY_SERVER_PORT, -) -from feast.data_source import DataSource -from feast.errors import FeastObjectNotFoundException, FeastProviderLoginError -from feast.feature_view import FeatureView -from feast.on_demand_feature_view import OnDemandFeatureView -from feast.permissions.policy import RoleBasedPolicy -from feast.repo_config import load_repo_config -from feast.repo_operations import ( - apply_total, - cli_check_repo, - create_feature_store, - generate_project_name, - init_repo, - plan, - registry_dump, - teardown, -) -from feast.saved_dataset import SavedDataset, ValidationReference -from feast.utils import maybe_local_tz - -_logger = logging.getLogger(__name__) -tagsOption = click.option( - "--tags", - help="Filter by tags (e.g. --tags 'key:value' --tags 'key:value, key:value, ...'). Items return when ALL tags match.", - default=[""], - multiple=True, -) - - -class NoOptionDefaultFormat(click.Command): - def format_options(self, ctx: click.Context, formatter: click.HelpFormatter): - """Writes all the options into the formatter if they exist.""" - opts = [] - for param in self.get_params(ctx): - rv = param.get_help_record(ctx) - if rv is not None: - opts.append(rv) - if opts: - with formatter.section("Options(No current command options)"): - formatter.write_dl(opts) - - -@click.group() -@click.option( - "--chdir", - "-c", - help="Switch to a different feature repository directory before executing the given subcommand.", -) -@click.option( - "--log-level", - default="warning", - help="The logging level. One of DEBUG, INFO, WARNING, ERROR, and CRITICAL (case-insensitive).", -) -@click.option( - "--feature-store-yaml", - "-f", - help="Override the directory where the CLI should look for the feature_store.yaml file.", -) -@click.pass_context -def cli( - ctx: click.Context, - chdir: Optional[str], - log_level: str, - feature_store_yaml: Optional[str], -): - """ - Feast CLI - - For more information, see our public docs at https://docs.feast.dev/ - """ - ctx.ensure_object(dict) - ctx.obj["CHDIR"] = Path.cwd() if chdir is None else Path(chdir).absolute() - ctx.obj["FS_YAML_FILE"] = ( - Path(feature_store_yaml).absolute() - if feature_store_yaml - else utils.get_default_yaml_file_path(ctx.obj["CHDIR"]) - ) - try: - level = getattr(logging, log_level.upper()) - logging.basicConfig( - format="%(asctime)s %(name)s %(levelname)s: %(message)s", - datefmt="%m/%d/%Y %I:%M:%S %p", - level=level, - ) - # Override the logging level for already created loggers (due to loggers being created at the import time) - # Note, that format & datefmt does not need to be set, because by default child loggers don't override them - - # Also note, that mypy complains that logging.root doesn't have "manager" because of the way it's written. - # So we have to put a type ignore hint for mypy. - for logger_name in logging.root.manager.loggerDict: # type: ignore - if "feast" in logger_name: - logger = logging.getLogger(logger_name) - logger.setLevel(level) - except Exception as e: - raise e - pass - - -@cli.command() -def version(): - """ - Display Feast SDK version - """ - print(f'Feast SDK Version: "{importlib_version("feast")}"') - - -@cli.command() -@click.pass_context -def configuration(ctx: click.Context): - """ - Display Feast configuration - """ - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - repo_config = load_repo_config(repo, fs_yaml_file) - if repo_config: - config_dict = repo_config.model_dump(by_alias=True, exclude_unset=True) - config_dict.pop("repo_path", None) - print(yaml.dump(config_dict, default_flow_style=False, sort_keys=False)) - else: - print("No configuration found.") - - -@cli.command() -@click.option( - "--host", - "-h", - type=click.STRING, - default="0.0.0.0", - show_default=True, - help="Specify a host for the server", -) -@click.option( - "--port", - "-p", - type=click.INT, - default=8888, - show_default=True, - help="Specify a port for the server", -) -@click.option( - "--registry_ttl_sec", - "-r", - help="Number of seconds after which the registry is refreshed", - type=click.INT, - default=5, - show_default=True, -) -@click.option( - "--root_path", - help="Provide root path to make the UI working behind proxy", - type=click.STRING, - default="", -) -@click.option( - "--key", - "-k", - "tls_key_path", - type=click.STRING, - default="", - show_default=False, - help="path to TLS(SSL) certificate private key. You need to pass --cert arg as well to start server in TLS mode", -) -@click.option( - "--cert", - "-c", - "tls_cert_path", - type=click.STRING, - default="", - show_default=False, - help="path to TLS(SSL) certificate public key. You need to pass --key arg as well to start server in TLS mode", -) -@click.pass_context -def ui( - ctx: click.Context, - host: str, - port: int, - registry_ttl_sec: int, - root_path: str = "", - tls_key_path: str = "", - tls_cert_path: str = "", -): - """ - Shows the Feast UI over the current directory - """ - if (tls_key_path and not tls_cert_path) or (not tls_key_path and tls_cert_path): - raise click.BadParameter( - "Please configure --key and --cert args to start the feature server in SSL mode." - ) - store = create_feature_store(ctx) - # Pass in the registry_dump method to get around a circular dependency - store.serve_ui( - host=host, - port=port, - get_registry_dump=registry_dump, - registry_ttl_sec=registry_ttl_sec, - root_path=root_path, - tls_key_path=tls_key_path, - tls_cert_path=tls_cert_path, - ) - - -@cli.command() -@click.pass_context -def endpoint(ctx: click.Context): - """ - Display feature server endpoints - """ - store = create_feature_store(ctx) - endpoint = store.get_feature_server_endpoint() - if endpoint is not None: - _logger.info( - f"Feature server endpoint: {Style.BRIGHT + Fore.GREEN}{endpoint}{Style.RESET_ALL}" - ) - else: - _logger.info("There is no active feature server.") - - -@cli.group(name="data-sources") -def data_sources_cmd(): - """ - Access data sources - """ - pass - - -@data_sources_cmd.command("describe") -@click.argument("name", type=click.STRING) -@click.pass_context -def data_source_describe(ctx: click.Context, name: str): - """ - Describe a data source - """ - store = create_feature_store(ctx) - - try: - data_source = store.get_data_source(name) - except FeastObjectNotFoundException as e: - print(e) - exit(1) - - print( - yaml.dump( - yaml.safe_load(str(data_source)), default_flow_style=False, sort_keys=False - ) - ) - - -@data_sources_cmd.command(name="list") -@tagsOption -@click.pass_context -def data_source_list(ctx: click.Context, tags: list[str]): - """ - List all data sources - """ - store = create_feature_store(ctx) - table = [] - tags_filter = utils.tags_list_to_dict(tags) - for datasource in store.list_data_sources(tags=tags_filter): - table.append([datasource.name, datasource.__class__]) - - from tabulate import tabulate - - print(tabulate(table, headers=["NAME", "CLASS"], tablefmt="plain")) - - -@cli.group(name="projects") -def projects_cmd(): - """ - Access projects - """ - pass - - -@projects_cmd.command("describe") -@click.argument("name", type=click.STRING) -@click.pass_context -def project_describe(ctx: click.Context, name: str): - """ - Describe a project - """ - store = create_feature_store(ctx) - - try: - project = store.get_project(name) - except FeastObjectNotFoundException as e: - print(e) - exit(1) - - print( - yaml.dump( - yaml.safe_load(str(project)), default_flow_style=False, sort_keys=False - ) - ) - - -@projects_cmd.command("current_project") -@click.pass_context -def project_current(ctx: click.Context): - """ - Returns the current project configured with FeatureStore object - """ - store = create_feature_store(ctx) - - try: - project = store.get_project(name=None) - except FeastObjectNotFoundException as e: - print(e) - exit(1) - - print( - yaml.dump( - yaml.safe_load(str(project)), default_flow_style=False, sort_keys=False - ) - ) - - -@projects_cmd.command(name="list") -@tagsOption -@click.pass_context -def project_list(ctx: click.Context, tags: list[str]): - """ - List all projects - """ - store = create_feature_store(ctx) - table = [] - tags_filter = utils.tags_list_to_dict(tags) - for project in store.list_projects(tags=tags_filter): - table.append([project.name, project.description, project.tags, project.owner]) - - from tabulate import tabulate - - print( - tabulate( - table, headers=["NAME", "DESCRIPTION", "TAGS", "OWNER"], tablefmt="plain" - ) - ) - - -@cli.group(name="entities") -def entities_cmd(): - """ - Access entities - """ - pass - - -@entities_cmd.command("describe") -@click.argument("name", type=click.STRING) -@click.pass_context -def entity_describe(ctx: click.Context, name: str): - """ - Describe an entity - """ - store = create_feature_store(ctx) - - try: - entity = store.get_entity(name) - except FeastObjectNotFoundException as e: - print(e) - exit(1) - - print( - yaml.dump( - yaml.safe_load(str(entity)), default_flow_style=False, sort_keys=False - ) - ) - - -@entities_cmd.command(name="list") -@tagsOption -@click.pass_context -def entity_list(ctx: click.Context, tags: list[str]): - """ - List all entities - """ - store = create_feature_store(ctx) - table = [] - tags_filter = utils.tags_list_to_dict(tags) - for entity in store.list_entities(tags=tags_filter): - table.append([entity.name, entity.description, entity.value_type]) - - from tabulate import tabulate - - print(tabulate(table, headers=["NAME", "DESCRIPTION", "TYPE"], tablefmt="plain")) - - -@cli.group(name="feature-services") -def feature_services_cmd(): - """ - Access feature services - """ - pass - - -@feature_services_cmd.command("describe") -@click.argument("name", type=click.STRING) -@click.pass_context -def feature_service_describe(ctx: click.Context, name: str): - """ - Describe a feature service - """ - store = create_feature_store(ctx) - - try: - feature_service = store.get_feature_service(name) - except FeastObjectNotFoundException as e: - print(e) - exit(1) - - print( - yaml.dump( - yaml.safe_load(str(feature_service)), - default_flow_style=False, - sort_keys=False, - ) - ) - - -@feature_services_cmd.command(name="list") -@tagsOption -@click.pass_context -def feature_service_list(ctx: click.Context, tags: list[str]): - """ - List all feature services - """ - store = create_feature_store(ctx) - feature_services = [] - tags_filter = utils.tags_list_to_dict(tags) - for feature_service in store.list_feature_services(tags=tags_filter): - feature_names = [] - for projection in feature_service.feature_view_projections: - feature_names.extend( - [f"{projection.name}:{feature.name}" for feature in projection.features] - ) - feature_services.append([feature_service.name, ", ".join(feature_names)]) - - from tabulate import tabulate - - print(tabulate(feature_services, headers=["NAME", "FEATURES"], tablefmt="plain")) - - -@cli.group(name="feature-views") -def feature_views_cmd(): - """ - Access feature views - """ - pass - - -@feature_views_cmd.command("describe") -@click.argument("name", type=click.STRING) -@click.pass_context -def feature_view_describe(ctx: click.Context, name: str): - """ - Describe a feature view - """ - store = create_feature_store(ctx) - - try: - feature_view = store.get_feature_view(name) - except FeastObjectNotFoundException as e: - print(e) - exit(1) - - print( - yaml.dump( - yaml.safe_load(str(feature_view)), default_flow_style=False, sort_keys=False - ) - ) - - -@feature_views_cmd.command(name="list") -@tagsOption -@click.pass_context -def feature_view_list(ctx: click.Context, tags: list[str]): - """ - List all feature views - """ - store = create_feature_store(ctx) - table = [] - tags_filter = utils.tags_list_to_dict(tags) - for feature_view in [ - *store.list_batch_feature_views(tags=tags_filter), - *store.list_on_demand_feature_views(tags=tags_filter), - ]: - entities = set() - if isinstance(feature_view, FeatureView): - entities.update(feature_view.entities) - elif isinstance(feature_view, OnDemandFeatureView): - for backing_fv in feature_view.source_feature_view_projections.values(): - entities.update(store.get_feature_view(backing_fv.name).entities) - table.append( - [ - feature_view.name, - entities if len(entities) > 0 else "n/a", - type(feature_view).__name__, - ] - ) - - from tabulate import tabulate - - print(tabulate(table, headers=["NAME", "ENTITIES", "TYPE"], tablefmt="plain")) - - -@cli.group(name="features") -def features_cmd(): - """ - Access features - """ - pass - - -@features_cmd.command(name="list") -@click.option( - "--output", - type=click.Choice(["table", "json"], case_sensitive=False), - default="table", - show_default=True, - help="Output format", -) -@click.pass_context -def features_list(ctx: click.Context, output: str): - """ - List all features - """ - store = create_feature_store(ctx) - feature_views = [ - *store.list_batch_feature_views(), - *store.list_on_demand_feature_views(), - *store.list_stream_feature_views(), - ] - feature_list = [] - for fv in feature_views: - for feature in fv.features: - feature_list.append([feature.name, fv.name, str(feature.dtype)]) - - if output == "json": - json_output = [ - {"feature_name": fn, "feature_view": fv, "dtype": dt} - for fv, fn, dt in feature_list - ] - click.echo(json.dumps(json_output, indent=4)) - else: - from tabulate import tabulate - - click.echo( - tabulate( - feature_list, - headers=["Feature", "Feature View", "Data Type"], - tablefmt="plain", - ) - ) - - -@features_cmd.command("describe") -@click.argument("feature_name", type=str) -@click.pass_context -def describe_feature(ctx: click.Context, feature_name: str): - """ - Describe a specific feature by name - """ - store = create_feature_store(ctx) - feature_views = [ - *store.list_batch_feature_views(), - *store.list_on_demand_feature_views(), - *store.list_stream_feature_views(), - ] - - feature_details = [] - for fv in feature_views: - for feature in fv.features: - if feature.name == feature_name: - feature_details.append( - { - "Feature Name": feature.name, - "Feature View": fv.name, - "Data Type": str(feature.dtype), - "Description": getattr(feature, "description", "N/A"), - "Online Store": getattr(fv, "online", "N/A"), - "Source": json.loads(str(getattr(fv, "batch_source", "N/A"))), - } - ) - if not feature_details: - click.echo(f"Feature '{feature_name}' not found in any feature view.") - return - - click.echo(json.dumps(feature_details, indent=4)) - - -@cli.command("get-online-features") -@click.option( - "--entities", - "-e", - type=str, - multiple=True, - required=True, - help="Entity key-value pairs (e.g., driver_id=1001)", -) -@click.option( - "--features", - "-f", - multiple=True, - required=True, - help="Features to retrieve. (e.g.,feature-view:feature-name) ex: driver_hourly_stats:conv_rate", -) -@click.pass_context -def get_online_features(ctx: click.Context, entities: List[str], features: List[str]): - """ - Fetch online feature values for a given entity ID - """ - store = create_feature_store(ctx) - entity_dict: dict[str, List[str]] = {} - for entity in entities: - try: - key, value = entity.split("=") - if key not in entity_dict: - entity_dict[key] = [] - entity_dict[key].append(value) - except ValueError: - click.echo(f"Invalid entity format: {entity}. Use key=value format.") - return - entity_rows = [ - dict(zip(entity_dict.keys(), values)) for values in zip(*entity_dict.values()) - ] - feature_vector = store.get_online_features( - features=list(features), - entity_rows=entity_rows, - ).to_dict() - - click.echo(json.dumps(feature_vector, indent=4)) - - -@cli.command(name="get-historical-features") -@click.option( - "--dataframe", - "-d", - type=str, - required=True, - help='JSON string containing entities and timestamps. Example: \'[{"event_timestamp": "2025-03-29T12:00:00", "driver_id": 1001}]\'', -) -@click.option( - "--features", - "-f", - multiple=True, - required=True, - help="Features to retrieve. feature-view:feature-name ex: driver_hourly_stats:conv_rate", -) -@click.pass_context -def get_historical_features(ctx: click.Context, dataframe: str, features: List[str]): - """ - Fetch historical feature values for a given entity ID - """ - store = create_feature_store(ctx) - try: - entity_list = json.loads(dataframe) - if not isinstance(entity_list, list): - raise ValueError("Entities must be a list of dictionaries.") - - entity_df = pd.DataFrame(entity_list) - entity_df["event_timestamp"] = pd.to_datetime(entity_df["event_timestamp"]) - - except Exception as e: - click.echo(f"Error parsing entities JSON: {e}", err=True) - return - - feature_vector = store.get_historical_features( - entity_df=entity_df, - features=list(features), - ).to_df() - - click.echo(feature_vector.to_json(orient="records", indent=4)) - - -@cli.group(name="on-demand-feature-views") -def on_demand_feature_views_cmd(): - """ - [Experimental] Access on demand feature views - """ - pass - - -@on_demand_feature_views_cmd.command("describe") -@click.argument("name", type=click.STRING) -@click.pass_context -def on_demand_feature_view_describe(ctx: click.Context, name: str): - """ - [Experimental] Describe an on demand feature view - """ - store = create_feature_store(ctx) - - try: - on_demand_feature_view = store.get_on_demand_feature_view(name) - except FeastObjectNotFoundException as e: - print(e) - exit(1) - - print( - yaml.dump( - yaml.safe_load(str(on_demand_feature_view)), - default_flow_style=False, - sort_keys=False, - ) - ) - - -@on_demand_feature_views_cmd.command(name="list") -@tagsOption -@click.pass_context -def on_demand_feature_view_list(ctx: click.Context, tags: list[str]): - """ - [Experimental] List all on demand feature views - """ - store = create_feature_store(ctx) - table = [] - tags_filter = utils.tags_list_to_dict(tags) - for on_demand_feature_view in store.list_on_demand_feature_views(tags=tags_filter): - table.append([on_demand_feature_view.name]) - - from tabulate import tabulate - - print(tabulate(table, headers=["NAME"], tablefmt="plain")) - - -@cli.group(name="saved-datasets") -def saved_datasets_cmd(): - """ - [Experimental] Access saved datasets - """ - pass - - -@saved_datasets_cmd.command("describe") -@click.argument("name", type=click.STRING) -@click.pass_context -def saved_datasets_describe(ctx: click.Context, name: str): - """ - [Experimental] Describe a saved dataset - """ - store = create_feature_store(ctx) - - try: - saved_dataset = store.get_saved_dataset(name) - except FeastObjectNotFoundException as e: - print(e) - exit(1) - - print( - yaml.dump( - yaml.safe_load(str(saved_dataset)), - default_flow_style=False, - sort_keys=False, - ) - ) - - -@saved_datasets_cmd.command(name="list") -@tagsOption -@click.pass_context -def saved_datasets_list(ctx: click.Context, tags: list[str]): - """ - [Experimental] List all saved datasets - """ - store = create_feature_store(ctx) - table = [] - tags_filter = utils.tags_list_to_dict(tags) - for saved_dataset in store.list_saved_datasets(tags=tags_filter): - table.append([saved_dataset.name]) - - from tabulate import tabulate - - print(tabulate(table, headers=["NAME"], tablefmt="plain")) - - -@cli.group(name="stream-feature-views") -def stream_feature_views_cmd(): - """ - [Experimental] Access stream feature views - """ - pass - - -@stream_feature_views_cmd.command("describe") -@click.argument("name", type=click.STRING) -@click.pass_context -def stream_feature_views_describe(ctx: click.Context, name: str): - """ - [Experimental] Describe a stream feature view - """ - store = create_feature_store(ctx) - - try: - stream_feature_view = store.get_stream_feature_view(name) - except FeastObjectNotFoundException as e: - print(e) - exit(1) - - print( - yaml.dump( - yaml.safe_load(str(stream_feature_view)), - default_flow_style=False, - sort_keys=False, - ) - ) - - -@stream_feature_views_cmd.command(name="list") -@tagsOption -@click.pass_context -def stream_feature_views_list(ctx: click.Context, tags: list[str]): - """ - [Experimental] List all stream feature views - """ - store = create_feature_store(ctx) - table = [] - tags_filter = utils.tags_list_to_dict(tags) - for stream_feature_view in store.list_stream_feature_views(tags=tags_filter): - table.append([stream_feature_view.name]) - - from tabulate import tabulate - - print(tabulate(table, headers=["NAME"], tablefmt="plain")) - - -@cli.group(name="validation-references") -def validation_references_cmd(): - """ - [Experimental] Access validation references - """ - pass - - -@validation_references_cmd.command("describe") -@click.argument("name", type=click.STRING) -@click.pass_context -def validation_references_describe(ctx: click.Context, name: str): - """ - [Experimental] Describe a validation reference - """ - store = create_feature_store(ctx) - - try: - validation_reference = store.get_validation_reference(name) - except FeastObjectNotFoundException as e: - print(e) - exit(1) - - print( - yaml.dump( - yaml.safe_load(str(validation_reference)), - default_flow_style=False, - sort_keys=False, - ) - ) - - -@validation_references_cmd.command(name="list") -@tagsOption -@click.pass_context -def validation_references_list(ctx: click.Context, tags: list[str]): - """ - [Experimental] List all validation references - """ - store = create_feature_store(ctx) - table = [] - tags_filter = utils.tags_list_to_dict(tags) - for validation_reference in store.list_validation_references(tags=tags_filter): - table.append([validation_reference.name]) - - from tabulate import tabulate - - print(tabulate(table, headers=["NAME"], tablefmt="plain")) - - -@cli.command("plan", cls=NoOptionDefaultFormat) -@click.option( - "--skip-source-validation", - is_flag=True, - help="Don't validate the data sources by checking for that the tables exist.", -) -@click.pass_context -def plan_command(ctx: click.Context, skip_source_validation: bool): - """ - Create or update a feature store deployment - """ - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - repo_config = load_repo_config(repo, fs_yaml_file) - try: - plan(repo_config, repo, skip_source_validation) - except FeastProviderLoginError as e: - print(str(e)) - - -@cli.command("apply", cls=NoOptionDefaultFormat) -@click.option( - "--skip-source-validation", - is_flag=True, - help="Don't validate the data sources by checking for that the tables exist.", -) -@click.pass_context -def apply_total_command(ctx: click.Context, skip_source_validation: bool): - """ - Create or update a feature store deployment - """ - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - - repo_config = load_repo_config(repo, fs_yaml_file) - try: - apply_total(repo_config, repo, skip_source_validation) - except FeastProviderLoginError as e: - print(str(e)) - - -@cli.command("teardown", cls=NoOptionDefaultFormat) -@click.pass_context -def teardown_command(ctx: click.Context): - """ - Tear down deployed feature store infrastructure - """ - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - repo_config = load_repo_config(repo, fs_yaml_file) - - teardown(repo_config, repo) - - -@cli.command("registry-dump") -@click.pass_context -def registry_dump_command(ctx: click.Context): - """ - Print contents of the metadata registry - """ - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - repo_config = load_repo_config(repo, fs_yaml_file) - - click.echo(registry_dump(repo_config, repo_path=repo)) - - -@cli.command("materialize") -@click.argument("start_ts") -@click.argument("end_ts") -@click.option( - "--views", - "-v", - help="Feature views to materialize", - multiple=True, -) -@click.pass_context -def materialize_command( - ctx: click.Context, start_ts: str, end_ts: str, views: List[str] -): - """ - Run a (non-incremental) materialization job to ingest data into the online store. Feast - will read all data between START_TS and END_TS from the offline store and write it to the - online store. If you don't specify feature view names using --views, all registered Feature - Views will be materialized. - - START_TS and END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01' - """ - store = create_feature_store(ctx) - - store.materialize( - feature_views=None if not views else views, - start_date=utils.make_tzaware(parser.parse(start_ts)), - end_date=utils.make_tzaware(parser.parse(end_ts)), - ) - - -@cli.command("materialize-incremental") -@click.argument("end_ts") -@click.option( - "--views", - "-v", - help="Feature views to incrementally materialize", - multiple=True, -) -@click.pass_context -def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List[str]): - """ - Run an incremental materialization job to ingest new data into the online store. Feast will read - all data from the previously ingested point to END_TS from the offline store and write it to the - online store. If you don't specify feature view names using --views, all registered Feature - Views will be incrementally materialized. - - END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01' - """ - store = create_feature_store(ctx) - store.materialize_incremental( - feature_views=None if not views else views, - end_date=utils.make_tzaware(datetime.fromisoformat(end_ts)), - ) - - -@cli.command("init") -@click.argument("PROJECT_DIRECTORY", required=False) -@click.option( - "--minimal", "-m", is_flag=True, help="Create an empty project repository" -) -@click.option( - "--template", - "-t", - type=click.Choice( - [ - "local", - "gcp", - "aws", - "snowflake", - "spark", - "postgres", - "hbase", - "cassandra", - "hazelcast", - "ikv", - "couchbase", - "milvus", - ], - case_sensitive=False, - ), - help="Specify a template for the created project", - default="local", -) -def init_command(project_directory, minimal: bool, template: str): - """Create a new Feast repository""" - if not project_directory: - project_directory = generate_project_name() - - if minimal: - template = "minimal" - - init_repo(project_directory, template) - - -@cli.command("serve") -@click.option( - "--host", - "-h", - type=click.STRING, - default="127.0.0.1", - show_default=True, - help="Specify a host for the server", -) -@click.option( - "--port", - "-p", - type=click.INT, - default=6566, - show_default=True, - help="Specify a port for the server", -) -@click.option( - "--type", - "-t", - "type_", - type=click.STRING, - default="http", - show_default=True, - help="Specify a server type: 'http' or 'grpc'", -) -@click.option( - "--no-access-log", - is_flag=True, - show_default=True, - help="Disable the Uvicorn access log", -) -@click.option( - "--workers", - "-w", - type=click.INT, - default=1, - show_default=True, - help="Number of worker", -) -@click.option( - "--keep-alive-timeout", - type=click.INT, - default=5, - show_default=True, - help="Timeout for keep alive", -) -@click.option( - "--registry_ttl_sec", - "-r", - help="Number of seconds after which the registry is refreshed", - type=click.INT, - default=5, - show_default=True, -) -@click.option( - "--key", - "-k", - "tls_key_path", - type=click.STRING, - default="", - show_default=False, - help="path to TLS certificate private key. You need to pass --cert as well to start server in TLS mode", -) -@click.option( - "--cert", - "-c", - "tls_cert_path", - type=click.STRING, - default="", - show_default=False, - help="path to TLS certificate public key. You need to pass --key as well to start server in TLS mode", -) -@click.option( - "--metrics", - "-m", - is_flag=True, - show_default=True, - help="Enable the Metrics Server", -) -@click.pass_context -def serve_command( - ctx: click.Context, - host: str, - port: int, - type_: str, - no_access_log: bool, - workers: int, - metrics: bool, - keep_alive_timeout: int, - tls_key_path: str, - tls_cert_path: str, - registry_ttl_sec: int = 5, -): - """Start a feature server locally on a given port.""" - if (tls_key_path and not tls_cert_path) or (not tls_key_path and tls_cert_path): - raise click.BadParameter( - "Please pass --cert and --key args to start the feature server in TLS mode." - ) - store = create_feature_store(ctx) - - store.serve( - host=host, - port=port, - type_=type_, - no_access_log=no_access_log, - workers=workers, - metrics=metrics, - keep_alive_timeout=keep_alive_timeout, - tls_key_path=tls_key_path, - tls_cert_path=tls_cert_path, - registry_ttl_sec=registry_ttl_sec, - ) - - -@cli.command("listen") -@click.option( - "--address", - "-a", - type=click.STRING, - default="localhost:50051", - show_default=True, - help="Address of the gRPC server", -) -@click.option( - "--max_workers", - "-w", - type=click.INT, - default=10, - show_default=False, - help="The maximum number of threads that can be used to execute the gRPC calls", -) -@click.option( - "--registry_ttl_sec", - "-r", - help="Number of seconds after which the registry is refreshed", - type=click.INT, - default=5, - show_default=True, -) -@click.pass_context -def listen_command( - ctx: click.Context, - address: str, - max_workers: int, - registry_ttl_sec: int, -): - """Start a gRPC feature server to ingest streaming features on given address""" - from feast.infra.contrib.grpc_server import get_grpc_server - - store = create_feature_store(ctx) - server = get_grpc_server(address, store, max_workers, registry_ttl_sec) - server.start() - server.wait_for_termination() - - -@cli.command("serve_transformations") -@click.option( - "--port", - "-p", - type=click.INT, - default=DEFAULT_FEATURE_TRANSFORMATION_SERVER_PORT, - help="Specify a port for the server", -) -@click.pass_context -def serve_transformations_command(ctx: click.Context, port: int): - """[Experimental] Start a feature consumption server locally on a given port.""" - store = create_feature_store(ctx) - - store.serve_transformations(port) - - -@cli.command("serve_registry") -@click.option( - "--port", - "-p", - type=click.INT, - default=DEFAULT_REGISTRY_SERVER_PORT, - help="Specify a port for the server", -) -@click.option( - "--key", - "-k", - "tls_key_path", - type=click.STRING, - default="", - show_default=False, - help="path to TLS certificate private key. You need to pass --cert as well to start server in TLS mode", -) -@click.option( - "--cert", - "-c", - "tls_cert_path", - type=click.STRING, - default="", - show_default=False, - help="path to TLS certificate public key. You need to pass --key as well to start server in TLS mode", -) -@click.pass_context -def serve_registry_command( - ctx: click.Context, - port: int, - tls_key_path: str, - tls_cert_path: str, -): - """Start a registry server locally on a given port.""" - if (tls_key_path and not tls_cert_path) or (not tls_key_path and tls_cert_path): - raise click.BadParameter( - "Please pass --cert and --key args to start the registry server in TLS mode." - ) - store = create_feature_store(ctx) - - store.serve_registry(port, tls_key_path, tls_cert_path) - - -@cli.command("serve_offline") -@click.option( - "--host", - "-h", - type=click.STRING, - default="127.0.0.1", - show_default=True, - help="Specify a host for the server", -) -@click.option( - "--port", - "-p", - type=click.INT, - default=DEFAULT_OFFLINE_SERVER_PORT, - help="Specify a port for the server", -) -@click.option( - "--key", - "-k", - "tls_key_path", - type=click.STRING, - default="", - show_default=False, - help="path to TLS certificate private key. You need to pass --cert as well to start server in TLS mode", -) -@click.option( - "--cert", - "-c", - "tls_cert_path", - type=click.STRING, - default="", - show_default=False, - help="path to TLS certificate public key. You need to pass --key as well to start server in TLS mode", -) -@click.pass_context -def serve_offline_command( - ctx: click.Context, - host: str, - port: int, - tls_key_path: str, - tls_cert_path: str, -): - """Start a remote server locally on a given host, port.""" - if (tls_key_path and not tls_cert_path) or (not tls_key_path and tls_cert_path): - raise click.BadParameter( - "Please pass --cert and --key args to start the offline server in TLS mode." - ) - store = create_feature_store(ctx) - - store.serve_offline(host, port, tls_key_path, tls_cert_path) - - -@cli.command("validate") -@click.option( - "--feature-service", - "-f", - help="Specify a feature service name", -) -@click.option( - "--reference", - "-r", - help="Specify a validation reference name", -) -@click.option( - "--no-profile-cache", - is_flag=True, - help="Do not store cached profile in registry", -) -@click.argument("start_ts") -@click.argument("end_ts") -@click.pass_context -def validate( - ctx: click.Context, - feature_service: str, - reference: str, - start_ts: str, - end_ts: str, - no_profile_cache, -): - """ - Perform validation of logged features (produced by a given feature service) against provided reference. - - START_TS and END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01' - """ - store = create_feature_store(ctx) - - _feature_service = store.get_feature_service(name=feature_service) - _reference = store.get_validation_reference(reference) - - result = store.validate_logged_features( - source=_feature_service, - reference=_reference, - start=maybe_local_tz(datetime.fromisoformat(start_ts)), - end=maybe_local_tz(datetime.fromisoformat(end_ts)), - throw_exception=False, - cache_profile=not no_profile_cache, - ) - - if not result: - print(f"{Style.BRIGHT + Fore.GREEN}Validation successful!{Style.RESET_ALL}") - return - - errors = [e.to_dict() for e in result.report.errors] - formatted_json = json.dumps(errors, indent=4) - colorful_json = highlight( - formatted_json, lexers.JsonLexer(), formatters.TerminalFormatter() - ) - print(f"{Style.BRIGHT + Fore.RED}Validation failed!{Style.RESET_ALL}") - print(colorful_json) - exit(1) - - -@cli.group(name="permissions") -def feast_permissions_cmd(): - """ - Access permissions - """ - pass - - -@feast_permissions_cmd.command(name="list") -@click.option( - "--verbose", - "-v", - is_flag=True, - help="Print the resources matching each configured permission", -) -@tagsOption -@click.pass_context -def feast_permissions_list_command(ctx: click.Context, verbose: bool, tags: list[str]): - from tabulate import tabulate - - table: list[Any] = [] - tags_filter = utils.tags_list_to_dict(tags) - - store = create_feature_store(ctx) - - permissions = store.list_permissions(tags=tags_filter) - - root_node = Node("permissions") - roles: set[str] = set() - - for p in permissions: - policy = p.policy - if not verbose: - cli_utils.handle_not_verbose_permissions_command(p, policy, table) - else: - if isinstance(policy, RoleBasedPolicy) and len(policy.get_roles()) > 0: - roles = set(policy.get_roles()) - permission_node = Node( - p.name + " " + str(list(roles)), parent=root_node - ) - else: - permission_node = Node(p.name, parent=root_node) - - for feast_type in p.types: - if feast_type in [ - FeatureView, - OnDemandFeatureView, - BatchFeatureView, - StreamFeatureView, - ]: - cli_utils.handle_fv_verbose_permissions_command( - feast_type, # type: ignore[arg-type] - p, - permission_node, - store, - tags_filter, - ) - elif feast_type == Entity: - cli_utils.handle_entity_verbose_permissions_command( - feast_type, # type: ignore[arg-type] - p, - permission_node, - store, - tags_filter, - ) - elif feast_type == FeatureService: - cli_utils.handle_fs_verbose_permissions_command( - feast_type, # type: ignore[arg-type] - p, - permission_node, - store, - tags_filter, - ) - elif feast_type == DataSource: - cli_utils.handle_ds_verbose_permissions_command( - feast_type, # type: ignore[arg-type] - p, - permission_node, - store, - tags_filter, - ) - elif feast_type == ValidationReference: - cli_utils.handle_vr_verbose_permissions_command( - feast_type, # type: ignore[arg-type] - p, - permission_node, - store, - tags_filter, - ) - elif feast_type == SavedDataset: - cli_utils.handle_sd_verbose_permissions_command( - feast_type, # type: ignore[arg-type] - p, - permission_node, - store, - tags_filter, - ) - - if not verbose: - print( - tabulate( - table, - headers=[ - "NAME", - "TYPES", - "NAME_PATTERNS", - "ACTIONS", - "ROLES", - "REQUIRED_TAGS", - ], - tablefmt="plain", - ) - ) - else: - cli_utils.print_permission_verbose_example() - - print("Permissions:") - print("") - root_node.show() - - -@feast_permissions_cmd.command("describe") -@click.argument("name", type=click.STRING) -@click.pass_context -def permission_describe(ctx: click.Context, name: str): - """ - Describe a permission - """ - store = create_feature_store(ctx) - - try: - permission = store.get_permission(name) - except FeastObjectNotFoundException as e: - print(e) - exit(1) - - print( - yaml.dump( - yaml.safe_load(str(permission)), default_flow_style=False, sort_keys=False - ) - ) - - -@feast_permissions_cmd.command(name="check") -@click.pass_context -def feast_permissions_check_command(ctx: click.Context): - """ - Validate the permissions configuration - """ - from tabulate import tabulate - - all_unsecured_table: list[Any] = [] - store = create_feature_store(ctx) - permissions = store.list_permissions() - objects = cli_utils.fetch_all_feast_objects( - store=store, - ) - - print( - f"{Style.BRIGHT + Fore.RED}The following resources are not secured by any permission configuration:{Style.RESET_ALL}" - ) - for o in objects: - cli_utils.handle_permissions_check_command( - object=o, permissions=permissions, table=all_unsecured_table - ) - print( - tabulate( - all_unsecured_table, - headers=[ - "NAME", - "TYPE", - ], - tablefmt="plain", - ) - ) - - all_unsecured_actions_table: list[Any] = [] - print( - f"{Style.BRIGHT + Fore.RED}The following actions are not secured by any permission configuration (Note: this might not be a security concern, depending on the used APIs):{Style.RESET_ALL}" - ) - for o in objects: - cli_utils.handle_permissions_check_command_with_actions( - object=o, permissions=permissions, table=all_unsecured_actions_table - ) - print( - tabulate( - all_unsecured_actions_table, - headers=[ - "NAME", - "TYPE", - "UNSECURED ACTIONS", - ], - tablefmt="plain", - ) - ) - - -@feast_permissions_cmd.command(name="list-roles") -@click.option( - "--verbose", - "-v", - is_flag=True, - help="Print the resources and actions permitted to each configured role", -) -@click.pass_context -def feast_permissions_list_roles_command(ctx: click.Context, verbose: bool): - """ - List all the configured roles - """ - from tabulate import tabulate - - table: list[Any] = [] - store = create_feature_store(ctx) - permissions = store.list_permissions() - if not verbose: - cli_utils.handler_list_all_permissions_roles( - permissions=permissions, table=table - ) - print( - tabulate( - table, - headers=[ - "ROLE NAME", - ], - tablefmt="grid", - ) - ) - else: - objects = cli_utils.fetch_all_feast_objects( - store=store, - ) - cli_utils.handler_list_all_permissions_roles_verbose( - objects=objects, permissions=permissions, table=table - ) - print( - tabulate( - table, - headers=[ - "ROLE NAME", - "RESOURCE NAME", - "RESOURCE TYPE", - "PERMITTED ACTIONS", - ], - tablefmt="plain", - ) - ) - - -if __name__ == "__main__": - cli() diff --git a/sdk/python/feast/cli/__init__.py b/sdk/python/feast/cli/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/sdk/python/feast/cli/cli.py b/sdk/python/feast/cli/cli.py new file mode 100644 index 00000000000..41dcb104eeb --- /dev/null +++ b/sdk/python/feast/cli/cli.py @@ -0,0 +1,464 @@ +# Copyright 2019 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import json +import logging +from datetime import datetime +from importlib.metadata import version as importlib_version +from pathlib import Path +from typing import List, Optional + +import click +import yaml +from colorama import Fore, Style +from dateutil import parser +from pygments import formatters, highlight, lexers + +from feast import utils +from feast.cli.data_sources import data_sources_cmd +from feast.cli.entities import entities_cmd +from feast.cli.feature_services import feature_services_cmd +from feast.cli.feature_views import feature_views_cmd +from feast.cli.features import ( + features_cmd, + get_historical_features, + get_online_features, +) +from feast.cli.on_demand_feature_views import on_demand_feature_views_cmd +from feast.cli.permissions import feast_permissions_cmd +from feast.cli.projects import projects_cmd +from feast.cli.saved_datasets import saved_datasets_cmd +from feast.cli.serve import ( + serve_command, + serve_offline_command, + serve_registry_command, + serve_transformations_command, +) +from feast.cli.stream_feature_views import stream_feature_views_cmd +from feast.cli.ui import ui +from feast.cli.validation_references import validation_references_cmd +from feast.errors import FeastProviderLoginError +from feast.repo_config import load_repo_config +from feast.repo_operations import ( + apply_total, + cli_check_repo, + create_feature_store, + generate_project_name, + init_repo, + plan, + registry_dump, + teardown, +) +from feast.utils import maybe_local_tz + +_logger = logging.getLogger(__name__) + + +class NoOptionDefaultFormat(click.Command): + def format_options(self, ctx: click.Context, formatter: click.HelpFormatter): + """Writes all the options into the formatter if they exist.""" + opts = [] + for param in self.get_params(ctx): + rv = param.get_help_record(ctx) + if rv is not None: + opts.append(rv) + if opts: + with formatter.section("Options(No current command options)"): + formatter.write_dl(opts) + + +@click.group() +@click.option( + "--chdir", + "-c", + help="Switch to a different feature repository directory before executing the given subcommand.", +) +@click.option( + "--log-level", + default="warning", + help="The logging level. One of DEBUG, INFO, WARNING, ERROR, and CRITICAL (case-insensitive).", +) +@click.option( + "--feature-store-yaml", + "-f", + help="Override the directory where the CLI should look for the feature_store.yaml file.", +) +@click.pass_context +def cli( + ctx: click.Context, + chdir: Optional[str], + log_level: str, + feature_store_yaml: Optional[str], +): + """ + Feast CLI + + For more information, see our public docs at https://docs.feast.dev/ + """ + ctx.ensure_object(dict) + ctx.obj["CHDIR"] = Path.cwd() if chdir is None else Path(chdir).absolute() + ctx.obj["FS_YAML_FILE"] = ( + Path(feature_store_yaml).absolute() + if feature_store_yaml + else utils.get_default_yaml_file_path(ctx.obj["CHDIR"]) + ) + try: + level = getattr(logging, log_level.upper()) + logging.basicConfig( + format="%(asctime)s %(name)s %(levelname)s: %(message)s", + datefmt="%m/%d/%Y %I:%M:%S %p", + level=level, + ) + # Override the logging level for already created loggers (due to loggers being created at the import time) + # Note, that format & datefmt does not need to be set, because by default child loggers don't override them + + # Also note, that mypy complains that logging.root doesn't have "manager" because of the way it's written. + # So we have to put a type ignore hint for mypy. + for logger_name in logging.root.manager.loggerDict: # type: ignore + if "feast" in logger_name: + logger = logging.getLogger(logger_name) + logger.setLevel(level) + except Exception as e: + raise e + pass + + +@cli.command() +def version(): + """ + Display Feast SDK version + """ + print(f'Feast SDK Version: "{importlib_version("feast")}"') + + +@cli.command() +@click.pass_context +def configuration(ctx: click.Context): + """ + Display Feast configuration + """ + repo = ctx.obj["CHDIR"] + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + repo_config = load_repo_config(repo, fs_yaml_file) + if repo_config: + config_dict = repo_config.model_dump(by_alias=True, exclude_unset=True) + config_dict.pop("repo_path", None) + print(yaml.dump(config_dict, default_flow_style=False, sort_keys=False)) + else: + print("No configuration found.") + + +@cli.command() +@click.pass_context +def endpoint(ctx: click.Context): + """ + Display feature server endpoints + """ + store = create_feature_store(ctx) + endpoint = store.get_feature_server_endpoint() + if endpoint is not None: + _logger.info( + f"Feature server endpoint: {Style.BRIGHT + Fore.GREEN}{endpoint}{Style.RESET_ALL}" + ) + else: + _logger.info("There is no active feature server.") + + +@cli.command("plan", cls=NoOptionDefaultFormat) +@click.option( + "--skip-source-validation", + is_flag=True, + help="Don't validate the data sources by checking for that the tables exist.", +) +@click.pass_context +def plan_command(ctx: click.Context, skip_source_validation: bool): + """ + Create or update a feature store deployment + """ + repo = ctx.obj["CHDIR"] + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + repo_config = load_repo_config(repo, fs_yaml_file) + try: + plan(repo_config, repo, skip_source_validation) + except FeastProviderLoginError as e: + print(str(e)) + + +@cli.command("apply", cls=NoOptionDefaultFormat) +@click.option( + "--skip-source-validation", + is_flag=True, + help="Don't validate the data sources by checking for that the tables exist.", +) +@click.pass_context +def apply_total_command(ctx: click.Context, skip_source_validation: bool): + """ + Create or update a feature store deployment + """ + repo = ctx.obj["CHDIR"] + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + + repo_config = load_repo_config(repo, fs_yaml_file) + try: + apply_total(repo_config, repo, skip_source_validation) + except FeastProviderLoginError as e: + print(str(e)) + + +@cli.command("teardown", cls=NoOptionDefaultFormat) +@click.pass_context +def teardown_command(ctx: click.Context): + """ + Tear down deployed feature store infrastructure + """ + repo = ctx.obj["CHDIR"] + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + repo_config = load_repo_config(repo, fs_yaml_file) + + teardown(repo_config, repo) + + +@cli.command("registry-dump") +@click.pass_context +def registry_dump_command(ctx: click.Context): + """ + Print contents of the metadata registry + """ + repo = ctx.obj["CHDIR"] + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + repo_config = load_repo_config(repo, fs_yaml_file) + + click.echo(registry_dump(repo_config, repo_path=repo)) + + +@cli.command("materialize") +@click.argument("start_ts") +@click.argument("end_ts") +@click.option( + "--views", + "-v", + help="Feature views to materialize", + multiple=True, +) +@click.pass_context +def materialize_command( + ctx: click.Context, start_ts: str, end_ts: str, views: List[str] +): + """ + Run a (non-incremental) materialization job to ingest data into the online store. Feast + will read all data between START_TS and END_TS from the offline store and write it to the + online store. If you don't specify feature view names using --views, all registered Feature + Views will be materialized. + + START_TS and END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01' + """ + store = create_feature_store(ctx) + + store.materialize( + feature_views=None if not views else views, + start_date=utils.make_tzaware(parser.parse(start_ts)), + end_date=utils.make_tzaware(parser.parse(end_ts)), + ) + + +@cli.command("materialize-incremental") +@click.argument("end_ts") +@click.option( + "--views", + "-v", + help="Feature views to incrementally materialize", + multiple=True, +) +@click.pass_context +def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List[str]): + """ + Run an incremental materialization job to ingest new data into the online store. Feast will read + all data from the previously ingested point to END_TS from the offline store and write it to the + online store. If you don't specify feature view names using --views, all registered Feature + Views will be incrementally materialized. + + END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01' + """ + store = create_feature_store(ctx) + store.materialize_incremental( + feature_views=None if not views else views, + end_date=utils.make_tzaware(datetime.fromisoformat(end_ts)), + ) + + +@cli.command("init") +@click.argument("PROJECT_DIRECTORY", required=False) +@click.option( + "--minimal", "-m", is_flag=True, help="Create an empty project repository" +) +@click.option( + "--template", + "-t", + type=click.Choice( + [ + "local", + "gcp", + "aws", + "snowflake", + "spark", + "postgres", + "hbase", + "cassandra", + "hazelcast", + "ikv", + "couchbase", + "milvus", + ], + case_sensitive=False, + ), + help="Specify a template for the created project", + default="local", +) +def init_command(project_directory, minimal: bool, template: str): + """Create a new Feast repository""" + if not project_directory: + project_directory = generate_project_name() + + if minimal: + template = "minimal" + + init_repo(project_directory, template) + + +@cli.command("listen") +@click.option( + "--address", + "-a", + type=click.STRING, + default="localhost:50051", + show_default=True, + help="Address of the gRPC server", +) +@click.option( + "--max_workers", + "-w", + type=click.INT, + default=10, + show_default=False, + help="The maximum number of threads that can be used to execute the gRPC calls", +) +@click.option( + "--registry_ttl_sec", + "-r", + help="Number of seconds after which the registry is refreshed", + type=click.INT, + default=5, + show_default=True, +) +@click.pass_context +def listen_command( + ctx: click.Context, + address: str, + max_workers: int, + registry_ttl_sec: int, +): + """Start a gRPC feature server to ingest streaming features on given address""" + from feast.infra.contrib.grpc_server import get_grpc_server + + store = create_feature_store(ctx) + server = get_grpc_server(address, store, max_workers, registry_ttl_sec) + server.start() + server.wait_for_termination() + + +@cli.command("validate") +@click.option( + "--feature-service", + "-f", + help="Specify a feature service name", +) +@click.option( + "--reference", + "-r", + help="Specify a validation reference name", +) +@click.option( + "--no-profile-cache", + is_flag=True, + help="Do not store cached profile in registry", +) +@click.argument("start_ts") +@click.argument("end_ts") +@click.pass_context +def validate( + ctx: click.Context, + feature_service: str, + reference: str, + start_ts: str, + end_ts: str, + no_profile_cache, +): + """ + Perform validation of logged features (produced by a given feature service) against provided reference. + + START_TS and END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01' + """ + store = create_feature_store(ctx) + + _feature_service = store.get_feature_service(name=feature_service) + _reference = store.get_validation_reference(reference) + + result = store.validate_logged_features( + source=_feature_service, + reference=_reference, + start=maybe_local_tz(datetime.fromisoformat(start_ts)), + end=maybe_local_tz(datetime.fromisoformat(end_ts)), + throw_exception=False, + cache_profile=not no_profile_cache, + ) + + if not result: + print(f"{Style.BRIGHT + Fore.GREEN}Validation successful!{Style.RESET_ALL}") + return + + errors = [e.to_dict() for e in result.report.errors] + formatted_json = json.dumps(errors, indent=4) + colorful_json = highlight( + formatted_json, lexers.JsonLexer(), formatters.TerminalFormatter() + ) + print(f"{Style.BRIGHT + Fore.RED}Validation failed!{Style.RESET_ALL}") + print(colorful_json) + exit(1) + + +cli.add_command(data_sources_cmd) +cli.add_command(entities_cmd) +cli.add_command(feature_services_cmd) +cli.add_command(feature_views_cmd) +cli.add_command(features_cmd) +cli.add_command(get_historical_features) +cli.add_command(get_online_features) +cli.add_command(on_demand_feature_views_cmd) +cli.add_command(feast_permissions_cmd) +cli.add_command(projects_cmd) +cli.add_command(saved_datasets_cmd) +cli.add_command(stream_feature_views_cmd) +cli.add_command(validation_references_cmd) +cli.add_command(ui) +cli.add_command(serve_command) +cli.add_command(serve_offline_command) +cli.add_command(serve_registry_command) +cli.add_command(serve_transformations_command) + +if __name__ == "__main__": + cli() diff --git a/sdk/python/feast/cli/cli_options.py b/sdk/python/feast/cli/cli_options.py new file mode 100644 index 00000000000..8f4550ee2d2 --- /dev/null +++ b/sdk/python/feast/cli/cli_options.py @@ -0,0 +1,8 @@ +import click + +tagsOption = click.option( + "--tags", + help="Filter by tags (e.g. --tags 'key:value' --tags 'key:value, key:value, ...'). Items return when ALL tags match.", + default=[""], + multiple=True, +) diff --git a/sdk/python/feast/cli_utils.py b/sdk/python/feast/cli/cli_utils.py similarity index 100% rename from sdk/python/feast/cli_utils.py rename to sdk/python/feast/cli/cli_utils.py diff --git a/sdk/python/feast/cli/data_sources.py b/sdk/python/feast/cli/data_sources.py new file mode 100644 index 00000000000..44fdf78d89b --- /dev/null +++ b/sdk/python/feast/cli/data_sources.py @@ -0,0 +1,55 @@ +import click +import yaml + +from feast import utils +from feast.cli.cli_options import tagsOption +from feast.errors import FeastObjectNotFoundException +from feast.repo_operations import create_feature_store + + +@click.group(name="data-sources") +def data_sources_cmd(): + """ + Access data sources + """ + pass + + +@data_sources_cmd.command("describe") +@click.argument("name", type=click.STRING) +@click.pass_context +def data_source_describe(ctx: click.Context, name: str): + """ + Describe a data source + """ + store = create_feature_store(ctx) + + try: + data_source = store.get_data_source(name) + except FeastObjectNotFoundException as e: + print(e) + exit(1) + + print( + yaml.dump( + yaml.safe_load(str(data_source)), default_flow_style=False, sort_keys=False + ) + ) + + +@data_sources_cmd.command(name="list") +@tagsOption +@click.pass_context +def data_source_list(ctx: click.Context, tags: list[str]): + """ + List all data sources + """ + store = create_feature_store(ctx) + table = [] + tags_filter = utils.tags_list_to_dict(tags) + for datasource in store.list_data_sources(tags=tags_filter): + table.append([datasource.name, datasource.__class__]) + + from tabulate import tabulate + + print(tabulate(table, headers=["NAME", "CLASS"], tablefmt="plain")) diff --git a/sdk/python/feast/cli/entities.py b/sdk/python/feast/cli/entities.py new file mode 100644 index 00000000000..7d1f25aaa9f --- /dev/null +++ b/sdk/python/feast/cli/entities.py @@ -0,0 +1,55 @@ +import click +import yaml + +from feast import utils +from feast.cli.cli_options import tagsOption +from feast.errors import FeastObjectNotFoundException +from feast.repo_operations import create_feature_store + + +@click.group(name="entities") +def entities_cmd(): + """ + Access entities + """ + pass + + +@entities_cmd.command("describe") +@click.argument("name", type=click.STRING) +@click.pass_context +def entity_describe(ctx: click.Context, name: str): + """ + Describe an entity + """ + store = create_feature_store(ctx) + + try: + entity = store.get_entity(name) + except FeastObjectNotFoundException as e: + print(e) + exit(1) + + print( + yaml.dump( + yaml.safe_load(str(entity)), default_flow_style=False, sort_keys=False + ) + ) + + +@entities_cmd.command(name="list") +@tagsOption +@click.pass_context +def entity_list(ctx: click.Context, tags: list[str]): + """ + List all entities + """ + store = create_feature_store(ctx) + table = [] + tags_filter = utils.tags_list_to_dict(tags) + for entity in store.list_entities(tags=tags_filter): + table.append([entity.name, entity.description, entity.value_type]) + + from tabulate import tabulate + + print(tabulate(table, headers=["NAME", "DESCRIPTION", "TYPE"], tablefmt="plain")) diff --git a/sdk/python/feast/cli/feature_services.py b/sdk/python/feast/cli/feature_services.py new file mode 100644 index 00000000000..070eecbda0c --- /dev/null +++ b/sdk/python/feast/cli/feature_services.py @@ -0,0 +1,62 @@ +import click +import yaml + +from feast import utils +from feast.cli.cli_options import tagsOption +from feast.errors import FeastObjectNotFoundException +from feast.repo_operations import create_feature_store + + +@click.group(name="feature-services") +def feature_services_cmd(): + """ + Access feature services + """ + pass + + +@feature_services_cmd.command("describe") +@click.argument("name", type=click.STRING) +@click.pass_context +def feature_service_describe(ctx: click.Context, name: str): + """ + Describe a feature service + """ + store = create_feature_store(ctx) + + try: + feature_service = store.get_feature_service(name) + except FeastObjectNotFoundException as e: + print(e) + exit(1) + + print( + yaml.dump( + yaml.safe_load(str(feature_service)), + default_flow_style=False, + sort_keys=False, + ) + ) + + +@feature_services_cmd.command(name="list") +@tagsOption +@click.pass_context +def feature_service_list(ctx: click.Context, tags: list[str]): + """ + List all feature services + """ + store = create_feature_store(ctx) + feature_services = [] + tags_filter = utils.tags_list_to_dict(tags) + for feature_service in store.list_feature_services(tags=tags_filter): + feature_names = [] + for projection in feature_service.feature_view_projections: + feature_names.extend( + [f"{projection.name}:{feature.name}" for feature in projection.features] + ) + feature_services.append([feature_service.name, ", ".join(feature_names)]) + + from tabulate import tabulate + + print(tabulate(feature_services, headers=["NAME", "FEATURES"], tablefmt="plain")) diff --git a/sdk/python/feast/cli/feature_views.py b/sdk/python/feast/cli/feature_views.py new file mode 100644 index 00000000000..a1a29ac9f27 --- /dev/null +++ b/sdk/python/feast/cli/feature_views.py @@ -0,0 +1,72 @@ +import click +import yaml + +from feast import utils +from feast.cli.cli_options import tagsOption +from feast.errors import FeastObjectNotFoundException +from feast.feature_view import FeatureView +from feast.on_demand_feature_view import OnDemandFeatureView +from feast.repo_operations import create_feature_store + + +@click.group(name="feature-views") +def feature_views_cmd(): + """ + Access feature views + """ + pass + + +@feature_views_cmd.command("describe") +@click.argument("name", type=click.STRING) +@click.pass_context +def feature_view_describe(ctx: click.Context, name: str): + """ + Describe a feature view + """ + store = create_feature_store(ctx) + + try: + feature_view = store.get_feature_view(name) + except FeastObjectNotFoundException as e: + print(e) + exit(1) + + print( + yaml.dump( + yaml.safe_load(str(feature_view)), default_flow_style=False, sort_keys=False + ) + ) + + +@feature_views_cmd.command(name="list") +@tagsOption +@click.pass_context +def feature_view_list(ctx: click.Context, tags: list[str]): + """ + List all feature views + """ + store = create_feature_store(ctx) + table = [] + tags_filter = utils.tags_list_to_dict(tags) + for feature_view in [ + *store.list_batch_feature_views(tags=tags_filter), + *store.list_on_demand_feature_views(tags=tags_filter), + ]: + entities = set() + if isinstance(feature_view, FeatureView): + entities.update(feature_view.entities) + elif isinstance(feature_view, OnDemandFeatureView): + for backing_fv in feature_view.source_feature_view_projections.values(): + entities.update(store.get_feature_view(backing_fv.name).entities) + table.append( + [ + feature_view.name, + entities if len(entities) > 0 else "n/a", + type(feature_view).__name__, + ] + ) + + from tabulate import tabulate + + print(tabulate(table, headers=["NAME", "ENTITIES", "TYPE"], tablefmt="plain")) diff --git a/sdk/python/feast/cli/features.py b/sdk/python/feast/cli/features.py new file mode 100644 index 00000000000..6228fbacb6a --- /dev/null +++ b/sdk/python/feast/cli/features.py @@ -0,0 +1,176 @@ +import json +from typing import List + +import click +import pandas as pd + +from feast.repo_operations import create_feature_store + + +@click.group(name="features") +def features_cmd(): + """ + Access features + """ + pass + + +@features_cmd.command(name="list") +@click.option( + "--output", + type=click.Choice(["table", "json"], case_sensitive=False), + default="table", + show_default=True, + help="Output format", +) +@click.pass_context +def features_list(ctx: click.Context, output: str): + """ + List all features + """ + store = create_feature_store(ctx) + feature_views = [ + *store.list_batch_feature_views(), + *store.list_on_demand_feature_views(), + *store.list_stream_feature_views(), + ] + feature_list = [] + for fv in feature_views: + for feature in fv.features: + feature_list.append([feature.name, fv.name, str(feature.dtype)]) + + if output == "json": + json_output = [ + {"feature_name": fn, "feature_view": fv, "dtype": dt} + for fv, fn, dt in feature_list + ] + click.echo(json.dumps(json_output, indent=4)) + else: + from tabulate import tabulate + + click.echo( + tabulate( + feature_list, + headers=["Feature", "Feature View", "Data Type"], + tablefmt="plain", + ) + ) + + +@features_cmd.command("describe") +@click.argument("feature_name", type=str) +@click.pass_context +def describe_feature(ctx: click.Context, feature_name: str): + """ + Describe a specific feature by name + """ + store = create_feature_store(ctx) + feature_views = [ + *store.list_batch_feature_views(), + *store.list_on_demand_feature_views(), + *store.list_stream_feature_views(), + ] + + feature_details = [] + for fv in feature_views: + for feature in fv.features: + if feature.name == feature_name: + feature_details.append( + { + "Feature Name": feature.name, + "Feature View": fv.name, + "Data Type": str(feature.dtype), + "Description": getattr(feature, "description", "N/A"), + "Online Store": getattr(fv, "online", "N/A"), + "Source": json.loads(str(getattr(fv, "batch_source", "N/A"))), + } + ) + if not feature_details: + click.echo(f"Feature '{feature_name}' not found in any feature view.") + return + + click.echo(json.dumps(feature_details, indent=4)) + + +@click.command("get-online-features") +@click.option( + "--entities", + "-e", + type=str, + multiple=True, + required=True, + help="Entity key-value pairs (e.g., driver_id=1001)", +) +@click.option( + "--features", + "-f", + multiple=True, + required=True, + help="Features to retrieve. (e.g.,feature-view:feature-name) ex: driver_hourly_stats:conv_rate", +) +@click.pass_context +def get_online_features(ctx: click.Context, entities: List[str], features: List[str]): + """ + Fetch online feature values for a given entity ID + """ + store = create_feature_store(ctx) + entity_dict: dict[str, List[str]] = {} + for entity in entities: + try: + key, value = entity.split("=") + if key not in entity_dict: + entity_dict[key] = [] + entity_dict[key].append(value) + except ValueError: + click.echo(f"Invalid entity format: {entity}. Use key=value format.") + return + entity_rows = [ + dict(zip(entity_dict.keys(), values)) for values in zip(*entity_dict.values()) + ] + feature_vector = store.get_online_features( + features=list(features), + entity_rows=entity_rows, + ).to_dict() + + click.echo(json.dumps(feature_vector, indent=4)) + + +@click.command(name="get-historical-features") +@click.option( + "--dataframe", + "-d", + type=str, + required=True, + help='JSON string containing entities and timestamps. Example: \'[{"event_timestamp": "2025-03-29T12:00:00", "driver_id": 1001}]\'', +) +@click.option( + "--features", + "-f", + multiple=True, + required=True, + help="Features to retrieve. feature-view:feature-name ex: driver_hourly_stats:conv_rate", +) +@click.pass_context +def get_historical_features(ctx: click.Context, dataframe: str, features: List[str]): + """ + Fetch historical feature values for a given entity ID + """ + store = create_feature_store(ctx) + try: + entity_list = json.loads(dataframe) + if not isinstance(entity_list, list): + raise ValueError("Entities must be a list of dictionaries.") + + entity_df = pd.DataFrame(entity_list) + entity_df["event_timestamp"] = pd.to_datetime(entity_df["event_timestamp"]) + + except Exception as e: + click.echo(f"Error parsing entities JSON: {e}", err=True) + return + + feature_vector = store.get_historical_features( + entity_df=entity_df, + features=list(features), + ).to_df() + + click.echo(feature_vector.to_json(orient="records", indent=4)) diff --git a/sdk/python/feast/cli/on_demand_feature_views.py b/sdk/python/feast/cli/on_demand_feature_views.py new file mode 100644 index 00000000000..a7ee4093318 --- /dev/null +++ b/sdk/python/feast/cli/on_demand_feature_views.py @@ -0,0 +1,57 @@ +import click +import yaml + +from feast import utils +from feast.cli.cli_options import tagsOption +from feast.errors import FeastObjectNotFoundException +from feast.repo_operations import create_feature_store + + +@click.group(name="on-demand-feature-views") +def on_demand_feature_views_cmd(): + """ + [Experimental] Access on demand feature views + """ + pass + + +@on_demand_feature_views_cmd.command("describe") +@click.argument("name", type=click.STRING) +@click.pass_context +def on_demand_feature_view_describe(ctx: click.Context, name: str): + """ + [Experimental] Describe an on demand feature view + """ + store = create_feature_store(ctx) + + try: + on_demand_feature_view = store.get_on_demand_feature_view(name) + except FeastObjectNotFoundException as e: + print(e) + exit(1) + + print( + yaml.dump( + yaml.safe_load(str(on_demand_feature_view)), + default_flow_style=False, + sort_keys=False, + ) + ) + + +@on_demand_feature_views_cmd.command(name="list") +@tagsOption +@click.pass_context +def on_demand_feature_view_list(ctx: click.Context, tags: list[str]): + """ + [Experimental] List all on demand feature views + """ + store = create_feature_store(ctx) + table = [] + tags_filter = utils.tags_list_to_dict(tags) + for on_demand_feature_view in store.list_on_demand_feature_views(tags=tags_filter): + table.append([on_demand_feature_view.name]) + + from tabulate import tabulate + + print(tabulate(table, headers=["NAME"], tablefmt="plain")) diff --git a/sdk/python/feast/cli/permissions.py b/sdk/python/feast/cli/permissions.py new file mode 100644 index 00000000000..c75cbbe55a1 --- /dev/null +++ b/sdk/python/feast/cli/permissions.py @@ -0,0 +1,271 @@ +from typing import Any + +import click +import yaml +from bigtree import Node +from colorama import Fore, Style + +from feast import ( + BatchFeatureView, + Entity, + FeatureService, + FeatureView, + OnDemandFeatureView, + StreamFeatureView, + utils, +) +from feast.cli import cli_utils +from feast.cli.cli_options import tagsOption +from feast.data_source import DataSource +from feast.errors import FeastObjectNotFoundException +from feast.permissions.policy import RoleBasedPolicy +from feast.repo_operations import create_feature_store +from feast.saved_dataset import SavedDataset, ValidationReference + + +@click.group(name="permissions") +def feast_permissions_cmd(): + """ + Access permissions + """ + pass + + +@feast_permissions_cmd.command(name="list") +@click.option( + "--verbose", + "-v", + is_flag=True, + help="Print the resources matching each configured permission", +) +@tagsOption +@click.pass_context +def feast_permissions_list_command(ctx: click.Context, verbose: bool, tags: list[str]): + from tabulate import tabulate + + table: list[Any] = [] + tags_filter = utils.tags_list_to_dict(tags) + + store = create_feature_store(ctx) + + permissions = store.list_permissions(tags=tags_filter) + + root_node = Node("permissions") + roles: set[str] = set() + + for p in permissions: + policy = p.policy + if not verbose: + cli_utils.handle_not_verbose_permissions_command(p, policy, table) + else: + if isinstance(policy, RoleBasedPolicy) and len(policy.get_roles()) > 0: + roles = set(policy.get_roles()) + permission_node = Node( + p.name + " " + str(list(roles)), parent=root_node + ) + else: + permission_node = Node(p.name, parent=root_node) + + for feast_type in p.types: + if feast_type in [ + FeatureView, + OnDemandFeatureView, + BatchFeatureView, + StreamFeatureView, + ]: + cli_utils.handle_fv_verbose_permissions_command( + feast_type, # type: ignore[arg-type] + p, + permission_node, + store, + tags_filter, + ) + elif feast_type == Entity: + cli_utils.handle_entity_verbose_permissions_command( + feast_type, # type: ignore[arg-type] + p, + permission_node, + store, + tags_filter, + ) + elif feast_type == FeatureService: + cli_utils.handle_fs_verbose_permissions_command( + feast_type, # type: ignore[arg-type] + p, + permission_node, + store, + tags_filter, + ) + elif feast_type == DataSource: + cli_utils.handle_ds_verbose_permissions_command( + feast_type, # type: ignore[arg-type] + p, + permission_node, + store, + tags_filter, + ) + elif feast_type == ValidationReference: + cli_utils.handle_vr_verbose_permissions_command( + feast_type, # type: ignore[arg-type] + p, + permission_node, + store, + tags_filter, + ) + elif feast_type == SavedDataset: + cli_utils.handle_sd_verbose_permissions_command( + feast_type, # type: ignore[arg-type] + p, + permission_node, + store, + tags_filter, + ) + + if not verbose: + print( + tabulate( + table, + headers=[ + "NAME", + "TYPES", + "NAME_PATTERNS", + "ACTIONS", + "ROLES", + "REQUIRED_TAGS", + ], + tablefmt="plain", + ) + ) + else: + cli_utils.print_permission_verbose_example() + + print("Permissions:") + print("") + root_node.show() + + +@feast_permissions_cmd.command("describe") +@click.argument("name", type=click.STRING) +@click.pass_context +def permission_describe(ctx: click.Context, name: str): + """ + Describe a permission + """ + store = create_feature_store(ctx) + + try: + permission = store.get_permission(name) + except FeastObjectNotFoundException as e: + print(e) + exit(1) + + print( + yaml.dump( + yaml.safe_load(str(permission)), default_flow_style=False, sort_keys=False + ) + ) + + +@feast_permissions_cmd.command(name="check") +@click.pass_context +def feast_permissions_check_command(ctx: click.Context): + """ + Validate the permissions configuration + """ + from tabulate import tabulate + + all_unsecured_table: list[Any] = [] + store = create_feature_store(ctx) + permissions = store.list_permissions() + objects = cli_utils.fetch_all_feast_objects( + store=store, + ) + + print( + f"{Style.BRIGHT + Fore.RED}The following resources are not secured by any permission configuration:{Style.RESET_ALL}" + ) + for o in objects: + cli_utils.handle_permissions_check_command( + object=o, permissions=permissions, table=all_unsecured_table + ) + print( + tabulate( + all_unsecured_table, + headers=[ + "NAME", + "TYPE", + ], + tablefmt="plain", + ) + ) + + all_unsecured_actions_table: list[Any] = [] + print( + f"{Style.BRIGHT + Fore.RED}The following actions are not secured by any permission configuration (Note: this might not be a security concern, depending on the used APIs):{Style.RESET_ALL}" + ) + for o in objects: + cli_utils.handle_permissions_check_command_with_actions( + object=o, permissions=permissions, table=all_unsecured_actions_table + ) + print( + tabulate( + all_unsecured_actions_table, + headers=[ + "NAME", + "TYPE", + "UNSECURED ACTIONS", + ], + tablefmt="plain", + ) + ) + + +@feast_permissions_cmd.command(name="list-roles") +@click.option( + "--verbose", + "-v", + is_flag=True, + help="Print the resources and actions permitted to each configured role", +) +@click.pass_context +def feast_permissions_list_roles_command(ctx: click.Context, verbose: bool): + """ + List all the configured roles + """ + from tabulate import tabulate + + table: list[Any] = [] + store = create_feature_store(ctx) + permissions = store.list_permissions() + if not verbose: + cli_utils.handler_list_all_permissions_roles( + permissions=permissions, table=table + ) + print( + tabulate( + table, + headers=[ + "ROLE NAME", + ], + tablefmt="grid", + ) + ) + else: + objects = cli_utils.fetch_all_feast_objects( + store=store, + ) + cli_utils.handler_list_all_permissions_roles_verbose( + objects=objects, permissions=permissions, table=table + ) + print( + tabulate( + table, + headers=[ + "ROLE NAME", + "RESOURCE NAME", + "RESOURCE TYPE", + "PERMITTED ACTIONS", + ], + tablefmt="plain", + ) + ) diff --git a/sdk/python/feast/cli/projects.py b/sdk/python/feast/cli/projects.py new file mode 100644 index 00000000000..ea8d5b573a6 --- /dev/null +++ b/sdk/python/feast/cli/projects.py @@ -0,0 +1,80 @@ +import click +import yaml + +from feast import utils +from feast.cli.cli_options import tagsOption +from feast.errors import FeastObjectNotFoundException +from feast.repo_operations import create_feature_store + + +@click.group(name="projects") +def projects_cmd(): + """ + Access projects + """ + pass + + +@projects_cmd.command("describe") +@click.argument("name", type=click.STRING) +@click.pass_context +def project_describe(ctx: click.Context, name: str): + """ + Describe a project + """ + store = create_feature_store(ctx) + + try: + project = store.get_project(name) + except FeastObjectNotFoundException as e: + print(e) + exit(1) + + print( + yaml.dump( + yaml.safe_load(str(project)), default_flow_style=False, sort_keys=False + ) + ) + + +@projects_cmd.command("current_project") +@click.pass_context +def project_current(ctx: click.Context): + """ + Returns the current project configured with FeatureStore object + """ + store = create_feature_store(ctx) + + try: + project = store.get_project(name=None) + except FeastObjectNotFoundException as e: + print(e) + exit(1) + + print( + yaml.dump( + yaml.safe_load(str(project)), default_flow_style=False, sort_keys=False + ) + ) + + +@projects_cmd.command(name="list") +@tagsOption +@click.pass_context +def project_list(ctx: click.Context, tags: list[str]): + """ + List all projects + """ + store = create_feature_store(ctx) + table = [] + tags_filter = utils.tags_list_to_dict(tags) + for project in store.list_projects(tags=tags_filter): + table.append([project.name, project.description, project.tags, project.owner]) + + from tabulate import tabulate + + print( + tabulate( + table, headers=["NAME", "DESCRIPTION", "TAGS", "OWNER"], tablefmt="plain" + ) + ) diff --git a/sdk/python/feast/cli/saved_datasets.py b/sdk/python/feast/cli/saved_datasets.py new file mode 100644 index 00000000000..69c31cf3340 --- /dev/null +++ b/sdk/python/feast/cli/saved_datasets.py @@ -0,0 +1,57 @@ +import click +import yaml + +from feast import utils +from feast.cli.cli_options import tagsOption +from feast.errors import FeastObjectNotFoundException +from feast.repo_operations import create_feature_store + + +@click.group(name="saved-datasets") +def saved_datasets_cmd(): + """ + [Experimental] Access saved datasets + """ + pass + + +@saved_datasets_cmd.command("describe") +@click.argument("name", type=click.STRING) +@click.pass_context +def saved_datasets_describe(ctx: click.Context, name: str): + """ + [Experimental] Describe a saved dataset + """ + store = create_feature_store(ctx) + + try: + saved_dataset = store.get_saved_dataset(name) + except FeastObjectNotFoundException as e: + print(e) + exit(1) + + print( + yaml.dump( + yaml.safe_load(str(saved_dataset)), + default_flow_style=False, + sort_keys=False, + ) + ) + + +@saved_datasets_cmd.command(name="list") +@tagsOption +@click.pass_context +def saved_datasets_list(ctx: click.Context, tags: list[str]): + """ + [Experimental] List all saved datasets + """ + store = create_feature_store(ctx) + table = [] + tags_filter = utils.tags_list_to_dict(tags) + for saved_dataset in store.list_saved_datasets(tags=tags_filter): + table.append([saved_dataset.name]) + + from tabulate import tabulate + + print(tabulate(table, headers=["NAME"], tablefmt="plain")) diff --git a/sdk/python/feast/cli/serve.py b/sdk/python/feast/cli/serve.py new file mode 100644 index 00000000000..c069d799ef1 --- /dev/null +++ b/sdk/python/feast/cli/serve.py @@ -0,0 +1,234 @@ +import click + +from feast.constants import ( + DEFAULT_FEATURE_TRANSFORMATION_SERVER_PORT, + DEFAULT_OFFLINE_SERVER_PORT, + DEFAULT_REGISTRY_SERVER_PORT, +) +from feast.repo_operations import create_feature_store + + +@click.command("serve") +@click.option( + "--host", + "-h", + type=click.STRING, + default="127.0.0.1", + show_default=True, + help="Specify a host for the server", +) +@click.option( + "--port", + "-p", + type=click.INT, + default=6566, + show_default=True, + help="Specify a port for the server", +) +@click.option( + "--type", + "-t", + "type_", + type=click.STRING, + default="http", + show_default=True, + help="Specify a server type: 'http' or 'grpc'", +) +@click.option( + "--no-access-log", + is_flag=True, + show_default=True, + help="Disable the Uvicorn access log", +) +@click.option( + "--workers", + "-w", + type=click.INT, + default=1, + show_default=True, + help="Number of worker", +) +@click.option( + "--keep-alive-timeout", + type=click.INT, + default=5, + show_default=True, + help="Timeout for keep alive", +) +@click.option( + "--registry_ttl_sec", + "-r", + help="Number of seconds after which the registry is refreshed", + type=click.INT, + default=5, + show_default=True, +) +@click.option( + "--key", + "-k", + "tls_key_path", + type=click.STRING, + default="", + show_default=False, + help="path to TLS certificate private key. You need to pass --cert as well to start server in TLS mode", +) +@click.option( + "--cert", + "-c", + "tls_cert_path", + type=click.STRING, + default="", + show_default=False, + help="path to TLS certificate public key. You need to pass --key as well to start server in TLS mode", +) +@click.option( + "--metrics", + "-m", + is_flag=True, + show_default=True, + help="Enable the Metrics Server", +) +@click.pass_context +def serve_command( + ctx: click.Context, + host: str, + port: int, + type_: str, + no_access_log: bool, + workers: int, + metrics: bool, + keep_alive_timeout: int, + tls_key_path: str, + tls_cert_path: str, + registry_ttl_sec: int = 5, +): + """Start a feature server locally on a given port.""" + if (tls_key_path and not tls_cert_path) or (not tls_key_path and tls_cert_path): + raise click.BadParameter( + "Please pass --cert and --key args to start the feature server in TLS mode." + ) + store = create_feature_store(ctx) + + store.serve( + host=host, + port=port, + type_=type_, + no_access_log=no_access_log, + workers=workers, + metrics=metrics, + keep_alive_timeout=keep_alive_timeout, + tls_key_path=tls_key_path, + tls_cert_path=tls_cert_path, + registry_ttl_sec=registry_ttl_sec, + ) + + +@click.command("serve_transformations") +@click.option( + "--port", + "-p", + type=click.INT, + default=DEFAULT_FEATURE_TRANSFORMATION_SERVER_PORT, + help="Specify a port for the server", +) +@click.pass_context +def serve_transformations_command(ctx: click.Context, port: int): + """[Experimental] Start a feature consumption server locally on a given port.""" + store = create_feature_store(ctx) + + store.serve_transformations(port) + + +@click.command("serve_registry") +@click.option( + "--port", + "-p", + type=click.INT, + default=DEFAULT_REGISTRY_SERVER_PORT, + help="Specify a port for the server", +) +@click.option( + "--key", + "-k", + "tls_key_path", + type=click.STRING, + default="", + show_default=False, + help="path to TLS certificate private key. You need to pass --cert as well to start server in TLS mode", +) +@click.option( + "--cert", + "-c", + "tls_cert_path", + type=click.STRING, + default="", + show_default=False, + help="path to TLS certificate public key. You need to pass --key as well to start server in TLS mode", +) +@click.pass_context +def serve_registry_command( + ctx: click.Context, + port: int, + tls_key_path: str, + tls_cert_path: str, +): + """Start a registry server locally on a given port.""" + if (tls_key_path and not tls_cert_path) or (not tls_key_path and tls_cert_path): + raise click.BadParameter( + "Please pass --cert and --key args to start the registry server in TLS mode." + ) + store = create_feature_store(ctx) + + store.serve_registry(port, tls_key_path, tls_cert_path) + + +@click.command("serve_offline") +@click.option( + "--host", + "-h", + type=click.STRING, + default="127.0.0.1", + show_default=True, + help="Specify a host for the server", +) +@click.option( + "--port", + "-p", + type=click.INT, + default=DEFAULT_OFFLINE_SERVER_PORT, + help="Specify a port for the server", +) +@click.option( + "--key", + "-k", + "tls_key_path", + type=click.STRING, + default="", + show_default=False, + help="path to TLS certificate private key. You need to pass --cert as well to start server in TLS mode", +) +@click.option( + "--cert", + "-c", + "tls_cert_path", + type=click.STRING, + default="", + show_default=False, + help="path to TLS certificate public key. You need to pass --key as well to start server in TLS mode", +) +@click.pass_context +def serve_offline_command( + ctx: click.Context, + host: str, + port: int, + tls_key_path: str, + tls_cert_path: str, +): + """Start a remote server locally on a given host, port.""" + if (tls_key_path and not tls_cert_path) or (not tls_key_path and tls_cert_path): + raise click.BadParameter( + "Please pass --cert and --key args to start the offline server in TLS mode." + ) + store = create_feature_store(ctx) + + store.serve_offline(host, port, tls_key_path, tls_cert_path) diff --git a/sdk/python/feast/cli/stream_feature_views.py b/sdk/python/feast/cli/stream_feature_views.py new file mode 100644 index 00000000000..16efd9ad2bd --- /dev/null +++ b/sdk/python/feast/cli/stream_feature_views.py @@ -0,0 +1,57 @@ +import click +import yaml + +from feast import utils +from feast.cli.cli_options import tagsOption +from feast.errors import FeastObjectNotFoundException +from feast.repo_operations import create_feature_store + + +@click.group(name="stream-feature-views") +def stream_feature_views_cmd(): + """ + [Experimental] Access stream feature views + """ + pass + + +@stream_feature_views_cmd.command("describe") +@click.argument("name", type=click.STRING) +@click.pass_context +def stream_feature_views_describe(ctx: click.Context, name: str): + """ + [Experimental] Describe a stream feature view + """ + store = create_feature_store(ctx) + + try: + stream_feature_view = store.get_stream_feature_view(name) + except FeastObjectNotFoundException as e: + print(e) + exit(1) + + print( + yaml.dump( + yaml.safe_load(str(stream_feature_view)), + default_flow_style=False, + sort_keys=False, + ) + ) + + +@stream_feature_views_cmd.command(name="list") +@tagsOption +@click.pass_context +def stream_feature_views_list(ctx: click.Context, tags: list[str]): + """ + [Experimental] List all stream feature views + """ + store = create_feature_store(ctx) + table = [] + tags_filter = utils.tags_list_to_dict(tags) + for stream_feature_view in store.list_stream_feature_views(tags=tags_filter): + table.append([stream_feature_view.name]) + + from tabulate import tabulate + + print(tabulate(table, headers=["NAME"], tablefmt="plain")) diff --git a/sdk/python/feast/cli/ui.py b/sdk/python/feast/cli/ui.py new file mode 100644 index 00000000000..9fd7b24b7cd --- /dev/null +++ b/sdk/python/feast/cli/ui.py @@ -0,0 +1,82 @@ +import click + +from feast.repo_operations import create_feature_store, registry_dump + + +@click.command() +@click.option( + "--host", + "-h", + type=click.STRING, + default="0.0.0.0", + show_default=True, + help="Specify a host for the server", +) +@click.option( + "--port", + "-p", + type=click.INT, + default=8888, + show_default=True, + help="Specify a port for the server", +) +@click.option( + "--registry_ttl_sec", + "-r", + help="Number of seconds after which the registry is refreshed", + type=click.INT, + default=5, + show_default=True, +) +@click.option( + "--root_path", + help="Provide root path to make the UI working behind proxy", + type=click.STRING, + default="", +) +@click.option( + "--key", + "-k", + "tls_key_path", + type=click.STRING, + default="", + show_default=False, + help="path to TLS(SSL) certificate private key. You need to pass --cert arg as well to start server in TLS mode", +) +@click.option( + "--cert", + "-c", + "tls_cert_path", + type=click.STRING, + default="", + show_default=False, + help="path to TLS(SSL) certificate public key. You need to pass --key arg as well to start server in TLS mode", +) +@click.pass_context +def ui( + ctx: click.Context, + host: str, + port: int, + registry_ttl_sec: int, + root_path: str = "", + tls_key_path: str = "", + tls_cert_path: str = "", +): + """ + Shows the Feast UI over the current directory + """ + if (tls_key_path and not tls_cert_path) or (not tls_key_path and tls_cert_path): + raise click.BadParameter( + "Please configure --key and --cert args to start the feature server in SSL mode." + ) + store = create_feature_store(ctx) + # Pass in the registry_dump method to get around a circular dependency + store.serve_ui( + host=host, + port=port, + get_registry_dump=registry_dump, + registry_ttl_sec=registry_ttl_sec, + root_path=root_path, + tls_key_path=tls_key_path, + tls_cert_path=tls_cert_path, + ) diff --git a/sdk/python/feast/cli/validation_references.py b/sdk/python/feast/cli/validation_references.py new file mode 100644 index 00000000000..09e0d68916b --- /dev/null +++ b/sdk/python/feast/cli/validation_references.py @@ -0,0 +1,57 @@ +import click +import yaml + +from feast import utils +from feast.cli.cli_options import tagsOption +from feast.errors import FeastObjectNotFoundException +from feast.repo_operations import create_feature_store + + +@click.group(name="validation-references") +def validation_references_cmd(): + """ + [Experimental] Access validation references + """ + pass + + +@validation_references_cmd.command("describe") +@click.argument("name", type=click.STRING) +@click.pass_context +def validation_references_describe(ctx: click.Context, name: str): + """ + [Experimental] Describe a validation reference + """ + store = create_feature_store(ctx) + + try: + validation_reference = store.get_validation_reference(name) + except FeastObjectNotFoundException as e: + print(e) + exit(1) + + print( + yaml.dump( + yaml.safe_load(str(validation_reference)), + default_flow_style=False, + sort_keys=False, + ) + ) + + +@validation_references_cmd.command(name="list") +@tagsOption +@click.pass_context +def validation_references_list(ctx: click.Context, tags: list[str]): + """ + [Experimental] List all validation references + """ + store = create_feature_store(ctx) + table = [] + tags_filter = utils.tags_list_to_dict(tags) + for validation_reference in store.list_validation_references(tags=tags_filter): + table.append([validation_reference.name]) + + from tabulate import tabulate + + print(tabulate(table, headers=["NAME"], tablefmt="plain")) diff --git a/sdk/python/tests/utils/cli_repo_creator.py b/sdk/python/tests/utils/cli_repo_creator.py index 46df563eafb..4b8f9aad04b 100644 --- a/sdk/python/tests/utils/cli_repo_creator.py +++ b/sdk/python/tests/utils/cli_repo_creator.py @@ -8,7 +8,7 @@ from textwrap import dedent from typing import List, Tuple -from feast import cli +from feast.cli import cli from feast.feature_store import FeatureStore diff --git a/setup.py b/setup.py index 503329aa6d8..81f47fc705a 100644 --- a/setup.py +++ b/setup.py @@ -338,7 +338,7 @@ "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.9", ], - entry_points={"console_scripts": ["feast=feast.cli:cli"]}, + entry_points={"console_scripts": ["feast=feast.cli.cli:cli"]}, use_scm_version=use_scm_version, setup_requires=[ "pybindgen==0.22.0", # TODO do we need this?