/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.yarn;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import java.io.File;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.process.ProcessReaper;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.runtime.webmonitor.WebMonitor;
import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.yarn.Utils;
import org.apache.flink.yarn.YarnFlinkResourceManager;
import org.apache.flink.yarn.YarnJobManager;
import org.apache.flink.yarn.YarnTaskManager;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Some;
import scala.concurrent.duration.FiniteDuration;

public class YarnApplicationMasterRunner {
    protected static final Logger LOG = LoggerFactory.getLogger(YarnApplicationMasterRunner.class);
    private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5L, TimeUnit.MINUTES);
    private static final Map<String, String> ENV = System.getenv();
    private static final int INIT_ERROR_EXIT_CODE = 31;
    private static final int ACTOR_DIED_EXIT_CODE = 32;

    public static void main(String[] args) {
        EnvironmentInformation.logEnvironmentInfo((Logger)LOG, (String)"YARN ApplicationMaster / ResourceManager / JobManager", (String[])args);
        SignalHandler.register((Logger)LOG);
        JvmShutdownSafeguard.installAsShutdownHook((Logger)LOG);
        int returnCode = new YarnApplicationMasterRunner().run(args);
        System.exit(returnCode);
    }

    protected int run(String[] args) {
        try {
            LOG.debug("All environment variables: {}", ENV);
            String yarnClientUsername = ENV.get("HADOOP_USER_NAME");
            Utils.require(yarnClientUsername != null, "YARN client user name environment variable (%s) not set", "HADOOP_USER_NAME");
            String currDir = ENV.get(ApplicationConstants.Environment.PWD.key());
            Utils.require(currDir != null, "Current working directory variable (%s) not set", ApplicationConstants.Environment.PWD.key());
            LOG.debug("Current working Directory: {}", (Object)currDir);
            String remoteKeytabPrincipal = ENV.get("_KEYTAB_PRINCIPAL");
            LOG.info("remoteKeytabPrincipal obtained {}", (Object)remoteKeytabPrincipal);
            UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
            LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}", (Object)currentUser.getShortUserName(), (Object)yarnClientUsername);
            Map<String, String> dynamicProperties = FlinkYarnSessionCli.getDynamicProperties(ENV.get("_DYNAMIC_PROPERTIES"));
            LOG.debug("YARN dynamic properties: {}", dynamicProperties);
            final Configuration flinkConfig = YarnApplicationMasterRunner.createConfiguration(currDir, dynamicProperties, LOG);
            File f = new File(currDir, "krb5.keytab");
            if (remoteKeytabPrincipal != null && f.exists()) {
                String keytabPath = f.getAbsolutePath();
                LOG.debug("keytabPath: {}", (Object)keytabPath);
                flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, f.getAbsolutePath());
                flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
            }
            SecurityConfiguration sc = new SecurityConfiguration(flinkConfig);
            SecurityUtils.install((SecurityConfiguration)sc);
            return (Integer)SecurityUtils.getInstalledContext().runSecured((Callable)new Callable<Integer>(){

                @Override
                public Integer call() {
                    return YarnApplicationMasterRunner.this.runApplicationMaster(flinkConfig);
                }
            });
        }
        catch (Throwable t) {
            LOG.error("YARN Application Master initialization failed", t);
            return 31;
        }
    }

    protected int runApplicationMaster(Configuration config) {
        ActorSystem actorSystem = null;
        WebMonitor webMonitor = null;
        HighAvailabilityServices highAvailabilityServices = null;
        MetricRegistryImpl metricRegistry = null;
        int numberProcessors = Hardware.getNumberCPUCores();
        ScheduledExecutorService futureExecutor = Executors.newScheduledThreadPool(numberProcessors, (ThreadFactory)new ExecutorThreadFactory("yarn-jobmanager-future"));
        ExecutorService ioExecutor = Executors.newFixedThreadPool(numberProcessors, (ThreadFactory)new ExecutorThreadFactory("yarn-jobmanager-io"));
        try {
            int slotsPerTaskManager;
            int numInitialTaskManagers;
            int taskManagerContainerMemory;
            String currDir = ENV.get(ApplicationConstants.Environment.PWD.key());
            Utils.require(currDir != null, "Current working directory variable (%s) not set", ApplicationConstants.Environment.PWD.key());
            String appMasterHostname = ENV.get(ApplicationConstants.Environment.NM_HOST.key());
            Utils.require(appMasterHostname != null, "ApplicationMaster hostname variable %s not set", ApplicationConstants.Environment.NM_HOST.key());
            LOG.info("YARN assigned hostname for application master: {}", (Object)appMasterHostname);
            String remoteKeytabPrincipal = ENV.get("_KEYTAB_PRINCIPAL");
            File f = new File(currDir, "krb5.keytab");
            if (remoteKeytabPrincipal != null && f.exists()) {
                String keytabPath = f.getAbsolutePath();
                LOG.debug("keytabPath: {}", (Object)keytabPath);
                config.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
                config.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
            }
            YarnConfiguration yarnConfig = new YarnConfiguration();
            try {
                taskManagerContainerMemory = Integer.parseInt(ENV.get("_CLIENT_TM_MEMORY"));
            }
            catch (NumberFormatException e) {
                throw new RuntimeException("Invalid value for _CLIENT_TM_MEMORY : " + e.getMessage());
            }
            try {
                numInitialTaskManagers = Integer.parseInt(ENV.get("_CLIENT_TM_COUNT"));
            }
            catch (NumberFormatException e) {
                throw new RuntimeException("Invalid value for _CLIENT_TM_COUNT : " + e.getMessage());
            }
            try {
                slotsPerTaskManager = Integer.parseInt(ENV.get("_SLOTS"));
            }
            catch (NumberFormatException e) {
                throw new RuntimeException("Invalid value for _SLOTS : " + e.getMessage());
            }
            ContaineredTaskManagerParameters taskManagerParameters = ContaineredTaskManagerParameters.create((Configuration)config, (long)taskManagerContainerMemory, (int)slotsPerTaskManager);
            LOG.info("TaskManagers will be created with {} task slots", (Object)taskManagerParameters.numSlots());
            LOG.info("TaskManagers will be started with container size {} MB, JVM heap size {} MB, JVM direct memory limit {} MB", new Object[]{taskManagerParameters.taskManagerTotalMemoryMB(), taskManagerParameters.taskManagerHeapSizeMB(), taskManagerParameters.taskManagerDirectMemoryLimitMB()});
            String amPortRange = config.getString(YarnConfigOptions.APPLICATION_MASTER_PORT);
            actorSystem = BootstrapTools.startActorSystem((Configuration)config, (String)appMasterHostname, (String)amPortRange, (Logger)LOG);
            String akkaHostname = (String)AkkaUtils.getAddress((ActorSystem)actorSystem).host().get();
            int akkaPort = (Integer)AkkaUtils.getAddress((ActorSystem)actorSystem).port().get();
            LOG.info("Actor system bound to hostname {}.", (Object)akkaHostname);
            Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration((Configuration)config, (String)akkaHostname, (int)akkaPort, (int)slotsPerTaskManager, (FiniteDuration)TASKMANAGER_REGISTRATION_TIMEOUT);
            LOG.debug("TaskManager configuration: {}", (Object)taskManagerConfig);
            ContainerLaunchContext taskManagerContext = Utils.createTaskExecutorContext(config, yarnConfig, ENV, taskManagerParameters, taskManagerConfig, currDir, this.getTaskManagerClass(), LOG);
            config.setString(JobManagerOptions.ADDRESS, akkaHostname);
            config.setInteger(JobManagerOptions.PORT, akkaPort);
            highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices((Configuration)config, (Executor)ioExecutor, (HighAvailabilityServicesUtils.AddressResolution)HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
            LOG.debug("Starting Web Frontend");
            Time webMonitorTimeout = Time.milliseconds((long)config.getLong(WebOptions.TIMEOUT));
            webMonitor = BootstrapTools.startWebMonitorIfConfigured((Configuration)config, (HighAvailabilityServices)highAvailabilityServices, (LeaderGatewayRetriever)new AkkaJobManagerRetriever(actorSystem, webMonitorTimeout, 10, Time.milliseconds((long)50L)), (MetricQueryServiceRetriever)new AkkaQueryServiceRetriever(actorSystem, webMonitorTimeout), (Time)webMonitorTimeout, (ScheduledExecutor)new ScheduledExecutorServiceAdapter(futureExecutor), (Logger)LOG);
            metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration((Configuration)config));
            metricRegistry.startQueryService(actorSystem, null);
            LOG.debug("Starting JobManager actor");
            ActorRef jobManager = (ActorRef)JobManager.startJobManagerActors((Configuration)config, (ActorSystem)actorSystem, (ScheduledExecutorService)futureExecutor, (Executor)ioExecutor, (HighAvailabilityServices)highAvailabilityServices, (MetricRegistry)metricRegistry, (Option)(webMonitor == null ? Option.empty() : Option.apply((Object)webMonitor.getRestAddress())), (Option)new Some((Object)"jobmanager"), (Option)Option.empty(), this.getJobManagerClass(), this.getArchivistClass())._1();
            String webMonitorURL = webMonitor == null ? null : webMonitor.getRestAddress();
            LOG.debug("Starting YARN Flink Resource Manager");
            Props resourceMasterProps = YarnFlinkResourceManager.createActorProps(this.getResourceManagerClass(), config, yarnConfig, highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), appMasterHostname, webMonitorURL, taskManagerParameters, taskManagerContext, numInitialTaskManagers, LOG);
            ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps);
            LOG.debug("Starting process reapers for JobManager and YARN Application Master");
            actorSystem.actorOf(Props.create(ProcessReaper.class, (Object[])new Object[]{resourceMaster, LOG, 32}), "YARN_Resource_Master_Process_Reaper");
            actorSystem.actorOf(Props.create(ProcessReaper.class, (Object[])new Object[]{jobManager, LOG, 32}), "JobManager_Process_Reaper");
        }
        catch (Throwable t) {
            LOG.error("YARN Application Master initialization failed", t);
            if (webMonitor != null) {
                try {
                    webMonitor.stop();
                }
                catch (Throwable ignored) {
                    LOG.warn("Failed to stop the web frontend", t);
                }
            }
            if (actorSystem != null) {
                try {
                    actorSystem.shutdown();
                }
                catch (Throwable tt) {
                    LOG.error("Error shutting down actor system", tt);
                }
            }
            futureExecutor.shutdownNow();
            ioExecutor.shutdownNow();
            return 31;
        }
        LOG.info("YARN Application Master started");
        actorSystem.awaitTermination();
        if (webMonitor != null) {
            try {
                webMonitor.stop();
            }
            catch (Throwable t) {
                LOG.error("Failed to stop the web frontend", t);
            }
        }
        if (highAvailabilityServices != null) {
            try {
                highAvailabilityServices.close();
            }
            catch (Throwable t) {
                LOG.error("Failed to stop the high availability services.", t);
            }
        }
        if (metricRegistry != null) {
            try {
                metricRegistry.shutdown().get();
            }
            catch (Throwable t) {
                LOG.error("Could not properly shut down the metric registry.", t);
            }
        }
        ExecutorUtils.gracefulShutdown((long)AkkaUtils.getTimeout((Configuration)config).toMillis(), (TimeUnit)TimeUnit.MILLISECONDS, (ExecutorService[])new ExecutorService[]{futureExecutor, ioExecutor});
        return 0;
    }

    protected Class<? extends YarnFlinkResourceManager> getResourceManagerClass() {
        return YarnFlinkResourceManager.class;
    }

    protected Class<? extends JobManager> getJobManagerClass() {
        return YarnJobManager.class;
    }

    protected Class<? extends MemoryArchivist> getArchivistClass() {
        return MemoryArchivist.class;
    }

    protected Class<? extends TaskManager> getTaskManagerClass() {
        return YarnTaskManager.class;
    }

    private static Configuration createConfiguration(String baseDirectory, Map<String, String> additional, Logger log) {
        LOG.info("Loading config from directory " + baseDirectory);
        Configuration configuration = GlobalConfiguration.loadConfiguration((String)baseDirectory);
        for (Map.Entry<String, String> property : additional.entrySet()) {
            configuration.setString(property.getKey(), property.getValue());
        }
        String cliZKNamespace = ENV.get("_ZOOKEEPER_NAMESPACE");
        if (cliZKNamespace != null && !cliZKNamespace.isEmpty()) {
            configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, cliZKNamespace);
        }
        if (configuration.getInteger(WebOptions.PORT, 0) >= 0) {
            configuration.setInteger(WebOptions.PORT, 0);
        }
        BootstrapTools.substituteDeprecatedConfigPrefix((Configuration)configuration, (String)"yarn.application-master.env.", (String)"containerized.master.env.");
        BootstrapTools.substituteDeprecatedConfigPrefix((Configuration)configuration, (String)"yarn.taskmanager.env.", (String)"containerized.taskmanager.env.");
        String localDirs = ENV.get(ApplicationConstants.Environment.LOCAL_DIRS.key());
        BootstrapTools.updateTmpDirectoriesInConfiguration((Configuration)configuration, (String)localDirs);
        return configuration;
    }
}

