diff --git a/Dataloader.py b/Dataloader.py new file mode 100644 index 0000000..fab833b --- /dev/null +++ b/Dataloader.py @@ -0,0 +1,167 @@ +""" +目前是一份数据加载用的代码,没有调整,因为现在还没有配置好数据集 +这个文件目前还不能运行!!! + + +author:yukun-hh +date :2026-4-10 +""" +import torch +from torch.utils.data import DataLoader, random_split +from torchvision import datasets, transforms +import os +from PIL import Image +import matplotlib.pyplot as plt +import numpy as np + + +def create_dataloaders(data_root='..', + batch_size=32, + image_size=256, + val_split=0.2, + num_workers=4, + augment=True): + """ + 创建训练和验证的 DataLoader + + Args: + data_root: 项目根目录(包含 train 和 val 文件夹) + batch_size: 批次大小 + image_size: 统一缩放的尺寸(256x256) + val_split: 从训练集中划分验证集的比例(如果你没有独立的 val 文件夹) + num_workers: 数据加载线程数 + augment: 是否使用数据增强 + + Returns: + train_loader, val_loader, class_names + """ + + # 1. 定义图像预处理(转换)流程 + # ================================== + + # 训练时的数据增强(提高泛化能力) + train_transform = transforms.Compose([ + # 随机调整大小(保留长宽比后裁剪) + transforms.RandomResizedCrop(image_size, scale=(0.8, 1.0)), + + # 随机水平翻转(对于垃圾分拣,翻转后类别不变) + transforms.RandomHorizontalFlip(p=0.5), + + # 随机旋转(±15度) + transforms.RandomRotation(degrees=15), + + # 随机亮度/对比度调整(模拟不同光照) + transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2), + + # 转换为张量 + transforms.ToTensor(), + + # 标准化(使用 ImageNet 的均值标准差,可改为自己数据集的) + transforms.Normalize(mean=[0.485, 0.456, 0.406], + std=[0.229, 0.224, 0.225]) + ]) + + # 验证时的预处理(只做必要的操作) + val_transform = transforms.Compose([ + # 直接缩放到固定大小 + transforms.Resize((image_size, image_size)), + + # 转换为张量 + transforms.ToTensor(), + + # 标准化 + transforms.Normalize(mean=[0.485, 0.456, 0.406], + std=[0.229, 0.224, 0.225]) + ]) + + # 2. 加载数据集 + # ================================== + print("使用独立的 val 文件夹") + train_dataset = datasets.ImageFolder( + root=os.path.join(data_root, 'train'), + transform=train_transform if augment else val_transform + ) + + val_dataset = datasets.ImageFolder( + root=os.path.join(data_root, 'val'), + transform=val_transform + ) + print(f"训练集大小: {len(train_dataset)}") + print(f"验证集大小: {len(val_dataset)}") + + + # 3. 创建 DataLoader + # ================================== + train_loader = DataLoader( + train_dataset, + batch_size=batch_size, + shuffle=True, # 训练集打乱顺序 + num_workers=num_workers, + pin_memory=True, # 加速 GPU 传输 + drop_last=True # 丢弃最后一个不完整的 batch + ) + + val_loader = DataLoader( + val_dataset, + batch_size=batch_size, + shuffle=False, # 验证集不需要打乱 + num_workers=num_workers, + pin_memory=True, + drop_last=False + ) + + # 4. 获取类别名称 + class_names = train_dataset.classes if hasattr(train_dataset, 'classes') else ['0', '1', '2', '3'] + print(f"类别: {class_names}") + print(f"类别映射: {train_dataset.class_to_idx if hasattr(train_dataset, 'class_to_idx') else '0-3'}") + + return train_loader, val_loader, class_names + + +# ========== 辅助函数:检查数据加载是否正确 ========== + +def visualize_batch(dataloader, class_names, num_images=8): + """可视化一个 batch 的图像,检查数据是否正确""" + images, labels = next(iter(dataloader)) + + # 反标准化(用于显示) + mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1) + std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1) + + fig, axes = plt.subplots(1, min(num_images, len(images)), figsize=(15, 3)) + if len(images) == 1: + axes = [axes] + + for i in range(min(num_images, len(images))): + img = images[i].cpu() + img = img * std + mean # 反标准化 + img = torch.clamp(img, 0, 1) # 裁剪到 [0,1] + img = img.permute(1, 2, 0).numpy() + + axes[i].imshow(img) + axes[i].set_title(f'{class_names[labels[i]]}') + axes[i].axis('off') + + plt.tight_layout() + plt.show() + + # 打印批次信息 + print(f"Batch 图像形状: {images.shape}") + print(f"Batch 标签: {labels}") + print(f"标签分布: {torch.bincount(labels)}") + + + + + +# ========== 使用示例 ========== + +if __name__ == '__main__': + train_loader, val_loader, class_names = create_dataloaders( + data_root='..', # 与trash-division同级文件夹 + batch_size=32, # 根据你的显存调整 + image_size=256, # 与你模型输入一致 + num_workers=4, # Windows 可能需设为 0 + augment=True # 训练时使用数据增强 + ) + visualize_batch(train_loader, class_names, num_images=8) diff --git a/Model.py b/Model.py new file mode 100644 index 0000000..d568b97 --- /dev/null +++ b/Model.py @@ -0,0 +1,99 @@ +""" +这个文件是模型的定义文件,请不要擅自修改,如有疑问微信群里反馈 +单独运行本文件将会输出模型结构 +目前的话是一个36层的模型,模型总量应该是在80M左右 如果到时候还是欠拟合的话再考虑去做更深的结构 +author : yukun-hh +date : 2026-4-10 + +""" +#神经网络模型库 +import torch +from torch import nn +from torch.nn import functional as F +from torchsummary import summary +#残差块 +class Resblock(nn.Module): + def __init__(self, input_channels,output_channels,use_1x1conv=False,strides=1): + """ + + :param input_channels: 进入残差块时的原通道 + :param output_channels: 输出时的通道数 + :param use_1x1conv: 如果输入和输出通道不相等时,需要用一个1x1的卷积层对原来的输入进行一个通道提升 + :param strides: 默认1,如果大于1起到缩小张量的作用 + """ + super().__init__() + self.conv1 = nn.Conv2d(input_channels,output_channels,kernel_size=3,padding=1,stride=strides) + self.conv2 = nn.Conv2d(output_channels,output_channels,kernel_size=3,padding=1,stride=1) + if use_1x1conv: + self.conv3 = nn.Conv2d(input_channels, output_channels,kernel_size=1, stride=strides) + else: + self.conv3 = None + self.bn1 = nn.BatchNorm2d(output_channels) + self.bn2 = nn.BatchNorm2d(output_channels) + def forward(self,X): + Y = F.relu(self.bn1(self.conv1(X))) + Y = self.bn2(self.conv2(Y)) + if self.conv3 is not None: + X = self.conv3(X) + Y += X + return F.relu(Y) + +class Net(): + """ + 模型的主要结构就在这里了,到时也好该和调用 + 现在必须实现的方法: + 目前还是以图片缩放到256*256构建残差块 + """ + net = nn.Sequential() + def resnet_block(self,input_channels, num_channels, num_residuals, + first_block=False): + """ + :param input_channels: 输入维度 + :param num_channels: 输出维度 + :param num_residuals: 单个残差层的残差块数 + :param first_block: 第一块不用下采样 特殊控制 + :return: list[nn.Module] + """ + blk = [] + + for i in range(num_residuals): + if i == 0 and not first_block: + blk.append(Resblock(input_channels, num_channels, + use_1x1conv=True, strides=2)) + else: + blk.append(Resblock(num_channels, num_channels)) + return blk + def __init__(self): + b1 = nn.Sequential( nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3), + nn.BatchNorm2d(64), nn.ReLU(), + nn.MaxPool2d(kernel_size=3, stride=2, padding=1) + ) + """ + 7×7 卷积层,输出通道 64,步长 2,填充 3 + (3×256×256)->(64×128×128) + 批归一化 relu层 + 最大池化 + (64×128×128)->(64×64×64) + """ + b2 = nn.Sequential(*self.resnet_block(64, 64, num_residuals=3, first_block=True)) + b3 = nn.Sequential(*self.resnet_block(64, 128, num_residuals=4)) + b4 = nn.Sequential(*self.resnet_block(128, 256, num_residuals=6)) + b5 = nn.Sequential(*self.resnet_block(256, 512, num_residuals=3)) + self.net = nn.Sequential(b1, b2, b3, b4, b5,nn.AdaptiveAvgPool2d((1,1)),nn.Flatten(), nn.Linear(512, 4)) + def get_network(self): + return self.net + + + +if __name__ == '__main__': + Net_new = Net() + X = torch.rand(size=(1, 3, 256, 256)) + summary(Net_new.get_network(), input_size=(3, 256, 256)) + + + + + + + + diff --git a/Train.py b/Train.py new file mode 100644 index 0000000..6c87c69 --- /dev/null +++ b/Train.py @@ -0,0 +1,198 @@ +""" +目前是由AI先生成了一份训练用代码,没有调整,因为现在还没有设计好数据迭代器 +这个文件目前还不能运行!!! + +最佳模型将会保存在根目录下 +author:yukun-hh +date :2026-4-10 +""" +import torch +import torch.nn as nn +import torch.optim as optim +from tqdm import tqdm # 进度条,可选 +import matplotlib.pyplot as plt +from Model import Net + +def train_one_epoch(model, train_loader, criterion, optimizer, device, epoch): + """训练一个epoch""" + model.train() # 设置为训练模式 + running_loss = 0.0 + correct = 0 + total = 0 + + # 使用 tqdm 显示进度条(可选) + pbar = tqdm(train_loader, desc=f'Epoch {epoch + 1} [Train]') + + for images, labels in pbar: + # 将数据移到 GPU/CPU + images, labels = images.to(device), labels.to(device) + + # 前向传播 + outputs = model(images) + loss = criterion(outputs, labels) + + # 反向传播 + optimizer.zero_grad() # 清空梯度 + loss.backward() # 计算梯度 + optimizer.step() # 更新参数 + + # 统计 + running_loss += loss.item() * images.size(0) + _, predicted = outputs.max(1) + total += labels.size(0) + correct += predicted.eq(labels).sum().item() + + # 更新进度条信息 + pbar.set_postfix({'loss': loss.item(), 'acc': 100. * correct / total}) + + epoch_loss = running_loss / total + epoch_acc = 100. * correct / total + return epoch_loss, epoch_acc + + +def validate(model, val_loader, criterion, device): + """验证函数""" + model.eval() # 设置为评估模式 + running_loss = 0.0 + correct = 0 + total = 0 + + with torch.no_grad(): # 不计算梯度,节省内存 + for images, labels in tqdm(val_loader, desc='[Validate]'): + images, labels = images.to(device), labels.to(device) + + outputs = model(images) + loss = criterion(outputs, labels) + + running_loss += loss.item() * images.size(0) + _, predicted = outputs.max(1) + total += labels.size(0) + correct += predicted.eq(labels).sum().item() + + epoch_loss = running_loss / total + epoch_acc = 100. * correct / total + return epoch_loss, epoch_acc + + +def train(model, train_loader, val_loader, epochs=50, lr=0.001, device='cuda'): + """主训练函数""" + + # 1. 定义损失函数和优化器 + criterion = nn.CrossEntropyLoss() # 多分类用交叉熵 + + # 优化器选择(推荐 Adam 或 SGD) + optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=1e-4) + # 或者使用 SGD + 动量 + # optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=1e-4) + + # 学习率调度器(可选,帮助收敛) + scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.1) + # 或者用余弦退火 + # scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs) + + # 2. 记录训练历史 + history = { + 'train_loss': [], + 'train_acc': [], + 'val_loss': [], + 'val_acc': [] + } + + best_val_acc = 0.0 + + # 3. 开始训练 + for epoch in range(epochs): + print(f'\n{"=" * 50}') + print(f'Epoch {epoch + 1}/{epochs}') + + # 训练 + train_loss, train_acc = train_one_epoch(model, train_loader, criterion, + optimizer, device, epoch) + + # 验证 + val_loss, val_acc = validate(model, val_loader, criterion, device) + + # 更新学习率 + scheduler.step() + + # 记录 + history['train_loss'].append(train_loss) + history['train_acc'].append(train_acc) + history['val_loss'].append(val_loss) + history['val_acc'].append(val_acc) + + # 打印结果 + print(f'Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.2f}%') + print(f'Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.2f}%') + print(f'Learning Rate: {optimizer.param_groups[0]["lr"]:.6f}') + + # 保存最佳模型 + if val_acc > best_val_acc: + best_val_acc = val_acc + torch.save(model.state_dict(), 'best_model.pth') + print(f'✓ 保存最佳模型 (Acc: {val_acc:.2f}%)') + + # 4. 绘制训练曲线 + plot_training_history(history) + + print(f'\n{"=" * 50}') + print(f'训练完成!最佳验证准确率: {best_val_acc:.2f}%') + + return model, history + + +def plot_training_history(history): + """绘制训练曲线""" + fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4)) + + # 损失曲线 + ax1.plot(history['train_loss'], label='Train Loss') + ax1.plot(history['val_loss'], label='Val Loss') + ax1.set_xlabel('Epoch') + ax1.set_ylabel('Loss') + ax1.set_title('Training and Validation Loss') + ax1.legend() + ax1.grid(True) + + # 准确率曲线 + ax2.plot(history['train_acc'], label='Train Acc') + ax2.plot(history['val_acc'], label='Val Acc') + ax2.set_xlabel('Epoch') + ax2.set_ylabel('Accuracy (%)') + ax2.set_title('Training and Validation Accuracy') + ax2.legend() + ax2.grid(True) + + plt.tight_layout() + plt.savefig('training_history.png', dpi=150) + plt.show() + + +# ========== 使用示例 ========== +if __name__ == '__main__': + # 假设你的 dataloader 已经写好了 + # train_loader = ... + # val_loader = ... + + # 1. 创建模型 + device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') + model = Net().get_network() # 根据你的 Net 类调整 + model = model.to(device) + + # 打印模型信息 + print(f'Device: {device}') + print(f'Model parameters: {sum(p.numel() for p in model.parameters()):,}') + + # 2. 开始训练 + trained_model, history = train( + model=model, + train_loader=train_loader, + val_loader=val_loader, + epochs=50, + lr=0.001, + device=device + ) + + # 3. 加载最佳模型用于预测 + model.load_state_dict(torch.load('best_model.pth')) + print('训练完成,最佳模型已加载') \ No newline at end of file