首頁 後端開發 Python教學 使用 KubeMQ 增強 GenAI 應用程式:有效擴展檢索增強生成 (RAG)

使用 KubeMQ 增強 GenAI 應用程式:有效擴展檢索增強生成 (RAG)

Dec 26, 2024 am 09:00 AM

Enhancing GenAI Applications With KubeMQ: Efficiently Scaling Retrieval-Augmented Generation (RAG)

隨著生成式人工智慧(GenAI) 在各行業的應用激增,組織越來越多地利用檢索增強生成(RAG) 技術透過即時、上下文豐富的內容來支援其人工智慧模型數據。管理此類應用程式中複雜的資訊流帶來了重大挑戰,特別是在處理大規模連續生成的資料時。 KubeMQ 是一個強大的訊息代理,作為簡化多個 RAG 進程的路由的解決方案而出現,確保 GenAI 應用程式中的高效資料處理。

為了進一步提高 RAG 工作流程的效率和可擴展性,整合 FalkorDB 這樣的高效能資料庫至關重要。 FalkorDB 為 RAG 系統所依賴的動態知識庫提供可靠且可擴展的儲存解決方案,確保快速資料檢索以及與 KubeMQ 等訊息傳遞系統的無縫整合。

了解 GenAI 工作流程中的 RAG

RAG 是一種透過整合檢索機制來增強生成式 AI 模型的範例,可讓模型在推理過程中存取外部知識庫。這種方法透過將產生的回應基於可用的最新相關信息,顯著提高了產生回應的準確性、相關性和及時性。

在使用 RAG 的典型 GenAI 工作流程中,該過程涉及多個步驟:

  1. 查詢處理:解釋使用者的輸入以了解意圖和上下文

  2. 檢索:從動態知識庫(例如 FalkorDB)中取得相關文件或數據,確保快速且有效率地存取最新且相關的資訊。

  3. 產生:使用輸入和檢索到的資料產生回應

  4. 回應交付:提供使用者最終的、豐富的輸出

擴展這些步驟,尤其是在資料不斷產生和更新的環境中,需要一種高效可靠的機制來在 RAG 管道的各個組件之間傳輸資料。

KubeMQ 在 RAG 處理中的關鍵作用

大規模處理連續資料流

在物聯網網路、社群媒體平台或即時分析系統等場景中,不斷產生新數據,人工智慧模型必須迅速適應以合併這些資訊。傳統的請求-回應架構在高吞吐量條件下可能成為瓶頸,導致延遲問題和效能下降。

KubeMQ 透過提供可擴展且強大的基礎設施來管理高吞吐量訊息傳遞場景,以實現服務之間的高效資料路由。透過將 KubeMQ 整合到 RAG 管道中,每個新資料點都會發佈到訊息佇列或串流中,確保檢索元件可以立即存取最新訊息,而不會壓垮系統。這種即時數據處理能力對於維持 GenAI 輸出的相關性和準確性至關重要。

作為最佳路由器

KubeMQ 提供各種訊息傳遞模式 - 包括佇列、串流、發布-訂閱 (pub/sub) 和遠端過程呼叫 (RPC) - 使其成為 RAG 管道中多功能且功能強大的路由器。其低延遲和高效能特性可確保及時的訊息傳遞,這對於即時 GenAI 應用程式至關重要,因為延遲會嚴重影響使用者體驗和系統效率。

此外,KubeMQ 處理複雜路由邏輯的能力允許複雜的資料分發策略。這確保了人工智慧系統的不同組件在需要時準確接收所需的數據,而不會出現不必要的重複或延遲。

整合 FalkorDB 以增強資料管理

雖然 KubeMQ 在服務之間有效地路由訊息,FalkorDB 透過提供可擴展且高效能的圖形資料庫解決方案來儲存和檢索 RAG 流程所需的大量資料來補充這一點。這種整合確保當新資料流經 KubeMQ 時,它會無縫儲存在 FalkorDB 中,使其可隨時用於檢索操作,而不會引入延遲或瓶頸。

增強可擴展性和可靠性

隨著 GenAI 應用程式的用戶群和資料量不斷增長,可擴展性成為最重要的問題。 KubeMQ 具有可擴展性,支援水平擴展以無縫適應增加的負載。它確保隨著 RAG 進程數量的增加或資料產生的加速,訊息傳遞基礎設施保持穩健和回應能力。

此外,KubeMQ 還提供訊息持久化和容錯能力。當發生系統故障或網路中斷時,KubeMQ 可確保訊息不會遺失且系統可以正常復原。這種可靠性對於維護人工智慧應用程式的完整性至關重要,用戶依賴這些應用程式來獲取及時、準確的資訊。

消除專用路由服務的需求

在 RAG 管道中實作用於資料處理的自訂路由服務可能會佔用大量資源且複雜。通常需要大量的開發工作來建立、維護和擴展這些服務,從而分散了核心人工智慧應用程式開發的注意力。

透過採用 KubeMQ,組織無需建立客製化路由解決方案。 KubeMQ 提供開箱即用的功能,可滿足 RAG 程序的路由需求,包括複雜的路由模式、訊息過濾和優先權處理。這不僅減少了開發和維護開銷,還加快了 GenAI 解決方案的上市時間。

透過REST和SDK統一訪問

KubeMQ 提供了多個與其訊息代理功能互動的介面:

  • REST API:支援與語言無關的集成,允許以任何程式語言編寫的服務透過 HTTP 發送和接收訊息

  • SDK:為各種程式語言(例如Python、Java、Go 和.NET)提供用戶端程式庫,透過本機整合促進更有效率的通訊模式和更好的效能

這種靈活性允許開發人員為其特定用例選擇最合適的方法,從而簡化架構並加快開發週期。資料路由的單一接觸點簡化了 RAG 管道不同組件之間的通信,從而增強了整體系統的一致性。

在 RAG 管道中實現 KubeMQ:詳細範例

程式碼範例展示如何透過將 KubeMQ 整合到 RAG 管道來建立電影資訊檢索系統。它設定了一個伺服器,從爛番茄中提取電影 URL,以使用 GPT-4 建立知識圖譜。用戶可以透過聊天用戶端與該系統交互,發送與電影相關的查詢並接收人工智慧產生的回應。此用例示範如何在實際應用程式中處理連續資料攝取和即時查詢處理,利用 KubeMQ 在影片上下文中進行高效的訊息處理和服務間通訊。

架構概述

  1. 資料攝取服務:擷取新資料並發佈到可用的 KubeMQ 串流

  2. 檢索服務:訂閱KubeMQ流以接收更新並刷新知識庫

  3. 產生服務:監聽查詢請求,與AI模型交互,並產生回應

  4. 回應服務:將產生的回應透過適當的管道傳回使用者

設定 KubeMQ

確保 KubeMQ 可以運行,這可以透過使用 Docker 部署來實現:

docker run -d --rm \
  -p 8080:8080 \
  -p 50000:50000 \
  -p 9090:9090 \
  -e KUBEMQ_TOKEN="your token"
登入後複製
登入後複製

此指令啟動 KubeMQ,並為 REST 和 gRPC 通訊公開必要的連接埠。

RAG伺服器端

此程式碼(GitHub 儲存庫)實作了一個 RAG 伺服器,該伺服器處理聊天查詢並使用 KubeMQ 進行訊息處理來管理知識來源。

docker run -d --rm \
  -p 8080:8080 \
  -p 50000:50000 \
  -p 9090:9090 \
  -e KUBEMQ_TOKEN="your token"
登入後複製
登入後複製

伺服器執行兩個主執行緒:一個透過名為「rag-chat-query」的通道訂閱聊天查詢,並使用GPT-4 的知識圖來處理它們,另一個從名為「rag」的佇列中持續拉取-sources-queue」將新來源加入知識圖譜。知識圖譜使用從JSON 檔案載入的自訂本體進行初始化,並使用OpenAI 的GPT-4模型進行處理。

發送來源資料以攝取到 RAG 知識圖譜中

# server.py

import json
import threading
from typing import List

from dotenv import load_dotenv
load_dotenv()
import time
from kubemq.common import CancellationToken
from kubemq.cq import Client as CQClient, QueryMessageReceived, QueryResponseMessage, QueriesSubscription
from kubemq.queues import Client as QueuesClient
from graphrag_sdk.models.openai import OpenAiGenerativeModel
from graphrag_sdk.model_config import KnowledgeGraphModelConfig
from graphrag_sdk import KnowledgeGraph, Ontology
from graphrag_sdk.source import URL

class RAGServer:
   def __init__(self):
       self.cq_client = CQClient(address="localhost:50000")
       self.queues_client = QueuesClient(address="localhost:50000")
       model = OpenAiGenerativeModel(model_name="gpt-4o")
       with open("ontology.json", "r") as f:
           ontology = json.load(f)
       ontology = Ontology.from_json(ontology)
       self.kg = KnowledgeGraph(
           name="movies",
           model_config=KnowledgeGraphModelConfig.with_model(model),
           ontology=ontology)
       self.chat = self.kg.chat_session()
       self.shutdown_event = threading.Event()
       self.threads: List[threading.Thread] = []

   def handle_chat(self, request: QueryMessageReceived):
       try:
           message = request.body.decode('utf-8')
           print(f"Received chat message: {message}")
           result= self.chat.send_message(message)
           answer = result.get("response","No answer")
           print(f"Chat response: {answer}")
           response = QueryResponseMessage(
               query_received=request,
               is_executed=True,
               body=answer.encode('utf-8')
           )
           self.cq_client.send_response_message(response)
       except Exception as e:
           print(f"Error processing chat message: {str(e)}")
           self.cq_client.send_response_message(QueryResponseMessage(
               query_received=request,
               is_executed=False,
               error=str(e)
           ))

   def pull_from_queue(self):
       while not self.shutdown_event.is_set():
           try:
               result = self.queues_client.pull("rag-sources-queue", 10, 1)
               if result.is_error:
                   print(f"Error pulling message from queue: {result.error}")
                   continue
               sources = []
               for message in result.messages:
                   source = message.body.decode('utf-8')
                   print(f"Received source: {source}, adding to knowledge graph")
                   sources.append(URL(message.body.decode('utf-8')))
               if sources:
                   self.kg.process_sources(sources)
           except Exception as e:
               if not self.shutdown_event.is_set():  # Only log if not shutting down
                   print(f"Error processing sources: {str(e)}")

   def subscribe_to_chat_queries(self):
       def on_error(err: str):
           if not self.shutdown_event.is_set():  # Only log if not shutting down
               print(f"Error: {err}")

       cancellation_token = CancellationToken()

       try:
           self.cq_client.subscribe_to_queries(
               subscription=QueriesSubscription(
                   channel="rag-chat-query",
                   on_receive_query_callback=self.handle_chat,
                   on_error_callback=on_error,
               ),
               cancel=cancellation_token
           )

           # Wait for shutdown signal
           while not self.shutdown_event.is_set():
               time.sleep(0.1)


           # Cancel subscription when shutdown is requested
           cancellation_token.cancel()

       except Exception as e:
           if not self.shutdown_event.is_set():
               print(f"Error in subscription thread: {str(e)}")
   def run(self):

       chat_thread = threading.Thread(target=self.subscribe_to_chat_queries)
       queue_thread = threading.Thread(target=self.pull_from_queue)

       self.threads.extend([chat_thread, queue_thread])

       for thread in self.threads:
           thread.daemon = True  # Make threads daemon so they exit when main thread exits
           thread.start()

       print("RAG server started")
       try:
           while True:
               time.sleep(1)
       except KeyboardInterrupt:
           print("\nShutting down gracefully...")
           self.shutdown()
           self.cq_client.close()
           self.queues_client.close()

   def shutdown(self):

       print("Initiating shutdown sequence...")
       self.shutdown_event.set()  # Signal all threads to stop

       for thread in self.threads:
           thread.join(timeout=5.0)  # Wait up to 5 seconds for each thread
           if thread.is_alive():
               print(f"Warning: Thread {thread.name} did not shutdown cleanly")

       print("Shutdown complete")
if __name__ == "__main__":
   rag_server = RAGServer()
   rag_server.run()
登入後複製
此程式碼實作了一個簡單的用戶端,透過 KubeMQ 的佇列系統將影片 URL 傳送到 RAG 伺服器。具體來說,它會建立一個連接到 KubeMQ 的 SourceClient 類,並將訊息傳送到「rag-sources-queue」通道,該通道與 RAG 伺服器監控的佇列相同。當作為主程式運行時,它會發送一個爛番茄電影 URL 清單(包括《駭客任務》電影、《疾速追殺》和《生死時速》),由 RAG 伺服器處理並添加到知識圖譜中。

發送和接收問題和解答

# sources_client.py

from kubemq.queues import *

class SourceClient:
   def __init__(self, address="localhost:50000"):
       self.client = Client(address=address)

   def send_source(self, message: str) :
       send_result = self.client.send_queues_message(
           QueueMessage(
               channel="rag-sources-queue",
               body=message.encode("utf-8"),
           )
       )
       if send_result.is_error:
           print(f"message send error, error:{send_result.error}")

if __name__ == "__main__":
   client = SourceClient()
   urls = ["https://www.rottentomatoes.com/m/side_by_side_2012",
       "https://www.rottentomatoes.com/m/matrix",
       "https://www.rottentomatoes.com/m/matrix_revolutions",
       "https://www.rottentomatoes.com/m/matrix_reloaded",
       "https://www.rottentomatoes.com/m/speed_1994",
       "https://www.rottentomatoes.com/m/john_wick_chapter_4"]
   for url in urls:
       client.send_source(url)
   print("done")
登入後複製
此程式碼實作了一個聊天用戶端,透過 KubeMQ 的查詢系統與 RAG 伺服器進行通訊。 ChatClient 類別將訊息傳送到「rag-chat-query」通道並等待回應,每個查詢都有 30 秒的逾時時間。當作為主程式運行時,它會透過發送兩個有關《駭客任務》導演及其與基努·裡維斯的聯繫的相關問題來演示客戶端的功能,並在收到問題時列印每個回應。

程式碼庫

所有程式碼範例都可以在我的原始 GitHub 儲存庫的分支中找到。

結論

將 KubeMQ 整合到 GenAI 應用程式的 RAG 管道中,為處理連續資料流和複雜的進程間通訊提供了可擴展、可靠且高效的機制。透過充當具有多種訊息傳遞模式的統一路由器,KubeMQ 簡化了整體架構,減少了對自訂路由解決方案的需求,並加快了開發週期。

此外,合併 FalkorDB 透過提供與 KubeMQ 無縫整合的高效能知識庫來增強資料管理。這種組合可確保優化資料檢索和存儲,支援 RAG 流程的動態要求。

處理高吞吐量場景的能力,與持久性和容錯等功能相結合,確保 GenAI 應用程式即使在重負載或面臨系統中斷的情況下也能保持響應能力和可靠性。

透過利用 KubeMQ 和 FalkorDB,組織可以專注於增強其 AI 模型並提供有價值的見解和服務,並確信其資料路由基礎設施強大且能夠滿足現代 AI 工作流程的需求。

以上是使用 KubeMQ 增強 GenAI 應用程式:有效擴展檢索增強生成 (RAG)的詳細內容。更多資訊請關注PHP中文網其他相關文章!

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

熱門話題

Java教學
1664
14
CakePHP 教程
1421
52
Laravel 教程
1315
25
PHP教程
1266
29
C# 教程
1239
24
Python vs.C:申請和用例 Python vs.C:申請和用例 Apr 12, 2025 am 12:01 AM

Python适合数据科学、Web开发和自动化任务,而C 适用于系统编程、游戏开发和嵌入式系统。Python以简洁和强大的生态系统著称,C 则以高性能和底层控制能力闻名。

2小時的Python計劃:一種現實的方法 2小時的Python計劃:一種現實的方法 Apr 11, 2025 am 12:04 AM

2小時內可以學會Python的基本編程概念和技能。 1.學習變量和數據類型,2.掌握控制流(條件語句和循環),3.理解函數的定義和使用,4.通過簡單示例和代碼片段快速上手Python編程。

Python:遊戲,Guis等 Python:遊戲,Guis等 Apr 13, 2025 am 12:14 AM

Python在遊戲和GUI開發中表現出色。 1)遊戲開發使用Pygame,提供繪圖、音頻等功能,適合創建2D遊戲。 2)GUI開發可選擇Tkinter或PyQt,Tkinter簡單易用,PyQt功能豐富,適合專業開發。

您可以在2小時內學到多少python? 您可以在2小時內學到多少python? Apr 09, 2025 pm 04:33 PM

兩小時內可以學到Python的基礎知識。 1.學習變量和數據類型,2.掌握控制結構如if語句和循環,3.了解函數的定義和使用。這些將幫助你開始編寫簡單的Python程序。

Python與C:學習曲線和易用性 Python與C:學習曲線和易用性 Apr 19, 2025 am 12:20 AM

Python更易學且易用,C 則更強大但複雜。 1.Python語法簡潔,適合初學者,動態類型和自動內存管理使其易用,但可能導致運行時錯誤。 2.C 提供低級控制和高級特性,適合高性能應用,但學習門檻高,需手動管理內存和類型安全。

Python和時間:充分利用您的學習時間 Python和時間:充分利用您的學習時間 Apr 14, 2025 am 12:02 AM

要在有限的時間內最大化學習Python的效率,可以使用Python的datetime、time和schedule模塊。 1.datetime模塊用於記錄和規劃學習時間。 2.time模塊幫助設置學習和休息時間。 3.schedule模塊自動化安排每週學習任務。

Python:探索其主要應用程序 Python:探索其主要應用程序 Apr 10, 2025 am 09:41 AM

Python在web開發、數據科學、機器學習、自動化和腳本編寫等領域有廣泛應用。 1)在web開發中,Django和Flask框架簡化了開發過程。 2)數據科學和機器學習領域,NumPy、Pandas、Scikit-learn和TensorFlow庫提供了強大支持。 3)自動化和腳本編寫方面,Python適用於自動化測試和系統管理等任務。

Python:自動化,腳本和任務管理 Python:自動化,腳本和任務管理 Apr 16, 2025 am 12:14 AM

Python在自動化、腳本編寫和任務管理中表現出色。 1)自動化:通過標準庫如os、shutil實現文件備份。 2)腳本編寫:使用psutil庫監控系統資源。 3)任務管理:利用schedule庫調度任務。 Python的易用性和豐富庫支持使其在這些領域中成為首選工具。

See all articles