package org.apache.storm.streams.processors;

import java.util.HashMap;
import java.util.Map;
import org.apache.storm.streams.Pair;
import org.apache.storm.streams.operations.CombinerAggregator;

/* loaded from: input_file:BOOT-INF/classes/data/StormApp.jar:org/apache/storm/streams/processors/AggregateByKeyProcessor.class */
/**
 * Processor that keeps one accumulator per key and folds every incoming
 * key/value pair into it using a {@link CombinerAggregator}.
 *
 * <p>Depending on {@code emitAggregate}, the processor emits either the raw
 * accumulator or the value returned by {@code aggregator.result(accumulator)}.
 * The accumulated state is held in an in-memory {@link HashMap} and is cleared
 * at the end of {@link #finish()}.
 *
 * @param <K> type of the key of the incoming pairs
 * @param <V> type of the value of the incoming pairs
 * @param <A> type of the accumulator kept per key
 * @param <R> third type argument of the aggregator (presumably the type
 *            returned by {@code aggregator.result(...)} - confirm against
 *            {@code CombinerAggregator})
 */
public class AggregateByKeyProcessor<K, V, A, R> extends BaseProcessor<Pair<K, V>> implements BatchProcessor {
    /** Aggregator used to create, update and finalize the per-key accumulators. */
    private final CombinerAggregator<V, A, R> aggregator;
    /** When {@code true} the raw accumulator is emitted instead of {@code aggregator.result(...)}. */
    private final boolean emitAggregate;
    /** Current accumulator for every key seen since the last {@link #finish()}. */
    private final Map<K, A> state;

    /**
     * Creates a processor that emits {@code aggregator.result(accumulator)}
     * (equivalent to passing {@code false} as the second argument of the
     * two-argument constructor).
     *
     * @param combinerAggregator the aggregator applied to the values of each key
     */
    public AggregateByKeyProcessor(CombinerAggregator<V, A, R> combinerAggregator) {
        this(combinerAggregator, false);
    }

    /**
     * Creates a processor with an explicit emit mode.
     *
     * @param combinerAggregator the aggregator applied to the values of each key
     * @param z                  the {@code emitAggregate} flag: {@code true} to emit the raw
     *                           accumulator, {@code false} to emit
     *                           {@code combinerAggregator.result(accumulator)}
     */
    public AggregateByKeyProcessor(CombinerAggregator<V, A, R> combinerAggregator, boolean z) {
        // Diamond instead of the raw HashMap type keeps the assignment type-checked.
        this.state = new HashMap<>();
        this.aggregator = combinerAggregator;
        this.emitAggregate = z;
    }

    /**
     * Folds the value of {@code pair} into the accumulator stored for its key
     * and hands a supplier of the updated output pair to
     * {@code mayBeForwardAggUpdate}.
     *
     * @param pair the incoming key/value pair
     */
    @Override // org.apache.storm.streams.processors.BaseProcessor
    public void execute(Pair<K, V> pair) {
        K first = pair.getFirst();
        V second = pair.getSecond();
        A a = this.state.get(first);
        // A null lookup (key absent, or a null accumulator stored earlier)
        // starts from a fresh accumulator.
        if (a == null) {
            a = this.aggregator.init();
        }
        this.state.put(first, this.aggregator.apply(a, second));
        // The suppliers read the state when they are invoked rather than
        // capturing the value here; whether and when they are invoked is
        // decided by mayBeForwardAggUpdate in the base class.
        if (this.emitAggregate) {
            mayBeForwardAggUpdate(() -> Pair.of(first, this.state.get(first)));
        } else {
            mayBeForwardAggUpdate(() -> Pair.of(first, this.aggregator.result(this.state.get(first))));
        }
    }

    /**
     * Passes one output pair per accumulated key to {@code context.forward}
     * (raw accumulator or {@code aggregator.result(accumulator)}, depending on
     * {@code emitAggregate}) and then clears the accumulated state.
     */
    @Override // org.apache.storm.streams.processors.BaseProcessor
    public void finish() {
        for (Map.Entry<K, A> entry : this.state.entrySet()) {
            if (this.emitAggregate) {
                this.context.forward(Pair.of(entry.getKey(), entry.getValue()));
            } else {
                this.context.forward(Pair.of(entry.getKey(), this.aggregator.result(entry.getValue())));
            }
        }
        this.state.clear();
    }

    /**
     * Returns a debug representation containing the aggregator, the emit mode
     * and the current per-key state.
     */
    @Override
    public String toString() {
        return "AggregateByKeyProcessor{aggregator=" + this.aggregator + ", emitAggregate=" + this.emitAggregate + ", state=" + this.state + "}";
    }

    /**
     * Delegates unchanged to the superclass implementation.
     *
     * <p>NOTE(review): marked as a bridge/synthetic method by the decompiler;
     * presumably compiler-generated - confirm before removing.
     *
     * @param str passed through to {@code super.punctuate}
     */
    @Override // org.apache.storm.streams.processors.BaseProcessor, org.apache.storm.streams.processors.Processor
    public /* bridge */ /* synthetic */ void punctuate(String str) {
        super.punctuate(str);
    }

    /**
     * Delegates unchanged to the superclass implementation.
     *
     * <p>NOTE(review): marked as a bridge/synthetic method by the decompiler;
     * presumably compiler-generated - confirm before removing.
     *
     * @param processorContext passed through to {@code super.init}
     */
    @Override // org.apache.storm.streams.processors.BaseProcessor, org.apache.storm.streams.processors.Processor
    public /* bridge */ /* synthetic */ void init(ProcessorContext processorContext) {
        super.init(processorContext);
    }
}
