上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

ElasticSearch - 在 微服务项目 中基于 RabbitMQ 实现 ES 和 MySQL 数据异步同步(考点)

guduadmin13小时前

目录

一、数据同步

1.1、什么是数据同步

1.2、解决数据同步面临的问题

1.3、解决办法

1.3.1、同步调用

1.3.2、异步通知(推荐)

1.3.3、监听 binlog

1.3、基于 RabbitMQ 实现数据同步

1.3.1、需求

1.3.2、在“酒店搜索服务”中 声明 exchange、queue、routingKey,同时开启监听

1.3.3、在“酒店管理服务”中发布消息

1.3.4、启动微服务并测试


一、数据同步


1.1、什么是数据同步

我们知道 elasticsearch 的数据是来源于 数据库(比如 mysql).  当我们在写了代码将 mysql 中的数据导入 es 中,那么这次导入之后 mysql 的数据并不会一成不变,将来我们的业务中会有 crud,数据库中的数据就和有新增、修改、删除,那么 mysql 数据一定那发生变化, es 如果不跟着变化,就会出现问题.

比如在一个商城系统中,到了 双11 ,数据库中的商品的价格下降,而 es 还是老价格,那么用户搜索时看到的商品的价格还是没有变化,用户可能就得考虑换软件了~  

因此,我们要保证 mysql 数据变化的时候 es 也能跟着同步变化,这就是数据同步.

Ps:实际上不光是 es 存在数据同步问题,凡是涉及到数据库双写的情况,比如 redis 和 mysql,都会存在数据同步问题.

1.2、解决数据同步面临的问题

如果你现在是一个单体式的项目,所有业务都写在一个项目中,那就比较好办,无非就是在及逆行新增、修改、删除业务的时候,同时把 es 也一起更新就  ok.

但是如果我们是一个 微服务 架构的项目上,不同的业务往往会在不同的微服务上,比如 “商品数据管理业务” 和 “商品数据搜索业务” 肯定会在两个不同的微服务上,那么跨微服务的项目就没办法直接操作了.

1.3、解决办法

1.3.1、同步调用

假设我们现在有两个微服务,一个是 酒店数据管理服务,另一个是酒店数据搜索服务. 假设这两个服务之间互相不能访问对方的数据库,也就是说,酒店管理服务只能访问 mysql,而 酒店搜索服务 只能访问 es,这也符合 微服务 里的标准和规范.

这里有一种办法是同步调用,步骤如下:

ElasticSearch - 在 微服务项目 中基于 RabbitMQ 实现 ES 和 MySQL 数据异步同步(考点),第1张

1.比如用户做新增操作时,首先把数据写到数据库里.

2. 数据库写完了以后紧接着调用 酒店搜索服务中的 “更新索引库” 的接口.

3. 更新 es.

最后更新完了 es 就把响应反馈给搜索服务,然后搜索服务才会把把响应反馈给  管理服务,然后再反馈给用户.  这整个过程依次执行,因此也叫同步调用.

缺陷:

1. 数据耦合,业务耦合:原本只是写数据库,写完就结束了,然后现在还需要再写完数据库的代码后面再加上 调用 “更新索引库” 接口的代码,而且这调用 这个接口的业务跟我新增业务显然没有关系啊,现在业务耦合在一起,将来必然也会影响性能.

2. 影响性能(耦合带来的问题):原本数据库写完了,比如耗时 50ms,但是现在写完数据库,你还得等待后台调用这个接口返回的响应,而这个接口又要等待 es 这里的响应,假如这里也耗时 50ms,那么总的耗时不就是这个三个步骤相加的,达到 100 ms.

3. 牵一发而动全身(耦合带来的问题):如果 步骤 2 和 步骤 3 任意一个位置出现了异常,就会导致整个业务也出现崩溃.

同步调用这么多问题,那么就需要考虑别的方案了.

1.3.2、异步通知(推荐)

这里就需要使用到 mq 来实现了,步骤如下:

ElasticSearch - 在 微服务项目 中基于 RabbitMQ 实现 ES 和 MySQL 数据异步同步(考点),第2张

1. 当有人做新增操作时,先去写数据库.

2. 写完之后,不调用任何服务的接口,而是向 mq 发送一个消息,通知一下 其他服务:“我这里数据新增了啊~”,整个步骤到这里结束.

那么至于谁来监听这个消息,监听了以后做什么,跟我有关系吗?没关系,这样一来,业务的耦合就解除了;  至于其他服务耗时多少秒,跟我也没关系,我写完数据库,发完消息就结束了,因此性能也提升了;再者,就算其他服务出现异常了,跟我这里也没关系.

缺陷:

1. 这样一来,比较依赖 mq 消息的可靠性.

2. 引入新的中间件,实现的复杂度也会有一定的上升.

不过这些缺陷,跟同步调用比起来,算不了什么,因此这也是比较推荐的方案.

1.3.3、监听 binlog

mysql 默认情况下 binlog 时关闭的,一旦开启,那么每次 mysql 在做增删改的时候,都会记录相应的操作到 binlog 中.

那么可以使用类似于 canal 这样的中间件来监听 binlog,一旦发现变化,立马通知对应的微服务,这个时候就知道数据发生变更,就可以进行更新了.

优势:

这种方案他既不给任何中间件发消息,也不去调用任何接口,因此耦合度是最低的.

劣势:

1.开启 binlog,对 mysql 的压力就增加了.

2.引入新的中间件.

1.3、基于 RabbitMQ 实现数据同步

1.3.1、需求

现在有两个微服务:“酒店管理服务” 和 "酒店搜索服务"

用户操作 “酒店管理服务” 进行增删改数据、要求对 "酒店搜索服务" 中的 es 的数据也要完成相同的数据更改操作.

这里使用 MQ 的异步方式实现数据同步.

ElasticSearch - 在 微服务项目 中基于 RabbitMQ 实现 ES 和 MySQL 数据异步同步(考点),第2张

1.3.2、在“酒店搜索服务”中 声明 exchange、queue、routingKey,同时开启监听

在 “酒店搜索服务” 中去声明一个 exchange,用来接收 增删改 的消息,接着声明两个队列即可,一个队列用来增改(这两用一个队列是因为在 es 中,增改可以使用同一个 DSL 语句实现),另一个用来删除(这里我是以 Bean 的方式注入到容器中了).

public class MqConstants {
    //主题交换机
    public static final String EXCHANGE_TOPIC = "hotel.topic";
    //增加 or 修改酒店 队列
    public static final String INSERT_QUEUE = "hotel.insert.queue";
    //删除酒店 队列
    public static final String DELETE_QUEUE = "hotel.delete.queue";
    //增加 or 修改酒店 routingKey
    public static final String INSERT_KEY = "hotel.insert.key";
    //删除酒店 routingKey
    public static final String DELETE_KEY = "hotel.delete.key";
}
@Configuration
public class MqConfig {
    @Bean
    public TopicExchange hotelTopicExchange() {
        return new TopicExchange(MqConstants.EXCHANGE_TOPIC, true, false);
    }
    @Bean
    public Queue hotelInsertQueue() {
        return new Queue(MqConstants.INSERT_QUEUE, true);
    }
    @Bean
    public Queue hotelDeleteQueue() {
        return new Queue(MqConstants.DELETE_QUEUE, true);
    }
    @Bean
    public Binding hotelInsertBinding() {
        return BindingBuilder.bind(hotelInsertQueue()).to(hotelTopicExchange()).with(MqConstants.INSERT_KEY);
    }
    @Bean
    public Binding hotelDeleteBinding() {
        return BindingBuilder.bind(hotelDeleteQueue()).to(hotelTopicExchange()).with(MqConstants.DELETE_KEY);
    }
}

Ps:此类(MqConfig)的包必须与启动类同级,否则声明交换机和队列失败.

最后就可以使用 @RabbitListener 监听 队列 了.

@Component
public class MqListener {
    @Autowired
    private IHotelService hotelService;
    @RabbitListener(queues = MqConstants.INSERT_QUEUE)
    public void HotelInsertOrUpdateListener(Long id) {
        hotelService.insertHotelById(id);
    }
    @RabbitListener(queues = MqConstants.DELETE_QUEUE)
    public void HotelDeleteListener(Long id) {
        hotelService.deleteHotelById(id);
    }
}

Ps:此类(MyListener,监听者类)上必须要有 @Component 注解(交由给 Spring 来管理),否则声明的交换机和队列无效.

1.3.3、在“酒店管理服务”中发布消息

酒店管理服务中,一旦用户进行酒店的 增删改,就会对数据库信息进行修改,然后将增删改的消息发布到 MQ 中.

Ps:这里不要发送 hotel 整体数据,太大可能会导致占满队列(Mq 是基于内存存储的,因此会设定队列上限),因此这里发送 id 即可.  es 这边拿到 id,就可进行相应的增删改.

    @PostMapping
    public void saveHotel(@RequestBody Hotel hotel){
        // 新增酒店
        hotelService.save(hotel);
        rabbitTemplate.convertAndSend(MqConstants.EXCHANGE_TOPIC, MqConstants.INSERT_KEY, hotel.getId());
    }
    @PutMapping()
    public void updateById(@RequestBody Hotel hotel){
        //修改酒店信息
        if (hotel.getId() == null) {
            throw new InvalidParameterException("id不能为空");
        }
        hotelService.updateById(hotel);
        rabbitTemplate.convertAndSend(MqConstants.EXCHANGE_TOPIC, MqConstants.INSERT_KEY, hotel.getId());
    }
    @DeleteMapping("/{id}")
    public void deleteById(@PathVariable("id") Long id) {
        //删除酒店信息
        hotelService.removeById(id);
        rabbitTemplate.convertAndSend(MqConstants.EXCHANGE_TOPIC, MqConstants.DELETE_KEY, id);
    }

1.3.4、启动微服务并测试

a)在酒店管理页面中,修改 “7天连锁酒店(上海莘庄地铁站店)” 为 “7天连锁酒店(此处正在施工,请谨慎前往)”,如下.

ElasticSearch - 在 微服务项目 中基于 RabbitMQ 实现 ES 和 MySQL 数据异步同步(考点),第4张

ElasticSearch - 在 微服务项目 中基于 RabbitMQ 实现 ES 和 MySQL 数据异步同步(考点),第5张

b)在酒店搜索页面中,搜索 “施工” 关键词,就可以看到在 “酒店管理服务” 中更新的信息,已经同步到了 “酒店搜索服务” 中.

ElasticSearch - 在 微服务项目 中基于 RabbitMQ 实现 ES 和 MySQL 数据异步同步(考点),第6张

ElasticSearch - 在 微服务项目 中基于 RabbitMQ 实现 ES 和 MySQL 数据异步同步(考点),第7张

网友评论

搜索
最新文章
热门文章
热门标签