Commit a78af924 authored by Ryan Godwin

It's working

parent aba6b8ba
1 merge request: !1 Progress toward getting the evaluation integrated
@@ -29,8 +29,4 @@ RUN chown -R vscode:vscode /usr/local/lib/R/site-library
USER vscode
# [Optional] If your R requirements rarely change, uncomment this section to add them to the image.
# RUN mkdir -p /tmp/r
# COPY docs/packages.sh /tmp/r
# RUN /bin/bash /tmp/r/packages.sh && rm -rf /tmp/r
@@ -3,4 +3,10 @@
# to make a blank database
#sqlite3 mlflow.db "VACUUM;"
git flow init -d
cd src/
python3 -m pip install pip setuptools wheel
python3 -m pip install -e ".[dev]"
cd ../
export PYTHONPATH="${PYTHONPATH}:/workspaces/hrv_meg/GeneralStore"
mlflow server --backend-store-uri sqlite:////data/DATASCI/lab_notebook/mlflow.db --default-artifact-root /data/DATASCI/lab_notebook/mlruns
\ No newline at end of file
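The block above installs the package in editable mode, extends PYTHONPATH, and launches an MLflow tracking server backed by the SQLite store at /data/DATASCI/lab_notebook/mlflow.db. As a hedged sanity check (not part of this commit), a client can log a throwaway run against that server; the experiment name and parameter below are placeholders.

# hypothetical smoke test for the tracking server started above (sketch only)
import mlflow

mlflow.set_tracking_uri("http://localhost:5000")  # same URI config/config.py points at
mlflow.set_experiment("setup-smoke-test")         # placeholder experiment name
with mlflow.start_run(run_name="setup-check"):
    mlflow.log_param("status", "ok")              # lands in the SQLite backend store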
GeneralStore @ f04db658
Subproject commit b68e30b04425de8e32302801f3681a856feb2591
Subproject commit f04db6583cb0b4e255f138880e414445034aa1bc
mlflow==1.26.1
scikit-learn==1.1.1
optbinning==0.14.1
pandas==1.4.3
openpyxl==3.0.10
numpy==1.23.0
matplotlib==3.5.2
scikit-learn-intelex==2021.6.3
shap==0.41.0
numpyencoder==0.3.0
argparse==1.4.0
mkdocs==1.3.1
mkdocstrings==0.18.1
rich==12.5.1
black==22.6.0
flake8==5.0.2
typer==0.6.1
jupyter
pillow
opencv-python
mne
mlflow
mpld3
mne-icalabel
scipy
tensorflow
# config.py
import logging
import sys
from pathlib import Path
@@ -8,19 +9,16 @@ from rich.logging import RichHandler
# Directories
BASE_DIR = Path(__file__).parent.parent.absolute()
CONFIG_DIR = Path(BASE_DIR, "config")
DATA_DIR = Path(BASE_DIR, "data")
STORES_DIR = Path(BASE_DIR, "stores")
DATA_DIR = Path("/data/DATASCI")
RAW_DATA = Path(DATA_DIR, "raw")
INTERMEDIATE_DIR = Path(DATA_DIR, "intermediate")
LOGS_DIR = Path(BASE_DIR, "logs")
RESULTS_DIR = Path(DATA_DIR, "results")
# Stores
MODEL_REGISTRY = Path(STORES_DIR, "model")
BLOB_STORE = Path(STORES_DIR, "blob")
# Create dirs
DATA_DIR.mkdir(parents=True, exist_ok=True)
MODEL_REGISTRY.mkdir(parents=True, exist_ok=True)
LOGS_DIR.mkdir(parents=True, exist_ok=True)
BLOB_STORE.mkdir(parents=True, exist_ok=True)
# Assets
# A folder containing RAW MEG data (CTF -> .ds folders)
MEG_DATA_DIR = Path(RAW_DATA, "MEG_Data")
ECG_DATA_DIR = Path(RAW_DATA, "BioPac_Data")
# MLFlow model registry
mlflow.set_tracking_uri("http://localhost:5000")
......
import os
import Label_ICA_Components
import Get_RR_Component
import MNE_Processing
import pandas as pd
import traceback
current_dir = os.path.dirname(os.path.realpath(__file__))
input_dir = "/home/ec2-user/MEGNET/input_data/"
output_dir = os.path.join(current_dir, "output")
# list of all ICA values and images for each scan in the input directory
scan_files = os.listdir(input_dir)
scan_files = [scan_file.split(".")[0] for scan_file in scan_files]
# remove potential duplicates
scan_files = list(set(scan_files))
def process(scan_file):
try:
output_sub_dir = os.path.join(output_dir, scan_file)
full_input_path = os.path.join(input_dir, scan_file)
full_input_path = f"{full_input_path}.ds"
if not os.path.exists(output_sub_dir):
os.makedirs(output_sub_dir)
full_csv_output_path = os.path.join(output_sub_dir, scan_file + "_HR_component.csv")
print(f"Now processing: {scan_file}")
MNE_proc = MNE_Processing.MNE_Processor(
full_input_path, output_sub_dir, full_csv_output_path
)
MNE_proc.process()
###############################################################################
# feed CSV component filepath to Label_ICA_Components.py so MEGNET knows what files to work with
label_ICA = Label_ICA_Components.label_ICA_components(full_csv_output_path)
# get best heartifact after running MEGNET on the MATLAB file above
heartifact = label_ICA.fPredictICA()
# filter dataframe to only heartifact and Time index
indices_to_filter = [0, heartifact + 1]
df = pd.read_csv(full_csv_output_path)
df = df.iloc[:, indices_to_filter]
df.to_csv(full_csv_output_path, index=False)
###############################################################################
# feed .mat filepath and heartifact list to Get_RR_Intervals.py to get RR intervals for each heartifact
getRR = Get_RR_Component.GetRRComponent(
output_sub_dir, scan_file, full_csv_output_path, heartifact
)
# getRR = Get_RR_Intervals.GetRRIntervals(scan_file, heartifacts)
getRR.get_RR_intervals()
except Exception:
print(traceback.format_exc())
for scan_file in scan_files:
process(scan_file)
# pool = multiprocessing.Pool(128)
# pool.map(process, scan_files)
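The loop above runs each scan serially; the commented-out Pool lines sketch a parallel alternative. A minimal, hedged version of that alternative is shown below; the worker count is a placeholder and the __main__ guard is an addition, not part of the original script.

# hypothetical parallel variant of the loop above (sketch only)
import multiprocessing

if __name__ == "__main__":
    with multiprocessing.Pool(processes=8) as pool:  # placeholder worker count
        pool.map(process, scan_files)                # process() already traps exceptions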
import json
import tempfile
import warnings
from argparse import Namespace
from pathlib import Path
from typing import Dict
import joblib
import mlflow
import optuna
import pandas as pd
import typer
from numpyencoder import NumpyEncoder
from optuna.integration.mlflow import MLflowCallback
from config import config
from config.config import logger
from tagifai import data, predict, train, utils
warnings.filterwarnings("ignore")
from hrvmeg import data, evaluate, predict
from py_utils import get_files
from utils import meg_to_ica, predict_and_vote
# Initialize Typer CLI app
app = typer.Typer()
warnings.filterwarnings("ignore")
@app.command()
def etl_data():
"""Extract, load and transform our data assets."""
# Extract
projects = utils.load_json_from_url(url=config.PROJECTS_URL)
tags = utils.load_json_from_url(url=config.TAGS_URL)
def extract_ica_from_meg(
args_fp: str = "config/args.json",
experiment_name: str = "baselines",
run_name: str = "sgd",
test_run: bool = False,
) -> None:
# recursively scan through folders for .ds (CTF) files
scan_files = get_files.get_files(config.MEG_DATA_DIR, ".ds")
scan_files = [x for x in scan_files if Path(x).suffix == ".ds"]
# Ignore the hz.ds files
scan_files = [x for x in scan_files if Path(x).stem != "hz"]
# removing any duplicates
scan_files = list(set(scan_files))
for scan_file in scan_files:
print(f"Now processing: {scan_file}", flush=True)
output_sub_dir = Path(config.INTERMEDIATE_DIR, scan_file.stem)
output_sub_dir.mkdir(parents=True, exist_ok=True)
full_csv_output_path = Path(output_sub_dir, "ICA.csv")
MNE_proc = meg_to_ica.MNEProcessor(
scan_file, output_sub_dir, full_csv_output_path
)
MNE_proc.process()
# Transform
df = pd.DataFrame(projects)
df = df[df.tag.notnull()] # drop rows w/ no tag
# if not os.path.exists(output_sub_dir):
# os.makedirs(output_sub_dir)
# Load
projects_fp = Path(config.DATA_DIR, "projects.json")
utils.save_dict(d=df.to_dict(orient="records"), filepath=projects_fp)
tags_fp = Path(config.DATA_DIR, "tags.json")
utils.save_dict(d=tags, filepath=tags_fp)
# full_csv_output_path = os.path.join(output_sub_dir, scan_file + "_HR_component.csv")
logger.info("✅ ETL on data is complete!")
# ###############################################################################
# # feed CSV component filepath to Label_ICA_Components.py so MEGNET knows what files to work with
# label_ICA = predict.label_ICA_components(full_csv_output_path)
@app.command()
def label_data(args_fp: str = "config/args.json") -> None:
"""Label data with constraints.
Args:
args_fp (str): location of args.
"""
# Load projects
projects_fp = Path(config.DATA_DIR, "projects.json")
projects = utils.load_dict(filepath=projects_fp)
df = pd.DataFrame(projects)
# Load tags
tags_dict = {}
tags_fp = Path(config.DATA_DIR, "tags.json")
for item in utils.load_dict(filepath=tags_fp):
key = item.pop("tag")
tags_dict[key] = item
# Label with constraints
args = Namespace(**utils.load_dict(filepath=args_fp))
df = df[df.tag.notnull()] # remove projects with no label
df = data.replace_oos_labels(df=df, labels=tags_dict.keys(), label_col="tag", oos_label="other")
df = data.replace_minority_labels(
df=df, label_col="tag", min_freq=args.min_freq, new_label="other"
)
# Save clean labeled data
labeled_projects_fp = Path(config.DATA_DIR, "labeled_projects.json")
utils.save_dict(d=df.to_dict(orient="records"), filepath=labeled_projects_fp)
logger.info("✅ Saved labeled data!")
# # get best heartifact after running MEGNET on the MATLAB file above
# heartifact = label_ICA.fPredictICA()
# # filter dataframe to only heartifact and Time index
# indices_to_filter = [0, heartifact + 1]
# df = pd.read_csv(full_csv_output_path)
# df = df.iloc[:, indices_to_filter]
# df.to_csv(full_csv_output_path, index=False)
# ###############################################################################
# # feed .mat filepath and heartifact list to Get_RR_Intervals.py to get RR intervals for each heartifact
# getRR = data.GetRRComponent(output_sub_dir, scan_file, full_csv_output_path, heartifact)
# # getRR = Get_RR_Intervals.GetRRIntervals(scan_file, heartifacts)
# getRR.get_RR_intervals()
@app.command()
@@ -82,128 +79,24 @@ def train_model(
run_name: str = "sgd",
test_run: bool = False,
) -> None:
"""Train a model given arguments.
Args:
args_fp (str): location of args.
experiment_name (str): name of experiment.
run_name (str): name of specific run in experiment.
test_run (bool, optional): If True, artifacts will not be saved. Defaults to False.
"""
# Load labeled data
projects_fp = Path(config.DATA_DIR, "labeled_projects.json")
projects = utils.load_dict(filepath=projects_fp)
df = pd.DataFrame(projects)
# Train
args = Namespace(**utils.load_dict(filepath=args_fp))
mlflow.set_experiment(experiment_name=experiment_name)
with mlflow.start_run(run_name=run_name):
run_id = mlflow.active_run().info.run_id
logger.info(f"Run ID: {run_id}")
artifacts = train.train(df=df, args=args)
performance = artifacts["performance"]
logger.info(json.dumps(performance, indent=2))
# Log metrics and parameters
performance = artifacts["performance"]
mlflow.log_metrics({"precision": performance["overall"]["precision"]})
mlflow.log_metrics({"recall": performance["overall"]["recall"]})
mlflow.log_metrics({"f1": performance["overall"]["f1"]})
mlflow.log_params(vars(artifacts["args"]))
# Log artifacts
with tempfile.TemporaryDirectory() as dp:
utils.save_dict(vars(artifacts["args"]), Path(dp, "args.json"), cls=NumpyEncoder)
artifacts["label_encoder"].save(Path(dp, "label_encoder.json"))
joblib.dump(artifacts["vectorizer"], Path(dp, "vectorizer.pkl"))
joblib.dump(artifacts["model"], Path(dp, "model.pkl"))
utils.save_dict(performance, Path(dp, "performance.json"))
mlflow.log_artifacts(dp)
# Save to config
if not test_run: # pragma: no cover, actual run
open(Path(config.CONFIG_DIR, "run_id.txt"), "w").write(run_id)
utils.save_dict(performance, Path(config.CONFIG_DIR, "performance.json"))
raise (NotImplementedError)
@app.command()
def optimize(
args_fp: str = "config/args.json", study_name: str = "optimization", num_trials: int = 20
) -> None:
"""Optimize hyperparameters.
Args:
args_fp (str): location of args.
study_name (str): name of optimization study.
num_trials (int): number of trials to run in study.
"""
# Load labeled data
projects_fp = Path(config.DATA_DIR, "labeled_projects.json")
projects = utils.load_dict(filepath=projects_fp)
df = pd.DataFrame(projects)
# Optimize
args = Namespace(**utils.load_dict(filepath=args_fp))
pruner = optuna.pruners.MedianPruner(n_startup_trials=5, n_warmup_steps=5)
study = optuna.create_study(study_name=study_name, direction="maximize", pruner=pruner)
mlflow_callback = MLflowCallback(tracking_uri=mlflow.get_tracking_uri(), metric_name="f1")
study.optimize(
lambda trial: train.objective(args, df, trial),
n_trials=num_trials,
callbacks=[mlflow_callback],
)
# Best trial
trials_df = study.trials_dataframe()
trials_df = trials_df.sort_values(["user_attrs_f1"], ascending=False)
args = {**args.__dict__, **study.best_trial.params}
utils.save_dict(d=args, filepath=args_fp, cls=NumpyEncoder)
logger.info(f"\nBest value (f1): {study.best_trial.value}")
logger.info(f"Best hyperparameters: {json.dumps(study.best_trial.params, indent=2)}")
raise (NotImplementedError)
def load_artifacts(run_id: str = None) -> Dict:
"""Load artifacts for a given run_id.
Args:
run_id (str): id of run to load artifacts from.
Returns:
Dict: run's artifacts.
"""
if not run_id:
run_id = open(Path(config.CONFIG_DIR, "run_id.txt")).read()
# Locate the specific run's artifacts directory
experiment_id = mlflow.get_run(run_id=run_id).info.experiment_id
artifacts_dir = Path(config.MODEL_REGISTRY, experiment_id, run_id, "artifacts")
# Load objects from run
args = Namespace(**utils.load_dict(filepath=Path(artifacts_dir, "args.json")))
vectorizer = joblib.load(Path(artifacts_dir, "vectorizer.pkl"))
label_encoder = data.LabelEncoder.load(fp=Path(artifacts_dir, "label_encoder.json"))
model = joblib.load(Path(artifacts_dir, "model.pkl"))
performance = utils.load_dict(filepath=Path(artifacts_dir, "performance.json"))
return {
"args": args,
"label_encoder": label_encoder,
"vectorizer": vectorizer,
"model": model,
"performance": performance,
}
raise (NotImplementedError)
@app.command()
def predict_tag(text: str = "", run_id: str = None) -> None:
"""Predict tag for text.
Args:
text (str): input text to predict label for.
run_id (str, optional): run id to load artifacts for prediction. Defaults to None.
"""
if not run_id:
run_id = open(Path(config.CONFIG_DIR, "run_id.txt")).read()
artifacts = load_artifacts(run_id=run_id)
prediction = predict.predict(texts=[text], artifacts=artifacts)
logger.info(json.dumps(prediction, indent=2))
return prediction
raise (NotImplementedError)
if __name__ == "__main__":
......
from utils import fPredictChunkAndVoting
import numpy as np
from tensorflow import keras
import pandas as pd
from tensorflow import keras
from utils import predict_and_vote
class label_ICA_components:
@@ -14,9 +15,9 @@ class label_ICA_components:
It is set up to be run from the command line.
Note: Tensorflow does take some time to load, thus running this independently for each subject is
not the most computationally efficient. To increase efficiency, I'd suggest imbedding this
not the most computationally efficient. To increase efficiency, I'd suggest embedding this
function into a pipeline that will load tensorflow and then run multiple subjects at once.
Alternatively, fPredictChunkAndVoting (used in function below) can be applied to N spatial map
Alternatively, predict_and_vote (used in function below) can be applied to N spatial map
and time series pairs. Thus fPredictICA could be easily modified to be applied to a complete
list of ICA components and run on many subjects.
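Below is a minimal sketch of the batching approach this docstring suggests, assuming the per-subject ICA CSVs already exist under an output/ directory; the glob pattern, import path, and print call are illustrative, not part of this commit. TensorFlow is only imported once for the whole process, so the per-subject cost is just the prediction itself.

# hypothetical batch driver (sketch only): import TensorFlow once, score many subjects
from pathlib import Path
import Label_ICA_Components

csv_paths = sorted(Path("output").glob("*/ICA.csv"))  # assumed output layout
for csv_path in csv_paths:
    labeler = Label_ICA_Components.label_ICA_components(str(csv_path))
    heartifact = labeler.fPredictICA()  # index of the best cardiac component
    print(csv_path, heartifact)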
@@ -42,8 +43,8 @@ class label_ICA_components:
The image is being returned as an array of integers.
"""
from PIL import Image
import numpy as np
from PIL import Image
img = Image.open(infilename)
img.load()
......
py_utils @ 6399e6fc
Subproject commit 9bbeb518610af0838ad3fc36551bbf41a6d5a87b
Subproject commit 6399e6fcf9b3138e39acd45631741bc21ce24c90
from pathlib import Path
from setuptools import find_namespace_packages, setup
# Load packages from requirements.txt
@@ -6,9 +7,9 @@ BASE_DIR = Path(__file__).parent
with open(Path(BASE_DIR, "requirements.txt")) as file:
required_packages = [ln.strip() for ln in file.readlines()]
docs_packages = ["mkdocs==1.3.0", "mkdocstrings==0.18.1"]
docs_packages = ["mkdocs==1.3.1", "mkdocstrings==0.18.1"]
style_packages = ["black==22.3.0", "flake8==3.9.2", "isort==5.10.1"]
style_packages = ["black==22.6.0", "flake8==5.0.2", "isort==5.10.1"]
# Define our package
setup(
@@ -21,8 +22,5 @@ setup(
python_requires=">=3.7",
packages=find_namespace_packages(),
install_requires=[required_packages],
extras_require={
"dev": docs_packages + style_packages,
"docs": docs_packages
},
)
\ No newline at end of file
extras_require={"dev": docs_packages + style_packages, "docs": docs_packages},
)
import multiprocessing
import os
import mne
import numpy as np
from mne.preprocessing import ICA
class MNEProcessor:
def __init__(self, scan_path, output_dir, csv_output_path):
self.scan_path = scan_path
self.output_dir = output_dir
self.csv_output_path = csv_output_path
def process(self):
"""
It reads in a CTF file, filters it, resamples it, runs ICA on it, and then saves the ICA
components to a CSV file
"""
# process with every available thread
os.environ["OMP_NUM_THREADS"] = str(multiprocessing.cpu_count())
# os.environ['OMP_NUM_THREADS'] = '1'
mne.set_log_level("ERROR")
raw = mne.io.read_raw_ctf(
self.scan_path,
verbose="warning",
preload=True,
system_clock="truncate",
clean_names=False,
)
sfreq = raw.info["sfreq"]
# Unused so ignoring
# mag_picks = mne.pick_types(raw.info, meg=True, eeg=False, misc=False)
# eeg_picks = mne.pick_types(raw.info, meg=False, eeg=True, misc=False)
misc_picks = mne.pick_types(raw.info, meg=False, eeg=False, misc=True)
all_picks = mne.pick_types(raw.info, meg=True, eeg=True, misc=True)
freq_to_filter = 60 # remove 60 Hz sig from power lines
notch_filtered = mne.io.Raw.notch_filter(
raw,
np.arange(freq_to_filter, sfreq / 2.0, freq_to_filter),
picks=all_picks,
phase="zero",
)
lower_freq_cutoff = 0.5
upper_freq_cutoff = 100
bandpass_filtered = mne.io.Raw.filter(
notch_filtered, lower_freq_cutoff, upper_freq_cutoff, picks=all_picks
)
bandpass_filtered_resampled = bandpass_filtered.resample(500)
method = "infomax"
if method == "infomax":
fit_params = dict(extended=True)
elif method == "fastica":
fit_params = dict()
# reject = dict(mag=5e-12, grad=4000e-13)
ica = ICA(n_components=20, method="infomax", fit_params=fit_params, random_state=0)
ica.fit(bandpass_filtered_resampled, picks="data")
ecg_index = list(range(20))
ica_sources = ica.get_sources(bandpass_filtered_resampled)
data, times = bandpass_filtered_resampled[misc_picks[:1]]
time = ica_sources[ecg_index[0]][1].reshape(
len(times),
)
sigi = []
try:
for i in ecg_index:
sigi.append(
ica_sources[i][0].reshape(
len(times),
)
)
except Exception as err:
print(err)
appended_output = np.c_[time, np.array(sigi).T]
np.savetxt(
self.csv_output_path,
appended_output,
fmt="%1.4e",
delimiter=",",
header="Time, IC1, IC2, IC3, IC4, IC5, IC6, IC7, IC8, IC9, IC10, IC11, IC12, IC13, IC14, IC15, IC16, IC17, IC18, IC19, IC20",
)
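A hedged usage sketch for the processor above; the subject name is a placeholder and the directory layout simply mirrors the paths defined in config/config.py.

# hypothetical driver for MNEProcessor (sketch only; all paths are placeholders)
from pathlib import Path
from utils import meg_to_ica  # import path mirrors main.py; adjust to wherever this class lives

out_dir = Path("/data/DATASCI/intermediate/subject01")
out_dir.mkdir(parents=True, exist_ok=True)  # process() writes the CSV into this folder
proc = meg_to_ica.MNEProcessor(
    "/data/DATASCI/raw/MEG_Data/subject01.ds",  # a CTF .ds folder
    out_dir,
    out_dir / "ICA.csv",
)
proc.process()  # notch + band-pass filter, resample to 500 Hz, fit 20-component ICA, write CSV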
import pandas as pd
import numpy as np
import pandas as pd
idx = pd.IndexSlice
......