diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index 7d85aba1ad0..27a84d31213 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -76,6 +76,7 @@ * [Adding a custom provider](how-to-guides/customizing-feast/creating-a-custom-provider.md) * [Adding or reusing tests](how-to-guides/adding-or-reusing-tests.md) * [Starting Feast servers in TLS(SSL) Mode](how-to-guides/starting-feast-servers-tls-mode.md) +* [Importing Features from dbt](how-to-guides/dbt-integration.md) ## Reference diff --git a/docs/how-to-guides/dbt-integration.md b/docs/how-to-guides/dbt-integration.md new file mode 100644 index 00000000000..abaadbf8740 --- /dev/null +++ b/docs/how-to-guides/dbt-integration.md @@ -0,0 +1,381 @@ +# Importing Features from dbt + +{% hint style="warning" %} +**Alpha Feature**: The dbt integration is currently in early development and subject to change. + +**Current Limitations**: +- Supported data sources: BigQuery, Snowflake, and File-based sources only +- Single entity per model +- Manual entity column specification required + +Breaking changes may occur in future releases. +{% endhint %} + +This guide explains how to use Feast's dbt integration to automatically import dbt models as Feast FeatureViews. This enables you to leverage your existing dbt transformations as feature definitions without manual duplication. + +## Overview + +[dbt (data build tool)](https://www.getdbt.com/) is a popular tool for transforming data in your warehouse. Many teams already use dbt to create feature tables. Feast's dbt integration allows you to: + +- **Discover** dbt models tagged for feature engineering +- **Import** model metadata (columns, types, descriptions) as Feast objects +- **Generate** Python code for Entity, DataSource, and FeatureView definitions + +This eliminates the need to manually define Feast objects that mirror your dbt models. + +## Prerequisites + +- A dbt project with compiled artifacts (`target/manifest.json`) +- Feast installed with dbt support: + +```bash +pip install 'feast[dbt]' +``` + +Or install the parser directly: + +```bash +pip install dbt-artifacts-parser +``` + +## Quick Start + +### 1. Tag your dbt models + +In your dbt project, add a `feast` tag to models you want to import: + +{% code title="models/driver_features.sql" %} +```sql +{{ config( + materialized='table', + tags=['feast'] +) }} + +SELECT + driver_id, + event_timestamp, + avg_rating, + total_trips, + is_active +FROM {{ ref('stg_drivers') }} +``` +{% endcode %} + +### 2. Define column types in schema.yml + +Feast uses column metadata from your `schema.yml` to determine feature types: + +{% code title="models/schema.yml" %} +```yaml +version: 2 +models: + - name: driver_features + description: "Driver aggregated features for ML models" + columns: + - name: driver_id + description: "Unique driver identifier" + data_type: STRING + - name: event_timestamp + description: "Feature timestamp" + data_type: TIMESTAMP + - name: avg_rating + description: "Average driver rating" + data_type: FLOAT64 + - name: total_trips + description: "Total completed trips" + data_type: INT64 + - name: is_active + description: "Whether driver is currently active" + data_type: BOOLEAN +``` +{% endcode %} + +### 3. Compile your dbt project + +```bash +cd your_dbt_project +dbt compile +``` + +This generates `target/manifest.json` which Feast will read. + +### 4. 
List available models

Use the Feast CLI to discover tagged models:

```bash
feast dbt list -m target/manifest.json --tag feast
```

Output:
```
Parsing dbt manifest: target/manifest.json
  dbt version: 1.5.0
  Project: my_dbt_project

Found 1 model(s):

driver_features [tags: feast]
  Table: my_project.my_dataset.driver_features
  Description: Driver aggregated features for ML models
```

Add `--show-columns` to also print each model's column names and types.

### 5. Import models as Feast definitions

Generate a Python file with Feast object definitions:

```bash
feast dbt import -m target/manifest.json \
  --entity-column driver_id \
  --data-source-type bigquery \
  --tag feast \
  --output features/driver_features.py
```

This generates:

{% code title="features/driver_features.py" %}
```python
"""
Feast feature definitions generated from dbt models.

Source: target/manifest.json
Project: my_dbt_project
Generated by: feast dbt import
"""

from datetime import timedelta

from feast import Entity, FeatureView, Field
from feast.types import Bool, Float64, Int64
from feast.infra.offline_stores.bigquery_source import BigQuerySource


# Entities
driver_id = Entity(
    name="driver_id",
    join_keys=["driver_id"],
    description="Entity key for dbt models",
    tags={'source': 'dbt'},
)


# Data Sources
driver_features_source = BigQuerySource(
    name="driver_features_source",
    table="my_project.my_dataset.driver_features",
    timestamp_field="event_timestamp",
    description="Driver aggregated features for ML models",
    tags={'dbt.model': 'driver_features', 'dbt.tag.feast': 'true'},
)


# Feature Views
driver_features_fv = FeatureView(
    name="driver_features",
    entities=[driver_id],
    ttl=timedelta(days=1),
    schema=[
        Field(name="avg_rating", dtype=Float64, description="Average driver rating"),
        Field(name="total_trips", dtype=Int64, description="Total completed trips"),
        Field(name="is_active", dtype=Bool, description="Whether driver is currently active"),
    ],
    online=True,
    source=driver_features_source,
    description="Driver aggregated features for ML models",
    tags={'dbt.model': 'driver_features', 'dbt.tag.feast': 'true'},
)
```
{% endcode %}
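Once this file is committed to your feature repository and registered with `feast apply`, the imported views behave like hand-written definitions. A minimal sketch of querying them (the repo path and entity value below are illustrative, not produced by the import):

```python
from feast import FeatureStore

# Assumes the generated features/driver_features.py lives in a standard
# Feast repo next to feature_store.yaml, and `feast apply` has been run.
store = FeatureStore(repo_path=".")

online_features = store.get_online_features(
    features=[
        "driver_features:avg_rating",
        "driver_features:total_trips",
    ],
    entity_rows=[{"driver_id": "D1001"}],  # hypothetical driver key
).to_dict()
print(online_features)
```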
## CLI Reference

### `feast dbt list`

Discover dbt models available for import.

```bash
feast dbt list [OPTIONS]
```

**Options:**
- `--manifest-path`, `-m`: Path to dbt's `manifest.json` file (required)
- `--tag`: Filter models by dbt tag (e.g., `feast`)
- `--show-columns`: Show column details for each model

### `feast dbt import`

Import dbt models as Feast object definitions. By default the objects are applied directly to the Feast registry; pass `--output` to generate a Python file instead.

```bash
feast dbt import [OPTIONS]
```

**Options:**

| Option | Description | Default |
|--------|-------------|---------|
| `--manifest-path`, `-m` | Path to dbt's `manifest.json` file | (required) |
| `--entity-column`, `-e` | Column to use as entity key | (required) |
| `--data-source-type`, `-d` | Data source type: `bigquery`, `snowflake`, `file` | `bigquery` |
| `--tag` | Filter models by dbt tag | None |
| `--model` | Import specific model(s) only (repeatable) | None |
| `--timestamp-field`, `-t` | Timestamp column name | `event_timestamp` |
| `--ttl-days` | Feature TTL in days | `1` |
| `--exclude-columns` | Comma-separated list of columns to exclude from features | None |
| `--output`, `-o` | Output Python file path; skips applying to the registry | None |
| `--dry-run` | Preview without applying or generating code | `False` |

## Type Mapping

Feast automatically maps dbt/warehouse column types to Feast types:

| dbt/SQL Type | Feast Type |
|--------------|------------|
| `STRING`, `VARCHAR`, `TEXT` | `String` |
| `INT`, `INTEGER`, `BIGINT` | `Int64` |
| `SMALLINT`, `TINYINT` | `Int32` |
| `FLOAT`, `REAL` | `Float32` |
| `DOUBLE`, `FLOAT64` | `Float64` |
| `BOOLEAN`, `BOOL` | `Bool` |
| `TIMESTAMP`, `DATETIME` | `UnixTimestamp` |
| `BYTES`, `BINARY` | `Bytes` |
| `ARRAY<type>` | `Array(type)` |

Snowflake `NUMBER(precision, scale)` types are handled specially:
- Scale > 0: `Float64`
- Precision <= 9: `Int32`
- Precision <= 18: `Int64`
- Precision > 18: `Float64`

## Data Source Configuration

### BigQuery

```bash
feast dbt import -m manifest.json -e user_id -d bigquery -o features.py
```

Generates `BigQuerySource` with the full table path from dbt metadata:
```python
BigQuerySource(
    table="project.dataset.table_name",
    ...
)
```

### Snowflake

```bash
feast dbt import -m manifest.json -e user_id -d snowflake -o features.py
```

Generates `SnowflakeSource` with database, schema, and table:
```python
SnowflakeSource(
    database="MY_DB",
    schema="MY_SCHEMA",
    table="TABLE_NAME",
    ...
)
```

### File

```bash
feast dbt import -m manifest.json -e user_id -d file -o features.py
```

Generates `FileSource` with a placeholder path:
```python
FileSource(
    path="/data/table_name.parquet",
    ...
)
```

{% hint style="info" %}
For file sources, update the generated path to point to your actual data files.
{% endhint %}

## Best Practices

### 1. Use consistent tagging

Create a standard tagging convention in your dbt project:

```yaml
# dbt_project.yml
models:
  my_project:
    features:
      +tags: ['feast']  # All models in features/ get the feast tag
```

### 2. Document your columns

Column descriptions from `schema.yml` are preserved in the generated Feast definitions, making your feature catalog self-documenting.

### 3. Review before committing

Use `--dry-run` to preview what will be generated:

```bash
feast dbt import -m manifest.json -e user_id -d bigquery --dry-run
```

### 4. Version control generated code

Commit the generated Python files to your repository. This allows you to:
- Track changes to feature definitions over time
- Review dbt-to-Feast mapping in pull requests
- Customize generated code if needed (see the sketch below)
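If you need more control than the CLI offers, the same import can be scripted with the underlying `feast.dbt` API and the objects post-processed before registration. A minimal sketch mirroring the module's documented usage (the manifest path, tag, and entity column are illustrative):

```python
from feast.dbt import DbtManifestParser, DbtToFeastMapper

parser = DbtManifestParser("target/manifest.json")
parser.parse()
models = parser.get_models(tag_filter="feast")

mapper = DbtToFeastMapper(data_source_type="bigquery", ttl_days=7)
objects = []
for model in models:
    source = mapper.create_data_source(model)
    fv = mapper.create_feature_view(model, source, entity_column="driver_id")
    # Customize here, e.g. adjust fv.tags, before registering.
    objects.extend([source, fv])

# Pass the customized objects to FeatureStore.apply(objects) yourself.
```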
### 5. Integrate with CI/CD

Add dbt import to your CI pipeline:

```yaml
# .github/workflows/features.yml
- name: Compile dbt
  run: dbt compile

- name: Generate Feast definitions
  run: |
    feast dbt import -m target/manifest.json \
      -e user_id -d bigquery --tag feast \
      -o feature_repo/features.py

- name: Apply Feast changes
  run: feast apply
```

## Limitations

- **Single entity support**: Currently supports one entity column per import. For multi-entity models, run multiple imports or manually adjust the generated code.
- **No incremental updates**: Each import generates a complete file. Use version control to track changes.
- **Column types required**: Models without `data_type` in schema.yml default to `String` type.

## Troubleshooting

### "manifest.json not found"

Run `dbt compile` or `dbt run` first to generate the manifest file.

### "No models found with tag"

Check that your models have the correct tag in their config:

```sql
{{ config(tags=['feast']) }}
```

### "Missing entity column"

Ensure your dbt model includes the entity column specified with `--entity-column`. Models missing this column are skipped with a warning.

### "Missing timestamp column"

By default, Feast looks for `event_timestamp`. Use `--timestamp-field` to specify a different column name.
diff --git a/pyproject.toml b/pyproject.toml
index acef967e688..b729ca4b63e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -131,10 +131,13 @@ snowflake = [
sqlite_vec = ["sqlite-vec==v0.1.6"]

mcp = ["fastapi_mcp"]

+dbt = ["dbt-artifacts-parser"]
+
ci = [
    "build",
    "virtualenv==20.23.0",
    "cryptography>=43.0,<44",
+    "dbt-artifacts-parser",
    "ruff>=0.8.0",
    "mypy-protobuf>=3.1",
    "grpcio-tools>=1.56.2,<=1.62.3",
diff --git a/sdk/python/feast/cli/cli.py b/sdk/python/feast/cli/cli.py
index ab756d47496..6638576bd2b 100644
--- a/sdk/python/feast/cli/cli.py
+++ b/sdk/python/feast/cli/cli.py
@@ -26,6 +26,7 @@ from feast import utils
from feast.cli.data_sources import data_sources_cmd
+from feast.cli.dbt_import import dbt_cmd
from feast.cli.entities import entities_cmd
from feast.cli.feature_services import feature_services_cmd
from feast.cli.feature_views import feature_views_cmd
@@ -569,6 +570,7 @@ def validate(
cli.add_command(serve_offline_command)
cli.add_command(serve_registry_command)
cli.add_command(serve_transformations_command)
+cli.add_command(dbt_cmd)

if __name__ == "__main__":
    cli()
diff --git a/sdk/python/feast/cli/dbt_import.py b/sdk/python/feast/cli/dbt_import.py
new file mode 100644
index 00000000000..b09fd90ec6d
--- /dev/null
+++ b/sdk/python/feast/cli/dbt_import.py
@@ -0,0 +1,379 @@
+"""
+CLI commands for importing dbt models as Feast features.
+
+This module provides the `feast dbt` command group for integrating
+dbt models with Feast feature stores.
+""" + +from typing import Any, Dict, List, Optional + +import click +from colorama import Fore, Style + +from feast.repo_operations import cli_check_repo, create_feature_store + + +@click.group(name="dbt") +def dbt_cmd(): + """Import dbt models as Feast features.""" + pass + + +@dbt_cmd.command("import") +@click.option( + "--manifest-path", + "-m", + required=True, + type=click.Path(exists=True), + help="Path to dbt manifest.json file (typically target/manifest.json)", +) +@click.option( + "--entity-column", + "-e", + required=True, + help="Primary key / entity column name (e.g., driver_id, customer_id)", +) +@click.option( + "--data-source-type", + "-d", + type=click.Choice(["bigquery", "snowflake", "file"]), + default="bigquery", + show_default=True, + help="Type of data source to create", +) +@click.option( + "--timestamp-field", + "-t", + default="event_timestamp", + show_default=True, + help="Timestamp field name for point-in-time joins", +) +@click.option( + "--tag", + "tag_filter", + default=None, + help="Only import models with this dbt tag (e.g., --tag feast)", +) +@click.option( + "--model", + "model_names", + multiple=True, + help="Specific model names to import (can be specified multiple times)", +) +@click.option( + "--ttl-days", + type=int, + default=1, + show_default=True, + help="TTL (time-to-live) in days for feature views", +) +@click.option( + "--dry-run", + is_flag=True, + default=False, + help="Preview what would be created without applying changes", +) +@click.option( + "--exclude-columns", + default=None, + help="Comma-separated list of columns to exclude from features", +) +@click.option( + "--output", + "-o", + type=click.Path(), + default=None, + help="Output Python file path (e.g., features.py). Generates code instead of applying to registry.", +) +@click.pass_context +def import_command( + ctx: click.Context, + manifest_path: str, + entity_column: str, + data_source_type: str, + timestamp_field: str, + tag_filter: Optional[str], + model_names: tuple, + ttl_days: int, + dry_run: bool, + exclude_columns: Optional[str], + output: Optional[str], +): + """ + Import dbt models as Feast FeatureViews. + + This command parses a dbt manifest.json file and creates corresponding + Feast DataSource and FeatureView objects. 
+ + Examples: + + # Import all models with 'feast' tag + feast dbt import -m target/manifest.json -e driver_id --tag feast + + # Import specific models + feast dbt import -m target/manifest.json -e customer_id --model orders --model customers + + # Dry run to preview changes + feast dbt import -m target/manifest.json -e driver_id --tag feast --dry-run + + # Generate Python file instead of applying to registry + feast dbt import -m target/manifest.json -e driver_id --tag feast --output features.py + """ + from feast.dbt.mapper import DbtToFeastMapper + from feast.dbt.parser import DbtManifestParser + + # Parse manifest + click.echo(f"{Fore.CYAN}Parsing dbt manifest: {manifest_path}{Style.RESET_ALL}") + + try: + parser = DbtManifestParser(manifest_path) + parser.parse() + except FileNotFoundError as e: + click.echo(f"{Fore.RED}Error: {e}{Style.RESET_ALL}", err=True) + raise SystemExit(1) + except ValueError as e: + click.echo(f"{Fore.RED}Error: {e}{Style.RESET_ALL}", err=True) + raise SystemExit(1) + + # Display manifest info + if parser.dbt_version: + click.echo(f" dbt version: {parser.dbt_version}") + if parser.project_name: + click.echo(f" Project: {parser.project_name}") + + # Get models with filters + model_list: Optional[List[str]] = list(model_names) if model_names else None + models = parser.get_models(model_names=model_list, tag_filter=tag_filter) + + if not models: + click.echo( + f"{Fore.YELLOW}No models found matching the criteria.{Style.RESET_ALL}" + ) + if tag_filter: + click.echo(f" Tag filter: {tag_filter}") + if model_names: + click.echo(f" Model names: {', '.join(model_names)}") + raise SystemExit(0) + + click.echo(f"{Fore.GREEN}Found {len(models)} model(s) to import:{Style.RESET_ALL}") + for model in models: + tags_str = f" [tags: {', '.join(model.tags)}]" if model.tags else "" + click.echo(f" - {model.name} ({len(model.columns)} columns){tags_str}") + + # Parse exclude columns + excluded: Optional[List[str]] = None + if exclude_columns: + excluded = [c.strip() for c in exclude_columns.split(",")] + + # Create mapper + mapper = DbtToFeastMapper( + data_source_type=data_source_type, + timestamp_field=timestamp_field, + ttl_days=ttl_days, + ) + + # Generate Feast objects + click.echo(f"\n{Fore.CYAN}Generating Feast objects...{Style.RESET_ALL}") + + all_objects: List[Any] = [] + entities_created: Dict[str, Any] = {} + + for model in models: + # Validate timestamp field exists + column_names = [c.name for c in model.columns] + if timestamp_field not in column_names: + click.echo( + f"{Fore.YELLOW}Warning: Model '{model.name}' missing timestamp " + f"field '{timestamp_field}'. Skipping.{Style.RESET_ALL}" + ) + continue + + # Validate entity column exists + if entity_column not in column_names: + click.echo( + f"{Fore.YELLOW}Warning: Model '{model.name}' missing entity " + f"column '{entity_column}'. 
Skipping.{Style.RESET_ALL}" + ) + continue + + # Create or reuse entity + if entity_column not in entities_created: + entity = mapper.create_entity( + name=entity_column, + description="Entity key for dbt models", + ) + entities_created[entity_column] = entity + all_objects.append(entity) + else: + entity = entities_created[entity_column] + + # Create data source + data_source = mapper.create_data_source( + model=model, + timestamp_field=timestamp_field, + ) + all_objects.append(data_source) + + # Create feature view + feature_view = mapper.create_feature_view( + model=model, + source=data_source, + entity_column=entity_column, + entity=entity, + timestamp_field=timestamp_field, + ttl_days=ttl_days, + exclude_columns=excluded, + ) + all_objects.append(feature_view) + + click.echo( + f" {Fore.GREEN}✓{Style.RESET_ALL} {model.name}: " + f"DataSource + FeatureView ({len(feature_view.features)} features)" + ) + + if not all_objects: + click.echo( + f"{Fore.YELLOW}No valid models to import (check warnings above).{Style.RESET_ALL}" + ) + raise SystemExit(0) + + # Filter models that were actually processed (have valid columns) + valid_models = [ + m + for m in models + if timestamp_field in [c.name for c in m.columns] + and entity_column in [c.name for c in m.columns] + ] + + # Summary + click.echo(f"\n{Fore.CYAN}Summary:{Style.RESET_ALL}") + click.echo(f" Entities: {len(entities_created)}") + click.echo(f" DataSources: {len(valid_models)}") + click.echo(f" FeatureViews: {len(valid_models)}") + + # Generate Python file if --output specified + if output: + from feast.dbt.codegen import generate_feast_code + + code = generate_feast_code( + models=valid_models, + entity_column=entity_column, + data_source_type=data_source_type, + timestamp_field=timestamp_field, + ttl_days=ttl_days, + manifest_path=manifest_path, + project_name=parser.project_name or "", + exclude_columns=excluded, + online=True, + ) + + with open(output, "w") as f: + f.write(code) + + click.echo( + f"\n{Fore.GREEN}✓ Generated Feast definitions: {output}{Style.RESET_ALL}" + ) + click.echo(" You can now import this file in your feature_store.yaml repo.") + return + + if dry_run: + click.echo(f"\n{Fore.YELLOW}Dry run - no changes applied.{Style.RESET_ALL}") + click.echo("Remove --dry-run flag to apply changes.") + return + + # Apply to Feast + click.echo(f"\n{Fore.CYAN}Applying to Feast registry...{Style.RESET_ALL}") + + repo = ctx.obj["CHDIR"] + fs_yaml_file = ctx.obj["FS_YAML_FILE"] + cli_check_repo(repo, fs_yaml_file) + store = create_feature_store(ctx) + + store.apply(all_objects) + + click.echo( + f"{Fore.GREEN}✓ Successfully imported {len(valid_models)} dbt model(s) " + f"to Feast project '{store.project}'{Style.RESET_ALL}" + ) + + +@dbt_cmd.command("list") +@click.option( + "--manifest-path", + "-m", + required=True, + type=click.Path(exists=True), + help="Path to dbt manifest.json file", +) +@click.option( + "--tag", + "tag_filter", + default=None, + help="Filter models by dbt tag", +) +@click.option( + "--show-columns", + is_flag=True, + default=False, + help="Show column details for each model", +) +def list_command( + manifest_path: str, + tag_filter: Optional[str], + show_columns: bool, +): + """ + List dbt models available for import. 
+ + Examples: + + # List all models + feast dbt list -m target/manifest.json + + # List models with specific tag + feast dbt list -m target/manifest.json --tag feast + + # Show column details + feast dbt list -m target/manifest.json --show-columns + """ + from feast.dbt.parser import DbtManifestParser + + click.echo(f"{Fore.CYAN}Parsing dbt manifest: {manifest_path}{Style.RESET_ALL}") + + try: + parser = DbtManifestParser(manifest_path) + parser.parse() + except (FileNotFoundError, ValueError) as e: + click.echo(f"{Fore.RED}Error: {e}{Style.RESET_ALL}", err=True) + raise SystemExit(1) + + if parser.dbt_version: + click.echo(f" dbt version: {parser.dbt_version}") + if parser.project_name: + click.echo(f" Project: {parser.project_name}") + + models = parser.get_models(tag_filter=tag_filter) + + if not models: + click.echo(f"{Fore.YELLOW}No models found.{Style.RESET_ALL}") + return + + click.echo(f"\n{Fore.GREEN}Found {len(models)} model(s):{Style.RESET_ALL}\n") + + for model in models: + tags_str = f" [tags: {', '.join(model.tags)}]" if model.tags else "" + click.echo(f"{Fore.CYAN}{model.name}{Style.RESET_ALL}{tags_str}") + click.echo(f" Table: {model.full_table_name}") + if model.description: + desc = model.description[:80] + ( + "..." if len(model.description) > 80 else "" + ) + click.echo(f" Description: {desc}") + + if show_columns and model.columns: + click.echo(f" Columns ({len(model.columns)}):") + for col in model.columns: + type_str = col.data_type or "unknown" + click.echo(f" - {col.name}: {type_str}") + + click.echo() diff --git a/sdk/python/feast/dbt/__init__.py b/sdk/python/feast/dbt/__init__.py new file mode 100644 index 00000000000..7d1312d5a1a --- /dev/null +++ b/sdk/python/feast/dbt/__init__.py @@ -0,0 +1,30 @@ +""" +dbt integration for Feast. + +This module provides functionality to import dbt models as Feast FeatureViews, +enabling automatic generation of Feast objects from dbt manifest.json files. + +Example usage:: + + from feast.dbt import DbtManifestParser, DbtToFeastMapper + parser = DbtManifestParser("target/manifest.json") + parser.parse() + models = parser.get_models(tag_filter="feast") + mapper = DbtToFeastMapper(data_source_type="bigquery") + for model in models: + data_source = mapper.create_data_source(model) + feature_view = mapper.create_feature_view(model, data_source, "driver_id") +""" + +from feast.dbt.codegen import DbtCodeGenerator, generate_feast_code +from feast.dbt.mapper import DbtToFeastMapper +from feast.dbt.parser import DbtColumn, DbtManifestParser, DbtModel + +__all__ = [ + "DbtManifestParser", + "DbtModel", + "DbtColumn", + "DbtToFeastMapper", + "DbtCodeGenerator", + "generate_feast_code", +] diff --git a/sdk/python/feast/dbt/codegen.py b/sdk/python/feast/dbt/codegen.py new file mode 100644 index 00000000000..1c7acfb944c --- /dev/null +++ b/sdk/python/feast/dbt/codegen.py @@ -0,0 +1,408 @@ +""" +Code generator for dbt to Feast imports. + +This module generates Python code files containing Feast object definitions +(Entity, DataSource, FeatureView) from dbt model metadata. 
+""" + +import logging +from typing import Any, List, Optional, Set + +from jinja2 import BaseLoader, Environment + +from feast.dbt.mapper import map_dbt_type_to_feast_type +from feast.dbt.parser import DbtModel +from feast.types import ( + Array, + Bool, + Bytes, + Float32, + Float64, + Int32, + Int64, + String, + UnixTimestamp, +) + +logger = logging.getLogger(__name__) + +# Template for generating a complete Feast definitions file +FEAST_FILE_TEMPLATE = '''""" +Feast feature definitions generated from dbt models. + +Source: {{ manifest_path }} +Project: {{ project_name }} +Generated by: feast dbt import +""" + +from datetime import timedelta + +from feast import Entity, FeatureView, Field +{% if type_imports %} +from feast.types import {{ type_imports | join(', ') }} +{% endif %} +{% if data_source_type == 'bigquery' %} +from feast.infra.offline_stores.bigquery_source import BigQuerySource +{% elif data_source_type == 'snowflake' %} +from feast.infra.offline_stores.snowflake_source import SnowflakeSource +{% elif data_source_type == 'file' %} +from feast.infra.offline_stores.file_source import FileSource +{% endif %} + + +# ============================================================================= +# Entities +# ============================================================================= + +{% for entity in entities %} +{{ entity.var_name }} = Entity( + name="{{ entity.name }}", + join_keys=["{{ entity.join_key }}"], + description="{{ entity.description }}", + tags={{ entity.tags }}, +) + +{% endfor %} + +# ============================================================================= +# Data Sources +# ============================================================================= + +{% for source in data_sources %} +{% if data_source_type == 'bigquery' %} +{{ source.var_name }} = BigQuerySource( + name="{{ source.name }}", + table="{{ source.table }}", + timestamp_field="{{ source.timestamp_field }}", + description="{{ source.description }}", + tags={{ source.tags }}, +) +{% elif data_source_type == 'snowflake' %} +{{ source.var_name }} = SnowflakeSource( + name="{{ source.name }}", + database="{{ source.database }}", + schema="{{ source.schema }}", + table="{{ source.table }}", + timestamp_field="{{ source.timestamp_field }}", + description="{{ source.description }}", + tags={{ source.tags }}, +) +{% elif data_source_type == 'file' %} +{{ source.var_name }} = FileSource( + name="{{ source.name }}", + path="{{ source.path }}", + timestamp_field="{{ source.timestamp_field }}", + description="{{ source.description }}", + tags={{ source.tags }}, +) +{% endif %} + +{% endfor %} + +# ============================================================================= +# Feature Views +# ============================================================================= + +{% for fv in feature_views %} +{{ fv.var_name }} = FeatureView( + name="{{ fv.name }}", + entities=[{{ fv.entity_var }}], + ttl=timedelta(days={{ fv.ttl_days }}), + schema=[ +{% for field in fv.fields %} + Field(name="{{ field.name }}", dtype={{ field.dtype }}{% if field.description %}, description="{{ field.description }}"{% endif %}), +{% endfor %} + ], + online={{ fv.online }}, + source={{ fv.source_var }}, + description="{{ fv.description }}", + tags={{ fv.tags }}, +) + +{% endfor %} +''' + + +def _get_feast_type_name(feast_type: Any) -> str: + """Get the string name of a Feast type for code generation.""" + if isinstance(feast_type, Array): + # Safely get base_type. Should always exist since Array.__init__ sets it. 
+ # Example: Array(String) -> base_type = String + base_type = getattr(feast_type, "base_type", None) + + if base_type is None: + logger.warning( + "Array type missing 'base_type' attribute. " + "This indicates a bug in Array initialization. Falling back to String." + ) + base_type = String + + base_type_name = _get_feast_type_name(base_type) + return f"Array({base_type_name})" + + # Map type objects to their names. + # Note: ImageBytes and PdfBytes are excluded since dbt manifests only expose + # generic BYTES type without semantic information about binary content. + type_map = { + String: "String", + Int32: "Int32", + Int64: "Int64", + Float32: "Float32", + Float64: "Float64", + Bool: "Bool", + UnixTimestamp: "UnixTimestamp", + Bytes: "Bytes", + } + + return type_map.get(feast_type, "String") + + +def _make_var_name(name: str) -> str: + """Convert a name to a valid Python variable name.""" + # Replace hyphens and spaces with underscores + var_name = name.replace("-", "_").replace(" ", "_") + # Ensure it starts with a letter or underscore + if var_name and var_name[0].isdigit(): + var_name = f"_{var_name}" + return var_name + + +def _escape_description(desc: Optional[str]) -> str: + """Escape a description string for use in Python code.""" + if not desc: + return "" + # Escape quotes and newlines + return desc.replace("\\", "\\\\").replace('"', '\\"').replace("\n", " ") + + +class DbtCodeGenerator: + """ + Generates Python code for Feast objects from dbt models. + + This class creates complete, importable Python files containing + Entity, DataSource, and FeatureView definitions. + + Example:: + + generator = DbtCodeGenerator( + data_source_type="bigquery", + timestamp_field="event_timestamp", + ttl_days=7 + ) + code = generator.generate( + models=models, + entity_column="user_id", + manifest_path="target/manifest.json", + project_name="my_project" + ) + with open("features.py", "w") as f: + f.write(code) + """ + + def __init__( + self, + data_source_type: str = "bigquery", + timestamp_field: str = "event_timestamp", + ttl_days: int = 1, + ): + self.data_source_type = data_source_type.lower() + self.timestamp_field = timestamp_field + self.ttl_days = ttl_days + + # Set up Jinja2 environment + self.env = Environment( + loader=BaseLoader(), + trim_blocks=True, + lstrip_blocks=True, + ) + self.template = self.env.from_string(FEAST_FILE_TEMPLATE) + + def generate( + self, + models: List[DbtModel], + entity_column: str, + manifest_path: str = "", + project_name: str = "", + exclude_columns: Optional[List[str]] = None, + online: bool = True, + ) -> str: + """ + Generate Python code for Feast objects from dbt models. 
+ + Args: + models: List of DbtModel objects to generate code for + entity_column: The entity/primary key column name + manifest_path: Path to the dbt manifest (for documentation) + project_name: dbt project name (for documentation) + exclude_columns: Columns to exclude from features + online: Whether to enable online serving + + Returns: + Generated Python code as a string + """ + excluded = {entity_column, self.timestamp_field} + if exclude_columns: + excluded.update(exclude_columns) + + # Collect all Feast types used for imports + type_imports: Set[str] = set() + + # Prepare entity data + entities = [] + entity_var = _make_var_name(entity_column) + entities.append( + { + "var_name": entity_var, + "name": entity_column, + "join_key": entity_column, + "description": "Entity key for dbt models", + "tags": {"source": "dbt"}, + } + ) + + # Prepare data sources and feature views + data_sources = [] + feature_views = [] + + for model in models: + # Check required columns exist + column_names = [c.name for c in model.columns] + if self.timestamp_field not in column_names: + continue + if entity_column not in column_names: + continue + + # Build tags + tags = {"dbt.model": model.name} + for tag in model.tags: + tags[f"dbt.tag.{tag}"] = "true" + + # Data source + source_var = _make_var_name(f"{model.name}_source") + source_data = { + "var_name": source_var, + "name": f"{model.name}_source", + "timestamp_field": self.timestamp_field, + "description": _escape_description(model.description), + "tags": tags, + } + + if self.data_source_type == "bigquery": + source_data["table"] = model.full_table_name + elif self.data_source_type == "snowflake": + source_data["database"] = model.database + source_data["schema"] = model.schema + source_data["table"] = model.alias + elif self.data_source_type == "file": + source_data["path"] = f"/data/{model.name}.parquet" + + data_sources.append(source_data) + + # Feature view fields + fields = [] + for column in model.columns: + if column.name in excluded: + continue + + feast_type = map_dbt_type_to_feast_type(column.data_type) + type_name = _get_feast_type_name(feast_type) + + # Track base type for imports. For Array types, import both Array and base type. + # Example: Array(Int64) requires imports: Array, Int64 + if isinstance(feast_type, Array): + type_imports.add("Array") + + base_type = getattr(feast_type, "base_type", None) + if base_type is None: + logger.warning( + "Array type missing 'base_type' attribute while generating imports. " + "This indicates a bug in Array initialization. Falling back to String." 
+ ) + base_type = String + + base_type_name = _get_feast_type_name(base_type) + type_imports.add(base_type_name) + else: + type_imports.add(type_name) + + fields.append( + { + "name": column.name, + "dtype": type_name, + "description": _escape_description(column.description), + } + ) + + # Feature view + fv_var = _make_var_name(f"{model.name}_fv") + feature_views.append( + { + "var_name": fv_var, + "name": model.name, + "entity_var": entity_var, + "source_var": source_var, + "ttl_days": self.ttl_days, + "fields": fields, + "online": online, + "description": _escape_description(model.description), + "tags": tags, + } + ) + + # Sort type imports for consistent output + sorted_types = sorted(type_imports) + + # Render template + return self.template.render( + manifest_path=manifest_path, + project_name=project_name, + data_source_type=self.data_source_type, + type_imports=sorted_types, + entities=entities, + data_sources=data_sources, + feature_views=feature_views, + ) + + +def generate_feast_code( + models: List[DbtModel], + entity_column: str, + data_source_type: str = "bigquery", + timestamp_field: str = "event_timestamp", + ttl_days: int = 1, + manifest_path: str = "", + project_name: str = "", + exclude_columns: Optional[List[str]] = None, + online: bool = True, +) -> str: + """ + Convenience function to generate Feast code from dbt models. + + Args: + models: List of DbtModel objects + entity_column: Primary key column name + data_source_type: Type of data source (bigquery, snowflake, file) + timestamp_field: Timestamp column name + ttl_days: TTL in days for feature views + manifest_path: Path to manifest for documentation + project_name: Project name for documentation + exclude_columns: Columns to exclude from features + online: Whether to enable online serving + + Returns: + Generated Python code as a string + """ + generator = DbtCodeGenerator( + data_source_type=data_source_type, + timestamp_field=timestamp_field, + ttl_days=ttl_days, + ) + + return generator.generate( + models=models, + entity_column=entity_column, + manifest_path=manifest_path, + project_name=project_name, + exclude_columns=exclude_columns, + online=online, + ) diff --git a/sdk/python/feast/dbt/mapper.py b/sdk/python/feast/dbt/mapper.py new file mode 100644 index 00000000000..2d6d63fbd32 --- /dev/null +++ b/sdk/python/feast/dbt/mapper.py @@ -0,0 +1,413 @@ +""" +dbt to Feast type and object mapper. + +This module provides functionality to map dbt model metadata to Feast objects +including DataSource, Entity, and FeatureView. 
+""" + +from datetime import timedelta +from typing import Any, Dict, List, Optional, Union + +from feast.dbt.parser import DbtModel +from feast.entity import Entity +from feast.feature_view import FeatureView +from feast.field import Field +from feast.types import ( + Array, + Bool, + Bytes, + FeastType, + Float32, + Float64, + Int32, + Int64, + String, + UnixTimestamp, +) +from feast.value_type import ValueType + +# Comprehensive mapping from dbt/warehouse types to Feast types +# Covers BigQuery, Snowflake, Redshift, PostgreSQL, and common SQL types +DBT_TO_FEAST_TYPE_MAP: Dict[str, FeastType] = { + # String types + "STRING": String, + "TEXT": String, + "VARCHAR": String, + "CHAR": String, + "CHARACTER": String, + "NVARCHAR": String, + "NCHAR": String, + "CHARACTER VARYING": String, + # Integer types + "INT": Int64, + "INT32": Int32, + "INT64": Int64, + "INTEGER": Int64, + "BIGINT": Int64, + "SMALLINT": Int32, + "TINYINT": Int32, + "BYTEINT": Int32, + "NUMBER": Int64, # Snowflake - default to Int64, precision handling below + "NUMERIC": Int64, + "DECIMAL": Int64, + # Float types + "FLOAT": Float32, + "FLOAT32": Float32, + "FLOAT64": Float64, + "DOUBLE": Float64, + "DOUBLE PRECISION": Float64, + "REAL": Float32, + # Boolean types + "BOOL": Bool, + "BOOLEAN": Bool, + # Timestamp types + "TIMESTAMP": UnixTimestamp, + "TIMESTAMP_NTZ": UnixTimestamp, + "TIMESTAMP_LTZ": UnixTimestamp, + "TIMESTAMP_TZ": UnixTimestamp, + "DATETIME": UnixTimestamp, + "DATE": UnixTimestamp, + "TIME": UnixTimestamp, + # Binary types + "BYTES": Bytes, + "BINARY": Bytes, + "VARBINARY": Bytes, + "BLOB": Bytes, +} + + +def map_dbt_type_to_feast_type(dbt_type: str) -> FeastType: + """ + Map a dbt data type to a Feast type. + + Handles various database type formats including: + - Simple types: STRING, INT64, FLOAT + - Parameterized types: VARCHAR(255), NUMBER(10,2), DECIMAL(18,0) + - Array types: ARRAY, ARRAY + + Args: + dbt_type: The dbt/database data type string + + Returns: + The corresponding Feast type + """ + if not dbt_type: + return String + + # Normalize the type string + normalized = dbt_type.upper().strip() + + # Handle ARRAY types: ARRAY + if normalized.startswith("ARRAY<") and normalized.endswith(">"): + element_type_str = normalized[6:-1].strip() + element_type = map_dbt_type_to_feast_type(element_type_str) + # Array only supports primitive types + valid_array_types = { + String, + Int32, + Int64, + Float32, + Float64, + Bool, + Bytes, + UnixTimestamp, + } + if element_type in valid_array_types: + return Array(element_type) + return Array(String) # Fallback for complex nested types + + # Handle parameterized types: VARCHAR(255), NUMBER(10,2), etc. 
+ # Extract base type by removing parentheses and parameters + base_type = normalized.split("(")[0].strip() + + # Handle Snowflake NUMBER with precision + if base_type == "NUMBER" and "(" in normalized: + try: + # Parse precision and scale: NUMBER(precision, scale) + params = normalized.split("(")[1].rstrip(")").split(",") + precision = int(params[0].strip()) + scale = int(params[1].strip()) if len(params) > 1 else 0 + + if scale > 0: + # Has decimal places, use Float64 + return Float64 + elif precision <= 9: + return Int32 + elif precision <= 18: + return Int64 + else: + # Precision > 18, may exceed Int64 range + return Float64 + except (ValueError, IndexError): + return Int64 + + # Look up in mapping table + if base_type in DBT_TO_FEAST_TYPE_MAP: + return DBT_TO_FEAST_TYPE_MAP[base_type] + + # Default to String for unknown types + return String + + +class DbtToFeastMapper: + """ + Maps dbt models to Feast objects. + + Supports creating DataSource, Entity, and FeatureView objects from + dbt model metadata. + + Example:: + + mapper = DbtToFeastMapper(data_source_type="bigquery") + data_source = mapper.create_data_source(model) + feature_view = mapper.create_feature_view( + model, data_source, entity_column="driver_id" + ) + + Args: + data_source_type: Type of data source ('bigquery', 'snowflake', 'file') + timestamp_field: Default timestamp field name + ttl_days: Default TTL in days for feature views + """ + + def __init__( + self, + data_source_type: str = "bigquery", + timestamp_field: str = "event_timestamp", + ttl_days: int = 1, + ): + self.data_source_type = data_source_type.lower() + self.timestamp_field = timestamp_field + self.ttl_days = ttl_days + + def create_data_source( + self, + model: DbtModel, + timestamp_field: Optional[str] = None, + created_timestamp_column: Optional[str] = None, + ) -> Any: + """ + Create a Feast DataSource from a dbt model. 
+ + Args: + model: The DbtModel to create a DataSource from + timestamp_field: Override the default timestamp field + created_timestamp_column: Column for created timestamp (dedup) + + Returns: + A Feast DataSource (BigQuerySource, SnowflakeSource, or FileSource) + + Raises: + ValueError: If data_source_type is not supported + """ + ts_field = timestamp_field or self.timestamp_field + + # Build tags from dbt metadata + tags = {"dbt.model": model.name} + for tag in model.tags: + tags[f"dbt.tag.{tag}"] = "true" + + if self.data_source_type == "bigquery": + from feast.infra.offline_stores.bigquery_source import BigQuerySource + + return BigQuerySource( + name=f"{model.name}_source", + table=model.full_table_name, + timestamp_field=ts_field, + created_timestamp_column=created_timestamp_column or "", + description=model.description, + tags=tags, + ) + + elif self.data_source_type == "snowflake": + from feast.infra.offline_stores.snowflake_source import SnowflakeSource + + return SnowflakeSource( + name=f"{model.name}_source", + database=model.database, + schema=model.schema, + table=model.alias, + timestamp_field=ts_field, + created_timestamp_column=created_timestamp_column or "", + description=model.description, + tags=tags, + ) + + elif self.data_source_type == "file": + from feast.infra.offline_stores.file_source import FileSource + + # For file sources, use the model name as a placeholder path + return FileSource( + name=f"{model.name}_source", + path=f"/data/{model.name}.parquet", + timestamp_field=ts_field, + created_timestamp_column=created_timestamp_column or "", + description=model.description, + tags=tags, + ) + + else: + raise ValueError( + f"Unsupported data_source_type: {self.data_source_type}. " + f"Supported types: bigquery, snowflake, file" + ) + + def create_entity( + self, + name: str, + join_keys: Optional[List[str]] = None, + description: str = "", + tags: Optional[Dict[str, str]] = None, + value_type: ValueType = ValueType.STRING, + ) -> Entity: + """ + Create a Feast Entity. + + Args: + name: Entity name + join_keys: List of join key column names (defaults to [name]) + description: Entity description + tags: Optional tags + value_type: Value type for the entity (default: STRING) + + Returns: + A Feast Entity + """ + return Entity( + name=name, + join_keys=join_keys or [name], + value_type=value_type, + description=description, + tags=tags or {}, + ) + + def create_feature_view( + self, + model: DbtModel, + source: Any, + entity_column: str, + entity: Optional[Entity] = None, + timestamp_field: Optional[str] = None, + ttl_days: Optional[int] = None, + exclude_columns: Optional[List[str]] = None, + online: bool = True, + ) -> FeatureView: + """ + Create a Feast FeatureView from a dbt model. 
+ + Args: + model: The DbtModel to create a FeatureView from + source: The DataSource for this FeatureView + entity_column: The entity/primary key column name + entity: Optional pre-created Entity (created if not provided) + timestamp_field: Override the default timestamp field + ttl_days: Override the default TTL in days + exclude_columns: Additional columns to exclude from features + online: Whether to enable online serving + + Returns: + A Feast FeatureView + """ + ts_field = timestamp_field or self.timestamp_field + ttl = timedelta(days=ttl_days if ttl_days is not None else self.ttl_days) + + # Columns to exclude from features + excluded = {entity_column, ts_field} + if exclude_columns: + excluded.update(exclude_columns) + + # Create schema from model columns + schema: List[Field] = [] + for column in model.columns: + if column.name not in excluded: + feast_type = map_dbt_type_to_feast_type(column.data_type) + schema.append( + Field( + name=column.name, + dtype=feast_type, + description=column.description, + ) + ) + + # Create entity if not provided + if entity is None: + entity = self.create_entity( + name=entity_column, + description=f"Entity for {model.name}", + ) + + # Build tags from dbt metadata + tags = { + "dbt.model": model.name, + "dbt.unique_id": model.unique_id, + } + for tag in model.tags: + tags[f"dbt.tag.{tag}"] = "true" + + return FeatureView( + name=model.name, + source=source, + schema=schema, + entities=[entity], + ttl=ttl, + online=online, + description=model.description, + tags=tags, + ) + + def create_all_from_model( + self, + model: DbtModel, + entity_column: str, + timestamp_field: Optional[str] = None, + ttl_days: Optional[int] = None, + exclude_columns: Optional[List[str]] = None, + online: bool = True, + ) -> Dict[str, Union[Entity, Any, FeatureView]]: + """ + Create all Feast objects (DataSource, Entity, FeatureView) from a dbt model. + + This is a convenience method that creates all necessary Feast objects + in one call. + + Args: + model: The DbtModel to create objects from + entity_column: The entity/primary key column name + timestamp_field: Override the default timestamp field + ttl_days: Override the default TTL in days + exclude_columns: Additional columns to exclude from features + online: Whether to enable online serving + + Returns: + Dict with keys 'entity', 'data_source', 'feature_view' + """ + # Create entity + entity = self.create_entity( + name=entity_column, + description=f"Entity for {model.name}", + tags={"dbt.model": model.name}, + ) + + # Create data source + data_source = self.create_data_source( + model=model, + timestamp_field=timestamp_field, + ) + + # Create feature view + feature_view = self.create_feature_view( + model=model, + source=data_source, + entity_column=entity_column, + entity=entity, + timestamp_field=timestamp_field, + ttl_days=ttl_days, + exclude_columns=exclude_columns, + online=online, + ) + + return { + "entity": entity, + "data_source": data_source, + "feature_view": feature_view, + } diff --git a/sdk/python/feast/dbt/parser.py b/sdk/python/feast/dbt/parser.py new file mode 100644 index 00000000000..f7d3e587e54 --- /dev/null +++ b/sdk/python/feast/dbt/parser.py @@ -0,0 +1,259 @@ +""" +dbt manifest parser for Feast integration. + +This module provides functionality to parse dbt manifest.json files and extract +model metadata for generating Feast FeatureViews. + +Uses dbt-artifacts-parser for typed parsing of manifest versions v1-v12 (dbt 0.19 through 1.11+). 
+""" + +import json +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Dict, List, Optional + + +@dataclass +class DbtColumn: + """Represents a column in a dbt model.""" + + name: str + description: str = "" + data_type: str = "STRING" + tags: List[str] = field(default_factory=list) + meta: Dict[str, Any] = field(default_factory=dict) + + +@dataclass +class DbtModel: + """Represents a dbt model.""" + + name: str + unique_id: str + database: str + schema: str + alias: str + description: str = "" + columns: List[DbtColumn] = field(default_factory=list) + tags: List[str] = field(default_factory=list) + meta: Dict[str, Any] = field(default_factory=dict) + depends_on: List[str] = field(default_factory=list) + + @property + def full_table_name(self) -> str: + """Returns fully qualified table name (database.schema.table).""" + return f"{self.database}.{self.schema}.{self.alias}" + + +class DbtManifestParser: + """ + Parser for dbt manifest.json files using dbt-artifacts-parser. + + Uses dbt-artifacts-parser for typed parsing of manifest versions v1-v12 + (dbt versions 0.19 through 1.11+). + + Example:: + + parser = DbtManifestParser("target/manifest.json") + parser.parse() + models = parser.get_models(tag_filter="feast") + for model in models: + print(f"Model: {model.name}, Columns: {len(model.columns)}") + + Args: + manifest_path: Path to manifest.json file (typically target/manifest.json) + + Raises: + FileNotFoundError: If manifest.json doesn't exist + ValueError: If manifest.json is invalid JSON + """ + + def __init__(self, manifest_path: str): + """ + Initialize parser. + + Args: + manifest_path: Path to manifest.json file + """ + self.manifest_path = Path(manifest_path) + self._raw_manifest: Optional[Dict[str, Any]] = None + self._parsed_manifest: Optional[Any] = None + + def parse(self) -> None: + """ + Load and parse the manifest.json file using dbt-artifacts-parser. 
        Raises:
            FileNotFoundError: If manifest.json doesn't exist
            ValueError: If manifest.json is invalid JSON
            ImportError: If dbt-artifacts-parser is not installed
        """
        if not self.manifest_path.exists():
            raise FileNotFoundError(
                f"dbt manifest not found at {self.manifest_path}.\n"
                f"Run 'dbt compile' or 'dbt run' first.\n"
                f"Expected path: <your_dbt_project>/target/manifest.json"
            )

        try:
            with open(self.manifest_path, "r") as f:
                self._raw_manifest = json.load(f)
        except json.JSONDecodeError as e:
            raise ValueError(
                f"Invalid JSON in manifest: {e}\nTry: dbt clean && dbt compile"
            )

        # Parse using dbt-artifacts-parser for typed access
        try:
            from dbt_artifacts_parser.parser import parse_manifest

            self._parsed_manifest = parse_manifest(manifest=self._raw_manifest)
        except ImportError:
            raise ImportError(
                "dbt-artifacts-parser is required for dbt integration.\n"
                "Install with: pip install 'feast[dbt]' or pip install dbt-artifacts-parser"
            )

    def _extract_column_from_node(self, col_name: str, col_data: Any) -> DbtColumn:
        """Extract column info from a parsed node column."""
        return DbtColumn(
            name=col_name,
            description=getattr(col_data, "description", "") or "",
            data_type=getattr(col_data, "data_type", "STRING") or "STRING",
            tags=list(getattr(col_data, "tags", []) or []),
            meta=dict(getattr(col_data, "meta", {}) or {}),
        )

    def _extract_model_from_node(self, node_id: str, node: Any) -> Optional[DbtModel]:
        """Extract DbtModel from a parsed manifest node."""
        # Check resource type
        resource_type = getattr(node, "resource_type", None)
        if resource_type is None:
            if not node_id.startswith("model."):
                return None
        else:
            resource_type_str = (
                resource_type.value
                if hasattr(resource_type, "value")
                else str(resource_type)
            )
            if resource_type_str != "model":
                return None

        model_name = getattr(node, "name", "")
        node_tags = list(getattr(node, "tags", []) or [])
        node_columns = getattr(node, "columns", {}) or {}
        depends_on = getattr(node, "depends_on", None)

        if depends_on:
            depends_on_nodes = list(getattr(depends_on, "nodes", []) or [])
        else:
            depends_on_nodes = []

        # Extract columns
        columns = [
            self._extract_column_from_node(col_name, col_data)
            for col_name, col_data in node_columns.items()
        ]

        # Get schema - dbt-artifacts-parser uses schema_ to avoid Python keyword
        schema = getattr(node, "schema_", "") or getattr(node, "schema", "") or ""

        return DbtModel(
            name=model_name,
            unique_id=node_id,
            database=getattr(node, "database", "") or "",
            schema=schema,
            alias=getattr(node, "alias", model_name) or model_name,
            description=getattr(node, "description", "") or "",
            columns=columns,
            tags=node_tags,
            meta=dict(getattr(node, "meta", {}) or {}),
            depends_on=depends_on_nodes,
        )

    def get_models(
        self,
        model_names: Optional[List[str]] = None,
        tag_filter: Optional[str] = None,
    ) -> List[DbtModel]:
        """
        Extract dbt models from manifest.
+ + Args: + model_names: Optional list of specific model names to extract + tag_filter: Optional tag to filter models by + + Returns: + List of DbtModel objects + + Example:: + + models = parser.get_models(model_names=["driver_stats"]) + models = parser.get_models(tag_filter="feast") + """ + if self._parsed_manifest is None: + self.parse() + + if self._parsed_manifest is None: + return [] + + models = [] + nodes = getattr(self._parsed_manifest, "nodes", {}) or {} + + for node_id, node in nodes.items(): + # Only process models (not tests, seeds, snapshots, etc.) + if not node_id.startswith("model."): + continue + + model = self._extract_model_from_node(node_id, node) + if model is None: + continue + + # Filter by model names if specified + if model_names and model.name not in model_names: + continue + + # Filter by tag if specified + if tag_filter and tag_filter not in model.tags: + continue + + models.append(model) + + return models + + def get_model_by_name(self, model_name: str) -> Optional[DbtModel]: + """ + Get a specific model by name. + + Args: + model_name: Name of the model to retrieve + + Returns: + DbtModel if found, None otherwise + """ + models = self.get_models(model_names=[model_name]) + return models[0] if models else None + + @property + def dbt_version(self) -> Optional[str]: + """Get dbt version from manifest metadata.""" + if self._parsed_manifest is None: + return None + metadata = getattr(self._parsed_manifest, "metadata", None) + if metadata is None: + return None + return getattr(metadata, "dbt_version", None) + + @property + def project_name(self) -> Optional[str]: + """Get project name from manifest metadata.""" + if self._parsed_manifest is None: + return None + metadata = getattr(self._parsed_manifest, "metadata", None) + if metadata is None: + return None + # project_name may not exist in all manifest versions + return getattr(metadata, "project_name", None) or getattr( + metadata, "project_id", None + ) diff --git a/sdk/python/requirements/py3.10-ci-requirements.txt b/sdk/python/requirements/py3.10-ci-requirements.txt index 851c363868f..0a08de1c3be 100644 --- a/sdk/python/requirements/py3.10-ci-requirements.txt +++ b/sdk/python/requirements/py3.10-ci-requirements.txt @@ -853,6 +853,10 @@ db-dtypes==1.5.0 \ # via # google-cloud-bigquery # pandas-gbq +dbt-artifacts-parser==0.12.0 \ + --hash=sha256:3db93df7969c3f22c6fbf75a51b0af4c21b189d8db6f3c54e8471102c775bb0d \ + --hash=sha256:9d1c0ed41926102c1c39fdd780e1a332f58c9b794e94dba0dcf5dfefc847d6ea + # via feast (setup.py) debugpy==1.8.19 \ --hash=sha256:0601708223fe1cd0e27c6cce67a899d92c7d68e73690211e6788a4b0e1903f5b \ --hash=sha256:14035cbdbb1fe4b642babcdcb5935c2da3b1067ac211c5c5a8fdc0bb31adbcaa \ @@ -3763,6 +3767,7 @@ pydantic==2.12.5 \ --hash=sha256:e561593fccf61e8a20fc46dfc2dfe075b8be7d0188df33f221ad1f0139180f9d # via # feast (setup.py) + # dbt-artifacts-parser # docling # docling-core # docling-ibm-models diff --git a/sdk/python/requirements/py3.11-ci-requirements.txt b/sdk/python/requirements/py3.11-ci-requirements.txt index f47ea84f804..40a5f144ffb 100644 --- a/sdk/python/requirements/py3.11-ci-requirements.txt +++ b/sdk/python/requirements/py3.11-ci-requirements.txt @@ -933,6 +933,10 @@ db-dtypes==1.5.0 \ # via # google-cloud-bigquery # pandas-gbq +dbt-artifacts-parser==0.12.0 \ + --hash=sha256:3db93df7969c3f22c6fbf75a51b0af4c21b189d8db6f3c54e8471102c775bb0d \ + --hash=sha256:9d1c0ed41926102c1c39fdd780e1a332f58c9b794e94dba0dcf5dfefc847d6ea + # via feast (setup.py) debugpy==1.8.19 \ 
--hash=sha256:0601708223fe1cd0e27c6cce67a899d92c7d68e73690211e6788a4b0e1903f5b \ --hash=sha256:14035cbdbb1fe4b642babcdcb5935c2da3b1067ac211c5c5a8fdc0bb31adbcaa \ @@ -3928,6 +3932,7 @@ pydantic==2.12.5 \ # via # feast (setup.py) # codeflare-sdk + # dbt-artifacts-parser # docling # docling-core # docling-ibm-models diff --git a/sdk/python/requirements/py3.12-ci-requirements.txt b/sdk/python/requirements/py3.12-ci-requirements.txt index 3434f34a2ef..ed4014139d0 100644 --- a/sdk/python/requirements/py3.12-ci-requirements.txt +++ b/sdk/python/requirements/py3.12-ci-requirements.txt @@ -929,6 +929,10 @@ db-dtypes==1.5.0 \ # via # google-cloud-bigquery # pandas-gbq +dbt-artifacts-parser==0.12.0 \ + --hash=sha256:3db93df7969c3f22c6fbf75a51b0af4c21b189d8db6f3c54e8471102c775bb0d \ + --hash=sha256:9d1c0ed41926102c1c39fdd780e1a332f58c9b794e94dba0dcf5dfefc847d6ea + # via feast (setup.py) debugpy==1.8.19 \ --hash=sha256:0601708223fe1cd0e27c6cce67a899d92c7d68e73690211e6788a4b0e1903f5b \ --hash=sha256:14035cbdbb1fe4b642babcdcb5935c2da3b1067ac211c5c5a8fdc0bb31adbcaa \ @@ -3918,6 +3922,7 @@ pydantic==2.12.5 \ # via # feast (setup.py) # codeflare-sdk + # dbt-artifacts-parser # docling # docling-core # docling-ibm-models diff --git a/sdk/python/tests/unit/dbt/__init__.py b/sdk/python/tests/unit/dbt/__init__.py new file mode 100644 index 00000000000..8a225265d2c --- /dev/null +++ b/sdk/python/tests/unit/dbt/__init__.py @@ -0,0 +1 @@ +# dbt integration tests diff --git a/sdk/python/tests/unit/dbt/sample_manifest.json b/sdk/python/tests/unit/dbt/sample_manifest.json new file mode 100644 index 00000000000..6a44b9db749 --- /dev/null +++ b/sdk/python/tests/unit/dbt/sample_manifest.json @@ -0,0 +1,170 @@ +{ + "metadata": { + "dbt_version": "1.5.0", + "project_name": "sample_dbt_project", + "generated_at": "2024-01-10T00:00:00Z", + "invocation_id": "12345678-1234-1234-1234-123456789012" + }, + "nodes": { + "model.sample_dbt_project.driver_stats": { + "name": "driver_stats", + "unique_id": "model.sample_dbt_project.driver_stats", + "resource_type": "model", + "database": "feast_demo", + "schema": "public", + "alias": "driver_stats", + "description": "Driver statistics aggregated hourly for ML features", + "columns": { + "driver_id": { + "name": "driver_id", + "description": "Unique driver identifier", + "data_type": "INT64", + "tags": ["entity", "primary_key"], + "meta": {} + }, + "event_timestamp": { + "name": "event_timestamp", + "description": "Timestamp of the event", + "data_type": "TIMESTAMP", + "tags": ["timestamp"], + "meta": {} + }, + "trip_count": { + "name": "trip_count", + "description": "Total number of trips completed", + "data_type": "INT64", + "tags": ["feature"], + "meta": {} + }, + "avg_rating": { + "name": "avg_rating", + "description": "Average driver rating (1-5 scale)", + "data_type": "FLOAT64", + "tags": ["feature"], + "meta": {} + }, + "total_earnings": { + "name": "total_earnings", + "description": "Total earnings in dollars", + "data_type": "FLOAT64", + "tags": ["feature"], + "meta": {} + }, + "is_active": { + "name": "is_active", + "description": "Whether driver is currently active", + "data_type": "BOOLEAN", + "tags": ["feature"], + "meta": {} + } + }, + "tags": ["feast", "ml", "driver"], + "meta": { + "owner": "ml-team@example.com", + "team": "driver-experience" + }, + "depends_on": { + "nodes": ["source.sample_dbt_project.raw_trips"] + } + }, + "model.sample_dbt_project.customer_stats": { + "name": "customer_stats", + "unique_id": "model.sample_dbt_project.customer_stats", + 
"resource_type": "model", + "database": "feast_demo", + "schema": "public", + "alias": "customer_stats", + "description": "Customer statistics for personalization features", + "columns": { + "customer_id": { + "name": "customer_id", + "description": "Unique customer identifier", + "data_type": "STRING", + "tags": ["entity"], + "meta": {} + }, + "event_timestamp": { + "name": "event_timestamp", + "description": "Event timestamp", + "data_type": "TIMESTAMP", + "tags": [], + "meta": {} + }, + "order_count": { + "name": "order_count", + "description": "Total number of orders placed", + "data_type": "INT64", + "tags": ["feature"], + "meta": {} + }, + "avg_order_value": { + "name": "avg_order_value", + "description": "Average order value in dollars", + "data_type": "FLOAT64", + "tags": ["feature"], + "meta": {} + }, + "preferred_payment": { + "name": "preferred_payment", + "description": "Preferred payment method", + "data_type": "STRING", + "tags": ["feature"], + "meta": {} + } + }, + "tags": ["feast", "ml", "customer"], + "meta": { + "owner": "ml-team@example.com" + }, + "depends_on": { + "nodes": [] + } + }, + "model.sample_dbt_project.location_stats": { + "name": "location_stats", + "unique_id": "model.sample_dbt_project.location_stats", + "resource_type": "model", + "database": "feast_demo", + "schema": "public", + "alias": "location_stats", + "description": "Location-based statistics (no feast tag)", + "columns": { + "location_id": { + "name": "location_id", + "description": "Location identifier", + "data_type": "STRING", + "tags": [], + "meta": {} + }, + "event_timestamp": { + "name": "event_timestamp", + "description": "Event timestamp", + "data_type": "TIMESTAMP", + "tags": [], + "meta": {} + }, + "demand_score": { + "name": "demand_score", + "description": "Demand score for the location", + "data_type": "FLOAT64", + "tags": [], + "meta": {} + } + }, + "tags": ["analytics"], + "meta": {}, + "depends_on": { + "nodes": [] + } + } + }, + "sources": { + "source.sample_dbt_project.raw_trips": { + "name": "raw_trips", + "unique_id": "source.sample_dbt_project.raw_trips", + "source_name": "raw_data", + "schema": "raw", + "identifier": "trips" + } + } +} diff --git a/sdk/python/tests/unit/dbt/test_mapper.py b/sdk/python/tests/unit/dbt/test_mapper.py new file mode 100644 index 00000000000..809c4b43b8e --- /dev/null +++ b/sdk/python/tests/unit/dbt/test_mapper.py @@ -0,0 +1,312 @@ +""" +Unit tests for dbt to Feast mapper. 
+""" + +from datetime import timedelta + +import pytest + +# Skip all tests in this module if dbt-artifacts-parser is not installed +pytest.importorskip("dbt_artifacts_parser", reason="dbt-artifacts-parser not installed") + +from feast.dbt.mapper import ( + DbtToFeastMapper, + map_dbt_type_to_feast_type, +) +from feast.dbt.parser import DbtColumn, DbtModel +from feast.types import ( + Array, + Bool, + Bytes, + Float32, + Float64, + Int32, + Int64, + String, + UnixTimestamp, +) + + +class TestTypeMapping: + """Tests for dbt to Feast type mapping.""" + + def test_string_types(self): + """Test string type mappings.""" + assert map_dbt_type_to_feast_type("STRING") == String + assert map_dbt_type_to_feast_type("TEXT") == String + assert map_dbt_type_to_feast_type("VARCHAR") == String + assert map_dbt_type_to_feast_type("VARCHAR(255)") == String + assert map_dbt_type_to_feast_type("CHAR") == String + assert map_dbt_type_to_feast_type("NVARCHAR") == String + + def test_integer_types(self): + """Test integer type mappings.""" + assert map_dbt_type_to_feast_type("INT") == Int64 + assert map_dbt_type_to_feast_type("INT64") == Int64 + assert map_dbt_type_to_feast_type("INTEGER") == Int64 + assert map_dbt_type_to_feast_type("BIGINT") == Int64 + assert map_dbt_type_to_feast_type("INT32") == Int32 + assert map_dbt_type_to_feast_type("SMALLINT") == Int32 + assert map_dbt_type_to_feast_type("TINYINT") == Int32 + + def test_float_types(self): + """Test float type mappings.""" + assert map_dbt_type_to_feast_type("FLOAT") == Float32 + assert map_dbt_type_to_feast_type("FLOAT32") == Float32 + assert map_dbt_type_to_feast_type("FLOAT64") == Float64 + assert map_dbt_type_to_feast_type("DOUBLE") == Float64 + assert map_dbt_type_to_feast_type("DOUBLE PRECISION") == Float64 + assert map_dbt_type_to_feast_type("REAL") == Float32 + + def test_boolean_types(self): + """Test boolean type mappings.""" + assert map_dbt_type_to_feast_type("BOOL") == Bool + assert map_dbt_type_to_feast_type("BOOLEAN") == Bool + + def test_timestamp_types(self): + """Test timestamp type mappings.""" + assert map_dbt_type_to_feast_type("TIMESTAMP") == UnixTimestamp + assert map_dbt_type_to_feast_type("TIMESTAMP_NTZ") == UnixTimestamp + assert map_dbt_type_to_feast_type("TIMESTAMP_LTZ") == UnixTimestamp + assert map_dbt_type_to_feast_type("DATETIME") == UnixTimestamp + assert map_dbt_type_to_feast_type("DATE") == UnixTimestamp + + def test_binary_types(self): + """Test binary type mappings.""" + assert map_dbt_type_to_feast_type("BYTES") == Bytes + assert map_dbt_type_to_feast_type("BINARY") == Bytes + assert map_dbt_type_to_feast_type("VARBINARY") == Bytes + assert map_dbt_type_to_feast_type("BLOB") == Bytes + + def test_case_insensitivity(self): + """Test type mapping is case insensitive.""" + assert map_dbt_type_to_feast_type("string") == String + assert map_dbt_type_to_feast_type("String") == String + assert map_dbt_type_to_feast_type("STRING") == String + assert map_dbt_type_to_feast_type("int64") == Int64 + assert map_dbt_type_to_feast_type("INT64") == Int64 + + def test_parameterized_types(self): + """Test parameterized types are handled correctly.""" + assert map_dbt_type_to_feast_type("VARCHAR(255)") == String + assert map_dbt_type_to_feast_type("CHAR(10)") == String + assert map_dbt_type_to_feast_type("DECIMAL(10,2)") == Int64 + + def test_snowflake_number_precision(self): + """Test Snowflake NUMBER type with precision.""" + # Small precision -> Int32 + assert map_dbt_type_to_feast_type("NUMBER(5,0)") == Int32 + assert 
map_dbt_type_to_feast_type("NUMBER(9,0)") == Int32 + + # Medium precision -> Int64 + assert map_dbt_type_to_feast_type("NUMBER(10,0)") == Int64 + assert map_dbt_type_to_feast_type("NUMBER(18,0)") == Int64 + + # Large precision -> Float64 + assert map_dbt_type_to_feast_type("NUMBER(20,0)") == Float64 + + # With decimal places -> Float64 + assert map_dbt_type_to_feast_type("NUMBER(10,2)") == Float64 + assert map_dbt_type_to_feast_type("NUMBER(5,3)") == Float64 + + def test_array_types(self): + """Test ARRAY type mappings.""" + result = map_dbt_type_to_feast_type("ARRAY") + assert isinstance(result, Array) + + result = map_dbt_type_to_feast_type("ARRAY") + assert isinstance(result, Array) + + result = map_dbt_type_to_feast_type("ARRAY") + assert isinstance(result, Array) + + def test_unknown_type_defaults_to_string(self): + """Test unknown types default to String.""" + assert map_dbt_type_to_feast_type("UNKNOWN_TYPE") == String + assert map_dbt_type_to_feast_type("CUSTOM") == String + + def test_empty_type_defaults_to_string(self): + """Test empty type defaults to String.""" + assert map_dbt_type_to_feast_type("") == String + assert map_dbt_type_to_feast_type(None) == String + + +@pytest.fixture +def sample_model(): + """Create a sample DbtModel for testing.""" + return DbtModel( + name="driver_stats", + unique_id="model.test_project.driver_stats", + database="my_database", + schema="analytics", + alias="driver_stats", + description="Driver statistics", + columns=[ + DbtColumn(name="driver_id", data_type="INT64", description="Driver ID"), + DbtColumn( + name="event_timestamp", + data_type="TIMESTAMP", + description="Event time", + ), + DbtColumn( + name="trip_count", data_type="INT64", description="Number of trips" + ), + DbtColumn( + name="avg_rating", data_type="FLOAT64", description="Average rating" + ), + DbtColumn( + name="is_active", data_type="BOOLEAN", description="Is driver active" + ), + ], + tags=["feast", "ml"], + meta={"owner": "ml-team"}, + ) + + +class TestDbtToFeastMapper: + """Tests for DbtToFeastMapper class.""" + + def test_create_bigquery_data_source(self, sample_model): + """Test creating a BigQuery data source.""" + mapper = DbtToFeastMapper(data_source_type="bigquery") + source = mapper.create_data_source(sample_model) + + assert source.name == "driver_stats_source" + assert source.table == "my_database.analytics.driver_stats" + assert source.timestamp_field == "event_timestamp" + assert "dbt.model" in source.tags + assert source.tags["dbt.model"] == "driver_stats" + + def test_create_snowflake_data_source(self, sample_model): + """Test creating a Snowflake data source.""" + mapper = DbtToFeastMapper(data_source_type="snowflake") + source = mapper.create_data_source(sample_model) + + assert source.name == "driver_stats_source" + assert source.database == "my_database" + assert source.schema == "analytics" + assert source.table == "driver_stats" + assert source.timestamp_field == "event_timestamp" + + def test_create_file_data_source(self, sample_model): + """Test creating a file data source.""" + mapper = DbtToFeastMapper(data_source_type="file") + source = mapper.create_data_source(sample_model) + + assert source.name == "driver_stats_source" + assert "driver_stats.parquet" in source.path + + def test_unsupported_data_source_type(self, sample_model): + """Test error for unsupported data source type.""" + mapper = DbtToFeastMapper(data_source_type="unsupported") + + with pytest.raises(ValueError) as exc_info: + mapper.create_data_source(sample_model) + + assert 
"Unsupported data_source_type" in str(exc_info.value) + + def test_custom_timestamp_field(self, sample_model): + """Test custom timestamp field.""" + mapper = DbtToFeastMapper( + data_source_type="bigquery", timestamp_field="created_at" + ) + source = mapper.create_data_source(sample_model) + + assert source.timestamp_field == "created_at" + + def test_create_entity(self): + """Test creating an entity.""" + mapper = DbtToFeastMapper() + entity = mapper.create_entity( + name="driver_id", + description="Driver entity", + tags={"source": "dbt"}, + ) + + assert entity.name == "driver_id" + assert entity.join_key == "driver_id" + assert entity.description == "Driver entity" + assert entity.tags == {"source": "dbt"} + + def test_create_feature_view(self, sample_model): + """Test creating a feature view.""" + mapper = DbtToFeastMapper(data_source_type="bigquery", ttl_days=7) + source = mapper.create_data_source(sample_model) + fv = mapper.create_feature_view( + model=sample_model, + source=source, + entity_column="driver_id", + ) + + assert fv.name == "driver_stats" + assert fv.ttl == timedelta(days=7) + assert fv.description == "Driver statistics" + + # Check features (should exclude entity and timestamp) + feature_names = [f.name for f in fv.features] + assert "trip_count" in feature_names + assert "avg_rating" in feature_names + assert "is_active" in feature_names + assert "driver_id" not in feature_names + assert "event_timestamp" not in feature_names + + # Check tags + assert "dbt.model" in fv.tags + assert fv.tags["dbt.model"] == "driver_stats" + assert "dbt.tag.feast" in fv.tags + + def test_create_feature_view_with_exclude(self, sample_model): + """Test excluding columns from feature view.""" + mapper = DbtToFeastMapper(data_source_type="bigquery") + source = mapper.create_data_source(sample_model) + fv = mapper.create_feature_view( + model=sample_model, + source=source, + entity_column="driver_id", + exclude_columns=["is_active"], + ) + + feature_names = [f.name for f in fv.features] + assert "trip_count" in feature_names + assert "avg_rating" in feature_names + assert "is_active" not in feature_names + + def test_create_all_from_model(self, sample_model): + """Test creating all Feast objects from a model.""" + mapper = DbtToFeastMapper(data_source_type="bigquery") + result = mapper.create_all_from_model( + model=sample_model, + entity_column="driver_id", + ) + + assert "entity" in result + assert "data_source" in result + assert "feature_view" in result + + assert result["entity"].name == "driver_id" + assert result["data_source"].name == "driver_stats_source" + assert result["feature_view"].name == "driver_stats" + + def test_feature_type_mapping(self, sample_model): + """Test that feature types are correctly mapped.""" + mapper = DbtToFeastMapper(data_source_type="bigquery") + source = mapper.create_data_source(sample_model) + fv = mapper.create_feature_view( + model=sample_model, + source=source, + entity_column="driver_id", + ) + + # Find specific features and check types + trip_count = next((f for f in fv.features if f.name == "trip_count"), None) + avg_rating = next((f for f in fv.features if f.name == "avg_rating"), None) + is_active = next((f for f in fv.features if f.name == "is_active"), None) + + assert trip_count is not None + assert trip_count.dtype == Int64 + + assert avg_rating is not None + assert avg_rating.dtype == Float64 + + assert is_active is not None + assert is_active.dtype == Bool diff --git a/sdk/python/tests/unit/dbt/test_parser.py 
b/sdk/python/tests/unit/dbt/test_parser.py new file mode 100644 index 00000000000..2e15f9863ee --- /dev/null +++ b/sdk/python/tests/unit/dbt/test_parser.py @@ -0,0 +1,314 @@ +""" +Unit tests for dbt manifest parser. +""" + +import json + +import pytest + +# Skip all tests in this module if dbt-artifacts-parser is not installed +pytest.importorskip("dbt_artifacts_parser", reason="dbt-artifacts-parser not installed") + +from feast.dbt.parser import DbtColumn, DbtManifestParser, DbtModel + + +def _create_model_node( + name: str, + unique_id: str, + database: str = "my_database", + schema: str = "analytics", + description: str = "", + columns: dict = None, + tags: list = None, + meta: dict = None, + depends_on_nodes: list = None, +): + """Helper to create a complete model node for dbt-artifacts-parser.""" + return { + "name": name, + "unique_id": unique_id, + "resource_type": "model", + "package_name": "test_project", + "path": f"models/{name}.sql", + "original_file_path": f"models/{name}.sql", + "fqn": ["test_project", name], + "alias": name, + "checksum": {"name": "sha256", "checksum": "abc123"}, + "database": database, + "schema": schema, + "description": description or "", + "columns": columns or {}, + "tags": tags or [], + "meta": meta or {}, + "config": { + "enabled": True, + "materialized": "table", + "tags": tags or [], + "meta": meta or {}, + }, + "depends_on": {"nodes": depends_on_nodes or [], "macros": []}, + "refs": [], + "sources": [], + "metrics": [], + "compiled_path": f"target/compiled/test_project/models/{name}.sql", + } + + +def _create_column( + name: str, + data_type: str = "STRING", + description: str = "", + tags: list = None, + meta: dict = None, +): + """Helper to create a column definition.""" + return { + "name": name, + "description": description or "", + "data_type": data_type, + "tags": tags or [], + "meta": meta or {}, + } + + +@pytest.fixture +def sample_manifest(tmp_path): + """Create a sample dbt manifest.json for testing.""" + manifest = { + "metadata": { + "dbt_schema_version": "https://schemas.getdbt.com/dbt/manifest/v9.json", + "dbt_version": "1.5.0", + "generated_at": "2024-01-10T00:00:00Z", + "invocation_id": "test-invocation-id", + "env": {}, + "adapter_type": "bigquery", + }, + "nodes": { + "model.test_project.driver_stats": _create_model_node( + name="driver_stats", + unique_id="model.test_project.driver_stats", + description="Driver statistics aggregated hourly", + columns={ + "driver_id": _create_column( + "driver_id", "INT64", "Unique driver identifier", ["entity"] + ), + "event_timestamp": _create_column( + "event_timestamp", "TIMESTAMP", "Event timestamp" + ), + "trip_count": _create_column( + "trip_count", "INT64", "Number of trips", ["feature"] + ), + "avg_rating": _create_column( + "avg_rating", "FLOAT64", "Average driver rating", ["feature"] + ), + }, + tags=["feast", "ml"], + meta={"owner": "data-team"}, + depends_on_nodes=["source.test_project.raw_trips"], + ), + "model.test_project.customer_stats": _create_model_node( + name="customer_stats", + unique_id="model.test_project.customer_stats", + description="Customer statistics", + columns={ + "customer_id": _create_column( + "customer_id", "STRING", "Unique customer ID" + ), + "event_timestamp": _create_column( + "event_timestamp", "TIMESTAMP", "Event timestamp" + ), + "order_count": _create_column( + "order_count", "INT64", "Total orders" + ), + }, + tags=["ml"], + ), + }, + "sources": {}, + "macros": {}, + "docs": {}, + "exposures": {}, + "metrics": {}, + "groups": {}, + "selectors": {}, + 
"disabled": {}, + "parent_map": {}, + "child_map": {}, + } + + manifest_path = tmp_path / "manifest.json" + manifest_path.write_text(json.dumps(manifest)) + return manifest_path + + +class TestDbtManifestParser: + """Tests for DbtManifestParser class.""" + + def test_parse_manifest(self, sample_manifest): + """Test parsing a valid manifest file.""" + parser = DbtManifestParser(str(sample_manifest)) + parser.parse() + + assert parser.dbt_version == "1.5.0" + + def test_parse_manifest_not_found(self, tmp_path): + """Test error when manifest file doesn't exist.""" + parser = DbtManifestParser(str(tmp_path / "nonexistent.json")) + + with pytest.raises(FileNotFoundError) as exc_info: + parser.parse() + + assert "dbt manifest not found" in str(exc_info.value) + + def test_parse_manifest_invalid_json(self, tmp_path): + """Test error when manifest is invalid JSON.""" + invalid_path = tmp_path / "invalid.json" + invalid_path.write_text("not valid json {{{") + + parser = DbtManifestParser(str(invalid_path)) + + with pytest.raises(ValueError) as exc_info: + parser.parse() + + assert "Invalid JSON" in str(exc_info.value) + + def test_get_all_models(self, sample_manifest): + """Test getting all models from manifest.""" + parser = DbtManifestParser(str(sample_manifest)) + models = parser.get_models() + + assert len(models) == 2 + model_names = [m.name for m in models] + assert "driver_stats" in model_names + assert "customer_stats" in model_names + + def test_get_models_by_name(self, sample_manifest): + """Test filtering models by name.""" + parser = DbtManifestParser(str(sample_manifest)) + models = parser.get_models(model_names=["driver_stats"]) + + assert len(models) == 1 + assert models[0].name == "driver_stats" + + def test_get_models_by_tag(self, sample_manifest): + """Test filtering models by tag.""" + parser = DbtManifestParser(str(sample_manifest)) + + # Filter by 'feast' tag - only driver_stats has it + feast_models = parser.get_models(tag_filter="feast") + assert len(feast_models) == 1 + assert feast_models[0].name == "driver_stats" + + # Filter by 'ml' tag - both models have it + ml_models = parser.get_models(tag_filter="ml") + assert len(ml_models) == 2 + + def test_model_properties(self, sample_manifest): + """Test DbtModel properties.""" + parser = DbtManifestParser(str(sample_manifest)) + model = parser.get_model_by_name("driver_stats") + + assert model is not None + assert model.name == "driver_stats" + assert model.unique_id == "model.test_project.driver_stats" + assert model.database == "my_database" + assert model.schema == "analytics" + assert model.alias == "driver_stats" + assert model.description == "Driver statistics aggregated hourly" + assert model.full_table_name == "my_database.analytics.driver_stats" + assert "feast" in model.tags + assert "ml" in model.tags + assert len(model.columns) == 4 + + def test_column_properties(self, sample_manifest): + """Test DbtColumn properties.""" + parser = DbtManifestParser(str(sample_manifest)) + model = parser.get_model_by_name("driver_stats") + + # Find the trip_count column + trip_count_col = next( + (c for c in model.columns if c.name == "trip_count"), None + ) + + assert trip_count_col is not None + assert trip_count_col.name == "trip_count" + assert trip_count_col.data_type == "INT64" + assert trip_count_col.description == "Number of trips" + assert "feature" in trip_count_col.tags + + def test_get_model_by_name_not_found(self, sample_manifest): + """Test getting a model that doesn't exist.""" + parser = 
DbtManifestParser(str(sample_manifest)) + model = parser.get_model_by_name("nonexistent_model") + + assert model is None + + def test_depends_on(self, sample_manifest): + """Test model dependencies are captured.""" + parser = DbtManifestParser(str(sample_manifest)) + model = parser.get_model_by_name("driver_stats") + + assert len(model.depends_on) == 1 + assert "source.test_project.raw_trips" in model.depends_on + + +class TestDbtColumn: + """Tests for DbtColumn dataclass.""" + + def test_column_defaults(self): + """Test DbtColumn default values.""" + col = DbtColumn(name="test_col") + + assert col.name == "test_col" + assert col.description == "" + assert col.data_type == "STRING" + assert col.tags == [] + assert col.meta == {} + + def test_column_with_all_fields(self): + """Test DbtColumn with all fields specified.""" + col = DbtColumn( + name="feature_col", + description="A feature column", + data_type="FLOAT64", + tags=["feature", "numeric"], + meta={"owner": "ml-team"}, + ) + + assert col.name == "feature_col" + assert col.description == "A feature column" + assert col.data_type == "FLOAT64" + assert col.tags == ["feature", "numeric"] + assert col.meta == {"owner": "ml-team"} + + +class TestDbtModel: + """Tests for DbtModel dataclass.""" + + def test_model_full_table_name(self): + """Test full_table_name property.""" + model = DbtModel( + name="test_model", + unique_id="model.proj.test_model", + database="prod_db", + schema="public", + alias="test_model_v2", + ) + + assert model.full_table_name == "prod_db.public.test_model_v2" + + def test_model_defaults(self): + """Test DbtModel default values.""" + model = DbtModel( + name="test", + unique_id="model.proj.test", + database="db", + schema="schema", + alias="test", + ) + + assert model.description == "" + assert model.columns == [] + assert model.tags == [] + assert model.meta == {} + assert model.depends_on == [] diff --git a/setup.py b/setup.py index 32082c2fa0b..a3ac25a40d9 100644 --- a/setup.py +++ b/setup.py @@ -165,6 +165,8 @@ MILVUS_REQUIRED = ["pymilvus>2.5", "milvus-lite==2.4.12", "setuptools>=60,<81"] +DBT_REQUIRED = ["dbt-artifacts-parser>=0.6.0,<1"] + TORCH_REQUIRED = [ "torch>=2.7.0", "torchvision>=0.22.1", @@ -195,6 +197,7 @@ "build", "virtualenv==20.23.0", "cryptography>=43.0,<44", + "dbt-artifacts-parser>=0.6.0,<1", "ruff>=0.8.0", "mypy-protobuf>=3.1", "grpcio-tools>=1.56.2,<=1.62.3", @@ -365,6 +368,7 @@ "qdrant": QDRANT_REQUIRED, "go": GO_REQUIRED, "milvus": MILVUS_REQUIRED, + "dbt": DBT_REQUIRED, "docling": DOCLING_REQUIRED, "pytorch": TORCH_REQUIRED, "nlp": NLP_REQUIRED,
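Taken together, the unit tests above pin down the programmatic API this PR introduces. For readers who want to drive the integration from Python rather than the `feast dbt` CLI, here is a minimal sketch assembled from the calls exercised by the tests (the `feast.dbt.parser` / `feast.dbt.mapper` module layout and the keyword arguments are exactly what the tests import and pass; anything beyond that is an assumption, and the public API may shift while the feature is in alpha):

```python
from feast.dbt.mapper import DbtToFeastMapper
from feast.dbt.parser import DbtManifestParser

# Parse a compiled dbt manifest; get_models() parses lazily on first call
# and returns only models carrying the requested tag.
parser = DbtManifestParser("target/manifest.json")
models = parser.get_models(tag_filter="feast")

# Map each model to Feast objects. The entity column must be supplied
# manually (single-entity limitation); "driver_id" here is illustrative.
mapper = DbtToFeastMapper(data_source_type="bigquery", ttl_days=7)
for model in models:
    objs = mapper.create_all_from_model(model=model, entity_column="driver_id")
    print(
        objs["entity"].name,
        objs["data_source"].name,
        objs["feature_view"].name,
    )
```

As in the tests, `create_all_from_model` returns a dict keyed by `"entity"`, `"data_source"`, and `"feature_view"`, with entity and timestamp columns excluded from the FeatureView schema.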