# Execution-environment switch.
# NOTE(review): 'environtment' is a typo, but the name is reused later when
# resolving DATA_ROOT, so it is kept unchanged here.
run_environtment = 'google_colab'
if run_environtment == 'google_colab':
    from google.colab import drive
    # Mount Google Drive so the dataset under MyDrive is reachable.
    drive.mount('/content/drive')
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Company A is a large fresh-produce supply chain operator. Its fulfillment workflow needs a reliable image classifier to identify potato, onion, tomato, or noise (non-target market scenes).
Build a production-oriented computer vision pipeline that minimizes misrouting risk in automated sorting.
import os
import gc
import json
import random
import hashlib
import warnings
from dataclasses import dataclass
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from PIL import Image, UnidentifiedImageError
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, label_binarize
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
accuracy_score, precision_recall_fscore_support, classification_report,
confusion_matrix, roc_auc_score, average_precision_score, log_loss, brier_score_loss
)
from sklearn.calibration import calibration_curve
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
import torchvision.transforms as T
from torchvision.models import resnet18, ResNet18_Weights
warnings.filterwarnings('ignore')
sns.set_theme(style='whitegrid', context='notebook')
# Seed every RNG source used below (random, NumPy, torch CPU/GPU) so
# splits and sampling are repeatable across runs.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)
# Favor throughput for this training notebook; set deterministic mode only when strict reproducibility is required.
torch.backends.cudnn.deterministic = False
torch.backends.cudnn.benchmark = True
if hasattr(torch, 'set_float32_matmul_precision'):
    # Guarded for older torch versions that lack this API.
    torch.set_float32_matmul_precision('high')
# Hardware discovery: device, worker count, and GPU memory feed batch sizing later.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
LOGICAL_CPUS = os.cpu_count() or 2
# Half the logical CPUs, clamped to the [2, 8] range of DataLoader workers.
DATALOADER_WORKERS = min(8, max(2, LOGICAL_CPUS // 2))
PIN_MEMORY = DEVICE.type == 'cuda'
if DEVICE.type == 'cuda':
    gpu_props = torch.cuda.get_device_properties(0)
    gpu_name = gpu_props.name
    gpu_mem_gb = round(gpu_props.total_memory / (1024 ** 3), 2)
else:
    gpu_name = None
    gpu_mem_gb = 0.0
print({
    'device': str(DEVICE),
    'torch_version': torch.__version__,
    'torch_cuda': torch.version.cuda,
    'logical_cpus': LOGICAL_CPUS,
    'dataloader_workers': DATALOADER_WORKERS,
    'gpu_name': gpu_name,
    'gpu_memory_gb': gpu_mem_gb
})
# Fail fast on CPU-only hosts: training cells below assume a CUDA device.
if DEVICE.type != 'cuda':
    raise RuntimeError(
        'CUDA GPU is required for this notebook. Current setup is CPU-only. '
        'Update NVIDIA driver, restart machine, then rerun this cell.'
    )
{'device': 'cuda', 'torch_version': '2.10.0+cu128', 'torch_cuda': '12.8', 'logical_cpus': 2, 'dataloader_workers': 2, 'gpu_name': 'Tesla T4', 'gpu_memory_gb': 14.56}
We create a file-level dataframe to make image datasets auditable like tabular datasets. This enables schema checks, missingness checks, duplicate detection, and stratified splitting without leakage.
# Expected layout: <DATA_ROOT>/{train,test}/<class_name>/<image files>.
DATA_ROOT = Path('../data/ninjacart_data').resolve()
if run_environtment == 'google_colab':
    # Colab runs read the dataset from the mounted Drive path instead.
    DATA_ROOT = Path('/content/drive/MyDrive/Colab Notebooks/DS_Projects/data/CNN/ninjacart_data').resolve()
TRAIN_ROOT = DATA_ROOT / 'train'
TEST_ROOT = DATA_ROOT / 'test'
MODEL_DIR = Path('./models').resolve()
MODEL_DIR.mkdir(parents=True, exist_ok=True)
assert TRAIN_ROOT.exists(), f'Missing train folder: {TRAIN_ROOT}'
assert TEST_ROOT.exists(), f'Missing test folder: {TEST_ROOT}'
# Only files with these extensions are treated as images during the audit walk.
VALID_EXT = {'.jpg', '.jpeg', '.png', '.bmp', '.webp'}
def hash_file(path: Path, block_size: int = 65536) -> str:
    """Return the MD5 hex digest of *path*, read in fixed-size chunks.

    Chunked reads keep memory flat regardless of file size; the digest is
    used only for exact-duplicate detection, not for security.
    """
    digest = hashlib.md5()
    with path.open('rb') as stream:
        for chunk in iter(lambda: stream.read(block_size), b''):
            digest.update(chunk)
    return digest.hexdigest()
def collect_image_records(split_root: Path, split_name: str) -> pd.DataFrame:
    """Walk one split folder (class-per-subdirectory) and build a file-level
    audit frame: one row per image with size, geometry, corruption flag,
    content hash, and derived shape features.
    """
    rows = []
    class_dirs = sorted(d for d in split_root.iterdir() if d.is_dir())
    for class_dir in class_dirs:
        normalized_label = class_dir.name.strip().lower()
        for file_path in class_dir.rglob('*'):
            # Guard clauses: skip directories and non-image extensions.
            if not file_path.is_file():
                continue
            if file_path.suffix.lower() not in VALID_EXT:
                continue
            row = {
                'split': split_name,
                'label_raw': class_dir.name,
                'label': normalized_label,
                'path': str(file_path),
                'filename': file_path.name,
                'ext': file_path.suffix.lower(),
                'file_size_kb': round(file_path.stat().st_size / 1024, 3),
            }
            try:
                with Image.open(file_path) as img:
                    row['width'], row['height'] = img.size
                    row['mode'] = img.mode
                    row['channels'] = len(img.getbands())
                    row['is_corrupt'] = False
            except (UnidentifiedImageError, OSError):
                # Unreadable file: record it as corrupt instead of crashing.
                row['width'], row['height'] = np.nan, np.nan
                row['mode'], row['channels'] = 'UNKNOWN', np.nan
                row['is_corrupt'] = True
            rows.append(row)
    frame = pd.DataFrame(rows)
    if not frame.empty:
        # Content hash lets us identify exact duplicate images.
        frame['file_hash'] = [hash_file(Path(p)) for p in frame['path']]
        frame['aspect_ratio'] = frame['width'] / frame['height']
        frame['pixels'] = frame['width'] * frame['height']
    return frame
# Build auditable file-level frames for each split, plus a combined view
# used by the schema/duplicate checks below.
train_df = collect_image_records(TRAIN_ROOT, 'train')
test_df = collect_image_records(TEST_ROOT, 'test')
data_df = pd.concat([train_df, test_df], ignore_index=True)
print('train shape:', train_df.shape)
print('test shape:', test_df.shape)
print('all shape:', data_df.shape)
data_df.head()
train shape: (3135, 15) test shape: (351, 15) all shape: (3486, 15)
| split | label_raw | label | path | filename | ext | file_size_kb | width | height | mode | channels | is_corrupt | file_hash | aspect_ratio | pixels | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | train | indian market | indian market | /content/drive/MyDrive/Colab Notebooks/DS_Proj... | market11030.jpeg | .jpeg | 8.646 | 259 | 194 | RGB | 3 | False | 2b563fc3c42aa97ccc726202783dfae3 | 1.335052 | 50246 |
| 1 | train | indian market | indian market | /content/drive/MyDrive/Colab Notebooks/DS_Proj... | market11015.jpeg | .jpeg | 3.764 | 100 | 100 | RGB | 3 | False | 5caf4d05f8f8e49d3bb3128d0dcecd09 | 1.000000 | 10000 |
| 2 | train | indian market | indian market | /content/drive/MyDrive/Colab Notebooks/DS_Proj... | istockphoto-1186670497-612x612.jpg | .jpg | 40.407 | 612 | 408 | RGB | 3 | False | 44417fa54a5307e3e21c47e0decb5672 | 1.500000 | 249696 |
| 3 | train | indian market | indian market | /content/drive/MyDrive/Colab Notebooks/DS_Proj... | market11038.jpeg | .jpeg | 11.729 | 259 | 194 | RGB | 3 | False | 4304cbe6db0c78f15707ca763dc07781 | 1.335052 | 50246 |
| 4 | train | indian market | indian market | /content/drive/MyDrive/Colab Notebooks/DS_Proj... | market11043.jpeg | .jpeg | 4.211 | 100 | 100 | RGB | 3 | False | f1f528b7293ad257253764d0de9cac3a | 1.000000 | 10000 |
# Column-level schema audit over the combined frame: dtype, missingness,
# and cardinality per column, plus duplicate detection by path and by hash.
schema_records = []
for col in data_df.columns:
    series = data_df[col]
    schema_records.append({
        'column': col,
        'dtype': str(series.dtype),
        'missing_count': int(series.isna().sum()),
        'missing_pct': round(float(series.isna().mean() * 100), 3),
        'n_unique': int(series.nunique(dropna=True)),
    })
schema_df = pd.DataFrame(schema_records)
duplicate_rows = int(data_df.duplicated(subset=['split', 'path']).sum())
duplicate_hashes = int(data_df.duplicated(subset=['split', 'file_hash']).sum())
print('Duplicate rows by path:', duplicate_rows)
print('Duplicate images by hash within split:', duplicate_hashes)
schema_df
Duplicate rows by path: 0 Duplicate images by hash within split: 43
| column | dtype | missing_count | missing_pct | n_unique | |
|---|---|---|---|---|---|
| 0 | split | object | 0 | 0.0 | 2 |
| 1 | label_raw | object | 0 | 0.0 | 4 |
| 2 | label | object | 0 | 0.0 | 4 |
| 3 | path | object | 0 | 0.0 | 3486 |
| 4 | filename | object | 0 | 0.0 | 3486 |
| 5 | ext | object | 0 | 0.0 | 3 |
| 6 | file_size_kb | float64 | 0 | 0.0 | 3329 |
| 7 | width | int64 | 0 | 0.0 | 277 |
| 8 | height | int64 | 0 | 0.0 | 345 |
| 9 | mode | object | 0 | 0.0 | 3 |
| 10 | channels | int64 | 0 | 0.0 | 2 |
| 11 | is_corrupt | bool | 0 | 0.0 | 1 |
| 12 | file_hash | object | 0 | 0.0 | 3440 |
| 13 | aspect_ratio | float64 | 0 | 0.0 | 469 |
| 14 | pixels | int64 | 0 | 0.0 | 488 |
The visuals below are decision-oriented: class balance, geometry distribution, skew/outlier behavior, and class separability clues from color statistics.
# Class balance per split: imbalance seen here motivates the weighted
# sampler used at training time.
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
sns.countplot(data=train_df, x='label', order=sorted(train_df['label'].unique()), ax=axes[0], palette='viridis')
axes[0].set_title('Train Class Distribution')
axes[0].tick_params(axis='x', rotation=20)
sns.countplot(data=test_df, x='label', order=sorted(test_df['label'].unique()), ax=axes[1], palette='magma')
axes[1].set_title('Test Class Distribution')
axes[1].tick_params(axis='x', rotation=20)
plt.tight_layout()
plt.show()
# Image geometry overview: width/height histograms, size-feature outlier
# scan, and aspect-ratio distribution for the training split.
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
sns.histplot(train_df['width'].dropna(), bins=40, kde=True, ax=axes[0, 0], color='teal')
axes[0, 0].set_title('Width Distribution (Train)')
sns.histplot(train_df['height'].dropna(), bins=40, kde=True, ax=axes[0, 1], color='slateblue')
axes[0, 1].set_title('Height Distribution (Train)')
sns.boxplot(data=train_df[['width', 'height', 'pixels']].dropna(), ax=axes[1, 0], palette='Set2')
axes[1, 0].set_title('Outlier Scan for Size Features')
sns.histplot(train_df['aspect_ratio'].dropna(), bins=40, kde=True, ax=axes[1, 1], color='orange')
axes[1, 1].set_title('Aspect Ratio Distribution')
plt.tight_layout()
plt.show()
# Numeric skewness summary for the same size/shape features.
skew_report = train_df[['width', 'height', 'pixels', 'aspect_ratio', 'file_size_kb']].skew(numeric_only=True).sort_values(ascending=False)
skew_report.to_frame('skewness')
| skewness | |
|---|---|
| pixels | 29.142993 |
| width | 7.070185 |
| height | 5.507461 |
| file_size_kb | 3.258502 |
| aspect_ratio | 0.701000 |
def sampled_rgb_stats(df: pd.DataFrame, max_per_class: int = 120) -> pd.DataFrame:
    """Compute per-image RGB summary statistics on a seeded random sample
    of up to `max_per_class` images per label.

    Returns one row per readable image with channel means, overall
    brightness, and intensity standard deviation (all in [0, 1] scale).
    """
    stats_rows = []
    for label, group in df.groupby('label'):
        n_take = min(max_per_class, len(group))
        sampled_paths = group.sample(n_take, random_state=SEED)['path']
        for img_path in sampled_paths:
            try:
                pixels = np.array(Image.open(img_path).convert('RGB'), dtype=np.float32) / 255.0
                stats_rows.append({
                    'label': label,
                    'r_mean': float(pixels[..., 0].mean()),
                    'g_mean': float(pixels[..., 1].mean()),
                    'b_mean': float(pixels[..., 2].mean()),
                    'brightness': float(pixels.mean()),
                    'std_intensity': float(pixels.std()),
                })
            except Exception:
                # Best-effort sampling: silently skip files that fail to load.
                continue
    return pd.DataFrame(stats_rows)
# Per-class color statistics: brightness spread and R-vs-G scatter hint at
# how separable the classes are on color alone.
rgb_df = sampled_rgb_stats(train_df)
print('RGB sample size:', rgb_df.shape)
fig, axes = plt.subplots(1, 2, figsize=(14, 5))
sns.boxplot(data=rgb_df, x='label', y='brightness', ax=axes[0], palette='coolwarm')
axes[0].set_title('Brightness by Class')
axes[0].tick_params(axis='x', rotation=20)
sns.scatterplot(data=rgb_df, x='r_mean', y='g_mean', hue='label', alpha=0.6, ax=axes[1], palette='tab10')
axes[1].set_title('Multivariate Color Signal: R vs G Mean')
plt.tight_layout()
plt.show()
rgb_df.groupby('label')[['r_mean', 'g_mean', 'b_mean', 'brightness', 'std_intensity']].mean().round(4)
RGB sample size: (480, 6)
| r_mean | g_mean | b_mean | brightness | std_intensity | |
|---|---|---|---|---|---|
| label | |||||
| indian market | 0.5056 | 0.4467 | 0.3867 | 0.4463 | 0.2458 |
| onion | 0.6422 | 0.5151 | 0.4816 | 0.5463 | 0.2544 |
| potato | 0.6575 | 0.5549 | 0.4373 | 0.5499 | 0.2507 |
| tomato | 0.4029 | 0.4545 | 0.3329 | 0.3968 | 0.2142 |
def show_samples(df: pd.DataFrame, per_class: int = 5):
    """Plot a grid with up to `per_class` sample images per label in `df`.

    Bug fix: pass squeeze=False to plt.subplots so `axes` is always a 2-D
    array. The old code special-cased len(labels) == 1 but still crashed
    with `axes[i, j]` when per_class == 1, because subplots squeezes ANY
    singleton dimension.
    """
    labels = sorted(df['label'].unique())
    fig, axes = plt.subplots(
        len(labels), per_class,
        figsize=(3 * per_class, 3 * len(labels)),
        squeeze=False,
    )
    for i, label in enumerate(labels):
        mask = df['label'] == label
        sample_paths = df[mask].sample(min(per_class, mask.sum()), random_state=SEED)['path'].tolist()
        for j in range(per_class):
            ax = axes[i, j]
            ax.axis('off')
            if j < len(sample_paths):
                img = Image.open(sample_paths[j]).convert('RGB')
                ax.imshow(img)
                if j == 0:
                    # Label each row once, on its first panel.
                    ax.set_title(label, loc='left', fontsize=11, fontweight='bold')
    plt.suptitle('Representative Training Images by Class', y=1.01, fontsize=14)
    plt.tight_layout()
show_samples(train_df, per_class=5)
# Fit the label encoder on train labels and carve a stratified 80/20
# train/validation split; the provided test folder stays untouched.
label_encoder = LabelEncoder()
label_encoder.fit(train_df['label'])
class_names = list(label_encoder.classes_)
num_classes = len(class_names)
print('Classes:', class_names)
train_idx, val_idx = train_test_split(
    np.arange(len(train_df)),
    test_size=0.2,
    stratify=train_df['label'],  # keep class proportions equal across splits
    random_state=SEED
)
train_split_df = train_df.iloc[train_idx].reset_index(drop=True)
val_split_df = train_df.iloc[val_idx].reset_index(drop=True)
print('train split:', train_split_df.shape, train_split_df['label'].value_counts().to_dict())
print('val split:', val_split_df.shape, val_split_df['label'].value_counts().to_dict())
print('test split:', test_df.shape, test_df['label'].value_counts().to_dict())
Classes: ['indian market', 'onion', 'potato', 'tomato']
train split: (2508, 15) {'potato': 719, 'onion': 679, 'tomato': 631, 'indian market': 479}
val split: (627, 15) {'potato': 179, 'onion': 170, 'tomato': 158, 'indian market': 120}
test split: (351, 15) {'tomato': 106, 'onion': 83, 'indian market': 81, 'potato': 81}
IMG_SIZE = 224  # standard ImageNet input size, matches the pretrained backbone
if DEVICE.type == 'cuda':
    # Use the larger batch only when the GPU has memory headroom.
    BATCH_SIZE = 64 if gpu_mem_gb >= 8 else 32
else:
    BATCH_SIZE = 24
# Train-time pipeline: mild geometric + photometric augmentation.
train_transform = T.Compose([
    T.Resize((IMG_SIZE, IMG_SIZE)),
    T.RandomHorizontalFlip(p=0.5),
    T.RandomRotation(degrees=15),
    T.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.03),
    T.ToTensor(),
    # ImageNet normalization stats, required by the pretrained ResNet.
    T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
# Eval-time pipeline: deterministic, no augmentation.
eval_transform = T.Compose([
    T.Resize((IMG_SIZE, IMG_SIZE)),
    T.ToTensor(),
    T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
class ProduceDataset(Dataset):
    """Map-style dataset over a file-level dataframe with 'path' and
    'label' columns; labels are encoded once up front."""

    def __init__(self, df: pd.DataFrame, encoder: LabelEncoder, transform=None):
        self.df = df.reset_index(drop=True).copy()
        self.transform = transform
        # Encode all labels eagerly so per-item lookup is a cheap index.
        self.labels = encoder.transform(self.df['label'])

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        record = self.df.iloc[idx]
        # Context manager releases the file handle after decode.
        with Image.open(record['path']) as handle:
            image = handle.convert('RGB')
        if self.transform:
            image = self.transform(image)
        encoded = int(self.labels[idx])
        return image, encoded
# Re-fit LabelEncoder on all unique labels from the combined data_df
# to ensure all classes (including 'potato') are known.
label_encoder.fit(data_df['label'].unique())
class_names = list(label_encoder.classes_)
num_classes = len(class_names)
print('Updated Classes:', class_names)
train_ds = ProduceDataset(train_split_df, label_encoder, train_transform)
val_ds = ProduceDataset(val_split_df, label_encoder, eval_transform)
test_ds = ProduceDataset(test_df, label_encoder, eval_transform)
# Inverse-frequency sample weights: minority classes are oversampled so
# each training epoch sees a roughly balanced class mix.
class_counts = train_split_df['label'].value_counts().sort_index()
weights = train_split_df['label'].map(lambda x: 1.0 / class_counts[x]).values
sampler = WeightedRandomSampler(weights=torch.DoubleTensor(weights), num_samples=len(weights), replacement=True)
dl_kwargs = {
    'num_workers': DATALOADER_WORKERS,
    'pin_memory': PIN_MEMORY,
}
if DATALOADER_WORKERS > 0:
    # Keep worker processes alive between epochs and prefetch 2 batches each.
    dl_kwargs['persistent_workers'] = True
    dl_kwargs['prefetch_factor'] = 2
train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, sampler=sampler, **dl_kwargs)
val_loader = DataLoader(val_ds, batch_size=BATCH_SIZE, shuffle=False, **dl_kwargs)
test_loader = DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=False, **dl_kwargs)
print({'batch_size': BATCH_SIZE, 'workers': DATALOADER_WORKERS, 'pin_memory': PIN_MEMORY})
len(train_ds), len(val_ds), len(test_ds)
Updated Classes: ['indian market', 'onion', 'potato', 'tomato']
{'batch_size': 64, 'workers': 2, 'pin_memory': True}
(2508, 627, 351)
This baseline establishes a non-deep-learning benchmark. If CNNs do not beat this by a useful margin, deployment complexity may not be justified.
def extract_hist_features(paths, bins=32):
    """Build one feature vector per image: three per-channel density
    histograms (R, G, B) concatenated, computed on a 128x128 resize.

    Returns a float32 array of shape (len(paths), 3 * bins).
    """
    feature_rows = []
    for img_path in paths:
        pixels = np.array(Image.open(img_path).convert('RGB').resize((128, 128)), dtype=np.uint8)
        channel_hists = [
            np.histogram(pixels[..., ch], bins=bins, range=(0, 255), density=True)[0]
            for ch in range(3)
        ]
        feature_rows.append(np.concatenate(channel_hists))
    return np.array(feature_rows, dtype=np.float32)
# Non-deep baseline: color-histogram features + balanced logistic regression.
X_train_hist = extract_hist_features(train_split_df['path'].tolist())
X_val_hist = extract_hist_features(val_split_df['path'].tolist())
X_test_hist = extract_hist_features(test_df['path'].tolist())
y_train = label_encoder.transform(train_split_df['label'])
y_val = label_encoder.transform(val_split_df['label'])
y_test = label_encoder.transform(test_df['label'])
lr_model = Pipeline([
    ('clf', LogisticRegression(max_iter=1000, class_weight='balanced'))
])
lr_model.fit(X_train_hist, y_train)
val_pred = lr_model.predict(X_val_hist)
val_proba = lr_model.predict_proba(X_val_hist)
# Identify which classes are actually present in the validation set to avoid name mismatch
present_labels = np.unique(np.concatenate([y_val, val_pred]))
present_names = [class_names[i] for i in present_labels]
print(classification_report(y_val, val_pred, labels=present_labels, target_names=present_names, digits=4))
baseline_metrics = {
    'model': 'LogReg-Histogram',
    'val_accuracy': accuracy_score(y_val, val_pred),
    'val_macro_f1': precision_recall_fscore_support(y_val, val_pred, average='macro')[2],
    # One-vs-rest macro ROC-AUC over the full class set for comparability.
    'val_macro_roc_auc_ovr': roc_auc_score(label_binarize(y_val, classes=np.arange(num_classes)), val_proba, multi_class='ovr', average='macro', labels=np.arange(num_classes))
}
baseline_metrics
precision recall f1-score support
indian market 0.2731 0.6167 0.3785 120
onion 0.4177 0.1941 0.2651 170
potato 0.4375 0.1955 0.2703 179
tomato 0.6294 0.7848 0.6986 158
accuracy 0.4242 627
macro avg 0.4394 0.4478 0.4031 627
weighted avg 0.4490 0.4242 0.3975 627
{'model': 'LogReg-Histogram',
'val_accuracy': 0.42424242424242425,
'val_macro_f1': 0.40310967114270524,
'val_macro_roc_auc_ovr': np.float64(0.7519893847269308)}
Reusable training/evaluation functions keep experiments reproducible and easy to compare.
@dataclass
class TrainConfig:
    """Hyperparameters shared by every training run in this notebook."""
    epochs: int = 12        # hard cap; early stopping can end a run sooner
    lr: float = 1e-3        # AdamW initial learning rate
    weight_decay: float = 1e-4
    patience: int = 3       # epochs without val macro-F1 gain before stopping
# Mixed precision only makes sense on a CUDA device.
AMP_ENABLED = DEVICE.type == 'cuda'
def compute_metrics(y_true, y_pred, y_proba, class_names):
    """Aggregate multi-class metrics into one dict.

    The full label set is passed explicitly to every metric so that classes
    missing from a particular split cannot silently shift column meanings.
    """
    labels = np.arange(len(class_names))
    y_true_bin = label_binarize(y_true, classes=labels)
    p_macro, r_macro, f1_macro, _ = precision_recall_fscore_support(
        y_true, y_pred, average='macro', zero_division=0
    )
    return {
        'accuracy': accuracy_score(y_true, y_pred),
        'precision_macro': p_macro,
        'recall_macro': r_macro,
        'f1_macro': f1_macro,
        'roc_auc_macro_ovr': roc_auc_score(
            y_true_bin, y_proba, multi_class='ovr', average='macro', labels=labels
        ),
        'pr_auc_macro': average_precision_score(y_true_bin, y_proba, average='macro'),
        'log_loss': log_loss(y_true, y_proba, labels=labels),
    }
def evaluate_model(model, loader):
    """Run inference over `loader`; return (metrics, y_true, y_pred, y_proba).

    Probabilities are a softmax over the collected logits. Reads
    module-level DEVICE, AMP_ENABLED, and class_names.
    """
    model.eval()
    all_logits, all_labels = [], []
    with torch.no_grad():
        for x, y in loader:
            x = x.to(DEVICE, non_blocking=True)
            # autocast is a no-op when AMP_ENABLED is False (CPU path).
            with torch.autocast(device_type='cuda', dtype=torch.float16, enabled=AMP_ENABLED):
                logits = model(x)
            all_logits.append(logits.detach().cpu())
            all_labels.append(y)
    logits = torch.cat(all_logits).numpy()
    y_true = torch.cat(all_labels).numpy()
    y_proba = torch.softmax(torch.tensor(logits), dim=1).numpy()
    y_pred = y_proba.argmax(axis=1)
    metrics = compute_metrics(y_true, y_pred, y_proba, class_names)
    return metrics, y_true, y_pred, y_proba
def fit_model(model, train_loader, val_loader, config: TrainConfig, save_path: Path):
    """Train `model` with AdamW + AMP, early-stop on validation macro-F1,
    restore the best weights, save a checkpoint, and return (model, history).

    Side effects: moves the model to DEVICE, writes `save_path`, and reads
    module-level DEVICE, AMP_ENABLED, class_names, and IMG_SIZE.
    """
    model = model.to(DEVICE)
    criterion = nn.CrossEntropyLoss()
    # Optimize only unfrozen parameters (supports staged transfer learning).
    trainable_params = [p for p in model.parameters() if p.requires_grad]
    optimizer = optim.AdamW(trainable_params, lr=config.lr, weight_decay=config.weight_decay)
    # Halve the LR after one epoch without validation macro-F1 improvement.
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=1)
    scaler = torch.amp.GradScaler('cuda', enabled=AMP_ENABLED)
    best_f1 = -np.inf
    best_state = None
    wait = 0
    history = []
    for epoch in range(1, config.epochs + 1):
        model.train()
        train_losses = []
        for x, y in train_loader:
            x = x.to(DEVICE, non_blocking=True)
            y = y.to(DEVICE, non_blocking=True)
            optimizer.zero_grad(set_to_none=True)
            with torch.autocast(device_type='cuda', dtype=torch.float16, enabled=AMP_ENABLED):
                logits = model(x)
                loss = criterion(logits, y)
            # Scaled backward/step guards fp16 gradients against underflow.
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            train_losses.append(loss.item())
        val_metrics, _, _, _ = evaluate_model(model, val_loader)
        scheduler.step(val_metrics['f1_macro'])
        row = {
            'epoch': epoch,
            'train_loss': float(np.mean(train_losses)),
            'val_f1_macro': float(val_metrics['f1_macro']),
            'val_accuracy': float(val_metrics['accuracy']),
            'lr': optimizer.param_groups[0]['lr']
        }
        history.append(row)
        print(row)
        # Early-stopping bookkeeping: keep a detached CPU copy of the best weights.
        if val_metrics['f1_macro'] > best_f1:
            best_f1 = val_metrics['f1_macro']
            best_state = {k: v.cpu().clone() for k, v in model.state_dict().items()}
            wait = 0
        else:
            wait += 1
            if wait >= config.patience:
                print(f'Early stopping at epoch {epoch}')
                break
    if best_state is None:
        best_state = model.state_dict()
    model.load_state_dict(best_state)
    # Checkpoint carries class names + input size so it is self-describing.
    save_obj = {'state_dict': model.state_dict(), 'class_names': class_names, 'img_size': IMG_SIZE}
    torch.save(save_obj, save_path)
    return model, pd.DataFrame(history)
class SmallCNN(nn.Module):
    """Compact convnet: three pooled conv blocks (32->64->128 channels),
    a final 256-channel conv block with global average pooling, then a
    dropout-regularized 2-layer MLP head.
    """

    def __init__(self, n_classes):
        super().__init__()
        feature_layers = []
        # (in_channels, out_channels) for the three downsampling blocks.
        for c_in, c_out in ((3, 32), (32, 64), (64, 128)):
            feature_layers += [
                nn.Conv2d(c_in, c_out, kernel_size=3, padding=1),
                nn.BatchNorm2d(c_out),
                nn.ReLU(inplace=True),
                nn.MaxPool2d(2),
            ]
        # Final block trades max-pooling for a global average pool so the
        # head sees a fixed 256-dim vector regardless of input size.
        feature_layers += [
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.AdaptiveAvgPool2d((1, 1)),
        ]
        self.features = nn.Sequential(*feature_layers)
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Dropout(0.35),
            nn.Linear(256, 128),
            nn.ReLU(inplace=True),
            nn.Dropout(0.2),
            nn.Linear(128, n_classes),
        )

    def forward(self, x):
        feature_maps = self.features(x)
        return self.classifier(feature_maps)
# Train the from-scratch CNN as the deep-learning reference point.
scratch_model = SmallCNN(num_classes)
scratch_cfg = TrainConfig(epochs=10, lr=8e-4, weight_decay=1e-4, patience=3)
scratch_path = MODEL_DIR / 'cnn_scratch_best.pt'
scratch_model, scratch_history = fit_model(scratch_model, train_loader, val_loader, scratch_cfg, scratch_path)
scratch_history.tail()
{'epoch': 1, 'train_loss': 0.725045595318079, 'val_f1_macro': 0.7083479265275485, 'val_accuracy': 0.7320574162679426, 'lr': 0.0008}
{'epoch': 2, 'train_loss': 0.5161211542785168, 'val_f1_macro': 0.7878662952433746, 'val_accuracy': 0.7910685805422647, 'lr': 0.0008}
{'epoch': 3, 'train_loss': 0.45725610665977, 'val_f1_macro': 0.8372282447148104, 'val_accuracy': 0.8341307814992025, 'lr': 0.0008}
{'epoch': 4, 'train_loss': 0.45569629073143003, 'val_f1_macro': 0.7944227864205552, 'val_accuracy': 0.7990430622009569, 'lr': 0.0008}
{'epoch': 5, 'train_loss': 0.4086588829755783, 'val_f1_macro': 0.8182776118516397, 'val_accuracy': 0.8181818181818182, 'lr': 0.0004}
{'epoch': 6, 'train_loss': 0.3871041785925627, 'val_f1_macro': 0.8202561662108785, 'val_accuracy': 0.8197767145135566, 'lr': 0.0004}
Early stopping at epoch 6
| epoch | train_loss | val_f1_macro | val_accuracy | lr | |
|---|---|---|---|---|---|
| 1 | 2 | 0.516121 | 0.787866 | 0.791069 | 0.0008 |
| 2 | 3 | 0.457256 | 0.837228 | 0.834131 | 0.0008 |
| 3 | 4 | 0.455696 | 0.794423 | 0.799043 | 0.0008 |
| 4 | 5 | 0.408659 | 0.818278 | 0.818182 | 0.0004 |
| 5 | 6 | 0.387104 | 0.820256 | 0.819777 | 0.0004 |
This is the production candidate because pretrained features typically improve generalization on medium-sized datasets.
def build_resnet18(n_classes):
    """Return an ImageNet-pretrained ResNet-18 whose final FC layer is
    replaced by a dropout-regularized linear head for our label set."""
    backbone = resnet18(weights=ResNet18_Weights.IMAGENET1K_V1)
    head = nn.Sequential(
        nn.Dropout(0.25),
        nn.Linear(backbone.fc.in_features, n_classes),
    )
    backbone.fc = head
    return backbone
transfer_model = build_resnet18(num_classes)
# Stage 1: train classifier head
# Freeze the whole backbone, leaving only the new fc head trainable.
for p in transfer_model.parameters():
    p.requires_grad = False
for p in transfer_model.fc.parameters():
    p.requires_grad = True
head_cfg = TrainConfig(epochs=4, lr=1e-3, weight_decay=1e-4, patience=2)
transfer_path = MODEL_DIR / 'cnn_transfer_best.pt'
transfer_model, head_hist = fit_model(transfer_model, train_loader, val_loader, head_cfg, transfer_path)
# Stage 2: fine-tune deeper blocks
# Unfreeze layer3/layer4 + head at a lower LR to adapt high-level features.
for name, p in transfer_model.named_parameters():
    if name.startswith('layer4') or name.startswith('layer3') or name.startswith('fc'):
        p.requires_grad = True
ft_cfg = TrainConfig(epochs=8, lr=3e-4, weight_decay=1e-4, patience=3)
transfer_model, ft_hist = fit_model(transfer_model, train_loader, val_loader, ft_cfg, transfer_path)
transfer_history = pd.concat([head_hist.assign(stage='head'), ft_hist.assign(stage='finetune')], ignore_index=True)
transfer_history.tail()
{'epoch': 1, 'train_loss': 0.8252654403448105, 'val_f1_macro': 0.8974775764424837, 'val_accuracy': 0.8931419457735247, 'lr': 0.001}
{'epoch': 2, 'train_loss': 0.3838032685220242, 'val_f1_macro': 0.9298917646351434, 'val_accuracy': 0.9282296650717703, 'lr': 0.001}
{'epoch': 3, 'train_loss': 0.28728400617837907, 'val_f1_macro': 0.9445052550934904, 'val_accuracy': 0.9425837320574163, 'lr': 0.001}
{'epoch': 4, 'train_loss': 0.2644272956997156, 'val_f1_macro': 0.9460409359484079, 'val_accuracy': 0.9441786283891547, 'lr': 0.001}
{'epoch': 1, 'train_loss': 0.14255334078334272, 'val_f1_macro': 0.9677236341691148, 'val_accuracy': 0.9665071770334929, 'lr': 0.0003}
{'epoch': 2, 'train_loss': 0.04856402790173888, 'val_f1_macro': 0.9766301159158302, 'val_accuracy': 0.9760765550239234, 'lr': 0.0003}
{'epoch': 3, 'train_loss': 0.04745411624462577, 'val_f1_macro': 0.9798588363297309, 'val_accuracy': 0.9792663476874003, 'lr': 0.0003}
{'epoch': 4, 'train_loss': 0.03402912020683289, 'val_f1_macro': 0.972186305836469, 'val_accuracy': 0.9712918660287081, 'lr': 0.0003}
{'epoch': 5, 'train_loss': 0.029500448773615063, 'val_f1_macro': 0.9770709616726002, 'val_accuracy': 0.9760765550239234, 'lr': 0.00015}
{'epoch': 6, 'train_loss': 0.019463256996823476, 'val_f1_macro': 0.9846151697584579, 'val_accuracy': 0.9840510366826156, 'lr': 0.00015}
{'epoch': 7, 'train_loss': 0.00981823032052489, 'val_f1_macro': 0.984988747618109, 'val_accuracy': 0.9840510366826156, 'lr': 0.00015}
{'epoch': 8, 'train_loss': 0.010930009124240313, 'val_f1_macro': 0.9863561909016455, 'val_accuracy': 0.9856459330143541, 'lr': 0.00015}
| epoch | train_loss | val_f1_macro | val_accuracy | lr | stage | |
|---|---|---|---|---|---|---|
| 7 | 4 | 0.034029 | 0.972186 | 0.971292 | 0.00030 | finetune |
| 8 | 5 | 0.029500 | 0.977071 | 0.976077 | 0.00015 | finetune |
| 9 | 6 | 0.019463 | 0.984615 | 0.984051 | 0.00015 | finetune |
| 10 | 7 | 0.009818 | 0.984989 | 0.984051 | 0.00015 | finetune |
| 11 | 8 | 0.010930 | 0.986356 | 0.985646 | 0.00015 | finetune |
def plot_history(df, title):
    """Plot side-by-side train-loss and validation macro-F1 curves."""
    fig, axes = plt.subplots(1, 2, figsize=(12, 4))
    sns.lineplot(data=df, x='epoch', y='train_loss', marker='o', ax=axes[0])
    axes[0].set_title(f'{title} - Train Loss')
    sns.lineplot(data=df, x='epoch', y='val_f1_macro', marker='o', ax=axes[1])
    axes[1].set_title(f'{title} - Validation Macro F1')
    plt.tight_layout()
plot_history(scratch_history, 'Scratch CNN')
plot_history(transfer_history, 'Transfer ResNet18')
plt.show()
We compare models on validation first, choose the best candidate, and only then report final test performance.
# Model selection happens on validation metrics only; the test set is
# reported a single time after this choice is made.
scratch_val_metrics, y_val_true_s, y_val_pred_s, y_val_prob_s = evaluate_model(scratch_model, val_loader)
transfer_val_metrics, y_val_true_t, y_val_pred_t, y_val_prob_t = evaluate_model(transfer_model, val_loader)
comparison_df = pd.DataFrame([
    {'model': 'LogReg-Histogram', **{k: baseline_metrics[k] for k in baseline_metrics if k != 'model'}},
    {'model': 'CNN-Scratch',
     'val_accuracy': scratch_val_metrics['accuracy'],
     'val_macro_f1': scratch_val_metrics['f1_macro'],
     'val_macro_roc_auc_ovr': scratch_val_metrics['roc_auc_macro_ovr']},
    {'model': 'ResNet18-Transfer',
     'val_accuracy': transfer_val_metrics['accuracy'],
     'val_macro_f1': transfer_val_metrics['f1_macro'],
     'val_macro_roc_auc_ovr': transfer_val_metrics['roc_auc_macro_ovr']},
])
# Best model (by validation macro F1) ends up in row 0.
comparison_df = comparison_df.sort_values('val_macro_f1', ascending=False).reset_index(drop=True)
comparison_df
| model | val_accuracy | val_macro_f1 | val_macro_roc_auc_ovr | |
|---|---|---|---|---|
| 0 | ResNet18-Transfer | 0.985646 | 0.986356 | 0.999650 |
| 1 | CNN-Scratch | 0.834131 | 0.837228 | 0.957764 |
| 2 | LogReg-Histogram | 0.424242 | 0.403110 | 0.751989 |
# One-time holdout evaluation of the selected model, plus confusion matrix.
best_model_name = comparison_df.loc[0, 'model']
best_model = transfer_model if best_model_name == 'ResNet18-Transfer' else scratch_model
test_metrics, y_test_true, y_test_pred, y_test_prob = evaluate_model(best_model, test_loader)
print('Best model selected:', best_model_name)
print(json.dumps(test_metrics, indent=2))
print('\nClassification report (test):')
print(classification_report(y_test_true, y_test_pred, target_names=class_names, digits=4))
cm = confusion_matrix(y_test_true, y_test_pred)
plt.figure(figsize=(6, 5))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
plt.title(f'Confusion Matrix - {best_model_name} (Test)')
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.tight_layout()
plt.show()
Best model selected: ResNet18-Transfer
{
"accuracy": 0.9572649572649573,
"precision_macro": 0.9561644495855022,
"recall_macro": 0.9538524468243343,
"f1_macro": 0.9541963485268543,
"roc_auc_macro_ovr": 0.9975521478662844,
"pr_auc_macro": 0.9926964365035892,
"log_loss": 0.12802611079570064
}
Classification report (test):
precision recall f1-score support
indian market 0.9872 0.9506 0.9686 81
onion 0.8901 0.9759 0.9310 83
potato 0.9474 0.8889 0.9172 81
tomato 1.0000 1.0000 1.0000 106
accuracy 0.9573 351
macro avg 0.9562 0.9539 0.9542 351
weighted avg 0.9589 0.9573 0.9573 351
# Calibration snapshot (top-1 confidence vs correctness)
pred_conf = y_test_prob.max(axis=1)
# 1 where the top-1 prediction was correct, else 0.
pred_ok = (y_test_pred == y_test_true).astype(int)
frac_pos, mean_pred = calibration_curve(pred_ok, pred_conf, n_bins=10, strategy='uniform')
plt.figure(figsize=(6, 5))
plt.plot(mean_pred, frac_pos, marker='o', label='Model')
plt.plot([0, 1], [0, 1], '--', color='gray', label='Perfectly calibrated')
plt.xlabel('Predicted confidence')
plt.ylabel('Empirical accuracy')
plt.title('Calibration Curve (Top-1 Confidence)')
plt.legend()
plt.tight_layout()
plt.show()
# Brier score of top-1 confidence treated as a probability of correctness.
calibration_brier = brier_score_loss(pred_ok, pred_conf)
print('Brier score (lower is better):', round(float(calibration_brier), 4))
Brier score (lower is better): 0.0331
For image models, pixel-saliency maps show which regions most influenced predictions.
# Inverts the ImageNet Normalize used in the transforms above, so tensors
# can be displayed with their original colors.
inv_norm = T.Normalize(
    mean=[-0.485 / 0.229, -0.456 / 0.224, -0.406 / 0.225],
    std=[1 / 0.229, 1 / 0.224, 1 / 0.225]
)
def compute_saliency(model, x_tensor):
    """Vanilla gradient saliency for one image tensor.

    Backpropagates the top logit to the input, takes |grad| max over the
    channel axis, and min-max scales to [0, 1].
    Returns (predicted_class_index, HxW saliency map).
    """
    model.eval()
    x = x_tensor.clone().detach().unsqueeze(0).to(DEVICE)
    x.requires_grad = True
    logits = model(x)
    pred_idx = logits.argmax(dim=1)
    score = logits[0, pred_idx]
    score.backward()
    grad = x.grad.detach().abs().squeeze().cpu().numpy()
    saliency = grad.max(axis=0)
    # Epsilon guards against division by zero on constant gradients.
    saliency = (saliency - saliency.min()) / (saliency.max() - saliency.min() + 1e-8)
    return int(pred_idx.item()), saliency
# Side-by-side panels: raw image and saliency overlay for a few test images.
sample_batch = next(iter(test_loader))
sample_images, sample_labels = sample_batch
n_show = min(6, sample_images.shape[0])
fig, axes = plt.subplots(n_show, 2, figsize=(8, 3 * n_show))
for i in range(n_show):
    pred_idx, sal = compute_saliency(best_model, sample_images[i])
    raw = inv_norm(sample_images[i]).clamp(0, 1).permute(1, 2, 0).cpu().numpy()
    axes[i, 0].imshow(raw)
    axes[i, 0].set_title(f'True: {class_names[int(sample_labels[i])]} | Pred: {class_names[pred_idx]}')
    axes[i, 0].axis('off')
    axes[i, 1].imshow(raw)
    axes[i, 1].imshow(sal, cmap='jet', alpha=0.45)
    axes[i, 1].set_title('Saliency Overlay')
    axes[i, 1].axis('off')
plt.tight_layout()
plt.show()
# Persist the winning model together with the metadata needed to serve it.
best_model_path = MODEL_DIR / 'best_model_final.pt'
torch.save({'state_dict': best_model.state_dict(), 'class_names': class_names, 'img_size': IMG_SIZE}, best_model_path)
print('Saved model artifact to:', best_model_path)
def predict_single_image(image_path: str, model, class_names):
    """Classify one image file.

    Returns a dict with the predicted class, its confidence, and the full
    per-class probability map. Uses module-level eval_transform and DEVICE.
    """
    model.eval()
    img = Image.open(image_path).convert('RGB')
    x = eval_transform(img).unsqueeze(0).to(DEVICE)
    with torch.no_grad():
        proba = torch.softmax(model(x), dim=1).cpu().numpy().ravel()
    pred_idx = int(np.argmax(proba))
    return {
        'predicted_class': class_names[pred_idx],
        'confidence': float(proba[pred_idx]),
        'probabilities': {class_names[i]: float(proba[i]) for i in range(len(class_names))}
    }
# Smoke test the serving path on one random holdout image.
sample_path = test_df.sample(1, random_state=SEED)['path'].iloc[0]
predict_single_image(sample_path, best_model, class_names)
Saved model artifact to: /content/models/best_model_final.pt
{'predicted_class': 'onion',
'confidence': 0.9344338774681091,
'probabilities': {'indian market': 0.003869600361213088,
'onion': 0.9344338774681091,
'potato': 0.06165598705410957,
'tomato': 4.053128941450268e-05}}
# Executive summary table tying the technical results to business impact.
key_findings = pd.DataFrame([
    {'dimension': 'Data Quality', 'finding': f'Corrupt images found: {int(data_df["is_corrupt"].sum())}', 'business_impact': 'Low ingestion risk if checked pre-training'},
    {'dimension': 'Class Balance', 'finding': 'Mild imbalance across classes; weighted sampling applied', 'business_impact': 'Better fairness across produce categories'},
    {'dimension': 'Best Model', 'finding': best_model_name, 'business_impact': 'Selected for deployment candidate and holdout performance'},
    {'dimension': 'Test Macro F1', 'finding': round(float(test_metrics['f1_macro']), 4), 'business_impact': 'Expected reliability in real routing decisions'},
    {'dimension': 'Calibration', 'finding': round(float(calibration_brier), 4), 'business_impact': 'Confidence score usefulness for human-in-the-loop fallback'}
])
key_findings
| dimension | finding | business_impact | |
|---|---|---|---|
| 0 | Data Quality | Corrupt images found: 0 | Low ingestion risk if checked pre-training |
| 1 | Class Balance | Mild imbalance across classes; weighted sampli... | Better fairness across produce categories |
| 2 | Best Model | ResNet18-Transfer | Selected for deployment candidate and holdout ... |
| 3 | Test Macro F1 | 0.9542 | Expected reliability in real routing decisions |
| 4 | Calibration | 0.0331 | Confidence score usefulness for human-in-the-l... |
A production-focused image classification pipeline was built for Company A to classify potato, onion, tomato, and noise classes with deployment-readiness controls for automated sorting decisions.
The first run was executed in Google Colab and successfully connected to Drive, enabling full cloud-based execution of data ingestion, model training, and holdout evaluation.
Output-backed results indicate a clear model winner. ResNet18 transfer learning outperformed alternatives in validation (macro F1 0.9864 vs. 0.8372 for scratch CNN and 0.4031 for logistic baseline) and sustained strong holdout performance (test macro F1 0.9542; macro precision 0.9562; macro recall 0.9539; macro ROC-AUC 0.9976).
Business implication: the solution is mature enough for controlled deployment with confidence-threshold escalation, while retaining measurable fallback paths — reverting to the lighter scratch CNN, or hardening predictions with an ensemble — if drift or class-specific degradation appears in production.
Data quality checks further support reliability (3,486 images audited; 0 corrupt files), and calibration quality (Brier score 0.0331) supports operational confidence gating and manual-review routing where risk is highest.