Apache Flink - 快速指南


Apache Flink - 大数据平台

过去 10 年数据的进步是巨大的;这产生了“大数据”这个术语。数据没有固定大小,可以称之为大数据;传统系统(RDBMS)无法处理的任何数据都是大数据。该大数据可以是结构化、半结构化或非结构化格式。最初,数据有三个维度:数量、速度、多样性。现在的维度已经超出了三个V。我们现在添加了其他 V——准确性、有效性、漏洞、价值、可变性等。

大数据导致了多种有助于数据存储和处理的工具和框架的出现。有一些流行的大数据框架,如 Hadoop、Spark、Hive、Pig、Storm 和 Zookeeper。它还提供了在医疗保健、金融、零售、电子商务等多个领域创建下一代产品的机会。

无论是跨国公司还是初创企业,每个人都在利用大数据来存储和处理数据并做出更明智的决策。

Apache Flink - 批处理与实时处理

就大数据而言,有两种类型的处理 -

  • 批量处理
  • 实时处理

基于一段时间内收集的数据的处理称为批处理。例如,银行经理想要处理过去 1 个月的数据(随着时间的推移收集),以了解过去 1 个月内被取消的支票数量。

基于即时数据以获得即时结果的处理称为实时处理。例如,银行经理在欺诈交易(即时结果)发生后立即收到欺诈警报。

下表列出了批处理和实时处理之间的差异 -

批量处理 实时处理

静态文件

事件流

按分钟、小时、天等定期处理。

立即处理

纳秒

磁盘存储上的过去数据

内存存储

示例 - 账单生成

示例 - ATM 交易警报

如今,每个组织都大量使用实时处理。欺诈检测、医疗保健实时警报和网络攻击警报等用例需要实时处理即时数据;即使是几毫秒的延迟也会产生巨大的影响。

对于这种实时用例来说,一种理想的工具是能够以流而非批的形式输入数据的工具。Apache Flink 就是那个实时处理工具。

Apache Flink - 简介

Apache Flink 是一个可以处理流数据的实时处理框架。它是一个开源流处理框架,适用于高性能、可扩展且准确的实时应用程序。它具有真正的流模型,并且不将输入数据作为批处理或微批处理。

Apache Flink 由 Data Artisans 公司创立,现由 Apache Flink Community 在 Apache License 下开发。迄今为止,该社区拥有超过 479 名贡献者和 15500 多项提交。

Apache Flink 上的生态系统

下图显示了 Apache Flink 生态系统的不同层 -

Apache Flink 上的生态系统

贮存

Apache Flink 有多个选项可以读取/写入数据。以下是基本存储列表 -

  • HDFS(Hadoop分布式文件系统)
  • 本地文件系统
  • S3
  • RDBMS(MySQL、Oracle、MS SQL 等)
  • MongoDB
  • 数据库
  • 阿帕奇·卡夫卡
  • 阿帕奇水槽

部署

您可以在本地模式、集群模式或云端部署 Apache Fink。集群模式可以是standalone、YARN、MESOS。

在云上,Flink 可以部署在 AWS 或 GCP 上。

核心

这是运行时层,提供分布式处理、容错、可靠性、本机迭代处理能力等。

API 和库

这是 Apache Flink 的最顶层也是最重要的一层。它具有负责批处理的 Dataset API 和负责流处理的 Datastream API。还有其他库,例如 Flink ML(用于机器学习)、Gelly(用于图形处理)、Tables for SQL。该层为 Apache Flink 提供了多种功能。

Apache Flink - 架构

Apache Flink 工作在 Kappa 架构上。Kappa架构有一个单一的处理器——流,它将所有输入视为流,流引擎实时处理数据。kappa架构中的批量数据是流式传输的一个特例。

下图展示了Apache Flink 架构

Apache Flink 架构

Kappa 架构的关键思想是通过单个流处理引擎处理批量和实时数据。

大多数大数据框架都基于 Lambda 架构,该架构具有用于批处理和流数据的单独处理器。在 Lambda 架构中,批处理视图和流视图有单独的代码库。为了查询和获取结果,需要合并代码库。不维护单独的代码库/视图并将它们合并是一种痛苦,但 Kappa 架构解决了这个问题,因为它只有一个实时视图,因此不需要合并代码库。

这并不意味着 Kappa 架构取代了 Lambda 架构,它完全取决于用例和应用程序来决定哪种架构更可取。

下图展示了 Apache Flink 作业执行架构。

执行架构

程序

它是一段在 Flink 集群上运行的代码。

客户

它负责获取代码(程序)并构建作业数据流图,然后将其传递给JobManager。它还检索作业结果。

工作经理

从Client接收到Job Dataflow Graph后,它负责创建执行图。它将作业分配给集群中的TaskManager并监督作业的执行。

任务管理器

它负责执行JobManager分配的所有任务。所有任务管理器以指定的并行度在各自的插槽中运行任务。它负责将任务的状态发送给JobManager。

Apache Flink 的特点

Apache Flink 的特点如下:

  • 它有一个流处理器,可以运行批处理和流程序。

  • 它可以以闪电般的速度处理数据。

  • API 可用于 Java、Scala 和 Python。

  • 提供所有常用操作的API,方便程序员使用。

  • 以低延迟(纳秒)和高吞吐量处理数据。

  • 它的容错能力。如果节点、应用程序或硬件出现故障,不会影响集群。

  • 可以轻松与Apache Hadoop、Apache MapReduce、Apache Spark、HBase等大数据工具集成。

  • 可以定制内存管理以实现更好的计算。

  • 它具有高度可扩展性,可以扩展到集群中的数千个节点。

  • Apache Flink 中的窗口化非常灵活。

  • 提供图形处理、机器学习、复杂事件处理库。

Apache Flink - 系统要求

以下是下载和运行 Apache Flink 的系统要求 -

推荐操作系统

  • 微软Windows 10
  • Ubuntu 16.04 LTS
  • 苹果 macOS 10.13/High Sierra

内存要求

  • 内存 - 最低 4 GB,建议 8 GB
  • 存储空间 - 30 GB

注意- Java 8 必须在已设置环境变量的情况下可用。

Apache Flink - 设置/安装

在开始设置/安装 Apache Flink 之前,让我们检查系统中是否安装了 Java 8。

Java版本

安装1

我们现在将继续下载 Apache Flink。

wget http://mirrors.estointernet.in/apache/flink/flink-1.7.1/flink-1.7.1-bin-scala_2.11.tgz
安装2

现在,解压缩 tar 文件。

tar -xzf flink-1.7.1-bin-scala_2.11.tgz
安装3

进入Flink的主目录。

cd flink-1.7.1/

启动 Flink 集群。

./bin/start-cluster.sh
安装4

打开 Mozilla 浏览器并访问以下 URL,它将打开 Flink Web Dashboard。

http://本地主机:8081

这就是 Apache Flink Dashboard 的用户界面的样子。

Flink集群

现在Flink集群已经启动并运行。

Apache Flink - API 概念

Flink 拥有丰富的 API,开发人员可以使用这些 API 对批量数据和实时数据执行转换。各种转换包括映射、过滤、排序、连接、分组和聚合。Apache Flink 的这些转换是在分布式数据上执行的。让我们讨论 Apache Flink 提供的不同 API。

数据集API

Apache Flink 中的 Dataset API 用于对一段时间内的数据进行批量操作。该 API 可以在 Java、Scala 和 Python 中使用。它可以对数据集应用不同类型的转换,例如过滤、映射、聚合、连接和分组。

数据集是从本地文件等源创建的,或者通过从特定源读取文件来创建的,结果数据可以写入不同的接收器(如分布式文件或命令行终端)。Java 和 Scala 编程语言都支持此 API。

这是 Dataset API 的 Wordcount 程序 -

public class WordCountProg {
   public static void main(String[] args) throws Exception {
      final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      DataSet<String> text = env.fromElements(
      "Hello",
      "My Dataset API Flink Program");

      DataSet<Tuple2<String, Integer>> wordCounts = text
      .flatMap(new LineSplitter())
      .groupBy(0)
      .sum(1);

      wordCounts.print();
   }

   public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
      @Override
      public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
         for (String word : line.split(" ")) {
            out.collect(new Tuple2<String, Integer>(word, 1));
         }
      }
   }
}

数据流API

该API用于处理连续流中的数据。您可以对流数据执行各种操作,例如过滤、映射、加窗、聚合。该数据流有多种来源,例如消息队列、文件、套接字流,并且结果数据可以写入不同的接收器(例如命令行终端)。Java 和 Scala 编程语言都支持此 API。

这是 DataStream API 的流式字数统计程序,其中有连续的字数统计流,并且数据在第二个窗口中分组。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WindowWordCountProg {
   public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      DataStream<Tuple2<String, Integer>> dataStream = env
      .socketTextStream("localhost", 9999)
      .flatMap(new Splitter())
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1);
      dataStream.print();
      env.execute("Streaming WordCount Example");
   }
   public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
      @Override
      public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
         for (String word: sentence.split(" ")) {
            out.collect(new Tuple2<String, Integer>(word, 1));
         }
      }
   }
}

Apache Flink - 表 API 和 SQL

Table API 是一种类似 SQL 表达式语言的关系 API。该API可以进行批处理和流处理。它可以嵌入 Java 和 Scala 数据集和数据流 API。您可以从现有数据集和数据流或外部数据源创建表。通过这个关系API,您可以执行连接、聚合、选择和过滤等操作。无论输入是批处理还是流,查询的语义都保持不变。

这是一个示例 Table API 程序 -

// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// create a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// register a Table
tableEnv.registerTable("table1", ...) // or
tableEnv.registerTableSource("table2", ...) // or
tableEnv.registerExternalCatalog("extCat", ...)

// register an output Table
tableEnv.registerTableSink("outputTable", ...);
// create a Table from a Table API query
val tapiResult = tableEnv.scan("table1").select(...)
// Create a Table from a SQL query
val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ...")

// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.insertInto("outputTable")

// execute
env.execute()

Apache Flink - 创建 Flink 应用程序

在本章中,我们将学习如何创建 Flink 应用程序。

打开 Eclipse IDE,单击“新建项目”并选择“Java 项目”。

创建 Flink 应用程序

给出项目名称并单击“完成”。

创建 Flink 应用程序2

现在,单击“完成”,如以下屏幕截图所示。

创建 Flink 应用程序3

现在,右键单击src并转到“新建>>类”。

创建 Flink 应用程序4

提供类名称并单击“完成”。

创建 Flink 应用程序5

将以下代码复制并粘贴到编辑器中。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;
public class WordCount {

   // *************************************************************************
   // PROGRAM
   // *************************************************************************
   public static void main(String[] args) throws Exception {
      final ParameterTool params = ParameterTool.fromArgs(args);
      // set up the execution environment
      final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
      // make parameters available in the web interface
      env.getConfig().setGlobalJobParameters(params);
      // get input data
      DataSet<String> text = env.readTextFile(params.get("input"));
      DataSet<Tuple2<String, Integer>> counts =
      // split up the lines in pairs (2-tuples) containing: (word,1)
      text.flatMap(new Tokenizer())
      // group by the tuple field "0" and sum up tuple field "1"
      .groupBy(0)
      .sum(1);
      // emit result
      if (params.has("output")) {
         counts.writeAsCsv(params.get("output"), "\n", " ");
         // execute program
         env.execute("WordCount Example");
      } else {
         System.out.println("Printing result to stdout. Use --output to specify output path.");
         counts.print();
      }
   }
   
   // *************************************************************************
   // USER FUNCTIONS
   // *************************************************************************
   public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
      public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
         // normalize and split the line
         String[] tokens = value.toLowerCase().split("\\W+");
         // emit the pairs
         for (String token : tokens) {
            if (token.length() > 0) {
               out.collect(new Tuple2<>(token, 1));
            }
         }
      }
   }
}

您将在编辑器中看到许多错误,因为需要将 Flink 库添加到该项目中。

添加 Flink 库

右键单击项目>>构建路径>>配置构建路径。

右键单击项目

选择“库”选项卡并单击“添加外部 JAR”。

选择图书馆

进入 Flink 的 lib 目录,选择所有 4 个库,然后单击“确定”。

Flinks lib目录

转到“订购和导出”选项卡,选择所有库,然后单击“确定”。

订单和导出选项卡

您将看到错误不再存在。

现在,让我们导出这个应用程序。右键单击该项目,然后单击“导出”。

导出此应用程序

选择 JAR 文件并单击下一步

选择 JAR 文件

指定目标路径并单击“下一步”

目的地路径

单击下一步>

点击下一步

单击“浏览”,选择主类 (WordCount),然后单击“完成”。

单击“完成”

注意- 如果您收到任何警告,请单击“确定”。

运行以下命令。它将进一步运行您刚刚创建的 Flink 应用程序。

./bin/flink run /home/ubuntu/wordcount.jar --input README.txt --output /home/ubuntu/output
收到警告

Apache Flink - 运行 Flink 程序

在本章中,我们将学习如何运行 Flink 程序。

让我们在 Flink 集群上运行 Flink wordcount 示例。

转到 Flink 的主目录并在终端中运行以下命令。

bin/flink run examples/batch/WordCount.jar -input README.txt -output /home/ubuntu/flink-1.7.1/output.txt
Flink 主目录

转到 Flink 仪表板,您将能够看到已完成的作业及其详细信息。

Flink 仪表板

如果您单击已完成的作业,您将获得作业的详细概述。

单击已完成的作业

要检查 wordcount 程序的输出,请在终端中运行以下命令。

cat output.txt
输出字数统计程序

Apache Flink - 库

在本章中,我们将了解 Apache Flink 的不同库。

复杂事件处理 (CEP)

FlinkCEP 是 Apache Flink 中的一个 API,它分析连续流数据的事件模式。这些事件接近实时,具有高吞吐量和低延迟。该 API 主要用于传感器数据,这些数据是实时的,处理起来非常复杂。

CEP 分析输入流的模式并很快给出结果。它能够在事件模式复杂的情况下提供实时通知和警报。FlinkCEP 可以连接到不同类型的输入源并分析其中的模式。

使用 CEP 的示例架构如下所示 -

具有 CEP 的架构

传感器数据将来自不同的来源,Kafka 将充当分布式消息传递框架,它将流分发到 Apache Flink,FlinkCEP 将分析复杂的事件模式。

您可以使用 Pattern API 在 Apache Flink 中编写程序来进行复杂的事件处理。它允许您决定从连续流数据中检测的事件模式。以下是一些最常用的 CEP 模式 -

开始

它用于定义起始状态。以下程序显示了如何在 Flink 程序中定义它 -

Pattern<Event, ?> next = start.next("next");

在哪里

用于定义当前状态下的过滤条件。

patternState.where(new FilterFunction <Event>() {  
   @Override 
      public boolean filter(Event value) throws Exception { 
   } 
});

下一个

它用于附加新的模式状态以及传递先前模式所需的匹配事件。

Pattern<Event, ?> next = start.next("next");

其次是

它用于附加新的模式状态,但这里其他事件可以在两个匹配事件之间发生。

Pattern<Event, ?> followedBy = start.followedBy("next");

杰利

Apache Flink 的图形 API 是 Gelly。Gelly 用于使用一组方法和实用程序对 Flink 应用程序执行图形分析。您可以通过 Gelly 以分布式方式使用 Apache Flink API 来分析巨大的图表。还有其他图形库(例如 Apache Giraph)用于相同目的,但由于 Gelly 是在 Apache Flink 之上使用的,因此它使用单一 API。从开发和运营的角度来看,这非常有帮助。

让我们使用 Apache Flink API 运行一个示例 - Gelly。

首先,您需要将 Apache Flink 的 opt 目录中的 2 个 Gelly jar 文件复制到其 lib 目录中。然后运行 ​​flink-gelly-examples jar。

cp opt/flink-gelly* lib/ 
./bin/flink run examples/gelly/flink-gelly-examples_*.jar 
杰利

现在让我们运行 PageRank 示例。

PageRank 计算每个顶点的分数,它是通过内边传输的 PageRank 分数的总和。每个顶点的分数在出边之间平均分配。高分顶点与其他高分顶点链接。

结果包含顶点 ID 和 PageRank 分数。

usage: flink run examples/flink-gelly-examples_<version>.jar --algorithm PageRank [algorithm options] --input <input> [input options] --output <output> [output options] 

./bin/flink run examples/gelly/flink-gelly-examples_*.jar --algorithm PageRank --input CycleGraph --vertex_count 2 --output Print 
PageRank 分数

Apache Flink - 机器学习

Apache Flink 的机器学习库称为 FlinkML。由于机器学习的使用在过去 5 年中呈指数级增长,Flink 社区决定将这个机器学习 APO 也添加到其生态系统中。FlinkML 中的贡献者和算法列表正在不断增加。该 API 尚未成为二进制发行版的一部分。

这是使用 FlinkML 的线性回归的示例 -

// LabeledVector is a feature vector with a label (class or real value)
val trainingData: DataSet[LabeledVector] = ...
val testingData: DataSet[Vector] = ...

// Alternatively, a Splitter is used to break up a DataSet into training and testing data.
val dataSet: DataSet[LabeledVector] = ...
val trainTestData: DataSet[TrainTestDataSet] = Splitter.trainTestSplit(dataSet)
val trainingData: DataSet[LabeledVector] = trainTestData.training
val testingData: DataSet[Vector] = trainTestData.testing.map(lv => lv.vector)
val mlr = MultipleLinearRegression()

.setStepsize(1.0)
.setIterations(100)
.setConvergenceThreshold(0.001)
mlr.fit(trainingData)

// The fitted model can now be used to make predictions
val predictions: DataSet[LabeledVector] = mlr.predict(testingData)

在flink-1.7.1/examples/batch/路径中,您将找到 KMeans.jar 文件。让我们运行这个 FlinkML 示例。

该示例程序使用默认点和质心数据集运行。

./bin/flink run examples/batch/KMeans.jar --output Print
质心数据集

Apache Flink - 用例

在本章中,我们将了解 Apache Flink 中的一些测试用例。

Apache Flink - 布伊格电信

布伊格电信是法国最大的电信组织之一。它拥有 11+000000 移动用户和 25+000000 固定客户。Bouygues 第一次听说 Apache Flink 是在巴黎举行的 Hadoop 小组会议上。从那时起,他们一直在多个用例中使用 Flink。他们每天通过 Apache Flink 实时处理数十亿条消息。

Bouygues 对于 Apache Flink 的评价是这样的:“我们最终选择了 Flink,因为该系统支持真正的流式传输 - 无论是在 API 层面还是在运行时层面,为我们提供了我们所寻求的可编程性和低延迟。此外,与其他解决方案相比,我们能够在更短的时间内使用 Flink 启动并运行我们的系统,从而获得更多可用的开发人员资源来扩展系统中的业务逻辑。”

在布伊格,客户体验是重中之重。他们实时分析数据,以便可以向工程师提供以下见解 -

  • 通过网络提供实时客户体验

  • 全球网络上正在发生的事情

  • 网络评估和运营

他们创建了一个名为 LUX(记录用户体验)的系统,该系统通过内部数据参考处理来自网络设备的大量日志数据,提供体验质量指标,记录客户体验并构建警报功能,以检测 60 年内数据消耗的任何故障。秒。

为了实现这一目标,他们需要一个能够实时获取大量数据、易于设置并提供丰富的 API 集来处理流数据的框架。Apache Flink 非常适合 Bouygues Telecom。

Apache Flink - 阿里巴巴

阿里巴巴是全球最大的电子商务零售公司,2015年收入为3940亿美元。阿里巴巴搜索是所有客户的入口点,它显示所有搜索并相应推荐。

阿里巴巴在其搜索引擎中使用 Apache Flink 实时显示结果,为每个用户提供最高的准确性和相关性。

阿里巴巴正在寻找一个框架,它是 -

  • 非常敏捷地为整个搜索基础架构流程维护一个代码库。

  • 为网站上产品的可用性更改提供低延迟。

  • 一致且具有成本效益。

Apache Flink 满足上述所有要求。他们需要一个框架,该框架具有单个处理引擎,并且可以使用同一引擎处理批处理和流数据,这就是 Apache Flink 所做的。

他们还使用 Blink(Flink 的分叉版本)来满足搜索的一些独特要求。他们还使用 Apache Flink 的 Table API,对搜索进行了一些改进。

阿里巴巴对 apache Flink 的评价是这样的:“回顾过去,对于 Blink 和 Flink 在阿里巴巴来说无疑是非常重要的一年。没有人想到我们会在一年内取得如此大的进步,我们非常感谢大家在社区中为我们提供帮助的人们。Flink 已被证明可以在非常大的范围内发挥作用。我们比以往任何时候都更加致力于继续与社区合作,推动 Flink 向前发展!

Apache Flink - Flink、Spark、Hadoop

这是一个综合表格,显示了三种最流行的大数据框架之间的比较:Apache Flink、Apache Spark 和 Apache Hadoop。

阿帕奇Hadoop 阿帕奇火花 阿帕奇弗林克

原产地年份

2005年 2009年 2009年

发源地

MapReduce (谷歌) Hadoop (雅虎) 加州大学伯克利分校 柏林工业大学

数据处理引擎

溪流

处理速度

比 Spark 和 Flink 慢 比 Hadoop 快 100 倍 比火花还快

编程语言

Java、C、C++、Ruby、Groovy、Perl、Python Java、Scala、Python 和 R Java 和 Scala

编程模型

映射减少 弹性分布式数据集 (RDD) 循环数据流

数据传输

流水线和批量

内存管理

基于磁盘 JVM 管理 主动管理

潜伏

低的 中等的 低的

吞吐量

中等的 高的 高的

优化

手动的 手动的 自动的

应用程序编程接口

低级 高水平 高水平

流媒体支持

不适用 火花流 Flink 流式传输

SQL支持

蜂巢、黑斑羚 SparkSQL 表 API 和 SQL

图表支持

不适用 图X 杰利

机器学习支持

不适用 SparkML FlinkML

Apache Flink - 结论

我们在上一章中看到的比较表几乎总结了这些指针。Apache Flink 是最适合实时处理和用例的框架。其单一引擎系统是独一无二的,可以使用不同的 API(例如 Dataset 和 DataStream)处理批处理和流数据。

这并不意味着 Hadoop 和 Spark 被淘汰,最适合的大数据框架的选择始终取决于用例,并且因用例而异。Hadoop 和 Flink 或 Spark 和 Flink 的组合可能适合多种用例。

尽管如此,Flink 是目前最好的实时处理框架。Apache Flink 的发展令人惊叹,其社区的贡献者数量与日俱增。

快乐闪烁!