package org.springframework.messaging.simp.stomp;

import java.security.Principal;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.simp.SimpLogging;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.messaging.support.MessageHeaderInitializer;
import org.springframework.messaging.tcp.FixedIntervalReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.ListenableFutureTask;

/* loaded from: input_file:BOOT-INF/lib/spring-messaging-5.2.9.RELEASE.jar:org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.class */
public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler {
    public static final String SYSTEM_SESSION_ID = "_system_";
    private static final long HEARTBEAT_MULTIPLIER = 3;
    private static final int MAX_TIME_TO_CONNECTED_FRAME = 60000;
    private static final byte[] EMPTY_PAYLOAD = new byte[0];
    private static final ListenableFutureTask<Void> EMPTY_TASK = new ListenableFutureTask<>(new VoidCallable());
    private static final Message<byte[]> HEARTBEAT_MESSAGE;
    private String relayHost;
    private int relayPort;
    private String clientLogin;
    private String clientPasscode;
    private String systemLogin;
    private String systemPasscode;
    private long systemHeartbeatSendInterval;
    private long systemHeartbeatReceiveInterval;
    private final Map<String, MessageHandler> systemSubscriptions;

    @Nullable
    private String virtualHost;

    @Nullable
    private TcpOperations<byte[]> tcpClient;

    @Nullable
    private MessageHeaderInitializer headerInitializer;
    private final DefaultStats stats;
    private final Map<String, StompConnectionHandler> connectionHandlers;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/spring-messaging-5.2.9.RELEASE.jar:org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler$DefaultStats.class */
    public class DefaultStats implements Stats {
        private final AtomicInteger connect;
        private final AtomicInteger connected;
        private final AtomicInteger disconnect;

        private DefaultStats() {
            this.connect = new AtomicInteger();
            this.connected = new AtomicInteger();
            this.disconnect = new AtomicInteger();
        }

        public void incrementConnectCount() {
            this.connect.incrementAndGet();
        }

        public void incrementConnectedCount() {
            this.connected.incrementAndGet();
        }

        public void incrementDisconnectCount() {
            this.disconnect.incrementAndGet();
        }

        @Override // org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.Stats
        public int getTotalHandlers() {
            return StompBrokerRelayMessageHandler.this.connectionHandlers.size();
        }

        @Override // org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.Stats
        public int getTotalConnect() {
            return this.connect.get();
        }

        @Override // org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.Stats
        public int getTotalConnected() {
            return this.connected.get();
        }

        @Override // org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.Stats
        public int getTotalDisconnect() {
            return this.disconnect.get();
        }

        public String toString() {
            return StompBrokerRelayMessageHandler.this.connectionHandlers.size() + " sessions, " + StompBrokerRelayMessageHandler.this.getTcpClientInfo() + (StompBrokerRelayMessageHandler.this.isBrokerAvailable() ? " (available)" : " (not available)") + ", processed CONNECT(" + this.connect.get() + ")-CONNECTED(" + this.connected.get() + ")-DISCONNECT(" + this.disconnect.get() + ")";
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/spring-messaging-5.2.9.RELEASE.jar:org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler$Stats.class */
    public interface Stats {
        int getTotalHandlers();

        int getTotalConnect();

        int getTotalConnected();

        int getTotalDisconnect();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/spring-messaging-5.2.9.RELEASE.jar:org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler$StompConnectionHandler.class */
    public class StompConnectionHandler implements TcpConnectionHandler<byte[]> {
        private final String sessionId;
        private final boolean isRemoteClientSession;
        private final StompHeaderAccessor connectHeaders;
        private final MessageChannel outboundChannel;

        @Nullable
        private volatile TcpConnection<byte[]> tcpConnection;
        private volatile boolean isStompConnected;

        protected StompConnectionHandler(StompBrokerRelayMessageHandler stompBrokerRelayMessageHandler, String str, StompHeaderAccessor stompHeaderAccessor) {
            this(str, stompHeaderAccessor, true);
        }

        private StompConnectionHandler(String str, StompHeaderAccessor stompHeaderAccessor, boolean z) {
            Assert.notNull(str, "'sessionId' must not be null");
            Assert.notNull(stompHeaderAccessor, "'connectHeaders' must not be null");
            this.sessionId = str;
            this.connectHeaders = stompHeaderAccessor;
            this.isRemoteClientSession = z;
            this.outboundChannel = StompBrokerRelayMessageHandler.this.getClientOutboundChannelForSession(str);
        }

        public String getSessionId() {
            return this.sessionId;
        }

        @Nullable
        protected TcpConnection<byte[]> getTcpConnection() {
            return this.tcpConnection;
        }

        @Override // org.springframework.messaging.tcp.TcpConnectionHandler
        public void afterConnected(TcpConnection<byte[]> tcpConnection) {
            if (StompBrokerRelayMessageHandler.this.logger.isDebugEnabled()) {
                StompBrokerRelayMessageHandler.this.logger.debug("TCP connection opened in session=" + getSessionId());
            }
            this.tcpConnection = tcpConnection;
            tcpConnection.onReadInactivity(() -> {
                if (this.tcpConnection == null || this.isStompConnected) {
                    return;
                }
                handleTcpConnectionFailure("No CONNECTED frame received in 60000 ms.", null);
            }, 60000L);
            tcpConnection.send(MessageBuilder.createMessage(StompBrokerRelayMessageHandler.EMPTY_PAYLOAD, this.connectHeaders.getMessageHeaders()));
        }

        @Override // org.springframework.messaging.tcp.TcpConnectionHandler
        public void afterConnectFailure(Throwable th) {
            handleTcpConnectionFailure("Failed to connect: " + th.getMessage(), th);
        }

        protected void handleTcpConnectionFailure(String str, @Nullable Throwable th) {
            if (StompBrokerRelayMessageHandler.this.logger.isInfoEnabled()) {
                StompBrokerRelayMessageHandler.this.logger.info("TCP connection failure in session " + this.sessionId + ": " + str, th);
            }
            try {
                sendStompErrorFrameToClient(str);
            } finally {
                try {
                    clearConnection();
                } catch (Throwable th2) {
                    if (StompBrokerRelayMessageHandler.this.logger.isDebugEnabled()) {
                        StompBrokerRelayMessageHandler.this.logger.debug("Failure while clearing TCP connection state in session " + this.sessionId, th2);
                    }
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void sendStompErrorFrameToClient(String str) {
            if (this.isRemoteClientSession) {
                StompHeaderAccessor create = StompHeaderAccessor.create(StompCommand.ERROR);
                if (StompBrokerRelayMessageHandler.this.getHeaderInitializer() != null) {
                    StompBrokerRelayMessageHandler.this.getHeaderInitializer().initHeaders(create);
                }
                create.setSessionId(this.sessionId);
                Principal user = this.connectHeaders.getUser();
                if (user != null) {
                    create.setUser(user);
                }
                create.setMessage(str);
                create.setLeaveMutable(true);
                handleInboundMessage(MessageBuilder.createMessage(StompBrokerRelayMessageHandler.EMPTY_PAYLOAD, create.getMessageHeaders()));
            }
        }

        protected void handleInboundMessage(Message<?> message) {
            if (this.isRemoteClientSession) {
                this.outboundChannel.send(message);
            }
        }

        @Override // org.springframework.messaging.tcp.TcpConnectionHandler
        public void handleMessage(Message<byte[]> message) {
            StompHeaderAccessor stompHeaderAccessor = (StompHeaderAccessor) MessageHeaderAccessor.getAccessor((Message<?>) message, StompHeaderAccessor.class);
            Assert.state(stompHeaderAccessor != null, "No StompHeaderAccessor");
            stompHeaderAccessor.setSessionId(this.sessionId);
            Principal user = this.connectHeaders.getUser();
            if (user != null) {
                stompHeaderAccessor.setUser(user);
            }
            StompCommand command = stompHeaderAccessor.getCommand();
            if (StompCommand.CONNECTED.equals(command)) {
                if (StompBrokerRelayMessageHandler.this.logger.isDebugEnabled()) {
                    StompBrokerRelayMessageHandler.this.logger.debug("Received " + stompHeaderAccessor.getShortLogMessage(StompBrokerRelayMessageHandler.EMPTY_PAYLOAD));
                }
                afterStompConnected(stompHeaderAccessor);
            } else if (StompBrokerRelayMessageHandler.this.logger.isErrorEnabled() && StompCommand.ERROR.equals(command)) {
                StompBrokerRelayMessageHandler.this.logger.error("Received " + stompHeaderAccessor.getShortLogMessage(message.getPayload()));
            } else if (StompBrokerRelayMessageHandler.this.logger.isTraceEnabled()) {
                StompBrokerRelayMessageHandler.this.logger.trace("Received " + stompHeaderAccessor.getDetailedLogMessage(message.getPayload()));
            }
            handleInboundMessage(message);
        }

        protected void afterStompConnected(StompHeaderAccessor stompHeaderAccessor) {
            this.isStompConnected = true;
            StompBrokerRelayMessageHandler.this.stats.incrementConnectedCount();
            initHeartbeats(stompHeaderAccessor);
        }

        private void initHeartbeats(StompHeaderAccessor stompHeaderAccessor) {
            if (this.isRemoteClientSession) {
                return;
            }
            TcpConnection<byte[]> tcpConnection = this.tcpConnection;
            Assert.state(tcpConnection != null, "No TcpConnection available");
            long j = this.connectHeaders.getHeartbeat()[0];
            long j2 = this.connectHeaders.getHeartbeat()[1];
            long j3 = stompHeaderAccessor.getHeartbeat()[0];
            long j4 = stompHeaderAccessor.getHeartbeat()[1];
            if (j > 0 && j4 > 0) {
                tcpConnection.onWriteInactivity(() -> {
                    tcpConnection.send(StompBrokerRelayMessageHandler.HEARTBEAT_MESSAGE).addCallback(r1 -> {
                    }, th -> {
                        handleTcpConnectionFailure("Failed to forward heartbeat: " + th.getMessage(), th);
                    });
                }, Math.max(j, j4));
            }
            if (j2 <= 0 || j3 <= 0) {
                return;
            }
            long max = Math.max(j2, j3) * StompBrokerRelayMessageHandler.HEARTBEAT_MULTIPLIER;
            tcpConnection.onReadInactivity(() -> {
                handleTcpConnectionFailure("No messages received in " + max + " ms.", null);
            }, max);
        }

        @Override // org.springframework.messaging.tcp.TcpConnectionHandler
        public void handleFailure(Throwable th) {
            if (this.tcpConnection != null) {
                handleTcpConnectionFailure("Transport failure: " + th.getMessage(), th);
            } else if (StompBrokerRelayMessageHandler.this.logger.isErrorEnabled()) {
                StompBrokerRelayMessageHandler.this.logger.error("Transport failure: " + th);
            }
        }

        @Override // org.springframework.messaging.tcp.TcpConnectionHandler
        public void afterConnectionClosed() {
            if (this.tcpConnection == null) {
                return;
            }
            try {
                if (StompBrokerRelayMessageHandler.this.logger.isDebugEnabled()) {
                    StompBrokerRelayMessageHandler.this.logger.debug("TCP connection to broker closed in session " + this.sessionId);
                }
                sendStompErrorFrameToClient("Connection to broker closed.");
            } finally {
                try {
                    this.tcpConnection = null;
                    clearConnection();
                } catch (Throwable th) {
                }
            }
        }

        public ListenableFuture<Void> forward(final Message<?> message, final StompHeaderAccessor stompHeaderAccessor) {
            TcpConnection<byte[]> tcpConnection = this.tcpConnection;
            if (!this.isStompConnected || tcpConnection == null) {
                if (!this.isRemoteClientSession) {
                    throw new IllegalStateException("Cannot forward messages " + (tcpConnection != null ? "before STOMP CONNECTED. " : "while inactive. ") + "Consider subscribing to receive BrokerAvailabilityEvent's from an ApplicationListener Spring bean. Dropped " + stompHeaderAccessor.getShortLogMessage(message.getPayload()));
                }
                if (StompBrokerRelayMessageHandler.this.logger.isDebugEnabled()) {
                    StompBrokerRelayMessageHandler.this.logger.debug("TCP connection closed already, ignoring " + stompHeaderAccessor.getShortLogMessage(message.getPayload()));
                }
                return StompBrokerRelayMessageHandler.EMPTY_TASK;
            }
            Message<?> createMessage = (stompHeaderAccessor.isMutable() && stompHeaderAccessor.isModified()) ? MessageBuilder.createMessage(message.getPayload(), stompHeaderAccessor.getMessageHeaders()) : message;
            StompCommand command = stompHeaderAccessor.getCommand();
            if (StompBrokerRelayMessageHandler.this.logger.isDebugEnabled() && (StompCommand.SEND.equals(command) || StompCommand.SUBSCRIBE.equals(command) || StompCommand.UNSUBSCRIBE.equals(command) || StompCommand.DISCONNECT.equals(command))) {
                StompBrokerRelayMessageHandler.this.logger.debug("Forwarding " + stompHeaderAccessor.getShortLogMessage(message.getPayload()));
            } else if (StompBrokerRelayMessageHandler.this.logger.isTraceEnabled()) {
                StompBrokerRelayMessageHandler.this.logger.trace("Forwarding " + stompHeaderAccessor.getDetailedLogMessage(message.getPayload()));
            }
            ListenableFuture<Void> send = tcpConnection.send(createMessage);
            send.addCallback(new ListenableFutureCallback<Void>() { // from class: org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.StompConnectionHandler.1
                @Override // org.springframework.util.concurrent.SuccessCallback
                public void onSuccess(@Nullable Void r4) {
                    if (stompHeaderAccessor.getCommand() == StompCommand.DISCONNECT) {
                        StompConnectionHandler.this.afterDisconnectSent(stompHeaderAccessor);
                    }
                }

                @Override // org.springframework.util.concurrent.FailureCallback
                public void onFailure(Throwable th) {
                    if (StompConnectionHandler.this.tcpConnection != null) {
                        StompConnectionHandler.this.handleTcpConnectionFailure("failed to forward " + stompHeaderAccessor.getShortLogMessage(message.getPayload()), th);
                    } else if (StompBrokerRelayMessageHandler.this.logger.isErrorEnabled()) {
                        StompBrokerRelayMessageHandler.this.logger.error("Failed to forward " + stompHeaderAccessor.getShortLogMessage(message.getPayload()));
                    }
                }
            });
            return send;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void afterDisconnectSent(StompHeaderAccessor stompHeaderAccessor) {
            if (stompHeaderAccessor.getReceipt() == null) {
                try {
                    clearConnection();
                } catch (Throwable th) {
                    if (StompBrokerRelayMessageHandler.this.logger.isDebugEnabled()) {
                        StompBrokerRelayMessageHandler.this.logger.debug("Failure while clearing TCP connection state in session " + this.sessionId, th);
                    }
                }
            }
        }

        public void clearConnection() {
            if (StompBrokerRelayMessageHandler.this.logger.isDebugEnabled()) {
                StompBrokerRelayMessageHandler.this.logger.debug("Cleaning up connection state for session " + this.sessionId);
            }
            if (this.isRemoteClientSession) {
                StompBrokerRelayMessageHandler.this.connectionHandlers.remove(this.sessionId);
            }
            this.isStompConnected = false;
            TcpConnection<byte[]> tcpConnection = this.tcpConnection;
            this.tcpConnection = null;
            if (tcpConnection != null) {
                if (StompBrokerRelayMessageHandler.this.logger.isDebugEnabled()) {
                    StompBrokerRelayMessageHandler.this.logger.debug("Closing TCP connection in session " + this.sessionId);
                }
                tcpConnection.close();
            }
        }

        public String toString() {
            return "StompConnectionHandler[sessionId=" + this.sessionId + "]";
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/spring-messaging-5.2.9.RELEASE.jar:org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler$SystemStompConnectionHandler.class */
    private class SystemStompConnectionHandler extends StompConnectionHandler {
        public SystemStompConnectionHandler(StompHeaderAccessor stompHeaderAccessor) {
            super(StompBrokerRelayMessageHandler.SYSTEM_SESSION_ID, stompHeaderAccessor, false);
        }

        @Override // org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.StompConnectionHandler
        protected void afterStompConnected(StompHeaderAccessor stompHeaderAccessor) {
            if (StompBrokerRelayMessageHandler.this.logger.isInfoEnabled()) {
                StompBrokerRelayMessageHandler.this.logger.info("\"System\" session connected.");
            }
            super.afterStompConnected(stompHeaderAccessor);
            StompBrokerRelayMessageHandler.this.publishBrokerAvailableEvent();
            sendSystemSubscriptions();
        }

        private void sendSystemSubscriptions() {
            int i = 0;
            for (String str : StompBrokerRelayMessageHandler.this.getSystemSubscriptions().keySet()) {
                StompHeaderAccessor create = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
                int i2 = i;
                i++;
                create.setSubscriptionId(String.valueOf(i2));
                create.setDestination(str);
                if (StompBrokerRelayMessageHandler.this.logger.isDebugEnabled()) {
                    StompBrokerRelayMessageHandler.this.logger.debug("Subscribing to " + str + " on \"system\" connection.");
                }
                TcpConnection<byte[]> tcpConnection = getTcpConnection();
                if (tcpConnection != null) {
                    tcpConnection.send(MessageBuilder.createMessage(StompBrokerRelayMessageHandler.EMPTY_PAYLOAD, create.getMessageHeaders())).addCallback(r1 -> {
                    }, th -> {
                        handleTcpConnectionFailure("Failed to subscribe in \"system\" session.", th);
                    });
                }
            }
        }

        @Override // org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.StompConnectionHandler
        protected void handleInboundMessage(Message<?> message) {
            StompHeaderAccessor stompHeaderAccessor = (StompHeaderAccessor) MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
            if (stompHeaderAccessor == null || !StompCommand.MESSAGE.equals(stompHeaderAccessor.getCommand())) {
                return;
            }
            String destination = stompHeaderAccessor.getDestination();
            if (destination == null) {
                if (StompBrokerRelayMessageHandler.this.logger.isDebugEnabled()) {
                    StompBrokerRelayMessageHandler.this.logger.debug("Got message on \"system\" connection, with no destination: " + stompHeaderAccessor.getDetailedLogMessage(message.getPayload()));
                }
            } else if (!StompBrokerRelayMessageHandler.this.getSystemSubscriptions().containsKey(destination)) {
                if (StompBrokerRelayMessageHandler.this.logger.isDebugEnabled()) {
                    StompBrokerRelayMessageHandler.this.logger.debug("Got message on \"system\" connection with no handler: " + stompHeaderAccessor.getDetailedLogMessage(message.getPayload()));
                }
            } else {
                try {
                    StompBrokerRelayMessageHandler.this.getSystemSubscriptions().get(destination).handleMessage(message);
                } catch (Throwable th) {
                    if (StompBrokerRelayMessageHandler.this.logger.isDebugEnabled()) {
                        StompBrokerRelayMessageHandler.this.logger.debug("Error while handling message on \"system\" connection.", th);
                    }
                }
            }
        }

        @Override // org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.StompConnectionHandler
        protected void handleTcpConnectionFailure(String str, @Nullable Throwable th) {
            super.handleTcpConnectionFailure(str, th);
            StompBrokerRelayMessageHandler.this.publishBrokerUnavailableEvent();
        }

        @Override // org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.StompConnectionHandler, org.springframework.messaging.tcp.TcpConnectionHandler
        public void afterConnectionClosed() {
            super.afterConnectionClosed();
            StompBrokerRelayMessageHandler.this.publishBrokerUnavailableEvent();
        }

        @Override // org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.StompConnectionHandler
        public ListenableFuture<Void> forward(Message<?> message, StompHeaderAccessor stompHeaderAccessor) {
            try {
                ListenableFuture<Void> forward = super.forward(message, stompHeaderAccessor);
                if (message.getHeaders().get(SimpMessageHeaderAccessor.IGNORE_ERROR) == null) {
                    forward.get();
                }
                return forward;
            } catch (Throwable th) {
                throw new MessageDeliveryException(message, th);
            }
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/spring-messaging-5.2.9.RELEASE.jar:org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler$VoidCallable.class */
    private static class VoidCallable implements Callable<Void> {
        private VoidCallable() {
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Void call() {
            return null;
        }
    }

    public StompBrokerRelayMessageHandler(SubscribableChannel subscribableChannel, MessageChannel messageChannel, SubscribableChannel subscribableChannel2, Collection<String> collection) {
        super(subscribableChannel, messageChannel, subscribableChannel2, collection);
        this.relayHost = "127.0.0.1";
        this.relayPort = 61613;
        this.clientLogin = "guest";
        this.clientPasscode = "guest";
        this.systemLogin = "guest";
        this.systemPasscode = "guest";
        this.systemHeartbeatSendInterval = 10000L;
        this.systemHeartbeatReceiveInterval = 10000L;
        this.systemSubscriptions = new HashMap(4);
        this.stats = new DefaultStats();
        this.connectionHandlers = new ConcurrentHashMap();
    }

    public void setRelayHost(String str) {
        Assert.hasText(str, "relayHost must not be empty");
        this.relayHost = str;
    }

    public String getRelayHost() {
        return this.relayHost;
    }

    public void setRelayPort(int i) {
        this.relayPort = i;
    }

    public int getRelayPort() {
        return this.relayPort;
    }

    public void setClientLogin(String str) {
        Assert.hasText(str, "clientLogin must not be empty");
        this.clientLogin = str;
    }

    public String getClientLogin() {
        return this.clientLogin;
    }

    public void setClientPasscode(String str) {
        Assert.hasText(str, "clientPasscode must not be empty");
        this.clientPasscode = str;
    }

    public String getClientPasscode() {
        return this.clientPasscode;
    }

    public void setSystemLogin(String str) {
        Assert.hasText(str, "systemLogin must not be empty");
        this.systemLogin = str;
    }

    public String getSystemLogin() {
        return this.systemLogin;
    }

    public void setSystemPasscode(String str) {
        this.systemPasscode = str;
    }

    public String getSystemPasscode() {
        return this.systemPasscode;
    }

    public void setSystemHeartbeatSendInterval(long j) {
        this.systemHeartbeatSendInterval = j;
    }

    public long getSystemHeartbeatSendInterval() {
        return this.systemHeartbeatSendInterval;
    }

    public void setSystemHeartbeatReceiveInterval(long j) {
        this.systemHeartbeatReceiveInterval = j;
    }

    public long getSystemHeartbeatReceiveInterval() {
        return this.systemHeartbeatReceiveInterval;
    }

    public void setSystemSubscriptions(@Nullable Map<String, MessageHandler> map) {
        this.systemSubscriptions.clear();
        if (map != null) {
            this.systemSubscriptions.putAll(map);
        }
    }

    public Map<String, MessageHandler> getSystemSubscriptions() {
        return this.systemSubscriptions;
    }

    public void setVirtualHost(@Nullable String str) {
        this.virtualHost = str;
    }

    @Nullable
    public String getVirtualHost() {
        return this.virtualHost;
    }

    public void setTcpClient(@Nullable TcpOperations<byte[]> tcpOperations) {
        this.tcpClient = tcpOperations;
    }

    @Nullable
    public TcpOperations<byte[]> getTcpClient() {
        return this.tcpClient;
    }

    public void setHeaderInitializer(@Nullable MessageHeaderInitializer messageHeaderInitializer) {
        this.headerInitializer = messageHeaderInitializer;
    }

    @Nullable
    public MessageHeaderInitializer getHeaderInitializer() {
        return this.headerInitializer;
    }

    public String getStatsInfo() {
        return this.stats.toString();
    }

    public Stats getStats() {
        return this.stats;
    }

    public int getConnectionCount() {
        return this.connectionHandlers.size();
    }

    @Override // org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler
    protected void startInternal() {
        if (this.tcpClient == null) {
            this.tcpClient = initTcpClient();
        }
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Starting \"system\" session, " + toString());
        }
        StompHeaderAccessor create = StompHeaderAccessor.create(StompCommand.CONNECT);
        create.setAcceptVersion("1.1,1.2");
        create.setLogin(this.systemLogin);
        create.setPasscode(this.systemPasscode);
        create.setHeartbeat(this.systemHeartbeatSendInterval, this.systemHeartbeatReceiveInterval);
        String virtualHost = getVirtualHost();
        if (virtualHost != null) {
            create.setHost(virtualHost);
        }
        create.setSessionId(SYSTEM_SESSION_ID);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Forwarding " + create.getShortLogMessage(EMPTY_PAYLOAD));
        }
        SystemStompConnectionHandler systemStompConnectionHandler = new SystemStompConnectionHandler(create);
        this.connectionHandlers.put(systemStompConnectionHandler.getSessionId(), systemStompConnectionHandler);
        this.stats.incrementConnectCount();
        this.tcpClient.connect(systemStompConnectionHandler, new FixedIntervalReconnectStrategy(5000L));
    }

    private ReactorNettyTcpClient<byte[]> initTcpClient() {
        StompDecoder stompDecoder = new StompDecoder();
        if (this.headerInitializer != null) {
            stompDecoder.setHeaderInitializer(this.headerInitializer);
        }
        ReactorNettyTcpClient<byte[]> reactorNettyTcpClient = new ReactorNettyTcpClient<>(this.relayHost, this.relayPort, new StompReactorNettyCodec(stompDecoder));
        reactorNettyTcpClient.setLogger(SimpLogging.forLog(reactorNettyTcpClient.getLogger()));
        return reactorNettyTcpClient;
    }

    @Override // org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler
    protected void stopInternal() {
        publishBrokerUnavailableEvent();
        if (this.tcpClient != null) {
            try {
                this.tcpClient.shutdown().get(5000L, TimeUnit.MILLISECONDS);
            } catch (Throwable th) {
                this.logger.error("Error in shutdown of TCP client", th);
            }
        }
    }

    @Override // org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler
    protected void handleMessageInternal(Message<?> message) {
        StompHeaderAccessor wrap;
        StompCommand command;
        String sessionId = SimpMessageHeaderAccessor.getSessionId(message.getHeaders());
        if (!isBrokerAvailable()) {
            if (sessionId == null || SYSTEM_SESSION_ID.equals(sessionId)) {
                throw new MessageDeliveryException("Message broker not active. Consider subscribing to receive BrokerAvailabilityEvent's from an ApplicationListener Spring bean.");
            }
            StompConnectionHandler stompConnectionHandler = this.connectionHandlers.get(sessionId);
            if (stompConnectionHandler != null) {
                stompConnectionHandler.sendStompErrorFrameToClient("Broker not available.");
                stompConnectionHandler.clearConnection();
                return;
            }
            StompHeaderAccessor create = StompHeaderAccessor.create(StompCommand.ERROR);
            if (getHeaderInitializer() != null) {
                getHeaderInitializer().initHeaders(create);
            }
            create.setSessionId(sessionId);
            Principal user = SimpMessageHeaderAccessor.getUser(message.getHeaders());
            if (user != null) {
                create.setUser(user);
            }
            create.setMessage("Broker not available.");
            getClientOutboundChannel().send(MessageBuilder.createMessage(EMPTY_PAYLOAD, create.getMessageHeaders()));
            return;
        }
        MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, (Class<MessageHeaderAccessor>) MessageHeaderAccessor.class);
        if (accessor == null) {
            throw new IllegalStateException("No header accessor (not using the SimpMessagingTemplate?): " + message);
        }
        if (accessor instanceof StompHeaderAccessor) {
            wrap = (StompHeaderAccessor) accessor;
            command = wrap.getCommand();
        } else {
            if (!(accessor instanceof SimpMessageHeaderAccessor)) {
                throw new IllegalStateException("Unexpected header accessor type " + accessor.getClass() + " in " + message);
            }
            wrap = StompHeaderAccessor.wrap(message);
            command = wrap.getCommand();
            if (command == null) {
                command = wrap.updateStompCommandAsClientMessage();
            }
        }
        if (sessionId == null) {
            if (!SimpMessageType.MESSAGE.equals(wrap.getMessageType())) {
                if (this.logger.isErrorEnabled()) {
                    this.logger.error("Only STOMP SEND supported from within the server side. Ignoring " + message);
                    return;
                }
                return;
            }
            sessionId = SYSTEM_SESSION_ID;
            wrap.setSessionId(sessionId);
        }
        String destination = wrap.getDestination();
        if (command == null || !command.requiresDestination() || checkDestinationPrefix(destination)) {
            if (StompCommand.CONNECT.equals(command) || StompCommand.STOMP.equals(command)) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug(wrap.getShortLogMessage(EMPTY_PAYLOAD));
                }
                StompHeaderAccessor wrap2 = wrap.isMutable() ? wrap : StompHeaderAccessor.wrap(message);
                wrap2.setLogin(this.clientLogin);
                wrap2.setPasscode(this.clientPasscode);
                if (getVirtualHost() != null) {
                    wrap2.setHost(getVirtualHost());
                }
                StompConnectionHandler stompConnectionHandler2 = new StompConnectionHandler(this, sessionId, wrap2);
                this.connectionHandlers.put(sessionId, stompConnectionHandler2);
                this.stats.incrementConnectCount();
                Assert.state(this.tcpClient != null, "No TCP client available");
                this.tcpClient.connect(stompConnectionHandler2);
                return;
            }
            if (!StompCommand.DISCONNECT.equals(command)) {
                StompConnectionHandler stompConnectionHandler3 = this.connectionHandlers.get(sessionId);
                if (stompConnectionHandler3 != null) {
                    stompConnectionHandler3.forward(message, wrap);
                    return;
                } else {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("No TCP connection for session " + sessionId + " in " + message);
                        return;
                    }
                    return;
                }
            }
            StompConnectionHandler stompConnectionHandler4 = this.connectionHandlers.get(sessionId);
            if (stompConnectionHandler4 != null) {
                this.stats.incrementDisconnectCount();
                stompConnectionHandler4.forward(message, wrap);
            } else if (this.logger.isDebugEnabled()) {
                this.logger.debug("Ignoring DISCONNECT in session " + sessionId + ". Connection already cleaned up.");
            }
        }
    }

    public String toString() {
        return "StompBrokerRelay[" + getTcpClientInfo() + "]";
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String getTcpClientInfo() {
        return this.tcpClient != null ? this.tcpClient.toString() : this.relayHost + ":" + this.relayPort;
    }

    static {
        EMPTY_TASK.run();
        HEARTBEAT_MESSAGE = MessageBuilder.createMessage(StompDecoder.HEARTBEAT_PAYLOAD, StompHeaderAccessor.createForHeartbeat().getMessageHeaders());
    }
}
