Skip to content
Snippets Groups Projects
Commit 6c95b1a3 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Sending messages to the receiver always block

parent cfa03528
No related branches found
No related tags found
No related merge requests found
......@@ -41,10 +41,10 @@ class FileReceiver:
# create pull socket
self.zmqDataStreamSocket = self.zmqContext.socket(zmq.PULL)
connectionStrDataStreamSocket = "tcp://{ip}:{port}".format(ip=self.zmqDataStreamIp, port=self.zmqDataStreamPort)
print "connectionStrDataStreamSocket", connectionStrDataStreamSocket
self.zmqDataStreamSocket.bind(connectionStrDataStreamSocket)
self.log.debug("zmqDataStreamSocket started (connect) for '" + connectionStrDataStreamSocket + "'")
connectionStr = "tcp://{ip}:{port}".format(ip=self.zmqDataStreamIp, port=self.zmqDataStreamPort)
print "connectionStr", connectionStr
self.zmqDataStreamSocket.bind(connectionStr)
self.log.debug("zmqDataStreamSocket started (bind) for '" + connectionStr + "'")
try:
self.log.info("Start receiving new files")
......
......@@ -241,7 +241,6 @@ class WorkerProcess():
if self.ondaComSocket in socks and socks[self.ondaComSocket] == zmq.POLLIN:
ondaWorkload = self.ondaComSocket.recv()
self.log.debug("worker-"+str(self.id)+": received new request from onda")
# print "ondaWorkload", ondaWorkload
request = ondaWorkload == b"NEXT_FILE"
if request:
self.requestFromOnda = True
......@@ -368,7 +367,6 @@ class WorkerProcess():
#send message
try:
self.log.debug("Passing multipart-message for file " + str(sourceFilePathFull) + "...")
# print "sending file: ", sourceFilePathFull
chunkNumber = 0
stillChunksToRead = True
payloadAll = []
......@@ -398,7 +396,16 @@ class WorkerProcess():
# send data to the data stream to store it in the storage system
if self.useDataStream:
self.dataStreamSocket.send_multipart(chunkPayload)
print "Sending data to data stream..."
tracker = self.dataStreamSocket.send_multipart(chunkPayload, copy=False, track=True)
if not tracker.done:
self.log.info("Message part from file " + str(sourceFilePathFull) + " has not been sent yet, waiting ...")
tracker.wait()
self.log.info("Message part from file " + str(sourceFilePathFull) + " has not been sent yet, waiting ...done")
print "Sending data to data stream...done"
#send data to the live viewer
if socketDict.has_key("liveViewer"):
......@@ -411,7 +418,6 @@ class WorkerProcess():
#close file
fileDescriptor.close()
# print "sending file: ", sourceFilePathFull, "done"
# self.liveViewerSocket.send_multipart(multipartMessage)
self.log.debug("Passing multipart-message for file " + str(sourceFilePathFull) + "...done.")
......@@ -500,6 +506,7 @@ class WorkerProcess():
self.log.debug("Sending stop signal to cleaner from worker-" + str(self.id))
self.cleanerSocket.send("STOP") #no communication needed because cleaner detects KeyboardInterrupt signals
self.log.debug("Closing sockets for worker " + str(self.id))
self.dataStreamSocket.close(0)
if self.liveViewerSocket:
self.liveViewerSocket.close(0)
self.ondaComSocket.close(0)
......
import zmq
import sys
port = "5556"
#ip="localhost"
ip="*"
#ip="zitpcx19282.desy.de"
context = zmq.Context()
print "Connecting to server..."
socket = context.socket(zmq.PULL)
socket.bind ("tcp://"+ ip + ":%s" % port)
# Do 10 requests, waiting each time for a response
for request in range (1,10):
# Get the reply.
message = socket.recv_multipart()
print "received reply ", request, "[", message, "]"
import zmq
import time
import sys
port = "5556"
#ip = "*"
ip="zitpcx19282.desy.de"
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.connect("tcp://" + ip + ":%s" % port)
while True:
message = ["World"]
print "Send: ", message
res = socket.send_multipart(message, copy=False, track=True)
if res.done:
print "res: done"
else:
print "res: waiting"
res.wait()
print "res: waiting..."
print "sleeping..."
time.sleep (1)
print "sleeping...done"
......@@ -3,8 +3,8 @@ import time
import sys
port = "5556"
#ip = "*"
ip="zitpcx19282.desy.de"
ip = "*"
#ip="zitpcx19282.desy.de"
context = zmq.Context()
socket = context.socket(zmq.REP)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment