SingleStore 提供了变更数据捕获 (CDC) 解决方案,可将数据从 MongoDB 流式传输到 SingleStore Kai。在本文中,我们将了解如何将 Apache Kafka 代理连接到 MongoDB Atlas,然后使用 CDC 解决方案将数据从 MongoDB Atlas 流式传输到 SingleStore Kai。我们还将使用 Metabase 为 SingleStore Kai 创建一个简单的分析仪表板。
介绍
CDC 是一种跟踪数据库或系统中发生的更改的方法。SingleStore 现在提供了与 MongoDB 配合使用的 CDC 解决方案。
为了演示 CDC 解决方案,我们将使用Kafka 代理将数据流式传输到 MongoDB Atlas 集群,然后使用 CDC 管道将数据从 MongoDB Atlas 传播到 SingleStore Kai。我们还将使用 Metabase 创建一个简单的分析仪表板。
图 1 显示了我们系统的高级架构。
图 1. 高级架构(来源:SingleStore)。
我们将在以后的文章中重点介绍使用 CDC 解决方案的其他场景。
MongoDB Atlas
我们将在 M0 沙箱中使用 MongoDB Atlas。我们将在Database Access下配置具有atlasAdmin权限的管理员用户。我们将暂时允许从网络访问下的任何地方(IP 地址 0.0.0.0/0)进行访问。我们将记下用户名、密码和主机。
Apache Kafka
我们将配置 Kafka 代理将数据流式传输到MongoDB Atlas中。我们将使用 Jupyter Notebook 来实现此目的。
首先,我们将安装一些库:
!pip install pymongo kafka-python --quiet
接下来,我们将连接到 MongoDB Atlas 和 Kafka 代理:
from kafka import KafkaConsumer from pymongo import MongoClient try: client = MongoClient("mongodb+srv://: @ /?retryWrites=true&w=majority") db = client.adtech print("Connected successfully") except: print("Could not connect") consumer = KafkaConsumer( "ad_events", bootstrap_servers = ["public-kafka.memcompute.com:9092"]
我们将用我们之前从 MongoDB Atlas 保存的值替换
最初,我们将 100 条记录加载到 MongoDB Atlas 中,如下所示:
MAX_ITERATIONS = 100 for iteration, message in enumerate(consumer, start = 1): if iteration > MAX_ITERATIONS: break try: record = message.value.decode("utf-8") user_id, event_name, advertiser, campaign, gender, income, page_url, region, country = map(str.strip, record.split("\t")) events_record = { "user_id": int(user_id), "event_name": event_name, "advertiser": advertiser, "campaign": int(campaign.split()[0]), "gender": gender, "income": income, "page_url": page_url, "region": region, "country": country } db.events.insert_one(events_record) except Exception as e: print(f"Iteration {iteration}: Could not insert data - {str(e)}")
数据应该成功加载,我们应该看到一个名为 的数据库,adtech其中包含一个名为 的集合events。集合中的文档在结构上应类似于以下示例:
_id: ObjectId('64ec906d0e8c0f7bcf72a8ed') user_id: 3857963415 event_name: "Impression" advertiser: "Sherwin-Williams" campaign: 13 gender: "Female" income: "25k and below", page_url: "/2013/02/how-to-make-glitter-valentines-heart-boxes.html/" region: "Michigan" country: "US"这些文档代表广告活动事件。该events集合存储 的详细信息advertiser以及campaign有关用户的各种人口统计信息,例如gender和income。
SingleStore Kai
上一篇文章介绍了创建免费 SingleStoreDB 云帐户的步骤。我们将使用以下设置:
- 工作区组名称: CDC 演示组
- 云提供商: AWS
- 区域:美国东部 1(弗吉尼亚北部)
- 工作区名称: cdc-demo
- 尺码: S-00
- 设置:
- SingleStore Kai 选择
一旦工作区可用,我们将记下密码和主机。该主机可从CDC Demo Group > Overview > Workspaces > cdc-demo > Connect > Connect Directly > SQL IDE > Host获取。稍后我们将需要元数据库的此信息。我们还将通过在CDC 演示组 > 防火墙下配置防火墙来暂时允许从任何地方进行访问。
从左侧导航窗格中,我们选择DEVELOP > SQL Editor来创建adtech数据库link,如下所示:
CREATE DATABASE IF NOT EXISTS adtech; USE adtech; DROP LINK adtech.link; CREATE LINK adtech.link AS MONGODB CONFIG '{"mongodb.hosts": "
我们将用我们之前从 MongoDB Atlas 保存的值替换:27017, :27017, :27017", "collection.include.list": "adtech.*", "mongodb.ssl.enabled": "true", "mongodb.authsource": "admin", "mongodb.members.auto.discover": "false"}' CREDENTIALS '{"mongodb.user": " ", "mongodb.password": " "}'; CREATE TABLES AS INFER PIPELINE AS LOAD DATA LINK adtech.link '*' FORMAT AVRO; 和。 我们还需要将 、 和的值替换 为 MongoDB Atlas 中每个值的完整地址。 我们现在将检查是否有任何表,如下所示:
SHOW TABLES;
这应该显示一张名为events:
+------------------+ | Tables_in_adtech | +------------------+ | events | +------------------+
我们将检查表的结构:
DESCRIBE events;
输出应如下所示:
+-------+------+------+------+---------+-------+ | Field | Type | Null | Key | Default | Extra | +-------+------+------+------+---------+-------+ | _id | text | NO | UNI | NULL | | | _more | JSON | NO | | NULL | | +-------+------+------+------+---------+-------+
接下来,我们将检查是否有pipelines:
SHOW PIPELINES;
这将显示events当前调用的一个管道Stopped:
+---------------------+---------+-----------+ | Pipelines_in_adtech | State | Scheduled | +---------------------+---------+-----------+ | events | Stopped | False | +---------------------+---------+-----------+
现在我们将启动events管道:
START ALL PIPELINES;
并且状态应更改为Running:
+---------------------+---------+-----------+ | Pipelines_in_adtech | State | Scheduled | +---------------------+---------+-----------+ | events | Running | False | +---------------------+---------+-----------+
如果我们现在运行以下命令:
SELECT COUNT(*) FROM events;
它应该返回 100 作为结果:
+----------+ | COUNT(*) | +----------+ | 100 | +----------+
我们将检查表中的一行events,如下所示:
SELECT * FROM events LIMIT 1;
输出应类似于以下内容:
+--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | _id | _more | +--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | {"$oid": "64ec906d0e8c0f7bcf72a8f7"} | {"_id":{"$oid":"64ec906d0e8c0f7bcf72a8f7"},"advertiser":"Wendys","campaign":13,"country":"US","event_name":"Click","gender":"Female","income":"75k - 99k","page_url":"/2014/05/flamingo-pop-bridal-shower-collab-with.html","region":"New Mexico","user_id":3857963416} | +--------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
CDC 解决方案已成功连接到 MongoDB Atlas 并将所有 100 条记录复制到 SingleStore Kai。
现在让我们使用 Metabase 创建一个仪表板。
元数据库
上一篇文章描述了如何安装、配置和创建元数据库连接的详细信息。我们将使用前一篇文章中使用的查询的细微变化来创建可视化。
1. 活动总数
SELECT COUNT(*) FROM events;
2. 各地区活动
SELECT _more::country AS `events.country`, COUNT(_more::country) AS 'events.countofevents' FROM adtech.events AS events GROUP BY 1;
3. Top 5 广告商活动
SELECT _more::advertiser AS `events.advertiser`, COUNT(*) AS `events.count` FROM adtech.events AS events WHERE (_more::advertiser LIKE '%Subway%' OR _more::advertiser LIKE '%McDonals%' OR _more::advertiser LIKE '%Starbucks%' OR _more::advertiser LIKE '%Dollar General%' OR _more::advertiser LIKE '%YUM! Brands%' OR _more::advertiser LIKE '%Dunkin Brands Group%') GROUP BY 1 ORDER BY `events.count` DESC;
4. 按性别和收入划分的广告访问者
SELECT * FROM (SELECT *, DENSE_RANK() OVER (ORDER BY xx.z___min_rank) AS z___pivot_row_rank, RANK() OVER (PARTITION BY xx.z__pivot_col_rank ORDER BY xx.z___min_rank) AS z__pivot_col_ordering, CASE WHEN xx.z___min_rank = xx.z___rank THEN 1 ELSE 0 END AS z__is_highest_ranked_cell FROM (SELECT *, Min(aa.z___rank) OVER (PARTITION BY aa.`events.income`) AS z___min_rank FROM (SELECT *, RANK() OVER (ORDER BY CASE WHEN bb.z__pivot_col_rank = 1 THEN (CASE WHEN bb.`events.count` IS NOT NULL THEN 0 ELSE 1 END) ELSE 2 END, CASE WHEN bb.z__pivot_col_rank = 1 THEN bb.`events.count` ELSE NULL END DESC, bb.`events.count` DESC, bb.z__pivot_col_rank, bb.`events.income`) AS z___rank FROM (SELECT *, DENSE_RANK() OVER (ORDER BY CASE WHEN ww.`events.gender` IS NULL THEN 1 ELSE 0 END, ww.`events.gender`) AS z__pivot_col_rank FROM (SELECT _more::gender AS `events.gender`, _more::income AS `events.income`, COUNT(*) AS `events.count` FROM adtech.events AS events WHERE (_more::income <> 'unknown' OR _more::income IS NULL) GROUP BY 1, 2) ww) bb WHERE bb.z__pivot_col_rank <= 16384) aa) xx) zz WHERE (zz.z__pivot_col_rank <= 50 OR zz.z__is_highest_ranked_cell = 1) AND (zz.z___pivot_row_rank <= 500 OR zz.z__pivot_col_ordering = 1) ORDER BY zz.z___pivot_row_rank;
图 2 显示了 AdTech 仪表板上图表大小和位置的示例。我们将自动刷新选项设置为 1 分钟。
图 2.最终仪表板。
如果我们通过更改 使用 Jupyter Notebook 将更多数据加载到 MongoDB Atlas 中 MAX_ITERATIONS,我们将看到数据传播到 SingleStore Kai 以及 AdTech 仪表板中反映的新数据。
总结
在本文中,我们创建了一个 CDC 管道,以使用 SingleStore Kai 增强 MongoDB Atlas。正如多个基准测试所强调的那样,SingleStore Kai 因其卓越的性能而可用于分析。我们还使用 Metabase 创建了一个快速的可视化仪表板,以帮助我们深入了解我们的广告活动。
作者:Akmal Chaudhri
更多技术干货请关注公号【云原生数据库】
squids.cn,云数据库RDS,迁移工具DBMotion,云备份DBTwin等数据库生态工具。
irds.cn,多数据库管理平台(私有云)。
猜你喜欢
- 18小时前JavaMySql+hadoop高校固定资产管理系统 74965(免费领源码)计算机毕业设计选题推荐上万套实战教程JAVA、PHP,node.js,C++、python等
- 18小时前怎样查看kafka写数据送到topic是否成功
- 18小时前基于STM32的四旋翼无人机项目(二):MPU6050姿态解算(含上位机3D姿态显示教学)
- 15小时前跑步机维修(武汉跑步机维修)
- 12小时前时光不负有心人(星光照亮赶路人,时光不负有心人)
- 12小时前辣椒种子催芽方法(辣椒种子催芽方法图解)
- 12小时前互联网理财(互联网理财平台排名)
- 10小时前鸿蒙系统什么时候能用的简单介绍
- 3小时前厦门自贸区蚕丝被真假(厦门自贸区蚕丝被好吗)
- 3小时前dnfcc是什么意思(dnfc是什么职业)
网友评论
- 搜索
- 最新文章
- 热门文章