Skip to content
Snippets Groups Projects
Commit 95503df1 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Stopping thread in receiver correctly

parent 9c49b522
No related branches found
No related tags found
No related merge requests found
......@@ -494,8 +494,8 @@ class FileMover():
self.dataStreamIp,
self.dataStreamPort,
self.chunkSize,
zmqCleanerIp,
zmqCleanerPort,
self.zmqCleanerIp,
self.zmqCleanerPort,
fileWaitTimeInMs,
fileMaxWaitTimeInMs))
workerThreadList.append(newWorkerThread)
......
......@@ -24,12 +24,15 @@ class FileReceiver:
zmqDataStreamPort = None
zmqLiveViewerIp = None
zmqLiveViewerPort = None
zmqLiveViewerExchangeIp = "127.0.0.1"
zmqLiveViewerExchangePort = "6072"
ringBuffer = []
maxRingBufferSize = 200
timeToWaitForRingBuffer = 2
# sockets
zmqSocket = None
exchangeSocket = None
def __init__(self, outputDir, zmqDataStreamPort, zmqDataStreamIp, zmqLiveViewerPort, zmqLiveViewerIp, context = None):
......@@ -50,6 +53,12 @@ class FileReceiver:
self.zmqSocket.bind(connectionStrZmqSocket)
logging.debug("zmqSocket started for '" + connectionStrZmqSocket + "'")
self.exchangeSocket = self.zmqContext.socket(zmq.PAIR)
connectionStrExchangeSocket = "tcp://" + self.zmqLiveViewerExchangeIp + ":%s" % self.zmqLiveViewerExchangePort
self.exchangeSocket.bind(connectionStrExchangeSocket)
logging.debug("exchangeSocket started (bind)for '" + connectionStrExchangeSocket + "'")
# thread to communicate with live viewer
self.liveViewerThread = threading.Thread(target=self.sendFileToLiveViewer)
self.liveViewerThread.start()
......@@ -100,48 +109,76 @@ class FileReceiver:
def sendFileToLiveViewer(self):
# create socket for live viewer
zmqLiveViewerSocket = self.zmqContext.socket(zmq.REP)
zmqLiveViewerSocket = self.zmqContext.socket(zmq.REP)
connectionStrZmqLiveViewerSocket = "tcp://" + self.zmqLiveViewerIp + ":%s" % self.zmqLiveViewerPort
zmqLiveViewerSocket.bind(connectionStrZmqLiveViewerSocket)
logging.debug("zmqLiveViewerSocket started for '" + connectionStrZmqLiveViewerSocket + "'")
exchangeSocket = self.zmqContext.socket(zmq.PAIR)
connectionStrExchangeSocket = "tcp://" + self.zmqLiveViewerExchangeIp + ":%s" % self.zmqLiveViewerExchangePort
exchangeSocket.connect(connectionStrExchangeSocket)
logging.debug("exchangeSocket started (connect) for '" + connectionStrExchangeSocket + "'")
poller = zmq.Poller()
poller.register(zmqLiveViewerSocket, zmq.POLLIN)
poller.register(exchangeSocket, zmq.POLLIN)
should_continue = True
while should_continue:
socks = dict(poller.poll())
if exchangeSocket in socks and socks[exchangeSocket] == zmq.POLLIN:
message = exchangeSocket.recv()
logging.debug("Recieved control command: %s" % message )
if message == "Exit":
logging.debug("Recieved exit command, liveViewer thread will stop recieving messages")
should_continue = False
break
# if there is a request of the live viewer:
while True:
# Wait for next request from client
try:
if zmqLiveViewerSocket in socks and socks[zmqLiveViewerSocket] == zmq.POLLIN:
message = zmqLiveViewerSocket.recv()
except zmq.error.ContextTerminated:
break
print "Received request: ", message
time.sleep (1)
# send first element in ring buffer to live viewer (the path of this file is the second entry)
if self.ringBuffer:
try:
zmqLiveViewerSocket.send(self.ringBuffer[0][1])
print self.ringBuffer[0][1]
except zmq.error.ContextTerminated:
break
else:
logging.debug("Call for next file... ")
# send first element in ring buffer to live viewer (the path of this file is the second entry)
if self.ringBuffer:
answer = self.ringBuffer[0][1]
else:
answer = "None"
print answer
try:
zmqLiveViewerSocket.send("None")
print self.ringBuffer
zmqLiveViewerSocket.send(answer)
except zmq.error.ContextTerminated:
break
logging.debug("LiveViewerThread: closing socket")
zmqLiveViewerSocket.close()
exchangeSocket.close()
def combineMessage(self, zmqSocket):
receivingMessages = True
#save all chunks to file
while True:
while receivingMessages:
multipartMessage = zmqSocket.recv_multipart()
#extract multipart message
try:
#TODO is string conversion needed here?
payloadMetadata = str(multipartMessage[0])
except:
logging.error("an empty config was transferred for multipartMessage")
#TODO validate multipartMessage (like correct dict-values for metadata)
logging.debug("multipartMessage.metadata = " + str(payloadMetadata))
#extraction metadata from multipart-message
payloadMetadataDict = json.loads(payloadMetadata)
#append to file
try:
logging.debug("append to file based on multipart-message...")
#TODO: save message to file using a thread (avoids blocking)
#TODO: instead of open/close file for each chunk recyle the file-descriptor for all chunks opened
self.appendChunksToFileFromMultipartMessage(multipartMessage)
self.appendChunksToFileFromMultipartMessage(payloadMetadataDict, multipartMessage)
logging.debug("append to file based on multipart-message...success.")
except Exception, e:
errorMessage = "Unable to append multipart-content to file."
......@@ -152,12 +189,10 @@ class FileReceiver:
errorMessage = "Unable to append multipart-content to file. Unknown Error."
logging.error(errorMessage)
logging.debug("append to file based on multipart-message...failed.")
if len(multipartMessage[1]) == 0:
if len(multipartMessage[1]) < payloadMetadataDict["chunkSize"] :
#indicated end of file. closing file and leave loop
logging.debug("last file-chunk received. stop appending.")
break
payloadMetadata = str(multipartMessage[0])
payloadMetadataDict = json.loads(payloadMetadata)
filename = self.generateTargetFilepath(payloadMetadataDict)
fileModTime = payloadMetadataDict["fileModificationTime"]
logging.info("New file with modification time " + str(fileModTime) + " received and saved: " + str(filename))
......@@ -166,6 +201,7 @@ class FileReceiver:
# logging.debug("message-length: " + str(len(multipartMessage)))
# add to ring buffer
logging.debug("add file to ring buffer: "+ str(filename) + ", " + str(fileModTime))
self.addFileToRingBuffer(str(filename), fileModTime)
......@@ -232,13 +268,8 @@ class FileReceiver:
return targetPath
def appendChunksToFileFromMultipartMessage(self, multipartMessage):
def appendChunksToFileFromMultipartMessage(self, configDict, multipartMessage):
#extract multipart message
try:
configDictJson = multipartMessage[0]
except:
logging.error("an empty config was transferred for multipartMessage")
try:
chunkCount = len(multipartMessage) - 1 #-1 as the first element keeps the dictionary/metadata
payload = multipartMessage[1:]
......@@ -246,12 +277,6 @@ class FileReceiver:
logging.warning("an empty file was received within the multipart-message")
payload = None
#TODO validate multipartMessage (like correct dict-values for metadata)
logging.debug("multipartMessage.metadata = " + str(configDictJson))
#extraction metadata from multipart-message
configDict = json.loads(configDictJson)
#generate target filepath
targetFilepath = self.generateTargetFilepath(configDict)
......@@ -294,7 +319,6 @@ class FileReceiver:
raise Exception(errorMessage)
def stopReceiving(self, zmqSocket, zmqContext):
logging.debug("stopReceiving...")
......@@ -305,6 +329,12 @@ class FileReceiver:
logging.error("closing zmqSocket...failed.")
logging.error(sys.exc_info())
logging.debug("sending exit signal to thread...")
self.exchangeSocket.send("Exit")
time.sleep(0.01)
self.exchangeSocket.close()
logging.debug("sending exit signal to thread...done")
try:
zmqContext.destroy()
logging.debug("closing zmqContext...done.")
......@@ -313,9 +343,6 @@ class FileReceiver:
logging.error(sys.exc_info())
def argumentParsing():
parser = argparse.ArgumentParser()
......
......@@ -12,7 +12,6 @@ import helperScript
# class MyHandler(PatternMatchingEventHandler):
class DirectoryWatcherHandler():
patterns = ["*"]
zmqContext = None
......@@ -25,27 +24,19 @@ class DirectoryWatcherHandler():
def __init__(self, zmqContext, fileEventServerIp, watchFolder, fileEventServerPort):
logging.debug("DirectoryWatcherHandler: __init__()")
# logging.debug("DirectoryWatcherHandler(): type(zmqContext) = " + str(type(zmqContext)))
logging.info("registering zmq global context")
self.globalZmqContext = zmqContext
logging.info("registering zmq context")
self.zmqContext = zmqContext
self.watchFolder = os.path.normpath(watchFolder)
self.fileEventServerIp = fileEventServerIp
self.fileEventServerPort = fileEventServerPort
#create zmq sockets
self.messageSocket = self.createPushSocket(self.globalZmqContext, fileEventServerPort)
def createPushSocket(self, context, fileEventServerPort):
assert isinstance(context, zmq.sugar.context.Context)
assert isinstance(self.zmqContext, zmq.sugar.context.Context)
socket = context.socket(zmq.PUSH)
zmqSocketStr = 'tcp://' + self.fileEventServerIp + ':' + str(fileEventServerPort)
#create zmq sockets
self.messageSocket = zmqContext.socket(zmq.PUSH)
zmqSocketStr = "tcp://" + self.fileEventServerIp + ":" + str(self.fileEventServerPort)
self.messageSocket.connect(zmqSocketStr)
logging.debug("Connecting to ZMQ socket: " + str(zmqSocketStr))
socket.connect(zmqSocketStr)
return socket
def passFileToZeromq(self, filepath, targetPath):
......@@ -370,8 +361,8 @@ if __name__ == '__main__':
workers = zmqContext.socket(zmq.PULL)
zmqSocketStr = 'tcp://' + communicationWithLcyncdIp + ':' + communicationWithLcyncdPort
logging.debug("Connecting to ZMQ socket: " + str(zmqSocketStr))
workers.bind(zmqSocketStr)
logging.debug("Bind to lcyncd ZMQ socket: " + str(zmqSocketStr))
try:
while True:
......
......@@ -64,13 +64,13 @@ zmqPort = "6080"
if supported_file:
# set up ZeroMQ
# set up ZeroMQ
zmqContext = zmq.Context()
socket = zmqContext.socket(zmq.PUSH)
zmqSocketStr = 'tcp://' + zmqIp + ':' + zmqPort
logging.debug( "Connecting to ZMQ socket: " + str(zmqSocketStr))
socket.connect(zmqSocketStr)
logging.debug( "Connecting to ZMQ socket: " + str(zmqSocketStr))
#send reply back to server
workload = { "filepath": source, "targetPath": target }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment