什么是Kafka Global Table (GlobalKTable)

在kafka中,有三个重要的抽象,分别为KStream,KTable和GlobalKTable。本文主要讲解GlobalTable。

Global table存在的理由

Kafka根据数据的key来分区,一般情况下相同的key会存入相同的分区中,如果使用两个KTable来进行join操作,那么join的结果需要进一步在硬盘中进行分区操作。而硬盘的不断寻址读写操作会严重影响性能。在流操作应用中,经常会用到join操作来合并两张表,并且其中的一个或者两个的数据集很小,而且改动不频繁。所以,基于两张表的特点,为了解决这个性能问题,提出了global table。

小结一下:

  1. 数据的特点:两个join操作中一个或者两个表比较小并且update的频率不高
  2. 问题:现有的KTable将数据在硬盘中分区。频繁的硬盘的读写影响系统性能。

Global Table

它是一个高层的抽象。假设A join B,其中A和B是两个数据集,其中B大小不大,存在global table中。A在kafka系统中被根据其key分区。在join操作时,B会整个儿复制到A分区相应的Kafka节点上面,这样所有的B数据都能够被A直接使用到。并且这个join操作不是基于A和B两个数据集的key的,B数据集并没有被分区。Join之后的结果不存在再分区的过程,这样子就提高了系统的性能。

如果不使用global table,而是使用KTable,其操作图是下面这个样子的:

什么是Kafka Global Table (GlobalKTable)

那么使用global table之后,其为:
什么是Kafka Global Table (GlobalKTable)
也就是少了红圈中的内容。

而对应的代码分别为如下所示:
代码1:使用KTable

final KStream<Long, Purchase> purchases = builder.stream("purchase");
final KTable<Long, Customer> customers = builder.table("customer", "customer-store");
final KTable<Long, Product> products = builder.table("product", "product-store");
 
     
// re-key purchases stream on customerId
purchases.map((key, purchase) -> KeyValue.pair(purchase.customerId(), purchase))
        // join to customers. This will create an intermediate topic to repartition
        // the purchases stream based on the customerId
        .leftJoin(customers, EnrichedPurchase::new)
        // re-key enrichedPurchase based on productId
        .map((key, enrichedPurchase) ->
                   KeyValue.pair(enrichedPurchase.productId(), enrichedPurchase))
        // join to products. This will create an intermediate topic to repartition
        // the previous intermediate topic based on productId
        .leftJoin(products, EnrichedPurchase::withProduct);

代码2: 使用global table

final KStream<Long, Purchase> purchases = builder.stream("purchase");
final GlobalKTable<Long, Customer> customers = builder.globalTable("customer", "customer-store");
final GlobalKTable<Long, Product> products = builder.globalTable("product", "product-store");
 
 
// join to the customers table by providing a mapping to the customerId
purchases.leftJoin(customers,
                   ((key, purchase) -> KeyValue.pair(purchase.customerId(), purchase),
                      EnrichedPurchase::new)
          // join to the products table by providing a mapping to the productId
          .leftJoin(products,
                    KeyValue.pair(enrichedPurchase.productId(), enrichedPurchase),
                    EnrichedPurchase::withProduct);

结束! 谢谢!

参考:

文章1
文章2