Commit 548dc8a3 authored by KOMAL BADI

Add a new database for the array job used for throughput analysis.

This notebook performs throughput analysis of a single array job.
parent 26fd9d19
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Through-Put Analysis:\n",
"Throughput is the amount of output that can be processed in a given period of time.\n",
"This throughput plot gives statistics about a single array job ."
]
},
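{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a toy illustration of this definition (the numbers below are hypothetical, not taken from the Slurm data), throughput can be computed as the number of completed jobs divided by the length of the observation window:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Toy example of the throughput definition: jobs completed per hour.\n",
"#Both numbers are hypothetical placeholders.\n",
"completed_jobs = 120\n",
"elapsed_hours = 4.0\n",
"print(completed_jobs / elapsed_hours, 'jobs/hour')"
]
},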
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Enter the JobID for which you want to do throughput analysis \n",
"Job_id='5976984'"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Mandatory\n",
"import sqlite3\n",
"import slurm2sql\n",
"import pandas as pd\n",
"import seaborn as sns\n",
"import matplotlib.pyplot as plt\n",
"import matplotlib\n",
"import warnings\n",
"from RC_STYLES import rc_styles as s\n",
"warnings.filterwarnings(\"ignore\")\n",
"import numpy as np"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Database Connection\n",
"slurm-since-March-allocation.sqlite3 : This DB is created using the --allocations parameter in the sacct command.Shows data for only the job allocation , not taking batch,extern steps into consideration. SQLite is a C library that provides a lightweight disk-based database that doesn’t require a separate server process and allows accessing the database using a nonstandard variant of the SQL query language. Some applications can use SQLite for internal data storage. It’s also possible to prototype an application using SQLite and then port the code to a larger database such as PostgreSQL or Oracle."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"db = sqlite3.connect('throughput_analysis_array_job.db')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"slurm2sql.slurm2sql(db, ['-j', Job_id, '-a'])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Mandatory\n",
"#db = sqlite3.connect('/data/rc/rc-team/slurm-since-March-allocation.sqlite3')\n",
"#db = sqlite3.connect('/data/rc/rc-team/slurm-since-March.sqlite3')\n",
"df = pd.read_sql('SELECT * FROM slurm', db)\n"
]
},
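{
"cell_type": "markdown",
"metadata": {},
"source": [
"Instead of loading the whole table, specific columns can also be pulled with an ad-hoc SQL query. A minimal sketch, assuming the same slurm table created above (the column list is illustrative):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#A sketch of an ad-hoc query against the slurm table;\n",
"#the selected columns are illustrative.\n",
"pd.read_sql('SELECT JobID, JobName, State FROM slurm LIMIT 5', db)"
]
},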
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df.head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df= df.loc[df['JobName'] =='R_array_job']"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Submit_Time , Start_Time , End_Time :\n",
"Termination time of the job. Format output is, YYYY-MM-DDTHH:MM:SS, unless changed through the SLURM_TIME_FORMAT environment variable. Submit_time is decribed as the time the job was submitted. Initiation time of the job in the same format as End. Here submit,start and End columns which are in epoch in the sacct are converted to date_time format and are saved under submit_time , start_time and end_time columns .\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Voluntary\n",
"df['start_time'] = pd.to_datetime(df['Start'],unit='s')\n",
"df['end_time'] = pd.to_datetime(df['End'],unit='s')\n",
"df['time'] = pd.to_datetime(df['Time'],unit='s')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Data Cleaning:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Voluntary\n",
"#ReqMemNode is Requested memory for each node, in MB. \n",
"#Hence we are converting ReqMemNode in GB for better understanding.\n",
"#Converting ReqMemNodes in GB\n",
"df['ReqMemNode']=df['ReqMemNode']/((1024)*(1024)*(1024)) \n",
"\n",
"#AveRSS is Average resident set size of all tasks in job.\n",
"#Converting AveRSS in GB\n",
"df['AveRSS']=df['AveRSS']/((1024)*(1024)*(1024))\n",
"\n",
"##ReqMemCPU is Requested memory for each CPU, in MB.\n",
"#Converting ReqMemCPU in GB\n",
"df['ReqMemCPU']=df['ReqMemCPU']/((1024)*(1024)*(1024))\n",
"\n",
"###ReqTotalRAM is multiplying Requested memory per each CPU by No. of CPUS requested\n",
"#Computing Total Requested RAM in GB\n",
"df['ReqTotalRAM']=df['NCPUS']*df['ReqMemCPU'] \n",
"\n",
"#Naming all the cancelled by user jobs as Cancelled jobs\n",
"df.loc[df['State'].str.contains('CANCELLED'), 'State'] = 'CANCELLED'\n",
"\n",
"#Waiting time is the time between the job being submitted \n",
"#to slurm scheduluer and the time at which job starts\n",
"#computing waiting time\n",
"df['Waiting'] = df['Start']-df['Submit']\n",
"df1 = df.dropna(subset=['Waiting'])\n",
"\n",
"#Computing waiting time in hours\n",
"df1['Waiting'] = df1['Waiting']/3600 \n",
"\n",
"#Computing Elapsed time in hours\n",
"df1['Elapsed'] = df1['Elapsed']/3600\n",
"\n",
"#Computing CPU time in hours\n",
"df1['CPUTime']=df1['CPUTime']/3600\n",
"\n",
"#droping na values for time(submitted jobs at a particular time)\n",
"#df1 = df1.dropna(subset=['Time']) \n",
"#df1 = df1.dropna(subset=['Submit']) "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Succesful Array Jobs:\n",
" Creating a pandas dataframe in which we seperate all the array jobs where ArrayTaskID is not equal to Nan.\n",
" A SLURM job array is a collection of Tasks that differ from each other by only a single index parameter. Creating a job array provides an easy way to group related jobs together.."
]
},
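{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick check before filtering, the sketch below peeks at the array-related columns (the column names follow the ones used elsewhere in this notebook):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Peek at the array-related columns: rows belonging to the array job\n",
"#carry an ArrayJobID and an ArrayTaskID index.\n",
"df1[['JobName', 'ArrayJobID', 'ArrayTaskID', 'State']].head()"
]
},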
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Voluntary\n",
"df_Array_jobs=df1.dropna(subset=['ArrayTaskID'])\n",
"df_Array_jobs.head(10)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Pandas Groupby Operator:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Mandatory\n",
"#As each Array Job consists of several tasks , No. od Array Tasks per \n",
"#each array job are calculated.\n",
"array_jobs = df_Array_jobs.groupby(\"ArrayJobID\")[\"ArrayTaskID\"].count().reset_index()\n",
"array_jobs.head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Voluntary\n",
"#Dropping all 0 value columns for better performance of data \n",
"array_jobs=array_jobs[array_jobs!=0].dropna()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Voluntary\n",
"#Sorting the previous pandas data frame in descending order to see \n",
"#highest no. of array tasks for a single array job and pull out that specific array job.\n",
"array_jobs.sort_values(by='ArrayTaskID', ascending=False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Pulling out data corresponding to 1 Array Job:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Mandatory\n",
"#Assingning sorted values a pandas data frame.\n",
"sample_data=array_jobs.sort_values(by='ArrayTaskID', ascending=False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"sample_array_job=df1"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"sample_array_job.head()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Submit_Time , Start_Time , End_Time :\n",
"Termination time of the job. Format output is, YYYY-MM-DDTHH:MM:SS, unless changed through the SLURM_TIME_FORMAT environment variable. Submit_time is decribed as the time the job was submitted. Initiation time of the job in the same format as End. Here submit,start and End columns which are in epoch in the sacct are converted to date_time format and are saved under submit_time , start_time and end_time columns ."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Mandatory\n",
"sample_array_job['submit_time'] = pd.to_datetime(sample_array_job['Submit'],unit='s')\n",
"sample_array_job['start_time'] = pd.to_datetime(sample_array_job['Start'],unit='s')\n",
"sample_array_job['end_time'] = pd.to_datetime(sample_array_job['End'],unit='s')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Mandatory\n",
"#Creating 3 different dataframes in which each data frame \n",
"#is grouped by submitted,started and end time of array job.\n",
"# Job count of each time is calculated.\n",
"count_jobs_submit_time= sample_array_job.groupby([\"submit_time\"] , as_index=False)[\"ArrayTaskID\"].count()\n",
"count_jobs_start_time= sample_array_job.groupby([\"start_time\"] , as_index=False)[\"ArrayTaskID\"].count()\n",
"count_jobs_end_time= sample_array_job.groupby([\"end_time\"] , as_index=False)[\"ArrayTaskID\"].count()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Submitted Jobs Data Frame:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Mandatory\n",
"##Creating dataframe in which data frame \n",
"#is grouped by submitted of array job.\n",
"# Job count of each time is calculated.\n",
"df_submit_time = count_jobs_submit_time.rename(columns={'ArrayTaskID': 'submitted_Job_count'})\n",
"print(df_submit_time)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Voluntary\n",
"#Submit_time as date-time is set as index \n",
"df_submit_time = count_jobs_submit_time.rename(columns={'ArrayTaskID': 'Submitted_Job_count'})\n",
"print(df_submit_time)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df_submit_time.head()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Started Jobs Data frame:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Mandatory\n",
"###Creating dataframe in which data frame \n",
"#is grouped by started time of array job.\n",
"# Job count of each time is calculated.\n",
"df_start_time = count_jobs_start_time.rename(columns={'ArrayTaskID': 'Started_Job_count'})\n",
"print(df_start_time)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Voluntary\n",
"#Start_time as date-time is set as index \n",
"df1_start_time=df_start_time.set_index('start_time')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df1_start_time.head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Voluntary\n",
"#Resampling the data\n",
"#Resampling is the method that consists of drawing repeated samples\n",
"#from the original data samples.\n",
"#The method of Resampling is a nonparametric method of statistical inference.\n",
"Running_df=df1_start_time.resample('S').sum()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Mandatory\n",
"#Creating a new column named Running jobs where \n",
"#Running jobs are cumulative sum of \n",
"#started job count\n",
"Running_df['Running']=Running_df.cumsum()"
]
},
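{
"cell_type": "markdown",
"metadata": {},
"source": [
"To make the resample-then-cumsum step concrete, here is a minimal sketch on synthetic timestamps (not the Slurm data): resample('S').sum() counts events per second, filling empty seconds with zero, and cumsum() turns those counts into a running total."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Synthetic illustration of resample('S').sum() followed by cumsum()\n",
"demo_times = pd.to_datetime(['2020-01-01 00:00:00',\n",
"                             '2020-01-01 00:00:00',\n",
"                             '2020-01-01 00:00:02'])\n",
"demo = pd.DataFrame({'Started_Job_count': [1, 1, 1]}, index=demo_times)\n",
"per_second = demo.resample('S').sum() #per-second counts: 2, 0, 1\n",
"print(per_second.cumsum()) #running total: 2, 2, 3"
]
},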
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"Running_df.head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"Running_df.plot(figsize=(15,10))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Pending Jobs :\n",
" pending jobs are the jobs which are waiting to be started ."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Mandatory\n",
"#pending jobs are calculated as difference between \n",
"#total jobs submitted at a particular time \n",
"#and Jobs that have been started.\n",
"Total_Jobs_Submitted = 50\n",
"Running_df['Pending_jobs'] = Total_Jobs_Submitted - Running_df['Running']"
]
},
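{
"cell_type": "markdown",
"metadata": {},
"source": [
"The total of 50 above is hard-coded for this particular array job. A sketch of deriving it from the data instead, assuming the dataframe holds one row per array task:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Hypothetical alternative: derive the task total from the data,\n",
"#assuming one row per array task in sample_array_job.\n",
"Total_Jobs_Submitted = int(sample_array_job['ArrayTaskID'].count())\n",
"Running_df['Pending_jobs'] = Total_Jobs_Submitted - Running_df['Running']"
]
},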
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"Running_df.plot(figsize=(15,10))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Completed Jobs:\n",
"Jobs gets completed at the time at which a job ends."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Mandatory\n",
"###Creating dataframe in which data frame \n",
"#is grouped by end time of array job.\n",
"# Job count of each time is calculated.\n",
"df_end_time = count_jobs_end_time.rename(columns={'ArrayTaskID': 'end_Job_count'})\n",
"print(df_end_time)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Voluntary\n",
"#end_time as date-time is set as index \n",
"df1_end_time=df_end_time.set_index('end_time')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Voluntary\n",
"#Resampling the data\n",
"#Resampling is the method that consists of drawing repeated samples\n",
"#from the original data samples.\n",
"#The method of Resampling is a nonparametric method of statistical inference.\n",
"completed_df=df1_end_time.resample('S').sum()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"completed_df.head(10)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Mandatory\n",
"#Creating a new column named Completed jobs where \n",
"#Completed jobs are cumulative sum of \n",
"#end job count\n",
"completed_df['Completed']=completed_df.cumsum()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"completed_df.head(50)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"completed_df.plot(figsize=(15,10))\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Joining Pandas Dataframes:\n",
" Join columns with other DataFrame either on index or on a key column. Efficiently join multiple DataFrame objects by index at once by passing a list."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Mandatory\n",
"#Joing all the 2 dataframes to plot throughput analysis\n",
"merged_df=completed_df.join(Running_df)"
]
},
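{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note that DataFrame.join defaults to a left join on the index, so seconds present only in Running_df are dropped. If the two series cover different time ranges, an outer join with a forward-fill may be preferable; a sketch:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Sketch of an outer join that keeps every timestamp from both frames,\n",
"#forward-filling the cumulative counts across gaps.\n",
"merged_outer = completed_df.join(Running_df, how='outer').ffill().fillna(0)\n",
"merged_outer.head()"
]
},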
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Joing all the 2 dataframes to plot throughput analysis\n",
"merged_df.head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"t=pd.DataFrame(merged_df[['Completed' , 'Running' , 'Pending_jobs']])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#plt.figure(figsize=(15,4))\n",
"t.plot(figsize=(15,10))\n",
"plt.title('Throughput Analysis')\n",
"plt.xlabel('Time')\n",
"plt.ylabel('Job_Count')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Currently_Running Jobs:\n",
" Currently running jobs are the difference between started jobs and completed jobs"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Mandatory\n",
"merged_df['Currently_Running'] = merged_df['Running'] - merged_df['Completed'] "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"merged_df.head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"p=pd.DataFrame(merged_df[['Currently_Running']])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"p.plot(figsize=(15,5))\n",
"plt.title('currently_Running_Jobs_Statistics')\n",