


Real-Time Data Processing with MongoDB Change Streams and Python
Introduction
Change streams in MongoDB allow your application to react to real-time data changes instantly. In this blog post, I’ll show you how to set up and use change streams with Python, without diving too deep into theory. We'll create a simple program that listens to database events, focusing on inserts first, then extending it to other event types.
Getting Started with Change Streams
Change streams let your app listen to specific database events, like inserts or updates, and respond immediately. Imagine a scenario where a user updates their profile; with change streams, you can instantly reflect this change across your app without needing the user to refresh the page. Before this feature, you had to constantly poll the database or use complex methods like tailing the MongoDB Oplog. Change streams simplify this by providing a more user-friendly API.
What Happens Without Change Streams
Let's say I have an API to upload invoices. The flow is that customers will upload an image of the invoice to MongoDB, then we extract the information with AI and update the invoice. Here's an example of the code for uploading an invoice:
from pymongo import MongoClient class MongoDatabase: def __init__(self, config_path: str): # Load the YAML configuration file using the provided utility function self.config_path = config_path self.config = read_config(path=self.config_path) # Initialize MongoDB connection self.client = MongoClient(self.config['mongodb']['uri']) self.db = self.client[self.config['mongodb']['database']] self.collection = self.db[self.config['mongodb']['collection']] def create_document(self, data: Dict[str, Any]) -> str: # Insert a new document and return the automatically generated document ID result = self.collection.insert_one(data) return str(result.inserted_id) def update_document_by_id(self, document_id: str, data: Dict[str, Any]): try: self.collection.update_one({"_id": document_id}, {"$set": data}) except PyMongoError as e: print(f"Error updating document: {e}")
First I will wrap the pymongo inside a class, just in case :))
@app.post("/api/v1/invoices/upload") async def upload_invoice(request: Request): try: # Parse JSON body body = await request.json() img = body.get("img") user_uuid = body.get("user_uuid") if not img or not is_base64(img): return JSONResponse( status_code=status.HTTP_400_BAD_REQUEST, content={"status": "error", "message": "Base64 image is required"}, ) # Generate invoice UUID current_time = datetime.now(timezone.utc) img = valid_base64_image(img) invoice_document = { "invoice_type": None, "created_at": current_time, "created_by": user_uuid, "last_modified_at": None, "last_modified_by": None, "status": "not extracted", "invoice_image_base64": img, "invoice_info": {} } invoice_uuid = mongo_db.create_document(invoice_document) print('Result saved to MongoDB:', invoice_uuid) mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid}) return JSONResponse( status_code=status.HTTP_201_CREATED, content={"invoice_uuid": invoice_uuid, "message": "Upload successful"} ) except Exception as e: # Handle errors return JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"status": "error", "message": str(e)} )
A reasonable question might be: why not wait until the AI model processes the image before updating? The problem is that it takes around 4-5 minutes to process, and we don't want to affect the user experience.
How about Kafka?
Another option could be using Kafka. We could publish the image to a Kafka topic, and another service would process the data.
Pros:
- Decouples the upload and processing services.
- Efficient for large-scale, real-time data processing.
- Improved user experience: Users get an immediate response after uploading the invoice. The processing is handled asynchronously.
Cons:
- Introduces additional complexity.
- Requires setup and maintenance of Kafka infrastructure.
- May be overkill for small-scale applications.
Here’s a basic implementation to demonstrate using Kafka to handle the invoice upload process.
The user uploads an invoice through an API endpoint. The invoice image is saved in MongoDB, and a message is sent to a Kafka topic for further processing.
from kafka import KafkaProducer import json from fastapi import FastAPI, Request, status from fastapi.responses import JSONResponse from datetime import datetime, timezone app = FastAPI() producer = KafkaProducer( bootstrap_servers=['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8') ) @app.post("/api/v1/invoices/upload") async def upload_invoice(request: Request): try: body = await request.json() img = body.get("img") user_uuid = body.get("user_uuid") if not img or not is_base64(img): return JSONResponse( status_code=status.HTTP_400_BAD_REQUEST, content={"status": "error", "message": "Base64 image is required"}, ) current_time = datetime.now(timezone.utc) img = valid_base64_image(img) invoice_document = { "invoice_type": None, "created_at": current_time, "created_by": user_uuid, "status": "not extracted", "invoice_image_base64": img, } # Save the document to MongoDB invoice_uuid = mongo_db.create_document(invoice_document) mongo_db.update_document_by_id(invoice_uuid, {"invoice_uuid": invoice_uuid}) # Send a message to Kafka topic producer.send('invoice_topic', invoice_document) producer.flush() return JSONResponse( status_code=status.HTTP_201_CREATED, content={"message": "Invoice upload received and will be processed"} ) except Exception as e: return JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"status": "error", "message": str(e)} )
The Kafka consumer listens to the invoice_topic. When it receives a message, it processes the invoice (e.g., extracting information from the image) and updates the corresponding document in MongoDB.
from kafka import KafkaConsumer import json consumer = KafkaConsumer( 'invoice_topic', bootstrap_servers=['localhost:9092'], value_deserializer=lambda m: json.loads(m.decode('utf-8')) ) for message in consumer: invoice_document = message.value # Process the invoice, extract information, and update the document in MongoDB invoice_uuid = invoice_document["_id"] extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"]) mongo_db.update_document_by_id(invoice_uuid, { "invoice_info": extracted_data, "status": "extracted" }) print(f"Processed and updated invoice: {invoice_uuid}")
Flow Summary:
- Upload Invoice: The user uploads an invoice through the API.
- Save to MongoDB: The invoice document is saved in MongoDB.
- Send Message to Kafka: A message containing the invoice details is sent to a Kafka topic (invoice_topic).
- Kafka Consumer Processes Invoice: A Kafka consumer listens to invoice_topic, processes the invoice, and updates the corresponding document in MongoDB with the extracted information.
Wow, I can't believe I managed to write this on my own! It really highlights the effort involved. And that's not even considering the complexities of managing and configuring the three services: MongoDB, Kafka, and the Invoice service.
Invoice Processing with MongoDB Change Streams
Here's the complete code rewritten in Markdown to demonstrate MongoDB change streams, including additional methods and functions to handle invoice processing triggered by the change stream.
We'll start by creating a MongoDB wrapper class that handles database operations such as creating documents and listening to change streams.
from pymongo import MongoClient from pymongo.errors import PyMongoError from typing import Dict, Any import threading import yaml class MongoDatabase: # Same code as before # def process_invoice(self, invoice_document: Dict[str, Any]): """Process the invoice by extracting data and updating the document in MongoDB.""" try: # Simulate extracting information from the invoice image extracted_data = extract_invoice_data(invoice_document["invoice_image_base64"]) invoice_uuid = invoice_document["_id"] # Update the invoice document with the extracted data self.update_document_by_id(invoice_uuid, {"invoice_info": extracted_data, "status": "extracted"}) print(f"Processed and updated invoice: {invoice_uuid}") except Exception as e: print(f"Error processing invoice: {str(e)}") def start_change_stream_listener(self): """Start listening to the change stream for the collection.""" def listen(): try: with self.collection.watch() as stream: for change in stream: if change['operationType'] == 'insert': invoice_document = change['fullDocument'] print(f"New invoice detected: {invoice_document['_id']}") self.process_invoice(invoice_document) except PyMongoError as e: print(f"Change stream error: {str(e)}") # Start the change stream listener in a separate thread listener_thread = threading.Thread(target=listen, daemon=True) listener_thread.start()
To make it easy I add process_invoice inside the MongoDatabase class. But you should leave it somewhere else
The upload API should be like the original one.
mongo_db = MongoDatabase(config_path='path_to_your_config.yaml') mongo_db.start_change_stream_listener() @app.post("/api/v1/invoices/upload") async def upload_invoice(request: Request): try: # Parse JSON body body = await request.json() # same code as before
Flow Summary:
- User Uploads Invoice: The user uploads an invoice through the API.
- Save to MongoDB: The invoice document is saved in MongoDB.
- MongoDB Change Stream Triggered: The MongoDB change stream detects the insertion of the new document.
- Invoice Processing: The change stream triggers the process_invoice function, which processes the invoice and updates the document in MongoDB with the extracted information.
Conclusion
With MongoDB change streams, you can efficiently process real-time changes in your database. Extending this example, you can handle various events such as updates and deletes, making your application more reactive and responsive.
Reference:
- https://www.mongodb.com/developer/languages/python/python-change-streams/#listen-to-inserts-from-an-application
The above is the detailed content of Real-Time Data Processing with MongoDB Change Streams and Python. For more information, please follow other related articles on the PHP Chinese website!

Hot AI Tools

Undresser.AI Undress
AI-powered app for creating realistic nude photos

AI Clothes Remover
Online AI tool for removing clothes from photos.

Undress AI Tool
Undress images for free

Clothoff.io
AI clothes remover

Video Face Swap
Swap faces in any video effortlessly with our completely free AI face swap tool!

Hot Article

Hot Tools

Notepad++7.3.1
Easy-to-use and free code editor

SublimeText3 Chinese version
Chinese version, very easy to use

Zend Studio 13.0.1
Powerful PHP integrated development environment

Dreamweaver CS6
Visual web development tools

SublimeText3 Mac version
God-level code editing software (SublimeText3)

Hot Topics

Python is suitable for data science, web development and automation tasks, while C is suitable for system programming, game development and embedded systems. Python is known for its simplicity and powerful ecosystem, while C is known for its high performance and underlying control capabilities.

You can learn the basics of Python within two hours. 1. Learn variables and data types, 2. Master control structures such as if statements and loops, 3. Understand the definition and use of functions. These will help you start writing simple Python programs.

Python excels in gaming and GUI development. 1) Game development uses Pygame, providing drawing, audio and other functions, which are suitable for creating 2D games. 2) GUI development can choose Tkinter or PyQt. Tkinter is simple and easy to use, PyQt has rich functions and is suitable for professional development.

You can learn basic programming concepts and skills of Python within 2 hours. 1. Learn variables and data types, 2. Master control flow (conditional statements and loops), 3. Understand the definition and use of functions, 4. Quickly get started with Python programming through simple examples and code snippets.

Python is easier to learn and use, while C is more powerful but complex. 1. Python syntax is concise and suitable for beginners. Dynamic typing and automatic memory management make it easy to use, but may cause runtime errors. 2.C provides low-level control and advanced features, suitable for high-performance applications, but has a high learning threshold and requires manual memory and type safety management.

Python is widely used in the fields of web development, data science, machine learning, automation and scripting. 1) In web development, Django and Flask frameworks simplify the development process. 2) In the fields of data science and machine learning, NumPy, Pandas, Scikit-learn and TensorFlow libraries provide strong support. 3) In terms of automation and scripting, Python is suitable for tasks such as automated testing and system management.

To maximize the efficiency of learning Python in a limited time, you can use Python's datetime, time, and schedule modules. 1. The datetime module is used to record and plan learning time. 2. The time module helps to set study and rest time. 3. The schedule module automatically arranges weekly learning tasks.

Python is highly favored for its simplicity and power, suitable for all needs from beginners to advanced developers. Its versatility is reflected in: 1) Easy to learn and use, simple syntax; 2) Rich libraries and frameworks, such as NumPy, Pandas, etc.; 3) Cross-platform support, which can be run on a variety of operating systems; 4) Suitable for scripting and automation tasks to improve work efficiency.
