Skip to content
Snippets Groups Projects
Commit 546ce4af authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Added debug information

parent 4e32914b
No related branches found
No related tags found
No related merge requests found
......@@ -8,6 +8,7 @@ sys.path.append ( ZMQ_PATH )
import helperScript
#LOCAL_IP= "0.0.0.0"
LOCAL_IP= "127.0.0.1"
class defaultConfigSender():
......@@ -24,8 +25,8 @@ class defaultConfigSender():
# zmq endpoint (port) to send file events to
fileEventPort = "6060"
# ip of dataStream-socket to push new files to
dataStreamIp = LOCAL_IP
# dataStreamIp = "131.169.185.121" # zitpcx19282.desy.de
# dataStreamIp = LOCAL_IP
dataStreamIp = "0.0.0.0"
# port number of dataStream-socket to push new files to
dataStreamPort = "6061"
# zmq-pull-socket ip which deletes/moves given files
......@@ -65,10 +66,10 @@ class defaultConfigReceiver():
# where incoming data will be stored to"
targetDir = "/space/projects/live-viewer/data/zmq_target"
# local ip to bind dataStream to
dataStreamIp = LOCAL_IP
# dataStreamIp = "*"
# local ip to connect dataStream to
# dataStreamIp = LOCAL_IP
# dataStreamIp = "131.169.55.170" # lsdma-lab04.desy.de
dataStreamIp = "131.169.185.121" # zitpcx19282.desy.de
# tcp port of data pipe"
dataStreamPort = "6061"
# local ip to bind LiveViewer to
......
......@@ -73,9 +73,10 @@ class FileReceiver:
# create pull socket
self.zmqDataStreamSocket = self.zmqContext.socket(zmq.PULL)
connectionStrZmqSocket = "tcp://{ip}:{port}".format(ip=self.zmqDataStreamIp, port=self.zmqDataStreamPort)
self.zmqDataStreamSocket.connect(connectionStrZmqSocket)
self.log.debug("zmqDataStreamSocket started (bind) for '" + connectionStrZmqSocket + "'")
connectionStrDataStreamSocket = "tcp://{ip}:{port}".format(ip=self.zmqDataStreamIp, port=self.zmqDataStreamPort)
print "connectionStrDataSTreamSocket", connectionStrDataStreamSocket
self.zmqDataStreamSocket.connect(connectionStrDataStreamSocket)
self.log.debug("zmqDataStreamSocket started (connect) for '" + connectionStrZmqSocket + "'")
self.exchangeSocket = self.zmqContext.socket(zmq.PAIR)
connectionStrExchangeSocket = "tcp://{ip}:{port}".format(ip=self.exchangeIp, port=self.exchangePort)
......@@ -86,6 +87,7 @@ class FileReceiver:
# time to wait for the sender to give a confirmation of the signal
self.senderComSocket.RCVTIMEO = self.socketResponseTimeout
connectionStrSenderComSocket = "tcp://{ip}:{port}".format(ip=self.senderComIp, port=self.senderComPort)
print "connectionStrSenderComSocket", connectionStrSenderComSocket
self.senderComSocket.connect(connectionStrSenderComSocket)
self.log.debug("senderComSocket started (connect) for '" + connectionStrSenderComSocket + "'")
......@@ -136,6 +138,7 @@ class FileReceiver:
#save all chunks to file
while receivingMessages:
multipartMessage = zmqDataStreamSocket.recv_multipart()
print "receiving multipart message from data pipe"
#extract multipart message
try:
......
......@@ -65,6 +65,7 @@ class WorkerProcess():
self.zmqDataStreamSocket = self.zmqContextForWorker.socket(zmq.PUSH)
connectionStrDataStreamSocket = "tcp://{ip}:{port}".format(ip=self.dataStreamIp, port=self.dataStreamPort)
print "connectionStrDataStreamSocket", connectionStrDataStreamSocket
self.zmqDataStreamSocket.bind(connectionStrDataStreamSocket)
self.log.debug("zmqDataStreamSocket started for '" + connectionStrDataStreamSocket + "'")
......@@ -198,9 +199,9 @@ class WorkerProcess():
#send remove-request to message pipe
try:
#sending to pipe
self.log.debug("send file-event to cleaner-pipe...")
self.log.debug("send file-event for file " + str(sourcePath) + str(relativePath) + str(filename) + " to cleaner-pipe...")
self.cleanerSocket.send(workload)
self.log.debug("send file-event to cleaner-pipe...success.")
self.log.debug("send file-event for file " + str(sourcePath) + str(relativePath) + str(filename) + " to cleaner-pipe...success.")
#TODO: remember workload. append to list?
# can be used to verify files which have been processed twice or more
......@@ -263,7 +264,7 @@ class WorkerProcess():
#send message
try:
self.log.debug("Passing multipart-message...")
self.log.info("Passing multipart-message for file " + str(sourceFilePathFull) + "...")
print "sending file: ", sourceFilePathFull
chunkNumber = 0
stillChunksToRead = True
......@@ -297,9 +298,9 @@ class WorkerProcess():
print "sending file: ", sourceFilePathFull, "done"
# self.zmqDataStreamSocket.send_multipart(multipartMessage)
self.log.debug("Passing multipart-message...done.")
self.log.info("Passing multipart-message for file " + str(sourceFilePathFull) + "...done.")
except Exception, e:
self.log.error("Unable to send multipart-message")
self.log.error("Unable to send multipart-message for file " + str(sourceFilePathFull))
self.log.debug("Error was: " + str(e))
self.log.info("Passing multipart-message...failed.")
raise Exception(e)
......@@ -435,14 +436,15 @@ class FileMover():
self.fileEventSocket = self.zmqContext.socket(zmq.PULL)
connectionStrFileEventSocket = "tcp://{ip}:{port}".format(ip=self.fileEventIp, port=self.fileEventPort)
self.fileEventSocket.bind(connectionStrFileEventSocket)
self.log.debug("fileEventSocket started for '" + connectionStrFileEventSocket + "'")
self.log.debug("fileEventSocket started (bind) for '" + connectionStrFileEventSocket + "'")
# create zmq socket for communitation with receiver
self.receiverComSocket = self.zmqContext.socket(zmq.REP)
connectionStrReceiverComSocket = "tcp://{ip}:{port}".format(ip=self.receiverComIp, port=self.receiverComPort)
print "connectionStrReceiverComSocket", connectionStrReceiverComSocket
self.receiverComSocket.bind(connectionStrReceiverComSocket)
self.log.debug("receiverComSocket started for '" + connectionStrReceiverComSocket + "'")
self.log.debug("receiverComSocket started (bind) for '" + connectionStrReceiverComSocket + "'")
# Poller to get either messages from the watcher or communication messages to stop sending data to the live viewer
self.poller = zmq.Poller()
......@@ -457,7 +459,7 @@ class FileMover():
self.routerSocket = self.zmqContext.socket(zmq.ROUTER)
connectionStrRouterSocket = "tcp://{ip}:{port}".format(ip=routerIp, port=routerPort)
self.routerSocket.bind(connectionStrRouterSocket)
self.log.debug("routerSocket started for '" + connectionStrRouterSocket + "'")
self.log.debug("routerSocket started (bind) for '" + connectionStrRouterSocket + "'")
def process(self):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment