/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.yarn;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.pattern.Patterns;
import akka.util.Timeout;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatus;
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
import org.apache.flink.runtime.clusterframework.messages.ShutdownClusterAfterJob;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
import org.apache.flink.yarn.ApplicationClient;
import org.apache.flink.yarn.YarnMessages;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public class YarnClusterClient
extends ClusterClient<ApplicationId> {
    private static final Logger LOG = LoggerFactory.getLogger(YarnClusterClient.class);
    private final AbstractYarnClusterDescriptor clusterDescriptor;
    private final int numberTaskManagers;
    private final int slotsPerTaskManager;
    private final LazApplicationClientLoader applicationClient;
    private final FiniteDuration akkaDuration;
    private final ApplicationId appId;
    private final String trackingURL;
    private final boolean newlyCreatedCluster;

    public YarnClusterClient(AbstractYarnClusterDescriptor clusterDescriptor, int numberTaskManagers, int slotsPerTaskManager, ApplicationReport appReport, Configuration flinkConfig, boolean newlyCreatedCluster) throws Exception {
        super(flinkConfig);
        this.akkaDuration = AkkaUtils.getTimeout((Configuration)flinkConfig);
        this.clusterDescriptor = clusterDescriptor;
        this.numberTaskManagers = numberTaskManagers;
        this.slotsPerTaskManager = slotsPerTaskManager;
        this.appId = appReport.getApplicationId();
        this.trackingURL = appReport.getTrackingUrl();
        this.newlyCreatedCluster = newlyCreatedCluster;
        this.applicationClient = new LazApplicationClientLoader(flinkConfig, this.actorSystemLoader, this.highAvailabilityServices);
    }

    private void stopAfterJob(JobID jobID) {
        Preconditions.checkNotNull((Object)jobID, (String)"The job id must not be null");
        try {
            Future replyFuture = this.getJobManagerGateway().ask((Object)new ShutdownClusterAfterJob(jobID), this.akkaDuration);
            Await.ready((Awaitable)replyFuture, (Duration)this.akkaDuration);
        }
        catch (Exception e) {
            throw new RuntimeException("Unable to tell application master to stop once the specified job has been finised", e);
        }
    }

    public Configuration getFlinkConfiguration() {
        return this.flinkConfig;
    }

    public int getMaxSlots() {
        int maxSlots = this.numberTaskManagers * this.slotsPerTaskManager;
        return maxSlots > 0 ? maxSlots : -1;
    }

    public boolean hasUserJarsInClassPath(List<URL> userJarFiles) {
        return this.clusterDescriptor.hasUserJarFiles(userJarFiles);
    }

    public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
        if (this.isDetached()) {
            if (this.newlyCreatedCluster) {
                this.stopAfterJob(jobGraph.getJobID());
            }
            return super.runDetached(jobGraph, classLoader);
        }
        return super.run(jobGraph, classLoader);
    }

    public String getWebInterfaceURL() {
        if (!this.trackingURL.startsWith("http://")) {
            return "http://" + this.trackingURL;
        }
        return this.trackingURL;
    }

    public GetClusterStatusResponse getClusterStatus() {
        try {
            Future clusterStatusOption = this.getJobManagerGateway().ask((Object)GetClusterStatus.getInstance(), this.akkaDuration);
            return (GetClusterStatusResponse)Await.result((Awaitable)clusterStatusOption, (Duration)this.akkaDuration);
        }
        catch (Exception e) {
            throw new RuntimeException("Unable to get ClusterClient status from Application Client", e);
        }
    }

    public List<String> getNewMessages() {
        ArrayList<String> ret = new ArrayList<String>();
        while (true) {
            Object result;
            try {
                Future response = Patterns.ask((ActorRef)this.applicationClient.get(), (Object)YarnMessages.getLocalGetYarnMessage(), (Timeout)new Timeout(this.akkaDuration));
                result = Await.result((Awaitable)response, (Duration)this.akkaDuration);
            }
            catch (Exception ioe) {
                LOG.warn("Error retrieving the YARN messages locally", (Throwable)ioe);
                break;
            }
            if (!(result instanceof Option)) {
                throw new RuntimeException("LocalGetYarnMessage requires a response of type Option. Instead the response is of type " + result.getClass() + ".");
            }
            Option messageOption = (Option)result;
            LOG.debug("Received message option {}", (Object)messageOption);
            if (messageOption.isEmpty()) break;
            Object obj = messageOption.get();
            if (obj instanceof InfoMessage) {
                InfoMessage msg = (InfoMessage)obj;
                ret.add("[" + msg.date() + "] " + msg.message());
                continue;
            }
            LOG.warn("LocalGetYarnMessage returned unexpected type: " + messageOption);
        }
        return ret;
    }

    public ApplicationId getClusterId() {
        return this.appId;
    }

    public boolean isDetached() {
        return super.isDetached() || this.clusterDescriptor.isDetachedMode();
    }

    public void waitForClusterToBeReady() {
        this.logAndSysout("Waiting until all TaskManagers have connected");
        GetClusterStatusResponse lastStatus = null;
        while (true) {
            GetClusterStatusResponse currentStatus;
            if ((currentStatus = this.getClusterStatus()) != null && !currentStatus.equals(lastStatus)) {
                this.logAndSysout("TaskManager status (" + currentStatus.numRegisteredTaskManagers() + "/" + this.numberTaskManagers + ")");
                if (currentStatus.numRegisteredTaskManagers() >= this.numberTaskManagers) {
                    break;
                }
            } else if (lastStatus == null) {
                this.logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
            }
            try {
                Thread.sleep(250L);
            }
            catch (InterruptedException e) {
                throw new RuntimeException("Interrupted while waiting for TaskManagers", e);
            }
            lastStatus = currentStatus;
        }
        this.logAndSysout("All TaskManagers are connected");
    }

    public void shutDownCluster() {
        LOG.info("Sending shutdown request to the Application Master");
        try {
            Future response = Patterns.ask((ActorRef)this.applicationClient.get(), (Object)new YarnMessages.LocalStopYarnSession(ApplicationStatus.SUCCEEDED, "Flink YARN Client requested shutdown"), (Timeout)new Timeout(this.akkaDuration));
            Await.ready((Awaitable)response, (Duration)this.akkaDuration);
        }
        catch (Exception e) {
            LOG.warn("Error while stopping YARN cluster.", (Throwable)e);
        }
    }

    public ApplicationId getApplicationId() {
        return this.appId;
    }

    private static class LazApplicationClientLoader {
        private final Configuration flinkConfig;
        private final ClusterClient.LazyActorSystemLoader actorSystemLoader;
        private final HighAvailabilityServices highAvailabilityServices;
        private ActorRef applicationClient;

        private LazApplicationClientLoader(Configuration flinkConfig, ClusterClient.LazyActorSystemLoader actorSystemLoader, HighAvailabilityServices highAvailabilityServices) {
            this.flinkConfig = (Configuration)Preconditions.checkNotNull((Object)flinkConfig, (String)"flinkConfig");
            this.actorSystemLoader = (ClusterClient.LazyActorSystemLoader)Preconditions.checkNotNull((Object)actorSystemLoader, (String)"actorSystemLoader");
            this.highAvailabilityServices = (HighAvailabilityServices)Preconditions.checkNotNull((Object)highAvailabilityServices, (String)"highAvailabilityServices");
        }

        public ActorRef get() throws FlinkException {
            if (this.applicationClient == null) {
                ActorSystem actorSystem;
                LOG.info("Start application client.");
                try {
                    actorSystem = this.actorSystemLoader.get();
                }
                catch (FlinkException fle) {
                    throw new FlinkException("Could not start the ClusterClient's ActorSystem.", (Throwable)fle);
                }
                try {
                    this.applicationClient = actorSystem.actorOf(Props.create(ApplicationClient.class, (Object[])new Object[]{this.flinkConfig, this.highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID)}), "applicationClient");
                }
                catch (Exception e) {
                    throw new FlinkException("Could not start the ApplicationClient.", (Throwable)e);
                }
            }
            return this.applicationClient;
        }
    }
}

