日韩黑丝制服一区视频播放|日韩欧美人妻丝袜视频在线观看|九九影院一级蜜桃|亚洲中文在线导航|青草草视频在线观看|婷婷五月色伊人网站|日本一区二区在线|国产AV一二三四区毛片|正在播放久草视频|亚洲色图精品一区

分享

實(shí)戰(zhàn)!基于canal同步mysql數(shù)據(jù)到elasticsearch

 昵稱(chēng)10087950 2022-06-16 發(fā)布于江蘇

首發(fā)公眾號(hào):MarkerHub

原創(chuàng)作者:呂一明

視頻講解:https://www.bilibili.com/video/BV1Jq4y1w7Bc/

hello,大家好呀,好久沒(méi)寫(xiě)過(guò)原創(chuàng)了,今天帶大家做個(gè)實(shí)驗(yàn)吧,基于canal同步mysql的數(shù)據(jù)到es中!

原理啥的,都給我百度去吧,這里直接搞實(shí)驗(yàn)!

本文使用docker環(huán)境安裝mysql、canal、elasticsearch,基于binlog利用canal實(shí)現(xiàn)mysql的數(shù)據(jù)同步到elasticsearch中。

實(shí)驗(yàn)中間件版本說(shuō)明:

  • centos 8

  • mysql 5.7.36

  • es 7.16.2

  • cannal.server: 1.1.5

  • canal.adapter: 1.1.5

  • postman

0、安裝docker

基本命令:

#centos 7 安裝 docker
yum install docker

#centos 8 安裝docker
yum erase podman buildah
yum install -y yum-utils
yum-config-manager --add-repo https://download./linux/centos/docker-ce.repo
yum install docker-ce docker-ce-cli containerd.io

#檢驗(yàn)安裝是否成功
[root@localhost opt]# docker --version
Docker version 20.10.12, build e91ed57

#啟動(dòng)
systemctl start docker

#換鏡像源
sudo vim /etc/docker/daemon.json
內(nèi)容如下:
{
"registry-mirrors": ["https://m9r2r2uj.mirror."]
}
保存退出,重啟docker

#重啟
sudo service docker restart

#列出鏡像
docker images

#查看運(yùn)行進(jìn)程
docker ps

1、安裝mysql

docker pull mysql:5.7.36
docker run --name mysql5736 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=admin -d mysql:5.7.36

docker exec -it mysql5736 /bin/bash
apt-get update
apt-get install vim
cd /etc/mysql/mysql.conf.d
vim mysqld.cnf  // 修改mysql配置

配置:

[mysqld]
#binlog setting
log-bin=mysql-bin  // 開(kāi)啟logbin
binlog-format=ROW  // binlog日志格式
server-id=1  // mysql主從備份serverId,canal中不能與此相同

圖片

保存退出,重啟mysql:service mysql restart

可能會(huì)退出docker鏡像,注意重啟啟動(dòng)docker的mysql。

mysql -uroot -p
show master status // binlog日志文件
reset master; // 重啟日志

圖片

查看是否配置成功:

圖片

查看日志文件:

cd /var/lib/mysql  // 進(jìn)入日志文件目錄
mysqlbinlog -vv mysql-bin.000001 // row格式查看日志

圖片

使用數(shù)據(jù)庫(kù)工具連接上docker中的mysql,然后創(chuàng)建dailyhub數(shù)據(jù)庫(kù),然后再查看日志(mysqlbinlog -vv mysql-bin.000001)可以看到截圖如下:

圖片

到這里,mysql已經(jīng)安裝成功了。

圖片

2、安裝es

docker pull elasticsearch:7.16.2
docker run -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" --name='es7162' -d elasticsearch:7.16.2

注意如果拉取不出對(duì)應(yīng)的版本,可以上https://registry.hub./_/elasticsearch?tab=tags&page=1&ordering=last_updated,查看對(duì)應(yīng)的版本再拉取。我之前是拉取7.15.2的實(shí)驗(yàn)的,后來(lái)過(guò)來(lái)幾天發(fā)現(xiàn)這版本已經(jīng)拉取不了了,就改成了7.16.2。或者換低一點(diǎn)的版本也可以。 圖片

查看https:///artifact/org.springframework.boot/spring-boot-starter-data-elasticsearch/2.6.2,得到版本依賴(lài)關(guān)系,在springboot2.6.2版本下,7.15.2和7.16.2都可以用。

圖片

docker啟動(dòng)es:

圖片

圖片

然后我們需要配置一下es的信息:

docker exec -ites es7162 /bin/bash
cd config
vi elasticsearch.yml

配置文件:

cluster.name: dailyhub-es
network.host: 0.0.0.0

node.name: node-1
http.port: 9200
http.cors.enabled: true
http.cors.allow-origin: "*"
node.master: true
node.data: true

docker restart es7162 重啟es,注意千萬(wàn)別寫(xiě)錯(cuò)配置的信息,否則啟動(dòng)會(huì)失敗,啟動(dòng)失敗是后可以通過(guò)docker logs -f es7162查看原因,但也只能重新來(lái)了。然后服務(wù)器訪(fǎng)問(wèn):

// 查詢(xún)es所有mapping
http://119.45.25.164:9200/_mapping?pretty=true

注意如果是云服務(wù)器的話(huà),要在安全組中配置對(duì)應(yīng)的端口開(kāi)放、還有防火墻啥的,然后安全些的話(huà),還需要給es配合賬號(hào)密碼啥的。我這里為了實(shí)驗(yàn)就簡(jiǎn)單來(lái)了。

安裝中文分詞器

可以有兩種方式安裝中文分詞器,如果在線(xiàn)安裝的時(shí)候分詞器插件下載不下來(lái)那就只能離線(xiàn)安裝了。

1、在線(xiàn)安裝中文分詞器:

docker exec -ites es7162 /bin/bash

./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.16.2/elasticsearch-analysis-ik-7.16.2.zip

圖片

2、離線(xiàn)安裝中文分詞器:

首先打開(kāi)這個(gè)鏈接:https://github.com/medcl/elasticsearch-analysis-ik/releases/tag/v7.16.2,把分詞器插件下載下來(lái),

# 把插件復(fù)制到容器內(nèi)
docker cp elasticsearch-analysis-ik-7.16.2.zip es7162:/usr/share/elasticsearch/plugins

docker exec -it es7162 /bin/bash
cd /usr/share/elasticsearch/plugins/
mkdir ik
unzip elasticsearch-analysis-ik-7.16.2.zip -d ik
rm -rf elasticsearch-analysis-ik-7.16.2.zip

圖片

重啟es,查看日志是否加載ik分詞器成功!

docker restart es7162
docker logs es7162

圖片

當(dāng)你看到日志中有輸出analysis-ik,說(shuō)明已經(jīng)安裝成功。

3、安裝canal-server

拉取鏡像并啟動(dòng):

docker pull canal/canal-server:v1.1.5

docker run --name canal115 -p 11111:11111  --link mysql5736:mysql5736 -id canal/canal-server:v1.1.5

修改對(duì)應(yīng)的配置:

docker exec -it canal115 /bin/bash
cd canal-server/conf/example/
vi instance.properties // 修改配置

# 把0改成10,只要不和mysql的id相同就行
canal.instance.mysql.slaveId=10
# 修改成mysql對(duì)應(yīng)的賬號(hào)密碼,mysql5736就是mysql鏡像的鏈接別名
canal.instance.master.address=mysql5736:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=admin

圖片

驗(yàn)證配置是否成功:

#首先重啟一下canal
docker restart  canal115

docker exec -it canal115 /bin/bash
cd canal-server/logs/example/
tail -100f example.log // 查看日志

截圖如下,說(shuō)明已經(jīng)鏈接上了mysql主機(jī),此時(shí)mysql中的數(shù)據(jù)變化,都會(huì)在canal中有同步。 圖片

可以通過(guò)Java程序測(cè)試有沒(méi)連接上mysql:

導(dǎo)入canal-client包

<!-- 為了測(cè)試canal-server是否連接mysql成功,1.1.5版本少包,所以用1.1.4版本 -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
  • com.markerhub.SimpleCanalClientExample

/**
* 公眾號(hào):MarkerHub
*
* 說(shuō)明:用于測(cè)試canal是否已經(jīng)連接上了mysql
*/
public class SimpleCanalClientExample {
public static void main(String args[]) {
// 創(chuàng)建鏈接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("119.45.25.164",
11111), "example", "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalEmptyCount = 120;
while (emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize); // 獲取指定數(shù)量的數(shù)據(jù)
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
emptyCount = 0;
// System.out.printf("
message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交確認(rèn)
// connector.rollback(batchId); // 處理失敗, 回滾數(shù)據(jù)
}
System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}
private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}

當(dāng)mysql的數(shù)據(jù)更新時(shí)候效果如下: 圖片

注意當(dāng)后面canal-adapter也連接上canal-server后,程序就監(jiān)聽(tīng)不到數(shù)據(jù)變化了。

4、安裝canal-adapter

由于目前canal-adapter沒(méi)有官方docker鏡像,所以拉去一個(gè)非官方的

docker pull slpcat/canal-adapter:v1.1.5

docker run --name adapter115 -p 8081:8081 --link mysql5736:mysql5736 --link canal115:canal115 --link es7162:es7162 -d slpcat/canal-adapter:v1.1.5

圖片

修改配置:

docker exec -it adapter115 /bin/bash
cd conf/
vi application.yml

配置修改如下,一些不需要的配置或者注釋掉的配置可以刪除掉:

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: canal115:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://mysql5736:3306/dailyhub?useUnicode=true
      username: root
      password: admin
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7
        hosts: es7162:9200 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest
          # security.auth: test:123456 #  only used for rest mode
          cluster.name: dailyhub-es

圖片

接下來(lái)是修改表映射索引文件:

docker exec -it adapter115 /bin/bash
cd conf/es7

cp -v mytest_user.yml dailyhub_collect.yml
# 刪除其他多余的
rm -rf biz_order.yml customer.yml mytest_user.yml
vi dailyhub_collect.yml

配置文件:

dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: dailyhub_collect
  _id: _id
  _type: _doc
  upsert: true
#  pk: id
  sql: "
SELECT
        c.id AS _id,
        c.user_id AS userId,
        c.title AS title,
        c.url AS url,
        c.note AS note,
        c.collected AS collected,
        c.created AS created,
        c.personal AS personal,
        u.username AS username,
        u.avatar AS userAvatar
FROM
        m_collect c
LEFT JOIN m_user u ON c.user_id = u.id

"

#  objFields:
#    _labels: array:;
#   etlCondition: "where c.c_time>={}"
  commitBatch: 3000

注意對(duì)于時(shí)間類(lèi)型,在后端一定要使用LocalDateTime或者LocalDate類(lèi)型,如果是Date類(lèi)型,需要自己手動(dòng)設(shè)置格式。

5、聯(lián)合測(cè)試

然后就可以直接測(cè)試了,準(zhǔn)備測(cè)試條件:

  • 在數(shù)據(jù)庫(kù)中生成表和字段,

  • 然后elasticsearch中生成索引。先新建數(shù)據(jù)庫(kù)dailyhub。然后數(shù)據(jù)表結(jié)構(gòu):

SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------
-- Table structure for m_collect
-- ----------------------------
DROP TABLE IF EXISTS `m_collect`;
CREATE TABLE `m_collect` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `collected` date DEFAULT NULL,
  `created` datetime(6) DEFAULT NULL,
  `note` varchar(255) DEFAULT NULL,
  `personal` int(11) DEFAULT NULL,
  `title` varchar(255) DEFAULT NULL,
  `url` varchar(255) DEFAULT NULL,
  `user_id` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `FK6yx2mr7fgvv204y8jw5ubsn7h` (`user_id`),
  CONSTRAINT `FK6yx2mr7fgvv204y8jw5ubsn7h` FOREIGN KEY (`user_id`) REFERENCES `m_user` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8mb4;

-- ----------------------------
-- Records of m_collect
-- ----------------------------

-- ----------------------------
-- Table structure for m_user
-- ----------------------------
DROP TABLE IF EXISTS `m_user`;
CREATE TABLE `m_user` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `avatar` varchar(255) DEFAULT NULL,
  `created` datetime(6) DEFAULT NULL,
  `lasted` datetime(6) DEFAULT NULL,
  `open_id` varchar(255) DEFAULT NULL,
  `statu` int(11) DEFAULT NULL,
  `username` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;

-- ----------------------------
-- Records of m_user
-- ----------------------------
INSERT INTO `m_user` VALUES ('1', 'https://image-1300566513.cos.ap-guangzhou./upload/images/5a9f48118166308daba8b6da7e466aab.jpg', '2022-01-05 16:08:40.042000', '2022-01-06 13:07:45.153000', 'ozWZ-uAOY2iecT-byynO382u01zg', '0', '公眾號(hào):MarkerHub');

接下來(lái)借postman來(lái)新建elasticsearch的索引:

# 創(chuàng)建索引并添加映射字段
PUT http://119.45.25.164:9200/dailyhub_collect

{
    "mappings": {
        "properties": {
            "collected": {
                "type": "date",
                "format": "date_optional_time||epoch_millis"
            },
            "created": {
                "type": "date",
                "format": "date_optional_time||epoch_millis"
            },
            "note": {
                "type": "text",
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_smart"
            },
            "personal": {
                "type": "integer"
            },
            "title": {
                "type": "text",
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_smart"
            },
            "url": {
                "type": "text"
            },
            "userAvatar": {
                "type": "text"
            },
            "userId": {
                "type": "long"
            },
            "username": {
                "type": "keyword"
            }
        }
    }
}

圖片

其他常用操作:

# 刪除索引
PUT http://119.45.25.164:9200/dailyhub_collect

# 查看素有索引映射
GET http://119.45.25.164:9200/_mapping?pretty=true

# 搜索文檔
GET http://119.45.25.164:9200/dailyhub_collect/_search

# 刪除ID為1的文檔
DELETE http://119.45.25.164:9200/dailyhub_collect/_doc/1

然后我們打開(kāi)canal-adapter的輸入日志:

docker logs --tail 100  -f adapter115

然后我們?cè)趍ysql的m_collect中新添加一條記錄,可以看到日志輸出如下: 圖片

然后搜索全部文檔,發(fā)現(xiàn)es中有數(shù)據(jù)啦。

圖片

如果看到adaptar115一直出現(xiàn)這種異常,說(shuō)明啟動(dòng)順序不對(duì),啟動(dòng)順序應(yīng)該是:mysql、es、canal、adapar

2022-01-11 10:43:15.278 [Thread-2] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - com.alibaba.otter.canal.protocol.exception.CanalClientException: java.io.IOException: Broken pipe Error sync but ACK!

到這里,實(shí)驗(yàn)成功,over,關(guān)注公眾號(hào):MarkerHub,帶你做更多Java實(shí)驗(yàn)!視頻講解:https://www.bilibili.com/video/BV1Jq4y1w7Bc/

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶(hù)發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購(gòu)買(mǎi)等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶(hù) 評(píng)論公約

    類(lèi)似文章 更多