Skip to content
Snippets Groups Projects
Commit bec5b4d2 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Tested + Fixed TaskProvider

parent 0de834bf
No related branches found
No related tags found
No related merge requests found
......@@ -79,8 +79,9 @@ class InotifyxDetector():
checkPassed = True
if ( not config.has_key("monDir") or
not config.has_key("monEventType") or
not config.has_key("monDefSubdirs") or
not config.has_key("monSubdirs") or
not config.has_key("monSuffixes") ):
print "config", config
self.log.error ("Configuration of wrong format")
checkPassed = False
......
......@@ -53,9 +53,9 @@ class SignalHandler():
self.whiteList.append(host)
# sockets
self.comSocket = None
self.signalFwSocket = None
self.requestSocket = None
self.comSocket = None
self.requestFwSocket = None
self.requestSocket = None
self.log = self.getLogger()
self.log.debug("Init")
......@@ -91,13 +91,13 @@ class SignalHandler():
# setting up router for load-balancing worker-processes.
# each worker-process will handle a file event
self.signalFwSocket = self.context.socket(zmq.REP)
self.requestFwSocket = self.context.socket(zmq.REP)
connectionStr = "tcp://{ip}:{port}".format(ip=self.localhost, port=self.signalFwPort)
try:
self.signalFwSocket.bind(connectionStr)
self.log.debug("signalFwSocket started (bind) for '" + connectionStr + "'")
self.requestFwSocket.bind(connectionStr)
self.log.debug("requestFwSocket started (bind) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start signalFwSocket (bind): '" + connectionStr + "'")
self.log.error("Failed to start requestFwSocket (bind): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
# create socket to receive requests
......@@ -113,7 +113,7 @@ class SignalHandler():
# Poller to distinguish between start/stop signals and queries for the next set of signals
self.poller = zmq.Poller()
self.poller.register(self.comSocket, zmq.POLLIN)
self.poller.register(self.signalFwSocket, zmq.POLLIN)
self.poller.register(self.requestFwSocket, zmq.POLLIN)
self.poller.register(self.requestSocket, zmq.POLLIN)
......@@ -123,12 +123,12 @@ class SignalHandler():
while True:
socks = dict(self.poller.poll())
if self.signalFwSocket in socks and socks[self.signalFwSocket] == zmq.POLLIN:
if self.requestFwSocket in socks and socks[self.requestFwSocket] == zmq.POLLIN:
try:
incomingMessage = self.signalFwSocket.recv()
incomingMessage = self.requestFwSocket.recv()
if incomingMessage == "STOP":
self.signalFwSocket.send(incomingMessage)
self.requestFwSocket.send(incomingMessage)
time.sleep(0.1)
break
self.log.debug("New request for signals received.")
......@@ -140,11 +140,11 @@ class SignalHandler():
openRequests.append(tmp)
if openRequests:
self.signalFwSocket.send(str(openRequests))
self.requestFwSocket.send(str(openRequests))
self.log.debug("Answered to request: " + str(openRequests))
else:
openRequests = ["None"]
self.signalFwSocket.send(str(openRequests))
self.requestFwSocket.send(str(openRequests))
self.log.debug("Answered to request: " + str(openRequests))
except Exception, e:
self.log.error("Failed to receive/answer new signal requests.")
......@@ -319,7 +319,7 @@ class SignalHandler():
def stop (self):
self.log.debug("Closing sockets")
self.comSocket.close(0)
self.signalFwSocket.close(0)
self.requestFwSocket.close(0)
self.requestSocket.close(0)
......@@ -358,7 +358,7 @@ if __name__ == '__main__':
def __exit__(self):
self.requestFwSocket.close(0)
#self.context.destroy()
self.context.destroy()
# def __del__(self):
# self.requestFwSocket.close(0)
......
......@@ -24,6 +24,9 @@ class TaskProvider():
# monSuffixes : ... ,
#}
print "eventDetectorConfig", eventDetectorConfig
self.log = self.getLogger()
self.log.debug("TaskProvider: __init__()")
......@@ -95,7 +98,7 @@ class TaskProvider():
# socket to disribute the events to the worker
self.distrSocket = self.context.socket(zmq.PUSH)
connectionStr = "tcp://{ip}:{port}".format( ip=self.localhost, port=self.requestFwPort )
connectionStr = "tcp://{ip}:{port}".format( ip=self.localhost, port=self.distrPort )
try:
self.distrSocket.bind(connectionStr)
self.log.debug("Connecting to distributing socket (bind): " + str(connectionStr))
......@@ -148,8 +151,9 @@ class TaskProvider():
# send the file to the fileMover
try:
self.log.debug("Sending message...")
self.log.debug(str(messageDict))
self.distrSocket.send_multipart([messageDict, requests])
message = [messageDict] + requests
self.log.debug(str(message))
self.distrSocket.send_multipart(message)
self.log.debug("Sending message...done.")
except Exception, e:
self.log.error("Sending message...failed.")
......@@ -187,13 +191,40 @@ if __name__ == '__main__':
sys.path.append ( SRC_PATH )
import shared.helperScript as helperScript
import time
class requestResponder():
def __init__ (self, requestFwPort, context = None):
self.context = context or zmq.Context.instance()
self.requestFwSocket = self.context.socket(zmq.REP)
connectionStr = "tcp://127.0.0.1:" + requestFwPort
self.requestFwSocket.bind(connectionStr)
logging.info("[requestResponder] requestFwSocket started (bind) for '" + connectionStr + "'")
self.run()
def run (self):
logging.info("[requestResponder] Start run")
openRequests = ['zitpcx19282:6004', 'zitpcx19282:6005']
while True:
request = self.requestFwSocket.recv()
logging.debug("[requestResponder] Received request: " + str(request) )
self.requestFwSocket.send_multipart(openRequests)
logging.debug("[requestResponder] Answer: " + str(openRequests) )
def __exit__(self):
self.requestFwSocket.close(0)
self.context.destroy()
#enable logging
helperScript.initLogging("/space/projects/live-viewer/logs/signalHandler.log", verbose=True, onScreenLogLevel="debug")
eventDetectorConfig = {
"configType" : "inotifyx",
"monDir" : "/space/projects/live-viewer/data/src",
"monDir" : "/space/projects/live-viewer/data/source",
"monEventType" : "IN_CLOSE_WRITE",
"monSubdirs" : ["commissioning", "current", "local"],
"monSuffixes" : [".tif", ".cbf"]
......@@ -205,4 +236,26 @@ if __name__ == '__main__':
taskProviderPr = Process ( target = TaskProvider, args = (eventDetectorConfig, requestFwPort, distrPort) )
taskProviderPr.start()
taskProviderPr.join()
requestResponderPr = Process ( target = requestResponder, args = ( requestFwPort, ) )
requestResponderPr.start()
context = zmq.Context.instance()
distrSocket = context.socket(zmq.PULL)
connectionStr = "tcp://localhost:" + distrPort
distrSocket.connect(connectionStr)
logging.info("=== distrSocket connected to " + connectionStr)
try:
while True:
workload = distrSocket.recv_multipart()
logging.info("=== next workload " + str(workload))
except KeyboardInterrupt:
pass
finally:
requestResponderPr.terminate()
taskProviderPr.terminate()
distrSocket.close(0)
context.destroy()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment