package org.apache.storm.streams;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.shade.com.google.common.collect.ArrayListMultimap;
import org.apache.storm.shade.com.google.common.collect.HashBasedTable;
import org.apache.storm.shade.com.google.common.collect.Multimap;
import org.apache.storm.shade.com.google.common.collect.Table;
import org.apache.storm.shade.org.jgrapht.DirectedGraph;
import org.apache.storm.shade.org.jgrapht.graph.DirectedSubgraph;
import org.apache.storm.shade.org.jgrapht.traverse.TopologicalOrderIterator;
import org.apache.storm.stats.ClientStatsUtil;
import org.apache.storm.streams.processors.ChainedProcessorContext;
import org.apache.storm.streams.processors.EmittingProcessorContext;
import org.apache.storm.streams.processors.ForwardingProcessorContext;
import org.apache.storm.streams.processors.ProcessorContext;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/classes/data/StormApp.jar:org/apache/storm/streams/ProcessorBoltDelegate.class */
class ProcessorBoltDelegate implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(ProcessorBoltDelegate.class);

    /** Identifier supplied at construction time. */
    private final String id;
    /** Graph supplied at construction time; {@code prepare} restricts it to {@link #nodes}. */
    private final DirectedGraph<Node, Edge> graph;
    /** Processor nodes handled by this delegate (constructor copy plus anything added via {@code addNodes}). */
    private final List<ProcessorNode> nodes;
    /** Nodes for which an emitting context has been created. */
    private final List<ProcessorNode> outgoingProcessors = new ArrayList<>();
    /** Every emitting context created so far; anchors and event timestamps are broadcast to all of them. */
    private final Set<EmittingProcessorContext> emittingProcessorContexts = new HashSet<>();
    /** Per (processor node, stream): number of punctuations counted since the node was last punctuated. */
    private final Table<ProcessorNode, String, Integer> punctuationState = HashBasedTable.create();
    /** Per initial stream: number of upstream tasks feeding it, computed in {@code prepare}. */
    private final Map<String, Integer> streamToInputTaskCount = new HashMap<>();
    /** Topology configuration handed to {@code prepare}. */
    private Map<String, Object> topoConf;
    /** Topology context handed to {@code prepare}. */
    private TopologyContext topologyContext;
    /** Collector handed to {@code prepare}; used for emitting contexts and for acking. */
    private OutputCollector outputCollector;
    /** Maps an input stream id to the processor nodes that receive its values first. */
    private Multimap<String, ProcessorNode> streamToInitialProcessors;
    /** Name of the tuple field carrying the event timestamp, or {@code null} when event time is not used. */
    private String timestampField;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a delegate for the given processor nodes of a graph.
     *
     * @param str the identifier returned by {@code getId()}
     * @param directedGraph the graph the processor nodes belong to
     * @param list the initial processor nodes; the list is copied, so later changes to it are not seen here
     */
    public ProcessorBoltDelegate(String str, DirectedGraph<Node, Edge> directedGraph, List<ProcessorNode> list) {
        this.id = str;
        this.graph = directedGraph;
        // Typed copy instead of a raw ArrayList so the assignment is not an unchecked conversion.
        this.nodes = new ArrayList<>(list);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the identifier this delegate was constructed with.
     *
     * @return the delegate id
     */
    public String getId() {
        return id;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Appends further processor nodes to the ones already handled by this delegate.
     *
     * @param collection the nodes to append
     */
    public void addNodes(Collection<ProcessorNode> collection) {
        nodes.addAll(collection);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Exposes the processor nodes of this delegate without allowing callers to modify them.
     *
     * @return an unmodifiable view of the node list
     */
    public List<ProcessorNode> getNodes() {
        final List<ProcessorNode> readOnlyView = Collections.unmodifiableList(nodes);
        return readOnlyView;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Stores the runtime handles and initialises the processor context of every node of this delegate.
     *
     * <p>The graph is restricted to this delegate's nodes and walked in topological order. A node with no
     * children inside that subgraph gets an emitting context. A node with children gets a forwarding
     * context keyed by the streams each child reads from it; if {@code hasOutgoingChild} reports a child
     * outside the subgraph, the forwarding context is chained with an emitting context.
     *
     * <p>Afterwards the timestamp field (if set) is pushed to all emitting contexts, and the number of
     * input tasks is recorded for every initial stream.
     *
     * @param map the topology configuration
     * @param topologyContext the topology context, also used to count upstream tasks
     * @param outputCollector the collector used by emitting contexts and for acking
     * @throws IllegalStateException if the subgraph contains a node that is not a {@code ProcessorNode}
     */
    public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.topoConf = map;
        this.topologyContext = topologyContext;
        this.outputCollector = outputCollector;

        // A null edge set keeps every edge of the base graph between the selected vertices.
        DirectedSubgraph<Node, Edge> subgraph =
                new DirectedSubgraph<>(this.graph, new HashSet<Node>(this.nodes), null);
        TopologicalOrderIterator<Node, Edge> order = new TopologicalOrderIterator<>(subgraph);
        while (order.hasNext()) {
            Node node = order.next();
            if (!(node instanceof ProcessorNode)) {
                throw new IllegalStateException("Not a processor node " + node);
            }
            ProcessorNode processorNode = (ProcessorNode) node;
            List<ProcessorNode> children = StreamUtil.getChildren(subgraph, processorNode);

            ProcessorContext processorContext;
            if (children.isEmpty()) {
                processorContext = createEmittingContext(processorNode);
            } else {
                Multimap<String, ProcessorNode> streamToChildren = ArrayListMultimap.create();
                for (ProcessorNode child : children) {
                    for (String stream : child.getParentStreams(processorNode)) {
                        streamToChildren.put(stream, child);
                    }
                }
                ForwardingProcessorContext forwardingContext =
                        new ForwardingProcessorContext(processorNode, streamToChildren);
                if (hasOutgoingChild(processorNode, new HashSet<ProcessorNode>(children))) {
                    // Some child is not covered by the forwarding context, so results must also be emitted.
                    processorContext = new ChainedProcessorContext(
                            processorNode, forwardingContext, createEmittingContext(processorNode));
                } else {
                    processorContext = forwardingContext;
                }
            }
            processorNode.initProcessorContext(processorContext);
        }

        if (this.timestampField != null) {
            for (EmittingProcessorContext emittingContext : this.emittingProcessorContexts) {
                emittingContext.setTimestampField(this.timestampField);
            }
        }

        for (String stream : this.streamToInitialProcessors.keySet()) {
            this.streamToInputTaskCount.put(stream, getStreamInputTaskCount(topologyContext, stream));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Declares the output streams of every processor node of this delegate.
     *
     * <p>Each output stream is declared with the node's output fields, extended by the timestamp field
     * when one is configured. For every output stream a companion punctuation stream (named by
     * {@code StreamUtil.getPunctuationStream}) is declared as well.
     *
     * @param outputFieldsDeclarer the declarer to register the streams with
     */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        for (ProcessorNode processorNode : this.nodes) {
            for (String stream : processorNode.getOutputStreams()) {
                if (this.timestampField == null) {
                    outputFieldsDeclarer.declareStream(stream, processorNode.getOutputFields());
                } else {
                    // Copy before appending so the node's own field list is left untouched.
                    List<String> fieldNames = new ArrayList<>(processorNode.getOutputFields().toList());
                    fieldNames.add(this.timestampField);
                    outputFieldsDeclarer.declareStream(stream, new Fields(fieldNames));
                }
                outputFieldsDeclarer.declareStream(
                        StreamUtil.getPunctuationStream(stream), StreamUtil.getPunctuationFields());
            }
        }
    }

    /**
     * Hands the given anchor to every emitting context created so far.
     *
     * @param refCountedTuple the anchor to set
     */
    void setAnchor(RefCountedTuple refCountedTuple) {
        for (EmittingProcessorContext emittingContext : emittingProcessorContexts) {
            emittingContext.setAnchor(refCountedTuple);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Extracts the value to process and the stream id it belongs to from an incoming tuple.
     *
     * <ul>
     *   <li>Source component name starts with {@code ClientStatsUtil.SPOUT}: the tuple itself is the
     *       value and the stream id is the component id concatenated with the stream id.</li>
     *   <li>Otherwise, if the tuple has the size of a pair: a {@code Pair} of its first two values.</li>
     *   <li>Otherwise: its first value.</li>
     * </ul>
     *
     * @param tuple the incoming tuple
     * @return the value paired with the stream id
     */
    public Pair<Object, String> getValueAndStream(Tuple tuple) {
        final boolean fromSpout = tuple.getSourceComponent().startsWith(ClientStatsUtil.SPOUT);
        if (fromSpout) {
            final Object wholeTuple = tuple;
            final GlobalStreamId origin = tuple.getSourceGlobalStreamId();
            final String qualifiedStream = origin.get_componentId() + origin.get_streamId();
            return Pair.of(wholeTuple, qualifiedStream);
        }
        final Object payload = isPair(tuple)
                ? Pair.of(tuple.getValue(0), tuple.getValue(1))
                : tuple.getValue(0);
        final String stream = tuple.getSourceStreamId();
        return Pair.of(payload, stream);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Processes one input tuple and acks it afterwards if its reference count allows.
     *
     * <p>The tuple is wrapped in a {@code RefCountedTuple} that is set as the anchor on all emitting
     * contexts. When a timestamp field is configured, its value is read from the tuple and set as the
     * event timestamp before the tuple's value is processed.
     *
     * @param tuple the tuple to process
     */
    public void processAndAck(Tuple tuple) {
        final RefCountedTuple anchor = new RefCountedTuple(tuple);
        setAnchor(anchor);
        if (isEventTimestamp()) {
            final long eventTime = tuple.getLongByField(getTimestampField());
            setEventTimestamp(eventTime);
        }
        final Pair<Object, String> extracted = getValueAndStream(tuple);
        final Object payload = extracted.getFirst();
        final String stream = extracted.getSecond();
        process(payload, stream);
        ack(anchor);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Routes a value to the initial processors of its stream.
     *
     * <p>A punctuation value (per {@code StreamUtil.isPunctuation}) punctuates the initial
     * processors; any other value is executed by them.
     *
     * @param obj the value to process
     * @param str the id of the stream the value arrived on
     */
    public void process(Object obj, String str) {
        LOG.debug("Process value {}, sourceStreamId {}", obj, str);
        if (!StreamUtil.isPunctuation(obj)) {
            executeInitialProcessors(obj, str);
            return;
        }
        punctuateInitialProcessors(str);
    }

    /**
     * Punctuates the initial processors of the source stream behind the given punctuation stream.
     *
     * <p>A processor is punctuated (with a {@code null} argument) only when {@code shouldPunctuate}
     * allows it; its punctuation counters are cleared afterwards.
     *
     * @param str the id of the stream the punctuation arrived on
     */
    private void punctuateInitialProcessors(String str) {
        final String origin = StreamUtil.getSourceStream(str);
        for (ProcessorNode target : streamToInitialProcessors.get(origin)) {
            if (!shouldPunctuate(target, origin)) {
                continue;
            }
            target.getProcessor().punctuate(null);
            clearPunctuationState(target);
        }
    }

    /**
     * Passes a value to every initial processor registered for the given stream.
     *
     * @param obj the value to execute
     * @param str the id of the stream the value arrived on
     */
    private void executeInitialProcessors(Object obj, String str) {
        for (ProcessorNode target : streamToInitialProcessors.get(str)) {
            target.getProcessor().execute(obj, str);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Replaces the mapping from input stream id to initial processor nodes.
     *
     * @param multimap the new mapping; the instance is stored as is, not copied
     */
    public void setStreamToInitialProcessors(Multimap<String, ProcessorNode> multimap) {
        streamToInitialProcessors = multimap;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Merges further entries into the existing mapping from input stream id to initial processor nodes.
     *
     * @param multimap the entries to add
     */
    public void addStreamToInitialProcessors(Multimap<String, ProcessorNode> multimap) {
        streamToInitialProcessors.putAll(multimap);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the ids of all streams that have initial processors registered.
     *
     * @return the key set of the stream-to-initial-processors mapping
     */
    public Set<String> getInitialStreams() {
        final Set<String> streamIds = streamToInitialProcessors.keySet();
        return streamIds;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Tells whether a timestamp field has been configured.
     *
     * @return {@code true} if the timestamp field is set
     */
    public boolean isEventTimestamp() {
        return !(timestampField == null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sets the given event timestamp on every emitting context created so far.
     *
     * @param j the event timestamp
     */
    public void setEventTimestamp(long j) {
        for (EmittingProcessorContext emittingContext : emittingProcessorContexts) {
            emittingContext.setEventTimestamp(j);
        }
    }

    /**
     * Returns the configured timestamp field name.
     *
     * @return the field name, or {@code null} if none is set
     */
    private String getTimestampField() {
        return timestampField;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Configures the name of the tuple field that carries the event timestamp.
     *
     * @param str the field name; {@code null} leaves event timestamps disabled
     */
    public void setTimestampField(String str) {
        timestampField = str;
    }

    /**
     * Acks the wrapped tuple through the output collector if the wrapper reports it should be acked,
     * then marks the wrapper as acked.
     *
     * @param refCountedTuple the wrapper of the tuple to ack
     */
    private void ack(RefCountedTuple refCountedTuple) {
        if (!refCountedTuple.shouldAck()) {
            return;
        }
        LOG.debug("ACKing tuple {}", refCountedTuple);
        outputCollector.ack(refCountedTuple.tuple());
        refCountedTuple.setAcked();
    }

    /**
     * Creates one emitting context per output stream of the node and chains them into a single context.
     *
     * <p>The new contexts are also added to {@code emittingProcessorContexts}, and the node is added
     * to {@code outgoingProcessors}.
     *
     * @param processorNode the node whose results are to be emitted
     * @return a chained context wrapping the emitting contexts of all output streams
     */
    private ProcessorContext createEmittingContext(ProcessorNode processorNode) {
        List<EmittingProcessorContext> emittingContexts = new ArrayList<>();
        for (String stream : processorNode.getOutputStreams()) {
            emittingContexts.add(new EmittingProcessorContext(processorNode, this.outputCollector, stream));
        }
        this.emittingProcessorContexts.addAll(emittingContexts);
        this.outgoingProcessors.add(processorNode);
        return new ChainedProcessorContext(processorNode, emittingContexts);
    }

    /**
     * Checks whether the node has a child that is not served by in-bolt forwarding.
     *
     * <p>This is the case when one of the nodes returned by {@code getChildNodes} is a
     * {@code SinkNode}, or is a {@code ProcessorNode} that is not contained in the given set.
     *
     * @param processorNode the node whose children are inspected
     * @param set the child processor nodes that are handled locally
     * @return {@code true} if such a child exists
     */
    private boolean hasOutgoingChild(ProcessorNode processorNode, Set<ProcessorNode> set) {
        boolean outgoing = false;
        final Iterator<Node> candidates = getChildNodes(processorNode).iterator();
        while (!outgoing && candidates.hasNext()) {
            final Node candidate = candidates.next();
            if (candidate instanceof SinkNode) {
                outgoing = true;
            } else if (candidate instanceof ProcessorNode) {
                outgoing = !set.contains(candidate);
            }
        }
        return outgoing;
    }

    /**
     * Collects the children of a node in the full graph, looking through window and partition nodes.
     *
     * <p>A child that is a {@code WindowNode} or {@code PartitionNode} is not returned itself; it is
     * replaced, recursively, by its own children.
     *
     * @param node the node whose children are collected
     * @return the set of children that are neither window nor partition nodes
     */
    private Set<Node> getChildNodes(Node node) {
        Set<Node> children = new HashSet<>();
        // A for-each header provides no target type for inference, so the element type is spelled out
        // explicitly (assumes getChildren is generic in its element type; harmless if it is not).
        for (Node child : StreamUtil.<Node>getChildren(this.graph, node)) {
            if ((child instanceof WindowNode) || (child instanceof PartitionNode)) {
                children.addAll(getChildNodes(child));
            } else {
                children.add(child);
            }
        }
        return children;
    }

    /**
     * Decides whether a processor node may be punctuated after a punctuation arrived on a stream.
     *
     * <p>Nodes without windowed parent streams are always punctuated. Otherwise the punctuation is
     * counted first, and the node is punctuated only once every windowed parent stream has delivered
     * at least as many punctuations as it has input tasks.
     *
     * @param processorNode the node that would be punctuated
     * @param str the id of the stream the punctuation came from
     * @return {@code true} if the node should be punctuated now
     * @throws IllegalStateException if the streams seen so far differ from the windowed parent streams
     *         although their number matches, or if no input task count is known for a stream
     */
    private boolean shouldPunctuate(ProcessorNode processorNode, String str) {
        if (!processorNode.getWindowedParentStreams().isEmpty()) {
            updateCount(processorNode, str);
            final Map<String, Integer> received = this.punctuationState.row(processorNode);
            if (received.size() != processorNode.getWindowedParentStreams().size()) {
                // Not every windowed parent stream has delivered a punctuation yet.
                return false;
            }
            final Set<String> receivedStreams = received.keySet();
            if (!receivedStreams.equals(processorNode.getWindowedParentStreams())) {
                throw new IllegalStateException("Received punctuation from streams " + receivedStreams + " expected " + processorNode.getWindowedParentStreams());
            }
            for (String stream : receivedStreams) {
                final Integer expectedTasks = this.streamToInputTaskCount.get(stream);
                if (expectedTasks == null) {
                    throw new IllegalStateException("Punctuation received on unexpected stream '" + stream + "' for which input task count is not set.");
                }
                final int seen = this.punctuationState.get(processorNode, stream);
                if (seen < expectedTasks) {
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Increments the punctuation counter of a (node, stream) pair, starting at 1 when there is none.
     *
     * @param processorNode the node the punctuation is counted for
     * @param str the id of the stream the punctuation came from
     */
    private void updateCount(ProcessorNode processorNode, String str) {
        final Integer previous = punctuationState.get(processorNode, str);
        final int updated = (previous == null) ? 1 : previous + 1;
        punctuationState.put(processorNode, str, updated);
    }

    /**
     * Removes all punctuation counters recorded for the given node, if there are any.
     *
     * @param processorNode the node whose counters are dropped
     */
    private void clearPunctuationState(ProcessorNode processorNode) {
        if (!punctuationState.isEmpty()) {
            final Map<String, Integer> counters = punctuationState.row(processorNode);
            if (!counters.isEmpty()) {
                counters.clear();
            }
        }
    }

    /**
     * Tells whether the tuple has the size of a key-value pair: two values, or three when a
     * timestamp field is configured.
     *
     * @param tuple the tuple to inspect
     * @return {@code true} if the tuple size matches that of a pair
     */
    private boolean isPair(Tuple tuple) {
        final int actualSize = tuple.size();
        if (timestampField == null) {
            return actualSize == 2;
        }
        return actualSize == 3;
    }

    /**
     * Counts the tasks of all source components that feed the given stream into this component.
     *
     * @param topologyContext the context used to look up sources and their tasks
     * @param str the stream id, in the form produced by {@code getStreamId}
     * @return the summed task count of all matching sources; 0 if none matches
     */
    private int getStreamInputTaskCount(TopologyContext topologyContext, String str) {
        int taskTotal = 0;
        for (GlobalStreamId source : topologyContext.getThisSources().keySet()) {
            if (!str.equals(getStreamId(source))) {
                continue;
            }
            final String sourceComponent = source.get_componentId();
            taskTotal += topologyContext.getComponentTasks(sourceComponent).size();
        }
        return taskTotal;
    }

    /**
     * Derives the stream id used as key inside this delegate from a global stream id.
     *
     * <p>When the component id starts with {@code ClientStatsUtil.SPOUT}, the component id is
     * prepended to the stream id; otherwise the plain stream id is used.
     *
     * @param globalStreamId the global stream id of a source
     * @return the stream id key
     */
    private String getStreamId(GlobalStreamId globalStreamId) {
        final String componentId = globalStreamId.get_componentId();
        final String streamId = globalStreamId.get_streamId();
        if (componentId.startsWith(ClientStatsUtil.SPOUT)) {
            return componentId + streamId;
        }
        return streamId;
    }
}
