package org.apache.storm.streams.processors;

import java.util.ArrayList;
import org.apache.storm.shade.com.google.common.collect.ArrayListMultimap;
import org.apache.storm.shade.com.google.common.collect.Multimap;
import org.apache.storm.streams.Pair;

/* loaded from: input_file:BOOT-INF/classes/data/StormApp.jar:org/apache/storm/streams/processors/CoGroupByKeyProcessor.class */
/**
 * Co-groups the values of two keyed streams by key.
 *
 * <p>Incoming {@code Pair<K, ?>} tuples are buffered per key in one of two multimaps, chosen by
 * the id of the stream they arrived on. When values are forwarded, each key is emitted as
 * {@code Pair.of(key, Pair.of(valuesFromFirstStream, valuesFromSecondStream))}.
 *
 * @param <K>  the key type shared by both streams
 * @param <V1> the value type buffered for the first stream
 * @param <V2> the value type buffered for the second stream
 */
public class CoGroupByKeyProcessor<K, V1, V2> extends BaseProcessor<Pair<K, ?>> implements BatchProcessor {
    private final String firstStream;
    private final String secondStream;
    // Values buffered per key for each of the two input streams.
    private final Multimap<K, V1> firstMap = ArrayListMultimap.create();
    private final Multimap<K, V2> secondMap = ArrayListMultimap.create();

    /**
     * Creates a processor that co-groups the two given streams.
     *
     * @param firstStream  id of the stream whose values are buffered as {@code V1}
     * @param secondStream id of the stream whose values are buffered as {@code V2}
     */
    public CoGroupByKeyProcessor(String firstStream, String secondStream) {
        this.firstStream = firstStream;
        this.secondStream = secondStream;
    }

    /**
     * Buffers the value of {@code input} under its key in the map that corresponds to
     * {@code sourceStream}; input from any other stream id is not buffered. When the processor
     * context is not windowed, the buffered values are forwarded immediately afterwards.
     *
     * @param input        the key/value pair to buffer
     * @param sourceStream id of the stream the pair arrived on
     */
    @Override // org.apache.storm.streams.processors.BaseProcessor, org.apache.storm.streams.processors.Processor
    public void execute(Pair<K, ?> input, String sourceStream) {
        K key = input.getFirst();
        if (sourceStream.equals(this.firstStream)) {
            // The static value type is a wildcard; the stream id is what tells us it is a V1.
            @SuppressWarnings("unchecked")
            V1 value = (V1) input.getSecond();
            this.firstMap.put(key, value);
        } else if (sourceStream.equals(this.secondStream)) {
            // Same reasoning as above: values arriving on the second stream are treated as V2.
            @SuppressWarnings("unchecked")
            V2 value = (V2) input.getSecond();
            this.secondMap.put(key, value);
        }
        if (!this.context.isWindowed()) {
            // NOTE(review): firstMap is only cleared in finish(), so in the non-windowed case the
            // values accumulated so far are forwarded again on every call - confirm this is intended.
            forwardValues();
        }
    }

    /**
     * Forwards everything that is currently buffered and then empties both buffers.
     */
    @Override // org.apache.storm.streams.processors.BaseProcessor
    public void finish() {
        forwardValues();
        this.firstMap.clear();
        this.secondMap.clear();
    }

    /**
     * Emits one co-grouped pair per buffered key.
     *
     * <p>The first pass walks the keys of the first map and removes the matching entries from the
     * second map, so the second pass only sees keys that had no first-stream values. Entries of
     * the first map are copied rather than removed in the first pass; they stay buffered until
     * {@link #finish()} clears the maps.
     */
    private void forwardValues() {
        this.firstMap.asMap().forEach((key, values) -> {
            // Copy the live view so the forwarded list is detached from the backing multimap.
            this.context.forward(Pair.of(key, Pair.of(new ArrayList<>(values), this.secondMap.removeAll(key))));
        });
        this.secondMap.asMap().forEach((key, values) -> {
            this.context.forward(Pair.of(key, Pair.of(this.firstMap.removeAll(key), new ArrayList<>(values))));
        });
    }

    /**
     * Delegates to the superclass implementation without adding behaviour. The decompiler marked
     * this method as a synthetic bridge.
     */
    @Override // org.apache.storm.streams.processors.BaseProcessor, org.apache.storm.streams.processors.Processor
    public void punctuate(String stream) {
        super.punctuate(stream);
    }

    /**
     * Delegates to the superclass implementation without adding behaviour. The decompiler marked
     * this method as a synthetic bridge.
     */
    @Override // org.apache.storm.streams.processors.BaseProcessor, org.apache.storm.streams.processors.Processor
    public void init(ProcessorContext processorContext) {
        super.init(processorContext);
    }
}
