/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.yarn;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.messages.StopCluster;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.util.Preconditions;
import org.apache.flink.yarn.RegisterApplicationMasterResponseReflector;
import org.apache.flink.yarn.RegisteredYarnWorkerNode;
import org.apache.flink.yarn.YarnContainerInLaunch;
import org.apache.flink.yarn.YarnResourceManagerCallbackHandler;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.messages.ContainersAllocated;
import org.apache.flink.yarn.messages.ContainersComplete;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.slf4j.Logger;
import scala.Option;

public class YarnFlinkResourceManager
extends FlinkResourceManager<RegisteredYarnWorkerNode> {
    private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500;
    private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000;
    static final String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
    private final Map<ResourceID, YarnContainerInLaunch> containersInLaunch;
    private final Map<ContainerId, Container> containersBeingReturned;
    private final YarnConfiguration yarnConfig;
    private final ContaineredTaskManagerParameters taskManagerParameters;
    private final ContainerLaunchContext taskManagerLaunchContext;
    private final String applicationMasterHostName;
    private final String webInterfaceURL;
    private final int yarnHeartbeatIntervalMillis;
    private final int maxFailedContainers;
    private YarnResourceManagerCallbackHandler resourceManagerCallbackHandler;
    private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;
    private NMClient nodeManagerClient;
    private int numPendingContainerRequests;
    private int failedContainersSoFar;
    private RegisterApplicationMasterResponseReflector applicationMasterResponseReflector;

    public YarnFlinkResourceManager(Configuration flinkConfig, YarnConfiguration yarnConfig, LeaderRetrievalService leaderRetrievalService, String applicationMasterHostName, String webInterfaceURL, ContaineredTaskManagerParameters taskManagerParameters, ContainerLaunchContext taskManagerLaunchContext, int yarnHeartbeatIntervalMillis, int maxFailedContainers, int numInitialTaskManagers) {
        this(flinkConfig, yarnConfig, leaderRetrievalService, applicationMasterHostName, webInterfaceURL, taskManagerParameters, taskManagerLaunchContext, yarnHeartbeatIntervalMillis, maxFailedContainers, numInitialTaskManagers, new YarnResourceManagerCallbackHandler());
    }

    public YarnFlinkResourceManager(Configuration flinkConfig, YarnConfiguration yarnConfig, LeaderRetrievalService leaderRetrievalService, String applicationMasterHostName, String webInterfaceURL, ContaineredTaskManagerParameters taskManagerParameters, ContainerLaunchContext taskManagerLaunchContext, int yarnHeartbeatIntervalMillis, int maxFailedContainers, int numInitialTaskManagers, YarnResourceManagerCallbackHandler callbackHandler) {
        this(flinkConfig, yarnConfig, leaderRetrievalService, applicationMasterHostName, webInterfaceURL, taskManagerParameters, taskManagerLaunchContext, yarnHeartbeatIntervalMillis, maxFailedContainers, numInitialTaskManagers, callbackHandler, (AMRMClientAsync<AMRMClient.ContainerRequest>)AMRMClientAsync.createAMRMClientAsync((int)yarnHeartbeatIntervalMillis, (AMRMClientAsync.CallbackHandler)callbackHandler), NMClient.createNMClient());
    }

    public YarnFlinkResourceManager(Configuration flinkConfig, YarnConfiguration yarnConfig, LeaderRetrievalService leaderRetrievalService, String applicationMasterHostName, String webInterfaceURL, ContaineredTaskManagerParameters taskManagerParameters, ContainerLaunchContext taskManagerLaunchContext, int yarnHeartbeatIntervalMillis, int maxFailedContainers, int numInitialTaskManagers, YarnResourceManagerCallbackHandler callbackHandler, AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient, NMClient nodeManagerClient) {
        super(numInitialTaskManagers, flinkConfig, leaderRetrievalService);
        this.applicationMasterResponseReflector = new RegisterApplicationMasterResponseReflector(this.LOG);
        this.yarnConfig = Objects.requireNonNull(yarnConfig);
        this.taskManagerParameters = Objects.requireNonNull(taskManagerParameters);
        this.taskManagerLaunchContext = Objects.requireNonNull(taskManagerLaunchContext);
        this.applicationMasterHostName = Objects.requireNonNull(applicationMasterHostName);
        this.webInterfaceURL = webInterfaceURL;
        this.yarnHeartbeatIntervalMillis = yarnHeartbeatIntervalMillis;
        this.maxFailedContainers = maxFailedContainers;
        this.resourceManagerCallbackHandler = (YarnResourceManagerCallbackHandler)Preconditions.checkNotNull((Object)callbackHandler);
        this.resourceManagerClient = (AMRMClientAsync)Preconditions.checkNotNull(resourceManagerClient);
        this.nodeManagerClient = (NMClient)Preconditions.checkNotNull((Object)nodeManagerClient);
        this.containersInLaunch = new HashMap<ResourceID, YarnContainerInLaunch>();
        this.containersBeingReturned = new HashMap<ContainerId, Container>();
    }

    protected void handleMessage(Object message) {
        if (message instanceof ContainersAllocated) {
            this.containersAllocated(((ContainersAllocated)message).containers());
        } else if (message instanceof ContainersComplete) {
            this.containersComplete(((ContainersComplete)message).containers());
        } else {
            super.handleMessage(message);
        }
    }

    protected void initialize() throws Exception {
        this.LOG.info("Initializing YARN resource master");
        this.resourceManagerCallbackHandler.initialize(this.self());
        this.resourceManagerClient.init((org.apache.hadoop.conf.Configuration)this.yarnConfig);
        this.resourceManagerClient.start();
        this.nodeManagerClient.init((org.apache.hadoop.conf.Configuration)this.yarnConfig);
        this.nodeManagerClient.start();
        this.nodeManagerClient.cleanupRunningContainersOnStop(true);
        this.LOG.info("Registering Application Master with tracking url {}", (Object)this.webInterfaceURL);
        Option portOption = AkkaUtils.getAddress((ActorSystem)this.getContext().system()).port();
        int actorSystemPort = portOption.isDefined() ? (Integer)portOption.get() : -1;
        RegisterApplicationMasterResponse response = this.resourceManagerClient.registerApplicationMaster(this.applicationMasterHostName, actorSystemPort, this.webInterfaceURL);
        List<Container> containersFromPreviousAttempts = this.applicationMasterResponseReflector.getContainersFromPreviousAttempts(response);
        if (!containersFromPreviousAttempts.isEmpty()) {
            this.LOG.info("Retrieved {} TaskManagers from previous attempt", (Object)containersFromPreviousAttempts.size());
            long now = System.currentTimeMillis();
            for (Container c : containersFromPreviousAttempts) {
                YarnContainerInLaunch containerInLaunch = new YarnContainerInLaunch(c, now);
                this.containersInLaunch.put(containerInLaunch.getResourceID(), containerInLaunch);
            }
            this.updateProgress();
        }
    }

    protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
        FinalApplicationStatus yarnStatus = this.getYarnStatus(finalStatus);
        this.LOG.info("Unregistering application from the YARN Resource Manager");
        try {
            this.resourceManagerClient.unregisterApplicationMaster(yarnStatus, optionalDiagnostics, "");
        }
        catch (Throwable t) {
            this.LOG.error("Could not unregister the application master.", t);
        }
        try {
            this.resourceManagerClient.stop();
        }
        catch (Throwable t) {
            this.LOG.error("Could not cleanly shut down the Asynchronous Resource Manager Client", t);
        }
        try {
            this.nodeManagerClient.stop();
        }
        catch (Throwable t) {
            this.LOG.error("Could not cleanly shut down the Node Manager Client", t);
        }
        this.getContext().system().stop(this.getSelf());
    }

    protected void fatalError(String message, Throwable error) {
        this.LOG.error("FATAL ERROR IN YARN APPLICATION MASTER: " + message, error);
        this.LOG.error("Shutting down process");
        System.exit(-13);
    }

    protected void requestNewWorkers(int numWorkers) {
        int containerMemorySizeMB;
        long mem = this.taskManagerParameters.taskManagerTotalMemoryMB();
        if (mem <= Integer.MAX_VALUE) {
            containerMemorySizeMB = (int)mem;
        } else {
            containerMemorySizeMB = Integer.MAX_VALUE;
            this.LOG.error("Decreasing container size from {} MB to {} MB (integer value overflow)", (Object)mem, (Object)containerMemorySizeMB);
        }
        for (int i = 0; i < numWorkers; ++i) {
            ++this.numPendingContainerRequests;
            this.LOG.info("Requesting new TaskManager container with {} megabytes memory. Pending requests: {}", (Object)containerMemorySizeMB, (Object)this.numPendingContainerRequests);
            Priority priority = Priority.newInstance((int)0);
            int taskManagerSlots = this.taskManagerParameters.numSlots();
            int vcores = this.config.getInteger(YarnConfigOptions.VCORES, Math.max(taskManagerSlots, 1));
            Resource capability = Resource.newInstance((int)containerMemorySizeMB, (int)vcores);
            this.resourceManagerClient.addContainerRequest(new AMRMClient.ContainerRequest(capability, null, null, priority));
        }
        this.resourceManagerClient.setHeartbeatInterval(500);
    }

    protected void releasePendingWorker(ResourceID id) {
        YarnContainerInLaunch container = this.containersInLaunch.remove(id);
        if (container != null) {
            this.releaseYarnContainer(container.container());
        } else {
            this.LOG.error("Cannot find container {} to release. Ignoring request.", (Object)id);
        }
    }

    protected void releaseStartedWorker(RegisteredYarnWorkerNode worker) {
        this.releaseYarnContainer(worker.yarnContainer());
    }

    private void releaseYarnContainer(Container container) {
        this.LOG.info("Releasing YARN container {}", (Object)container.getId());
        this.containersBeingReturned.put(container.getId(), container);
        try {
            this.nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
        }
        catch (Throwable t) {
            this.LOG.error("Error while calling YARN Node Manager to release container", t);
        }
        this.resourceManagerClient.releaseAssignedContainer(container.getId());
    }

    protected RegisteredYarnWorkerNode workerStarted(ResourceID resourceID) {
        YarnContainerInLaunch inLaunch = this.containersInLaunch.remove(resourceID);
        if (inLaunch == null) {
            return null;
        }
        return new RegisteredYarnWorkerNode(inLaunch.container());
    }

    protected Collection<RegisteredYarnWorkerNode> reacceptRegisteredWorkers(Collection<ResourceID> toConsolidate) {
        ArrayList<RegisteredYarnWorkerNode> accepted = new ArrayList<RegisteredYarnWorkerNode>();
        for (ResourceID resourceID : toConsolidate) {
            YarnContainerInLaunch yci = this.containersInLaunch.remove(resourceID);
            if (yci != null) {
                this.LOG.info("YARN container consolidation recognizes Resource {} ", (Object)resourceID);
                accepted.add(new RegisteredYarnWorkerNode(yci.container()));
                continue;
            }
            if (this.isStarted(resourceID)) {
                this.LOG.info("TaskManager {} has already been registered at the resource manager.", (Object)resourceID);
                continue;
            }
            this.LOG.info("YARN container consolidation does not recognize TaskManager {}", (Object)resourceID);
        }
        return accepted;
    }

    protected int getNumWorkerRequestsPending() {
        return this.numPendingContainerRequests;
    }

    protected int getNumWorkersPendingRegistration() {
        return this.containersInLaunch.size();
    }

    private void containersAllocated(List<Container> containers) {
        int numRequired = this.getDesignatedWorkerPoolSize();
        int numRegistered = this.getNumberOfStartedTaskManagers();
        for (Container container : containers) {
            this.numPendingContainerRequests = Math.max(0, this.numPendingContainerRequests - 1);
            this.LOG.info("Received new container: {} - Remaining pending container requests: {}", (Object)container.getId(), (Object)this.numPendingContainerRequests);
            if (numRegistered + this.containersInLaunch.size() < numRequired) {
                YarnContainerInLaunch containerInLaunch = new YarnContainerInLaunch(container);
                ResourceID resourceID = containerInLaunch.getResourceID();
                this.containersInLaunch.put(resourceID, containerInLaunch);
                String message = "Launching TaskManager in container " + containerInLaunch + " on host " + container.getNodeId().getHost();
                this.LOG.info(message);
                this.sendInfoMessage(message);
                try {
                    this.taskManagerLaunchContext.getEnvironment().put(ENV_FLINK_CONTAINER_ID, resourceID.getResourceIdString());
                    this.nodeManagerClient.startContainer(container, this.taskManagerLaunchContext);
                }
                catch (Throwable t) {
                    this.containersInLaunch.remove(resourceID);
                    this.LOG.error("Could not start TaskManager in container " + containerInLaunch, t);
                    this.containersBeingReturned.put(container.getId(), container);
                    this.resourceManagerClient.releaseAssignedContainer(container.getId());
                }
                continue;
            }
            this.LOG.info("Returning excess container {}", (Object)container.getId());
            this.containersBeingReturned.put(container.getId(), container);
            this.resourceManagerClient.releaseAssignedContainer(container.getId());
        }
        this.updateProgress();
        if (this.numPendingContainerRequests <= 0) {
            this.resourceManagerClient.setHeartbeatInterval(this.yarnHeartbeatIntervalMillis);
        }
        this.triggerCheckWorkers();
    }

    private void containersComplete(List<ContainerStatus> containers) {
        for (ContainerStatus status : containers) {
            String exitStatus;
            ResourceID id = new ResourceID(status.getContainerId().toString());
            if (this.containersBeingReturned.remove(status.getContainerId()) != null) {
                this.LOG.info("Container {} completed successfully with diagnostics: {}", (Object)id, (Object)status.getDiagnostics());
                continue;
            }
            switch (status.getExitStatus()) {
                case -103: {
                    exitStatus = "Vmem limit exceeded (-103)";
                    break;
                }
                case -104: {
                    exitStatus = "Pmem limit exceeded (-104)";
                    break;
                }
                default: {
                    exitStatus = String.valueOf(status.getExitStatus());
                }
            }
            YarnContainerInLaunch launched = this.containersInLaunch.remove(id);
            if (launched != null) {
                this.LOG.info("Container {} failed, with a TaskManager in launch or registration. Exit status: {}", (Object)id, (Object)exitStatus);
            } else {
                this.LOG.info("Container {} failed. Exit status: {}", (Object)id, (Object)exitStatus);
                this.notifyWorkerFailed(id, "Container " + id + " failed. Exit status: {}" + exitStatus);
            }
            ++this.failedContainersSoFar;
            String diagMessage = String.format("Diagnostics for container %s in state %s : exitStatus=%s diagnostics=%s", id, status.getState(), exitStatus, status.getDiagnostics());
            this.sendInfoMessage(diagMessage);
            this.LOG.info(diagMessage);
            this.LOG.info("Total number of failed containers so far: " + this.failedContainersSoFar);
            if (this.maxFailedContainers < 0 || this.failedContainersSoFar <= this.maxFailedContainers) continue;
            String msg = "Stopping YARN session because the number of failed containers (" + this.failedContainersSoFar + ") exceeded the maximum failed containers (" + this.maxFailedContainers + "). This number is controlled by the '" + YarnConfigOptions.MAX_FAILED_CONTAINERS.key() + "' configuration setting. By default its the number of requested containers.";
            this.LOG.error(msg);
            this.self().tell(this.decorateMessage(new StopCluster(ApplicationStatus.FAILED, msg)), ActorRef.noSender());
            return;
        }
        this.updateProgress();
        this.triggerCheckWorkers();
    }

    static ResourceID extractResourceID(Container container) {
        return new ResourceID(container.getId().toString());
    }

    private void updateProgress() {
        float progress;
        int required = this.getDesignatedWorkerPoolSize();
        int available = this.getNumberOfStartedTaskManagers() + this.containersInLaunch.size();
        float f = progress = required <= 0 ? 1.0f : (float)available / (float)required;
        if (this.resourceManagerCallbackHandler != null) {
            this.resourceManagerCallbackHandler.setCurrentProgress(progress);
        }
    }

    private FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
        if (status == null) {
            return FinalApplicationStatus.UNDEFINED;
        }
        switch (status) {
            case SUCCEEDED: {
                return FinalApplicationStatus.SUCCEEDED;
            }
            case FAILED: {
                return FinalApplicationStatus.FAILED;
            }
            case CANCELED: {
                return FinalApplicationStatus.KILLED;
            }
        }
        return FinalApplicationStatus.UNDEFINED;
    }

    public static Props createActorProps(Class<? extends YarnFlinkResourceManager> actorClass, Configuration flinkConfig, YarnConfiguration yarnConfig, LeaderRetrievalService leaderRetrievalService, String applicationMasterHostName, String webFrontendURL, ContaineredTaskManagerParameters taskManagerParameters, ContainerLaunchContext taskManagerLaunchContext, int numInitialTaskManagers, Logger log) {
        int maxFailedContainers;
        long yarnExpiryIntervalMS;
        int yarnHeartbeatIntervalMS = flinkConfig.getInteger(YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 1000;
        if ((long)yarnHeartbeatIntervalMS >= (yarnExpiryIntervalMS = yarnConfig.getLong("yarn.am.liveness-monitor.expiry-interval-ms", 600000L))) {
            log.warn("The heartbeat interval of the Flink Application master ({}) is greater than YARN's expiry interval ({}). The application is likely to be killed by YARN.", (Object)yarnHeartbeatIntervalMS, (Object)yarnExpiryIntervalMS);
        }
        if ((maxFailedContainers = flinkConfig.getInteger(YarnConfigOptions.MAX_FAILED_CONTAINERS.key(), numInitialTaskManagers)) >= 0) {
            log.info("YARN application tolerates {} failed TaskManager containers before giving up", (Object)maxFailedContainers);
        }
        return Props.create(actorClass, (Object[])new Object[]{flinkConfig, yarnConfig, leaderRetrievalService, applicationMasterHostName, webFrontendURL, taskManagerParameters, taskManagerLaunchContext, yarnHeartbeatIntervalMS, maxFailedContainers, numInitialTaskManagers});
    }
}

