Maison développement back-end Tutoriel Python Améliorer les applications GenAI avec KubeMQ : mise à l'échelle efficace de la génération augmentée par récupération (RAG)

Améliorer les applications GenAI avec KubeMQ : mise à l'échelle efficace de la génération augmentée par récupération (RAG)

Dec 26, 2024 am 09:00 AM

Enhancing GenAI Applications With KubeMQ: Efficiently Scaling Retrieval-Augmented Generation (RAG)

Alors que l'adoption de l'IA générative (GenAI) se développe dans tous les secteurs, les organisations exploitent de plus en plus les techniques de génération augmentée par récupération (RAG) pour renforcer leurs modèles d'IA avec des données en temps réel et riches en contexte. données. La gestion du flux complexe d'informations dans de telles applications pose des défis importants, en particulier lorsqu'il s'agit de données générées en continu à grande échelle. KubeMQ, un courtier de messages robuste, apparaît comme une solution pour rationaliser le routage de plusieurs processus RAG, garantissant une gestion efficace des données dans les applications GenAI.

Pour améliorer encore l'efficacité et l'évolutivité des flux de travail RAG, l'intégration d'une base de données hautes performances telle que FalkorDB est essentielle. FalkorDB fournit une solution de stockage fiable et évolutive pour les bases de connaissances dynamiques dont dépendent les systèmes RAG, garantissant une récupération rapide des données et une intégration transparente avec des systèmes de messagerie comme KubeMQ.

Comprendre RAG dans les workflows GenAI

RAG est un paradigme qui améliore les modèles d'IA génératifs en intégrant un mécanisme de récupération, permettant aux modèles d'accéder à des bases de connaissances externes lors de l'inférence. Cette approche améliore considérablement l'exactitude, la pertinence et la rapidité des réponses générées en les fondant sur les informations disponibles les plus récentes et les plus pertinentes.

Dans les flux de travail GenAI typiques employant RAG, le processus comporte plusieurs étapes :

  1. Traitement des requêtes : interprétation de la saisie de l'utilisateur pour comprendre l'intention et le contexte

  2. Récupération : Récupération de documents ou de données pertinents à partir d'une base de connaissances dynamique, telle que FalkorDB, qui garantit un accès rapide et efficace aux informations les plus récentes et pertinentes.

  3. Génération : Produire une réponse en utilisant à la fois les données saisies et les données récupérées

  4. Livraison de la réponse : fourniture du résultat final enrichi à l'utilisateur

La mise à l'échelle de ces étapes, en particulier dans les environnements où les données sont générées et mises à jour en permanence, nécessite un mécanisme efficace et fiable pour le flux de données entre les différents composants du pipeline RAG.

Le rôle critique de KubeMQ dans le traitement RAG

Gérer des flux de données continus à grande échelle

Dans des scénarios tels que les réseaux IoT, les plateformes de médias sociaux ou les systèmes d'analyse en temps réel, de nouvelles données sont sans cesse produites et les modèles d'IA doivent s'adapter rapidement pour intégrer ces informations. Les architectures requête-réponse traditionnelles peuvent devenir des goulots d'étranglement dans des conditions de haut débit, entraînant des problèmes de latence et une dégradation des performances.

KubeMQ gère des scénarios de messagerie à haut débit en fournissant une infrastructure évolutive et robuste pour un routage efficace des données entre les services. En intégrant KubeMQ dans le pipeline RAG, chaque nouveau point de données est publié dans une file d'attente ou un flux de messages, garantissant ainsi que les composants de récupération ont un accès immédiat aux dernières informations sans surcharger le système. Cette capacité de traitement des données en temps réel est cruciale pour maintenir la pertinence et l'exactitude des résultats de GenAI.

Servir de routeur optimal

KubeMQ offre une variété de modèles de messagerie, notamment les files d'attente, les flux, la publication-abonnement (pub/sub) et les appels de procédure à distance (RPC), ce qui en fait un routeur polyvalent et puissant au sein d'un pipeline RAG. Sa faible latence et ses caractéristiques hautes performances garantissent une livraison rapide des messages, ce qui est essentiel pour les applications GenAI en temps réel où les retards peuvent avoir un impact significatif sur l'expérience utilisateur et l'efficacité du système.

De plus, la capacité de KubeMQ à gérer une logique de routage complexe permet des stratégies sophistiquées de distribution de données. Cela garantit que les différents composants du système d'IA reçoivent précisément les données dont ils ont besoin, quand ils en ont besoin, sans duplication ni retard inutiles.

Intégration de FalkorDB pour une gestion améliorée des données

Alors que KubeMQ achemine efficacement les messages entre les services, FalkorDB complète cela en fournissant une solution de base de données graphique évolutive et hautes performances pour stocker et récupérer les grandes quantités de données requises par les processus RAG. Cette intégration garantit que lorsque les nouvelles données transitent par KubeMQ, elles sont stockées de manière transparente dans FalkorDB, ce qui les rend facilement disponibles pour les opérations de récupération sans introduire de latence ni de goulots d'étranglement.

Améliorer l'évolutivité et la fiabilité

À mesure que les applications GenAI augmentent à la fois en termes de base d'utilisateurs et de volume de données, l'évolutivité devient une préoccupation primordiale. KubeMQ est évolutif et prend en charge la mise à l'échelle horizontale pour s'adapter de manière transparente à une charge accrue. Il garantit qu'à mesure que le nombre de processus RAG augmente ou que la génération de données s'accélère, l'infrastructure de messagerie reste robuste et réactive.

De plus, KubeMQ assure la persistance des messages et la tolérance aux pannes. En cas de panne du système ou de perturbation du réseau, KubeMQ garantit que les messages ne sont pas perdus et que le système peut récupérer correctement. Cette fiabilité est essentielle au maintien de l'intégrité des applications d'IA dont les utilisateurs dépendent pour obtenir des informations précises et opportunes.

Éliminer le besoin de services de routage dédiés

La mise en œuvre de services de routage personnalisés pour le traitement des données dans les pipelines RAG peut être complexe et gourmande en ressources. La création, la maintenance et la mise à l'échelle de ces services nécessitent souvent des efforts de développement importants, ce qui détourne l'attention du développement d'applications d'IA de base.

En adoptant KubeMQ, les organisations éliminent le besoin de créer des solutions de routage sur mesure. KubeMQ fournit des fonctionnalités prêtes à l'emploi qui répondent aux besoins de routage des processus RAG, notamment des modèles de routage complexes, le filtrage des messages et la gestion des priorités. Cela réduit non seulement les frais de développement et de maintenance, mais accélère également la mise sur le marché des solutions GenAI.

Accès unifié via REST et SDK

KubeMQ propose plusieurs interfaces pour interagir avec ses capacités de courtier de messages :

  • API REST : permet une intégration indépendante du langage, permettant aux services écrits dans n'importe quel langage de programmation d'envoyer et de recevoir des messages via HTTP

  • SDK : fournit des bibliothèques client pour divers langages de programmation (tels que Python, Java, Go et .NET), facilitant des modèles de communication plus efficaces et de meilleures performances grâce à des intégrations natives

Cette flexibilité permet aux développeurs de choisir la méthode la plus appropriée pour leur cas d'utilisation spécifique, simplifiant l'architecture et accélérant les cycles de développement. Un point de contact unique pour le routage des données rationalise la communication entre les différents composants du pipeline RAG, améliorant ainsi la cohérence globale du système.

Implémentation de KubeMQ dans un pipeline RAG : un exemple détaillé

L'exemple de code montre comment créer un système de récupération d'informations sur les films en intégrant KubeMQ dans un pipeline RAG. Il configure un serveur qui ingère les URL de films de Rotten Tomatoes pour créer un graphique de connaissances à l'aide de GPT-4. Les utilisateurs peuvent interagir avec ce système via un client de chat, en envoyant des requêtes liées aux films et en recevant des réponses générées par l'IA. Ce cas d'utilisation montre comment gérer l'ingestion continue de données et le traitement des requêtes en temps réel dans une application pratique, en utilisant KubeMQ pour une gestion efficace des messages et une communication interservices dans le contexte de films.

Présentation de l'architecture

  1. Service d'ingestion de données : capture et publie de nouvelles données dans les flux KubeMQ dès qu'elles deviennent disponibles

  2. Service de récupération : Abonnez-vous au flux KubeMQ pour recevoir les mises à jour et actualiser la base de connaissances

  3. Service de génération : écoute les demandes de requête, interagit avec le modèle d'IA et génère des réponses

  4. Service de réponse : renvoie les réponses générées aux utilisateurs via les canaux appropriés

Configuration de KubeMQ

Assurez-vous que KubeMQ est opérationnel, ce qui peut être réalisé en le déployant à l'aide de Docker :

docker run -d --rm \
  -p 8080:8080 \
  -p 50000:50000 \
  -p 9090:9090 \
  -e KUBEMQ_TOKEN="your token"
Copier après la connexion
Copier après la connexion

Cette commande démarre KubeMQ avec les ports nécessaires exposés pour les communications REST et gRPC.

Côté serveur RAG

Ce code (dépôt GitHub) implémente un serveur RAG qui traite les requêtes de chat et gère les sources de connaissances à l'aide de KubeMQ pour la gestion des messages.

docker run -d --rm \
  -p 8080:8080 \
  -p 50000:50000 \
  -p 9090:9090 \
  -e KUBEMQ_TOKEN="your token"
Copier après la connexion
Copier après la connexion

Le serveur exécute deux threads principaux : un qui s'abonne aux requêtes de chat via un canal appelé "rag-chat-query" et les traite à l'aide d'un graphe de connaissances avec GPT-4, et un autre qui extrait continuellement d'une file d'attente appelée "rag -sources-queue" pour ajouter de nouvelles sources au graphe de connaissances. Le graphe de connaissances est initialisé avec une ontologie personnalisée chargée à partir d'un fichier JSON et utilise le modèle GPT-4 d'OpenAI pour le traitement. Le serveur implémente une gestion des arrêts et des erreurs en douceur, garantissant que tous les threads sont correctement terminés lorsque le serveur est arrêté.

Envoi de données sources à ingérer dans RAG Knowledge Graph

# server.py

import json
import threading
from typing import List

from dotenv import load_dotenv
load_dotenv()
import time
from kubemq.common import CancellationToken
from kubemq.cq import Client as CQClient, QueryMessageReceived, QueryResponseMessage, QueriesSubscription
from kubemq.queues import Client as QueuesClient
from graphrag_sdk.models.openai import OpenAiGenerativeModel
from graphrag_sdk.model_config import KnowledgeGraphModelConfig
from graphrag_sdk import KnowledgeGraph, Ontology
from graphrag_sdk.source import URL

class RAGServer:
   def __init__(self):
       self.cq_client = CQClient(address="localhost:50000")
       self.queues_client = QueuesClient(address="localhost:50000")
       model = OpenAiGenerativeModel(model_name="gpt-4o")
       with open("ontology.json", "r") as f:
           ontology = json.load(f)
       ontology = Ontology.from_json(ontology)
       self.kg = KnowledgeGraph(
           name="movies",
           model_config=KnowledgeGraphModelConfig.with_model(model),
           ontology=ontology)
       self.chat = self.kg.chat_session()
       self.shutdown_event = threading.Event()
       self.threads: List[threading.Thread] = []

   def handle_chat(self, request: QueryMessageReceived):
       try:
           message = request.body.decode('utf-8')
           print(f"Received chat message: {message}")
           result= self.chat.send_message(message)
           answer = result.get("response","No answer")
           print(f"Chat response: {answer}")
           response = QueryResponseMessage(
               query_received=request,
               is_executed=True,
               body=answer.encode('utf-8')
           )
           self.cq_client.send_response_message(response)
       except Exception as e:
           print(f"Error processing chat message: {str(e)}")
           self.cq_client.send_response_message(QueryResponseMessage(
               query_received=request,
               is_executed=False,
               error=str(e)
           ))

   def pull_from_queue(self):
       while not self.shutdown_event.is_set():
           try:
               result = self.queues_client.pull("rag-sources-queue", 10, 1)
               if result.is_error:
                   print(f"Error pulling message from queue: {result.error}")
                   continue
               sources = []
               for message in result.messages:
                   source = message.body.decode('utf-8')
                   print(f"Received source: {source}, adding to knowledge graph")
                   sources.append(URL(message.body.decode('utf-8')))
               if sources:
                   self.kg.process_sources(sources)
           except Exception as e:
               if not self.shutdown_event.is_set():  # Only log if not shutting down
                   print(f"Error processing sources: {str(e)}")

   def subscribe_to_chat_queries(self):
       def on_error(err: str):
           if not self.shutdown_event.is_set():  # Only log if not shutting down
               print(f"Error: {err}")

       cancellation_token = CancellationToken()

       try:
           self.cq_client.subscribe_to_queries(
               subscription=QueriesSubscription(
                   channel="rag-chat-query",
                   on_receive_query_callback=self.handle_chat,
                   on_error_callback=on_error,
               ),
               cancel=cancellation_token
           )

           # Wait for shutdown signal
           while not self.shutdown_event.is_set():
               time.sleep(0.1)


           # Cancel subscription when shutdown is requested
           cancellation_token.cancel()

       except Exception as e:
           if not self.shutdown_event.is_set():
               print(f"Error in subscription thread: {str(e)}")
   def run(self):

       chat_thread = threading.Thread(target=self.subscribe_to_chat_queries)
       queue_thread = threading.Thread(target=self.pull_from_queue)

       self.threads.extend([chat_thread, queue_thread])

       for thread in self.threads:
           thread.daemon = True  # Make threads daemon so they exit when main thread exits
           thread.start()

       print("RAG server started")
       try:
           while True:
               time.sleep(1)
       except KeyboardInterrupt:
           print("\nShutting down gracefully...")
           self.shutdown()
           self.cq_client.close()
           self.queues_client.close()

   def shutdown(self):

       print("Initiating shutdown sequence...")
       self.shutdown_event.set()  # Signal all threads to stop

       for thread in self.threads:
           thread.join(timeout=5.0)  # Wait up to 5 seconds for each thread
           if thread.is_alive():
               print(f"Warning: Thread {thread.name} did not shutdown cleanly")

       print("Shutdown complete")
if __name__ == "__main__":
   rag_server = RAGServer()
   rag_server.run()
Copier après la connexion

Ce code implémente un client simple qui envoie les URL des films au serveur RAG via le système de file d'attente de KubeMQ. Plus précisément, il crée une classe SourceClient qui se connecte à KubeMQ et envoie des messages au canal « rag-sources-queue », qui est la même file d'attente que celle surveillée par le serveur RAG. Lorsqu'il est exécuté en tant que programme principal, il envoie une liste d'URL de films Rotten Tomatoes (y compris les films Matrix, John Wick et Speed) à traiter et à ajouter au graphique de connaissances par le serveur RAG.

Envoyer et recevoir des questions et des réponses

# sources_client.py

from kubemq.queues import *

class SourceClient:
   def __init__(self, address="localhost:50000"):
       self.client = Client(address=address)

   def send_source(self, message: str) :
       send_result = self.client.send_queues_message(
           QueueMessage(
               channel="rag-sources-queue",
               body=message.encode("utf-8"),
           )
       )
       if send_result.is_error:
           print(f"message send error, error:{send_result.error}")

if __name__ == "__main__":
   client = SourceClient()
   urls = ["https://www.rottentomatoes.com/m/side_by_side_2012",
       "https://www.rottentomatoes.com/m/matrix",
       "https://www.rottentomatoes.com/m/matrix_revolutions",
       "https://www.rottentomatoes.com/m/matrix_reloaded",
       "https://www.rottentomatoes.com/m/speed_1994",
       "https://www.rottentomatoes.com/m/john_wick_chapter_4"]
   for url in urls:
       client.send_source(url)
   print("done")
Copier après la connexion

Ce code implémente un client de chat qui communique avec le serveur RAG via le système de requêtes de KubeMQ. La classe ChatClient envoie des messages au canal "rag-chat-query" et attend les réponses, avec un délai d'expiration de 30 secondes pour chaque requête. Lorsqu'il est exécuté en tant que programme principal, il démontre la fonctionnalité du client en envoyant deux questions connexes sur le réalisateur de Matrix et sa connexion à Keanu Reeves, en imprimant chaque réponse au fur et à mesure qu'il les reçoit.

Référentiel de codes

Tous les exemples de code peuvent être trouvés dans mon fork du référentiel GitHub d'origine.

Conclusion

L'intégration de KubeMQ dans les pipelines RAG pour les applications GenAI fournit un mécanisme évolutif, fiable et efficace pour gérer les flux de données continus et les communications inter-processus complexes. En servant de routeur unifié avec des modèles de messagerie polyvalents, KubeMQ simplifie l'architecture globale, réduit le besoin de solutions de routage personnalisées et accélère les cycles de développement.

De plus, l'intégration de FalkorDB améliore la gestion des données en offrant une base de connaissances hautes performances qui s'intègre parfaitement à KubeMQ. Cette combinaison garantit une récupération et un stockage optimisés des données, prenant en charge les exigences dynamiques des processus RAG.

La capacité à gérer des scénarios à haut débit, combinée à des fonctionnalités telles que la persistance et la tolérance aux pannes, garantit que les applications GenAI restent réactives et fiables, même sous de lourdes charges ou face à des perturbations du système.

En tirant parti de KubeMQ et FalkorDB, les organisations peuvent se concentrer sur l'amélioration de leurs modèles d'IA et fournir des informations et des services précieux, en étant sûres que leur infrastructure de routage de données est robuste et capable de répondre aux exigences des flux de travail d'IA modernes.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn

Outils d'IA chauds

Undresser.AI Undress

Undresser.AI Undress

Application basée sur l'IA pour créer des photos de nu réalistes

AI Clothes Remover

AI Clothes Remover

Outil d'IA en ligne pour supprimer les vêtements des photos.

Undress AI Tool

Undress AI Tool

Images de déshabillage gratuites

Clothoff.io

Clothoff.io

Dissolvant de vêtements AI

Video Face Swap

Video Face Swap

Échangez les visages dans n'importe quelle vidéo sans effort grâce à notre outil d'échange de visage AI entièrement gratuit !

Article chaud

<🎜>: Bubble Gum Simulator Infinity - Comment obtenir et utiliser les clés royales
4 Il y a quelques semaines By 尊渡假赌尊渡假赌尊渡假赌
<🎜>: Grow A Garden - Guide de mutation complet
3 Il y a quelques semaines By DDD
Nordhold: Système de fusion, expliqué
4 Il y a quelques semaines By 尊渡假赌尊渡假赌尊渡假赌
Mandragora: Whispers of the Witch Tree - Comment déverrouiller le grappin
3 Il y a quelques semaines By 尊渡假赌尊渡假赌尊渡假赌

Outils chauds

Bloc-notes++7.3.1

Bloc-notes++7.3.1

Éditeur de code facile à utiliser et gratuit

SublimeText3 version chinoise

SublimeText3 version chinoise

Version chinoise, très simple à utiliser

Envoyer Studio 13.0.1

Envoyer Studio 13.0.1

Puissant environnement de développement intégré PHP

Dreamweaver CS6

Dreamweaver CS6

Outils de développement Web visuel

SublimeText3 version Mac

SublimeText3 version Mac

Logiciel d'édition de code au niveau de Dieu (SublimeText3)

Sujets chauds

Tutoriel Java
1673
14
Tutoriel PHP
1278
29
Tutoriel C#
1257
24
Python vs C: courbes d'apprentissage et facilité d'utilisation Python vs C: courbes d'apprentissage et facilité d'utilisation Apr 19, 2025 am 12:20 AM

Python est plus facile à apprendre et à utiliser, tandis que C est plus puissant mais complexe. 1. La syntaxe Python est concise et adaptée aux débutants. Le typage dynamique et la gestion automatique de la mémoire le rendent facile à utiliser, mais peuvent entraîner des erreurs d'exécution. 2.C fournit des fonctionnalités de contrôle de bas niveau et avancées, adaptées aux applications haute performance, mais a un seuil d'apprentissage élevé et nécessite une gestion manuelle de la mémoire et de la sécurité.

Apprendre Python: 2 heures d'étude quotidienne est-elle suffisante? Apprendre Python: 2 heures d'étude quotidienne est-elle suffisante? Apr 18, 2025 am 12:22 AM

Est-ce suffisant pour apprendre Python pendant deux heures par jour? Cela dépend de vos objectifs et de vos méthodes d'apprentissage. 1) Élaborer un plan d'apprentissage clair, 2) Sélectionnez les ressources et méthodes d'apprentissage appropriées, 3) la pratique et l'examen et la consolidation de la pratique pratique et de l'examen et de la consolidation, et vous pouvez progressivement maîtriser les connaissances de base et les fonctions avancées de Python au cours de cette période.

Python vs. C: Explorer les performances et l'efficacité Python vs. C: Explorer les performances et l'efficacité Apr 18, 2025 am 12:20 AM

Python est meilleur que C dans l'efficacité du développement, mais C est plus élevé dans les performances d'exécution. 1. La syntaxe concise de Python et les bibliothèques riches améliorent l'efficacité du développement. Les caractéristiques de type compilation et le contrôle du matériel de CC améliorent les performances d'exécution. Lorsque vous faites un choix, vous devez peser la vitesse de développement et l'efficacité de l'exécution en fonction des besoins du projet.

Python vs C: Comprendre les principales différences Python vs C: Comprendre les principales différences Apr 21, 2025 am 12:18 AM

Python et C ont chacun leurs propres avantages, et le choix doit être basé sur les exigences du projet. 1) Python convient au développement rapide et au traitement des données en raison de sa syntaxe concise et de son typage dynamique. 2) C convient à des performances élevées et à une programmation système en raison de son typage statique et de sa gestion de la mémoire manuelle.

Quelle partie fait partie de la bibliothèque standard Python: listes ou tableaux? Quelle partie fait partie de la bibliothèque standard Python: listes ou tableaux? Apr 27, 2025 am 12:03 AM

PythonlistSaReparmentofthestandardLibrary, tandis que les coloccules de colocède, tandis que les colocculations pour la base de la Parlementaire, des coloments de forage polyvalent, tandis que la fonctionnalité de la fonctionnalité nettement adressée.

Python: automatisation, script et gestion des tâches Python: automatisation, script et gestion des tâches Apr 16, 2025 am 12:14 AM

Python excelle dans l'automatisation, les scripts et la gestion des tâches. 1) Automatisation: La sauvegarde du fichier est réalisée via des bibliothèques standard telles que le système d'exploitation et la fermeture. 2) Écriture de script: utilisez la bibliothèque PSUTIL pour surveiller les ressources système. 3) Gestion des tâches: utilisez la bibliothèque de planification pour planifier les tâches. La facilité d'utilisation de Python et la prise en charge de la bibliothèque riche en font l'outil préféré dans ces domaines.

Python pour l'informatique scientifique: un look détaillé Python pour l'informatique scientifique: un look détaillé Apr 19, 2025 am 12:15 AM

Les applications de Python en informatique scientifique comprennent l'analyse des données, l'apprentissage automatique, la simulation numérique et la visualisation. 1.Numpy fournit des tableaux multidimensionnels et des fonctions mathématiques efficaces. 2. Scipy étend la fonctionnalité Numpy et fournit des outils d'optimisation et d'algèbre linéaire. 3. Pandas est utilisé pour le traitement et l'analyse des données. 4.Matplotlib est utilisé pour générer divers graphiques et résultats visuels.

Python pour le développement Web: applications clés Python pour le développement Web: applications clés Apr 18, 2025 am 12:20 AM

Les applications clés de Python dans le développement Web incluent l'utilisation des cadres Django et Flask, le développement de l'API, l'analyse et la visualisation des données, l'apprentissage automatique et l'IA et l'optimisation des performances. 1. Framework Django et Flask: Django convient au développement rapide d'applications complexes, et Flask convient aux projets petits ou hautement personnalisés. 2. Développement de l'API: Utilisez Flask ou DjangorestFramework pour construire RestulAPI. 3. Analyse et visualisation des données: utilisez Python pour traiter les données et les afficher via l'interface Web. 4. Apprentissage automatique et AI: Python est utilisé pour créer des applications Web intelligentes. 5. Optimisation des performances: optimisée par la programmation, la mise en cache et le code asynchrones

See all articles