- Apache Flink 教程
- Apache Flink - 主页
- Apache Flink - 大数据平台
- 批处理与实时处理
- Apache Flink - 简介
- Apache Flink - 架构
- Apache Flink - 系统要求
- Apache Flink - 设置/安装
- Apache Flink - API 概念
- Apache Flink - 表 API 和 SQL
- 创建 Flink 应用程序
- Apache Flink - 运行 Flink 程序
- Apache Flink - 库
- Apache Flink - 机器学习
- Apache Flink - 用例
- Apache Flink - Flink、Spark、Hadoop
- Apache Flink - 结论
- Apache Flink 资源
- Apache Flink - 快速指南
- Apache Flink - 有用的资源
- Apache Flink - 讨论
Apache Flink - 快速指南
Apache Flink - 大数据平台
过去 10 年数据的进步是巨大的;这产生了“大数据”这个术语。数据没有固定大小,可以称之为大数据;传统系统(RDBMS)无法处理的任何数据都是大数据。该大数据可以是结构化、半结构化或非结构化格式。最初,数据有三个维度:数量、速度、多样性。现在的维度已经超出了三个V。我们现在添加了其他 V——准确性、有效性、漏洞、价值、可变性等。
大数据导致了多种有助于数据存储和处理的工具和框架的出现。有一些流行的大数据框架,如 Hadoop、Spark、Hive、Pig、Storm 和 Zookeeper。它还提供了在医疗保健、金融、零售、电子商务等多个领域创建下一代产品的机会。
无论是跨国公司还是初创企业,每个人都在利用大数据来存储和处理数据并做出更明智的决策。
Apache Flink - 批处理与实时处理
就大数据而言,有两种类型的处理 -
- 批量处理
- 实时处理
基于一段时间内收集的数据的处理称为批处理。例如,银行经理想要处理过去 1 个月的数据(随着时间的推移收集),以了解过去 1 个月内被取消的支票数量。
基于即时数据以获得即时结果的处理称为实时处理。例如,银行经理在欺诈交易(即时结果)发生后立即收到欺诈警报。
下表列出了批处理和实时处理之间的差异 -
批量处理 | 实时处理 |
---|---|
静态文件 |
事件流 |
按分钟、小时、天等定期处理。 |
立即处理 纳秒 |
磁盘存储上的过去数据 |
内存存储 |
示例 - 账单生成 |
示例 - ATM 交易警报 |
如今,每个组织都大量使用实时处理。欺诈检测、医疗保健实时警报和网络攻击警报等用例需要实时处理即时数据;即使是几毫秒的延迟也会产生巨大的影响。
对于这种实时用例来说,一种理想的工具是能够以流而非批的形式输入数据的工具。Apache Flink 就是那个实时处理工具。
Apache Flink - 简介
Apache Flink 是一个可以处理流数据的实时处理框架。它是一个开源流处理框架,适用于高性能、可扩展且准确的实时应用程序。它具有真正的流模型,并且不将输入数据作为批处理或微批处理。
Apache Flink 由 Data Artisans 公司创立,现由 Apache Flink Community 在 Apache License 下开发。迄今为止,该社区拥有超过 479 名贡献者和 15500 多项提交。
Apache Flink 上的生态系统
下图显示了 Apache Flink 生态系统的不同层 -
贮存
Apache Flink 有多个选项可以读取/写入数据。以下是基本存储列表 -
- HDFS(Hadoop分布式文件系统)
- 本地文件系统
- S3
- RDBMS(MySQL、Oracle、MS SQL 等)
- MongoDB
- 数据库
- 阿帕奇·卡夫卡
- 阿帕奇水槽
部署
您可以在本地模式、集群模式或云端部署 Apache Fink。集群模式可以是standalone、YARN、MESOS。
在云上,Flink 可以部署在 AWS 或 GCP 上。
核心
这是运行时层,提供分布式处理、容错、可靠性、本机迭代处理能力等。
API 和库
这是 Apache Flink 的最顶层也是最重要的一层。它具有负责批处理的 Dataset API 和负责流处理的 Datastream API。还有其他库,例如 Flink ML(用于机器学习)、Gelly(用于图形处理)、Tables for SQL。该层为 Apache Flink 提供了多种功能。
Apache Flink - 架构
Apache Flink 工作在 Kappa 架构上。Kappa架构有一个单一的处理器——流,它将所有输入视为流,流引擎实时处理数据。kappa架构中的批量数据是流式传输的一个特例。
下图展示了Apache Flink 架构。
Kappa 架构的关键思想是通过单个流处理引擎处理批量和实时数据。
大多数大数据框架都基于 Lambda 架构,该架构具有用于批处理和流数据的单独处理器。在 Lambda 架构中,批处理视图和流视图有单独的代码库。为了查询和获取结果,需要合并代码库。不维护单独的代码库/视图并将它们合并是一种痛苦,但 Kappa 架构解决了这个问题,因为它只有一个实时视图,因此不需要合并代码库。
这并不意味着 Kappa 架构取代了 Lambda 架构,它完全取决于用例和应用程序来决定哪种架构更可取。
下图展示了 Apache Flink 作业执行架构。
程序
它是一段在 Flink 集群上运行的代码。
客户
它负责获取代码(程序)并构建作业数据流图,然后将其传递给JobManager。它还检索作业结果。
工作经理
从Client接收到Job Dataflow Graph后,它负责创建执行图。它将作业分配给集群中的TaskManager并监督作业的执行。
任务管理器
它负责执行JobManager分配的所有任务。所有任务管理器以指定的并行度在各自的插槽中运行任务。它负责将任务的状态发送给JobManager。
Apache Flink 的特点
Apache Flink 的特点如下:
它有一个流处理器,可以运行批处理和流程序。
它可以以闪电般的速度处理数据。
API 可用于 Java、Scala 和 Python。
提供所有常用操作的API,方便程序员使用。
以低延迟(纳秒)和高吞吐量处理数据。
它的容错能力。如果节点、应用程序或硬件出现故障,不会影响集群。
可以轻松与Apache Hadoop、Apache MapReduce、Apache Spark、HBase等大数据工具集成。
可以定制内存管理以实现更好的计算。
它具有高度可扩展性,可以扩展到集群中的数千个节点。
Apache Flink 中的窗口化非常灵活。
提供图形处理、机器学习、复杂事件处理库。
Apache Flink - 系统要求
以下是下载和运行 Apache Flink 的系统要求 -
推荐操作系统
- 微软Windows 10
- Ubuntu 16.04 LTS
- 苹果 macOS 10.13/High Sierra
内存要求
- 内存 - 最低 4 GB,建议 8 GB
- 存储空间 - 30 GB
注意- Java 8 必须在已设置环境变量的情况下可用。
Apache Flink - 设置/安装
在开始设置/安装 Apache Flink 之前,让我们检查系统中是否安装了 Java 8。
Java版本
我们现在将继续下载 Apache Flink。
wget http://mirrors.estointernet.in/apache/flink/flink-1.7.1/flink-1.7.1-bin-scala_2.11.tgz
现在,解压缩 tar 文件。
tar -xzf flink-1.7.1-bin-scala_2.11.tgz
进入Flink的主目录。
cd flink-1.7.1/
启动 Flink 集群。
./bin/start-cluster.sh
打开 Mozilla 浏览器并访问以下 URL,它将打开 Flink Web Dashboard。
http://本地主机:8081
这就是 Apache Flink Dashboard 的用户界面的样子。
现在Flink集群已经启动并运行。
Apache Flink - API 概念
Flink 拥有丰富的 API,开发人员可以使用这些 API 对批量数据和实时数据执行转换。各种转换包括映射、过滤、排序、连接、分组和聚合。Apache Flink 的这些转换是在分布式数据上执行的。让我们讨论 Apache Flink 提供的不同 API。
数据集API
Apache Flink 中的 Dataset API 用于对一段时间内的数据进行批量操作。该 API 可以在 Java、Scala 和 Python 中使用。它可以对数据集应用不同类型的转换,例如过滤、映射、聚合、连接和分组。
数据集是从本地文件等源创建的,或者通过从特定源读取文件来创建的,结果数据可以写入不同的接收器(如分布式文件或命令行终端)。Java 和 Scala 编程语言都支持此 API。
这是 Dataset API 的 Wordcount 程序 -
public class WordCountProg { public static void main(String[] args) throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<String> text = env.fromElements( "Hello", "My Dataset API Flink Program"); DataSet<Tuple2<String, Integer>> wordCounts = text .flatMap(new LineSplitter()) .groupBy(0) .sum(1); wordCounts.print(); } public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String line, Collector<Tuple2<String, Integer>> out) { for (String word : line.split(" ")) { out.collect(new Tuple2<String, Integer>(word, 1)); } } } }
数据流API
该API用于处理连续流中的数据。您可以对流数据执行各种操作,例如过滤、映射、加窗、聚合。该数据流有多种来源,例如消息队列、文件、套接字流,并且结果数据可以写入不同的接收器(例如命令行终端)。Java 和 Scala 编程语言都支持此 API。
这是 DataStream API 的流式字数统计程序,其中有连续的字数统计流,并且数据在第二个窗口中分组。
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; public class WindowWordCountProg { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple2<String, Integer>> dataStream = env .socketTextStream("localhost", 9999) .flatMap(new Splitter()) .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1); dataStream.print(); env.execute("Streaming WordCount Example"); } public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception { for (String word: sentence.split(" ")) { out.collect(new Tuple2<String, Integer>(word, 1)); } } } }
Apache Flink - 表 API 和 SQL
Table API 是一种类似 SQL 表达式语言的关系 API。该API可以进行批处理和流处理。它可以嵌入 Java 和 Scala 数据集和数据流 API。您可以从现有数据集和数据流或外部数据源创建表。通过这个关系API,您可以执行连接、聚合、选择和过滤等操作。无论输入是批处理还是流,查询的语义都保持不变。
这是一个示例 Table API 程序 -
// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment val env = StreamExecutionEnvironment.getExecutionEnvironment // create a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // register a Table tableEnv.registerTable("table1", ...) // or tableEnv.registerTableSource("table2", ...) // or tableEnv.registerExternalCatalog("extCat", ...) // register an output Table tableEnv.registerTableSink("outputTable", ...); // create a Table from a Table API query val tapiResult = tableEnv.scan("table1").select(...) // Create a Table from a SQL query val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ...") // emit a Table API result Table to a TableSink, same for SQL result tapiResult.insertInto("outputTable") // execute env.execute()
Apache Flink - 创建 Flink 应用程序
在本章中,我们将学习如何创建 Flink 应用程序。
打开 Eclipse IDE,单击“新建项目”并选择“Java 项目”。
给出项目名称并单击“完成”。
现在,单击“完成”,如以下屏幕截图所示。
现在,右键单击src并转到“新建>>类”。
提供类名称并单击“完成”。
将以下代码复制并粘贴到编辑器中。
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.util.Collector; public class WordCount { // ************************************************************************* // PROGRAM // ************************************************************************* public static void main(String[] args) throws Exception { final ParameterTool params = ParameterTool.fromArgs(args); // set up the execution environment final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // make parameters available in the web interface env.getConfig().setGlobalJobParameters(params); // get input data DataSet<String> text = env.readTextFile(params.get("input")); DataSet<Tuple2<String, Integer>> counts = // split up the lines in pairs (2-tuples) containing: (word,1) text.flatMap(new Tokenizer()) // group by the tuple field "0" and sum up tuple field "1" .groupBy(0) .sum(1); // emit result if (params.has("output")) { counts.writeAsCsv(params.get("output"), "\n", " "); // execute program env.execute("WordCount Example"); } else { System.out.println("Printing result to stdout. Use --output to specify output path."); counts.print(); } } // ************************************************************************* // USER FUNCTIONS // ************************************************************************* public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { // normalize and split the line String[] tokens = value.toLowerCase().split("\\W+"); // emit the pairs for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2<>(token, 1)); } } } } }
您将在编辑器中看到许多错误,因为需要将 Flink 库添加到该项目中。
右键单击项目>>构建路径>>配置构建路径。
选择“库”选项卡并单击“添加外部 JAR”。
进入 Flink 的 lib 目录,选择所有 4 个库,然后单击“确定”。
转到“订购和导出”选项卡,选择所有库,然后单击“确定”。
您将看到错误不再存在。
现在,让我们导出这个应用程序。右键单击该项目,然后单击“导出”。
选择 JAR 文件并单击下一步
指定目标路径并单击“下一步”
单击下一步>
单击“浏览”,选择主类 (WordCount),然后单击“完成”。
注意- 如果您收到任何警告,请单击“确定”。
运行以下命令。它将进一步运行您刚刚创建的 Flink 应用程序。
./bin/flink run /home/ubuntu/wordcount.jar --input README.txt --output /home/ubuntu/output
Apache Flink - 运行 Flink 程序
在本章中,我们将学习如何运行 Flink 程序。
让我们在 Flink 集群上运行 Flink wordcount 示例。
转到 Flink 的主目录并在终端中运行以下命令。
bin/flink run examples/batch/WordCount.jar -input README.txt -output /home/ubuntu/flink-1.7.1/output.txt
转到 Flink 仪表板,您将能够看到已完成的作业及其详细信息。
如果您单击已完成的作业,您将获得作业的详细概述。
要检查 wordcount 程序的输出,请在终端中运行以下命令。
cat output.txt
Apache Flink - 库
在本章中,我们将了解 Apache Flink 的不同库。
复杂事件处理 (CEP)
FlinkCEP 是 Apache Flink 中的一个 API,它分析连续流数据的事件模式。这些事件接近实时,具有高吞吐量和低延迟。该 API 主要用于传感器数据,这些数据是实时的,处理起来非常复杂。
CEP 分析输入流的模式并很快给出结果。它能够在事件模式复杂的情况下提供实时通知和警报。FlinkCEP 可以连接到不同类型的输入源并分析其中的模式。
使用 CEP 的示例架构如下所示 -
传感器数据将来自不同的来源,Kafka 将充当分布式消息传递框架,它将流分发到 Apache Flink,FlinkCEP 将分析复杂的事件模式。
您可以使用 Pattern API 在 Apache Flink 中编写程序来进行复杂的事件处理。它允许您决定从连续流数据中检测的事件模式。以下是一些最常用的 CEP 模式 -
开始
它用于定义起始状态。以下程序显示了如何在 Flink 程序中定义它 -
Pattern<Event, ?> next = start.next("next");
在哪里
用于定义当前状态下的过滤条件。
patternState.where(new FilterFunction <Event>() { @Override public boolean filter(Event value) throws Exception { } });
下一个
它用于附加新的模式状态以及传递先前模式所需的匹配事件。
Pattern<Event, ?> next = start.next("next");
其次是
它用于附加新的模式状态,但这里其他事件可以在两个匹配事件之间发生。
Pattern<Event, ?> followedBy = start.followedBy("next");
杰利
Apache Flink 的图形 API 是 Gelly。Gelly 用于使用一组方法和实用程序对 Flink 应用程序执行图形分析。您可以通过 Gelly 以分布式方式使用 Apache Flink API 来分析巨大的图表。还有其他图形库(例如 Apache Giraph)用于相同目的,但由于 Gelly 是在 Apache Flink 之上使用的,因此它使用单一 API。从开发和运营的角度来看,这非常有帮助。
让我们使用 Apache Flink API 运行一个示例 - Gelly。
首先,您需要将 Apache Flink 的 opt 目录中的 2 个 Gelly jar 文件复制到其 lib 目录中。然后运行 flink-gelly-examples jar。
cp opt/flink-gelly* lib/ ./bin/flink run examples/gelly/flink-gelly-examples_*.jar
现在让我们运行 PageRank 示例。
PageRank 计算每个顶点的分数,它是通过内边传输的 PageRank 分数的总和。每个顶点的分数在出边之间平均分配。高分顶点与其他高分顶点链接。
结果包含顶点 ID 和 PageRank 分数。
usage: flink run examples/flink-gelly-examples_<version>.jar --algorithm PageRank [algorithm options] --input <input> [input options] --output <output> [output options] ./bin/flink run examples/gelly/flink-gelly-examples_*.jar --algorithm PageRank --input CycleGraph --vertex_count 2 --output Print
Apache Flink - 机器学习
Apache Flink 的机器学习库称为 FlinkML。由于机器学习的使用在过去 5 年中呈指数级增长,Flink 社区决定将这个机器学习 APO 也添加到其生态系统中。FlinkML 中的贡献者和算法列表正在不断增加。该 API 尚未成为二进制发行版的一部分。
这是使用 FlinkML 的线性回归的示例 -
// LabeledVector is a feature vector with a label (class or real value) val trainingData: DataSet[LabeledVector] = ... val testingData: DataSet[Vector] = ... // Alternatively, a Splitter is used to break up a DataSet into training and testing data. val dataSet: DataSet[LabeledVector] = ... val trainTestData: DataSet[TrainTestDataSet] = Splitter.trainTestSplit(dataSet) val trainingData: DataSet[LabeledVector] = trainTestData.training val testingData: DataSet[Vector] = trainTestData.testing.map(lv => lv.vector) val mlr = MultipleLinearRegression() .setStepsize(1.0) .setIterations(100) .setConvergenceThreshold(0.001) mlr.fit(trainingData) // The fitted model can now be used to make predictions val predictions: DataSet[LabeledVector] = mlr.predict(testingData)
在flink-1.7.1/examples/batch/路径中,您将找到 KMeans.jar 文件。让我们运行这个 FlinkML 示例。
该示例程序使用默认点和质心数据集运行。
./bin/flink run examples/batch/KMeans.jar --output Print
Apache Flink - 用例
在本章中,我们将了解 Apache Flink 中的一些测试用例。
Apache Flink - 布伊格电信
布伊格电信是法国最大的电信组织之一。它拥有 11+000000 移动用户和 25+000000 固定客户。Bouygues 第一次听说 Apache Flink 是在巴黎举行的 Hadoop 小组会议上。从那时起,他们一直在多个用例中使用 Flink。他们每天通过 Apache Flink 实时处理数十亿条消息。
Bouygues 对于 Apache Flink 的评价是这样的:“我们最终选择了 Flink,因为该系统支持真正的流式传输 - 无论是在 API 层面还是在运行时层面,为我们提供了我们所寻求的可编程性和低延迟。此外,与其他解决方案相比,我们能够在更短的时间内使用 Flink 启动并运行我们的系统,从而获得更多可用的开发人员资源来扩展系统中的业务逻辑。”
在布伊格,客户体验是重中之重。他们实时分析数据,以便可以向工程师提供以下见解 -
通过网络提供实时客户体验
全球网络上正在发生的事情
网络评估和运营
他们创建了一个名为 LUX(记录用户体验)的系统,该系统通过内部数据参考处理来自网络设备的大量日志数据,提供体验质量指标,记录客户体验并构建警报功能,以检测 60 年内数据消耗的任何故障。秒。
为了实现这一目标,他们需要一个能够实时获取大量数据、易于设置并提供丰富的 API 集来处理流数据的框架。Apache Flink 非常适合 Bouygues Telecom。
Apache Flink - 阿里巴巴
阿里巴巴是全球最大的电子商务零售公司,2015年收入为3940亿美元。阿里巴巴搜索是所有客户的入口点,它显示所有搜索并相应推荐。
阿里巴巴在其搜索引擎中使用 Apache Flink 实时显示结果,为每个用户提供最高的准确性和相关性。
阿里巴巴正在寻找一个框架,它是 -
非常敏捷地为整个搜索基础架构流程维护一个代码库。
为网站上产品的可用性更改提供低延迟。
一致且具有成本效益。
Apache Flink 满足上述所有要求。他们需要一个框架,该框架具有单个处理引擎,并且可以使用同一引擎处理批处理和流数据,这就是 Apache Flink 所做的。
他们还使用 Blink(Flink 的分叉版本)来满足搜索的一些独特要求。他们还使用 Apache Flink 的 Table API,对搜索进行了一些改进。
阿里巴巴对 apache Flink 的评价是这样的:“回顾过去,对于 Blink 和 Flink 在阿里巴巴来说无疑是非常重要的一年。没有人想到我们会在一年内取得如此大的进步,我们非常感谢大家在社区中为我们提供帮助的人们。Flink 已被证明可以在非常大的范围内发挥作用。我们比以往任何时候都更加致力于继续与社区合作,推动 Flink 向前发展! ”
Apache Flink - Flink、Spark、Hadoop
这是一个综合表格,显示了三种最流行的大数据框架之间的比较:Apache Flink、Apache Spark 和 Apache Hadoop。
阿帕奇Hadoop | 阿帕奇火花 | 阿帕奇弗林克 | |
---|---|---|---|
原产地年份 |
2005年 | 2009年 | 2009年 |
发源地 |
MapReduce (谷歌) Hadoop (雅虎) | 加州大学伯克利分校 | 柏林工业大学 |
数据处理引擎 |
批 | 批 | 溪流 |
处理速度 |
比 Spark 和 Flink 慢 | 比 Hadoop 快 100 倍 | 比火花还快 |
编程语言 |
Java、C、C++、Ruby、Groovy、Perl、Python | Java、Scala、Python 和 R | Java 和 Scala |
编程模型 |
映射减少 | 弹性分布式数据集 (RDD) | 循环数据流 |
数据传输 |
批 | 批 | 流水线和批量 |
内存管理 |
基于磁盘 | JVM 管理 | 主动管理 |
潜伏 |
低的 | 中等的 | 低的 |
吞吐量 |
中等的 | 高的 | 高的 |
优化 |
手动的 | 手动的 | 自动的 |
应用程序编程接口 |
低级 | 高水平 | 高水平 |
流媒体支持 |
不适用 | 火花流 | Flink 流式传输 |
SQL支持 |
蜂巢、黑斑羚 | SparkSQL | 表 API 和 SQL |
图表支持 |
不适用 | 图X | 杰利 |
机器学习支持 |
不适用 | SparkML | FlinkML |
Apache Flink - 结论
我们在上一章中看到的比较表几乎总结了这些指针。Apache Flink 是最适合实时处理和用例的框架。其单一引擎系统是独一无二的,可以使用不同的 API(例如 Dataset 和 DataStream)处理批处理和流数据。
这并不意味着 Hadoop 和 Spark 被淘汰,最适合的大数据框架的选择始终取决于用例,并且因用例而异。Hadoop 和 Flink 或 Spark 和 Flink 的组合可能适合多种用例。
尽管如此,Flink 是目前最好的实时处理框架。Apache Flink 的发展令人惊叹,其社区的贡献者数量与日俱增。
快乐闪烁!