package org.apache.storm.nimbus;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.storm.Constants;
import org.apache.storm.DaemonConfig;
import org.apache.storm.daemon.supervisor.Supervisor;
import org.apache.storm.generated.SupervisorAssignments;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.scheduler.INodeAssignmentSentCallBack;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.SupervisorClient;
import org.apache.storm.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/classes/data/StormApp.jar:org/apache/storm/nimbus/AssignmentDistributionService.class */
public class AssignmentDistributionService implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) AssignmentDistributionService.class);
    private ExecutorService service;
    private Random random;
    private volatile Map<Integer, LinkedBlockingQueue<NodeAssignments>> assignmentsQueue;
    private Map<String, Supervisor> localSupervisors;
    private Map conf;
    private INodeAssignmentSentCallBack sendAssignmentCallback;
    private volatile boolean active = false;
    private int threadsNum = 0;
    private int queueSize = 0;
    private boolean isLocalMode = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/classes/data/StormApp.jar:org/apache/storm/nimbus/AssignmentDistributionService$DistributeTask.class */
    public static class DistributeTask implements Runnable {
        private AssignmentDistributionService service;
        private Integer queueIndex;

        DistributeTask(AssignmentDistributionService assignmentDistributionService, Integer num) {
            this.service = assignmentDistributionService;
            this.queueIndex = num;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (this.service.isActive()) {
                try {
                    sendAssignmentsToNode(this.service.nextAssignments(this.queueIndex));
                } catch (InterruptedException e) {
                    if (this.service.isActive()) {
                        AssignmentDistributionService.LOG.error("Get an unexpected interrupt when distributing assignments to node, {}", e.getCause());
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }

        private void sendAssignmentsToNode(NodeAssignments nodeAssignments) {
            if (this.service.isLocalMode) {
                Supervisor supervisor = (Supervisor) this.service.localSupervisors.get(nodeAssignments.getNode());
                if (supervisor != null) {
                    supervisor.sendSupervisorAssignments(nodeAssignments.getAssignments());
                    this.service.sendAssignmentCallback.nodeAssignmentSent(nodeAssignments.getNode(), true);
                    return;
                } else {
                    AssignmentDistributionService.LOG.error("Can not find node {} for assignments distribution", nodeAssignments.getNode());
                    this.service.sendAssignmentCallback.nodeAssignmentSent(nodeAssignments.getNode(), false);
                    throw new RuntimeException("null for node " + nodeAssignments.getNode() + " supervisor instance.");
                }
            }
            try {
                try {
                    SupervisorClient configuredClient = SupervisorClient.getConfiguredClient(this.service.getConf(), nodeAssignments.getHost(), nodeAssignments.getServerPort().intValue());
                    Throwable th = null;
                    try {
                        configuredClient.getIface().sendSupervisorAssignments(nodeAssignments.getAssignments());
                        this.service.sendAssignmentCallback.nodeAssignmentSent(nodeAssignments.getNode(), true);
                    } catch (Exception e) {
                        nodeAssignments.getMetricsRegistry().getMeter(Constants.NIMBUS_SEND_ASSIGNMENT_EXCEPTIONS).mark();
                        AssignmentDistributionService.LOG.error("Exception when trying to send assignments to node {}: {}", nodeAssignments.getNode(), e.getMessage());
                        this.service.sendAssignmentCallback.nodeAssignmentSent(nodeAssignments.getNode(), false);
                    }
                    if (configuredClient != null) {
                        if (0 != 0) {
                            try {
                                configuredClient.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            configuredClient.close();
                        }
                    }
                } finally {
                }
            } catch (Throwable th3) {
                AssignmentDistributionService.LOG.error("Exception to create supervisor client for node {}: {}", nodeAssignments.getNode(), th3.getMessage());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/classes/data/StormApp.jar:org/apache/storm/nimbus/AssignmentDistributionService$NodeAssignments.class */
    public static class NodeAssignments {
        private String node;
        private String host;
        private Integer serverPort;
        private SupervisorAssignments assignments;
        private StormMetricsRegistry metricsRegistry;

        private NodeAssignments(String str, String str2, Integer num, SupervisorAssignments supervisorAssignments, StormMetricsRegistry stormMetricsRegistry) {
            this.node = str;
            this.host = str2;
            this.serverPort = num;
            this.assignments = supervisorAssignments;
            this.metricsRegistry = stormMetricsRegistry;
        }

        public static NodeAssignments getInstance(String str, String str2, Integer num, SupervisorAssignments supervisorAssignments, StormMetricsRegistry stormMetricsRegistry) {
            return new NodeAssignments(str, str2, num, supervisorAssignments, stormMetricsRegistry);
        }

        public String getNode() {
            return this.node;
        }

        public String getHost() {
            return this.host;
        }

        public Integer getServerPort() {
            return this.serverPort;
        }

        public SupervisorAssignments getAssignments() {
            return this.assignments;
        }

        public StormMetricsRegistry getMetricsRegistry() {
            return this.metricsRegistry;
        }
    }

    public static AssignmentDistributionService getInstance(Map map, INodeAssignmentSentCallBack iNodeAssignmentSentCallBack) {
        AssignmentDistributionService assignmentDistributionService = new AssignmentDistributionService();
        assignmentDistributionService.prepare(map, iNodeAssignmentSentCallBack);
        return assignmentDistributionService;
    }

    public void prepare(Map map, INodeAssignmentSentCallBack iNodeAssignmentSentCallBack) {
        this.conf = map;
        this.sendAssignmentCallback = iNodeAssignmentSentCallBack;
        this.random = new Random(47L);
        this.threadsNum = ObjectReader.getInt(map.get(DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREADS), 10).intValue();
        this.queueSize = ObjectReader.getInt(map.get(DaemonConfig.NIMBUS_ASSIGNMENTS_SERVICE_THREAD_QUEUE_SIZE), 100).intValue();
        this.assignmentsQueue = new HashMap();
        for (int i = 0; i < this.threadsNum; i++) {
            this.assignmentsQueue.put(Integer.valueOf(i), new LinkedBlockingQueue<>(this.queueSize));
        }
        this.service = Executors.newFixedThreadPool(this.threadsNum);
        this.active = true;
        for (int i2 = 0; i2 < this.threadsNum; i2++) {
            this.service.submit(new DistributeTask(this, Integer.valueOf(i2)));
        }
        this.localSupervisors = new HashMap();
        if (ConfigUtils.isLocalMode(map)) {
            this.isLocalMode = true;
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.active = false;
        this.service.shutdownNow();
        try {
            this.service.awaitTermination(1L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            LOG.error("Failed to close assignments distribute service");
        }
        this.assignmentsQueue = null;
    }

    public void addAssignmentsForNode(String str, String str2, Integer num, SupervisorAssignments supervisorAssignments, StormMetricsRegistry stormMetricsRegistry) {
        try {
            if (num == null) {
                LOG.warn("Discard an assignment distribution for node {} because server port info is missing.", str);
            } else {
                if (!nextQueue().offer(NodeAssignments.getInstance(str, str2, num, supervisorAssignments, stormMetricsRegistry), 5L, TimeUnit.SECONDS)) {
                    LOG.warn("Discard an assignment distribution for node {} because the target sub queue is full.", str);
                }
            }
        } catch (InterruptedException e) {
            LOG.error("Add node assignments interrupted: {}", e.getMessage());
            throw new RuntimeException(e);
        }
    }

    public void addLocalSupervisor(Supervisor supervisor) {
        this.localSupervisors.put(supervisor.getId(), supervisor);
    }

    private Integer nextQueueId() {
        return Integer.valueOf(this.random.nextInt(this.threadsNum));
    }

    private LinkedBlockingQueue<NodeAssignments> nextQueue() {
        return this.assignmentsQueue.get(nextQueueId());
    }

    private LinkedBlockingQueue<NodeAssignments> getQueueById(Integer num) {
        return this.assignmentsQueue.get(num);
    }

    public NodeAssignments nextAssignments(Integer num) throws InterruptedException {
        while (true) {
            NodeAssignments poll = getQueueById(num).poll();
            if (poll != null) {
                return poll;
            }
            Time.sleep(100L);
        }
    }

    public boolean isActive() {
        return this.active;
    }

    public Map getConf() {
        return this.conf;
    }
}
