package org.apache.storm.streams.processors;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.storm.streams.Pair;
import org.apache.storm.streams.ProcessorNode;
import org.apache.storm.streams.RefCountedTuple;
import org.apache.storm.streams.StreamUtil;
import org.apache.storm.streams.WindowNode;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/classes/data/StormApp.jar:org/apache/storm/streams/processors/EmittingProcessorContext.class */
public class EmittingProcessorContext implements ProcessorContext {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) EmittingProcessorContext.class);
    private final ProcessorNode processorNode;
    private final String outputStreamId;
    private final String punctuationStreamId;
    private final OutputCollector collector;
    private final Fields outputFields;
    private long eventTimestamp;
    private String timestampField;
    private final List<RefCountedTuple> anchors = new ArrayList();
    private final Values punctuation = new Values(WindowNode.PUNCTUATION);

    public EmittingProcessorContext(ProcessorNode processorNode, OutputCollector outputCollector, String str) {
        this.processorNode = processorNode;
        this.outputStreamId = str;
        this.collector = outputCollector;
        this.outputFields = processorNode.getOutputFields();
        this.punctuationStreamId = StreamUtil.getPunctuationStream(str);
    }

    @Override // org.apache.storm.streams.processors.ProcessorContext
    public <T> void forward(T t) {
        if (WindowNode.PUNCTUATION.equals(t)) {
            emit(this.punctuation, this.punctuationStreamId);
            maybeAck();
        } else if (!this.processorNode.emitsPair()) {
            emit(new Values(t), this.outputStreamId);
        } else {
            Pair pair = (Pair) t;
            emit(new Values(pair.getFirst(), pair.getSecond()), this.outputStreamId);
        }
    }

    @Override // org.apache.storm.streams.processors.ProcessorContext
    public <T> void forward(T t, String str) {
        if (str.equals(this.outputStreamId)) {
            forward(t);
        }
    }

    @Override // org.apache.storm.streams.processors.ProcessorContext
    public boolean isWindowed() {
        return this.processorNode.isWindowed();
    }

    @Override // org.apache.storm.streams.processors.ProcessorContext
    public Set<String> getWindowedParentStreams() {
        return this.processorNode.getWindowedParentStreams();
    }

    public void setTimestampField(String str) {
        this.timestampField = str;
    }

    public void setAnchor(RefCountedTuple refCountedTuple) {
        if (this.processorNode.isWindowed() && this.processorNode.isBatch()) {
            refCountedTuple.increment();
            this.anchors.add(refCountedTuple);
            return;
        }
        if (this.anchors.isEmpty()) {
            this.anchors.add(refCountedTuple);
        } else {
            this.anchors.set(0, refCountedTuple);
        }
        if (StreamUtil.isPunctuation(refCountedTuple.tuple().getValue(0))) {
            refCountedTuple.increment();
        }
    }

    public void setEventTimestamp(long j) {
        this.eventTimestamp = j;
    }

    private void maybeAck() {
        if (this.anchors.isEmpty()) {
            return;
        }
        for (RefCountedTuple refCountedTuple : this.anchors) {
            refCountedTuple.decrement();
            if (refCountedTuple.shouldAck()) {
                LOG.debug("Acking {} ", refCountedTuple);
                this.collector.ack(refCountedTuple.tuple());
                refCountedTuple.setAcked();
            }
        }
        this.anchors.clear();
    }

    private Collection<Tuple> tuples(Collection<RefCountedTuple> collection) {
        return (Collection) collection.stream().map((v0) -> {
            return v0.tuple();
        }).collect(Collectors.toList());
    }

    private void emit(Values values, String str) {
        if (this.timestampField != null) {
            values.add(Long.valueOf(this.eventTimestamp));
        }
        if (this.anchors.isEmpty()) {
            LOG.debug("Emit un-anchored, outputStreamId: {}, values: {}", str, values);
            this.collector.emit(str, values);
        } else {
            LOG.debug("Emit, outputStreamId: {}, anchors: {}, values: {}", str, this.anchors, values);
            this.collector.emit(str, tuples(this.anchors), values);
        }
    }
}
