Machines don’t break down out of nowhere; there are always signs. The problem? Humans aren’t always great at noticing them. That’s where Machine Predictive Maintenance comes in! This guide will take you through the exciting world of Machine Predictive Maintenance, using AWS and MLOps to ensure your equipment stays predictably reliable.
Learning Objectives
- Understand how to design and implement an end-to-end MLOps pipeline for predictive maintenance, covering data ingestion, model training, and deployment.
- Learn to integrate essential tools like Docker, FastAPI, and AWS services to build a robust, production-ready machine learning application.
- Explore the use of GitHub Actions for automating CI/CD workflows, ensuring smooth and reliable code integration and deployment.
- Set up best practices for monitoring, performance tracking, and continuous improvement to keep your machine learning models efficient and maintainable.
This article was published as a part of the Data Science Blogathon.
Problem: Unplanned Downtime & Maintenance Costs
Unexpected equipment failures in industrial settings cause downtime and financial losses. In this project, we use MLOps best practices and machine learning to detect issues early, enabling timely repairs and reducing disruptions.
Before diving into implementation, let’s take a closer look at the project architecture.
Necessary Prerequisites
Let’s first look at the prerequisites:
Clone the repository:
git clone "https://github.com/karthikponna/Predictive_Maintenance_MLOps.git"
cd Predictive_Maintenance_MLOps
Create and activate the virtual environment:
# For macOS and Linux:
python3 -m venv venv
source venv/bin/activate
# For Windows:
python -m venv venv
venv\Scripts\activate
Install Required Dependencies:
pip install -r requirements.txt
Set Up Environment Variables:
# Create a `.env` file and add your MongoDB connection string:
MONGO_URI=your_mongodb_connection_string
Project Structure
The Project Structure outlines the key components and organization of the project, ensuring clarity and maintainability. It helps in understanding how the different modules interact and how the overall system is designed. A well-defined structure simplifies development, debugging, and scaling.
project_root/
│
├── .github/
│   └── workflows/
│       └── main.yml
│
├── data_schema/
│   └── schema.yaml
│
├── final_model/
│   ├── model.pkl
│   └── preprocessor.pkl
│
├── Machine_Predictive_Data/
│   └── predictive_maintenance.csv
│
├── machine_predictive_maintenance/
│   ├── cloud/
│   ├── components/
│   ├── constant/
│   ├── entity/
│   ├── exception/
│   ├── logging/
│   ├── pipeline/
│   ├── utils/
│   └── __init__.py
│
├── my_venv/
│
├── notebooks/
│   ├── EDA.ipynb
│   ├── prediction_output/
│
├── templates/
│   └── table.html
│
├── valid_data/
│   └── test.csv
│
├── .env
├── .gitignore
├── app.py
├── Dockerfile
├── main.py
├── push_data.py
├── README.md
├── requirements.txt
├── setup.py
├── test_mongodb.py
Data Ingestion
In this project, we take a machine predictive maintenance CSV file, convert it into JSON records, and insert those records into a MongoDB collection.
Dataset Link: https://www.kaggle.com/datasets/shivamb/machine-predictive-maintenance-classification
Here is the code snippet to convert CSV -> JSON records -> MongoDB:
import json
import sys

import pandas as pd
import pymongo

# MONGO_DB_URL and MachinePredictiveMaintenanceException come from the rest of the project (not shown here).

class PredictiveDataExtract():
    def __init__(self):
        """
        Initializes the PredictiveDataExtract class.
        """
        try:
            pass
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

    def csv_to_json_convertor(self, file_path):
        try:
            data = pd.read_csv(file_path)
            data.reset_index(drop=True, inplace=True)
            # Transpose so each row becomes one JSON object, then collect the objects in a list
            records = list(json.loads(data.T.to_json()).values())
            return records
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

    def insert_data_mongodb(self, records, database, collection):
        try:
            self.records = records
            self.database = database
            self.collection = collection
            self.mongo_client = pymongo.MongoClient(MONGO_DB_URL)
            self.database = self.mongo_client[self.database]
            self.collection = self.database[self.collection]
            self.collection.insert_many(self.records)
            return len(self.records)
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

if __name__ == "__main__":
    FILE_PATH = "Machine_Predictive_Data/predictive_maintenance.csv"
    DATABASE = "Predictive_Maintenance_MLOps"
    collection = "Machine_Predictive_Data"
    predictive_data_obj = PredictiveDataExtract()
    records = predictive_data_obj.csv_to_json_convertor(FILE_PATH)
    no_of_records = predictive_data_obj.insert_data_mongodb(records, DATABASE, collection)
    print(no_of_records)
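To make the "one record per row" idea concrete, here is a minimal, dependency-free sketch of the CSV-to-records step. It swaps pandas for the standard library's csv module, so every value stays a string; the function name and the sample rows are hypothetical and only illustrate the shape of the records that would be handed to MongoDB.

```python
import csv
import io
import json


def csv_text_to_records(csv_text):
    """Turn CSV text into a list of dicts, one per row (values stay strings)."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return [dict(row) for row in reader]


# Hypothetical two-row sample using column names from the dataset
sample_csv = "UDI,Type,Torque [Nm]\n1,M,42.8\n2,L,46.3\n"
records = csv_text_to_records(sample_csv)

# Each record is a plain dict, which is the form insert_many() expects
print(json.dumps(records[0]))
```

Unlike the pandas version, this sketch does no type inference, so numeric columns would need converting before any modelling step.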
Here is the code snippet to fetch data from MongoDB, split it into train and test CSV files, and store them as a Data Ingestion artifact.
class DataIngestion:
    def __init__(self, data_ingestion_config: DataIngestionConfig):
        """
        Initializes the DataIngestion class with the provided configuration.

        Parameters:
            data_ingestion_config: DataIngestionConfig
                Configuration object containing details for data ingestion.
        """
        try:
            self.data_ingestion_config = data_ingestion_config
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

    def export_collection_as_dataframe(self):
        try:
            database_name = self.data_ingestion_config.database_name
            collection_name = self.data_ingestion_config.collection_name
            self.mongo_client = pymongo.MongoClient(MONGO_DB_URL)
            collection = self.mongo_client[database_name][collection_name]
            df = pd.DataFrame(list(collection.find()))
            # Drop MongoDB's internal document id
            if "_id" in df.columns.to_list():
                df = df.drop(columns=["_id"])
            df.replace({"na": np.nan}, inplace=True)
            return df
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

    def export_data_into_feature_store(self, dataframe: pd.DataFrame):
        try:
            feature_store_file_path = self.data_ingestion_config.feature_store_file_path
            # creating folder
            dir_path = os.path.dirname(feature_store_file_path)
            os.makedirs(dir_path, exist_ok=True)
            dataframe.to_csv(feature_store_file_path, index=False, header=True)
            return dataframe
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

    def split_data_as_train_test(self, dataframe: pd.DataFrame):
        try:
            train_set, test_set = train_test_split(
                dataframe, test_size=self.data_ingestion_config.train_test_split_ratio
            )
            logging.info("Performed train test split on the dataframe")
            logging.info(
                "Exited split_data_as_train_test method of Data_Ingestion class"
            )
            dir_path = os.path.dirname(self.data_ingestion_config.training_file_path)
            os.makedirs(dir_path, exist_ok=True)
            logging.info("Exporting train and test file path.")
            train_set.to_csv(
                self.data_ingestion_config.training_file_path, index=False, header=True
            )
            test_set.to_csv(
                self.data_ingestion_config.testing_file_path, index=False, header=True
            )
            logging.info("Exported train and test file path.")
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

    def initiate_data_ingestion(self):
        try:
            dataframe = self.export_collection_as_dataframe()
            dataframe = self.export_data_into_feature_store(dataframe)
            self.split_data_as_train_test(dataframe)
            dataingestionartifact = DataIngestionArtifact(
                trained_file_path=self.data_ingestion_config.training_file_path,
                test_file_path=self.data_ingestion_config.testing_file_path,
            )
            return dataingestionartifact
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)
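The split step itself is simple enough to sketch without scikit-learn. The snippet below is an illustration only, not the project's implementation: split_rows, its seed argument, and the rounding rule are assumptions, and train_test_split may size the test set slightly differently for ratios that do not divide the row count evenly.

```python
import random


def split_rows(rows, test_ratio, seed=42):
    """Shuffle a copy of rows and split it into (train, test)."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    n_test = round(len(shuffled) * test_ratio)
    return shuffled[n_test:], shuffled[:n_test]


rows = list(range(10))  # stand-in for ten dataframe rows
train_rows, test_rows = split_rows(rows, test_ratio=0.2)
print(len(train_rows), len(test_rows))
```

Fixing the seed makes the split reproducible, which helps when comparing artifacts between pipeline runs.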
Data Validation
In this step, we check whether the ingested data meets the expected format: we verify that all required columns are present using a predefined schema, and we compare the training and testing data for differences in distribution. We then save the validated data and create a drift report, so that only quality data reaches the next step, data transformation.
Data Validation code snippet:
class DataValidation:
    def __init__(self, data_ingestion_artifact: DataIngestionArtifact,
                 data_validation_config: DataValidationConfig):
        try:
            self.data_ingestion_artifact = data_ingestion_artifact
            self.data_validation_config = data_validation_config
            self._schema_config = read_yaml_file(SCHEMA_FILE_PATH)
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

    @staticmethod
    def read_data(file_path) -> pd.DataFrame:
        try:
            return pd.read_csv(file_path)
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

    def validate_number_of_columns(self, dataframe: pd.DataFrame) -> bool:
        try:
            number_of_columns = len(self._schema_config)
            logging.info(f"Required number of columns:{number_of_columns}")
            logging.info(f"Data frame has columns:{len(dataframe.columns)}")
            return len(dataframe.columns) == number_of_columns
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

    def is_columns_exist(self, df: pd.DataFrame) -> bool:
        try:
            dataframe_columns = df.columns
            missing_numerical_columns = []
            missing_categorical_columns = []
            for column in self._schema_config["numerical_columns"]:
                if column not in dataframe_columns:
                    missing_numerical_columns.append(column)
            if len(missing_numerical_columns) > 0:
                logging.info(f"Missing numerical column: {missing_numerical_columns}")
            for column in self._schema_config["categorical_columns"]:
                if column not in dataframe_columns:
                    missing_categorical_columns.append(column)
            if len(missing_categorical_columns) > 0:
                logging.info(f"Missing categorical column: {missing_categorical_columns}")
            return len(missing_categorical_columns) == 0 and len(missing_numerical_columns) == 0
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

    def detect_dataset_drift(self, base_df, current_df, threshold=0.05) -> bool:
        try:
            status = True
            report = {}
            for column in base_df.columns:
                d1 = base_df[column]
                d2 = current_df[column]
                is_same_dist = ks_2samp(d1, d2)
                if threshold <= is_same_dist.pvalue:
                    is_found = False
                else:
                    # p-value below the threshold: the two samples differ, so flag drift
                    is_found = True
                    status = False
                report.update({column: {
                    "p_value": float(is_same_dist.pvalue),
                    "drift_status": is_found
                }})
            drift_report_file_path = self.data_validation_config.drift_report_file_path
            dir_path = os.path.dirname(drift_report_file_path)  # create directory
            os.makedirs(dir_path, exist_ok=True)
            write_yaml_file(file_path=drift_report_file_path, content=report)
            # Return the overall result so the caller can store it as validation_status
            return status
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

    def initiate_data_validation(self) -> DataValidationArtifact:
        try:
            validation_error_msg = ""
            logging.info("Starting data validation")
            train_file_path = self.data_ingestion_artifact.trained_file_path
            test_file_path = self.data_ingestion_artifact.test_file_path
            # read the data from the train and test files
            train_dataframe = DataValidation.read_data(train_file_path)
            test_dataframe = DataValidation.read_data(test_file_path)
            # validate number of columns
            status = self.validate_number_of_columns(dataframe=train_dataframe)
            logging.info(f"All required columns present in training dataframe: {status}")
            if not status:
                validation_error_msg += "Train dataframe does not contain all columns.\n"
            status = self.validate_number_of_columns(dataframe=test_dataframe)
            if not status:
                validation_error_msg += "Test dataframe does not contain all columns.\n"
            status = self.is_columns_exist(df=train_dataframe)
            if not status:
                validation_error_msg += "Columns are missing in training dataframe."
            status = self.is_columns_exist(df=test_dataframe)
            if not status:
                validation_error_msg += "Columns are missing in test dataframe."
            # check data drift between the train and test sets
            status = self.detect_dataset_drift(base_df=train_dataframe, current_df=test_dataframe)
            dir_path = os.path.dirname(self.data_validation_config.valid_train_file_path)
            os.makedirs(dir_path, exist_ok=True)
            train_dataframe.to_csv(
                self.data_validation_config.valid_train_file_path, index=False, header=True
            )
            test_dataframe.to_csv(
                self.data_validation_config.valid_test_file_path, index=False, header=True
            )
            data_validation_artifact = DataValidationArtifact(
                validation_status=status,
                valid_train_file_path=self.data_validation_config.valid_train_file_path,
                valid_test_file_path=self.data_validation_config.valid_test_file_path,
                invalid_train_file_path=None,
                invalid_test_file_path=None,
                drift_report_file_path=self.data_validation_config.drift_report_file_path,
            )
            return data_validation_artifact
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)
Below is the drift report generated by the Data Validation step.
Air temperature [K]:
  drift_status: true
  p_value: 0.016622943467175914
Failure Type:
  drift_status: false
  p_value: 1.0
Process temperature [K]:
  drift_status: false
  p_value: 0.052940072765804994
Product ID:
  drift_status: false
  p_value: 0.09120557172716418
Rotational speed [rpm]:
  drift_status: false
  p_value: 0.2673520066245566
Target:
  drift_status: false
  p_value: 0.999999998717466
Tool wear [min]:
  drift_status: false
  p_value: 0.13090856779628832
Torque [Nm]:
  drift_status: false
  p_value: 0.5001773464540389
Type:
  drift_status: false
  p_value: 1.0
UDI:
  drift_status: true
  p_value: 0.022542489133976953
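The report is read against the 0.05 threshold used by detect_dataset_drift: a column is flagged when its p-value falls below the threshold. The short sketch below applies that rule to a few p-values copied from the report; the helper name drifted_columns is hypothetical.

```python
def drifted_columns(report, threshold=0.05):
    """Return the columns whose KS-test p-value falls below the threshold."""
    return sorted(
        column for column, entry in report.items() if entry["p_value"] < threshold
    )


# A subset of the p-values from the drift report above
report = {
    "Air temperature [K]": {"p_value": 0.016622943467175914},
    "Process temperature [K]": {"p_value": 0.052940072765804994},
    "Torque [Nm]": {"p_value": 0.5001773464540389},
    "UDI": {"p_value": 0.022542489133976953},
}

print(drifted_columns(report))  # ['Air temperature [K]', 'UDI']
```

Note that the process temperature column, at roughly 0.053, sits just above the threshold and is therefore not flagged.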
Data Transformation
Here, we clean and transform the validated data: we convert the temperature features from Kelvin to Celsius, drop unnecessary columns, apply a transformation pipeline that uses ordinal encoding and Min-Max scaling, and handle class imbalance using SMOTEENN.
The transformed training and test datasets are saved as .npy files, along with the serialized preprocessing object (preprocessor.pkl, which holds the ordinal encoder and Min-Max scaler), all encapsulated as an artifact for model training.
class DataTransformation:
    def __init__(self, data_validation_artifact: DataValidationArtifact,
                 data_transformation_config: DataTransformationConfig):
        try:
            self.data_validation_artifact: DataValidationArtifact = data_validation_artifact
            self.data_transformation_config: DataTransformationConfig = data_transformation_config
            self._schema_config = read_yaml_file(file_path=SCHEMA_FILE_PATH)
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

    @staticmethod
    def read_data(file_path) -> pd.DataFrame:
        try:
            return pd.read_csv(file_path)
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

    def get_data_transformer_object(self):
        try:
            logging.info("Got numerical cols from schema config")
            scaler = MinMaxScaler()
            # Fetching categories for OrdinalEncoder from schema config
            ordinal_categories = self._schema_config.get('ordinal_categories', [])
            ordinal_encoder = OrdinalEncoder(categories=ordinal_categories)
            logging.info("Initialized MinMaxScaler, OrdinalEncoder with categories")
            ordinal_columns = self._schema_config['ordinal_columns']
            scaling_features = self._schema_config['scaling_features']
            preprocessor = ColumnTransformer(
                [
                    ("Ordinal_Encoder", ordinal_encoder, ordinal_columns),
                    ("MinMaxScaling", scaler, scaling_features)
                ]
            )
            logging.info("Created preprocessor object from ColumnTransformer")
            return preprocessor
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

    def initiate_data_transformation(self) -> DataTransformationArtifact:
        try:
            logging.info("Starting data transformation")
            preprocessor = self.get_data_transformer_object()
            train_df = DataTransformation.read_data(self.data_validation_artifact.valid_train_file_path)
            test_df = DataTransformation.read_data(self.data_validation_artifact.valid_test_file_path)
            drop_cols = self._schema_config['drop_columns']

            # Training set: separate the target, convert Kelvin to Celsius, drop unused columns
            input_feature_train_df = train_df.drop(columns=[TARGET_COLUMN])
            target_feature_train_df = train_df[TARGET_COLUMN]
            input_feature_train_df['Air temperature [c]'] = input_feature_train_df['Air temperature [K]'] - 273.15
            input_feature_train_df['Process temperature [c]'] = input_feature_train_df['Process temperature [K]'] - 273.15
            input_feature_train_df = drop_columns(df=input_feature_train_df, cols=drop_cols)
            logging.info("Completed dropping the columns for Training dataset")

            # Test set: same steps
            input_feature_test_df = test_df.drop(columns=[TARGET_COLUMN])
            target_feature_test_df = test_df[TARGET_COLUMN]
            input_feature_test_df['Air temperature [c]'] = input_feature_test_df['Air temperature [K]'] - 273.15
            input_feature_test_df['Process temperature [c]'] = input_feature_test_df['Process temperature [K]'] - 273.15
            input_feature_test_df = drop_columns(df=input_feature_test_df, cols=drop_cols)
            logging.info("Completed dropping the columns for Testing dataset")

            # Fit the preprocessor on the training data only, then reuse it for the test data
            input_feature_train_arr = preprocessor.fit_transform(input_feature_train_df)
            input_feature_test_arr = preprocessor.transform(input_feature_test_df)

            smt = SMOTEENN(sampling_strategy="minority")
            input_feature_train_final, target_feature_train_final = smt.fit_resample(
                input_feature_train_arr, target_feature_train_df
            )
            logging.info("Applied SMOTEENN on training dataset")
            # Note: the test set is resampled here as well; evaluating on the
            # untouched test split is usually the safer choice.
            input_feature_test_final, target_feature_test_final = smt.fit_resample(
                input_feature_test_arr, target_feature_test_df
            )

            # Append the target as the last column of each array
            train_arr = np.c_[
                input_feature_train_final, np.array(target_feature_train_final)
            ]
            test_arr = np.c_[
                input_feature_test_final, np.array(target_feature_test_final)
            ]
            save_numpy_array_data(self.data_transformation_config.transformed_train_file_path, array=train_arr)
            save_numpy_array_data(self.data_transformation_config.transformed_test_file_path, array=test_arr)
            save_object(self.data_transformation_config.transformed_object_file_path, preprocessor)
            save_object("final_model/preprocessor.pkl", preprocessor)
            data_transformation_artifact = DataTransformationArtifact(
                transformed_object_file_path=self.data_transformation_config.transformed_object_file_path,
                transformed_train_file_path=self.data_transformation_config.transformed_train_file_path,
                transformed_test_file_path=self.data_transformation_config.transformed_test_file_path
            )
            return data_transformation_artifact
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)
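Two of the transformations above are plain arithmetic, so they can be sketched without scikit-learn. The helpers below are illustrative only (the names and sample temperatures are made up): Kelvin to Celsius subtracts 273.15, and Min-Max scaling maps each value to (x - min) / (max - min).

```python
def kelvin_to_celsius(values):
    """Convert temperatures from Kelvin to Celsius."""
    return [v - 273.15 for v in values]


def min_max_scale(values):
    """Rescale values linearly so the smallest maps to 0 and the largest to 1."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # A constant column carries no spread; map everything to 0
        return [0.0 for _ in values]
    return [(v - lo) / (hi - lo) for v in values]


air_temp_k = [298.15, 300.15, 303.15]  # hypothetical readings
air_temp_c = kelvin_to_celsius(air_temp_k)
scaled = min_max_scale(air_temp_c)
print(scaled)
```

In the pipeline the minimum and maximum are learned from the training data only (fit_transform) and then reused on the test data (transform), which is why the fitted preprocessor is saved alongside the model.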
Training and Evaluation
Now we’ll train several classification models on the transformed training data (Decision Tree, Random Forest, Gradient Boosting, Logistic Regression, and AdaBoost), evaluate their performance using metrics like F1-score, precision, and recall, and then log these details with MLflow.
After selecting the best model, the trainer saves the model and the transformation object locally as artifacts, together with the evaluation metrics, so they can be used for deployment.
class ModelTrainer:
    def __init__(self, data_transformation_artifact: DataTransformationArtifact, model_trainer_config: ModelTrainerConfig):
        try:
            self.data_transformation_artifact = data_transformation_artifact
            self.model_trainer_config = model_trainer_config
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

    def track_mlflow(self, best_model, classification_metric, input_example):
        """
        Log model and metrics to MLflow.

        Args:
            best_model: The trained model object.
            classification_metric: The classification metrics (f1, precision, recall).
            input_example: An example input data sample for the model.
        """
        try:
            with mlflow.start_run():
                f1_score = classification_metric.f1_score
                precision_score = classification_metric.precision_score
                recall_score = classification_metric.recall_score
                mlflow.log_metric("f1_score", f1_score)
                mlflow.log_metric("precision", precision_score)
                mlflow.log_metric("recall_score", recall_score)
                mlflow.sklearn.log_model(best_model, "model", input_example=input_example)
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)

    def train_model(self, X_train, y_train, X_test, y_test):
        """
        Train multiple models and select the best-performing one based on evaluation metrics.

        Args:
            X_train: Training features.
            y_train: Training labels.
            X_test: Testing features.
            y_test: Testing labels.

        Returns:
            ModelTrainerArtifact: An artifact containing details about the trained model and its metrics.
        """
        models = {
            "Random Forest": RandomForestClassifier(verbose=1),
            "Decision Tree": DecisionTreeClassifier(),
            "Gradient Boosting": GradientBoostingClassifier(verbose=1),
            "Logistic Regression": LogisticRegression(verbose=1),
            "AdaBoost": AdaBoostClassifier(),
        }
        params = {
            "Decision Tree": {
                'criterion': ['gini', 'entropy', 'log_loss'],
                # 'splitter':['best','random'],
                # 'max_features':['sqrt','log2'],
            },
            "Random Forest": {
                # 'criterion':['gini', 'entropy', 'log_loss'],
                # 'max_features':['sqrt','log2',None],
                'n_estimators': [8, 16, 32, 128, 256]
            },
            "Gradient Boosting": {
                # 'loss':['log_loss', 'exponential'],
                'learning_rate': [.1, .01, .05, .001],
                'subsample': [0.6, 0.7, 0.75, 0.85, 0.9],
                # 'criterion':['squared_error', 'friedman_mse'],
                # 'max_features':['auto','sqrt','log2'],
                'n_estimators': [8, 16, 32, 64, 128, 256]
            },
            "Logistic Regression": {},
            "AdaBoost": {
                'learning_rate': [.1, .01, .001],
                'n_estimators': [8, 16, 32, 64, 128, 256]
            }
        }
        model_report: dict = evaluate_models(X_train=X_train, y_train=y_train, X_test=X_test, y_test=y_test,
                                             models=models, param=params)
        # Pick the model with the highest score in the report
        best_model_score = max(model_report.values())
        logging.info(f"Best Model Score: {best_model_score}")
        best_model_name = list(model_report.keys())[
            list(model_report.values()).index(best_model_score)
        ]
        logging.info(f"Best Model Name: {best_model_name}")
        best_model = models[best_model_name]

        y_train_pred = best_model.predict(X_train)
        classification_train_metric = get_classification_score(y_true=y_train, y_pred=y_train_pred)
        input_example = X_train[:1]
        print(input_example)
        # Track the experiments with mlflow
        self.track_mlflow(best_model, classification_train_metric, input_example)

        y_test_pred = best_model.predict(X_test)
        classification_test_metric = get_classification_score(y_true=y_test, y_pred=y_test_pred)
        # Track the experiments with mlflow
        self.track_mlflow(best_model, classification_test_metric, input_example)

        preprocessor = load_object(file_path=self.data_transformation_artifact.transformed_object_file_path)
        model_dir_path = os.path.dirname(self.model_trainer_config.trained_model_file_path)
        os.makedirs(model_dir_path, exist_ok=True)
        machine_predictive_model = MachinePredictiveModel(model=best_model)
        # Save the wrapper instance (the original passed the class itself here)
        save_object(self.model_trainer_config.trained_model_file_path, obj=machine_predictive_model)
        save_object("final_model/model.pkl", best_model)

        # Model Trainer Artifact
        model_trainer_artifact = ModelTrainerArtifact(
            trained_model_file_path=self.model_trainer_config.trained_model_file_path,
            train_metric_artifact=classification_train_metric,
            test_metric_artifact=classification_test_metric
        )
        logging.info(f"Model trainer artifact: {model_trainer_artifact}")
        return model_trainer_artifact

    def initiate_model_trainer(self) -> ModelTrainerArtifact:
        try:
            train_file_path = self.data_transformation_artifact.transformed_train_file_path
            test_file_path = self.data_transformation_artifact.transformed_test_file_path
            # loading training array and testing array
            train_arr = load_numpy_array_data(train_file_path)
            test_arr = load_numpy_array_data(test_file_path)
            # The last column holds the target; everything before it is a feature
            X_train, y_train, X_test, y_test = (
                train_arr[:, :-1],
                train_arr[:, -1],
                test_arr[:, :-1],
                test_arr[:, -1],
            )
            model_trainer_artifact = self.train_model(X_train, y_train, X_test, y_test)
            return model_trainer_artifact
        except Exception as e:
            raise MachinePredictiveMaintenanceException(e, sys)
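The three logged metrics can be computed by hand, which helps when sanity-checking what MLflow shows. This is a sketch assuming a binary target where 1 means failure; the function name and the label lists are made up, and the project's own get_classification_score helper is not shown in this article.

```python
def binary_classification_scores(y_true, y_pred):
    """Compute precision, recall and F1 for a binary target (1 = failure)."""
    tp = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 1)
    fp = sum(1 for t, p in zip(y_true, y_pred) if t == 0 and p == 1)
    fn = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 0)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {"precision": precision, "recall": recall, "f1": f1}


# Hypothetical labels: two true positives, one false positive, one false negative
y_true = [1, 0, 1, 1, 0, 0]
y_pred = [1, 0, 0, 1, 1, 0]
scores = binary_classification_scores(y_true, y_pred)
print(scores)
```

For predictive maintenance, recall is often the metric to watch most closely, since a missed failure (false negative) usually costs more than an unnecessary inspection.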
Now, let’s create a training_pipeline.py file where we sequentially integrate all the steps of data ingestion, validation, transformation, and model training into a complete pipeline.
def run_pipeline(self):
    """
    Executes the entire training pipeline.

    Returns:
        ModelTrainerArtifact: Contains metadata about the trained model.
    """
    try:
        # Each stage consumes the artifact produced by the previous one
        data_ingestion_artifact = self.data_ingestion()
        data_validation_artifact = self.data_validation(data_ingestion_artifact=data_ingestion_artifact)
        data_transformation_artifact = self.data_transformation(data_validation_artifact=data_validation_artifact)
        model_trainer_artifact = self.model_trainer(data_transformation_artifact=data_transformation_artifact)
        self.sync_artifact_dir_to_s3()
        self.sync_saved_model_dir_to_s3()
        return model_trainer_artifact
    except Exception as e:
        raise MachinePredictiveMaintenanceException(e, sys)
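The pattern behind run_pipeline is that every stage returns a small artifact object and the next stage takes it as input. The sketch below shows that hand-off with two toy stages; the dataclass names, fields, and paths are hypothetical stand-ins for the project's artifact entities.

```python
from dataclasses import dataclass


@dataclass
class IngestionArtifact:
    train_path: str
    test_path: str


@dataclass
class ValidationArtifact:
    valid_train_path: str
    valid_test_path: str
    validation_status: bool


def ingest():
    """Stage 1: pretend to write the split files and report where they are."""
    return IngestionArtifact("artifact/train.csv", "artifact/test.csv")


def validate(ingestion):
    """Stage 2: consume the ingestion artifact and emit a validation artifact."""
    return ValidationArtifact(ingestion.train_path, ingestion.test_path, True)


def run_stages():
    ingestion = ingest()
    validation = validate(ingestion)
    return validation


result = run_stages()
print(result)
```

Passing explicit artifacts keeps each stage testable on its own: a stage only needs the file paths and flags it is given, not the internals of the stage before it.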
You can see visually how the training_pipeline is built.

Now that we’ve finished creating the pipeline, run the training_pipeline.py file to view the artifacts generated in the previous steps.
python training_pipeline.py
Run the following command to view the MLflow dashboard.
mlflow ui # Launch the MLflow dashboard to monitor experiments.
As you can see, we have successfully logged model metrics like recall, precision, and F1-score in MLflow.

AWS Integration
After running the training_pipeline.py file, we generated artifacts and saved them locally. Now, we’ll store these artifacts in an AWS S3 bucket to enable cloud-based storage and accessibility.
We will also build a Docker image, push it to AWS ECR via GitHub Actions, and then deploy it to production on AWS EC2. We will discuss this process in more detail in the upcoming sections.

AWS S3
Follow these steps to create an AWS S3 bucket:
Step1: Download AWS CLI
- Download and install the AWS CLI for Windows, Linux, or macOS.
Step2: Log in as an AWS IAM User
- Sign in to the Console with your IAM user credentials and select the appropriate AWS region (e.g., us-east-1).
- Once logged in, search for ‘IAM’ in the AWS search bar. Navigate to ‘Users,’ select your username, and go to the ‘Security credentials’ tab. Under ‘Access keys,’ click ‘Create access key,’ choose ‘CLI,’ and then confirm by clicking ‘Create access key’.
- Now, open your terminal and type aws configure. Enter your AWS Access Key and Secret Access Key when prompted, then press Enter.
Your IAM user is now connected to your project.
Step3: Navigate to S3 Service
- Once logged in, search for “S3” in the AWS search bar.
- Click on the Amazon S3 service and select the “Create bucket” button. Next, type in a unique bucket name (e.g., machinepredictive).
- Review the configuration settings, and then click on “Create bucket.”
Your bucket is now created, and you can start uploading artifacts. Now, add the following code to your training_pipeline.py file and run it again to see the artifacts in your AWS S3 bucket.
def sync_artifact_dir_to_s3(self):
    """
    Syncs the artifact directory to S3.
    """
    try:
        aws_bucket_url = f"s3://{TRAINING_BUCKET_NAME}/artifact/{self.training_pipeline_config.timestamp}"
        self.s3_sync.sync_folder_to_s3(folder=self.training_pipeline_config.artifact_dir, aws_bucket_url=aws_bucket_url)
    except Exception as e:
        raise MachinePredictiveMaintenanceException(e, sys)

def sync_saved_model_dir_to_s3(self):
    """
    Syncs the saved model directory to S3.
    """
    try:
        aws_bucket_url = f"s3://{TRAINING_BUCKET_NAME}/final_model/{self.training_pipeline_config.timestamp}"
        self.s3_sync.sync_folder_to_s3(folder=self.training_pipeline_config.model_dir, aws_bucket_url=aws_bucket_url)
    except Exception as e:
        raise MachinePredictiveMaintenanceException(e, sys)
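The s3_sync helper used above is not shown in this article. One plausible way to write it, assuming the AWS CLI is installed and configured as described earlier, is to wrap the aws s3 sync command; the function names, folder name, and timestamp below are hypothetical.

```python
import subprocess


def build_s3_sync_command(folder, aws_bucket_url):
    """Build the AWS CLI command that mirrors a local folder to an S3 prefix."""
    return ["aws", "s3", "sync", folder, aws_bucket_url]


def sync_folder_to_s3(folder, aws_bucket_url):
    """Run the sync; raises CalledProcessError if the AWS CLI reports a failure."""
    subprocess.run(build_s3_sync_command(folder, aws_bucket_url), check=True)


# Only build and print the command here; nothing is uploaded
command = build_s3_sync_command(
    "Artifacts/01_01_2025", "s3://machinepredictive/artifact/01_01_2025"
)
print(" ".join(command))
```

Passing the command as a list rather than a single shell string avoids quoting problems when a folder name contains spaces.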

Amazon Elastic Container Registry (ECR)
Follow these steps to create an ECR repository.
- Search for ‘ECR’ in the AWS search bar.
- Click Create Repository, and select Private Repository.
- Provide a repository name (e.g., machinepredictive) and click Create.

Copy the URI from your ECR repository, which should look something like “788614365622.dkr.ecr.ap-southeast-2.amazonaws.com”, and save it somewhere. We will need it later to paste into GitHub Secrets.
Docker Integration for Deployment
Dockerizing our project ensures it runs smoothly in any environment without dependency issues. Docker is a must-have tool for packaging and sharing applications effortlessly.
Keeping the Docker image as small as possible is important for efficiency. We can reduce its size by using techniques like multi-stage builds and choosing a lightweight Python base image.
FROM python:3.10-slim-buster
WORKDIR /app
COPY . /app
RUN apt update -y && apt install awscli -y
RUN apt-get update && pip install -r requirements.txt
CMD ["python3", "app.py"]
Set Up Actions Secrets and Variables
To securely store sensitive information like AWS credentials and repository URIs, we need to set up GitHub Actions secrets in our repository. Follow these steps:
Step1: Open Your GitHub Repository
Navigate to your repository on GitHub.
Step2: Go to Settings
At the top of your repository page, locate and click on the “Settings” tab.
Step3: Access Secrets and Variables
In the left sidebar, scroll down to Secrets and Variables → Select Actions.
Step4: Create a New Secret
Click the New repository secret button.
Step5: Add AWS Credentials
- Create a secret named “AWS_ACCESS_KEY_ID” and paste your AWS Access Key.
- Create another secret named “AWS_SECRET_ACCESS_KEY” and paste your AWS Secret Access Key.
- Create a secret named “AWS_REGION” and paste your chosen region (e.g., us-east-1).
Step6: Add AWS ECR Repository URI
- Copy your ECR repository URI (e.g., 788614365622.dkr.ecr.ap-southeast-2.amazonaws.com).
- Create a new secret named “AWS_ECR_LOGIN_URI” and paste the copied URI.
- Create a new secret named “ECR_REPOSITORY_NAME” and paste the name of the ECR repository (e.g., machinepredictive).

AWS EC2
Now let’s see how to create an instance with AWS EC2. Follow these steps.
- Navigate to the EC2 service in the AWS Management Console and click Launch instance.
- Name your instance (e.g., machinepredictive) and select Ubuntu as your operating system.
- Choose t2.large as the instance type.
- Select your default key pair.
- Under network security, choose the default VPC and configure the security groups as needed.
- Finally, click Launch Instance.
AWS EC2 CLI
After creating the instance named machinepredictive, select its Instance ID and click Connect. Then, under EC2 Instance Connect, click Connect again. This opens a terminal session on the instance where you can run the necessary commands.
Now, enter these commands in the CLI one by one to set up Docker on your EC2 instance:
sudo apt-get update -y
sudo apt-get upgrade
curl -fsSL https://get.docker.com -o get-docker.sh
sudo sh get-docker.sh
sudo usermod -aG docker ubuntu
newgrp docker
Keep your CLI session open. Next, navigate to your GitHub repository, click on Settings, then go to the Actions section, and under Runners, click New self-hosted runner. Choose the Linux runner image. Now, run the following commands one by one in your CLI to download and configure your self-hosted runner:
# Create a folder and navigate into it
mkdir actions-runner && cd actions-runner
# Download the latest runner package
curl -o actions-runner-linux-x64-2.322.0.tar.gz -L https://github.com/actions/runner/releases/download/v2.322.0/actions-runner-linux-x64-2.322.0.tar.gz
# Optional: Validate the hash
echo "b13b784808359f31bc79b08a191f5f83757852957dd8fe3dbfcc38202ccf5768  actions-runner-linux-x64-2.322.0.tar.gz" | shasum -a 256 -c
# Extract the installer
tar xzf ./actions-runner-linux-x64-2.322.0.tar.gz
# Configure the runner
./config.sh --url https://github.com/karthikponna/Predictive_Maintenance_MLOps --token "Paste your token here"
# Run the runner
./run.sh
These commands download, configure, and start your self-hosted GitHub Actions runner on your EC2 instance.
While the runner is being configured, the CLI will prompt you to enter a name for the runner. Type self-hosted and press Enter.
CI/CD with GitHub Actions
You can check out the .github/workflows/main.yml code file.
Now, let’s dive into what each section of this main.yml file does.
- Continuous Integration Job: This job runs on an Ubuntu runner to check out the code, lint it, and execute unit tests.
- Continuous Delivery Job: Triggered after CI, this job installs utilities, configures AWS credentials, logs into Amazon ECR, and builds, tags, and pushes your Docker image to ECR.
- Continuous Deployment Job: Running on a self-hosted runner, this job pulls the latest Docker image from ECR, runs it as a container to serve users, and cleans up any previous images or containers.
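To make the deployment job more concrete, the sketch below lists the kind of Docker commands such a job typically runs on the EC2 instance. It is an illustration under assumptions, not the contents of the workflow file: the image URI, the container name, and port 8080 are hypothetical, and the real workflow may order or name things differently.

```python
import shlex


def deployment_commands(
    image_uri: str, container_name: str = "machinepredictive", port: int = 8080
) -> list[list[str]]:
    """Return the Docker commands for one deployment, in execution order."""
    return [
        # Fetch the newest image that the delivery job pushed to ECR.
        ["docker", "pull", image_uri],
        # Remove the previous container; on a first deploy there is nothing
        # to remove, so an error from this step can be ignored.
        ["docker", "rm", "-f", container_name],
        # Start the new container in the background and publish the app port.
        ["docker", "run", "-d", "--name", container_name,
         "-p", f"{port}:{port}", image_uri],
        # Delete dangling images left behind by earlier deployments.
        ["docker", "image", "prune", "-f"],
    ]


# Hypothetical ECR image URI, used only to show the generated commands.
commands = deployment_commands(
    "123456789012.dkr.ecr.us-east-1.amazonaws.com/machinepredictive:latest"
)
for cmd in commands:
    print(shlex.join(cmd))
```

The function only builds and prints the commands; running them (for example with subprocess.run) is left out so the sketch can be read and tested without Docker installed.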

Once the Continuous Deployment Job completes successfully, you’ll see an output like this in the AWS CLI.

Open your AWS EC2 instance by clicking its Instance ID. Verify that the instance state is Running and locate the Public IPv4 DNS. Click the “Open address” option to automatically launch your FastAPI application in your browser. Now, let’s dive into FastAPI.
FastAPI
You can check out the app.py file.
In this FastAPI application, I’ve created two main routes, one for training the model and another for generating predictions. Let’s explore each route in detail.

Train route
The /train endpoint starts the model training process using a predefined dataset, making it easy to update and improve the model with new data. It’s especially useful for retraining the model to improve accuracy or incorporate new data, so the model stays up-to-date and keeps performing well.

Predict route
The /predict endpoint accepts a CSV file via POST. This endpoint handles incoming data, uses the trained model to generate predictions, and then returns the results formatted as JSON. This route is well suited to applying the model to new datasets, making it efficient for large-scale prediction tasks.
In the /predict route, we’ve included a sample test.csv file; you can download it here.
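The core of such a prediction handler (parse the uploaded CSV, score each row, return JSON) can be sketched without any web framework. The version below is an assumption-laden illustration, not the code in app.py: the column names torque and rpm, the dummy_model function, and the threshold are all invented stand-ins for the real features and trained model.

```python
import csv
import io
import json
from typing import Callable


def predict_from_csv(
    csv_bytes: bytes, predict_row: Callable[[dict[str, str]], int]
) -> str:
    """Parse uploaded CSV bytes, score each row, and return a JSON string."""
    text = csv_bytes.decode("utf-8")
    reader = csv.DictReader(io.StringIO(text, newline=""))
    results = []
    for row in reader:
        scored = dict(row)  # keep the input columns alongside the prediction
        scored["prediction"] = predict_row(row)
        results.append(scored)
    return json.dumps(results)


def dummy_model(row: dict[str, str]) -> int:
    """Hypothetical stand-in for the trained model: flags high-torque rows."""
    return int(float(row["torque"]) > 60.0)


# Made-up sample data; the real test.csv has the project's own columns.
sample = b"torque,rpm\n40.5,1500\n72.0,1300\n"
response_body = predict_from_csv(sample, dummy_model)
print(response_body)
```

In the FastAPI application, a function like this would sit inside the /predict route, which reads the uploaded file's bytes and returns the JSON to the caller.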

Conclusion
Together, we built a full production-ready Predictive Maintenance MLOps project, from gathering and preprocessing data to training, evaluating, and deploying our model using Docker, AWS, and FastAPI. This project shows how MLOps can bridge the gap between development and production, making it easier to build robust, scalable solutions.
Remember, this guide is all about learning and applying these techniques to your data science projects. Don’t hesitate to experiment, innovate, and explore new improvements as you progress. Thanks for sticking with me until the end. Keep learning, keep doing, and keep growing!
GitHub repo: https://github.com/karthikponna/Predictive_Maintenance_MLOps
Key Takeaways
- We built an end-to-end MLOps pipeline that covers data ingestion, model training, and deployment.
- Docker, AWS, and FastAPI work together seamlessly to move from development to production.
- Dockerizing our ML project is key: it ensures the project runs smoothly in any environment without dependency headaches.
- Continuous deployment ensures the model stays efficient and up-to-date in real-world applications.
Frequently Asked Questions
A. Docker ensures our ML project runs smoothly in any environment by eliminating dependency issues.
A. AWS services like EC2, S3, and ECR enable seamless deployment, storage, and scaling of our application.
A. MLflow makes machine learning development easier by offering tools for tracking experiments, versioning models, and deploying them.
A. GitHub Actions automates the CI/CD process (running tests, building Docker images, and deploying updates), ensuring a smooth transition from development to production.
The media shown in this article is not owned by Analytics Vidhya and is used at the Author’s discretion.