diff --git a/sdk/python/feast/cli/cli.py b/sdk/python/feast/cli/cli.py index 41dcb104eeb..1ef6ac7fbda 100644 --- a/sdk/python/feast/cli/cli.py +++ b/sdk/python/feast/cli/cli.py @@ -1,464 +1,518 @@ -# Copyright 2019 The Feast Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import json -import logging -from datetime import datetime -from importlib.metadata import version as importlib_version -from pathlib import Path -from typing import List, Optional - -import click -import yaml -from colorama import Fore, Style -from dateutil import parser -from pygments import formatters, highlight, lexers - -from feast import utils -from feast.cli.data_sources import data_sources_cmd -from feast.cli.entities import entities_cmd -from feast.cli.feature_services import feature_services_cmd -from feast.cli.feature_views import feature_views_cmd -from feast.cli.features import ( - features_cmd, - get_historical_features, - get_online_features, -) -from feast.cli.on_demand_feature_views import on_demand_feature_views_cmd -from feast.cli.permissions import feast_permissions_cmd -from feast.cli.projects import projects_cmd -from feast.cli.saved_datasets import saved_datasets_cmd -from feast.cli.serve import ( - serve_command, - serve_offline_command, - serve_registry_command, - serve_transformations_command, -) -from feast.cli.stream_feature_views import stream_feature_views_cmd -from feast.cli.ui import ui -from feast.cli.validation_references import validation_references_cmd 
-from feast.errors import FeastProviderLoginError -from feast.repo_config import load_repo_config -from feast.repo_operations import ( - apply_total, - cli_check_repo, - create_feature_store, - generate_project_name, - init_repo, - plan, - registry_dump, - teardown, -) -from feast.utils import maybe_local_tz - -_logger = logging.getLogger(__name__) - - -class NoOptionDefaultFormat(click.Command): - def format_options(self, ctx: click.Context, formatter: click.HelpFormatter): - """Writes all the options into the formatter if they exist.""" - opts = [] - for param in self.get_params(ctx): - rv = param.get_help_record(ctx) - if rv is not None: - opts.append(rv) - if opts: - with formatter.section("Options(No current command options)"): - formatter.write_dl(opts) - - -@click.group() -@click.option( - "--chdir", - "-c", - help="Switch to a different feature repository directory before executing the given subcommand.", -) -@click.option( - "--log-level", - default="warning", - help="The logging level. 
One of DEBUG, INFO, WARNING, ERROR, and CRITICAL (case-insensitive).", -) -@click.option( - "--feature-store-yaml", - "-f", - help="Override the directory where the CLI should look for the feature_store.yaml file.", -) -@click.pass_context -def cli( - ctx: click.Context, - chdir: Optional[str], - log_level: str, - feature_store_yaml: Optional[str], -): - """ - Feast CLI - - For more information, see our public docs at https://docs.feast.dev/ - """ - ctx.ensure_object(dict) - ctx.obj["CHDIR"] = Path.cwd() if chdir is None else Path(chdir).absolute() - ctx.obj["FS_YAML_FILE"] = ( - Path(feature_store_yaml).absolute() - if feature_store_yaml - else utils.get_default_yaml_file_path(ctx.obj["CHDIR"]) - ) - try: - level = getattr(logging, log_level.upper()) - logging.basicConfig( - format="%(asctime)s %(name)s %(levelname)s: %(message)s", - datefmt="%m/%d/%Y %I:%M:%S %p", - level=level, - ) - # Override the logging level for already created loggers (due to loggers being created at the import time) - # Note, that format & datefmt does not need to be set, because by default child loggers don't override them - - # Also note, that mypy complains that logging.root doesn't have "manager" because of the way it's written. - # So we have to put a type ignore hint for mypy. 
- for logger_name in logging.root.manager.loggerDict: # type: ignore - if "feast" in logger_name: - logger = logging.getLogger(logger_name) - logger.setLevel(level) - except Exception as e: - raise e - pass - - -@cli.command() -def version(): - """ - Display Feast SDK version - """ - print(f'Feast SDK Version: "{importlib_version("feast")}"') - - -@cli.command() -@click.pass_context -def configuration(ctx: click.Context): - """ - Display Feast configuration - """ - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - repo_config = load_repo_config(repo, fs_yaml_file) - if repo_config: - config_dict = repo_config.model_dump(by_alias=True, exclude_unset=True) - config_dict.pop("repo_path", None) - print(yaml.dump(config_dict, default_flow_style=False, sort_keys=False)) - else: - print("No configuration found.") - - -@cli.command() -@click.pass_context -def endpoint(ctx: click.Context): - """ - Display feature server endpoints - """ - store = create_feature_store(ctx) - endpoint = store.get_feature_server_endpoint() - if endpoint is not None: - _logger.info( - f"Feature server endpoint: {Style.BRIGHT + Fore.GREEN}{endpoint}{Style.RESET_ALL}" - ) - else: - _logger.info("There is no active feature server.") - - -@cli.command("plan", cls=NoOptionDefaultFormat) -@click.option( - "--skip-source-validation", - is_flag=True, - help="Don't validate the data sources by checking for that the tables exist.", -) -@click.pass_context -def plan_command(ctx: click.Context, skip_source_validation: bool): - """ - Create or update a feature store deployment - """ - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - repo_config = load_repo_config(repo, fs_yaml_file) - try: - plan(repo_config, repo, skip_source_validation) - except FeastProviderLoginError as e: - print(str(e)) - - -@cli.command("apply", cls=NoOptionDefaultFormat) -@click.option( - "--skip-source-validation", - 
is_flag=True, - help="Don't validate the data sources by checking for that the tables exist.", -) -@click.pass_context -def apply_total_command(ctx: click.Context, skip_source_validation: bool): - """ - Create or update a feature store deployment - """ - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - - repo_config = load_repo_config(repo, fs_yaml_file) - try: - apply_total(repo_config, repo, skip_source_validation) - except FeastProviderLoginError as e: - print(str(e)) - - -@cli.command("teardown", cls=NoOptionDefaultFormat) -@click.pass_context -def teardown_command(ctx: click.Context): - """ - Tear down deployed feature store infrastructure - """ - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - repo_config = load_repo_config(repo, fs_yaml_file) - - teardown(repo_config, repo) - - -@cli.command("registry-dump") -@click.pass_context -def registry_dump_command(ctx: click.Context): - """ - Print contents of the metadata registry - """ - repo = ctx.obj["CHDIR"] - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - repo_config = load_repo_config(repo, fs_yaml_file) - - click.echo(registry_dump(repo_config, repo_path=repo)) - - -@cli.command("materialize") -@click.argument("start_ts") -@click.argument("end_ts") -@click.option( - "--views", - "-v", - help="Feature views to materialize", - multiple=True, -) -@click.pass_context -def materialize_command( - ctx: click.Context, start_ts: str, end_ts: str, views: List[str] -): - """ - Run a (non-incremental) materialization job to ingest data into the online store. Feast - will read all data between START_TS and END_TS from the offline store and write it to the - online store. If you don't specify feature view names using --views, all registered Feature - Views will be materialized. - - START_TS and END_TS should be in ISO 8601 format, e.g. 
'2021-07-16T19:20:01' - """ - store = create_feature_store(ctx) - - store.materialize( - feature_views=None if not views else views, - start_date=utils.make_tzaware(parser.parse(start_ts)), - end_date=utils.make_tzaware(parser.parse(end_ts)), - ) - - -@cli.command("materialize-incremental") -@click.argument("end_ts") -@click.option( - "--views", - "-v", - help="Feature views to incrementally materialize", - multiple=True, -) -@click.pass_context -def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List[str]): - """ - Run an incremental materialization job to ingest new data into the online store. Feast will read - all data from the previously ingested point to END_TS from the offline store and write it to the - online store. If you don't specify feature view names using --views, all registered Feature - Views will be incrementally materialized. - - END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01' - """ - store = create_feature_store(ctx) - store.materialize_incremental( - feature_views=None if not views else views, - end_date=utils.make_tzaware(datetime.fromisoformat(end_ts)), - ) - - -@cli.command("init") -@click.argument("PROJECT_DIRECTORY", required=False) -@click.option( - "--minimal", "-m", is_flag=True, help="Create an empty project repository" -) -@click.option( - "--template", - "-t", - type=click.Choice( - [ - "local", - "gcp", - "aws", - "snowflake", - "spark", - "postgres", - "hbase", - "cassandra", - "hazelcast", - "ikv", - "couchbase", - "milvus", - ], - case_sensitive=False, - ), - help="Specify a template for the created project", - default="local", -) -def init_command(project_directory, minimal: bool, template: str): - """Create a new Feast repository""" - if not project_directory: - project_directory = generate_project_name() - - if minimal: - template = "minimal" - - init_repo(project_directory, template) - - -@cli.command("listen") -@click.option( - "--address", - "-a", - type=click.STRING, - 
default="localhost:50051", - show_default=True, - help="Address of the gRPC server", -) -@click.option( - "--max_workers", - "-w", - type=click.INT, - default=10, - show_default=False, - help="The maximum number of threads that can be used to execute the gRPC calls", -) -@click.option( - "--registry_ttl_sec", - "-r", - help="Number of seconds after which the registry is refreshed", - type=click.INT, - default=5, - show_default=True, -) -@click.pass_context -def listen_command( - ctx: click.Context, - address: str, - max_workers: int, - registry_ttl_sec: int, -): - """Start a gRPC feature server to ingest streaming features on given address""" - from feast.infra.contrib.grpc_server import get_grpc_server - - store = create_feature_store(ctx) - server = get_grpc_server(address, store, max_workers, registry_ttl_sec) - server.start() - server.wait_for_termination() - - -@cli.command("validate") -@click.option( - "--feature-service", - "-f", - help="Specify a feature service name", -) -@click.option( - "--reference", - "-r", - help="Specify a validation reference name", -) -@click.option( - "--no-profile-cache", - is_flag=True, - help="Do not store cached profile in registry", -) -@click.argument("start_ts") -@click.argument("end_ts") -@click.pass_context -def validate( - ctx: click.Context, - feature_service: str, - reference: str, - start_ts: str, - end_ts: str, - no_profile_cache, -): - """ - Perform validation of logged features (produced by a given feature service) against provided reference. - - START_TS and END_TS should be in ISO 8601 format, e.g. 
'2021-07-16T19:20:01' - """ - store = create_feature_store(ctx) - - _feature_service = store.get_feature_service(name=feature_service) - _reference = store.get_validation_reference(reference) - - result = store.validate_logged_features( - source=_feature_service, - reference=_reference, - start=maybe_local_tz(datetime.fromisoformat(start_ts)), - end=maybe_local_tz(datetime.fromisoformat(end_ts)), - throw_exception=False, - cache_profile=not no_profile_cache, - ) - - if not result: - print(f"{Style.BRIGHT + Fore.GREEN}Validation successful!{Style.RESET_ALL}") - return - - errors = [e.to_dict() for e in result.report.errors] - formatted_json = json.dumps(errors, indent=4) - colorful_json = highlight( - formatted_json, lexers.JsonLexer(), formatters.TerminalFormatter() - ) - print(f"{Style.BRIGHT + Fore.RED}Validation failed!{Style.RESET_ALL}") - print(colorful_json) - exit(1) - - -cli.add_command(data_sources_cmd) -cli.add_command(entities_cmd) -cli.add_command(feature_services_cmd) -cli.add_command(feature_views_cmd) -cli.add_command(features_cmd) -cli.add_command(get_historical_features) -cli.add_command(get_online_features) -cli.add_command(on_demand_feature_views_cmd) -cli.add_command(feast_permissions_cmd) -cli.add_command(projects_cmd) -cli.add_command(saved_datasets_cmd) -cli.add_command(stream_feature_views_cmd) -cli.add_command(validation_references_cmd) -cli.add_command(ui) -cli.add_command(serve_command) -cli.add_command(serve_offline_command) -cli.add_command(serve_registry_command) -cli.add_command(serve_transformations_command) - -if __name__ == "__main__": - cli() +# Copyright 2019 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import json +import logging +from datetime import datetime +from importlib.metadata import version as importlib_version +from pathlib import Path +from typing import List, Optional + +import click +import yaml +from colorama import Fore, Style +from dateutil import parser +from pygments import formatters, highlight, lexers + +from feast import utils +from feast.cli.data_sources import data_sources_cmd +from feast.cli.entities import entities_cmd +from feast.cli.feature_services import feature_services_cmd +from feast.cli.feature_views import feature_views_cmd +from feast.cli.features import ( + features_cmd, + get_historical_features, + get_online_features, +) +from feast.cli.on_demand_feature_views import on_demand_feature_views_cmd +from feast.cli.permissions import feast_permissions_cmd +from feast.cli.projects import projects_cmd +from feast.cli.saved_datasets import saved_datasets_cmd +from feast.cli.serve import ( + serve_command, + serve_offline_command, + serve_registry_command, + serve_transformations_command, +) +from feast.cli.stream_feature_views import stream_feature_views_cmd +from feast.cli.ui import ui +from feast.cli.validation_references import validation_references_cmd +from feast.errors import FeastProviderLoginError +from feast.repo_config import load_repo_config +from feast.repo_operations import ( + apply_total, + cli_check_repo, + create_feature_store, + generate_project_name, + init_repo, + plan, + registry_dump, + teardown, +) +from feast.utils import maybe_local_tz + +_logger = logging.getLogger(__name__) + + +class 
NoOptionDefaultFormat(click.Command): + def format_options(self, ctx: click.Context, formatter: click.HelpFormatter): + """Writes all the options into the formatter if they exist.""" + opts = [] + for param in self.get_params(ctx): + rv = param.get_help_record(ctx) + if rv is not None: + opts.append(rv) + if opts: + with formatter.section("Options(No current command options)"): + formatter.write_dl(opts) + + +@click.group() +@click.option( + "--chdir", + "-c", + help="Switch to a different feature repository directory before executing the given subcommand.", +) +@click.option( + "--log-level", + default="warning", + help="The logging level. One of DEBUG, INFO, WARNING, ERROR, and CRITICAL (case-insensitive).", +) +@click.option( + "--feature-store-yaml", + "-f", + help="Override the directory where the CLI should look for the feature_store.yaml file.", +) +@click.pass_context +def cli( + ctx: click.Context, + chdir: Optional[str], + log_level: str, + feature_store_yaml: Optional[str], +): + """ + Feast CLI + + For more information, see our public docs at https://docs.feast.dev/ + """ + ctx.ensure_object(dict) + ctx.obj["CHDIR"] = Path.cwd() if chdir is None else Path(chdir).absolute() + ctx.obj["FS_YAML_FILE"] = ( + Path(feature_store_yaml).absolute() + if feature_store_yaml + else utils.get_default_yaml_file_path(ctx.obj["CHDIR"]) + ) + try: + level = getattr(logging, log_level.upper()) + logging.basicConfig( + format="%(asctime)s %(name)s %(levelname)s: %(message)s", + datefmt="%m/%d/%Y %I:%M:%S %p", + level=level, + ) + # Override the logging level for already created loggers (due to loggers being created at the import time) + # Note, that format & datefmt does not need to be set, because by default child loggers don't override them + + # Also note, that mypy complains that logging.root doesn't have "manager" because of the way it's written. + # So we have to put a type ignore hint for mypy. 
+ for logger_name in logging.root.manager.loggerDict: # type: ignore + if "feast" in logger_name: + logger = logging.getLogger(logger_name) + logger.setLevel(level) + except Exception as e: + raise e + pass + + +@cli.command() +def version(): + """ + Display Feast SDK version + """ + print( + f'{Style.BRIGHT + Fore.BLUE}Feast SDK Version: {Style.BRIGHT + Fore.GREEN}"{importlib_version("feast")}"' + ) + + +@cli.command() +@click.argument("object_id") +@click.pass_context +def delete(ctx: click.Context, object_id: str): + """ + Delete Feast Object + """ + repo = ctx.obj["CHDIR"] + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + store = create_feature_store(ctx) + + e = None + object_type = None + + # Order matters if names can overlap between types, + # though typically they shouldn't in a well-structured feature store. + object_getters_and_types = [ + (store.get_entity, "Entity"), + (store.get_feature_view, "FeatureView"), + (store.get_feature_service, "FeatureService"), + (store.get_data_source, "DataSource"), + (store.get_saved_dataset, "SavedDataset"), + (store.get_validation_reference, "ValidationReference"), + (store.get_stream_feature_view, "StreamFeatureView"), + (store.get_on_demand_feature_view, "OnDemandFeatureView"), + # Add other get_* methods here if needed + ] + + for getter, obj_type_str in object_getters_and_types: + try: + potential_e = getter(object_id) # type: ignore[operator] + if potential_e: + e = potential_e + object_type = obj_type_str + break + except Exception: + pass + + if isinstance(e, list): + e = e[0] + if e: + store.apply([e], objects_to_delete=[e], partial=False) + print( + f"{Style.BRIGHT + Fore.RED}Deleted {Style.BRIGHT + Fore.GREEN}{object_type} {Fore.YELLOW}{object_id} from {Fore.GREEN}{store.project}.{Style.RESET_ALL}" + ) + else: + print( + f"{Style.BRIGHT + Fore.GREEN}Object not found. 
Deletion skipped.{Style.RESET_ALL}" + ) + + +@cli.command() +@click.pass_context +def configuration(ctx: click.Context): + """ + Display Feast configuration + """ + repo = ctx.obj["CHDIR"] + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + repo_config = load_repo_config(repo, fs_yaml_file) + if repo_config: + config_dict = repo_config.model_dump(by_alias=True, exclude_unset=True) + config_dict.pop("repo_path", None) + print(yaml.dump(config_dict, default_flow_style=False, sort_keys=False)) + else: + print("No configuration found.") + + +@cli.command() +@click.pass_context +def endpoint(ctx: click.Context): + """ + Display feature server endpoints + """ + store = create_feature_store(ctx) + endpoint = store.get_feature_server_endpoint() + if endpoint is not None: + _logger.info( + f"Feature server endpoint: {Style.BRIGHT + Fore.GREEN}{endpoint}{Style.RESET_ALL}" + ) + else: + _logger.info("There is no active feature server.") + + +@cli.command("plan", cls=NoOptionDefaultFormat) +@click.option( + "--skip-source-validation", + is_flag=True, + help="Don't validate the data sources by checking for that the tables exist.", +) +@click.pass_context +def plan_command(ctx: click.Context, skip_source_validation: bool): + """ + Create or update a feature store deployment + """ + repo = ctx.obj["CHDIR"] + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + repo_config = load_repo_config(repo, fs_yaml_file) + try: + plan(repo_config, repo, skip_source_validation) + except FeastProviderLoginError as e: + print(str(e)) + + +@cli.command("apply", cls=NoOptionDefaultFormat) +@click.option( + "--skip-source-validation", + is_flag=True, + help="Don't validate the data sources by checking for that the tables exist.", +) +@click.pass_context +def apply_total_command(ctx: click.Context, skip_source_validation: bool): + """ + Create or update a feature store deployment + """ + repo = ctx.obj["CHDIR"] + fs_yaml_file = 
ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + + repo_config = load_repo_config(repo, fs_yaml_file) + try: + apply_total(repo_config, repo, skip_source_validation) + except FeastProviderLoginError as e: + print(str(e)) + + +@cli.command("teardown", cls=NoOptionDefaultFormat) +@click.pass_context +def teardown_command(ctx: click.Context): + """ + Tear down deployed feature store infrastructure + """ + repo = ctx.obj["CHDIR"] + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + repo_config = load_repo_config(repo, fs_yaml_file) + + teardown(repo_config, repo) + + +@cli.command("registry-dump") +@click.pass_context +def registry_dump_command(ctx: click.Context): + """ + Print contents of the metadata registry + """ + repo = ctx.obj["CHDIR"] + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + repo_config = load_repo_config(repo, fs_yaml_file) + + click.echo(registry_dump(repo_config, repo_path=repo)) + + +@cli.command("materialize") +@click.argument("start_ts") +@click.argument("end_ts") +@click.option( + "--views", + "-v", + help="Feature views to materialize", + multiple=True, +) +@click.pass_context +def materialize_command( + ctx: click.Context, start_ts: str, end_ts: str, views: List[str] +): + """ + Run a (non-incremental) materialization job to ingest data into the online store. Feast + will read all data between START_TS and END_TS from the offline store and write it to the + online store. If you don't specify feature view names using --views, all registered Feature + Views will be materialized. + + START_TS and END_TS should be in ISO 8601 format, e.g. 
'2021-07-16T19:20:01' + """ + store = create_feature_store(ctx) + + store.materialize( + feature_views=None if not views else views, + start_date=utils.make_tzaware(parser.parse(start_ts)), + end_date=utils.make_tzaware(parser.parse(end_ts)), + ) + + +@cli.command("materialize-incremental") +@click.argument("end_ts") +@click.option( + "--views", + "-v", + help="Feature views to incrementally materialize", + multiple=True, +) +@click.pass_context +def materialize_incremental_command(ctx: click.Context, end_ts: str, views: List[str]): + """ + Run an incremental materialization job to ingest new data into the online store. Feast will read + all data from the previously ingested point to END_TS from the offline store and write it to the + online store. If you don't specify feature view names using --views, all registered Feature + Views will be incrementally materialized. + + END_TS should be in ISO 8601 format, e.g. '2021-07-16T19:20:01' + """ + store = create_feature_store(ctx) + store.materialize_incremental( + feature_views=None if not views else views, + end_date=utils.make_tzaware(datetime.fromisoformat(end_ts)), + ) + + +@cli.command("init") +@click.argument("PROJECT_DIRECTORY", required=False) +@click.option( + "--minimal", "-m", is_flag=True, help="Create an empty project repository" +) +@click.option( + "--template", + "-t", + type=click.Choice( + [ + "local", + "gcp", + "aws", + "snowflake", + "spark", + "postgres", + "hbase", + "cassandra", + "hazelcast", + "ikv", + "couchbase", + "milvus", + ], + case_sensitive=False, + ), + help="Specify a template for the created project", + default="local", +) +def init_command(project_directory, minimal: bool, template: str): + """Create a new Feast repository""" + if not project_directory: + project_directory = generate_project_name() + + if minimal: + template = "minimal" + + init_repo(project_directory, template) + + +@cli.command("listen") +@click.option( + "--address", + "-a", + type=click.STRING, + 
default="localhost:50051", + show_default=True, + help="Address of the gRPC server", +) +@click.option( + "--max_workers", + "-w", + type=click.INT, + default=10, + show_default=False, + help="The maximum number of threads that can be used to execute the gRPC calls", +) +@click.option( + "--registry_ttl_sec", + "-r", + help="Number of seconds after which the registry is refreshed", + type=click.INT, + default=5, + show_default=True, +) +@click.pass_context +def listen_command( + ctx: click.Context, + address: str, + max_workers: int, + registry_ttl_sec: int, +): + """Start a gRPC feature server to ingest streaming features on given address""" + from feast.infra.contrib.grpc_server import get_grpc_server + + store = create_feature_store(ctx) + server = get_grpc_server(address, store, max_workers, registry_ttl_sec) + server.start() + server.wait_for_termination() + + +@cli.command("validate") +@click.option( + "--feature-service", + "-f", + help="Specify a feature service name", +) +@click.option( + "--reference", + "-r", + help="Specify a validation reference name", +) +@click.option( + "--no-profile-cache", + is_flag=True, + help="Do not store cached profile in registry", +) +@click.argument("start_ts") +@click.argument("end_ts") +@click.pass_context +def validate( + ctx: click.Context, + feature_service: str, + reference: str, + start_ts: str, + end_ts: str, + no_profile_cache, +): + """ + Perform validation of logged features (produced by a given feature service) against provided reference. + + START_TS and END_TS should be in ISO 8601 format, e.g. 
'2021-07-16T19:20:01' + """ + store = create_feature_store(ctx) + + _feature_service = store.get_feature_service(name=feature_service) + _reference = store.get_validation_reference(reference) + + result = store.validate_logged_features( + source=_feature_service, + reference=_reference, + start=maybe_local_tz(datetime.fromisoformat(start_ts)), + end=maybe_local_tz(datetime.fromisoformat(end_ts)), + throw_exception=False, + cache_profile=not no_profile_cache, + ) + + if not result: + print(f"{Style.BRIGHT + Fore.GREEN}Validation successful!{Style.RESET_ALL}") + return + + errors = [e.to_dict() for e in result.report.errors] + formatted_json = json.dumps(errors, indent=4) + colorful_json = highlight( + formatted_json, lexers.JsonLexer(), formatters.TerminalFormatter() + ) + print(f"{Style.BRIGHT + Fore.RED}Validation failed!{Style.RESET_ALL}") + print(colorful_json) + exit(1) + + +cli.add_command(data_sources_cmd) +cli.add_command(entities_cmd) +cli.add_command(feature_services_cmd) +cli.add_command(feature_views_cmd) +cli.add_command(features_cmd) +cli.add_command(get_historical_features) +cli.add_command(get_online_features) +cli.add_command(on_demand_feature_views_cmd) +cli.add_command(feast_permissions_cmd) +cli.add_command(projects_cmd) +cli.add_command(saved_datasets_cmd) +cli.add_command(stream_feature_views_cmd) +cli.add_command(validation_references_cmd) +cli.add_command(ui) +cli.add_command(serve_command) +cli.add_command(serve_offline_command) +cli.add_command(serve_registry_command) +cli.add_command(serve_transformations_command) + +if __name__ == "__main__": + cli() diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index a3bf52fb10e..c958f0af889 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -1,516 +1,516 @@ -import base64 -import importlib -import json -import logging -import os -import random -import re -import sys -import tempfile -from importlib.abc import Loader 
-from importlib.machinery import ModuleSpec -from pathlib import Path -from typing import List, Optional, Set, Union - -import click -from click.exceptions import BadParameter - -from feast import PushSource -from feast.batch_feature_view import BatchFeatureView -from feast.constants import FEATURE_STORE_YAML_ENV_NAME -from feast.data_source import DataSource, KafkaSource, KinesisSource -from feast.diff.registry_diff import extract_objects_for_keep_delete_update_add -from feast.entity import Entity -from feast.feature_service import FeatureService -from feast.feature_store import FeatureStore -from feast.feature_view import DUMMY_ENTITY, FeatureView -from feast.file_utils import replace_str_in_file -from feast.infra.registry.base_registry import BaseRegistry -from feast.infra.registry.registry import FEAST_OBJECT_TYPES, FeastObjectType, Registry -from feast.names import adjectives, animals -from feast.on_demand_feature_view import OnDemandFeatureView -from feast.permissions.permission import Permission -from feast.project import Project -from feast.repo_config import RepoConfig -from feast.repo_contents import RepoContents -from feast.stream_feature_view import StreamFeatureView - -logger = logging.getLogger(__name__) - - -def py_path_to_module(path: Path) -> str: - return ( - str(path.relative_to(os.getcwd()))[: -len(".py")] - .replace("./", "") - .replace("/", ".") - .replace("\\", ".") - ) - - -def read_feastignore(repo_root: Path) -> List[str]: - """Read .feastignore in the repo root directory (if exists) and return the list of user-defined ignore paths""" - feast_ignore = repo_root / ".feastignore" - if not feast_ignore.is_file(): - return [] - lines = feast_ignore.read_text().strip().split("\n") - ignore_paths = [] - for line in lines: - # Remove everything after the first occurance of "#" symbol (comments) - if line.find("#") >= 0: - line = line[: line.find("#")] - # Strip leading or ending whitespaces - line = line.strip() - # Add this processed line to 
ignore_paths if it's not empty - if len(line) > 0: - ignore_paths.append(line) - return ignore_paths - - -def get_ignore_files(repo_root: Path, ignore_paths: List[str]) -> Set[Path]: - """Get all ignore files that match any of the user-defined ignore paths""" - ignore_files = set() - for ignore_path in ignore_paths: - # ignore_path may contains matchers (* or **). Use glob() to match user-defined path to actual paths - for matched_path in repo_root.glob(ignore_path): - if matched_path.is_file(): - # If the matched path is a file, add that to ignore_files set - ignore_files.add(matched_path.resolve()) - else: - # Otherwise, list all Python files in that directory and add all of them to ignore_files set - ignore_files |= { - sub_path.resolve() - for sub_path in matched_path.glob("**/*.py") - if sub_path.is_file() - } - return ignore_files - - -def get_repo_files(repo_root: Path) -> List[Path]: - """Get the list of all repo files, ignoring undesired files & directories specified in .feastignore""" - # Read ignore paths from .feastignore and create a set of all files that match any of these paths - ignore_paths = read_feastignore(repo_root) - ignore_files = get_ignore_files(repo_root, ignore_paths) - ignore_paths += [ - ".git", - ".feastignore", - ".venv", - ".pytest_cache", - "__pycache__", - ".ipynb_checkpoints", - ] - - # List all Python files in the root directory (recursively) - repo_files = { - p.resolve() - for p in repo_root.glob("**/*.py") - if p.is_file() and "__init__.py" != p.name - } - # Ignore all files that match any of the ignore paths in .feastignore - repo_files -= ignore_files - - # Sort repo_files to read them in the same order every time - return sorted(repo_files) - - -def parse_repo(repo_root: Path) -> RepoContents: - """ - Collects unique Feast object definitions from the given feature repo. - - Specifically, if an object foo has already been added, bar will still be added if - (bar == foo), but not if (bar is foo). 
This ensures that import statements will - not result in duplicates, but defining two equal objects will. - """ - res = RepoContents( - projects=[], - data_sources=[], - entities=[], - feature_views=[], - feature_services=[], - on_demand_feature_views=[], - stream_feature_views=[], - permissions=[], - ) - - for repo_file in get_repo_files(repo_root): - module_path = py_path_to_module(repo_file) - module = importlib.import_module(module_path) - - for attr_name in dir(module): - obj = getattr(module, attr_name) - - if isinstance(obj, DataSource) and not any( - (obj is ds) for ds in res.data_sources - ): - res.data_sources.append(obj) - - # Handle batch sources defined within stream sources. - if ( - isinstance(obj, PushSource) - or isinstance(obj, KafkaSource) - or isinstance(obj, KinesisSource) - ): - batch_source = obj.batch_source - - if batch_source and not any( - (batch_source is ds) for ds in res.data_sources - ): - res.data_sources.append(batch_source) - if ( - isinstance(obj, FeatureView) - and not any((obj is fv) for fv in res.feature_views) - and not isinstance(obj, StreamFeatureView) - and not isinstance(obj, BatchFeatureView) - ): - res.feature_views.append(obj) - - # Handle batch sources defined with feature views. - batch_source = obj.batch_source - assert batch_source - if not any((batch_source is ds) for ds in res.data_sources): - res.data_sources.append(batch_source) - - # Handle stream sources defined with feature views. - if obj.stream_source: - stream_source = obj.stream_source - if not any((stream_source is ds) for ds in res.data_sources): - res.data_sources.append(stream_source) - elif isinstance(obj, StreamFeatureView) and not any( - (obj is sfv) for sfv in res.stream_feature_views - ): - res.stream_feature_views.append(obj) - - # Handle batch sources defined with feature views. 
- batch_source = obj.batch_source - if not any((batch_source is ds) for ds in res.data_sources): - res.data_sources.append(batch_source) - - # Handle stream sources defined with feature views. - assert obj.stream_source - stream_source = obj.stream_source - if not any((stream_source is ds) for ds in res.data_sources): - res.data_sources.append(stream_source) - elif isinstance(obj, BatchFeatureView) and not any( - (obj is bfv) for bfv in res.feature_views - ): - res.feature_views.append(obj) - - # Handle batch sources defined with feature views. - batch_source = obj.batch_source - if not any((batch_source is ds) for ds in res.data_sources): - res.data_sources.append(batch_source) - elif isinstance(obj, Entity) and not any( - (obj is entity) for entity in res.entities - ): - res.entities.append(obj) - elif isinstance(obj, FeatureService) and not any( - (obj is fs) for fs in res.feature_services - ): - res.feature_services.append(obj) - elif isinstance(obj, OnDemandFeatureView) and not any( - (obj is odfv) for odfv in res.on_demand_feature_views - ): - res.on_demand_feature_views.append(obj) - elif isinstance(obj, Permission) and not any( - (obj is p) for p in res.permissions - ): - res.permissions.append(obj) - elif isinstance(obj, Project) and not any((obj is p) for p in res.projects): - res.projects.append(obj) - - res.entities.append(DUMMY_ENTITY) - return res - - -def plan(repo_config: RepoConfig, repo_path: Path, skip_source_validation: bool): - os.chdir(repo_path) - repo = _get_repo_contents(repo_path, repo_config.project) - for project in repo.projects: - repo_config.project = project.name - store, registry = _get_store_and_registry(repo_config) - # TODO: When we support multiple projects in a single repo, we should filter repo contents by project - if not skip_source_validation: - provider = store._get_provider() - data_sources = [t.batch_source for t in repo.feature_views] - # Make sure the data source used by this feature view is supported by Feast - for 
data_source in data_sources: - provider.validate_data_source(store.config, data_source) - - registry_diff, infra_diff, _ = store.plan(repo) - click.echo(registry_diff.to_string()) - click.echo(infra_diff.to_string()) - - -def _get_repo_contents(repo_path, project_name: Optional[str] = None): - sys.dont_write_bytecode = True - repo = parse_repo(repo_path) - - if len(repo.projects) < 1: - if project_name: - print( - f"No project found in the repository. Using project name {project_name} defined in feature_store.yaml" - ) - repo.projects.append(Project(name=project_name)) - else: - print( - "No project found in the repository. Either define Project in repository or define a project in feature_store.yaml" - ) - sys.exit(1) - elif len(repo.projects) == 1: - if repo.projects[0].name != project_name: - print( - "Project object name should match with the project name defined in feature_store.yaml" - ) - sys.exit(1) - else: - print( - "Multiple projects found in the repository. Currently no support for multiple projects" - ) - sys.exit(1) - - return repo - - -def _get_store_and_registry(repo_config): - store = FeatureStore(config=repo_config) - registry = store.registry - return store, registry - - -def extract_objects_for_apply_delete(project, registry, repo): - # TODO(achals): This code path should be refactored to handle added & kept entities separately. 
- ( - _, - objs_to_delete, - objs_to_update, - objs_to_add, - ) = extract_objects_for_keep_delete_update_add(registry, project, repo) - - all_to_apply: List[ - Union[ - Entity, - FeatureView, - OnDemandFeatureView, - StreamFeatureView, - FeatureService, - ] - ] = [] - for object_type in FEAST_OBJECT_TYPES: - to_apply = set(objs_to_add[object_type]).union(objs_to_update[object_type]) - all_to_apply.extend(to_apply) - - all_to_delete: List[ - Union[ - Entity, - FeatureView, - OnDemandFeatureView, - StreamFeatureView, - FeatureService, - ] - ] = [] - for object_type in FEAST_OBJECT_TYPES: - all_to_delete.extend(objs_to_delete[object_type]) - - return ( - all_to_apply, - all_to_delete, - set(objs_to_add[FeastObjectType.FEATURE_VIEW]).union( - set(objs_to_update[FeastObjectType.FEATURE_VIEW]) - ), - objs_to_delete[FeastObjectType.FEATURE_VIEW], - ) - - -def apply_total_with_repo_instance( - store: FeatureStore, - project_name: str, - registry: BaseRegistry, - repo: RepoContents, - skip_source_validation: bool, -): - if not skip_source_validation: - provider = store._get_provider() - data_sources = [t.batch_source for t in repo.feature_views] - # Make sure the data source used by this feature view is supported by Feast - for data_source in data_sources: - provider.validate_data_source(store.config, data_source) - - # For each object in the registry, determine whether it should be kept or deleted. 
- ( - all_to_apply, - all_to_delete, - views_to_keep, - views_to_delete, - ) = extract_objects_for_apply_delete(project_name, registry, repo) - - if store._should_use_plan(): - registry_diff, infra_diff, new_infra = store.plan(repo) - click.echo(registry_diff.to_string()) - - store._apply_diffs(registry_diff, infra_diff, new_infra) - click.echo(infra_diff.to_string()) - else: - store.apply(all_to_apply, objects_to_delete=all_to_delete, partial=False) - log_infra_changes(views_to_keep, views_to_delete) - - -def log_infra_changes( - views_to_keep: Set[FeatureView], views_to_delete: Set[FeatureView] -): - from colorama import Fore, Style - - for view in views_to_keep: - click.echo( - f"Deploying infrastructure for {Style.BRIGHT + Fore.GREEN}{view.name}{Style.RESET_ALL}" - ) - for view in views_to_delete: - click.echo( - f"Removing infrastructure for {Style.BRIGHT + Fore.RED}{view.name}{Style.RESET_ALL}" - ) - - -def create_feature_store( - ctx: click.Context, -) -> FeatureStore: - repo = ctx.obj["CHDIR"] - # If we received a base64 encoded version of feature_store.yaml, use that - config_base64 = os.getenv(FEATURE_STORE_YAML_ENV_NAME) - if config_base64: - print("Received base64 encoded feature_store.yaml") - config_bytes = base64.b64decode(config_base64) - # Create a new unique directory for writing feature_store.yaml - repo_path = Path(tempfile.mkdtemp()) - with open(repo_path / "feature_store.yaml", "wb") as f: - f.write(config_bytes) - return FeatureStore(repo_path=str(repo_path.resolve())) - else: - fs_yaml_file = ctx.obj["FS_YAML_FILE"] - cli_check_repo(repo, fs_yaml_file) - return FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file) - - -def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation: bool): - os.chdir(repo_path) - repo = _get_repo_contents(repo_path, repo_config.project) - for project in repo.projects: - repo_config.project = project.name - store, registry = _get_store_and_registry(repo_config) - if not 
is_valid_name(project.name): - print( - f"{project.name} is not valid. Project name should only have " - f"alphanumerical values and underscores but not start with an underscore." - ) - sys.exit(1) - # TODO: When we support multiple projects in a single repo, we should filter repo contents by project. Currently there is no way to associate Feast objects to project. - print(f"Applying changes for project {project.name}") - apply_total_with_repo_instance( - store, project.name, registry, repo, skip_source_validation - ) - - -def teardown(repo_config: RepoConfig, repo_path: Optional[str]): - # Cannot pass in both repo_path and repo_config to FeatureStore. - feature_store = FeatureStore(repo_path=repo_path, config=repo_config) - feature_store.teardown() - - -def registry_dump(repo_config: RepoConfig, repo_path: Path) -> str: - """For debugging only: output contents of the metadata registry""" - registry_config = repo_config.registry - project = repo_config.project - registry = Registry( - project, - registry_config=registry_config, - repo_path=repo_path, - auth_config=repo_config.auth_config, - ) - registry_dict = registry.to_dict(project=project) - return json.dumps(registry_dict, indent=2, sort_keys=True) - - -def cli_check_repo(repo_path: Path, fs_yaml_file: Path): - sys.path.append(str(repo_path)) - if not fs_yaml_file.exists(): - print( - f"Can't find feature repo configuration file at {fs_yaml_file}. " - "Make sure you're running feast from an initialized feast repository." 
- ) - sys.exit(1) - - -def init_repo(repo_name: str, template: str): - import os - from pathlib import Path - from shutil import copytree - - from colorama import Fore, Style - - if not is_valid_name(repo_name): - raise BadParameter( - message="Name should be alphanumeric values and underscores but not start with an underscore", - param_hint="PROJECT_DIRECTORY", - ) - repo_path = Path(os.path.join(Path.cwd(), repo_name)) - repo_path.mkdir(exist_ok=True) - repo_config_path = repo_path / "feature_store.yaml" - - if repo_config_path.exists(): - new_directory = os.path.relpath(repo_path, os.getcwd()) - - print( - f"The directory {Style.BRIGHT + Fore.GREEN}{new_directory}{Style.RESET_ALL} contains an existing feature " - f"store repository that may cause a conflict" - ) - print() - sys.exit(1) - - # Copy template directory - template_path = str(Path(Path(__file__).parent / "templates" / template).absolute()) - if not os.path.exists(template_path): - raise IOError(f"Could not find template {template}") - copytree(template_path, str(repo_path), dirs_exist_ok=True) - - # Rename gitignore files back to .gitignore - for gitignore_path in repo_path.rglob("gitignore"): - gitignore_path.rename(gitignore_path.with_name(".gitignore")) - - # Seed the repository - bootstrap_path = repo_path / "bootstrap.py" - if os.path.exists(bootstrap_path): - import importlib.util - - spec = importlib.util.spec_from_file_location("bootstrap", str(bootstrap_path)) - assert isinstance(spec, ModuleSpec) - bootstrap = importlib.util.module_from_spec(spec) - assert isinstance(spec.loader, Loader) - spec.loader.exec_module(bootstrap) - bootstrap.bootstrap() # type: ignore - os.remove(bootstrap_path) - - # Template the feature_store.yaml file - feature_store_yaml_path = repo_path / "feature_repo" / "feature_store.yaml" - replace_str_in_file( - feature_store_yaml_path, "project: my_project", f"project: {repo_name}" - ) - - # Remove the __pycache__ folder if it exists - import shutil - - 
shutil.rmtree(repo_path / "__pycache__", ignore_errors=True) - - import click - - click.echo() - click.echo( - f"Creating a new Feast repository in {Style.BRIGHT + Fore.GREEN}{repo_path}{Style.RESET_ALL}." - ) - click.echo() - - -def is_valid_name(name: str) -> bool: - """A name should be alphanumeric values and underscores but not start with an underscore""" - return not name.startswith("_") and re.compile(r"\W+").search(name) is None - - -def generate_project_name() -> str: - """Generates a unique project name""" - return f"{random.choice(adjectives)}_{random.choice(animals)}" +import base64 +import importlib +import json +import logging +import os +import random +import re +import sys +import tempfile +from importlib.abc import Loader +from importlib.machinery import ModuleSpec +from pathlib import Path +from typing import List, Optional, Set, Union + +import click +from click.exceptions import BadParameter + +from feast import PushSource +from feast.batch_feature_view import BatchFeatureView +from feast.constants import FEATURE_STORE_YAML_ENV_NAME +from feast.data_source import DataSource, KafkaSource, KinesisSource +from feast.diff.registry_diff import extract_objects_for_keep_delete_update_add +from feast.entity import Entity +from feast.feature_service import FeatureService +from feast.feature_store import FeatureStore +from feast.feature_view import DUMMY_ENTITY, FeatureView +from feast.file_utils import replace_str_in_file +from feast.infra.registry.base_registry import BaseRegistry +from feast.infra.registry.registry import FEAST_OBJECT_TYPES, FeastObjectType, Registry +from feast.names import adjectives, animals +from feast.on_demand_feature_view import OnDemandFeatureView +from feast.permissions.permission import Permission +from feast.project import Project +from feast.repo_config import RepoConfig +from feast.repo_contents import RepoContents +from feast.stream_feature_view import StreamFeatureView + +logger = logging.getLogger(__name__) + + +def 
def py_path_to_module(path: Path) -> str:
    """Convert a .py file path into a dotted Python module path.

    The path is made relative to the current working directory, the ``.py``
    suffix is dropped, and path separators (both ``/`` and ``\\``) become dots,
    e.g. ``./foo/bar.py`` -> ``foo.bar``. Assumes *path* lives under
    ``os.getcwd()`` (raises ``ValueError`` otherwise, via ``relative_to``).
    """
    return (
        str(path.relative_to(os.getcwd()))[: -len(".py")]
        .replace("./", "")
        .replace("/", ".")
        .replace("\\", ".")
    )


def read_feastignore(repo_root: Path) -> List[str]:
    """Read .feastignore in the repo root directory (if exists) and return the list of user-defined ignore paths"""
    feast_ignore = repo_root / ".feastignore"
    if not feast_ignore.is_file():
        # No .feastignore file: nothing user-defined to ignore.
        return []
    lines = feast_ignore.read_text().strip().split("\n")
    ignore_paths = []
    for line in lines:
        # Remove everything after the first occurrence of "#" symbol (comments)
        if line.find("#") >= 0:
            line = line[: line.find("#")]
        # Strip leading or ending whitespaces
        line = line.strip()
        # Add this processed line to ignore_paths if it's not empty
        if len(line) > 0:
            ignore_paths.append(line)
    return ignore_paths


def get_ignore_files(repo_root: Path, ignore_paths: List[str]) -> Set[Path]:
    """Get all ignore files that match any of the user-defined ignore paths"""
    ignore_files = set()
    for ignore_path in ignore_paths:
        # ignore_path may contain matchers (* or **). Use glob() to match user-defined path to actual paths
        for matched_path in repo_root.glob(ignore_path):
            if matched_path.is_file():
                # If the matched path is a file, add that to ignore_files set
                ignore_files.add(matched_path.resolve())
            else:
                # Otherwise, list all Python files in that directory and add all of them to ignore_files set
                ignore_files |= {
                    sub_path.resolve()
                    for sub_path in matched_path.glob("**/*.py")
                    if sub_path.is_file()
                }
    return ignore_files


def get_repo_files(repo_root: Path) -> List[Path]:
    """Get the list of all repo files, ignoring undesired files & directories specified in .feastignore"""
    # Read ignore paths from .feastignore and create a set of all files that match any of these paths
    ignore_paths = read_feastignore(repo_root)
    ignore_files = get_ignore_files(repo_root, ignore_paths)
    # NOTE(review): these built-in defaults are appended only AFTER
    # get_ignore_files() has already been called, so they never contribute to
    # ignore_files and appear to have no effect. Presumably they were meant to
    # be added to ignore_paths *before* the call above — confirm intent.
    ignore_paths += [
        ".git",
        ".feastignore",
        ".venv",
        ".pytest_cache",
        "__pycache__",
        ".ipynb_checkpoints",
    ]

    # List all Python files in the root directory (recursively)
    repo_files = {
        p.resolve()
        for p in repo_root.glob("**/*.py")
        if p.is_file() and "__init__.py" != p.name
    }
    # Ignore all files that match any of the ignore paths in .feastignore
    repo_files -= ignore_files

    # Sort repo_files to read them in the same order every time
    return sorted(repo_files)


def parse_repo(repo_root: Path) -> RepoContents:
    """
    Collects unique Feast object definitions from the given feature repo.

    Specifically, if an object foo has already been added, bar will still be added if
    (bar == foo), but not if (bar is foo). This ensures that import statements will
    not result in duplicates, but defining two equal objects will.

    Note: this imports every repo module as a side effect, so top-level code in
    user repo files runs here.
    """
    res = RepoContents(
        projects=[],
        data_sources=[],
        entities=[],
        feature_views=[],
        feature_services=[],
        on_demand_feature_views=[],
        stream_feature_views=[],
        permissions=[],
    )

    for repo_file in get_repo_files(repo_root):
        module_path = py_path_to_module(repo_file)
        module = importlib.import_module(module_path)

        for attr_name in dir(module):
            obj = getattr(module, attr_name)

            # Identity (is) checks below deliberately allow equal-but-distinct
            # duplicates while skipping re-imported objects (see docstring).
            if isinstance(obj, DataSource) and not any(
                (obj is ds) for ds in res.data_sources
            ):
                res.data_sources.append(obj)

                # Handle batch sources defined within stream sources.
                if (
                    isinstance(obj, PushSource)
                    or isinstance(obj, KafkaSource)
                    or isinstance(obj, KinesisSource)
                ):
                    batch_source = obj.batch_source

                    if batch_source and not any(
                        (batch_source is ds) for ds in res.data_sources
                    ):
                        res.data_sources.append(batch_source)
            if (
                isinstance(obj, FeatureView)
                and not any((obj is fv) for fv in res.feature_views)
                and not isinstance(obj, StreamFeatureView)
                and not isinstance(obj, BatchFeatureView)
            ):
                res.feature_views.append(obj)

                # Handle batch sources defined with feature views.
                # NOTE(review): a plain FeatureView without a batch source hits
                # a bare AssertionError here — confirm that is acceptable.
                batch_source = obj.batch_source
                assert batch_source
                if not any((batch_source is ds) for ds in res.data_sources):
                    res.data_sources.append(batch_source)

                # Handle stream sources defined with feature views.
                if obj.stream_source:
                    stream_source = obj.stream_source
                    if not any((stream_source is ds) for ds in res.data_sources):
                        res.data_sources.append(stream_source)
            elif isinstance(obj, StreamFeatureView) and not any(
                (obj is sfv) for sfv in res.stream_feature_views
            ):
                res.stream_feature_views.append(obj)

                # Handle batch sources defined with feature views.
                batch_source = obj.batch_source
                if not any((batch_source is ds) for ds in res.data_sources):
                    res.data_sources.append(batch_source)

                # Handle stream sources defined with feature views.
                assert obj.stream_source
                stream_source = obj.stream_source
                if not any((stream_source is ds) for ds in res.data_sources):
                    res.data_sources.append(stream_source)
            elif isinstance(obj, BatchFeatureView) and not any(
                (obj is bfv) for bfv in res.feature_views
            ):
                res.feature_views.append(obj)

                # Handle batch sources defined with feature views.
                batch_source = obj.batch_source
                if not any((batch_source is ds) for ds in res.data_sources):
                    res.data_sources.append(batch_source)
            elif isinstance(obj, Entity) and not any(
                (obj is entity) for entity in res.entities
            ):
                res.entities.append(obj)
            elif isinstance(obj, FeatureService) and not any(
                (obj is fs) for fs in res.feature_services
            ):
                res.feature_services.append(obj)
            elif isinstance(obj, OnDemandFeatureView) and not any(
                (obj is odfv) for odfv in res.on_demand_feature_views
            ):
                res.on_demand_feature_views.append(obj)
            elif isinstance(obj, Permission) and not any(
                (obj is p) for p in res.permissions
            ):
                res.permissions.append(obj)
            elif isinstance(obj, Project) and not any((obj is p) for p in res.projects):
                res.projects.append(obj)

    # Every repo implicitly contains the dummy entity used for entity-less views.
    res.entities.append(DUMMY_ENTITY)
    return res


def plan(repo_config: RepoConfig, repo_path: Path, skip_source_validation: bool):
    """Dry-run an apply: print the registry and infrastructure diffs for each
    project in the repo without committing any changes.

    Side effects: changes the process working directory to *repo_path* and
    mutates ``repo_config.project`` per iterated project.
    """
    os.chdir(repo_path)
    repo = _get_repo_contents(repo_path, repo_config.project)
    for project in repo.projects:
        repo_config.project = project.name
        store, registry = _get_store_and_registry(repo_config)
        # TODO: When we support multiple projects in a single repo, we should filter repo contents by project
        if not skip_source_validation:
            provider = store._get_provider()
            data_sources = [t.batch_source for t in repo.feature_views]
            # Make sure the data source used by this feature view is supported by Feast
            for data_source in data_sources:
                provider.validate_data_source(store.config, data_source)

        registry_diff, infra_diff, _ = store.plan(repo)
        click.echo(registry_diff.to_string())
        click.echo(infra_diff.to_string())


def _get_repo_contents(repo_path: Path, project_name: Optional[str] = None) -> RepoContents:
    """Parse the repo and reconcile its Project objects against *project_name*
    (the project declared in feature_store.yaml).

    Exits the process (sys.exit(1)) on: no project defined anywhere, a name
    mismatch, or more than one Project object in the repo.
    """
    # Avoid writing .pyc files while importing user repo modules in parse_repo().
    sys.dont_write_bytecode = True
    repo = parse_repo(repo_path)

    if len(repo.projects) < 1:
        if project_name:
            print(
                f"No project found in the repository. Using project name {project_name} defined in feature_store.yaml"
            )
            repo.projects.append(Project(name=project_name))
        else:
            print(
                "No project found in the repository. Either define Project in repository or define a project in feature_store.yaml"
            )
            sys.exit(1)
    elif len(repo.projects) == 1:
        if repo.projects[0].name != project_name:
            print(
                "Project object name should match with the project name defined in feature_store.yaml"
            )
            sys.exit(1)
    else:
        print(
            "Multiple projects found in the repository. Currently no support for multiple projects"
        )
        sys.exit(1)

    return repo


def _get_store_and_registry(repo_config: RepoConfig):
    """Build a FeatureStore from *repo_config* and return (store, registry)."""
    store = FeatureStore(config=repo_config)
    registry = store.registry
    return store, registry


def extract_objects_for_apply_delete(project, registry, repo):
    """Diff *repo* against *registry* for *project* and flatten the result.

    Returns a 4-tuple:
      - all objects to apply (added + updated, across all Feast object types),
      - all objects to delete,
      - the set of feature views being kept (added or updated),
      - the feature views being deleted.
    """
    # TODO(achals): This code path should be refactored to handle added & kept entities separately.
    (
        _,
        objs_to_delete,
        objs_to_update,
        objs_to_add,
    ) = extract_objects_for_keep_delete_update_add(registry, project, repo)

    all_to_apply: List[
        Union[
            Entity,
            FeatureView,
            OnDemandFeatureView,
            StreamFeatureView,
            FeatureService,
        ]
    ] = []
    for object_type in FEAST_OBJECT_TYPES:
        to_apply = set(objs_to_add[object_type]).union(objs_to_update[object_type])
        all_to_apply.extend(to_apply)

    all_to_delete: List[
        Union[
            Entity,
            FeatureView,
            OnDemandFeatureView,
            StreamFeatureView,
            FeatureService,
        ]
    ] = []
    for object_type in FEAST_OBJECT_TYPES:
        all_to_delete.extend(objs_to_delete[object_type])

    return (
        all_to_apply,
        all_to_delete,
        set(objs_to_add[FeastObjectType.FEATURE_VIEW]).union(
            set(objs_to_update[FeastObjectType.FEATURE_VIEW])
        ),
        objs_to_delete[FeastObjectType.FEATURE_VIEW],
    )


def apply_total_with_repo_instance(
    store: FeatureStore,
    project_name: str,
    registry: BaseRegistry,
    repo: RepoContents,
    skip_source_validation: bool,
):
    """Apply the repo contents to the store for a single project.

    Optionally validates each feature view's batch source with the provider,
    then either uses the plan/diff path (when the store supports it) or the
    direct apply/delete path.
    """
    if not skip_source_validation:
        provider = store._get_provider()
        data_sources = [t.batch_source for t in repo.feature_views]
        # Make sure the data source used by this feature view is supported by Feast
        for data_source in data_sources:
            provider.validate_data_source(store.config, data_source)

    # For each object in the registry, determine whether it should be kept or deleted.
    (
        all_to_apply,
        all_to_delete,
        views_to_keep,
        views_to_delete,
    ) = extract_objects_for_apply_delete(project_name, registry, repo)

    if store._should_use_plan():
        # Plan-capable stores: compute diffs first, then apply them atomically.
        registry_diff, infra_diff, new_infra = store.plan(repo)
        click.echo(registry_diff.to_string())

        store._apply_diffs(registry_diff, infra_diff, new_infra)
        click.echo(infra_diff.to_string())
    else:
        store.apply(all_to_apply, objects_to_delete=all_to_delete, partial=False)
        log_infra_changes(views_to_keep, views_to_delete)


def log_infra_changes(
    views_to_keep: Set[FeatureView], views_to_delete: Set[FeatureView]
):
    """Echo a colorized summary of which feature views' infrastructure is being
    deployed vs. removed."""
    from colorama import Fore, Style

    for view in views_to_keep:
        click.echo(
            f"Deploying infrastructure for {Style.BRIGHT + Fore.GREEN}{view.name}{Style.RESET_ALL}"
        )
    for view in views_to_delete:
        click.echo(
            f"Removing infrastructure for {Style.BRIGHT + Fore.RED}{view.name}{Style.RESET_ALL}"
        )


def create_feature_store(
    ctx: click.Context,
) -> FeatureStore:
    """Construct a FeatureStore for a CLI invocation.

    Prefers a base64-encoded feature_store.yaml passed via the
    FEATURE_STORE_YAML_ENV_NAME environment variable (written to a fresh temp
    dir); otherwise uses the repo dir and yaml file recorded on the click
    context by the CLI entry point.
    """
    repo = ctx.obj["CHDIR"]
    # If we received a base64 encoded version of feature_store.yaml, use that
    config_base64 = os.getenv(FEATURE_STORE_YAML_ENV_NAME)
    if config_base64:
        print("Received base64 encoded feature_store.yaml")
        config_bytes = base64.b64decode(config_base64)
        # Create a new unique directory for writing feature_store.yaml
        repo_path = Path(tempfile.mkdtemp())
        with open(repo_path / "feature_store.yaml", "wb") as f:
            f.write(config_bytes)
        return FeatureStore(repo_path=str(repo_path.resolve()))
    else:
        fs_yaml_file = ctx.obj["FS_YAML_FILE"]
        cli_check_repo(repo, fs_yaml_file)
        return FeatureStore(repo_path=str(repo), fs_yaml_file=fs_yaml_file)


def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation: bool):
    """Apply the repo at *repo_path* project-by-project.

    Side effects: changes the working directory to *repo_path*, mutates
    ``repo_config.project``, and exits the process on an invalid project name.
    """
    os.chdir(repo_path)
    repo = _get_repo_contents(repo_path, repo_config.project)
    for project in repo.projects:
        repo_config.project = project.name
        store, registry = _get_store_and_registry(repo_config)
        if not is_valid_name(project.name):
            print(
                f"{project.name} is not valid. Project name should only have "
                f"alphanumerical values and underscores but not start with an underscore."
            )
            sys.exit(1)
        # TODO: When we support multiple projects in a single repo, we should filter repo contents by project. Currently there is no way to associate Feast objects to project.
        print(f"Applying changes for project {project.name}")
        apply_total_with_repo_instance(
            store, project.name, registry, repo, skip_source_validation
        )


def teardown(repo_config: RepoConfig, repo_path: Optional[str]):
    """Tear down all infrastructure managed by the feature store."""
    # Cannot pass in both repo_path and repo_config to FeatureStore.
    feature_store = FeatureStore(repo_path=repo_path, config=repo_config)
    feature_store.teardown()


def registry_dump(repo_config: RepoConfig, repo_path: Path) -> str:
    """For debugging only: output contents of the metadata registry"""
    registry_config = repo_config.registry
    project = repo_config.project
    registry = Registry(
        project,
        registry_config=registry_config,
        repo_path=repo_path,
        auth_config=repo_config.auth_config,
    )
    registry_dict = registry.to_dict(project=project)
    return json.dumps(registry_dict, indent=2, sort_keys=True)


def cli_check_repo(repo_path: Path, fs_yaml_file: Path):
    """Verify the repo config file exists, exiting the process if it does not.

    Also appends *repo_path* to sys.path so repo modules can be imported later.
    """
    sys.path.append(str(repo_path))
    if not fs_yaml_file.exists():
        print(
            f"Can't find feature repo configuration file at {fs_yaml_file}. "
            "Make sure you're running feast from an initialized feast repository."
        )
        sys.exit(1)


def init_repo(repo_name: str, template: str):
    """Create a new feature repo named *repo_name* from the bundled *template*.

    Copies the template, restores .gitignore files, executes and removes the
    template's bootstrap.py (if any), and templates the project name into
    feature_store.yaml. Exits if the directory already holds a repo; raises
    BadParameter on an invalid name and IOError on an unknown template.
    """
    import os
    from pathlib import Path
    from shutil import copytree

    from colorama import Fore, Style

    if not is_valid_name(repo_name):
        raise BadParameter(
            message="Name should be alphanumeric values and underscores but not start with an underscore",
            param_hint="PROJECT_DIRECTORY",
        )
    repo_path = Path(os.path.join(Path.cwd(), repo_name))
    repo_path.mkdir(exist_ok=True)
    repo_config_path = repo_path / "feature_store.yaml"

    if repo_config_path.exists():
        new_directory = os.path.relpath(repo_path, os.getcwd())

        print(
            f"The directory {Style.BRIGHT + Fore.GREEN}{new_directory}{Style.RESET_ALL} contains an existing feature "
            f"store repository that may cause a conflict"
        )
        print()
        sys.exit(1)

    # Copy template directory
    template_path = str(Path(Path(__file__).parent / "templates" / template).absolute())
    if not os.path.exists(template_path):
        raise IOError(f"Could not find template {template}")
    copytree(template_path, str(repo_path), dirs_exist_ok=True)

    # Rename gitignore files back to .gitignore
    # (templates ship them un-dotted so they are not applied to this package itself)
    for gitignore_path in repo_path.rglob("gitignore"):
        gitignore_path.rename(gitignore_path.with_name(".gitignore"))

    # Seed the repository
    bootstrap_path = repo_path / "bootstrap.py"
    if os.path.exists(bootstrap_path):
        import importlib.util

        spec = importlib.util.spec_from_file_location("bootstrap", str(bootstrap_path))
        assert isinstance(spec, ModuleSpec)
        bootstrap = importlib.util.module_from_spec(spec)
        assert isinstance(spec.loader, Loader)
        spec.loader.exec_module(bootstrap)
        bootstrap.bootstrap()  # type: ignore
        os.remove(bootstrap_path)

    # Template the feature_store.yaml file
    feature_store_yaml_path = repo_path / "feature_repo" / "feature_store.yaml"
    replace_str_in_file(
        feature_store_yaml_path, "project: my_project", f"project: {repo_name}"
    )

    # Remove the __pycache__ folder if it exists
    import shutil

    shutil.rmtree(repo_path / "__pycache__", ignore_errors=True)

    import click

    click.echo()
    click.echo(
        f"Creating a new Feast repository in {Style.BRIGHT + Fore.GREEN}{repo_path}{Style.RESET_ALL}."
    )
    click.echo()


def is_valid_name(name: str) -> bool:
    """A name should be alphanumeric values and underscores but not start with an underscore"""
    # NOTE(review): an empty string passes this check — confirm that is intended.
    return not name.startswith("_") and re.compile(r"\W+").search(name) is None


def generate_project_name() -> str:
    """Generates a unique project name"""
    return f"{random.choice(adjectives)}_{random.choice(animals)}"
setup_third_party_provider_repo("foo.FooProvider") as repo_path: - return_code, output = runner.run_with_output(["apply"], cwd=repo_path) - assertpy.assert_that(return_code).is_equal_to(1) - assertpy.assert_that(output).contains( - b"Could not import class 'FooProvider' from module 'foo'" - ) - # Check with correct third-party provider name - with setup_third_party_provider_repo("foo.provider.FooProvider") as repo_path: - return_code, output = runner.run_with_output(["apply"], cwd=repo_path) - assertpy.assert_that(return_code).is_equal_to(0) - - -def test_3rd_party_registry_store() -> None: - """ - Test running apply on third party registry stores - """ - runner = CliRunner() - # Check with incorrect built-in provider name (no dots) - with setup_third_party_registry_store_repo("feast123") as repo_path: - return_code, output = runner.run_with_output(["apply"], cwd=repo_path) - assertpy.assert_that(return_code).is_equal_to(1) - assertpy.assert_that(output).contains( - b'Registry store class name should end with "RegistryStore"' - ) - # Check with incorrect third-party registry store name (with dots) - with setup_third_party_registry_store_repo("feast_foo.RegistryStore") as repo_path: - return_code, output = runner.run_with_output(["apply"], cwd=repo_path) - assertpy.assert_that(return_code).is_equal_to(1) - assertpy.assert_that(output).contains( - b"Could not import module 'feast_foo' while attempting to load class 'RegistryStore'" - ) - # Check with incorrect third-party registry store name (with dots) - with setup_third_party_registry_store_repo("foo.FooRegistryStore") as repo_path: - return_code, output = runner.run_with_output(["apply"], cwd=repo_path) - assertpy.assert_that(return_code).is_equal_to(1) - assertpy.assert_that(output).contains( - b"Could not import class 'FooRegistryStore' from module 'foo'" - ) - # Check with correct third-party registry store name - with setup_third_party_registry_store_repo( - "foo.registry_store.FooRegistryStore" - ) as 
repo_path: - return_code, output = runner.run_with_output(["apply"], cwd=repo_path) - assertpy.assert_that(return_code).is_equal_to(0) - - -def test_3rd_party_registry_store_with_fs_yaml_override() -> None: - runner = CliRunner() - - fs_yaml_file = "test_fs.yaml" - with setup_third_party_registry_store_repo( - "foo.registry_store.FooRegistryStore", fs_yaml_file_name=fs_yaml_file - ) as repo_path: - return_code, output = runner.run_with_output( - ["--feature-store-yaml", fs_yaml_file, "apply"], cwd=repo_path - ) - assertpy.assert_that(return_code).is_equal_to(0) - - -def test_3rd_party_registry_store_with_fs_yaml_override_by_env_var() -> None: - runner = CliRunner() - - fs_yaml_file = "test_fs.yaml" - with setup_third_party_registry_store_repo( - "foo.registry_store.FooRegistryStore", fs_yaml_file_name=fs_yaml_file - ) as repo_path: - custom_yaml_path = os.path.join(repo_path, fs_yaml_file) - with mock.patch.dict( - "os.environ", {"FEAST_FS_YAML_FILE_PATH": custom_yaml_path}, clear=True - ): - return_code, output = runner.run_with_output(["apply"], cwd=repo_path) - assertpy.assert_that(return_code).is_equal_to(0) - - -@contextmanager -def setup_third_party_provider_repo(provider_name: str): - with tempfile.TemporaryDirectory() as repo_dir_name: - # Construct an example repo in a temporary dir - repo_path = Path(repo_dir_name) - - repo_config = repo_path / "feature_store.yaml" - - repo_config.write_text( - dedent( - f""" - project: foo - registry: data/registry.db - provider: {provider_name} - online_store: - path: data/online_store.db - type: sqlite - offline_store: - type: file - entity_key_serialization_version: 2 - """ - ) - ) - - (repo_path / "foo").mkdir() - repo_example = repo_path / "foo/provider.py" - repo_example.write_text( - (Path(__file__).parents[2] / "foo_provider.py").read_text() - ) - - yield repo_path - - -@contextmanager -def setup_third_party_registry_store_repo( - registry_store: str, fs_yaml_file_name: str = "feature_store.yaml" -): - with 
tempfile.TemporaryDirectory() as repo_dir_name: - # Construct an example repo in a temporary dir - repo_path = Path(repo_dir_name) - - repo_config = repo_path / fs_yaml_file_name - - repo_config.write_text( - dedent( - f""" - project: foo - registry: - registry_store_type: {registry_store} - path: foobar://foo.bar - provider: local - online_store: - path: data/online_store.db - type: sqlite - offline_store: - type: file - entity_key_serialization_version: 2 - """ - ) - ) - - (repo_path / "foo").mkdir() - repo_example = repo_path / "foo/registry_store.py" - repo_example.write_text( - (Path(__file__).parents[2] / "foo_registry_store.py").read_text() - ) - - yield repo_path - - -def test_cli_configuration(): - """ - Unit test for the 'feast configuration' command - """ - runner = CliRunner() - - with setup_third_party_provider_repo("local") as repo_path: - # Run the 'feast configuration' command - return_code, output = runner.run_with_output(["configuration"], cwd=repo_path) - - # Assertions - assertpy.assert_that(return_code).is_equal_to(0) - assertpy.assert_that(output).contains(b"project: foo") - assertpy.assert_that(output).contains(b"provider: local") - assertpy.assert_that(output).contains(b"type: sqlite") - assertpy.assert_that(output).contains(b"path: data/online_store.db") - assertpy.assert_that(output).contains(b"type: file") - assertpy.assert_that(output).contains(b"entity_key_serialization_version: 2") +import os +import tempfile +from contextlib import contextmanager +from pathlib import Path +from textwrap import dedent +from unittest import mock + +from assertpy import assertpy + +from tests.utils.cli_repo_creator import CliRunner + + +def test_3rd_party_providers() -> None: + """ + Test running apply on third party providers + """ + runner = CliRunner() + # Check with incorrect built-in provider name (no dots) + with setup_third_party_provider_repo("feast123") as repo_path: + return_code, output = runner.run_with_output(["apply"], cwd=repo_path) + 
assertpy.assert_that(return_code).is_equal_to(1) + assertpy.assert_that(output).contains(b"Provider 'feast123' is not implemented") + # Check with incorrect third-party provider name (with dots) + with setup_third_party_provider_repo("feast_foo.Provider") as repo_path: + return_code, output = runner.run_with_output(["apply"], cwd=repo_path) + assertpy.assert_that(return_code).is_equal_to(1) + assertpy.assert_that(output).contains( + b"Could not import module 'feast_foo' while attempting to load class 'Provider'" + ) + # Check with incorrect third-party provider name (with dots) + with setup_third_party_provider_repo("foo.FooProvider") as repo_path: + return_code, output = runner.run_with_output(["apply"], cwd=repo_path) + assertpy.assert_that(return_code).is_equal_to(1) + assertpy.assert_that(output).contains( + b"Could not import class 'FooProvider' from module 'foo'" + ) + # Check with correct third-party provider name + with setup_third_party_provider_repo("foo.provider.FooProvider") as repo_path: + return_code, output = runner.run_with_output(["apply"], cwd=repo_path) + assertpy.assert_that(return_code).is_equal_to(0) + + +def test_3rd_party_registry_store() -> None: + """ + Test running apply on third party registry stores + """ + runner = CliRunner() + # Check with incorrect built-in provider name (no dots) + with setup_third_party_registry_store_repo("feast123") as repo_path: + return_code, output = runner.run_with_output(["apply"], cwd=repo_path) + assertpy.assert_that(return_code).is_equal_to(1) + assertpy.assert_that(output).contains( + b'Registry store class name should end with "RegistryStore"' + ) + # Check with incorrect third-party registry store name (with dots) + with setup_third_party_registry_store_repo("feast_foo.RegistryStore") as repo_path: + return_code, output = runner.run_with_output(["apply"], cwd=repo_path) + assertpy.assert_that(return_code).is_equal_to(1) + assertpy.assert_that(output).contains( + b"Could not import module 'feast_foo' 
while attempting to load class 'RegistryStore'" + ) + # Check with incorrect third-party registry store name (with dots) + with setup_third_party_registry_store_repo("foo.FooRegistryStore") as repo_path: + return_code, output = runner.run_with_output(["apply"], cwd=repo_path) + assertpy.assert_that(return_code).is_equal_to(1) + assertpy.assert_that(output).contains( + b"Could not import class 'FooRegistryStore' from module 'foo'" + ) + # Check with correct third-party registry store name + with setup_third_party_registry_store_repo( + "foo.registry_store.FooRegistryStore" + ) as repo_path: + return_code, output = runner.run_with_output(["apply"], cwd=repo_path) + assertpy.assert_that(return_code).is_equal_to(0) + + +def test_3rd_party_registry_store_with_fs_yaml_override() -> None: + runner = CliRunner() + + fs_yaml_file = "test_fs.yaml" + with setup_third_party_registry_store_repo( + "foo.registry_store.FooRegistryStore", fs_yaml_file_name=fs_yaml_file + ) as repo_path: + return_code, output = runner.run_with_output( + ["--feature-store-yaml", fs_yaml_file, "apply"], cwd=repo_path + ) + assertpy.assert_that(return_code).is_equal_to(0) + + +def test_3rd_party_registry_store_with_fs_yaml_override_by_env_var() -> None: + runner = CliRunner() + + fs_yaml_file = "test_fs.yaml" + with setup_third_party_registry_store_repo( + "foo.registry_store.FooRegistryStore", fs_yaml_file_name=fs_yaml_file + ) as repo_path: + custom_yaml_path = os.path.join(repo_path, fs_yaml_file) + with mock.patch.dict( + "os.environ", {"FEAST_FS_YAML_FILE_PATH": custom_yaml_path}, clear=True + ): + return_code, output = runner.run_with_output(["apply"], cwd=repo_path) + assertpy.assert_that(return_code).is_equal_to(0) + + +@contextmanager +def setup_third_party_provider_repo(provider_name: str): + with tempfile.TemporaryDirectory() as repo_dir_name: + # Construct an example repo in a temporary dir + repo_path = Path(repo_dir_name) + + repo_config = repo_path / "feature_store.yaml" + + 
repo_config.write_text( + dedent( + f""" + project: foo + registry: data/registry.db + provider: {provider_name} + online_store: + path: data/online_store.db + type: sqlite + offline_store: + type: file + entity_key_serialization_version: 2 + """ + ) + ) + + (repo_path / "foo").mkdir() + repo_example = repo_path / "foo/provider.py" + repo_example.write_text( + (Path(__file__).parents[2] / "foo_provider.py").read_text() + ) + + yield repo_path + + +@contextmanager +def setup_third_party_registry_store_repo( + registry_store: str, fs_yaml_file_name: str = "feature_store.yaml" +): + with tempfile.TemporaryDirectory() as repo_dir_name: + # Construct an example repo in a temporary dir + repo_path = Path(repo_dir_name) + + repo_config = repo_path / fs_yaml_file_name + + repo_config.write_text( + dedent( + f""" + project: foo + registry: + registry_store_type: {registry_store} + path: foobar://foo.bar + provider: local + online_store: + path: data/online_store.db + type: sqlite + offline_store: + type: file + entity_key_serialization_version: 2 + """ + ) + ) + + (repo_path / "foo").mkdir() + repo_example = repo_path / "foo/registry_store.py" + repo_example.write_text( + (Path(__file__).parents[2] / "foo_registry_store.py").read_text() + ) + + yield repo_path + + +def test_cli_configuration(): + """ + Unit test for the 'feast configuration' command + """ + runner = CliRunner() + + with setup_third_party_provider_repo("local") as repo_path: + # Run the 'feast configuration' command + return_code, output = runner.run_with_output(["configuration"], cwd=repo_path) + + # Assertions + assertpy.assert_that(return_code).is_equal_to(0) + assertpy.assert_that(output).contains(b"project: foo") + assertpy.assert_that(output).contains(b"provider: local") + assertpy.assert_that(output).contains(b"type: sqlite") + assertpy.assert_that(output).contains(b"path: data/online_store.db") + assertpy.assert_that(output).contains(b"type: file") + 
assertpy.assert_that(output).contains(b"entity_key_serialization_version: 2")