def parse_model(d, ch, verbose=True):  # model_dict, input_channels(3)
    """
    Parse a YOLO model.yaml dictionary into a PyTorch model.

    Args:
        d (dict): Model dictionary.
        ch (int): Input channels.
        verbose (bool): Whether to print model details.

    Returns:
        (tuple): Tuple containing the PyTorch model and sorted list of output layers.
    """
    import ast  # abstract syntax trees, used here to safely evaluate Python literals in strings
    # Args
    legacy = True  # backward compatibility for v3/v5/v8/v9 models
    max_channels = float("inf")  # upper bound for channel scaling
    nc, act, scales = (d.get(x) for x in ("nc", "activation", "scales"))  # number of classes, activation, scales dict
    depth, width, kpt_shape = (d.get(x, 1.0) for x in ("depth_multiple", "width_multiple", "kpt_shape"))  # depth/width multipliers, e.g. a layer with n=3 and depth=0.5 is built with max(round(3 * 0.5), 1) = 2 repeats
    if scales:  # the scales dict sets the same factors as above; it is simply a second way to specify them
        scale = d.get("scale")
        if not scale:
            scale = tuple(scales.keys())[0]
            LOGGER.warning(f"no model scale passed. Assuming scale='{scale}'.")
        depth, width, max_channels = scales[scale]
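    # For reference: in a scaled config such as yolov8.yaml, scales maps a scale letter to
    # [depth, width, max_channels], e.g. {"n": [0.33, 0.25, 1024], "s": [0.33, 0.50, 1024], ...},
    # so scale="n" yields depth=0.33, width=0.25 and caps every layer at 1024 channels.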
    if act:  # optionally override the default activation
        Conv.default_act = eval(act)  # redefine default activation, i.e. Conv.default_act = torch.nn.SiLU()
        if verbose:
            LOGGER.info(f"{colorstr('activation:')}{act}")  # print
    if verbose:
        LOGGER.info(f"\n{'':>3}{'from':>20}{'n':>3}{'params':>10}{'module':<45}{'arguments':<30}")
    ch = [ch]  # running list of per-layer output channels
    # layers: module list; save: indices of layers whose outputs must be kept; c2: output channels of the last layer
    layers, save, c2 = [], [], ch[-1]  # layers, savelist, ch out
    # frozenset is an immutable set, used here for fast membership lookups
    # base_modules are the basic building blocks; repeat_modules is the subset of base_modules that take a repeat count
    base_modules = frozenset(
        {
            Classify, Conv, ConvTranspose, GhostConv, Bottleneck, GhostBottleneck, SPP, SPPF, C2fPSA, C2PSA, DWConv,
            Focus, BottleneckCSP, C1, C2, C2f, C3k2, RepNCSPELAN4, ELAN1, ADown, AConv, SPPELAN, C2fAttn, C3, C3TR,
            C3Ghost, torch.nn.ConvTranspose2d, DWConvTranspose2d, C3x, RepC3, PSA, SCDown, C2fCIB, A2C2f,
        }
    )
    repeat_modules = frozenset(  # modules with 'repeat' arguments
        {
            BottleneckCSP, C1, C2, C2f, C3k2, C2fAttn, C3, C3TR, C3Ghost, C3x, RepC3, C2fPSA, C2fCIB, C2PSA, A2C2f,
        }
    )
    # i: current layer index; f: index/indices of the layers feeding this layer; n: repeat count; m: module name;
    # args: module arguments starting from the module's second constructor parameter, e.g. for Conv(1, 2, 3, 4) the
    # YAML args are (2, 3, 4) -- the input channels never need to be specified
    for i, (f, n, m, args) in enumerate(d["backbone"] + d["head"]):  # from, number, module, args
        m = (
            getattr(torch.nn, m[3:])
            if "nn." in m
            else getattr(__import__("torchvision").ops, m[16:])
            if "torchvision.ops." in m
            else globals()[m]
        )  # get module
        for j, a in enumerate(args):
            if isinstance(a, str):
                with contextlib.suppress(ValueError):
                    args[j] = locals()[a] if a in locals() else ast.literal_eval(a)  # resolve literals, and names such as nc
        n = n_ = max(round(n * depth), 1) if n > 1 else n  # depth gain
        if m in base_modules:
            c1, c2 = ch[f], args[0]
            if c2 != nc:  # if c2 not equal to number of classes (i.e. for Classify() output)
                c2 = make_divisible(min(c2, max_channels) * width, 8)  # except for the Classify head, keep output channels a multiple of 8
            if m is C2fAttn:  # set 1) embed channels and 2) num heads
                args[1] = make_divisible(min(args[1], max_channels // 2) * width, 8)
                args[2] = int(max(round(min(args[2], max_channels // 2 // 32)) * width, 1) if args[2] > 1 else args[2])
            args = [c1, c2, *args[1:]]  # re-assemble the final argument list
            if m in repeat_modules:
                args.insert(2, n)  # number of repeats
                n = 1
            if m is C3k2:  # for M/L/X sizes
                legacy = False
                if scale in "mlx":
                    args[3] = True
            if m is A2C2f:
                legacy = False
                if scale in "lx":  # for L/X sizes
                    args.extend((True, 1.2))
            if m is C2fCIB:
                legacy = False
        elif m is AIFI:
            args = [ch[f], *args]
        elif m in frozenset({HGStem, HGBlock}):
            c1, cm, c2 = ch[f], args[0], args[1]
            args = [c1, cm, c2, *args[2:]]
            if m is HGBlock:
                args.insert(4, n)  # number of repeats
                n = 1
        elif m is ResNetLayer:
            c2 = args[1] if args[3] else args[1] * 4
        elif m is torch.nn.BatchNorm2d:
            args = [ch[f]]
        elif m is Concat:
            c2 = sum(ch[x] for x in f)
        elif m in frozenset(
            {Detect, WorldDetect, YOLOEDetect, Segment, YOLOESegment, Pose, OBB, ImagePoolingAttn, v10Detect}
        ):
            args.append([ch[x] for x in f])
            if m is Segment or m is YOLOESegment:
                args[2] = make_divisible(min(args[2], max_channels) * width, 8)
            if m in {Detect, YOLOEDetect, Segment, YOLOESegment, Pose, OBB}:
                m.legacy = legacy
        elif m is RTDETRDecoder:  # special case, channels arg must be passed in index 1
            args.insert(1, [ch[x] for x in f])
        elif m is CBLinear:
            c2 = args[0]
            c1 = ch[f]
            args = [c1, c2, *args[1:]]
        elif m is CBFuse:
            c2 = ch[f[-1]]
        elif m in frozenset({TorchVision, Index}):
            c2 = args[0]
            c1 = ch[f]
            args = [*args[1:]]
        else:
            c2 = ch[f]
        m_ = torch.nn.Sequential(*(m(*args) for _ in range(n))) if n > 1 else m(*args)  # module
        t = str(m)[8:-2].replace("__main__.", "")  # module type
        m_.np = sum(x.numel() for x in m_.parameters())  # number params
        m_.i, m_.f, m_.type = i, f, t  # attach index, 'from' index, type
        if verbose:
            LOGGER.info(f"{i:>3}{str(f):>20}{n_:>3}{m_.np:10.0f}{t:<45}{str(args):<30}")  # print
        save.extend(x % i for x in ([f] if isinstance(f, int) else f) if x != -1)  # append to savelist
        layers.append(m_)
        if i == 0:
            ch = []
        ch.append(c2)
    return torch.nn.Sequential(*layers), sorted(save)
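For reference, a tiny, hypothetical config dictionary is enough to exercise parse_model end to end. The layer layout below is made up purely for illustration (it is not a real YOLO architecture); the module names resolve through globals() inside ultralytics.nn.tasks, so the function is imported from there:

from copy import deepcopy
from ultralytics.nn.tasks import parse_model

toy_cfg = {
    "nc": 2,
    "depth_multiple": 0.50,  # halves repeat counts
    "width_multiple": 0.50,  # halves channel widths
    "backbone": [
        [-1, 1, "Conv", [16, 3, 2]],  # 0: Conv(c1=3,  c2=8,  k=3, s=2) after width scaling
        [-1, 1, "Conv", [32, 3, 2]],  # 1: Conv(c1=8,  c2=16, k=3, s=2)
        [-1, 2, "C2f", [32, True]],   # 2: C2f(c1=16, c2=16, n=max(round(2*0.5), 1)=1, shortcut=True)
    ],
    "head": [
        [[1, 2], 1, "Concat", [1]],   # 3: concatenate layers 1 and 2 -> 32 channels
        [[3], 1, "Detect", ["nc"]],   # 4: "nc" resolves to 2 via locals(), channel list [32] is appended
    ],
}

model, save = parse_model(deepcopy(toy_cfg), ch=3, verbose=True)  # deepcopy: parse_model mutates the arg lists
print(save)  # [1, 2, 3] -- the layers whose outputs _forward_once has to keep around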
def _forward_once(self, x, profile=False, visualize=False):
    """Performs a forward pass on the YOLOv5 model, enabling profiling and feature visualization options."""
    y, dt = [], []  # outputs
    for m in self.model:
        if m.f != -1:  # if not from previous layer
            # gather inputs from earlier layers; when m.f lists several layers the inputs are passed as a list,
            # so m itself must fuse them (e.g. a Concat layer), otherwise they would have to be concatenated here
            x = y[m.f] if isinstance(m.f, int) else [x if j == -1 else y[j] for j in m.f]
        if profile:
            self._profile_one_layer(m, x, dt)
        x = m(x)  # run
        y.append(x if m.i in self.save else None)  # save output
        if visualize:
            feature_visualization(x, m.type, m.i, save_dir=visualize)
    return x
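To make the interplay between self.save and the y buffer concrete, here is a small self-contained sketch (toy modules and plain tensors, not the real model class) that applies the same routing rule:

import torch

class ToyConcat(torch.nn.Module):
    """Stand-in for Concat: fuses a list of inputs along the channel dimension."""
    def forward(self, xs):
        return torch.cat(xs, dim=1)

# mimic the attributes parse_model attaches: .i (layer index) and .f ('from' index/indices)
m0, m1, m2 = torch.nn.Identity(), torch.nn.Identity(), ToyConcat()
m0.i, m0.f = 0, -1        # reads the previous output (here: the network input)
m1.i, m1.f = 1, -1        # reads layer 0's output
m2.i, m2.f = 2, [0, 1]    # fuses layers 0 and 1, so both of their outputs must be cached
save = {0, 1}             # corresponds to the sorted(save) list returned by parse_model

x = torch.randn(1, 3, 8, 8)
y = []
for m in (m0, m1, m2):
    if m.f != -1:
        x = y[m.f] if isinstance(m.f, int) else [x if j == -1 else y[j] for j in m.f]
    x = m(x)
    y.append(x if m.i in save else None)
print(x.shape)  # torch.Size([1, 6, 8, 8]): layers 0 and 1 concatenated on channels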
def __init__(
    self,
    model: Union[str, Path] = "yolov8n.pt",
    task: str = None,
    verbose: bool = False,
) -> None:
    self.callbacks = callbacks.get_default_callbacks()  # callbacks executed at key points of the model lifecycle
    self.predictor = None  # reuse predictor
    self.model = None  # model object
    self.trainer = None  # trainer object
    self.ckpt = None  # if loaded from *.pt, the checkpoint state is kept here
    self.cfg = None  # if loaded from *.yaml, the yaml config is kept here
    self.ckpt_path = None
    self.overrides = {}  # overrides for the trainer object; extra arguments are stored here
    self.metrics = None  # validation/training metrics
    self.session = None  # HUB session, set when the model comes from Ultralytics HUB
    self.task = task  # task type the model is meant for
    model = str(model).strip()

    # Check if Ultralytics HUB model from https://hub.ultralytics.com; if so, fetch it first
    if self.is_hub_model(model):
        # Fetch model from HUB
        checks.check_requirements("hub-sdk>=0.0.12")
        session = HUBTrainingSession.create_session(model)
        model = session.model_file
        if session.train_args:  # training sent from HUB
            self.session = session
    # Check if Triton Server model
    elif self.is_triton_model(model):
        self.model_name = self.model = model
        return
    # Load or create new YOLO model
    if Path(model).suffix in {".yaml", ".yml"}:
        self._new(model, task=task, verbose=verbose)
    else:
        self._load(model, task=task)
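In practice the two branches map onto the two usual ways of constructing a model; the file names below are the standard Ultralytics examples:

from ultralytics import YOLO

model_from_cfg = YOLO("yolov8n.yaml")  # .yaml/.yml suffix -> _new(): build an untrained model, config kept in self.cfg
model_from_ckpt = YOLO("yolov8n.pt")   # anything else, e.g. a checkpoint -> _load(): weights kept in self.ckpt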
from copy import copy

from ultralytics.models.yolo.detect.train import DetectionTrainer
class YoloV1Trainer(DetectionTrainer):
    def get_model(self, cfg=None, weights=None, verbose=True):
        # Yolov1Model is the custom model class defined elsewhere in this project
        model = Yolov1Model(cfg, nc=self.data["nc"], verbose=verbose)
        if weights:
            model.load(weights=weights)
        return model
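A minimal way to exercise this trainer follows the standard DetectionTrainer pattern; "yolov1.yaml" and "coco8.yaml" below are placeholders for a custom model config and a dataset yaml that would have to exist in the project:

# hypothetical usage; adjust the model and data paths to your own files
args = dict(model="yolov1.yaml", data="coco8.yaml", epochs=3, imgsz=640)
trainer = YoloV1Trainer(overrides=args)
trainer.train()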