Skip to content
Snippets Groups Projects
Commit 71422ade authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Fixed DataDispatcher

parent e10d87f5
No related branches found
No related tags found
No related merge requests found
......@@ -8,9 +8,11 @@ import sys
import logging
import traceback
import json
import pickle
#path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
SHARED_PATH = os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) ) ) + os.sep + "shared"
print SHARED_PATH
if not SHARED_PATH in sys.path:
sys.path.append ( SHARED_PATH )
......@@ -23,17 +25,17 @@ import helperScript
#
class DataDispatcher():
def __init__(self, id, dataStreamPort, chunkSize, cleanerPort,
useDataStream, context = None):
def __init__(self, id, routerPort, chunkSize, useDataStream, context = None):
self.log = self.getLogger()
self.id = id
self.log = self.getLogger()
self.log.debug("DataDispatcher Nr. " + str(self.id) + " started.")
self.id = id
self.localhost = "127.0.0.1"
self.extIp = "0.0.0.0"
self.routerPort = routerPort
self.chunkSize = chunkSize
self.localhost = "127.0.0.1"
self.extIp = "0.0.0.0"
self.routerPort = routerPort
self.chunkSize = chunkSize
self.routerSocket = None
......@@ -62,7 +64,7 @@ class DataDispatcher():
def createSockets(self):
self.routerSocket = self.context.socket(zmq.PULL)
connectionStr = "tcp://{ip}:{port}".format( ip=self.localhost, port=self.routerPort )
routerSocket.connect(connectionStr)
self.routerSocket.connect(connectionStr)
self.log.info("Start routerSocket (connect): '" + str(connectionStr) + "'")
......@@ -102,7 +104,7 @@ class DataDispatcher():
if len(message) >= 2:
workload = message[0]
targets = message[1:]
targets = pickle.loads(message[1])
else:
workload = message
targets = None
......@@ -117,7 +119,7 @@ class DataDispatcher():
self.log.debug("Getting file metadata")
sourceFile, metadata = self.getMetadata(workload)
except Exception as e:
self.log.error("Building of metadata dictionary failed for file: " + str(sourceFile) + ".")
self.log.error("Building of metadata dictionary failed for workload: " + str(workload) + ".")
self.log.debug("Error was: " + str(e))
#skip all further instructions and continue with next iteration
continue
......@@ -138,7 +140,7 @@ class DataDispatcher():
self.log.debug ("IOError: " + str(sourceFile))
except Exception, e:
trace = traceback.format_exc()
self.log.debug("Unable to remove file {FILE}.".format(FILE=str(sourceFile))
self.log.debug("Unable to remove file {FILE}.".format(FILE=str(sourceFile)))
self.log.debug("trace=" + str(trace))
else:
# move file
......@@ -149,7 +151,7 @@ class DataDispatcher():
self.log.debug ("IOError: " + str(sourceFile))
except Exception, e:
trace = traceback.format_exc()
self.log.debug("Unable to move file {FILE}.".format(FILE=str(sourceFile))
self.log.debug("Unable to move file {FILE}.".format(FILE=str(sourceFile)))
self.log.debug("trace=" + str(trace))
......@@ -227,7 +229,7 @@ class DataDispatcher():
# }
metadata[ "filesize" ] = filesize
metadata[ "fileModificationTime" ] = fileModificationTime
metadata[ "chunkSize" ] = self.zmqMessageChunkSize
metadata[ "chunkSize" ] = self.chunkSize
self.log.debug("metadata = " + str(metadata))
except Exception as e:
......@@ -253,12 +255,12 @@ class DataDispatcher():
self.log.debug("Passing multipart-message for file " + str(sourceFilepath) + "...")
chunkNumber = 0
stillChunksToRead = True
payloadAll = [json.dumps(payloadMetadata.copy())]
payloadAll = [json.dumps(metadata.copy())]
while stillChunksToRead:
chunkNumber += 1
#read next chunk from file
fileContent = fileDescriptor.read(self.zmqMessageChunkSize)
fileContent = fileDescriptor.read(self.chunkSize)
#detect if end of file has been reached
if not fileContent:
......@@ -269,7 +271,7 @@ class DataDispatcher():
break
#assemble metadata for zmq-message
chunkPayloadMetadata = payloadMetadata.copy()
chunkPayloadMetadata = metadata.copy()
chunkPayloadMetadata["chunkNumber"] = chunkNumber
chunkPayloadMetadataJson = json.dumps(chunkPayloadMetadata)
chunkPayload = []
......@@ -288,23 +290,26 @@ class DataDispatcher():
# streaming data
#TODO priority
for target in targets:
for target, prio in targets:
# socket already known
if target in self.openConnections:
# send data
self.connections[target].send_multipart(chunkPayload, zmq.NOBLOCK)
self.openConnections[target].send_multipart(chunkPayload, zmq.NOBLOCK)
self.log.info("Sending message part from file " + str(sourceFilepath) + " to " + target)
# socket not known
else:
# open socket
socket = self.zmqContextForWorker.socket(zmq.PUSH)
socket = self.context.socket(zmq.PUSH)
connectionStr = "tcp://" + str(target)
socket.connect(connectionStr)
self.log.info("Start socket (connect): '" + str(connectionStr) + "'")
# register socket
self.openConnections[target] = socket
# send data
self.connections[target].send_multipart(chunkPayload, zmq.NOBLOCK)
self.openConnections[target].send_multipart(chunkPayload, zmq.NOBLOCK)
self.log.info("Sending message part from file " + str(sourceFilepath) + " to " + target)
#close file
......@@ -355,4 +360,55 @@ class DataDispatcher():
self.stop()
if __name__ == '__main__':
pass
from multiprocessing import Process
import time
from shutil import copyfile
#enable logging
helperScript.initLogging("/space/projects/live-viewer/logs/dataDispatcher.log", verbose=True, onScreenLogLevel="debug")
copyfile("/space/projects/live-viewer/data/source/local/raw/1.cbf", "/space/projects/live-viewer/data/source/local/raw/100.cbf")
time.sleep(0.5)
routerPort = "7000"
receivingPort = "6005"
chunkSize = 10485760 ; # = 1024*1024*10 = 10 MiB
useDataStream = False
dataDispatcherPr = Process ( target = DataDispatcher, args = ( 1, routerPort, chunkSize, useDataStream) )
dataDispatcherPr.start()
context = zmq.Context.instance()
routerSocket = context.socket(zmq.PUSH)
connectionStr = "tcp://127.0.0.1:" + routerPort
routerSocket.bind(connectionStr)
logging.info("=== routerSocket connected to " + connectionStr)
receivingSocket = context.socket(zmq.PULL)
connectionStr = "tcp://0.0.0.0:" + receivingPort
receivingSocket.bind(connectionStr)
logging.info("=== receivingSocket connected to " + connectionStr)
message = ['{"sourcePath": "/space/projects/live-viewer/data/source", "relativePath": "/local/raw", "filename": "100.cbf"}', "(lp0\n(lp1\nS'zitpcx19282:6005'\np2\naI2\naa."]
time.sleep(1)
workload = routerSocket.send_multipart(message)
logging.info("=== send message")
try:
recv_message = receivingSocket.recv_multipart()
logging.info("=== received: " + str(recv_message[0]))
except KeyboardInterrupt:
dataDispatcherPr.terminate()
routerSocket.close(0)
context.destroy()
finally:
dataDispatcherPr.terminate()
routerSocket.close(0)
context.destroy()
......@@ -224,7 +224,7 @@ if __name__ == '__main__':
self.context.destroy()
#enable logging
helperScript.initLogging("/space/projects/live-viewer/logs/signalHandler.log", verbose=True, onScreenLogLevel="debug")
helperScript.initLogging("/space/projects/live-viewer/logs/taskProvider.log", verbose=True, onScreenLogLevel="debug")
eventDetectorConfig = {
"configType" : "inotifyx",
......@@ -243,10 +243,10 @@ if __name__ == '__main__':
requestResponderPr = Process ( target = requestResponder, args = ( requestFwPort, ) )
requestResponderPr.start()
context = zmq.Context.instance()
context = zmq.Context.instance()
routerSocket = context.socket(zmq.PULL)
connectionStr = "tcp://localhost:" + routerPort
routerSocket = context.socket(zmq.PULL)
connectionStr = "tcp://localhost:" + routerPort
routerSocket.connect(connectionStr)
logging.info("=== routerSocket connected to " + connectionStr)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment