package io.camunda.zeebe.broker;

import io.atomix.cluster.AtomixCluster;
import io.atomix.cluster.messaging.ManagedMessagingService;
import io.atomix.cluster.messaging.MessagingConfig;
import io.atomix.cluster.messaging.impl.NettyMessagingService;
import io.atomix.utils.net.Address;
import io.camunda.zeebe.broker.bootstrap.BrokerContext;
import io.camunda.zeebe.broker.bootstrap.BrokerStartupContextImpl;
import io.camunda.zeebe.broker.bootstrap.BrokerStartupProcess;
import io.camunda.zeebe.broker.bootstrap.CloseProcess;
import io.camunda.zeebe.broker.bootstrap.StartProcess;
import io.camunda.zeebe.broker.clustering.ClusterServices;
import io.camunda.zeebe.broker.clustering.ClusterServicesImpl;
import io.camunda.zeebe.broker.engine.impl.SubscriptionApiCommandMessageHandlerService;
import io.camunda.zeebe.broker.exporter.repo.ExporterLoadException;
import io.camunda.zeebe.broker.exporter.repo.ExporterRepository;
import io.camunda.zeebe.broker.partitioning.PartitionManager;
import io.camunda.zeebe.broker.partitioning.PartitionManagerImpl;
import io.camunda.zeebe.broker.system.EmbeddedGatewayService;
import io.camunda.zeebe.broker.system.SystemContext;
import io.camunda.zeebe.broker.system.configuration.BrokerCfg;
import io.camunda.zeebe.broker.system.configuration.ClusterCfg;
import io.camunda.zeebe.broker.system.configuration.DataCfg;
import io.camunda.zeebe.broker.system.configuration.ExporterCfg;
import io.camunda.zeebe.broker.system.configuration.SocketBindingCfg;
import io.camunda.zeebe.broker.system.configuration.backpressure.BackpressureCfg;
import io.camunda.zeebe.broker.system.management.BrokerAdminService;
import io.camunda.zeebe.broker.system.management.BrokerAdminServiceImpl;
import io.camunda.zeebe.broker.system.management.LeaderManagementRequestHandler;
import io.camunda.zeebe.broker.system.management.deployment.PushDeploymentRequestHandler;
import io.camunda.zeebe.broker.system.monitoring.BrokerHealthCheckService;
import io.camunda.zeebe.broker.system.monitoring.DiskSpaceUsageListener;
import io.camunda.zeebe.broker.system.monitoring.DiskSpaceUsageMonitor;
import io.camunda.zeebe.broker.transport.backpressure.PartitionAwareRequestLimiter;
import io.camunda.zeebe.broker.transport.commandapi.CommandApiServiceImpl;
import io.camunda.zeebe.protocol.impl.encoding.BrokerInfo;
import io.camunda.zeebe.transport.ServerTransport;
import io.camunda.zeebe.transport.TransportFactory;
import io.camunda.zeebe.util.FileUtil;
import io.camunda.zeebe.util.LogUtil;
import io.camunda.zeebe.util.VersionUtil;
import io.camunda.zeebe.util.exception.UncheckedExecutionException;
import io.camunda.zeebe.util.jar.ExternalJarLoadException;
import io.camunda.zeebe.util.sched.Actor;
import io.camunda.zeebe.util.sched.ActorScheduler;
import io.camunda.zeebe.util.sched.ConcurrencyControl;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import io.netty.util.NetUtil;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;

/* loaded from: input_file:io/camunda/zeebe/broker/Broker.class */
/**
 * Wires together the services of a single broker node and controls their start and shutdown.
 *
 * <p>Startup is modelled as an ordered list of named steps registered on a {@link StartProcess}
 * (see {@code initStart()}). Running that process yields a {@link CloseProcess}, which {@link
 * #close()} later runs in reverse order.
 *
 * <p>{@link #start()} is {@code synchronized} and only triggers the startup once; {@link #close()}
 * is not synchronized and reads/writes a plain {@code boolean} flag.
 */
public final class Broker implements AutoCloseable {
    public static final Logger LOG = Loggers.SYSTEM_LOGGER;
    private final SystemContext systemContext;
    private ClusterServicesImpl clusterServices;
    private CompletableFuture<Broker> startFuture;
    private LeaderManagementRequestHandler managementRequestHandler;
    private CommandApiServiceImpl commandApiService;
    private final ActorScheduler scheduler;
    private CloseProcess closeProcess;
    private EmbeddedGatewayService embeddedGatewayService;
    private final SpringBrokerBridge springBrokerBridge;
    private DiskSpaceUsageMonitor diskSpaceUsageMonitor;
    private BrokerAdminService brokerAdminService;
    private PartitionManagerImpl partitionManager;
    private final BrokerStartupActor brokerStartupActor;
    private BrokerContext brokerContext;
    private boolean isClosed = false;
    private final List<DiskSpaceUsageListener> diskSpaceUsageListeners = new ArrayList<>();
    private final TestCompanionClass testCompanionObject = new TestCompanionClass();
    private final List<PartitionListener> partitionListeners = new ArrayList<>();
    // Both fields below depend on systemContext and are therefore assigned in the constructor,
    // not in a field initialiser (initialisers run before the constructor body).
    private final BrokerInfo localBroker;
    private BrokerHealthCheckService healthCheckService;

    /**
     * Actor that runs the {@link BrokerStartupProcess} and exposes its start/stop as futures.
     */
    private static final class BrokerStartupActor extends Actor {
        private final BrokerStartupProcess brokerStartupProcess;
        private final int nodeId;

        /**
         * Registers this actor's control object on the given context and creates the startup
         * process from that context.
         *
         * @param brokerStartupContextImpl context handed to the startup process
         */
        private BrokerStartupActor(BrokerStartupContextImpl brokerStartupContextImpl) {
            this.nodeId = brokerStartupContextImpl.getBrokerInfo().getNodeId();
            // Must happen before the process is created from the context.
            brokerStartupContextImpl.setConcurrencyControl(this.actor);
            this.brokerStartupProcess = new BrokerStartupProcess(brokerStartupContextImpl);
        }

        /** Returns the actor name built from the node id and the suffix {@code "Startup"}. */
        public String getName() {
            return buildActorName(this.nodeId, "Startup");
        }

        /**
         * Submits the start of the startup process to this actor.
         *
         * @return future that is handed to {@code runOnCompletion} together with the process'
         *     start future
         */
        private ActorFuture<BrokerContext> start() {
            ActorFuture<BrokerContext> result = createFuture();
            this.actor.run(() -> {
                this.actor.runOnCompletion(this.brokerStartupProcess.start(), result);
            });
            return result;
        }

        /**
         * Submits the stop of the startup process to this actor.
         *
         * @return future that is handed to {@code runOnCompletion} together with the process'
         *     stop future
         */
        private ActorFuture<Void> stop() {
            ActorFuture<Void> result = createFuture();
            this.actor.run(() -> {
                this.actor.runOnCompletion(this.brokerStartupProcess.stop(), result);
            });
            return result;
        }
    }

    /** Holder for the {@link AtomixCluster} reference returned by {@link #getAtomixCluster()}. */
    @Deprecated
    private static final class TestCompanionClass {
        private AtomixCluster atomix;

        private TestCompanionClass() {
        }
    }

    /**
     * Creates the broker and submits its startup actor to the scheduler. The startup steps
     * themselves only run once {@link #start()} is called.
     *
     * @param systemContext provides the broker configuration, scheduler and diagnostic context
     * @param springBrokerBridge bridge passed on to the startup context and used to register the
     *     admin service supplier
     */
    public Broker(SystemContext systemContext, SpringBrokerBridge springBrokerBridge) {
        this.systemContext = systemContext;
        this.springBrokerBridge = springBrokerBridge;
        this.scheduler = this.systemContext.getScheduler();
        // getConfig() reads systemContext, so these must come after the assignment above.
        this.localBroker = createBrokerInfo(getConfig());
        this.healthCheckService = new BrokerHealthCheckService(this.localBroker);
        BrokerStartupContextImpl startupContext = new BrokerStartupContextImpl(
                this.localBroker,
                systemContext.getBrokerConfiguration(),
                springBrokerBridge,
                this.scheduler,
                this.healthCheckService);
        this.brokerStartupActor = new BrokerStartupActor(startupContext);
        this.scheduler.submitActor(this.brokerStartupActor);
    }

    /**
     * Adds a listener to the list that is later passed to the partition manager.
     *
     * @param partitionListener listener to add
     */
    public void addPartitionListener(PartitionListener partitionListener) {
        this.partitionListeners.add(partitionListener);
    }

    /**
     * Starts the broker on the first call; later calls return the future created by the first.
     *
     * @return future completed with this broker on success, or exceptionally with an {@link
     *     UncheckedExecutionException} when a startup step fails
     */
    public synchronized CompletableFuture<Broker> start() {
        if (this.startFuture == null) {
            logBrokerStart();
            this.startFuture = new CompletableFuture<>();
            LogUtil.doWithMDC(this.systemContext.getDiagnosticContext(), this::internalStart);
        }
        return this.startFuture;
    }

    /** Logs version, node id and JSON configuration when INFO logging is enabled. */
    private void logBrokerStart() {
        if (LOG.isInfoEnabled()) {
            BrokerCfg config = getConfig();
            LOG.info("Version: {}", VersionUtil.getVersion());
            LOG.info("Starting broker {} with configuration {}", config.getCluster().getNodeId(), config.toJson());
        }
    }

    /**
     * Runs the start process, stores the resulting close process and completes the start future.
     *
     * @throws UncheckedExecutionException wrapping any exception raised while starting; the same
     *     exception also completes the start future exceptionally
     */
    private void internalStart() {
        try {
            this.closeProcess = initStart().start();
            this.startFuture.complete(this);
            if (this.healthCheckService != null) {
                this.healthCheckService.setBrokerStarted();
            }
        } catch (Exception e) {
            LOG.error("Failed to start broker {}!", getConfig().getCluster().getNodeId(), e);
            UncheckedExecutionException failure = new UncheckedExecutionException("Failed to start broker", e);
            this.startFuture.completeExceptionally(failure);
            throw failure;
        }
    }

    /**
     * Builds the ordered list of startup steps. The embedded gateway step is only registered when
     * the gateway is enabled in the configuration.
     *
     * @return the start process, not yet started
     */
    private StartProcess initStart() {
        BrokerCfg config = getConfig();
        StartProcess startProcess = new StartProcess("Broker-" + this.localBroker.getNodeId());
        startProcess.addStep("Migrated Startup Steps", this::migratedStartupSteps);
        startProcess.addStep("command api transport and handler",
                () -> commandApiTransportAndHandlerStep(config, this.localBroker));
        startProcess.addStep("subscription api", () -> subscriptionAPIStep(this.localBroker));
        // Statement-bodied on purpose: this step yields no closeable.
        startProcess.addStep("cluster services", () -> {
            this.clusterServices.start().join();
        });
        if (config.getGateway().isEnable()) {
            startProcess.addStep("embedded gateway", () -> {
                this.embeddedGatewayService = new EmbeddedGatewayService(
                        config,
                        this.scheduler,
                        this.clusterServices.getMessagingService(),
                        this.clusterServices.getMembershipService(),
                        this.clusterServices.getEventService());
                return this.embeddedGatewayService;
            });
        }
        startProcess.addStep("disk space monitor", () -> diskSpaceMonitorStep(config.getData()));
        startProcess.addStep("leader management request handler", () -> managementRequestStep(this.localBroker));
        startProcess.addStep("zeebe partitions", () -> partitionsStep(config, this.localBroker));
        startProcess.addStep("register diskspace usage listeners", this::addDiskSpaceUsageListeners);
        startProcess.addStep("upgrade manager", this::addBrokerAdminService);
        return startProcess;
    }

    /**
     * Waits for the startup actor to finish and copies its results (partition listeners, cluster
     * services, Atomix cluster) into this broker.
     *
     * @return closer that stops the startup actor and clears the health check service reference
     */
    private AutoCloseable migratedStartupSteps() {
        this.brokerContext = this.brokerStartupActor.start().join();
        this.partitionListeners.addAll(this.brokerContext.getPartitionListeners());
        this.clusterServices = this.brokerContext.getClusterServices();
        this.testCompanionObject.atomix = this.clusterServices.getAtomixCluster();
        return () -> {
            this.brokerStartupActor.stop().join();
            this.healthCheckService = null;
        };
    }

    /**
     * Builds the {@link BrokerInfo} describing this node from the cluster and network settings.
     * The version is only set when {@link VersionUtil#getVersion()} is non-null and not blank.
     *
     * @param brokerCfg broker configuration to read from
     * @return the populated broker info
     */
    private BrokerInfo createBrokerInfo(BrokerCfg brokerCfg) {
        ClusterCfg cluster = brokerCfg.getCluster();
        String advertisedAddress =
                NetUtil.toSocketAddressString(brokerCfg.getNetwork().getCommandApi().getAdvertisedAddress());
        BrokerInfo brokerInfo = new BrokerInfo(cluster.getNodeId(), advertisedAddress);
        brokerInfo.setClusterSize(cluster.getClusterSize())
                .setPartitionsCount(cluster.getPartitionsCount())
                .setReplicationFactor(cluster.getReplicationFactor());
        String version = VersionUtil.getVersion();
        if (version != null && !version.isBlank()) {
            brokerInfo.setVersion(version);
        }
        return brokerInfo;
    }

    /**
     * Creates and schedules the admin service, connects it to the partition manager and registers
     * a supplier for it on the Spring bridge.
     *
     * @return the admin service, used as the closer for this step
     */
    private AutoCloseable addBrokerAdminService() {
        // Declared with the concrete type: it is used as an Actor, as the argument to
        // createAdminAccess, and for the inject* calls.
        BrokerAdminServiceImpl adminService = new BrokerAdminServiceImpl();
        scheduleActor(adminService);
        adminService.injectAdminAccess(this.partitionManager.createAdminAccess(adminService));
        adminService.injectPartitionInfoSource(this.partitionManager.getPartitions());
        this.brokerAdminService = adminService;
        this.springBrokerBridge.registerBrokerAdminServiceSupplier(() -> this.brokerAdminService);
        return adminService;
    }

    /**
     * Starts the command API messaging service, creates the server transport on top of it and
     * schedules the command API service. The service is also registered as a partition listener
     * and as a disk space usage listener.
     *
     * @param brokerCfg broker configuration (cluster, network, backpressure, experimental)
     * @param brokerInfo info of the local broker
     * @return closer that closes the command API service, then the transport, then stops the
     *     messaging service
     */
    private AutoCloseable commandApiTransportAndHandlerStep(BrokerCfg brokerCfg, BrokerInfo brokerInfo) {
        ManagedMessagingService messagingService =
                createMessagingService(brokerCfg.getCluster(), brokerCfg.getNetwork().getCommandApi());
        messagingService.start().join();
        LOG.debug("Bound command API to {}, using advertised address {} ", messagingService.bindingAddresses(), messagingService.address());
        ServerTransport serverTransport =
                new TransportFactory(this.scheduler).createServerTransport(brokerInfo.getNodeId(), messagingService);
        BackpressureCfg backpressure = brokerCfg.getBackpressure();
        // Start from the no-op limiter and only replace it when backpressure is enabled.
        PartitionAwareRequestLimiter limiter = PartitionAwareRequestLimiter.newNoopLimiter();
        if (backpressure.isEnabled()) {
            limiter = PartitionAwareRequestLimiter.newLimiter(backpressure);
        }
        this.commandApiService = new CommandApiServiceImpl(
                serverTransport, brokerInfo, limiter, this.scheduler, brokerCfg.getExperimental().getQueryApi());
        this.partitionListeners.add(this.commandApiService);
        scheduleActor(this.commandApiService);
        this.diskSpaceUsageListeners.add(this.commandApiService);
        return () -> {
            this.commandApiService.close();
            serverTransport.close();
            messagingService.stop().join();
        };
    }

    /**
     * Creates (but does not start) a Netty messaging service bound to the configured host/port and
     * advertising the configured advertised host/port.
     *
     * @param clusterCfg supplies the cluster name
     * @param socketBindingCfg supplies bind and advertised host/port
     * @return the new messaging service
     */
    private ManagedMessagingService createMessagingService(ClusterCfg clusterCfg, SocketBindingCfg socketBindingCfg) {
        MessagingConfig messagingConfig = new MessagingConfig();
        messagingConfig.setInterfaces(List.of(socketBindingCfg.getHost()));
        messagingConfig.setPort(Integer.valueOf(socketBindingCfg.getPort()));
        Address advertisedAddress =
                Address.from(socketBindingCfg.getAdvertisedHost(), socketBindingCfg.getAdvertisedPort());
        return new NettyMessagingService(clusterCfg.getClusterName(), advertisedAddress, messagingConfig);
    }

    /**
     * Creates and schedules the subscription API handler and registers it as a partition listener
     * and as a disk space usage listener.
     *
     * @param brokerInfo info of the local broker
     * @return the handler, used as the closer for this step
     */
    private AutoCloseable subscriptionAPIStep(BrokerInfo brokerInfo) {
        // Declared with the concrete type so it can be added to both listener lists.
        SubscriptionApiCommandMessageHandlerService handler =
                new SubscriptionApiCommandMessageHandlerService(brokerInfo, this.clusterServices.getCommunicationService());
        this.partitionListeners.add(handler);
        scheduleActor(handler);
        this.diskSpaceUsageListeners.add(handler);
        return handler;
    }

    /** Registers every collected disk space usage listener on the disk space usage monitor. */
    private void addDiskSpaceUsageListeners() {
        this.diskSpaceUsageListeners.forEach(this.diskSpaceUsageMonitor::addDiskUsageListener);
    }

    /**
     * Submits the actor to the system context's scheduler and blocks until submission completes.
     *
     * @param actor actor to submit
     */
    private void scheduleActor(Actor actor) {
        this.systemContext.getScheduler().submitActor(actor).join();
    }

    /**
     * Ensures the data directory exists and creates the disk space usage monitor. The monitor is
     * only scheduled (and the listeners collected so far registered on it) when monitoring is
     * enabled in the configuration.
     *
     * @param dataCfg data configuration
     * @return closer that closes the monitor, or a no-op closer when monitoring is disabled
     * @throws UncheckedIOException when an {@link IOException} is raised inside this step
     */
    private AutoCloseable diskSpaceMonitorStep(DataCfg dataCfg) {
        try {
            FileUtil.ensureDirectoryExists(new File(dataCfg.getDirectory()).toPath());
            this.diskSpaceUsageMonitor = new DiskSpaceUsageMonitor(dataCfg);
            if (!dataCfg.isDiskUsageMonitoringEnabled()) {
                LOG.info("Skipping start of disk space usage monitor, as it is disabled by configuration");
                return () -> {
                };
            }
            scheduleActor(this.diskSpaceUsageMonitor);
            this.diskSpaceUsageListeners.forEach(this.diskSpaceUsageMonitor::addDiskUsageListener);
            return () -> {
                this.diskSpaceUsageMonitor.close();
            };
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to create data directory", e);
        }
    }

    /**
     * Creates and schedules the leader management request handler and registers it as a partition
     * listener and as a disk space usage listener.
     *
     * @param brokerInfo info of the local broker
     * @return the handler, used as the closer for this step
     */
    private AutoCloseable managementRequestStep(BrokerInfo brokerInfo) {
        this.managementRequestHandler = new LeaderManagementRequestHandler(
                brokerInfo, this.clusterServices.getCommunicationService(), this.clusterServices.getEventService());
        scheduleActor(this.managementRequestHandler);
        this.partitionListeners.add(this.managementRequestHandler);
        this.diskSpaceUsageListeners.add(this.managementRequestHandler);
        return this.managementRequestHandler;
    }

    /**
     * Creates the partition manager and blocks until it has started. Disk space usage listeners
     * handed to the callback are appended to this broker's listener list.
     *
     * @param brokerCfg broker configuration
     * @param brokerInfo info of the local broker
     * @return closer that stops the partition manager and clears the reference
     * @throws IllegalStateException when a configured exporter cannot be loaded
     */
    private AutoCloseable partitionsStep(BrokerCfg brokerCfg, BrokerInfo brokerInfo) {
        this.partitionManager = new PartitionManagerImpl(
                this.scheduler,
                brokerCfg,
                brokerInfo,
                this.clusterServices,
                this.healthCheckService,
                this.managementRequestHandler.getPushDeploymentRequestHandler(),
                this.diskSpaceUsageListeners::add,
                this.partitionListeners,
                this.commandApiService,
                buildExporterRepository(brokerCfg));
        this.partitionManager.start().join();
        return () -> {
            this.partitionManager.stop().join();
            this.partitionManager = null;
        };
    }

    /**
     * Loads every exporter listed in the configuration into a new repository.
     *
     * @param brokerCfg broker configuration holding the exporter entries
     * @return repository containing all configured exporters
     * @throws IllegalStateException when loading an exporter fails; the cause is preserved
     */
    private ExporterRepository buildExporterRepository(BrokerCfg brokerCfg) {
        ExporterRepository exporterRepository = new ExporterRepository();
        for (Map.Entry<String, ExporterCfg> entry : brokerCfg.getExporters().entrySet()) {
            String exporterId = entry.getKey();
            ExporterCfg exporterCfg = entry.getValue();
            try {
                exporterRepository.load(exporterId, exporterCfg);
            } catch (ExporterLoadException | ExternalJarLoadException e) {
                throw new IllegalStateException("Failed to load exporter with configuration: " + exporterCfg, e);
            }
        }
        return exporterRepository;
    }

    /** Returns the broker configuration held by the system context. */
    public BrokerCfg getConfig() {
        return this.systemContext.getBrokerConfiguration();
    }

    /**
     * Shuts the broker down by running the close process in reverse. Does nothing when the broker
     * was never started or has already been closed.
     *
     * <p>The shutdown is chained onto the start future and joined, so if the start future completed
     * exceptionally the join rethrows that failure and the close process is not run.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        LogUtil.doWithMDC(this.systemContext.getDiagnosticContext(), () -> {
            if (this.isClosed || this.startFuture == null) {
                return;
            }
            this.startFuture.thenAccept(broker -> {
                this.closeProcess.closeReverse();
                this.isClosed = true;
                LOG.info("Broker shut down.");
            }).join();
        });
    }

    /** Returns the embedded gateway service, or {@code null} when its step has not run. */
    public EmbeddedGatewayService getEmbeddedGatewayService() {
        return this.embeddedGatewayService;
    }

    /** Returns the Atomix cluster captured during startup, or {@code null} before that. */
    @Deprecated
    public AtomixCluster getAtomixCluster() {
        return this.testCompanionObject.atomix;
    }

    /** Returns the cluster services captured during startup, or {@code null} before that. */
    public ClusterServices getClusterServices() {
        return this.clusterServices;
    }

    /** Returns the disk space usage monitor, or {@code null} when its step has not run. */
    public DiskSpaceUsageMonitor getDiskSpaceUsageMonitor() {
        return this.diskSpaceUsageMonitor;
    }

    /** Returns the broker admin service, or {@code null} when its step has not run. */
    public BrokerAdminService getBrokerAdminService() {
        return this.brokerAdminService;
    }

    /** Returns the system context this broker was created with. */
    public SystemContext getSystemContext() {
        return this.systemContext;
    }

    /** Returns the partition manager; {@code null} before its step has run and after shutdown. */
    public PartitionManager getPartitionManager() {
        return this.partitionManager;
    }
}
