目录
>步骤6:功能工程
该管道负责连续部署训练有素的模型。它首先从
结论
Q2。 mlflow用于什么? MLFlow通过提供用于跟踪实验,版本控制模型和部署的工具来使机器学习开发变得更加容易。
首页 科技周边 人工智能 使用MLOPS的比特币价格预测

使用MLOPS的比特币价格预测

Mar 09, 2025 am 11:53 AM

>对比特币或其价格波动了解不多,而是想做出投资决定来获利吗?该机器学习模型有您的支持。它可以比占星家更好地预测价格。在本文中,我们将使用ZenML和MLFlow构建一个用于预测和预测比特币价格的ML模型。因此,让我们开始我们的旅程,了解任何人如何使用ML和MLOPS工具来预测未来。

学习目标

    学会有效地使用API​​获取实时数据。
  • >
  • 了解Zenml是什么,为什么我们使用MLFLOW以及如何将其与Zenml集成。
  • >探索从想法到生产的机器学习模型的部署过程。
  • >
  • >发现如何为交互式机器学习模型预测创建用户友好的简化应用程序。
  • >

>本文是> > data Science Blogathon的一部分。 内容表>

问题语句
  • >项目实现
  • 步骤1:访问API
    • 步骤2:使用mongodb
  • 5:数据清洁
  • 步骤6:特征工程
  • 步骤7:数据拆分
  • 步骤8:型号培训
  • 步骤9:型号评估
  • 步骤10:模型部署 问题
  • 问题语句
比特币价格高度波动,而做出预测几乎是不可能的。在我们的项目中,我们正在使用Mlops的最佳实践构建LSTM模型来预测比特币和趋势。
  • > 在实施项目之前,让我们看一下项目体系结构。
  • >

    >项目实施

    让我们从访问API开始。

    我们为什么要这样做?您可以从不同的数据集中获取历史比特币价格数据,但是使用API​​,我们可以访问Live Market Data。 使用MLOPS的比特币价格预测>步骤1:访问API

    >

    >注册API访问:

    >一旦您在theccdata api页面上注册后。您可以从此pagehttps://developers.cryptocompare.com/documentation/data-api/index_cc_v1_historical_days

    获得免费的API密钥

    提取比特币价格数据:
    • 在以下代码的情况下,您可以从CCDATA API获取比特币价格数据,并将其转换为PANDAS DataFrame。另外,将API密钥保留在.env文件中。

    • 步骤2:使用mongodb
    • 连接到数据库 MongoDB是一个NOSQL数据库,以其适应性,可扩展性和以JSON式格式存储非结构化数据的能力。
      import requests
      import pandas as pd
      from dotenv import load_dotenv
      import os
      
      # Load the .env file
      load_dotenv()
      
      def fetch_crypto_data(api_uri):
          response = requests.get(
              api_uri,
              params={
                  "market": "cadli",
                  "instrument": "BTC-USD",
                  "limit": 5000,
                  "aggregate": 1,
                  "fill": "true",
                  "apply_mapping": "true",
                  "response_format": "JSON"
              },
              headers={"Content-type": "application/json; charset=UTF-8"}
          )
      
          if response.status_code == 200:
              print('API Connection Successful! \nFetching the data...')
      
              data = response.json()
              data_list = data.get('Data', [])
      
              df = pd.DataFrame(data_list)
      
              df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s')
      
              return df  # Return the DataFrame
          else:
              raise Exception(f"API Error: {response.status_code} - {response.text}")
      登录后复制
      登录后复制
      登录后复制

      此代码连接到MongoDB,通过API检索比特币价格数据,并在最新记录日期后使用所有新条目更新数据库。

      介绍Zenml

      > Zenmlis是一个针对机器学习操作量身定制的开源平台,支持了灵活和生产就绪的管道的创建。此外,Zenml与多个机器学习工具集成了LokeMlFlow,Bentoml等,以创建无缝的ML Pipelines。

      ⚠️如果您是Windows用户,请尝试在系统上安装WSL。 Zenml不支持Windows。

      在此项目中,我们将实施使用Zenml的传统管道,并将MLFlow与Zenml集成进行实验跟踪。>

      >前提条件和基本zenml命令

      >

      python 3.12或更高:

      您可以从这里获得:https://www.python.org/downloads/
      • 激活您的虚拟环境:
      • >
      > zenml命令:
      import os
      from pymongo import MongoClient
      from dotenv import load_dotenv
      from data.management.api import fetch_crypto_data  # Import the API function
      import pandas as pd
      
      load_dotenv()
      
      MONGO_URI = os.getenv("MONGO_URI")
      API_URI = os.getenv("API_URI")
      
      client = MongoClient(MONGO_URI, ssl=True, ssl_certfile=None, ssl_ca_certs=None)
      db = client['crypto_data']
      collection = db['historical_data']
      
      try:
          latest_entry = collection.find_one(sort=[("DATE", -1)])  # Find the latest date
          if latest_entry:
              last_date = pd.to_datetime(latest_entry['DATE']).strftime('%Y-%m-%d')
          else:
              last_date = '2011-03-27'  # Default start date if MongoDB is empty
      
          print(f"Fetching data starting from {last_date}...")
          new_data_df = fetch_crypto_data(API_URI)
      
          if latest_entry:
              new_data_df = new_data_df[new_data_df['DATE'] > last_date]
      
          if not new_data_df.empty:
              data_to_insert = new_data_df.to_dict(orient='records')
              result = collection.insert_many(data_to_insert)
              print(f"Inserted {len(result.inserted_ids)} new records into MongoDB.")
          else:
              print("No new data to insert.")
      except Exception as e:
          print(f"An error occurred: {e}")
      登录后复制
      登录后复制
      登录后复制
      1. >所有核心Zenml命令及其功能如下:
      2. 步骤3:MLFLOW与Zenml
      的集成

      我们正在使用MLFlow进行实验跟踪,以跟踪我们的模型,工件,指标和超参数值。我们正在注册MLFLOW以进行实验跟踪和模型部署者:

      #create a virtual environment
      python3 -m venv venv
      
      #Activate your virtual environmnent in your project folder
      source venv/bin/activate
      登录后复制
      登录后复制
      登录后复制
      > zenml堆栈列表

      #Install zenml
      pip install zenml
      
      #To Launch zenml server and dashboard locally
      pip install "zenml[server]"
      
      #To check the zenml Version:
      zenml version
      
      #To initiate a new repository
      zenml init
      
      #To run the dashboard locally:
      zenml login --local
      
      #To know the status of our zenml Pipelines
      zenml show
      
      #To shutdown the zenml server
      zenml clean
      登录后复制
      登录后复制
      登录后复制
      项目结构

      >在这里,您可以看到项目的布局。现在让我们详细讨论它。

      > 使用MLOPS的比特币价格预测

      步骤4:数据摄入

      我们首先将数据从API摄取到MongoDB,然后将其转换为PANDAS DATAFRAME。

      #Integrating mlflow with ZenML
      zenml integration install mlflow -y
      
      #Register the experiment tracker
      zenml experiment-tracker register mlflow_tracker --flavor=mlflow
      
      #Registering the model deployer
      zenml model-deployer register mlflow --flavor=mlflow
      
      #Registering the stack
      zenml stack register local-mlflow-stack-new -a default -o default -d mlflow -e mlflow_tracker --set
      
      #To view the stack list
      zenml stack --list
      登录后复制
      登录后复制
      登录后复制

      我们将作为装饰器添加到

      > ingest_data()

      函数中,以将其声明为我们训练管道的一步。以同样的方式,我们将在项目体系结构中为每个步骤编写代码并创建管道。
      bitcoin_price_prediction_mlops/        # Project directory
      ├── data/                             
      │   └── management/                   
      │       ├── api_to_mongodb.py          # Code to fetch data and save it to MongoDB
      │       └── api.py                     # API-related utility functions
      │
      ├── pipelines/                         
      │   ├── deployment_pipeline.py         # Deployment pipeline
      │   └── training_pipeline.py           # Training pipeline
      │
      ├── saved_models/                      # Directory for storing trained models
      ├── saved_scalers/                     # Directory for storing scalers used in data preprocessing
      │
      ├── src/                               # Source code
      │   ├── data_cleaning.py               # Data cleaning and preprocessing
      │   ├── data_ingestion.py              # Data ingestion 
      │   ├── data_splitter.py               # Data splitting 
      │   ├── feature_engineering.py         # Feature engineering 
      │   ├── model_evaluation.py            # Model evaluation
      │   └── model_training.py              # Model training
      │
      ├── steps/                             # ZenML steps
      │   ├── clean_data.py                  # ZenML step for cleaning data
      │   ├── data_splitter.py               # ZenML step for data splitting
      │   ├── dynamic_importer.py            # ZenML step for importing dynamic data
      │   ├── feature_engineering.py         # ZenML step for feature engineering
      │   ├── ingest_data.py                 # ZenML step for data ingestion
      │   ├── model_evaluation.py            # ZenML step for model evaluation
      │   ├── model_training.py              # ZenML step for training the model
      │   ├── prediction_service_loader.py   # ZenML step for loading prediction services
      │   ├── predictor.py                   # ZenML step for prediction
      │   └── utils.py                       # Utility functions for steps
      │
      ├── .env                               # Environment variables file
      ├── .gitignore                         # Git ignore file
      │
      ├── app.py                             # Streamlit user interface app
      │
      ├── README.md                          # Project documentation
      ├── requirements.txt                   # List of required packages
      ├── run_deployment.py                  # Code for running deployment and prediction pipeline
      ├── run_pipeline.py                    # Code for running training pipeline
      └── .zen/                              # ZenML directory (created automatically after ZenML initialization)
      登录后复制
      登录后复制
      登录后复制
      >要查看我如何使用

      @Step 装饰器,请查看下面的github链接(步骤文件夹)以浏览管道其他步骤的代码,即数据清洁,功能工程,数据拆分,模型培训和模型评估。>>>>>>>>。 步骤5:数据清洁 在此步骤中,我们将创建清洁摄入数据的不同策略。我们将在数据中删除不需要的列和缺失值。>

      import requests
      import pandas as pd
      from dotenv import load_dotenv
      import os
      
      # Load the .env file
      load_dotenv()
      
      def fetch_crypto_data(api_uri):
          response = requests.get(
              api_uri,
              params={
                  "market": "cadli",
                  "instrument": "BTC-USD",
                  "limit": 5000,
                  "aggregate": 1,
                  "fill": "true",
                  "apply_mapping": "true",
                  "response_format": "JSON"
              },
              headers={"Content-type": "application/json; charset=UTF-8"}
          )
      
          if response.status_code == 200:
              print('API Connection Successful! \nFetching the data...')
      
              data = response.json()
              data_list = data.get('Data', [])
      
              df = pd.DataFrame(data_list)
      
              df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s')
      
              return df  # Return the DataFrame
          else:
              raise Exception(f"API Error: {response.status_code} - {response.text}")
      登录后复制
      登录后复制
      登录后复制

      >步骤6:功能工程

      >此步骤从较早的data_cleaning步骤中获取已清洁的数据。我们正在创建新功能,例如简单的移动平均值(SMA),指数移动平均值(EMA)以及滞后和滚动统计数据,以捕获趋势,减少噪声并从时间序列数据中做出更可靠的预测。此外,我们使用MinMax缩放缩放特征和目标变量。>

      import os
      from pymongo import MongoClient
      from dotenv import load_dotenv
      from data.management.api import fetch_crypto_data  # Import the API function
      import pandas as pd
      
      load_dotenv()
      
      MONGO_URI = os.getenv("MONGO_URI")
      API_URI = os.getenv("API_URI")
      
      client = MongoClient(MONGO_URI, ssl=True, ssl_certfile=None, ssl_ca_certs=None)
      db = client['crypto_data']
      collection = db['historical_data']
      
      try:
          latest_entry = collection.find_one(sort=[("DATE", -1)])  # Find the latest date
          if latest_entry:
              last_date = pd.to_datetime(latest_entry['DATE']).strftime('%Y-%m-%d')
          else:
              last_date = '2011-03-27'  # Default start date if MongoDB is empty
      
          print(f"Fetching data starting from {last_date}...")
          new_data_df = fetch_crypto_data(API_URI)
      
          if latest_entry:
              new_data_df = new_data_df[new_data_df['DATE'] > last_date]
      
          if not new_data_df.empty:
              data_to_insert = new_data_df.to_dict(orient='records')
              result = collection.insert_many(data_to_insert)
              print(f"Inserted {len(result.inserted_ids)} new records into MongoDB.")
          else:
              print("No new data to insert.")
      except Exception as e:
          print(f"An error occurred: {e}")
      登录后复制
      登录后复制
      登录后复制
      步骤7:数据拆分

      >现在,我们将处理的数据分为培训和测试数据集的比例为80:20。

      #create a virtual environment
      python3 -m venv venv
      
      #Activate your virtual environmnent in your project folder
      source venv/bin/activate
      登录后复制
      登录后复制
      登录后复制
      步骤8:模型培训

      在此步骤中,我们以早期停止训练Thelstm模型以防止过度拟合,并使用MLFlow的自动日志记录来跟踪我们的模型和实验,并将受过训练的模型保存为 >步骤9:模型评估
      #Install zenml
      pip install zenml
      
      #To Launch zenml server and dashboard locally
      pip install "zenml[server]"
      
      #To check the zenml Version:
      zenml version
      
      #To initiate a new repository
      zenml init
      
      #To run the dashboard locally:
      zenml login --local
      
      #To know the status of our zenml Pipelines
      zenml show
      
      #To shutdown the zenml server
      zenml clean
      登录后复制
      登录后复制
      登录后复制
      这是一个回归问题,我们正在使用评估指标,例如均方误差(MSE),均方根误差(MSE),均值绝对误差(MAE)和R-squared。

      现在,我们将在管道中组织上述所有步骤。让我们创建一个新的文件triench_pipeline.py。

      >

      >在这里,
      #Integrating mlflow with ZenML
      zenml integration install mlflow -y
      
      #Register the experiment tracker
      zenml experiment-tracker register mlflow_tracker --flavor=mlflow
      
      #Registering the model deployer
      zenml model-deployer register mlflow --flavor=mlflow
      
      #Registering the stack
      zenml stack register local-mlflow-stack-new -a default -o default -d mlflow -e mlflow_tracker --set
      
      #To view the stack list
      zenml stack --list
      登录后复制
      登录后复制
      登录后复制
      @pipeline

      decorator用于将functionml_pipeline()定义为zenml中的管道。

      要查看训练管道的仪表板,只需运行run_pipeline.py脚本即可。让我们创建一个run_pipeline.py文件。
      bitcoin_price_prediction_mlops/        # Project directory
      ├── data/                             
      │   └── management/                   
      │       ├── api_to_mongodb.py          # Code to fetch data and save it to MongoDB
      │       └── api.py                     # API-related utility functions
      │
      ├── pipelines/                         
      │   ├── deployment_pipeline.py         # Deployment pipeline
      │   └── training_pipeline.py           # Training pipeline
      │
      ├── saved_models/                      # Directory for storing trained models
      ├── saved_scalers/                     # Directory for storing scalers used in data preprocessing
      │
      ├── src/                               # Source code
      │   ├── data_cleaning.py               # Data cleaning and preprocessing
      │   ├── data_ingestion.py              # Data ingestion 
      │   ├── data_splitter.py               # Data splitting 
      │   ├── feature_engineering.py         # Feature engineering 
      │   ├── model_evaluation.py            # Model evaluation
      │   └── model_training.py              # Model training
      │
      ├── steps/                             # ZenML steps
      │   ├── clean_data.py                  # ZenML step for cleaning data
      │   ├── data_splitter.py               # ZenML step for data splitting
      │   ├── dynamic_importer.py            # ZenML step for importing dynamic data
      │   ├── feature_engineering.py         # ZenML step for feature engineering
      │   ├── ingest_data.py                 # ZenML step for data ingestion
      │   ├── model_evaluation.py            # ZenML step for model evaluation
      │   ├── model_training.py              # ZenML step for training the model
      │   ├── prediction_service_loader.py   # ZenML step for loading prediction services
      │   ├── predictor.py                   # ZenML step for prediction
      │   └── utils.py                       # Utility functions for steps
      │
      ├── .env                               # Environment variables file
      ├── .gitignore                         # Git ignore file
      │
      ├── app.py                             # Streamlit user interface app
      │
      ├── README.md                          # Project documentation
      ├── requirements.txt                   # List of required packages
      ├── run_deployment.py                  # Code for running deployment and prediction pipeline
      ├── run_pipeline.py                    # Code for running training pipeline
      └── .zen/                              # ZenML directory (created automatically after ZenML initialization)
      登录后复制
      登录后复制
      登录后复制
      >

      >现在我们已经完成了创建管道。在下面运行命令以查看管道仪表板。

      在运行上述命令后,它将返回跟踪仪表板URL,看起来像这样。

      import os
      import logging
      from pymongo import MongoClient
      from dotenv import load_dotenv
      from zenml import step
      import pandas as pd
      
      # Load the .env file
      load_dotenv()
      
      # Get MongoDB URI from environment variables
      MONGO_URI = os.getenv("MONGO_URI")
      
      def fetch_data_from_mongodb(collection_name:str, database_name:str):
          """
          Fetches data from MongoDB and converts it into a pandas DataFrame.
      
          collection_name: 
              Name of the MongoDB collection to fetch data.
          database_name: 
              Name of the MongoDB database.
          return: 
              A pandas DataFrame containing the data
          """
          # Connect to the MongoDB client
          client = MongoClient(MONGO_URI)
          db = client[database_name]  # Select the database
          collection = db[collection_name]  # Select the collection
      
          # Fetch all documents from the collection
          try:
              logging.info(f"Fetching data from MongoDB collection: {collection_name}...")
              data = list(collection.find())  # Convert cursor to a list of dictionaries
      
              if not data:
                  logging.info("No data found in the MongoDB collection.")
                  
      
              # Convert the list of dictionaries into a pandas DataFrame
              df = pd.DataFrame(data)
      
              # Drop the MongoDB ObjectId field if it exists (optional)
              if '_id' in df.columns:
                  df = df.drop(columns=['_id'])
      
              logging.info("Data successfully fetched and converted to a DataFrame!")
              return df
      
          except Exception as e:
              logging.error(f"An error occurred while fetching data: {e}")
              raise e  
              
              
      @step(enable_cache=False)
      def ingest_data(collection_name: str = "historical_data", database_name: str = "crypto_data") -> pd.DataFrame:
          
          logging.info("Started data ingestion process from MongoDB.")
      
          try:
              # Use the fetch_data_from_mongodb function to fetch data
              df = fetch_data_from_mongodb(collection_name=collection_name, database_name=database_name)
      
              if df.empty:
                  logging.warning("No data was loaded. Check the collection name or the database content.")
              else:
                  logging.info(f"Data ingestion completed. Number of records loaded: {len(df)}.")
      
              return df
          
          except Exception as e:
              logging.error(f"Error while reading data from {collection_name} in {database_name}: {e}")
              raise e  
      登录后复制
      登录后复制

      class DataPreprocessor:
          def __init__(self, data: pd.DataFrame):
              
              self.data = data
              logging.info("DataPreprocessor initialized with data of shape: %s", data.shape)
      
          def clean_data(self) -> pd.DataFrame:
              """
              Performs data cleaning by removing unnecessary columns, dropping columns with missing values,
              and returning the cleaned DataFrame.
      
              Returns:
                  pd.DataFrame: The cleaned DataFrame with unnecessary and missing-value columns removed.
              """
              logging.info("Starting data cleaning process.")
      
              # Drop unnecessary columns, including '_id' if it exists
              columns_to_drop = [
                  'UNIT', 'TYPE', 'MARKET', 'INSTRUMENT', 
                  'FIRST_MESSAGE_TIMESTAMP', 'LAST_MESSAGE_TIMESTAMP', 
                  'FIRST_MESSAGE_VALUE', 'HIGH_MESSAGE_VALUE', 'HIGH_MESSAGE_TIMESTAMP', 
                  'LOW_MESSAGE_VALUE', 'LOW_MESSAGE_TIMESTAMP', 'LAST_MESSAGE_VALUE', 
                  'TOTAL_INDEX_UPDATES', 'VOLUME_TOP_TIER', 'QUOTE_VOLUME_TOP_TIER', 
                  'VOLUME_DIRECT', 'QUOTE_VOLUME_DIRECT', 'VOLUME_TOP_TIER_DIRECT', 
                  'QUOTE_VOLUME_TOP_TIER_DIRECT', '_id'  # Adding '_id' to the list
              ]
              logging.info("Dropping columns: %s")
              self.data = self.drop_columns(self.data, columns_to_drop)
      
              # Drop columns where the number of missing values is greater than 0
              logging.info("Dropping columns with missing values.")
              self.data = self.drop_columns_with_missing_values(self.data)
      
              logging.info("Data cleaning completed. Data shape after cleaning: %s", self.data.shape)
              return self.data
      
          def drop_columns(self, data: pd.DataFrame, columns: list) -> pd.DataFrame:
              """
              Drops specified columns from the DataFrame.
      
              Returns:
                  pd.DataFrame: The DataFrame with the specified columns removed.
              """
              logging.info("Dropping columns: %s", columns)
              return data.drop(columns=columns, errors='ignore')
      
          def drop_columns_with_missing_values(self, data: pd.DataFrame) -> pd.DataFrame:
              """
              Drops columns with any missing values from the DataFrame.
      
              Parameters:
                  data: pd.DataFrame
                      The DataFrame from which columns with missing values will be removed.
              
              Returns:
                  pd.DataFrame: The DataFrame with columns containing missing values removed.
              """
              missing_columns = data.columns[data.isnull().sum() > 0]
              if not missing_columns.empty:
                  logging.info("Columns with missing values: %s", missing_columns.tolist())
              else:
                  logging.info("No columns with missing values found.")
              return data.loc[:, data.isnull().sum() == 0]
      登录后复制
      培训管道在仪表板上看起来像这样,给出了以下内容:

      >

      使用MLOPS的比特币价格预测

      使用MLOPS的比特币价格预测

      使用MLOPS的比特币价格预测

      步骤10:模型部署使用MLOPS的比特币价格预测 到目前为止,我们已经构建了模型和管道。现在,让我们将管道推入用户可以做出预测的生产中。

      >

      连续部署管道使用MLOPS的比特币价格预测

      该管道负责连续部署训练有素的模型。它首先从

      > triagn_pipeline.py.py

      文件训练模型,然后使用

      mlflow模型exployer

      来部署训练有素的模型,使用推理管道

      >我们使用推理管道使用已部署的模型对新数据进行预测。让我们看一下我们如何在项目中实施该管道的方式。

      >
      import requests
      import pandas as pd
      from dotenv import load_dotenv
      import os
      
      # Load the .env file
      load_dotenv()
      
      def fetch_crypto_data(api_uri):
          response = requests.get(
              api_uri,
              params={
                  "market": "cadli",
                  "instrument": "BTC-USD",
                  "limit": 5000,
                  "aggregate": 1,
                  "fill": "true",
                  "apply_mapping": "true",
                  "response_format": "JSON"
              },
              headers={"Content-type": "application/json; charset=UTF-8"}
          )
      
          if response.status_code == 200:
              print('API Connection Successful! \nFetching the data...')
      
              data = response.json()
              data_list = data.get('Data', [])
      
              df = pd.DataFrame(data_list)
      
              df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s')
      
              return df  # Return the DataFrame
          else:
              raise Exception(f"API Error: {response.status_code} - {response.text}")
      登录后复制
      登录后复制
      登录后复制

      >让我们查看下面的推理管道中调用的每个功能:>

      dynamic_importer()

      此功能加载新数据,执行数据处理并返回数据。

      import os
      from pymongo import MongoClient
      from dotenv import load_dotenv
      from data.management.api import fetch_crypto_data  # Import the API function
      import pandas as pd
      
      load_dotenv()
      
      MONGO_URI = os.getenv("MONGO_URI")
      API_URI = os.getenv("API_URI")
      
      client = MongoClient(MONGO_URI, ssl=True, ssl_certfile=None, ssl_ca_certs=None)
      db = client['crypto_data']
      collection = db['historical_data']
      
      try:
          latest_entry = collection.find_one(sort=[("DATE", -1)])  # Find the latest date
          if latest_entry:
              last_date = pd.to_datetime(latest_entry['DATE']).strftime('%Y-%m-%d')
          else:
              last_date = '2011-03-27'  # Default start date if MongoDB is empty
      
          print(f"Fetching data starting from {last_date}...")
          new_data_df = fetch_crypto_data(API_URI)
      
          if latest_entry:
              new_data_df = new_data_df[new_data_df['DATE'] > last_date]
      
          if not new_data_df.empty:
              data_to_insert = new_data_df.to_dict(orient='records')
              result = collection.insert_many(data_to_insert)
              print(f"Inserted {len(result.inserted_ids)} new records into MongoDB.")
          else:
              print("No new data to insert.")
      except Exception as e:
          print(f"An error occurred: {e}")
      登录后复制
      登录后复制
      登录后复制
      prediction_service_loader()

      >此功能以

      @Step

      装饰。我们加载了基于pipeline_name的部署服务W.R.T和step_name,我们的部署模型可以处理新数据的预测查询。 > line 现有_services = mlflow_model_deployer_component.find_model_server()

      >

      >基于诸如pipeline名称和管道名称的给定参数,搜索可用的部署服务。如果没有可用的服务,则表明部署管道要么没有进行或遇到部署管道问题,因此它会抛出RuntimeError。

      preditionor()
      #create a virtual environment
      python3 -m venv venv
      
      #Activate your virtual environmnent in your project folder
      source venv/bin/activate
      登录后复制
      登录后复制
      登录后复制

      该函数通过MLFlowDeploymentservice和新数据采用MLFlow部署模型。进一步处理数据以匹配模型的预期格式以进行实时推断。 为了可视化连续部署和推理管道,我们需要运行run_deployment.py脚本,在此将定义部署和预测配置。 (请在下面给出的github中检查run_deployment.py代码)。>

      现在,让我们运行run_deployment.py文件以查看连续部署管道和推理管道的仪表板。

      #Install zenml
      pip install zenml
      
      #To Launch zenml server and dashboard locally
      pip install "zenml[server]"
      
      #To check the zenml Version:
      zenml version
      
      #To initiate a new repository
      zenml init
      
      #To run the dashboard locally:
      zenml login --local
      
      #To know the status of our zenml Pipelines
      zenml show
      
      #To shutdown the zenml server
      zenml clean
      登录后复制
      登录后复制
      登录后复制

      连续部署管道 - 输出

      #Integrating mlflow with ZenML
      zenml integration install mlflow -y
      
      #Register the experiment tracker
      zenml experiment-tracker register mlflow_tracker --flavor=mlflow
      
      #Registering the model deployer
      zenml model-deployer register mlflow --flavor=mlflow
      
      #Registering the stack
      zenml stack register local-mlflow-stack-new -a default -o default -d mlflow -e mlflow_tracker --set
      
      #To view the stack list
      zenml stack --list
      登录后复制
      登录后复制
      登录后复制

      bitcoin_price_prediction_mlops/        # Project directory
      ├── data/                             
      │   └── management/                   
      │       ├── api_to_mongodb.py          # Code to fetch data and save it to MongoDB
      │       └── api.py                     # API-related utility functions
      │
      ├── pipelines/                         
      │   ├── deployment_pipeline.py         # Deployment pipeline
      │   └── training_pipeline.py           # Training pipeline
      │
      ├── saved_models/                      # Directory for storing trained models
      ├── saved_scalers/                     # Directory for storing scalers used in data preprocessing
      │
      ├── src/                               # Source code
      │   ├── data_cleaning.py               # Data cleaning and preprocessing
      │   ├── data_ingestion.py              # Data ingestion 
      │   ├── data_splitter.py               # Data splitting 
      │   ├── feature_engineering.py         # Feature engineering 
      │   ├── model_evaluation.py            # Model evaluation
      │   └── model_training.py              # Model training
      │
      ├── steps/                             # ZenML steps
      │   ├── clean_data.py                  # ZenML step for cleaning data
      │   ├── data_splitter.py               # ZenML step for data splitting
      │   ├── dynamic_importer.py            # ZenML step for importing dynamic data
      │   ├── feature_engineering.py         # ZenML step for feature engineering
      │   ├── ingest_data.py                 # ZenML step for data ingestion
      │   ├── model_evaluation.py            # ZenML step for model evaluation
      │   ├── model_training.py              # ZenML step for training the model
      │   ├── prediction_service_loader.py   # ZenML step for loading prediction services
      │   ├── predictor.py                   # ZenML step for prediction
      │   └── utils.py                       # Utility functions for steps
      │
      ├── .env                               # Environment variables file
      ├── .gitignore                         # Git ignore file
      │
      ├── app.py                             # Streamlit user interface app
      │
      ├── README.md                          # Project documentation
      ├── requirements.txt                   # List of required packages
      ├── run_deployment.py                  # Code for running deployment and prediction pipeline
      ├── run_pipeline.py                    # Code for running training pipeline
      └── .zen/                              # ZenML directory (created automatically after ZenML initialization)
      登录后复制
      登录后复制
      登录后复制
      >推理管道 - 输出

      使用MLOPS的比特币价格预测

      运行run_deployment.py文件后,您可以看到看起来像这样的mlflow仪表板链接。

      现在,您需要在命令行中复制并粘贴上述MLFLOW UI链接并运行它。使用MLOPS的比特币价格预测

      这是MLFlow仪表板,您可以在其中看到评估指标和模型参数:

      import os
      import logging
      from pymongo import MongoClient
      from dotenv import load_dotenv
      from zenml import step
      import pandas as pd
      
      # Load the .env file
      load_dotenv()
      
      # Get MongoDB URI from environment variables
      MONGO_URI = os.getenv("MONGO_URI")
      
      def fetch_data_from_mongodb(collection_name:str, database_name:str):
          """
          Fetches data from MongoDB and converts it into a pandas DataFrame.
      
          collection_name: 
              Name of the MongoDB collection to fetch data.
          database_name: 
              Name of the MongoDB database.
          return: 
              A pandas DataFrame containing the data
          """
          # Connect to the MongoDB client
          client = MongoClient(MONGO_URI)
          db = client[database_name]  # Select the database
          collection = db[collection_name]  # Select the collection
      
          # Fetch all documents from the collection
          try:
              logging.info(f"Fetching data from MongoDB collection: {collection_name}...")
              data = list(collection.find())  # Convert cursor to a list of dictionaries
      
              if not data:
                  logging.info("No data found in the MongoDB collection.")
                  
      
              # Convert the list of dictionaries into a pandas DataFrame
              df = pd.DataFrame(data)
      
              # Drop the MongoDB ObjectId field if it exists (optional)
              if '_id' in df.columns:
                  df = df.drop(columns=['_id'])
      
              logging.info("Data successfully fetched and converted to a DataFrame!")
              return df
      
          except Exception as e:
              logging.error(f"An error occurred while fetching data: {e}")
              raise e  
              
              
      @step(enable_cache=False)
      def ingest_data(collection_name: str = "historical_data", database_name: str = "crypto_data") -> pd.DataFrame:
          
          logging.info("Started data ingestion process from MongoDB.")
      
          try:
              # Use the fetch_data_from_mongodb function to fetch data
              df = fetch_data_from_mongodb(collection_name=collection_name, database_name=database_name)
      
              if df.empty:
                  logging.warning("No data was loaded. Check the collection name or the database content.")
              else:
                  logging.info(f"Data ingestion completed. Number of records loaded: {len(df)}.")
      
              return df
          
          except Exception as e:
              logging.error(f"Error while reading data from {collection_name} in {database_name}: {e}")
              raise e  
      登录后复制
      登录后复制

      >步骤11:构建简易应用

      > Sparlit是一个令人惊叹的开源,基于Python的框架,用于创建Interactive UI,我们可以使用Sparlit快速构建Web应用程序,而无需知道后端或前端开发。首先,我们需要在系统上安装精简。

      使用MLOPS的比特币价格预测

      再次,您可以在github上找到“简化应用”的代码。

      使用MLOPS的比特币价格预测

      >这是对项目的GitHub代码和视频说明,以便您更好地理解。

      结论

      在本文中,我们成功地建立了一个端到端的,可提供生产的比特币价格预测MLOPS项目。从通过API获取数据并预处理数据到模型培训,评估和部署,我们的项目突出了MLOP在将开发与生产联系起来的关键作用。我们距离实时预测比特币价格的未来更近了一步。 API提供了对外部数据的平稳访问,例如CCDATA API的比特币价格数据,消除了对预先存在的数据集的需求。

      钥匙要点

      API启用对外部数据的无缝访问,例如来自CCDATA API的比特币价格数据,消除了对预先存在的数据集的需求。
        >
      • > zenml和mlflow是可靠的工具,可促进现实世界应用程序中机器学习模型的开发,跟踪和部署。
      • >我们遵循最佳实践,通过正确执行数据摄入,清洁,功能工程,模型培训和评估。
      • >连续部署和推理管道对于确保模型保持有效且在生产环境中可用。
      • 常见问题
      • > Q1。 Zenml可以免费使用吗?是的,Zenml是一个完全开源的MLOPS框架,它使从本地开发到生产管道的过渡与1行代码一样容易。

      Q2。 mlflow用于什么? MLFlow通过提供用于跟踪实验,版本控制模型和部署的工具来使机器学习开发变得更加容易。

      Q3。如何调试服务器守护程序没有运行错误?这是您将在项目中面临的常见错误。只需运行`Zenml注销–Local`然后`Zenml clean'',然后再运行管道。它将得到解决。

    以上是使用MLOPS的比特币价格预测的详细内容。更多信息请关注PHP中文网其他相关文章!

    本站声明
    本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

    热AI工具

    Undresser.AI Undress

    Undresser.AI Undress

    人工智能驱动的应用程序,用于创建逼真的裸体照片

    AI Clothes Remover

    AI Clothes Remover

    用于从照片中去除衣服的在线人工智能工具。

    Undress AI Tool

    Undress AI Tool

    免费脱衣服图片

    Clothoff.io

    Clothoff.io

    AI脱衣机

    Video Face Swap

    Video Face Swap

    使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

    热工具

    记事本++7.3.1

    记事本++7.3.1

    好用且免费的代码编辑器

    SublimeText3汉化版

    SublimeText3汉化版

    中文版,非常好用

    禅工作室 13.0.1

    禅工作室 13.0.1

    功能强大的PHP集成开发环境

    Dreamweaver CS6

    Dreamweaver CS6

    视觉化网页开发工具

    SublimeText3 Mac版

    SublimeText3 Mac版

    神级代码编辑软件(SublimeText3)

    最佳AI艺术生成器(免费付款)创意项目 最佳AI艺术生成器(免费付款)创意项目 Apr 02, 2025 pm 06:10 PM

    本文回顾了AI最高的艺术生成器,讨论了他们的功能,对创意项目的适用性和价值。它重点介绍了Midjourney是专业人士的最佳价值,并建议使用Dall-E 2进行高质量的可定制艺术。

    开始使用Meta Llama 3.2 -Analytics Vidhya 开始使用Meta Llama 3.2 -Analytics Vidhya Apr 11, 2025 pm 12:04 PM

    Meta的Llama 3.2:多模式和移动AI的飞跃 Meta最近公布了Llama 3.2,这是AI的重大进步,具有强大的视觉功能和针对移动设备优化的轻量级文本模型。 以成功为基础

    最佳AI聊天机器人比较(Chatgpt,Gemini,Claude&更多) 最佳AI聊天机器人比较(Chatgpt,Gemini,Claude&更多) Apr 02, 2025 pm 06:09 PM

    本文比较了诸如Chatgpt,Gemini和Claude之类的顶级AI聊天机器人,重点介绍了其独特功能,自定义选项以及自然语言处理和可靠性的性能。

    顶级AI写作助理来增强您的内容创建 顶级AI写作助理来增强您的内容创建 Apr 02, 2025 pm 06:11 PM

    文章讨论了Grammarly,Jasper,Copy.ai,Writesonic和Rytr等AI最高的写作助手,重点介绍了其独特的内容创建功能。它认为Jasper在SEO优化方面表现出色,而AI工具有助于保持音调的组成

    AV字节:Meta' llama 3.2,Google的双子座1.5等 AV字节:Meta' llama 3.2,Google的双子座1.5等 Apr 11, 2025 pm 12:01 PM

    本周的AI景观:进步,道德考虑和监管辩论的旋风。 OpenAI,Google,Meta和Microsoft等主要参与者已经释放了一系列更新,从开创性的新车型到LE的关键转变

    向员工出售AI策略:Shopify首席执行官的宣言 向员工出售AI策略:Shopify首席执行官的宣言 Apr 10, 2025 am 11:19 AM

    Shopify首席执行官TobiLütke最近的备忘录大胆地宣布AI对每位员工的基本期望是公司内部的重大文化转变。 这不是短暂的趋势。这是整合到P中的新操作范式

    构建AI代理的前7个代理抹布系统 构建AI代理的前7个代理抹布系统 Mar 31, 2025 pm 04:25 PM

    2024年见证了从简单地使用LLM进行内容生成的转变,转变为了解其内部工作。 这种探索导致了AI代理的发现 - 自主系统处理任务和最少人工干预的决策。 Buildin

    10个生成AI编码扩展,在VS代码中,您必须探索 10个生成AI编码扩展,在VS代码中,您必须探索 Apr 13, 2025 am 01:14 AM

    嘿,编码忍者!您当天计划哪些与编码有关的任务?在您进一步研究此博客之前,我希望您考虑所有与编码相关的困境,这是将其列出的。 完毕? - 让&#8217

    See all articles