import java.io.IOException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.ConnectionLossException; import org.apache.zookeeper.KeeperException.SessionExpiredException; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.ZooKeeper; /** * * @description Zookeeper Session演示類 * @author zhangchaoyang * @date 2014-6-22 */ public class SessionDemo { /** * zoo.cfg中的配置: * * <pre> * tickTime=2000 * minSessionTimeout=4000(至少是tickTime的2倍) * maxSessionTimeout=40000(最大是tickTime的20倍) * </pre> * * 如果客戶端建立連接時指定的TIMEOUT不在[minSessionTimeout,maxSessionTimeout]區(qū)間內(nèi), * 服務端會強制把它修改到該區(qū)間內(nèi) */ private static final int TIMEOUT = 40000; // Session // Timeout設為40秒,因為心跳周期為2秒,所以如果server向client連續(xù)發(fā)送20個心跳都收不到回應,則Session過期失效 private static ZooKeeper zkp = null; private static void connect() throws IOException { zkp = new ZooKeeper("192.168.119.96:2181/app/learn", TIMEOUT, null); } private static void createNode() throws KeeperException, InterruptedException { if (zkp != null) { zkp.create("/znodename", "znodedata".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } } private static String getData() throws KeeperException, InterruptedException { if (zkp != null) { Stat stat = zkp.exists("/znodename", false); return new String(zkp.getData("/znodename", false, stat)); } return null; } private static void disconnect() throws InterruptedException { if (zkp != null) { zkp.close(); } } /** * 休息,在此期間我們有三種選擇:<br> * <ol> * <li>永久性斷開網(wǎng)絡連接 * <li>斷開網(wǎng)絡連接一段時間timespan后再連上,其中timespan<{@code TIMEOUT} * <li>斷開網(wǎng)絡連接一段時間timespan后再連上,其中timespan>{@code TIMEOUT} * </ol> */ private static void sleepForNetworkDisturbances() { try { Thread.sleep(2 * TIMEOUT); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { try { connect(); } catch (IOException e) { System.err .println("Can't create zookeeper 
client, please check the network."); } System.out.println("Session build."); try { createNode(); } catch (Exception e) { System.err.println("Create znode failed."); } System.out.println("znode created."); sleepForNetworkDisturbances(); try { String data = getData(); if (data != null) { // 在“休息”期間做了第2件事情,Sesion沒有過期,EPHEMERAL節(jié)點依然存在 System.out.println("data=" + data); } } catch (KeeperException e) { e.printStackTrace(); // 在“休息”期間做了第1件事情 if (e instanceof ConnectionLossException) { System.err .println("Oops, network is disconnected. Retry getData()."); // 如果session沒有失效,而僅僅是網(wǎng)絡異常,則可以重新嘗試獲取數(shù)據(jù),可能在重試時網(wǎng)絡已經(jīng)正常了 try { Thread.sleep(1000); String data = getData(); if (data != null) { System.out.println("data=" + data); } else { System.out.println("can't get data."); } } catch (Exception e1) { e1.printStackTrace(); } } // 在“休息”期間做了第3件事情,則session會過期 else if (e instanceof SessionExpiredException) { System.err .println("Session Expired, client will reconnect and create znode again."); // 當發(fā)再Session Expired時,必須重新建立連接,即new一個ZooKeeper try { connect(); createNode(); String data = getData(); if (data != null) { System.out.println("data=" + data); } else { System.out.println("can't get data."); } } catch (Exception e1) { e1.printStackTrace(); } } } catch (InterruptedException e) { e.printStackTrace(); } try { disconnect(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Client disconnected."); } } Watcher可以注冊watcher的方法:getData、exists、getChildren。
可以觸發watcher的方法:create、delete、setData。連接斷開的情況下觸發的watcher會丟失。
一個Watcher實例是一個回調函數,被回調一次后就被移除了。如果還需要關注數據的變化,需要再次注冊watcher。
New ZooKeeper時注冊的watcher叫default watcher,它不是一次性的,只對client的連接狀態變化作出反應。
什么樣的操作會產生什么類型的事件:
什么操作會觸發EventType.None? 事件類型與watcher的對應關系:
操作與watcher的對應關系:
import java.io.IOException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; /** * * @description Zookeeper Watcher演示類 * @author zhangchaoyang * @date 2014-6-22 */ public class WatcherDemo { private static ZooKeeper zkp = null; private static final int TIMEOUT = 6000; private static Watcher getWatcher(final String msg) { return new Watcher() { @Override public void process(WatchedEvent event) { System.out.println(msg + "上的監(jiān)聽被觸發(fā)\t事件類型" + event.getType() + "\t發(fā)生變化的節(jié)點" + event.getPath()); } }; } public static void main(String[] args) throws IOException, KeeperException, InterruptedException { System.out.println("--------------1----------------"); zkp = new ZooKeeper("192.168.119.96:2181/app/learn", TIMEOUT, getWatcher("CONNECT")); Thread.sleep(1000); System.out.println("--------------2----------------"); zkp.create("/znodename", "znodedata".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zkp.create("/znodename/childnode", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); Stat stat = zkp.exists("/znodename", getWatcher("EXISTS")); zkp.getChildren("/", getWatcher("GETCHILDREN")); zkp.getData("/znodename", getWatcher("GETDATA"), stat); stat = zkp.exists("/znodename/childnode", getWatcher("EXISTS")); zkp.getChildren("/znodename", getWatcher("GETCHILDREN")); zkp.getData("/znodename/childnode", getWatcher("GETDATA"), stat); // zkp.close(); zkp = new ZooKeeper("192.168.119.96:2181/app/learn", TIMEOUT, getWatcher("CONNECT")); Thread.sleep(1000); System.out.println("--------------3----------------"); zkp.delete("/znodename/childnode", -1); zkp.delete("/znodename", -1); zkp.close(); } } import java.io.IOException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import 
org.apache.zookeeper.KeeperException.ConnectionLossException; import org.apache.zookeeper.KeeperException.SessionExpiredException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; /** * * @description 自定義持久性的zookeeper watcher * @author zhangchaoyang * @date 2014-6-22 */ public class PersistWatcher { private static final int TIMEOUT = 6000; private static final String znode = "/globalconfnode"; private static String globalConfData = ""; private static Watcher getConnectWatcher() { return new Watcher() { @Override public void process(WatchedEvent event) { if (event.getType().equals(EventType.None)) { System.out.println("連接狀態(tài)發(fā)生變化。"); } } }; } private static Watcher getExistsWatcher(final ZooKeeper zkp) { return new Watcher() { @Override public void process(WatchedEvent event) { try { if (event.getType().equals(EventType.NodeDataChanged) || event.getType().equals(EventType.NodeCreated)) { // 節(jié)點被創(chuàng)建或修改時更新緩存中的值 Stat stat = zkp.exists(znode, this);// 再次注冊監(jiān)聽 String data = new String( zkp.getData(znode, false, stat)); globalConfData = data; } else if (event.getType().equals(EventType.NodeDeleted)) { // 節(jié)點被刪除時報警 System.out .println("global configuration node have been deleted!"); try { // 再次注冊監(jiān)聽 zkp.exists(znode, this); } catch (KeeperException e) { if (e instanceof ConnectionLossException) { System.out.println("連接已斷開。"); } } } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }; } public static void main(String[] args) { try { ZooKeeper zkp = new ZooKeeper("192.168.119.96:2181/app/learn", TIMEOUT, getConnectWatcher()); zkp.exists(znode, getExistsWatcher(zkp)); zkp.create(znode, "config_value".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); Thread.sleep(500);// 
修改節(jié)點后必須sleep,等待watcher回調(diào)完成 System.out.println(globalConfData); for (int i = 0; i < 4; i++) { zkp.setData(znode, ("config_value" + i).getBytes(), -1); Thread.sleep(500);// 修改節(jié)點后必須sleep,等待watcher回調(diào)完成 System.out.println(globalConfData); } zkp.close();// EPHEMERAL節(jié)點會被刪除,但Session并不會馬上失效(只不過ConnectionLoss了),所以還是會觸發(fā)watcher try { // 此時Session已失效 zkp.exists(znode, false); } catch (KeeperException e) { if (e instanceof SessionExpiredException) System.out.println("Session已失效。"); } } catch (IOException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } ACL每個節(jié)點有單獨的ACL,子節(jié)點不能繼承父節(jié)點的ACL
ACL有三個維度:schema,id,permission
Schema有7種:
world: 它下面只有一個id, 叫anyone
auth: 它不需要id
digest: 它對應的id為username:BASE64(SHA1(password))
ip: 它對應的id為客戶機的IP地址,設置的時候可以設置一個ip段,比如ip:192.168.1.0/16, 表示匹配前16個bit的IP段
super: 在這種scheme情況下,對應的id擁有超級權限
sasl: sasl的對應的id,是一個通過了kerberos認證的用戶id
Permission有5種:
CREATE(c),DELETE(d),READ(r),WRITE(w),ADMIN(a)
import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoAuthException; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; /** * * @description Zookeeper ACL演示類 * @author zhangchaoyang * @date 2014-6-22 */ public class AclDemo { private static final int TIMEOUT = 6000; public static void main(String[] args) throws IOException, KeeperException, InterruptedException { ZooKeeper zkp = new ZooKeeper("192.168.119.96:2181/app/learn", TIMEOUT, null); String schema = "digest";// schema類型有:world,auth,digest,ip,super String auth = "username:password"; zkp.addAuthInfo(schema, auth.getBytes()); List<ACL> acls = new ArrayList<ACL>(); for (ACL id : Ids.CREATOR_ALL_ACL) { acls.add(id); } zkp.create("/znodename", "znodedata".getBytes(), acls, CreateMode.PERSISTENT); ZooKeeper zoo = null; try { zoo = new ZooKeeper("192.168.119.96:2181/app/learn", TIMEOUT, null); System.out.println("采用不合法的認證方式:"); String badAuthentication = "username:wrongpass"; zoo.addAuthInfo(schema, badAuthentication.getBytes()); zoo.getData("/znodename", null, null); } catch (KeeperException e) { if (e instanceof NoAuthException) { System.out.println("認證失?。? + e.getMessage()); } System.out.println("采用合法的認證方式:"); zoo.addAuthInfo(schema, auth.getBytes()); String data = new String(zoo.getData("/znodename", null, null)); if (data != null) { System.out.println("認證成功:data=" + data); } } finally { if (zoo != null && zoo.getState().isAlive()) { zoo.close(); } } zkp.delete("/znodename", -1); zkp.close(); } } 開源工具menagerie基于ZooKeeper實現(xiàn)了分布式的:
ReentrantZkLock
ReentrantZkReadWriteLock
Semaphore
CyclicBarrier
CountDownLatch
BlockingQueue
HashMap
ListSet
import java.io.IOException; import java.util.Date; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.log4j.Logger; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper.States; /** * * @description 使用ZooKeeper實現(xiàn)分布式鎖 * @author zhangchaoyang * @date 2014-6-22 */ public class ZooKeeperLock { private static Logger logger = Logger.getLogger(ZooKeeperLock.class); private static ZooKeeper zk = null; private static final int TIMEOUT = 1000 * 60; private static String connStr = null; public static void setServerPath(String path) { connStr = path + "/app/bqas/lock"; logger.info("ZooKeeperLock zookeeper node:" + connStr); } public static boolean getLock(String lockname) throws KeeperException, InterruptedException, IOException { connect(connStr, TIMEOUT); if (lockname.contains("-")) { throw new RuntimeException("鎖名稱不能包含'-'"); } boolean lock = false; String path = zk.create("/" + lockname + "-", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); int selfIndex = getIndex(path); List<String> children = zk.getChildren("/", false); int min = getMinIndex(children); if (min == selfIndex) { lock = true; } return lock; } public static void releaseLock(String lockname) throws InterruptedException, KeeperException { disconnect(); } private static int getIndex(String str) { int index = -1; int pos = str.lastIndexOf("-"); if (pos >= 0) { try { index = Integer.parseInt(str.substring(pos + 1)); } catch (NumberFormatException e) { e.printStackTrace(); } } return index; } private static int getMinIndex(List<String> list) { int min = Integer.MAX_VALUE; for (String ele : list) { int index = getIndex(ele); if (index < 0) { throw new 
RuntimeException("SEQUENTIAL節(jié)點名中不包含數(shù)字:" + ele); } if (index < min) { min = index; } } return min; } private static void waitUntilConnected(CountDownLatch connectedLatch) { if (States.CONNECTING == zk.getState()) { try { connectedLatch.await(); } catch (InterruptedException e) { throw new IllegalStateException(e); } } } public static boolean connect(String hostPath, int sessionTimeout) { if (zk == null || zk.getState() == States.CLOSED) { try { CountDownLatch connectedLatch = new CountDownLatch(1); Watcher watcher = new ConnectedWatcher(connectedLatch); zk = new ZooKeeper(hostPath, sessionTimeout, watcher); waitUntilConnected(connectedLatch); } catch (Exception e) { logger.error("Connect to Zookeeper failed:", e); return false; } } return true; } public static boolean disconnect() { if (zk != null) { if (States.CLOSED != zk.getState()) { try { zk.close(); } catch (InterruptedException e) { logger.error("Disconnect from Zookeeper failed:", e); return false; } } } return true; } static class ConnectedWatcher implements Watcher { private CountDownLatch connectedLatch; ConnectedWatcher(CountDownLatch connectedLatch) { this.connectedLatch = connectedLatch; } @Override public void process(WatchedEvent event) { // 事件狀態(tài)為SyncConnected時,說明與服務端的連接已建立好 if (event.getState() == KeeperState.SyncConnected) { connectedLatch.countDown(); } } } public static void main(String[] args) { String lockname = "writeHitCount2DBlock"; System.out.println("begin to run."); ZooKeeperLock.setServerPath("192.168.119.96:2181"); try { boolean havelock = ZooKeeperLock.getLock(lockname); if (havelock) { Date date = new Date(); System.out .println("I got the lock,and I will write DB!" + date); Thread.sleep(1000);// 休息一段時間之后再釋放鎖 } System.out.println("Job done, I will release the lock."); ZooKeeperLock.releaseLock(lockname); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }
|
|
來自: IT技術武館 > 《Hadoop及生態圈相關》