#!/usr/bin/env Rscript
## Batch job report: summarize cluster job snapshot data for a set of jobs and render a markdown report.

#sessionInfo()

devtools::source_url("https://dl.dropboxusercontent.com/u/113630701/datautils/R/core_commons.R")
devtools::source_url("https://dl.dropboxusercontent.com/u/113630701/datautils/R/ggplot_commons.R")
devtools::source_url("https://raw.githubusercontent.com/holgerbrandl/mdreport/master/R/mdreport-package.r")

require.auto(lubridate)

## resolve the report name from the command line (or fall back to a default when run interactively)
if(!exists("reportName")){
    argv = commandArgs(TRUE)

    if(length(argv) == 0){
        reportName=".jobs"
        # reportName=".ipsjobs"
        # reportName=".trinjob"
        # reportName=".failchunksblastx"
        # stop("Usage: RemoveContaminants.R <assemblyBaseName>")
    }else{
        reportName=argv[1]
    }
}

reportNiceName <- str_replace_all(reportName, "^[.]", "")

md_new(paste("Job Report:", reportNiceName))

#require.auto(data.table)

echo("processing job report for '", reportName, "'")

## read the per-snapshot job table and keep just the snapshots of running jobs;
## num_cores is parsed from the "<n>*" multiplier in the exec_host field
jobData <- read.table(concat(reportName, ".cluster_snapshots.txt"), header=F, fill=T) %>% as.df() %>%
    set_names(c("jobid", "user", "stat", "queue", "from_host", "exec_host", "job_name", "submit_time", "proj_name", "cpu_used", "mem", "swap", "pids", "start_time", "finish_time", "snapshot_time")) %>%
    transform(jobid=factor(jobid)) %>%
    arrange(jobid) %>%
    subset(stat=="RUN") %>%
    transform(num_cores=str_match(exec_host, "([0-9]+)[*]n")[,2])

#parse_date_time(ac("00:00:00.00"), c("%d:%H:%M.%S"))
#parse_date_time(ac("00:04:55.18"), c("%d:%H%M%S"))

## parse the submission, start and finish times; the year is not part of these fields,
## so it is taken from the first snapshot time stamp
curYear=str_match(ac(jobData$snapshot_time[1]), "-([0-9]*)_")[,2]

convertTimes <- function(someDate) parse_date_time(concat(curYear, ac(someDate)), c("%Y/%m/%d-%H%M%S"))
convertedTimes <- colwise(convertTimes, .(submit_time, start_time, finish_time))(jobData)

jobData <- cbind(subset(jobData, select=!(names(jobData) %in% names(convertedTimes))), convertedTimes)

jobData <- transform(jobData, snapshot_time=parse_date_time(ac(snapshot_time), c("%d-%m-%y_%H%M%S")))

## convert the accumulated cpu time (hh:mm:ss.xx) into seconds and hours
splitCPU <- str_split_fixed(jobData$cpu_used, "[.]", 2)[,1]
splitCPUhms <- str_split_fixed(splitCPU, ":", 3)
cpuSecs <- 3600*as.numeric(splitCPUhms[,1]) + 60*as.numeric(splitCPUhms[,2]) + as.numeric(splitCPUhms[,3])

#splitCPU <- str_sub(splitCPU, 2, str_length(splitCPU))
#as.numeric(as.difftime(jobData[22,]$cpu_used_hms, units="secs"))
#jobData <- mutate(jobData, cpu_used_hms=hms(ac(splitCPU)), cpu_used_secs=as.numeric(as.difftime(cpu_used_hms, units="secs")), cpu_used_hours=cpu_used_secs/3600)
jobData <- mutate(jobData, cpu_used_secs=cpuSecs, cpu_used_hours=cpu_used_secs/3600)

## wall-clock time elapsed between the job start and each snapshot
jobData <- mutate(jobData, exec_time=difftime(snapshot_time, start_time, units="secs"), exec_time_min=as.numeric(exec_time)/60, exec_time_hours=as.numeric(exec_time)/3600)

## add the queue wall-clock limits [hours]
wallLimits <- c(short=1, medium=8, long=96)
jobData <- mutate(jobData, queueLimit=wallLimits[ac(queue)])

#tt <- head(subset(jobData, is.na(cpu_used_secs)), 100)
#subset(jobData, cpu_used_secs==max(jobData$cpu_used_secs))
#with(jobData, as.data.frame(table(is.na(cpu_used_secs))))

if(max(jobData$cpu_used_secs)==0){
    stop(echo("stopping job report generation for", reportName, "because no cpu time has been consumed"))
}

## todo use rollapply to calculate a better normalized cpu usage over time
#ggplot(jobData, aes(exec_time_min, cpu_used_secs/(60*exec_time_min), group=jobid)) + geom_line(alpha=0.3) + ggtitle("normalized cpu usage")
#ggsave2()

save(jobData, file=concat(reportName, ".cluster_snapshots.RData"))
#jobData <- local(get(load(concat(reportName, ".cluster_snapshots.RData"))))

#ggplot(jobData, aes(exec_time_min, cpu_used_secs, group=jobid)) + geom_line(alpha=0.3) + geom_smooth() + ggtitle("accumulated cpu usage")
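## A possible rollapply-based smoothing for the normalized-cpu-usage todo above. This is a commented sketch only
## (variable names are illustrative), assuming the zoo package is available and that snapshots within each job
## are ordered by exec_time:
# require.auto(zoo)
# smoothedUsage <- ddply(plyr::arrange(jobData, jobid, exec_time_min), .(jobid), mutate,
#     cpu_rate_smooth=zoo::rollapply(cpu_used_secs/(60*exec_time_min), width=5, FUN=mean, fill=NA, align="right"))
# ggplot(smoothedUsage, aes(exec_time_min, cpu_rate_smooth, group=jobid)) + geom_line(alpha=0.3) + ggtitle("smoothed normalized cpu usage")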
usage") md_plot(ggplot(jobData, aes(exec_time_hours, cpu_used_hours, group=jobid)) + geom_line(alpha=0.3) + ggtitle("accumulated cpu usage") + geom_vline(aes(xintercept=queueLimit), color="red")) #### ussage per time interval jobDataSlim <- with(jobData, data.frame(jobid, num_cores, cpu_used_secs, exec_time=as.numeric(exec_time))) jobDataCPUChange = ddply(jobDataSlim, .(jobid), subset, diff(cpu_used_secs)!=0) smoothData <- ddply(jobDataCPUChange, .(jobid), mutate, exec_period=c(NA, diff(as.numeric(exec_time))), cpu_usage_in_period=c(NA, diff(cpu_used_secs))) smoothData[is.na(smoothData)] <- 0 #ggplot(smoothData, aes(exec_time, cpu_usage_in_period, color=jobid)) + geom_line() md_plot(ggplot(subset(smoothData, cpu_usage_in_period>0), aes(exec_time/3600, cpu_usage_in_period/(exec_period* as.numeric(as.character(num_cores))), color=num_cores, group=jobid)) + geom_line(alpha=0.3) + xlab("exec time [hours]") + ylab("core normalized cpu usage")) # + scale_color_discrete(name="jobid") ####################################################################################################################### ### sumarize the jobs jobSummaries <- mutate(subset(plyr::arrange(jobData, -1* exec_time), !duplicated(jobid)), pending_time=difftime(start_time, submit_time, units="secs"), pending_time_min=as.numeric(pending_time)/60) jobSummaries <- transform(jobSummaries, jobid=reorder(jobid, as.numeric(jobid))) #ggplot(jobSummaries, aes(pending_time_min)) + geom_histogram() + ggtitle("pending times") + coord_flip() if(nrow(jobSummaries)<50){ md_plot(ggplot(jobSummaries, aes(reorder(jobid, -as.numeric(jobid)), pending_time_min/60)) + geom_bar(stat="identity") + ggtitle("pending times") + coord_flip() + xlab("job id")) }else{ md_plot(ggplot(jobSummaries, aes(as.numeric(jobid), pending_time_min/60)) + geom_area() + ggtitle("pending times")+xlab("job_nr") + ylab("pending time [h]")) } #ggsave2(p=reportName) if(nrow(jobSummaries)<50){ md_plot(ggplot(jobSummaries, aes(reorder(jobid, -as.numeric(jobid)), exec_time_hours)) + geom_bar(stat="identity") + ggtitle("job execution times") + coord_flip() + xlab("job id")) }else{ md_plot(ggplot(jobSummaries, aes(as.numeric(jobid), exec_time_hours)) + geom_area() + ggtitle("job execution times")+ xlab("job_nr") + geom_hline(mapping=aes(yintercept=queueLimit), color="red")) } #ggplot(jobSummaries, aes(as.numeric(jobidx), exec_time_min/pending_time_min)) + geom_area() + ggtitle("pending vs exec time ratio")+xlab("job_nr") md_plot(ggplot(jobSummaries, aes(exec_time_min, pending_time_min)) + geom_point() + ggtitle("pending vs exec time") + geom_abline()) write.delim(jobSummaries, file=concat(reportName, ".jobSummaries.txt")) # jobSummaries <- read.delim("jobSummaries.txt") jobSummaries %>% mutate(pending_time_hours=pending_time_min/60) %>% select(jobid, exec_host, job_name, cpu_used_hours, pending_time_hours, exec_time_hours) %>% md_table("Job Summaries") md_report(paste0(reportNiceName, "_batch_report"), open=F) ####################################################################################################################### ## create warning email if jobs died ## todo finish send mail if wall time was exceeded numKilled=nrow(subset(jobSummaries, exec_time_hours>queueLimit)) numTotal= nrow(jobSummaries) if(numKilled >0){ system(paste("mailme '",numKilled,"out of ",numTotal," jobs in ", getwd(), " died because of queue length limitation'")) }