这篇文章主要介绍“RocketMQ broker启动流程是什么”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“RocketMQ broker启动流程是什么”文章能帮助大家解决问题。
1. 启动入口
本系列RocketMQ4.8注释github地址,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈
前面我们已经分析完了
NameServerproducerBrokerbrokerorg.apache.rocketmq.broker.BrokerStartuppublic class BrokerStartup {
    ...
    public static void main(String[] args) {
        start(createBrokerController(args));
    }
    ...
}在
main()createBrokerController(...)BrokerControllerstart(...)Broker接下来我们就来分析这两个操作。
2. 创建BrokerController
创建
BrokerControllerBrokerStartup#createBrokerController/**
 * 创建 broker 的配置参数
 */
public static BrokerController createBrokerController(String[] args) {
    ...
    try {
        //解析命令行参数
        Options options = ServerUtil.buildCommandlineOptions(new Options());
        commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
            new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
        }
        // 处理配置
        final BrokerConfig brokerConfig = new BrokerConfig();
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        final NettyClientConfig nettyClientConfig = new NettyClientConfig();
        // tls安全相关
        nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
            String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
        // 配置端口
        nettyServerConfig.setListenPort(10911);
        // 消息存储的配置
        final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
        ...
        // 将命令行中的配置设置到brokerConfig对象中
        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
        // 检查环境变量:ROCKETMQ_HOME
        if (null == brokerConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match 
                the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }
        //省略一些配置
        ...
        // 创建 brokerController
        final BrokerController controller = new BrokerController(
            brokerConfig,
            nettyServerConfig,
            nettyClientConfig,
            messageStoreConfig);
        controller.getConfiguration().registerConfig(properties);
        // 初始化
        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }
        // 关闭钩子,在关闭前处理一些操作
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            private volatile boolean hasShutdown = false;
            private AtomicInteger shutdownTimes = new AtomicInteger(0);
            @Override
            public void run() {
                synchronized (this) {
                    if (!this.hasShutdown) {
                        ...
                        // 这里会发一条注销消息给nameServer
                        controller.shutdown();
                        ...
                    }
                }
            }
        }, "ShutdownHook"));
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }
    return null;
}这个方法的代码有点长,但功能并不多,总的来说就三个功能:
处理配置:主要是处理
nettyServerConfignettyClientConfigNameServer创建及初始化
controllercontroller.initialize()注册关闭钩子:调用
Runtime.getRuntime().addShutdownHook(...)2.1 controller实例化
BrokerControllerBrokerStartup#createBrokerControllerpublic BrokerController(
    final BrokerConfig brokerConfig,
    final NettyServerConfig nettyServerConfig,
    final NettyClientConfig nettyClientConfig,
    final MessageStoreConfig messageStoreConfig
) {
    // 4个核心配置信息
    this.brokerConfig = brokerConfig;
    this.nettyServerConfig = nettyServerConfig;
    this.nettyClientConfig = nettyClientConfig;
    this.messageStoreConfig = messageStoreConfig;
    // 管理consumer消费消息的offset
    this.consumerOffsetManager = new ConsumerOffsetManager(this);
    // 管理topic配置
    this.topicConfigManager = new TopicConfigManager(this);
    // 处理 consumer 拉消息请求的
    this.pullMessageProcessor = new PullMessageProcessor(this);
    this.pullRequestHoldService = new PullRequestHoldService(this);
    // 消息送达的监听器
    this.messageArrivingListener 
        = new NotifyMessageArrivingListener(this.pullRequestHoldService);
    ...
    // 往外发消息的组件
    this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
    ...
}BrokerController核心配置赋值:主要是
brokerConfignettyServerConfignettyClientConfigmessageStoreConfigConsumerOffsetManagerconsumertopictopicConfigManagertopictopictopictopicpullMessageProcessormessageArrivingListenerbrokerOuterAPINameServer以上这些组件的用处,这里先混个脸熟,我们后面再分析。
2.2 初始化controller
我们再来看看初始化操作,方法为
BrokerController#initializepublic boolean initialize() throws CloneNotSupportedException {
    // 加载配置文件中的配置
    boolean result = this.topicConfigManager.load();
    result = result && this.consumerOffsetManager.load();
    result = result && this.subscriptionGroupManager.load();
    result = result && this.consumerFilterManager.load();
    if (result) {
        try {
            // 消息存储管理组件,管理磁盘上的消息
            this.messageStore =
                new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, 
                    this.messageArrivingListener, this.brokerConfig);
            // 启用了DLeger,就创建DLeger相关组件
            if (messageStoreConfig.isEnableDLegerCommitLog()) {
                ...
            }
            // broker统计组件
            this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
            //load plugin
            MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, 
                brokerStatsManager, messageArrivingListener, brokerConfig);
            this.messageStore = MessageStoreFactory.build(context, this.messageStore);
            this.messageStore.getDispatcherList().addFirst(
                new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
        } catch (IOException e) {
            result = false;
            log.error("Failed to initialize", e);
        }
    }
    // 加载磁盘上的记录,如commitLog写入的位置、消费者主题/队列的信息
    result = result && this.messageStore.load();
    if (result) {
        // 处理 nettyServer
        this.remotingServer = new NettyRemotingServer(
            this.nettyServerConfig, this.clientHousekeepingService);
        NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
        fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
        this.fastRemotingServer = new NettyRemotingServer(
            fastConfig, this.clientHousekeepingService);
        // 创建线程池start... 这里会创建多种类型的线程池
        ...
        // 处理consumer pull操作的线程池
        this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getPullMessageThreadPoolNums(),
            this.brokerConfig.getPullMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.pullThreadPoolQueue,
            new ThreadFactoryImpl("PullMessageThread_"));
        ...
        // 创建线程池end...
        // 注册处理器
        this.registerProcessor();
        // 启动定时任务start... 这里会启动好多的定时任务
        ...
        // 定时将consumer消费到的offset进行持久化操作,即将数据保存到磁盘上
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.consumerOffsetManager.persist();
                } catch (Throwable e) {
                    log.error("schedule persist consumerOffset error.", e);
                }
            }
        }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
        ...
        // 启动定时任务end...
        ...
        // 开启 DLeger 的一些操作
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            ...
        }
        // 处理tls配置
        if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
            ...
        }
        // 初始化一些操作
        initialTransaction();
        initialAcl();
        initialRpcHooks();
    }
    return result;
}这个还是很长,关键部分都做了注释,该方法所做的工作如下:
加载配置文件中的配置
赋值与初始化操作
创建线程池
注册处理器
启动定时任务
这里我们来看下注册处理器的操作
this.registerProcessor()2.2.1 注册处理器:BrokerController#registerProcessor
this.registerProcessor()BrokerController#registerProcessorpublic void registerProcessor() {
    /**
     * SendMessageProcessor
     */
    SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
    sendProcessor.registerSendMessageHook(sendMessageHookList);
    sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, 
        this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor,  
        this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, 
        this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, 
        this.sendMessageExecutor);
    ...
    /**
     * PullMessageProcessor
     */
    this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, 
        this.pullMessageExecutor);
    this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
    /**
        * ReplyMessageProcessor
        */
    ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);
    replyMessageProcessor.registerSendMessageHook(sendMessageHookList);
    ...
}这个方法里注册了许许多多的处理器,这里仅列出了与消息相关的内容,如发送消息、回复消息、拉取消息等,后面在处理
producerconsumer2.2.2 remotingServer注册处理器:NettyRemotingServer#registerProcessor
我们来看下
remotingServerNettyRemotingServer#registerProcessorpublic class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
    ...
    @Override
    public void registerProcessor(int requestCode, NettyRequestProcessor processor, 
            ExecutorService executor) {
        ExecutorService executorThis = executor;
        if (null == executor) {
            executorThis = this.publicExecutor;
        }
        Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, 
                ExecutorService>(processor, executorThis);
        // 注册到processorTable 中
        this.processorTable.put(requestCode, pair);
    }
    ...
}最终,这些处理器注册到了
processorTableNettyRemotingAbstractHashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>>这是一个
hashMapkeycodevaluePairNettyRequestProcessorExecutorServicecodeNettyRequestProcessorhashMap2.3 注册关闭钩子:
Runtime.getRuntime().addShutdownHook(...)接着我们来看看注册关闭钩子的操作:
// 关闭钩子,在关闭前处理一些操作
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    private volatile boolean hasShutdown = false;
    private AtomicInteger shutdownTimes = new AtomicInteger(0);
    @Override
    public void run() {
        synchronized (this) {
            if (!this.hasShutdown) {
                ...
                // 这里会发一条注销消息给nameServer
                controller.shutdown();
                ...
            }
        }
    }
}, "ShutdownHook"));跟进
BrokerController#shutdownpublic void shutdown() {
    // 调用各组件的shutdown方法
    ...
    // 发送注销消息到NameServer
    this.unregisterBrokerAll();
    ...
    // 持久化consumer的消费偏移量
    this.consumerOffsetManager.persist();
    // 又是调用各组件的shutdown方法
    ...这个方法里会调用各组件的
shutdown()NameServerBrokerController#unregisterBrokerAllprivate void unregisterBrokerAll() {
    // 发送一条注销消息给nameServer
    this.brokerOuterAPI.unregisterBrokerAll(
        this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId());
}继续进入
BrokerOuterAPI#unregisterBrokerAllpublic void unregisterBrokerAll(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId
) {
    // 获取所有的 nameServer,遍历发送注销消息
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null) {
        for (String namesrvAddr : nameServerAddressList) {
            try {
                this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId);
                log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr);
            } catch (Exception e) {
                log.warn("unregisterBroker Exception, {}", namesrvAddr, e);
            }
        }
    }
}这个方法里,会获取到所有的
nameServerBrokerOuterAPI#unregisterBrokerpublic void unregisterBroker(
    final String namesrvAddr,
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId
) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, 
        InterruptedException, MQBrokerException {
    UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader();
    requestHeader.setBrokerAddr(brokerAddr);
    requestHeader.setBrokerId(brokerId);
    requestHeader.setBrokerName(brokerName);
    requestHeader.setClusterName(clusterName);
    // 发送的注销消息:RequestCode.UNREGISTER_BROKER
    RemotingCommand request = RemotingCommand.createRequestCommand(
            c, requestHeader);
    RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, 3000);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return;
        }
        default:
            break;
    }
    throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr);
}最终调用的是
RemotingClient#invokeSynccodeRequestCode.UNREGISTER_BROKERNameServerbroker3. 启动Broker:start(...)
我们再来看看
BrokerBrokerController#startpublic void start() throws Exception {
    // 启动各组件
    // 启动消息存储相关组件
    if (this.messageStore != null) {
        this.messageStore.start();
    }
    // 启动 remotingServer,其实就是启动一个netty服务,用来接收producer传来的消息
    if (this.remotingServer != null) {
        this.remotingServer.start();
    }
    ...
    // broker对外发放消息的组件,向nameServer上报存活消息时使用了它,也是一个netty服务
    if (this.brokerOuterAPI != null) {
        this.brokerOuterAPI.start();
    }
    ...
    // broker 核心的心跳注册任务
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.registerBrokerAll(true, false, 
                    brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }
        // brokerConfig.getRegisterNameServerPeriod() 值为 1000 * 30,最终计算得到默认30秒执行一次
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), 
            TimeUnit.MILLISECONDS);
    ...
}这个方法主要就是启动各组件了,这里列出了几大重要组件的启动:
messageStorecommitLogflushcomsumeQueueflushremotingServernettyproducerbrokerOuterAPInettynameServer启动定时任务:
brokernameServer这里我们重点来看定时任务是如何发送心跳发送的。
处理注册消息发送的时间间隔如下:
Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)这行代码看着长,但意思就一句话:时间间隔可以自行配置,但不能小于10s,不能大于60s,默认是30s.
处理消息注册的方法为
BrokerController#registerBrokerAll(...)public synchronized void registerBrokerAll(final boolean checkOrderConfig, 
        boolean oneway, boolean forceRegister) {
    TopicConfigSerializeWrapper topicConfigWrapper 
            = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
    // 处理topic相关配置
    if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
        || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
        ...
    }
    // 这里会判断是否需要进行注册
    if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId(),
        this.brokerConfig.getRegisterBrokerTimeoutMills())) {
        // 进行注册操作    
        doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
    }
}这个方法就是用来处理注册操作的,不过注册前会先验证下是否需要注册,验证是否需要注册的方法为
BrokerController#needRegisterprivate boolean needRegister(final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final int timeoutMills) {
    TopicConfigSerializeWrapper topicConfigWrapper 
        = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
    // 判断是否需要进行注册
    List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, 
        brokerId, topicConfigWrapper, timeoutMills);
    // 有一个发生了变化,就表示需要注册了    
    boolean needRegister = false;
    for (Boolean changed : changeList) {
     &						
						
						
						
						
						
						
					以上就是RocketMQ broker启动流程是什么的详细内容,更多关于RocketMQ broker启动流程是什么的资料请关注九品源码其它相关文章!