package org.apache.storm.cluster;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.storm.Config;
import org.apache.storm.callback.DefaultWatcherCallBack;
import org.apache.storm.callback.WatcherCallBack;
import org.apache.storm.callback.ZKStateChangedCallback;
import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
import org.apache.storm.shade.org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.storm.shade.org.apache.zookeeper.CreateMode;
import org.apache.storm.shade.org.apache.zookeeper.KeeperException;
import org.apache.storm.shade.org.apache.zookeeper.Watcher;
import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
import org.apache.storm.utils.Utils;
import org.apache.storm.zookeeper.ClientZookeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/classes/data/StormApp.jar:org/apache/storm/cluster/ZKStateStorage.class */
public class ZKStateStorage implements IStateStorage {
    private static Logger LOG = LoggerFactory.getLogger((Class<?>) ZKStateStorage.class);
    private ConcurrentHashMap<String, ZKStateChangedCallback> callbacks = new ConcurrentHashMap<>();
    private CuratorFramework zkWriter;
    private CuratorFramework zkReader;
    private AtomicBoolean active;
    private boolean isNimbus;
    private Map<String, Object> authConf;
    private Map<String, Object> conf;

    /* loaded from: input_file:BOOT-INF/classes/data/StormApp.jar:org/apache/storm/cluster/ZKStateStorage$ZkWatcherCallBack.class */
    private class ZkWatcherCallBack implements WatcherCallBack {
        private ZkWatcherCallBack() {
        }

        @Override // org.apache.storm.callback.WatcherCallBack
        public void execute(Watcher.Event.KeeperState keeperState, Watcher.Event.EventType eventType, String str) {
            if (ZKStateStorage.this.active.get()) {
                if (keeperState.equals(Watcher.Event.KeeperState.SyncConnected)) {
                    ZKStateStorage.LOG.debug("Received event {} : {} : {}", keeperState, eventType, str);
                } else {
                    ZKStateStorage.LOG.debug("Received event {} : {}: {} with disconnected Zookeeper.", keeperState, eventType, str);
                }
                if (eventType.equals(Watcher.Event.EventType.None)) {
                    return;
                }
                Iterator it = ZKStateStorage.this.callbacks.entrySet().iterator();
                while (it.hasNext()) {
                    ((ZKStateChangedCallback) ((Map.Entry) it.next()).getValue()).changed(eventType, str);
                }
            }
        }
    }

    public ZKStateStorage(Map<String, Object> map, Map<String, Object> map2, ClusterStateContext clusterStateContext) throws Exception {
        this.conf = map;
        this.authConf = map2;
        if (clusterStateContext.getDaemonType().equals(DaemonType.NIMBUS)) {
            this.isNimbus = true;
        }
        CuratorFramework mkZk = mkZk(clusterStateContext.getDaemonType());
        ClientZookeeper.mkdirs(mkZk, String.valueOf(map.get(Config.STORM_ZOOKEEPER_ROOT)), clusterStateContext.getDefaultZkAcls());
        mkZk.close();
        this.active = new AtomicBoolean(true);
        this.zkWriter = mkZk(new ZkWatcherCallBack(), clusterStateContext.getDaemonType());
        if (this.isNimbus) {
            this.zkReader = mkZk(new ZkWatcherCallBack(), clusterStateContext.getDaemonType());
        } else {
            this.zkReader = this.zkWriter;
        }
    }

    private CuratorFramework mkZk(DaemonType daemonType) {
        return ClientZookeeper.mkClient(this.conf, (List) this.conf.get(Config.STORM_ZOOKEEPER_SERVERS), this.conf.get(Config.STORM_ZOOKEEPER_PORT), "", new DefaultWatcherCallBack(), this.authConf, daemonType);
    }

    private CuratorFramework mkZk(WatcherCallBack watcherCallBack, DaemonType daemonType) throws NumberFormatException {
        return ClientZookeeper.mkClient(this.conf, (List) this.conf.get(Config.STORM_ZOOKEEPER_SERVERS), this.conf.get(Config.STORM_ZOOKEEPER_PORT), String.valueOf(this.conf.get(Config.STORM_ZOOKEEPER_ROOT)), watcherCallBack, this.authConf, daemonType);
    }

    @Override // org.apache.storm.cluster.IStateStorage
    public void delete_node_blobstore(String str, String str2) {
        ClientZookeeper.deleteNodeBlobstore(this.zkWriter, str, str2);
    }

    @Override // org.apache.storm.cluster.IStateStorage
    public String register(ZKStateChangedCallback zKStateChangedCallback) {
        String uuid = UUID.randomUUID().toString();
        this.callbacks.put(uuid, zKStateChangedCallback);
        return uuid;
    }

    @Override // org.apache.storm.cluster.IStateStorage
    public void unregister(String str) {
        this.callbacks.remove(str);
    }

    @Override // org.apache.storm.cluster.IStateStorage
    public String create_sequential(String str, byte[] bArr, List<ACL> list) {
        return ClientZookeeper.createNode(this.zkWriter, str, bArr, CreateMode.EPHEMERAL_SEQUENTIAL, list);
    }

    @Override // org.apache.storm.cluster.IStateStorage
    public void mkdirs(String str, List<ACL> list) {
        ClientZookeeper.mkdirs(this.zkWriter, str, list);
    }

    @Override // org.apache.storm.cluster.IStateStorage
    public void delete_node(String str) {
        ClientZookeeper.deleteNode(this.zkWriter, str);
    }

    @Override // org.apache.storm.cluster.IStateStorage
    public void set_ephemeral_node(String str, byte[] bArr, List<ACL> list) {
        ClientZookeeper.mkdirs(this.zkWriter, ClientZookeeper.parentPath(str), list);
        if (!ClientZookeeper.exists(this.zkWriter, str, false)) {
            ClientZookeeper.createNode(this.zkWriter, str, bArr, CreateMode.EPHEMERAL, list);
            return;
        }
        try {
            ClientZookeeper.setData(this.zkWriter, str, bArr);
        } catch (RuntimeException e) {
            if (!Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
                throw e;
            }
            ClientZookeeper.createNode(this.zkWriter, str, bArr, CreateMode.EPHEMERAL, list);
        }
    }

    @Override // org.apache.storm.cluster.IStateStorage
    public Integer get_version(String str, boolean z) throws Exception {
        return ClientZookeeper.getVersion(this.zkReader, str, z);
    }

    @Override // org.apache.storm.cluster.IStateStorage
    public boolean node_exists(String str, boolean z) {
        return ClientZookeeper.existsNode(this.zkReader, str, z);
    }

    @Override // org.apache.storm.cluster.IStateStorage
    public List<String> get_children(String str, boolean z) {
        return ClientZookeeper.getChildren(this.zkReader, str, z);
    }

    @Override // org.apache.storm.cluster.IStateStorage, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.active.set(false);
        this.zkWriter.close();
        if (this.isNimbus) {
            this.zkReader.close();
        }
    }

    @Override // org.apache.storm.cluster.IStateStorage
    public void set_data(String str, byte[] bArr, List<ACL> list) {
        if (ClientZookeeper.exists(this.zkWriter, str, false)) {
            ClientZookeeper.setData(this.zkWriter, str, bArr);
            return;
        }
        ClientZookeeper.mkdirs(this.zkWriter, ClientZookeeper.parentPath(str), list);
        try {
            ClientZookeeper.createNode(this.zkWriter, str, bArr, CreateMode.PERSISTENT, list);
        } catch (RuntimeException e) {
            if (!Utils.exceptionCauseIsInstanceOf(KeeperException.NodeExistsException.class, e)) {
                throw e;
            }
            ClientZookeeper.setData(this.zkWriter, str, bArr);
        }
    }

    @Override // org.apache.storm.cluster.IStateStorage
    public byte[] get_data(String str, boolean z) {
        return ClientZookeeper.getData(this.zkReader, str, z);
    }

    @Override // org.apache.storm.cluster.IStateStorage
    public VersionedData<byte[]> get_data_with_version(String str, boolean z) {
        return ClientZookeeper.getDataWithVersion(this.zkReader, str, z);
    }

    @Override // org.apache.storm.cluster.IStateStorage
    public void set_worker_hb(String str, byte[] bArr, List<ACL> list) {
        set_data(str, bArr, list);
    }

    @Override // org.apache.storm.cluster.IStateStorage
    public byte[] get_worker_hb(String str, boolean z) {
        return ClientZookeeper.getData(this.zkReader, str, z);
    }

    @Override // org.apache.storm.cluster.IStateStorage
    public List<String> get_worker_hb_children(String str, boolean z) {
        return get_children(str, z);
    }

    @Override // org.apache.storm.cluster.IStateStorage
    public void delete_worker_hb(String str) {
        delete_node(str);
    }

    @Override // org.apache.storm.cluster.IStateStorage
    public void add_listener(ConnectionStateListener connectionStateListener) {
        ClientZookeeper.addListener(this.zkReader, (curatorFramework, connectionState) -> {
            connectionStateListener.stateChanged(curatorFramework, connectionState);
        });
    }

    @Override // org.apache.storm.cluster.IStateStorage
    public void sync_path(String str) {
        ClientZookeeper.syncPath(this.zkWriter, str);
    }
}
