日韩黑丝制服一区视频播放|日韩欧美人妻丝袜视频在线观看|九九影院一级蜜桃|亚洲中文在线导航|青草草视频在线观看|婷婷五月色伊人网站|日本一区二区在线|国产AV一二三四区毛片|正在播放久草视频|亚洲色图精品一区

分享

HBase權(quán)威指南(中文版)——第三章(第二部分)

 閑來看看 2013-08-14

第三章 客戶端API: 基礎(chǔ)篇(第三部分)

批量操作

前面介紹了如何逐條和批量地添加、讀取、刪除數(shù)據(jù)。在這一節(jié),我們將介紹如何一次執(zhí)行多種不同類型的操作處理多行記錄。

事實(shí)上,一些使用List結(jié)構(gòu)的批量操作,如delete(List<Delete>
deletes)
或者get(List(Get> gets)等,底層都是依靠batch實(shí)現(xiàn)的。封裝這些接口可以更好的提高友好性。

在下面介紹的批量操作中,您將會(huì)看到一個(gè)新的類型叫Row,前面將到的Put、Get、Delete類都是從Row類的子類。

void
batch(List<Row> actions, Object[] results)

throws
IOException, InterruptedException

Object[] batch(List<Row>
actions)

throws
IOException, InterruptedException

由于Row的存在,以及它和Get、Put、Delete的繼承關(guān)系,決定了可以在一個(gè)列表中混合多種不同類型的操作。示例3-16給出了這種使用的例子。

值得注意地是,您不應(yīng)該將一個(gè)rowPutDelete操作混合在一起。因?yàn)?/SPAN>List中多個(gè)操作在服務(wù)器端執(zhí)行的順序是無法保證的,這樣會(huì)得到一個(gè)無法預(yù)料到的結(jié)果。

示例3-16 批量操作

private final
static byte[] ROW1 = Bytes.toBytes(“row1″);

private final
static byte[] ROW2 = Bytes.toBytes(“row2″);

private final
static byte[] COLFAM1 = Bytes.toBytes(“colfam1″);

private final
static byte[] COLFAM2 = Bytes.toBytes(“colfam2″);

private final
static byte[] QUAL1 = Bytes.toBytes(“qual1″);

private final
static byte[] QUAL2 = Bytes.toBytes(“qual2″);

List<Row>
batch = new ArrayList<Row>();

Put put = new
Put(ROW2);

put.add(COLFAM2,
QUAL1, Bytes.toBytes(“val5″));

batch.add(put);

Get get1 = new
Get(ROW1);

get1.addColumn(COLFAM1,
QUAL1);

batch.add(get1);

Delete delete =
new Delete(ROW1);

delete.deleteColumns(COLFAM1,
QUAL2);

batch.add(delete);

Get get2 = new
Get(ROW2);

get2.addFamily(Bytes.toBytes(“BOGUS”));

batch.add(get2);

Object[] results =
new Object[batch.size()];

try {

table.batch(batch,
results);

} catch (Exception
e) {

System.err.println(“Error:
” + e);

}

for (int i = 0; i
< results.length; i++) {

System.out.println(“Result["
+ i + "]: ” + results[i]);

}

首先定義了一組指向row、column familycolumn
qualifier
的常量,方便重用。然后創(chuàng)建一個(gè)Row實(shí)例的列表。向其中分別加入一個(gè)Put、GetDelete實(shí)例,再添加一個(gè)指向不存在列的Put操作。創(chuàng)建一個(gè)Result數(shù)組,大小和batch的大小相同。然后執(zhí)行batch方法,返回的結(jié)果放在result中,最后打印出Result數(shù)組的值。

整個(gè)程序的輸出如下:

Before batch
call…

KV:
row1/colfam1:qual1/1/Put/vlen=4, Value: val1

KV:
row1/colfam1:qual2/2/Put/vlen=4, Value: val2

KV: row1/colfam1:qual3/3/Put/vlen=4,
Value: val3

Result[0]:
keyvalues=NONE

Result[1]:
keyvalues={row1/colfam1:qual1/1/Put/vlen=4}

Result[2]:
keyvalues=NONE

Result[3]:
org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException:

org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException:

Column family
BOGUS does not exist in …

After batch
call…

KV:
row1/colfam1:qual1/1/Put/vlen=4, Value: val1

KV:
row1/colfam1:qual3/3/Put/vlen=4, Value: val3

KV:
row2/colfam2:qual1/1308836506340/Put/vlen=4, Value: val5

Error:
org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException:

Failed 1 action:
NoSuchColumnFamilyException: 1 time,

servers with
issues: 10.0.0.43:60020,

在前面的例子中,事先插入了一些測(cè)試數(shù)據(jù)并打印出來,讓您方便地看出示例代碼做的事情。最后發(fā)現(xiàn),想刪除的記錄成功被刪除,想插入的新記錄也成功被插入了。

Get操作的結(jié)果,需要從Result數(shù)組中找到。Result數(shù)組的大小和請(qǐng)求操作的總數(shù)相等。第一個(gè)ResultPut操作的結(jié)果,為一個(gè)空的KeyValue結(jié)構(gòu);第二個(gè)Result的值是Get操作的結(jié)果,可以打印出它的值。第三個(gè)值為Delete操作的值,為一個(gè)空的KeyValue結(jié)構(gòu)。表3-7給出了ResultRow類型之前的對(duì)應(yīng)關(guān)系。

3-7 batch()調(diào)用可能返回的結(jié)果

Result

Description

null

The operation has failed to
communicate with the remote server.

Empty Result

Returned for successful Put
and Delete operations.

Result

Returned for successful Get
operations, but may also be empty when there was no matching row or column.

Throwable

In case the servers return an
exception for the operation it is returned to the client as-is. You can use
it to check what went wrong and maybe handle the problem automatically in
your code.

觀察示例3-16執(zhí)行的結(jié)果,您可以發(fā)現(xiàn)空的Result對(duì)象打印出了keyvalues=NONE。 Get操作打印出了對(duì)應(yīng)取到的值。對(duì)于錯(cuò)誤列上的Put操作得到了一個(gè)異常。

值得注意地是,當(dāng)您使用batch()方法時(shí),里面的Put實(shí)例并不會(huì)使用客戶端緩存。batch()調(diào)用是同步的,同時(shí)直接向服務(wù)器發(fā)送請(qǐng)求。沒有時(shí)延和其它處理過程,這和批量put操作是不同的。因此,您需要選擇合適于您的。

batch調(diào)用有兩種形式,一種將Result數(shù)據(jù)放在參數(shù)中,另一種放在返回值中。

void
batch(List<Row> actions, Object[] results)

throws
IOException, InterruptedException

Object[]
batch(List<Row> actions)

throws
IOException, InterruptedException

它們之間比較大的區(qū)別在于,當(dāng)拋出異常時(shí),第一方法的result中被填充了部分結(jié)果。而第二個(gè)方法在異常時(shí),將會(huì)返回null

兩種方法都支持get、put、delete操作。如果執(zhí)行其中的任何一個(gè)請(qǐng)求時(shí)出錯(cuò),一個(gè)客戶端異常將會(huì)被拋出,報(bào)告出錯(cuò)情況??蛻舳司彺娌粫?huì)被使用到。void batch(actions,
results)
會(huì)得到所有成功操作的結(jié)果和其中失敗的服務(wù)端異常。Object[] batch( actions )只返回客戶端異常,沒有結(jié)果被填充到返回值中。

所有的操作都將在check之前執(zhí)行:當(dāng)您發(fā)現(xiàn)一個(gè)action出現(xiàn)了錯(cuò)誤,但其它的操作也將被執(zhí)行。在最差的場(chǎng)景下,所有的action都返回失敗。

另一方面,batch操作在意瞬時(shí)失敗,比如NotServingRegionException(比如一個(gè)Region Server被下線了),它會(huì)進(jìn)行重試幾次。配置項(xiàng)hbase.client.retries.number會(huì)設(shè)定重試的次數(shù)(默認(rèn)的重試次數(shù)為10)。

Row Locks(行鎖)

       更新操作,比如put()、delete()、checkAndPut()等等對(duì)于一個(gè)row來說是互斥執(zhí)行的,從而保證一個(gè)低層面的原子性。Region Server提供了一個(gè)row lock行級(jí)鎖來保證只有擁有鎖的客戶端才能夠?qū)υ撔羞M(jìn)行修改。在實(shí)際中,客戶端并不提供這種顯示的鎖,而是依賴于一種將每個(gè)操作獨(dú)立開的機(jī)制。

       您應(yīng)該最大限度地避免使用row lock,很容易出現(xiàn)RDBMS中類似的死鎖現(xiàn)象。當(dāng)Lock在等待超時(shí)的過程中,兩個(gè)被掛起的客戶端都持有一個(gè)句柄,這類句柄屬于稀缺資源。如果這個(gè)鎖被加在了一個(gè)訪問很頻繁的行上,那么很多客戶端都會(huì)被阻塞。

當(dāng)您使用put使用將一個(gè)Put實(shí)例發(fā)送到服務(wù)器時(shí),如果您使用了如下的構(gòu)造函數(shù):

Put(byte[] row)

在這個(gè)構(gòu)造函數(shù)里,并沒有出現(xiàn)RowLock參數(shù),服務(wù)器會(huì)根據(jù)您的行為,自動(dòng)為您創(chuàng)建一個(gè)鎖。這個(gè)鎖對(duì)于客戶端是透明的,客戶端無法獲取到這個(gè)生命期很短的服務(wù)器端的鎖。相比于服務(wù)器自動(dòng)創(chuàng)建的隱式鎖??蛻舳艘部梢允褂蔑@式鎖,并且在多個(gè)操作中使用到它??梢酝ㄟ^以下的方法:

RowLock
lockRow(byte[] row) throws IOException

void
unlockRow(RowLock rl) throws IOException

第一個(gè)方法lockRowrowkey為參數(shù),返回一個(gè)RockLock實(shí)例,這個(gè)實(shí)例可以傳遞到PutDelete的構(gòu)造函數(shù)中去。一旦您成功的獲取到一個(gè)鎖后,在使用完之后,您必須調(diào)用unlockRow方法釋放它。

您申請(qǐng)到的LockRow會(huì)鎖定整行,它通過rowkey來確定行,一旦擁有,別的客戶端將不能對(duì)該行進(jìn)行更新操作。

當(dāng)這個(gè)行鎖無論是被客戶端或者服務(wù)器占有時(shí),另一個(gè)想申請(qǐng)一個(gè)該行的新行鎖的行為都會(huì)被掛起,直到這行原有的行鎖被釋放或者過期。后者是對(duì)出錯(cuò)進(jìn)程持有行鎖情況的一個(gè)兼容。

一個(gè)行鎖的默認(rèn)超時(shí)時(shí)間是1分鐘,可以在hbase-site.xml進(jìn)行配置:

<property>

<name>hbase.regionserver.lease.period</name>

<value>120000</value>

</property>

上述代碼將超時(shí)時(shí)間改為120秒。當(dāng)然,這個(gè)值不能設(shè)的太大,否則,每個(gè)客戶端申請(qǐng)一個(gè)被別的進(jìn)程占用的鎖的最大等待時(shí)間都會(huì)變?yōu)檫@個(gè)值。示例3-17給出一個(gè)由用戶生成的行鎖阻塞其它讀取客戶端的例子。

示例3-17 使用顯示行鎖

static class
UnlockedPut implements Runnable {

@Override

public void run()
{

try {

HTable
table = new HTable(conf, “testtable”);

Put
put = new Put(ROW1);

put.add(COLFAM1,
QUAL1, VAL3);

long
time = System.currentTimeMillis();

System.out.println(“Thread
trying to put same row now…”);

table.put(put);

System.out.println(“Wait
time: ” +

(System.currentTimeMillis()
- time) + “ms”);

} catch
(IOException e) {

System.err.println(“Thread
error: ” + e);

}

}

}

System.out.println(“Taking
out lock…”);

RowLock lock =
table.lockRow(ROW1);

System.out.println(“Lock
ID: ” + lock.getLockId());

Thread thread =
new Thread(new UnlockedPut());

thread.start();

try {

System.out.println(“Sleeping
5secs in main()…”);

Thread.sleep(5000);

} catch
(InterruptedException e) {

//
ignore

}

try {

Put put1 = new
Put(ROW1, lock);

put1.add(COLFAM1,
QUAL1, VAL1);

table.put(put1);

Put put2 = new
Put(ROW1, lock);

put2.add(COLFAM1, QUAL1,
VAL2);

table.put(put2);

} catch (Exception
e) {

System.err.println(“Error:
” + e);

} finally {

System.out.println(“Releasing
lock…”);

table.unlockRow(lock);

}

首先定義了一個(gè)的線程,它會(huì)不斷使用隱式鎖不斷地訪問同一行上的記錄,在取得鎖之前,put操作始終會(huì)被掛起,在線程內(nèi)部會(huì)打印出等待的時(shí)間。

主線程,先顯示的創(chuàng)建一個(gè)行鎖,然后啟動(dòng)一個(gè)前面定義的線程實(shí)例。接著,主線程sleep 5秒之后,將這個(gè)鎖傳給Put對(duì)象,進(jìn)行put操作之后,這個(gè)鎖會(huì)被釋放,從而前面那個(gè)無鎖的線程會(huì)繼續(xù)運(yùn)行。

運(yùn)行這段示例代碼,將會(huì)得到如下的輸出:

Taking out lock…

Lock ID:
4751274798057238718

Sleeping 5secs in
main()…

Thread trying to
put same row now…

Releasing lock…

Wait time: 5007ms

After thread
ended…

KV:
row1/colfam1:qual1/1300775520118/Put/vlen=4, Value: val2

KV:
row1/colfam1:qual1/1300775520113/Put/vlen=4, Value: val1

KV:
row1/colfam1:qual1/1300775515116/Put/vlen=4, Value: val3

可以看出無鎖的線程的確被阻塞了5秒鐘。直到主線程做完兩次put操作后釋放了行鎖,才繼續(xù)運(yùn)行。我們可以看到一個(gè)有趣的現(xiàn)象,由無鎖對(duì)象最后插入的值,卻擁有最小的時(shí)間戳。這是因?yàn)椋瑢?shí)際上無鎖線程的put操作是最早執(zhí)行的。一旦它被發(fā)送到Region Server服務(wù)器,Region Server便會(huì)給它打上一個(gè)時(shí)間戳,雖然此時(shí)它還無法獲取到行鎖而被阻塞,但此時(shí)的時(shí)間戳已經(jīng)生成了。主線程一共花了7ms向服務(wù)器提交了兩次Put命令,并釋放了行鎖。

當(dāng)您使用一個(gè)先前申請(qǐng)到的行鎖,但是由于超時(shí)無效時(shí),您將會(huì)收到服務(wù)器端生成的一個(gè)錯(cuò)誤,以UnknownRowLockException異常的形式返回。它說明,服務(wù)器端已經(jīng)將這個(gè)鎖廢棄掉了。此時(shí),您可以將它drop掉,然后重新申請(qǐng)一個(gè)新的鎖。

Scan(掃描)

前面我們討論了基本的CRUD類型的操作,現(xiàn)在輪到scan了,類似于數(shù)據(jù)庫(kù)系統(tǒng)中的游標(biāo)(cursor)。它可以充分使用到HBase提供的順序性的、排序的存儲(chǔ)結(jié)構(gòu)。

基本介紹

scan操作與get操作非常相似。介紹它之前,必須先介紹另一個(gè)類Scan。由于scan更像是一個(gè)迭代器,因此,并沒有scan()的方法,取而代之的是getScanner方法,它會(huì)返回您想要遍歷訪問的scanner對(duì)象??梢岳萌缦碌姆椒ㄈ〉剿?/SPAN>

ResultScanner
getScanner(Scan scan) throws IOException

ResultScanner
getScanner(byte[] family) throws IOException

ResultScanner
getScanner(byte[] family, byte[] qualifier)

throws
IOException

后面兩個(gè)方法是為了加強(qiáng)友好性,會(huì)隱式地創(chuàng)建一個(gè)scan對(duì)象,然后再調(diào)用getScanner(Scan
scan)
方法取到ResultScanner。

Scan類有如下的構(gòu)造函數(shù):

Scan()

Scan(byte[]
startRow, Filter filter)

Scan(byte[]
startRow)

Scan(byte[]
startRow, byte[] stopRow)

Get方法不同的是,您不再指定一個(gè)具體的rowkey,您現(xiàn)在可以選擇性的指定一個(gè)startRow參數(shù),這個(gè)參數(shù)定義HTable中,要讀取的rowkey的起始位置??蛇x的參數(shù)stopRow定義讀取rowkey的結(jié)束位置。

startRow是被包含的,而endRow是不被包含的,因此,可以用以表述式[startRow,
stopRow)
來描述這種關(guān)系。

Scan提供的一個(gè)特殊的功能就是您不必精確地指定rowkey。相反,scan會(huì)相匹配與指定的startKey相等或者大于它的所有的rowkey。如是沒有指定startKey,那么將會(huì)從表的起始位置開始遍歷。

如果指定了stopKey,那么只會(huì)遍歷rowkey小于stopKey的所有記錄。如果沒有指定stopKey,那么scan會(huì)遍歷到表的末尾。

還有另一個(gè)可選的參數(shù)Filter,這個(gè)參數(shù)指向一個(gè)Filter實(shí)例。Scan對(duì)象常常使用空的構(gòu)造函數(shù)來創(chuàng)建,另外的參數(shù)都可以通過gettersetter方法進(jìn)行指定。

Scan實(shí)例被創(chuàng)建后,如果想加入更多的限制條件,您可以使用很多的方法來限制讀出的數(shù)據(jù)。當(dāng)然,您也可以創(chuàng)建一個(gè)空的Scan,將整個(gè)表的所有的column familycolumn
qualifier
讀出來。

Scan
addFamily(byte [] family)

Scan
addColumn(byte[] family, byte[] qualifier)

Get類相似,您也可以使用addFamily來設(shè)置column
families
或者使用addColumn來設(shè)置column,從而限定讀出數(shù)據(jù)的條件。

如果您只需要數(shù)據(jù)的一部分,那么通過限制Scan的范圍,只是體現(xiàn)了HBase的強(qiáng)大之處,由于數(shù)據(jù)是以column family為物理單元分隔文件的,不在Scan范圍內(nèi)的column family對(duì)應(yīng)的文件根本不會(huì)被打開,這只是面向列存儲(chǔ)最大的優(yōu)勢(shì)所在。

Scan
setTimeRange(long minStamp, long maxStamp) throws IOException

Scan
setTimeStamp(long timestamp)

Scan
setMaxVersions()

Scan setMaxVersions(int
maxVersions)

您還可以限制Scantimestamp,設(shè)置timestamp的范圍,設(shè)置掃描的版本數(shù)。使用setStartRow(),setStopRow()setFilter(),可以達(dá)到構(gòu)造函數(shù)中的參數(shù)值同樣的效果。方法hasFilter()可以判斷一個(gè)Scan中是否添加了filter。表3-8列出了其它一些方法。

3-8 Scan的方法一覽

Result

Description

getStartRow()/getStopRow()

Can be used to retrieve the
currently assigned values.

getTimeRange()

Retrieves the associated timestamp or time range of the Get
instance. Note that there is no getTimeStamp() since the API converts a value
assigned with setTimeStamp() into a TimeRange instance internally, setting
the minimum and maximum values to the given timestamp.

getMaxVersions()

Returns the currently configured number of versions that should
be retrieved from the table for every column.

getFilter()

Special filter instances can be used to select certain columns
or cells, based on a wide variety of conditions. You can get the currently
assigned filter using this method. It may return null if none was previously
set.

setCacheBlocks()

/getCacheBlocks()

Each HBase region server has a block cache that efficiently
retains recently accessed data for subsequent reads of contiguous
information. In some events it is better to not engage the cache to avoid too
much churn when doing full table scans. These methods give you control over
this feature.

numFamilies()

Convenience method to retrieve the size of the family map,
containing the families added using the addFamily() or addColumn() calls.

hasFamilies()

Another helper to check if a familyor columnhas been added to the current
instance of the Scan class.

getFamilies()

/setFamilyMap()

/getFamilyMap()

These methods give you access to the column families and
specific columns, as added by the addFamily() and/or addColumn() calls. The
family map is a map where the key is the family name and the value is a list
of added column qualifiers for this particular family. The getFamilies()
returns an array of all stored families, i.e., containing only the family
names (as

byte[] arrays).

當(dāng)您創(chuàng)建了Scan實(shí)例之后,您就需要調(diào)用HTablegetScanner方法,取得ResultScanner實(shí)例。在下一節(jié)中,我們將詳細(xì)介紹ResultScanner類。

ResultScanner

Scans不會(huì)將所有匹配到的行通過一個(gè)RPC調(diào)用發(fā)送到客戶端,而是分多次。這是因?yàn)?,一行?shù)據(jù)可能很大,放在一次傳輸會(huì)消耗掉很多資源、花費(fèi)很長(zhǎng)的時(shí)間。

ResultScannerscan轉(zhuǎn)化成一種類似于get的操作,將結(jié)果通過迭代器訪問出來。它具有自己的一些方法:

Result next()
throws IOException

Result[] next(int
nbRows) throws IOException

void close()

有兩種形式的next函數(shù)可以調(diào)用,close()操作用來釋放資源。

Next()調(diào)用返回一個(gè)單獨(dú)的Result實(shí)例,存放一個(gè)可用的row對(duì)象。同樣的,您也可以使用next(int
nbRows)
一次取回來很多行,該調(diào)用返回一個(gè)Result的數(shù)組對(duì)象,數(shù)組中的每一行都代表一個(gè)唯一的row。當(dāng)然,取到的值可能小于nbRows,但這很少在未取完數(shù)據(jù)時(shí)發(fā)生。可以查看,前面對(duì)Result實(shí)例的介紹,學(xué)習(xí)如何使用這個(gè)類。示例3-18給出了如何使用scan訪問一個(gè)表。

示例3-18 使用scan訪問數(shù)據(jù)

Scan scan1 = new
Scan();

ResultScanner
scanner1 = table.getScanner(scan1);

for (Result res :
scanner1) {

System.out.println(res);

}

scanner1.close();

Scan scan2 = new
Scan();

scan2.addFamily(Bytes.toBytes(“colfam1″));

ResultScanner
scanner2 = table.getScanner(scan2);

for (Result res :
scanner2) {

System.out.println(res);

}

scanner2.close();

Scan scan3 = new
Scan();

scan3.addColumn(Bytes.toBytes(“colfam1″),
Bytes.toBytes(“col-5″)).

addColumn(Bytes.toBytes(“colfam2″),
Bytes.toBytes(“col-33″)).

setStartRow(Bytes.toBytes(“row-10″)).

setStopRow(Bytes.toBytes(“row-20″));

ResultScanner
scanner3 = table.getScanner(scan3);

for (Result res :
scanner3) {

System.out.println(res);

}

scanner3.close();

首先創(chuàng)建一個(gè)空的Scan對(duì)象,使用這個(gè)對(duì)象對(duì)表進(jìn)行遍歷,然后關(guān)閉這個(gè)scanner釋放相關(guān)資源。接著再創(chuàng)建一個(gè)只查詢colfam1下記錄的scanner,并打印相關(guān)的記錄。最后創(chuàng)建一個(gè)只掃描列colfam1:col-5colfam2:col-33,且rowkey范圍從row-10row-20的所有記錄。

為了測(cè)試上述示例,首先創(chuàng)建一個(gè)表,含有colfam1colfam2兩個(gè)column family。然后很這個(gè)表中插入100行記錄。我們不列出全表掃描的輸出結(jié)果,而僅列出scan2scan3的輸出結(jié)果:

Scanning table
#3…

keyvalues={row-10/colfam1:col-5/1300803775078/Put/vlen=8,

row-10/colfam2:col-33/1300803775099/Put/vlen=9}

keyvalues={row-100/colfam1:col-5/1300803780079/Put/vlen=9,

row-100/colfam2:col-33/1300803780095/Put/vlen=10}

keyvalues={row-11/colfam1:col-5/1300803775152/Put/vlen=8,

row-11/colfam2:col-33/1300803775170/Put/vlen=9}

keyvalues={row-12/colfam1:col-5/1300803775212/Put/vlen=8,

row-12/colfam2:col-33/1300803775246/Put/vlen=9}

keyvalues={row-13/colfam1:col-5/1300803775345/Put/vlen=8,

row-13/colfam2:col-33/1300803775376/Put/vlen=9}

keyvalues={row-14/colfam1:col-5/1300803775479/Put/vlen=8,

row-14/colfam2:col-33/1300803775498/Put/vlen=9}

keyvalues={row-15/colfam1:col-5/1300803775554/Put/vlen=8,

row-15/colfam2:col-33/1300803775582/Put/vlen=9}

keyvalues={row-16/colfam1:col-5/1300803775665/Put/vlen=8,

row-16/colfam2:col-33/1300803775687/Put/vlen=9}

keyvalues={row-17/colfam1:col-5/1300803775734/Put/vlen=8,

row-17/colfam2:col-33/1300803775748/Put/vlen=9}

keyvalues={row-18/colfam1:col-5/1300803775791/Put/vlen=8,

row-18/colfam2:col-33/1300803775805/Put/vlen=9}

keyvalues={row-19/colfam1:col-5/1300803775843/Put/vlen=8,

row-19/colfam2:col-33/1300803775859/Put/vlen=9}

keyvalues={row-2/colfam1:col-5/1300803774463/Put/vlen=7,

row-2/colfam2:col-33/1300803774485/Put/vlen=8}

再一次強(qiáng)調(diào)的時(shí),輸出的結(jié)果與插入的順序無關(guān),有趣的是,rowkey的排列按照了字母序進(jìn)行輸出。

Caching vs. Batching

以目前為止,每個(gè)next()操作都是一次RPC調(diào)用,即使當(dāng)你使用next( int
nbRows)
時(shí)。因?yàn)檫@個(gè)next( int nbRows )只是一個(gè)簡(jiǎn)單的next()調(diào)用的客戶端循環(huán)。顯然,這樣的性能是很難令人滿意的。因此,在一次RPC調(diào)用中返回多行便顯得意義重大。這個(gè)機(jī)制叫制scanner緩存(scanner
caching)
,它默認(rèn)是關(guān)閉的。

您可以使用在兩個(gè)層面打開它:在表一級(jí)設(shè)定,將會(huì)對(duì)這個(gè)表的所有的scanner實(shí)例生效。在scan一級(jí)設(shè)定,將只會(huì)對(duì)這個(gè)scan生效。當(dāng)然您也可以對(duì)通過對(duì)HTable實(shí)例進(jìn)行設(shè)定,達(dá)到對(duì)所有的表生效。

void
setScannerCaching(int scannerCaching)

int
getScannerCaching()

當(dāng)然您也可以在HBase安裝時(shí),修改默認(rèn)值??梢酝ㄟ^修改配置文件hbase-site.xml實(shí)現(xiàn):

<property>

<name>hbase.client.scanner.caching</name>

<value>10</value>

</property>

上述配置文件將cache的默認(rèn)大小改為了10。當(dāng)然,您還可以在代碼中繼續(xù)設(shè)定新的值。

在您通過getScanner方法,得到一個(gè)Scanner對(duì)象后,通過setScannerCaching()設(shè)置緩存大小,getScannerCaching()得到目前的緩存大小。API將會(huì)把設(shè)置的大小傳遞給scanner對(duì)象,除非您使用了Scan類的方法:

void
setCaching(int caching)

int getCaching()

scan直接設(shè)置cache大小擁有最高的優(yōu)先級(jí)。通過對(duì)緩存大小的設(shè)定,可以使一次RPC調(diào)用返回多行記錄。兩種next()都會(huì)使用到這個(gè)緩存。

您可能需要找到RPC操作的數(shù)據(jù)和內(nèi)存占用情況的一個(gè)折中,scanner的緩存大小越大,讀取的性能越好(當(dāng)然值過大,也不好),但緩存的條目多了之后,一次傳輸消耗的時(shí)間越長(zhǎng),占用的堆空間大小也越大,還會(huì)引發(fā)OOM異常(OutOfMemoryException)。

當(dāng)從服務(wù)器到客戶端傳輸數(shù)據(jù)的時(shí)間或者客戶端處理數(shù)據(jù)的時(shí)間大于了scanner設(shè)置的超時(shí)時(shí)間,那么客戶端將會(huì)收到一個(gè)ScannerTimeoutException。

示例3-19 scanner超時(shí)的例子

Scan scan = new
Scan();

ResultScanner
scanner = table.getScanner(scan);

int scannerTimeout
= (int) conf.getLong(

HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
-1);

try {

Thread.sleep(scannerTimeout
+ 5000);

} catch
(InterruptedException e) {

//
ignore

}

while (true){

try {

Result
result = scanner.next();

if
(result == null) break;

System.out.println(result);

} catch (Exception
e) {

e.printStackTrace();

break;

}

}

scanner.close();

首先獲取當(dāng)前scanner的超時(shí)時(shí)間,然后sleep一會(huì)兒,等待超時(shí)。接著嘗試打印出取到的結(jié)果集。將會(huì)得到如下的輸出:

Adding rows to
table…

Current (local)
lease period: 60000

Sleeping now for
65000ms…

Attempting to
iterate over scanner…

Exception in
thread “main” java.lang.RuntimeException:

org.apache.hadoop.hbase.client.ScannerTimeoutException:
65094ms passed

since
the last invocation, timeout is currently set to 60000

at
org.apache.hadoop.hbase.client.HTable$ClientScanner$1.hasNext

at
ScanTimeoutExample.main

Caused by:
org.apache.hadoop.hbase.client.ScannerTimeoutException: 65094ms

passed
since the last invocation, timeout is currently set to 60000

at
org.apache.hadoop.hbase.client.HTable$ClientScanner.next

at
org.apache.hadoop.hbase.client.HTable$ClientScanner$1.hasNext


1 more

Caused by:
org.apache.hadoop.hbase.UnknownScannerException:

org.apache.hadoop.hbase.UnknownScannerException:
Name: -315058406354472427

at
org.apache.hadoop.hbase.regionserver.HRegionServer.next

scanner超時(shí)之后,客戶端嘗試打印從服務(wù)器上取出的值時(shí),將會(huì)將到一個(gè)異常。

您可能想在客戶端代碼中加入以下代碼來增大超時(shí)時(shí)間:

Configuration conf
= HBaseConfiguration.create()

conf.setLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
120000)

但由于scan的超時(shí)時(shí)間是配在Region Server上的,因此,上述配置并不會(huì)生效。如果您真的想修改這個(gè)值,您只有去Region Server上去修改hbase-site.xml,并重啟集群。

從輸出打印的堆??梢钥闯?,ScannerTimeoutException嵌套在了UnknownScannerException之中,這意味著next()調(diào)用使用了一個(gè)已經(jīng)過期的scanner ID,但這個(gè)ID已經(jīng)被刪除了。換句話說,客戶端存儲(chǔ)的scanner ID,Region Server已經(jīng)不認(rèn)識(shí)了,從而拋出一個(gè)UnknownScannerException。

現(xiàn)在,您已經(jīng)學(xué)會(huì)了如何使用客戶端的scanner緩存來提高批量交互的性能。但有一點(diǎn)要注意的是,對(duì)于非常大的行,可能無法放入客戶端的內(nèi)存中。使用HBase客戶端API中的batching,可以處理這種情況:

void setBatch(int
batch)

int getBatch()

caching(處理層次為rows)相對(duì)應(yīng),batching處理的層次是columns。它控制一次next()調(diào)用傳輸多少個(gè)columns。通過ResultScannersetBatch()方法可以進(jìn)行設(shè)置,setBatch(5)將使每個(gè)Result實(shí)例,返回5個(gè)columns

當(dāng)一行含有非常多的列時(shí),您可以使用setBatch方法,一次next()返回一行中的部分列。Result中返回的列也可能達(dá)不到batching的值,比較一行有17列,batching的值為5,那么前三次next()將得到5,5,5,最后一次調(diào)用只能返回2個(gè)列。

通過設(shè)置cachingbatch的大小,scanner可以在選擇rowkey范圍查詢時(shí)控制RPC的多少。示例3-20用兩個(gè)參數(shù)來對(duì)Result實(shí)例大小與請(qǐng)求次數(shù)進(jìn)行調(diào)優(yōu)。

示例3-20 使用cachingbatch兩個(gè)參數(shù)

private static
void scan(int caching, int batch) throws IOException {

Logger log =
Logger.getLogger(“org.apache.hadoop”);

final int[]
counters = {0, 0};

Appender appender
= new AppenderSkeleton() {

@Override

protected void
append(LoggingEvent event) {

String msg =
event.getMessage().toString();

if (msg != null
&& msg.contains(“Call: next”)) {

counters[0]++;

}

}

@Override

public void
close() {}

@Override

public boolean
requiresLayout() {

return
false;

}

};

log.removeAllAppenders();

log.setAdditivity(false);

log.addAppender(appender);

log.setLevel(Level.DEBUG);

Scan scan = new
Scan();

scan.setCaching(caching);

scan.setBatch(batch);

ResultScanner
scanner = table.getScanner(scan);

for (Result result
: scanner) {

counters[1]++;

}

scanner.close();

System.out.println(“Caching:
” + caching + “, Batch: ” + batch +

“,
Results: ” + counters[1] + “, RPCs: ” + counters[0]);

}

public static void
main(String[] args) throws IOException {

scan(1, 1);

scan(200, 1);

scan(2000, 100);

scan(2, 100);

scan(2, 10);

scan(5, 100);

scan(5, 20);

scan(10, 10);

}

示例代碼首先設(shè)置cachingbatch的參數(shù),然后打印Result的大小和RPC的次數(shù)。對(duì)不同的cachingbatch大小進(jìn)行了組合測(cè)試。

Caching: 1, Batch:
1, Results: 200, RPCs: 201

Caching: 200,
Batch: 1, Results: 200, RPCs: 2

Caching: 2000,
Batch: 100, Results: 10, RPCs: 1

Caching: 2, Batch:
100, Results: 10, RPCs: 6

Caching: 2, Batch:
10, Results: 20, RPCs: 11

Caching: 5, Batch:
100, Results: 10, RPCs: 3

Caching: 5, Batch:
20, Results: 10, RPCs: 3

Caching: 10,
Batch: 10, Results: 20, RPCs: 3

通過調(diào)整兩個(gè)參數(shù)的值,可以觀察它們對(duì)結(jié)果的影響。表3-9給出了一些組合的結(jié)果。為了運(yùn)行示例3-20,首先創(chuàng)建了一個(gè)擁有兩個(gè)column family的表,添加了10行,每行中,每個(gè)column family下添加10個(gè)column。這就意味著一共存在著200個(gè)columns或者叫做cell,每個(gè)cell只有一個(gè)版本。

3-9 示例參數(shù)的影響

Caching

Batch

Results

RPCs

Description

1

1

200

201

Each column is returned as a
separate Result instance. One more RPC is needed to realize the scan is
complete.

200

1

200

2

Each column is a separate
Result, but they are all transferred in one RPC (plus the extra check).

2

10

20

11

The batch is half the row
width, so 200 divided by 10 is 20 Results needed. 10 RPCs (plus the check) to
transfer them.

5

100

10

3

The batch is too large for
each row, so all 20 columns are batched. This requires 10 Result instances.
Caching brings the number of RPCs down to two (plus the check).

5

20

10

3

This is the same as above, but
this time the batch matches the columns available. The outcome is the same.

10

10

20

3

This divides the table into
smaller Result instances, but larger caching also means only two RPCs are
needed.

為了計(jì)算RPC的次數(shù),您需要首先將行數(shù)與最行的column數(shù)相乘,然后用這個(gè)值除以batchcolumn數(shù)中的較小值。最后用這個(gè)值除以caching大小。用數(shù)學(xué)公式表示為:

RPCs = (Rows *
Cols per Row) / Min(Cols per Row, Batch Size) /Scanner Caching

還需要額外的RPC操作來打開和關(guān)閉scanner。因此,還scanner還需要兩次額外的RPC操作。

3-2描述了cachingbatching是如何起作用的。如圖所示,該表具有9行值,每行都有不定數(shù)目的column。設(shè)置scannercaching的大小為6batch大小為3。您可以看到,需要3RPC操作來轉(zhuǎn)輸數(shù)據(jù)(虛線包圍的部分)。

3-2 通過cachingbatching控制scan操作RPC的次數(shù)

由于batch大小小于一行中column的數(shù)目,因此,服務(wù)器將3個(gè)columns打成一個(gè)Result,一次RPC操作可以傳輸6個(gè)這樣的Result。如果batch大小不設(shè)置,而caching大小被設(shè)置時(shí),每行記錄將包含一行中所有column,這樣一個(gè)Result實(shí)例中就是一個(gè)完整的行。只有當(dāng)您設(shè)置了batch參數(shù),才有可能把一行拆成多個(gè)Result實(shí)例。

一開始您可能不需要考慮cachingbatching的大小設(shè)置,當(dāng)您進(jìn)行應(yīng)用程序的調(diào)優(yōu)時(shí),您必須對(duì)這個(gè)原理非常清楚才能找到一個(gè)最好的平衡點(diǎn)。

輔助功能

在進(jìn)一步了解客戶端的其它功能之前,我們有必要先了解HBase和它的客戶端提供的一些有用的輔助功能。

HTable方法集

客戶端API代表了一個(gè)HTable實(shí)例,它提供了訪問一個(gè)HBase表的一些方法。除了前面提到的一些對(duì)于訪問HBase表的主要方法,還有另外一些值得留意的方法:

void close()

該方法前面有所提及,但考慮到它的重要性,這樣有必要專門再次提及。在結(jié)束了對(duì)表的訪問之后,一定要調(diào)用close()接口。當(dāng)close()被調(diào)用時(shí),客戶端會(huì)調(diào)用flushCache()方法,將客戶端緩存區(qū)中緩存的數(shù)據(jù)提交到服務(wù)器。

byte[]
getTableName()

這個(gè)方法可以方例地取出表名。

HTableDescriptor
getTableDescriptor()

HBase中每個(gè)表都會(huì)使用一個(gè)HTableDescriptor的實(shí)例。您可以通過getTableDescriptor()獲取對(duì)表信息的訪問。

static Boolean
isTableEnabled(table)

HTable4個(gè)靜態(tài)方法,它們都需要一個(gè)配置對(duì)象,如沒有提供configuration,HTable會(huì)在程序的classpath下使用一個(gè)默認(rèn)的configuration。該函數(shù)檢查ZooKeepertable表是否為enable狀態(tài)。

byte[][]
getStartKeys()

byte[][]
getEndKeys()

pair<byte[][],
byte[][]> getStartEndKeys()

這幾個(gè)函數(shù)可以訪問表中當(dāng)前的rowkey范圍,隨著數(shù)據(jù)的不斷增加,調(diào)用后也會(huì)得到不同的結(jié)果。這3個(gè)方法返回byte數(shù)組。您可以使用Bytes.toStringBinary()來打印出key值。

void
clearRegionCache()

HRegionLocation
getRegionLocation( row )

Map<HRegionInfo,
HServerAddress> getRegionInfo()

這幾個(gè)方法使您可以取出Region的信息,您可以使用第一個(gè)方法來清楚客戶端上的緩存,也可以使用第三個(gè)方法來取出所有Region信息。這些方法幫助一些高級(jí)使用者來利用Region信息,比較路由負(fù)載、計(jì)算靠近數(shù)據(jù)等。

void
prewarmRegionCache(Map<HRegionInfo, HServerAddress> regionMap)

static void
setRegionCachePrefetch(table, enable)

static Boolean
getRegionCachePrefetch(table)

這也是一組高級(jí)用法的API。這組API可以提前將tableRegion信息緩存到客戶端。使用上述API,您可以提供一個(gè)Region的列表來對(duì)Region信息進(jìn)行預(yù)熱。

Bytes

該類用來將Java類型,比如String或者long轉(zhuǎn)化為rawbyte數(shù)組等HBase支持的類型。因此,再介紹這個(gè)類和它的函數(shù)是有意義的。

大多數(shù)方法都有這三種形式,比如:

static long
toLong(byte[] bytes)

static long
toLong(byte[] bytes, int offset)

static long
toLong(byte[] bytes, int offset, int length)

它們的輸出都是byte數(shù)組,偏移量、長(zhǎng)度,后兩個(gè)可以缺省。它們的使用方式取決于您擁有的byte數(shù)組。如果您是使用Bytes.toBytes()方法得到的,那么您可以安全的使用第一個(gè)API,整個(gè)bytes數(shù)據(jù)存放著待轉(zhuǎn)化的值。

HBase內(nèi)部,將數(shù)據(jù)存放在一個(gè)大的字節(jié)數(shù)組中,使用如下的方法:

static int
putLong(byte[] bytes, int offset, long val)

這個(gè)方法允許您將一個(gè)Long對(duì)象寫入到一個(gè)給定的字節(jié)數(shù)組中的指定位置。如果您想從一個(gè)大數(shù)組中取出數(shù)據(jù),可以使用toLong方法。

Bytes類支持的Java類型包括:StringBoolean、short、intlong、doublefloat。除了這些,表3-10中還列出了一些有用的方法。

3-10 Bytes提供的一些方法

Result

Description

toStringBinary()

While working very similar to
toString(), this variant has an extra safeguard to convert nonprintable data
into their human-readable hexadecimal numbers. Whenever you are not sure what
a byte array contains you should use this method to print its content, for
example, to the console, or into a logfile.

compareTo()/equals()

These methods allow you to
compare two byte[], that is, byte arrays. The former gives you a comparison
result and the latter a boolean value, indicating whether the given arrays
are equal to each other.

add()/head()/tail()

You can use these to add two
byte arrays to each other, resulting in a new, concatenated array, or to get
the first, or last, few bytes of the given byte array.

binarySearch()

This performs a binary search
in the given array of values. It operates on byte arrays for the values and
the key you are searching for.

incrementBytes()

This increments a long value
in its byte array representation, as if you had used toBytes(long) to create
it. You can decrement using a negative amount parameter.

Bytes提供的一些方法與Java提供的ByteBuffer有一些復(fù)疊。區(qū)別是前者在處理的過程中不會(huì)生成新的實(shí)例,因此,它采用了一些優(yōu)化。對(duì)于HBase中,這種類型與字節(jié)之間的轉(zhuǎn)化操作被頻繁使用,因此,通過這種優(yōu)化,可以避免非常耗時(shí)的GC操作。


    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購(gòu)買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約