• Batch 示例
    • 运行一个示例
    • Word Count
    • Page Rank
    • Connected Components(连通组件算法)

    Batch 示例

    以下示例展示了 Flink 从简单的WordCount到图算法的应用。示例代码展示了 Flink’s DataSet API 的使用。

    完整的源代码可以在 Flink 源代码库的 flink-examples-batch 模块找到。

    运行一个示例

    在开始运行一个示例前,我们假设你已经有了 Flink 的运行示例。导航栏中的“快速开始(Quickstart)”和“安装(Setup)” 标签页提供了启动 Flink 的不同方法。

    最简单的方法就是执行 ./bin/start-cluster.sh,从而启动一个只有一个 JobManager 和 TaskManager 的本地 Flink 集群。

    每个 Flink 的 binary release 都会包含一个examples(示例)目录,其中可以找到这个页面上每个示例的 jar 包文件。

    可以通过执行以下命令来运行WordCount 示例:

    1. ./bin/flink run ./examples/batch/WordCount.jar

    其他的示例也可以通过类似的方式执行。

    注意很多示例在不传递执行参数的情况下都会使用内置数据。如果需要利用 WordCount 程序计算真实数据,你需要传递存储数据的文件路径。

    1. ./bin/flink run ./examples/batch/WordCount.jar --input /path/to/some/text/data --output /path/to/result

    注意非本地文件系统需要一个对应前缀,例如 hdfs://

    Word Count

    WordCount 是大数据系统中的 “Hello World”。他可以计算一个文本集合中不同单词的出现频次。这个算法分两步进行: 第一步,把所有文本切割成单独的单词。第二步,把单词分组并分别统计。

    1. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    2. DataSet<String> text = env.readTextFile("/path/to/file");
    3. DataSet<Tuple2<String, Integer>> counts =
    4. // 把每一行文本切割成二元组,每个二元组为: (word,1)
    5. text.flatMap(new Tokenizer())
    6. // 根据二元组的第“0”位分组,然后对第“1”位求和
    7. .groupBy(0)
    8. .sum(1);
    9. counts.writeAsCsv(outputPath, "\n", " ");
    10. // 自定义函数
    11. public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
    12. @Override
    13. public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
    14. // 统一大小写并把每一行切割为单词
    15. String[] tokens = value.toLowerCase().split("\\W+");
    16. // 消费二元组
    17. for (String token : tokens) {
    18. if (token.length() > 0) {
    19. out.collect(new Tuple2<String, Integer>(token, 1));
    20. }
    21. }
    22. }
    23. }

    WordCount 示例增加如下执行参数: —input <path> —output <path>即可实现上述算法。 任何文本文件都可作为测试数据使用。

    1. val env = ExecutionEnvironment.getExecutionEnvironment
    2. // 获取输入数据
    3. val text = env.readTextFile("/path/to/file")
    4. val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
    5. .map { (_, 1) }
    6. .groupBy(0)
    7. .sum(1)
    8. counts.writeAsCsv(outputPath, "\n", " ")

    WordCount 示例增加如下执行参数: —input <path> —output <path>即可实现上述算法。 任何文本文件都可作为测试数据使用。

    Page Rank

    PageRank算法可以计算互联网中一个网页的重要性,这个重要性通过由一个页面指向其他页面的链接定义。PageRank 算法是一个重复执行相同运算的迭代图算法。在每一次迭代中,每个页面把他当前的 rank 值分发给他所有的邻居节点,并且通过他收到邻居节点的 rank 值更新自身的 rank 值。PageRank 算法因 Google 搜索引擎的使用而流行,它根据网页的重要性来对搜索结果进行排名。

    在这个简单的示例中,PageRank 算法由一个批量迭代和一些固定次数的迭代实现。

    1. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    2. // 通过解析一个CSV文件来获取每个页面原始的rank值
    3. DataSet<Tuple2<Long, Double>> pagesWithRanks = env.readCsvFile(pagesInputPath)
    4. .types(Long.class, Double.class)
    5. // 链接被编码为邻接表: (page-id, Array(neighbor-ids))
    6. DataSet<Tuple2<Long, Long[]>> pageLinkLists = getLinksDataSet(env);
    7. // 设置迭代数据集合
    8. IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(maxIterations);
    9. DataSet<Tuple2<Long, Double>> newRanks = iteration
    10. // 为每个页面匹配其对应的出边,并发送rank值
    11. .join(pageLinkLists).where(0).equalTo(0).flatMap(new JoinVertexWithEdgesMatch())
    12. // 收集并计算新的rank值
    13. .groupBy(0).sum(1)
    14. // 施加阻尼系数
    15. .map(new Dampener(DAMPENING_FACTOR, numPages));
    16. DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith(
    17. newRanks,
    18. newRanks.join(iteration).where(0).equalTo(0)
    19. // 结束条件
    20. .filter(new EpsilonFilter()));
    21. finalPageRanks.writeAsCsv(outputPath, "\n", " ");
    22. // 自定义函数
    23. public static final class JoinVertexWithEdgesMatch
    24. implements FlatJoinFunction<Tuple2<Long, Double>, Tuple2<Long, Long[]>,
    25. Tuple2<Long, Double>> {
    26. @Override
    27. public void join(<Tuple2<Long, Double> page, Tuple2<Long, Long[]> adj,
    28. Collector<Tuple2<Long, Double>> out) {
    29. Long[] neighbors = adj.f1;
    30. double rank = page.f1;
    31. double rankToDistribute = rank / ((double) neigbors.length);
    32. for (int i = 0; i < neighbors.length; i++) {
    33. out.collect(new Tuple2<Long, Double>(neighbors[i], rankToDistribute));
    34. }
    35. }
    36. }
    37. public static final class Dampener implements MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> {
    38. private final double dampening, randomJump;
    39. public Dampener(double dampening, double numVertices) {
    40. this.dampening = dampening;
    41. this.randomJump = (1 - dampening) / numVertices;
    42. }
    43. @Override
    44. public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
    45. value.f1 = (value.f1 * dampening) + randomJump;
    46. return value;
    47. }
    48. }
    49. public static final class EpsilonFilter
    50. implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {
    51. @Override
    52. public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) {
    53. return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
    54. }
    55. }

    PageRank代码实现了以上示例。他需要以下参数来运行: —pages <path> —links <path> —output <path> —numPages <n> —iterations <n>

    1. // 自定义类型
    2. case class Link(sourceId: Long, targetId: Long)
    3. case class Page(pageId: Long, rank: Double)
    4. case class AdjacencyList(sourceId: Long, targetIds: Array[Long])
    5. // 初始化执行环境
    6. val env = ExecutionEnvironment.getExecutionEnvironment
    7. // 通过解析一个CSV文件来获取每个页面原始的rank值
    8. val pages = env.readCsvFile[Page](pagesInputPath)
    9. // 链接被编码为邻接表: (page-id, Array(neighbor-ids))
    10. val links = env.readCsvFile[Link](linksInputPath)
    11. // 将原始rank值赋给每个页面
    12. val pagesWithRanks = pages.map(p => Page(p, 1.0 / numPages))
    13. // 通过输入链接建立邻接表
    14. val adjacencyLists = links
    15. // initialize lists
    16. .map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
    17. // concatenate lists
    18. .groupBy("sourceId").reduce {
    19. (l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
    20. }
    21. // 开始迭代
    22. val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
    23. currentRanks =>
    24. val newRanks = currentRanks
    25. // 发送rank值给目标页面
    26. .join(adjacencyLists).where("pageId").equalTo("sourceId") {
    27. (page, adjacent, out: Collector[Page]) =>
    28. for (targetId <- adjacent.targetIds) {
    29. out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
    30. }
    31. }
    32. // 收集rank值并求和更新
    33. .groupBy("pageId").aggregate(SUM, "rank")
    34. // 施加阻尼系数
    35. .map { p =>
    36. Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages))
    37. }
    38. // 如果没有明显的rank更新则停止迭代
    39. val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
    40. (current, next, out: Collector[Int]) =>
    41. // check for significant update
    42. if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
    43. }
    44. (newRanks, termination)
    45. }
    46. val result = finalRanks
    47. // 输出结果
    48. result.writeAsCsv(outputPath, "\n", " ")

    PageRank代码 实现了以上示例。他需要以下参数来执行: —pages <path> —links <path> —output <path> —numPages <n> —iterations <n>

    输入文件是纯文本文件,并且必须存为以下格式:

    • 页面被表示为一个长整型(long)ID并由换行符分割
      • 例如 "1\n2\n12\n42\n63\n" 给出了ID为 1, 2, 12, 42和63的五个页面。
    • 链接由空格分割的两个页面ID来表示。每个链接由换行符来分割。
      • 例如 "1 2\n2 12\n1 12\n42 63\n" 表示了以下四个有向链接: (1)->(2), (2)->(12), (1)->(12) 和 (42)->(63).

    这个简单的实现版本要求每个页面至少有一个入链接和一个出链接(一个页面可以指向自己)。

    Connected Components(连通组件算法)

    Connected Components 通过给相连的顶点相同的组件ID来标示出一个较大的图中的连通部分。类似PageRank,Connected Components 也是一个迭代算法。在每一次迭代中,每个顶点把他现在的组件ID传播给所有邻居顶点。当一个顶点接收到的组件ID小于他自身的组件ID时,这个顶点也更新其组件ID为这个新组件ID。

    这个代码实现使用了增量迭代: 没有改变其组件 ID 的顶点不会参与下一轮迭代。这种方法会带来更好的性能,因为后面的迭代可以只处理少量的需要计算的顶点。

    1. // 读取顶点和边的数据
    2. DataSet<Long> vertices = getVertexDataSet(env);
    3. DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env).flatMap(new UndirectEdge());
    4. // 分配初始的组件ID(等于每个顶点的ID)
    5. DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());
    6. // 开始一个增量迭代
    7. DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
    8. verticesWithInitialId.iterateDelta(verticesWithInitialId, maxIterations, 0);
    9. // 应用迭代计算逻辑:
    10. DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset()
    11. // 链接相应的边
    12. .join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
    13. // 选出最小的邻居组件ID
    14. .groupBy(0).aggregate(Aggregations.MIN, 1)
    15. // 如果邻居的组件ID更小则进行更新
    16. .join(iteration.getSolutionSet()).where(0).equalTo(0)
    17. .flatMap(new ComponentIdFilter());
    18. // 停止增量迭代 (增量和新的数据集是相同的)
    19. DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);
    20. // 输出结果
    21. result.writeAsCsv(outputPath, "\n", " ");
    22. // 自定义函数
    23. public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {
    24. @Override
    25. public Tuple2<T, T> map(T vertex) {
    26. return new Tuple2<T, T>(vertex, vertex);
    27. }
    28. }
    29. public static final class UndirectEdge
    30. implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
    31. Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();
    32. @Override
    33. public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
    34. invertedEdge.f0 = edge.f1;
    35. invertedEdge.f1 = edge.f0;
    36. out.collect(edge);
    37. out.collect(invertedEdge);
    38. }
    39. }
    40. public static final class NeighborWithComponentIDJoin
    41. implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
    42. @Override
    43. public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
    44. return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
    45. }
    46. }
    47. public static final class ComponentIdFilter
    48. implements FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>,
    49. Tuple2<Long, Long>> {
    50. @Override
    51. public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value,
    52. Collector<Tuple2<Long, Long>> out) {
    53. if (value.f0.f1 < value.f1.f1) {
    54. out.collect(value.f0);
    55. }
    56. }
    57. }

    ConnectedComponents代码 实现了以上示例。他需要以下参数来运行: —vertices <path> —edges <path> —output <path> —iterations <n>

    1. // 初始化运行环境
    2. val env = ExecutionEnvironment.getExecutionEnvironment
    3. // 读顶点和边的数据
    4. // 分配初始的组件ID(等于每个顶点的ID)
    5. val vertices = getVerticesDataSet(env).map { id => (id, id) }
    6. // 通过发出每条输入边自身和他的反向边得到无向边
    7. val edges = getEdgesDataSet(env).flatMap { edge => Seq(edge, (edge._2, edge._1)) }
    8. // 开始增量迭代
    9. val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Array(0)) {
    10. (s, ws) =>
    11. // 开始迭代逻辑: 链接相应的边
    12. val allNeighbors = ws.join(edges).where(0).equalTo(0) { (vertex, edge) =>
    13. (edge._2, vertex._2)
    14. }
    15. // 选择组件ID最小的邻居节点
    16. val minNeighbors = allNeighbors.groupBy(0).min(1)
    17. // 如果邻居的ID更小则更新
    18. val updatedComponents = minNeighbors.join(s).where(0).equalTo(0) {
    19. (newVertex, oldVertex, out: Collector[(Long, Long)]) =>
    20. if (newVertex._2 < oldVertex._2) out.collect(newVertex)
    21. }
    22. // 增量和新的数据集是一致的
    23. (updatedComponents, updatedComponents)
    24. }
    25. verticesWithComponents.writeAsCsv(outputPath, "\n", " ")

    ConnectedComponents代码 实现了以上示例。他需要以下参数来运行: —vertices <path> —edges <path> —output <path> —iterations <n>

    输入文件是纯文本文件并且必须被存储为如下格式:

    • 顶点被表示为 ID,并且由换行符分隔。
      • 例如 "1\n2\n12\n42\n63\n" 表示 (1), (2), (12), (42) 和 (63)五个顶点。
    • 边被表示为空格分隔的顶点对。边由换行符分隔:
      • 例如 "1 2\n2 12\n1 12\n42 63\n" 表示四条无向边: (1)-(2), (2)-(12), (1)-(12), and (42)-(63)。