My Role
- Data Analyst
- ML Engineer
- Data Engineer
Credits
- Company — PT Charoen Pokphand Indonesia Food Division
Project: Forecasting Raw Material Requirements for Manufacturing — CPI Food Division
Executive Summary
This project develops an end-to-end machine learning pipeline to predict daily, weekly, and monthly raw material demand across CPI Food Division’s production lines. The goal is to reduce excess inventory, prevent stockouts, optimize procurement and storage costs, and improve service levels for production demand.
Business Objectives
- Forecast raw material demand per SKU for 1-week, 1-month, and 3-month horizons.
- Reduce safety stock without increasing stockout risk.
- Support procurement planning and master production scheduling (MPS).
Data Sources (Columns)
- date — timestamp
- sku_id — raw material SKU code
- production_plan_qty — planned production quantity
- usage_qty — actual raw material consumption
- purchase_qty — ordered/purchased quantity
- lead_time_days — supplier lead time
- on_hand_qty — physical inventory
- supplier_id — supplier identifier
- price_per_unit — unit price
- batch_yield — production efficiency / yield
- holiday_flag — national holiday indicator
- promo_flag — promotional campaign flag (affecting product demand)
- temperature, humidity — environmental variables
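To make the schema concrete, a minimal loading-and-validation sketch (the file name usage.csv is an assumption; substitute the actual ERP/WMS export):

import pandas as pd

# Assumed file name for illustration; replace with the real export path.
df = pd.read_csv("usage.csv", parse_dates=["date"])

# Basic schema check: fail fast if an expected column is missing.
expected = [
    "date", "sku_id", "production_plan_qty", "usage_qty", "purchase_qty",
    "lead_time_days", "on_hand_qty", "supplier_id", "price_per_unit",
    "batch_yield", "holiday_flag", "promo_flag", "temperature", "humidity",
]
missing = [c for c in expected if c not in df.columns]
assert not missing, f"Missing columns: {missing}"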
Project Stages (High-level)
- Data collection & integration — ERP, MES, WMS, procurement Excel files, holiday calendar.
- Cleaning & validation — imputation, unit consistency, outlier handling.
- Exploratory Data Analysis (EDA) — seasonality, trend, SKU correlation, lead time distribution.
- Feature engineering — rolling averages, lags, calendar flags, cumulative usage, supplier reliability metrics.
- Modeling — baseline models, tree-based regressors, time-series models, deep learning if required.
- Evaluation — backtesting, MAPE/RMSE/MAE, stockout simulations.
- Deployment — prediction API, forecast dashboard, ERP integration.
- Monitoring — drift detection, scheduled retraining, performance alerts.
Feature Engineering
- Lags: usage_lag_1, usage_lag_7, usage_lag_30
- Rolling windows: rolling_mean_7, rolling_std_30
- Calendar: day_of_week, is_month_end, week_of_year
- Supply: avg_lead_time_30d, supplier_fill_rate
- BOM-based: product_demand_to_raw_usage_ratio
- Inventory metrics: days_of_stock = on_hand_qty / rolling_mean_30
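A minimal sketch of how these features could be computed per SKU (column names as listed above; rollings are shifted one step so the current day never sees its own value):

import pandas as pd
import numpy as np

def add_features(df: pd.DataFrame) -> pd.DataFrame:
    """Build the features above for a single SKU, sorted by date."""
    df = df.sort_values("date").copy()
    # Lags
    for lag in (1, 7, 30):
        df[f"usage_lag_{lag}"] = df["usage_qty"].shift(lag)
    # Rolling windows (shifted to avoid leakage)
    df["rolling_mean_7"] = df["usage_qty"].rolling(7).mean().shift(1)
    df["rolling_std_30"] = df["usage_qty"].rolling(30).std().shift(1)
    # Calendar
    df["day_of_week"] = df["date"].dt.dayofweek
    df["is_month_end"] = df["date"].dt.is_month_end.astype(int)
    df["week_of_year"] = df["date"].dt.isocalendar().week.astype(int)
    # Inventory metric
    rolling_mean_30 = df["usage_qty"].rolling(30).mean().shift(1)
    df["days_of_stock"] = df["on_hand_qty"] / rolling_mean_30.replace(0, np.nan)
    return df

# Apply per SKU:
# df = df.groupby("sku_id", group_keys=False).apply(add_features)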
Models
Baseline
- Naive (last observed value)
- Moving average
Classical time-series
- ARIMA / SARIMA — stable statistical patterns
- Prophet — quick handling of trend and holiday effects
Machine Learning (tabular)
- XGBoost / LightGBM — strong performance on tabular data with engineered features
- RandomForest — nonlinear baseline
Deep Learning (optional)
- LSTM / TCN — for long sequence dependencies
- Seq2Seq — for multi-horizon forecasts
Probabilistic forecasting
- Quantile Regression (boosting with quantile loss) — to estimate confidence intervals (for safety stock)
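A minimal sketch of the quantile-loss approach with LightGBM; the quantile levels here (P50 for the plan, P90 for safety stock) are illustrative choices, not fixed by the project:

import lightgbm as lgb

def train_quantile_models(X_train, y_train, quantiles=(0.5, 0.9)):
    """Train one LightGBM model per quantile."""
    models = {}
    for q in quantiles:
        params = {
            "objective": "quantile",
            "alpha": q,  # target quantile level
            "metric": "quantile",
            "learning_rate": 0.05,
            "num_leaves": 31,
            "verbosity": -1,
        }
        dtrain = lgb.Dataset(X_train, label=y_train)
        models[q] = lgb.train(params, dtrain, num_boost_round=500)
    return models

The gap between the P90 and P50 forecasts then gives a data-driven safety-stock buffer per SKU.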
Validation & Evaluation Strategy
- Time-series cross-validation (rolling origin backtesting)
- Metrics:
- MAE (Mean Absolute Error)
- RMSE
- MAPE (unstable when actuals are near zero)
- Pinball loss (for quantiles; a minimal implementation follows this list)
- Business KPIs: service level, stockout rate, holding cost impact
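As referenced above, pinball loss is simple to compute directly; a minimal implementation:

import numpy as np

def pinball_loss(y_true, y_pred, q):
    """Average pinball loss for quantile q: under-forecasts are
    penalized by q, over-forecasts by (1 - q)."""
    y_true, y_pred = np.asarray(y_true), np.asarray(y_pred)
    diff = y_true - y_pred
    return np.mean(np.maximum(q * diff, (q - 1) * diff))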
Technical Pipeline
Stack: Python, pandas, scikit-learn, xgboost/lightgbm, prophet, statsmodels, mlflow (tracking), Docker, FastAPI (serving), PostgreSQL/BigQuery, Metabase/Power BI/Looker for dashboards.
Code Snippet (LightGBM training):
import pandas as pd
from sklearn.model_selection import TimeSeriesSplit
import lightgbm as lgb

df = pd.read_csv('usage.csv', parse_dates=['date'])
df = df.sort_values(['sku_id', 'date'])

# Assumes the engineered, numeric feature columns already exist in the file
feature_cols = [c for c in df.columns if c not in ('date', 'sku_id', 'usage_qty')]
X = df[feature_cols]
y = df['usage_qty']

params = {'objective': 'regression', 'metric': 'l1', 'verbosity': -1}
tscv = TimeSeriesSplit(n_splits=5)
for train_idx, val_idx in tscv.split(X):
    X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
    y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]
    dtrain = lgb.Dataset(X_train, label=y_train)
    dval = lgb.Dataset(X_val, label=y_val)
    # early_stopping_rounds is a callback in LightGBM 4.x
    model = lgb.train(params, dtrain, valid_sets=[dval], num_boost_round=2000,
                      callbacks=[lgb.early_stopping(50)])
Deployment & Integration
- Model serving: FastAPI / BentoML endpoints
- Scheduling: Airflow / cron retraining jobs
- Dashboard: per-SKU forecasts, uncertainty bands, reorder recommendations (EOQ, ROP; see the sketch after this list)
- ERP integration: predictions sent to procurement system via API or CSV export
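For the reorder recommendations, a minimal sketch of the textbook formulas; the parameter names are illustrative, and the real inputs would come from the quantile forecasts and procurement master data:

import math

def reorder_point(daily_demand_p90, lead_time_days):
    """ROP: expected demand during lead time, taken at the P90 forecast
    so the safety margin is implicit in the quantile."""
    return daily_demand_p90 * lead_time_days

def eoq(annual_demand, order_cost, holding_cost_per_unit):
    """Economic Order Quantity: sqrt(2DS/H)."""
    return math.sqrt(2 * annual_demand * order_cost / holding_cost_per_unit)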
Monitoring & Maintenance
- Drift detection & monitoring dashboards (mlflow + grafana)
- Retraining triggers: performance threshold breaches (e.g., MAPE > 20%; a sketch follows this list) or periodic cycles
- Business A/B testing: compare procurement KPIs for pilot SKUs vs. control group
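A minimal sketch of the MAPE-based retraining trigger referenced above; the 20% threshold mirrors the example in the list, and the epsilon guard is an assumption to avoid division by zero:

import numpy as np

def needs_retraining(y_true, y_pred, threshold=0.20, eps=1e-6):
    """Flag retraining when MAPE over the monitoring window exceeds the threshold."""
    y_true, y_pred = np.asarray(y_true), np.asarray(y_pred)
    mape = np.mean(np.abs(y_true - y_pred) / np.maximum(np.abs(y_true), eps))
    return mape > threshold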
Business Impact
- 20% reduction in holding costs (selected SKUs)
- Stockout rate reduced to below 5% (vs. baseline)
- Procurement efficiency: more accurate purchase orders
Repo Structure
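The layout below reflects the files shown in this README; data/ and models/ are created at runtime by the scripts:

.
├── requirements.txt
├── Dockerfile
├── data/                      # written by scripts/generate_synthetic_data.py
├── models/                    # saved model artifacts from src/train.py
├── scripts/
│   └── generate_synthetic_data.py
└── src/
    ├── train.py
    └── serve_fastapi.py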
requirements.txt
pandas>=1.3
numpy>=1.21
scikit-learn>=1.0
lightgbm>=3.3
joblib>=1.1
fastapi>=0.78
uvicorn[standard]>=0.17
pydantic>=1.9
scripts/generate_synthetic_data.py
# scripts/generate_synthetic_data.py
import numpy as np
import pandas as pd
from pathlib import Path
OUT = Path("data")
OUT.mkdir(parents=True, exist_ok=True)
np.random.seed(42)
days = pd.date_range("2024-01-01", periods=365, freq="D")
sku_id = "RM-EX-001"
# Simulate seasonality + trend + noise
base = 50 + 5 * np.sin(2 * np.pi * (days.dayofyear / 365)) # yearly seasonality
weekday_effect = (days.dayofweek.isin([0,1,2,3,4])).astype(int) * 5 # more usage on weekdays
trend = np.linspace(0, 3, len(days))
noise = np.random.normal(0, 4, len(days))
usage = np.maximum(0, (base + weekday_effect + trend + noise).round().astype(int))
production_plan = usage + np.random.randint(-3, 4, len(days))
on_hand = np.maximum(0, (200 + np.cumsum(np.random.randint(-5, 6, len(days)))) )
df = pd.DataFrame({
    "date": days,
    "sku_id": sku_id,
    "usage_qty": usage,
    "production_plan_qty": production_plan,
    "purchase_qty": 0,
    "lead_time_days": 7,
    "on_hand_qty": on_hand,
    "supplier_id": "SUP-001",
    "price_per_unit": 10.0,
})
df.to_csv(OUT / "usage_example.csv", index=False)
print("Saved:", OUT / "usage_example.csv")
src/train.py
# src/train.py
import pandas as pd
import numpy as np
from pathlib import Path
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_absolute_error
import lightgbm as lgb
import joblib
import argparse
def create_features(df):
    df = df.copy()
    df['day_of_week'] = df['date'].dt.dayofweek
    df['week_of_year'] = df['date'].dt.isocalendar().week.astype(int)
    df['month'] = df['date'].dt.month
    # lag features
    for lag in [1, 7, 14, 30]:
        df[f'lag_{lag}'] = df['usage_qty'].shift(lag)
    # rolling features (shifted to avoid leakage)
    df['rmean_7'] = df['usage_qty'].rolling(7, min_periods=1).mean().shift(1)
    df['rmean_30'] = df['usage_qty'].rolling(30, min_periods=1).mean().shift(1)
    # inventory metric
    df['days_of_stock'] = df['on_hand_qty'] / (df['rmean_30'].replace(0, np.nan))
    df['days_of_stock'] = df['days_of_stock'].fillna(0)
    df = df.fillna(0)
    return df
def train_single_sku(df_sku, model_out_dir: Path):
    df_sku = df_sku.sort_values('date').reset_index(drop=True)
    feat = create_features(df_sku)
    # Create single-step next-day target
    feat['target_1'] = feat['usage_qty'].shift(-1)
    feat = feat.dropna(subset=['target_1']).reset_index(drop=True)
    ignore_cols = ['date', 'sku_id', 'usage_qty', 'target_1']
    # Keep numeric columns only (drops identifiers such as supplier_id,
    # which LightGBM cannot consume as raw object dtype)
    feature_cols = [c for c in feat.columns
                    if c not in ignore_cols and pd.api.types.is_numeric_dtype(feat[c])]
    X = feat[feature_cols]
    y = feat['target_1']
    tscv = TimeSeriesSplit(n_splits=5)
    models = []
    maes = []
    fold = 0
    for train_idx, val_idx in tscv.split(X):
        fold += 1
        X_train, X_val = X.iloc[train_idx], X.iloc[val_idx]
        y_train, y_val = y.iloc[train_idx], y.iloc[val_idx]
        dtrain = lgb.Dataset(X_train, label=y_train)
        dval = lgb.Dataset(X_val, label=y_val)
        params = {
            'objective': 'regression',
            'metric': 'l1',
            'verbosity': -1,
            'boosting_type': 'gbdt',
            'learning_rate': 0.05,
            'num_leaves': 31,
        }
        # early_stopping_rounds / verbose_eval are callbacks in LightGBM 4.x
        model = lgb.train(params, dtrain, valid_sets=[dval], num_boost_round=2000,
                          callbacks=[lgb.early_stopping(50), lgb.log_evaluation(0)])
        y_pred = model.predict(X_val)
        mae = mean_absolute_error(y_val, y_pred)
        print(f"Fold {fold} MAE: {mae:.4f}")
        models.append(model)
        maes.append(mae)
    # Save ensemble (list of models) and feature columns
    model_out_dir.mkdir(parents=True, exist_ok=True)
    sku = df_sku['sku_id'].iloc[0]
    joblib.dump(models, model_out_dir / f"models_{sku}.pkl")
    joblib.dump(feature_cols, model_out_dir / f"feature_cols_{sku}.pkl")
    print("Saved models and feature list to", model_out_dir)
    print("Average MAE:", np.mean(maes))
    return model_out_dir / f"models_{sku}.pkl", model_out_dir / f"feature_cols_{sku}.pkl"
def main(args):
    data_path = Path(args.data)
    df = pd.read_csv(data_path, parse_dates=['date'])
    out_dir = Path(args.out)
    out_dir.mkdir(parents=True, exist_ok=True)
    # For POC keep first SKU only; loop across SKUs for full run
    sku = df['sku_id'].unique()[0]
    df_sku = df[df['sku_id'] == sku].copy()
    train_single_sku(df_sku, out_dir)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--data", default="data/usage_example.csv")
    parser.add_argument("--out", default="models")
    args = parser.parse_args()
    main(args)
print("Saved:", OUT / "usage_example.csv")
src/serve_fastapi.py
# src/serve_fastapi.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Dict, Any
import joblib
import pandas as pd
from pathlib import Path
import numpy as np
app = FastAPI(title="Raw Material Forecast API (POC)")
MODEL_DIR = Path("models")
# Adjust sku name if different
SKU = "RM-EX-001"
MODEL_FILE = MODEL_DIR / f"models_{SKU}.pkl"
FEATURE_FILE = MODEL_DIR / f"feature_cols_{SKU}.pkl"
if not MODEL_FILE.exists() or not FEATURE_FILE.exists():
    raise RuntimeError("Model files not found. Run training first (src/train.py).")
models = joblib.load(MODEL_FILE)  # list of LightGBM models
feature_cols = joblib.load(FEATURE_FILE)

class PredictRequest(BaseModel):
    sku_id: str
    features: Dict[str, Any]  # keys must match feature_cols

class PredictResponse(BaseModel):
    sku_id: str
    pred_mean: float
    preds_by_model: Dict[str, float]

@app.post("/predict", response_model=PredictResponse)
def predict(req: PredictRequest):
    if req.sku_id != SKU:
        raise HTTPException(status_code=400, detail=f"Only SKU {SKU} is supported in this POC.")
    # Build dataframe in feature_cols order; missing features filled with 0
    X = {k: [req.features.get(k, 0)] for k in feature_cols}
    X_df = pd.DataFrame(X)
    preds = [float(m.predict(X_df)[0]) for m in models]
    mean_pred = float(np.mean(preds))
    return PredictResponse(
        sku_id=req.sku_id,
        pred_mean=mean_pred,
        preds_by_model={f"model_{i+1}": float(p) for i, p in enumerate(preds)},
    )

# Example health check
@app.get("/health")
def health():
    return {"status": "ok", "sku_loaded": SKU, "n_models": len(models)}
Dockerfile
FROM python:3.10-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY src/ ./src
COPY models/ ./models
ENV PYTHONPATH=/app/src
CMD ["uvicorn", "src.serve_fastapi:app", "--host", "0.0.0.0", "--port", "8080", "--workers", "1"]