Cosmos DB大规模进行无服务器流式传输

最近,我与同事@Rajasa一起在流处理上做了大量工作,更具体地说,是关于Azure上Kappa和Lambda体系结构的可能实现。 Azure的优点在于,您可以完全自由地选择所需的技术,而您实际上并不仅限于一种或两种选择。 你真的有很多 。 现在的挑战是为您的业务案例选择正确的解决方案,对吗? 好吧,这就是软件架构师的目标! 因此,在这里,我们来看一下流媒体场景的一种可能解决方案,但请继续关注:将来还会有更多解决方案。

这篇文章很 。 那里有很多技术资料,如果您很着急,或者您已经知道所有技术资料,或者您现在不太在意技术资料,则可以忽略以下要点。

  1. 大规模的无服务器流传输是可能的,并且易于设置。 由于任何“大规模”方案都存在一些挑战,但是只要应用正确的配置就可以解决大多数问题
  2. 该示例代码将允许您设置一个流解决方案,每天可以在不到15分钟的时间内吸收近10亿条消息。 这就是为什么您现在应该在云和基础架构代码方面进行投资。 如果您已经这样做了,那就赞了。
  3. 如果您想节省一些钱,仍然需要良好的编码和优化技能。 否则要准备花大笔钱。
  4. 真正的挑战是弄清楚如何创建一个平衡的体系结构。 流式端到端解决方案中有很多可动部分,并且都需要仔细配置,否则您可能会一方面遇到瓶颈,而另一方面却遇到很多未使用的电源。 在这两种情况下,您都在赔钱。 平衡是关键
  5. 更新的天蓝色功能2.0提供了性能优势,如文章结尾的“ 2019年6月”更新段落所述。 请务必阅读!

如果您现在准备好一些技术知识,那就开始吧。

由于无服务器变得越来越普遍,并且越来越受欢迎,因此,我决定使用Azure功能作为流处理引擎并将Cosmos DB作为数据服务层来设置Kappa体系结构。 事件中心提供不可变日志支持。

100%无服务器数据流体系结构。 能行吗 如果是的话,到什么规模? 有什么陷阱? 和最佳做法? 让我们弄清楚:设置一切都很容易,但是在此过程中有些棘手的事情可能会使它变得比人们期望的要简单一些。 本文对您有所帮助。

要生成要处理的数据流,可以选择使用蝗虫。 这是一个功能强大但易于使用的用Python编写的负载测试工具。 Python也是您用来创建负载测试脚本的语言,Locust可以使用该语言来模拟所需的工作负载。 Python在今天非常流行(并且也非常强大,是我最喜欢的语言之一),所以让我们使用Locust。

我创建的脚本生成了一个JSON文档,该文档模拟了一个非常简单的IoT负载:

为了确保我拥有模拟甚至巨大工作量所需的全部带宽,我使用了Azure容器实例(感谢Noel对其进行自动化的帮助)来托管测试客户端。

Azure容器实例| 微软Azure

只需一个命令,即可在云中轻松运行应用程序容器。 Azure容器实例使您可以入门……

azure.microsoft.com

目前,即使Locust提供了一个主/客户端选项,允许您协调来自主服务器的许多客户端的工作,但由于主/从服务器有时无法按预期工作,因此我不得不诉诸创建多个独立的客户端。 不确定是蝗虫还是网络问题,但是有时客户端无法与主服务器正确通信,这使得管理测试环境变得非常困难。

幸运的是,可以使用AZ CLI和Bash轻松编写所有脚本,因此没什么大不了的。 我计划在将来修改此代码,也许要迁移到Kubernetes来处理整个负载测试环境。

为了开始使用该解决方案,我决定创建两个Locust实例,每个实例模拟500个用户,总负载略微超过1000条消息/秒,或者接近7万条消息/分钟,如度量中所示:

这不是一个很大的负担,但是在不消耗您所有信用卡预算的情况下开始就很棒。 当我对整个解决方案充满信心后,我便测试了每秒高达1万条消息的所有内容。 每天几乎有十亿条消息。 足够我的测试和许多用例。 而且,如果需要,您随时可以扩大规模。 在示例代码中,您可以在文章结尾处找到用于测试1K,5K和10K消息/秒的设置。 只是不要怪我的账单好不好?

事件中心是这里可能的选择之一(其他是IoT中心和事件网格),它实际上并不需要很多配置。 此处的主要选项是分区数和吞吐量单位。

Azure事件中心常见问题

了解Theoughput单位以及更多…

docs.microsoft.com

在我的初始示例中,我有两个测试客户端和1000 msg / sec的速度,我选择了具有2个吞吐量单位(2Mb / sec的接收速率,最大2000条消息/秒)和2个分区的Event Hubs Standard层。

分区是此处的关键,因为它们无法即时更改。 问题在于,提前知道您真正需要多少个分区也非常复杂,因为它们与您的客户端应用程序可以读取Event Hub的数据的速度密切相关。 因此,这只是一个反复试验的工作。

复杂但并非不可能。 这是我的朋友Shubha指出的一个有用的指示:分配的吞吐量单位提供的带宽分布在所有可用分区上。 因此,如果您分配2个吞吐量单位,并且有8个分区,则每个分区将仅处理1/8的负载。 这反过来意味着,如果您还有八个专用的应用程序或服务器实例来使用来自每个分区的数据,则这些实例可能会饿死,因为可能没有足够的数据来馈送它们并使它们保持繁忙。

吞吐量单位将分布在可用分区上。 确保您没有让消费者挨饿。

现在,让我重复一遍,分区的数量实际上取决于数据处理逻辑的复杂程度和速度。 如果您不清楚要使用哪个正确的数字,则可以从以下经验法则开始:创建等于您拥有或可能希望拥有的吞吐量单位数量的分区数量以后(因为一旦创建事件中心就无法更改分区号),然后测试和监视事件中心的吞吐量指标。 如果已处理消息的数量与传入消息保持一致,那就很好。 如果没有,请尝试优化和/或按比例扩大(是,按比例扩大。按比例扩大。您随时可以向外扩展)您的消费者应用程序。 如果您使用的是Azure Functions或Spark或容器化的应用程序,那么这很容易。 如果这样做不能解决问题,则您可能需要评估横向扩展选项,这通常更昂贵。

经验法则:为分区创建的数量等于您已经拥有或将来可能希望拥有的吞吐量单位数

与分区相反,吞吐量单位可以轻松扩展或缩小。 我决定使用两个,以便有一些空间可以移动,以防万一我需要恢复未处理的消息。 这是一个例子:

浅蓝色线表示正在接收的消息的吞吐量(每分钟),而深紫色表示已使用的消息。 如您所见,我重点介绍了一种情况,我需要重新启动流处理引擎,从而使消息累积在事件中心中。 重新启动处理引擎后,它会尝试赶上所累积的消息,因此它使用的带宽比通常情况下要多。 如果您不想预分配太多的吞吐量单位,则可以使用事件中心提供的“ 自动充气”选项。 但是,我建议分配比您需要的吞吐量单位的确切数量更多的资源,以便该解决方案可以快速响应更高的负载请求。 Auto-Inflate很棒,但是需要一些时间才能启动,如果您有兴趣将延迟保持在较低水平,那么有几个备用吞吐量单位可以立即使用会很有帮助。

您可能要评估的另一个选项是Capture 选项。 它允许将摄取的数据安全地存储到Blob存储中,以便以后可以使用某些批处理技术对其进行处理。 非常适合Lambda架构。 由于它可以将数据保存为Apache Avro格式,因此它也非常适合即时进行数据分析,因此您可以轻松地使用Apache Drill进行查询:

Azure Blob存储插件

由于存在与Hadoop兼容的层并且使Azure Blob存储成为可能,Drill与Azure Blob存储可以很好地配合使用…

drill.apache.org

当然,Apache Spark也是另一种选择,特别是如果您需要更通用的分布式计算平台来新建Data Lake。 在这种情况下,我建议使用Azure Databricks,它可以立即连接到Azure Blob存储:

Azure Blob存储– Databricks文档

无需任何其他设置即可从公共存储帐户读取数据。 要从私有存储中读取数据…

docs.databricks.com

通常,Apache Storm或Apache Spark或Azure Stream Analytics是流处理的常用选项。 但是,如果您不需要时间意识的功能(如“跳动”或“滚动窗口”),复杂的数据处理功能(如流联接,流的聚合等),则可以选择更轻量的解决方案。

如果选择此路径,则可以使用Docker,Kubernetes或Service Fabric创建和部署可以管理传入数据流的应用程序。 或者,您可以无服务器使用Azure功能。

由于事件中心触发了Azure函数的绑定,因此在准备好要处理一条消息或一批消息时,就很容易设置要调用的函数。 完整的文档可以在这里找到:

Azure功能的Azure事件中心绑定

了解如何在Azure Functions中使用Azure Event Hubs绑定。

docs.microsoft.com

还有一个Cosmos DB触发器绑定可用,您可能需要进行测试,因为它可以简化代码并通常表现良好:

功能1.x的Azure Cosmos DB绑定

了解如何在Azure Functions中使用Azure Cosmos DB触发器和绑定。

docs.microsoft.com

(我使用功能1.x进行了测试。如果您使用的是功能2.x,则链接为以下链接:https://docs.microsoft.com/zh-cn/azure/azure-functions/functions-bindings- cosmosdb-v2)

但是在某些情况下,您确实需要压缩任何性能,或者需要对写入数据库的方式进行更多控制,因此您可能需要手动使用Cosmos DB SDK。

好吧,如果是这种情况,请记住,直接使用Cosmos DB SDK时,使用Singleton模式以确保仅创建一个DocumentClient实例,然后由所有活动线程重用确实非常重要。

如果这不应该发生,那么您将被如下所示的异常所淹没:

 通常,每个套接字地址(协议/网络地址/端口)只能使用一种用法:xxx.xx.xxx.xxx:xxx] 

而且您将获得的表演将非常糟糕。

必须将Singleton模式应用于Cosmos DB DocumentClient对象才能获得出色的性能

因此,请确保将DocumentClient包装为Singleton模式。

在像Azure Function这样的繁重的多线程环境中,您确实要确保创建的DocumentClient的数量没有超出实际需要的数量(每个工作实例一个),因此,因为在同时,您需要确保单例实现是线程安全的。 由于Cosmos DB API大量使用了等待/异步模式,因此不能使用lock关键字来锁定共享资源并防止出现竞争情况。 幸运的是,如此处所述,SemaphoreSlim类可以为您提供帮助:

SemaphoreSlim类(System.Threading)

表示一种轻量级的替代方案,它限制了可以访问资源或…的池的线程数。

docs.microsoft.com

现在所有的手工工作真的值得吗? 让我们借助下图进行检查:

高亮显示“ A”显示在使用Cosmos DB绑定触发器时,Azure功能每秒提取和处理的消息量。 很好,但是在比初始方案更大规模的情况下,这次运行了8台服务器,它每秒无法处理足够的消息,无法跟上将消息推送到Event Hub的速度(您的淡蓝色线条可以在中间看到)。

当然,我可以将服务器数量增加到32,但是这会对我的费用产生重大影响。 除此之外,基于“事件中心”分区策略的“获取”部分中讨论的内容,由于分区数设置为16,它甚至可能无法正常工作。因此,下一步就是优化。

通过手动使用Cosmos SDK,在DocumentClient对象上实现线程安全的Singleton Pattern并并行批量写入所有文档,我可以将解决方案的性能提高50%,如突出显示的部分所示“ B”部分。

并行编写批处理中的所有文档几乎使性能翻了一番

这还不足以处理我在另一个测试中每秒发送的近6000条消息,因此我只是将Azure Function扩展到了更高的SKU,仅此而已。 放大仅需几秒钟的时间。 正如您从突出显示的“ C”部分中看到的那样,我终于能够拥有足够的计算能力来处理甚至更多实际发送的消息,这很棒,因为不仅我可以赶上等待处理的消息,但我也确信自己有足够的计算能力来处理和处理此时此刻可能发生的消息高峰。

为了找到最平衡的解决方案,我进行了几次性能测试,并将我已经完成的所有测试收集到一个图表中,该表格有助于很好地总结结果:

目标是每秒至少处理5500条消息(或每分钟330K条消息),并且您可以看到,使用优化的代码(图表中的“ Test1”),我能够使用P1v2 SKU做到这一点。 使用本地CosmosDB绑定,性能还不错,但是P1v2 SKU是不够的。 我使用P2v2达到了目标,该价格是P1v2 SKU的两倍。 设置使用8名工人的解决方案意味着每月相差超过1000 $! 对于简单的Singleton模式而言,这是一个巨大的节省,不是吗?

该图还显示,在该函数中实现了当前的数据处理逻辑后,拥有8个分区是拥有平衡解决方案的正确选择,因为增加分区不会带来任何好处。 相反,我将无缘无故地稀释可用带宽。

我可以轻而易举地就为什么仅通过使用更高的SKU和增加吞吐量单位的数量来获取处理数据方面要高于它,但是随后我还必须提高服务层的计算机功能。 现在让我们来谈谈。

在Cosmos DB上,创建了一个具有20000 RU / s(RU:资源单元)的集合,以处理1000次插入/秒,并且还允许有足够的RU / s来查询数据。

为什么是20000? 定义完所需的索引后,我进行了一些测试,以测量单个文档插入将使用多少RU。 我测量的值比6多一些。比方说7.如果我希望每秒能够插入1000个文档,并且需要1000 * 7 = 7000 RU。 简单。 由于我也想在某个时刻查询数据(在我的场景中,每个文档编写的文档也需要同时读取),因此我可以放心地假设自己可能需要14000 RU。 为其他查询添加一些空间,并且对我来说正确的起始RU值为20000 RU。 如果我错了,我总是可以稍后几乎实时地更改它。 这就是Cosmos DB的真正魅力。

请记住,20000 RU分布在您拥有的分区上,或多或少像事件中心一样。 这里最大的区别是您无法决定要获得多少个分区。

在Cosmos DB中,您无法决定要拥有多少个分区。 但是您的RU仍然会分布在可用分区中。

此处详细说明如何确定分区数:

Azure Cosmos DB中的分区和水平缩放

了解分区如何在Azure Cosmos DB中工作,如何配置分区和分区键以及如何…

docs.microsoft.com

但是可以肯定地说,有了20K RU,您几乎肯定会得到5个分区,每个分区能够支持4K RU。 这意味着您必须选择一个分区键,以便工作负载将平均分配,避免在特定分区上出现热点,否则将在20K RU限制之前以节流的方式发挥作用。

随着数据的增长,将自动添加更多分区,并将数据分配到新分区中。

在我的示例测试中,分区键设置为deviceId。 这意味着来自单个设备的所有度量都将发送到同一分区,并且数据将平均分布在所有可用分区上(因为测试客户端生成的数据分布均匀),确保我们不会达到每个分区10GB的限制我们必须牢记的分区。

根据您的情况,这可能是好事,也可能不好。 如果您主要对读取特定设备的数据感兴趣,那么该选择非常好,因为您仅命中一个分区,因此RU使用率(即成本)将是最低的,而性能则是最佳的。 另一方面,例如,如果您有兴趣了解最近一刻所有设备的状态,则最好使用时间作为分区键。

如果两者都需要怎么办? 答案是,在处理功能中,您可能需要处理数据,以便实际生成一个包含来自所有设备的合并数据的新文档,并将其存储到另一个集合中,以方便查询。 有时,此过程称为将数据“实例化”到新集合中。 用老式的(但仍然很可爱!)关系术语,这将被称为非规范化。 因此,就像关系数据库中发生的情况一样,当您对数据进行非规范化(或将处理后的数据具体化为新文档)时,请记住,保持数据逻辑上的一致性是您的责任! 因此,请始终仔细检查您的数据处理例程。

当我们将处理后的数据具体化为新文档时,我们有责任在逻辑上保持数据的一致性

没什么好说的:就像看起来一样,只要您决定手动使用Cosmos DB SDK,只要您记得正确实现了Singleton Pattern,一切都会很顺利。

我要掌握的最复杂的事情是事件中心分区和吞吐量单元,但是幸运的是,产品组帮助我正确地获取了它们。

Cosmos DB最好(在幕后)很复杂,我花了一些时间来正确地进行分区。 我已经习惯了Azure SQL / SQL Server世界中的分区概念-与Cosmos DB完全不同,但是却有着相同的词和想法-几天来,我确实很难理解它,但是在某些时候灯终于亮了。

希望本文将帮助您从一开始就正确解决问题。 使用代码将更加容易,对吧? 对。 因此,这里是:设置一切的代码,以便您可以自己尝试,可在GitHub的cosmos-db文件夹中找到。

yorek /大规模流式传输

如何在Azure中实施大规模流解决方案-yorek /大规模流

github.com

就像在这篇长篇文章开始时所说的(如果您仍然在阅读本文,还要特别感谢您!),如果您对流媒体场景感兴趣,请继续关注我的博客,因为我将与其他人做更多测试技术(Azure SQL,IoT中心,Azure Data Explorer…)在不久的将来。

我已经更新了示例以使用Azure Function 2.0,并且与它们一起使用时,本机绑定的性能运行良好,因此,仅出于性能目的,您不需要处理Cosmos DB连接并手动编写。

如果您希望从Cosmos DB获得更多信息,例如使用的RU,则仍然需要使用手动客户端管理。