Skip to content
Snippets Groups Projects
Commit 5ee8f9f3 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Renamed LambdaDetector to ZmqDetector

parent de388ba2
No related branches found
No related tags found
No related merge requests found
......@@ -8,9 +8,10 @@ localTarget = /space/projects/zeromq-data-transfer/data/target
#localTarget = /home/kuhnm/Arbeit/zeromq-data-transfer/data/target
#localTarget = /gpfs
# Type of event detector to use (options are: inotifyx, watchdog)
# Type of event detector to use (options are: inotifyx, watchdog, zmq)
#eventDetectorType = inotifyx
eventDetectorType = watchdog
#eventDetectorType = zmq
# Event type of files to be monitored (options are: IN_CLOSE_WRITE, IN_MOVED_TO, ...)
monitoredEventType = IN_CLOSE_WRITE
#monitoredEventType = IN_MOVED_TO
......@@ -39,16 +40,19 @@ fixedStreamPort = 50100
# Port number to receive signals from
comPort = 50000
# ZMQ port to forward requests
requestFwPort = 50002
# ZMQ port to get new requests
requestPort = 50001
# ZMQ port to forward requests
requestFwPort = 50002
# ZMQ port to get events from (only needed if eventDetectorType is zmq)
eventPort = 50003
# ZMQ-router port which coordinates the load-balancing to the worker-processes
routerPort = 50003
routerPort = 50004
# ZMQ-pull-socket port which deletes/moves given files
cleanerPort = 50004
cleanerPort = 50005
# Chunk size of file-parts getting send via zmq
chunkSize = 10485760 ; # = 1024*1024*10
......
......@@ -4,9 +4,10 @@ monitoredDir = D:\zeromq-data-transfer\data\source
# Target to move the files into
localTarget = D:\zeromq-data-transfer\data\target
# Type of event detector to use (options are: inotifyx, watchdog)
# Type of event detector to use (options are: inotifyx, watchdog, zmq)
#eventDetectorType = inotifyx
eventDetectorType = watchdog
#eventDetectorType = zmq
# Event type of files to be monitored (options are: IN_CLOSE_WRITE, IN_MOVED_TO, ...)
monitoredEventType = IN_CLOSE_WRITE
# Subdirectories of watchDir to be monitored
......@@ -33,16 +34,19 @@ fixedStreamPort = 50100
# Port number to receive signals from
comPort = 50000
# ZMQ port to forward requests
requestFwPort = 50002
# ZMQ port to get new requests
requestPort = 50001
# ZMQ port to forward requests
requestFwPort = 50002
# ZMQ port to get events from (only needed if eventDetectorType is zmq)
eventPort = 50003
# ZMQ-router port which coordinates the load-balancing to the worker-processes
routerPort = 50003
routerPort = 50004
# ZMQ-pull-socket port which deletes/moves given files
cleanerPort = 50004
cleanerPort = 50005
# Chunk size of file-parts getting send via zmq
chunkSize = 10485760 ; # = 1024*1024*10
......
......@@ -62,6 +62,7 @@ def argumentParsing():
numberOfStreams = config.get('asection', 'numberOfStreams')
chunkSize = int(config.get('asection', 'chunkSize'))
eventPort = config.get('asection', 'eventPort')
routerPort = config.get('asection', 'routerPort')
localTarget = config.get('asection', 'localTarget')
......@@ -139,6 +140,10 @@ def argumentParsing():
help = "Chunk size of file-parts getting send via ZMQ (default=" + str(chunkSize) + ")",
default = chunkSize )
parser.add_argument("--eventPort" , type = str,
help = "ZMQ port to get events from (only needed if eventDetectorType is zmq; \
default=" + str(eventPort) + ")",
default = routerPort )
parser.add_argument("--routerPort" , type = str,
help = "ZMQ-router port which coordinates the load-balancing \
to the worker-processes (default=" + str(routerPort) + ")",
......@@ -161,7 +166,7 @@ def argumentParsing():
onScreen = arguments.onScreen
eventDetectorType = arguments.eventDetectorType.lower()
supportedEDTypes = ["inotifyx", "watchdog"]
supportedEDTypes = ["inotifyx", "watchdog", "zmq"]
monitoredDir = str(arguments.monitoredDir)
monitoredSubdirs = arguments.monitoredSubdirs
localTarget = str(arguments.localTarget)
......@@ -245,11 +250,12 @@ class DataManager():
"monSuffixes" : arguments.monitoredFormats,
"timeTillClosed" : arguments.timeTillClosed
}
elif arguments.eventDetectorType == "lambda":
elif arguments.eventDetectorType == "zmq":
self.eventDetectorConfig = {
"eventPort" : "6001", #arguments.eventPort,
"eventDetectorType" : arguments.eventDetectorType,
"eventPort" : arguments.eventPort,
"numberOfStreams" : arguments.numberOfStreams,
"context" : None,
"context" : None, #TODO
}
......
......@@ -8,7 +8,7 @@ import cPickle
from logutils.queue import QueueHandler
class LambdaDetector():
class ZmqDetector():
def __init__(self, config, logQueue):
......@@ -50,7 +50,7 @@ class LambdaDetector():
def getLogger (self, queue):
# Create log and set handler to queue handle
h = QueueHandler(queue) # Just the one handler needed
logger = logging.getLogger("lambdaDetector")
logger = logging.getLogger("zmqDetector")
logger.propagate = False
logger.addHandler(h)
logger.setLevel(logging.DEBUG)
......@@ -122,7 +122,7 @@ if __name__ == '__main__':
import helpers
logfile = BASE_PATH + os.sep + "logs" + os.sep + "lambdaDetector.log"
logfile = BASE_PATH + os.sep + "logs" + os.sep + "zmqDetector.log"
logsize = 10485760
logQueue = Queue(-1)
......@@ -144,14 +144,14 @@ if __name__ == '__main__':
eventPort = "6001"
numberOfStreams = 4
config = {
"eventDetectorType" : "lambda",
"eventDetectorType" : "zmq",
"eventPort" : eventPort,
"numberOfStreams" : numberOfStreams,
"context" : None,
}
eventDetector = LambdaDetector(config, logQueue)
eventDetector = ZmqDetector(config, logQueue)
sourceFile = BASE_PATH + os.sep + "test_file.cbf"
targetFileBase = BASE_PATH + os.sep + "data" + os.sep + "source" + os.sep + "local" + os.sep + "raw" + os.sep
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment