ZooKeeper是Hadoop下面的一個子項目,用來進行分布式系統之間的相互協調。
在ZooKeeper源碼包的recipe目錄下有一個互斥鎖lock的實現範例,筆者對其簡要包裝,以便看起來更為明了:
- package org.apache.zookeeper.recipes.lock;
-
- import java.io.IOException;
- import java.util.concurrent.locks.ReentrantLock;
-
- import org.apache.zookeeper.KeeperException;
- import org.apache.zookeeper.Watcher;
- import org.apache.zookeeper.ZooKeeper;
-
- public class DistributedLock {
-
- private WriteLock lock;
- private String lockPath = "/lock";
- private ZooKeeper zooKeeper ;
-
- public DistributedLock(ZooKeeper zooKeeper){
- this.zooKeeper = zooKeeper;
- }
-
-
-
-
-
-
-
-
-
-
- public boolean lock(){
- lock = new WriteLock(zooKeeper, lockPath, null);
- try {
- while (true) {
- if (lock.lock()) {
- return true;
- }
-
- }
- } catch (KeeperException e) {
- e.printStackTrace();
- return false;
- } catch (InterruptedException e) {
- e.printStackTrace();
- return false;
- }
-
- }
-
-
-
-
-
-
-
-
-
- public void unlock(){
- lock.unlock();
- }
-
- public static void main(String args[]){
-
- try {
- Watcher wh=new Watcher(){
- public void process(org.apache.zookeeper.WatchedEvent event)
- {
- System.out.println(event.toString());
- }
- };
-
- ZooKeeper zooKeeper = new ZooKeeper("localhost:2181", 20000, wh);
- final DistributedLock distributedLock = new DistributedLock(zooKeeper);
-
- for(int i = 0; i < 100 ; i ++){
- Thread thread = new Thread(new Runnable(){
-
- @Override
- public void run() {
- if(distributedLock.lock()){
- System.out.println("獲得鎖---------------");
-
- }
-
- distributedLock.unlock();
-
- }
-
- });
- }
-
- Thread.sleep(2000*1000);
-
- } catch (IOException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- }
- }
WriteLock實現:
- package org.apache.zookeeper.recipes.lock;
-
- import org.apache.log4j.Logger;
- import org.apache.zookeeper.KeeperException;
- import org.apache.zookeeper.WatchedEvent;
- import org.apache.zookeeper.Watcher;
- import static org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL;
- import org.apache.zookeeper.ZooKeeper;
- import org.apache.zookeeper.data.ACL;
- import org.apache.zookeeper.data.Stat;
-
- import java.util.List;
- import java.util.SortedSet;
- import java.util.TreeSet;
-
-
-
- /**
-  * Exclusive lock built on ephemeral sequential children of one ZooKeeper node.
-  *
-  * <p>Each contender owns a child of {@code dir} whose name starts with
-  * {@code x-<sessionId>-}. The contender whose child sorts first (by
-  * {@link ZNodeName} ordering) holds the lock; every other contender watches
-  * the child immediately preceding its own and re-evaluates when that watch
-  * fires.
-  *
-  * <p>The node prefix is derived from the session id only, and an existing
-  * child carrying that prefix is adopted instead of creating a new one.
-  * Instances that share a session and a directory therefore end up sharing one
-  * node; use one instance per session and directory.
-  */
- public class WriteLock extends ProtocolSupport {
-     private static final Logger LOG = Logger.getLogger(WriteLock.class);
-
-     private final String dir;
-     /** Absolute path of this contender's node, or {@code null} when it has none. */
-     private String id;
-     private ZNodeName idName;
-     /** Absolute path of the first child seen by the latest evaluation. */
-     private String ownerId;
-     /** Absolute path of the predecessor node that was watched last. */
-     private String lastChildId;
-     private final byte[] data = {0x12, 0x34};
-     private LockListener callback;
-     private final LockZooKeeperOperation zop;
-
-     /**
-      * Creates a lock without a listener.
-      *
-      * @param zookeeper session used for all operations
-      * @param dir       parent node under which the lock nodes are created
-      * @param acl       ACL for created nodes; {@code null} keeps the ACL
-      *                  already configured in {@link ProtocolSupport}
-      */
-     public WriteLock(ZooKeeper zookeeper, String dir, List<ACL> acl) {
-         super(zookeeper);
-         this.dir = dir;
-         if (acl != null) {
-             setAcl(acl);
-         }
-         this.zop = new LockZooKeeperOperation();
-     }
-
-     /**
-      * Creates a lock that reports acquisition and release to a listener.
-      *
-      * @param zookeeper session used for all operations
-      * @param dir       parent node under which the lock nodes are created
-      * @param acl       ACL for created nodes; {@code null} keeps the ACL
-      *                  already configured in {@link ProtocolSupport}
-      * @param callback  listener notified when the lock is acquired or released
-      */
-     public WriteLock(ZooKeeper zookeeper, String dir, List<ACL> acl,
-                      LockListener callback) {
-         this(zookeeper, dir, acl);
-         this.callback = callback;
-     }
-
-     /** @return the listener currently registered, or {@code null} */
-     public LockListener getLockListener() {
-         return this.callback;
-     }
-
-     /** @param callback listener to notify from now on; may be {@code null} */
-     public void setLockListener(LockListener callback) {
-         this.callback = callback;
-     }
-
-     /**
-      * Deletes this contender's node, if it has one and the instance is not
-      * closed, then notifies the listener and forgets the node.
-      *
-      * <p>The delete is issued once, directly, rather than through
-      * {@code retryOperation}.
-      *
-      * @throws RuntimeException wrapping any {@link KeeperException} other than
-      *         "no node"
-      */
-     public synchronized void unlock() throws RuntimeException {
-         if (isClosed() || id == null) {
-             return;
-         }
-         try {
-             zookeeper.delete(id, -1);
-         } catch (InterruptedException e) {
-             LOG.warn("Caught: " + e, e);
-             // Keep the interrupt visible to the caller.
-             Thread.currentThread().interrupt();
-         } catch (KeeperException.NoNodeException ignored) {
-             // The node is already gone, so there is nothing left to release.
-         } catch (KeeperException e) {
-             LOG.warn("Caught: " + e, e);
-             throw new RuntimeException(e.getMessage(), e);
-         } finally {
-             try {
-                 if (callback != null) {
-                     callback.lockReleased();
-                 }
-             } finally {
-                 // Forget the node even if the listener throws.
-                 id = null;
-             }
-         }
-     }
-
-     /** Re-runs the acquisition when a watched node changes. */
-     private class LockWatcher implements Watcher {
-         @Override
-         public void process(WatchedEvent event) {
-             LOG.debug("Watcher fired on path: " + event.getPath() + " state: " +
-                     event.getState() + " type " + event.getType());
-             try {
-                 lock();
-             } catch (Exception e) {
-                 LOG.warn("Failed to acquire lock: " + e, e);
-             }
-         }
-     }
-
-     /** One acquisition attempt, run through {@code retryOperation} by {@link #lock()}. */
-     private class LockZooKeeperOperation implements ZooKeeperOperation {
-
-         /**
-          * Sets {@code id} to the absolute path of this session's node: an
-          * existing child of {@code dir} whose name starts with {@code prefix}
-          * is adopted, otherwise a new ephemeral sequential node is created.
-          */
-         private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir)
-                 throws KeeperException, InterruptedException {
-             List<String> names = zookeeper.getChildren(dir, false);
-             for (String name : names) {
-                 if (name.startsWith(prefix)) {
-                     // getChildren yields bare child names; every other use of
-                     // id (delete, owner comparison, ordering) needs the
-                     // absolute path, exactly as create() returns it below.
-                     id = dir + "/" + name;
-                     if (LOG.isDebugEnabled()) {
-                         LOG.debug("Found id created last time: " + id);
-                     }
-                     break;
-                 }
-             }
-             if (id == null) {
-                 id = zookeeper.create(dir + "/" + prefix, data,
-                         getAcl(), EPHEMERAL_SEQUENTIAL);
-
-                 if (LOG.isDebugEnabled()) {
-                     LOG.debug("Created id: " + id);
-                 }
-             }
-         }
-
-         /**
-          * Makes sure this contender has a node, then decides whether it owns
-          * the lock.
-          *
-          * @return {@code true} if this contender's node is first and the lock
-          *         is held; {@code false} if a watch was placed on the
-          *         predecessor, or if no predecessor exists but the first
-          *         child is not this contender's node
-          */
-         @Override
-         public boolean execute() throws KeeperException, InterruptedException {
-             while (true) {
-                 if (id == null) {
-                     long sessionId = zookeeper.getSessionId();
-                     String prefix = "x-" + sessionId + "-";
-                     findPrefixInChildren(prefix, zookeeper, dir);
-                     idName = new ZNodeName(id);
-                 }
-
-                 List<String> names = zookeeper.getChildren(dir, false);
-                 if (names.isEmpty()) {
-                     LOG.warn("No children in: " + dir + " when we've just " +
-                             "created one! Lets recreate it...");
-                     // Our node is gone; start over with a fresh one.
-                     id = null;
-                     continue;
-                 }
-
-                 SortedSet<ZNodeName> sortedNames = new TreeSet<ZNodeName>();
-                 for (String name : names) {
-                     sortedNames.add(new ZNodeName(dir + "/" + name));
-                 }
-                 ownerId = sortedNames.first().getName();
-
-                 SortedSet<ZNodeName> lessThanMe = sortedNames.headSet(idName);
-                 if (lessThanMe.isEmpty()) {
-                     if (isOwner()) {
-                         if (callback != null) {
-                             callback.lockAcquired();
-                         }
-                         return true;
-                     }
-                     return false;
-                 }
-
-                 ZNodeName lastChildName = lessThanMe.last();
-                 lastChildId = lastChildName.getName();
-                 if (LOG.isDebugEnabled()) {
-                     LOG.debug("watching less than me node: " + lastChildId);
-                 }
-                 Stat stat = zookeeper.exists(lastChildId, new LockWatcher());
-                 if (stat != null) {
-                     // Predecessor is alive; its deletion will trigger LockWatcher.
-                     return false;
-                 }
-                 // The predecessor vanished between getChildren and exists, so
-                 // no deletion event will ever arrive for it. List the
-                 // children again instead of reporting "waiting".
-                 LOG.warn("Could not find the" +
-                         " stats for less than me: " + lastChildName.getName());
-             }
-         }
-     }
-
-     /**
-      * Attempts to take the lock without blocking for it.
-      *
-      * @return {@code true} if the lock is held on return; {@code false} if the
-      *         instance is closed or the lock is held elsewhere, in which case
-      *         the attempt is repeated when the watched predecessor changes and
-      *         the listener's {@code lockAcquired()} is invoked on success
-      */
-     public synchronized boolean lock() throws KeeperException, InterruptedException {
-         if (isClosed()) {
-             return false;
-         }
-         ensurePathExists(dir);
-
-         return (Boolean) retryOperation(zop);
-     }
-
-     /** @return the parent node under which the lock nodes live */
-     public String getDir() {
-         return dir;
-     }
-
-     /**
-      * @return {@code true} if this contender has a node and it was the first
-      *         child at the latest evaluation
-      */
-     public boolean isOwner() {
-         return id != null && ownerId != null && id.equals(ownerId);
-     }
-
-     /** @return the absolute path of this contender's node, or {@code null} */
-     public String getId() {
-         return this.id;
-     }
- }