2018-09-28: 示例job已上傳至github,地址見文末
0. 前言本文介紹了使用Kettle 對一張業(yè)務表數據(500萬條數據以上)進行實時(10秒)同步,采用了時間戳增量回滾同步的方法。關于ETL和Kettle的入門知識大家可以閱讀相關的blog和文檔學習。 1. 時間戳增量回滾同步假定在源數據表中有一個字段會記錄數據的新增或修改時間,可以通過它對數據在時間維度上進行排序。通過中間表記錄每次更新的時間戳,在下一個同步周期時,通過這個時間戳同步該時間戳以后的增量數據。這是時間戳增量同步。 但是時間戳增量同步不能對源數據庫中歷史數據的刪除操作進行同步,我們可以通過在每次同步時,把時間戳往前回滾一段時間,從而同步一定時間段內的刪除操作。這就是時間戳增量回滾同步,這個名字是我自己給取得,意會即可,就是在時間戳增量同步的同時回滾一定的時間段。 說明: 源數據表 需要被同步的數據表 目標數據表 同步至的數據表 中間表 存儲時間戳的表
2. 前期準備在兩個數據庫中分別創(chuàng)建數據表,并通過腳本在源數據表中插入500萬條數據,完成后再以每秒一條的速度插入新數據,模擬生產環(huán)境。 源數據表結構如下: CREATE TABLE `im_message` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`sender` varchar(45) COLLATE utf8_bin NOT NULL COMMENT '消息發(fā)送者:SYSTEM',
`send_time` datetime(6) NOT NULL,
`receiver` varchar(45) COLLATE utf8_bin NOT NULL COMMENT '消息接受者',
`content` varchar(255) COLLATE utf8_bin NOT NULL COMMENT '消息內容',
`is_read` tinyint(4) NOT NULL COMMENT '消息是否被讀?。?-未讀;非0-已讀',
`read_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `id_UNIQUE` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE=utf8_bin COMMENT='消息表'
3. 作業(yè)流程開始組件 建時間戳中間表 獲取中間表的時間戳,并設置為全局變量 刪除目標表中時間戳及時間戳以后的數據 抽取兩個數據表的時間戳及時間戳以后的數據進行比對,并根據比對結果進行刪除、新增或修改操作 更新時間戳
4. 創(chuàng)建作業(yè)作業(yè)的最終截圖如下:
 4.1 創(chuàng)建作業(yè)和DB連接打開Spoon工具,新建作業(yè),然后在左側主對象樹DB連接中新建DB連接。創(chuàng)建連接并測試通過后可以在左側DB連接下右鍵共享出來。因為在單個作業(yè)或者轉換中新建的DB連接都是局域數據源,在其他轉換和作業(yè)中是不能使用的,即使屬于同一個作業(yè)下的不同轉換,所以需要把他們共享,這樣DB連接就會成為全局數據源,不用多次編輯。 4.2 建時間戳中間表這一步是為了在目標數據庫建中間表etl_temp ,并插入初始的時間戳字段。因為該作業(yè)在生產環(huán)境是循環(huán)調用的,該步驟在每一個同步周期中都會調用,所以在建表時需要判斷該表是否已經存在,如果不存在才建表。 SQL代碼和組件配置截圖如下: CREATE TABLE IF NOT EXISTS etl_temp(id int primary key,time_stamp timestamp);
INSERT IGNORE INTO etl_temp (id,time_stamp) VALUES (1,'2018-05-22 00:00:00');

我把該作業(yè)時間戳的ID設為1,在接下來的步驟中也是通過這個ID查詢我們想要的時間戳
4.2 獲取時間戳并設為變量新建一個轉換,在轉換中使用表輸入和設置變量兩個組件 表輸入SQL 代碼和組件配置截圖如下
在Kettle 中設置的變量都是字符串類型,為了便于比較。我在SQL語句把查出的時間戳進行了格式轉換 select date_format(time_stamp , '%Y-%m-%d %H:%i:%s') time_stamp from etl_temp where id='1'

設置變量變量活動類型可以為該變量設置四種有效活動范圍,分別是JVM、該Job、父Job和祖父Job 
4.3 刪除目標表中時間戳及時間戳以后的數據這樣做有兩個好處: 避免在同步中重復或者遺漏數據。例如當時間戳在源數據表中不是唯一的,上一次同步周期最后一條數據的時間戳是2018-05-25 18:12:12 ,那么上一次同步周期結束后中間表中的時間戳就會更新為2018-05-25 18:12:12 。如果在下一個同步周期時源數據表中仍然有時間戳為2018-05-25 18:12:12 的新數據,那么同步就會出現(xiàn)數據不一致。采用大于時間戳的方式同步就會遺漏數據,采用等于時間戳的方式同步就會重復同步數據。 增加健壯性 當作業(yè)異常結束后,不用做任何多余的操作就可以重啟。因為會刪除目標表中時間戳及時間戳以后的數據,所以不用擔心數據一致性問題
2018-09-29:對增加健壯性進行補充:在一次同步周期中腳本異常中斷,這時候中間表的時間戳沒有更新,但是目標表已經同步了部分數據,當再次啟動腳本就會出現(xiàn)數據重復的情況,而且在很多時候因為主鍵的存在,腳本啟動會報錯
在組件中使用了上一步驟設置的變量,所以必須勾選使用變量替換 delete from test_kettle.im_message where send_time>='${TIME_STAMP}'

4.4 抽取、比對和更新數據這一步才是真正的數據同步步驟,完成了數據的抽取、比對,并根據不同的比對結果刪除、更新、插入或不做任何操作。 正如前文所說,為了同步刪除操作,在原始表輸入和目標表輸入步驟中回滾了一定時間段。其中回滾的時間段設置為了全局的參數。左右空白處右鍵即可設置參數,該作業(yè)下的所有作業(yè)和轉換都能使用,設置如下圖 
轉換截圖如下 
原始表輸入SELECT
id
, sender
, send_time
, receiver
, content
, is_read
, read_time
FROM ueqcsd.im_message
where send_time>= date_sub(str_to_date('${TIME_STAMP}','%Y-%m-%d %H:%i:%s'), interval ${ROLL_BACK_DAYS} day);

目標表輸入SELECT
id
, sender
, send_time
, receiver
, content
, is_read
, read_time
FROM test_kettle.im_message
where send_time>= date_sub(str_to_date('${TIME_STAMP}','%Y-%m-%d %H:%i:%s'), interval ${ROLL_BACK_DAYS} day);

注意兩個組件的數據庫鏈接是不同的,當然它們也就這個和名字不同
比對記錄對兩個表輸入查出的數據進行比對,并把比對的結果寫進輸入流,傳遞給后面的組件。 比對的結果有三種: 標注字段表示比對結果的字段名,后面有用。關鍵字段表示比對的字段,在這個作業(yè)中我們比較兩個的主鍵ID 。 
Switch該步驟對上一步驟產生的標注字段進行路由,不同的結果路由到不同的步驟。其中目標步驟表示下一步驟的名字。 
插入Kettle 有一個插入/更新組件,但是據網友介紹這個組件性能低下,每秒最多只能同步幾百條數據,所有我對插入和更新分別作了不同的處理。插入使用表輸出組件;更新使用更新組件。 為了進一步提升同步效率,我在表輸出組件使用了多線程(右鍵>改變開始復制的數量),使同步速度達到每秒12000條。Switch組件和表輸出組件中間的虛擬組件(空操作)也是為了使用多線程添加的。


勾選批量插入,可以極大提高同步速度
更新和刪除

4.5 更新時間戳set @new_etl_start_time_stamp = (SELECT SEND_TIME FROM test_kettle.im_message ORDER BY SEND_TIME DESC LIMIT 1);
update etl_temp set time_stamp=@new_etl_start_time_stamp where id='1';

4.6 發(fā)送郵箱關于發(fā)送郵件組件網上有很多資料,就不多做介紹。特別強調一點,郵箱密碼是 單獨的授權碼,而不是郵箱登錄密碼。 運行在開發(fā)環(huán)境點擊Spoon界面左上角三角符號運行作業(yè)即可。 在第一次運行時,為了提高同步效率,可以先不創(chuàng)建目標表的索引。在第一此同步完成后,再創(chuàng)建索引。然后在START組件中編輯調度邏輯,再次啟動。 如下圖所示 
運行日志如下圖 
|