Ramalan Harga Bitcoin Menggunakan MLOPS
Tidak tahu banyak tentang Bitcoin atau turun naik harga tetapi ingin membuat keputusan pelaburan untuk membuat keuntungan? Model pembelajaran mesin ini mempunyai belakang anda. Ia boleh meramalkan harga lebih baik daripada ahli astrologi. Dalam artikel ini, kami akan membina model ML untuk meramalkan dan meramalkan harga bitcoin, menggunakan ZenML dan MLFlow. Oleh itu, mari kita mulakan perjalanan kami untuk memahami bagaimana orang boleh menggunakan alat ML dan MLOPS untuk meramalkan masa depan.
Objektif Pembelajaran
- belajar untuk mengambil data langsung menggunakan API dengan cekap.
- fahami apa zenml, mengapa kita menggunakan mlflow, dan bagaimana anda boleh mengintegrasikannya dengan zenml.
- meneroka proses penempatan untuk model pembelajaran mesin, dari idea ke pengeluaran.
- Ketahui cara membuat aplikasi Streamlit mesra pengguna untuk ramalan model pembelajaran mesin interaktif.
Jadual Kandungan
Langkah 8: Latihan Model
- Pernyataan Masalah
- Harga bitcoin sangat tidak menentu, dan membuat ramalan adalah mustahil. Dalam projek kami, kami menggunakan Praktikesto Terbaik TMLOPS membina model LSTM untuk meramalkan Bitcoinprices dan Trend.
- Sebelum melaksanakan projek, mari kita lihat seni bina projek.
- Pelaksanaan Projek
- mari kita mulakan dengan mengakses API.
- Mengapa kita melakukan ini? Anda boleh mendapatkan data harga bitcoin bersejarah dari dataset yang berbeza, tetapi dengan API, kita boleh mempunyai akses kepada data pasaran hidup.
- Langkah 1: Mengakses API
- Daftar untuk API Access:
- sebaik sahaja anda mendaftar di halaman API TheCCData. Anda boleh mendapatkan kunci API percuma dari pagehttps ini: //developers.cryptocompare.com/documentation/data-api/index_cc_v1_historical_days
- Ambil data harga bitcoin:
- Dengan kod di bawah, anda boleh mengambil data harga bitcoin dari API CCData dan menukarnya ke dalam data data PANDAS. Juga, simpan kunci API dalam fail .env.
- Langkah 2: Menyambung ke pangkalan data menggunakan mongoDB
import requests
import pandas as pd
from dotenv import load_dotenv
import os
# Load the .env file
load_dotenv()
def fetch_crypto_data(api_uri):
response = requests.get(
api_uri,
params={
"market": "cadli",
"instrument": "BTC-USD",
"limit": 5000,
"aggregate": 1,
"fill": "true",
"apply_mapping": "true",
"response_format": "JSON"
},
headers={"Content-type": "application/json; charset=UTF-8"}
)
if response.status_code == 200:
print('API Connection Successful! \nFetching the data...')
data = response.json()
data_list = data.get('Data', [])
df = pd.DataFrame(data_list)
df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s')
return df # Return the DataFrame
else:
raise Exception(f"API Error: {response.status_code} - {response.text}")
Salin selepas log masukSalin selepas log masukSalin selepas log masuk
import requests import pandas as pd from dotenv import load_dotenv import os # Load the .env file load_dotenv() def fetch_crypto_data(api_uri): response = requests.get( api_uri, params={ "market": "cadli", "instrument": "BTC-USD", "limit": 5000, "aggregate": 1, "fill": "true", "apply_mapping": "true", "response_format": "JSON" }, headers={"Content-type": "application/json; charset=UTF-8"} ) if response.status_code == 200: print('API Connection Successful! \nFetching the data...') data = response.json() data_list = data.get('Data', []) df = pd.DataFrame(data_list) df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s') return df # Return the DataFrame else: raise Exception(f"API Error: {response.status_code} - {response.text}")
Kod ini menyambung ke MongoDB, mengambil data harga bitcoin melalui API, dan mengemas kini pangkalan data dengan semua entri baru selepas tarikh log masuk terkini.
memperkenalkan zenml
zenmlis Platform sumber terbuka yang disesuaikan untuk operasi pembelajaran mesin, menyokong penciptaan saluran paip yang fleksibel dan siap pengeluaran. Di samping itu, ZenML mengintegrasikan dengan pelbagai alat pembelajaran mesin likemlflow, bentoml, dan lain -lain, untuk membuat saluran paip ML yang lancar.
⚠️ Jika anda adalah pengguna Windows, cuba pasang WSL pada sistem anda. ZenML tidak menyokong Windows.
Dalam projek ini, kami akan melaksanakan saluran paip tradisional, yang menggunakan ZenML, dan kami akan mengintegrasikan MLFlow dengan ZenML, untuk penjejakan eksperimen.
pra-syarat dan perintah ZENML asas
- python 3.12 atau lebih tinggi: anda boleh mendapatkannya dari sini: https: //www.python.org/downloads/
- Aktifkan persekitaran maya anda:
import os from pymongo import MongoClient from dotenv import load_dotenv from data.management.api import fetch_crypto_data # Import the API function import pandas as pd load_dotenv() MONGO_URI = os.getenv("MONGO_URI") API_URI = os.getenv("API_URI") client = MongoClient(MONGO_URI, ssl=True, ssl_certfile=None, ssl_ca_certs=None) db = client['crypto_data'] collection = db['historical_data'] try: latest_entry = collection.find_one(sort=[("DATE", -1)]) # Find the latest date if latest_entry: last_date = pd.to_datetime(latest_entry['DATE']).strftime('%Y-%m-%d') else: last_date = '2011-03-27' # Default start date if MongoDB is empty print(f"Fetching data starting from {last_date}...") new_data_df = fetch_crypto_data(API_URI) if latest_entry: new_data_df = new_data_df[new_data_df['DATE'] > last_date] if not new_data_df.empty: data_to_insert = new_data_df.to_dict(orient='records') result = collection.insert_many(data_to_insert) print(f"Inserted {len(result.inserted_ids)} new records into MongoDB.") else: print("No new data to insert.") except Exception as e: print(f"An error occurred: {e}")
- perintah zenml:
Semua perintah ZenML teras bersama -sama dengan fungsi mereka disediakan di bawah:
#create a virtual environment python3 -m venv venv #Activate your virtual environmnent in your project folder source venv/bin/activate
Langkah 3: Integrasi Mlflow dengan Zenml
Kami menggunakan MLFlow untuk penjejakan eksperimen, untuk menjejaki model, artifak, metrik, dan nilai hiperparameter kami. Kami mendaftarkan MLFlow untuk pengesanan eksperimen dan penempatan model di sini:
#Install zenml pip install zenml #To Launch zenml server and dashboard locally pip install "zenml[server]" #To check the zenml Version: zenml version #To initiate a new repository zenml init #To run the dashboard locally: zenml login --local #To know the status of our zenml Pipelines zenml show #To shutdown the zenml server zenml clean
Senarai Stack Zenml
di sini, anda dapat melihat susun atur projek. Sekarang mari kita bincangkannya satu demi satu secara terperinci.
#Integrating mlflow with ZenML zenml integration install mlflow -y #Register the experiment tracker zenml experiment-tracker register mlflow_tracker --flavor=mlflow #Registering the model deployer zenml model-deployer register mlflow --flavor=mlflow #Registering the stack zenml stack register local-mlflow-stack-new -a default -o default -d mlflow -e mlflow_tracker --set #To view the stack list zenml stack --list
Kami mula -mula menelan data dari API ke MongoDB dan mengubahnya menjadi PANDAS DataFrame.
bitcoin_price_prediction_mlops/ # Project directory ├── data/ │ └── management/ │ ├── api_to_mongodb.py # Code to fetch data and save it to MongoDB │ └── api.py # API-related utility functions │ ├── pipelines/ │ ├── deployment_pipeline.py # Deployment pipeline │ └── training_pipeline.py # Training pipeline │ ├── saved_models/ # Directory for storing trained models ├── saved_scalers/ # Directory for storing scalers used in data preprocessing │ ├── src/ # Source code │ ├── data_cleaning.py # Data cleaning and preprocessing │ ├── data_ingestion.py # Data ingestion │ ├── data_splitter.py # Data splitting │ ├── feature_engineering.py # Feature engineering │ ├── model_evaluation.py # Model evaluation │ └── model_training.py # Model training │ ├── steps/ # ZenML steps │ ├── clean_data.py # ZenML step for cleaning data │ ├── data_splitter.py # ZenML step for data splitting │ ├── dynamic_importer.py # ZenML step for importing dynamic data │ ├── feature_engineering.py # ZenML step for feature engineering │ ├── ingest_data.py # ZenML step for data ingestion │ ├── model_evaluation.py # ZenML step for model evaluation │ ├── model_training.py # ZenML step for training the model │ ├── prediction_service_loader.py # ZenML step for loading prediction services │ ├── predictor.py # ZenML step for prediction │ └── utils.py # Utility functions for steps │ ├── .env # Environment variables file ├── .gitignore # Git ignore file │ ├── app.py # Streamlit user interface app │ ├── README.md # Project documentation ├── requirements.txt # List of required packages ├── run_deployment.py # Code for running deployment and prediction pipeline ├── run_pipeline.py # Code for running training pipeline └── .zen/ # ZenML directory (created automatically after ZenML initialization)
Kami menambah @step sebagai penghias kepada ingest_data () berfungsi untuk mengisytiharkannya sebagai langkah saluran paip latihan kami. Dengan cara yang sama, kami akan menulis kod untuk setiap langkah dalam seni bina projek dan membuat saluran paip.
Untuk melihat bagaimana saya telah menggunakan penghias@step , lihat pautan GitHub di bawah (folder langkah) untuk melalui kod untuk langkah -langkah lain dalam saluran paip iaitu pembersihan data, kejuruteraan ciri, pemisahan data, latihan model, dan penilaian model. Langkah 5: Pembersihan Data
Dalam langkah ini, kami akan membuat strategi yang berbeza untuk membersihkan data yang ditelan. Kami akan menggugurkan lajur yang tidak diingini dan nilai yang hilang dalam data.
import requests import pandas as pd from dotenv import load_dotenv import os # Load the .env file load_dotenv() def fetch_crypto_data(api_uri): response = requests.get( api_uri, params={ "market": "cadli", "instrument": "BTC-USD", "limit": 5000, "aggregate": 1, "fill": "true", "apply_mapping": "true", "response_format": "JSON" }, headers={"Content-type": "application/json; charset=UTF-8"} ) if response.status_code == 200: print('API Connection Successful! \nFetching the data...') data = response.json() data_list = data.get('Data', []) df = pd.DataFrame(data_list) df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s') return df # Return the DataFrame else: raise Exception(f"API Error: {response.status_code} - {response.text}")
Langkah 6: Kejuruteraan Ciri
Langkah ini mengambil data yang dibersihkan dari langkah data_cleaning sebelumnya. Kami mencipta ciri-ciri baru seperti purata bergerak mudah (SMA), purata bergerak eksponen (EMA), dan statistik yang tertinggal dan bergulir untuk menangkap trend, mengurangkan bunyi, dan membuat ramalan yang lebih dipercayai dari data siri masa. Di samping itu, kami skala ciri dan pembolehubah sasaran menggunakan skala Minmax.
import os from pymongo import MongoClient from dotenv import load_dotenv from data.management.api import fetch_crypto_data # Import the API function import pandas as pd load_dotenv() MONGO_URI = os.getenv("MONGO_URI") API_URI = os.getenv("API_URI") client = MongoClient(MONGO_URI, ssl=True, ssl_certfile=None, ssl_ca_certs=None) db = client['crypto_data'] collection = db['historical_data'] try: latest_entry = collection.find_one(sort=[("DATE", -1)]) # Find the latest date if latest_entry: last_date = pd.to_datetime(latest_entry['DATE']).strftime('%Y-%m-%d') else: last_date = '2011-03-27' # Default start date if MongoDB is empty print(f"Fetching data starting from {last_date}...") new_data_df = fetch_crypto_data(API_URI) if latest_entry: new_data_df = new_data_df[new_data_df['DATE'] > last_date] if not new_data_df.empty: data_to_insert = new_data_df.to_dict(orient='records') result = collection.insert_many(data_to_insert) print(f"Inserted {len(result.inserted_ids)} new records into MongoDB.") else: print("No new data to insert.") except Exception as e: print(f"An error occurred: {e}")
Langkah 7: Pemisahan data
Sekarang, kita membahagikan data yang diproses ke dalam latihan dan ujian dataset dalam nisbah 80:20.
#create a virtual environment python3 -m venv venv #Activate your virtual environmnent in your project folder source venv/bin/activate
Langkah 8: Latihan Model
Dalam langkah ini, kami melatih model Thelstm dengan berhenti awal untuk mengelakkan terlalu banyak, dan dengan menggunakan pembalakan automatik MLFlow untuk mengesan model dan eksperimen kami dan menyimpan model terlatih sebagai lstm_model.keras .
#Install zenml pip install zenml #To Launch zenml server and dashboard locally pip install "zenml[server]" #To check the zenml Version: zenml version #To initiate a new repository zenml init #To run the dashboard locally: zenml login --local #To know the status of our zenml Pipelines zenml show #To shutdown the zenml server zenml clean
Oleh kerana ini adalah masalah regresi, kami menggunakan metrik penilaian seperti kesilapan kuadrat min (MSE), kesilapan akar kuadrat (MSE), kesilapan mutlak (MAE), dan R-kuadrat.
#Integrating mlflow with ZenML zenml integration install mlflow -y #Register the experiment tracker zenml experiment-tracker register mlflow_tracker --flavor=mlflow #Registering the model deployer zenml model-deployer register mlflow --flavor=mlflow #Registering the stack zenml stack register local-mlflow-stack-new -a default -o default -d mlflow -e mlflow_tracker --set #To view the stack list zenml stack --list
bitcoin_price_prediction_mlops/ # Project directory ├── data/ │ └── management/ │ ├── api_to_mongodb.py # Code to fetch data and save it to MongoDB │ └── api.py # API-related utility functions │ ├── pipelines/ │ ├── deployment_pipeline.py # Deployment pipeline │ └── training_pipeline.py # Training pipeline │ ├── saved_models/ # Directory for storing trained models ├── saved_scalers/ # Directory for storing scalers used in data preprocessing │ ├── src/ # Source code │ ├── data_cleaning.py # Data cleaning and preprocessing │ ├── data_ingestion.py # Data ingestion │ ├── data_splitter.py # Data splitting │ ├── feature_engineering.py # Feature engineering │ ├── model_evaluation.py # Model evaluation │ └── model_training.py # Model training │ ├── steps/ # ZenML steps │ ├── clean_data.py # ZenML step for cleaning data │ ├── data_splitter.py # ZenML step for data splitting │ ├── dynamic_importer.py # ZenML step for importing dynamic data │ ├── feature_engineering.py # ZenML step for feature engineering │ ├── ingest_data.py # ZenML step for data ingestion │ ├── model_evaluation.py # ZenML step for model evaluation │ ├── model_training.py # ZenML step for training the model │ ├── prediction_service_loader.py # ZenML step for loading prediction services │ ├── predictor.py # ZenML step for prediction │ └── utils.py # Utility functions for steps │ ├── .env # Environment variables file ├── .gitignore # Git ignore file │ ├── app.py # Streamlit user interface app │ ├── README.md # Project documentation ├── requirements.txt # List of required packages ├── run_deployment.py # Code for running deployment and prediction pipeline ├── run_pipeline.py # Code for running training pipeline └── .zen/ # ZenML directory (created automatically after ZenML initialization)
@pipeline penghias digunakan untuk menentukan fungsiML_PIPELINE () sebagai saluran paip di zenml.
Untuk melihat papan pemuka untuk saluran paip latihan, hanya jalankan skrip run_pipeline.py. Mari buat fail run_pipeline.py.
import os import logging from pymongo import MongoClient from dotenv import load_dotenv from zenml import step import pandas as pd # Load the .env file load_dotenv() # Get MongoDB URI from environment variables MONGO_URI = os.getenv("MONGO_URI") def fetch_data_from_mongodb(collection_name:str, database_name:str): """ Fetches data from MongoDB and converts it into a pandas DataFrame. collection_name: Name of the MongoDB collection to fetch data. database_name: Name of the MongoDB database. return: A pandas DataFrame containing the data """ # Connect to the MongoDB client client = MongoClient(MONGO_URI) db = client[database_name] # Select the database collection = db[collection_name] # Select the collection # Fetch all documents from the collection try: logging.info(f"Fetching data from MongoDB collection: {collection_name}...") data = list(collection.find()) # Convert cursor to a list of dictionaries if not data: logging.info("No data found in the MongoDB collection.") # Convert the list of dictionaries into a pandas DataFrame df = pd.DataFrame(data) # Drop the MongoDB ObjectId field if it exists (optional) if '_id' in df.columns: df = df.drop(columns=['_id']) logging.info("Data successfully fetched and converted to a DataFrame!") return df except Exception as e: logging.error(f"An error occurred while fetching data: {e}") raise e @step(enable_cache=False) def ingest_data(collection_name: str = "historical_data", database_name: str = "crypto_data") -> pd.DataFrame: logging.info("Started data ingestion process from MongoDB.") try: # Use the fetch_data_from_mongodb function to fetch data df = fetch_data_from_mongodb(collection_name=collection_name, database_name=database_name) if df.empty: logging.warning("No data was loaded. Check the collection name or the database content.") else: logging.info(f"Data ingestion completed. Number of records loaded: {len(df)}.") return df except Exception as e: logging.error(f"Error while reading data from {collection_name} in {database_name}: {e}") raise e
class DataPreprocessor: def __init__(self, data: pd.DataFrame): self.data = data logging.info("DataPreprocessor initialized with data of shape: %s", data.shape) def clean_data(self) -> pd.DataFrame: """ Performs data cleaning by removing unnecessary columns, dropping columns with missing values, and returning the cleaned DataFrame. Returns: pd.DataFrame: The cleaned DataFrame with unnecessary and missing-value columns removed. """ logging.info("Starting data cleaning process.") # Drop unnecessary columns, including '_id' if it exists columns_to_drop = [ 'UNIT', 'TYPE', 'MARKET', 'INSTRUMENT', 'FIRST_MESSAGE_TIMESTAMP', 'LAST_MESSAGE_TIMESTAMP', 'FIRST_MESSAGE_VALUE', 'HIGH_MESSAGE_VALUE', 'HIGH_MESSAGE_TIMESTAMP', 'LOW_MESSAGE_VALUE', 'LOW_MESSAGE_TIMESTAMP', 'LAST_MESSAGE_VALUE', 'TOTAL_INDEX_UPDATES', 'VOLUME_TOP_TIER', 'QUOTE_VOLUME_TOP_TIER', 'VOLUME_DIRECT', 'QUOTE_VOLUME_DIRECT', 'VOLUME_TOP_TIER_DIRECT', 'QUOTE_VOLUME_TOP_TIER_DIRECT', '_id' # Adding '_id' to the list ] logging.info("Dropping columns: %s") self.data = self.drop_columns(self.data, columns_to_drop) # Drop columns where the number of missing values is greater than 0 logging.info("Dropping columns with missing values.") self.data = self.drop_columns_with_missing_values(self.data) logging.info("Data cleaning completed. Data shape after cleaning: %s", self.data.shape) return self.data def drop_columns(self, data: pd.DataFrame, columns: list) -> pd.DataFrame: """ Drops specified columns from the DataFrame. Returns: pd.DataFrame: The DataFrame with the specified columns removed. """ logging.info("Dropping columns: %s", columns) return data.drop(columns=columns, errors='ignore') def drop_columns_with_missing_values(self, data: pd.DataFrame) -> pd.DataFrame: """ Drops columns with any missing values from the DataFrame. Parameters: data: pd.DataFrame The DataFrame from which columns with missing values will be removed. Returns: pd.DataFrame: The DataFrame with columns containing missing values removed. """ missing_columns = data.columns[data.isnull().sum() > 0] if not missing_columns.empty: logging.info("Columns with missing values: %s", missing_columns.tolist()) else: logging.info("No columns with missing values found.") return data.loc[:, data.isnull().sum() == 0]
Paip latihan kelihatan seperti ini di papan pemuka, yang diberikan di bawah:
Langkah 10: Penggunaan Model
Paip Penyebaran Berterusan
Paip ini bertanggungjawab untuk terus menggunakan model terlatih. Ia mula -mula menjalankan
ml_pipeline ()
darilatihan_pipeline.py
fail untuk melatih model, kemudian menggunakanimport joblib import pandas as pd from abc import ABC, abstractmethod from sklearn.preprocessing import MinMaxScaler # Abstract class for Feature Engineering strategy class FeatureEngineeringStrategy(ABC): @abstractmethod def generate_features(self, df: pd.DataFrame) -> pd.DataFrame: pass # Concrete class for calculating SMA, EMA, RSI, and other features class TechnicalIndicators(FeatureEngineeringStrategy): def generate_features(self, df: pd.DataFrame) -> pd.DataFrame: # Calculate SMA, EMA, and RSI df['SMA_20'] = df['CLOSE'].rolling(window=20).mean() df['SMA_50'] = df['CLOSE'].rolling(window=50).mean() df['EMA_20'] = df['CLOSE'].ewm(span=20, adjust=False).mean() # Price difference features df['OPEN_CLOSE_diff'] = df['OPEN'] - df['CLOSE'] df['HIGH_LOW_diff'] = df['HIGH'] - df['LOW'] df['HIGH_OPEN_diff'] = df['HIGH'] - df['OPEN'] df['CLOSE_LOW_diff'] = df['CLOSE'] - df['LOW'] # Lagged features df['OPEN_lag1'] = df['OPEN'].shift(1) df['CLOSE_lag1'] = df['CLOSE'].shift(1) df['HIGH_lag1'] = df['HIGH'].shift(1) df['LOW_lag1'] = df['LOW'].shift(1) # Rolling statistics df['CLOSE_roll_mean_14'] = df['CLOSE'].rolling(window=14).mean() df['CLOSE_roll_std_14'] = df['CLOSE'].rolling(window=14).std() # Drop rows with missing values (due to rolling windows, shifts) df.dropna(inplace=True) return df # Abstract class for Scaling strategy class ScalingStrategy(ABC): @abstractmethod def scale(self, df: pd.DataFrame, features: list, target: str): pass # Concrete class for MinMax Scaling class MinMaxScaling(ScalingStrategy): def scale(self, df: pd.DataFrame, features: list, target: str): """ Scales the features and target using MinMaxScaler. Parameters: df: pd.DataFrame The DataFrame containing the features and target. features: list List of feature column names. target: str The target column name. Returns: pd.DataFrame, pd.DataFrame: Scaled features and target """ scaler_X = MinMaxScaler(feature_range=(0, 1)) scaler_y = MinMaxScaler(feature_range=(0, 1)) X_scaled = scaler_X.fit_transform(df[features].values) y_scaled = scaler_y.fit_transform(df[[target]].values) joblib.dump(scaler_X, 'saved_scalers/scaler_X.pkl') joblib.dump(scaler_y, 'saved_scalers/scaler_y.pkl') return X_scaled, y_scaled, scaler_y # FeatureEngineeringContext: This will use the Strategy Pattern class FeatureEngineering: def __init__(self, feature_strategy: FeatureEngineeringStrategy, scaling_strategy: ScalingStrategy): self.feature_strategy = feature_strategy self.scaling_strategy = scaling_strategy def process_features(self, df: pd.DataFrame, features: list, target: str): # Generate features using the provided strategy df_with_features = self.feature_strategy.generate_features(df) # Scale features and target using the provided strategy X_scaled, y_scaled, scaler_y = self.scaling_strategy.scale(df_with_features, features, target) return df_with_features, X_scaled, y_scaled, scaler_y
Pipeline Inference
Kami menggunakan saluran paip kesimpulan untuk membuat ramalan pada data baru, menggunakan model yang digunakan. Mari kita lihat bagaimana kami melaksanakan saluran paip ini dalam projek kami.
import requests import pandas as pd from dotenv import load_dotenv import os # Load the .env file load_dotenv() def fetch_crypto_data(api_uri): response = requests.get( api_uri, params={ "market": "cadli", "instrument": "BTC-USD", "limit": 5000, "aggregate": 1, "fill": "true", "apply_mapping": "true", "response_format": "JSON" }, headers={"Content-type": "application/json; charset=UTF-8"} ) if response.status_code == 200: print('API Connection Successful! \nFetching the data...') data = response.json() data_list = data.get('Data', []) df = pd.DataFrame(data_list) df['DATE'] = pd.to_datetime(df['TIMESTAMP'], unit='s') return df # Return the DataFrame else: raise Exception(f"API Error: {response.status_code} - {response.text}")
mari kita lihat tentang setiap fungsi yang dipanggil dalam saluran paip kesimpulan di bawah:
dynamic_importer ()
Fungsi ini memuat data baru, melakukan pemprosesan data, dan mengembalikan data.
import os from pymongo import MongoClient from dotenv import load_dotenv from data.management.api import fetch_crypto_data # Import the API function import pandas as pd load_dotenv() MONGO_URI = os.getenv("MONGO_URI") API_URI = os.getenv("API_URI") client = MongoClient(MONGO_URI, ssl=True, ssl_certfile=None, ssl_ca_certs=None) db = client['crypto_data'] collection = db['historical_data'] try: latest_entry = collection.find_one(sort=[("DATE", -1)]) # Find the latest date if latest_entry: last_date = pd.to_datetime(latest_entry['DATE']).strftime('%Y-%m-%d') else: last_date = '2011-03-27' # Default start date if MongoDB is empty print(f"Fetching data starting from {last_date}...") new_data_df = fetch_crypto_data(API_URI) if latest_entry: new_data_df = new_data_df[new_data_df['DATE'] > last_date] if not new_data_df.empty: data_to_insert = new_data_df.to_dict(orient='records') result = collection.insert_many(data_to_insert) print(f"Inserted {len(result.inserted_ids)} new records into MongoDB.") else: print("No new data to insert.") except Exception as e: print(f"An error occurred: {e}")
prediction_service_loader ()
Fungsi ini dihiasi dengan @step . Kami memuatkan perkhidmatan penempatan W.R.T Model yang digunakan berdasarkan pipeline_name, dan step_name, di mana model yang digunakan kami bersedia untuk memproses pertanyaan ramalan untuk data baru.
garis sedia ada_services = mlflow_model_deployer_component.find_model_server () Mencari perkhidmatan penyebaran yang tersedia berdasarkan parameter yang diberikan seperti nama saluran paip dan nama langkah paip. Sekiranya tiada perkhidmatan yang tersedia, ia menunjukkan bahawa saluran paip penempatan sama ada tidak dijalankan atau mengalami masalah dengan saluran paip penempatan, jadi ia melemparkan runtimeerror.
#create a virtual environment python3 -m venv venv #Activate your virtual environmnent in your project folder source venv/bin/activate
Predictor ()
Fungsi mengambil dalam model MLFlow yang dikerahkan melalui MLFlowDeploymentservice dan data baru. Data diproses lebih jauh untuk memadankan format model yang diharapkan untuk membuat kesimpulan masa nyata.
#Install zenml pip install zenml #To Launch zenml server and dashboard locally pip install "zenml[server]" #To check the zenml Version: zenml version #To initiate a new repository zenml init #To run the dashboard locally: zenml login --local #To know the status of our zenml Pipelines zenml show #To shutdown the zenml server zenml clean
Untuk memvisualisasikan saluran penyebaran dan kesimpulan yang berterusan, kita perlu menjalankan skrip run_deployment.py, di mana konfigurasi penempatan dan ramalan akan ditakrifkan. (Sila periksa kod run_deployment.py dalam github yang diberikan di bawah).
#Integrating mlflow with ZenML zenml integration install mlflow -y #Register the experiment tracker zenml experiment-tracker register mlflow_tracker --flavor=mlflow #Registering the model deployer zenml model-deployer register mlflow --flavor=mlflow #Registering the stack zenml stack register local-mlflow-stack-new -a default -o default -d mlflow -e mlflow_tracker --set #To view the stack list zenml stack --list
Sekarang mari kita jalankan fail run_deployment.py untuk melihat papan pemuka saluran paip penempatan berterusan dan saluran paip kesimpulan.
bitcoin_price_prediction_mlops/ # Project directory ├── data/ │ └── management/ │ ├── api_to_mongodb.py # Code to fetch data and save it to MongoDB │ └── api.py # API-related utility functions │ ├── pipelines/ │ ├── deployment_pipeline.py # Deployment pipeline │ └── training_pipeline.py # Training pipeline │ ├── saved_models/ # Directory for storing trained models ├── saved_scalers/ # Directory for storing scalers used in data preprocessing │ ├── src/ # Source code │ ├── data_cleaning.py # Data cleaning and preprocessing │ ├── data_ingestion.py # Data ingestion │ ├── data_splitter.py # Data splitting │ ├── feature_engineering.py # Feature engineering │ ├── model_evaluation.py # Model evaluation │ └── model_training.py # Model training │ ├── steps/ # ZenML steps │ ├── clean_data.py # ZenML step for cleaning data │ ├── data_splitter.py # ZenML step for data splitting │ ├── dynamic_importer.py # ZenML step for importing dynamic data │ ├── feature_engineering.py # ZenML step for feature engineering │ ├── ingest_data.py # ZenML step for data ingestion │ ├── model_evaluation.py # ZenML step for model evaluation │ ├── model_training.py # ZenML step for training the model │ ├── prediction_service_loader.py # ZenML step for loading prediction services │ ├── predictor.py # ZenML step for prediction │ └── utils.py # Utility functions for steps │ ├── .env # Environment variables file ├── .gitignore # Git ignore file │ ├── app.py # Streamlit user interface app │ ├── README.md # Project documentation ├── requirements.txt # List of required packages ├── run_deployment.py # Code for running deployment and prediction pipeline ├── run_pipeline.py # Code for running training pipeline └── .zen/ # ZenML directory (created automatically after ZenML initialization)
saluran paip penempatan berterusan - output
Setelah menjalankan fail run_deployment.py, anda dapat melihat pautan papan pemuka mlflow yang kelihatan seperti ini.
Sekarang anda perlu menyalin dan menyisipkan pautan UI MLFlow di atas dalam baris arahan anda dan jalankannya.
import os import logging from pymongo import MongoClient from dotenv import load_dotenv from zenml import step import pandas as pd # Load the .env file load_dotenv() # Get MongoDB URI from environment variables MONGO_URI = os.getenv("MONGO_URI") def fetch_data_from_mongodb(collection_name:str, database_name:str): """ Fetches data from MongoDB and converts it into a pandas DataFrame. collection_name: Name of the MongoDB collection to fetch data. database_name: Name of the MongoDB database. return: A pandas DataFrame containing the data """ # Connect to the MongoDB client client = MongoClient(MONGO_URI) db = client[database_name] # Select the database collection = db[collection_name] # Select the collection # Fetch all documents from the collection try: logging.info(f"Fetching data from MongoDB collection: {collection_name}...") data = list(collection.find()) # Convert cursor to a list of dictionaries if not data: logging.info("No data found in the MongoDB collection.") # Convert the list of dictionaries into a pandas DataFrame df = pd.DataFrame(data) # Drop the MongoDB ObjectId field if it exists (optional) if '_id' in df.columns: df = df.drop(columns=['_id']) logging.info("Data successfully fetched and converted to a DataFrame!") return df except Exception as e: logging.error(f"An error occurred while fetching data: {e}") raise e @step(enable_cache=False) def ingest_data(collection_name: str = "historical_data", database_name: str = "crypto_data") -> pd.DataFrame: logging.info("Started data ingestion process from MongoDB.") try: # Use the fetch_data_from_mongodb function to fetch data df = fetch_data_from_mongodb(collection_name=collection_name, database_name=database_name) if df.empty: logging.warning("No data was loaded. Check the collection name or the database content.") else: logging.info(f"Data ingestion completed. Number of records loaded: {len(df)}.") return df except Exception as e: logging.error(f"Error while reading data from {collection_name} in {database_name}: {e}") raise e
di sini adalah papan pemuka mlflow, di mana anda dapat melihat metrik penilaian dan parameter model:
Langkah 11: Membina aplikasi Streamlit
Sekali lagi, anda boleh mencari kod di GitHub untuk aplikasi Streamlit.
Kesimpulan
Dalam artikel ini, kami telah berjaya membina projek MLOPS Ramalan Bitcoin yang siap sedia, siap sedia. Dari memperoleh data melalui API dan memprosesnya untuk latihan model, penilaian, dan penempatan, projek kami menyoroti peranan kritikal MLOPS dalam menghubungkan pembangunan dengan pengeluaran. Kami satu langkah lebih dekat untuk membentuk masa depan meramalkan harga bitcoin dalam masa nyata. API memberikan akses lancar kepada data luaran, seperti data harga bitcoin dari API CCData, menghapuskan keperluan untuk dataset yang sedia ada.
Takeaways Key
- API membolehkan akses lancar ke data luaran, seperti data harga bitcoin dari CCData API, menghapuskan keperluan untuk dataset yang sedia ada.
- zenml dan mlflow adalah alat yang mantap yang memudahkan pembangunan, penjejakan, dan penggunaan model pembelajaran mesin dalam aplikasi dunia nyata.
- kami telah mengikuti amalan terbaik dengan melakukan pengambilan data, pembersihan, kejuruteraan ciri, latihan model, dan penilaian dengan betul.
- Pipelin penempatan dan kesimpulan yang berterusan adalah penting untuk memastikan model tetap cekap dan boleh didapati dalam persekitaran pengeluaran.
Q1. Adakah zenml percuma untuk digunakan? a. Ya, ZenML adalah rangka kerja MLOPS sumber terbuka yang menjadikan peralihan dari pembangunan tempatan ke saluran paip pengeluaran semudah 1 baris kod.
Q2. Apa yang digunakan oleh MLFlow?
a. Ini adalah kesilapan biasa yang akan anda hadapi dalam projek. Hanya jalankan `zenml logout -local` kemudian` zenml clean`, dan kemudian `zenml login -local`, sekali lagi menjalankan saluran paip. Ia akan diselesaikan.
Atas ialah kandungan terperinci Ramalan Harga Bitcoin Menggunakan MLOPS. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Alat AI Hot

Undresser.AI Undress
Apl berkuasa AI untuk mencipta foto bogel yang realistik

AI Clothes Remover
Alat AI dalam talian untuk mengeluarkan pakaian daripada foto.

Undress AI Tool
Gambar buka pakaian secara percuma

Clothoff.io
Penyingkiran pakaian AI

Video Face Swap
Tukar muka dalam mana-mana video dengan mudah menggunakan alat tukar muka AI percuma kami!

Artikel Panas

Alat panas

Notepad++7.3.1
Editor kod yang mudah digunakan dan percuma

SublimeText3 versi Cina
Versi Cina, sangat mudah digunakan

Hantar Studio 13.0.1
Persekitaran pembangunan bersepadu PHP yang berkuasa

Dreamweaver CS6
Alat pembangunan web visual

SublimeText3 versi Mac
Perisian penyuntingan kod peringkat Tuhan (SublimeText3)

Topik panas

Meta's Llama 3.2: Lompat ke hadapan dalam Multimodal dan Mobile AI META baru -baru ini melancarkan Llama 3.2, kemajuan yang ketara dalam AI yang memaparkan keupayaan penglihatan yang kuat dan model teks ringan yang dioptimumkan untuk peranti mudah alih. Membina kejayaan o

Hei ada, pengekodan ninja! Apa tugas yang berkaitan dengan pengekodan yang anda telah merancang untuk hari itu? Sebelum anda menyelam lebih jauh ke dalam blog ini, saya ingin anda memikirkan semua kesengsaraan yang berkaitan dengan pengekodan anda-lebih jauh menyenaraikan mereka. Selesai? - Let ’

Landskap AI minggu ini: Badai kemajuan, pertimbangan etika, dan perdebatan pengawalseliaan. Pemain utama seperti Openai, Google, Meta, dan Microsoft telah melepaskan kemas kini, dari model baru yang terobosan ke peralihan penting di LE

Memo CEO Shopify Tobi Lütke baru -baru ini dengan berani mengisytiharkan penguasaan AI sebagai harapan asas bagi setiap pekerja, menandakan peralihan budaya yang signifikan dalam syarikat. Ini bukan trend seketika; Ini adalah paradigma operasi baru yang disatukan ke p

Pengenalan Bayangkan berjalan melalui galeri seni, dikelilingi oleh lukisan dan patung yang terang. Sekarang, bagaimana jika anda boleh bertanya setiap soalan dan mendapatkan jawapan yang bermakna? Anda mungkin bertanya, "Kisah apa yang anda ceritakan?

Pengenalan OpenAI telah mengeluarkan model barunya berdasarkan seni bina "strawberi" yang sangat dijangka. Model inovatif ini, yang dikenali sebagai O1, meningkatkan keupayaan penalaran, yang membolehkannya berfikir melalui masalah MOR

Pernyataan Jadual Alter SQL: Menambah lajur secara dinamik ke pangkalan data anda Dalam pengurusan data, kebolehsuaian SQL adalah penting. Perlu menyesuaikan struktur pangkalan data anda dengan cepat? Pernyataan Jadual ALTER adalah penyelesaian anda. Butiran panduan ini menambah colu

Laporan Indeks Perisikan Buatan 2025 yang dikeluarkan oleh Stanford University Institute for Manusia Berorientasikan Kecerdasan Buatan memberikan gambaran yang baik tentang revolusi kecerdasan buatan yang berterusan. Mari kita menafsirkannya dalam empat konsep mudah: kognisi (memahami apa yang sedang berlaku), penghargaan (melihat faedah), penerimaan (cabaran muka), dan tanggungjawab (cari tanggungjawab kita). Kognisi: Kecerdasan buatan di mana -mana dan berkembang pesat Kita perlu menyedari betapa cepatnya kecerdasan buatan sedang berkembang dan menyebarkan. Sistem kecerdasan buatan sentiasa bertambah baik, mencapai hasil yang sangat baik dalam ujian matematik dan pemikiran kompleks, dan hanya setahun yang lalu mereka gagal dalam ujian ini. Bayangkan AI menyelesaikan masalah pengekodan kompleks atau masalah saintifik peringkat siswazah-sejak tahun 2023
