首页 后端开发 Python教程 使用 MongoDB Change Streams 和 Python 进行实时数据处理

使用 MongoDB Change Streams 和 Python 进行实时数据处理

Sep 12, 2024 pm 04:15 PM

Real-Time Data Processing with MongoDB Change Streams and Python

介绍

MongoDB 中的更改流允许您的应用程序立即对实时数据更改做出反应。在这篇博文中,我将向您展示如何使用 Python 设置和使用变更流,而无需深入研究理论。我们将创建一个简单的程序来监听数据库事件,首先关注插入,然后将其扩展到其他事件类型。

变革流入门

更改流让您的应用程序能够侦听特定的数据库事件,例如插入或更新,并立即响应。想象一下用户更新其个人资料的场景;通过更改流,您可以立即在应用程序中反映此更改,而无需用户刷新页面。在此功能之前,您必须不断轮询数据库或使用复杂的方法,例如跟踪 MongoDB Oplog。变更流通过提供更加用户友好的 API 来简化这一过程。

如果没有变更流会发生什么

假设我有一个上传发票的 API。流程是客户将发票图像上传到 MongoDB,然后我们使用 AI 提取信息并更新发票。以下是上传发票的代码示例:

from pymongo import MongoClient

class MongoDatabase:
    def __init__(self, config_path: str):
        # Load the YAML configuration file using the provided utility function
        self.config_path = config_path
        self.config = read_config(path=self.config_path)

        # Initialize MongoDB connection
        self.client = MongoClient(self.config['mongodb']['uri'])
        self.db = self.client[self.config['mongodb']['database']]
        self.collection = self.db[self.config['mongodb']['collection']]

    def create_document(self, data: Dict[str, Any]) -> str:
        # Insert a new document and return the automatically generated document ID
        result = self.collection.insert_one(data)
        return str(result.inserted_id)
    def update_document_by_id(self, document_id: str, data: Dict[str, Any]):
        try:
            self.collection.update_one({"_id": document_id}, {"$set": data})
        except PyMongoError as e:
            print(f"Error updating document: {e}")
登录后复制

首先,我将把 pymongo 包装在一个类中,以防万一:))

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")

        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )

        # Generate invoice UUID
        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)       
        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "last_modified_at": None,
            "last_modified_by": None,
            "status": "not extracted",
            "invoice_image_base64": img,
            "invoice_info": {}
        }

        invoice_uuid = mongo_db.create_document(invoice_document)
        print('Result saved to MongoDB:', invoice_uuid)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})

        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"invoice_uuid": invoice_uuid, "message": "Upload successful"}
        )
    except Exception as e:
        # Handle errors
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)}
        )
登录后复制

一个合理的问题可能是:为什么不等到 AI 模型处理完图像后再更新?问题是处理时间大约需要 4-5 分钟,我们不想影响用户体验。

卡夫卡怎么样?

另一种选择是使用 Kafka。我们可以将图像发布到 Kafka 主题,然后另一个服务将处理数据。

优点:

  • 解耦上传和处理服务。
  • 高效的大规模实时数据处理。
  • 改进的用户体验:用户上传发票后立即得到回复。处理是异步处理的。

缺点:

  • 引入了额外的复杂性。
  • 需要设置和维护 Kafka 基础设施。
  • 对于小型应用程序来说可能有点过分了。

这是一个基本实现,演示如何使用 Kafka 处理发票上传过程。

用户通过 API 端点上传发票。发票图像保存在 MongoDB 中,并向 Kafka 主题发送一条消息以进行进一步处理。

from kafka import KafkaProducer
import json
from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
from datetime import datetime, timezone

app = FastAPI()

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        body = await request.json()
        img = body.get("img")
        user_uuid = body.get("user_uuid")

        if not img or not is_base64(img):
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"status": "error", "message": "Base64 image is required"},
            )

        current_time = datetime.now(timezone.utc)
        img = valid_base64_image(img)       
        invoice_document = {
            "invoice_type": None,
            "created_at": current_time,
            "created_by": user_uuid,
            "status": "not extracted",
            "invoice_image_base64": img,
        }

        # Save the document to MongoDB
        invoice_uuid = mongo_db.create_document(invoice_document)
        mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid})

        # Send a message to Kafka topic
        producer.send('invoice_topic', invoice_document)
        producer.flush()

        return JSONResponse(
            status_code=status.HTTP_201_CREATED,
            content={"message": "Invoice upload received and will be processed"}
        )
    except Exception as e:
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"status": "error", "message": str(e)}
        )
登录后复制

Kafka消费者监听invoice_topic。当它收到消息时,它会处理发票(例如,从图像中提取信息)并更新 MongoDB 中的相应文档。

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'invoice_topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    invoice_document = message.value

    # Process the invoice, extract information, and update the document in MongoDB
    invoice_uuid = invoice_document["_id"]
    extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])

    mongo_db.update_document_by_id(invoice_uuid, {
        "invoice_info": extracted_data, 
        "status": "extracted"
    })

    print(f"Processed and updated invoice: {invoice_uuid}")
登录后复制

流程摘要:

  1. 上传发票:用户通过API上传发票。
  2. 保存到 MongoDB:发票文档保存在 MongoDB 中。
  3. 发送消息到 Kafka: 包含发票详细信息的消息被发送到 Kafka 主题 (invoice_topic)。
  4. Kafka 消费者处理发票: Kafka 消费者监听发票主题,处理发票,并使用提取的信息更新 MongoDB 中的相应文档。

哇,我不敢相信我自己写了这个!它确实凸显了所涉及的努力。这甚至没有考虑管理和配置三个服务的复杂性:MongoDB、Kafka 和 Invoice 服务。

使用 MongoDB Change Streams 进行发票处理

这是用 Markdown 重写的完整代码,用于演示 MongoDB 变更流,包括用于处理由变更流触发的发票处理的其他方法和函数。

我们将首先创建一个 MongoDB 包装类来处理数据库操作,例如创建文档和监听更改流。

from pymongo import MongoClient
from pymongo.errors import PyMongoError
from typing import Dict, Any
import threading
import yaml

class MongoDatabase:
    # Same code as before #

    def process_invoice(self, invoice_document: Dict[str, Any]):
        """Process the invoice by extracting data and updating the document in MongoDB."""
        try:
            # Simulate extracting information from the invoice image
            extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"])
            invoice_uuid = invoice_document["_id"]

            # Update the invoice document with the extracted data
            self.update_document_by_id(invoice_uuid, {"invoice_info": extracted_data, "status": "extracted"})
            print(f"Processed and updated invoice: {invoice_uuid}")
        except Exception as e:
            print(f"Error processing invoice: {str(e)}")

    def start_change_stream_listener(self):
        """Start listening to the change stream for the collection."""
        def listen():
            try:
                with self.collection.watch() as stream:
                    for change in stream:
                        if change['operationType'] == 'insert':
                            invoice_document = change['fullDocument']
                            print(f"New invoice detected: {invoice_document['_id']}")
                            self.process_invoice(invoice_document)
            except PyMongoError as e:
                print(f"Change stream error: {str(e)}")

        # Start the change stream listener in a separate thread
        listener_thread = threading.Thread(target=listen, daemon=True)
        listener_thread.start()
登录后复制

为了方便起见,我在 MongoDatabase 类中添加了 process_invoice。但你应该把它留在其他地方

上传API应该和原来的一样。

mongo_db = MongoDatabase(config_path='path_to_your_config.yaml')
mongo_db.start_change_stream_listener()

@app.post("/api/v1/invoices/upload")
async def upload_invoice(request: Request):
    try:
        # Parse JSON body
        body = await request.json() 
        # same code as before
登录后复制

流程摘要:

  1. 用户上传发票:用户通过API上传发票。
  2. 保存到 MongoDB:发票文档保存在 MongoDB 中。
  3. 触发 MongoDB 更改流: MongoDB 更改流检测到新文档的插入。
  4. 发票处理: 变更流触发 process_invoice 函数,该函数处理发票并使用提取的信息更新 MongoDB 中的文档。

结论

借助 MongoDB 更改流,您可以高效地处理数据库中的实时更改。扩展此示例,您可以处理各种事件,例如更新和删除,使您的应用程序更具反应性和响应能力。

参考:

  • https://www.mongodb.com/developer/languages/python/python-change-streams/#listen-to-inserts-from-an-application

以上是使用 MongoDB Change Streams 和 Python 进行实时数据处理的详细内容。更多信息请关注PHP中文网其他相关文章!

本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

Video Face Swap

Video Face Swap

使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热工具

记事本++7.3.1

记事本++7.3.1

好用且免费的代码编辑器

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境

Dreamweaver CS6

Dreamweaver CS6

视觉化网页开发工具

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

热门话题

Java教程
1655
14
CakePHP 教程
1413
52
Laravel 教程
1306
25
PHP教程
1252
29
C# 教程
1226
24
Python vs.C:申请和用例 Python vs.C:申请和用例 Apr 12, 2025 am 12:01 AM

Python适合数据科学、Web开发和自动化任务,而C 适用于系统编程、游戏开发和嵌入式系统。 Python以简洁和强大的生态系统着称,C 则以高性能和底层控制能力闻名。

您可以在2小时内学到多少python? 您可以在2小时内学到多少python? Apr 09, 2025 pm 04:33 PM

两小时内可以学到Python的基础知识。1.学习变量和数据类型,2.掌握控制结构如if语句和循环,3.了解函数的定义和使用。这些将帮助你开始编写简单的Python程序。

Python:游戏,Guis等 Python:游戏,Guis等 Apr 13, 2025 am 12:14 AM

Python在游戏和GUI开发中表现出色。1)游戏开发使用Pygame,提供绘图、音频等功能,适合创建2D游戏。2)GUI开发可选择Tkinter或PyQt,Tkinter简单易用,PyQt功能丰富,适合专业开发。

2小时的Python计划:一种现实的方法 2小时的Python计划:一种现实的方法 Apr 11, 2025 am 12:04 AM

2小时内可以学会Python的基本编程概念和技能。1.学习变量和数据类型,2.掌握控制流(条件语句和循环),3.理解函数的定义和使用,4.通过简单示例和代码片段快速上手Python编程。

Python与C:学习曲线和易用性 Python与C:学习曲线和易用性 Apr 19, 2025 am 12:20 AM

Python更易学且易用,C 则更强大但复杂。1.Python语法简洁,适合初学者,动态类型和自动内存管理使其易用,但可能导致运行时错误。2.C 提供低级控制和高级特性,适合高性能应用,但学习门槛高,需手动管理内存和类型安全。

Python:探索其主要应用程序 Python:探索其主要应用程序 Apr 10, 2025 am 09:41 AM

Python在web开发、数据科学、机器学习、自动化和脚本编写等领域有广泛应用。1)在web开发中,Django和Flask框架简化了开发过程。2)数据科学和机器学习领域,NumPy、Pandas、Scikit-learn和TensorFlow库提供了强大支持。3)自动化和脚本编写方面,Python适用于自动化测试和系统管理等任务。

Python和时间:充分利用您的学习时间 Python和时间:充分利用您的学习时间 Apr 14, 2025 am 12:02 AM

要在有限的时间内最大化学习Python的效率,可以使用Python的datetime、time和schedule模块。1.datetime模块用于记录和规划学习时间。2.time模块帮助设置学习和休息时间。3.schedule模块自动化安排每周学习任务。

Python:自动化,脚本和任务管理 Python:自动化,脚本和任务管理 Apr 16, 2025 am 12:14 AM

Python在自动化、脚本编写和任务管理中表现出色。1)自动化:通过标准库如os、shutil实现文件备份。2)脚本编写:使用psutil库监控系统资源。3)任务管理:利用schedule库调度任务。Python的易用性和丰富库支持使其在这些领域中成为首选工具。

See all articles