上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

【极数系列】Flink集成DataSource读取文件数据(08)

guduadmin221月前

文章目录

  • 01 引言
  • 02 简介概述
  • 03 基于文件读取数据
    • 3.1 readTextFile(path)
    • 3.2 readFile(fileInputFormat, path)
    • 3.3 readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
    • 3.4 实现原理
    • 3.5 注意事项
    • 3.6 支持读取的文件形式
    • 04 源码实战demo
      • 4.1 pom.xml依赖
      • 4.2 创建文件数据流作业
      • 4.3 运行程序查看日志

        01 引言

        源码地址,一键下载可用:https://gitee.com/shawsongyue/aurora.git
        模块:aurora_flink
        主类:FlinkFileSourceJob(文件)
        

        02 简介概述

        1.Source 是Flink程序从中读取其输入数据的地方。你可以用 StreamExecutionEnvironment.addSource(sourceFunction) 将一个 source 关联到你的程序。
        2.Flink 自带了许多预先实现的 source functions,不过你仍然可以通过实现 SourceFunction 接口编写自定义的非并行 source。
        3.也可以通过实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 类编写自定义的并行 sources。
        

        03 基于文件读取数据

        3.1 readTextFile(path)

        读取文本文件,例如遵守 TextInputFormat 规范的文件,逐行读取并将它们作为字符串返回。

        3.2 readFile(fileInputFormat, path)

        按照指定的文件输入格式读取(一次)文件。

        3.3 readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)

        这是前两个方法内部调用的方法。它基于给定的 fileInputFormat 读取路径 path 上的文件。根据提供的 watchType 的不同,source 可能定期(每 interval 毫秒)监控路径上的新数据(watchType 为 FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理一次当前路径中的数据然后退出(watchType 为 FileProcessingMode.PROCESS_ONCE)。使用 pathFilter,用户可以进一步排除正在处理的文件。

        3.4 实现原理

        底层Flink 将文件读取过程拆分为两个子任务,即 目录监控数据读取。每个子任务都由一个单独的实体实现。监控由单个非并行(并行度 = 1)任务实现,而读取由多个并行运行的任务执行。后者的并行度和作业的并行度相等。单个监控任务的作用是扫描目录(定期或仅扫描一次,取决于 watchType),找到要处理的文件,将它们划分为 分片,并将这些分片分配给下游 reader。Reader 是将实际获取数据的角色。每个分片只能被一个 reader 读取,而一个 reader 可以一个一个地读取多个分片。

        3.5 注意事项

        (1)如果 watchType 设置为 FileProcessingMode.PROCESS_CONTINUOUSLY,当一个文件被修改时,它的内容会被完全重新处理。这可能会打破 “精确一次” 的语义,因为在文件末尾追加数据将导致重新处理文件的所有内容。

        (2)如果 watchType 设置为 FileProcessingMode.PROCESS_ONCE,source 扫描一次路径然后退出,无需等待 reader 读完文件内容。当然,reader 会继续读取数据,直到所有文件内容都读完。关闭 source 会导致在那之后不再有检查点。这可能会导致节点故障后恢复速度变慢,因为作业将从最后一个检查点恢复读取。

        3.6 支持读取的文件形式

        1.本地文件

        2.HDFS文件

        3.文件夹

        4.压缩文件

        04 源码实战demo

        4.1 pom.xml依赖

        
        
            4.0.0
            com.xsy
            aurora_flink
            1.0-SNAPSHOT
            
            
                
                11
                
                3.8.1
                
                UTF-8
                
                UTF-8
                
                1.2.75
                
                2.17.1
                
                1.18.0
                
                2.11
                
                2.17.1
            
            
            
                
                
                    com.alibaba
                    fastjson
                    ${fastjson.version}
                
                
                
                    org.apache.flink
                    flink-java
                    ${flink.version}
                
                
                    org.apache.flink
                    flink-streaming-scala_2.12
                    ${flink.version}
                
                
                
                    org.apache.flink
                    flink-clients
                    ${flink.version}
                
                
                
                
                    org.apache.logging.log4j
                    log4j-slf4j-impl
                    ${log4j.version}
                
                
                    org.apache.logging.log4j
                    log4j-api
                    ${log4j.version}
                
                
                    org.apache.logging.log4j
                    log4j-core
                    ${log4j.version}
                
                
            
            
            
                ${project.name}
                
                
                    
                        src/main/resources
                    
                    
                        src/main/java
                        
                            **/*.xml
                        
                    
                
                
                    
                        org.apache.maven.plugins
                        maven-shade-plugin
                        3.1.1
                        
                            
                                package
                                
                                    shade
                                
                                
                                    
                                        
                                            org.apache.flink:force-shading
                                            org.google.code.flindbugs:jar305
                                            org.slf4j:*
                                            org.apache.logging.log4j:*
                                        
                                    
                                    
                                        
                                            *:*
                                            
                                                META-INF/*.SF
                                                META-INF/*.DSA
                                                META-INF/*.RSA
                                            
                                        
                                    
                                    
                                        
                                            org.xsy.sevenhee.flink.TestStreamJob
                                        
                                    
                                
                            
                        
                    
                
                
                
                    
                        
                        
                            org.springframework.boot
                            spring-boot-maven-plugin
                            ${spring.boot.version}
                            
                                true
                                ${project.build.finalName}
                            
                            
                                
                                    
                                        repackage
                                    
                                
                            
                        
                        
                        
                            maven-compiler-plugin
                            ${maven.plugin.version}
                            
                                ${java.version}
                                ${java.version}
                                UTF-8
                                
                                    -parameters
                                
                            
                        
                    
                
            
            
            
                
                    aliyun-repos
                    https://maven.aliyun.com/nexus/content/groups/public/
                    
                        false
                    
                
            
            
            
                
                    aliyun-plugin
                    https://maven.aliyun.com/nexus/content/groups/public/
                    
                        false
                    
                
            
        
        

        4.2 创建文件数据流作业

        package com.aurora.source;
        import org.apache.flink.api.common.RuntimeExecutionMode;
        import org.apache.flink.streaming.api.datastream.DataStreamSource;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
        /**
         * @description flink的文件source应用
         * @author 浅夏的猫
         * @datetime 23:03 2024/1/28
        */
        public class FlinkFileSourceJob {
            private static final Logger logger = LoggerFactory.getLogger(FlinkFileSourceJob.class);
            public static void main(String[] args) throws Exception {
                //1.创建Flink运行环境
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                //2.设置Flink运行模式:
                //STREAMING-流模式,BATCH-批模式,AUTOMATIC-自动模式(根据数据源的边界性来决定使用哪种模式)
                env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
                //3.基于文件的source使用(本地/HDFS文件/文件夹/压缩文件)
                //3.1本地文件
                DataStreamSource dataStreamSourceFile = env.readTextFile("E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties");
                //3.2 HDFS文件,前提你已经搭建环境
        //        DataStreamSource dataStreamSourceHdfs = env.readTextFile("hdfs://localhost:8020//source/application.txt");
                //3.3文件夹
                DataStreamSource dataStreamSourceDir = env.readTextFile("E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources");
                //3.4压缩文件
                DataStreamSource dataStreamSourceRar = env.readTextFile("E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\test.rar");
                //4.输出打印
                dataStreamSourceFile.print();
        //        dataStreamSourceHdfs.print();
                dataStreamSourceDir.print();
                dataStreamSourceRar.print();
                //5.启动运行
                env.execute();
            }
        }
        

        4.3 运行程序查看日志

        【极数系列】Flink集成DataSource读取文件数据(08),在这里插入图片描述,第1张

网友评论

搜索
最新文章
热门文章
热门标签
 
 梦见去世的人复活了是什么预兆  梦见刮胡子刮破流血  爸爸明明活着却梦到他去世