Merge branch 'main' into data_cleaning_test

This commit is contained in:
weikaiwen348-code 2026-04-13 15:48:15 +08:00
commit 3809899218
3 changed files with 464 additions and 0 deletions

167
Dataloader.py Normal file
View file

@ -0,0 +1,167 @@
"""
目前是一份数据加载用的代码没有调整因为现在还没有配置好数据集
这个文件目前还不能运行
author:yukun-hh
date 2026-4-10
"""
import torch
from torch.utils.data import DataLoader, random_split
from torchvision import datasets, transforms
import os
from PIL import Image
import matplotlib.pyplot as plt
import numpy as np
def create_dataloaders(data_root='..',
batch_size=32,
image_size=256,
val_split=0.2,
num_workers=4,
augment=True):
"""
创建训练和验证的 DataLoader
Args:
data_root: 项目根目录包含 train val 文件夹
batch_size: 批次大小
image_size: 统一缩放的尺寸256x256
val_split: 从训练集中划分验证集的比例如果你没有独立的 val 文件夹
num_workers: 数据加载线程数
augment: 是否使用数据增强
Returns:
train_loader, val_loader, class_names
"""
# 1. 定义图像预处理(转换)流程
# ==================================
# 训练时的数据增强(提高泛化能力)
train_transform = transforms.Compose([
# 随机调整大小(保留长宽比后裁剪)
transforms.RandomResizedCrop(image_size, scale=(0.8, 1.0)),
# 随机水平翻转(对于垃圾分拣,翻转后类别不变)
transforms.RandomHorizontalFlip(p=0.5),
# 随机旋转±15度
transforms.RandomRotation(degrees=15),
# 随机亮度/对比度调整(模拟不同光照)
transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
# 转换为张量
transforms.ToTensor(),
# 标准化(使用 ImageNet 的均值标准差,可改为自己数据集的)
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
# 验证时的预处理(只做必要的操作)
val_transform = transforms.Compose([
# 直接缩放到固定大小
transforms.Resize((image_size, image_size)),
# 转换为张量
transforms.ToTensor(),
# 标准化
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])
# 2. 加载数据集
# ==================================
print("使用独立的 val 文件夹")
train_dataset = datasets.ImageFolder(
root=os.path.join(data_root, 'train'),
transform=train_transform if augment else val_transform
)
val_dataset = datasets.ImageFolder(
root=os.path.join(data_root, 'val'),
transform=val_transform
)
print(f"训练集大小: {len(train_dataset)}")
print(f"验证集大小: {len(val_dataset)}")
# 3. 创建 DataLoader
# ==================================
train_loader = DataLoader(
train_dataset,
batch_size=batch_size,
shuffle=True, # 训练集打乱顺序
num_workers=num_workers,
pin_memory=True, # 加速 GPU 传输
drop_last=True # 丢弃最后一个不完整的 batch
)
val_loader = DataLoader(
val_dataset,
batch_size=batch_size,
shuffle=False, # 验证集不需要打乱
num_workers=num_workers,
pin_memory=True,
drop_last=False
)
# 4. 获取类别名称
class_names = train_dataset.classes if hasattr(train_dataset, 'classes') else ['0', '1', '2', '3']
print(f"类别: {class_names}")
print(f"类别映射: {train_dataset.class_to_idx if hasattr(train_dataset, 'class_to_idx') else '0-3'}")
return train_loader, val_loader, class_names
# ========== 辅助函数:检查数据加载是否正确 ==========
def visualize_batch(dataloader, class_names, num_images=8):
"""可视化一个 batch 的图像,检查数据是否正确"""
images, labels = next(iter(dataloader))
# 反标准化(用于显示)
mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1)
std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1)
fig, axes = plt.subplots(1, min(num_images, len(images)), figsize=(15, 3))
if len(images) == 1:
axes = [axes]
for i in range(min(num_images, len(images))):
img = images[i].cpu()
img = img * std + mean # 反标准化
img = torch.clamp(img, 0, 1) # 裁剪到 [0,1]
img = img.permute(1, 2, 0).numpy()
axes[i].imshow(img)
axes[i].set_title(f'{class_names[labels[i]]}')
axes[i].axis('off')
plt.tight_layout()
plt.show()
# 打印批次信息
print(f"Batch 图像形状: {images.shape}")
print(f"Batch 标签: {labels}")
print(f"标签分布: {torch.bincount(labels)}")
# ========== 使用示例 ==========
if __name__ == '__main__':
train_loader, val_loader, class_names = create_dataloaders(
data_root='..', # 与trash-division同级文件夹
batch_size=32, # 根据你的显存调整
image_size=256, # 与你模型输入一致
num_workers=4, # Windows 可能需设为 0
augment=True # 训练时使用数据增强
)
visualize_batch(train_loader, class_names, num_images=8)

99
Model.py Normal file
View file

@ -0,0 +1,99 @@
"""
这个文件是模型的定义文件请不要擅自修改如有疑问微信群里反馈
单独运行本文件将会输出模型结构
目前的话是一个36层的模型模型总量应该是在80M左右 如果到时候还是欠拟合的话再考虑去做更深的结构
author : yukun-hh
date : 2026-4-10
"""
#神经网络模型库
import torch
from torch import nn
from torch.nn import functional as F
from torchsummary import summary
#残差块
class Resblock(nn.Module):
def __init__(self, input_channels,output_channels,use_1x1conv=False,strides=1):
"""
:param input_channels: 进入残差块时的原通道
:param output_channels: 输出时的通道数
:param use_1x1conv: 如果输入和输出通道不相等时需要用一个1x1的卷积层对原来的输入进行一个通道提升
:param strides: 默认1如果大于1起到缩小张量的作用
"""
super().__init__()
self.conv1 = nn.Conv2d(input_channels,output_channels,kernel_size=3,padding=1,stride=strides)
self.conv2 = nn.Conv2d(output_channels,output_channels,kernel_size=3,padding=1,stride=1)
if use_1x1conv:
self.conv3 = nn.Conv2d(input_channels, output_channels,kernel_size=1, stride=strides)
else:
self.conv3 = None
self.bn1 = nn.BatchNorm2d(output_channels)
self.bn2 = nn.BatchNorm2d(output_channels)
def forward(self,X):
Y = F.relu(self.bn1(self.conv1(X)))
Y = self.bn2(self.conv2(Y))
if self.conv3 is not None:
X = self.conv3(X)
Y += X
return F.relu(Y)
class Net():
"""
模型的主要结构就在这里了到时也好该和调用
现在必须实现的方法
目前还是以图片缩放到256256构建残差块
"""
net = nn.Sequential()
def resnet_block(self,input_channels, num_channels, num_residuals,
first_block=False):
"""
:param input_channels: 输入维度
:param num_channels: 输出维度
:param num_residuals: 单个残差层的残差块数
:param first_block: 第一块不用下采样 特殊控制
:return: list[nn.Module]
"""
blk = []
for i in range(num_residuals):
if i == 0 and not first_block:
blk.append(Resblock(input_channels, num_channels,
use_1x1conv=True, strides=2))
else:
blk.append(Resblock(num_channels, num_channels))
return blk
def __init__(self):
b1 = nn.Sequential( nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3),
nn.BatchNorm2d(64), nn.ReLU(),
nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
)
"""
7×7 卷积层输出通道 64步长 2填充 3
(3×256×256)->(64×128×128)
批归一化 relu层
最大池化
(64×128×128)->(64×64×64)
"""
b2 = nn.Sequential(*self.resnet_block(64, 64, num_residuals=3, first_block=True))
b3 = nn.Sequential(*self.resnet_block(64, 128, num_residuals=4))
b4 = nn.Sequential(*self.resnet_block(128, 256, num_residuals=6))
b5 = nn.Sequential(*self.resnet_block(256, 512, num_residuals=3))
self.net = nn.Sequential(b1, b2, b3, b4, b5,nn.AdaptiveAvgPool2d((1,1)),nn.Flatten(), nn.Linear(512, 4))
def get_network(self):
return self.net
if __name__ == '__main__':
Net_new = Net()
X = torch.rand(size=(1, 3, 256, 256))
summary(Net_new.get_network(), input_size=(3, 256, 256))

198
Train.py Normal file
View file

@ -0,0 +1,198 @@
"""
目前是由AI先生成了一份训练用代码没有调整因为现在还没有设计好数据迭代器
这个文件目前还不能运行
最佳模型将会保存在根目录下
author:yukun-hh
date 2026-4-10
"""
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm # 进度条,可选
import matplotlib.pyplot as plt
from Model import Net
def train_one_epoch(model, train_loader, criterion, optimizer, device, epoch):
"""训练一个epoch"""
model.train() # 设置为训练模式
running_loss = 0.0
correct = 0
total = 0
# 使用 tqdm 显示进度条(可选)
pbar = tqdm(train_loader, desc=f'Epoch {epoch + 1} [Train]')
for images, labels in pbar:
# 将数据移到 GPU/CPU
images, labels = images.to(device), labels.to(device)
# 前向传播
outputs = model(images)
loss = criterion(outputs, labels)
# 反向传播
optimizer.zero_grad() # 清空梯度
loss.backward() # 计算梯度
optimizer.step() # 更新参数
# 统计
running_loss += loss.item() * images.size(0)
_, predicted = outputs.max(1)
total += labels.size(0)
correct += predicted.eq(labels).sum().item()
# 更新进度条信息
pbar.set_postfix({'loss': loss.item(), 'acc': 100. * correct / total})
epoch_loss = running_loss / total
epoch_acc = 100. * correct / total
return epoch_loss, epoch_acc
def validate(model, val_loader, criterion, device):
"""验证函数"""
model.eval() # 设置为评估模式
running_loss = 0.0
correct = 0
total = 0
with torch.no_grad(): # 不计算梯度,节省内存
for images, labels in tqdm(val_loader, desc='[Validate]'):
images, labels = images.to(device), labels.to(device)
outputs = model(images)
loss = criterion(outputs, labels)
running_loss += loss.item() * images.size(0)
_, predicted = outputs.max(1)
total += labels.size(0)
correct += predicted.eq(labels).sum().item()
epoch_loss = running_loss / total
epoch_acc = 100. * correct / total
return epoch_loss, epoch_acc
def train(model, train_loader, val_loader, epochs=50, lr=0.001, device='cuda'):
"""主训练函数"""
# 1. 定义损失函数和优化器
criterion = nn.CrossEntropyLoss() # 多分类用交叉熵
# 优化器选择(推荐 Adam 或 SGD
optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=1e-4)
# 或者使用 SGD + 动量
# optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=1e-4)
# 学习率调度器(可选,帮助收敛)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.1)
# 或者用余弦退火
# scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)
# 2. 记录训练历史
history = {
'train_loss': [],
'train_acc': [],
'val_loss': [],
'val_acc': []
}
best_val_acc = 0.0
# 3. 开始训练
for epoch in range(epochs):
print(f'\n{"=" * 50}')
print(f'Epoch {epoch + 1}/{epochs}')
# 训练
train_loss, train_acc = train_one_epoch(model, train_loader, criterion,
optimizer, device, epoch)
# 验证
val_loss, val_acc = validate(model, val_loader, criterion, device)
# 更新学习率
scheduler.step()
# 记录
history['train_loss'].append(train_loss)
history['train_acc'].append(train_acc)
history['val_loss'].append(val_loss)
history['val_acc'].append(val_acc)
# 打印结果
print(f'Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.2f}%')
print(f'Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.2f}%')
print(f'Learning Rate: {optimizer.param_groups[0]["lr"]:.6f}')
# 保存最佳模型
if val_acc > best_val_acc:
best_val_acc = val_acc
torch.save(model.state_dict(), 'best_model.pth')
print(f'✓ 保存最佳模型 (Acc: {val_acc:.2f}%)')
# 4. 绘制训练曲线
plot_training_history(history)
print(f'\n{"=" * 50}')
print(f'训练完成!最佳验证准确率: {best_val_acc:.2f}%')
return model, history
def plot_training_history(history):
"""绘制训练曲线"""
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))
# 损失曲线
ax1.plot(history['train_loss'], label='Train Loss')
ax1.plot(history['val_loss'], label='Val Loss')
ax1.set_xlabel('Epoch')
ax1.set_ylabel('Loss')
ax1.set_title('Training and Validation Loss')
ax1.legend()
ax1.grid(True)
# 准确率曲线
ax2.plot(history['train_acc'], label='Train Acc')
ax2.plot(history['val_acc'], label='Val Acc')
ax2.set_xlabel('Epoch')
ax2.set_ylabel('Accuracy (%)')
ax2.set_title('Training and Validation Accuracy')
ax2.legend()
ax2.grid(True)
plt.tight_layout()
plt.savefig('training_history.png', dpi=150)
plt.show()
# ========== 使用示例 ==========
if __name__ == '__main__':
# 假设你的 dataloader 已经写好了
# train_loader = ...
# val_loader = ...
# 1. 创建模型
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = Net().get_network() # 根据你的 Net 类调整
model = model.to(device)
# 打印模型信息
print(f'Device: {device}')
print(f'Model parameters: {sum(p.numel() for p in model.parameters()):,}')
# 2. 开始训练
trained_model, history = train(
model=model,
train_loader=train_loader,
val_loader=val_loader,
epochs=50,
lr=0.001,
device=device
)
# 3. 加载最佳模型用于预测
model.load_state_dict(torch.load('best_model.pth'))
print('训练完成,最佳模型已加载')