GPU、CPU解耦,充分利用并发加速图形特征计算

张开发
2026/4/7 18:55:16 15 分钟阅读

分享文章

GPU、CPU解耦,充分利用并发加速图形特征计算
1. 为何要解耦GPU适合进行并行运算而且GPU模型在进行语义分割任务和一些预测标签任务的时候如果一个图片计算量较大的时候GPU并行化程度已经非常高了所以GPU在计算大图片的时候是瓶颈。但是我的图片主要是小图片大数量一张一张进行GPU运算的时候GPU性能没有榨干显存和计算量都没有吃满所以很简单的一个思路就是使用batch把图片增加一个维度进行拼接这样就一批一批的输出图片但是这样GPU每运算成功一个批次就会有batch_size个图片需要CPU后处理。这个后处理也比较复杂而之前写的图片后处理的代码也是一张一张的处理所以这部分的后处理也需要进行并行。两个并行也就需要GPU和CPU进行完全解耦充分利用GPU和CPU两部分的计算能力。2. 解耦实现话不多说直接上代码2.1. 引入必要的包和库from model.DepthAnalysis import DepthModel from PIL import Image import numpy as np from sklearn.cluster import KMeans from model.CannyAnalysis import CannyModel import cv2 import threading from queue import Queue from PIL import Image import os from index_database import IndexDatabase from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker depth_model DepthModel() canny_model CannyModel()引入的包有点乱其中model模块是我自己写的。我使用的是SQL库进行数据的存储这里还有个坑不能用yield进行数据的读取会造成数据库超时这个之后再说。引入库之后就到了第二部定义一些计算函数。2.2. CPU计算函数的定义可以看到这里的运算都是CPU密集型运算主要计算量在于卷积运算还有一些图片的numpy矩阵运算还有聚类运算使用numpy和cv库可以极大加快运算速度。def get_congestion(feature_map, sigma7, ksize71): feature_map feature_map.astype(np.float32) H, W feature_map.shape congestion_map cv2.GaussianBlur(feature_map, (ksize, ksize), sigma, borderTypecv2.BORDER_REPLICATE) congestion_map np.reshape(congestion_map, (H, W)) return congestion_map, np.mean(congestion_map) def depth_cluster(depth_analysis, cluster_num4, sample_ratio0.05): depth_image depth_model.depth_analysis2depth_image(depth_analysis) depth_image_array np.array(depth_image) h, w depth_image_array.shape X depth_image_array.reshape(-1, 1) n len(X) idx np.random.choice(n, int(n * sample_ratio), replaceFalse) X_sample X[idx] kmeans KMeans(n_clusterscluster_number, n_init10, random_state0) kmeans.fit(X_sample) labels kmeans.predict(X) label_map labels.reshape(h, w) return label_map def get_depth_feature(depth_analysis, cluster_number4, sample_ratio0.05): label_map depth_cluster(depth_analysisdepth_analysis, cluster_numbercluster_number, sample_ratiosample_ratio) # 这里也有坑如果没有这个类型转换为浮点型在计算的时候如果是uint8数据类型会有数据溢出的问题 label_map label_map.astype(np.float32) labels_map_show (label_map - np.min(label_map)) * 255 / (np.max(label_map) - np.min(label_map)) labels_map_show Image.fromarray(labels_map_show.astype(np.uint8)) canny_img canny_model.get_canny_img(labels_map_show) depth_feature_map np.array(canny_img) return depth_feature_map def cpu_postprocess(image_id, depth_analysis): depth_feature_map get_depth_feature(depth_analysisdepth_analysis) _, depth_score get_congestion(feature_mapdepth_feature_map) return image_id, depth_score2.3. 数据库函数接下来进入主函数层面主函数里面包括的第一部分数据加载函数以及一些数据写入的函数。def load_tasks_from_db(): session Session() for database in session.query(IndexDatabase).yield_per(100): for position in [xm, xp, zm, zp]: yield { id: database.id, position: position, panorama_sid: database.panorama_sid } def batch_generate(task_iter,batch_sizebatch_size): batch [] for task in task_iter: batch.append(task) if len(batch) batch_size: yield batch batch [] if batch: yield batch def write_to_db(image_id, image_position, depth_score): session Session() database_item session.query(IndexDatabase).filter(IndexDatabase.id image_id).first() try: if image_position xm: database_item.xm_depth_index depth_score elif image_position xp: database_item.xp_depth_index depth_score elif image_position zm: database_item.zm_depth_index depth_score elif image_position zp: database_item.zp_depth_index depth_score session.commit() except Exception as e: print(e) session.rollback() finally: session.close()2.4. CPU、GPU主运行解耦实现***这部分是CPU运行部分queue是核心的GPU和CPU信息交互的关键GPU每次运算完成一个batch就会推到task_queue中而每次cup都并行的从task_queue中拿任务每拿走一个就标记已完成然后写入数据库以下的代码写的就是CPU从queue拿任务的代码。task_queue Queue(maxsize36) # 限流防止内存爆 NUM_WORKERS 8 def worker(): while True: item task_queue.get() if item is None: task_queue.task_done() break image_id, image_position, depth item try: image_id, depth_score cpu_postprocess(image_id, depth) write_to_db(image_id, image_position, depth_score) except Exception as e: print(fError processing {image_id}: {e}) task_queue.task_done() threads [] for _ in range(NUM_WORKERS): t threading.Thread(targetworker, daemonTrue) t.start() threads.append(t)下面代码是GPU不断放入queue_task任务。batch_count 0 for batch in batch_generator(load_tasks_from_db(), batch_sizebatch_size): batch_count 1 if batch_count % 20 0: elapsed time.time() - start_time processed_tasks batch_count * batch_size progress processed_tasks / total_tasks speed processed_tasks / elapsed # tasks/sec remaining_tasks total_tasks - processed_tasks eta remaining_tasks / speed if speed 0 else 0 print( f[Progress] f{processed_tasks}/{total_tasks} f({progress * 100:.2f}%) | fSpeed: {speed:.2f} img/s | fETA: {eta / 3600:.2f} hour| fQueue: {task_queue.qsize()} ) images [] ids [] positions [] for task in batch: filepath f./project/{task[panorama_sid]}_{task[position]}.jpg img Image.open(filepath).convert(RGB) images.append(img) ids.append(task[id]) positions.append(task[position]) depths depth_model.get_depth_analysis_batch(images) for i in range(len(depths)): task_queue.put((ids[i], positions[i], depths[i]))最后这里等待所有任务处理结束。# 等所有任务处理完 task_queue.join() # 发停止信号 for _ in range(NUM_WORKERS): task_queue.put(None) for t in threads: t.join()至此所有的程序运行完成实现了完全的CPU和GPU解耦并发处理图片并写入数据库。而这样运行一段时间后报错Traceback (most recent call last): File D:\software\Anaconda\envs\pytorch\lib\site-packages\pymysql\connections.py, line 789, in _read_bytes data self._rfile.read(num_bytes) File D:\software\Anaconda\envs\pytorch\lib\socket.py, line 717, in readinto return self._sock.recv_into(b) ConnectionResetError: [WinError 10054] 远程主机强迫关闭了一个现有的连接。聪明你知道这是怎么回事吗如果知道请在评论区告诉我哦。

更多文章