主页 > 人工智能  > 

【黑马点评优化】2-Canel实现多级缓存(Redis+Caffeine)同步

【黑马点评优化】2-Canel实现多级缓存(Redis+Caffeine)同步

【黑马点评优化】2-Canel实现多级缓存(Redis+Caffeine)同步 0 背景1 配置MySQL1.1 开启MySQL的binlog功能1.1.1 找到mysql配置文件my.ini的位置1.1.2 开启binlog 1.2 创建canal用户 2 下载配置canal2.1 canal 1.1.5下载2.2 配置canal2.3 启动canal2.4 测试 3 canal实现双写一致3.1 Redis操作封装3.2 编写监听器3.3 修改ShopServiceImpl.java3.4 测试 参考资料

0 背景

【黑马点评优化】之使用Caffeine+Redis实现应用级二层缓存_caffeine redis二级缓存-CSDN博客

当时使用Redis+Caffeine实现对商铺信息的应用层两级缓存。文章提到了两级缓存Redis+Caffeine可以解决缓存雪等问题也可以提高接口的性能,但是可能会出现缓存一致性问题。如果数据频繁的变更,可能会导致Redis和Caffeine数据不一致的问题。

为此,使用Canel来解决这一问题。

MySQL工作原理如下:

一句话总结(详细工作原理,可以查看下面的介绍)

模拟MySQL从库读取binlog实现数据变更监听。它支持数据过滤、转换,并能将变更数据推送到不同的下游系统,如消息队列和其他数据库。

我的相关软件版本 mysql:8.0.36 redis:6.2.6 canal:1.1.5

1 配置MySQL 1.1 开启MySQL的binlog功能 1.1.1 找到mysql配置文件my.ini的位置

开始的时候找不到mysql配置文件(my.ini)的位置

通过下列方法找到:

先连接mysql

mysql -h localhost -u root -p123456 (注意,有密码的话,才-p123456)

输入以下命令

show variables like 'datadir';

这样就找到了配置文件所在的位置

1.1.2 开启binlog

my.ini的最后几行加上以下内容

[mysqld] log-bin=mysql-bin binlog-format=ROW server-id=1 1.2 创建canal用户

DROP USER IF EXISTS 'canal'@'%';

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';

GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';

FLUSH PRIVILEGES;

但是由于mysql8.0之后 如果只设置了 % 的访问权限, 会导致localhost无法访问 所以 我们需要把当前权限更新为 localhost 再执行一遍 继续执行下述命令

select mysql;

update user set host = 'localhost' where user = 'canal' and host='%';

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

FLUSH PRIVILEGES; 查看所有用户访问权限结果如下:

-- 查看所有用户访问权限 SELECT DISTINCT CONCAT('User: ''',user,'''@''',host,''';') AS query FROM mysql.user;

自MySQL 8.0.3开始,身份验证插件默认使用caching_sha2_password

解决:修改canal用户对应的身份验证插件为mysql_native_password

因此,接着执行下列命令

ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'password';

ALTER USER 'canal'@'localhost' IDENTIFIED WITH mysql_native_password BY 'password';

之后我们再次查看canal用户对应的身份验证插件,如下即修改成功

2 下载配置canal 2.1 canal 1.1.5下载

在这里我们选择1.1.15版本的canal,因为canal1.1.15版本以后的canal不兼容Java 1.8

Release v1.1.5 · alibaba/canal (github )

选择canal.deployer-1.1.5.tar.gz下载,之后解压到自己的软件目录下。

2.2 配置canal

进入解压后的Canal目录,找到conf目录下的example实例,通常情况下,你可以通过修改conf/example/instance.properties文件来配置Canal连接到MySQL的参数,主要配置项包括:

canal.instance.master.address:MySQL服务器地址和端口。 canal.instance.dbUsername和canal.instance.dbPassword:用于连接MySQL的用户名和密码。 canal.instance.connectionCharset:数据库的字符集,通常为UTF-8。 canal.instance.tsdb.enable:是否启用表结构历史记录功能,建议开启。

2.3 启动canal

当安装好canal的时候,在window中启动bat的时候有些问题,JDK17版本已经不持’PermSize=128m’

因此,删除start.bat中的这一行

之后,双击bin/start.bat运行即可。

结果如下:

2.4 测试

pom.xml中导入依赖

<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.7</version> </dependency> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.protocol</artifactId> <version>1.1.7</version> </dependency>

之后,编写测试类如下:

import java.net.InetSocketAddress; import java.util.List; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal mon.utils.AddressUtils; import com.alibaba.otter.canal.protocol.CanalEntry.Column; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; import com.alibaba.otter.canal.protocol.CanalEntry.EventType; import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; import com.alibaba.otter.canal.protocol.CanalEntry.RowData; import com.alibaba.otter.canal.protocol.Message; import org.jetbrains.annotations.NotNull; public class CanalTest { public static void main(String args[]) { // 创建链接 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", ""); int batchSize = 1000; int emptyCount = 0; try { connector.connect(); connector.subscribe(".*\\..*"); connector.rollback(); int totalEmtryCount = 1200; while (emptyCount < totalEmtryCount) { Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; System.out.println("empty count : " + emptyCount); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } else { emptyCount = 0; // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size); printEntry(message.getEntries()); } connector.ack(batchId); // 提交确认 // connector.rollback(batchId); // 处理失败, 回滚数据 } System.out.println("empty too many times, exit"); } finally { connector.disconnect(); } } private static void printEntry(@NotNull List<Entry> entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } EventType eventType = rowChage.getEventType(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); } else { System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------> after"); printColumn(rowData.getAfterColumnsList()); } } } } private static void printColumn(@NotNull List<Column> columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } }

运行后,能看到,监听到了数据库的变化

3 canal实现双写一致

导入依赖

<dependency> <groupId>top.javatool</groupId> <artifactId>canal-spring-boot-starter</artifactId> <version>1.2.1-RELEASE</version> </dependency>

新建com.hmdp.cache.handler包

3.1 Redis操作封装

新建ShopRedisHandler类

package com.hmdp.cache.handler; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.github.benmanes.caffeine.cache.Cache; import com.hmdp.entity.Shop; import com.hmdp.service.IShopService; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.List; import static com.hmdp.utils.RedisConstants.CACHE_SHOP_KEY; @Component public class ShopRedisHandler implements InitializingBean { @Autowired private StringRedisTemplate redisTemplate; @Autowired private IShopService shopService; private static final ObjectMapper MAPPER = new ObjectMapper(); @Resource private Cache<String, Object> shopCache; // 缓存预热 @Override public void afterPropertiesSet() throws Exception { // 初始化缓存 // 1.查询商品信息 List<Shop> shopList = shopService.list(); // 2.放入缓存 for (Shop shop : shopList) { // 2.1.item序列化为JSON String json = MAPPER.writeValueAsString(shop); // 2.2 存入caffeind String key = CACHE_SHOP_KEY + shop.getId(); shopCache.put(key, shop); // 2.2.存入redis redisTemplate.opsForValue().set(key, json); } // // 3.查询商品库存信息 // List<ItemStock> stockList = stockService.list(); // // 4.放入缓存 // for (ItemStock stock : stockList) { // // 2.1.item序列化为JSON // String json = MAPPER.writeValueAsString(stock); // // 2.2.存入redis // redisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json); // } } public void saveShop(Shop shop) { try { String json = MAPPER.writeValueAsString(shop); String key = CACHE_SHOP_KEY + shop.getId(); redisTemplate.opsForValue().set(key + shop.getId(), json); } catch (JsonProcessingException e) { throw new RuntimeException(e); } } public void deleteShopById(Long id) { String key = CACHE_SHOP_KEY + id; redisTemplate.delete(key + id); } } 3.2 编写监听器

新建ShopHandler类

通过实现EntryHandler<T>接口编写监听器,监听Canal消息。注意两点:

实现类通过@CanalTable("tb_item")指定监听的表信息EntryHandler的泛型是与表对应的实体类 package com.hmdp.cache.handler; import com.github.benmanes.caffeine.cache.Cache; import com.hmdp.entity.Shop; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import top.javatool.canal.client.annotation.CanalTable; import top.javatool.canal.client.handler.EntryHandler; import javax.annotation.Resource; import static com.hmdp.utils.RedisConstants.CACHE_SHOP_KEY; @CanalTable(value = "tb_shop") @Component public class ShopHandler implements EntryHandler<Shop>{ @Autowired private ShopRedisHandler redisHandler; @Resource private Cache<String, Object> shopCache; @Override public void insert(Shop shop) { // 写数据到JVM进程缓存 String key = CACHE_SHOP_KEY + shop.getId(); shopCache.put(key , shop); // 写数据到redis redisHandler.saveShop(shop); } @Override public void update(Shop before, Shop after) { // 写数据到JVM进程缓存 String key = CACHE_SHOP_KEY + after.getId(); shopCache.put(key, after); // 写数据到redis redisHandler.saveShop(after); } @Override public void delete(Shop shop) { // 删除数据到JVM进程缓存 String key = CACHE_SHOP_KEY + shop.getId(); shopCache.invalidate(key); // 删除数据到redis redisHandler.deleteShopById(shop.getId()); } } 3.3 修改ShopServiceImpl.java

修改ShopServiceImpl中的update方法。注释掉删除缓存的操作。

缓存更新由监听器完成。

@Override public Result update(Shop shop) { Long id = shop.getId(); if(id == null){ return Result.fail("店铺id不能为空"); } //1.更新数据库 updateById(shop); // @TODO 现在不再需要删除缓存了,由canal监听数据库的变化,然后更新缓存 //2.删除缓存 // stringRedisTemplate.delete(CACHE_SHOP_KEY + id); return Result.ok(); } 3.4 测试

接下来可以自行调断点测试,也可以运行项目,然后更改数据库,查看终端输出。

参考资料

mysql 8.0找不到my.ini配置文件解决方案_mysql80没有my.ini-CSDN博客

Docker整合canal 踩坑实录_com.alibaba.otter.canal.parse.exception.canalparse-CSDN博客

Canal 1.1.5 启动报错:caching_sha2_password Auth failed_canal启动出现canal.deployer-1.1.5.jar:na-CSDN博客

Canal启动和运行出现的问题_canal启动闪退-CSDN博客

标签:

【黑马点评优化】2-Canel实现多级缓存(Redis+Caffeine)同步由讯客互联人工智能栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“【黑马点评优化】2-Canel实现多级缓存(Redis+Caffeine)同步