使用 MongoDB Change Streams 和 Python 进行实时数据处理
介绍
MongoDB 中的更改流允许您的应用程序立即对实时数据更改做出反应。在这篇博文中,我将向您展示如何使用 Python 设置和使用变更流,而无需深入研究理论。我们将创建一个简单的程序来监听数据库事件,首先关注插入,然后将其扩展到其他事件类型。
变革流入门
更改流让您的应用程序能够侦听特定的数据库事件,例如插入或更新,并立即响应。想象一下用户更新其个人资料的场景;通过更改流,您可以立即在应用程序中反映此更改,而无需用户刷新页面。在此功能之前,您必须不断轮询数据库或使用复杂的方法,例如跟踪 MongoDB Oplog。变更流通过提供更加用户友好的 API 来简化这一过程。
如果没有变更流会发生什么
假设我有一个上传发票的 API。流程是客户将发票图像上传到 MongoDB,然后我们使用 AI 提取信息并更新发票。以下是上传发票的代码示例:
from pymongo import MongoClient class MongoDatabase: def __init__(self, config_path: str): # Load the YAML configuration file using the provided utility function self.config_path = config_path self.config = read_config(path=self.config_path) # Initialize MongoDB connection self.client = MongoClient(self.config['mongodb']['uri']) self.db = self.client[self.config['mongodb']['database']] self.collection = self.db[self.config['mongodb']['collection']] def create_document(self, data: Dict[str, Any]) -> str: # Insert a new document and return the automatically generated document ID result = self.collection.insert_one(data) return str(result.inserted_id) def update_document_by_id(self, document_id: str, data: Dict[str, Any]): try: self.collection.update_one({"_id": document_id}, {"$set": data}) except PyMongoError as e: print(f"Error updating document: {e}")
首先,我将把 pymongo 包装在一个类中,以防万一:))
@app.post("/api/v1/invoices/upload") async def upload_invoice(request: Request): try: # Parse JSON body body = await request.json() img = body.get("img") user_uuid = body.get("user_uuid") if not img or not is_base64(img): return JSONResponse( status_code=status.HTTP_400_BAD_REQUEST, content={"status": "error", "message": "Base64 image is required"}, ) # Generate invoice UUID current_time = datetime.now(timezone.utc) img = valid_base64_image(img) invoice_document = { "invoice_type": None, "created_at": current_time, "created_by": user_uuid, "last_modified_at": None, "last_modified_by": None, "status": "not extracted", "invoice_image_base64": img, "invoice_info": {} } invoice_uuid = mongo_db.create_document(invoice_document) print('Result saved to MongoDB:', invoice_uuid) mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid}) return JSONResponse( status_code=status.HTTP_201_CREATED, content={"invoice_uuid": invoice_uuid, "message": "Upload successful"} ) except Exception as e: # Handle errors return JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"status": "error", "message": str(e)} )
一个合理的问题可能是:为什么不等到 AI 模型处理完图像后再更新?问题是处理时间大约需要 4-5 分钟,我们不想影响用户体验。
卡夫卡怎么样?
另一种选择是使用 Kafka。我们可以将图像发布到 Kafka 主题,然后另一个服务将处理数据。
优点:
- 解耦上传和处理服务。
- 高效的大规模实时数据处理。
- 改进的用户体验:用户上传发票后立即得到回复。处理是异步处理的。
缺点:
- 引入了额外的复杂性。
- 需要设置和维护 Kafka 基础设施。
- 对于小型应用程序来说可能有点过分了。
这是一个基本实现,演示如何使用 Kafka 处理发票上传过程。
用户通过 API 端点上传发票。发票图像保存在 MongoDB 中,并向 Kafka 主题发送一条消息以进行进一步处理。
from kafka import KafkaProducer import json from fastapi import FastAPI, Request, status from fastapi.responses import JSONResponse from datetime import datetime, timezone app = FastAPI() producer = KafkaProducer( bootstrap_servers=['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8') ) @app.post("/api/v1/invoices/upload") async def upload_invoice(request: Request): try: body = await request.json() img = body.get("img") user_uuid = body.get("user_uuid") if not img or not is_base64(img): return JSONResponse( status_code=status.HTTP_400_BAD_REQUEST, content={"status": "error", "message": "Base64 image is required"}, ) current_time = datetime.now(timezone.utc) img = valid_base64_image(img) invoice_document = { "invoice_type": None, "created_at": current_time, "created_by": user_uuid, "status": "not extracted", "invoice_image_base64": img, } # Save the document to MongoDB invoice_uuid = mongo_db.create_document(invoice_document) mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid}) # Send a message to Kafka topic producer.send('invoice_topic', invoice_document) producer.flush() return JSONResponse( status_code=status.HTTP_201_CREATED, content={"message": "Invoice upload received and will be processed"} ) except Exception as e: return JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"status": "error", "message": str(e)} )
Kafka消费者监听invoice_topic。当它收到消息时,它会处理发票(例如,从图像中提取信息)并更新 MongoDB 中的相应文档。
from kafka import KafkaConsumer import json consumer = KafkaConsumer( 'invoice_topic', bootstrap_servers=['localhost:9092'], value_deserializer=lambda m: json.loads(m.decode('utf-8')) ) for message in consumer: invoice_document = message.value # Process the invoice, extract information, and update the document in MongoDB invoice_uuid = invoice_document["_id"] extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"]) mongo_db.update_document_by_id(invoice_uuid, { "invoice_info": extracted_data, "status": "extracted" }) print(f"Processed and updated invoice: {invoice_uuid}")
流程摘要:
- 上传发票:用户通过API上传发票。
- 保存到 MongoDB:发票文档保存在 MongoDB 中。
- 发送消息到 Kafka: 包含发票详细信息的消息被发送到 Kafka 主题 (invoice_topic)。
- Kafka 消费者处理发票: Kafka 消费者监听发票主题,处理发票,并使用提取的信息更新 MongoDB 中的相应文档。
哇,我不敢相信我自己写了这个!它确实凸显了所涉及的努力。这甚至没有考虑管理和配置三个服务的复杂性:MongoDB、Kafka 和 Invoice 服务。
使用 MongoDB Change Streams 进行发票处理
这是用 Markdown 重写的完整代码,用于演示 MongoDB 变更流,包括用于处理由变更流触发的发票处理的其他方法和函数。
我们将首先创建一个 MongoDB 包装类来处理数据库操作,例如创建文档和监听更改流。
from pymongo import MongoClient from pymongo.errors import PyMongoError from typing import Dict, Any import threading import yaml class MongoDatabase: # Same code as before # def process_invoice(self, invoice_document: Dict[str, Any]): """Process the invoice by extracting data and updating the document in MongoDB.""" try: # Simulate extracting information from the invoice image extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"]) invoice_uuid = invoice_document["_id"] # Update the invoice document with the extracted data self.update_document_by_id(invoice_uuid, {"invoice_info": extracted_data, "status": "extracted"}) print(f"Processed and updated invoice: {invoice_uuid}") except Exception as e: print(f"Error processing invoice: {str(e)}") def start_change_stream_listener(self): """Start listening to the change stream for the collection.""" def listen(): try: with self.collection.watch() as stream: for change in stream: if change['operationType'] == 'insert': invoice_document = change['fullDocument'] print(f"New invoice detected: {invoice_document['_id']}") self.process_invoice(invoice_document) except PyMongoError as e: print(f"Change stream error: {str(e)}") # Start the change stream listener in a separate thread listener_thread = threading.Thread(target=listen, daemon=True) listener_thread.start()
为了方便起见,我在 MongoDatabase 类中添加了 process_invoice。但你应该把它留在其他地方
上传API应该和原来的一样。
mongo_db = MongoDatabase(config_path='path_to_your_config.yaml') mongo_db.start_change_stream_listener() @app.post("/api/v1/invoices/upload") async def upload_invoice(request: Request): try: # Parse JSON body body = await request.json() # same code as before
流程摘要:
- 用户上传发票:用户通过API上传发票。
- 保存到 MongoDB:发票文档保存在 MongoDB 中。
- 触发 MongoDB 更改流: MongoDB 更改流检测到新文档的插入。
- 发票处理: 变更流触发 process_invoice 函数,该函数处理发票并使用提取的信息更新 MongoDB 中的文档。
结论
借助 MongoDB 更改流,您可以高效地处理数据库中的实时更改。扩展此示例,您可以处理各种事件,例如更新和删除,使您的应用程序更具反应性和响应能力。
参考:
- https://www.mongodb.com/developer/languages/python/python-change-streams/#listen-to-inserts-from-an-application
以上是使用 MongoDB Change Streams 和 Python 进行实时数据处理的详细内容。更多信息请关注PHP中文网其他相关文章!

热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

Video Face Swap
使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热门文章

热工具

记事本++7.3.1
好用且免费的代码编辑器

SublimeText3汉化版
中文版,非常好用

禅工作室 13.0.1
功能强大的PHP集成开发环境

Dreamweaver CS6
视觉化网页开发工具

SublimeText3 Mac版
神级代码编辑软件(SublimeText3)

Python适合数据科学、Web开发和自动化任务,而C 适用于系统编程、游戏开发和嵌入式系统。 Python以简洁和强大的生态系统着称,C 则以高性能和底层控制能力闻名。

两小时内可以学到Python的基础知识。1.学习变量和数据类型,2.掌握控制结构如if语句和循环,3.了解函数的定义和使用。这些将帮助你开始编写简单的Python程序。

Python在游戏和GUI开发中表现出色。1)游戏开发使用Pygame,提供绘图、音频等功能,适合创建2D游戏。2)GUI开发可选择Tkinter或PyQt,Tkinter简单易用,PyQt功能丰富,适合专业开发。

2小时内可以学会Python的基本编程概念和技能。1.学习变量和数据类型,2.掌握控制流(条件语句和循环),3.理解函数的定义和使用,4.通过简单示例和代码片段快速上手Python编程。

Python更易学且易用,C 则更强大但复杂。1.Python语法简洁,适合初学者,动态类型和自动内存管理使其易用,但可能导致运行时错误。2.C 提供低级控制和高级特性,适合高性能应用,但学习门槛高,需手动管理内存和类型安全。

Python在web开发、数据科学、机器学习、自动化和脚本编写等领域有广泛应用。1)在web开发中,Django和Flask框架简化了开发过程。2)数据科学和机器学习领域,NumPy、Pandas、Scikit-learn和TensorFlow库提供了强大支持。3)自动化和脚本编写方面,Python适用于自动化测试和系统管理等任务。

要在有限的时间内最大化学习Python的效率,可以使用Python的datetime、time和schedule模块。1.datetime模块用于记录和规划学习时间。2.time模块帮助设置学习和休息时间。3.schedule模块自动化安排每周学习任务。

Python在自动化、脚本编写和任务管理中表现出色。1)自动化:通过标准库如os、shutil实现文件备份。2)脚本编写:使用psutil库监控系统资源。3)任务管理:利用schedule库调度任务。Python的易用性和丰富库支持使其在这些领域中成为首选工具。
