package org.apache.storm.streams.processors;

import java.util.HashMap;
import java.util.Map;
import org.apache.storm.streams.Pair;
import org.apache.storm.streams.operations.Reducer;

/* loaded from: input_file:BOOT-INF/classes/data/StormApp.jar:org/apache/storm/streams/processors/ReduceByKeyProcessor.class */
/**
 * Processor that maintains a running reduction of values per key.
 *
 * <p>Each incoming {@link Pair} contributes its value to the aggregate stored for its key:
 * the first value seen for a key is stored as-is, and every subsequent value is combined with
 * the stored aggregate through the supplied {@link Reducer}. On {@link #finish()} every
 * accumulated {@code (key, aggregate)} pair is forwarded through the processor context and the
 * accumulated state is discarded.
 *
 * <p>{@code punctuate} and {@code init} are inherited unchanged from {@code BaseProcessor}.
 *
 * @param <K> the key type
 * @param <V> the value type, which is also the type of the per-key aggregate
 */
public class ReduceByKeyProcessor<K, V> extends BaseProcessor<Pair<K, V>> implements BatchProcessor {
    /** Combines the current aggregate of a key with a newly arrived value. */
    private final Reducer<V> reducer;

    /** Current aggregate per key; emptied by {@link #finish()}. */
    private final Map<K, V> state = new HashMap<>();

    /**
     * Creates a processor that reduces the values of each key with the given reducer.
     *
     * @param reducer the function used to combine the stored aggregate with a new value
     */
    public ReduceByKeyProcessor(Reducer<V> reducer) {
        this.reducer = reducer;
    }

    /**
     * Folds the value of the given pair into the aggregate stored for its key.
     *
     * <p>If no aggregate is stored for the key (or the stored aggregate is {@code null}), the
     * incoming value becomes the aggregate without invoking the reducer.
     *
     * @param input the key/value pair to incorporate
     */
    @Override
    public void execute(Pair<K, V> input) {
        final K key = input.getFirst();
        final V value = input.getSecond();
        final V aggregate = state.get(key);
        final V result = (aggregate == null) ? value : reducer.apply(aggregate, value);
        state.put(key, result);
        // The updated pair is handed over lazily; mayBeForwardAggUpdate is defined in a
        // superclass not visible here and presumably decides whether to emit it -- TODO confirm.
        mayBeForwardAggUpdate(() -> Pair.of(key, result));
    }

    /**
     * Forwards every accumulated {@code (key, aggregate)} pair through the context and then
     * clears the accumulated state.
     */
    @Override
    public void finish() {
        for (Map.Entry<K, V> entry : state.entrySet()) {
            context.forward(Pair.of(entry.getKey(), entry.getValue()));
        }
        state.clear();
    }
}
