Zookeeper
1、Zookeeper 的概述
- Zookeeper 是一個開源的分布式協(xié)調服務框架 ,主要用來解決分布式集群中應用系統(tǒng)的一致性問題和數(shù)據(jù)管理問題

2、Zookeeper的特點
- Zookeeper 本質上是一個分布式文件系統(tǒng), 適合存放小文件,也可以理解為一個數(shù)據(jù)庫

- 在上圖左側, Zookeeper 中存儲的其實是一個又一個 Znode, Znode 是 Zookeeper 中的節(jié)點
- Znode 是有路徑的, 例如
/data/host1 , /data/host2 , 這個路徑也可以理解為是 Znode 的 Name
- Znode 也可以攜帶數(shù)據(jù), 例如說某個 Znode 的路徑是
/data/host1 , 其值是一個字符串 "192.168.0.1"
- 正因為 Znode 的特性, 所以 Zookeeper 可以對外提供出一個類似于文件系統(tǒng)的試圖, 可以通過操作文件系統(tǒng)的方式操作 Zookeeper
- 獲取 Znode 攜帶的數(shù)據(jù)
- 刪除 Znode
3、Zookeeper的架構
Zookeeper集群是一個基于主從框架的高可用集群

每個服務器承擔如下三種角色中的一種
-
Leader一個Zookeeper集群同一時間只會有一個實際工作的Leader,它會發(fā)起并維護與各Follwer及Observer間的心跳。所有的寫操作必須要通過Leader完成再有Leader將寫操作廣播給其他服務器。
-
Follower一個Zookeeper集群可能同時存在多個Follower,它會響應Leader的心跳。
follower可直接處理并返回客戶端的讀請求,同時會將寫請求轉發(fā)給Leader處理,并且負責在Leader處理寫請求時對請求進行投票。
-
Observer角色與Follower類似,但是無投票權。

4、Zookeeper的應用場景
4.1、數(shù)據(jù)發(fā)布/訂閱
數(shù)據(jù)發(fā)布/訂閱系統(tǒng),需要發(fā)布者將數(shù)據(jù)發(fā)布到Zookeeper的節(jié)點上,供訂閱者進行數(shù)據(jù)訂閱,進而達到動態(tài)獲取數(shù)據(jù)的目的,實現(xiàn)配置信息的集中式管理和數(shù)據(jù)的動態(tài)更新。
? 發(fā)布/訂閱一般有兩種設計模式:推模式和拉模式,服務端主動將數(shù)據(jù)更新發(fā)送給所有訂閱的客戶端稱為推模式;客戶端主動請求獲取最新數(shù)據(jù)稱為拉模式.
Zookeeper采用了推拉相結合的模式,客戶端向服務端注冊自己需要關注的節(jié)點,一旦該節(jié)點數(shù)據(jù)發(fā)生變更,那么服務端就會向相應的客戶端推送Watcher事件通知,客戶端接收到此通知后,主動到服務端獲取最新的數(shù)據(jù)。
4.2、命名服務
命名服務是分步實現(xiàn)系統(tǒng)中較為常見的一類場景,分布式系統(tǒng)中,被命名的實體通??梢允羌褐械臋C器、提供的服務地址或遠程對象等,通過命名服務,客戶端可以根據(jù)指定名字來獲取資源的實體,在分布式環(huán)境中,上層應用僅僅需要一個全局唯一的名字。Zookeeper可以實現(xiàn)一套分布式全局唯一ID的分配機制。

通過調用Zookeeper節(jié)點創(chuàng)建的API接口就可以創(chuàng)建一個順序節(jié)點,并且在API返回值中會返回這個節(jié)點的完整名字,利用此特性,可以生成全局ID,其步驟如下
1. 客戶端根據(jù)任務類型,在指定類型的任務下通過調用接口創(chuàng)建一個順序節(jié)點,如"job-"。
2. 創(chuàng)建完成后,會返回一個完整的節(jié)點名,如"job-00000001"。
3. 客戶端拼接type類型和返回值后,就可以作為全局唯一ID了,如"type2-job-00000001"。
4.3、分布式協(xié)調/通知
Zookeeper中特有的Watcher注冊于異步通知機制,能夠很好地實現(xiàn)分布式環(huán)境下不同機器,甚至不同系統(tǒng)之間的協(xié)調與通知,從而實現(xiàn)對數(shù)據(jù)變更的實時處理。通常的做法是不同的客戶端都對Zookeeper上的同一個數(shù)據(jù)節(jié)點進行Watcher注冊,監(jiān)聽數(shù)據(jù)節(jié)點的變化(包括節(jié)點本身和子節(jié)點),若數(shù)據(jù)節(jié)點發(fā)生變化,那么所有訂閱的客戶端都能夠接收到相應的Watcher通知,并作出相應處理。
在絕大多數(shù)分布式系統(tǒng)中,系統(tǒng)機器間的通信無外乎心跳檢測、工作進度匯報和系統(tǒng)調度。
?、?心跳檢測,不同機器間需要檢測到彼此是否在正常運行,可以使用Zookeeper實現(xiàn)機器間的心跳檢測,基于其臨時節(jié)點特性(臨時節(jié)點的生存周期是客戶端會話,客戶端若當即后,其臨時節(jié)點自然不再存在),可以讓不同機器都在Zookeeper的一個指定節(jié)點下創(chuàng)建臨時子節(jié)點,不同的機器之間可以根據(jù)這個臨時子節(jié)點來判斷對應的客戶端機器是否存活。通過Zookeeper可以大大減少系統(tǒng)耦合。
?、?工作進度匯報,通常任務被分發(fā)到不同機器后,需要實時地將自己的任務執(zhí)行進度匯報給分發(fā)系統(tǒng),可以在Zookeeper上選擇一個節(jié)點,每個任務客戶端都在這個節(jié)點下面創(chuàng)建臨時子節(jié)點,這樣不僅可以判斷機器是否存活,同時各個機器可以將自己的任務執(zhí)行進度寫到該臨時節(jié)點中去,以便中心系統(tǒng)能夠實時獲取任務的執(zhí)行進度。
③ 系統(tǒng)調度,Zookeeper能夠實現(xiàn)如下系統(tǒng)調度模式:分布式系統(tǒng)由控制臺和一些客戶端系統(tǒng)兩部分構成,控制臺的職責就是需要將一些指令信息發(fā)送給所有的客戶端,以控制他們進行相應的業(yè)務邏輯,后臺管理人員在控制臺上做一些操作,實際上就是修改Zookeeper上某些節(jié)點的數(shù)據(jù),Zookeeper可以把數(shù)據(jù)變更以時間通知的形式發(fā)送給訂閱客戶端。
4.4、分布式鎖
分布式鎖用于控制分布式系統(tǒng)之間同步訪問共享資源的一種方式,可以保證不同系統(tǒng)訪問一個或一組資源時的一致性,主要分為排它鎖和共享鎖。
排它鎖又稱為寫鎖或獨占鎖,若事務T1對數(shù)據(jù)對象O1加上了排它鎖,那么在整個加鎖期間,只允許事務T1對O1進行讀取和更新操作,其他任何事務都不能再對這個數(shù)據(jù)對象進行任何類型的操作,直到T1釋放了排它鎖。
?、?獲取鎖,在需要獲取排它鎖時,所有客戶端通過調用接口,在/exclusive_lock節(jié)點下創(chuàng)建臨時子節(jié)點/exclusive_lock/lock。Zookeeper可以保證只有一個客戶端能夠創(chuàng)建成功,沒有成功的客戶端需要注冊/exclusive_lock節(jié)點監(jiān)聽。
?、?釋放鎖,當獲取鎖的客戶端宕機或者正常完成業(yè)務邏輯都會導致臨時節(jié)點的刪除,此時,所有在/exclusive_lock節(jié)點上注冊監(jiān)聽的客戶端都會收到通知,可以重新發(fā)起分布式鎖獲取。
共享鎖又稱為讀鎖,若事務T1對數(shù)據(jù)對象O1加上共享鎖,那么當前事務只能對O1進行讀取操作,其他事務也只能對這個數(shù)據(jù)對象加共享鎖,直到該數(shù)據(jù)對象上的所有共享鎖都被釋放。在需要獲取共享鎖時,所有客戶端都會到/shared_lock下面創(chuàng)建一個臨時順序節(jié)點

4.5、分布式隊列
有一些時候,多個團隊需要共同完成一個任務,比如,A團隊將Hadoop集群計算的結果交給B團隊繼續(xù)計算,B完成了自己任務再交給C團隊繼續(xù)做。這就有點像業(yè)務系統(tǒng)的工作流一樣,一環(huán)一環(huán)地傳下 去.
分布式環(huán)境下,我們同樣需要一個類似單進程隊列的組件,用來實現(xiàn)跨進程、跨主機、跨網(wǎng)絡的數(shù)據(jù)共享和數(shù)據(jù)傳遞,這就是我們的分布式隊列。
5、Zookeeper的選舉機制
Leader選舉是保證分布式數(shù)據(jù)一致性的關鍵所在。當Zookeeper集群中的一臺服務器出現(xiàn)以下兩種情況之一時,需要進入Leader選舉。
5.1、服務器啟動時期的Leader選舉
若進行Leader選舉,則至少需要兩臺機器,這里選取3臺機器組成的服務器集群為例。在集群初始化階段,當有一臺服務器Server1啟動時,其單獨無法進行和完成Leader選舉,當?shù)诙_服務器Server2啟動時,此時兩臺機器可以相互通信,每臺機器都試圖找到Leader,于是進入Leader選舉過程。選舉過程如下
(1) 每個Server發(fā)出一個投票。由于是初始情況,Server1和Server2都會將自己作為Leader服務器來進行投票,每次投票會包含所推舉的服務器的myid和ZXID,使用(myid, ZXID)來表示,此時Server1的投票為(1, 0),Server2的投票為(2, 0),然后各自將這個投票發(fā)給集群中其他機器。
(2) 接受來自各個服務器的投票。集群的每個服務器收到投票后,首先判斷該投票的有效性,如檢查是否是本輪投票、是否來自LOOKING狀態(tài)的服務器。
(3) 處理投票。針對每一個投票,服務器都需要將別人的投票和自己的投票進行PK,PK規(guī)則如下
· 優(yōu)先檢查ZXID。ZXID比較大的服務器優(yōu)先作為Leader。
· 如果ZXID相同,那么就比較myid。myid較大的服務器作為Leader服務器。
對于Server1而言,它的投票是(1, 0),接收Server2的投票為(2, 0),首先會比較兩者的ZXID,均為0,再比較myid,此時Server2的myid最大,于是更新自己的投票為(2, 0),然后重新投票,對于Server2而言,其無須更新自己的投票,只是再次向集群中所有機器發(fā)出上一次投票信息即可。
(4) 統(tǒng)計投票。每次投票后,服務器都會統(tǒng)計投票信息,判斷是否已經(jīng)有過半機器接受到相同的投票信息,對于Server1、Server2而言,都統(tǒng)計出集群中已經(jīng)有兩臺機器接受了(2, 0)的投票信息,此時便認為已經(jīng)選出了Leader。
(5) 改變服務器狀態(tài)。一旦確定了Leader,每個服務器就會更新自己的狀態(tài),如果是Follower,那么就變更為FOLLOWING,如果是Leader,就變更為LEADING。
5.2、服務器運行時期的Leader選舉
在Zookeeper運行期間,Leader與非Leader服務器各司其職,即便當有非Leader服務器宕機或新加入,此時也不會影響Leader,但是一旦Leader服務器掛了,那么整個集群將暫停對外服務,進入新一輪Leader選舉,其過程和啟動時期的Leader選舉過程基本一致過程相同。
6、Zookeeper安裝
集群規(guī)劃
服務器IP |
主機名 |
myid的值 |
192.168.174.10 |
hadoop01 |
1 |
192.168.174.11 |
hadoop02 |
2 |
192.168.174.12 |
hadoop03 |
3 |
第一步:下載zookeeeper的壓縮包,下載網(wǎng)址如下
Zookeeper下載地址
我們在這個網(wǎng)址下載我們使用的zk版本為3.4.9
下載完成之后,上傳到我們的linux的/root路徑下準備進行安裝
第二步:解壓
解壓zookeeper的壓縮包到/myapp/Zookeeper路徑下去,然后準備進行安裝
tar -zxf /root/zookeeper-3.4.9.tar.gz -C /myapp/Zookeeper/
第三步:修改配置文件
第一臺機器修改配置文件
cd /myapp/Zookeeper/zookeeper-3.4.9/conf/
cp zoo_sample.cfg zoo.cfg
mkdir -p /myapp/Zookeeper/zookeeper-3.4.9/zkdatas/
vim zoo.cfg
dataDir=/myapp/Zookeeper/zookeeper-3.4.9/zkdatas/
# 保留多少個快照
autopurge.snapRetainCount=3
# 日志多少小時清理一次
autopurge.purgeInterval=1
# 集群中服務器地址
server.1=hadoop01:2888:3888
server.2=hadoop02:2888:3888
server.3=hadoop03:2888:3888
第四步:添加myid配置
在第一臺機器的
/export/servers/zookeeper-3.4.9/zkdatas /這個路徑下創(chuàng)建一個文件,文件名為myid ,文件內容為1
echo 1 > /myapp/Zookeeper/zookeeper-3.4.9/zkdatas/myid
第五步:安裝包分發(fā)并修改myid的值
安裝包分發(fā)到其他機器的前提是:其他主機也要有相應的目錄。
第一臺機器上面執(zhí)行以下兩個命令
scp -r /myapp/Zookeeper/zookeeper-3.4.9/ node02:/myapp/Zookeeper/
scp -r /myapp/Zookeeper/zookeeper-3.4.9/ node03:/myapp/Zookeeper/
第二臺機器上修改myid的值為2
echo 2 > /myapp/Zookeeper/zookeeper-3.4.9/zkdatas/myid

第三臺機器上修改myid的值為3
echo 3 > /myapp/Zookeeper/zookeeper-3.4.9/zkdatas/myid

第六步:三臺機器啟動zookeeper服務
三臺機器啟動zookeeper服務
這個命令三臺機器都要執(zhí)行
/myapp/Zookeeper/zookeeper-3.4.9/bin/zkServer.sh start
查看啟動狀態(tài)
/myapp/Zookeeper//zookeeper-3.4.9/bin/zkServer.sh status

7、 shell腳本安裝Zookeeper
#!/bin/bash
if [ -d /myapp/Zookeeper/zookeeper-3.4.9 ]; then
echo "Zookeeper已經(jīng)安裝"
else
echo "請輸入一共多少主機"
read rootsum
echo "請分別輸入主機名"
for ((i=1; i<=${rootsum}; i++)) {
echo "請輸入第${i}臺主機名:"
read rootname[i]
ssh ${rootname[i]} "mkdir -p /myapp/Zookeeper"
}
echo "請輸入Zookeeper壓縮包路徑"
read newfile
echo "正在為你安裝Zookeeper..........."
tar -zxf $newfile -C /myapp/Zookeeper/
cp /myapp/Zookeeper/zookeeper-3.4.9/conf/zoo_sample.cfg /myapp/Zookeeper/zookeeper-3.4.9/conf/zoo.cfg
sed -i -e '12d' /myapp/Zookeeper/zookeeper-3.4.9/conf/zoo.cfg
mkdir -p /myapp/Zookeeper/zookeeper-3.4.9/zkdatas/
arr1=("dataDir=/myapp/Zookeeper/zookeeper-3.4.9/zkdatas" "# 保留多少個快照" "autopurge.snapRetainCount=3" "# 日志多少小時清理一次" "autopurge.purgeInterval=1" "# 集群中服務器地址" "server.1=hadoop01:2888:3888" "server.2=hadoop02:2888:3888" "server.3=hadoop03:2888:3888")
#echo ${#arr1[*]}
for (( i=0; i<=${#arr1[*]}; i++)) {
echo ${arr1[$i]} >> /myapp/Zookeeper/zookeeper-3.4.9/conf/zoo.cfg
}
echo 1 >> /myapp/Zookeeper/zookeeper-3.4.9/zkdatas/myid
echo "==============================================================================="
echo "==============================================================================="
echo "==============================================================================="
for ((e=1; e<=${rootsum}; e++)){
if test $[ e + 1 ] -gt $rootsum; then
ssh ${rootname[e]} sed -i -e s/1/${e}/ /myapp/Zookeeper/zookeeper-3.4.9/zkdatas/myid
else
scp -r /myapp/Zookeeper/zookeeper-3.4.9 ${rootname[e + 1]}:/myapp/Zookeeper
ssh ${rootname[e]} sed -i -e s/1/${e}/ /myapp/Zookeeper/zookeeper-3.4.9/zkdatas/myid
fi
}
nl /myapp/Zookeeper/zookeeper-3.4.9/conf/zoo.cfg
for i in ${rootname[*]};
do
ssh $i "echo '#Zookeeper' >> /etc/profile"
ssh $i "echo 'export ZOOKEEPER_HOME=/myapp/Zookeeper/zookeeper-3.4.9' >> /etc/profile"
ssh $i "echo 'export PATH=\$PATH:\$ZOOKEEPER_HOME/bin' >> /etc/profile"
done
echo "==================================================================================="
echo "Zookeeper 安裝已完成"
fi
8、Zookeeper的數(shù)據(jù)模型
- Zookeeper的數(shù)據(jù)模型,在結構上和標準文件系統(tǒng)的非常相似,擁有一個層次的命名空間,都是采用樹形層次結構。

- Zookeeper樹中的每個節(jié)點被稱為一個Zonde。和文件系統(tǒng)的目錄樹一樣,Zookeeper樹中的每一個節(jié)點可以擁有子節(jié)點。
但也有不同之處:
- Znode兼具文件和目錄兩種特點。既像文件一樣維護著數(shù)據(jù)、元信息、ACL、時間戳等數(shù)據(jù)結構,又像目錄一樣可以作為路徑標識的一部分,并可以具有子Znode。用戶對Znode具有增、刪、改、查等操作(權限允許的情況下)。
- Zonde存儲數(shù)據(jù)大小有限制。Zookeeper雖然可以關聯(lián)一些數(shù)據(jù),但并沒有被設計為常規(guī)的數(shù)據(jù)庫或者大數(shù)據(jù)存儲,相反的是,他用來管理調度數(shù)據(jù),比如分布式引用中的配置文件信息、狀態(tài)信息、匯集位置等等。這些數(shù)據(jù)的共同特性就是它們都是很小的數(shù)據(jù),通常以KB為大小單位。Zookeeper的服務器和客戶端都被設計為嚴格檢查并限制每個Zonde的數(shù)據(jù)大小至多1M,常規(guī)使用中應該遠小于此值。、
- Zonde 通過路徑引用,如同Unix中的文件路徑。路徑必須是絕對的,因此他們必須由斜杠字符來開頭。除此之外,他們必須是唯一的,也就是說每一路徑只有一個表示,因此這些路徑不能改變。在Zookeeper中,路徑由Unicode字符串組成,并且有一些限制。字符串"/zookeeper"用來保存管理信息,比如關鍵配額信息。
- 每個Zonde有3部分組成:
- stat:此為狀態(tài)信息,描述該Zonde的版本,權限等信息
- data:與該Zonde關聯(lián)的數(shù)據(jù)
- children:該Zonde下的子節(jié)點
9、Zonde節(jié)點類型
9.1、 Zonde有兩種,分別為臨時節(jié)點和永久節(jié)點。節(jié)點的類型在創(chuàng)建時即被確定并且不能改變。
- 臨時節(jié)點:該節(jié)點的生命周期依賴于他們的會話。一旦會話結束,臨時節(jié)點會被自動刪除,當然也可以手動刪除。臨時節(jié)點不允許擁有子節(jié)點。
- 永久節(jié)點:該節(jié)點的生命周期不依賴于會話,并且只有在客戶端顯示執(zhí)行刪除操作的時候,他們才能被刪除。
9.2、 Zonde還有一個序列化的特性,如果創(chuàng)建的時候指定的話,該Znode的名字后面會自動追加一個不斷增加的序列號。序列號對于此節(jié)點的父節(jié)點來說是唯一的,這樣便會記錄每個子節(jié)點創(chuàng)建的先后順序。它的格式為"%10d"(10位數(shù)字,沒有數(shù)值的數(shù)位用0補充,例如"0000000001")
9.3、 這樣便會存在四種類型的Zonde節(jié)點,分別對應:
- PERSISTENT:永久節(jié)點
- EPHEMERAL:臨時節(jié)點
- PERSISTENT_SEQUENTIAL:永久節(jié)點、序列化
- EPHEMERAL_SEQUENTIAL:臨時節(jié)點、序列化
10、Zookeeper的Shell 客戶端操作
10.1、登錄Zookeeper客戶端
bin/zkCli.sh -server hadoop01:2181
10.2、Zookeeper客戶端操作命令
命令 |
說明 |
參數(shù) |
create [-s] [-e] path data acl |
創(chuàng)建Znode |
-s 指定是順序節(jié)點 -e 指定是臨時節(jié)點 |
ls path [watch] |
列出Path下所有子Znode |
|
get path [watch] |
獲取Path對應的Znode的數(shù)據(jù)和屬性 |
|
ls2 path [watch] |
查看Path下所有子Znode以及子Znode的屬性 |
|
set path data [version] |
更新節(jié)點 |
version 數(shù)據(jù)版本 |
delete path [version] |
刪除節(jié)點, 如果要刪除的節(jié)點有子Znode則無法刪除 |
version 數(shù)據(jù)版本 |
rmr path |
刪除節(jié)點, 如果有子Znode則遞歸刪除 |
|
setquota -n|-b val path |
修改Znode配額 |
-n 設置子節(jié)點最大個數(shù) -b 設置節(jié)點數(shù)據(jù)最大長度 |
history |
列出歷史記錄 |
|
create /app1 hello
create -s /app3 world
create -s /app3 world
create -s -e /tempnode2 aaa
get /app1
set /app1 xxx
delete /app1 刪除的節(jié)點不能有子節(jié)點
rmr /app1 遞歸刪除
10.3、Znode 的特點
- 文件系統(tǒng)的核心是
Znode
- 如果想要選取一個
Znode , 需要使用路徑的形式, 例如 /test1/test11
- Znode 本身并不是文件, 也不是文件夾, Znode 因為具有一個類似于 Name 的路徑, 所以可以從邏輯上實現(xiàn)一個樹狀文件系統(tǒng)
- ZK 保證 Znode 訪問的原子性, 不會出現(xiàn)部分 ZK 節(jié)點更新成功, 部分 ZK 節(jié)點更新失敗的問題
Znode 中數(shù)據(jù)是有大小限制的, 最大只能為1M
Znode 是由三個部分構成
stat : 狀態(tài), Znode的權限信息, 版本等
data : 數(shù)據(jù), 每個Znode都是可以攜帶數(shù)據(jù)的, 無論是否有子節(jié)點
children : 子節(jié)點列表
10.4、Znode 的類型
- 每個
Znode 有兩大特性, 可以構成四種不同類型的Znode
- 持久性
持久 客戶端斷開時, 不會刪除持有的Znode
臨時 客戶端斷開時, 刪除所有持有的Znode, 臨時Znode不允許有子Znode
- 順序性
有序 創(chuàng)建的Znode有先后順序, 順序就是在后面追加一個序列號, 序列號是由父節(jié)點管理的自增
無序 創(chuàng)建的Znode沒有先后順序
10.5、Znode的屬性
每個Znode都包含一系列的屬性,通過命令get,可以獲得節(jié)點的屬性。

dataVersion :數(shù)據(jù)版本, 每次當Znode 中的數(shù)據(jù)發(fā)生變化的時候, dataVersion 都會增加1(即使設置的是相同的數(shù)據(jù)),可有效避免了數(shù)據(jù)更新時出現(xiàn)的先后順序問題。
cversion : 節(jié)點版本, 每次當Znode 的節(jié)點發(fā)生變化的時候, cversion 都會增加1。
aclVersion :ACL(Access Control List) 的版本號, 當Znode 的權限信息發(fā)生變化的時候aclVersion會自增
cZxid :zonde創(chuàng)建的事務ID
mZxid :Zonde被修改的事務id,即每次對znode的修改都會更新mZxid。
- 對于zk來說,每次的變化都會產(chǎn)生一個唯一的事務id,zxid(Zookeeper Transaction Id)。通過zxid,可以確定更新操作的先后順序。例如,如果zxid1小于zxid,說明zxid1操作先于zxid2發(fā)生,zxid對于整個zk都是唯一的。
ctime :節(jié)點創(chuàng)建的時間戳。
mtime :節(jié)點最新 一次更新發(fā)生的時間戳。
ephemeralOwner :如果Znode 為臨時節(jié)點, ephemeralOwner 表示與該節(jié)點關聯(lián)的SessionId ,如果不是臨時節(jié)點ephemeralOwner值為0。
10.6、Zookeeper會話
- 在ZK中所有的客戶端和服務器的交互都是在某一個
Session 中的, 客戶端和服務器創(chuàng)建一個連接的時候同時也會創(chuàng)建一個Session
Session 會在不同的狀態(tài)之間進行切換: CONNECTING , CONNECTED , RECONNECTING , RECONNECTED , CLOSED
- ZK中的會話兩端也需要進行心跳檢測, 服務端會檢測如果超過超時時間沒收到客戶端的心跳, 則會關閉連接, 釋放資源, 關閉會話
10.7、Watcher 通知機制
- 通知類似于數(shù)據(jù)庫中的觸發(fā)器, 對某個Znode設置
Watcher , 當Znode發(fā)生變化的時候, WatchManager 會調用對應的Watcher
- 當Znode發(fā)生刪除, 修改, 創(chuàng)建, 子節(jié)點修改的時候, 對應的
Watcher 會得到通知
Watcher 的特點
- 一次性觸發(fā) 一個
Watcher 只會被觸發(fā)一次, 如果需要繼續(xù)監(jiān)聽, 則需要再次添加 Watcher
- 事件封裝:
Watcher 得到的事件是被封裝過的, 包括三個內容 keeperState, eventType, path
KeeperState |
EventType |
觸發(fā)條件 |
說明 |
|
None |
連接成功 |
|
SyncConnected |
NodeCreated |
Znode被創(chuàng)建 |
此時處于連接狀態(tài) |
SyncConnected |
NodeDeleted |
Znode被刪除 |
此時處于連接狀態(tài) |
SyncConnected |
NodeDataChanged |
Znode數(shù)據(jù)被改變 |
此時處于連接狀態(tài) |
SyncConnected |
NodeChildChanged |
Znode的子Znode數(shù)據(jù)被改變 |
此時處于連接狀態(tài) |
Disconnected |
None |
客戶端和服務端斷開連接 |
此時客戶端和服務器處于斷開連接狀態(tài) |
Expired |
None |
會話超時 |
會收到一個SessionExpiredException |
AuthFailed |
None |
權限驗證失敗 |
會收到一個AuthFailedException |
11、Zookeeper的JavaAPI操作
這里操作Zookeeper的JavaAPI使用的是一套zookeeper客戶端框架Curator,解決了很多Zookeeper客戶端非常底層的細節(jié)開發(fā)工作。
Curator包含了幾個包:
Maven依賴(使用curator的版本:2.12.0,對應Zookeeper的版本為:3.4.x,如果跨版本會有兼容性問題,很有可能導致節(jié)點操作失?。?;
11.1、創(chuàng)建Java工程,導入Jar包
創(chuàng)建maven java工程,導入jar包
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven./POM/4.0.0"
xmlns:xsi="http://www./2001/XMLSchema-instance"
xsi:schemaLocation="http://maven./POM/4.0.0 http://maven./xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>Zookeeper_JavaAPI</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>com.google.collect</groupId>
<artifactId>google-collections</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<!-- same version with the zookeeper-->
<version>3.4.9</version>
</dependency>
</dependencies>
</project>
11.2、節(jié)點操作
11.2.1、創(chuàng)建永久節(jié)點
package cn.itcast.zookeeper_api;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.junit.Test;
public class ZookeeperAPITest {
@Test
public void createZnode() throws Exception {
//1. 定制一個重試策略
/*
param1:重試的間隔時間
param2:重試的最大次數(shù)
*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);
//2. 獲取一個客戶端對象
/*
param1:要連接的Zookeeper服務器列表
param2:會話的超時時間
param3:鏈接超時時間
param4:重試策略
*/
String connectionStr="192.168.1.10:2181,192.168.1.11:2181,192.168.1.12:2181";
CuratorFramework client = CuratorFrameworkFactory.newClient(connectionStr,8000,8000,retryPolicy);
//3。開啟客戶端
client.start();
//4. 創(chuàng)建永久節(jié)點
/*
PERSISTENT:永久節(jié)點
PERSISTENT_SEQUENTIAL:永久序列化節(jié)點
EPHEMERAL:臨時節(jié)點
EPHEMERAL_SEQUENTIAL:臨時序列化節(jié)點
*/
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/world01","world".getBytes());
//5. 關閉客戶端
client.close();
}
}
11.2.2、創(chuàng)建臨時節(jié)點
package cn.itcast.zookeeper_api;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.junit.Test;
public class ZookeeperAPITest {
@Test
public void createZnode() throws Exception {
//1. 定制一個重試策略
/*
param1:重試的間隔時間
param2:重試的最大次數(shù)
*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);
//2. 獲取一個客戶端對象
/*
param1:要連接的Zookeeper服務器列表
param2:會話的超時時間
param3:鏈接超時時間
param4:重試策略
*/
String connectionStr="192.168.1.10:2181,192.168.1.11:2181,192.168.1.12:2181";
CuratorFramework client = CuratorFrameworkFactory.newClient(connectionStr,8000,8000,retryPolicy);
//3。開啟客戶端
client.start();
//4. 創(chuàng)建臨時節(jié)點
/*
PERSISTENT:永久節(jié)點
PERSISTENT_SEQUENTIAL:永久序列化節(jié)點
EPHEMERAL:臨時節(jié)點
EPHEMERAL_SEQUENTIAL:臨時序列化節(jié)點
*/
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/world01","world".getBytes());
Thread.sleep(5000);
//5. 關閉客戶端
client.close();
}
}
11.2.3、修改節(jié)點數(shù)據(jù)
package cn.itcast.zookeeper_api;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.junit.Test;
public class ZookeeperAPITest {
@Test
public void SetZnodeData() throws Exception {
//1. 定制一個重試策略
/*
param1:重試的間隔時間
param2:重試的最大次數(shù)
*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);
//2. 獲取一個客戶端對象
/*
param1:要連接的Zookeeper服務器列表
param2:會話的超時時間
param3:鏈接超時時間
param4:重試策略
*/
String connectionStr="192.168.1.10:2181,192.168.1.11:2181,192.168.1.12:2181";
CuratorFramework client = CuratorFrameworkFactory.newClient(connectionStr,8000,8000,retryPolicy);
//3。開啟客戶端
client.start();
//4. 修改節(jié)點數(shù)據(jù)
client.setData().forPath("/hell04","hello7".getBytes());
//5. 關閉客戶端
client.close();
}
}
11.2.4、獲取節(jié)點數(shù)據(jù)
package cn.itcast.zookeeper_api;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.junit.Test;
public class ZookeeperAPITest {
@Test
public void getZnodeData() throws Exception {
//1. 定制一個重試策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);
//2. 獲取客戶端
String conectionStr="192.168.1.10:2181,192.168.1.11:2181,192.168.1.12:2181,";
CuratorFramework client = CuratorFrameworkFactory.newClient(conectionStr,8000,8000,retryPolicy);
//3. 啟動客戶端
client.start();
//4. 獲取節(jié)點數(shù)據(jù)
byte[] bytes = client.getData().forPath("/app1");
System.out.println(new String(bytes));
//5. 關閉客戶端
client.close();
}
}
11.2.5、節(jié)點watch機制
package cn.itcast.zookeeper_api;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.junit.Test;
public class ZookeeperAPITest {
@Test
public void WatchZnode() throws Exception {
//1. 定制一個重試策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);
//2. 獲取客戶端
String conectionStr="192.168.1.10:2181,192.168.1.11:2181,192.168.1.12:2181,";
CuratorFramework client = CuratorFrameworkFactory.newClient(conectionStr,8000,8000,retryPolicy);
//3. 啟動客戶端
client.start();
//4. 創(chuàng)建一個TreeCache對象,指定要監(jiān)控的節(jié)點路徑
final TreeCache treeCache = new TreeCache(client, "/hell04");
//5. 自定義一個監(jiān)聽器
treeCache.getListenable().addListener(new TreeCacheListener() {
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
ChildData data = treeCacheEvent.getData();
if (data != null) {
switch (treeCacheEvent.getType()){
case NODE_ADDED:
System.out.println("監(jiān)控到有新增節(jié)點!");
break;
case NODE_REMOVED:
System.out.println("監(jiān)控到有節(jié)點被移除!");
break;
case NODE_UPDATED:
System.out.println("監(jiān)控到節(jié)點被更新!");
break;
default:
break;
}
}
}
});
//6. 開始監(jiān)聽
treeCache.start();
//7. 設置休眠時間
Thread.sleep(100000000);
//8. 關閉客戶端
client.close();
}
}
|