package org.apache.storm.messaging.netty;

import java.io.IOException;
import java.net.ConnectException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.storm.messaging.TaskMessage;
import org.apache.storm.serialization.KryoValuesDeserializer;
import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
import org.apache.storm.shade.io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/classes/data/StormApp.jar:org/apache/storm/messaging/netty/StormClientHandler.class */
/**
 * Inbound Netty handler used by {@link Client} to process objects read from the channel.
 *
 * <p>Three kinds of inbound objects are understood:
 * <ul>
 *   <li>{@code ControlMessage} - only {@code FAILURE_RESPONSE} is logged; every other control
 *       message is ignored;</li>
 *   <li>{@code BackPressureStatus} - the listed task indices are flipped in the
 *       {@code remoteBpStatus} array supplied at construction;</li>
 *   <li>{@link List} - treated as a batch of {@link TaskMessage}s carrying load metrics, of which
 *       only the last element is processed and handed to {@code Client.setLoadMetrics}.</li>
 * </ul>
 * Anything else is rejected with a {@link RuntimeException}.
 */
public class StormClientHandler extends ChannelInboundHandlerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(StormClientHandler.class);

    /**
     * Task id a load-metrics message must be addressed to (the "system task" in the wording of the
     * error raised when the id does not match).
     */
    private static final int METRICS_TASK_ID = -1;

    private final Client client;
    private final KryoValuesDeserializer des;
    private final AtomicBoolean[] remoteBpStatus;

    /**
     * Creates a handler bound to one client.
     *
     * <p>NOTE(review): the decompiler reported that this constructor was originally
     * package-private; it is kept {@code public} here so existing callers are unaffected.
     *
     * @param client           client whose destination address is used in diagnostics and which
     *                         receives the decoded load metrics
     * @param atomicBooleanArr back-pressure flags indexed by the task ids found in
     *                         {@code BackPressureStatus} messages; stored by reference, not copied
     * @param map              settings handed to the {@link KryoValuesDeserializer} constructor
     *                         (presumably the topology configuration - confirm against callers)
     */
    public StormClientHandler(Client client, AtomicBoolean[] atomicBooleanArr, Map<String, Object> map) {
        this.client = client;
        this.remoteBpStatus = atomicBooleanArr;
        this.des = new KryoValuesDeserializer(map);
    }

    /**
     * Dispatches an inbound object to the matching handler based on its runtime type.
     *
     * @param channelHandlerContext channel context (unused)
     * @param obj                   the object read from the channel
     * @throws RuntimeException if {@code obj} is of an unsupported type or is a malformed
     *                          load-metrics batch
     * @throws Exception        declared by the overridden Netty method
     */
    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (obj instanceof ControlMessage) {
            ControlMessage controlMessage = (ControlMessage) obj;
            // Only a failure response is worth reporting; other control messages are dropped.
            if (controlMessage == ControlMessage.FAILURE_RESPONSE) {
                LOG.info("failure response:{}", controlMessage);
            }
        } else if (obj instanceof BackPressureStatus) {
            handleBackPressure((BackPressureStatus) obj);
        } else if (obj instanceof List) {
            handleLoadMetrics((List<?>) obj);
        } else {
            throw new RuntimeException("Don't know how to handle a message of type " + obj + " (" + this.client.getDstAddress() + ")");
        }
    }

    /**
     * Logs a failure on the channel.
     *
     * <p>{@link ConnectException} is checked first because it is itself an {@link IOException};
     * it is deliberately not logged here (presumably the connect path reports it - confirm).
     * Other I/O failures are logged at INFO with the message only; everything else is logged at
     * ERROR together with its stack trace.
     *
     * @param channelHandlerContext channel context (unused)
     * @param th                    the failure raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        if (th instanceof ConnectException) {
            return;
        }
        if (th instanceof IOException) {
            LOG.info("Connection to {} failed: {}", this.client.getDstAddress(), th.getMessage());
        } else {
            // The throwable is passed as the trailing argument WITHOUT a placeholder of its own so
            // that every SLF4J backend treats it as the exception to print, stack trace included.
            LOG.error("Connection to {} failed", this.client.getDstAddress(), th);
        }
    }

    /** Applies a back-pressure update: raises the listed tasks first, then clears the released ones. */
    private void handleBackPressure(BackPressureStatus backPressureStatus) {
        markTasks(backPressureStatus.bpTasks, true);
        markTasks(backPressureStatus.nonBpTasks, false);
        LOG.debug("Received BackPressure status update : {}", backPressureStatus);
    }

    /**
     * Sets the back-pressure flag of every listed task to {@code underBackPressure}.
     *
     * <p>An index outside the flag array is logged and skipped so that one unexpected task id
     * does not prevent the remaining tasks from being updated.
     *
     * @param tasks             task indices to update; {@code null} means "nothing to do"
     * @param underBackPressure value written to each addressed flag
     */
    private void markTasks(Iterable<Integer> tasks, boolean underBackPressure) {
        if (tasks == null) {
            return;
        }
        for (Integer task : tasks) {
            int index = task;
            if (index < 0 || index >= this.remoteBpStatus.length) {
                LOG.error("BP index out of bounds: task {} is outside [0, {})", index, this.remoteBpStatus.length);
                continue;
            }
            this.remoteBpStatus[index].set(underBackPressure);
        }
    }

    /**
     * Decodes a load-metrics batch and forwards the metrics map to the client.
     *
     * <p>Only the last element of the batch is looked at; earlier elements are ignored.
     *
     * @param messages the inbound list, expected to hold {@link TaskMessage}s
     * @throws RuntimeException if the batch is empty, is not addressed to
     *                          {@link #METRICS_TASK_ID}, deserializes to nothing, or does not
     *                          carry a {@link Map} in its first slot
     * @throws Exception        kept as broad as {@code channelRead}, because the deserializer's
     *                          declared exceptions are not visible from this class
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // setLoadMetrics' exact parameter type is not visible here
    private void handleLoadMetrics(List<?> messages) throws Exception {
        if (messages.isEmpty()) {
            throw new RuntimeException("Didn't see enough load metrics (" + this.client.getDstAddress() + ") " + messages);
        }
        TaskMessage latest = (TaskMessage) messages.get(messages.size() - 1);
        if (latest.task() != METRICS_TASK_ID) {
            throw new RuntimeException("Metrics messages are sent to the system task (" + this.client.getDstAddress() + ") " + latest);
        }
        List<Object> metrics = this.des.deserialize(latest.message());
        if (metrics.isEmpty()) {
            throw new RuntimeException("No metrics data in the metrics message (" + this.client.getDstAddress() + ") " + metrics);
        }
        Object firstSlot = metrics.get(0);
        if (!(firstSlot instanceof Map)) {
            throw new RuntimeException("The metrics did not have a map in the first slot (" + this.client.getDstAddress() + ") " + metrics);
        }
        this.client.setLoadMetrics((Map) firstSlot);
    }
}
