package org.apache.storm.scheduler.blacklist;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.storm.DaemonConfig;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.IScheduler;
import org.apache.storm.scheduler.SupervisorDetails;
import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.blacklist.reporters.IReporter;
import org.apache.storm.scheduler.blacklist.reporters.LogReporter;
import org.apache.storm.scheduler.blacklist.strategies.DefaultBlacklistStrategy;
import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
import org.apache.storm.shade.com.google.common.collect.EvictingQueue;
import org.apache.storm.shade.com.google.common.collect.Sets;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/classes/data/StormApp.jar:org/apache/storm/scheduler/blacklist/BlacklistScheduler.class */
/**
 * Scheduler wrapper that tracks misbehaving supervisors and blacklists their hosts before
 * delegating the actual scheduling to an underlying {@link IScheduler}.
 *
 * <p>On every {@link #schedule(Topologies, Cluster)} call it records which cached supervisors are
 * missing (or, optionally, which of their ports are missing) into a bounded sliding window, asks
 * the configured {@link IBlacklistStrategy} for the current blacklist, pushes the corresponding
 * hosts into the cluster and then forwards to the wrapped scheduler.
 *
 * <p>{@code assignmentFailures} is guarded by synchronizing on the map itself, and
 * {@code blacklistedSupervisorIds} is a volatile reference that is replaced on every scheduling
 * round. All other state is accessed without synchronization.
 */
public class BlacklistScheduler implements IScheduler {
    public static final int DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME = 1800;
    public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT = 3;
    public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME = 300;
    private static final Logger LOG = LoggerFactory.getLogger(BlacklistScheduler.class);
    private final IScheduler underlyingScheduler;
    private StormMetricsRegistry metricsRegistry;
    protected int toleranceTime;
    protected int toleranceCount;
    protected int resumeTime;
    protected IReporter reporter;
    protected IBlacklistStrategy blacklistStrategy;
    protected int nimbusMonitorFreqSecs;
    /** Supervisor id to the set of ports remembered for that supervisor. */
    protected Map<String, Set<Integer>> cachedSupervisors;
    /** One entry per scheduling round: supervisor id to the ports considered bad in that round. */
    protected EvictingQueue<Map<String, Set<Integer>>> badSupervisorsToleranceSlidingWindow;
    /** One entry per scheduling round: per-key count of failed assignment sends in that round. */
    protected EvictingQueue<Map<String, Integer>> sendAssignmentFailureCount;
    /** Failures accumulated since the last scheduling round; guarded by synchronizing on itself. */
    private final Map<String, Integer> assignmentFailures = new HashMap<>();
    /** Capacity of both sliding windows: {@code toleranceTime / nimbusMonitorFreqSecs}. */
    protected int windowSize;
    protected volatile Set<String> blacklistedSupervisorIds;
    private boolean blacklistOnBadSlots;
    private Map<String, Object> conf;
    private boolean blacklistSendAssignentFailures;

    /**
     * Creates a blacklist scheduler that delegates to the given scheduler.
     *
     * @param iScheduler the scheduler that performs the actual scheduling
     */
    public BlacklistScheduler(IScheduler iScheduler) {
        this.underlyingScheduler = iScheduler;
    }

    /**
     * Prepares the underlying scheduler, reads the blacklist settings from the configuration,
     * instantiates the reporter and strategy, sizes the sliding windows and registers the
     * {@code nimbus:num-blacklisted-supervisor} gauge.
     *
     * @param map the daemon configuration
     * @param stormMetricsRegistry registry on which the blacklist gauge is registered
     * @throws IllegalArgumentException if the configured nimbus monitor frequency is not positive
     */
    @Override
    public void prepare(Map<String, Object> map, StormMetricsRegistry stormMetricsRegistry) {
        LOG.info("Preparing black list scheduler");
        this.underlyingScheduler.prepare(map, stormMetricsRegistry);
        this.conf = map;
        this.metricsRegistry = stormMetricsRegistry;

        this.toleranceTime = ObjectReader.getInt(
            this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME),
            DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME);
        this.toleranceCount = ObjectReader.getInt(
            this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT),
            DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT);
        this.resumeTime = ObjectReader.getInt(
            this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME),
            DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME);
        this.blacklistSendAssignentFailures = ObjectReader.getBoolean(
            this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_ENABLE_SEND_ASSIGNMENT_FAILURES), false);

        String reporterClassName = ObjectReader.getString(
            this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER), LogReporter.class.getName());
        this.reporter = (IReporter) initializeInstance(reporterClassName, "blacklist reporter");

        String strategyClassName = ObjectReader.getString(
            this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_STRATEGY), DefaultBlacklistStrategy.class.getName());
        this.blacklistStrategy = (IBlacklistStrategy) initializeInstance(strategyClassName, "blacklist strategy");

        this.nimbusMonitorFreqSecs = ObjectReader.getInt(this.conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS));
        this.blacklistStrategy.prepare(this.conf);

        // Fail with a descriptive message instead of a bare ArithmeticException (zero) or an
        // opaque queue-size error (negative) from the computations below.
        if (this.nimbusMonitorFreqSecs <= 0) {
            throw new IllegalArgumentException(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS
                + " must be a positive integer but was " + this.nimbusMonitorFreqSecs);
        }
        this.windowSize = this.toleranceTime / this.nimbusMonitorFreqSecs;
        this.badSupervisorsToleranceSlidingWindow = EvictingQueue.create(this.windowSize);
        this.sendAssignmentFailureCount = EvictingQueue.create(this.windowSize);
        this.cachedSupervisors = new HashMap<>();
        this.blacklistedSupervisorIds = new HashSet<>();
        this.blacklistOnBadSlots = ObjectReader.getBoolean(
            this.conf.get(DaemonConfig.BLACKLIST_SCHEDULER_ASSUME_SUPERVISOR_BAD_BASED_ON_BAD_SLOT), true);

        stormMetricsRegistry.registerGauge("nimbus:num-blacklisted-supervisor",
            () -> this.blacklistedSupervisorIds.size());
    }

    /** Cleans up the underlying scheduler. */
    @Override
    public void cleanup() {
        LOG.info("Cleanup black list scheduler");
        this.underlyingScheduler.cleanup();
    }

    /**
     * Updates the blacklist bookkeeping for this round, publishes the blacklisted hosts to the
     * cluster and then delegates to the underlying scheduler.
     *
     * @param topologies the topologies to schedule
     * @param cluster the cluster whose supervisors are inspected and whose blacklist is updated
     */
    @Override
    public void schedule(Topologies topologies, Cluster cluster) {
        LOG.debug("running Black List scheduler");
        // Skip building the slot collections entirely when debug logging is off.
        if (LOG.isDebugEnabled()) {
            LOG.debug("AssignableSlots: {}", cluster.getAssignableSlots());
            LOG.debug("AvailableSlots: {}", cluster.getAvailableSlots());
            LOG.debug("UsedSlots: {}", cluster.getUsedSlots());
        }
        Map<String, SupervisorDetails> supervisors = cluster.getSupervisors();
        this.blacklistStrategy.resumeFromBlacklist();
        trackMissedHeartbeats(supervisors);
        trackAssignmentFailures();
        this.blacklistedSupervisorIds = refreshBlacklistedSupervisorIds(cluster, topologies);
        cluster.setBlacklistedHosts(getBlacklistHosts(cluster, this.blacklistedSupervisorIds));
        removeLongTimeDisappearFromCache();
        this.underlyingScheduler.schedule(topologies, cluster);
    }

    /** Returns the configuration reported by the underlying scheduler. */
    @Override
    public Map<String, Map<String, Double>> config() {
        return this.underlyingScheduler.config();
    }

    /**
     * Appends one entry to the bad-supervisor sliding window for this round.
     *
     * <p>Cached supervisors absent from {@code supervisors} are recorded with all their cached
     * ports. Supervisors seen for the first time are added to the cache. For already cached
     * supervisors, missing ports are recorded only when blacklisting on bad slots is enabled.
     *
     * @param supervisors the supervisors currently known to the cluster, keyed by id
     */
    private void trackMissedHeartbeats(Map<String, SupervisorDetails> supervisors) {
        Set<String> missingIds = Sets.difference(this.cachedSupervisors.keySet(), supervisors.keySet());
        Map<String, Set<Integer>> badSupervisors = new HashMap<>();
        for (String missingId : missingIds) {
            // Intentionally the cached set itself: the cache-eviction logic compares window
            // entries against the current cache content.
            badSupervisors.put(missingId, this.cachedSupervisors.get(missingId));
        }
        for (Map.Entry<String, SupervisorDetails> entry : supervisors.entrySet()) {
            String supervisorId = entry.getKey();
            SupervisorDetails details = entry.getValue();
            if (!this.cachedSupervisors.containsKey(supervisorId)) {
                // Cache a private, modifiable copy: ports are removed from the cached set later
                // and that must never touch the set owned by SupervisorDetails.
                this.cachedSupervisors.put(supervisorId, new HashSet<>(details.getAllPorts()));
            } else if (this.blacklistOnBadSlots) {
                Set<Integer> missingPorts = badSlots(details, supervisorId);
                if (!missingPorts.isEmpty()) {
                    badSupervisors.put(supervisorId, missingPorts);
                }
            }
        }
        this.badSupervisorsToleranceSlidingWindow.add(badSupervisors);
    }

    /**
     * When send-assignment-failure tracking is enabled, moves the failures accumulated since the
     * previous round into the failure sliding window and resets the accumulator.
     */
    private void trackAssignmentFailures() {
        if (!this.blacklistSendAssignentFailures) {
            return;
        }
        Map<String, Integer> failuresThisRound;
        synchronized (this.assignmentFailures) {
            failuresThisRound = new HashMap<>(this.assignmentFailures);
            this.assignmentFailures.clear();
        }
        this.sendAssignmentFailureCount.add(failuresThisRound);
    }

    /**
     * Computes the cached ports that the supervisor no longer reports, and extends the cache with
     * any ports the supervisor reports that were not cached yet.
     *
     * @param supervisor the supervisor as currently reported
     * @param supervisorKey the id under which the supervisor is cached; must be present in the cache
     * @return a live view of "previously cached ports minus currently reported ports"
     */
    private Set<Integer> badSlots(SupervisorDetails supervisor, String supervisorKey) {
        Set<Integer> cachedPorts = this.cachedSupervisors.get(supervisorKey);
        Set<Integer> reportedPorts = supervisor.getAllPorts();
        Set<Integer> newPorts = Sets.difference(reportedPorts, cachedPorts);
        if (!newPorts.isEmpty()) {
            // Replace the cache entry with a fresh modifiable set so ports can be removed later.
            Set<Integer> mergedPorts = new HashSet<>(newPorts);
            mergedPorts.addAll(cachedPorts);
            this.cachedSupervisors.put(supervisorKey, mergedPorts);
        }
        // Deliberately based on the pre-merge cached set, exactly like the lookup above.
        return Sets.difference(cachedPorts, reportedPorts);
    }

    /**
     * Asks the blacklist strategy for the current blacklist, passing it snapshots of both sliding
     * windows.
     *
     * @return the supervisor ids the strategy considers blacklisted
     */
    private Set<String> refreshBlacklistedSupervisorIds(Cluster cluster, Topologies topologies) {
        Set<String> blacklist = this.blacklistStrategy.getBlacklist(
            new ArrayList<>(this.badSupervisorsToleranceSlidingWindow),
            new ArrayList<>(this.sendAssignmentFailureCount),
            cluster,
            topologies);
        LOG.info("Supervisors {} are blacklisted.", blacklist);
        return blacklist;
    }

    /**
     * Maps blacklisted supervisor ids to host names, skipping supervisors for which the cluster
     * reports no host.
     *
     * @param cluster the cluster used to resolve hosts
     * @param supervisorIds the blacklisted supervisor ids
     * @return the hosts of the blacklisted supervisors that could be resolved
     */
    private Set<String> getBlacklistHosts(Cluster cluster, Set<String> supervisorIds) {
        Set<String> hosts = new HashSet<>();
        for (String supervisorId : supervisorIds) {
            String host = cluster.getHost(supervisorId);
            if (host != null) {
                hosts.add(host);
            } else {
                LOG.info("supervisor {} is not alive, do not need to add to blacklist.", supervisorId);
            }
        }
        return hosts;
    }

    /**
     * Drops supervisors and individual ports from the cache once they have been recorded as bad in
     * every entry of a full sliding window.
     *
     * <p>A supervisor is counted for a window entry only when the bad ports recorded there equal
     * its currently cached ports; a port is counted for every entry that lists it.
     */
    private void removeLongTimeDisappearFromCache() {
        Map<String, Integer> supervisorCounts = new HashMap<>();
        Map<WorkerSlot, Integer> slotCounts = new HashMap<>();
        for (Map<String, Set<Integer>> windowEntry : this.badSupervisorsToleranceSlidingWindow) {
            for (Map.Entry<String, Set<Integer>> bad : windowEntry.entrySet()) {
                String supervisorId = bad.getKey();
                Set<Integer> badPorts = bad.getValue();
                if (badPorts.equals(this.cachedSupervisors.get(supervisorId))) {
                    supervisorCounts.merge(supervisorId, 1, Integer::sum);
                }
                for (Integer port : badPorts) {
                    slotCounts.merge(new WorkerSlot(supervisorId, port), 1, Integer::sum);
                }
            }
        }

        for (Map.Entry<String, Integer> entry : supervisorCounts.entrySet()) {
            if (entry.getValue() == this.windowSize) {
                String supervisorId = entry.getKey();
                this.cachedSupervisors.remove(supervisorId);
                LOG.info("Supervisor {} was never back to normal during tolerance period, probably dead. Will remove from cache.", supervisorId);
            }
        }

        for (Map.Entry<WorkerSlot, Integer> entry : slotCounts.entrySet()) {
            if (entry.getValue() == this.windowSize) {
                WorkerSlot slot = entry.getKey();
                Integer port = Integer.valueOf(slot.getPort());
                // The supervisor may already have been evicted as a whole by the loop above.
                Set<Integer> cachedPorts = this.cachedSupervisors.get(slot.getNodeId());
                if (cachedPorts != null) {
                    cachedPorts.remove(port);
                }
                LOG.info("Worker slot {} was never back to normal during tolerance period, probably dead. Will be removed from cache.", slot);
            }
        }
    }

    /**
     * Instantiates the named class via {@link ReflectionUtils#newInstance(String)}, logging a
     * cause-specific error and rethrowing if instantiation fails.
     *
     * @param className fully qualified name of the class to instantiate
     * @param representation human-readable description of what is being created, used in logs
     * @return the new instance
     * @throws RuntimeException the original exception thrown by the reflective instantiation
     */
    private Object initializeInstance(String className, String representation) {
        try {
            return ReflectionUtils.newInstance(className);
        } catch (RuntimeException e) {
            Throwable cause = e.getCause();
            if (cause instanceof ClassNotFoundException) {
                LOG.error("Can't find {} for name {}", representation, className);
            } else if (cause instanceof InstantiationException) {
                LOG.error("Throw InstantiationException {} for name {}", representation, className);
            } else if (cause instanceof IllegalAccessException) {
                LOG.error("Throw IllegalAccessException {} for name {}", representation, className);
            } else {
                LOG.error("Throw unexpected exception {} {} for name {}", cause, representation, className);
            }
            throw e;
        }
    }

    /**
     * Returns the supervisor ids blacklisted in the most recent scheduling round.
     *
     * @return an unmodifiable view of the current blacklist
     */
    public Set<String> getBlacklistSupervisorIds() {
        return Collections.unmodifiableSet(this.blacklistedSupervisorIds);
    }

    /**
     * Records a failed assignment send when failure tracking is enabled.
     *
     * @param str key under which the failure is counted (presumably the node id - confirm against
     *            the callback interface)
     * @param z result of the send; a failure is recorded only when this is {@code false}
     */
    @Override
    public void nodeAssignmentSent(String str, boolean z) {
        if (this.blacklistSendAssignentFailures && !z) {
            synchronized (this.assignmentFailures) {
                this.assignmentFailures.merge(str, 1, Integer::sum);
            }
        }
    }
}
