package org.apache.storm.cluster;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import org.apache.storm.callback.ZKStateChangedCallback;
import org.apache.storm.generated.ClusterWorkerHeartbeat;
import org.apache.storm.generated.HBExecutionException;
import org.apache.storm.generated.HBMessage;
import org.apache.storm.generated.HBMessageData;
import org.apache.storm.generated.HBPulse;
import org.apache.storm.generated.HBServerMessageType;
import org.apache.storm.pacemaker.PacemakerClientPool;
import org.apache.storm.pacemaker.PacemakerConnectionException;
import org.apache.storm.shade.org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.WrappedHBExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/classes/data/StormApp.jar:org/apache/storm/cluster/PaceMakerStateStorage.class */
/**
 * {@link IStateStorage} implementation that routes worker-heartbeat operations
 * ({@code set_worker_hb}, {@code get_worker_hb}, {@code get_worker_hb_children},
 * {@code delete_worker_hb}) to a {@link PacemakerClientPool}, and delegates every other
 * operation unchanged to a wrapped {@link IStateStorage}.
 *
 * <p>Each heartbeat operation is retried on {@link HBExecutionException} /
 * {@link PacemakerConnectionException}: after the initial attempt, up to
 * {@link #MAX_RETRIES} further attempts are made before the failure is rethrown wrapped in a
 * {@link RuntimeException}. An {@link InterruptedException} is never retried.
 */
public class PaceMakerStateStorage implements IStateStorage {
    /** Number of additional attempts made after the first failed heartbeat request. */
    private static final int MAX_RETRIES = 10;
    private static final Logger LOG = LoggerFactory.getLogger(PaceMakerStateStorage.class);

    private final PacemakerClientPool pacemakerClientPool;
    private final IStateStorage stateStorage;

    /**
     * Creates a storage that sends heartbeats through the given pool and delegates everything
     * else to the given storage.
     *
     * @param pacemakerClientPool pool used for all {@code *_worker_hb*} operations
     * @param iStateStorage       storage that receives every non-heartbeat operation
     * @throws Exception declared for signature compatibility; this constructor itself only
     *                   stores its arguments
     */
    public PaceMakerStateStorage(PacemakerClientPool pacemakerClientPool, IStateStorage iStateStorage) throws Exception {
        this.pacemakerClientPool = pacemakerClientPool;
        this.stateStorage = iStateStorage;
    }

    /** Delegates to the wrapped storage. */
    @Override
    public String register(ZKStateChangedCallback callback) {
        return stateStorage.register(callback);
    }

    /** Delegates to the wrapped storage. */
    @Override
    public void unregister(String id) {
        stateStorage.unregister(id);
    }

    /** Delegates to the wrapped storage. */
    @Override
    public String create_sequential(String path, byte[] data, List<ACL> acls) {
        return stateStorage.create_sequential(path, data, acls);
    }

    /** Delegates to the wrapped storage. */
    @Override
    public void mkdirs(String path, List<ACL> acls) {
        stateStorage.mkdirs(path, acls);
    }

    /** Delegates to the wrapped storage. */
    @Override
    public void delete_node(String path) {
        stateStorage.delete_node(path);
    }

    /** Delegates to the wrapped storage. */
    @Override
    public void set_ephemeral_node(String path, byte[] data, List<ACL> acls) {
        stateStorage.set_ephemeral_node(path, data, acls);
    }

    /** Delegates to the wrapped storage. */
    @Override
    public Integer get_version(String path, boolean watch) throws Exception {
        return stateStorage.get_version(path, watch);
    }

    /** Delegates to the wrapped storage. */
    @Override
    public boolean node_exists(String path, boolean watch) {
        return stateStorage.node_exists(path, watch);
    }

    /** Delegates to the wrapped storage. */
    @Override
    public List<String> get_children(String path, boolean watch) {
        return stateStorage.get_children(path, watch);
    }

    /**
     * Closes the wrapped storage and then the Pacemaker client pool. The pool is closed even
     * when closing the wrapped storage throws, so its resources are not leaked.
     */
    @Override
    public void close() {
        try {
            stateStorage.close();
        } finally {
            pacemakerClientPool.close();
        }
    }

    /** Delegates to the wrapped storage. */
    @Override
    public void set_data(String path, byte[] data, List<ACL> acls) {
        stateStorage.set_data(path, data, acls);
    }

    /** Delegates to the wrapped storage. */
    @Override
    public byte[] get_data(String path, boolean watch) {
        return stateStorage.get_data(path, watch);
    }

    /** Delegates to the wrapped storage. */
    @Override
    public VersionedData<byte[]> get_data_with_version(String path, boolean watch) {
        return stateStorage.get_data_with_version(path, watch);
    }

    /**
     * Sends a heartbeat pulse for {@code path} through the Pacemaker client pool.
     *
     * @param path id stored on the pulse
     * @param data details stored on the pulse
     * @param acls not used by this implementation
     * @throws RuntimeException if the calling thread is interrupted, or if the request still
     *                          fails (connection error or unexpected response type) once the
     *                          retries are exhausted
     */
    @Override
    public void set_worker_hb(String path, byte[] data, List<ACL> acls) {
        int retry = MAX_RETRIES;
        while (true) {
            try {
                HBPulse pulse = new HBPulse();
                pulse.set_id(path);
                pulse.set_details(data);
                HBMessage request = new HBMessage(HBServerMessageType.SEND_PULSE, HBMessageData.pulse(pulse));
                HBMessage response = pacemakerClientPool.send(request);
                if (response.get_type() != HBServerMessageType.SEND_PULSE_RESPONSE) {
                    throw new WrappedHBExecutionException("Invalid Response Type");
                }
                LOG.debug("Successful set_worker_hb");
                return;
            } catch (InterruptedException e) {
                throw interrupted("set_worker_hb", e);
            } catch (HBExecutionException | PacemakerConnectionException e) {
                if (retry <= 0) {
                    throw new RuntimeException(e);
                }
                retry--;
                LOG.error("{} Failed to set_worker_hb. Will make {} more attempts.", e.getMessage(), retry);
            }
        }
    }

    /**
     * Requests the heartbeat for {@code path} from every client in the pool and returns the
     * serialized heartbeat with the greatest {@code time_secs}.
     *
     * @param path  heartbeat path to query
     * @param watch not used by this implementation
     * @return the details bytes of the newest heartbeat, or {@code null} when at least one valid
     *         response arrived but none carried a heartbeat with {@code time_secs > 0}
     * @throws RuntimeException if the calling thread is interrupted, or if no valid response
     *                          could be obtained once the retries are exhausted
     */
    @Override
    public byte[] get_worker_hb(String path, boolean watch) {
        int retry = MAX_RETRIES;
        while (true) {
            try {
                byte[] latestDetails = null;
                int latestTimeSecs = 0;
                boolean gotResponse = false;

                HBMessage request = new HBMessage(HBServerMessageType.GET_PULSE, HBMessageData.path(path));
                for (HBMessage response : pacemakerClientPool.sendAll(request)) {
                    if (response.get_type() != HBServerMessageType.GET_PULSE_RESPONSE) {
                        LOG.error("get_worker_hb: Invalid Response Type");
                        continue;
                    }
                    // A well-typed response counts even if it carries no heartbeat details.
                    gotResponse = true;
                    byte[] details = response.get_data().get_pulse().get_details();
                    if (details == null) {
                        continue;
                    }
                    ClusterWorkerHeartbeat heartbeat = Utils.deserialize(details, ClusterWorkerHeartbeat.class);
                    if (heartbeat != null && heartbeat.get_time_secs() > latestTimeSecs) {
                        latestTimeSecs = heartbeat.get_time_secs();
                        latestDetails = details;
                    }
                }
                if (!gotResponse) {
                    throw new WrappedHBExecutionException("Failed to get a response.");
                }
                return latestDetails;
            } catch (InterruptedException e) {
                throw interrupted("get_worker_hb", e);
            } catch (HBExecutionException | PacemakerConnectionException e) {
                if (retry <= 0) {
                    throw new RuntimeException(e);
                }
                retry--;
                LOG.error("{} Failed to get_worker_hb. Will make {} more attempts.", e.getMessage(), retry);
            }
        }
    }

    /**
     * Requests all heartbeat node ids under {@code path} from every client in the pool and
     * returns their de-duplicated union.
     *
     * @param path  parent heartbeat path to list
     * @param watch not used by this implementation
     * @return the distinct pulse ids reported by the well-typed responses, in no particular
     *         order; responses of an unexpected type are logged and skipped
     * @throws RuntimeException if the calling thread is interrupted, or if the connection still
     *                          fails once the retries are exhausted
     */
    @Override
    public List<String> get_worker_hb_children(String path, boolean watch) {
        int retry = MAX_RETRIES;
        while (true) {
            try {
                HashSet<String> pulseIds = new HashSet<>();
                HBMessage request = new HBMessage(HBServerMessageType.GET_ALL_NODES_FOR_PATH, HBMessageData.path(path));
                for (HBMessage response : pacemakerClientPool.sendAll(request)) {
                    if (response.get_type() != HBServerMessageType.GET_ALL_NODES_FOR_PATH_RESPONSE) {
                        LOG.error("get_worker_hb_children: Invalid Response Type");
                        continue;
                    }
                    List<String> ids = response.get_data().get_nodes().get_pulseIds();
                    if (ids != null) {
                        pulseIds.addAll(ids);
                    }
                }
                LOG.debug("Successful get_worker_hb_children");
                return new ArrayList<>(pulseIds);
            } catch (InterruptedException e) {
                throw interrupted("get_worker_hb_children", e);
            } catch (PacemakerConnectionException e) {
                if (retry <= 0) {
                    throw new RuntimeException(e);
                }
                retry--;
                LOG.error("{} Failed to get_worker_hb_children. Will make {} more attempts.", e.getMessage(), retry);
            }
        }
    }

    /**
     * Asks every client in the pool to delete the heartbeat at {@code path}.
     *
     * <p>Returns normally as soon as one attempt is acknowledged by every response. When the
     * retries are exhausted, it returns with a warning if any response in any attempt
     * acknowledged the delete, and throws otherwise.
     *
     * @param path heartbeat path to delete
     * @throws RuntimeException if the calling thread is interrupted, or if no response in any
     *                          attempt acknowledged the delete
     */
    @Override
    public void delete_worker_hb(String path) {
        int retry = MAX_RETRIES;
        // Tracked across attempts: a delete acknowledged in an earlier attempt still counts as
        // partial success when a later attempt fails outright.
        boolean someSucceeded = false;
        while (true) {
            try {
                boolean allSucceeded = true;
                HBMessage request = new HBMessage(HBServerMessageType.DELETE_PATH, HBMessageData.path(path));
                for (HBMessage response : pacemakerClientPool.sendAll(request)) {
                    if (response.get_type() != HBServerMessageType.DELETE_PATH_RESPONSE) {
                        LOG.debug("Failed to delete heartbeat {}", response);
                        allSucceeded = false;
                    } else {
                        someSucceeded = true;
                    }
                }
                if (!allSucceeded) {
                    throw new WrappedHBExecutionException("Failed to delete from all pacemakers.");
                }
                return;
            } catch (InterruptedException e) {
                throw interrupted("delete_worker_hb", e);
            } catch (HBExecutionException | PacemakerConnectionException e) {
                if (retry <= 0) {
                    if (someSucceeded) {
                        LOG.warn("Unable to delete_worker_hb from every pacemaker.");
                        return;
                    }
                    LOG.error("Unable to delete_worker_hb from any pacemaker.");
                    throw new RuntimeException(e);
                }
                retry--;
                LOG.debug("{} Failed to delete_worker_hb. Will make {} more attempts.", e.getMessage(), retry);
            }
        }
    }

    /** Delegates to the wrapped storage. */
    @Override
    public void add_listener(ConnectionStateListener listener) {
        stateStorage.add_listener(listener);
    }

    /** Delegates to the wrapped storage. */
    @Override
    public void sync_path(String path) {
        stateStorage.sync_path(path);
    }

    /** Delegates to the wrapped storage. */
    @Override
    public void delete_node_blobstore(String path, String nimbusHostPortInfo) {
        stateStorage.delete_node_blobstore(path, nimbusHostPortInfo);
    }

    /**
     * Restores the calling thread's interrupt flag, logs the interruption, and builds the
     * {@link RuntimeException} the heartbeat operations throw in response.
     */
    private static RuntimeException interrupted(String operation, InterruptedException e) {
        // Re-assert the flag so code further up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        LOG.debug("{} got interrupted", operation, e);
        return new RuntimeException(e);
    }
}
