# trash-division/baseline/ResNet34_Pretrained_10pct.py
#
# 278 lines
# 11 KiB
# Python

"""
baseline/ResNet34_Pretrained_10pct.py
ResNet-34 with ImageNet-pretrained weights, fine-tuned on 10% of the training set.
Can be run standalone for training, or imported by compare_models.py.
author: yukun-hh
date: 2026-5-14
"""
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Subset
from torchvision import models, transforms
from tqdm import tqdm
import csv
import matplotlib.pyplot as plt
import matplotlib
from Dataloader import RobustImageFolder
matplotlib.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans']
matplotlib.rcParams['axes.unicode_minus'] = False
# ============================================================
# ★★★ Configurable parameters ★★★
# ============================================================
# Directory containing this script; output artifacts are written next to it.
_THIS_DIR = os.path.dirname(os.path.abspath(__file__))

# Dataset root; the script joins 'train' and 'val' onto it.
# NOTE(review): relative path, presumably resolved against the working
# directory by the dataset loader - confirm.
DATA_ROOT = '../../trash_division_data/ultimate_4_class/'

# Data loading.
BATCH_SIZE = 32
IMAGE_SIZE = 256
NUM_WORKERS = 4

# Optimisation.
EPOCHS = 30
LR = 0.001
DROPOUT = 0.3

# Fraction of the training set sampled for fine-tuning, and the RNG seed.
TRAIN_PCT = 0.1
SEED = 42

# Output artifacts: best checkpoint and per-epoch CSV log.
MODEL_SAVE_PATH = os.path.join(_THIS_DIR, 'resnet34_10pct.pth')
LOG_PATH = os.path.join(_THIS_DIR, 'resnet34_10pct_log.csv')
# ============================================================
NUM_CLASSES = 4
# Class display names: kitchen waste, recyclables, other waste, hazardous waste.
# NOTE(review): order is assumed to match the dataset's label indices - confirm.
CLASS_NAMES = ['厨余垃圾', '可回收物', '其他垃圾', '有害垃圾']
class PretrainedResNet34(nn.Module):
    """ResNet-34 backbone followed by a dropout + linear classification head.

    The torchvision ResNet-34's own ``fc`` layer is replaced with
    ``nn.Identity`` so the backbone emits its pooled features; a separate
    ``nn.Dropout`` and ``nn.Linear`` then produce the class logits.

    Args:
        num_classes: Number of output logits.
        dropout: Dropout probability applied before the final linear layer.
        weights: Forwarded to ``models.resnet34(weights=...)``. The default
            loads the ImageNet weights; pass ``None`` to build the bare
            architecture, e.g. when a saved checkpoint is loaded right after
            construction and the pretrained download would be wasted.
    """

    def __init__(self, num_classes=NUM_CLASSES, dropout=DROPOUT,
                 weights='IMAGENET1K_V1'):
        super().__init__()
        self.backbone = models.resnet34(weights=weights)
        feature_dim = self.backbone.fc.in_features
        # Strip the original classifier so the backbone returns features.
        self.backbone.fc = nn.Identity()
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(feature_dim, num_classes)

    def forward(self, x):
        """Return class logits for the input batch ``x``."""
        x = self.backbone(x)
        x = self.dropout(x)
        return self.fc(x)

    def freeze_early_layers(self):
        """Disable gradients for the backbone's conv1, bn1, layer1 and layer2.

        NOTE(review): only ``requires_grad`` is switched off. BatchNorm layers
        inside the frozen stages presumably still update their running
        statistics while the model is in train mode - confirm this is intended.
        """
        backbone = self.backbone
        frozen_stages = (backbone.conv1, backbone.bn1,
                         backbone.layer1, backbone.layer2)
        for stage in frozen_stages:
            for param in stage.parameters():
                param.requires_grad = False

    def print_trainable_info(self):
        """Print the number of frozen and trainable parameters."""
        frozen = sum(p.numel() for p in self.parameters() if not p.requires_grad)
        trainable = sum(p.numel() for p in self.parameters() if p.requires_grad)
        total = frozen + trainable
        # Guard the percentage against a module that holds no parameters.
        trainable_pct = 100. * trainable / total if total else 0.0
        print(f" 冻结参数: {frozen:,} 可训练参数: {trainable:,} ({trainable_pct:.1f}%)")
def compute_macro_f1(predicted, targets, num_classes=NUM_CLASSES):
    """Return the macro-averaged F1 score as a Python float.

    For every class index in ``range(num_classes)`` the true positives,
    false positives and false negatives are counted, turned into precision
    and recall (with a 1e-8 epsilon in each denominator), and combined into a
    per-class F1. The result is the unweighted mean of those per-class scores.

    Args:
        predicted: Tensor of predicted class indices.
        targets: Tensor of ground-truth class indices, comparable
            element-wise with ``predicted``.
        num_classes: Number of classes to average over.
    """
    eps = 1e-8
    # Row 0: true positives, row 1: false positives, row 2: false negatives.
    counts = torch.zeros(3, num_classes, device=predicted.device)
    for cls in range(num_classes):
        is_pred = predicted == cls
        is_true = targets == cls
        counts[0, cls] = (is_pred & is_true).sum()
        counts[1, cls] = (is_pred & ~is_true).sum()
        counts[2, cls] = (~is_pred & is_true).sum()
    tp, fp, fn = counts[0], counts[1], counts[2]

    precision = tp / (tp + fp + eps)
    recall = tp / (tp + fn + eps)
    per_class_f1 = 2 * precision * recall / (precision + recall + eps)
    return per_class_f1.mean().item()
def train_one_epoch(model, loader, criterion, optimizer, device, epoch):
    """Run one training pass over ``loader`` and return the epoch metrics.

    Args:
        model: Module to train; switched to train mode here.
        loader: Iterable yielding ``(images, labels)`` batches.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per batch.
        device: Device the batches are moved to.
        epoch: Zero-based epoch index, used only for the progress-bar label.

    Returns:
        Tuple ``(loss, macro_f1, accuracy)``: the batch-size-weighted mean
        loss, the macro-F1 over all predictions of the epoch, and the
        accuracy in percent.

    Raises:
        ValueError: If ``loader`` yields no batches (for example a dataset
            smaller than one batch combined with ``drop_last=True``).
    """
    model.train()
    running_loss, correct, total = 0.0, 0, 0
    all_preds, all_labels = [], []
    pbar = tqdm(loader, desc=f'Epoch {epoch+1} [Train]')
    for images, labels in pbar:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        loss = criterion(outputs, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Read the scalar loss once; it is reused for the running sum and
        # the progress bar.
        batch_loss = loss.item()
        running_loss += batch_loss * images.size(0)
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()
        all_preds.append(predicted)
        all_labels.append(labels)
        batch_f1 = compute_macro_f1(predicted, labels)
        pbar.set_postfix({'loss': batch_loss, 'F1': f'{batch_f1:.4f}',
                          'Acc': f'{100.*correct/total:.2f}%'})

    if total == 0:
        # Without this guard the averages below fail with an opaque
        # ZeroDivisionError.
        raise ValueError(
            'train_one_epoch received no samples from loader; '
            'check the dataset size, batch size and drop_last setting')

    epoch_loss = running_loss / total
    epoch_f1 = compute_macro_f1(torch.cat(all_preds), torch.cat(all_labels))
    epoch_acc = 100. * correct / total
    return epoch_loss, epoch_f1, epoch_acc
@torch.no_grad()
def validate(model, loader, criterion, device):
    """Evaluate ``model`` over ``loader`` with gradients disabled.

    Args:
        model: Module to evaluate; switched to eval mode here.
        loader: Iterable yielding ``(images, labels)`` batches.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(loss, macro_f1, accuracy)``: the batch-size-weighted mean
        loss, the macro-F1 over all predictions, and the accuracy in percent.

    Raises:
        ValueError: If ``loader`` yields no batches.
    """
    model.eval()
    running_loss, correct, total = 0.0, 0, 0
    all_preds, all_labels = [], []
    for images, labels in tqdm(loader, desc='[Validate]'):
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        loss = criterion(outputs, labels)
        running_loss += loss.item() * images.size(0)
        _, predicted = outputs.max(1)
        total += labels.size(0)
        correct += predicted.eq(labels).sum().item()
        all_preds.append(predicted)
        all_labels.append(labels)

    if total == 0:
        # Without this guard the averages below fail with an opaque
        # ZeroDivisionError.
        raise ValueError(
            'validate received no samples from loader; '
            'cannot compute validation metrics')

    epoch_loss = running_loss / total
    epoch_f1 = compute_macro_f1(torch.cat(all_preds), torch.cat(all_labels))
    epoch_acc = 100. * correct / total
    return epoch_loss, epoch_f1, epoch_acc
def train_model(model, train_loader, val_loader, device, epochs=EPOCHS, lr=LR):
    """Fine-tune ``model`` and return the per-epoch metric history.

    Trains with SGD (momentum 0.9, weight decay 1e-4) over the parameters
    that have ``requires_grad`` set, using cross-entropy loss and a cosine
    annealing schedule spanning ``epochs``. After every epoch one row is
    appended to the CSV at ``LOG_PATH``. The model's ``state_dict`` is written
    to ``MODEL_SAVE_PATH`` whenever the validation macro-F1 improves, and
    always after the first epoch so a checkpoint from this run exists.

    Args:
        model: Module to train; expected to already live on ``device``.
        train_loader: Loader passed to ``train_one_epoch``.
        val_loader: Loader passed to ``validate``.
        device: Device the batches are moved to.
        epochs: Number of epochs, also used as the scheduler's ``T_max``.
        lr: Initial learning rate.

    Returns:
        Dict with keys ``train_loss``, ``train_f1``, ``train_acc``,
        ``val_loss``, ``val_f1`` and ``val_acc``, each a list with one value
        per epoch.
    """
    criterion = nn.CrossEntropyLoss()
    trainable_params = [p for p in model.parameters() if p.requires_grad]
    optimizer = optim.SGD(trainable_params, lr=lr, momentum=0.9,
                          weight_decay=1e-4)
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)
    history = {'train_loss': [], 'train_f1': [], 'train_acc': [],
               'val_loss': [], 'val_f1': [], 'val_acc': []}
    best_val_f1 = 0.0
    checkpoint_saved = False

    # The context manager closes the log even if an epoch raises.
    with open(LOG_PATH, 'w', newline='', encoding='utf-8') as log_file:
        log_writer = csv.writer(log_file)
        log_writer.writerow(['epoch', 'train_loss', 'train_f1', 'train_acc',
                             'val_loss', 'val_f1', 'val_acc', 'lr', 'best'])
        for epoch in range(epochs):
            print(f'\n{"="*50}')
            print(f'Epoch {epoch+1}/{epochs}')

            # Capture the LR in effect for this epoch before the scheduler
            # advances it, so the printed/logged value matches the row.
            epoch_lr = optimizer.param_groups[0]['lr']

            train_loss, train_f1, train_acc = train_one_epoch(
                model, train_loader, criterion, optimizer, device, epoch)
            val_loss, val_f1, val_acc = validate(
                model, val_loader, criterion, device)
            scheduler.step()

            epoch_metrics = {
                'train_loss': train_loss, 'train_f1': train_f1,
                'train_acc': train_acc, 'val_loss': val_loss,
                'val_f1': val_f1, 'val_acc': val_acc,
            }
            for key, value in epoch_metrics.items():
                history[key].append(value)

            print(f'Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.2f}% | Train Macro-F1: {train_f1:.4f}')
            print(f'Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.2f}% | Val Macro-F1: {val_f1:.4f}')
            print(f'Learning Rate: {epoch_lr:.6f}')

            # Always write a checkpoint on the first opportunity: otherwise a
            # run whose validation F1 never exceeds 0.0 leaves no file (or a
            # stale one from an earlier run) for callers that reload it.
            best_mark = ''
            if val_f1 > best_val_f1 or not checkpoint_saved:
                best_val_f1 = max(best_val_f1, val_f1)
                torch.save(model.state_dict(), MODEL_SAVE_PATH)
                checkpoint_saved = True
                best_mark = 'best'
                print(f'✓ 保存最佳模型 (Macro-F1: {val_f1:.4f})')

            log_writer.writerow([epoch+1, train_loss, train_f1, train_acc,
                                 val_loss, val_f1, val_acc, epoch_lr,
                                 best_mark])
            # Flush per epoch so the log is usable while training runs.
            log_file.flush()

    print(f'\n训练完成!最佳验证 Macro-F1: {best_val_f1:.4f}')
    return history
# ============================================================
# Import interface for compare_models.py
# ============================================================
def get_resnet34_10pct_preds(train_loader, val_loader, device):
    """Load the saved checkpoint and collect its predictions on ``val_loader``.

    Builds a ``PretrainedResNet34``, loads the ``state_dict`` stored at
    ``MODEL_SAVE_PATH``, and runs it in eval mode with gradients disabled.

    Args:
        train_loader: Accepted but not used by this function.
        val_loader: Loader yielding ``(images, labels)`` batches; ``labels``
            are converted with ``.numpy()`` without a device transfer.
        device: Device used for inference.

    Returns:
        Tuple ``(y_true, y_pred, y_prob)`` of NumPy arrays concatenated over
        all batches: the labels, the argmax predictions, and the softmax
        probabilities.
    """
    net = PretrainedResNet34(num_classes=NUM_CLASSES)
    state = torch.load(MODEL_SAVE_PATH, map_location='cpu')
    net.load_state_dict(state)
    net = net.to(device).eval()

    true_chunks, pred_chunks, prob_chunks = [], [], []
    with torch.no_grad():
        for images, labels in tqdm(val_loader, desc='ResNet-34 (10%)'):
            batch_probs = torch.softmax(net(images.to(device)), dim=1)
            batch_preds = batch_probs.argmax(dim=1)
            true_chunks.append(labels.numpy())
            pred_chunks.append(batch_preds.cpu().numpy())
            prob_chunks.append(batch_probs.cpu().numpy())

    y_true = np.concatenate(true_chunks)
    y_preds = np.concatenate(pred_chunks)
    y_probs = np.concatenate(prob_chunks)
    return y_true, y_preds, y_probs
# ============================================================
# Standalone training entry point
# ============================================================
if __name__ == '__main__':
    # Seed the Python, NumPy and PyTorch random number generators.
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(SEED)

    # Device preference: CUDA, then XPU (when this torch build has it), then CPU.
    if torch.cuda.is_available():
        device = torch.device('cuda')
    elif hasattr(torch, 'xpu') and torch.xpu.is_available():
        device = torch.device('xpu')
    else:
        device = torch.device('cpu')
    print(f"Device: {device}")

    # Both pipelines end with the same tensor conversion and normalisation.
    imagenet_norm = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                         std=[0.229, 0.224, 0.225])
    val_transform = transforms.Compose([
        transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
        transforms.ToTensor(),
        imagenet_norm,
    ])
    train_transform = transforms.Compose([
        transforms.RandomResizedCrop(IMAGE_SIZE, scale=(0.8, 1.0)),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomRotation(degrees=15),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
        transforms.ToTensor(),
        imagenet_norm,
    ])

    train_root = os.path.join(DATA_ROOT, 'train')
    val_root = os.path.join(DATA_ROOT, 'val')
    full_train_dataset = RobustImageFolder(root=train_root,
                                           transform=train_transform)
    val_dataset = RobustImageFolder(root=val_root, transform=val_transform)

    # Draw a random TRAIN_PCT share of the training set (at least one sample).
    n_train = len(full_train_dataset)
    n_subset = max(1, int(n_train * TRAIN_PCT))
    indices = random.sample(range(n_train), n_subset)
    train_dataset = Subset(full_train_dataset, indices)
    print(f"训练集: {len(train_dataset)} / {n_train} ({TRAIN_PCT*100:.0f}%)")
    print(f"验证集: {len(val_dataset)}")

    loader_kwargs = dict(batch_size=BATCH_SIZE, num_workers=NUM_WORKERS,
                         pin_memory=True)
    train_loader = DataLoader(train_dataset, shuffle=True, drop_last=True,
                              **loader_kwargs)
    val_loader = DataLoader(val_dataset, shuffle=False, drop_last=False,
                            **loader_kwargs)

    model = PretrainedResNet34(num_classes=NUM_CLASSES, dropout=DROPOUT)
    model.freeze_early_layers()
    model.print_trainable_info()
    model = model.to(device)

    history = train_model(model, train_loader, val_loader, device,
                          epochs=EPOCHS, lr=LR)

    # Reload the checkpoint that train_model wrote to MODEL_SAVE_PATH.
    best_state = torch.load(MODEL_SAVE_PATH, map_location='cpu')
    model.load_state_dict(best_state)
    print(f"模型已保存: {MODEL_SAVE_PATH}")
    print(f"训练日志已保存: {LOG_PATH}")