嘉楠K230AI开发板测评7--AI Demo开发框架
[复制链接]
本帖最后由 dfjs 于 2024-11-6 21:01 编辑
嘉楠科K230AI开发板测评7--AI视觉篇
1、AI视觉开发框架
更高级的机器视觉(AI视觉)需要使用KPU。可以简单类别比计算机的GPU(显卡),本质是实现高速的图像数据运算。
KPU是K230内部一个神经网络处理器,它可以在低功耗的情况下实现卷积神经网络计算,实时获取被检测目标的大小、坐标和种类,对人脸或者物体进行检测和分类。K230 KPU支持INT8和INT16, 典型网络下实测推理能力可达K210的13.7倍,MAC利用率超70%。
CanMV官方基于K230专门搭建了配套的AI视觉开发框架,框架结构如下图所示:
这个框架简单来说就是Sensor(摄像头)默认输出两路图像,一路格式为YUV420,直接给到Display显示;另一路格式为RGB888,给到AI部分进行处理。AI主要实现任务的前处理、推理和后处理流程,得到后处理结果后将其绘制在osd image实例上,并送给Display叠加,最后在HDMI、LCD或IDE缓冲区显示识别结果。
这套框架的优势是用户可以直接基于处理结果编程实现自己的功能,同时AI主要实现任务的前处理、推理和后处理流程也是通过Python代码实现,方便用户深入二次开发。充分满足不同用户和开发者的需求。
AI视觉开发框架主要API接口有:
PineLine : 将sensor、display封装成固定接口,用于采集图像、画图以及结果图片显示。
Ai2d : 预处理(Preprocess)相关接口。
AIBase : 模型推理主要接口。
2、相关接口
可在该网址查看:https://developer.canaan-creative.com/k230_canmv/main/zh/example/ai/AI_Demo%E8%AF%B4%E6%98%8E%E6%96%87%E6%A1%A3.html
2.1 PipeLine
将Media部分的代码封装在PipeLine类型中,通过固定的接口实现整个流程操作。
PipeLine类提供的接口包括:
- 初始化参数
rgb888p_size:list类型,预设给到AI部分的图像分辨率;如rgb888p_size=[1920,1080]。
display_size:list类型,显示部分Display的分辨率;如display_size=[1920,1080]。
display_mode:str类型,显示模式,包括”hdmi“和”lcd“;如display_mode=”hdmi“。
debug_mode:int类型,耗时调试模式,如果大于0,打印操作耗时;如debug_mode=0。
- creat(sensor=None,hmirror=None,vfilp=None)
sensor:参数为可选参数,类型为Sensor对象,可自主配置现有CanMV、01Studio和k230d zero开发板实现了自动探测,可以默认使用create()实现。
hmirror:默认为None,当主动设置时为bool类型(True/False),表示是否实现水平方向镜像显示。
vflip: 默认为None,当主动设置时为bool类型(True/False),表示是否实现垂直方向翻转。
- get_frame
返回一帧ulab.numpy.ndarray类型图像数据,分辨率为rgb888p_size,排布为CHW。
- show_image
PipeLine实例中预设一帧OSD图像,该接口将成员变量osd_img显示在屏幕上。
- destroy
销毁PipeLine实例。
示例代码:
from libs.PipeLine import PipeLine, ScopedTiming
from media.media import *
import gc
import sys,os
if __name__ == "__main__":
# 显示模式,默认"hdmi",可以选择"hdmi"和"lcd"
display_mode="hdmi"
if display_mode=="hdmi":
display_size=[1920,1080]
else:
display_size=[800,480]
# 初始化PipeLine,用于图像处理流程
pl = PipeLine(rgb888p_size=[1920,1080], display_size=display_size, display_mode=display_mode)
pl.create() # 创建PipeLine实例
try:
while True:
os.exitpoint() # 检查是否有退出信号
with ScopedTiming("total",1):
img = pl.get_frame() # 获取当前帧数据
print(img.shape)
gc.collect() # 垃圾回收
except Exception as e:
sys.print_exception(e) # 打印异常信息
finally:
pl.destroy() # 销毁PipeLine实例
通过pl.get_frame()接口获取一帧分辨率为rgb888p_size的图像,类型为ulab.numpy.ndarray,排布为CHW。基于上面的代码得到了一帧图像给AI处理,只关注AI推理部分的操作即可。
图像AI开发过程包括:图像预处理、模型推理、输出后处理的过程,整个过程封装在Ai2d类和AIBase类中。
2.2 Ai2d
对于Ai2d类,我们给出了常见的几种预处理方法,包括crop/shift/pad/resize/affine。该类别提供的接口包括:
- 初始化参数
debug_mode:int类型,耗时调试模式,如果大于0,打印操作耗时;如debug_mode=0。
- Set_ai2d_dtype(input_format,output_format,input_type,output_type)
设置ai2d计算过程中的输入输出数据类型,输入输出数据格式。
- Crop(start_x,start_y,width,height)预处理crop函数:
start_x:宽度方向的起始像素,int类型
start_y: 高度方向的起始像素,int类型
width: 宽度方向的crop长度,int类型
height: 高度方向的crop长度,int类型
- Shift(shift_va预处理shift函数:
shift_val:右移的比特数,int类型
- Pad(paddings,pad_mode,pad_val)预处理padding函数:
paddings:各个维度的padding, size=8,分别表示dim0到dim4的前后padding的个数,其中dim0/dim1固定配置{0, 0},list类型
pad_mode:只支持pad constant,配置0即可,int类型
pad_val:每个channel的padding value,list类型
- Resize(interp_method,interp_mode)预处理resize函数:
interp_method:resize插值方法,ai2d_interp_method类型
interp_mode:resize模式,ai2d_interp_mode类型
- Affine(interp_method,crop_round,bound_ind,bound_val,bound_smooth,M)预处理affine函数:
interp_method:Affine采用的插值方法,ai2d_interp_method类型
cord_round:整数边界0或者1,uint32_t类型
bound_ind:边界像素模式0或者1,uint32_t类型
bound_val:边界填充值,uint32_t类型
bound_smooth:边界平滑0或者1,uint32_t类型
M:仿射变换矩阵对应的vector,仿射变换为Y=[a_0, a_1; a_2, a_3] \cdot X + [b_0, b_1] $, 则 M=[a_0,a_1,b_0,a_2,a_3,b_1 ],list类型
- Build(ai2d_input_shape,ai2d_output_shape):ai2d构造函数,前面配置的预处理方法起作用。
- Run使用ai2d完成预处理
注意:
(1) Affine和Resize功能是互斥的,不能同时开启; (2) Shift功能的输入格式只能是Raw16; (3) Pad value是按通道配置的,对应的list元素个数要与channel数相等; (4) 当配置了多个功能时,执行顺序是Crop->Shift->Resize/Affine->Pad, 配置参数时注意要匹配;如果不符合该顺序,需要初始化多个Ai2d实例实现预处理过程;
示例代码如下:
from libs.PipeLine import PipeLine, ScopedTiming
from libs.AI2D import Ai2d
from media.media import *
import nncase_runtime as nn
import gc
import sys,os
if __name__ == "__main__":
# 显示模式,默认"hdmi",可以选择"hdmi"和"lcd"
display_mode="hdmi"
if display_mode=="hdmi":
display_size=[1920,1080]
else:
display_size=[800,480]
# 初始化PipeLine,用于图像处理流程
pl = PipeLine(rgb888p_size=[512,512], display_size=display_size, display_mode=display_mode)
pl.create() # 创建PipeLine实例
my_ai2d=Ai2d(debug_mode=0) #初始化Ai2d实例
# 配置resize预处理方法
my_ai2d.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel)
# 构建预处理过程
my_ai2d.build([1,3,512,512],[1,3,640,640])
try:
while True:
os.exitpoint() # 检查是否有退出信号
with ScopedTiming("total",1):
img = pl.get_frame() # 获取当前帧数据
print(img.shape) # 原图shape为[1,3,512,512]
ai2d_output_tensor=my_ai2d.run(img) # 执行resize预处理
ai2d_output_np=ai2d_output_tensor.to_numpy() # 类型转换
print(ai2d_output_np.shape) # 预处理后的shape为[1,3,640,640]
gc.collect() # 垃圾回收
except Exception as e:
sys.print_exception(e) # 打印异常信息
finally:
pl.destroy() # 销毁PipeLine实例
2.3 AIBase
AIBase部分封装了实现模型推理的主要接口,也是进行AI开发主要关注的部分。用户需要按照自己demo的要求实现前处理和后处理部分。
AIBase提供的接口包括:
- 初始化参数
kmodel_path:str类型,kmodel路径,用于初始化kpu对象并加载kmodel;
model_input_size:list类型,可选,模型输入分辨率,在单输入时起作用,格式为[width,height],如:model_input_size=[512,512];
rgb888p_size:list类型,可选,AI得到的图像的分辨率,在单输入时起作用,格式为[width,height],如:rgb888p_size=[640,640];
debug_mode:int类型,耗时调试模式,如果大于0,打印操作耗时;如debug_mode=0。
- get_kmodel_inputs_num():返回当前模型的输入个数
- get_kmodel_outputs_num():返回当前模型的输出个数
- preprocess(input_np):使用ai2d对input_np做预处理,,如果不使用单个ai2d实例做预处理,需要在子类重写该函数。
- inference(tensors):对预处理后得到的kmodel的输入(类型为tensor)进行推理,得到多个输出(类型为ulab.numpy.ndarray)
- postprocess(results):模型输出后处理函数,该函数需要用户在任务子类重写,因为不同AI任务的后处理是不同的
- run(input_np):模型的前处理、推理、后处理流程,适用于单ai2d实例能解决的前处理的AI任务,其他任务需要用户在子类重写。
- deinit():AIBase销毁函数。
2.4 ScopedTiming
ScopedTiming 类在PipeLine.py模块内,是一个用来测量代码块执行时间的上下文管理器。上下文管理器通过定义包含 __enter__ 和 __exit__ 方法的类来创建。当在 with 语句中使用该类的实例时,__enter__ 在进入 with 块时被调用,__exit__ 在离开时被调用。
示例代码:
from libs.PipeLine import ScopedTiming
def test_time():
with ScopedTiming("test",1):
#####代码#####
# ...
##############
3、应用方法和示例
用户可根据具体的AI场景自写任务类继承AIBase,可以将任务分为如下四类:单模型任务、多模型任务,自定义预处理任务、无预处理任务。不同任务需要编写不同的代码实现,具体如下图所示:
3.1 单模型任务
该任务只有一个模型,只需要关注该模型的前处理、推理、后处理过程,此类任务的前处理使用Ai2d实现,可能使用一个Ai2d实例,也可能使用多个Ai2d实例,后处理基于场景自定义。
编写自定义任务类,主要关注任务类的config_preprocess、postprocess、以及该任务需要的其他方法如:draw_result等。
如果该任务包含多个Ai2d实例,则需要重写preprocess,按照预处理的顺序设置预处理阶段的计算过程。
单模型任务的伪代码结构如下:
from libs.PipeLine import PipeLine, ScopedTiming
from libs.AIBase import AIBase
from libs.AI2D import Ai2d
import os
from media.media import *
import nncase_runtime as nn
import ulab.numpy as np
import image
import gc
import sys
# 自定义AI任务类,继承自AIBase基类
class MyAIApp(AIBase):
def __init__(self, kmodel_path, model_input_size, rgb888p_size=[224,224], display_size=[1920,1080], debug_mode=0):
# 调用基类的构造函数
super().__init__(kmodel_path, model_input_size, rgb888p_size, debug_mode)
# 模型文件路径
self.kmodel_path = kmodel_path
# 模型输入分辨率
self.model_input_size = model_input_size
# sensor给到AI的图像分辨率,并对宽度进行16的对齐
self.rgb888p_size = [ALIGN_UP(rgb888p_size[0], 16), rgb888p_size[1]]
# 显示分辨率,并对宽度进行16的对齐
self.display_size = [ALIGN_UP(display_size[0], 16), display_size[1]]
# 是否开启调试模式
self.debug_mode = debug_mode
# 实例化Ai2d,用于实现模型预处理
self.ai2d = Ai2d(debug_mode)
# 设置Ai2d的输入输出格式和类型
self.ai2d.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT, nn.ai2d_format.NCHW_FMT, np.uint8, np.uint8)
# 配置预处理操作,这里使用了resize,Ai2d支持crop/shift/pad/resize/affine,具体代码请打开/sdcard/app/libs/AI2D.py查看
def config_preprocess(self, input_image_size=None):
with ScopedTiming("set preprocess config", self.debug_mode > 0):
# 初始化ai2d预处理配置,默认为sensor给到AI的尺寸,可以通过设置input_image_size自行修改输入尺寸
ai2d_input_size = input_image_size if input_image_size else self.rgb888p_size
# 配置resize预处理方法
self.ai2d.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel)
# 构建预处理流程
self.ai2d.build([1,3,ai2d_input_size[1],ai2d_input_size[0]],[1,3,self.model_input_size[1],self.model_input_size[0]])
# 自定义当前任务的后处理,results是模型输出array列表,需要根据实际任务重写
def postprocess(self, results):
with ScopedTiming("postprocess", self.debug_mode > 0):
pass
# 绘制结果到画面上,需要根据任务自己写
def draw_result(self, pl, dets):
with ScopedTiming("display_draw", self.debug_mode > 0):
pass
if __name__ == "__main__":
# 显示模式,默认"hdmi",可以选择"hdmi"和"lcd"
display_mode="hdmi"
if display_mode=="hdmi":
display_size=[1920,1080]
else:
display_size=[800,480]
# 设置模型路径,这里要替换成当前任务模型
kmodel_path = "example_test.kmodel"
rgb888p_size = [1920, 1080]
###### 其它参数########
...
######################
# 初始化PipeLine,用于图像处理流程
pl = PipeLine(rgb888p_size=rgb888p_size, display_size=display_size, display_mode=display_mode)
pl.create() # 创建PipeLine实例
# 初始化自定义AI任务实例
my_ai = MyAIApp(kmodel_path, model_input_size=[320, 320],rgb888p_size=rgb888p_size, display_size=display_size, debug_mode=0)
my_ai.config_preprocess() # 配置预处理
try:
while True:
os.exitpoint() # 检查是否有退出信号
with ScopedTiming("total",1):
img = pl.get_frame() # 获取当前帧数据
res = my_ai.run(img) # 推理当前帧
my_ai.draw_result(pl, res) # 绘制结果
pl.show_image() # 显示结果
gc.collect() # 垃圾回收
except Exception as e:
sys.print_exception(e) # 打印异常信息
finally:
my_ai.deinit() # 反初始化
pl.destroy() # 销毁PipeLine实例
多个Ai2d实例时的伪代码如下:
from libs.PipeLine import PipeLine, ScopedTiming
from libs.AIBase import AIBase
from libs.AI2D import Ai2d
import os
from media.media import *
import nncase_runtime as nn
import ulab.numpy as np
import image
import gc
import sys
# 自定义AI任务类,继承自AIBase基类
class MyAIApp(AIBase):
def __init__(self, kmodel_path, model_input_size, rgb888p_size=[224,224], display_size=[1920,1080], debug_mode=0):
# 调用基类的构造函数
super().__init__(kmodel_path, model_input_size, rgb888p_size, debug_mode)
# 模型文件路径
self.kmodel_path = kmodel_path
# 模型输入分辨率
self.model_input_size = model_input_size
# sensor给到AI的图像分辨率,并对宽度进行16的对齐
self.rgb888p_size = [ALIGN_UP(rgb888p_size[0], 16), rgb888p_size[1]]
# 显示分辨率,并对宽度进行16的对齐
self.display_size = [ALIGN_UP(display_size[0], 16), display_size[1]]
# 是否开启调试模式
self.debug_mode = debug_mode
# 实例化Ai2d,用于实现模型预处理
self.ai2d_resize = Ai2d(debug_mode)
# 设置Ai2d的输入输出格式和类型
self.ai2d_resize.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT, nn.ai2d_format.NCHW_FMT, np.uint8, np.uint8)
# 实例化Ai2d,用于实现模型预处理
self.ai2d_resize = Ai2d(debug_mode)
# 设置Ai2d的输入输出格式和类型
self.ai2d_resize.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT, nn.ai2d_format.NCHW_FMT, np.uint8, np.uint8)
# 实例化Ai2d,用于实现模型预处理
self.ai2d_crop = Ai2d(debug_mode)
# 设置Ai2d的输入输出格式和类型
self.ai2d_crop.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT, nn.ai2d_format.NCHW_FMT, np.uint8, np.uint8)
# 配置预处理操作,这里使用了resize和crop,Ai2d支持crop/shift/pad/resize/affine,具体代码请打开/sdcard/app/libs/AI2D.py查看
def config_preprocess(self, input_image_size=None):
with ScopedTiming("set preprocess config", self.debug_mode > 0):
# 初始化ai2d预处理配置,默认为sensor给到AI的尺寸,可以通过设置input_image_size自行修改输入尺寸
ai2d_input_size = input_image_size if input_image_size else self.rgb888p_size
# 配置resize预处理方法
self.ai2d_resize.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel)
# 构建预处理流程
self.ai2d_resize.build([1,3,ai2d_input_size[1],ai2d_input_size[0]],[1,3,640,640])
# 配置crop预处理方法
self.ai2d_crop.crop(0,0,320,320)
# 构建预处理流程
self.ai2d_crop.build([1,3,640,640],[1,3,320,320])
# 假设该任务需要crop和resize预处理,顺序是先resize再crop,该顺序不符合ai2d的处理顺序,因此需要设置两个Ai2d实例分别处理
def preprocess(self,input_np):
resize_tensor=self.ai2d_resize.run(input_np)
resize_np=resize_tensor.to_numpy()
crop_tensor=self.ai2d_crop.run(resize_np)
return [crop_tensor]
# 自定义当前任务的后处理,results是模型输出array列表,需要根据实际任务重写
def postprocess(self, results):
with ScopedTiming("postprocess", self.debug_mode > 0):
pass
# 绘制结果到画面上,需要根据任务自己写
def draw_result(self, pl, dets):
with ScopedTiming("display_draw", self.debug_mode > 0):
pass
# 重写deinit,释放多个ai2d资源
def deinit(self):
with ScopedTiming("deinit",self.debug_mode > 0):
del self.ai2d_resize
del self.ai2d_crop
super().deinit()
if __name__ == "__main__":
# 显示模式,默认"hdmi",可以选择"hdmi"和"lcd"
display_mode="hdmi"
if display_mode=="hdmi":
display_size=[1920,1080]
else:
display_size=[800,480]
# 设置模型路径,这里要替换成当前任务模型
kmodel_path = "example_test.kmodel"
rgb888p_size = [1920, 1080]
###### 其它参数########
...
######################
# 初始化PipeLine,用于图像处理流程
pl = PipeLine(rgb888p_size=rgb888p_size, display_size=display_size, display_mode=display_mode)
pl.create() # 创建PipeLine实例
# 初始化自定义AI任务实例
my_ai = MyAIApp(kmodel_path, model_input_size=[320, 320],rgb888p_size=rgb888p_size, display_size=display_size, debug_mode=0)
my_ai.config_preprocess() # 配置预处理
try:
while True:
os.exitpoint() # 检查是否有退出信号
with ScopedTiming("total",1):
img = pl.get_frame() # 获取当前帧数据
res = my_ai.run(img) # 推理当前帧
my_ai.draw_result(pl, res) # 绘制结果
pl.show_image() # 显示结果
gc.collect() # 垃圾回收
except Exception as e:
sys.print_exception(e) # 打印异常信息
finally:
my_ai.deinit() # 反初始化
pl.destroy() # 销毁PipeLine实例
3.2 自定义预处理任务
该任务只有一个模型,只需要关注该模型的前处理、推理、后处理过程,此类任务的前处理不使用Ai2d实现,可以使用ulab.numpy自定义,后处理基于场景自定义。
编写自定义任务类,主要关注任务类的preprocess、postprocess、以及该任务需要的其他方法如:draw_result等
对于需要重写前处理(不使用提供的ai2d类,自己手动写预处理)的AI任务伪代码如下:
from libs.PipeLine import PipeLine, ScopedTiming
from libs.AIBase import AIBase
from libs.AI2D import Ai2d
import os
from media.media import *
import nncase_runtime as nn
import ulab.numpy as np
import image
import gc
import sys
# 自定义AI任务类,继承自AIBase基类
class MyAIApp(AIBase):
def __init__(self, kmodel_path, model_input_size, rgb888p_size=[224,224], display_size=[1920,1080], debug_mode=0):
# 调用基类的构造函数
super().__init__(kmodel_path, model_input_size, rgb888p_size, debug_mode)
# 模型文件路径
self.kmodel_path = kmodel_path
# 模型输入分辨率
self.model_input_size = model_input_size
# sensor给到AI的图像分辨率,并对宽度进行16的对齐
self.rgb888p_size = [ALIGN_UP(rgb888p_size[0], 16), rgb888p_size[1]]
# 显示分辨率,并对宽度进行16的对齐
self.display_size = [ALIGN_UP(display_size[0], 16), display_size[1]]
# 是否开启调试模式
self.debug_mode = debug_mode
# 实例化Ai2d,用于实现模型预处理
self.ai2d = Ai2d(debug_mode)
# 设置Ai2d的输入输出格式和类型
self.ai2d.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT, nn.ai2d_format.NCHW_FMT, np.uint8, np.uint8)
# 对于不使用ai2d完成预处理的AI任务,使用封装的接口或者ulab.numpy实现预处理,需要在子类中重写该函数
def preprocess(self,input_np):
#############
#注意自定义预处理过程
#############
return [tensor]
# 自定义当前任务的后处理,results是模型输出array列表,需要根据实际任务重写
def postprocess(self, results):
with ScopedTiming("postprocess", self.debug_mode > 0):
pass
# 绘制结果到画面上,需要根据任务自己写
def draw_result(self, pl, dets):
with ScopedTiming("display_draw", self.debug_mode > 0):
pass
if __name__ == "__main__":
# 显示模式,默认"hdmi",可以选择"hdmi"和"lcd"
display_mode="hdmi"
if display_mode=="hdmi":
display_size=[1920,1080]
else:
display_size=[800,480]
# 设置模型路径,这里要替换成当前任务模型
kmodel_path = "example_test.kmodel"
rgb888p_size = [1920, 1080]
###### 其它参数########
...
######################
# 初始化PipeLine,用于图像处理流程
pl = PipeLine(rgb888p_size=rgb888p_size, display_size=display_size, display_mode=display_mode)
pl.create() # 创建PipeLine实例
# 初始化自定义AI任务实例
my_ai = MyAIApp(kmodel_path, model_input_size=[320, 320],rgb888p_size=rgb888p_size, display_size=display_size, debug_mode=0)
my_ai.config_preprocess() # 配置预处理
try:
while True:
os.exitpoint() # 检查是否有退出信号
with ScopedTiming("total",1):
img = pl.get_frame() # 获取当前帧数据
res = my_ai.run(img) # 推理当前帧
my_ai.draw_result(pl, res) # 绘制结果
pl.show_image() # 显示结果
gc.collect() # 垃圾回收
except Exception as e:
sys.print_exception(e) # 打印异常信息
finally:
my_ai.deinit() # 反初始化
pl.destroy() # 销毁PipeLine实例
3.3 无预处理任务
该任务只有一个模型且不需要预处理,只需要关注该模型的推理和后处理过程,此类任务一般作为多模型任务的一部分,直接对前一个模型的输出做为输入推理,后处理基于需求自定义。
编写自定义任务类,主要关注任务类的run(模型推理的整个过程,包括preprocess、inference、postprocess中的全部或某一些步骤)、postprocess、以及该任务需要的其他方法如:draw_results等
对于不需要预处理(直接输入推理)的AI任务伪代码如下:
from libs.PipeLine import PipeLine, ScopedTiming
from libs.AIBase import AIBase
from libs.AI2D import Ai2d
import os
from media.media import *
import nncase_runtime as nn
import ulab.numpy as np
import image
import gc
import sys
# 自定义AI任务类,继承自AIBase基类
class MyAIApp(AIBase):
def __init__(self, kmodel_path, model_input_size, rgb888p_size=[224,224], display_size=[1920,1080], debug_mode=0):
# 调用基类的构造函数
super().__init__(kmodel_path, model_input_size, rgb888p_size, debug_mode)
# 模型文件路径
self.kmodel_path = kmodel_path
# 模型输入分辨率
self.model_input_size = model_input_size
# sensor给到AI的图像分辨率,并对宽度进行16的对齐
self.rgb888p_size = [ALIGN_UP(rgb888p_size[0], 16), rgb888p_size[1]]
# 显示分辨率,并对宽度进行16的对齐
self.display_size = [ALIGN_UP(display_size[0], 16), display_size[1]]
# 是否开启调试模式
self.debug_mode = debug_mode
# 自定义当前任务的后处理,results是模型输出array列表,需要根据实际任务重写
def postprocess(self, results):
with ScopedTiming("postprocess", self.debug_mode > 0):
pass
# 对于用预处理的AI任务,需要在子类中重写该函数
def run(self,inputs_np):
# 先将ulab.numpy.ndarray列表转换成tensor列表
tensors=[]
for input_np in inputs_np:
tensors.append(nn.from_numpy(input_np))
# 调用AIBase内的inference函数进行模型推理
results=self.inference(tensors)
# 调用当前子类的postprocess方法进行自定义后处理
outputs=self.postprocess(results)
return outputs
# 绘制结果到画面上,需要根据任务自己写
def draw_result(self, pl, dets):
with ScopedTiming("display_draw", self.debug_mode > 0):
pass
if __name__ == "__main__":
# 显示模式,默认"hdmi",可以选择"hdmi"和"lcd"
display_mode="hdmi"
if display_mode=="hdmi":
display_size=[1920,1080]
else:
display_size=[800,480]
# 设置模型路径,这里要替换成当前任务模型
kmodel_path = "example_test.kmodel"
rgb888p_size = [1920, 1080]
###### 其它参数########
...
######################
# 初始化PipeLine,用于图像处理流程
pl = PipeLine(rgb888p_size=rgb888p_size, display_size=display_size, display_mode=display_mode)
pl.create() # 创建PipeLine实例
# 初始化自定义AI任务实例
my_ai = MyAIApp(kmodel_path, model_input_size=[320, 320],rgb888p_size=rgb888p_size, display_size=display_size, debug_mode=0)
my_ai.config_preprocess() # 配置预处理
try:
while True:
os.exitpoint() # 检查是否有退出信号
with ScopedTiming("total",1):
img = pl.get_frame() # 获取当前帧数据
res = my_ai.run(img) # 推理当前帧
my_ai.draw_result(pl, res) # 绘制结果
pl.show_image() # 显示结果
gc.collect() # 垃圾回收
except Exception as e:
sys.print_exception(e) # 打印异常信息
finally:
my_ai.deinit() # 反初始化
pl.destroy() # 销毁PipeLine实例
3.4 多模型任务
该任务包含多个模型,可能是串联,也可能是其他组合方式。对于每个模型基本上属于前三种模型中的一种,最后通过一个完整的任务类将上述模型子任务统一起来。
编写多个子模型任务类,不同子模型任务参照前三种任务定义。不同任务关注不同的方法。
编写多模型任务类,将子模型任务类统一起来实现整个场景。
以双模型串联推理为例,给出的伪代码如下:
from libs.PipeLine import PipeLine, ScopedTiming
from libs.AIBase import AIBase
from libs.AI2D import Ai2d
import os
from media.media import *
import nncase_runtime as nn
import ulab.numpy as np
import image
import gc
import sys
# 自定义AI任务类,继承自AIBase基类
class MyAIApp_1(AIBase):
def __init__(self, kmodel_path, model_input_size, rgb888p_size=[224,224], display_size=[1920,1080], debug_mode=0):
# 调用基类的构造函数
super().__init__(kmodel_path, model_input_size, rgb888p_size, debug_mode)
# 模型文件路径
self.kmodel_path = kmodel_path
# 模型输入分辨率
self.model_input_size = model_input_size
# sensor给到AI的图像分辨率,并对宽度进行16的对齐
self.rgb888p_size = [ALIGN_UP(rgb888p_size[0], 16), rgb888p_size[1]]
# 显示分辨率,并对宽度进行16的对齐
self.display_size = [ALIGN_UP(display_size[0], 16), display_size[1]]
# 是否开启调试模式
self.debug_mode = debug_mode
# 实例化Ai2d,用于实现模型预处理
self.ai2d = Ai2d(debug_mode)
# 设置Ai2d的输入输出格式和类型
self.ai2d.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT, nn.ai2d_format.NCHW_FMT, np.uint8, np.uint8)
# 配置预处理操作,这里使用了resize,Ai2d支持crop/shift/pad/resize/affine,具体代码请打开/sdcard/app/libs/AI2D.py查看
def config_preprocess(self, input_image_size=None):
with ScopedTiming("set preprocess config", self.debug_mode > 0):
# 初始化ai2d预处理配置,默认为sensor给到AI的尺寸,可以通过设置input_image_size自行修改输入尺寸
ai2d_input_size = input_image_size if input_image_size else self.rgb888p_size
# 配置resize预处理方法
self.ai2d.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel)
# 构建预处理流程
self.ai2d.build([1,3,ai2d_input_size[1],ai2d_input_size[0]],[1,3,self.model_input_size[1],self.model_input_size[0]])
# 自定义当前任务的后处理,results是模型输出array列表,需要根据实际任务重写
def postprocess(self, results):
with ScopedTiming("postprocess", self.debug_mode > 0):
pass
# 自定义AI任务类,继承自AIBase基类
class MyAIApp_2(AIBase):
def __init__(self, kmodel_path, model_input_size, rgb888p_size=[224,224], display_size=[1920,1080], debug_mode=0):
# 调用基类的构造函数
super().__init__(kmodel_path, model_input_size, rgb888p_size, debug_mode)
# 模型文件路径
self.kmodel_path = kmodel_path
# 模型输入分辨率
self.model_input_size = model_input_size
# sensor给到AI的图像分辨率,并对宽度进行16的对齐
self.rgb888p_size = [ALIGN_UP(rgb888p_size[0], 16), rgb888p_size[1]]
# 显示分辨率,并对宽度进行16的对齐
self.display_size = [ALIGN_UP(display_size[0], 16), display_size[1]]
# 是否开启调试模式
self.debug_mode = debug_mode
# 实例化Ai2d,用于实现模型预处理
self.ai2d = Ai2d(debug_mode)
# 设置Ai2d的输入输出格式和类型
self.ai2d.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT, nn.ai2d_format.NCHW_FMT, np.uint8, np.uint8)
# 配置预处理操作,这里使用了resize,Ai2d支持crop/shift/pad/resize/affine,具体代码请打开/sdcard/app/libs/AI2D.py查看
def config_preprocess(self, input_image_size=None):
with ScopedTiming("set preprocess config", self.debug_mode > 0):
# 初始化ai2d预处理配置,默认为sensor给到AI的尺寸,可以通过设置input_image_size自行修改输入尺寸
ai2d_input_size = input_image_size if input_image_size else self.rgb888p_size
# 配置resize预处理方法
self.ai2d.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel)
# 构建预处理流程
self.ai2d.build([1,3,ai2d_input_size[1],ai2d_input_size[0]],[1,3,self.model_input_size[1],self.model_input_size[0]])
# 自定义当前任务的后处理,results是模型输出array列表,需要根据实际任务重写
def postprocess(self, results):
with ScopedTiming("postprocess", self.debug_mode > 0):
pass
class MyApp:
def __init__(kmodel1_path,kmodel2_path,kmodel1_input_size,kmodel2_input_size,rgb888p_size,display_size,debug_mode):
# 创建两个模型推理的实例
self.app_1=MyApp_1(kmodel1_path,kmodel1_input_size,rgb888p_size,display_size,debug_mode)
self.app_2=MyApp_2(kmodel2_path,kmodel2_input_size,rgb888p_size,display_size,debug_mode)
self.app_1.config_preprocess()
# 编写run函数,具体代码根据AI任务的需求编写,此处只是给出一个示例
def run(self,input_np):
outputs_1=self.app_1.run(input_np)
outputs_2=[]
for out in outputs_1:
self.app_2.config_preprocess(out)
out_2=self.app_2.run(input_np)
outputs_2.append(out_2)
return outputs_1,outputs_2
# 绘制
def draw_result(self,pl,outputs_1,outputs_2):
pass
######其他函数########
# 省略
####################
if __name__ == "__main__":
# 显示模式,默认"hdmi",可以选择"hdmi"和"lcd"
display_mode="hdmi"
if display_mode=="hdmi":
display_size=[1920,1080]
else:
display_size=[800,480]
rgb888p_size = [1920, 1080]
# 设置模型路径,这里要替换成当前任务模型
kmodel1_path = "test_kmodel1.kmodel"
kmdoel1_input_size=[320,320]
kmodel2_path = "test_kmodel2.kmodel"
kmodel2_input_size=[48,48]
###### 其它参数########
# 省略
######################
# 初始化PipeLine,用于图像处理流程
pl = PipeLine(rgb888p_size=rgb888p_size, display_size=display_size, display_mode=display_mode)
pl.create() # 创建PipeLine实例
# 初始化自定义AI任务实例
my_ai = MyApp(kmodel1_path,kmodel2_path, kmodel1_input_size,kmodel2_input_size,rgb888p_size=rgb888p_size, display_size=display_size, debug_mode=0)
my_ai.config_preprocess() # 配置预处理
try:
while True:
os.exitpoint() # 检查是否有退出信号
with ScopedTiming("total",1):
img = pl.get_frame() # 获取当前帧数据
outputs_1,outputs_2 = my_ai.run(img) # 推理当前帧
my_ai.draw_result(pl, outputs_1,outputs_2) # 绘制结果
pl.show_image() # 显示结果
gc.collect() # 垃圾回收
except Exception as e:
sys.print_exception(e) # 打印异常信息
finally:
my_ai.app_1.deinit() # 反初始化
my_ai.app_2.deinit()
pl.destroy() # 销毁PipeLine实例
|