Hyperf对接报表 如何在 HyperF 中为帆布报表设计一套插件化的数据处理管道(Pipeline),使业务方可以在不修改核心代码的前提下,自定义报 表数据的清洗、聚合和格式化逻辑?

张开发
2026/4/16 16:02:45 15 分钟阅读

分享文章

Hyperf对接报表 如何在 HyperF 中为帆布报表设计一套插件化的数据处理管道(Pipeline),使业务方可以在不修改核心代码的前提下,自定义报 表数据的清洗、聚合和格式化逻辑?
HyperF 报表插件化数据处理管道)选型 hyperf/pipeline 原生管道 hyperf/di 容器自动注入 Redis 插件注册表 --- 架构总览 原始数据 └─ Pipeline ├─ Stage1: CleanPipe# 清洗去空值、脱敏├─ Stage2: AggregatePipe# 聚合分组求和、均值├─ Stage3: FormatPipe# 格式化日期、金额└─ Stage N:[业务自定义]# 插件扩展点零侵入↓ 渲染引擎 → XLSX --- 一、管道契约?php // app/Pipeline/Contract/PipeInterface.php namespace App\Pipeline\Contract;interface PipeInterface{/** * param arrayobject$rows当前批次数据 * param callable$next下一个管道节点 * param PipeContext$ctx管道上下文模板配置、租户信息 */ publicfunctionhandle(array$rows, callable$next, PipeContext$ctx): array;/** 管道元数据用于注册中心展示 */ publicfunctiondescriptor(): PipeDescriptor;}?php // app/Pipeline/Contract/PipeContext.php namespace App\Pipeline\Contract;final class PipeContext{publicfunction__construct(publicreadonlyint$templateId, publicreadonlyint$tenantId, publicreadonlyarray$config, // 模板级插件配置 public array$meta[], // 管道间共享状态聚合中间结果等){}}?php // app/Pipeline/Contract/PipeDescriptor.php namespace App\Pipeline\Contract;final class PipeDescriptor{publicfunction__construct(publicreadonlystring$name, // 唯一标识 publicreadonlystring$label, // 展示名 publicreadonlystring$category, // clean / aggregate /format/ custom publicreadonlyarray$configSchema[], // JSON Schema前端动态渲染配置表单){}}--- 二、内置管道实现?php // app/Pipeline/Builtin/CleanPipe.php namespace App\Pipeline\Builtin;use App\Pipeline\Contract\{PipeInterface, PipeContext, PipeDescriptor};class CleanPipe implements PipeInterface{publicfunctionhandle(array$rows, callable$next, PipeContext$ctx): array{$rules$ctx-config[clean]??[];$rowsarray_map(function($row)use($rules){$row(array)$row;// 去除空值字段if($rules[remove_null]??false){$rowarray_filter($row, fn($v)$v!null$v!);}// 字段脱敏 foreach($rules[mask_fields]??[]as$field$pattern){isset($row[$field])$row[$field]preg_replace($pattern,***,(string)$row[$field]);}// 类型强转 foreach($rules[cast]??[]as$field$type){isset($row[$field])$row[$field]match($type){int(int)$row[$field],float(float)$row[$field],string(string)$row[$field],bool(bool)$row[$field], default$row[$field],};}return(object)$row;},$rows);return$next($rows);}publicfunctiondescriptor(): PipeDescriptor{returnnew PipeDescriptor(clean,数据清洗,clean,[remove_null[typeboolean,label移除空值],mask_fields[typemap,label脱敏字段],cast[typemap,label类型转换],]);}}?php // app/Pipeline/Builtin/AggregatePipe.php namespace App\Pipeline\Builtin;use App\Pipeline\Contract\{PipeInterface, PipeContext, PipeDescriptor};class AggregatePipe implements PipeInterface{publicfunctionhandle(array$rows, callable$next, PipeContext$ctx): array{$rules$ctx-config[aggregate]??[];if(!$rules)return$next($rows);$groupBy$rules[group_by]?? null;$aggs$rules[aggs]??[];//[fieldsum|avg|count|max|min]if(!$groupBy)return$next($rows);// 流式聚合利用 meta 跨批次累积 foreach($rowsas$row){$row(array)$row;$key$row[$groupBy]??__all__;$state$ctx-meta[agg][$key];foreach($aggsas$field$fn){$val(float)($row[$field]??0);match($fn){sum$state[$field][val]($state[$field][val]??0)$val,count$state[$field][val]($state[$field][val]??0)1,max$state[$field][val]max($state[$field][val]?? PHP_INT_MIN,$val),min$state[$field][val]min($state[$field][val]?? PHP_INT_MAX,$val),avg[$state[$field][sum]($state[$field][sum]??0)$val,$state[$field][count]($state[$field][count]??0)1,$state[$field][val]$state[$field][sum]/$state[$field][count],], defaultnull,};}}// 非最后批次直接透传最后批次由 FinalizeAggPipe 输出汇总行return$next($rows);}publicfunctiondescriptor(): PipeDescriptor{returnnew PipeDescriptor(aggregate,数据聚合,aggregate,[group_by[typestring,label分组字段],aggs[typemap,label聚合函数 (sum/avg/count/max/min)],]);}}?php // app/Pipeline/Builtin/FormatPipe.php namespace App\Pipeline\Builtin;use App\Pipeline\Contract\{PipeInterface, PipeContext, PipeDescriptor};class FormatPipe implements PipeInterface{publicfunctionhandle(array$rows, callable$next, PipeContext$ctx): array{$rules$ctx-config[format]??[];$rowsarray_map(function($row)use($rules){$row(array)$row;foreach($rulesas$field$fmt){if(!isset($row[$field]))continue;$row[$field]match($fmt[type]){datedate($fmt[pattern]??Y-m-d, strtotime($row[$field])),money¥.number_format($row[$field]/100,2),percentround($row[$field]*100,2).%,enum($fmt[map][$row[$field]]??$row[$field]),truncatemb_substr($row[$field],0,$fmt[length]??50), default$row[$field],};}return(object)$row;},$rows);return$next($rows);}publicfunctiondescriptor(): PipeDescriptor{returnnew PipeDescriptor(format,数据格式化,format,[fields[typemap,label字段格式规则],]);}}--- 三、插件注册表 — 零侵入扩展点?php // app/Pipeline/PipeRegistry.php namespace App\Pipeline;use App\Pipeline\Contract\PipeInterface;use Hyperf\DbConnection\Db;class PipeRegistry{/** var arraystring, class-stringPipeInterface*/ private array$pipes[];publicfunctionregister(string$name, string$class): void{if(!is_a($class, PipeInterface::class,true)){throw new\InvalidArgumentException({$class} 必须实现 PipeInterface);}$this-pipes[$name]$class;}publicfunctionget(string$name): PipeInterface{$class$this-pipes[$name]?? throw new\RuntimeException(未注册的管道: {$name});returnmake($class);// 容器解析支持依赖注入}/** 按模板配置构建有序管道列表 */ publicfunctionbuildForTemplate(int$templateId): array{$keypipe_chain:{$templateId};if($cachedredis()-get($key)){returnarray_map(fn($n)$this-get($n), unserialize($cached));}$namesDb::table(template_pipes)-where(template_id,$templateId)-where(enabled,1)-orderBy(sort_order)-pluck(pipe_name)-all();redis()-setex($key,300, serialize($names));returnarray_map(fn($n)$this-get($n),$names);}publicfunctionall(): array{returnarray_map(fn($class)make($class)-descriptor(),$this-pipes);}}?php // config/autoload/dependencies.php — 插件注册业务方只需在此追加return[\App\Pipeline\PipeRegistry::classfunction(){$rnew\App\Pipeline\PipeRegistry();// 内置管道$r-register(clean,\App\Pipeline\Builtin\CleanPipe::class);$r-register(aggregate,\App\Pipeline\Builtin\AggregatePipe::class);$r-register(format,\App\Pipeline\Builtin\FormatPipe::class);// 业务方自定义插件 — 零侵入只加这一行$r-register(tax_calc,\App\Plugin\Finance\TaxCalcPipe::class);$r-register(risk_flag,\App\Plugin\Risk\RiskFlagPipe::class);return$r;},];--- 四、业务方自定义插件示例?php // app/Plugin/Finance/TaxCalcPipe.php — 财务团队自行维护不动核心代码 namespace App\Plugin\Finance;use App\Pipeline\Contract\{PipeInterface, PipeContext, PipeDescriptor};class TaxCalcPipe implements PipeInterface{publicfunctionhandle(array$rows, callable$next, PipeContext$ctx): array{$rate$ctx-config[tax_calc][rate]??0.13;$rowsarray_map(function($row)use($rate){$row(array)$row;if(isset($row[amount])){$row[tax]bcmul((string)$row[amount],(string)$rate,2);$row[amount_inc]bcadd((string)$row[amount],$row[tax],2);} return(object)$row;},$rows);return $next($rows);} public function descriptor():PipeDescriptor { return new PipeDescriptor(tax_calc,税额计算,custom,[ rate[typenumber,label税率,default0.13],]);} }---五、管道执行引擎?php//app/Pipeline/PipelineRunner.php namespace App\Pipeline;use App\Pipeline\Contract\{PipeContext,PipeInterface};class PipelineRunner { public function __construct(private readonly PipeRegistry $registry){} public function run(array $rows,int $templateId,array $config,int $tenantId):array { $pipes$this-registry-buildForTemplate($templateId);$ctxnew PipeContext($templateId,$tenantId,$config);return $this-buildChain($pipes,$ctx)($rows);}/**递归构建责任链*/private function buildChain(array $pipes,PipeContext $ctx):callable { if(!$pipes)return fn($rows)$rows;$pipearray_shift($pipes);$next$this-buildChain($pipes,$ctx);return fn($rows)$pipe-handle($rows,$next,$ctx);} }---六、集成到报表导出?php//app/Service/ReportRenderService.php namespace App\Service;use App\Pipeline\PipelineRunner;use Hyperf\DbConnection\Db;use OpenSpout\Writer\XLSX\Writer;use OpenSpout\Common\Entity\Row;class ReportRenderService { public function __construct(private readonly PipelineRunner $pipeline){} public function render(string $taskId,array $params):string { $tplDb::table(report_templates)-find($params[template_id]);$configjson_decode($tpl-pipe_config,true);//模板级管道配置 $path/tmp/reports/{$taskId}.xlsx;$writernew Writer();$writer-openToFile($path);$writer-addRow(Row::fromValues(json_decode($tpl-headers,true)));foreach($this-fetchBatches($params)as$batch){// 每批数据过管道$processed$this-pipeline-run($batch,$params[template_id],$config,$params[tenant_id]);$writer-addRows(array_map(fn($r)Row::fromValues(array_values((array)$r)),$processed));unset($batch,$processed);}$writer-close();return$path;}privatefunctionfetchBatches(array$params):\Generator{$lastId0;do{$rowsDb::table($params[table])-where(tenant_id,$params[tenant_id])-where(id,,$lastId)-orderBy(id)-limit(5000)-get()-all();if(!$rows)break;$lastIdend($rows)-id;yield$rows;}while(count($rows)5000);}}--- 七、管道配置表 CREATE TABLE template_pipes(idINT UNSIGNED AUTO_INCREMENT PRIMARY KEY, template_id INT UNSIGNED NOT NULL, pipe_name VARCHAR(64)NOT NULL, sort_order TINYINT NOT NULL DEFAULT0, enabled TINYINT NOT NULL DEFAULT1, INDEX idx_tpl(template_id, enabled, sort_order));-- 示例数据财务报表启用税额计算插件 INSERT INTO template_pipes VALUES(1,42,clean,1,1,1);INSERT INTO template_pipes VALUES(2,42,tax_calc,2,1,1);INSERT INTO template_pipes VALUES(3,42,aggregate,3,1,1);INSERT INTO template_pipes VALUES(4,42,format,4,1,1);--- 八、扩展点总览 业务方扩展只需两步1. 实现 PipeInterface └─ app/Plugin/{Team}/{Name}Pipe.php2. 注册到 dependencies.php └─$r-register(name, XxxPipe::class);然后在 template_pipes 表配置启用即可核心代码零改动。 管道执行流 原始批次(5000行)→ CleanPipe(去空/脱敏/类型转换)→ TaxCalcPipe(财务插件计算税额)→ AggregatePipe(按部门分组求和跨批次累积 meta)→ FormatPipe(金额→¥格式时间戳→日期)→ XLSX 写入

更多文章