上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka

guduadmin31天前

目录

《黑马头条》SpringBoot+SpringCloud+ Nacos等企业级微服务架构项目_黑马头条项目_软工菜鸡的博客-CSDN博客

04自媒体文章-自动审核

1)自媒体文章自动审核流程

2)内容安全第三方接口

2.1)概述

2.2)准备工作

2.3)文本内容审核接口

2.4)图片审核接口

2.5)项目集成

3)app端文章保存接口

3.1)表结构说明

3.2)分布式id

分布式id-技术选型

3.3)思路分析

3.4)feign接口

4)自媒体文章自动审核功能实现

4.1)表结构说明

4.2)实现

4.3)单元测试

4.4)feign远程接口调用方式

4.5)服务降级处理

5)发布文章提交审核集成

5.1)同步调用与异步调用

5.2)Springboot集成异步线程调用

6)文章审核功能-综合测试

6.1)服务启动列表

6.2)测试情况列表

7)新需求-自管理敏感词

7.1)需求分析

7.2)敏感词-过滤

7.3)DFA实现原理

7.4)自管理敏感词集成到文章审核中

测试了一下 源码不能检测 标题的敏感词汇;加了个这: wmNews.getTitle()+

8)新需求-图片识别文字审核敏感词

8.1)需求分析

8.2)图片文字识别

8 .3)Tess4j案例

8.4)管理敏感词和图片文字识别集成到文章审核

9)文章详情-静态文件生成

9.1)思路分析

9.2)实现步骤

05延迟任务精准发布文章

1)文章定时发布

2)延迟任务概述

2.1)什么是延迟任务

2.2)技术对比

2.2.1)DelayQueue

2.2.2)RabbitMQ实现延迟任务

2.2.3)redis实现

3)redis实现延迟任务

锐评:完全为了学list zset而编出来的场景,实际工作中延迟队列要设计成这样只能说太蠢了

4)延迟任务服务实现

4.1)搭建heima-leadnews-schedule模块

4.2)数据库准备

乐观锁/悲观锁

4.3)安装redis

4.4)项目集成redis

4.5)添加任务

4.6)取消任务

4.7)消费任务

4.8)未来数据定时刷新

4.8.1)reids key值匹配

4.8.2)reids管道

4.8.3)未来数据定时刷新-功能完成

4.9)分布式锁解决集群下的方法抢占执行

4.9.1)问题描述

4.9.2)分布式锁

4.9.3)redis分布式锁

4.9.4)在工具类CacheService中添加方法

4.10)数据库同步到redis

5)延迟队列解决精准时间发布文章

5.1)延迟队列服务提供对外接口

5.2)发布文章集成添加延迟队列接口

序列化工具对比

5.3)消费任务进行审核文章

06kafka及异步通知文章上下架

1)自媒体文章上下架

2)kafka概述

消息中间件对比-选择建议

kafka介绍-名词解释

3)kafka安装配置

4)kafka入门

分区机制—topic剖析

5)kafka高可用设计

5.1)集群

5.2)备份机制(Replication)

6)kafka生产者详解

6.1)发送类型

6.2)参数详解

ack确认机制

retries 重试次数

消息压缩

7)kafka消费者详解

7.1)消费者组

7.2)消息有序性

7.3)提交和偏移量

1.提交当前偏移量(同步提交)

2.异步提交

3.同步和异步组合提交

8)springboot集成kafka

8.1)入门

8.2)传递消息为对象

9)自媒体文章上下架功能完成

9.1)需求分析

9.2)流程说明

9.3)接口定义

9.4)自媒体文章上下架-功能实现

9.5)消息通知article端文章上下架



04自媒体文章-自动审核

《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第1张

1)自媒体文章自动审核流程

《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第2张
1 自媒体端发布文章后,开始审核文章

2 审核的主要是审核文章的 内容(文本内容和图片)

3 借助 第三方提供的接口审核文本

4 借助第三方提供的接口审核图片,由于图片存储到minIO中,需要先下载才能审核

5 如果审核失败,则需要修改自媒体文章的状态,status:2 审核失败 status:3 转到人工审核

6 如果审核成功,则需要在文章微服务中创建app端需要的文章

2)内容安全第三方接口

2.1)概述

内容安全是识别服务,支持对图片、视频、文本、语音等对象多样化场景检测,有效降低内容违规风险

目前很多平台都支持内容检测,如阿里云、腾讯云、百度AI、网易云等国内大型互联网公司都对外提供了API。

按照性能和收费来看,黑马头条项目使用的就是阿里云的内容安全接口,使用到了图片和文本的审核。

阿里云收费标准:https://www.aliyun.com/price/product/?spm=a2c4g.11186623.2.10.4146401eg5oeu8#/lvwang/detail

2.2)准备工作

您在使用内容检测API之前,需要先注册阿里云账号,添加Access Key并签约云盾内容安全。

操作步骤

  1. 前往阿里云官网注册账号。如果已有注册账号,请跳过此步骤。

进入阿里云首页后,如果没有阿里云的账户需要先进行注册,才可以进行登录。由于注册较为简单,课程和讲义不在进行体现(注册可以使用多种方式,如淘宝账号、支付宝账号、微博账号等...)。

需要实名认证和活体认证。

  1. 打开云盾内容安全产品试用页面,单击立即开通,正式开通服务。

《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第3张

内容安全控制台

《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第4张
  1. 在AccessKey管理页面管理您的AccessKeyID和AccessKeySecret。

《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第5张

管理自己的AccessKey,可以新建和删除AccessKey

《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第6张

查看自己的AccessKey,

AccessKey默认是隐藏的,第一次申请的时候可以保存AccessKey,点击显示,通过验证手机号后也可以查看

《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第7张
2.3)文本内容审核接口

文本垃圾内容检测:如何调用文本检测接口进行文本内容审核_内容安全-阿里云帮助中心

《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第8张

文本垃圾内容Java SDK: 如何使用JavaSDK文本反垃圾接口_内容安全-阿里云帮助中心

2.4)图片审核接口

图片垃圾内容检测:调用图片同步检测接口/green/image/scan审核图片内容_内容安全-阿里云帮助中心

《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第9张

图片垃圾内容Java SDK: 如何使用JavaSDK接口检测图片是否包含风险内容_内容安全-阿里云帮助中心

2.5)项目集成

①:拷贝资料文件夹中的类到common模块下面,并添加到自动配置

包括了GreenImageScan和GreenTextScan及对应的工具类

《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第10张

添加到自动配置中

《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第11张

②: accessKeyId和secret(需自己申请)

在heima-leadnews-wemedia中的nacos配置中心添加以下配置:

aliyun:
 accessKeyId: ...
 secret: ...
#aliyun.scenes=porn,terrorism,ad,qrcode,live,logo
 scenes: terrorism

③:在自媒体微服务中测试类中注入审核文本和图片的bean进行测试

package com.heima.wemedia;
import java.util.Arrays;
import java.util.Map;
@SpringBootTest(classes = WemediaApplication.class)
@RunWith(SpringRunner.class)
public class AliyunTest {
    @Autowired
    private GreenTextScan greenTextScan;
    @Autowired
    private GreenImageScan greenImageScan;
    @Autowired
    private FileStorageService fileStorageService;
    @Test
    public void testScanText() throws Exception {
        Map map = greenTextScan.greeTextScan("我是一个好人,冰毒");
        System.out.println(map);
    }
    @Test
    public void testScanImage() throws Exception {
        byte[] bytes = fileStorageService.downLoadFile("http://192.168.200.130:9000/leadnews/2021/04/26/ef3cbe458db249f7bd6fb4339e593e55.jpg");
        Map map = greenImageScan.imageScan(Arrays.asList(bytes));
        System.out.println(map);
    }
}

我用的是 阿里云 云安全 增强版1小时,没审核出效果为null;估计是阿里 改接口了;

《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第12张

图片审核页报错

java.lang.RuntimeException: upload file fail.
    at com.heima.common.aliyun.util.ClientUploader.uploadBytes(ClientUploader.java:129)
    at com.heima.common.aliyun.GreenImageScan.imageScan(GreenImageScan.java:71)
    at com.heima.wemedia.test.AliyunTest.testScanImage(AliyunTest.java:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.springframework.test.context.junit4.statements.RunBeforeTestExecutionCallbacks.evaluate(RunBeforeTestExecutionCallbacks.java:74)
    at org.springframework.test.context.junit4.statements.RunAfterTestExecutionC

3)app端文章保存接口

3.1)表结构说明
《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第13张
3.2)分布式id

随着业务的增长,文章表可能要占用很大的物理存储空间,为了解决该问题,后期使用数据库分片技术。将一个数据库进行拆分,通过数据库中间件连接。如果数据库中该表选用ID自增策略,则可能产生重复的ID,此时应该使用分布式ID生成策略来生成ID。

《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第14张
分布式id-技术选型
《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第15张

snowflake是Twitter开源的分布式ID生成算法,结果是一个long型的ID。

其核心思想是:使用41bit作为毫秒数,10bit作为机器的ID(5个bit是数据中心,5个bit的机器ID)(最多32个机房*32台机器(也可以自己设)),12bit作为毫秒内的流水号(意味着每个节点在每毫秒可以产生 4096 个 ID),最后还有一个符号位,永远是0(1为负数)

《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第16张

文章端相关的表都使用雪花算法生成id,包括ap_article、 ap_article_config、 ap_article_content

mybatis-plus已经集成了雪花算法,完成以下两步即可在项目中集成雪花算法

第一:在实体类中的id上加入如下配置,指定类型为id_worker

@TableId(value = "id",type = IdType.ID_WORKER)
private Long id;

第二:在application.yml文件中配置数据中心id和机器id

mybatis-plus:
  mapper-locations: classpath*:mapper/*.xml
  # 设置别名包扫描路径,通过该属性可以给包中的类注册别名
  type-aliases-package: com.heima.model.article.pojos
  global-config:
    datacenter-id: 1
    workerId: 1

datacenter-id:数据中心id(取值范围:0-31) ;workerId:机器id(取值范围:0-31)

3.3)思路分析

在文章审核成功以后需要在app的article库中新增文章数据

1.保存文章信息 ap_article

2.保存文章配置信息 ap_article_config

3.保存文章内容 ap_article_content

实现思路:

《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第17张
3.4)feign接口
《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第18张

ArticleDto

package com.heima.model.article.dtos;
import com.heima.model.article.pojos.ApArticle;
import lombok.Data;
@Data
public class ArticleDto  extends ApArticle {
    /**
     * 文章内容
     */
    private String content;
}

功能实现:

①:在heima-leadnews-feign-api中新增接口

第一:线导入feign的依赖


    org.springframework.cloud
    spring-cloud-starter-openfeign

第二:定义文章端的接口

package com.heima.apis.article;
import org.springframework.web.bind.annotation.RequestBody;
@FeignClient(value = "leadnews-article")
public interface IArticleClient {
    @PostMapping("/api/v1/article/save")
    public ResponseResult saveArticle(@RequestBody ArticleDto dto) ;
}

②:在heima-leadnews-article中实现该方法

package com.heima.article.feign;
import java.io.IOException;
@RestController
public class ArticleClient implements IArticleClient {
    @Autowired
    private ApArticleService apArticleService;
    @Override
    @PostMapping("/api/v1/article/save")
    public ResponseResult saveArticle(@RequestBody ArticleDto dto) {
        return apArticleService.saveArticle(dto);
    }
}

③:拷贝mapper

在资料文件夹中拷贝ApArticleConfigMapper类到mapper文件夹中

同时,修改ApArticleConfig类,添加如下构造函数

package com.heima.model.article.pojos;
import java.io.Serializable;
/**
 * 

* APP已发布文章配置表 *

* * @author itheima */ @Data @NoArgsConstructor @TableName("ap_article_config") public class ApArticleConfig implements Serializable { public ApArticleConfig(Long articleId){ this.articleId = articleId; this.isComment = true; this.isForward = true; this.isDelete = false; this.isDown = false; } @TableId(value = "id",type = IdType.ID_WORKER) private Long id; /** * 文章id */ @TableField("article_id") private Long articleId; /** * 是否可评论 * true: 可以评论 1 * false: 不可评论 0 */ @TableField("is_comment") private Boolean isComment; /** * 是否转发 * true: 可以转发 1 * false: 不可转发 0 */ @TableField("is_forward") private Boolean isForward; /** * 是否下架 * true: 下架 1 * false: 没有下架 0 */ @TableField("is_down") private Boolean isDown; /** * 是否已删除 * true: 删除 1 * false: 没有删除 0 */ @TableField("is_delete") private Boolean isDelete; }

④:在ApArticleService中新增方法

/**
     * 保存app端相关文章
     * @param dto
     * @return
     */
ResponseResult saveArticle(ArticleDto dto) ;

实现类:

@Autowired
private ApArticleConfigMapper apArticleConfigMapper;
@Autowired
private ApArticleContentMapper apArticleContentMapper;
/**
     * 保存app端相关文章
     * @param dto
     * @return
     */
@Override
public ResponseResult saveArticle(ArticleDto dto) {
    //1.检查参数
    if(dto == null){
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    }
    ApArticle apArticle = new ApArticle();
    BeanUtils.copyProperties(dto,apArticle);
    //2.判断是否存在id
    if(dto.getId() == null){
        //2.1 不存在id  保存  文章  文章配置  文章内容
        //保存文章
        save(apArticle);
        //保存配置
        ApArticleConfig apArticleConfig = new ApArticleConfig(apArticle.getId());
        apArticleConfigMapper.insert(apArticleConfig);
        //保存 文章内容
        ApArticleContent apArticleContent = new ApArticleContent();
        apArticleContent.setArticleId(apArticle.getId());
        apArticleContent.setContent(dto.getContent());
        apArticleContentMapper.insert(apArticleContent);
    }else {
        //2.2 存在id   修改  文章  文章内容
        //修改  文章
        updateById(apArticle);
        //修改文章内容
        ApArticleContent apArticleContent = apArticleContentMapper.selectOne(Wrappers.lambdaQuery().eq(ApArticleContent::getArticleId, dto.getId()));
        apArticleContent.setContent(dto.getContent());
        apArticleContentMapper.updateById(apArticleContent);
    }
    //3.结果返回  文章的id
    return ResponseResult.okResult(apArticle.getId());
}

⑤:测试

编写junit单元测试,或使用postman进行测试

http://localhost:51802/api/v1/article/save

{
	"id":这个id要去数据库自己找 ,
    "title":"黑马头条项目背景22222222222222",
    "authoId":1102,
    "layout":1,
    "labels":"黑马头条",
    "publishTime":"2028-03-14T11:35:49.000Z",
    "images": "http://192.168.200.130:9000/leadnews/2021/04/26/5ddbdb5c68094ce393b08a47860da275.jpg",
    "content":"22222222222222222黑马头条项目背景,黑马头条项目背景,黑马头条项目背景,黑马头条项目背景,黑马头条项目背景"
}

4)自媒体文章自动审核功能实现

4.1)表结构说明

wm_news 自媒体文章表

《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第19张

status字段:0 草稿 1 待审核 2 审核失败 3 人工审核 4 人工审核通过 8 审核通过(待发布) 9 已发布

4.2)实现
《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第20张

在heima-leadnews-wemedia中的service新增接口

package com.heima.wemedia.service;
public interface WmNewsAutoScanService {
    /**
     * 自媒体文章审核
     * @param id  自媒体文章id
     */
    public void autoScanWmNews(Integer id);
}

实现类:

package com.heima.wemedia.service.impl;
import java.util.*;
import java.util.stream.Collectors;
@Service
@Slf4j
@Transactional
public class WmNewsAutoScanServiceImpl implements WmNewsAutoScanService {
    @Autowired
    private WmNewsMapper wmNewsMapper;
    /**
     * 自媒体文章审核
     *
     * @param id 自媒体文章id
     */
    @Override
    public void autoScanWmNews(Integer id) {
        //1.查询自媒体文章
        WmNews wmNews = wmNewsMapper.selectById(id);
        if(wmNews == null){
            throw new RuntimeException("WmNewsAutoScanServiceImpl-文章不存在");
        }
        if(wmNews.getStatus().equals(WmNews.Status.SUBMIT.getCode())){
            //从内容中提取纯文本内容和图片
            Map textAndImages = handleTextAndImages(wmNews);
            //2.审核文本内容  阿里云接口
            boolean isTextScan = handleTextScan((String) textAndImages.get("content"),wmNews);
            if(!isTextScan)return;
            //3.审核图片  阿里云接口
            boolean isImageScan =  handleImageScan((List) textAndImages.get("images"),wmNews);
            if(!isImageScan)return;
            //4.审核成功,保存app端的相关的文章数据
            ResponseResult responseResult = saveAppArticle(wmNews);
            if(!responseResult.getCode().equals(200)){
                throw new RuntimeException("WmNewsAutoScanServiceImpl-文章审核,保存app端相关文章数据失败");
            }
            //回填article_id
            wmNews.setArticleId((Long) responseResult.getData());
            updateWmNews(wmNews,(short) 9,"审核成功");
        }
    }
    @Autowired
    private IArticleClient articleClient;
    @Autowired
    private WmChannelMapper wmChannelMapper;
    @Autowired
    private WmUserMapper wmUserMapper;
    /**
     * 保存app端相关的文章数据
     * @param wmNews
     */
    private ResponseResult saveAppArticle(WmNews wmNews) {
        ArticleDto dto = new ArticleDto();
        //属性的拷贝
        BeanUtils.copyProperties(wmNews,dto);
        //文章的布局
        dto.setLayout(wmNews.getType());
        //频道
        WmChannel wmChannel = wmChannelMapper.selectById(wmNews.getChannelId());
        if(wmChannel != null){
            dto.setChannelName(wmChannel.getName());
        }
        //作者
        dto.setAuthorId(wmNews.getUserId().longValue());
        WmUser wmUser = wmUserMapper.selectById(wmNews.getUserId());
        if(wmUser != null){
            dto.setAuthorName(wmUser.getName());
        }
        //设置文章id
        if(wmNews.getArticleId() != null){
            dto.setId(wmNews.getArticleId());
        }
        dto.setCreatedTime(new Date());
        ResponseResult responseResult = articleClient.saveArticle(dto);
        return responseResult;
    }
    @Autowired
    private FileStorageService fileStorageService;
    @Autowired
    private GreenImageScan greenImageScan;
    /**
     * 审核图片
     * @param images
     * @param wmNews
     * @return
     */
    private boolean handleImageScan(List images, WmNews wmNews) {
        boolean flag = true;
        if(images == null || images.size() == 0){
            return flag;
        }
        //下载图片 minIO
        //图片去重
        images = images.stream().distinct().collect(Collectors.toList());
        List imageList = new ArrayList<>();
        for (String image : images) {
            byte[] bytes = fileStorageService.downLoadFile(image);
            imageList.add(bytes);
        }
        //审核图片
        try {
            Map map = greenImageScan.imageScan(imageList);
            if(map != null){
                //审核失败
                if(map.get("suggestion").equals("block")){
                    flag = false;
                    updateWmNews(wmNews, (short) 2, "当前文章中存在违规内容");
                }
                //不确定信息  需要人工审核
                if(map.get("suggestion").equals("review")){
                    flag = false;
                    updateWmNews(wmNews, (short) 3, "当前文章中存在不确定内容");
                }
            }
        } catch (Exception e) {
            flag = false;
            e.printStackTrace();
        }
        return flag;
    }
    @Autowired
    private GreenTextScan greenTextScan;
    /**
     * 审核纯文本内容
     * @param content
     * @param wmNews
     * @return
     */
    private boolean handleTextScan(String content, WmNews wmNews) {
        boolean flag = true;
        if((wmNews.getTitle()+"-"+content).length() == 0){
            return flag;
        }
        try {
            Map map = greenTextScan.greeTextScan((wmNews.getTitle()+"-"+content));
            if(map != null){
                //审核失败
                if(map.get("suggestion").equals("block")){
                    flag = false;
                    updateWmNews(wmNews, (short) 2, "当前文章中存在违规内容");
                }
                //不确定信息  需要人工审核
                if(map.get("suggestion").equals("review")){
                    flag = false;
                    updateWmNews(wmNews, (short) 3, "当前文章中存在不确定内容");
                }
            }
        } catch (Exception e) {
            flag = false;
            e.printStackTrace();
        }
        return flag;
    }
    /**
     * 修改文章内容
     * @param wmNews
     * @param status
     * @param reason
     */
    private void updateWmNews(WmNews wmNews, short status, String reason) {
        wmNews.setStatus(status);
        wmNews.setReason(reason);
        wmNewsMapper.updateById(wmNews);
    }
    /**
     * 1。从自媒体文章的内容中提取文本和图片
     * 2.提取文章的封面图片
     * @param wmNews
     * @return
     */
    private Map handleTextAndImages(WmNews wmNews) {
        //存储纯文本内容
        StringBuilder stringBuilder = new StringBuilder();
        List images = new ArrayList<>();
        //1。从自媒体文章的内容中提取文本和图片
        if(StringUtils.isNotBlank(wmNews.getContent())){
            List maps = JSONArray.parseArray(wmNews.getContent(), Map.class);
            for (Map map : maps) {
                if (map.get("type").equals("text")){
                    stringBuilder.append(map.get("value"));
                }
                if (map.get("type").equals("image")){
                    images.add((String) map.get("value"));
                }
            }
        }
        //2.提取文章的封面图片
        if(StringUtils.isNotBlank(wmNews.getImages())){
            String[] split = wmNews.getImages().split(",");
            images.addAll(Arrays.asList(split));
        }
        Map resultMap = new HashMap<>();
        resultMap.put("content",stringBuilder.toString());
        resultMap.put("images",images);
        return resultMap;
    }
}
4.3)单元测试
package com.heima.wemedia.service;
import com.heima.wemedia.WemediaApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import static org.junit.Assert.*;
@SpringBootTest(classes = WemediaApplication.class)
@RunWith(SpringRunner.class)
public class WmNewsAutoScanServiceTest {
    @Autowired
    private WmNewsAutoScanService wmNewsAutoScanService;
    @Test
    public void autoScanWmNews() {
        wmNewsAutoScanService.autoScanWmNews(6238);
    }
}
4.4)feign远程接口调用方式
《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第21张

在heima-leadnews-wemedia服务中已经依赖了heima-leadnews-feign-apis工程,只需要在自媒体的引导类中开启feign的远程调用即可

注解为:@EnableFeignClients(basePackages = "com.heima.apis") 需要指向apis这个包

《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第22张
4.5)服务降级处理
《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第23张
  • 服务降级是服务自我保护的一种方式,或者保护下游服务的一种方式,用于确保服务不会受请求突增影响变得不可用,确保服务不会崩溃

    • 服务降级虽然会导致请求失败,但是不会导致阻塞。

      实现步骤:

      ①:在heima-leadnews-feign-api编写降级逻辑

      package com.heima.apis.article.fallback;
      import org.springframework.stereotype.Component;
      /**
       * feign失败配置
       * @author itheima
       */
      @Component
      public class IArticleClientFallback implements IArticleClient {
          @Override
          public ResponseResult saveArticle(ArticleDto dto)  {
              return ResponseResult.errorResult(AppHttpCodeEnum.SERVER_ERROR,"获取数据失败");
          }
      }

      在自媒体微服务中添加类,扫描降级代码类的包

      package com.heima.wemedia.config;
      import org.springframework.context.annotation.ComponentScan;
      import org.springframework.context.annotation.Configuration;
      @Configuration
      @ComponentScan("com.heima.apis.article.fallback")
      public class InitConfig {
      }

      ②:远程接口中指向降级代码

      package com.heima.apis.article;
      import org.springframework.web.bind.annotation.RequestBody;
      @FeignClient(value = "leadnews-article",fallback = IArticleClientFallback.class)
      public interface IArticleClient {
          @PostMapping("/api/v1/article/save")
          public ResponseResult saveArticle(@RequestBody ArticleDto dto);
      }

      ③:客户端开启降级heima-leadnews-wemedia

      在wemedia的nacos配置中心里添加如下内容,开启服务降级,也可以指定服务响应的超时的时间

      feign:
        # 开启feign对hystrix熔断降级的支持
        hystrix:
          enabled: true
        # 修改调用超时时间
        client:
          config:
            default:
              connectTimeout: 2000
              readTimeout: 2000

      ④:测试

      在ApArticleServiceImpl类中saveArticle方法添加代码

      try {
          Thread.sleep(3000);
      } catch (InterruptedException e) {
          e.printStackTrace();
      }

      在自媒体端进行审核测试,会出现服务降级的现象

      5)发布文章提交审核集成

      5.1)同步调用与异步调用

      同步:就是在发出一个调用时,在没有得到结果之前, 该调用就不返回(实时处理)

      异步:调用在发出之后,这个调用就直接返回了,没有返回结果(分时处理)

      《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第24张

      异步线程的方式审核文章

      5.2)Springboot集成异步线程调用

      ①:在自动审核的方法上加上@Async注解(标明要异步调用)

      @Override
      @Async  //标明当前方法是一个异步方法
      public void autoScanWmNews(Integer id) {
      	//代码略
      }

      ②:在文章发布成功后调用审核的方法

      @Autowired
      private WmNewsAutoScanService wmNewsAutoScanService;
      /**
       * 发布修改文章或保存为草稿
       * @param dto
       * @return
       */
      @Override
      public ResponseResult submitNews(WmNewsDto dto) {
          //代码略
          //审核文章
          wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
          return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
      }

      ③:在自媒体引导类中使用@EnableAsync注解开启异步调用

      @SpringBootApplication
      @EnableDiscoveryClient
      @MapperScan("com.heima.wemedia.mapper")
      @EnableFeignClients(basePackages = "com.heima.apis")
      @EnableAsync  //开启异步调用
      public class WemediaApplication {
          public static void main(String[] args) {
              SpringApplication.run(WemediaApplication.class,args);
          }
          @Bean
          public MybatisPlusInterceptor mybatisPlusInterceptor() {
              MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
              interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
              return interceptor;
          }
      }

      6)文章审核功能-综合测试

      6.1)服务启动列表

      1,nacos服务端

      2,article微服务

      3,wemedia微服务

      4,启动wemedia网关微服务

      5,启动前端系统wemedia

      6.2)测试情况列表

      1,自媒体前端发布一篇正常的文章

      审核成功后,app端的article相关数据是否可以正常保存,自媒体文章状态和app端文章id是否回显

      2,自媒体前端发布一篇包含敏感词的文章

      正常是审核失败, wm_news表中的状态是否改变,成功和失败原因正常保存

      3,自媒体前端发布一篇包含敏感图片的文章

      正常是审核失败, wm_news表中的状态是否改变,成功和失败原因正常保存

      7)新需求-自管理敏感词

      7.1)需求分析

      文章审核功能已经交付了,文章也能正常发布审核。突然,产品经理过来说要开会。

      会议的内容核心有以下内容:

      • 文章审核不能过滤一些敏感词:

        私人侦探、针孔摄象、信用卡提现、广告代理、代开发票、刻章办、出售答案、小额贷款…

        需要完成的功能:

        需要自己维护一套敏感词,在文章审核的时候,需要验证文章是否包含这些敏感词

        7.2)敏感词-过滤

        技术选型

        方案

        说明

        数据库模糊查询

        效率太低

        String.indexOf("")查找

        数据库量大的话也是比较慢

        全文检索

        分词再匹配

        DFA算法

        确定有穷自动机(一种数据结构)

        7.3)DFA实现原理

        DFA全称为:Deterministic Finite Automaton,即确定有穷自动机。

        存储:一次性的把所有的敏感词存储到了多个map中,就是下图表示这种结构

        敏感词:冰毒、大麻、大坏蛋

        《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第25张

        检索的过程

        《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第26张
        7.4)自管理敏感词集成到文章审核中

        ①:创建敏感词表,导入资料中wm_sensitive到leadnews_wemedia库中

        《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第27张
        package com.heima.model.wemedia.pojos;
        import java.io.Serializable;
        import java.util.Date;
        /**
         * 

        * 敏感词信息表 *

        * * @author itheima */ @Data @TableName("wm_sensitive") public class WmSensitive implements Serializable { private static final long serialVersionUID = 1L; /** * 主键 */ @TableId(value = "id", type = IdType.AUTO) private Integer id; /** * 敏感词 */ @TableField("sensitives") private String sensitives; /** * 创建时间 */ @TableField("created_time") private Date createdTime; }

        ②:拷贝对应的wm_sensitive的mapper到项目中

        package com.heima.wemedia.mapper;
        import com.baomidou.mybatisplus.core.mapper.BaseMapper;
        import com.heima.model.wemedia.pojos.WmSensitive;
        import org.apache.ibatis.annotations.Mapper;
        @Mapper
        public interface WmSensitiveMapper extends BaseMapper {
        }

        ③:在文章审核的代码中添加自管理敏感词审核

        第一:在WmNewsAutoScanServiceImpl中的autoScanWmNews方法上添加如下代码

        //从内容中提取纯文本内容和图片
        //.....省略
        //自管理的敏感词过滤
        boolean isSensitive = handleSensitiveScan((String) textAndImages.get("content"), wmNews);
        if(!isSensitive) return;
        //2.审核文本内容  阿里云接口
        //.....省略
        测试了一下 源码不能检测 标题的敏感词汇;加了个这: wmNews.getTitle()+

        //自管理的敏感词过滤

        boolean isSensitive = handleSensitiveScan(

        wmNews.getTitle()+textAndImages.get("content"), wmNews);

        新增自管理敏感词审核代码

        @Autowired
        private WmSensitiveMapper wmSensitiveMapper;
        /**
             * 自管理的敏感词审核
             * @param content
             * @param wmNews
             * @return
             */
        private boolean handleSensitiveScan(String content, WmNews wmNews) {
            boolean flag = true;
            //获取所有的敏感词
            List wmSensitives = wmSensitiveMapper.selectList(Wrappers.lambdaQuery().select(WmSensitive::getSensitives));
            List sensitiveList = wmSensitives.stream().map(WmSensitive::getSensitives).collect(Collectors.toList());
            //初始化敏感词库
            SensitiveWordUtil.initMap(sensitiveList);
            //查看文章中是否包含敏感词
            Map map = SensitiveWordUtil.matchWords(content);
            if(map.size() >0){
                updateWmNews(wmNews,(short) 2,"当前文章中存在违规内容"+map);
                flag = false;
            }
            return flag;
        }

        8)新需求-图片识别文字审核敏感词

        8.1)需求分析

        产品经理召集开会,文章审核功能已经交付了,文章也能正常发布审核。对于上次提出的自管理敏感词也很满意,这次会议核心的内容如下:

        • 文章中包含的图片要识别文字,过滤掉图片文字的敏感词

          《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第28张
          8.2)图片文字识别

          什么是OCR?

          OCR (Optical Character Recognition,光学字符识别)是指电子设备(例如扫描仪或数码相机)检查纸上打印的字符,通过检测暗、亮的模式确定其形状,然后用字符识别方法将形状翻译成计算机文字的过程

          方案

          说明

          百度OCR

          收费

          Tesseract-OCR

          Google维护的开源OCR引擎,支持Java,Python等语言调用

          Tess4J

          封装了Tesseract-OCR ,支持Java调用

          《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第29张
          8 .3)Tess4j案例

          ①:创建项目导入tess4j对应的依赖

          
              net.sourceforge.tess4j
              tess4j
              4.1.1
          

          ②:导入中文字体库, 把资料中的tessdata文件夹拷贝到自己的工作空间下

          《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第30张

          ③:编写测试类进行测试

          package com.heima.tess4j;
          import net.sourceforge.tess4j.ITesseract;
          import net.sourceforge.tess4j.Tesseract;
          import java.io.File;
          public class Application {
              public static void main(String[] args) {
                  try {
                      //获取本地图片
                      File file = new File("D:\26.png");
                      //创建Tesseract对象
                      ITesseract tesseract = new Tesseract();
                      //设置字体库路径
                      tesseract.setDatapath("D:\workspace\tessdata");
                      //中文识别
                      tesseract.setLanguage("chi_sim");
                      //执行ocr识别
                      String result = tesseract.doOCR(file);
                      //替换回车和tal键  使结果为一行
                      result = result.replaceAll("\r|\n","-").replaceAll(" ","");
                      System.out.println("识别的结果为:"+result);
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
              }
          }
          8.4)管理敏感词和图片文字识别集成到文章审核

          ①:在heima-leadnews-common中创建工具类,简单封装一下tess4j

          需要先导入pom

          
              net.sourceforge.tess4j
              tess4j
              4.1.1
          

          工具类

          package com.heima.common.tess4j;
          import lombok.Getter;
          import lombok.Setter;
          import net.sourceforge.tess4j.ITesseract;
          import net.sourceforge.tess4j.Tesseract;
          import net.sourceforge.tess4j.TesseractException;
          import org.springframework.boot.context.properties.ConfigurationProperties;
          import org.springframework.stereotype.Component;
          import java.awt.image.BufferedImage;
          @Getter
          @Setter
          @Component
          @ConfigurationProperties(prefix = "tess4j")
          public class Tess4jClient {
              private String dataPath;
              private String language;
              public String doOCR(BufferedImage image) throws TesseractException {
                  //创建Tesseract对象
                  ITesseract tesseract = new Tesseract();
                  //设置字体库路径
                  tesseract.setDatapath(dataPath);
                  //中文识别
                  tesseract.setLanguage(language);
                  //执行ocr识别
                  String result = tesseract.doOCR(image);
                  //替换回车和tal键  使结果为一行
                  result = result.replaceAll("\r|\n", "-").replaceAll(" ", "");
                  return result;
              }
          }

          在spring.factories配置中添加该类,完整如下:

          org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
            com.heima.common.exception.ExceptionCatch,\
            com.heima.common.swagger.SwaggerConfiguration,\
            com.heima.common.swagger.Swagger2Configuration,\
            com.heima.common.aliyun.GreenTextScan,\
            com.heima.common.aliyun.GreenImageScan,\
            com.heima.common.tess4j.Tess4jClient

          ②:在heima-leadnews-wemedia中的配置中添加两个属性

          tess4j:
            data-path: D:\workspace\tessdata
            language: chi_sim

          ③:在WmNewsAutoScanServiceImpl中的handleImageScan方法上添加如下代码

          try {
              for (String image : images) {
                  byte[] bytes = fileStorageService.downLoadFile(image);
                  //图片识别文字审核---begin-----
                  //从byte[]转换为butteredImage
                  ByteArrayInputStream in = new ByteArrayInputStream(bytes);
                  BufferedImage imageFile = ImageIO.read(in);
                  //识别图片的文字
                  String result = tess4jClient.doOCR(imageFile);
                  //审核是否包含自管理的敏感词
                  boolean isSensitive = handleSensitiveScan(result, wmNews);
                  if(!isSensitive){
                      return isSensitive;
                  }
                  //图片识别文字审核---end-----
                  imageList.add(bytes);
              } 
          }catch (Exception e){
              e.printStackTrace();
          }

          最后附上文章审核的完整代码如下:

          package com.heima.wemedia.service.impl;
          import com.alibaba.fastjson.JSONArray;
          import com.baomidou.mybatisplus.core.toolkit.Wrappers;
          import com.heima.apis.article.IArticleClient;
          import com.heima.common.aliyun.GreenImageScan;
          import com.heima.common.aliyun.GreenTextScan;
          import com.heima.common.tess4j.Tess4jClient;
          import com.heima.file.service.FileStorageService;
          import com.heima.model.article.dtos.ArticleDto;
          import com.heima.model.common.dtos.ResponseResult;
          import com.heima.model.wemedia.pojos.WmChannel;
          import com.heima.model.wemedia.pojos.WmNews;
          import com.heima.model.wemedia.pojos.WmSensitive;
          import com.heima.model.wemedia.pojos.WmUser;
          import com.heima.utils.common.SensitiveWordUtil;
          import com.heima.wemedia.mapper.WmChannelMapper;
          import com.heima.wemedia.mapper.WmNewsMapper;
          import com.heima.wemedia.mapper.WmSensitiveMapper;
          import com.heima.wemedia.mapper.WmUserMapper;
          import com.heima.wemedia.service.WmNewsAutoScanService;
          import lombok.extern.slf4j.Slf4j;
          import org.apache.commons.lang3.StringUtils;
          import org.springframework.beans.BeanUtils;
          import org.springframework.beans.factory.annotation.Autowired;
          import org.springframework.scheduling.annotation.Async;
          import org.springframework.stereotype.Service;
          import org.springframework.transaction.annotation.Transactional;
          import javax.imageio.ImageIO;
          import java.awt.image.BufferedImage;
          import java.io.ByteArrayInputStream;
          import java.util.*;
          import java.util.stream.Collectors;
          @Service
          @Slf4j
          @Transactional
          public class WmNewsAutoScanServiceImpl implements WmNewsAutoScanService {
              @Autowired
              private WmNewsMapper wmNewsMapper;
              /**
               * 自媒体文章审核
               *
               * @param id 自媒体文章id
               */
              @Override
              @Async  //标明当前方法是一个异步方法
              public void autoScanWmNews(Integer id) {
          //        int a = 1/0;
                  //1.查询自媒体文章
                  WmNews wmNews = wmNewsMapper.selectById(id);
                  if (wmNews == null) {
                      throw new RuntimeException("WmNewsAutoScanServiceImpl-文章不存在");
                  }
                  if (wmNews.getStatus().equals(WmNews.Status.SUBMIT.getCode())) {
                      //从内容中提取纯文本内容和图片
                      Map textAndImages = handleTextAndImages(wmNews);
                      //自管理的敏感词过滤
                      boolean isSensitive = handleSensitiveScan((String) textAndImages.get("content"), wmNews);
                      if(!isSensitive) return;
                      //2.审核文本内容  阿里云接口
                      boolean isTextScan = handleTextScan((String) textAndImages.get("content"), wmNews);
                      if (!isTextScan) return;
                      //3.审核图片  阿里云接口
                      boolean isImageScan = handleImageScan((List) textAndImages.get("images"), wmNews);
                      if (!isImageScan) return;
                      //4.审核成功,保存app端的相关的文章数据
                      ResponseResult responseResult = saveAppArticle(wmNews);
                      if (!responseResult.getCode().equals(200)) {
                          throw new RuntimeException("WmNewsAutoScanServiceImpl-文章审核,保存app端相关文章数据失败");
                      }
                      //回填article_id
                      wmNews.setArticleId((Long) responseResult.getData());
                      updateWmNews(wmNews, (short) 9, "审核成功");
                  }
              }
              @Autowired
              private WmSensitiveMapper wmSensitiveMapper;
              /**
               * 自管理的敏感词审核
               * @param content
               * @param wmNews
               * @return
               */
              private boolean handleSensitiveScan(String content, WmNews wmNews) {
                  boolean flag = true;
                  //获取所有的敏感词
                  List wmSensitives = wmSensitiveMapper.selectList(Wrappers.lambdaQuery().select(WmSensitive::getSensitives));
                  List sensitiveList = wmSensitives.stream().map(WmSensitive::getSensitives).collect(Collectors.toList());
                  //初始化敏感词库
                  SensitiveWordUtil.initMap(sensitiveList);
                  //查看文章中是否包含敏感词
                  Map map = SensitiveWordUtil.matchWords(content);
                  if(map.size() >0){
                      updateWmNews(wmNews,(short) 2,"当前文章中存在违规内容"+map);
                      flag = false;
                  }
                  return flag;
              }
              @Autowired
              private IArticleClient articleClient;
              @Autowired
              private WmChannelMapper wmChannelMapper;
              @Autowired
              private WmUserMapper wmUserMapper;
              /**
               * 保存app端相关的文章数据
               *
               * @param wmNews
               */
              private ResponseResult saveAppArticle(WmNews wmNews) {
                  ArticleDto dto = new ArticleDto();
                  //属性的拷贝
                  BeanUtils.copyProperties(wmNews, dto);
                  //文章的布局
                  dto.setLayout(wmNews.getType());
                  //频道
                  WmChannel wmChannel = wmChannelMapper.selectById(wmNews.getChannelId());
                  if (wmChannel != null) {
                      dto.setChannelName(wmChannel.getName());
                  }
                  //作者
                  dto.setAuthorId(wmNews.getUserId().longValue());
                  WmUser wmUser = wmUserMapper.selectById(wmNews.getUserId());
                  if (wmUser != null) {
                      dto.setAuthorName(wmUser.getName());
                  }
                  //设置文章id
                  if (wmNews.getArticleId() != null) {
                      dto.setId(wmNews.getArticleId());
                  }
                  dto.setCreatedTime(new Date());
                  ResponseResult responseResult = articleClient.saveArticle(dto);
                  return responseResult;
              }
              @Autowired
              private FileStorageService fileStorageService;
              @Autowired
              private GreenImageScan greenImageScan;
              @Autowired
              private Tess4jClient tess4jClient;
              /**
               * 审核图片
               *
               * @param images
               * @param wmNews
               * @return
               */
              private boolean handleImageScan(List images, WmNews wmNews) {
                  boolean flag = true;
                  if (images == null || images.size() == 0) {
                      return flag;
                  }
                  //下载图片 minIO
                  //图片去重
                  images = images.stream().distinct().collect(Collectors.toList());
                  List imageList = new ArrayList<>();
                  try {
                      for (String image : images) {
                          byte[] bytes = fileStorageService.downLoadFile(image);
                          //图片识别文字审核---begin-----
                          //从byte[]转换为butteredImage
                          ByteArrayInputStream in = new ByteArrayInputStream(bytes);
                          BufferedImage imageFile = ImageIO.read(in);
                          //识别图片的文字
                          String result = tess4jClient.doOCR(imageFile);
                          //审核是否包含自管理的敏感词
                          boolean isSensitive = handleSensitiveScan(result, wmNews);
                          if(!isSensitive){
                              return isSensitive;
                          }
                          //图片识别文字审核---end-----
                          imageList.add(bytes);
                      }
                  }catch (Exception e){
                      e.printStackTrace();
                  }
                  //审核图片
                  try {
                      Map map = greenImageScan.imageScan(imageList);
                      if (map != null) {
                          //审核失败
                          if (map.get("suggestion").equals("block")) {
                              flag = false;
                              updateWmNews(wmNews, (short) 2, "当前文章中存在违规内容");
                          }
                          //不确定信息  需要人工审核
                          if (map.get("suggestion").equals("review")) {
                              flag = false;
                              updateWmNews(wmNews, (short) 3, "当前文章中存在不确定内容");
                          }
                      }
                  } catch (Exception e) {
                      flag = false;
                      e.printStackTrace();
                  }
                  return flag;
              }
              @Autowired
              private GreenTextScan greenTextScan;
              /**
               * 审核纯文本内容
               *
               * @param content
               * @param wmNews
               * @return
               */
              private boolean handleTextScan(String content, WmNews wmNews) {
                  boolean flag = true;
                  if ((wmNews.getTitle() + "-" + content).length() == 0) {
                      return flag;
                  }
                  try {
                      Map map = greenTextScan.greeTextScan((wmNews.getTitle() + "-" + content));
                      if (map != null) {
                          //审核失败
                          if (map.get("suggestion").equals("block")) {
                              flag = false;
                              updateWmNews(wmNews, (short) 2, "当前文章中存在违规内容");
                          }
                          //不确定信息  需要人工审核
                          if (map.get("suggestion").equals("review")) {
                              flag = false;
                              updateWmNews(wmNews, (short) 3, "当前文章中存在不确定内容");
                          }
                      }
                  } catch (Exception e) {
                      flag = false;
                      e.printStackTrace();
                  }
                  return flag;
              }
              /**
               * 修改文章内容
               *
               * @param wmNews
               * @param status
               * @param reason
               */
              private void updateWmNews(WmNews wmNews, short status, String reason) {
                  wmNews.setStatus(status);
                  wmNews.setReason(reason);
                  wmNewsMapper.updateById(wmNews);
              }
              /**
               * 1。从自媒体文章的内容中提取文本和图片
               * 2.提取文章的封面图片
               *
               * @param wmNews
               * @return
               */
              private Map handleTextAndImages(WmNews wmNews) {
                  //存储纯文本内容
                  StringBuilder stringBuilder = new StringBuilder();
                  List images = new ArrayList<>();
                  //1。从自媒体文章的内容中提取文本和图片
                  if (StringUtils.isNotBlank(wmNews.getContent())) {
                      List maps = JSONArray.parseArray(wmNews.getContent(), Map.class);
                      for (Map map : maps) {
                          if (map.get("type").equals("text")) {
                              stringBuilder.append(map.get("value"));
                          }
                          if (map.get("type").equals("image")) {
                              images.add((String) map.get("value"));
                          }
                      }
                  }
                  //2.提取文章的封面图片
                  if (StringUtils.isNotBlank(wmNews.getImages())) {
                      String[] split = wmNews.getImages().split(",");
                      images.addAll(Arrays.asList(split));
                  }
                  Map resultMap = new HashMap<>();
                  resultMap.put("content", stringBuilder.toString());
                  resultMap.put("images", images);
                  return resultMap;
              }
          }

          9)文章详情-静态文件生成

          9.1)思路分析

          文章端创建app相关文章时,生成文章详情静态页上传到MinIO中

          《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第31张
          9.2)实现步骤

          1.新建ArticleFreemarkerService创建静态文件并上传到minIO中

          package com.heima.article.service;
          import com.heima.model.article.pojos.ApArticle;
          public interface ArticleFreemarkerService {
              /**
               * 生成静态文件上传到minIO中
               * @param apArticle
               * @param content
               */
              public void buildArticleToMinIO(ApArticle apArticle,String content);
          }

          实现

          package com.heima.article.service.impl;
          import java.util.Map;
          @Service
          @Slf4j
          @Transactional
          public class ArticleFreemarkerServiceImpl implements ArticleFreemarkerService {
              @Autowired
              private ApArticleContentMapper apArticleContentMapper;
              @Autowired
              private Configuration configuration;
              @Autowired
              private FileStorageService fileStorageService;
              @Autowired
              private ApArticleService apArticleService;
              /**
               * 生成静态文件上传到minIO中
               * @param apArticle
               * @param content
               */
              @Async
              @Override
              public void buildArticleToMinIO(ApArticle apArticle, String content) {
                  //已知文章的id
                  //4.1 获取文章内容
                  if(StringUtils.isNotBlank(content)){
                      //4.2 文章内容通过freemarker生成html文件
                      Template template = null;
                      StringWriter out = new StringWriter();
                      try {
                          template = configuration.getTemplate("article.ftl");
                          //数据模型
                          Map contentDataModel = new HashMap<>();
                          contentDataModel.put("content", JSONArray.parseArray(content));
                          //合成
                          template.process(contentDataModel,out);
                      } catch (Exception e) {
                          e.printStackTrace();
                      }
                      //4.3 把html文件上传到minio中
                      InputStream in = new ByteArrayInputStream(out.toString().getBytes());
                      String path = fileStorageService.uploadHtmlFile("", apArticle.getId() + ".html", in);
                      //4.4 修改ap_article表,保存static_url字段
                      apArticleService.update(Wrappers.lambdaUpdate().eq(ApArticle::getId,apArticle.getId())
                              .set(ApArticle::getStaticUrl,path));
                  }
              }
          }

          2.在ApArticleService的saveArticle实现方法中添加调用生成文件的方法

          /**
               * 保存app端相关文章
               * @param dto
               * @return
               */
          @Override
          public ResponseResult saveArticle(ArticleDto dto) {
              //        try {
              //            Thread.sleep(3000);
              //        } catch (InterruptedException e) {
              //            e.printStackTrace();
              //        }
              //1.检查参数
              if(dto == null){
                  return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
              }
              ApArticle apArticle = new ApArticle();
              BeanUtils.copyProperties(dto,apArticle);
              //2.判断是否存在id
              if(dto.getId() == null){
                  //2.1 不存在id  保存  文章  文章配置  文章内容
                  //保存文章
                  save(apArticle);
                  //保存配置
                  ApArticleConfig apArticleConfig = new ApArticleConfig(apArticle.getId());
                  apArticleConfigMapper.insert(apArticleConfig);
                  //保存 文章内容
                  ApArticleContent apArticleContent = new ApArticleContent();
                  apArticleContent.setArticleId(apArticle.getId());
                  apArticleContent.setContent(dto.getContent());
                  apArticleContentMapper.insert(apArticleContent);
              }else {
                  //2.2 存在id   修改  文章  文章内容
                  //修改  文章
                  updateById(apArticle);
                  //修改文章内容
                  ApArticleContent apArticleContent = apArticleContentMapper.selectOne(Wrappers.lambdaQuery().eq(ApArticleContent::getArticleId, dto.getId()));
                  apArticleContent.setContent(dto.getContent());
                  apArticleContentMapper.updateById(apArticleContent);
              }
              //异步调用 生成静态文件上传到minio中
              articleFreemarkerService.buildArticleToMinIO(apArticle,dto.getContent());
              //3.结果返回  文章的id
              return ResponseResult.okResult(apArticle.getId());
          }

          3.文章微服务开启异步调用

          《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第32张

          05延迟任务精准发布文章

          《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第33张

          1)文章定时发布

          2)延迟任务概述

          2.1)什么是延迟任务
          • 定时任务:有固定周期的,有明确的触发时间

            • 延迟队列:没有固定的开始时间,它常常是由一个事件触发的,而在这个事件触发之后的一段时间内触发另一个事件,任务可以立即执行,也可以延迟

              《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第34张

              应用场景:

              场景一:

              订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单;如果期间下单成功,任务取消

              场景二:接口对接出现网络问题,1分钟后重试,如果失败,2分钟重试,直到出现阈值终止

              2.2)技术对比
              2.2.1)DelayQueue

              JDK自带DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素

              《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第35张

              DelayQueue属于排序队列,它的特殊之处在于队列的元素必须实现Delayed接口,该接口需要实现compareTo和getDelay方法

              getDelay方法:获取元素在队列中的剩余时间,只有当剩余时间为0时元素才可以出队列。

              compareTo方法:用于排序,确定元素出队列的顺序。

              实现:

              1:在测试包jdk下创建延迟任务元素对象DelayedTask,实现compareTo和getDelay方法,

              2:在main方法中创建DelayQueue并向延迟队列中添加三个延迟任务,

              3:循环的从延迟队列中拉取任务

              public class DelayedTask  implements Delayed{
                  
                  // 任务的执行时间
                  private int executeTime = 0;
                  
                  public DelayedTask(int delay){
                      Calendar calendar = Calendar.getInstance();
                      calendar.add(Calendar.SECOND,delay);
                      this.executeTime = (int)(calendar.getTimeInMillis() /1000 );
                  }
                  /**
                   * 元素在队列中的剩余时间
                   * @param unit
                   * @return
                   */
                  @Override
                  public long getDelay(TimeUnit unit) {
                      Calendar calendar = Calendar.getInstance();
                      return executeTime - (calendar.getTimeInMillis()/1000);
                  }
                  /**
                   * 元素排序
                   * @param o
                   * @return
                   */
                  @Override
                  public int compareTo(Delayed o) {
                      long val = this.getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
                      return val == 0 ? 0 : ( val < 0 ? -1: 1 );
                  }
                  public static void main(String[] args) {
                      DelayQueue queue = new DelayQueue();
                      
                      queue.add(new DelayedTask(5));
                      queue.add(new DelayedTask(10));
                      queue.add(new DelayedTask(15));
                      System.out.println(System.currentTimeMillis()/1000+" start consume ");
                      while(queue.size() != 0){
                          DelayedTask delayedTask = queue.poll();
                          if(delayedTask !=null ){
                              System.out.println(System.currentTimeMillis()/1000+" cosume task");
                          }
                          //每隔一秒消费一次
                          try {
                              Thread.sleep(1000);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                      }     
                  }
              }

              DelayQueue实现完成之后思考一个问题:

              使用线程池或者原生DelayQueue程序挂掉之后,任务都是放在内存,需要考虑未处理消息的丢失带来的影响,如何保证数据不丢失,需要持久化(磁盘)

              2.2.2)RabbitMQ实现延迟任务
              • TTL:Time To Live (消息存活时间)

                • 死信队列:Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以重新发送另一个交换机(死信交换机)

                  《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第36张
                  2.2.3)redis实现

                  zset数据类型的去重有序(分数排序)特点进行延迟。例如:时间戳作为score进行排序

                  《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第37张 《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第38张

                  3)redis实现延迟任务

                  实现思路

                  《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第39张

                  问题思路

                  1.为什么任务需要存储在数据库中?

                  延迟任务是一个通用的服务,任何需要延迟得任务都可以调用该服务,需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑。

                  2.为什么redis中使用两种数据类型,list和zset?

                  效率问题,算法的时间复杂度; list是双向链表

                  《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第40张

                  3.在添加zset数据的时候,为什么不需要预加载?

                  任务模块是一个通用的模块,项目中任何需要延迟队列的地方,都可以调用这个接口,要考虑到数据量的问题,如果数据量特别大,为了防止zset阻塞,只需要把未来几分钟要执行的数据存入缓存即可。

                  锐评:完全为了学list zset而编出来的场景,实际工作中延迟队列要设计成这样只能说太蠢了

                  实际工作绝对用MQ

                  4)延迟任务服务实现

                  4.1)搭建heima-leadnews-schedule模块

                  leadnews-schedule是一个通用的服务,单独创建模块来管理任何类型的延迟任务

                  ①:导入资料文件夹的heima-leadnews-schedule模块到heima-leadnews-service下,如下图所示:

                  《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第41张

                  ②:添加bootstrap.yml

                  server:
                    port: 51701
                  spring:
                    application:
                      name: leadnews-schedule
                    cloud:
                      nacos:
                        discovery:
                          server-addr: 192.168.200.130:8848
                        config:
                          server-addr: 192.168.200.130:8848
                          file-extension: yml

                  ③:在nacos中添加对应配置,并添加数据库及mybatis-plus的配置

                  spring:
                    datasource:
                      driver-class-name: com.mysql.jdbc.Driver
                      url: jdbc:mysql://localhost:3306/leadnews_schedule?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
                      username: root
                      password: root
                  # 设置Mapper接口所对应的XML文件位置,如果你在Mapper接口中有自定义方法,需要进行该配置
                  mybatis-plus:
                    mapper-locations: classpath*:mapper/*.xml
                    # 设置别名包扫描路径,通过该属性可以给包中的类注册别名
                    type-aliases-package: com.heima.model.schedule.pojos
                  4.2)数据库准备

                  导入资料中leadnews_schedule数据库

                  taskinfo 任务表

                  《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第42张

                  实体类

                  package com.heima.model.schedule.pojos;
                  import java.io.Serializable;
                  import java.util.Date;
                  /**
                   * 

                  * *

                  * * @author itheima */ @Data @TableName("taskinfo") public class Taskinfo implements Serializable { private static final long serialVersionUID = 1L; /** * 任务id */ @TableId(type = IdType.ID_WORKER) private Long taskId; /** * 执行时间 */ @TableField("execute_time") private Date executeTime; /** * 参数 */ @TableField("parameters") private byte[] parameters; /** * 优先级 */ @TableField("priority") private Integer priority; /** * 任务类型 */ @TableField("task_type") private Integer taskType; }

                  taskinfo_logs 任务日志表

                  《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第43张 《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第44张

                  实体类

                  package com.heima.model.schedule.pojos;
                  import java.io.Serializable;
                  import java.util.Date;
                  /**
                   * 

                  * *

                  * * @author itheima */ @Data @TableName("taskinfo_logs") public class TaskinfoLogs implements Serializable { private static final long serialVersionUID = 1L; /** * 任务id */ @TableId(type = IdType.ID_WORKER) private Long taskId; /** * 执行时间 */ @TableField("execute_time") private Date executeTime; /** * 参数 */ @TableField("parameters") private byte[] parameters; /** * 优先级 */ @TableField("priority") private Integer priority; /** * 任务类型 */ @TableField("task_type") private Integer taskType; /** * 版本号,用乐观锁 */ @Version private Integer version; /** * 状态 0=int 1=EXECUTED 2=CANCELLED */ @TableField("status") private Integer status; }
                  乐观锁/悲观锁
                  《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第45张

                  悲观锁效率低;

                  《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第46张

                  乐观锁支持:

                  /**
                       * mybatis-plus乐观锁支持
                       * @return
                       */
                  @Bean
                  public MybatisPlusInterceptor optimisticLockerInterceptor(){
                      MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
                      interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());
                      return interceptor;
                  }
                  4.3)安装redis

                  ①拉取镜像

                  docker pull redis

                  ② 创建容器

                  docker run -d --name redis --restart=always -p 6379:6379 redis --requirepass "leadnews"

                  ③链接测试

                  打开资料中的Redis Desktop Manager,输入host、port、password链接测试

                  《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第47张

                  能链接成功,即可

                  4.4)项目集成redis

                  ① 在项目导入redis相关依赖,已经完成

                  
                      org.springframework.boot
                      spring-boot-starter-data-redis
                  
                  
                  
                      org.apache.commons
                      commons-pool2
                  

                  ② 在heima-leadnews-schedule中集成redis,添加以下nacos配置,链接上redis

                  spring:
                    redis:
                      host: 192.168.200.130
                      password: leadnews
                      port: 6379

                  ③ 拷贝资料文件夹下的类:CacheService到heima-leadnews-common模块下,并添加自动配置

                  《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第48张

                  ④:测试

                  package com.heima.schedule.test;
                  import java.util.Set;
                  @SpringBootTest(classes = ScheduleApplication.class)
                  @RunWith(SpringRunner.class)
                  public class RedisTest {
                      @Autowired
                      private CacheService cacheService;
                      @Test
                      public void testList(){
                          //在list的左边添加元素
                  //        cacheService.lLeftPush("list_001","hello,redis");
                          //在list的右边获取元素,并删除
                          String list_001 = cacheService.lRightPop("list_001");
                          System.out.println(list_001);
                      }
                      @Test
                      public void testZset(){
                          //添加数据到zset中  分值
                          /*cacheService.zAdd("zset_key_001","hello zset 001",1000);
                          cacheService.zAdd("zset_key_001","hello zset 002",8888);
                          cacheService.zAdd("zset_key_001","hello zset 003",7777);
                          cacheService.zAdd("zset_key_001","hello zset 004",999999);*/
                          //按照分值获取数据
                          Set zset_key_001 = cacheService.zRangeByScore("zset_key_001", 0, 8888);
                          System.out.println(zset_key_001);
                      }
                  }
                  4.5)添加任务

                  ①:拷贝mybatis-plus生成的文件,mapper

                  ②:创建task类,用于接收添加任务的参数

                  package com.heima.model.schedule.dtos;
                  import lombok.Data;
                  import java.io.Serializable;
                  @Data
                  public class Task implements Serializable {
                      /**
                       * 任务id
                       */
                      private Long taskId;
                      /**
                       * 类型
                       */
                      private Integer taskType;
                      /**
                       * 优先级
                       */
                      private Integer priority;
                      /**
                       * 执行id
                       */
                      private long executeTime;
                      /**
                       * task参数
                       */
                      private byte[] parameters;
                      
                  }

                  ③:创建TaskService

                  package com.heima.schedule.service;
                  import com.heima.model.schedule.dtos.Task;
                  /**
                   * 对外访问接口
                   */
                  public interface TaskService {
                      /**
                       * 添加任务
                       * @param task   任务对象
                       * @return       任务id
                       */
                      public long addTask(Task task) ;
                  }

                  实现:

                  package com.heima.schedule.service.impl;
                  import java.util.Calendar;
                  import java.util.Date;
                  @Service
                  @Transactional
                  @Slf4j
                  public class TaskServiceImpl implements TaskService {
                      /**
                       * 添加延迟任务
                       *
                       * @param task
                       * @return
                       */
                      @Override
                      public long addTask(Task task) {
                          //1.添加任务到数据库中
                          boolean success = addTaskToDb(task);
                          if (success) {
                              //2.添加任务到redis
                              addTaskToCache(task);
                          }
                          return task.getTaskId();
                      }
                      @Autowired
                      private CacheService cacheService;
                      /**
                       * 把任务添加到redis中
                       *
                       * @param task
                       */
                      private void addTaskToCache(Task task) {
                          String key = task.getTaskType() + "_" + task.getPriority();
                          //获取5分钟之后的时间  毫秒值
                          Calendar calendar = Calendar.getInstance();
                          calendar.add(Calendar.MINUTE, 5);
                          long nextScheduleTime = calendar.getTimeInMillis();
                          //2.1 如果任务的执行时间小于等于当前时间,存入list
                          if (task.getExecuteTime() <= System.currentTimeMillis()) {
                              cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task));
                          } else if (task.getExecuteTime() <= nextScheduleTime) {
                              //2.2 如果任务的执行时间大于当前时间 && 小于等于预设时间(未来5分钟) 存入zset中
                              cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime());
                          }
                      }
                      @Autowired
                      private TaskinfoMapper taskinfoMapper;
                      @Autowired
                      private TaskinfoLogsMapper taskinfoLogsMapper;
                      /**
                       * 添加任务到数据库中
                       *
                       * @param task
                       * @return
                       */
                      private boolean addTaskToDb(Task task) {
                          boolean flag = false;
                          try {
                              //保存任务表
                              Taskinfo taskinfo = new Taskinfo();
                              BeanUtils.copyProperties(task, taskinfo);
                              taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
                              taskinfoMapper.insert(taskinfo);
                              //设置taskID
                              task.setTaskId(taskinfo.getTaskId());
                              //保存任务日志数据
                              TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
                              BeanUtils.copyProperties(taskinfo, taskinfoLogs);
                              taskinfoLogs.setVersion(1);
                              taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
                              taskinfoLogsMapper.insert(taskinfoLogs);
                              flag = true;
                          } catch (Exception e) {
                              e.printStackTrace();
                          }
                          return flag;
                      }
                  }

                  ScheduleConstants常量类

                  package com.heima.common.constants;
                  public class ScheduleConstants {
                      //task状态
                      public static final int SCHEDULED=0;   //初始化状态
                      public static final int EXECUTED=1;       //已执行状态
                      public static final int CANCELLED=2;   //已取消状态
                      public static String FUTURE="future_";   //未来数据key前缀
                      public static String TOPIC="topic_";     //当前数据key前缀
                  }

                  ④:测试

                  4.6)取消任务

                  在TaskService中添加方法

                  /**
                       * 取消任务
                       * @param taskId        任务id
                       * @return              取消结果
                       */
                  public boolean cancelTask(long taskId);

                  实现

                  /**
                       * 取消任务
                       * @param taskId
                       * @return
                       */
                  @Override
                  public boolean cancelTask(long taskId) {
                      boolean flag = false;
                      //删除任务,更新日志
                      Task task = updateDb(taskId,ScheduleConstants.EXECUTED);
                      //删除redis的数据
                      if(task != null){
                          removeTaskFromCache(task);
                          flag = true;
                      }
                      return false;
                  }
                  /**
                       * 删除redis中的任务数据
                       * @param task
                       */
                  private void removeTaskFromCache(Task task) {
                      String key = task.getTaskType()+"_"+task.getPriority();
                      if(task.getExecuteTime()<=System.currentTimeMillis()){
                          cacheService.lRemove(ScheduleConstants.TOPIC+key,0,JSON.toJSONString(task));
                      }else {
                          cacheService.zRemove(ScheduleConstants.FUTURE+key, JSON.toJSONString(task));
                      }
                  }
                  /**
                       * 删除任务,更新任务日志状态
                       * @param taskId
                       * @param status
                       * @return
                       */
                  private Task updateDb(long taskId, int status) {
                      Task task = null;
                      try {
                          //删除任务
                          taskinfoMapper.deleteById(taskId);
                          TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);
                          taskinfoLogs.setStatus(status);
                          taskinfoLogsMapper.updateById(taskinfoLogs);
                          task = new Task();
                          BeanUtils.copyProperties(taskinfoLogs,task);
                          task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());
                      }catch (Exception e){
                          log.error("task cancel exception taskid={}",taskId);
                      }
                      return task;
                  }

                  测试

                  4.7)消费任务

                  在TaskService中添加方法

                  /**
                   * 按照类型和优先级来拉取任务
                   * @param type
                   * @param priority
                   * @return
                   */
                  public Task poll(int type,int priority);

                  实现

                  /**
                       * 按照类型和优先级拉取任务
                       * @return
                       */
                  @Override
                  public Task poll(int type,int priority) {
                      Task task = null;
                      try {
                          String key = type+"_"+priority;
                          String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);
                          if(StringUtils.isNotBlank(task_json)){
                              task = JSON.parseObject(task_json, Task.class);
                              //更新数据库信息
                              updateDb(task.getTaskId(),ScheduleConstants.EXECUTED);
                          }
                      }catch (Exception e){
                          e.printStackTrace();
                          log.error("poll task exception");
                      }
                      return task;
                  }
                  4.8)未来数据定时刷新
                  4.8.1)reids key值匹配

                  方案1:keys 模糊匹配

                  keys的模糊匹配功能很方便也很强大,但是在生产环境需要慎用!开发中使用keys的模糊匹配却发现redis的CPU使用率极高,所以公司的redis生产环境将keys命令禁用了!redis是单线程,会被堵塞

                  《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第49张

                  方案2:scan

                  SCAN 命令是一个基于游标的迭代器,SCAN命令每次被调用之后, 都会向用户返回一个新的游标, 用户在下次迭代时需要使用这个新游标作为SCAN命令的游标参数, 以此来延续之前的迭代过程。

                  《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第50张

                  代码案例:

                  @Test
                  public void testKeys(){
                      Set keys = cacheService.keys("future_*");
                      System.out.println(keys);
                      Set scan = cacheService.scan("future_*");
                      System.out.println(scan);
                  }
                  4.8.2)reids管道

                  普通redis客户端和服务器交互模式 性能很低

                  《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第51张

                  Pipeline请求模型

                  《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第52张

                  官方测试结果数据对比

                  《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第53张

                  测试案例对比:

                  //耗时6151
                  @Test
                  public  void testPiple1(){
                      long start =System.currentTimeMillis();
                      for (int i = 0; i <10000 ; i++) {
                          Task task = new Task();
                          task.setTaskType(1001);
                          task.setPriority(1);
                          task.setExecuteTime(new Date().getTime());
                          cacheService.lLeftPush("1001_1", JSON.toJSONString(task));
                      }
                      System.out.println("耗时"+(System.currentTimeMillis()- start));
                  }
                  @Test
                  public void testPiple2(){
                      long start  = System.currentTimeMillis();
                      //使用管道技术
                      List objectList = cacheService.getstringRedisTemplate().executePipelined(new RedisCallback() {
                          @Nullable
                          @Override
                          public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
                              for (int i = 0; i <10000 ; i++) {
                                  Task task = new Task();
                                  task.setTaskType(1001);
                                  task.setPriority(1);
                                  task.setExecuteTime(new Date().getTime());
                                  redisConnection.lPush("1001_1".getBytes(), JSON.toJSONString(task).getBytes());
                              }
                              return null;
                          }
                      });
                      System.out.println("使用管道技术执行10000次自增操作共耗时:"+(System.currentTimeMillis()-start)+"毫秒");
                  } 
                   
                  4.8.3)未来数据定时刷新-功能完成
                  《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第54张

                  在TaskService中添加方法

                  @Scheduled(cron = "0 */1 * * * ?")//定时 (每分钟执行一次
                  //{秒数} {分钟} {小时} {日期} {月份} {星期} {年份(可为空)}
                  public void refresh() {
                      System.out.println(System.currentTimeMillis() / 1000 + "执行了定时任务");
                      // 获取所有未来数据集合的key值
                      Set futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*
                      for (String futureKey : futureKeys) { // future_250_250
                          String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];
                          //获取该组key下当前需要消费的任务数据
                          Set tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
                          if (!tasks.isEmpty()) {
                              //将这些任务数据添加到消费者队列中
                              cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
                              System.out.println("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");
                          }
                      }
                  }

                  在引导类中添加开启任务调度注解:@EnableScheduling

                  4.9)分布式锁解决集群下的方法抢占执行
                  4.9.1)问题描述

                  启动两台heima-leadnews-schedule服务,每台服务都会去执行refresh定时任务方法

                  《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第55张 《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第56张 《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第57张
                  4.9.2)分布式锁

                  分布式锁:控制分布式系统有序的去对共享资源进行操作,通过互斥来保证数据的一致性。

                  解决方案:

                  《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第58张
                  4.9.3)redis分布式锁

                  sexnx (SET if Not eXists)命令在指定的 key 不存在时,为 key 设置指定的值。

                  《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第59张

                  这种加锁的思路是,如果 key 不存在则为 key 设置 value,如果 key 已存在则 SETNX 命令不做任何操作

                  • 客户端A请求服务器设置key的值,如果设置成功就表示加锁成功

                    • 客户端B也去请求服务器设置key的值,如果返回失败,那么就代表加锁失败

                      • 客户端A执行代码完成,删除锁

                        • 客户端B在等待一段时间后再去请求设置key的值,设置成功

                          • 客户端B执行代码完成,删除锁

                            4.9.4)在工具类CacheService中添加方法
                            /**
                             * 加锁
                             *
                             * @param name
                             * @param expire
                             * @return
                             */
                            public String tryLock(String name, long expire) {
                                name = name + "_lock";
                                String token = UUID.randomUUID().toString();
                                RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();
                                RedisConnection conn = factory.getConnection();
                                try {
                                    //参考redis命令:
                                    //set key value [EX seconds] [PX milliseconds] [NX|XX]
                                    Boolean result = conn.set(
                                            name.getBytes(),
                                            token.getBytes(),
                                            Expiration.from(expire, TimeUnit.MILLISECONDS),
                                            RedisStringCommands.SetOption.SET_IF_ABSENT //NX
                                    );
                                    if (result != null && result)
                                        return token;
                                } finally {
                                    RedisConnectionUtils.releaseConnection(conn, factory,false);
                                }
                                return null;
                            }

                            修改未来数据定时刷新的方法,如下:

                            /**
                             * 未来数据定时刷新
                             */
                            @Scheduled(cron = "0 */1 * * * ?")
                            public void refresh(){
                                String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);
                                if(StringUtils.isNotBlank(token)){
                                    log.info("未来数据定时刷新---定时任务");
                                    //获取所有未来数据的集合key
                                    Set futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
                                    for (String futureKey : futureKeys) {//future_100_50
                                        //获取当前数据的key  topic
                                        String topicKey = ScheduleConstants.TOPIC+futureKey.split(ScheduleConstants.FUTURE)[1];
                                        //按照key和分值查询符合条件的数据
                                        Set tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
                                        //同步数据
                                        if(!tasks.isEmpty()){
                                            cacheService.refreshWithPipeline(futureKey,topicKey,tasks);
                                            log.info("成功的将"+futureKey+"刷新到了"+topicKey);
                                        }
                                    }
                                }
                            }
                            4.10)数据库同步到redis
                            《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第60张
                            @Scheduled(cron = "0 */5 * * * ?")
                            @PostConstruct
                            public void reloadData() {
                                clearCache();
                                log.info("数据库数据同步到缓存");
                                Calendar calendar = Calendar.getInstance();
                                calendar.add(Calendar.MINUTE, 5);
                                //查看小于未来5分钟的所有任务
                                List allTasks = taskinfoMapper.selectList(Wrappers.lambdaQuery().lt(Taskinfo::getExecuteTime,calendar.getTime()));
                                if(allTasks != null && allTasks.size() > 0){
                                    for (Taskinfo taskinfo : allTasks) {
                                        Task task = new Task();
                                        BeanUtils.copyProperties(taskinfo,task);
                                        task.setExecuteTime(taskinfo.getExecuteTime().getTime());
                                        addTaskToCache(task);
                                    }
                                }
                            }
                            private void clearCache(){
                                // 删除缓存中未来数据集合和当前消费者队列的所有key
                                Set futurekeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_
                                Set topickeys = cacheService.scan(ScheduleConstants.TOPIC + "*");// topic_
                                cacheService.delete(futurekeys);
                                cacheService.delete(topickeys);
                            }

                            5)延迟队列解决精准时间发布文章

                            5.1)延迟队列服务提供对外接口

                            提供远程的feign接口,在heima-leadnews-feign-api编写类如下:

                            package com.heima.apis.schedule;
                            import org.springframework.web.bind.annotation.RequestBody;
                            @FeignClient("leadnews-schedule")
                            public interface IScheduleClient {
                                /**
                                 * 添加任务
                                 * @param task   任务对象
                                 * @return       任务id
                                 */
                                @PostMapping("/api/v1/task/add")
                                public ResponseResult  addTask(@RequestBody Task task);
                                /**
                                 * 取消任务
                                 * @param taskId        任务id
                                 * @return              取消结果
                                 */
                                @GetMapping("/api/v1/task/cancel/{taskId}")
                                public ResponseResult cancelTask(@PathVariable("taskId") long taskId);
                                /**
                                 * 按照类型和优先级来拉取任务
                                 * @param type
                                 * @param priority
                                 * @return
                                 */
                                @GetMapping("/api/v1/task/poll/{type}/{priority}")
                                public ResponseResult poll(@PathVariable("type") int type,@PathVariable("priority")  int priority);
                            }

                            在heima-leadnews-schedule微服务下提供对应的实现

                            package com.heima.schedule.feign;
                            import org.springframework.web.bind.annotation.*;
                            @RestController
                            public class ScheduleClient  implements IScheduleClient {
                                @Autowired
                                private TaskService taskService;
                                /**
                                 * 添加任务
                                 * @param task 任务对象
                                 * @return 任务id
                                 */
                                @PostMapping("/api/v1/task/add")
                                @Override
                                public ResponseResult addTask(@RequestBody Task task) {
                                    return ResponseResult.okResult(taskService.addTask(task));
                                }
                                /**
                                 * 取消任务
                                 * @param taskId 任务id
                                 * @return 取消结果
                                 */
                                @GetMapping("/api/v1/task/cancel/{taskId}")
                                @Override
                                public ResponseResult cancelTask(@PathVariable("taskId") long taskId) {
                                    return ResponseResult.okResult(taskService.cancelTask(taskId));
                                }
                                /**
                                 * 按照类型和优先级来拉取任务
                                 * @param type
                                 * @param priority
                                 * @return
                                 */
                                @GetMapping("/api/v1/task/poll/{type}/{priority}")
                                @Override
                                public ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority") int priority) {
                                    return ResponseResult.okResult(taskService.poll(type,priority));
                                }
                            }
                            5.2)发布文章集成添加延迟队列接口

                            在创建WmNewsTaskService

                            package com.heima.wemedia.service;
                            import com.heima.model.wemedia.pojos.WmNews;
                            public interface WmNewsTaskService {
                                /**
                                 * 添加任务到延迟队列中
                                 * @param id  文章的id
                                 * @param publishTime  发布的时间  可以做为任务的执行时间
                                 */
                                public void addNewsToTask(Integer id, Date publishTime);
                            }

                            实现:

                            package com.heima.wemedia.service.impl;
                            import org.springframework.stereotype.Service;
                            @Service
                            @Slf4j
                            public class WmNewsTaskServiceImpl  implements WmNewsTaskService {
                                @Autowired
                                private IScheduleClient scheduleClient;
                                /**
                                 * 添加任务到延迟队列中
                                 * @param id          文章的id
                                 * @param publishTime 发布的时间  可以做为任务的执行时间
                                 */
                                @Override
                                @Async
                                public void addNewsToTask(Integer id, Date publishTime) {
                                    log.info("添加任务到延迟服务中----begin");
                                    Task task = new Task();
                                    task.setExecuteTime(publishTime.getTime());
                                    task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());
                                    task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
                                    WmNews wmNews = new WmNews();
                                    wmNews.setId(id);
                                    task.setParameters(ProtostuffUtil.serialize(wmNews));
                                    scheduleClient.addTask(task);
                                    log.info("添加任务到延迟服务中----end");
                                }
                                
                            }

                            枚举类:

                            package com.heima.model.common.enums;
                            import lombok.AllArgsConstructor;
                            import lombok.Getter;
                            @Getter
                            @AllArgsConstructor
                            public enum TaskTypeEnum {
                                NEWS_SCAN_TIME(1001, 1,"文章定时审核"),
                                REMOTEERROR(1002, 2,"第三方接口调用失败,重试");
                                private final int taskType; //对应具体业务
                                private final int priority; //业务不同级别
                                private final String desc; //描述信息
                            }
                            序列化工具对比
                            • JdkSerialize:java内置的序列化能将实现了Serilazable接口的对象进行序列化和反序列化, ObjectOutputStream的writeObject()方法可序列化对象生成字节数组

                              • Protostuff:google开源的protostuff采用更为紧凑的二进制数组,表现更加优异,然后使用protostuff的编译工具生成pojo类

                                拷贝资料中的两个类到heima-leadnews-utils下

                                Protostuff需要引导依赖:

                                
                                    io.protostuff
                                    protostuff-core
                                    1.6.0
                                
                                
                                    io.protostuff
                                    protostuff-runtime
                                    1.6.0
                                

                                修改发布文章代码:

                                把之前的异步调用修改为调用延迟任务

                                @Autowired
                                private WmNewsTaskService wmNewsTaskService;
                                 
                                /**
                                     * 发布修改文章或保存为草稿
                                     * @param dto
                                     * @return
                                     */
                                @Override
                                public ResponseResult submitNews(WmNewsDto dto) {
                                    //0.条件判断
                                    if(dto == null || dto.getContent() == null){
                                        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
                                    }
                                    //1.保存或修改文章
                                    WmNews wmNews = new WmNews();
                                    //属性拷贝 属性名词和类型相同才能拷贝
                                    BeanUtils.copyProperties(dto,wmNews);
                                    //封面图片  list---> string
                                    if(dto.getImages() != null && dto.getImages().size() > 0){
                                        //[1dddfsd.jpg,sdlfjldk.jpg]-->   1dddfsd.jpg,sdlfjldk.jpg
                                        String imageStr = StringUtils.join(dto.getImages(), ",");
                                        wmNews.setImages(imageStr);
                                    }
                                    //如果当前封面类型为自动 -1
                                    if(dto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)){
                                        wmNews.setType(null);
                                    }
                                    saveOrUpdateWmNews(wmNews);
                                    //2.判断是否为草稿  如果为草稿结束当前方法
                                    if(dto.getStatus().equals(WmNews.Status.NORMAL.getCode())){
                                        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
                                    }
                                    //3.不是草稿,保存文章内容图片与素材的关系
                                    //获取到文章内容中的图片信息
                                    List materials =  ectractUrlInfo(dto.getContent());
                                    saveRelativeInfoForContent(materials,wmNews.getId());
                                    //4.不是草稿,保存文章封面图片与素材的关系,如果当前布局是自动,需要匹配封面图片
                                    saveRelativeInfoForCover(dto,wmNews,materials);
                                    //审核文章
                                    //        wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
                                    wmNewsTaskService.addNewsToTask(wmNews.getId(),wmNews.getPublishTime());
                                    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
                                }
                                5.3)消费任务进行审核文章

                                WmNewsTaskService中添加方法

                                /**
                                 * 消费延迟队列数据
                                 */
                                public void scanNewsByTask();

                                实现

                                @Autowired
                                private WmNewsAutoScanServiceImpl wmNewsAutoScanService;
                                /**
                                     * 消费延迟队列数据
                                     */
                                @Scheduled(fixedRate = 1000)
                                @Override
                                @SneakyThrows
                                public void scanNewsByTask() {
                                    log.info("文章审核---消费任务执行---begin---");
                                    ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
                                    if(responseResult.getCode().equals(200) && responseResult.getData() != null){
                                        String json_str = JSON.toJSONString(responseResult.getData());
                                        Task task = JSON.parseObject(json_str, Task.class);
                                        byte[] parameters = task.getParameters();
                                        WmNews wmNews = ProtostuffUtil.deserialize(parameters, WmNews.class);
                                        System.out.println(wmNews.getId()+"-----------");
                                        wmNewsAutoScanService.autoScanWmNews(wmNews.getId());
                                    }
                                    log.info("文章审核---消费任务执行---end---");
                                }

                                在WemediaApplication自媒体的引导类中添加开启任务调度注解@EnableScheduling


                                06kafka及异步通知文章上下架

                                《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第61张

                                1)自媒体文章上下架

                                需求分析

                                《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第62张 《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第63张

                                2)kafka概述

                                消息中间件对比

                                《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第64张
                                消息中间件对比-选择建议

                                消息中间件

                                建议

                                Kafka

                                追求高吞吐量,适合产生大量数据的互联网服务的数据收集业务

                                RocketMQ

                                可靠性要求很高的金融互联网领域,稳定性高,经历了多次阿里双11考验

                                RabbitMQ

                                性能较好,社区活跃度高,数据量没有那么大,优先选择功能比较完备的RabbitMQ

                                kafka介绍

                                Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。

                                kafka官网:Apache Kafka

                                《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第65张
                                kafka介绍-名词解释
                                《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第66张
                                • producer:发布消息的对象称之为主题生产者(Kafka topic producer)

                                  • topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)

                                    • consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)

                                      • broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

                                        3)kafka安装配置

                                        Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper

                                        • Docker安装zookeeper

                                          下载镜像:

                                          docker pull zookeeper:3.4.14

                                          创建容器

                                          docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
                                          • Docker安装kafka

                                            下载镜像:

                                            docker pull wurstmeister/kafka:2.12-2.3.1

                                            创建容器

                                            docker run -d --name kafka \
                                            --env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
                                            --env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
                                            --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
                                            --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
                                            --env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
                                            --net=host wurstmeister/kafka:2.12-2.3.1
                                            《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第67张

                                            云主机无法使用--net

                                            4)kafka入门

                                            《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第68张
                                            • 生产者发送消息,多个消费者只能有一个消费者接收到消息

                                              • 生产者发送消息,多个消费者都可以接收到消息

                                                (1)创建kafka-demo项目,导入依赖

                                                
                                                    org.apache.kafka
                                                    kafka-clients
                                                

                                                (2)生产者发送消息

                                                package com.heima.kafka.sample;
                                                import java.util.Properties;
                                                /**
                                                 * 生产者
                                                 */
                                                public class ProducerQuickStart {
                                                    public static void main(String[] args) {
                                                        //1.kafka的配置信息
                                                        Properties properties = new Properties();
                                                        //kafka的连接地址
                                                        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
                                                        //发送失败,失败的重试次数
                                                        properties.put(ProducerConfig.RETRIES_CONFIG,5);
                                                        //消息key的序列化器
                                                        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
                                                        //消息value的序列化器
                                                        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
                                                        //2.生产者对象
                                                        KafkaProducer producer = new KafkaProducer(properties);
                                                        //封装发送的消息
                                                        ProducerRecord record = new ProducerRecord("itheima-topic","100001","hello kafka");
                                                        //3.发送消息
                                                        producer.send(record);
                                                        //4.关闭消息通道,必须关闭,否则消息发送不成功
                                                        producer.close();
                                                    }
                                                }

                                                (3)消费者接收消息

                                                package com.heima.kafka.sample;
                                                import java.util.Properties;
                                                /**
                                                 * 消费者
                                                 */
                                                public class ConsumerQuickStart {
                                                    public static void main(String[] args) {
                                                        //1.添加kafka的配置信息
                                                        Properties properties = new Properties();
                                                        //kafka的连接地址
                                                        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
                                                        //消费者组
                                                        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
                                                        //消息的反序列化器
                                                        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
                                                        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
                                                        //2.消费者对象
                                                        KafkaConsumer consumer = new KafkaConsumer(properties);
                                                        //3.订阅主题
                                                        consumer.subscribe(Collections.singletonList("itheima-topic"));
                                                        //当前线程一直处于监听状态
                                                        while (true) {
                                                            //4.获取消息
                                                            ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(1000));
                                                            for (ConsumerRecord consumerRecord : consumerRecords) {
                                                                System.out.println(consumerRecord.key());
                                                                System.out.println(consumerRecord.value());
                                                            }
                                                        }
                                                    }
                                                }

                                                总结

                                                • 生产者发送消息,多个消费者订阅同一个主题,只能有一个消费者收到消息(一对一)同一个组

                                                  • 生产者发送消息,多个消费者订阅同一个主题,所有消费者都能收到消息(一对多)多个组

                                                    分区机制—topic剖析

                                                    《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第69张 《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第70张 《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第71张

                                                    5)kafka高可用设计

                                                    5.1)集群
                                                    《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第72张
                                                    • Kafka 的服务器端由被称为 Broker 的服务进程构成,即一个 Kafka 集群由多个 Broker 组成

                                                      • 这样如果集群中某一台机器宕机,其他机器上的 Broker 也依然能够对外提供服务。这其实就是 Kafka 提供高可用的手段之一

                                                        5.2)备份机制(Replication)
                                                        《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第73张

                                                        Kafka 中消息的备份又叫做 副本(Replica)

                                                        Kafka 定义了两类副本:

                                                        • 领导者副本(Leader Replica)

                                                          • 追随者副本(Follower Replica)

                                                            备份机制—同步方式

                                                            《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第74张

                                                            ISR(in-sync replica)需要同步复制保存的follower

                                                            如果leader失效后,需要选出新的leader,选举的原则如下:

                                                            第一:选举时优先从ISR中选定,因为这个列表中follower的数据是与leader同步的

                                                            第二:如果ISR列表中的follower都不行了,就只能从其他follower中选取

                                                            极端情况,就是所有副本都失效了,这时有两种方案

                                                            第一:等待ISR中的一个活过来,选为Leader,数据可靠,但活过来的时间不确定

                                                            第二:选择第一个活过来的Replication,不一定是ISR中的,选为leader,以最快速度恢复可用性,但数据不一定完整

                                                            6)kafka生产者详解

                                                            6.1)发送类型
                                                            • 同步发送

                                                              使用send()方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功

                                                              RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();
                                                              System.out.println(recordMetadata.offset());
                                                              • 异步发送

                                                                调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数

                                                                //异步消息发送
                                                                producer.send(kvProducerRecord, new Callback() {
                                                                    @Override
                                                                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                                                                        if(e != null){
                                                                            System.out.println("记录异常信息到日志表中");
                                                                        }
                                                                        System.out.println(recordMetadata.offset());
                                                                    }
                                                                });
                                                                6.2)参数详解
                                                                • ack确认机制
                                                                  《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第75张

                                                                  代码的配置方式:

                                                                  //ack配置  消息确认机制
                                                                  prop.put(ProducerConfig.ACKS_CONFIG,"all");

                                                                  参数的选择说明

                                                                  确认机制

                                                                  说明

                                                                  acks=0

                                                                  生产者在成功写入消息之前不会等待(不需要)任何来自服务器的响应,消息有丢失的风险,但是速度最快

                                                                  acks=1(默认值)

                                                                  只要集群Leader节点收到消息,生产者就会收到一个来自服务器的成功响应

                                                                  acks=all

                                                                  只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应

                                                                  • retries 重试次数
                                                                    《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第76张

                                                                    生产者从服务器收到的错误有可能是临时性错误,在这种情况下,retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试返回错误,默认情况下,生产者会在每次重试之间等待100ms

                                                                    代码中配置方式:

                                                                    //重试次数
                                                                    prop.put(ProducerConfig.RETRIES_CONFIG,10);
                                                                    • 消息压缩

                                                                      默认情况下, 消息发送时不会被压缩。

                                                                      代码中配置方式:

                                                                      //数据压缩
                                                                      prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");

                                                                      压缩算法

                                                                      说明

                                                                      snappy

                                                                      占用较少的 CPU, 却能提供较好的性能和相当可观的压缩比, 如果看重性能和网络带宽,建议采用

                                                                      lz4

                                                                      占用较少的 CPU, 压缩和解压缩速度较快,压缩比也很客观

                                                                      gzip

                                                                      占用较多的 CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法

                                                                      使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。

                                                                      7)kafka消费者详解

                                                                      7.1)消费者组
                                                                      《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第77张
                                                                      • 消费者组(Consumer Group) :指的就是由一个或多个消费者组成的群体

                                                                        • 一个发布在Topic上消息被分发给此消费者组中的一个消费者

                                                                          • 所有的消费者都在一个组中,那么这就变成了queue模型 消息队列 一对一

                                                                            • 所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型 一对多消费者

                                                                              7.2)消息有序性

                                                                              应用场景:

                                                                              • 即时消息中的单对单聊天和群聊,保证发送方消息发送顺序与接收方的顺序一致

                                                                                • 充值转账两个渠道在同一个时间进行余额变更,短信通知必须要有顺序

                                                                                  《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第78张

                                                                                  topic分区中消息只能由消费者组中的唯一一个消费者处理,所以消息肯定是按照先后顺序进行处理的。但是它也仅仅是保证Topic的一个分区顺序处理,不能保证跨分区的消息先后处理顺序。 所以,如果你想要顺序的处理Topic的所有消息,那就只提供一个分区。

                                                                                  7.3)提交和偏移量

                                                                                  kafka不会像其他JMS队列那样需要得到消费者的确认,消费者可以使用kafka来追踪消息在分区的位置(偏移量)

                                                                                  消费者会往一个叫做_consumer_offset的特殊主题发送消息,消息里包含了每个分区的偏移量。如果消费者发生崩溃或有新的消费者加入群组,就会触发再均衡

                                                                                  《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第79张

                                                                                  正常的情况

                                                                                  《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第80张

                                                                                  如果消费者2挂掉以后,会发生再均衡,消费者2负责的分区会被其他消费者进行消费

                                                                                  再均衡后不可避免会出现一些问题

                                                                                  问题一:

                                                                                  《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第81张

                                                                                  如果提交偏移量2小于客户端处理的最后一个消息10的偏移量,那么处于两个偏移量之间的消息就会被重复处理。

                                                                                  问题二:

                                                                                  《黑马头条》 内容安全 自动审核 feign 延迟任务精准发布 kafka,第82张

                                                                                  如果提交的偏移量5大于客户端最后一个消息11的偏移量,那么处于两个偏移量之间的消息将会丢失。

                                                                                  如果想要解决这些问题,还要知道目前kafka提交偏移量的方式:

                                                                                  提交偏移量的方式有两种,分别是自动提交偏移量和手动提交

                                                                                  • 自动提交偏移量

                                                                                    当enable.auto.commit被设置为true,提交方式就是让消费者自动提交偏移量,每隔5秒消费者会自动把从poll()方法接收的最大偏移量提交上去

                                                                                    • 手动提交 ,当enable.auto.commit被设置为false可以有以下三种提交方式

                                                                                      • 提交当前偏移量(同步提交)

                                                                                        • 异步提交

                                                                                          • 同步和异步组合提交

                                                                                            1.提交当前偏移量(同步提交)

                                                                                            把enable.auto.commit设置为false,让应用程序决定何时提交偏移量。使用commitSync()提交偏移量,commitSync()将会提交poll返回的最新的偏移量,所以在处理完所有记录后要确保调用了commitSync()方法。否则还是会有消息丢失的风险。

                                                                                            只要没有发生不可恢复的错误,commitSync()方法会一直尝试直至提交成功,如果提交失败也可以记录到错误日志里。

                                                                                            while (true){
                                                                                                ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
                                                                                                for (ConsumerRecord record : records) {
                                                                                                    System.out.println(record.value());
                                                                                                    System.out.println(record.key());
                                                                                                    try {
                                                                                                        consumer.commitSync();//同步提交当前最新的偏移量
                                                                                                    }catch (CommitFailedException e){
                                                                                                        System.out.println("记录提交失败的异常:"+e);
                                                                                                    }
                                                                                                }
                                                                                            }
                                                                                            2.异步提交

                                                                                            手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的API :commitAsync()

                                                                                            while (true){
                                                                                                ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
                                                                                                for (ConsumerRecord record : records) {
                                                                                                    System.out.println(record.value());
                                                                                                    System.out.println(record.key());
                                                                                                }
                                                                                                consumer.commitAsync(new OffsetCommitCallback() {
                                                                                                    @Override
                                                                                                    public void onComplete(Map map, Exception e) {
                                                                                                        if(e!=null){
                                                                                                            System.out.println("记录错误的提交偏移量:"+ map+",异常信息"+e);
                                                                                                        }
                                                                                                    }
                                                                                                });
                                                                                            }
                                                                                            3.同步和异步组合提交

                                                                                            异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。

                                                                                            相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。

                                                                                            举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。

                                                                                            try {
                                                                                                while (true){
                                                                                                    ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
                                                                                                    for (ConsumerRecord record : records) {
                                                                                                        System.out.println(record.value());
                                                                                                        System.out.println(record.key());
                                                                                                    }
                                                                                                    consumer.commitAsync();
                                                                                                }
                                                                                            }catch (Exception e){+
                                                                                                e.printStackTrace();
                                                                                                System.out.println("记录错误信息:"+e);
                                                                                            }finally {
                                                                                                try {
                                                                                                    consumer.commitSync();
                                                                                                }finally {
                                                                                                    consumer.close();
                                                                                                }
                                                                                            }

                                                                                            8)springboot集成kafka

                                                                                            8.1)入门

                                                                                            1.导入spring-kafka依赖信息

                                                                                            
                                                                                                
                                                                                                    org.springframework.boot
                                                                                                    spring-boot-starter-web
                                                                                                
                                                                                                
                                                                                                
                                                                                                    org.springframework.kafka
                                                                                                    spring-kafka
                                                                                                    
                                                                                                        
                                                                                                            org.apache.kafka
                                                                                                            kafka-clients
                                                                                                        
                                                                                                    
                                                                                                
                                                                                                
                                                                                                    org.apache.kafka
                                                                                                    kafka-clients
                                                                                                
                                                                                                
                                                                                                    com.alibaba
                                                                                                    fastjson
                                                                                                
                                                                                            

                                                                                            2.在resources下创建文件application.yml

                                                                                            server:
                                                                                              port: 9991
                                                                                            spring:
                                                                                              application:
                                                                                                name: kafka-demo
                                                                                              kafka:
                                                                                                bootstrap-servers: 192.168.200.130:9092
                                                                                                producer:
                                                                                                  retries: 10
                                                                                                  key-serializer: org.apache.kafka.common.serialization.StringSerializer
                                                                                                  value-serializer: org.apache.kafka.common.serialization.StringSerializer
                                                                                                consumer:
                                                                                                  group-id: ${spring.application.name}-test
                                                                                                  key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
                                                                                                  value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

                                                                                            3.消息生产者

                                                                                            package com.heima.kafka.controller;
                                                                                            import org.springframework.web.bind.annotation.RestController;
                                                                                            @RestController
                                                                                            public class HelloController {
                                                                                                @Autowired
                                                                                                private KafkaTemplate kafkaTemplate;
                                                                                                @GetMapping("/hello")
                                                                                                public String hello(){
                                                                                                    kafkaTemplate.send("itcast-topic","黑马程序员");
                                                                                                    return "ok";
                                                                                                }
                                                                                            }

                                                                                            4.消息消费者

                                                                                            package com.heima.kafka.listener;
                                                                                            import org.springframework.kafka.annotation.KafkaListener;
                                                                                            import org.springframework.stereotype.Component;
                                                                                            import org.springframework.util.StringUtils;
                                                                                            @Component
                                                                                            public class HelloListener {
                                                                                                @KafkaListener(topics = "itcast-topic")
                                                                                                public void onMessage(String message){
                                                                                                    if(!StringUtils.isEmpty(message)){
                                                                                                        System.out.println(message);
                                                                                                    }
                                                                                                }
                                                                                            }
                                                                                            8.2)传递消息为对象

                                                                                            目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式

                                                                                            方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,本章节不介绍

                                                                                            方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,本项目采用这种方式