Newer
Older
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>'
import argparse
import zmq
import os
import logging
import sys
import json
import trace
# -------------------------- class: TaskProvider --------------------------------------
def __init__ (self, eventDetectorConfig, requestFwPort, routerPort, context = None):
# configType : ... ,
# monDir : ... ,
# monSubdirs : ... ,
# monSuffixes : ... ,
print "eventDetectorConfig", eventDetectorConfig
self.log.debug("TaskProvider: __init__()")
self.eventDetector = None
self.config = eventDetectorConfig
self.localhost = "127.0.0.1"
self.extIp = "0.0.0.0"
self.requestFwPort = requestFwPort
self.log.debug("Registering ZMQ context")
# remember if the context was created outside this class or not
if context:
self.context = context
self.extContext = True
else:
self.context = zmq.Context()
self.extContext = False
if self.config.has_key("configType") and self.config["configType"] == "inotifyx":
from InotifyxDetector import InotifyxDetector as EventDetector
# check format of config
if ( not self.config.has_key("monDir") or
not self.config.has_key("monEventType") or
not self.config.has_key("monSubdirs") or
not self.config.has_key("monSuffixes") ):
self.log.error ("Configuration of wrong format")
#TODO forward self.config instead of seperate variables
self.eventDetector = EventDetector(self.config)
else:
self.log.error("Type of event detector is not supported: " + str( self.config["configType"] ))
return -1
self.createSockets()
try:
self.run()
except KeyboardInterrupt:
self.log.debug("Keyboard interruption detected. Shuting down")
except:
trace = traceback.format_exc()
self.log.info("Stopping TaskProvider due to unknown error condition.")
self.log.debug("Error was: " + str(trace))
def getLogger (self):
logger = logging.getLogger("TaskProvider")
return logger
def createSockets (self):
# socket to get requests
self.requestFwSocket = self.context.socket(zmq.REQ)
connectionStr = "tcp://{ip}:{port}".format( ip=self.localhost, port=self.requestFwPort )
try:
self.requestFwSocket.connect(connectionStr)
self.log.info("Start requestFwSocket (connect): '" + str(connectionStr) + "'")
except Exception as e:
self.log.error("Failed to start requestFwSocket (connect): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
# socket to disribute the events to the worker
self.routerSocket = self.context.socket(zmq.PUSH)
connectionStr = "tcp://{ip}:{port}".format( ip=self.localhost, port=self.routerPort )
self.routerSocket.bind(connectionStr)
self.log.info("Start to routeributing socket (bind): '" + str(connectionStr) + "'")
self.log.error("Failed to start routeributing Socket (bind): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
def run (self):
while True:
try:
# the event for a file /tmp/test/source/local/file1.tif is of the form:
# {
# "sourcePath" : "/tmp/test/source/"
# "relativePath": "local"
# "filename" : "file1.tif"
# }
workloadList = self.eventDetector.getNewEvent()
except Exception, e:
self.log.error("Invalid fileEvent message received.")
self.log.debug("Error was: " + str(e))
#skip all further instructions and continue with next iteration
continue
#TODO validate workload dict
for workload in workloadList:
# get requests for this event
try:
self.log.debug("Get requests...")
self.requestFwSocket.send("")
requests = pickle.loads(self.requestFwSocket.recv())
self.log.debug("Get requests... done.")
self.log.debug("Requests: " + str(requests))
except:
self.log.error("Get Requests... failed.")
trace = traceback.format_exc()
self.log.debug("Error was: " + str(trace))
# build message dict
try:
self.log.debug("Building message dict...")
messageDict = json.dumps(workload) #sets correct escape characters
self.log.debug("Building message dict...done.")
except Exception, e:
self.log.error("Unable to assemble message dict.")
self.log.debug("Error was: " + stri(e))
continue
# send the file to the fileMover
try:
self.log.debug("Sending message...")
message = [messageDict]
if requests != ["None"]:
message.append(pickle.dumps(requests))
self.routerSocket.send_multipart(message)
self.log.debug("Sending message...done.")
except Exception, e:
self.log.error("Sending message...failed.")
self.log.debug("Error was: " + str(e))
def stop(self):
if self.routerSocket:
self.routerSocket.close(0)
self.routerSocket = None
if self.requestFwSocket:
self.requestFwSocket.close(0)
self.requestFwSocket = None
if not self.extContext and self.context:
self.context.destroy()
self.context = None
def __exit__(self):
self.stop()
def __del__(self):
self.stop()
if __name__ == '__main__':
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) )))
SRC_PATH = BASE_PATH + os.sep + "src"
sys.path.append ( SRC_PATH )
import shared.helperScript as helperScript
import time
class requestResponder():
def __init__ (self, requestFwPort, context = None):
self.context = context or zmq.Context.instance()
self.requestFwSocket = self.context.socket(zmq.REP)
connectionStr = "tcp://127.0.0.1:" + requestFwPort
self.requestFwSocket.bind(connectionStr)
logging.info("[requestResponder] requestFwSocket started (bind) for '" + connectionStr + "'")
self.run()
def run (self):
logging.info("[requestResponder] Start run")
openRequests = [['zitpcx19282:6003', 1], ['zitpcx19282:6004', 0]]
while True:
request = self.requestFwSocket.recv()
logging.debug("[requestResponder] Received request: " + str(request) )
self.requestFwSocket.send(pickle.dumps(openRequests))
logging.debug("[requestResponder] Answer: " + str(openRequests) )
def __exit__(self):
self.requestFwSocket.close(0)
self.context.destroy()
helperScript.initLogging("/space/projects/live-viewer/logs/signalHandler.log", verbose=True, onScreenLogLevel="debug")
eventDetectorConfig = {
"configType" : "inotifyx",
"monDir" : "/space/projects/live-viewer/data/source",
"monEventType" : "IN_CLOSE_WRITE",
"monSubdirs" : ["commissioning", "current", "local"],
"monSuffixes" : [".tif", ".cbf"]
}
taskProviderPr = Process ( target = TaskProvider, args = (eventDetectorConfig, requestFwPort, routerPort) )
requestResponderPr = Process ( target = requestResponder, args = ( requestFwPort, ) )
requestResponderPr.start()
context = zmq.Context.instance()
routerSocket = context.socket(zmq.PULL)
connectionStr = "tcp://localhost:" + routerPort
routerSocket.connect(connectionStr)
logging.info("=== routerSocket connected to " + connectionStr)
workload = routerSocket.recv_multipart()
logging.info("=== next workload " + str(workload))
except KeyboardInterrupt:
pass
finally:
requestResponderPr.terminate()
taskProviderPr.terminate()