Production Deployment Guide¶
Best practices and patterns for deploying recommendation systems to production.
Overview¶
This guide covers:

- Model serialization and deployment
- Real-time vs batch inference
- Performance optimization
- Monitoring and maintenance
Model Serialization¶
Saving Models¶
import pickle

# Train your recommender
recommender.train(interactions_ds, users_ds, items_ds)

# Save to file.
# NOTE: pickle executes arbitrary code on load -- only unpickle model
# files that come from a trusted source.
with open('recommender_model.pkl', 'wb') as f:
    pickle.dump(recommender, f)

# Or save to S3
import boto3

s3 = boto3.client('s3')
with open('recommender_model.pkl', 'rb') as f:
    s3.upload_fileobj(f, 'my-bucket', 'models/recommender_v1.pkl')
Loading Models¶
import pickle

# Load from file.
# NOTE: only load pickle files from trusted sources -- pickle.load can
# execute arbitrary code embedded in the file.
with open('recommender_model.pkl', 'rb') as f:
    recommender = pickle.load(f)

# Or load from S3
import boto3
from io import BytesIO

s3 = boto3.client('s3')
buffer = BytesIO()
s3.download_fileobj('my-bucket', 'models/recommender_v1.pkl', buffer)
buffer.seek(0)  # rewind before reading the downloaded bytes
recommender = pickle.load(buffer)

# Make recommendations
recommendations = recommender.recommend(interactions_df, users_df, top_k=5)
Deployment Patterns¶
1. Real-Time API (ECS, Lambda)¶
Use Case: User-facing recommendations with low latency requirements
from flask import Flask, request, jsonify
import pandas as pd
import pickle

app = Flask(__name__)

# Load the model once at startup so each request only pays inference cost.
with open('recommender_model.pkl', 'rb') as f:
    recommender = pickle.load(f)


@app.route('/recommend', methods=['POST'])
def recommend():
    """Return top-k recommendations for a single user.

    Expects JSON of the form:
        {"user_id": ..., "user_features": {...}, "top_k": <optional int>}
    """
    data = request.json

    # Create DataFrames from request
    interactions_df = pd.DataFrame({
        "USER_ID": [data['user_id']]
    })
    users_df = pd.DataFrame({
        "USER_ID": [data['user_id']],
        **data['user_features']
    })

    # Get recommendations — use recommend_online() for single-user, no join overhead
    recommendations = recommender.recommend_online(
        interactions=interactions_df,
        users=users_df,
        top_k=data.get('top_k', 5),
    )

    return jsonify({
        'user_id': data['user_id'],
        'recommendations': recommendations[0].tolist()
    })


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)
2. Batch Processing (Airflow, Spark)¶
Use Case: Pre-compute recommendations for all users offline
from pyspark.sql import SparkSession
import pickle


def batch_recommendations(spark, model_path, users_path, output_path):
    """Pre-compute top-10 recommendations for every user in a parquet dataset.

    Args:
        spark: Active SparkSession used to read the user data.
        model_path: Local path to the pickled recommender (trusted source only).
        users_path: Parquet path with one row per user (USER_ID + features).
        output_path: Destination for the computed recommendations.

    Returns:
        List of per-chunk recommendation results, in input order.
    """
    # Load model
    with open(model_path, 'rb') as f:
        recommender = pickle.load(f)

    # Load users (large dataset).
    # NOTE: toPandas() pulls everything onto the driver; the chunking below
    # bounds per-call work, not total driver memory.
    users_df = spark.read.parquet(users_path).toPandas()

    # Create interactions
    interactions_df = users_df[['USER_ID']].copy()

    # Batch recommend (process in chunks if needed)
    chunk_size = 10000
    all_recommendations = []
    for i in range(0, len(users_df), chunk_size):
        chunk_interactions = interactions_df.iloc[i:i + chunk_size]
        chunk_users = users_df.iloc[i:i + chunk_size]
        chunk_recs = recommender.recommend(
            interactions=chunk_interactions,
            users=chunk_users,
            top_k=10
        )
        all_recommendations.append(chunk_recs)

    # Save results to output_path
    # ... (save to database, S3, etc.)
    return all_recommendations
3. Streaming (Kafka, Kinesis)¶
Use Case: Continuous recommendation updates
from kafka import KafkaConsumer, KafkaProducer
import json
import pickle
import time

import pandas as pd

# Load model (trusted pickle only)
with open('recommender_model.pkl', 'rb') as f:
    recommender = pickle.load(f)

consumer = KafkaConsumer(
    'user_events',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda m: json.dumps(m).encode('utf-8')
)

# Consume user events forever, producing one recommendation message each.
for message in consumer:
    user_data = message.value

    # Create DataFrames
    interactions_df = pd.DataFrame({"USER_ID": [user_data['user_id']]})
    users_df = pd.DataFrame({
        "USER_ID": [user_data['user_id']],
        **user_data['features']
    })

    # Get recommendations
    recommendations = recommender.recommend(
        interactions=interactions_df,
        users=users_df,
        top_k=5
    )

    # Send to output topic
    producer.send('recommendations', {
        'user_id': user_data['user_id'],
        'recommendations': recommendations[0].tolist(),
        'timestamp': time.time()
    })
Performance Optimization¶
1. Single-User Optimization¶
For real-time APIs with one user per request, use recommend_online() which skips
the pandas join entirely:
# No join overhead — designed for real-time serving
recommendations = recommender.recommend_online(
    interactions=single_user_interactions_df,
    users=single_user_df,
    top_k=5,
)
For scoring only (without ranking), call scorer.score_fast() directly with a
pre-merged single-row DataFrame (no USER_ID):
# Single-row, pre-merged feature frame -- deliberately has no USER_ID column.
features_df = pd.DataFrame({"feat1": [18], "feat2": [0]})  # no USER_ID
scores_df = recommender.scorer.score_fast(features_df)
# Returns: DataFrame with item names as columns
Supported scorers: UniversalScorer, MulticlassScorer, MultioutputScorer,
and IndependentScorer. Not supported for embedding-based estimators.
2. Caching Strategies¶
Cache Item Scores¶
import hashlib
import json

import pandas as pd


class CachedRecommender:
    """Wrap a recommender with an in-memory cache keyed on user features.

    Users with identical feature dicts share a cache entry. The cache is
    unbounded -- long-running services should add eviction or a TTL.
    """

    def __init__(self, recommender):
        self.recommender = recommender
        self.cache = {}

    def recommend(self, user_id, user_features, top_k=5):
        """Return cached recommendations for these features, computing on miss."""
        # Create cache key from user features; sort_keys makes the key
        # independent of dict insertion order.
        cache_key = hashlib.md5(
            json.dumps(user_features, sort_keys=True).encode()
        ).hexdigest()

        if cache_key in self.cache:
            return self.cache[cache_key][:top_k]

        # Get recommendations
        interactions_df = pd.DataFrame({"USER_ID": [user_id]})
        users_df = pd.DataFrame({"USER_ID": [user_id], **user_features})
        recommendations = self.recommender.recommend(
            interactions=interactions_df,
            users=users_df,
            top_k=top_k
        )

        # Cache result
        self.cache[cache_key] = recommendations[0]
        return recommendations[0]
Cache with Redis¶
import redis
import pickle
import hashlib

redis_client = redis.Redis(host='localhost', port=6379, db=0)


def get_recommendations_cached(user_id, user_features, top_k=5):
    """Look up recommendations in Redis before recomputing them."""
    # Create cache key.
    # Use hashlib, not the built-in hash(): str hashing is salted per
    # process (PYTHONHASHSEED), so hash() keys would not match across workers.
    features_digest = hashlib.md5(str(user_features).encode()).hexdigest()
    cache_key = f"recs:{user_id}:{features_digest}"

    # Check cache
    cached = redis_client.get(cache_key)
    if cached:
        # Safe only because this process wrote the value; never unpickle
        # data an untrusted party can write.
        return pickle.loads(cached)

    # Compute recommendations (fill in real arguments)
    recommendations = recommender.recommend(...)

    # Cache with TTL (e.g., 1 hour)
    redis_client.setex(cache_key, 3600, pickle.dumps(recommendations))
    return recommendations
3. Model Optimization¶
Use Lighter Models¶
# Development: Complex model (tuned for offline accuracy)
dev_config = {
    "estimator_config": {
        "ml_task": "classification",
        "xgboost": {"n_estimators": 200, "max_depth": 8}
    },
    "scorer_type": "universal",
    "recommender_type": "ranking"
}

# Production: Faster model (shallower/fewer trees keep serving latency low)
prod_config = {
    "estimator_config": {
        "ml_task": "classification",
        "xgboost": {"n_estimators": 100, "max_depth": 4}  # Lighter for low latency
    },
    "scorer_type": "universal",
    "recommender_type": "ranking"
}
Pre-filter Items¶
# Filter items before scoring
def get_recommendations(user_id, user_features, top_k=5):
    """Restrict scoring to business-eligible items, then recommend.

    NOTE(review): `user_location`, `interactions_df`, and `users_df` are not
    defined in this snippet -- derive them from user_id/user_features in a
    real implementation.
    """
    # Apply business rules first
    eligible_items = get_in_stock_items(user_location)
    recommender.set_item_subset(eligible_items)

    recommendations = recommender.recommend(
        interactions=interactions_df,
        users=users_df,
        top_k=top_k
    )
    return recommendations
4. Parallel Processing¶
For Independent Scorer¶
from skrec.scorer.independent import IndependentScorer

scorer = IndependentScorer(estimator)
# Spread per-item model inference across 4 worker cores.
scorer.set_parallel_inference(parallel_inference_status=True, num_cores=4)

recommender = RankingRecommender(scorer)
# Inference now parallelized
Batch Parallelization¶
from concurrent.futures import ThreadPoolExecutor
import numpy as np


def parallel_batch_recommend(users_df, batch_size=1000, n_workers=4):
    """Score users in fixed-size batches across a thread pool.

    Threads only help if the underlying model releases the GIL during
    inference (e.g. native XGBoost); otherwise use a process pool.

    Args:
        users_df: DataFrame with a USER_ID column plus user features.
        batch_size: Number of users scored per task.
        n_workers: Thread-pool size.

    Returns:
        Vertically stacked recommendations for all users, in input order.
    """
    def process_batch(batch_idx):
        # One contiguous slice of users per task.
        start = batch_idx * batch_size
        end = start + batch_size
        batch_df = users_df.iloc[start:end]
        batch_interactions = batch_df[['USER_ID']].copy()
        return recommender.recommend(
            interactions=batch_interactions,
            users=batch_df,
            top_k=10
        )

    # Ceiling division so the final partial batch is still processed.
    n_batches = (len(users_df) + batch_size - 1) // batch_size
    with ThreadPoolExecutor(max_workers=n_workers) as executor:
        # executor.map preserves submission order.
        results = list(executor.map(process_batch, range(n_batches)))
    return np.vstack(results)
Monitoring¶
1. Latency Metrics¶
import time

from datadog import statsd


def monitored_recommend(user_id, user_features, top_k=5):
    """Recommend while emitting latency and error metrics to Datadog."""
    start_time = time.time()
    try:
        recommendations = recommender.recommend(...)  # fill in real arguments

        # Log latency
        latency = (time.time() - start_time) * 1000  # ms
        statsd.timing('recommender.latency', latency)
        return recommendations
    except Exception:
        # Count the failure, then let the caller decide how to degrade.
        statsd.increment('recommender.errors')
        raise
2. Recommendation Quality¶
def log_recommendation_metrics(user_id, recommendations, actual_click):
    """Emit top-1 accuracy and hit-rate counters for one served request."""
    # Log if top recommendation was clicked
    top_rec = recommendations[0]
    statsd.increment(
        'recommender.top1_accuracy',
        1 if top_rec == actual_click else 0
    )

    # Log if any recommendation was clicked
    hit = actual_click in recommendations
    statsd.increment('recommender.hit_rate', 1 if hit else 0)
3. Model Drift Detection¶
from scipy.stats import ks_2samp


def check_feature_drift(current_data, baseline_data, feature_name):
    """Compare a feature's live distribution against the training baseline.

    Args:
        current_data: Mapping/DataFrame of recent feature values.
        baseline_data: Mapping/DataFrame of training-time feature values.
        feature_name: Column/key to test.

    Returns:
        The two-sample Kolmogorov-Smirnov p-value; values below 0.05 are
        treated as drift and emit a warning plus a metric.
    """
    current_values = current_data[feature_name]
    baseline_values = baseline_data[feature_name]

    # KS test for distribution shift
    statistic, p_value = ks_2samp(current_values, baseline_values)

    if p_value < 0.05:
        print(f"WARNING: Drift detected in {feature_name}")
        statsd.increment('recommender.feature_drift', tags=[f'feature:{feature_name}'])

    return p_value
A/B Testing¶
Simple A/B Test¶
import hashlib


def get_recommender_variant(user_id):
    """Deterministically assign a user to A/B variant 'A' or 'B'.

    MD5 of the user id gives a stable, roughly uniform hash, so a user
    always sees the same variant across requests and processes.
    """
    # Consistent hashing for user assignment
    hash_val = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    return 'A' if hash_val % 2 == 0 else 'B'
def recommend_with_ab_test(user_id, user_features, top_k=5):
    """Serve from the control or treatment model based on the user's variant.

    Returns:
        Tuple of (recommendations, variant) so callers can log exposure.
    """
    variant = get_recommender_variant(user_id)

    if variant == 'A':
        recommender = recommender_a  # Control
    else:
        recommender = recommender_b  # Treatment

    recommendations = recommender.recommend(...)  # fill in real arguments

    # Log variant
    statsd.increment('recommender.variant', tags=[f'variant:{variant}'])
    return recommendations, variant
Best Practices¶
1. Version Your Models¶
import datetime
import json

model_metadata = {
    'version': 'v1.2.0',
    'trained_at': datetime.datetime.now().isoformat(),
    'training_data_date': '2025-01-01',
    'config': config,  # NOTE(review): `config` must be defined by the caller
    'metrics': {
        'ndcg@5': 0.78,
        'precision@5': 0.65
    }
}

# Save metadata with model
with open('model_metadata.json', 'w') as f:
    json.dump(model_metadata, f)
2. Graceful Degradation¶
def safe_recommend(user_id, user_features, top_k=5):
    """Serve ML recommendations, falling back to popular items on any failure."""
    try:
        # Try ML model
        recommendations = recommender.recommend(...)  # fill in real arguments
        return recommendations
    except Exception as e:
        logger.error(f"Recommender failed: {e}")
        statsd.increment('recommender.fallback')
        # Fallback to popular items
        return get_popular_items(top_k)
3. Feature Store Integration¶
import pandas as pd


class FeatureStoreRecommender:
    """Recommender that hydrates user features from a feature store at serve time."""

    def __init__(self, recommender, feature_store):
        self.recommender = recommender
        self.feature_store = feature_store

    def recommend(self, user_id, top_k=5):
        """Fetch the user's features, then delegate to the wrapped recommender."""
        # Fetch features from feature store
        user_features = self.feature_store.get_user_features(user_id)

        interactions_df = pd.DataFrame({"USER_ID": [user_id]})
        users_df = pd.DataFrame({
            "USER_ID": [user_id],
            **user_features
        })

        return self.recommender.recommend(
            interactions=interactions_df,
            users=users_df,
            top_k=top_k
        )
4. Canary Deployments¶
import random


def canary_recommend(user_id, user_features, canary_percentage=10):
    """Route a small, random share of traffic to the new (canary) model.

    Args:
        canary_percentage: Percent of requests served by the new model.
    """
    # Route small percentage to new model
    if random.random() < canary_percentage / 100:
        recommender = new_model
        version = 'canary'
    else:
        recommender = stable_model
        version = 'stable'

    recommendations = recommender.recommend(...)  # fill in real arguments

    # Log version used
    statsd.increment('recommender.version', tags=[f'version:{version}'])
    return recommendations
Troubleshooting¶
Issue: High latency in production¶
Solutions:
- Use recommend_online() for single-user requests (no join overhead)
- Implement caching (Redis)
- Pre-filter items
- Use lighter models
- Enable parallel inference
Issue: Out of memory errors¶
Solutions:

- Process in smaller batches
- Use lighter models
- Reduce feature dimensionality
- Increase instance memory
Issue: Stale recommendations¶
Solutions:

- Implement cache invalidation
- Reduce cache TTL
- Retrain models more frequently
- Use online learning approaches
Next Steps¶
- Orchestration - Config-driven deployment
- HPO Guide - Optimize for production
- Monitoring Guide — Production monitoring (coming soon)