feat: Enable static artifacts for feature server that can be used in Feature Transformations

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
franciscojavierarceo committed Dec 19, 2025
commit bff56273ab9f8f3ea8e8e17669e6c3244b5c8e72
48 changes: 48 additions & 0 deletions sdk/python/feast/feature_server.py
@@ -155,6 +155,50 @@ async def _get_features(
    return features


async def load_static_artifacts(app: FastAPI, store):
    """
    Load static artifacts (models, lookup tables, etc.) into app.state.

    This function can be extended to load various types of static artifacts:
    - Small ML models (scikit-learn, small neural networks)
    - Lookup tables and reference data
    - Configuration parameters
    - Pre-computed embeddings

    Note: Not recommended for large language models - use dedicated
    model serving solutions (vLLM, TGI, etc.) for those.
    """
    try:
        # Import here to avoid loading heavy dependencies unless needed
        import importlib.util
        import inspect
        from pathlib import Path

        # Look for static artifacts loading in the feature repository
        # This allows templates and users to define their own artifact loading
        repo_path = Path(store.repo_path) if store.repo_path else Path.cwd()
        artifacts_file = repo_path / "static_artifacts.py"

        if artifacts_file.exists():
            # Load and execute custom static artifacts loading
            spec = importlib.util.spec_from_file_location("static_artifacts", artifacts_file)
            if spec and spec.loader:
                artifacts_module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(artifacts_module)

                # Look for load_artifacts function
                if hasattr(artifacts_module, "load_artifacts"):
                    load_func = artifacts_module.load_artifacts
                    if inspect.iscoroutinefunction(load_func):
                        await load_func(app)
                    else:
                        load_func(app)
                    logger.info("Loaded static artifacts from static_artifacts.py")
    except Exception as e:
        # Non-fatal error - feature server should still start
        logger.warning(f"Failed to load static artifacts: {e}")


def get_app(
    store: "feast.FeatureStore",
    registry_ttl_sec: int = DEFAULT_FEATURE_SERVER_REGISTRY_TTL,
@@ -215,8 +259,12 @@ def async_refresh():
        active_timer = threading.Timer(registry_ttl_sec, async_refresh)
        active_timer.start()

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # Load static artifacts before initializing store
        await load_static_artifacts(app, store)

        await store.initialize()
        async_refresh()
        yield
127 changes: 116 additions & 11 deletions sdk/python/feast/templates/pytorch_nlp/README.md
@@ -157,7 +157,8 @@ my-sentiment-project/
├── README.md                # This file
└── feature_repo/
    ├── feature_store.yaml   # Feast configuration
    ├── example_repo.py      # Feature definitions (uses pre-loaded artifacts)
    ├── static_artifacts.py  # Static artifacts loading (models, lookup tables)
    ├── test_workflow.py     # Complete demo workflow
    └── data/                # Generated sample data
        └── sentiment_data.parquet
@@ -205,6 +206,109 @@ offline_store:
- ✅ **No external services** - No Redis/cloud required
- ✅ **Perfect for demos** - Easy to share and understand

## 🚀 Static Artifacts Loading

This template demonstrates **static artifacts loading** - a performance optimization that loads models, lookup tables, and other artifacts once at feature server startup instead of on each request.

### What are Static Artifacts?

Static artifacts are pre-loaded resources that remain constant during server operation:
- **Small ML models** (sentiment analysis, classification, small neural networks)
- **Lookup tables and mappings** (label encoders, category mappings)
- **Configuration data** (model parameters, feature mappings)
- **Pre-computed embeddings** (user embeddings, item features)
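For orientation, each artifact is just an ordinary Python object attached to the FastAPI app at startup. A hypothetical shape of `app.state` after loading (all names below are illustrative, not part of the template):

```python
from fastapi import FastAPI

app = FastAPI()

# Illustrative artifacts attached at startup (names are hypothetical)
app.state.lookup_tables = {
    "sentiment_labels": {"LABEL_0": "negative", "LABEL_1": "neutral", "LABEL_2": "positive"}
}
app.state.config_params = {"threshold": 0.7, "top_k": 10}
app.state.user_embeddings = [[0.0] * 384]  # placeholder for pre-computed embeddings
```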

### Performance Benefits

**Before (Per-Request Loading):**
```python
def sentiment_prediction(inputs):
    # ❌ Model loads on every request - slow!
    model = pipeline("sentiment-analysis", model="...")
    return model(inputs["text"])
```

**After (Startup Loading):**
```python
# ✅ Model loads once at server startup
def sentiment_prediction(inputs):
    global _sentiment_model  # Pre-loaded model
    return _sentiment_model(inputs["text"])
```

**Performance Impact:**
- 🚀 **Faster inference** - often 10-100x when model-loading time dominates per-request latency
- 💾 **Lower memory churn** - one shared model copy instead of repeated per-request loads
- ⚡ **Better scalability** - consistent response times under load
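These numbers depend on model size and hardware. A self-contained timing sketch with a stand-in loader (not the real model) shows where the win comes from:

```python
import time

def load_model():
    """Stand-in for an expensive model load (e.g., initializing transformer weights)."""
    time.sleep(2.0)  # pretend weight loading takes ~2 seconds
    return lambda text: {"label": "positive", "score": 0.9}

# Per-request loading: pays the load cost on every call
start = time.perf_counter()
for _ in range(5):
    model = load_model()
    model("great product!")
print(f"per-request loading: {time.perf_counter() - start:.1f}s")  # ~10s

# Startup loading: pays the load cost once
start = time.perf_counter()
model = load_model()
for _ in range(5):
    model("great product!")
print(f"startup loading: {time.perf_counter() - start:.1f}s")  # ~2s
```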

### How It Works

1. **Startup**: Feast server loads `static_artifacts.py` during initialization
2. **Loading**: `load_artifacts(app)` function stores models in `app.state`
3. **Access**: On-demand feature views access pre-loaded artifacts via global references

```python
# static_artifacts.py - Define what to load
from fastapi import FastAPI

def load_artifacts(app: FastAPI):
    app.state.sentiment_model = load_sentiment_model()  # helpers defined in static_artifacts.py
    app.state.lookup_tables = load_lookup_tables()

    # Update global references for easy access
    import example_repo
    example_repo._sentiment_model = app.state.sentiment_model
    example_repo._lookup_tables = app.state.lookup_tables

# example_repo.py - Use pre-loaded artifacts
_sentiment_model = None  # Set by static_artifacts.py

def sentiment_prediction(inputs):
    global _sentiment_model
    if _sentiment_model is not None:
        return _sentiment_model(inputs["text"])
    else:
        return fallback_predictions()
```
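On-demand feature views are plain functions with no access to the request or app object, which is why the template bridges through module-level globals. In a custom FastAPI route you add yourself, the artifacts can be read from `app.state` directly; a minimal sketch (the route below is hypothetical, not part of the template):

```python
from fastapi import FastAPI, Request

app = FastAPI()

@app.get("/artifacts/health")
def artifacts_health(request: Request):
    # app.state is populated by load_artifacts() at startup
    loaded = getattr(request.app.state, "sentiment_model", None) is not None
    return {"sentiment_model_loaded": loaded}
```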

### Scope and Limitations

**✅ Great for:**
- Small to medium models (< 1GB)
- Fast-loading models (sentiment analysis, classification)
- Lookup tables and reference data
- Configuration parameters
- Pre-computed embeddings

**❌ Not recommended for:**
- **Large Language Models (LLMs)** - Use dedicated serving solutions like vLLM, TGI, or TensorRT-LLM
- Models requiring GPU clusters
- Frequently updated models
- Models with complex initialization dependencies

**Note:** Feast is optimized for feature serving, not large model inference. For production LLM workloads, use specialized model serving platforms.

### Customizing Static Artifacts

To add your own artifacts, modify `static_artifacts.py`:

```python
from pathlib import Path

from fastapi import FastAPI

def load_custom_embeddings():
    """Load pre-computed user embeddings."""
    embeddings_file = Path(__file__).parent / "data" / "user_embeddings.npy"
    if embeddings_file.exists():
        import numpy as np
        return {"embeddings": np.load(embeddings_file)}
    return None

def load_artifacts(app: FastAPI):
    # Load your custom artifacts
    app.state.custom_embeddings = load_custom_embeddings()
    app.state.config_params = {"threshold": 0.7, "top_k": 10}

    # Make them available to feature views
    import example_repo
    example_repo._custom_embeddings = app.state.custom_embeddings
```
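On the consuming side, a hypothetical helper in `example_repo.py` can then read the global with a fallback, mirroring the sentiment-model pattern (names are illustrative):

```python
# example_repo.py - hypothetical consumer of the custom artifacts above
_custom_embeddings = None  # set by static_artifacts.py at startup

def user_embedding(user_index: int):
    """Return a pre-computed embedding, with a zero-vector fallback."""
    if _custom_embeddings is None:
        return [0.0] * 384  # artifacts failed to load or server not started
    embeddings = _custom_embeddings["embeddings"]
    return embeddings[user_index % len(embeddings)].tolist()
```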

## 📚 Detailed Usage

### 1. Feature Store Setup
@@ -409,20 +513,21 @@ def toxicity_detection(inputs: pd.DataFrame) -> pd.DataFrame:
### Performance Optimization

**Current Architecture:**
- ✅ **Static artifacts loading** at server startup (see `static_artifacts.py`)
- ✅ **Pre-loaded models** cached in memory for fast inference
- CPU-only operation to avoid multiprocessing issues
- SQLite-based storage for fast local access

**Implemented Optimizations:**
- **Startup-time Model Loading**: ✅ Models load once at server startup via `static_artifacts.py`
- **Memory-efficient Caching**: ✅ Models stored in `app.state` and accessed via global references
- **Fallback Handling**: ✅ Graceful degradation when artifacts fail to load

**Additional Production Optimizations:**
1. **Batch Inference**: Process multiple texts together for efficiency (see the sketch below)
2. **Feature Materialization**: Pre-compute expensive features offline
3. **Async Processing**: Use async patterns for real-time serving
4. **Model Serving Layer**: Use dedicated model servers (TorchServe, vLLM) for large models
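A hedged sketch of the batch-inference idea, assuming `_sentiment_model` is a HuggingFace pipeline as loaded by `static_artifacts.py` (pipelines accept a list of texts and a `batch_size` argument):

```python
def sentiment_scores_batched(inputs):
    """Illustrative batched scoring with the pre-loaded pipeline (not part of the template)."""
    texts = inputs["input_text"].tolist()
    # One batched forward pass instead of a Python-level loop over single texts
    return _sentiment_model(texts, batch_size=32)
```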

### Production Configuration Examples

151 changes: 75 additions & 76 deletions sdk/python/feast/templates/pytorch_nlp/feature_repo/example_repo.py
@@ -25,6 +25,18 @@
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Array, Float32, Int64, String

try:
    # Import static artifacts helpers (available when feature server loads artifacts)
    from static_artifacts import get_sentiment_model, get_lookup_tables
except ImportError:
    # Fallback for when static_artifacts.py is not available
    get_sentiment_model = None
    get_lookup_tables = None

# Global references for static artifacts (set by feature server)
_sentiment_model = None
_lookup_tables = {}

# Configuration
repo_path = Path(__file__).parent
data_path = repo_path / "data"
@@ -143,100 +155,87 @@
)
def sentiment_prediction(inputs: pd.DataFrame) -> pd.DataFrame:
"""
Real-time sentiment prediction using pre-trained models.
Real-time sentiment prediction using pre-loaded static artifacts.

This function demonstrates how to integrate PyTorch/HuggingFace models
directly into Feast feature views for real-time inference.
This function demonstrates how to use static artifacts (pre-loaded models,
lookup tables) for efficient real-time inference. Models are loaded once
at feature server startup rather than on each request.
"""
try:
import numpy as np
from transformers import pipeline
except ImportError:
# Fallback to dummy predictions if dependencies aren't available
df = pd.DataFrame()
df["predicted_sentiment"] = ["neutral"] * len(inputs)
df["sentiment_confidence"] = np.array([0.5] * len(inputs), dtype=np.float32)
df["positive_prob"] = np.array([0.33] * len(inputs), dtype=np.float32)
df["negative_prob"] = np.array([0.33] * len(inputs), dtype=np.float32)
df["neutral_prob"] = np.array([0.34] * len(inputs), dtype=np.float32)
df["text_embedding"] = [[np.float32(0.0)] * 384] * len(inputs)
return df
# Fallback to dummy predictions if numpy isn't available
import array as np_fallback

# Initialize model (in production, you'd want to cache this)
model_name = "cardiffnlp/twitter-roberta-base-sentiment-latest"
try:
# Use sentiment pipeline for convenience (force CPU to avoid MPS forking issues)
sentiment_pipeline = pipeline(
"sentiment-analysis",
model=model_name,
tokenizer=model_name,
return_all_scores=True,
device="cpu", # Force CPU to avoid MPS forking issues on macOS
)

except Exception:
# Fallback if model loading fails
df = pd.DataFrame()
df["predicted_sentiment"] = ["neutral"] * len(inputs)
df["sentiment_confidence"] = np.array([0.5] * len(inputs), dtype=np.float32)
df["positive_prob"] = np.array([0.33] * len(inputs), dtype=np.float32)
df["negative_prob"] = np.array([0.33] * len(inputs), dtype=np.float32)
df["neutral_prob"] = np.array([0.34] * len(inputs), dtype=np.float32)
df["text_embedding"] = [[np.float32(0.0)] * 384] * len(inputs)
df["sentiment_confidence"] = [0.5] * len(inputs)
df["positive_prob"] = [0.33] * len(inputs)
df["negative_prob"] = [0.33] * len(inputs)
df["neutral_prob"] = [0.34] * len(inputs)
df["text_embedding"] = [[0.0] * 384] * len(inputs)
return df

# Get pre-loaded static artifacts from global references
# These are loaded once at startup via static_artifacts.py
global _sentiment_model, _lookup_tables

sentiment_model = _sentiment_model
lookup_tables = _lookup_tables

# Use lookup table for label mapping (from static artifacts)
label_map = lookup_tables.get("sentiment_labels", {
"LABEL_0": "negative",
"LABEL_1": "neutral",
"LABEL_2": "positive"
})

results = []

for text in inputs["input_text"]:
try:
# Get sentiment predictions
predictions = sentiment_pipeline(text)

# Parse results (RoBERTa model returns LABEL_0, LABEL_1, LABEL_2)
label_map = {
"LABEL_0": "negative",
"LABEL_1": "neutral",
"LABEL_2": "positive",
}

scores = {
label_map.get(pred["label"], pred["label"]): pred["score"]
for pred in predictions
}

# Get best prediction
best_pred = max(predictions, key=lambda x: x["score"])
predicted_sentiment = label_map.get(best_pred["label"], best_pred["label"])
confidence = best_pred["score"]

# Get embeddings (simplified - dummy embeddings for demo)
# In a real implementation, you'd run the model to get embeddings
# For this demo, we'll create a dummy embedding
embedding = np.random.rand(384).tolist() # DistilBERT size

results.append(
{
"predicted_sentiment": predicted_sentiment,
"sentiment_confidence": np.float32(confidence),
"positive_prob": np.float32(scores.get("positive", 0.0)),
"negative_prob": np.float32(scores.get("negative", 0.0)),
"neutral_prob": np.float32(scores.get("neutral", 0.0)),
"text_embedding": [np.float32(x) for x in embedding],
if sentiment_model is not None:
# Use pre-loaded model for prediction
predictions = sentiment_model(text)

# Parse results using static lookup tables
scores = {
label_map.get(pred["label"], pred["label"]): pred["score"]
for pred in predictions
}
)

# Get best prediction
best_pred = max(predictions, key=lambda x: x["score"])
predicted_sentiment = label_map.get(best_pred["label"], best_pred["label"])
confidence = best_pred["score"]
else:
# Fallback when model is not available
predicted_sentiment = "neutral"
confidence = 0.5
scores = {"positive": 0.33, "negative": 0.33, "neutral": 0.34}

# Generate dummy embeddings (in production, use pre-loaded embeddings)
embedding = np.random.rand(384).tolist()

results.append({
"predicted_sentiment": predicted_sentiment,
"sentiment_confidence": np.float32(confidence),
"positive_prob": np.float32(scores.get("positive", 0.0)),
"negative_prob": np.float32(scores.get("negative", 0.0)),
"neutral_prob": np.float32(scores.get("neutral", 0.0)),
"text_embedding": [np.float32(x) for x in embedding],
})

except Exception:
# Fallback for individual text processing errors
results.append(
{
"predicted_sentiment": "neutral",
"sentiment_confidence": np.float32(0.5),
"positive_prob": np.float32(0.33),
"negative_prob": np.float32(0.33),
"neutral_prob": np.float32(0.34),
"text_embedding": [np.float32(0.0)] * 384,
}
)
results.append({
"predicted_sentiment": "neutral",
"sentiment_confidence": np.float32(0.5),
"positive_prob": np.float32(0.33),
"negative_prob": np.float32(0.33),
"neutral_prob": np.float32(0.34),
"text_embedding": [np.float32(0.0)] * 384,
})

return pd.DataFrame(results)
