CreateJobReport.R.old 7.63 KB
Newer Older
Holger Brandl's avatar
Holger Brandl committed
1 2 3 4 5
#!/usr/bin/env Rscript

#sessionInfo()


## Shared helper scripts are sourced from a pinned (v1.20) datautils release.
## NOTE(review): presumably core_commons.R provides the helpers used below
## (require_auto, echo, ac, as.df, write.delim) and attaches dplyr/stringr -- confirm.
for (commonsScript in c("core_commons.R", "ggplot_commons.R")) {
    devtools::source_url(paste0("https://raw.githubusercontent.com/holgerbrandl/datautils/v1.20/R/", commonsScript))
}

require_auto(lubridate)
## Resolve the report prefix. A `reportName` defined before this script runs wins;
## otherwise use the first command-line argument, and fall back to ".jobs".
if (!exists("reportName")) {
    cliArgs <- commandArgs(TRUE)

    # other prefixes used during development: ".ipsjobs", ".trinjob", ".blastn", ".failchunksblastx"
    reportName <- if (length(cliArgs) > 0) cliArgs[1] else ".jobs"
}

## human-readable report name (one leading dot stripped), used as the document title
reportNiceName <- sub("^[.]", "", reportName)
#> # Job Report:  `r reportNiceName`


echo("processing job report for '", reportName,"'")
## Load all cluster snapshots of this report. Columns are named positionally,
## and only rows whose status is "RUN" are kept.
jobData <- read.table(paste0(reportName, ".cluster_snapshots.txt"), header = FALSE, fill = TRUE) %>%
    as.df() %>%
    set_names(c("jobid", "user", "stat", "queue", "from_host", "exec_host", "job_name", "submit_time", "proj_name", "cpu_used", "mem", "swap", "pids", "start_time", "finish_time", "snapshot_time")) %>%
    transform(jobid = factor(jobid)) %>%
    arrange(jobid) %>%
    subset(stat == "RUN")

## Abort, and notify via mailme, when no job was ever seen running.
## reportName is a file prefix rather than an existing path, so normalizePath
## must not insist on the path existing (that would only emit a warning).
if (nrow(jobData) == 0) {
    noJobsMsg <- paste("no jobs were run in ", normalizePath(reportName, mustWork = FALSE))
    # shQuote keeps paths containing quotes/spaces from breaking the shell command
    system(paste("mailme", shQuote(noJobsMsg)))
    stop(noJobsMsg, call. = FALSE)
}

#jobData %>% count(jobid) %>% nrow


## Extract the multi-threading degree from exec_host. Entries like "<n>*n..." ran
## on <n> cores; entries without that prefix are counted as single-core.
## The value is kept as a character column throughout. The previous
## transform()+ifelse() version produced a factor under R < 4, and ifelse() then
## returned the factor's integer codes instead of the core counts. Numeric use
## therefore needs an explicit conversion.
coreCounts <- str_match(jobData$exec_host, "([0-9]+)[*]n")[, 2]
jobData$num_cores <- ifelse(is.na(coreCounts), "1", coreCounts)



jobData %>% count(exec_host)

jobData %>% select(submit_time, start_time, finish_time) %>% head
#    filter(finish_time!="-") %>% head

#parse_date_time(ac("00:00:00.00"), c("%d:%H:%M.%S"))
#parse_date_time(ac("00:04:55.18"), c("%d:%H%M%S"))
## Parse the submission/start/finish times. Each raw value is prefixed with the
## year captured from the first snapshot timestamp (the digits between "-" and "_")
## before parsing.
## NOTE(review): presumably the raw columns carry no year -- confirm.
curYear <- str_match(ac(jobData$snapshot_time[1]), "-([0-9]*)_")[, 2]

convertTimes <- function(someDate) {
    withYear <- paste0(curYear, ac(someDate))
    parse_date_time(withYear, c("%Y/%m/%d-%H%M%S"))
}

#convertedTimes <- colwise(convertTimes, .(submit_time, start_time, finish_time))(jobData)
#jobData <- cbind(subset(jobData, select=!(names(jobData) %in% names(convertedTimes))), convertedTimes)

jobData <- jobData %>% mutate_each(funs(convertTimes), submit_time, start_time, finish_time)
## parse the snapshot timestamps themselves
jobData <- transform(jobData, snapshot_time = parse_date_time(ac(snapshot_time), c("%d-%m-%y_%H%M%S")))

## cpu_used is cut at the first "." (dropping the fractional part) and the rest is
## split on ":" into hours, minutes and seconds, then summed to total cpu seconds
cpuHms <- str_split_fixed(str_split_fixed(jobData$cpu_used, "[.]", 2)[, 1], ":", 3)
cpuSecs <- 3600 * as.numeric(cpuHms[, 1]) + 60 * as.numeric(cpuHms[, 2]) + as.numeric(cpuHms[, 3])

#as.numeric(as.difftime(jobData[22,]$cpu_used_hms, units="secs"))
## cpu usage in hours, plus the time elapsed between job start and each snapshot
## (exec_time as a difftime in seconds, also as plain minutes and hours)
jobData <- mutate(jobData,
    cpu_used_secs = cpuSecs,
    cpu_used_hours = cpu_used_secs / 3600,
    exec_time = difftime(snapshot_time, start_time, units = "secs"),
    exec_time_min = as.numeric(exec_time) / 60,
    exec_time_hours = as.numeric(exec_time) / 3600
)


## Attach the wall-clock limit (hours) of each job's queue. It is compared against
## exec_time_hours later on. Queues missing from this table get NA.
wallLimits <- c(short = 1, medium = 8, long = 96)
jobData <- jobData %>% mutate(queueLimit = wallLimits[ac(queue)])
#tt <- head(subset(jobData, is.na(cpu_used_secs)), 100)
#subset(jobData, cpu_used_secs==max(jobData$cpu_used_secs))
#with(jobData, as.data.frame(table(is.na(cpu_used_secs))))

## Abort when no job consumed any cpu time.
## Unparsable cpu values (NA) are ignored here. Previously max() returned NA for
## them and `if (NA)` failed with an unrelated error.
## The message is passed to stop() directly, because stop(echo(...)) raised an
## empty error (and the trailing quit() was unreachable).
if (!any(jobData$cpu_used_secs > 0, na.rm = TRUE)) {
    stop(paste("stopping job report generation for", reportName, "because no cpu time has been consumed"), call. = FALSE)
}


## todo use rollapply to calculate better normalized cpu usage over time
#ggplot(jobData, aes(exec_time_min, cpu_used_secs/(60*exec_time_min), group=jobid)) + geom_line(alpha=0.3) + ggtitle("normalized cpu usage")
#ggsave2()


## persist the processed snapshot table under the report prefix
snapshotCache <- paste0(reportName, ".cluster_snapshots.RData")
save(jobData, file = snapshotCache)
#jobData <- local(get(load(concat(reportName, ".cluster_snapshots.RData"))))

#ggplot(jobData, aes(exec_time_min, cpu_used_secs, group=jobid)) + geom_line(alpha=0.3) + geom_smooth() + ggtitle("accumulated cpu usage")
## accumulated cpu hours over elapsed hours, one line per job; red lines mark queue limits
ggplot(jobData, aes(exec_time_hours, cpu_used_hours, group = jobid)) +
    geom_line(alpha = 0.3) +
    ggtitle("accumulated cpu usage") +
    geom_vline(aes(xintercept = queueLimit), color = "red")

#### usage per time interval
## Slim per-snapshot table, ordered by elapsed time within each job so that
## consecutive rows of a job are consecutive snapshots.
jobDataSlim <- with(jobData, data.frame(jobid, num_cores, cpu_used_secs, exec_time = as.numeric(exec_time)))
jobDataSlim <- jobDataSlim[order(jobDataSlim$jobid, jobDataSlim$exec_time), ]

## Keep each job's first snapshot plus every snapshot whose accumulated cpu time
## differs from the previous one. diff() is one element shorter than the data, so
## the leading TRUE aligns the mask with the rows. Without it, subset() silently
## recycles a mask that is shifted by one row. NA comparisons are dropped by subset().
jobDataCPUChange <- ddply(jobDataSlim, .(jobid), subset, c(TRUE, diff(cpu_used_secs) != 0))

## Elapsed seconds and consumed cpu seconds between consecutive retained snapshots.
## The first row of each job has no predecessor and gets 0 for both.
smoothData <- ddply(jobDataCPUChange, .(jobid), mutate,
    exec_period = c(NA, diff(as.numeric(exec_time))),
    cpu_usage_in_period = c(NA, diff(cpu_used_secs)))
smoothData[is.na(smoothData)] <- 0

#ggplot(smoothData, aes(exec_time, cpu_usage_in_period, color=jobid)) + geom_line()
## cpu seconds per elapsed second and core, i.e. the core-normalized utilization
ggplot(subset(smoothData, cpu_usage_in_period > 0),
       aes(exec_time / 3600, cpu_usage_in_period / (exec_period * as.numeric(as.character(num_cores))), color = num_cores, group = jobid)) +
    geom_line(alpha = 0.3) +
    xlab("exec time [hours]") +
    ylab("core normalized cpu usage") # + scale_color_discrete(name="jobid")



#######################################################################################################################
### summarize the jobs
## One row per job: its snapshot with the largest elapsed time, extended by the
## time it spent pending between submission and start.
latestSnapshots <- subset(plyr::arrange(jobData, -1 * exec_time), !duplicated(jobid))
jobSummaries <- mutate(latestSnapshots,
    pending_time = difftime(start_time, submit_time, units = "secs"),
    pending_time_min = as.numeric(pending_time) / 60)
## reorder the jobid factor levels by their numeric codes
jobSummaries <- transform(jobSummaries, jobid = reorder(jobid, as.numeric(jobid)))


#ggplot(jobSummaries, aes(pending_time_min)) + geom_histogram() + ggtitle("pending times") + coord_flip()
## Pending time per job in hours. Reports with 50 or more jobs get an area over
## the job index; smaller ones get one bar per job.
if (nrow(jobSummaries) >= 50) {
    ggplot(jobSummaries, aes(as.numeric(jobid), pending_time_min / 60)) +
        geom_area() +
        ggtitle("pending times") +
        xlab("job_nr") +
        ylab("pending time [h]")
} else {
    ggplot(jobSummaries, aes(reorder(jobid, -as.numeric(jobid)), pending_time_min / 60)) +
        geom_bar(stat = "identity") +
        ggtitle("pending times") +
        coord_flip() +
        xlab("job id")
}
#ggsave2(p=reportName)
## Execution time per job in hours. Reports with 50 or more jobs get an area over
## the job index, with queue limits drawn in red; smaller ones get one bar per job.
if (nrow(jobSummaries) >= 50) {
    ggplot(jobSummaries, aes(as.numeric(jobid), exec_time_hours)) +
        geom_area() +
        ggtitle("job execution times") +
        xlab("job_nr") +
        geom_hline(mapping = aes(yintercept = queueLimit), color = "red")
} else {
    ggplot(jobSummaries, aes(reorder(jobid, -as.numeric(jobid)), exec_time_hours)) +
        geom_bar(stat = "identity") +
        ggtitle("job execution times") +
        coord_flip() +
        xlab("job id")
}
#ggplot(jobSummaries, aes(as.numeric(jobidx), exec_time_min/pending_time_min)) + geom_area() + ggtitle("pending vs exec time ratio")+xlab("job_nr")
## pending vs execution time per job (minutes); the diagonal marks equal times
ggplot(jobSummaries, aes(exec_time_min, pending_time_min)) +
    geom_point() +
    ggtitle("pending vs exec time") +
    geom_abline()

## Flag jobs whose elapsed time went beyond their queue's wall-clock limit
## (NA when the queue has no known limit).
jobSummaries <- jobSummaries %>% mutate(exceeded_queue_limit = exec_time_hours > queueLimit)

write.delim(jobSummaries, file = paste0(reportName, ".jobSummaries.txt"))
# jobSummaries <- read.delim("jobSummaries.txt")

## Print a per-job overview table (times in hours) via knitr::kable.
## Fixed typo: the loader is require_auto (as used for lubridate), not require_auit.
require_auto(knitr)
jobSummaries %>%
    mutate(pending_time_hours = pending_time_min / 60) %>%
    select(jobid, exec_host, job_name, cpu_used_hours, pending_time_hours, exec_time_hours) %>%
    kable()


#######################################################################################################################
## create warning email if jobs died
## todo finish send mail if wall time was exceeded

## Jobs whose elapsed time exceeded their queue limit are reported as having died
## because of that limit.
killedJobs <- filter(jobSummaries, exceeded_queue_limit)
numKilled <- nrow(killedJobs)
numTotal <- nrow(jobSummaries)

killedListFile <- paste0(reportName, ".killed_jobs.txt")
if (numKilled > 0) {
    killMsg <- paste(numKilled, "out of", numTotal, "jobs in", getwd(), "died because of queue length limitation")
    # shQuote keeps working directories containing quotes/spaces from breaking the shell command
    system(paste("mailme", shQuote(killMsg)))
    # jobid is a factor; writeLines() only accepts character vectors
    writeLines(as.character(killedJobs$jobid), con = killedListFile)
} else {
    ## Create an empty killed list to indicate that we actually looked into it
    file.create(killedListFile)
}