Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions sdk/python/feast/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,17 @@ def plan_command(
is_flag=True,
help="Don't validate feature views. Use with caution as this skips important checks.",
)
@click.option(
"--no-progress",
is_flag=True,
help="Disable progress bars during apply operation.",
)
@click.pass_context
def apply_total_command(
ctx: click.Context, skip_source_validation: bool, skip_feature_view_validation: bool
ctx: click.Context,
skip_source_validation: bool,
skip_feature_view_validation: bool,
no_progress: bool,
):
"""
Create or update a feature store deployment
Expand All @@ -282,9 +290,19 @@ def apply_total_command(
cli_check_repo(repo, fs_yaml_file)

repo_config = load_repo_config(repo, fs_yaml_file)

# Set environment variable to disable progress if requested
if no_progress:
import os

os.environ["FEAST_NO_PROGRESS"] = "1"

try:
apply_total(
repo_config, repo, skip_source_validation, skip_feature_view_validation
repo_config,
repo,
skip_source_validation,
skip_feature_view_validation,
)
except FeastProviderLoginError as e:
print(str(e))
Expand Down
185 changes: 185 additions & 0 deletions sdk/python/feast/diff/apply_progress.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
"""
Enhanced progress tracking infrastructure for feast apply operations.

This module provides the ApplyProgressContext class that manages positioned,
color-coded progress bars during apply operations with fixed-width formatting
for perfect alignment.
"""

from dataclasses import dataclass
from typing import Optional

from tqdm import tqdm

try:
from feast.diff.progress_utils import (
create_positioned_tqdm,
get_color_for_phase,
is_tty_available,
)

_PROGRESS_UTILS_AVAILABLE = True
except ImportError:
# Graceful fallback when progress_utils is not available (e.g., in tests)
_PROGRESS_UTILS_AVAILABLE = False

def create_positioned_tqdm(
position: int,
description: str,
total: int,
color: str = "blue",
postfix: Optional[str] = None,
) -> Optional[tqdm]:
return None

def get_color_for_phase(phase: str) -> str:
return "blue"

def is_tty_available() -> bool:
return False


@dataclass
class ApplyProgressContext:
"""
Enhanced context object for tracking progress during feast apply operations.

This class manages multiple positioned progress bars with fixed-width formatting:
1. Overall progress (position 0) - tracks main phases
2. Phase progress (position 1) - tracks operations within current phase

Features:
- Fixed-width alignment for perfect visual consistency
- Color-coded progress bars by phase
- Position coordination to prevent overlap
- TTY detection for CI/CD compatibility
"""

# Core tracking state
current_phase: str = ""
overall_progress: Optional[tqdm] = None
phase_progress: Optional[tqdm] = None

# Progress tracking
total_phases: int = 3
completed_phases: int = 0
tty_available: bool = True

# Position allocation
OVERALL_POSITION = 0
PHASE_POSITION = 1

def __post_init__(self):
"""Initialize TTY detection after dataclass creation."""
self.tty_available = _PROGRESS_UTILS_AVAILABLE and is_tty_available()

def start_overall_progress(self):
"""Initialize the overall progress bar for apply phases."""
if not self.tty_available:
return

if self.overall_progress is None:
try:
self.overall_progress = create_positioned_tqdm(
position=self.OVERALL_POSITION,
description="Applying changes",
total=self.total_phases,
color=get_color_for_phase("overall"),
)
except (TypeError, AttributeError):
# Handle case where fallback functions don't work as expected
self.overall_progress = None

def start_phase(self, phase_name: str, operations_count: int = 0):
"""
Start tracking a new phase.

Args:
phase_name: Human-readable name of the phase
operations_count: Number of operations in this phase (0 for unknown)
"""
if not self.tty_available:
return

self.current_phase = phase_name

# Close previous phase progress if exists
if self.phase_progress:
try:
self.phase_progress.close()
except (AttributeError, TypeError):
pass
self.phase_progress = None

# Create new phase progress bar if operations are known
if operations_count > 0:
try:
self.phase_progress = create_positioned_tqdm(
position=self.PHASE_POSITION,
description=phase_name,
total=operations_count,
color=get_color_for_phase(phase_name.lower()),
)
except (TypeError, AttributeError):
# Handle case where fallback functions don't work as expected
self.phase_progress = None

def update_phase_progress(self, description: Optional[str] = None):
"""
Update progress within the current phase.

Args:
description: Optional description of current operation
"""
if not self.tty_available or not self.phase_progress:
return

try:
if description:
# Update postfix with current operation
self.phase_progress.set_postfix_str(description)

self.phase_progress.update(1)
except (AttributeError, TypeError):
# Handle case where phase_progress is None or fallback function returned None
pass

def complete_phase(self):
"""Mark current phase as complete and advance overall progress."""
if not self.tty_available:
return

# Close phase progress
if self.phase_progress:
try:
self.phase_progress.close()
except (AttributeError, TypeError):
pass
self.phase_progress = None

# Update overall progress
if self.overall_progress:
try:
self.overall_progress.update(1)
# Update postfix with phase completion
phase_text = f"({self.completed_phases + 1}/{self.total_phases} phases)"
self.overall_progress.set_postfix_str(phase_text)
except (AttributeError, TypeError):
pass

self.completed_phases += 1

def cleanup(self):
"""Clean up all progress bars. Should be called in finally blocks."""
if self.phase_progress:
try:
self.phase_progress.close()
except (AttributeError, TypeError):
pass
self.phase_progress = None
if self.overall_progress:
try:
self.overall_progress.close()
except (AttributeError, TypeError):
pass
self.overall_progress = None
16 changes: 14 additions & 2 deletions sdk/python/feast/diff/infra_diff.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from dataclasses import dataclass
from typing import Generic, Iterable, List, Optional, Tuple, TypeVar
from typing import TYPE_CHECKING, Generic, Iterable, List, Optional, Tuple, TypeVar

if TYPE_CHECKING:
from feast.diff.apply_progress import ApplyProgressContext

from feast.diff.property_diff import PropertyDiff, TransitionType
from feast.infra.infra_object import (
Expand Down Expand Up @@ -33,8 +36,9 @@ class InfraDiff:
def __init__(self):
self.infra_object_diffs = []

def update(self):
def update(self, progress_ctx: Optional["ApplyProgressContext"] = None):
"""Apply the infrastructure changes specified in this object."""

for infra_object_diff in self.infra_object_diffs:
if infra_object_diff.transition_type in [
TransitionType.DELETE,
Expand All @@ -43,6 +47,10 @@ def update(self):
infra_object = InfraObject.from_proto(
infra_object_diff.current_infra_object
)
if progress_ctx:
progress_ctx.update_phase_progress(
f"Tearing down {infra_object_diff.name}"
)
infra_object.teardown()
elif infra_object_diff.transition_type in [
TransitionType.CREATE,
Expand All @@ -51,6 +59,10 @@ def update(self):
infra_object = InfraObject.from_proto(
infra_object_diff.new_infra_object
)
if progress_ctx:
progress_ctx.update_phase_progress(
f"Creating/updating {infra_object_diff.name}"
)
infra_object.update()

def to_string(self):
Expand Down
48 changes: 42 additions & 6 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from datetime import datetime, timedelta
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Expand All @@ -31,6 +32,9 @@
cast,
)

if TYPE_CHECKING:
from feast.diff.apply_progress import ApplyProgressContext

import pandas as pd
import pyarrow as pa
from colorama import Fore, Style
Expand Down Expand Up @@ -726,6 +730,7 @@ def plan(
self,
desired_repo_contents: RepoContents,
skip_feature_view_validation: bool = False,
progress_ctx: Optional["ApplyProgressContext"] = None,
) -> Tuple[RegistryDiff, InfraDiff, Infra]:
"""Dry-run registering objects to metadata store.

Expand Down Expand Up @@ -793,6 +798,9 @@ def plan(
self._registry, self.project, desired_repo_contents
)

if progress_ctx:
progress_ctx.update_phase_progress("Computing infrastructure diff")

# Compute the desired difference between the current infra, as stored in the registry,
# and the desired infra.
self._registry.refresh(project=self.project)
Expand All @@ -807,21 +815,49 @@ def plan(
return registry_diff, infra_diff, new_infra

def _apply_diffs(
self, registry_diff: RegistryDiff, infra_diff: InfraDiff, new_infra: Infra
self,
registry_diff: RegistryDiff,
infra_diff: InfraDiff,
new_infra: Infra,
progress_ctx: Optional["ApplyProgressContext"] = None,
):
"""Applies the given diffs to the metadata store and infrastructure.

Args:
registry_diff: The diff between the current registry and the desired registry.
infra_diff: The diff between the current infra and the desired infra.
new_infra: The desired infra.
progress_ctx: Optional progress context for tracking apply progress.
"""
infra_diff.update()
apply_diff_to_registry(
self._registry, registry_diff, self.project, commit=False
)
try:
# Infrastructure phase
if progress_ctx:
infra_ops_count = len(infra_diff.infra_object_diffs)
progress_ctx.start_phase("Updating infrastructure", infra_ops_count)

infra_diff.update(progress_ctx=progress_ctx)

if progress_ctx:
progress_ctx.complete_phase()
progress_ctx.start_phase("Updating registry", 2)

# Registry phase
apply_diff_to_registry(
self._registry, registry_diff, self.project, commit=False
)

if progress_ctx:
progress_ctx.update_phase_progress("Committing registry changes")

self._registry.update_infra(new_infra, self.project, commit=True)

self._registry.update_infra(new_infra, self.project, commit=True)
if progress_ctx:
progress_ctx.update_phase_progress("Registry update complete")
progress_ctx.complete_phase()
finally:
# Always cleanup progress bars
if progress_ctx:
progress_ctx.cleanup()

def apply(
self,
Expand Down
Loading
Loading