From 7a3e433588bd0fe3079b2af44aeeb71a42a60012 Mon Sep 17 00:00:00 2001 From: Manuela Kuhn <manuela.kuhn@desy.de> Date: Fri, 17 Jul 2015 14:32:02 +0200 Subject: [PATCH] Added test programs --- ZeroMQTunnel/named_pipe.py | 17 +++ ZeroMQTunnel/startZeromq.bash | 210 ++++++++++++++++++++++++++++++++++ ring_buffer_test.py | 60 ++++++++++ 3 files changed, 287 insertions(+) create mode 100644 ZeroMQTunnel/named_pipe.py create mode 100755 ZeroMQTunnel/startZeromq.bash create mode 100644 ring_buffer_test.py diff --git a/ZeroMQTunnel/named_pipe.py b/ZeroMQTunnel/named_pipe.py new file mode 100644 index 00000000..8dbb7642 --- /dev/null +++ b/ZeroMQTunnel/named_pipe.py @@ -0,0 +1,17 @@ +import os, time + +pipe_path = "/tmp/mypipe" +if not os.path.exists(pipe_path): + os.mkfifo(pipe_path) +# Open the fifo. We need to open in non-blocking mode or it will stalls until +# someone opens it for writting +pipe_fd = os.open(pipe_path, os.O_RDONLY | os.O_NONBLOCK) +with os.fdopen(pipe_fd) as pipe: + while True: + message = pipe.read() + if message: + print("Received: '%s'" % message) + pathnames = message.splitlines() + print pathnames + print("Doing other stuff") + time.sleep(5) diff --git a/ZeroMQTunnel/startZeromq.bash b/ZeroMQTunnel/startZeromq.bash new file mode 100755 index 00000000..07375c03 --- /dev/null +++ b/ZeroMQTunnel/startZeromq.bash @@ -0,0 +1,210 @@ +#!/usr/bin/env bash + + +#defaults values. will be overwritten by argument parsing if option has been set +SOURCE_DIR="/space/projects/Live_Viewer/source/" # directory which will be watched for new files. +#SOURCE_DIR="/tmp/data" # directory which will be watched for new files. +TARGET_IP="127.0.0.1" # where new files will be send to via network +TARGET_PORT="6061" # where new files will be send to via network +PARALLEL_DATA_STREAMS=1 # number of parallel streams used to send data to target +PYTHON_CMD=$(which python) # executable of python +LOG_DIR="/space/projects/Live_Viewer/logs" # place to put logfiles +#LOG_DIR="/tmp/log" # place to put logfiles +FILE_MOVER_IP="127.0.0.1" # ip-adress of fileMover process, it is waiting for new file events +FILE_MOVER_PORT="6060" # port of fileMover process, it is waiting for new file events +VERBOSE=0 # set to 1 for more verbose output +PID_WATCHFOLDER="" +PID_FILEMOVER="" + +startWatchFolder() +{ + echo "starting 'watcher_lsyncd.py'..." + + if [ "${VERBOSE}" == "0" ] + then + ${PYTHON_CMD} watcher_lsyncd.py --watchFolder="${SOURCE_DIR}" \ + --logfilePath="${LOG_DIR}" \ + --pushServerIp="${FILE_MOVER_IP}" \ + --pushServerPort=${FILE_MOVER_PORT} \ + --verbose 1>/dev/null 2>/dev/null & + else + ${PYTHON_CMD} watcher_lsyncd.py --watchFolder="${SOURCE_DIR}" \ + --logfilePath="${LOG_DIR}" \ + --pushServerIp="${FILE_MOVER_IP}" \ + --pushServerPort=${FILE_MOVER_PORT} 1>/dev/null 2>/dev/null & + fi + local PID=${!} + echo "starting 'watcher_lsyncd.py'... started as background job with PID=${PID}" + PID_WATCHFOLDER=${PID} + + #to kill single process + #kill -s 24 ${PID} + +} + + +startFileMover() +{ + + echo "starting 'fileMover.py'..." + if [ "${VERBOSE}" == "0" ] + then + ${PYTHON_CMD} fileMover.py --logfilePath="${LOG_DIR}" \ + --bindingIpForSocket="127.0.0.1" \ + --logfileName="fileMover.log" \ + --parallelDataStreams ${PARALLEL_DATA_STREAMS} \ + --dataStreamIp="${TARGET_IP}" \ + --dataStreamPort="${TARGET_PORT}" 1>/dev/null 2>/dev/null & + else + ${PYTHON_CMD} fileMover.py --logfilePath="${LOG_DIR}" \ + --bindingIpForSocket="127.0.0.1" \ + --logfileName="fileMover.log" \ + --parallelDataStreams ${PARALLEL_DATA_STREAMS} \ + --dataStreamIp="${TARGET_IP}" \ + --dataStreamPort="${TARGET_PORT}" \ + --verbose 1>/dev/null 2>/dev/null & + fi + local PID=${!} + echo "starting 'fileMover.py'... started as background job with PID=${PID}" + PID_FILEMOVER=${PID} + + #to kill process + #kill -s 24 ${PID} +} + + +printHelpMessage() +{ + echo "" +} + + +printOptions() +{ + printf "\ +SOURCE_DIR = %s \n\ +TARGET_IP = %s \n\ +TARGET_PORT = %s \n\ +PARALLEL_DATA_STREAMS = %s \n\ +PYTHON_CMD = %s \n\ +LOG_DIR = %s \n\ +FILE_MOVER_IP = %s \n\ +FILE_MOVER_PORT = %s \n\ +VERBOSE = %s \n\ +" "${SOURCE_DIR}" \ + "${TARGET_IP}" \ + "${TARGET_PORT}" \ + "${PARALLEL_DATA_STREAMS}" \ + "${PYTHON_CMD}" \ + "${LOG_DIR}" \ + "${FILE_MOVER_IP}" \ + "${FILE_MOVER_PORT}" \ + "${VERBOSE}" + +} + + + +argumentParsing() +{ + local ARGUMENT_COUNT=$# + + +# if [ $ARGUMENT_COUNT -lt 1 ] +# then +# printHelpMessage +# exit 0 +# fi + + while [[ $# > 1 ]] +# while true + do + key="$1" + + case $key in + -s|--parallelDataStreams) + PARALLEL_DATA_STREAMS="$2" + shift + ;; + -d|--sourceDirectory) + SOURCE_DIR="$2" + shift + ;; + -l|--logDir) + LOG_DIR="$2" + shift + ;; + -fmi|--fileMoverIp) + FILE_MOVER_IP="$2" + shift + ;; + -fmp|--fileMoverPort) + FILE_MOVER_PORT="$2" + shift + ;; + -v|--verbose) + VERBOSE=1 + shift + ;; + *) + # unknown option + ;; + esac + shift + done + + return 0 +} + + + +checkDependencies() +{ + local FAILED_DEPENDENCIES=0 + #watchdog + echo "import watchdog" | python 1> /dev/null 2> /dev/null + local DEPENDENCY_WATCHDOG=${?} + if [ "${DEPENDENCY_WATCHDOG}" -ne "0" ] + then + echo "Missing python library: watchdog" + FAILED_DEPENDENCIES=1 + fi + + #zeromq + echo "import zmq" | python 1> /dev/null 2> /dev/null + local DEPENDENCY_WATCHDOG=${?} + if [ "${DEPENDENCY_WATCHDOG}" -ne "0" ] + then + echo "Missing python library: zmq" + FAILED_DEPENDENCIES=1 + fi + + #exit on error + if [ ${FAILED_DEPENDENCIES} == 1 ] + then + echo "aborting. missing libraries detected." + exit 1 + fi +} + + + +argumentParsing $@ + +checkDependencies + +printOptions + +startWatchFolder + +startFileMover + + +echo +echo "You can now take data in '${SOURCE_DIR}'" +echo + +#get process group of all sub-processed started during the script +PROCESS_GROUP_ID=$(ps x -o "%p %r %c" | grep ${PID_FILEMOVER} | awk '{print $2}' | head -n1) +echo "to stop execute: kill -- -${PROCESS_GROUP_ID}" +echo diff --git a/ring_buffer_test.py b/ring_buffer_test.py new file mode 100644 index 00000000..1e6c37af --- /dev/null +++ b/ring_buffer_test.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python + +#http://stackoverflow.com/questions/168409/how-do-you-get-a-directory-listing-sorted-by-creation-date-in-python + +from stat import S_ISREG, ST_MTIME, ST_MODE +import os, sys, time +import bisect + +RING_BUFFER_SIZE = 5 + +# path to the directory (relative or absolute) +dirpath = "/space/projects/Live_Viewer/target/local" + +# get all entries in the directory +entries = (os.path.join(dirpath, fn) for fn in os.listdir(dirpath)) +# get the corresponding stats +entries = ((os.stat(path), path) for path in entries) + +# leave only regular files, insert modification date +entries = [[stat[ST_MTIME], path] + for stat, path in entries if S_ISREG(stat[ST_MODE])] + +entries = sorted(entries, reverse=True) +len_entries = len(entries) +print entries + +#print entries +#print len_entries + +#targetFilepath = "/space/projects/Live_Viewer/test.tif" +#entries[:0] = [[os.stat(path)[ST_MTIME], targetFilepath]] +#print "after prepend" +#print entries + +filename = "/space/projects/Live_Viewer/test.tif" +#fileModTime = os.stat(filename)[ST_MTIME] +fileModTime = 1436956680 + +entries[:0] = [[fileModTime, filename]] +print "after insort" +print entries + +entries = sorted(entries, reverse=True) +print "sort again" +print entries + + +if len_entries > RING_BUFFER_SIZE: + print "files to remove" + for mod_time, path in entries[RING_BUFFER_SIZE:]: + print mod_time, path + pass +# os.remove(path) +# entries.remove([mod_time, path]) + + +print +print "content" +for cdate, path in entries: + print time.ctime(cdate), os.path.basename(path) -- GitLab