# Run a report on pickled list-policy data

The script reads the pickled (Parquet) files that match `glob_pattern` from `pickledir`, which is derived from `dirname`, runs the report, and by default saves it as a CSV in the "`dirname`/reports" subdirectory.

Some progress info is available via the `verbose` flag.

The current report aggregates storage stats by top-level directory (TLD) and by the year in which the data was last accessed. The goal of this report is to understand the distribution of less-used data.

!conda info --envs

!conda list

!pip list -freeze

In [None]:
import datetime
import pandas as pd
import matplotlib.pyplot as plt
from urllib.parse import unquote
import sys
import os
import pathlib
import re
import dask.dataframe as dd
import dask

In [None]:
from dask.diagnostics import ProgressBar

In [None]:
from dask.distributed import Client

# NOTE(review): this connects a Client via scheduler.json but keeps no
# reference to it; it is only displayed as the cell's result. The next cell
# opens a second connection using the same scheduler file.
Client(scheduler_file='scheduler.json')

In [None]:
# Connect to the Dask scheduler described by scheduler.json.
client = Client(scheduler_file='scheduler.json')


# NOTE(review): this rebinds `client` to a new local cluster, replacing the
# scheduler.json connection above when cells run top to bottom. Keep only the
# client you intend to use.
client = Client()

## Input variables

In [None]:
# Base directory of one policy run; the input and output dirs below are derived from it.
dirname="data/list-policy_projects_2023-08-31" # directory in which to find the files to read
glob_pattern = "*.parquet" # file name glob pattern to match, can be file name for individual file
line_regex_filter = ".*" # regex to match lines of interest in file
pickledir=f"{dirname}/parquet" # input: parquet files from the policy run
reportdir=f"{dirname}/reports" # output: report files are written here
tldpath="/data/project/ccts/galaxy" # directory whose immediate children are the TLDs (passed to get_tld)

verbose = True # print progress messages while saving the report
limit = 0
# NOTE(review): glob_pattern, line_regex_filter and limit are not referenced in
# the cells shown here; the parquet reads below use hard-coded paths/patterns.

## Utilities

In [None]:
# get top level dir on which to aggregate

def get_tld(df, dirname):
    """Return the path component naming each row's top-level dir (TLD).

    df: dataframe (pandas or dask) with a ``path`` column, e.g. from a
        policy run.
    dirname: directory whose immediate children are the TLDs for the
        report. A trailing "/" is ignored.

    Each path is split on "/" and the component immediately after
    ``dirname`` is returned as a Series aligned with ``df``. Rows whose path
    has no component at that depth get a missing value.
    """
    # Strip trailing slashes so "/a/b/" and "/a/b" select the same column;
    # otherwise the extra empty split element shifts the TLD index by one.
    depth = len(dirname.rstrip("/").split("/"))
    # `n` bounds the number of splits (dask requires it with expand=True).
    # Splitting one level past the TLD keeps the rest of the path together
    # in a single trailing column.
    parts = df["path"].str.split("/", n=depth + 1, expand=True)
    return parts[depth]

In [None]:
# get the year of a datetime column on which to aggregate

def get_year(df, column):
    """Return the calendar year of each value in a datetime column.

    df: dataframe (pandas or dask) containing ``column``.
    column: name of a datetime64 column, e.g. "access" or "modify" from a
        policy run.

    Returns a Series of years aligned with ``df``.
    """
    return df[column].dt.year

In [None]:
def report_tld_year(df, time="access"):
    """Aggregate total size and file count by top-level dir (TLD) and year.

    df: dataframe (pandas or dask) with ``tld``, ``size`` and a datetime
        column named by ``time``.
    time: datetime column whose year is grouped on. Defaults to "access",
        which reproduces the original report.

    Returns a frame indexed by (tld, <time>) with MultiIndex columns
    ("size", "sum") and ("size", "count"): total bytes and number of files
    per group. The year level keeps the name of ``time`` (e.g. "access").
    """
    year = df[time].dt.year
    # A dict passed to agg applies both reducers to the size column only.
    return df.groupby(["tld", year]).agg({"size": ["sum", "count"]})

## Read and parse the files according to glob_pattern

# NOTE(review): each call sets a global default scheduler, and the second call
# replaces the first. With a distributed `client` already created, this likely
# makes plain .compute() calls run on local processes instead of the cluster;
# confirm that this is intended.
dask.config.set(scheduler='threads')

dask.config.set(scheduler='processes')

In [None]:
def read_policy_parquet(file, columns=None, engine="pyarrow"):
    """Read policy-run parquet data into a persisted, repartitioned dask frame.

    file: parquet file, directory or glob accepted by ``dd.read_parquet``.
    columns: columns to load; defaults to
        ['size', 'access', 'modify', 'uid', 'path'].
    engine: parquet engine passed to ``dd.read_parquet``.

    Uses the module-level ``client`` to persist the data, then repartitions
    it into partitions of about 64MB.
    """
    # None sentinel instead of a list default: a mutable default object is
    # shared across calls, so any mutation by a callee would leak into later calls.
    if columns is None:
        columns = ['size', 'access', 'modify', 'uid', 'path']

    df = dd.read_parquet(file, columns=columns, engine=engine)

    # Persist first so that repartitioning by size (which needs partition
    # memory usage) works from the persisted data instead of re-reading parquet.
    df = client.persist(df)

    return df.repartition(partition_size="64MB")

df = dd.read_parquet(f'{pickledir}/list-*.parquet', columns=['size', 'access', 'modify', 'uid', 'path'], engine="pyarrow")


df

df = client.persist(df)

df.dask

%%time

df=df.repartition(partition_size="64MB")

df

%%time

df.map_partitions(len).compute()

df.dask

df = df[df.path.str.startswith("/data/project/ccts/galaxy/")]

In [None]:
%%time

maydf = read_policy_parquet("data/list-policy_projects_2024-05-03/parquet")

In [None]:
maydf = maydf[maydf.path.str.startswith("/data/project/ccts/galaxy/")]

%%time

maydf = maydf.set_index(maydf.path, npartitions="auto")

In [None]:
%%time

augdf = read_policy_parquet("data/list-policy_projects_2023-08-31/parquet")

In [None]:
augdf = augdf[augdf.path.str.startswith("/data/project/ccts/galaxy/")]

In [None]:
%%time

augdf=augdf.repartition(partition_size="64MB")

In [None]:
%%time

maydf=maydf.repartition(partition_size="64MB")

In [None]:
%%time

len(augdf)

In [None]:
%%time

len(maydf)

In [None]:
%%time

augdf = augdf.set_index(augdf.path)

In [None]:
%%time

maydf = maydf.set_index(maydf.path)

In [None]:
%%time
joindf = maydf.join(augdf, how="outer", lsuffix="_may", rsuffix="_aug")

In [None]:
joindf

In [None]:
joindf.dask

In [None]:
%%time

len(joindf)

In [None]:
%%time

len(joindf[joindf.modify_aug.isna()])

In [None]:
%%time

len(joindf[joindf.modify_may.isna()])

In [None]:
# Rows whose modify time differs between the May and Aug snapshots.
# NOTE: after the outer join, rows present in only one snapshot have NaT on
# the other side, and NaT != x is True, so those rows are counted here too.
modify_comp = joindf.modify_may != joindf.modify_aug

In [None]:
%%time

len(joindf[modify_comp])

In [None]:
%%time

len(joindf[joindf.size_may != joindf.size_aug])

In [None]:
%%time

len(joindf[joindf.size_may == joindf.size_aug])

In [None]:
%%time

len(joindf[joindf.uid_may != joindf.uid_aug])

In [None]:
%%time

len(joindf[joindf.access_may != joindf.access_aug])

In [None]:
%%time

len(joindf[joindf.access_may == joindf.access_aug])

In [None]:
# Deliberately undefined name: raises NameError, presumably to halt
# "Run All" here so that the cells below are not run automatically.
stop

## Aggregate stats into running totals

In [None]:
df1=get_tld(df, tldpath)

In [None]:
df1.dask

%%time

with ProgressBar():
 display(df1.head())

In [None]:
df = df.assign(tld=df1)

In [None]:
df.dask

df = df.drop(columns="path")

In [None]:
df1 = get_year(df, "access")

In [None]:
df = df.assign(year=df1)

df = df.drop(columns=["uid","access"])

In [None]:
df

In [None]:
df.dask

In [None]:
df = client.persist(df)

In [None]:
df.dask

In [None]:
df.head()

In [None]:
def ls_path(df, path):
    """Return the rows under ``path`` with a ``tld`` column added.

    df: dataframe (pandas or dask) with a ``path`` column.
    path: directory to list; a trailing "/" is ignored.

    Only rows whose path lies strictly inside ``path`` are kept (the row for
    ``path`` itself has no TLD component and is excluded). ``tld`` is the
    path component directly below ``path``, as computed by ``get_tld``.
    """
    base = path.rstrip("/")
    # Literal prefix test on "<path>/" instead of str.match: match() treats
    # `path` as a regex anchored only at the start, so "/x/ccts" would also
    # select "/x/ccts2/..." and mis-assign those rows' TLD.
    inside = df[df.path.str.startswith(base + "/")]
    return inside.assign(tld=get_tld(inside, base))

In [None]:
def du_by_year(df, path, time="access"):
    """Summarize disk usage under ``path`` by top-level dir (TLD) and year.

    NOTE: du_by_year is defined again in a later cell; when cells run in
    order, that later definition is the one in effect.

    df: dataframe (pandas or dask) with ``path``, ``size`` and the datetime
        column named by ``time``.
    path: directory whose immediate children are the TLDs; a trailing "/"
        is ignored.
    time: datetime column (e.g. "access" or "modify") whose year is grouped on.

    Returns a frame indexed by (tld, year) with ``size`` (total bytes) and
    ``terabytes`` (size / 10**12) columns.
    """
    base = path.rstrip("/")
    # Literal prefix match; str.match would treat `path` as a regex and also
    # pick up sibling directories that share the prefix.
    tmp = df[df.path.str.startswith(base + "/")]
    tmp = tmp.assign(tld=get_tld(tmp, base), year=get_year(tmp, time))
    # Keep only the grouping keys and the value to sum. Dropping a fixed list
    # of columns ignored `time` and left a datetime column ("modify" or
    # "access") in the frame, and datetimes cannot be meaningfully summed.
    tmp = client.persist(tmp[["tld", "year", "size"]])

    tmp = tmp.groupby(["tld", "year"]).sum()
    return tmp.assign(terabytes=tmp["size"] / 10**12)
 

In [None]:
def du_by_year(df, path, time="access"):
    """Summarize disk usage under ``path`` by top-level dir (TLD) and year.

    df: dataframe (pandas or dask) with ``path``, ``size`` and the datetime
        column named by ``time``.
    path: directory whose immediate children are the TLDs; a trailing "/"
        is ignored.
    time: datetime column (e.g. "access" or "modify") whose year is grouped on.

    Returns a frame indexed by (tld, year) with ``size`` (total bytes) and
    ``terabytes`` (size / 10**12) columns.
    """
    base = path.rstrip("/")
    # Literal prefix match; str.match would treat `path` as a regex and also
    # pick up sibling directories that share the prefix.
    tmp = df[df.path.str.startswith(base + "/")]
    tmp = tmp.assign(tld=get_tld(tmp, base), year=get_year(tmp, time))
    # Keep only the grouping keys and the value to sum. Dropping
    # ["uid", time, "path"] left the other datetime column in the frame to be
    # summed, and failed whenever one of those columns was already gone.
    tmp = client.persist(tmp[["tld", "year", "size"]])

    tmp = tmp.groupby(["tld", "year"]).sum()
    return tmp.assign(terabytes=tmp["size"] / 10**12)

In [None]:
def du_by_tld(df, path, time="access"):
    """Summarize disk usage under ``path`` by top-level dir (TLD).

    df: dataframe (pandas or dask) with ``path`` and ``size`` columns.
    path: directory whose immediate children are the TLDs; a trailing "/"
        is ignored.
    time: unused; kept so that the signature matches ``du_by_year``.

    Returns a frame indexed by tld with ``size`` (total bytes) and
    ``terabytes`` (size / 10**12) columns.
    """
    base = path.rstrip("/")
    # Literal prefix match; str.match would treat `path` as a regex and also
    # pick up sibling directories that share the prefix.
    tmp = df[df.path.str.startswith(base + "/")]
    tmp = tmp.assign(tld=get_tld(tmp, base))
    # Select the needed columns rather than dropping a fixed list: dropping
    # fails when a listed column (e.g. "year" on freshly read data) is absent,
    # and leaves other columns such as "modify" behind to be summed.
    tmp = client.persist(tmp[["tld", "size"]])
    tmp = tmp.groupby(["tld"]).sum()

    return tmp.assign(terabytes=tmp["size"] / 10**12)

In [None]:
df.dask

In [None]:
df.dask

In [None]:
%time

dudf = du_by_year(df, '/data/project/ccts', "modify")

In [None]:
dudf.dask

%time

dudf = du_by_tld(df, '/data/project/ccts')

In [None]:
%%time

dudf = client.persist(dudf)

In [None]:
dudf.dask

%%time

dudf=dudf.repartition(partition_size="64MB")

In [None]:
dudf

In [None]:
%%time

dudf = client.compute(dudf)

In [None]:
%%time

dudf = dudf.result()

In [None]:
%%time

dudf.sort_values(["tld", "year"])

In [None]:
%%time

tmp=dudf.reset_index()
#tmp[(tmp['tld']=="galaxy")]

In [None]:
tmp[tmp.tld=='galaxy'].sort_values('year')

In [None]:
%%time

dudf.head()

In [None]:
%%time

dudf.groupby("tld").sum()

%%time

lsdf = ls_path(df, '/data/project/ccts/galaxy')

%%time

lsdf = client.persist(lsdf.tld.unique())

lsdf.dask

%%time

lsdf = client.compute(lsdf)

lsdf.result()

In [None]:
df.dask

In [None]:
%%time

dfccts = df[df.path.str.match('/data/project/ccts')]

In [None]:
dfccts.dask

In [None]:
dfccts = dfccts.assign(tld=get_tld(dfccts, '/data/project/ccts/galaxy'))

In [None]:
dfccts

In [None]:
dfccts.dask

In [None]:
%%time

dfccts.head()

In [None]:
df1 = df

# Projects of interest for the least-recently-used (LRU) data report.
lru_projects = ['ICOS', 'boldlab', 'hartmanlab', 'sdtrlab', 'kinglab', 'kobielab', 'MRIPhantom', 'NCRlab', 'bridgeslab', 'hsight', 'kutschlab', 'lcdl', 'metalsgroup', 'rowelab', 'szaflarski_mirman']


condition=df1["tld"].isin(lru_projects)

# NOTE(review): this overwrites the lru_projects condition above, so `lru`
# contains only the "ccts" TLD. Keep whichever filter is intended.
condition=df1["tld"].isin(["ccts"])

lru=df1[condition]

%%time

with ProgressBar():
 display(lru.head())

# Task graph of the per-(tld, year) size total. Select the "size" column with
# brackets: `.size` on a groupby is the row-count method, not the column.
df.groupby(['tld', 'year'])['size'].sum().visualize(node_attr={'penwidth': '6'})

df2 = df.groupby(['tld', 'year']).agg({"size": ["sum", "count"]})

# Task graph of the mean file size per tld, using columns this frame has
# (it has no "name" or "x" columns).
df.groupby('tld')['size'].mean().visualize(node_attr={'penwidth': '6'})

%%time

df2 = report_tld_year(lru)


In [None]:
df

In [None]:
%%time 

df2 = df.groupby(['tld', 'year']).sum()

In [None]:
df2

In [None]:
df2.dask

In [None]:
tbsize = df2["size"]/(10**12)

In [None]:
df2 = df2.assign(terrabytes=tbsize)

In [None]:
df2

In [None]:
df2.dask

In [None]:
report=df2

In [None]:
%%time

report = client.compute(report)

In [None]:
report

## Create final report

Create summary formatting for the gigabyte and terabyte columns (see https://stackoverflow.com/a/20937592/8928529).

# NOTE(review): these lines expect a pandas frame with "sum" and "count"
# columns. As built above, `report` is still the future returned by
# client.compute(...) at this point (it is resolved by report.result() in a
# later cell), and its byte total is in a "size" column, so these lines fail
# if run in order. Confirm which report layout is intended.
report["average_size"] = report["sum"]/report["count"]

# Decimal units (10**12 bytes), formatted as strings with thousands separators.
report["terabytes"] = report["sum"]/(10**12)
report["terabytes"] = report["terabytes"].map('{:,.2f}'.format)

# Decimal units (10**9 bytes), formatted the same way.
report["gigabytes"] = report["sum"]/(10**9)
report["gigabytes"] = report["gigabytes"].map('{:,.2f}'.format)

## Save report as CSV

In [None]:
%%time

report = report.result()

In [None]:
report

In [None]:
# Only create the report dir if there is report data to save. makedirs with
# exist_ok=True also creates missing parent dirs and tolerates an existing dir.
if len(report):
    os.makedirs(reportdir, exist_ok=True)

In [None]:
reportdir

In [None]:
%%time

if (verbose): print(f"report: groupby-tld")
report.to_csv(f"{reportdir}/groupby-tld-dask3.csv.gz")

In [None]:
%%time

report.to_parquet(f"{reportdir}/groupby-tld-year-dask4.parquet")

## Summarize high-level stats

In [None]:
report

In [None]:
report.reset_index()

# NOTE(review): several cells below index "sum"/"count" columns and an
# "access" index level, but the report built above has a "size" column and a
# "year" level. These cells appear to target an earlier report layout;
# confirm before running.
report[report["sum"] == report["sum"].max()]

In [None]:
# TLD/year groups holding more than 50 TB (5*10**13 bytes).
report[(report["size"] > 5*10**13)]

report=report.reset_index()

# NOTE(review): GroupBy.agg forwards extra positional arguments to the
# aggregation function, so the second "sum" is passed into sum() rather than
# selecting anything; `report.groupby("tld").sum()` is probably what was meant.
summer = report.groupby("tld").agg("sum", "sum") #[report["sum"] > 10**13

summer["terabytes"] = summer["sum"]/(10**12)
summer["terabytes"] = summer["terabytes"].map('{:,.2f}'.format)

# TLDs holding more than 10 TB in total, largest first.
print(summer[summer["sum"] > 10**13].sort_values("sum", ascending=False)[['count', 'terabytes']])

# Groups over 10 TB whose data was last accessed in 2021 or earlier.
report[(report["sum"] > 10**13) & (report["access"] <= 2021)]

report[(report["sum"] > 10**13) & (report["access"] <= 2021)]["sum"].sum()

report[(report["sum"] <= 10**13) & (report["access"] <= 2021)]["sum"].sum()

# Same idea with access before 2023, reported in terabytes.
report[(report["sum"] > 10**13) & (report["access"] < 2023)]["sum"].sum()/10**12