目录
《黑马头条》SpringBoot+SpringCloud+ Nacos等企业级微服务架构项目_黑马头条项目_软工菜鸡的博客-CSDN博客
04自媒体文章-自动审核
1)自媒体文章自动审核流程
2)内容安全第三方接口
2.1)概述
2.2)准备工作
2.3)文本内容审核接口
2.4)图片审核接口
2.5)项目集成
3)app端文章保存接口
3.1)表结构说明
3.2)分布式id
分布式id-技术选型
3.3)思路分析
3.4)feign接口
4)自媒体文章自动审核功能实现
4.1)表结构说明
4.2)实现
4.3)单元测试
4.4)feign远程接口调用方式
4.5)服务降级处理
5)发布文章提交审核集成
5.1)同步调用与异步调用
5.2)Springboot集成异步线程调用
6)文章审核功能-综合测试
6.1)服务启动列表
6.2)测试情况列表
7)新需求-自管理敏感词
7.1)需求分析
7.2)敏感词-过滤
7.3)DFA实现原理
7.4)自管理敏感词集成到文章审核中
测试了一下 源码不能检测 标题的敏感词汇;加了个这: wmNews.getTitle()+
8)新需求-图片识别文字审核敏感词
8.1)需求分析
8.2)图片文字识别
8 .3)Tess4j案例
8.4)管理敏感词和图片文字识别集成到文章审核
9)文章详情-静态文件生成
9.1)思路分析
9.2)实现步骤
05延迟任务精准发布文章
1)文章定时发布
2)延迟任务概述
2.1)什么是延迟任务
2.2)技术对比
2.2.1)DelayQueue
2.2.2)RabbitMQ实现延迟任务
2.2.3)redis实现
3)redis实现延迟任务
锐评:完全为了学list zset而编出来的场景,实际工作中延迟队列要设计成这样只能说太蠢了
4)延迟任务服务实现
4.1)搭建heima-leadnews-schedule模块
4.2)数据库准备
乐观锁/悲观锁
4.3)安装redis
4.4)项目集成redis
4.5)添加任务
4.6)取消任务
4.7)消费任务
4.8)未来数据定时刷新
4.8.1)reids key值匹配
4.8.2)reids管道
4.8.3)未来数据定时刷新-功能完成
4.9)分布式锁解决集群下的方法抢占执行
4.9.1)问题描述
4.9.2)分布式锁
4.9.3)redis分布式锁
4.9.4)在工具类CacheService中添加方法
4.10)数据库同步到redis
5)延迟队列解决精准时间发布文章
5.1)延迟队列服务提供对外接口
5.2)发布文章集成添加延迟队列接口
序列化工具对比
5.3)消费任务进行审核文章
06kafka及异步通知文章上下架
1)自媒体文章上下架
2)kafka概述
消息中间件对比-选择建议
kafka介绍-名词解释
3)kafka安装配置
4)kafka入门
分区机制—topic剖析
5)kafka高可用设计
5.1)集群
5.2)备份机制(Replication)
6)kafka生产者详解
6.1)发送类型
6.2)参数详解
ack确认机制
retries 重试次数
消息压缩
7)kafka消费者详解
7.1)消费者组
7.2)消息有序性
7.3)提交和偏移量
1.提交当前偏移量(同步提交)
2.异步提交
3.同步和异步组合提交
8)springboot集成kafka
8.1)入门
8.2)传递消息为对象
9)自媒体文章上下架功能完成
9.1)需求分析
9.2)流程说明
9.3)接口定义
9.4)自媒体文章上下架-功能实现
9.5)消息通知article端文章上下架
04自媒体文章-自动审核
1)自媒体文章自动审核流程
1 自媒体端发布文章后,开始审核文章2 审核的主要是审核文章的 内容(文本内容和图片)
3 借助 第三方提供的接口审核文本
4 借助第三方提供的接口审核图片,由于图片存储到minIO中,需要先下载才能审核
5 如果审核失败,则需要修改自媒体文章的状态,status:2 审核失败 status:3 转到人工审核
6 如果审核成功,则需要在文章微服务中创建app端需要的文章
2)内容安全第三方接口
2.1)概述
内容安全是识别服务,支持对图片、视频、文本、语音等对象多样化场景检测,有效降低内容违规风险
目前很多平台都支持内容检测,如阿里云、腾讯云、百度AI、网易云等国内大型互联网公司都对外提供了API。
按照性能和收费来看,黑马头条项目使用的就是阿里云的内容安全接口,使用到了图片和文本的审核。
阿里云收费标准:https://www.aliyun.com/price/product/?spm=a2c4g.11186623.2.10.4146401eg5oeu8#/lvwang/detail
2.2)准备工作
您在使用内容检测API之前,需要先注册阿里云账号,添加Access Key并签约云盾内容安全。
操作步骤
-
前往阿里云官网注册账号。如果已有注册账号,请跳过此步骤。
进入阿里云首页后,如果没有阿里云的账户需要先进行注册,才可以进行登录。由于注册较为简单,课程和讲义不在进行体现(注册可以使用多种方式,如淘宝账号、支付宝账号、微博账号等...)。
需要实名认证和活体认证。
-
打开云盾内容安全产品试用页面,单击立即开通,正式开通服务。
内容安全控制台
-
在AccessKey管理页面管理您的AccessKeyID和AccessKeySecret。
管理自己的AccessKey,可以新建和删除AccessKey
查看自己的AccessKey,
AccessKey默认是隐藏的,第一次申请的时候可以保存AccessKey,点击显示,通过验证手机号后也可以查看
2.3)文本内容审核接口
文本垃圾内容检测:如何调用文本检测接口进行文本内容审核_内容安全-阿里云帮助中心
文本垃圾内容Java SDK: 如何使用JavaSDK文本反垃圾接口_内容安全-阿里云帮助中心
2.4)图片审核接口
图片垃圾内容检测:调用图片同步检测接口/green/image/scan审核图片内容_内容安全-阿里云帮助中心
图片垃圾内容Java SDK: 如何使用JavaSDK接口检测图片是否包含风险内容_内容安全-阿里云帮助中心
2.5)项目集成
①:拷贝资料文件夹中的类到common模块下面,并添加到自动配置
包括了GreenImageScan和GreenTextScan及对应的工具类
添加到自动配置中
②: accessKeyId和secret(需自己申请)
在heima-leadnews-wemedia中的nacos配置中心添加以下配置:
aliyun: accessKeyId: ... secret: ... #aliyun.scenes=porn,terrorism,ad,qrcode,live,logo scenes: terrorism
③:在自媒体微服务中测试类中注入审核文本和图片的bean进行测试
package com.heima.wemedia; import java.util.Arrays; import java.util.Map; @SpringBootTest(classes = WemediaApplication.class) @RunWith(SpringRunner.class) public class AliyunTest { @Autowired private GreenTextScan greenTextScan; @Autowired private GreenImageScan greenImageScan; @Autowired private FileStorageService fileStorageService; @Test public void testScanText() throws Exception { Map map = greenTextScan.greeTextScan("我是一个好人,冰毒"); System.out.println(map); } @Test public void testScanImage() throws Exception { byte[] bytes = fileStorageService.downLoadFile("http://192.168.200.130:9000/leadnews/2021/04/26/ef3cbe458db249f7bd6fb4339e593e55.jpg"); Map map = greenImageScan.imageScan(Arrays.asList(bytes)); System.out.println(map); } }
我用的是 阿里云 云安全 增强版1小时,没审核出效果为null;估计是阿里 改接口了;
图片审核页报错
java.lang.RuntimeException: upload file fail. at com.heima.common.aliyun.util.ClientUploader.uploadBytes(ClientUploader.java:129) at com.heima.common.aliyun.GreenImageScan.imageScan(GreenImageScan.java:71) at com.heima.wemedia.test.AliyunTest.testScanImage(AliyunTest.java:51) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.springframework.test.context.junit4.statements.RunBeforeTestExecutionCallbacks.evaluate(RunBeforeTestExecutionCallbacks.java:74) at org.springframework.test.context.junit4.statements.RunAfterTestExecutionC
3)app端文章保存接口
3.1)表结构说明
3.2)分布式id
随着业务的增长,文章表可能要占用很大的物理存储空间,为了解决该问题,后期使用数据库分片技术。将一个数据库进行拆分,通过数据库中间件连接。如果数据库中该表选用ID自增策略,则可能产生重复的ID,此时应该使用分布式ID生成策略来生成ID。
分布式id-技术选型
snowflake是Twitter开源的分布式ID生成算法,结果是一个long型的ID。
其核心思想是:使用41bit作为毫秒数,10bit作为机器的ID(5个bit是数据中心,5个bit的机器ID)(最多32个机房*32台机器(也可以自己设)),12bit作为毫秒内的流水号(意味着每个节点在每毫秒可以产生 4096 个 ID),最后还有一个符号位,永远是0(1为负数)
文章端相关的表都使用雪花算法生成id,包括ap_article、 ap_article_config、 ap_article_content
mybatis-plus已经集成了雪花算法,完成以下两步即可在项目中集成雪花算法
第一:在实体类中的id上加入如下配置,指定类型为id_worker
@TableId(value = "id",type = IdType.ID_WORKER) private Long id;
第二:在application.yml文件中配置数据中心id和机器id
mybatis-plus: mapper-locations: classpath*:mapper/*.xml # 设置别名包扫描路径,通过该属性可以给包中的类注册别名 type-aliases-package: com.heima.model.article.pojos global-config: datacenter-id: 1 workerId: 1
datacenter-id:数据中心id(取值范围:0-31) ;workerId:机器id(取值范围:0-31)
3.3)思路分析
在文章审核成功以后需要在app的article库中新增文章数据
1.保存文章信息 ap_article
2.保存文章配置信息 ap_article_config
3.保存文章内容 ap_article_content
实现思路:
3.4)feign接口
ArticleDto
package com.heima.model.article.dtos; import com.heima.model.article.pojos.ApArticle; import lombok.Data; @Data public class ArticleDto extends ApArticle { /** * 文章内容 */ private String content; }
功能实现:
①:在heima-leadnews-feign-api中新增接口
第一:线导入feign的依赖
org.springframework.cloud spring-cloud-starter-openfeign
第二:定义文章端的接口
package com.heima.apis.article; import org.springframework.web.bind.annotation.RequestBody; @FeignClient(value = "leadnews-article") public interface IArticleClient { @PostMapping("/api/v1/article/save") public ResponseResult saveArticle(@RequestBody ArticleDto dto) ; }
②:在heima-leadnews-article中实现该方法
package com.heima.article.feign; import java.io.IOException; @RestController public class ArticleClient implements IArticleClient { @Autowired private ApArticleService apArticleService; @Override @PostMapping("/api/v1/article/save") public ResponseResult saveArticle(@RequestBody ArticleDto dto) { return apArticleService.saveArticle(dto); } }
③:拷贝mapper
在资料文件夹中拷贝ApArticleConfigMapper类到mapper文件夹中
同时,修改ApArticleConfig类,添加如下构造函数
package com.heima.model.article.pojos; import java.io.Serializable; /** ** APP已发布文章配置表 *
* * @author itheima */ @Data @NoArgsConstructor @TableName("ap_article_config") public class ApArticleConfig implements Serializable { public ApArticleConfig(Long articleId){ this.articleId = articleId; this.isComment = true; this.isForward = true; this.isDelete = false; this.isDown = false; } @TableId(value = "id",type = IdType.ID_WORKER) private Long id; /** * 文章id */ @TableField("article_id") private Long articleId; /** * 是否可评论 * true: 可以评论 1 * false: 不可评论 0 */ @TableField("is_comment") private Boolean isComment; /** * 是否转发 * true: 可以转发 1 * false: 不可转发 0 */ @TableField("is_forward") private Boolean isForward; /** * 是否下架 * true: 下架 1 * false: 没有下架 0 */ @TableField("is_down") private Boolean isDown; /** * 是否已删除 * true: 删除 1 * false: 没有删除 0 */ @TableField("is_delete") private Boolean isDelete; }
④:在ApArticleService中新增方法
/** * 保存app端相关文章 * @param dto * @return */ ResponseResult saveArticle(ArticleDto dto) ;
实现类:
@Autowired private ApArticleConfigMapper apArticleConfigMapper; @Autowired private ApArticleContentMapper apArticleContentMapper; /** * 保存app端相关文章 * @param dto * @return */ @Override public ResponseResult saveArticle(ArticleDto dto) { //1.检查参数 if(dto == null){ return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID); } ApArticle apArticle = new ApArticle(); BeanUtils.copyProperties(dto,apArticle); //2.判断是否存在id if(dto.getId() == null){ //2.1 不存在id 保存 文章 文章配置 文章内容 //保存文章 save(apArticle); //保存配置 ApArticleConfig apArticleConfig = new ApArticleConfig(apArticle.getId()); apArticleConfigMapper.insert(apArticleConfig); //保存 文章内容 ApArticleContent apArticleContent = new ApArticleContent(); apArticleContent.setArticleId(apArticle.getId()); apArticleContent.setContent(dto.getContent()); apArticleContentMapper.insert(apArticleContent); }else { //2.2 存在id 修改 文章 文章内容 //修改 文章 updateById(apArticle); //修改文章内容 ApArticleContent apArticleContent = apArticleContentMapper.selectOne(Wrappers.lambdaQuery().eq(ApArticleContent::getArticleId, dto.getId())); apArticleContent.setContent(dto.getContent()); apArticleContentMapper.updateById(apArticleContent); } //3.结果返回 文章的id return ResponseResult.okResult(apArticle.getId()); }
⑤:测试
编写junit单元测试,或使用postman进行测试
http://localhost:51802/api/v1/article/save
{ "id":这个id要去数据库自己找 , "title":"黑马头条项目背景22222222222222", "authoId":1102, "layout":1, "labels":"黑马头条", "publishTime":"2028-03-14T11:35:49.000Z", "images": "http://192.168.200.130:9000/leadnews/2021/04/26/5ddbdb5c68094ce393b08a47860da275.jpg", "content":"22222222222222222黑马头条项目背景,黑马头条项目背景,黑马头条项目背景,黑马头条项目背景,黑马头条项目背景" }
4)自媒体文章自动审核功能实现
4.1)表结构说明
wm_news 自媒体文章表
status字段:0 草稿 1 待审核 2 审核失败 3 人工审核 4 人工审核通过 8 审核通过(待发布) 9 已发布
4.2)实现
在heima-leadnews-wemedia中的service新增接口
package com.heima.wemedia.service; public interface WmNewsAutoScanService { /** * 自媒体文章审核 * @param id 自媒体文章id */ public void autoScanWmNews(Integer id); }
实现类:
package com.heima.wemedia.service.impl; import java.util.*; import java.util.stream.Collectors; @Service @Slf4j @Transactional public class WmNewsAutoScanServiceImpl implements WmNewsAutoScanService { @Autowired private WmNewsMapper wmNewsMapper; /** * 自媒体文章审核 * * @param id 自媒体文章id */ @Override public void autoScanWmNews(Integer id) { //1.查询自媒体文章 WmNews wmNews = wmNewsMapper.selectById(id); if(wmNews == null){ throw new RuntimeException("WmNewsAutoScanServiceImpl-文章不存在"); } if(wmNews.getStatus().equals(WmNews.Status.SUBMIT.getCode())){ //从内容中提取纯文本内容和图片 MaptextAndImages = handleTextAndImages(wmNews); //2.审核文本内容 阿里云接口 boolean isTextScan = handleTextScan((String) textAndImages.get("content"),wmNews); if(!isTextScan)return; //3.审核图片 阿里云接口 boolean isImageScan = handleImageScan((List ) textAndImages.get("images"),wmNews); if(!isImageScan)return; //4.审核成功,保存app端的相关的文章数据 ResponseResult responseResult = saveAppArticle(wmNews); if(!responseResult.getCode().equals(200)){ throw new RuntimeException("WmNewsAutoScanServiceImpl-文章审核,保存app端相关文章数据失败"); } //回填article_id wmNews.setArticleId((Long) responseResult.getData()); updateWmNews(wmNews,(short) 9,"审核成功"); } } @Autowired private IArticleClient articleClient; @Autowired private WmChannelMapper wmChannelMapper; @Autowired private WmUserMapper wmUserMapper; /** * 保存app端相关的文章数据 * @param wmNews */ private ResponseResult saveAppArticle(WmNews wmNews) { ArticleDto dto = new ArticleDto(); //属性的拷贝 BeanUtils.copyProperties(wmNews,dto); //文章的布局 dto.setLayout(wmNews.getType()); //频道 WmChannel wmChannel = wmChannelMapper.selectById(wmNews.getChannelId()); if(wmChannel != null){ dto.setChannelName(wmChannel.getName()); } //作者 dto.setAuthorId(wmNews.getUserId().longValue()); WmUser wmUser = wmUserMapper.selectById(wmNews.getUserId()); if(wmUser != null){ dto.setAuthorName(wmUser.getName()); } //设置文章id if(wmNews.getArticleId() != null){ dto.setId(wmNews.getArticleId()); } dto.setCreatedTime(new Date()); ResponseResult responseResult = articleClient.saveArticle(dto); return responseResult; } @Autowired private FileStorageService fileStorageService; @Autowired private GreenImageScan greenImageScan; /** * 审核图片 * @param images * @param wmNews * @return */ private boolean handleImageScan(List images, WmNews wmNews) { boolean flag = true; if(images == null || images.size() == 0){ return flag; } //下载图片 minIO //图片去重 images = images.stream().distinct().collect(Collectors.toList()); List imageList = new ArrayList<>(); for (String image : images) { byte[] bytes = fileStorageService.downLoadFile(image); imageList.add(bytes); } //审核图片 try { Map map = greenImageScan.imageScan(imageList); if(map != null){ //审核失败 if(map.get("suggestion").equals("block")){ flag = false; updateWmNews(wmNews, (short) 2, "当前文章中存在违规内容"); } //不确定信息 需要人工审核 if(map.get("suggestion").equals("review")){ flag = false; updateWmNews(wmNews, (short) 3, "当前文章中存在不确定内容"); } } } catch (Exception e) { flag = false; e.printStackTrace(); } return flag; } @Autowired private GreenTextScan greenTextScan; /** * 审核纯文本内容 * @param content * @param wmNews * @return */ private boolean handleTextScan(String content, WmNews wmNews) { boolean flag = true; if((wmNews.getTitle()+"-"+content).length() == 0){ return flag; } try { Map map = greenTextScan.greeTextScan((wmNews.getTitle()+"-"+content)); if(map != null){ //审核失败 if(map.get("suggestion").equals("block")){ flag = false; updateWmNews(wmNews, (short) 2, "当前文章中存在违规内容"); } //不确定信息 需要人工审核 if(map.get("suggestion").equals("review")){ flag = false; updateWmNews(wmNews, (short) 3, "当前文章中存在不确定内容"); } } } catch (Exception e) { flag = false; e.printStackTrace(); } return flag; } /** * 修改文章内容 * @param wmNews * @param status * @param reason */ private void updateWmNews(WmNews wmNews, short status, String reason) { wmNews.setStatus(status); wmNews.setReason(reason); wmNewsMapper.updateById(wmNews); } /** * 1。从自媒体文章的内容中提取文本和图片 * 2.提取文章的封面图片 * @param wmNews * @return */ private Map handleTextAndImages(WmNews wmNews) { //存储纯文本内容 StringBuilder stringBuilder = new StringBuilder(); List images = new ArrayList<>(); //1。从自媒体文章的内容中提取文本和图片 if(StringUtils.isNotBlank(wmNews.getContent())){ List
4.3)单元测试
package com.heima.wemedia.service; import com.heima.wemedia.WemediaApplication; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import static org.junit.Assert.*; @SpringBootTest(classes = WemediaApplication.class) @RunWith(SpringRunner.class) public class WmNewsAutoScanServiceTest { @Autowired private WmNewsAutoScanService wmNewsAutoScanService; @Test public void autoScanWmNews() { wmNewsAutoScanService.autoScanWmNews(6238); } }
4.4)feign远程接口调用方式
在heima-leadnews-wemedia服务中已经依赖了heima-leadnews-feign-apis工程,只需要在自媒体的引导类中开启feign的远程调用即可
注解为:@EnableFeignClients(basePackages = "com.heima.apis") 需要指向apis这个包
4.5)服务降级处理
-
服务降级是服务自我保护的一种方式,或者保护下游服务的一种方式,用于确保服务不会受请求突增影响变得不可用,确保服务不会崩溃
-
服务降级虽然会导致请求失败,但是不会导致阻塞。
实现步骤:
①:在heima-leadnews-feign-api编写降级逻辑
package com.heima.apis.article.fallback; import org.springframework.stereotype.Component; /** * feign失败配置 * @author itheima */ @Component public class IArticleClientFallback implements IArticleClient { @Override public ResponseResult saveArticle(ArticleDto dto) { return ResponseResult.errorResult(AppHttpCodeEnum.SERVER_ERROR,"获取数据失败"); } }
在自媒体微服务中添加类,扫描降级代码类的包
package com.heima.wemedia.config; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; @Configuration @ComponentScan("com.heima.apis.article.fallback") public class InitConfig { }
②:远程接口中指向降级代码
package com.heima.apis.article; import org.springframework.web.bind.annotation.RequestBody; @FeignClient(value = "leadnews-article",fallback = IArticleClientFallback.class) public interface IArticleClient { @PostMapping("/api/v1/article/save") public ResponseResult saveArticle(@RequestBody ArticleDto dto); }
③:客户端开启降级heima-leadnews-wemedia
在wemedia的nacos配置中心里添加如下内容,开启服务降级,也可以指定服务响应的超时的时间
feign: # 开启feign对hystrix熔断降级的支持 hystrix: enabled: true # 修改调用超时时间 client: config: default: connectTimeout: 2000 readTimeout: 2000
④:测试
在ApArticleServiceImpl类中saveArticle方法添加代码
try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); }
在自媒体端进行审核测试,会出现服务降级的现象
5)发布文章提交审核集成
5.1)同步调用与异步调用
同步:就是在发出一个调用时,在没有得到结果之前, 该调用就不返回(实时处理)
异步:调用在发出之后,这个调用就直接返回了,没有返回结果(分时处理)
异步线程的方式审核文章
5.2)Springboot集成异步线程调用
①:在自动审核的方法上加上@Async注解(标明要异步调用)
@Override @Async //标明当前方法是一个异步方法 public void autoScanWmNews(Integer id) { //代码略 }
②:在文章发布成功后调用审核的方法
@Autowired private WmNewsAutoScanService wmNewsAutoScanService; /** * 发布修改文章或保存为草稿 * @param dto * @return */ @Override public ResponseResult submitNews(WmNewsDto dto) { //代码略 //审核文章 wmNewsAutoScanService.autoScanWmNews(wmNews.getId()); return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS); }
③:在自媒体引导类中使用@EnableAsync注解开启异步调用
@SpringBootApplication @EnableDiscoveryClient @MapperScan("com.heima.wemedia.mapper") @EnableFeignClients(basePackages = "com.heima.apis") @EnableAsync //开启异步调用 public class WemediaApplication { public static void main(String[] args) { SpringApplication.run(WemediaApplication.class,args); } @Bean public MybatisPlusInterceptor mybatisPlusInterceptor() { MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor(); interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL)); return interceptor; } }
6)文章审核功能-综合测试
6.1)服务启动列表
1,nacos服务端
2,article微服务
3,wemedia微服务
4,启动wemedia网关微服务
5,启动前端系统wemedia
6.2)测试情况列表
1,自媒体前端发布一篇正常的文章
审核成功后,app端的article相关数据是否可以正常保存,自媒体文章状态和app端文章id是否回显
2,自媒体前端发布一篇包含敏感词的文章
正常是审核失败, wm_news表中的状态是否改变,成功和失败原因正常保存
3,自媒体前端发布一篇包含敏感图片的文章
正常是审核失败, wm_news表中的状态是否改变,成功和失败原因正常保存
7)新需求-自管理敏感词
7.1)需求分析
文章审核功能已经交付了,文章也能正常发布审核。突然,产品经理过来说要开会。
会议的内容核心有以下内容:
-
文章审核不能过滤一些敏感词:
私人侦探、针孔摄象、信用卡提现、广告代理、代开发票、刻章办、出售答案、小额贷款…
需要完成的功能:
需要自己维护一套敏感词,在文章审核的时候,需要验证文章是否包含这些敏感词
7.2)敏感词-过滤
技术选型
方案
说明
数据库模糊查询
效率太低
String.indexOf("")查找
数据库量大的话也是比较慢
全文检索
分词再匹配
DFA算法
确定有穷自动机(一种数据结构)
7.3)DFA实现原理
DFA全称为:Deterministic Finite Automaton,即确定有穷自动机。
存储:一次性的把所有的敏感词存储到了多个map中,就是下图表示这种结构
敏感词:冰毒、大麻、大坏蛋
检索的过程
7.4)自管理敏感词集成到文章审核中
①:创建敏感词表,导入资料中wm_sensitive到leadnews_wemedia库中
package com.heima.model.wemedia.pojos; import java.io.Serializable; import java.util.Date; /** *
* 敏感词信息表 *
* * @author itheima */ @Data @TableName("wm_sensitive") public class WmSensitive implements Serializable { private static final long serialVersionUID = 1L; /** * 主键 */ @TableId(value = "id", type = IdType.AUTO) private Integer id; /** * 敏感词 */ @TableField("sensitives") private String sensitives; /** * 创建时间 */ @TableField("created_time") private Date createdTime; }②:拷贝对应的wm_sensitive的mapper到项目中
package com.heima.wemedia.mapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.heima.model.wemedia.pojos.WmSensitive; import org.apache.ibatis.annotations.Mapper; @Mapper public interface WmSensitiveMapper extends BaseMapper
{ } ③:在文章审核的代码中添加自管理敏感词审核
第一:在WmNewsAutoScanServiceImpl中的autoScanWmNews方法上添加如下代码
//从内容中提取纯文本内容和图片 //.....省略 //自管理的敏感词过滤 boolean isSensitive = handleSensitiveScan((String) textAndImages.get("content"), wmNews); if(!isSensitive) return; //2.审核文本内容 阿里云接口 //.....省略
测试了一下 源码不能检测 标题的敏感词汇;加了个这: wmNews.getTitle()+
//自管理的敏感词过滤
boolean isSensitive = handleSensitiveScan(
wmNews.getTitle()+textAndImages.get("content"), wmNews);
新增自管理敏感词审核代码
@Autowired private WmSensitiveMapper wmSensitiveMapper; /** * 自管理的敏感词审核 * @param content * @param wmNews * @return */ private boolean handleSensitiveScan(String content, WmNews wmNews) { boolean flag = true; //获取所有的敏感词 List
wmSensitives = wmSensitiveMapper.selectList(Wrappers. lambdaQuery().select(WmSensitive::getSensitives)); List sensitiveList = wmSensitives.stream().map(WmSensitive::getSensitives).collect(Collectors.toList()); //初始化敏感词库 SensitiveWordUtil.initMap(sensitiveList); //查看文章中是否包含敏感词 Map map = SensitiveWordUtil.matchWords(content); if(map.size() >0){ updateWmNews(wmNews,(short) 2,"当前文章中存在违规内容"+map); flag = false; } return flag; } 8)新需求-图片识别文字审核敏感词
8.1)需求分析
产品经理召集开会,文章审核功能已经交付了,文章也能正常发布审核。对于上次提出的自管理敏感词也很满意,这次会议核心的内容如下:
-
文章中包含的图片要识别文字,过滤掉图片文字的敏感词
8.2)图片文字识别
什么是OCR?
OCR (Optical Character Recognition,光学字符识别)是指电子设备(例如扫描仪或数码相机)检查纸上打印的字符,通过检测暗、亮的模式确定其形状,然后用字符识别方法将形状翻译成计算机文字的过程
方案
说明
百度OCR
收费
Tesseract-OCR
Google维护的开源OCR引擎,支持Java,Python等语言调用
Tess4J
封装了Tesseract-OCR ,支持Java调用
8 .3)Tess4j案例
①:创建项目导入tess4j对应的依赖
net.sourceforge.tess4j tess4j4.1.1 ②:导入中文字体库, 把资料中的tessdata文件夹拷贝到自己的工作空间下
③:编写测试类进行测试
package com.heima.tess4j; import net.sourceforge.tess4j.ITesseract; import net.sourceforge.tess4j.Tesseract; import java.io.File; public class Application { public static void main(String[] args) { try { //获取本地图片 File file = new File("D:\26.png"); //创建Tesseract对象 ITesseract tesseract = new Tesseract(); //设置字体库路径 tesseract.setDatapath("D:\workspace\tessdata"); //中文识别 tesseract.setLanguage("chi_sim"); //执行ocr识别 String result = tesseract.doOCR(file); //替换回车和tal键 使结果为一行 result = result.replaceAll("\r|\n","-").replaceAll(" ",""); System.out.println("识别的结果为:"+result); } catch (Exception e) { e.printStackTrace(); } } }
8.4)管理敏感词和图片文字识别集成到文章审核
①:在heima-leadnews-common中创建工具类,简单封装一下tess4j
需要先导入pom
net.sourceforge.tess4j tess4j4.1.1 工具类
package com.heima.common.tess4j; import lombok.Getter; import lombok.Setter; import net.sourceforge.tess4j.ITesseract; import net.sourceforge.tess4j.Tesseract; import net.sourceforge.tess4j.TesseractException; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; import java.awt.image.BufferedImage; @Getter @Setter @Component @ConfigurationProperties(prefix = "tess4j") public class Tess4jClient { private String dataPath; private String language; public String doOCR(BufferedImage image) throws TesseractException { //创建Tesseract对象 ITesseract tesseract = new Tesseract(); //设置字体库路径 tesseract.setDatapath(dataPath); //中文识别 tesseract.setLanguage(language); //执行ocr识别 String result = tesseract.doOCR(image); //替换回车和tal键 使结果为一行 result = result.replaceAll("\r|\n", "-").replaceAll(" ", ""); return result; } }
在spring.factories配置中添加该类,完整如下:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.heima.common.exception.ExceptionCatch,\ com.heima.common.swagger.SwaggerConfiguration,\ com.heima.common.swagger.Swagger2Configuration,\ com.heima.common.aliyun.GreenTextScan,\ com.heima.common.aliyun.GreenImageScan,\ com.heima.common.tess4j.Tess4jClient
②:在heima-leadnews-wemedia中的配置中添加两个属性
tess4j: data-path: D:\workspace\tessdata language: chi_sim
③:在WmNewsAutoScanServiceImpl中的handleImageScan方法上添加如下代码
try { for (String image : images) { byte[] bytes = fileStorageService.downLoadFile(image); //图片识别文字审核---begin----- //从byte[]转换为butteredImage ByteArrayInputStream in = new ByteArrayInputStream(bytes); BufferedImage imageFile = ImageIO.read(in); //识别图片的文字 String result = tess4jClient.doOCR(imageFile); //审核是否包含自管理的敏感词 boolean isSensitive = handleSensitiveScan(result, wmNews); if(!isSensitive){ return isSensitive; } //图片识别文字审核---end----- imageList.add(bytes); } }catch (Exception e){ e.printStackTrace(); }
最后附上文章审核的完整代码如下:
package com.heima.wemedia.service.impl; import com.alibaba.fastjson.JSONArray; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.heima.apis.article.IArticleClient; import com.heima.common.aliyun.GreenImageScan; import com.heima.common.aliyun.GreenTextScan; import com.heima.common.tess4j.Tess4jClient; import com.heima.file.service.FileStorageService; import com.heima.model.article.dtos.ArticleDto; import com.heima.model.common.dtos.ResponseResult; import com.heima.model.wemedia.pojos.WmChannel; import com.heima.model.wemedia.pojos.WmNews; import com.heima.model.wemedia.pojos.WmSensitive; import com.heima.model.wemedia.pojos.WmUser; import com.heima.utils.common.SensitiveWordUtil; import com.heima.wemedia.mapper.WmChannelMapper; import com.heima.wemedia.mapper.WmNewsMapper; import com.heima.wemedia.mapper.WmSensitiveMapper; import com.heima.wemedia.mapper.WmUserMapper; import com.heima.wemedia.service.WmNewsAutoScanService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.imageio.ImageIO; import java.awt.image.BufferedImage; import java.io.ByteArrayInputStream; import java.util.*; import java.util.stream.Collectors; @Service @Slf4j @Transactional public class WmNewsAutoScanServiceImpl implements WmNewsAutoScanService { @Autowired private WmNewsMapper wmNewsMapper; /** * 自媒体文章审核 * * @param id 自媒体文章id */ @Override @Async //标明当前方法是一个异步方法 public void autoScanWmNews(Integer id) { // int a = 1/0; //1.查询自媒体文章 WmNews wmNews = wmNewsMapper.selectById(id); if (wmNews == null) { throw new RuntimeException("WmNewsAutoScanServiceImpl-文章不存在"); } if (wmNews.getStatus().equals(WmNews.Status.SUBMIT.getCode())) { //从内容中提取纯文本内容和图片 Map
textAndImages = handleTextAndImages(wmNews); //自管理的敏感词过滤 boolean isSensitive = handleSensitiveScan((String) textAndImages.get("content"), wmNews); if(!isSensitive) return; //2.审核文本内容 阿里云接口 boolean isTextScan = handleTextScan((String) textAndImages.get("content"), wmNews); if (!isTextScan) return; //3.审核图片 阿里云接口 boolean isImageScan = handleImageScan((List ) textAndImages.get("images"), wmNews); if (!isImageScan) return; //4.审核成功,保存app端的相关的文章数据 ResponseResult responseResult = saveAppArticle(wmNews); if (!responseResult.getCode().equals(200)) { throw new RuntimeException("WmNewsAutoScanServiceImpl-文章审核,保存app端相关文章数据失败"); } //回填article_id wmNews.setArticleId((Long) responseResult.getData()); updateWmNews(wmNews, (short) 9, "审核成功"); } } @Autowired private WmSensitiveMapper wmSensitiveMapper; /** * 自管理的敏感词审核 * @param content * @param wmNews * @return */ private boolean handleSensitiveScan(String content, WmNews wmNews) { boolean flag = true; //获取所有的敏感词 List wmSensitives = wmSensitiveMapper.selectList(Wrappers. lambdaQuery().select(WmSensitive::getSensitives)); List sensitiveList = wmSensitives.stream().map(WmSensitive::getSensitives).collect(Collectors.toList()); //初始化敏感词库 SensitiveWordUtil.initMap(sensitiveList); //查看文章中是否包含敏感词 Map map = SensitiveWordUtil.matchWords(content); if(map.size() >0){ updateWmNews(wmNews,(short) 2,"当前文章中存在违规内容"+map); flag = false; } return flag; } @Autowired private IArticleClient articleClient; @Autowired private WmChannelMapper wmChannelMapper; @Autowired private WmUserMapper wmUserMapper; /** * 保存app端相关的文章数据 * * @param wmNews */ private ResponseResult saveAppArticle(WmNews wmNews) { ArticleDto dto = new ArticleDto(); //属性的拷贝 BeanUtils.copyProperties(wmNews, dto); //文章的布局 dto.setLayout(wmNews.getType()); //频道 WmChannel wmChannel = wmChannelMapper.selectById(wmNews.getChannelId()); if (wmChannel != null) { dto.setChannelName(wmChannel.getName()); } //作者 dto.setAuthorId(wmNews.getUserId().longValue()); WmUser wmUser = wmUserMapper.selectById(wmNews.getUserId()); if (wmUser != null) { dto.setAuthorName(wmUser.getName()); } //设置文章id if (wmNews.getArticleId() != null) { dto.setId(wmNews.getArticleId()); } dto.setCreatedTime(new Date()); ResponseResult responseResult = articleClient.saveArticle(dto); return responseResult; } @Autowired private FileStorageService fileStorageService; @Autowired private GreenImageScan greenImageScan; @Autowired private Tess4jClient tess4jClient; /** * 审核图片 * * @param images * @param wmNews * @return */ private boolean handleImageScan(List images, WmNews wmNews) { boolean flag = true; if (images == null || images.size() == 0) { return flag; } //下载图片 minIO //图片去重 images = images.stream().distinct().collect(Collectors.toList()); List imageList = new ArrayList<>(); try { for (String image : images) { byte[] bytes = fileStorageService.downLoadFile(image); //图片识别文字审核---begin----- //从byte[]转换为butteredImage ByteArrayInputStream in = new ByteArrayInputStream(bytes); BufferedImage imageFile = ImageIO.read(in); //识别图片的文字 String result = tess4jClient.doOCR(imageFile); //审核是否包含自管理的敏感词 boolean isSensitive = handleSensitiveScan(result, wmNews); if(!isSensitive){ return isSensitive; } //图片识别文字审核---end----- imageList.add(bytes); } }catch (Exception e){ e.printStackTrace(); } //审核图片 try { Map map = greenImageScan.imageScan(imageList); if (map != null) { //审核失败 if (map.get("suggestion").equals("block")) { flag = false; updateWmNews(wmNews, (short) 2, "当前文章中存在违规内容"); } //不确定信息 需要人工审核 if (map.get("suggestion").equals("review")) { flag = false; updateWmNews(wmNews, (short) 3, "当前文章中存在不确定内容"); } } } catch (Exception e) { flag = false; e.printStackTrace(); } return flag; } @Autowired private GreenTextScan greenTextScan; /** * 审核纯文本内容 * * @param content * @param wmNews * @return */ private boolean handleTextScan(String content, WmNews wmNews) { boolean flag = true; if ((wmNews.getTitle() + "-" + content).length() == 0) { return flag; } try { Map map = greenTextScan.greeTextScan((wmNews.getTitle() + "-" + content)); if (map != null) { //审核失败 if (map.get("suggestion").equals("block")) { flag = false; updateWmNews(wmNews, (short) 2, "当前文章中存在违规内容"); } //不确定信息 需要人工审核 if (map.get("suggestion").equals("review")) { flag = false; updateWmNews(wmNews, (short) 3, "当前文章中存在不确定内容"); } } } catch (Exception e) { flag = false; e.printStackTrace(); } return flag; } /** * 修改文章内容 * * @param wmNews * @param status * @param reason */ private void updateWmNews(WmNews wmNews, short status, String reason) { wmNews.setStatus(status); wmNews.setReason(reason); wmNewsMapper.updateById(wmNews); } /** * 1。从自媒体文章的内容中提取文本和图片 * 2.提取文章的封面图片 * * @param wmNews * @return */ private Map handleTextAndImages(WmNews wmNews) { //存储纯文本内容 StringBuilder stringBuilder = new StringBuilder(); List images = new ArrayList<>(); //1。从自媒体文章的内容中提取文本和图片 if (StringUtils.isNotBlank(wmNews.getContent())) { List 9)文章详情-静态文件生成
9.1)思路分析
文章端创建app相关文章时,生成文章详情静态页上传到MinIO中
9.2)实现步骤
1.新建ArticleFreemarkerService创建静态文件并上传到minIO中
package com.heima.article.service; import com.heima.model.article.pojos.ApArticle; public interface ArticleFreemarkerService { /** * 生成静态文件上传到minIO中 * @param apArticle * @param content */ public void buildArticleToMinIO(ApArticle apArticle,String content); }
实现
package com.heima.article.service.impl; import java.util.Map; @Service @Slf4j @Transactional public class ArticleFreemarkerServiceImpl implements ArticleFreemarkerService { @Autowired private ApArticleContentMapper apArticleContentMapper; @Autowired private Configuration configuration; @Autowired private FileStorageService fileStorageService; @Autowired private ApArticleService apArticleService; /** * 生成静态文件上传到minIO中 * @param apArticle * @param content */ @Async @Override public void buildArticleToMinIO(ApArticle apArticle, String content) { //已知文章的id //4.1 获取文章内容 if(StringUtils.isNotBlank(content)){ //4.2 文章内容通过freemarker生成html文件 Template template = null; StringWriter out = new StringWriter(); try { template = configuration.getTemplate("article.ftl"); //数据模型 Map
contentDataModel = new HashMap<>(); contentDataModel.put("content", JSONArray.parseArray(content)); //合成 template.process(contentDataModel,out); } catch (Exception e) { e.printStackTrace(); } //4.3 把html文件上传到minio中 InputStream in = new ByteArrayInputStream(out.toString().getBytes()); String path = fileStorageService.uploadHtmlFile("", apArticle.getId() + ".html", in); //4.4 修改ap_article表,保存static_url字段 apArticleService.update(Wrappers.lambdaUpdate().eq(ApArticle::getId,apArticle.getId()) .set(ApArticle::getStaticUrl,path)); } } } 2.在ApArticleService的saveArticle实现方法中添加调用生成文件的方法
/** * 保存app端相关文章 * @param dto * @return */ @Override public ResponseResult saveArticle(ArticleDto dto) { // try { // Thread.sleep(3000); // } catch (InterruptedException e) { // e.printStackTrace(); // } //1.检查参数 if(dto == null){ return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID); } ApArticle apArticle = new ApArticle(); BeanUtils.copyProperties(dto,apArticle); //2.判断是否存在id if(dto.getId() == null){ //2.1 不存在id 保存 文章 文章配置 文章内容 //保存文章 save(apArticle); //保存配置 ApArticleConfig apArticleConfig = new ApArticleConfig(apArticle.getId()); apArticleConfigMapper.insert(apArticleConfig); //保存 文章内容 ApArticleContent apArticleContent = new ApArticleContent(); apArticleContent.setArticleId(apArticle.getId()); apArticleContent.setContent(dto.getContent()); apArticleContentMapper.insert(apArticleContent); }else { //2.2 存在id 修改 文章 文章内容 //修改 文章 updateById(apArticle); //修改文章内容 ApArticleContent apArticleContent = apArticleContentMapper.selectOne(Wrappers.lambdaQuery().eq(ApArticleContent::getArticleId, dto.getId())); apArticleContent.setContent(dto.getContent()); apArticleContentMapper.updateById(apArticleContent); } //异步调用 生成静态文件上传到minio中 articleFreemarkerService.buildArticleToMinIO(apArticle,dto.getContent()); //3.结果返回 文章的id return ResponseResult.okResult(apArticle.getId()); }
3.文章微服务开启异步调用
05延迟任务精准发布文章
1)文章定时发布
2)延迟任务概述
2.1)什么是延迟任务
-
定时任务:有固定周期的,有明确的触发时间
-
延迟队列:没有固定的开始时间,它常常是由一个事件触发的,而在这个事件触发之后的一段时间内触发另一个事件,任务可以立即执行,也可以延迟
应用场景:
场景一:
订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单;如果期间下单成功,任务取消
场景二:接口对接出现网络问题,1分钟后重试,如果失败,2分钟重试,直到出现阈值终止
2.2)技术对比
2.2.1)DelayQueue
JDK自带DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素
DelayQueue属于排序队列,它的特殊之处在于队列的元素必须实现Delayed接口,该接口需要实现compareTo和getDelay方法
getDelay方法:获取元素在队列中的剩余时间,只有当剩余时间为0时元素才可以出队列。
compareTo方法:用于排序,确定元素出队列的顺序。
实现:
1:在测试包jdk下创建延迟任务元素对象DelayedTask,实现compareTo和getDelay方法,
2:在main方法中创建DelayQueue并向延迟队列中添加三个延迟任务,
3:循环的从延迟队列中拉取任务
public class DelayedTask implements Delayed{ // 任务的执行时间 private int executeTime = 0; public DelayedTask(int delay){ Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.SECOND,delay); this.executeTime = (int)(calendar.getTimeInMillis() /1000 ); } /** * 元素在队列中的剩余时间 * @param unit * @return */ @Override public long getDelay(TimeUnit unit) { Calendar calendar = Calendar.getInstance(); return executeTime - (calendar.getTimeInMillis()/1000); } /** * 元素排序 * @param o * @return */ @Override public int compareTo(Delayed o) { long val = this.getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS); return val == 0 ? 0 : ( val < 0 ? -1: 1 ); } public static void main(String[] args) { DelayQueue
queue = new DelayQueue (); queue.add(new DelayedTask(5)); queue.add(new DelayedTask(10)); queue.add(new DelayedTask(15)); System.out.println(System.currentTimeMillis()/1000+" start consume "); while(queue.size() != 0){ DelayedTask delayedTask = queue.poll(); if(delayedTask !=null ){ System.out.println(System.currentTimeMillis()/1000+" cosume task"); } //每隔一秒消费一次 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } DelayQueue实现完成之后思考一个问题:
使用线程池或者原生DelayQueue程序挂掉之后,任务都是放在内存,需要考虑未处理消息的丢失带来的影响,如何保证数据不丢失,需要持久化(磁盘)
2.2.2)RabbitMQ实现延迟任务
-
TTL:Time To Live (消息存活时间)
-
死信队列:Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以重新发送另一个交换机(死信交换机)
2.2.3)redis实现
zset数据类型的去重有序(分数排序)特点进行延迟。例如:时间戳作为score进行排序
3)redis实现延迟任务
实现思路
问题思路
1.为什么任务需要存储在数据库中?
延迟任务是一个通用的服务,任何需要延迟得任务都可以调用该服务,需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑。
2.为什么redis中使用两种数据类型,list和zset?
效率问题,算法的时间复杂度; list是双向链表
3.在添加zset数据的时候,为什么不需要预加载?
任务模块是一个通用的模块,项目中任何需要延迟队列的地方,都可以调用这个接口,要考虑到数据量的问题,如果数据量特别大,为了防止zset阻塞,只需要把未来几分钟要执行的数据存入缓存即可。
锐评:完全为了学list zset而编出来的场景,实际工作中延迟队列要设计成这样只能说太蠢了
实际工作绝对用MQ
4)延迟任务服务实现
4.1)搭建heima-leadnews-schedule模块
leadnews-schedule是一个通用的服务,单独创建模块来管理任何类型的延迟任务
①:导入资料文件夹的heima-leadnews-schedule模块到heima-leadnews-service下,如下图所示:
②:添加bootstrap.yml
server: port: 51701 spring: application: name: leadnews-schedule cloud: nacos: discovery: server-addr: 192.168.200.130:8848 config: server-addr: 192.168.200.130:8848 file-extension: yml
③:在nacos中添加对应配置,并添加数据库及mybatis-plus的配置
spring: datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://localhost:3306/leadnews_schedule?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC username: root password: root # 设置Mapper接口所对应的XML文件位置,如果你在Mapper接口中有自定义方法,需要进行该配置 mybatis-plus: mapper-locations: classpath*:mapper/*.xml # 设置别名包扫描路径,通过该属性可以给包中的类注册别名 type-aliases-package: com.heima.model.schedule.pojos
4.2)数据库准备
导入资料中leadnews_schedule数据库
taskinfo 任务表
实体类
package com.heima.model.schedule.pojos; import java.io.Serializable; import java.util.Date; /** *
* *
* * @author itheima */ @Data @TableName("taskinfo") public class Taskinfo implements Serializable { private static final long serialVersionUID = 1L; /** * 任务id */ @TableId(type = IdType.ID_WORKER) private Long taskId; /** * 执行时间 */ @TableField("execute_time") private Date executeTime; /** * 参数 */ @TableField("parameters") private byte[] parameters; /** * 优先级 */ @TableField("priority") private Integer priority; /** * 任务类型 */ @TableField("task_type") private Integer taskType; }taskinfo_logs 任务日志表
实体类
package com.heima.model.schedule.pojos; import java.io.Serializable; import java.util.Date; /** *
* *
* * @author itheima */ @Data @TableName("taskinfo_logs") public class TaskinfoLogs implements Serializable { private static final long serialVersionUID = 1L; /** * 任务id */ @TableId(type = IdType.ID_WORKER) private Long taskId; /** * 执行时间 */ @TableField("execute_time") private Date executeTime; /** * 参数 */ @TableField("parameters") private byte[] parameters; /** * 优先级 */ @TableField("priority") private Integer priority; /** * 任务类型 */ @TableField("task_type") private Integer taskType; /** * 版本号,用乐观锁 */ @Version private Integer version; /** * 状态 0=int 1=EXECUTED 2=CANCELLED */ @TableField("status") private Integer status; }乐观锁/悲观锁
悲观锁效率低;
乐观锁支持:
/** * mybatis-plus乐观锁支持 * @return */ @Bean public MybatisPlusInterceptor optimisticLockerInterceptor(){ MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor(); interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor()); return interceptor; }
4.3)安装redis
①拉取镜像
docker pull redis
② 创建容器
docker run -d --name redis --restart=always -p 6379:6379 redis --requirepass "leadnews"
③链接测试
打开资料中的Redis Desktop Manager,输入host、port、password链接测试
能链接成功,即可
4.4)项目集成redis
① 在项目导入redis相关依赖,已经完成
org.springframework.boot spring-boot-starter-data-redisorg.apache.commons commons-pool2② 在heima-leadnews-schedule中集成redis,添加以下nacos配置,链接上redis
spring: redis: host: 192.168.200.130 password: leadnews port: 6379
③ 拷贝资料文件夹下的类:CacheService到heima-leadnews-common模块下,并添加自动配置
④:测试
package com.heima.schedule.test; import java.util.Set; @SpringBootTest(classes = ScheduleApplication.class) @RunWith(SpringRunner.class) public class RedisTest { @Autowired private CacheService cacheService; @Test public void testList(){ //在list的左边添加元素 // cacheService.lLeftPush("list_001","hello,redis"); //在list的右边获取元素,并删除 String list_001 = cacheService.lRightPop("list_001"); System.out.println(list_001); } @Test public void testZset(){ //添加数据到zset中 分值 /*cacheService.zAdd("zset_key_001","hello zset 001",1000); cacheService.zAdd("zset_key_001","hello zset 002",8888); cacheService.zAdd("zset_key_001","hello zset 003",7777); cacheService.zAdd("zset_key_001","hello zset 004",999999);*/ //按照分值获取数据 Set
zset_key_001 = cacheService.zRangeByScore("zset_key_001", 0, 8888); System.out.println(zset_key_001); } } 4.5)添加任务
①:拷贝mybatis-plus生成的文件,mapper
②:创建task类,用于接收添加任务的参数
package com.heima.model.schedule.dtos; import lombok.Data; import java.io.Serializable; @Data public class Task implements Serializable { /** * 任务id */ private Long taskId; /** * 类型 */ private Integer taskType; /** * 优先级 */ private Integer priority; /** * 执行id */ private long executeTime; /** * task参数 */ private byte[] parameters; }
③:创建TaskService
package com.heima.schedule.service; import com.heima.model.schedule.dtos.Task; /** * 对外访问接口 */ public interface TaskService { /** * 添加任务 * @param task 任务对象 * @return 任务id */ public long addTask(Task task) ; }
实现:
package com.heima.schedule.service.impl; import java.util.Calendar; import java.util.Date; @Service @Transactional @Slf4j public class TaskServiceImpl implements TaskService { /** * 添加延迟任务 * * @param task * @return */ @Override public long addTask(Task task) { //1.添加任务到数据库中 boolean success = addTaskToDb(task); if (success) { //2.添加任务到redis addTaskToCache(task); } return task.getTaskId(); } @Autowired private CacheService cacheService; /** * 把任务添加到redis中 * * @param task */ private void addTaskToCache(Task task) { String key = task.getTaskType() + "_" + task.getPriority(); //获取5分钟之后的时间 毫秒值 Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.MINUTE, 5); long nextScheduleTime = calendar.getTimeInMillis(); //2.1 如果任务的执行时间小于等于当前时间,存入list if (task.getExecuteTime() <= System.currentTimeMillis()) { cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task)); } else if (task.getExecuteTime() <= nextScheduleTime) { //2.2 如果任务的执行时间大于当前时间 && 小于等于预设时间(未来5分钟) 存入zset中 cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime()); } } @Autowired private TaskinfoMapper taskinfoMapper; @Autowired private TaskinfoLogsMapper taskinfoLogsMapper; /** * 添加任务到数据库中 * * @param task * @return */ private boolean addTaskToDb(Task task) { boolean flag = false; try { //保存任务表 Taskinfo taskinfo = new Taskinfo(); BeanUtils.copyProperties(task, taskinfo); taskinfo.setExecuteTime(new Date(task.getExecuteTime())); taskinfoMapper.insert(taskinfo); //设置taskID task.setTaskId(taskinfo.getTaskId()); //保存任务日志数据 TaskinfoLogs taskinfoLogs = new TaskinfoLogs(); BeanUtils.copyProperties(taskinfo, taskinfoLogs); taskinfoLogs.setVersion(1); taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED); taskinfoLogsMapper.insert(taskinfoLogs); flag = true; } catch (Exception e) { e.printStackTrace(); } return flag; } }
ScheduleConstants常量类
package com.heima.common.constants; public class ScheduleConstants { //task状态 public static final int SCHEDULED=0; //初始化状态 public static final int EXECUTED=1; //已执行状态 public static final int CANCELLED=2; //已取消状态 public static String FUTURE="future_"; //未来数据key前缀 public static String TOPIC="topic_"; //当前数据key前缀 }
④:测试
4.6)取消任务
在TaskService中添加方法
/** * 取消任务 * @param taskId 任务id * @return 取消结果 */ public boolean cancelTask(long taskId);
实现
/** * 取消任务 * @param taskId * @return */ @Override public boolean cancelTask(long taskId) { boolean flag = false; //删除任务,更新日志 Task task = updateDb(taskId,ScheduleConstants.EXECUTED); //删除redis的数据 if(task != null){ removeTaskFromCache(task); flag = true; } return false; } /** * 删除redis中的任务数据 * @param task */ private void removeTaskFromCache(Task task) { String key = task.getTaskType()+"_"+task.getPriority(); if(task.getExecuteTime()<=System.currentTimeMillis()){ cacheService.lRemove(ScheduleConstants.TOPIC+key,0,JSON.toJSONString(task)); }else { cacheService.zRemove(ScheduleConstants.FUTURE+key, JSON.toJSONString(task)); } } /** * 删除任务,更新任务日志状态 * @param taskId * @param status * @return */ private Task updateDb(long taskId, int status) { Task task = null; try { //删除任务 taskinfoMapper.deleteById(taskId); TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId); taskinfoLogs.setStatus(status); taskinfoLogsMapper.updateById(taskinfoLogs); task = new Task(); BeanUtils.copyProperties(taskinfoLogs,task); task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime()); }catch (Exception e){ log.error("task cancel exception taskid={}",taskId); } return task; }
测试
4.7)消费任务
在TaskService中添加方法
/** * 按照类型和优先级来拉取任务 * @param type * @param priority * @return */ public Task poll(int type,int priority);
实现
/** * 按照类型和优先级拉取任务 * @return */ @Override public Task poll(int type,int priority) { Task task = null; try { String key = type+"_"+priority; String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key); if(StringUtils.isNotBlank(task_json)){ task = JSON.parseObject(task_json, Task.class); //更新数据库信息 updateDb(task.getTaskId(),ScheduleConstants.EXECUTED); } }catch (Exception e){ e.printStackTrace(); log.error("poll task exception"); } return task; }
4.8)未来数据定时刷新
4.8.1)reids key值匹配
方案1:keys 模糊匹配
keys的模糊匹配功能很方便也很强大,但是在生产环境需要慎用!开发中使用keys的模糊匹配却发现redis的CPU使用率极高,所以公司的redis生产环境将keys命令禁用了!redis是单线程,会被堵塞
方案2:scan
SCAN 命令是一个基于游标的迭代器,SCAN命令每次被调用之后, 都会向用户返回一个新的游标, 用户在下次迭代时需要使用这个新游标作为SCAN命令的游标参数, 以此来延续之前的迭代过程。
代码案例:
@Test public void testKeys(){ Set
keys = cacheService.keys("future_*"); System.out.println(keys); Set scan = cacheService.scan("future_*"); System.out.println(scan); } 4.8.2)reids管道
普通redis客户端和服务器交互模式 性能很低
Pipeline请求模型
官方测试结果数据对比
测试案例对比:
//耗时6151 @Test public void testPiple1(){ long start =System.currentTimeMillis(); for (int i = 0; i <10000 ; i++) { Task task = new Task(); task.setTaskType(1001); task.setPriority(1); task.setExecuteTime(new Date().getTime()); cacheService.lLeftPush("1001_1", JSON.toJSONString(task)); } System.out.println("耗时"+(System.currentTimeMillis()- start)); } @Test public void testPiple2(){ long start = System.currentTimeMillis(); //使用管道技术 List
4.8.3)未来数据定时刷新-功能完成
在TaskService中添加方法
@Scheduled(cron = "0 */1 * * * ?")//定时 (每分钟执行一次 //{秒数} {分钟} {小时} {日期} {月份} {星期} {年份(可为空)} public void refresh() { System.out.println(System.currentTimeMillis() / 1000 + "执行了定时任务"); // 获取所有未来数据集合的key值 Set
futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_* for (String futureKey : futureKeys) { // future_250_250 String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1]; //获取该组key下当前需要消费的任务数据 Set tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis()); if (!tasks.isEmpty()) { //将这些任务数据添加到消费者队列中 cacheService.refreshWithPipeline(futureKey, topicKey, tasks); System.out.println("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下"); } } } 在引导类中添加开启任务调度注解:@EnableScheduling
4.9)分布式锁解决集群下的方法抢占执行
4.9.1)问题描述
启动两台heima-leadnews-schedule服务,每台服务都会去执行refresh定时任务方法
4.9.2)分布式锁
分布式锁:控制分布式系统有序的去对共享资源进行操作,通过互斥来保证数据的一致性。
解决方案:
4.9.3)redis分布式锁
sexnx (SET if Not eXists)命令在指定的 key 不存在时,为 key 设置指定的值。
这种加锁的思路是,如果 key 不存在则为 key 设置 value,如果 key 已存在则 SETNX 命令不做任何操作
-
客户端A请求服务器设置key的值,如果设置成功就表示加锁成功
-
客户端B也去请求服务器设置key的值,如果返回失败,那么就代表加锁失败
-
客户端A执行代码完成,删除锁
-
客户端B在等待一段时间后再去请求设置key的值,设置成功
-
客户端B执行代码完成,删除锁
4.9.4)在工具类CacheService中添加方法
/** * 加锁 * * @param name * @param expire * @return */ public String tryLock(String name, long expire) { name = name + "_lock"; String token = UUID.randomUUID().toString(); RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory(); RedisConnection conn = factory.getConnection(); try { //参考redis命令: //set key value [EX seconds] [PX milliseconds] [NX|XX] Boolean result = conn.set( name.getBytes(), token.getBytes(), Expiration.from(expire, TimeUnit.MILLISECONDS), RedisStringCommands.SetOption.SET_IF_ABSENT //NX ); if (result != null && result) return token; } finally { RedisConnectionUtils.releaseConnection(conn, factory,false); } return null; }
修改未来数据定时刷新的方法,如下:
/** * 未来数据定时刷新 */ @Scheduled(cron = "0 */1 * * * ?") public void refresh(){ String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30); if(StringUtils.isNotBlank(token)){ log.info("未来数据定时刷新---定时任务"); //获取所有未来数据的集合key Set
futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*"); for (String futureKey : futureKeys) {//future_100_50 //获取当前数据的key topic String topicKey = ScheduleConstants.TOPIC+futureKey.split(ScheduleConstants.FUTURE)[1]; //按照key和分值查询符合条件的数据 Set tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis()); //同步数据 if(!tasks.isEmpty()){ cacheService.refreshWithPipeline(futureKey,topicKey,tasks); log.info("成功的将"+futureKey+"刷新到了"+topicKey); } } } } 4.10)数据库同步到redis
@Scheduled(cron = "0 */5 * * * ?") @PostConstruct public void reloadData() { clearCache(); log.info("数据库数据同步到缓存"); Calendar calendar = Calendar.getInstance(); calendar.add(Calendar.MINUTE, 5); //查看小于未来5分钟的所有任务 List
allTasks = taskinfoMapper.selectList(Wrappers. lambdaQuery().lt(Taskinfo::getExecuteTime,calendar.getTime())); if(allTasks != null && allTasks.size() > 0){ for (Taskinfo taskinfo : allTasks) { Task task = new Task(); BeanUtils.copyProperties(taskinfo,task); task.setExecuteTime(taskinfo.getExecuteTime().getTime()); addTaskToCache(task); } } } private void clearCache(){ // 删除缓存中未来数据集合和当前消费者队列的所有key Set futurekeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_ Set topickeys = cacheService.scan(ScheduleConstants.TOPIC + "*");// topic_ cacheService.delete(futurekeys); cacheService.delete(topickeys); } 5)延迟队列解决精准时间发布文章
5.1)延迟队列服务提供对外接口
提供远程的feign接口,在heima-leadnews-feign-api编写类如下:
package com.heima.apis.schedule; import org.springframework.web.bind.annotation.RequestBody; @FeignClient("leadnews-schedule") public interface IScheduleClient { /** * 添加任务 * @param task 任务对象 * @return 任务id */ @PostMapping("/api/v1/task/add") public ResponseResult addTask(@RequestBody Task task); /** * 取消任务 * @param taskId 任务id * @return 取消结果 */ @GetMapping("/api/v1/task/cancel/{taskId}") public ResponseResult cancelTask(@PathVariable("taskId") long taskId); /** * 按照类型和优先级来拉取任务 * @param type * @param priority * @return */ @GetMapping("/api/v1/task/poll/{type}/{priority}") public ResponseResult poll(@PathVariable("type") int type,@PathVariable("priority") int priority); }
在heima-leadnews-schedule微服务下提供对应的实现
package com.heima.schedule.feign; import org.springframework.web.bind.annotation.*; @RestController public class ScheduleClient implements IScheduleClient { @Autowired private TaskService taskService; /** * 添加任务 * @param task 任务对象 * @return 任务id */ @PostMapping("/api/v1/task/add") @Override public ResponseResult addTask(@RequestBody Task task) { return ResponseResult.okResult(taskService.addTask(task)); } /** * 取消任务 * @param taskId 任务id * @return 取消结果 */ @GetMapping("/api/v1/task/cancel/{taskId}") @Override public ResponseResult cancelTask(@PathVariable("taskId") long taskId) { return ResponseResult.okResult(taskService.cancelTask(taskId)); } /** * 按照类型和优先级来拉取任务 * @param type * @param priority * @return */ @GetMapping("/api/v1/task/poll/{type}/{priority}") @Override public ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority") int priority) { return ResponseResult.okResult(taskService.poll(type,priority)); } }
5.2)发布文章集成添加延迟队列接口
在创建WmNewsTaskService
package com.heima.wemedia.service; import com.heima.model.wemedia.pojos.WmNews; public interface WmNewsTaskService { /** * 添加任务到延迟队列中 * @param id 文章的id * @param publishTime 发布的时间 可以做为任务的执行时间 */ public void addNewsToTask(Integer id, Date publishTime); }
实现:
package com.heima.wemedia.service.impl; import org.springframework.stereotype.Service; @Service @Slf4j public class WmNewsTaskServiceImpl implements WmNewsTaskService { @Autowired private IScheduleClient scheduleClient; /** * 添加任务到延迟队列中 * @param id 文章的id * @param publishTime 发布的时间 可以做为任务的执行时间 */ @Override @Async public void addNewsToTask(Integer id, Date publishTime) { log.info("添加任务到延迟服务中----begin"); Task task = new Task(); task.setExecuteTime(publishTime.getTime()); task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType()); task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority()); WmNews wmNews = new WmNews(); wmNews.setId(id); task.setParameters(ProtostuffUtil.serialize(wmNews)); scheduleClient.addTask(task); log.info("添加任务到延迟服务中----end"); } }
枚举类:
package com.heima.model.common.enums; import lombok.AllArgsConstructor; import lombok.Getter; @Getter @AllArgsConstructor public enum TaskTypeEnum { NEWS_SCAN_TIME(1001, 1,"文章定时审核"), REMOTEERROR(1002, 2,"第三方接口调用失败,重试"); private final int taskType; //对应具体业务 private final int priority; //业务不同级别 private final String desc; //描述信息 }
序列化工具对比
-
JdkSerialize:java内置的序列化能将实现了Serilazable接口的对象进行序列化和反序列化, ObjectOutputStream的writeObject()方法可序列化对象生成字节数组
-
Protostuff:google开源的protostuff采用更为紧凑的二进制数组,表现更加优异,然后使用protostuff的编译工具生成pojo类
拷贝资料中的两个类到heima-leadnews-utils下
Protostuff需要引导依赖:
io.protostuff protostuff-core1.6.0 io.protostuff protostuff-runtime1.6.0 修改发布文章代码:
把之前的异步调用修改为调用延迟任务
@Autowired private WmNewsTaskService wmNewsTaskService; /** * 发布修改文章或保存为草稿 * @param dto * @return */ @Override public ResponseResult submitNews(WmNewsDto dto) { //0.条件判断 if(dto == null || dto.getContent() == null){ return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID); } //1.保存或修改文章 WmNews wmNews = new WmNews(); //属性拷贝 属性名词和类型相同才能拷贝 BeanUtils.copyProperties(dto,wmNews); //封面图片 list---> string if(dto.getImages() != null && dto.getImages().size() > 0){ //[1dddfsd.jpg,sdlfjldk.jpg]--> 1dddfsd.jpg,sdlfjldk.jpg String imageStr = StringUtils.join(dto.getImages(), ","); wmNews.setImages(imageStr); } //如果当前封面类型为自动 -1 if(dto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)){ wmNews.setType(null); } saveOrUpdateWmNews(wmNews); //2.判断是否为草稿 如果为草稿结束当前方法 if(dto.getStatus().equals(WmNews.Status.NORMAL.getCode())){ return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS); } //3.不是草稿,保存文章内容图片与素材的关系 //获取到文章内容中的图片信息 List
materials = ectractUrlInfo(dto.getContent()); saveRelativeInfoForContent(materials,wmNews.getId()); //4.不是草稿,保存文章封面图片与素材的关系,如果当前布局是自动,需要匹配封面图片 saveRelativeInfoForCover(dto,wmNews,materials); //审核文章 // wmNewsAutoScanService.autoScanWmNews(wmNews.getId()); wmNewsTaskService.addNewsToTask(wmNews.getId(),wmNews.getPublishTime()); return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS); } 5.3)消费任务进行审核文章
WmNewsTaskService中添加方法
/** * 消费延迟队列数据 */ public void scanNewsByTask();
实现
@Autowired private WmNewsAutoScanServiceImpl wmNewsAutoScanService; /** * 消费延迟队列数据 */ @Scheduled(fixedRate = 1000) @Override @SneakyThrows public void scanNewsByTask() { log.info("文章审核---消费任务执行---begin---"); ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority()); if(responseResult.getCode().equals(200) && responseResult.getData() != null){ String json_str = JSON.toJSONString(responseResult.getData()); Task task = JSON.parseObject(json_str, Task.class); byte[] parameters = task.getParameters(); WmNews wmNews = ProtostuffUtil.deserialize(parameters, WmNews.class); System.out.println(wmNews.getId()+"-----------"); wmNewsAutoScanService.autoScanWmNews(wmNews.getId()); } log.info("文章审核---消费任务执行---end---"); }
在WemediaApplication自媒体的引导类中添加开启任务调度注解@EnableScheduling
06kafka及异步通知文章上下架
1)自媒体文章上下架
需求分析
2)kafka概述
消息中间件对比
消息中间件对比-选择建议
消息中间件
建议
Kafka
追求高吞吐量,适合产生大量数据的互联网服务的数据收集业务
RocketMQ
可靠性要求很高的金融互联网领域,稳定性高,经历了多次阿里双11考验
RabbitMQ
性能较好,社区活跃度高,数据量没有那么大,优先选择功能比较完备的RabbitMQ
kafka介绍
Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。
kafka官网:Apache Kafka
kafka介绍-名词解释
-
producer:发布消息的对象称之为主题生产者(Kafka topic producer)
-
topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
-
consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
-
broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。
3)kafka安装配置
Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper
-
Docker安装zookeeper
下载镜像:
docker pull zookeeper:3.4.14
创建容器
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
-
Docker安装kafka
下载镜像:
docker pull wurstmeister/kafka:2.12-2.3.1
创建容器
docker run -d --name kafka \ --env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \ --env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \ --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \ --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \ --env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \ --net=host wurstmeister/kafka:2.12-2.3.1
云主机无法使用--net
4)kafka入门
-
生产者发送消息,多个消费者只能有一个消费者接收到消息
-
生产者发送消息,多个消费者都可以接收到消息
(1)创建kafka-demo项目,导入依赖
org.apache.kafka kafka-clients(2)生产者发送消息
package com.heima.kafka.sample; import java.util.Properties; /** * 生产者 */ public class ProducerQuickStart { public static void main(String[] args) { //1.kafka的配置信息 Properties properties = new Properties(); //kafka的连接地址 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092"); //发送失败,失败的重试次数 properties.put(ProducerConfig.RETRIES_CONFIG,5); //消息key的序列化器 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); //消息value的序列化器 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); //2.生产者对象 KafkaProducer
producer = new KafkaProducer (properties); //封装发送的消息 ProducerRecord record = new ProducerRecord ("itheima-topic","100001","hello kafka"); //3.发送消息 producer.send(record); //4.关闭消息通道,必须关闭,否则消息发送不成功 producer.close(); } } (3)消费者接收消息
package com.heima.kafka.sample; import java.util.Properties; /** * 消费者 */ public class ConsumerQuickStart { public static void main(String[] args) { //1.添加kafka的配置信息 Properties properties = new Properties(); //kafka的连接地址 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092"); //消费者组 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2"); //消息的反序列化器 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); //2.消费者对象 KafkaConsumer
consumer = new KafkaConsumer (properties); //3.订阅主题 consumer.subscribe(Collections.singletonList("itheima-topic")); //当前线程一直处于监听状态 while (true) { //4.获取消息 ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord consumerRecord : consumerRecords) { System.out.println(consumerRecord.key()); System.out.println(consumerRecord.value()); } } } } 总结
-
生产者发送消息,多个消费者订阅同一个主题,只能有一个消费者收到消息(一对一)同一个组
-
生产者发送消息,多个消费者订阅同一个主题,所有消费者都能收到消息(一对多)多个组
分区机制—topic剖析
5)kafka高可用设计
5.1)集群
-
Kafka 的服务器端由被称为 Broker 的服务进程构成,即一个 Kafka 集群由多个 Broker 组成
-
这样如果集群中某一台机器宕机,其他机器上的 Broker 也依然能够对外提供服务。这其实就是 Kafka 提供高可用的手段之一
5.2)备份机制(Replication)
Kafka 中消息的备份又叫做 副本(Replica)
Kafka 定义了两类副本:
-
领导者副本(Leader Replica)
-
追随者副本(Follower Replica)
备份机制—同步方式
ISR(in-sync replica)需要同步复制保存的follower
如果leader失效后,需要选出新的leader,选举的原则如下:
第一:选举时优先从ISR中选定,因为这个列表中follower的数据是与leader同步的
第二:如果ISR列表中的follower都不行了,就只能从其他follower中选取
极端情况,就是所有副本都失效了,这时有两种方案
第一:等待ISR中的一个活过来,选为Leader,数据可靠,但活过来的时间不确定
第二:选择第一个活过来的Replication,不一定是ISR中的,选为leader,以最快速度恢复可用性,但数据不一定完整
6)kafka生产者详解
6.1)发送类型
-
同步发送
使用send()方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功
RecordMetadata recordMetadata = producer.send(kvProducerRecord).get(); System.out.println(recordMetadata.offset());
-
异步发送
调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数
//异步消息发送 producer.send(kvProducerRecord, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if(e != null){ System.out.println("记录异常信息到日志表中"); } System.out.println(recordMetadata.offset()); } });
6.2)参数详解
-
ack确认机制
代码的配置方式:
//ack配置 消息确认机制 prop.put(ProducerConfig.ACKS_CONFIG,"all");
参数的选择说明
确认机制
说明
acks=0
生产者在成功写入消息之前不会等待(不需要)任何来自服务器的响应,消息有丢失的风险,但是速度最快
acks=1(默认值)
只要集群Leader节点收到消息,生产者就会收到一个来自服务器的成功响应
acks=all
只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应
-
retries 重试次数
生产者从服务器收到的错误有可能是临时性错误,在这种情况下,retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试返回错误,默认情况下,生产者会在每次重试之间等待100ms
代码中配置方式:
//重试次数 prop.put(ProducerConfig.RETRIES_CONFIG,10);
-
消息压缩
默认情况下, 消息发送时不会被压缩。
代码中配置方式:
//数据压缩 prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
压缩算法
说明
snappy
占用较少的 CPU, 却能提供较好的性能和相当可观的压缩比, 如果看重性能和网络带宽,建议采用
lz4
占用较少的 CPU, 压缩和解压缩速度较快,压缩比也很客观
gzip
占用较多的 CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法
使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。
7)kafka消费者详解
7.1)消费者组
-
消费者组(Consumer Group) :指的就是由一个或多个消费者组成的群体
-
一个发布在Topic上消息被分发给此消费者组中的一个消费者
-
所有的消费者都在一个组中,那么这就变成了queue模型 消息队列 一对一
-
所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型 一对多消费者
7.2)消息有序性
应用场景:
-
即时消息中的单对单聊天和群聊,保证发送方消息发送顺序与接收方的顺序一致
-
充值转账两个渠道在同一个时间进行余额变更,短信通知必须要有顺序
topic分区中消息只能由消费者组中的唯一一个消费者处理,所以消息肯定是按照先后顺序进行处理的。但是它也仅仅是保证Topic的一个分区顺序处理,不能保证跨分区的消息先后处理顺序。 所以,如果你想要顺序的处理Topic的所有消息,那就只提供一个分区。
7.3)提交和偏移量
kafka不会像其他JMS队列那样需要得到消费者的确认,消费者可以使用kafka来追踪消息在分区的位置(偏移量)
消费者会往一个叫做_consumer_offset的特殊主题发送消息,消息里包含了每个分区的偏移量。如果消费者发生崩溃或有新的消费者加入群组,就会触发再均衡
正常的情况
如果消费者2挂掉以后,会发生再均衡,消费者2负责的分区会被其他消费者进行消费
再均衡后不可避免会出现一些问题
问题一:
如果提交偏移量2小于客户端处理的最后一个消息10的偏移量,那么处于两个偏移量之间的消息就会被重复处理。
问题二:
如果提交的偏移量5大于客户端最后一个消息11的偏移量,那么处于两个偏移量之间的消息将会丢失。
如果想要解决这些问题,还要知道目前kafka提交偏移量的方式:
提交偏移量的方式有两种,分别是自动提交偏移量和手动提交
-
自动提交偏移量
当enable.auto.commit被设置为true,提交方式就是让消费者自动提交偏移量,每隔5秒消费者会自动把从poll()方法接收的最大偏移量提交上去
-
手动提交 ,当enable.auto.commit被设置为false可以有以下三种提交方式
-
提交当前偏移量(同步提交)
-
异步提交
-
同步和异步组合提交
1.提交当前偏移量(同步提交)
把enable.auto.commit设置为false,让应用程序决定何时提交偏移量。使用commitSync()提交偏移量,commitSync()将会提交poll返回的最新的偏移量,所以在处理完所有记录后要确保调用了commitSync()方法。否则还是会有消息丢失的风险。
只要没有发生不可恢复的错误,commitSync()方法会一直尝试直至提交成功,如果提交失败也可以记录到错误日志里。
while (true){ ConsumerRecords
records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord record : records) { System.out.println(record.value()); System.out.println(record.key()); try { consumer.commitSync();//同步提交当前最新的偏移量 }catch (CommitFailedException e){ System.out.println("记录提交失败的异常:"+e); } } } 2.异步提交
手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的API :commitAsync()
while (true){ ConsumerRecords
records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord record : records) { System.out.println(record.value()); System.out.println(record.key()); } consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map map, Exception e) { if(e!=null){ System.out.println("记录错误的提交偏移量:"+ map+",异常信息"+e); } } }); } 3.同步和异步组合提交
异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。
相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。
举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。
try { while (true){ ConsumerRecords
records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord record : records) { System.out.println(record.value()); System.out.println(record.key()); } consumer.commitAsync(); } }catch (Exception e){+ e.printStackTrace(); System.out.println("记录错误信息:"+e); }finally { try { consumer.commitSync(); }finally { consumer.close(); } } 8)springboot集成kafka
8.1)入门
1.导入spring-kafka依赖信息
org.springframework.boot spring-boot-starter-weborg.springframework.kafka spring-kafkaorg.apache.kafka kafka-clientsorg.apache.kafka kafka-clientscom.alibaba fastjson2.在resources下创建文件application.yml
server: port: 9991 spring: application: name: kafka-demo kafka: bootstrap-servers: 192.168.200.130:9092 producer: retries: 10 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: ${spring.application.name}-test key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3.消息生产者
package com.heima.kafka.controller; import org.springframework.web.bind.annotation.RestController; @RestController public class HelloController { @Autowired private KafkaTemplate
kafkaTemplate; @GetMapping("/hello") public String hello(){ kafkaTemplate.send("itcast-topic","黑马程序员"); return "ok"; } } 4.消息消费者
package com.heima.kafka.listener; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @Component public class HelloListener { @KafkaListener(topics = "itcast-topic") public void onMessage(String message){ if(!StringUtils.isEmpty(message)){ System.out.println(message); } } }
8.2)传递消息为对象
目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式
方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,本章节不介绍
方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,本项目采用这种方式
-
发送消息
@GetMapping("/hello") public String hello(){ User user = new User(); user.setUsername("xiaowang"); user.setAge(18); kafkaTemplate.send("user-topic", JSON.toJSONString(user)); return "ok"; }
-
接收消息
package com.heima.kafka.listener; import org.springframework.util.StringUtils; @Component public class HelloListener { @KafkaListener(topics = "user-topic") public void onMessage(String message){ if(!StringUtils.isEmpty(message)){ User user = JSON.parseObject(message, User.class); System.out.println(user); } } }
9)自媒体文章上下架功能完成
9.1)需求分析
-
已发表且已上架的文章可以下架
-
已发表且已下架的文章可以上架
9.2)流程说明
9.3)接口定义
说明
接口路径
/api/v1/news/down_or_up
请求方式
POST
参数
DTO
响应结果
ResponseResult
DTO
@Data public class WmNewsDto { private Integer id; /** * 是否上架 0 下架 1 上架 */ private Short enable; }
ResponseResult
9.4)自媒体文章上下架-功能实现
9.4.1)接口定义
在heima-leadnews-wemedia工程下的WmNewsController新增方法
@PostMapping("/down_or_up") public ResponseResult downOrUp(@RequestBody WmNewsDto dto){ return null; }
在WmNewsDto中新增enable属性 ,完整的代码如下:
package com.heima.model.wemedia.dtos; import lombok.Data; import java.util.Date; import java.util.List; @Data public class WmNewsDto { private Integer id; /** * 标题 */ private String title; /** * 频道id */ private Integer channelId; /** * 标签 */ private String labels; /** * 发布时间 */ private Date publishTime; /** * 文章内容 */ private String content; /** * 文章封面类型 0 无图 1 单图 3 多图 -1 自动 */ private Short type; /** * 提交时间 */ private Date submitedTime; /** * 状态 提交为1 草稿为0 */ private Short status; /** * 封面图片列表 多张图以逗号隔开 */ private List
images; /** * 上下架 0 下架 1 上架 */ private Short enable; } 9.4.2)业务层编写
在WmNewsService新增方法
/** * 文章的上下架 * @param dto * @return */ public ResponseResult downOrUp(WmNewsDto dto);
实现方法
/** * 文章的上下架 * @param dto * @return */ @Override public ResponseResult downOrUp(WmNewsDto dto) { //1.检查参数 if(dto.getId() == null){ return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID); } //2.查询文章 WmNews wmNews = getById(dto.getId()); if(wmNews == null){ return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST,"文章不存在"); } //3.判断文章是否已发布 if(!wmNews.getStatus().equals(WmNews.Status.PUBLISHED.getCode())){ return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID,"当前文章不是发布状态,不能上下架"); } //4.修改文章enable if(dto.getEnable() != null && dto.getEnable() > -1 && dto.getEnable() < 2){ update(Wrappers.
lambdaUpdate().set(WmNews::getEnable,dto.getEnable()) .eq(WmNews::getId,wmNews.getId())); } return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS); } 9.4.3)控制器
@PostMapping("/down_or_up") public ResponseResult downOrUp(@RequestBody WmNewsDto dto){ return wmNewsService.downOrUp(dto); }
9.4.4)测试
9.5)消息通知article端文章上下架
9.5.1)在heima-leadnews-common模块下导入kafka依赖
org.springframework.kafka spring-kafkaorg.apache.kafka kafka-clients9.5.2)在自媒体端的nacos配置中心配置kafka的生产者
spring: kafka: bootstrap-servers: 192.168.200.130:9092 producer: retries: 10 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
9.5.3)在自媒体端文章上下架后发送消息
//发送消息,通知article端修改文章配置 if(wmNews.getArticleId() != null){ Map
map = new HashMap<>(); map.put("articleId",wmNews.getArticleId()); map.put("enable",dto.getEnable()); kafkaTemplate.send(WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC,JSON.toJSONString(map)); } 常量类:
public class WmNewsMessageConstants { public static final String WM_NEWS_UP_OR_DOWN_TOPIC="wm.news.up.or.down.topic"; }
9.5.4)在article端的nacos配置中心配置kafka的消费者
spring: kafka: bootstrap-servers: 192.168.200.130:9092 consumer: group-id: ${spring.application.name} key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
9.5.5)在article端编写监听,接收数据
package com.heima.article.listener; import com.alibaba.fastjson.JSON; import com.heima.article.service.ApArticleConfigService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import java.util.Map; @Component @Slf4j public class ArtilceIsDownListener { @Autowired private ApArticleConfigService apArticleConfigService; @KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC) public void onMessage(String message){ if(StringUtils.isNotBlank(message)){ Map map = JSON.parseObject(message, Map.class); apArticleConfigService.updateByMap(map); log.info("article端文章配置修改,articleId={}",map.get("articleId")); } } }
9.5.6)修改ap_article_config表的数据
新建ApArticleConfigService
package com.heima.article.service; import com.baomidou.mybatisplus.extension.service.IService; import com.heima.model.article.pojos.ApArticleConfig; import java.util.Map; public interface ApArticleConfigService extends IService { /** * 修改文章配置 * @param map */ public void updateByMap(Map map); }
实现类:
package com.heima.article.service.impl; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.heima.article.mapper.ApArticleConfigMapper; import com.heima.article.service.ApArticleConfigService; import com.heima.model.article.pojos.ApArticleConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.Map; @Service @Slf4j @Transactional public class ApArticleConfigServiceImpl extends ServiceImpl implements ApArticleConfigService { /** * 修改文章配置 * @param map */ @Override public void updateByMap(Map map) { //0 下架 1 上架 Object enable = map.get("enable"); boolean isDown = true; if(enable.equals(1)){ isDown = false; } //修改文章配置 update(Wrappers.lambdaUpdate().eq(ApArticleConfig::getArticleId,map.get("articleId")).set(ApArticleConfig::getIsDown,isDown)); } }
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章