作者:小傅哥,博客:https://bugstack.cn
本文的宗旨在于通過簡單干凈實踐的方式教會讀者,配置出一套 Canal 工具服務(wù),來同步分庫分表的數(shù)據(jù)到 Elasticsearch 文件夾系統(tǒng)中。同時在 SpringBoot 工程中,配置出兩套數(shù)據(jù)源,一套是 MySQL + MyBatis,一套是 Elasticsearch + MyBatis。【這是非常重要的設(shè)計手段】
雖然現(xiàn)在有 TiDB 這樣的分布式數(shù)據(jù)庫,但對于分庫分表 + 數(shù)據(jù)同步ES,依然是非常主流的方案。同時也有一部分是把分庫分表的數(shù)據(jù)同步到 TiDB 使用。
本文涉及的工程:
- xfg-dev-tech-canal:https://gitcode.net/KnowledgePlanet/road-map/xfg-dev-tech-canaldocs/dev-ops/xfg-dev-tech-canal-docker-compose.yml:提供了所需的環(huán)境安裝,mysql、canal-server、canal-adapter、elasticsearch、kibanaGithub:https://github.com/alibaba/canal
一、組件介紹
canal ,譯為水道/管道/溝渠,主要用途是基于 MySQL 數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱和消費。
早期阿里巴巴因為杭州和美國雙機房部署,存在跨機房同步的業(yè)務(wù)需求,實現(xiàn)方式主要是基于業(yè)務(wù) trigger 獲取增量變更。從 2010 年開始,業(yè)務(wù)逐步嘗試數(shù)據(jù)庫日志解析獲取增量變更進(jìn)行同步,由此衍生出了大量的數(shù)據(jù)庫增量訂閱和消費業(yè)務(wù)。
它的工作原理是,canal 模擬 MySQL slave 的交互協(xié)議,偽裝自己為 MySQL slave ,向 MySQL master 發(fā)送dump 協(xié)議。在 MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal ) 這樣 canal 再解析 binary log (binlog)進(jìn)行配置分發(fā),同步到 Elasticsearch 等系統(tǒng)中進(jìn)行使用。
那么有了 canal 就可以把分庫分表的數(shù)據(jù)同步到 Elasticsearch,提供匯總查詢和聚合操作,也就不需要把輪訓(xùn)每個分庫分表數(shù)據(jù)了。
二、測試預(yù)期
本文的案例會把MySQL,2庫4表的數(shù)據(jù),通過 Sharding 分庫分表寫入數(shù)據(jù)后,同步到 Elasticsearch。分庫分表如下(環(huán)境安裝中會自動安裝數(shù)據(jù)庫和設(shè)置庫表);
三、環(huán)境安裝
為了讓讀者伙伴更加簡單的學(xué)習(xí)到這一項方案技能,小傅哥這里把所需的環(huán)境都配置成一整套的 docker compose 腳本文件(ARM、AMD),你只要執(zhí)行安裝即可。安全前注意,無論是本機還是云服務(wù)器都需要安裝 docker-ce
1. 環(huán)境腳本
-
- 打開 xfg-dev-tech-canal 工程,下面就是 docker compose 的執(zhí)行腳本。mac/windows 如果安裝了 docker 可以直接點擊如圖的三角號安裝。如果是在 Linux 安裝了 docker 可以把 dev-ops 整個文件夾都上傳到云服務(wù)器,之后通過腳本;
docker-compose -f xfg-dev-tech-canal-docker-compose.yml up -d
- 進(jìn)行安裝。
1.1 開啟 binlog
mysql 數(shù)據(jù)同步需要創(chuàng)建一個 canal 的賬戶,之后還需要開啟 binlog 日志
- 在 mysql 配置文件夾中,設(shè)置了初始化授權(quán)的賬戶、導(dǎo)入的庫表,以及開啟 mysql-bin 和配置要采集的庫。如果你有配置自己其他的庫要同步也可以如此配置。
1.2 庫表采集配置
- 本文選擇的是 es 同步方式,所以需要在 canal-adapter 中 es7 文件夾添加同步的庫表 yml 配置。以及在 application.yml 中配置出需要鏈接的庫表以及同步的目標(biāo)地址,也就是 es 的地址?!疽驗楸疚牡陌咐窃谕粋€ docker compose 下安裝,所以直接用名稱 elsticsearch 即可訪問】
2. 運行狀態(tài)
- 安裝完成后可以進(jìn)入 portainer 查看各個組件的運行,如果有哪個運行失敗了,可以點擊那個小文件的圖標(biāo),它可以查看日志。
3. 創(chuàng)建索引
在 doc/dev-ops/curl 下提供了創(chuàng)建 Elasticsearch 的腳本;你可以點擊執(zhí)行或者直接復(fù)制執(zhí)行,也可以復(fù)制導(dǎo)入到 ApiPost 里執(zhí)行。
以上這些腳本是為了創(chuàng)建出數(shù)據(jù)庫表同步到 Elasticsearch 后對應(yīng)的索引和映射的字段。文章下面會用到。
3.1 創(chuàng)建
curl?-X?PUT?"127.0.0.1:9200/xfg_dev_tech.user_order"?-H?'Content-Type:?application/json'?-d'
{
????"mappings":?{
??????"properties":?{
????????"_user_id":{"type":?"text"},
????????"_user_name":{"type":?"text"},
????????"_order_id":{"type":?"text"},
????????"_uuid":{"type":?"text"},
????????"_create_time":{"type":?"date"},
????????"_update_time":{"type":?"date"}
??????}
????}
}'
3.2 添加
curl?-X?PUT?"127.0.0.1:9200/xfg_dev_tech.user_order/_mapping"?-H?'Content-Type:?application/json'?-d'
{
??"properties":?{
????"_sku_name":?{
??????"type":?"text"
????}
??}
}'
3.3 刪除
curl?-X?DELETE?"127.0.0.1:9200/xfg_dev_tech.user_order"
4. 創(chuàng)建索引(Kibana)
4.1 索引管理
地址:http://127.0.0.1:5601/app/management/kibana/indexPatterns
- 填寫完,點擊創(chuàng)建索引模式即可。
4.2 數(shù)據(jù)頁面
地址:http://127.0.0.1:5601/app/discover
- 等后面同步數(shù)據(jù)了以后,直接在這里點刷新就可以看到了。
5. 許可證
kibana 提供了免費30天的試用許可,安裝后可以使用 x-pack-sql-jdbc。它的好處是可以讓我們通過 MyBatis 的方式查詢 Elasticsearch 數(shù)據(jù)。
地址:http://127.0.0.1:5601/app/management/stack/license_management
Elasticsearch 提供了 x-pack-sql-jdbc,讓對 Elasticsearch 的查詢也可以像使用 MySQL 數(shù)據(jù)庫一樣通過 MyBatis 進(jìn)行查詢。但這個 x-pack-sql-jdbc 是付費的,免費可以使用 30 天。之后你可以選擇使用重新安裝,破解,或者使用 Elasticsearch 的查詢方式。還可以自己開發(fā)一個 Elasticsearch JDBC,GitHub 上也有類似的組件。
使用時需要引入 POM 配置;
<!-- https://mvnrepository.com/artifact/org.elasticsearch.plugin/x-pack-sql-jdbc -->
<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>x-pack-sql-jdbc</artifactId>
<version>7.17.14</version>
</dependency>
三、工程配置
本節(jié)涉及到了簡明教程中所講解的 Sharding 分庫分表的使用,因為我們需要把分庫分表的數(shù)據(jù)通過 canal 同步到 Elasticsearch。(也可以使用其他分庫分表組件)
在工程中配置一套 Sharding 分庫分表映射的 MyBatis MyBatis,在配置一套 Elasticsearch x-pack-sql-jdbc 數(shù)據(jù)源映射的 MyBatis Mapper。這樣可以讀寫分別走自己設(shè)定好的 Mapper 對象了。
1. 創(chuàng)建數(shù)據(jù)源
@Configuration
public?class?DataSourceConfig?{
????@Configuration
????@MapperScan(basePackages?=?"cn.bugstack.xfg.dev.tech.infrastructure.dao.elasticsearch",?sqlSessionFactoryRef?=?"elasticsearchSqlSessionFactory")
????static?class?ElasticsearchMyBatisConfig?{
????????@Bean("elasticsearchDataSource")
????????@ConfigurationProperties(prefix?=?"spring.elasticsearch.datasource")
????????public?DataSource?igniteDataSource(Environment?environment)?{
????????????return?new?EsDataSource();
????????}
????????@Bean("elasticsearchSqlSessionFactory")
????????public?SqlSessionFactory?elasticsearchSqlSessionFactory(DataSource?elasticsearchDataSource)?throws?Exception?{
????????????SqlSessionFactoryBean?factoryBean?=?new?SqlSessionFactoryBean();
????????????factoryBean.setDataSource(elasticsearchDataSource);
????????????factoryBean.setMapperLocations(new?PathMatchingResourcePatternResolver().getResources("classpath:/mybatis/mapper/elasticsearch/*.xml"));
????????????return?factoryBean.getObject();
????????}
????}
????@Configuration
????@MapperScan(basePackages?=?"cn.bugstack.xfg.dev.tech.infrastructure.dao.mysql",?sqlSessionFactoryRef?=?"mysqlSqlSessionFactory")
????static?class?MysqlMyBatisConfig?{
????????@Bean("mysqlDataSource")
????????@ConfigurationProperties(prefix?=?"spring.mysql.datasource")
????????public?DataSource?mysqlDataSource(Environment?environment)?{
????????????return?DataSourceBuilder.create()
????????????????????.url(environment.getProperty("spring.mysql.datasource.url"))
????????????????????.driverClassName(environment.getProperty("spring.mysql.datasource.driver-class-name"))
????????????????????.build();
????????}
????????@Bean("mysqlSqlSessionFactory")
????????public?SqlSessionFactory?mysqlSqlSessionFactory(DataSource?mysqlDataSource)?throws?Exception?{
????????????SqlSessionFactoryBean?factoryBean?=?new?SqlSessionFactoryBean();
????????????factoryBean.setDataSource(mysqlDataSource);
????????????factoryBean.setMapperLocations(new?PathMatchingResourcePatternResolver().getResources("classpath:/mybatis/mapper/mysql/*.xml"));
????????????return?factoryBean.getObject();
????????}
????}
}
- ElasticsearchMyBatisConfig 使用 EsDataSource 創(chuàng)建數(shù)據(jù)源和映射 MyBatis Mapper 文件。MysqlMyBatisConfig 使用 DataSourceBuilder 創(chuàng)建 Sharding 提供的數(shù)據(jù)源和映射 MyBatis Mapper 文件。
2. Mapper 映射
<?xml?version="1.0"?encoding="UTF-8"?>
<!DOCTYPE?mapper?PUBLIC?"-//mybatis.org//DTD?Mapper?3.0//EN"?"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper?namespace="cn.bugstack.xfg.dev.tech.infrastructure.dao.elasticsearch.IElasticsearchUserOrderDao">
????<resultMap?id="dataMap"?type="cn.bugstack.xfg.dev.tech.infrastructure.po.UserOrderPO">
????????<result?column="_user_id"?property="userId"/>
????????<result?column="_user_name"?property="userName"/>
????????<result?column="_order_id"?property="orderId"/>
????????<result?column="_sku_name"?property="skuName"/>
????????<result?column="_update_time"?property="updateTime"/>
????????<result?column="_create_time"?property="createTime"/>
????</resultMap>
????<select?id="selectByUserId"?resultMap="dataMap">
????????select?_user_id,?_user_name,?_order_id,?_sku_name
????????from?"xfg_dev_tech.user_order"
????????order?by?_update_time
????????limit?10
????</select>
</mapper>
這個是 Elasticsearch 映射的 Mapper 文件,映射的字段就是前面安裝環(huán)境的時候設(shè)置的索引和字段?,F(xiàn)在你使用 Elasticsearch 就不用在工程中硬編碼查詢語句了,變得非常方便。
四、工程測試
1. 寫入數(shù)據(jù)
@Test
public?void?test_insert()?throws?InterruptedException?{
????for?(int?i?=?0;?i?<?3;?i++)?{
????????UserOrderPO?userOrderPO?=?UserOrderPO.builder()
????????????????.userName("小傅哥")
????????????????.userId("xfg_"?+?RandomStringUtils.randomAlphabetic(6))
????????????????.userMobile("+86?13521408***")
????????????????.sku("13811216")
????????????????.skuName("《手寫MyBatis:漸進(jìn)式源碼實踐》")
????????????????.orderId(RandomStringUtils.randomNumeric(11))
????????????????.quantity(1)
????????????????.unitPrice(BigDecimal.valueOf(128))
????????????????.discountAmount(BigDecimal.valueOf(50))
????????????????.tax(BigDecimal.ZERO)
????????????????.totalAmount(BigDecimal.valueOf(78))
????????????????.orderDate(new?Date())
????????????????.orderStatus(0)
????????????????.isDelete(0)
????????????????.uuid(UUID.randomUUID().toString().replace("-",?""))
????????????????.ipv4("127.0.0.1")
????????????????.ipv6("2001:0db8:85a3:0000:0000:8a2e:0370:7334".getBytes())
????????????????.extData("{"device":?{"machine":?"IPhone?14?Pro",?"location":?"shanghai"}}")
????????????????.build();
????????userOrderDao.insert(userOrderPO);
????????Thread.sleep(100);
????}
}
- 循環(huán)插入3條數(shù)據(jù),按需你可以設(shè)置更多條數(shù)據(jù)。這里的用戶編號 userId 是隨機的,也是切分鍵的 ID,所以會在不同的庫表寫入數(shù)據(jù)。
2. 數(shù)據(jù)驗證
- MySQL:http://127.0.0.1:8899/ docker compose 配置的管理后臺,可以 root/123456 登錄Kibana:http://127.0.0.1:5601/app/discover 查詢寫入的數(shù)據(jù)。
3. 查詢數(shù)據(jù)
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public?class?UserOrderDaoTest?{
????@Resource
????private?IElasticsearchUserOrderDao?userOrderDao;
????@Test
????public?void?test()?{
????????List<UserOrderPO>?userOrderPOS?=?userOrderDao.selectByUserId();
????????log.info("測試結(jié)果:{}",?JSON.toJSONString(userOrderPOS));
????}
}
- 通過 Elasticsearch 走 x-pack-sql-jdbc 的方式再把數(shù)據(jù)查詢出來。
五、加入學(xué)習(xí)
小傅哥是一個大廠的架構(gòu)師,經(jīng)常會帶著伙伴們,卷這些實際場景中非常有必要的技術(shù)。也會帶著伙伴實戰(zhàn)項目,這些項目也都是來自于互聯(lián)網(wǎng)大廠中真實的業(yè)務(wù)場景,所有學(xué)習(xí)這樣的項目無論是實習(xí)、校招、社招,都是有非常強的競爭力。別人還在玩玩具,而你已經(jīng)漲能力!
這樣的項目學(xué)習(xí)在小傅哥星球「碼農(nóng)會鎖」有8個,每個都是從0到1開發(fā)并提供簡歷模板和面試題,并且還在繼續(xù)開發(fā),后續(xù)還將有更多!價格嘎嘎實惠,早點加入,早點提升自己。項目地址:https://gaga.plus