
Installation

Flink 2.1.1 Docker installation
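
A minimal sketch of a Flink session cluster in Docker Compose, following the layout from the official Flink Docker documentation; the flink:2.1.1 image tag is an assumption, so check Docker Hub for the exact tag name:

# docker-compose.yml -- minimal Flink session cluster (sketch)
services:
  jobmanager:
    image: flink:2.1.1          # assumed tag
    ports:
      - "8081:8081"             # Flink web UI, used below to upload the job jar
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    image: flink:2.1.1          # assumed tag
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2

Start it with docker compose up -d; the web UI is then reachable at http://localhost:8081.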

Java code example

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example.flink</groupId>
    <artifactId>flink-kafka-es7-sql</artifactId>
    <version>1.0.0</version>
    <name>flink-kafka-es7-sql</name>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <flink.version>2.1.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <log4j.version>2.24.3</log4j.version>
        <commons-math3.version>3.6.1</commons-math3.version>
        <lombok.version>1.18.26</lombok.version>
    </properties>

    <dependencies>
        <!-- Flink streaming core -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink client, needed for local execution -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Table API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Needed only for local startup:
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>2.1.1</version>
        </dependency>
        -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>4.0.1-2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7</artifactId>
            <version>4.0.0-2.0</version>
        </dependency>
        <!-- Flink JSON format -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-math3</artifactId>
            <version>${commons-math3.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                </configuration>
            </plugin>
            <!-- Fat-jar packaging -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.4.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <transformers>
                                <!-- Avoid META-INF conflicts -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.example.flink.KafkaEs7Sql</mainClass>
                                </transformer>
                            </transformers>
                            <!-- Optional: strip jar signatures to avoid invalid-signature errors -->
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                        <!-- These conflict with the Kryo/Objenesis bundled in the Flink cluster -->
                                        <exclude>com/esotericsoftware/kryo/**</exclude>
                                        <exclude>org/objenesis/**</exclude>
                                        <exclude>META-INF/versions/9/org/objenesis/**</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
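
With this pom in place, a normal Maven build produces the shaded fat jar:

mvn clean package

The resulting target/flink-kafka-es7-sql-1.0.0.jar (the name follows from artifactId and version) is the file uploaded to the Flink web UI in the run step below.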

Code

package com.example.flink;

import org.apache.flink.configuration.ExternalizedCheckpointRetention;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL application that consumes from Kafka and counts files per userId.
 */
public class KafkaEs7Sql {
    public static void main(String[] args) throws Exception {
        // 1. Create the stream execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Trigger a checkpoint every 5 seconds
        env.enableCheckpointing(5000);
        // Exactly-once consistency
        env.getCheckpointConfig().setCheckpointingConsistencyMode(CheckpointingMode.EXACTLY_ONCE);
        // 60 s checkpoint timeout
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // No concurrent checkpoints, to reduce IO spikes
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // At least 500 ms between two checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // Retain checkpoints after job cancellation, for manual recovery
        env.getCheckpointConfig().setExternalizedCheckpointRetention(
                ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);

        // 2. Create the table environment
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 3. Source table (a processing-time window needs no event-time column)
        tableEnv.executeSql(
                "CREATE TABLE test_file_source (" +
                "  userId STRING," +
                "  type STRING," +
                "  fileType STRING," +
                "  fileUrl STRING," +
                "  rlsFileList ARRAY<ROW<fileUrl STRING, filePath STRING, fileType STRING>>," +
                "  shootTime BIGINT," +
                "  uploadTime BIGINT," +
                "  location STRING," +
                "  duration BIGINT," +
                "  pt AS PROCTIME()" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'user_file_topic'," +
                "  'properties.bootstrap.servers' = 'kk.kk.kk.kk:9092,kk.kk.kk.kk:9092,kk.kk.kk.kk:9092'," +
                "  'format' = 'json'," +
                "  'scan.startup.mode' = 'earliest-offset'" +
                ")");

        // 4. Elasticsearch 7 sink table
        tableEnv.executeSql(
                "CREATE TABLE es_sink (" +
                "  id STRING," +
                "  userId STRING," +
                "  total BIGINT," +
                "  proTime TIMESTAMP(3)," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'elasticsearch-7'," +
                "  'hosts' = 'http://es.es.es.es:9200'," +
                "  'index' = 'test_file_index7'," +
                "  'document-id.key-delimiter' = '_'," +
                "  'format' = 'json'," +
                // Use a valid delivery-guarantee value: at-least-once
                "  'sink.delivery-guarantee' = 'at-least-once'" +
                ")");

        // 5. 30-second tumbling window count, with the current time as the timestamp
        TableResult tableResult = tableEnv.executeSql(
                "INSERT INTO es_sink " +
                "SELECT " +
                "  UUID() AS id, " +
                "  userId, " +
                "  COUNT(*) AS total, " +
                "  CURRENT_TIMESTAMP AS proTime " +
                "FROM test_file_source " +
                "GROUP BY " +
                "  userId, " +
                "  TUMBLE(pt, INTERVAL '30' SECOND)");
        tableResult.await();
    }
}
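
For reference, messages on user_file_topic must be JSON objects matching the source DDL above. A hypothetical example (field names from the DDL, all values made up for illustration):

{
  "userId": "u_1001",
  "type": "upload",
  "fileType": "image",
  "fileUrl": "http://example.com/a.jpg",
  "rlsFileList": [
    {"fileUrl": "http://example.com/a.jpg", "filePath": "/data/a.jpg", "fileType": "image"}
  ],
  "shootTime": 1734760000000,
  "uploadTime": 1734760005000,
  "location": "somewhere",
  "duration": 0
}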

Running the example

Upload the KafkaEs7Sql jar through the Flink web UI and submit the job (a CLI alternative follows the screenshot).

[Screenshot: upload_kafka_es7_jar]
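
If you prefer the command line, the same jar can be submitted with the Flink CLI. A sketch, assuming the Docker Compose setup above and that the jar has been copied into the jobmanager container (the path is illustrative):

docker compose exec jobmanager ./bin/flink run -c com.example.flink.KafkaEs7Sql /opt/flink/usrlib/flink-kafka-es7-sql-1.0.0.jar

The -c flag is optional here, since the shade plugin already writes com.example.flink.KafkaEs7Sql into the jar manifest.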

Data written to ES

[Screenshot: test_file_index7_search]

Inspect the ES index structure

{"test_file_index7" : {"mappings" : {"properties" : {"id" : {"type" : "text","fields" : {"keyword" : {"type" : "keyword","ignore_above" : 256}}},"proTime" : {"type" : "text","fields" : {"keyword" : {"type" : "keyword","ignore_above" : 256}}},"total" : {"type" : "long"},"userId" : {"type" : "text","fields" : {"keyword" : {"type" : "keyword","ignore_above" : 256}}}}}}
}
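
Note that proTime is mapped as text rather than date: the index was created by ES dynamic mapping, which only sees the TIMESTAMP(3) value as a JSON string. If a proper date field is wanted, one option is to create the index with an explicit mapping before the job first writes to it. A sketch; the format pattern is an assumption and must match what the sink actually emits (Flink's JSON format writes SQL-style timestamps by default):

PUT http://es.es.es.es:9200/test_file_index7
{
  "mappings": {
    "properties": {
      "proTime": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss.SSS" },
      "userId":  { "type": "keyword" },
      "total":   { "type": "long" }
    }
  }
}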

Notes

Viewing ES data

# View documents
http://es.es.es.es:9200/test_file_index7/_search?pretty
# View the mapping
http://es.es.es.es:9200/test_file_index7/_mapping?pretty
