Apache Pulsar启动了哪些服务

这篇文章主要讲解了“Apache Pulsar启动了哪些服务”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Apache Pulsar启动了哪些服务”吧!

1.启动入口

PulsarStandaloneStarter
在standalone模式下,主要启动了以下几个服务

  1. PulsarService

  2. PulsarAdmin

  3. LocalBookeeperEnsemble

  4. WorkerService

PulsarBrokerStarter.BrokerStarter
在普通模式下,启动了以下几个服务

  1. PulsarService

  2. BookieServer

  3. AutoRecoveryMain

  4. StatsProvider

  5. WorkerService

简单说一些这几个服务

  • WorkerService: Pulsar function 相关,可以不启动

  • PulsarService: 主要的PulsarBroker相关

  • BookieServer: Bookeeper相关

  • AutoRecoveryMain: Bookeeper autorecovery相关

  • StatsProvider: Metric Exporter类似的功能

2. PulsarService

PulsarService.start

  1. ProtocolHandlers
    支持不同protocol处理(kafka协议等)

  2. localZookeeperConnectionProvider
    维护zk session 和zk连接

  3. startZkCacheService

  • LocalZooKeeperCache => LocalZooKeeperCacheService

  • GlobalZooKeeperCache => ConfigurationCacheService

  • BookkeeperClientFactory
    创建配置Bookkeeper 客户端

  • managedLedgerClientFactory
    维护一个ManagedLedger的客户端,借用BookkeeperClient

  • BrokerService
    这个是服务器的主要逻辑了,这个放在后面说

  • loadManager
    收集集群机器负载,并根据负载情况均衡负载

  • startNamespaceService
    NameSpaceService,管理放置的ResourceBundle,和LoadManager相关

  • schemaStorage

  • schemaRegistryService
    上面2个都是和Schema相关的

  • defaultOffloader
    LedgerOffloader,用来将Ledger(Bookkeeper)中的冷数据放到其他存储当中

    1. WebService

    2. webSocketService
      http,websocket相关

    3. LeaderElectionService
      和LoadManager有关,如果是集中方式的话需要选出一个Leader定期根据集群情况进行均衡负载

    4. transactionMetadataStoreService
      事务相关

    5. metricGenerator
      metric相关

    6. WorkerService
      pulsar function 相关

    3. BrokerService

    public void start() throws Exception {
            // producer id 分布式生成器
            this.producerNameGenerator = new DistributedIdGenerator(pulsar.getZkClient(), producerNameGeneratorPath,
                    pulsar.getConfiguration().getClusterName());
    
            // 网络层配置
            ServerBootstrap bootstrap = defaultServerBootstrap.clone();
    
            ServiceConfiguration serviceConfig = pulsar.getConfiguration();
    
            bootstrap.childHandler(new PulsarChannelInitializer(pulsar, false));
            ...
            // 绑定端口
            listenChannel = bootstrap.bind(addr).sync().channel();
            ...
    
           // metric
            this.startStatsUpdater(
                    serviceConfig.getStatsUpdateInitialDelayInSecs(),
                    serviceConfig.getStatsUpdateFrequencyInSecs());
    
           // 启动了一堆需要定期执行的任务
            this.startInactivityMonitor();
           // 启动3个schedule任务分别检测
           // 1. 长时间无效的topic
           // 2. 长时间无效的producer(和message去重相关)
           // 3. 长时间无效的subscription
            this.startMessageExpiryMonitor();
            this.startCompactionMonitor();
            this.startMessagePublishBufferMonitor();
            this.startConsumedLedgersMonitor();
            this.startBacklogQuotaChecker();
            this.updateBrokerPublisherThrottlingMaxRate();
            this.startCheckReplicationPolicies();
    
            // register listener to capture zk-latency
            ClientCnxnAspect.addListener(zkStatsListener);
            ClientCnxnAspect.registerExecutor(pulsar.getExecutor());

    4. PulsarChannelInitializer

    顺着netty的初始化方式我们直接看ChannelInitializer,这里应该和Kafka类似进行处理请求的操作。

    protected void initChannel(SocketChannel ch) throws Exception {
            
            ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
         
            ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
                brokerConf.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));
    
            ch.pipeline().addLast("flowController", new FlowControlHandler());
            ServerCnx cnx = new ServerCnx(pulsar);
            ch.pipeline().addLast("handler", cnx);
    
            connections.put(ch.remoteAddress(), cnx);
        }

    5. ServerCnx

    这个类的作用可以对标KafkaApis,处理各种Api请求
    这个类实际上是一个ChannelHandler
    继承了PulsarHandler(主要负责一些连接的keepalive逻辑)
    PulsarHandler继承了 PulsarDecoder ( 主要负责序列化,反序列化Api请求)
    PulsarDecoder实际上是一个 ChannelInboundHandlerAdapter

    而PulsarAPi实际上是通过Pulsar.proto 生成的,这里编写了各种Api的定义

    感谢各位的阅读,以上就是“Apache Pulsar启动了哪些服务”的内容了,经过本文的学习后,相信大家对Apache Pulsar启动了哪些服务这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!