Commit e85bc97c authored by =

Plopping in old code

parent 57439a64
1 merge request: !1 Progress toward getting the evaluation integrated
Showing with 1503 additions and 2 deletions
@@ -43,7 +43,7 @@ RUN echo "LIBRARY_PATH=${LIBRARY_PATH}" >> /etc/bash.bashrc && echo "LIBRARY_PAT
# [Optional] If your pip requirements rarely change, uncomment this section to add them to the image.
COPY docs/requirements.txt /tmp/pip-tmp/
COPY requirements.txt /tmp/pip-tmp/
RUN pip3 --disable-pip-version-check --no-cache-dir install -r /tmp/pip-tmp/requirements.txt \
&& rm -rf /tmp/pip-tmp
# RUN pip3 uninstall lightgbm -y && pip3 install wheel && \
@@ -51,7 +51,7 @@
// //"cd /data/DATASCI/lab_notebook",
// "/bin/bash conda init",
// "/bin/bash conda activate"
"./startup.sh"
"Docker/startup.sh"
],
File moved
import logging
import logging.config
import sys
from pathlib import Path
import mlflow
from rich.logging import RichHandler
# Directories
BASE_DIR = Path(__file__).parent.parent.absolute()
CONFIG_DIR = Path(BASE_DIR, "config")
DATA_DIR = Path(BASE_DIR, "data")
STORES_DIR = Path(BASE_DIR, "stores")
LOGS_DIR = Path(BASE_DIR, "logs")
# Stores
MODEL_REGISTRY = Path(STORES_DIR, "model")
BLOB_STORE = Path(STORES_DIR, "blob")
# Create dirs
DATA_DIR.mkdir(parents=True, exist_ok=True)
MODEL_REGISTRY.mkdir(parents=True, exist_ok=True)
LOGS_DIR.mkdir(parents=True, exist_ok=True)
BLOB_STORE.mkdir(parents=True, exist_ok=True)
# MLFlow model registry
mlflow.set_tracking_uri("file://" + str(MODEL_REGISTRY.absolute()))
# Logger
logging_config = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"minimal": {"format": "%(message)s"},
"detailed": {
"format": "%(levelname)s %(asctime)s [%(name)s:%(filename)s:%(funcName)s:%(lineno)d]\n%(message)s\n"
},
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"stream": sys.stdout,
"formatter": "minimal",
"level": logging.DEBUG,
},
"info": {
"class": "logging.handlers.RotatingFileHandler",
"filename": Path(LOGS_DIR, "info.log"),
"maxBytes": 10485760, # 1 MB
"backupCount": 10,
"formatter": "detailed",
"level": logging.INFO,
},
"error": {
"class": "logging.handlers.RotatingFileHandler",
"filename": Path(LOGS_DIR, "error.log"),
"maxBytes": 10485760, # 1 MB
"backupCount": 10,
"formatter": "detailed",
"level": logging.ERROR,
},
},
"root": {
"handlers": ["console", "info", "error"],
"level": logging.INFO,
"propagate": True,
},
}
logging.config.dictConfig(logging_config)
logger = logging.getLogger()
logger.handlers[0] = RichHandler(markup=True)
# Assets
PROJECTS_URL = (
"https://raw.githubusercontent.com/GokuMohandas/Made-With-ML/main/datasets/projects.json"
)
TAGS_URL = "https://raw.githubusercontent.com/GokuMohandas/Made-With-ML/main/datasets/tags.json"
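# Usage sketch (hypothetical caller; the package layout is assumed from the
# `from config.config import logger` import used elsewhere in this commit):
#
#   from config.config import logger, DATA_DIR
#   logger.info(f"Data directory: {DATA_DIR}")  # emitted via RichHandler and the rotating file handlers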
# Data
logs/
stores/
# VSCode
.vscode/
.idea
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Flask:
instance/
.webassets-cache
# Scrapy:
.scrapy
# Sphinx
docs/_build/
# PyBuilder
target/
# IPython
.ipynb_checkpoints
profile_default/
ipython_config.py
# pyenv
.python-version
# PEP 582
__pypackages__/
# Celery
celerybeat-schedule
celerybeat.pid
# Environment
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# mkdocs
site/
# Airflow
airflow/airflow.db
# MacOS
.DS_Store
import os
import Label_ICA_Components
import Get_RR_Component
import MNE_Processing
import multiprocessing
import pandas as pd
import traceback
current_dir = os.path.dirname(os.path.realpath(__file__))
input_dir = "/home/ec2-user/MEGNET/input_data/"
output_dir = os.path.join(current_dir, "output")
# collect the scan file basenames from the input directory
scan_files = os.listdir(input_dir)
scan_files = [scan_file.split('.')[0] for scan_file in scan_files]
# remove potential duplicates
scan_files = list(set(scan_files))
def process(scan_file):
try:
output_sub_dir = os.path.join(output_dir, scan_file)
full_input_path = os.path.join(input_dir, scan_file)
full_input_path = f'{full_input_path}.ds'
if not os.path.exists(output_sub_dir):
os.makedirs(output_sub_dir)
full_csv_output_path = os.path.join(output_sub_dir, scan_file + '_HR_component.csv')
print(f'Now processing: {scan_file}')
MNE_proc = MNE_Processing.MNE_Processor(full_input_path, output_sub_dir, full_csv_output_path)
MNE_proc.process()
###############################################################################
# feed CSV component filepath to Label_ICA_Components.py so MEGNET knows what files to work with
label_ICA = Label_ICA_Components.label_ICA_components(full_csv_output_path)
# get best heartifact after running MEGNET on the MATLAB file above
heartifact = label_ICA.fPredictICA()
# filter dataframe to only heartifact and Time index
indices_to_filter = [0, heartifact + 1]
df = pd.read_csv(full_csv_output_path)
df = df.iloc[:, indices_to_filter]
df.to_csv(full_csv_output_path, index=False)
###############################################################################
# feed .mat filepath and heartifact list to Get_RR_Intervals.py to get RR intervals for each heartifact
getRR = Get_RR_Component.GetRRComponent(output_sub_dir, scan_file, full_csv_output_path, heartifact)
# getRR = Get_RR_Intervals.GetRRIntervals(scan_file, heartifacts)
getRR.get_RR_intervals()
except Exception as err:
print(traceback.format_exc())
for scan_file in scan_files:
process(scan_file)
# pool = multiprocessing.Pool(128)
# pool.map(process, scan_files)
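# Parallel sketch (assumption: enough cores and memory to process several .ds scans
# at once); mirrors the commented-out pool above but releases the workers cleanly:
#
#   with multiprocessing.Pool(processes=8) as pool:
#       pool.map(process, scan_files)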
import mpld3
import numpy as np
import matplotlib.pyplot as plt
import scipy.signal as signal
import pandas as pd
import scipy.io
from sklearn import preprocessing
import os
import pyhrv.tools as tools
import pyhrv
import biosppy
from opensignalsreader import OpenSignalsReader
import math
from sklearn.metrics import mean_squared_error
class GetRRComponent():
def __init__(self, output_sub_dir, subject_id, subject_path, ICA_value):
self.subject_path = subject_path
self.subject_id = subject_id
self.output_sub_dir = output_sub_dir
self.ICA_value = ICA_value
def get_RR_intervals(self):
#############################################################
df = pd.DataFrame()
# get time and component from .CSV file
arr_Time_Series = pd.read_csv(self.subject_path).iloc[:, 0].T.to_numpy()
component_value = pd.read_csv(self.subject_path).iloc[:, 1].T.to_numpy()
df['Time'] = arr_Time_Series
df[f'ICA_Values_{self.ICA_value}'] = component_value
hr_arr = df.to_numpy()
#############################################################
# Assign paths
stats_component = f'{self.output_sub_dir}/{self.subject_id}_HR_component.csv'
graph_component = f'{self.output_sub_dir}/{self.subject_id}_Plot_{self.ICA_value}.png'
#############################################################
component_time = hr_arr[:, 0]
ICA_component_value = hr_arr[:, 1]
# attempt to flip values if signal is inverted
if np.median(ICA_component_value) > 0.00:
ICA_component_value = -ICA_component_value
df[f'ICA_Values_{self.ICA_value}'] = ICA_component_value
# SCIPY PEAK DETECTION
###################################################################################################
distance = 150
# peak-height threshold: mean of the top ~4.5% of component values
num_to_filter_component = round(len(ICA_component_value) * 0.045)
height_component = ((sum(sorted(ICA_component_value)[-num_to_filter_component:])) / num_to_filter_component)
# extract R-peaks from the component using SciPy's find_peaks
component_rpeaks, _ = signal.find_peaks(ICA_component_value, height=height_component, distance=distance, prominence=3.5)
# PYHRV PEAK DETECTION
########################################################################################################
# t_component, filtered_signal, component_rpeaks = biosppy.signals.ecg.ecg(ICA_component_value, sampling_rate=500)[:3]
# Plots Component Peaks
################################################################################################
length_of_component_time_series = np.amax(component_time)
to_divide = 1
mini2 = 0
maxi2 = len(component_time) / to_divide
peaks_range = [idx for idx in range(len(component_rpeaks)) if mini2 < component_rpeaks[idx] < maxi2]
peaks1 = component_rpeaks[peaks_range]
fig, ax = plt.subplots(1, 1, figsize=(12, 5))
plt.plot(component_time, ICA_component_value, linewidth=0.2)
plt.plot(component_time[component_rpeaks], ICA_component_value[component_rpeaks], "x")
plt.xlim([10, 480 // to_divide])
fig = plt.gcf()
fig.set_size_inches(12, 5)
beat_cnt = len(peaks1)
avg_hr = round((beat_cnt / (length_of_component_time_series)) * 60, 2)
plt.title(f"{self.subject_id} RR Interval Component {self.ICA_value} \nNumber of Beats: {beat_cnt} || Average Heart Rate: {avg_hr}")
plt.savefig(graph_component, dpi=100)
# save figure as HTML file
html_str = mpld3.fig_to_html(fig)
with open(graph_component[:-4] + '.html', "w") as html_file:
    html_file.write(html_str)
# Writes Component RR Intervals to CSV
################################################################################################################
# peak_vals = component_time[component_rpeaks]
# rr_int = []
# size = len(peak_vals) - 1
# for i in range(1, size):
# print(peak_vals[i])
# delta = peak_vals[i] - peak_vals[i - 1]
# rr_int = np.append(rr_int, delta)
# df = pd.DataFrame()
# df['Time'] = peak_vals[2:]
# df = pd.DataFrame()
# df['Signal'] = ICA_component_value
# df = df.set_index('Signal')
# df = df.dropna()
df.to_csv(stats_component, index=False)
import mpld3
import numpy as np
import matplotlib.pyplot as plt
import scipy.signal as signal
import pandas as pd
import scipy.io
from sklearn import preprocessing
import os
import pyhrv.tools as tools
import pyhrv
import biosppy
from opensignalsreader import OpenSignalsReader
import math
from sklearn.metrics import mean_squared_error
class GetRRIntervals():
def __init__(self, subject_id):
self.subject_id = subject_id
def get_RR_intervals(self):
self.subject_id = self.subject_id.split('.')[0]
#############################################################
output_path = f"/home/ec2-user/MEGNET/output/{self.subject_id}"
arr_Time_Series = pd.read_csv(f'{output_path}/{self.subject_id}.csv').iloc[:, 0].T.to_numpy()
value = pd.read_csv(f'{output_path}/{self.subject_id}.csv').iloc[:, 1]
value_num = value.name
counter = 0
#############################################################
counter = counter + 1
df_input = pd.DataFrame()
df_input['Time_Series'] = arr_Time_Series
print(arr_Time_Series)
df_input[f'ICA_Value_{counter}'] = value
#############################################################
hr_arr = df_input.to_numpy()
#############################################################
# Build the output file paths for the stats and plots
stats_component = f'{output_path}/{self.subject_id}_RR_Intervals_{value_num}.csv'
stats_ECG = f'{output_path}/{self.subject_id}_RR_Intervals_ECG.csv'
graph_component = f'{output_path}/{self.subject_id}_RR_Intervals_{value_num}.png'
graph_ecg = f'{output_path}/{self.subject_id}_RR_Intervals_{value_num}_ecg.png'
#############################################################
time_s = hr_arr[:, 0]
sig_one = hr_arr[:, 1]
#############################################################
if np.median(sig_one) > 0.00:
sig_one = -sig_one
df_input[f'ICA_Value_{counter}'] = sig_one
x = np.amax(time_s)
distance = 150
ecg_filepath = f"/home/ec2-user/MEGNET/techdev_ecg/{self.subject_id}.csv"
ecg_df = pd.read_csv(ecg_filepath).to_numpy()
ecg_time = ecg_df[:, 0]
ecg_signal = ecg_df[:, 1]
if np.median(ecg_signal) > 0.00:
ecg_signal = -ecg_signal
ecg_signal = signal.resample(ecg_signal, 240000)
# SCIPY PEAK DETECTION
###################################################################################################
num_to_filter_component = round(len(sig_one) * 0.04)
height_component = ((sum(sorted(sig_one)[-num_to_filter_component:])) / num_to_filter_component)
# extract R-peaks from the ICA component using SciPy's find_peaks
component_rpeaks, _ = signal.find_peaks(sig_one, height=height_component, distance=distance)
num_to_filter_ECG = round(len(ecg_signal) * 0.04)
height_ECG = ((sum(sorted(ecg_signal)[-num_to_filter_ECG:])) / num_to_filter_ECG)
# extract R-peaks from the ECG signal using SciPy's find_peaks
ecg_rpeaks, _ = signal.find_peaks(ecg_signal, height=height_ECG, distance=distance)
# component_nni = tools.nn_intervals(component_rpeaks)
# ecg_nni = tools.nn_intervals(ecg_rpeaks)
# ecg_series = pd.Series(ecg_rpeaks)
# component_series = pd.Series(component_rpeaks)
# component_rmssd = pyhrv.time_domain.rmssd(nni=component_nni, rpeaks=component_rpeaks)['rmssd']
# component_sdnn = pyhrv.time_domain.sdnn(nni=component_nni, rpeaks=component_rpeaks)['sdnn']
# component_nn50 = pyhrv.time_domain.nn50(nni=component_nni, rpeaks=component_rpeaks)['nn50']
# component_pnn50 = pyhrv.time_domain.nn50(nni=component_nni, rpeaks=component_rpeaks)['pnn50']
# ecg_rmssd = pyhrv.time_domain.rmssd(nni=ecg_nni, rpeaks=ecg_rpeaks)['rmssd']
# ecg_sdnn = pyhrv.time_domain.sdnn(nni=ecg_nni, rpeaks=ecg_rpeaks)['sdnn']
# ecg_nn50 = pyhrv.time_domain.nn50(nni=ecg_nni, rpeaks=ecg_rpeaks)['nn50']
# ecg_pnn50 = pyhrv.time_domain.nn50(nni=ecg_nni, rpeaks=ecg_rpeaks)['pnn50']
# df = pd.DataFrame()
# df['component_rmssd'] = [component_rmssd]
# df['ecg_rmssd'] = [ecg_rmssd]
# df['component_sdnn'] = [component_sdnn]
# df['ecg_sdnn'] = [ecg_sdnn]
# df['component_nn50'] = [component_nn50]
# df['ecg_nn50'] = [ecg_nn50]
# df['component_pnn50'] = [component_pnn50]
# df['ecg_pnn50'] = [ecg_pnn50]
# df['component_peaks_num'] = [len(component_rpeaks)]
# df['ECG_peaks_num'] = [len(ecg_rpeaks)]
# df['subject_id'] = [f'{self.subject_id}']
# df = df.set_index('subject_id')
# df.to_csv(f"/home/ec2-user/MEGNET/comparisons/{self.subject_id}_stats.csv")
################################################################################################
to_divide = 1
mini2 = 0
maxi2 = len(df_input['Time_Series']) / to_divide
peaks_range = [idx for idx in range(len(component_rpeaks)) if mini2 < component_rpeaks[idx] < maxi2]
peaks1 = component_rpeaks[peaks_range]
fig, ax = plt.subplots(1, 1, figsize=(12, 5))
plt.plot(time_s, sig_one, linewidth=0.2)
plt.plot(time_s[component_rpeaks], sig_one[component_rpeaks], "x")
plt.xlim([10, 480 // to_divide])
fig = plt.gcf()
fig.set_size_inches(12, 5)
beat_cnt = len(peaks1)
avg_hr = round((beat_cnt / (x)) * 60, 2)
plt.title(f"{self.subject_id} RR Interval Component {value_num} \nNumber of Beats: {beat_cnt} || Average Heart Rate: {avg_hr}")
plt.savefig(graph_component, dpi=100)
# save figure as HTML file
html_str = mpld3.fig_to_html(fig)
with open(graph_component[:-4] + '.html', "w") as html_file:
    html_file.write(html_str)
#############################################################
# to_divide = 1
# mini2 = 0
# maxi2 = len(ecg_time) / to_divide
# peaks_range_ecg = [idx for idx in range(len(ecg_rpeaks)) if mini2 < ecg_rpeaks[idx] < maxi2]
# peaks1_ecg = ecg_rpeaks[peaks_range_ecg]
# fig, ax = plt.subplots(1, 1, figsize=(12, 5))
# plt.plot(ecg_time, ecg_signal, linewidth=0.2)
# plt.plot(ecg_time[ecg_rpeaks], ecg_signal[ecg_rpeaks], "x")
# plt.xlim([10, 480 // to_divide])
# fig = plt.gcf()
# fig.set_size_inches(12, 5)
# ecg_beats_length = np.amax(ecg_time)
# beat_cnt = len(peaks1_ecg)
# avg_hr = round((beat_cnt / ecg_beats_length) * 60, 2)
# plt.title(f"{self.subject_id} RR Interval ECG \nNumber of Beats: {beat_cnt} || Average Heart Rate: {avg_hr}")
# plt.savefig(graph_ecg, dpi=100)
# save figure as HTML file
# html_str = mpld3.fig_to_html(fig)
# Html_file = open(graph_ecg[:-4] + '.html', "w")
# Html_file.write(html_str)
# Html_file.close()
# Writes Component values to CSV
################################################################################################################
# peak_vals = component_time[component_rpeaks]
# rr_int = []
# size = len(peak_vals) - 1
# for i in range(1, size):
# delta = peak_vals[i] - peak_vals[i - 1]
# rr_int = np.append(rr_int, delta)
df = pd.DataFrame()
# df['Time'] = peak_vals[2:]
df['Signal'] = sig_one
df = df.set_index('Signal')
# df = df.dropna()
df.to_csv(stats_component)
# Writes ECG RR Intervals to CSV
###############################################################################################################
# peak_vals = ecg_time[ecg_rpeaks]
# rr_int = []
# size = len(peak_vals) - 1
# for i in range(1, size):
# delta = peak_vals[i] - peak_vals[i - 1]
# rr_int = np.append(rr_int, delta)
df = pd.DataFrame()
# df['Time'] = peak_vals[2:]
df['Signal'] = ecg_signal
df = df.set_index('Signal')
# df = df.dropna()
df.to_csv(stats_ECG)
"""
Date: 11/11/2021
Author: Alex Treacher
This script is designed to use the final trained MEGnet to make predictions on data.
It is set up to be run from the command line.
Note: TensorFlow does take some time to load, so running this independently for each subject is not the most computationally efficient.
To increase efficiency, I'd suggest embedding this function into a pipeline that loads TensorFlow once and then runs multiple subjects.
Alternatively, fPredictChunkAndVoting (used in the function below) can be applied to N spatial map and time series pairs.
Thus fPredictICA could easily be modified to be applied to a complete list of ICA components and run on many subjects.
The outputs are saved by numpy in a text file that is easily human readable and can be loaded using np.loadtxt('/path/to/ICA_component_lables.txt')
example usage:
python Label_ICA_Components.py --input_path example_data/HCP/100307/@rawc_rfDC_8-StoryM_resample_notch_band/ICA202DDisc --output_dir example_data/HCP/100307/@rawc_rfDC_8-StoryM_resample_notch_band/ICA202DDisc --output_type list
"""
class label_ICA_components():
def __init__(self, subject_path):
self.subject_path = subject_path
def load_image(self, infilename ) :
from PIL import Image
import numpy as np
img = Image.open( infilename )
img.load()
data = np.asarray( img, dtype="int32" )
return data
def fPredictICA(self):
import os
# os.environ['OMP_NUM_THREADS'] = '1'
from Megnet_Utilities import fPredictChunkAndVoting
import numpy as np
import tensorflow as tf
import tensorflow_addons as tfa
import scipy.io
from tensorflow import keras
import pandas as pd
heartifacts = []
strOutputType = 'list'
# the data loading comes from our Brainstorm pipeline; it may require some minor edits based on how the data is saved
# load the time series and the spatial map
arrTimeSeries = pd.read_csv(self.subject_path).iloc[:, 1:]
arrTimeSeries = arrTimeSeries.T.to_numpy()
arrSpatialMap = np.zeros((20, 120, 120, 3), dtype=np.uint8)
# ensure the data is compatible
try:
assert arrTimeSeries.shape[0] == arrSpatialMap.shape[0] #the number of time series should be the same as the number of spatial maps
assert arrSpatialMap.shape[1:] == (120,120,3) #the spatial maps should have a shape of [N,120,120,3]
assert arrTimeSeries.shape[1] >= 15000 #the time series need to be at least 60secs with a sample rate of 250hz (60*250=15000)
except AssertionError:
raise ValueError('The data does not have the correct dimensions')
current_dir = os.path.dirname(os.path.realpath(__file__))
#load the model
kModel = keras.models.load_model(f"{current_dir}/MEGnet_final_model.h5")
#use the vote chunk prediction function to make a prediction on each input
output = fPredictChunkAndVoting(kModel,
arrTimeSeries,
arrSpatialMap,
np.zeros((20,3)), #the code expects the Y values as it was used for performance, just put in zeros as a place holder.
15000,
3750)
arrPredictionsVote, arrGTVote, arrPredictionsChunk, arrGTChunk = output
#format the predictions
if strOutputType.lower() == 'array':
to_return = arrPredictionsVote[:,0,:]
else:
to_return = arrPredictionsVote[:,0,:].argmax(axis=1)
for x in range(len(to_return)):
confidence = list(arrPredictionsVote[x][0])
heartrate_confidence = confidence[2]
# append all HR artifacts with confidence above 0 to list
if to_return[x] == 2 and heartrate_confidence > 0:
print("Heartifact:", x, heartrate_confidence)
heartifacts.append((x, heartrate_confidence))
# return highest confidence heartrate artifact
index, confidence = max(heartifacts, key=lambda x: x[1])
return index
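# Minimal usage sketch (hypothetical CSV path; mirrors how the driver script above
# calls this class):
#
#   labeler = label_ICA_components('output/sub-01/sub-01_HR_component.csv')
#   best_component = labeler.fPredictICA()  # index of the highest-confidence heart artifact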
import os
import mne
from time import time, localtime, strftime
import scipy.stats
from matplotlib import pyplot as plt
from skimage import io, color, transform, exposure, img_as_ubyte
import cv2
import numpy as np
from PIL import Image
from mne.preprocessing import ICA
import multiprocessing
class MNE_Processor():
def __init__(self, scan_path, output_dir, csv_output_path):
self.scan_path = scan_path
self.output_dir = output_dir
self.csv_output_path = csv_output_path
def process(self):
# process with every available thread
os.environ['OMP_NUM_THREADS'] = str(multiprocessing.cpu_count())
# os.environ['OMP_NUM_THREADS'] = '1'
mne.set_log_level('ERROR')
raw = mne.io.read_raw_ctf(self.scan_path, verbose='warning', preload=True, system_clock='truncate', clean_names=False)
sfreq = raw.info['sfreq']
mag_picks = mne.pick_types(raw.info, meg=True, eeg=False, misc=False)
eeg_picks = mne.pick_types(raw.info, meg=False, eeg=True, misc=False)
misc_picks = mne.pick_types(raw.info, meg=False, eeg=False, misc=True)
all_picks = mne.pick_types(raw.info, meg=True, eeg=True, misc=True)
freq_to_filter = 60 # remove 60 Hz power-line noise and its harmonics
notch_filtered = mne.io.Raw.notch_filter(raw, np.arange(freq_to_filter, sfreq / 2., freq_to_filter), \
picks=all_picks, phase='zero')
lower_freq_cutoff = 0.5
upper_freq_cutoff = 100
bandpass_filtered = mne.io.Raw.filter(notch_filtered, lower_freq_cutoff, upper_freq_cutoff, picks=all_picks)
bandpass_filtered_resampled = bandpass_filtered.resample(500)
method = 'infomax'
if method == 'infomax':
fit_params = dict(extended=True)
elif method == 'fastica':
fit_params = dict()
reject = dict(mag=5e-12, grad=4000e-13)
ica = ICA(n_components=20, method=method, fit_params=fit_params, random_state=0)
ica.fit(bandpass_filtered_resampled, picks="data")
ecg_index = list(range(20))
ica_sources = ica.get_sources(bandpass_filtered_resampled)
data, times = bandpass_filtered_resampled[misc_picks[:1]]
time = ica_sources[ecg_index[0]][1].reshape(len(times), )
sigi = []
try:
for i in ecg_index:
sigi.append(ica_sources[i][0].reshape(len(times), ))
except Exception as err:
print(err)
appended_output = np.c_[time, np.array(sigi).T]
np.savetxt(self.csv_output_path, appended_output, fmt='%1.4e',
delimiter=',', \
header='Time, IC1, IC2, IC3, IC4, IC5, IC6, IC7, IC8, IC9, IC10, IC11, IC12, IC13, IC14, IC15, IC16, IC17, IC18, IC19, IC20')
import pandas as pd
idx = pd.IndexSlice
import numpy as np
def fGetStartTimesOverlap(intInputLen, intModelLen=15000, intOverlap=3750):
"""
model len is 60 seconds at 250Hz = 15000
overlap len is 15 seconds at 250Hz = 3750
"""
lStartTimes = []
intStartTime = 0
while intStartTime+intModelLen<=intInputLen:
lStartTimes.append(intStartTime)
intStartTime = intStartTime+intModelLen-intOverlap
return lStartTimes
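# Worked example (hypothetical input length): with the default model length (15000)
# and overlap (3750), successive start times advance by 15000 - 3750 = 11250 samples:
#
#   fGetStartTimesOverlap(50000)  # -> [0, 11250, 22500, 33750]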
def fPredictChunkAndVoting(kModel, lTimeSeries, arrSpatialMap, arrY, intModelLen=15000, intOverlap=3750):
"""
This function is designed to take in ICA time series and spatial map pairs and produce a prediction using a trained model.
The time series will be split into multiple chunks and the final prediction will be a weighted vote of each time chunk.
The weight for the voting is determined by the amount of time and overlap each chunk shares with the others.
For example, if the total length of the scan is 50 seconds and the chunks are 15 seconds long with a 5 second overlap:
the first chunk will be the only chunk to use the first 10 seconds, and one of two chunks to use the next 5 seconds.
Thus its vote weighs 10 exclusive seconds plus half of the 5 shared seconds (12.5 weighted seconds), more than a chunk made up mostly of shared samples.
:param kModel: The model that will be used for the predictions on each chunk. It should have two inputs: the spatial map and time series respectively
:type kModel: a keras model
:param lTimeSeries: The time series for each scan (can also be an array if all scans are the same length)
:type lTimeSeries: list or array (if each scan is a different length, then it needs to be a list)
:param arrSpatialMap: The spatial maps (one per scan)
:type arrSpatialMap: numpy array
:param intModelLen: The length of the time series in the model, defaults to 15000
:type intModelLen: int, optional
:param intOverlap: The length of the overlap between chunks, defaults to 3750
:type intOverlap: int, optional
"""
# empty list to hold the prediction for each component pair
lPredictionsVote = []
lGTVote = []
lPredictionsChunk = []
lGTChunk = []
i = 0
for arrScanTimeSeries, arrScanSpatialMap, arrScanY in zip(lTimeSeries, arrSpatialMap, arrY):
intTimeSeriesLen = arrScanTimeSeries.shape[0]
lStartTimes = fGetStartTimesOverlap(intTimeSeriesLen, intModelLen=intModelLen, intOverlap=intOverlap)
if lStartTimes[-1] + intModelLen <= intTimeSeriesLen:
lStartTimes.append(arrScanTimeSeries.shape[0] - intModelLen)
dctTimeChunkVotes = dict([[x, 0] for x in lStartTimes])
for intT in range(intTimeSeriesLen):
lChunkMatches = [x <= intT < x + intModelLen for x in dctTimeChunkVotes.keys()]
intInChunks = np.sum(lChunkMatches)
for intStartTime, bTruth in zip(dctTimeChunkVotes.keys(), lChunkMatches):
if bTruth:
dctTimeChunkVotes[intStartTime] += 1.0 / intInChunks
# predict
dctWeightedPredictions = {}
for intStartTime in dctTimeChunkVotes.keys():
lPrediction = kModel.predict([np.expand_dims(arrScanSpatialMap, 0),
np.expand_dims(
np.expand_dims(arrScanTimeSeries[intStartTime:intStartTime + intModelLen],
0), -1)])
lPredictionsChunk.append(lPrediction)
lGTChunk.append(arrScanY)
dctWeightedPredictions[intStartTime] = lPrediction * dctTimeChunkVotes[intStartTime]
arrScanPrediction = np.stack(dctWeightedPredictions.values())
arrScanPrediction = arrScanPrediction.mean(axis=0)
arrScanPrediction = arrScanPrediction / arrScanPrediction.sum()
lPredictionsVote.append(arrScanPrediction)
lGTVote.append(arrScanY)
i += 1
return np.stack(lPredictionsVote), np.stack(lGTVote), np.stack(lPredictionsChunk), np.stack(lGTChunk)
from typing import Dict, List
import numpy as np
import pandas as pd
from sklearn.metrics import precision_recall_fscore_support
from snorkel.slicing import PandasSFApplier, slicing_function
@slicing_function()
def nlp_cnn(x):
"""NLP Projects that use convolution."""
nlp_projects = "natural-language-processing" in x.tag
convolution_projects = "CNN" in x.text or "convolution" in x.text
return nlp_projects and convolution_projects
@slicing_function()
def short_text(x):
"""Projects with short titles and descriptions."""
return len(x.text.split()) < 8 # less than 8 words
def get_slice_metrics(y_true: np.ndarray, y_pred: np.ndarray, slices: np.recarray) -> Dict:
"""Generate metrics for slices of data.
Args:
y_true (np.ndarray): true labels.
y_pred (np.ndarray): predicted labels.
slices (np.recarray): generated slices.
Returns:
Dict: slice metrics.
"""
metrics = {}
for slice_name in slices.dtype.names:
mask = slices[slice_name].astype(bool)
if sum(mask):
slice_metrics = precision_recall_fscore_support(
y_true[mask], y_pred[mask], average="micro"
)
metrics[slice_name] = {}
metrics[slice_name]["precision"] = slice_metrics[0]
metrics[slice_name]["recall"] = slice_metrics[1]
metrics[slice_name]["f1"] = slice_metrics[2]
metrics[slice_name]["num_samples"] = len(y_true[mask])
return metrics
def get_metrics(
y_true: np.ndarray, y_pred: np.ndarray, classes: List, df: pd.DataFrame = None
) -> Dict:
"""Performance metrics using ground truths and predictions.
Args:
y_true (np.ndarray): true labels.
y_pred (np.ndarray): predicted labels.
classes (List): list of class labels.
df (pd.DataFrame, optional): dataframe to generate slice metrics on. Defaults to None.
Returns:
Dict: performance metrics.
"""
# Performance
metrics = {"overall": {}, "class": {}}
# Overall metrics
overall_metrics = precision_recall_fscore_support(y_true, y_pred, average="weighted")
metrics["overall"]["precision"] = overall_metrics[0]
metrics["overall"]["recall"] = overall_metrics[1]
metrics["overall"]["f1"] = overall_metrics[2]
metrics["overall"]["num_samples"] = np.float64(len(y_true))
# Per-class metrics
class_metrics = precision_recall_fscore_support(y_true, y_pred, average=None)
for i, _class in enumerate(classes):
metrics["class"][_class] = {
"precision": class_metrics[0][i],
"recall": class_metrics[1][i],
"f1": class_metrics[2][i],
"num_samples": np.float64(class_metrics[3][i]),
}
# Slice metrics
if df is not None:
slices = PandasSFApplier([nlp_cnn, short_text]).apply(df)
metrics["slices"] = get_slice_metrics(y_true=y_true, y_pred=y_pred, slices=slices)
return metrics
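# Usage sketch (names follow the training code later in this commit, where the
# resulting metrics dict is logged to MLflow):
#
#   performance = get_metrics(y_true=y_test, y_pred=y_pred, classes=label_encoder.classes, df=test_df)
#   performance["overall"]["f1"]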
import json
import tempfile
import warnings
from argparse import Namespace
from pathlib import Path
from typing import Dict
import joblib
import mlflow
import optuna
import pandas as pd
import typer
from numpyencoder import NumpyEncoder
from optuna.integration.mlflow import MLflowCallback
from config import config
from config.config import logger
from tagifai import data, predict, train, utils
warnings.filterwarnings("ignore")
# Initialize Typer CLI app
app = typer.Typer()
@app.command()
def etl_data():
"""Extract, load and transform our data assets."""
# Extract
projects = utils.load_json_from_url(url=config.PROJECTS_URL)
tags = utils.load_json_from_url(url=config.TAGS_URL)
# Transform
df = pd.DataFrame(projects)
df = df[df.tag.notnull()] # drop rows w/ no tag
# Load
projects_fp = Path(config.DATA_DIR, "projects.json")
utils.save_dict(d=df.to_dict(orient="records"), filepath=projects_fp)
tags_fp = Path(config.DATA_DIR, "tags.json")
utils.save_dict(d=tags, filepath=tags_fp)
logger.info("✅ ETL on data is complete!")
@app.command()
def label_data(args_fp: str = "config/args.json") -> None:
"""Label data with constraints.
Args:
args_fp (str): location of args.
"""
# Load projects
projects_fp = Path(config.DATA_DIR, "projects.json")
projects = utils.load_dict(filepath=projects_fp)
df = pd.DataFrame(projects)
# Load tags
tags_dict = {}
tags_fp = Path(config.DATA_DIR, "tags.json")
for item in utils.load_dict(filepath=tags_fp):
key = item.pop("tag")
tags_dict[key] = item
# Label with constraints
args = Namespace(**utils.load_dict(filepath=args_fp))
df = df[df.tag.notnull()] # remove projects with no label
df = data.replace_oos_labels(df=df, labels=tags_dict.keys(), label_col="tag", oos_label="other")
df = data.replace_minority_labels(
df=df, label_col="tag", min_freq=args.min_freq, new_label="other"
)
# Save clean labeled data
labeled_projects_fp = Path(config.DATA_DIR, "labeled_projects.json")
utils.save_dict(d=df.to_dict(orient="records"), filepath=labeled_projects_fp)
logger.info("✅ Saved labeled data!")
@app.command()
def train_model(
args_fp: str = "config/args.json",
experiment_name: str = "baselines",
run_name: str = "sgd",
test_run: bool = False,
) -> None:
"""Train a model given arguments.
Args:
args_fp (str): location of args.
experiment_name (str): name of experiment.
run_name (str): name of specific run in experiment.
test_run (bool, optional): If True, artifacts will not be saved. Defaults to False.
"""
# Load labeled data
projects_fp = Path(config.DATA_DIR, "labeled_projects.json")
projects = utils.load_dict(filepath=projects_fp)
df = pd.DataFrame(projects)
# Train
args = Namespace(**utils.load_dict(filepath=args_fp))
mlflow.set_experiment(experiment_name=experiment_name)
with mlflow.start_run(run_name=run_name):
run_id = mlflow.active_run().info.run_id
logger.info(f"Run ID: {run_id}")
artifacts = train.train(df=df, args=args)
performance = artifacts["performance"]
logger.info(json.dumps(performance, indent=2))
# Log metrics and parameters
performance = artifacts["performance"]
mlflow.log_metrics({"precision": performance["overall"]["precision"]})
mlflow.log_metrics({"recall": performance["overall"]["recall"]})
mlflow.log_metrics({"f1": performance["overall"]["f1"]})
mlflow.log_params(vars(artifacts["args"]))
# Log artifacts
with tempfile.TemporaryDirectory() as dp:
utils.save_dict(vars(artifacts["args"]), Path(dp, "args.json"), cls=NumpyEncoder)
artifacts["label_encoder"].save(Path(dp, "label_encoder.json"))
joblib.dump(artifacts["vectorizer"], Path(dp, "vectorizer.pkl"))
joblib.dump(artifacts["model"], Path(dp, "model.pkl"))
utils.save_dict(performance, Path(dp, "performance.json"))
mlflow.log_artifacts(dp)
# Save to config
if not test_run: # pragma: no cover, actual run
open(Path(config.CONFIG_DIR, "run_id.txt"), "w").write(run_id)
utils.save_dict(performance, Path(config.CONFIG_DIR, "performance.json"))
@app.command()
def optimize(
args_fp: str = "config/args.json", study_name: str = "optimization", num_trials: int = 20
) -> None:
"""Optimize hyperparameters.
Args:
args_fp (str): location of args.
study_name (str): name of optimization study.
num_trials (int): number of trials to run in study.
"""
# Load labeled data
projects_fp = Path(config.DATA_DIR, "labeled_projects.json")
projects = utils.load_dict(filepath=projects_fp)
df = pd.DataFrame(projects)
# Optimize
args = Namespace(**utils.load_dict(filepath=args_fp))
pruner = optuna.pruners.MedianPruner(n_startup_trials=5, n_warmup_steps=5)
study = optuna.create_study(study_name=study_name, direction="maximize", pruner=pruner)
mlflow_callback = MLflowCallback(tracking_uri=mlflow.get_tracking_uri(), metric_name="f1")
study.optimize(
lambda trial: train.objective(args, df, trial),
n_trials=num_trials,
callbacks=[mlflow_callback],
)
# Best trial
trials_df = study.trials_dataframe()
trials_df = trials_df.sort_values(["user_attrs_f1"], ascending=False)
args = {**args.__dict__, **study.best_trial.params}
utils.save_dict(d=args, filepath=args_fp, cls=NumpyEncoder)
logger.info(f"\nBest value (f1): {study.best_trial.value}")
logger.info(f"Best hyperparameters: {json.dumps(study.best_trial.params, indent=2)}")
def load_artifacts(run_id: str = None) -> Dict:
"""Load artifacts for a given run_id.
Args:
run_id (str): id of run to load artifacts from.
Returns:
Dict: run's artifacts.
"""
if not run_id:
run_id = open(Path(config.CONFIG_DIR, "run_id.txt")).read()
# Locate the artifacts directory for this run
experiment_id = mlflow.get_run(run_id=run_id).info.experiment_id
artifacts_dir = Path(config.MODEL_REGISTRY, experiment_id, run_id, "artifacts")
# Load objects from run
args = Namespace(**utils.load_dict(filepath=Path(artifacts_dir, "args.json")))
vectorizer = joblib.load(Path(artifacts_dir, "vectorizer.pkl"))
label_encoder = data.LabelEncoder.load(fp=Path(artifacts_dir, "label_encoder.json"))
model = joblib.load(Path(artifacts_dir, "model.pkl"))
performance = utils.load_dict(filepath=Path(artifacts_dir, "performance.json"))
return {
"args": args,
"label_encoder": label_encoder,
"vectorizer": vectorizer,
"model": model,
"performance": performance,
}
@app.command()
def predict_tag(text: str = "", run_id: str = None) -> None:
"""Predict tag for text.
Args:
text (str): input text to predict label for.
run_id (str, optional): run id to load artifacts for prediction. Defaults to None.
"""
if not run_id:
run_id = open(Path(config.CONFIG_DIR, "run_id.txt")).read()
artifacts = load_artifacts(run_id=run_id)
prediction = predict.predict(texts=[text], artifacts=artifacts)
logger.info(json.dumps(prediction, indent=2))
return prediction
if __name__ == "__main__":
app() # pragma: no cover, live app
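# CLI usage sketch (assuming this module is the Typer entrypoint, e.g. tagifai/main.py;
# Typer exposes the functions above as hyphenated commands and their defaults as --options):
#
#   python tagifai/main.py etl-data
#   python tagifai/main.py train-model --args-fp config/args.json --experiment-name baselines --run-name sgd
#   python tagifai/main.py predict-tag --text "Transfer learning with transformers"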
from typing import Dict, List
import numpy as np
def custom_predict(y_prob: np.ndarray, threshold: float, index: int) -> np.ndarray:
"""Custom predict function that defaults
to an index if conditions are not met.
Args:
y_prob (np.ndarray): predicted probabilities
threshold (float): minimum softmax score to predict majority class
index (int): label index to use if the custom condition is not met.
Returns:
np.ndarray: predicted label indices.
"""
y_pred = [np.argmax(p) if max(p) > threshold else index for p in y_prob]
return np.array(y_pred)
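# Worked example (hypothetical probabilities): with threshold=0.5 and fallback index=3,
# only the first row clears the threshold, so the second row falls back to the index:
#
#   custom_predict(np.array([[0.7, 0.2, 0.1, 0.0],
#                            [0.3, 0.3, 0.2, 0.2]]), threshold=0.5, index=3)  # -> array([0, 3])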
def predict(texts: List, artifacts: Dict) -> List:
"""Predict tags for given texts.
Args:
texts (List): raw input texts to classify.
artifacts (Dict): artifacts from a run.
Returns:
List: predictions for input texts.
"""
x = artifacts["vectorizer"].transform(texts)
y_pred = custom_predict(
y_prob=artifacts["model"].predict_proba(x),
threshold=artifacts["args"].threshold,
index=artifacts["label_encoder"].class_to_index["other"],
)
tags = artifacts["label_encoder"].decode(y_pred)
predictions = [
{
"input_text": texts[i],
"predicted_tag": tags[i],
}
for i in range(len(tags))
]
return predictions
import json
from argparse import Namespace
from typing import Dict
import mlflow
import numpy as np
import optuna
import pandas as pd
from imblearn.over_sampling import RandomOverSampler
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import SGDClassifier
from sklearn.metrics import log_loss
from config.config import logger
from tagifai import data, evaluate, predict, utils
def train(args: Namespace, df: pd.DataFrame, trial: optuna.trial._trial.Trial = None) -> Dict:
"""Train model on data.
Args:
args (Namespace): arguments to use for training.
df (pd.DataFrame): data for training.
trial (optuna.trial._trial.Trial, optional): optimization trial. Defaults to None.
Raises:
optuna.TrialPruned: early stopping of trial if it's performing poorly.
Returns:
Dict: artifacts from the run.
"""
# Setup
utils.set_seeds()
if args.shuffle:
df = df.sample(frac=1).reset_index(drop=True)
df = df[: args.subset] # None = all samples
df = data.preprocess(df, lower=args.lower, stem=args.stem)
label_encoder = data.LabelEncoder().fit(df.tag)
X_train, X_val, X_test, y_train, y_val, y_test = data.get_data_splits(
X=df.text.to_numpy(), y=label_encoder.encode(df.tag)
)
test_df = pd.DataFrame({"text": X_test, "tag": label_encoder.decode(y_test)})
# Tf-idf
vectorizer = TfidfVectorizer(
analyzer=args.analyzer, ngram_range=(2, args.ngram_max_range)
) # char n-grams
X_train = vectorizer.fit_transform(X_train)
X_val = vectorizer.transform(X_val)
X_test = vectorizer.transform(X_test)
# Oversample
oversample = RandomOverSampler(sampling_strategy="all")
X_over, y_over = oversample.fit_resample(X_train, y_train)
# Model
model = SGDClassifier(
loss="log",
penalty="l2",
alpha=args.alpha,
max_iter=1,
learning_rate="constant",
eta0=args.learning_rate,
power_t=args.power_t,
warm_start=True,
)
# Training
for epoch in range(args.num_epochs):
model.fit(X_over, y_over)
train_loss = log_loss(y_train, model.predict_proba(X_train))
val_loss = log_loss(y_val, model.predict_proba(X_val))
if not epoch % 10:
logger.info(
f"Epoch: {epoch:02d} | "
f"train_loss: {train_loss:.5f}, "
f"val_loss: {val_loss:.5f}"
)
# Log
if not trial:
mlflow.log_metrics({"train_loss": train_loss, "val_loss": val_loss}, step=epoch)
# Pruning (for optimization in next section)
if trial: # pragma: no cover, optuna pruning
trial.report(val_loss, epoch)
if trial.should_prune():
raise optuna.TrialPruned()
# Threshold
y_pred = model.predict(X_val)
y_prob = model.predict_proba(X_val)
args.threshold = np.quantile([y_prob[i][j] for i, j in enumerate(y_pred)], q=0.25) # Q1
# Evaluation
other_index = label_encoder.class_to_index["other"]
y_prob = model.predict_proba(X_test)
y_pred = predict.custom_predict(y_prob=y_prob, threshold=args.threshold, index=other_index)
performance = evaluate.get_metrics(
y_true=y_test, y_pred=y_pred, classes=label_encoder.classes, df=test_df
)
return {
"args": args,
"label_encoder": label_encoder,
"vectorizer": vectorizer,
"model": model,
"performance": performance,
}
def objective(args: Namespace, df: pd.DataFrame, trial: optuna.trial._trial.Trial) -> float:
"""Objective function for optimization trials.
Args:
args (Namespace): arguments to use for training.
df (pd.DataFrame): data for training.
trial (optuna.trial._trial.Trial, optional): optimization trial.
Returns:
float: metric value to be used for optimization.
"""
# Parameters to tune
args.analyzer = trial.suggest_categorical("analyzer", ["word", "char", "char_wb"])
args.ngram_max_range = trial.suggest_int("ngram_max_range", 3, 10)
args.learning_rate = trial.suggest_loguniform("learning_rate", 1e-2, 1e0)
args.power_t = trial.suggest_uniform("power_t", 0.1, 0.5)
# Train & evaluate
artifacts = train(args=args, df=df, trial=trial)
# Set additional attributes
overall_performance = artifacts["performance"]["overall"]
logger.info(json.dumps(overall_performance, indent=2))
trial.set_user_attr("precision", overall_performance["precision"])
trial.set_user_attr("recall", overall_performance["recall"])
trial.set_user_attr("f1", overall_performance["f1"])
return overall_performance["f1"]
import json
import random
from typing import Dict
from urllib.request import urlopen
import numpy as np
def load_json_from_url(url: str) -> Dict:
"""Load JSON data from a URL.
Args:
url (str): URL of the data source.
Returns:
Dict: loaded JSON data.
"""
data = json.loads(urlopen(url).read())
return data
def load_dict(filepath: str) -> Dict:
"""Load a dictionary from a JSON's filepath.
Args:
filepath (str): location of file.
Returns:
Dict: loaded JSON data.
"""
with open(filepath) as fp:
d = json.load(fp)
return d
def save_dict(d: Dict, filepath: str, cls=None, sortkeys: bool = False) -> None:
"""Save a dictionary to a specific location.
Args:
d (Dict): data to save.
filepath (str): location of where to save the data.
cls (optional): encoder to use on dict data. Defaults to None.
sortkeys (bool, optional): whether to sort keys alphabetically. Defaults to False.
"""
with open(filepath, "w") as fp:
json.dump(d, indent=2, fp=fp, cls=cls, sort_keys=sortkeys)
fp.write("\n")
def set_seeds(seed: int = 42) -> None:
"""Set seed for reproducibility.
Args:
seed (int, optional): number to be used as the seed. Defaults to 42.
"""
# Set seeds
np.random.seed(seed)
random.seed(seed)
setup.py 0 → 100644
from pathlib import Path
from setuptools import find_namespace_packages, setup
# Load packages from requirements.txt
BASE_DIR = Path(__file__).parent
with open(Path(BASE_DIR, "requirements.txt")) as file:
required_packages = [ln.strip() for ln in file.readlines()]
docs_packages = ["mkdocs==1.3.0", "mkdocstrings==0.18.1"]
style_packages = ["black==22.3.0", "flake8==3.9.2", "isort==5.10.1"]
test_packages = ["pytest==7.1.2", "pytest-cov==2.10.1", "great-expectations==0.15.15"]
# Define our package
setup(
name="tagifai",
version="0.1",
description="Classify machine learning projects.",
author="Goku Mohandas",
author_email="goku@madewithml.com",
url="https://madewithml.com/",
python_requires=">=3.7",
packages=find_namespace_packages(),
install_requires=required_packages,
extras_require={
"dev": docs_packages + style_packages + test_packages + ["pre-commit==2.19.0"],
"docs": docs_packages,
"test": test_packages,
},
)
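# Installation sketch (run from the repository root; the extras map to the groups defined above):
#
#   python -m pip install -e .           # core package only
#   python -m pip install -e ".[dev]"    # adds docs, style, test and pre-commit tooling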