Skip to content
Snippets Groups Projects
Commit acccecab authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Added senderComIp and Exceptions for socket-creation

parent 83b2858c
No related branches found
No related tags found
No related merge requests found
......@@ -14,6 +14,8 @@ liveViewerPort = 50021
# Port to exchange data and signals between receiver and coordinator
coordinatorExchangePort = 50020
# IP of dataStream-socket to send signals back to the sender
senderComIp = lsdma-lab04.desy.de
# Port number of dataStream-socket to send signals back to the sender
senderComPort = 50000
# Time to wait for the sender to give a confirmation of the signal
......
......@@ -30,6 +30,7 @@ def argumentParsing():
liveViewerIp = config.get('asection', 'liveViewerIp')
liveViewerPort = config.get('asection', 'liveViewerPort')
coordinatorExchangePort = config.get('asection', 'coordinatorExchangePort')
senderComIp = config.get('asection', 'senderComIp')
senderComPort = config.get('asection', 'senderComPort')
maxRingBufferSize = config.get('asection', 'maxRingBufferSize')
senderResponseTimeout = config.get('asection', 'senderResponseTimeout')
......@@ -53,6 +54,8 @@ def argumentParsing():
help="tcp port of live viewer (default=" + str(liveViewerPort) + ")")
parser.add_argument("--coordinatorExchangePort", type=str, default=coordinatorExchangePort,
help="port to exchange data and signals between receiver and coordinato (default=" + str(coordinatorExchangePort) + ")")
parser.add_argument("--senderComIp" , type=str, default=senderComIp,
help="port number of dataStream-socket to send signals back to the sender (default=" + str(senderComIp) + ")")
parser.add_argument("--senderComPort" , type=str, default=senderComPort,
help="port number of dataStream-socket to send signals back to the sender (default=" + str(senderComPort) + ")")
parser.add_argument("--maxRingBufferSize" , type=int, default=maxRingBufferSize,
......@@ -90,6 +93,7 @@ class ReceiverLiveViewer():
liveViewerIp = None
liveViewerPort = None
coordinatorExchangePort = None
senderComIp = None
senderComPort = None
maxRingBufferSize = None
senderResponseTimeout = None
......@@ -109,6 +113,7 @@ class ReceiverLiveViewer():
self.liveViewerIp = arguments.liveViewerIp
self.liveViewerPort = arguments.liveViewerPort
self.coordinatorExchangePort = arguments.coordinatorExchangePort
self.senderComIp = arguments.senderComIp
self.senderComPort = arguments.senderComPort
self.maxRingBufferSize = arguments.maxRingBufferSize
self.senderResponseTimeout = arguments.senderResponseTimeout
......@@ -119,8 +124,11 @@ class ReceiverLiveViewer():
#start file receiver
myWorker = FileReceiver(self.targetDir, self.dataStreamIp, self.dataStreamPort, self.liveViewerPort, self.liveViewerIp, self.coordinatorExchangePort, self.senderComPort, self.maxRingBufferSize, self.senderResponseTimeout)
myWorker = FileReceiver(self.targetDir,
self.senderComIp, self.senderComPort,
self.dataStreamIp, self.dataStreamPort,
self.liveViewerPort, self.liveViewerIp,
self.coordinatorExchangePort, self.maxRingBufferSize, self.senderResponseTimeout)
if __name__ == "__main__":
receiver = ReceiverLiveViewer()
......@@ -26,8 +26,8 @@ class FileReceiver:
liveViewerPort = None
coordinatorExchangeIp = None
coordinatorExchangePort = None
senderComIp = None # ip for socket to communicate with receiver
senderComPort = None # port for socket to communicate receiver
senderComIp = None # ip for socket to communicate with the sender
senderComPort = None # port for socket to communicate with the sender
socketResponseTimeout = None # time in milliseconds to wait for the sender to answer to a signal
log = None
......@@ -41,9 +41,12 @@ class FileReceiver:
# print socket.gethostbyname(socket.gethostname())
def __init__(self, outputDir, dataStreamIp, dataStreamPort,
liveViewerPort, liveViewerIp, coordinatorExchangePort, senderComPort,
maxRingBuffersize, senderResponseTimeout = 1000, context = None):
def __init__(self, outputDir,
senderComIp, senderComPort,
dataStreamIp, dataStreamPort,
liveViewerPort, liveViewerIp,
coordinatorExchangePort, maxRingBuffersize, senderResponseTimeout = 1000,
context = None):
self.outputDir = os.path.normpath(outputDir)
self.dataStreamIp = dataStreamIp
......@@ -52,7 +55,7 @@ class FileReceiver:
self.liveViewerPort = liveViewerPort
self.coordinatorExchangeIp = "127.0.0.1"
self.coordinatorExchangePort = coordinatorExchangePort
self.senderComIp = dataStreamIp # ip for socket to communicate with sender; is the same ip as the data stream ip
self.senderComIp = senderComIp
self.senderComPort = senderComPort
self.socketResponseTimeout = senderResponseTimeout
......@@ -135,8 +138,12 @@ class FileReceiver:
# time to wait for the sender to give a confirmation of the signal
# self.senderComSocket.RCVTIMEO = self.socketResponseTimeout
connectionStr = "tcp://{ip}:{port}".format(ip=self.senderComIp, port=self.senderComPort)
self.senderComSocket.connect(connectionStr)
self.log.info("senderComSocket started (connect) for '" + connectionStr + "'")
try:
self.senderComSocket.connect(connectionStr)
self.log.info("senderComSocket started (connect) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start senderComSocket (connect): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
# using a Poller to implement the senderComSocket timeout (in older ZMQ version there is no option RCVTIMEO)
self.poller = zmq.Poller()
......@@ -145,14 +152,22 @@ class FileReceiver:
# create socket to communicate with Coordinator
self.coordinatorExchangeSocket = self.zmqContext.socket(zmq.PAIR)
connectionStr = "tcp://{ip}:{port}".format(ip=self.coordinatorExchangeIp, port=self.coordinatorExchangePort)
self.coordinatorExchangeSocket.connect(connectionStr)
self.log.debug("coordinatorExchangeSocket started (connect) for '" + connectionStr + "'")
try:
self.coordinatorExchangeSocket.connect(connectionStr)
self.log.debug("coordinatorExchangeSocket started (connect) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start coordinatorExchangeSocket (connect): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
# create sockets to retrieve data from Sender
self.dataStreamSocket = self.zmqContext.socket(zmq.PULL)
connectionStr = "tcp://{ip}:{port}".format(ip=self.dataStreamIp, port=self.dataStreamPort)
self.dataStreamSocket.bind(connectionStr)
self.log.info("dataStreamSocket started (bind) for '" + connectionStr + "'")
try:
self.dataStreamSocket.bind(connectionStr)
self.log.info("dataStreamSocket started (bind) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start dataStreamSocket (bind): '" + connectionStr + "'")
self.log.debug("Error was:" + str(e))
def startReceiving(self):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment