宁波市网站建设_网站建设公司_React_seo优化
2026/1/14 15:47:15 网站建设 项目流程

1、依赖引入

Java/Scala 工程需要加 Flink CSV 依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>2.2.0</version></dependency>

PyFlink 用户一般可以直接在作业里使用(前提是集群环境里对应的 jar 能被加载;如果你是在远程集群跑,仍然需要按你前面“依赖管理”章节的方式把 jar 加入pipeline.jarsenv.add_jars())。

2、Java:快速读取 POJO(自动推导 Schema)

最省事的方式:让 Jackson 根据 POJO 字段推导 CSV schema:

CsvReaderFormat<SomePojo>csvFormat=CsvReaderFormat.forPojo(SomePojo.class);FileSource<SomePojo>source=FileSource.forRecordStreamFormat(csvFormat,Path.fromLocalFile(...)).build();

注意:CSV 列顺序必须和 POJO 字段顺序一致。必要时加:

@JsonPropertyOrder({"field1","field2",...})

否则列对不上会出现解析错位(最常见的“字段都不为空但值都错”的隐性 bug)。

3、Java:高级配置(自定义分隔符、禁用引号等)

需要精细控制时,用forSchema(...)自己生成CsvSchema,例如把分隔符改成|,并禁用 quote:

Function<CsvMapper,CsvSchema>schemaGenerator=mapper->mapper.schemaFor(CityPojo.class).withoutQuoteChar().withColumnSeparator('|');CsvReaderFormat<CityPojo>csvFormat=CsvReaderFormat.forSchema(()->newCsvMapper(),schemaGenerator,TypeInformation.of(CityPojo.class));FileSource<CityPojo>source=FileSource.forRecordStreamFormat(csvFormat,Path.fromLocalFile(...)).build();

对应 CSV:

Berlin|52.5167|13.3833|Germany|DE|Berlin|primary|3644826 San Francisco|37.7562|-122.443|United States|US|California||3592294 Beijing|39.905|116.3914|China|CN|Beijing|primary|19433000

更复杂的类型也能做(比如数组列),通过CsvSchema.ColumnType.ARRAY并指定数组元素分隔符:

CsvReaderFormat<ComplexPojo>csvFormat=CsvReaderFormat.forSchema(CsvSchema.builder().addColumn(newCsvSchema.Column(0,"id",CsvSchema.ColumnType.NUMBER)).addColumn(newCsvSchema.Column(4,"array",CsvSchema.ColumnType.ARRAY).withArrayElementSeparator("#")).build(),TypeInformation.of(ComplexPojo.class));

4、PyFlink:手动定义 CSV Schema(输出为 Row)

PyFlink 里通常自己建 schema,每一列映射为 Row 字段:

frompyflink.common.watermark_strategyimportWatermarkStrategyfrompyflink.tableimportDataTypesfrompyflink.datastreamimportStreamExecutionEnvironmentfrompyflink.datastream.connectors.file_systemimportFileSourcefrompyflink.formats.csvimportCsvReaderFormat,CsvSchema# 具体 import 以你环境包结构为准env=StreamExecutionEnvironment.get_execution_environment()schema=CsvSchema.builder()\.add_number_column('id',number_type=DataTypes.BIGINT())\.add_array_column('array',separator='#',element_type=DataTypes.INT())\.set_column_separator(',')\.build()source=FileSource.for_record_stream_format(CsvReaderFormat.for_schema(schema),CSV_FILE_PATH).build()ds=env.from_source(source,WatermarkStrategy.no_watermarks(),'csv-source')# ds 的 record 类型是 Row(具名字段 + 复合类型)# Types.ROW_NAMED(['id', 'array'], [Types.LONG(), Types.LIST(Types.INT())])

对应 CSV:

0,1#2#3 1, 2,1

补一个实战提醒:如果某列可能为空(比如上面的array),你后续算子处理时要把None/空数组的分支写好,否则很容易在 map/flat_map 里触发类型错误。

5、PyFlink:写 CSV(Bulk Format)

写 CSV 通常用CsvBulkWriters生成 BulkWriterFactory,再配合FileSink.for_bulk_format(...)

frompyflink.tableimportDataTypesfrompyflink.datastream.connectors.file_systemimportFileSinkfrompyflink.formats.csvimportCsvBulkWriters,CsvSchema# 具体 import 以你环境包结构为准schema=CsvSchema.builder()\.add_number_column('id',number_type=DataTypes.BIGINT())\.add_array_column('array',separator='#',element_type=DataTypes.INT())\.set_column_separator(',')\.build()sink=FileSink.for_bulk_format(OUTPUT_DIR,CsvBulkWriters.for_schema(schema)).build()ds.sink_to(sink)

6、运行模式:Batch / Streaming 都可用

CsvReaderFormat类似TextLineInputFormat,既可用于批也可用于流(持续监控目录等),具体取决于你用的 Source/RuntimeMode 以及文件系统是否支持持续发现。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询