package org.apache.storm.streams.processors;

import org.apache.storm.streams.operations.CombinerAggregator;

/* loaded from: input_file:BOOT-INF/classes/data/StormApp.jar:org/apache/storm/streams/processors/MergeAggregateProcessor.class */
/**
 * Processor that merges incoming partial accumulators of type {@code A} into a
 * single running accumulator using a {@link CombinerAggregator}.
 *
 * <p>The running accumulator is created lazily from {@code aggregator.init()}
 * when the first value arrives, and is cleared again by {@link #finish()}.
 *
 * <p>{@code init(ProcessorContext)}, {@code punctuate(String)} and the
 * two-argument {@code execute} are inherited from {@code BaseProcessor}
 * unchanged; they are intentionally not redeclared here.
 *
 * @param <T> input value type of the aggregator
 * @param <A> accumulator type; this is the type of the values this processor receives
 * @param <R> result type produced by {@code aggregator.result(...)}
 */
public class MergeAggregateProcessor<T, A, R> extends BaseProcessor<A> implements BatchProcessor {
    private final CombinerAggregator<T, A, R> aggregator;

    /** Running merged accumulator; {@code null} until the first input after construction or {@link #finish()}. */
    private A state;

    /**
     * Creates a processor that merges accumulators with the given aggregator.
     *
     * @param combinerAggregator supplies the initial accumulator, the merge step and the result conversion
     */
    public MergeAggregateProcessor(CombinerAggregator<T, A, R> combinerAggregator) {
        this.aggregator = combinerAggregator;
    }

    /**
     * Merges one incoming accumulator into the running state and hands a lazily
     * evaluated result to {@code mayBeForwardAggUpdate}.
     *
     * @param a the partial accumulator to merge into the running state
     */
    @Override
    protected void execute(A a) {
        if (state == null) {
            // First value since construction or the last finish(): start from the aggregator's initial accumulator.
            state = aggregator.init();
        }
        state = aggregator.merge(state, a);
        // The result is passed as a supplier; whether and when it is evaluated
        // is decided by mayBeForwardAggUpdate (defined outside this class).
        mayBeForwardAggUpdate(() -> aggregator.result(state));
    }

    /**
     * Forwards the result of the current accumulator, if any input has been
     * merged, and then clears the accumulator. Does nothing when no state is held.
     */
    @Override
    public void finish() {
        if (state != null) {
            context.forward(aggregator.result(state));
            state = null;
        }
    }
}
