Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>', 'Marco Strutz <marco.strutz@desy.de>'
import time
import zmq
import sys
import json
import logging
import errno
import os
import traceback
import threading
import socket # needed to get hostname
from Coordinator import Coordinator
#
# -------------------------- class: FileReceiver --------------------------------------
#
class FileReceiver:
zmqContext = None
outputDir = None
zmqDataStreamIp = None
zmqDataStreamPort = None
zmqLiveViewerIp = None
zmqLiveViewerPort = None
exchangeIp = "127.0.0.1"
exchangePort = "6072"
senderComIp = None # ip for socket to communicate with receiver
senderComPort = None # port for socket to communicate receiver
socketResponseTimeout = None # time in milliseconds to wait for the sender to answer to a signal
log = None
# sockets
zmqDataStreamSocket = None # socket to receive the data from
exchangeSocket = None # socket to communicate with Coordinator class
senderComSocket = None # socket to communicate with sender
hostname = socket.gethostname()
# print socket.gethostbyname(socket.gethostname())
def __init__(self, outputDir, zmqDataStreamPort, zmqDataStreamIp, zmqLiveViewerPort, zmqLiveViewerIp, senderComPort,
maxRingBuffersize, senderResponseTimeout = 1000, context = None):
self.outputDir = outputDir
self.zmqDataStreamIp = zmqDataStreamIp
self.zmqDataStreamPort = zmqDataStreamPort
self.zmqLiveViewerIp = zmqLiveViewerIp
self.zmqLiveViewerPort = zmqLiveViewerPort
self.senderComIp = zmqDataStreamIp # ip for socket to communicate with sender; is the same ip as the data stream ip
self.senderComPort = senderComPort
self.socketResponseTimeout = senderResponseTimeout
# if context:
# assert isinstance(context, zmq.sugar.context.Context)
self.zmqContext = context or zmq.Context()
self.log = self.getLogger()
self.log.debug("Init")
# start file receiver
self.receiverThread = threading.Thread(target=Coordinator, args=(self.outputDir, self.zmqDataStreamPort, self.zmqDataStreamIp, self.zmqLiveViewerPort, self.zmqLiveViewerIp, maxRingBuffersize))
self.receiverThread.start()
# create pull socket
self.zmqDataStreamSocket = self.zmqContext.socket(zmq.PULL)
connectionStrDataStreamSocket = "tcp://{ip}:{port}".format(ip=self.zmqDataStreamIp, port=self.zmqDataStreamPort)
print "connectionStrDataStreamSocket", connectionStrDataStreamSocket
self.zmqDataStreamSocket.connect(connectionStrDataStreamSocket)
self.log.debug("zmqDataStreamSocket started (connect) for '" + connectionStrDataStreamSocket + "'")
self.exchangeSocket = self.zmqContext.socket(zmq.PAIR)
connectionStrExchangeSocket = "tcp://{ip}:{port}".format(ip=self.exchangeIp, port=self.exchangePort)
self.exchangeSocket.connect(connectionStrExchangeSocket)
self.log.debug("exchangeSocket started (connect) for '" + connectionStrExchangeSocket + "'")
self.senderComSocket = self.zmqContext.socket(zmq.REQ)
# time to wait for the sender to give a confirmation of the signal
# self.senderComSocket.RCVTIMEO = self.socketResponseTimeout
connectionStrSenderComSocket = "tcp://{ip}:{port}".format(ip=self.senderComIp, port=self.senderComPort)
print "connectionStrSenderComSocket", connectionStrSenderComSocket
self.senderComSocket.connect(connectionStrSenderComSocket)
self.log.debug("senderComSocket started (connect) for '" + connectionStrSenderComSocket + "'")
# using a Poller to implement the senderComSocket timeout because in older ZMQ version there is
self.poller = zmq.Poller()
self.poller.register(self.senderComSocket, zmq.POLLIN)
message = "START_LIVE_VIEWER," + str(self.hostname)
self.log.info("Sending start signal to sender...")
self.log.debug("Sending start signal to sender, message: " + message)
print "sending message ", message
self.senderComSocket.send(str(message))
# self.senderComSocket.send("START_LIVE_VIEWER")
senderMessage = None
socks = dict(self.poller.poll(self.socketResponseTimeout))
if self.senderComSocket in socks and socks[self.senderComSocket] == zmq.POLLIN:
try:
senderMessage = self.senderComSocket.recv()
print "answer to start live viewer: ", senderMessage
self.log.debug("Received message from sender: " + str(senderMessage) )
except KeyboardInterrupt:
self.log.error("KeyboardInterrupt: No message received from sender")
self.stopReceiving(self.zmqDataStreamSocket, self.zmqContext, sendToSender = False)
sys.exit(1)
except Exception as e:
self.log.error("No message received from sender")
self.log.debug("Error was: " + str(e))
self.stopReceiving(self.zmqDataStreamSocket, self.zmqContext, sendToSender = False)
sys.exit(1)
if senderMessage == "START_LIVE_VIEWER":
self.log.info("Received confirmation from sender...start receiving files")
try:
self.log.info("Start receiving new files")
self.startReceiving()
self.log.info("Stopped receiving.")
except Exception, e:
self.log.error("Unknown error while receiving files. Need to abort.")
self.log.debug("Error was: " + str(e))
except:
trace = traceback.format_exc()
self.log.info("Unkown error state. Shutting down...")
self.log.debug("Error was: " + str(trace))
self.zmqContext.destroy()
else:
print "Sending start signal to sender...failed."
self.log.info("Sending start signal to sender...failed.")
self.stopReceiving(self.zmqDataStreamSocket, self.zmqContext, sendToSender = False)
self.log.info("Quitting.")
def getLogger(self):
logger = logging.getLogger("fileReceiver")
return logger
def combineMessage(self, zmqDataStreamSocket):
receivingMessages = True
#save all chunks to file
while receivingMessages:
multipartMessage = zmqDataStreamSocket.recv_multipart()
#extract multipart message
try:
#TODO is string conversion needed here?
payloadMetadata = str(multipartMessage[0])
except:
self.log.error("an empty config was transferred for multipartMessage")
#TODO validate multipartMessage (like correct dict-values for metadata)
self.log.debug("multipartMessage.metadata = " + str(payloadMetadata))
#extraction metadata from multipart-message
payloadMetadataDict = json.loads(payloadMetadata)
#append to file
try:
self.log.debug("append to file based on multipart-message...")
#TODO: save message to file using a thread (avoids blocking)
#TODO: instead of open/close file for each chunk recyle the file-descriptor for all chunks opened
self.appendChunksToFileFromMultipartMessage(payloadMetadataDict, multipartMessage)
self.log.debug("append to file based on multipart-message...success.")
except KeyboardInterrupt:
errorMessage = "KeyboardInterrupt detected. Unable to append multipart-content to file."
self.log.info(errorMessage)
break
except Exception, e:
errorMessage = "Unable to append multipart-content to file."
self.log.error(errorMessage)
self.log.debug("Error was: " + str(e))
self.log.debug("append to file based on multipart-message...failed.")
if len(multipartMessage[1]) < payloadMetadataDict["chunkSize"] :
#indicated end of file. closing file and leave loop
self.log.debug("last file-chunk received. stop appending.")
break
filename = self.generateTargetFilepath(payloadMetadataDict)
fileModTime = payloadMetadataDict["fileModificationTime"]
print "receiving multipart message from data pipe: ", filename
self.log.info("New file with modification time " + str(fileModTime) + " received and saved: " + str(filename))
# send the file to the coordinator to add it to the ring buffer
message = "AddFile" + str(filename) + ", " + str(fileModTime)
self.log.debug("Send file to coordinator: " + message )
self.exchangeSocket.send(message)
def startReceiving(self):
#run loop, and wait for incoming messages
loopCounter = 0 #counter of total received messages
continueReceiving = True #receiving will stop if value gets False
self.log.debug("Waiting for new messages...")
while continueReceiving:
try:
self.combineMessage(self.zmqDataStreamSocket)
loopCounter+=1
except KeyboardInterrupt:
self.log.debug("Keyboard interrupt detected. Stop receiving.")
continueReceiving = False
break
except:
self.log.error("receive message...failed.")
self.log.error(sys.exc_info())
continueReceiving = False
self.log.info("shutting down receiver...")
try:
self.stopReceiving(self.zmqDataStreamSocket, self.zmqContext)
self.log.debug("shutting down receiver...done.")
except:
self.log.error(sys.exc_info())
self.log.error("shutting down receiver...failed.")
def generateTargetFilepath(self,configDict):
"""
generates full path where target file will saved to.
"""
targetFilename = configDict["filename"]
targetRelativePath = configDict["relativePath"]
if targetRelativePath is '' or targetRelativePath is None:
targetPath = self.outputDir
else:
targetPath = os.path.normpath(self.outputDir + os.sep + targetRelativePath)
targetFilepath = os.path.join(targetPath, targetFilename)
return targetFilepath
def generateTargetPath(self,configDict):
"""
generates path where target file will saved to.
"""
targetRelativePath = configDict["relativePath"]
# if the relative path starts with a slash path.join will consider it as absolute path
if targetRelativePath.startswith("/"):
targetRelativePath = targetRelativePath[1:]
outputDir = self.outputDir
targetPath = os.path.join(outputDir, targetRelativePath)
return targetPath
def appendChunksToFileFromMultipartMessage(self, configDict, multipartMessage):
try:
chunkCount = len(multipartMessage) - 1 #-1 as the first element keeps the dictionary/metadata
payload = multipartMessage[1:]
except:
self.log.warning("an empty file was received within the multipart-message")
payload = None
#generate target filepath
targetFilepath = self.generateTargetFilepath(configDict)
self.log.debug("new file is going to be created at: " + targetFilepath)
#append payload to file
try:
newFile = open(targetFilepath, "a")
except IOError, e:
# errno.ENOENT == "No such file or directory"
if e.errno == errno.ENOENT:
#TODO create subdirectory first, then try to open the file again
try:
targetPath = self.generateTargetPath(configDict)
os.makedirs(targetPath)
newFile = open(targetFilepath, "w")
self.log.info("New target directory created: " + str(targetPath))
except Exception, f:
errorMessage = "unable to save payload to file: '" + targetFilepath + "'"
self.log.error(errorMessage)
self.log.debug("Error was: " + str(f))
self.log.debug("targetPath="+str(targetPath))
raise Exception(errorMessage)
else:
self.log.error("failed to append payload to file: '" + targetFilepath + "'")
self.log.debug("Error was: " + str(e))
except Exception, e:
self.log.error("failed to append payload to file: '" + targetFilepath + "'")
self.log.debug("Error was: " + str(e))
self.log.debug("ErrorTyp: " + str(type(e)))
self.log.debug("e.errno = " + str(e.errno) + " errno.EEXIST==" + str(errno.EEXIST))
#only write data if a payload exist
try:
if payload != None:
for chunk in payload:
newFile.write(chunk)
newFile.close()
# print "received file: ", targetFilepath
except Exception, e:
errorMessage = "unable to append data to file."
self.log.error(errorMessage)
self.log.debug("Error was: " + str(e))
raise Exception(errorMessage)
def stopReceiving(self, zmqDataStreamSocket, zmqContext, sendToSender = True):
self.log.debug("stopReceiving...")
try:
zmqDataStreamSocket.close(0)
self.log.debug("closing zmqDataStreamSocket...done.")
except:
self.log.error("closing zmqDataStreamSocket...failed.")
self.log.error(sys.exc_info())
self.log.debug("sending exit signal to coordinator...")
self.exchangeSocket.send("Exit")
if sendToSender:
self.log.debug("sending stop signal to sender...")
message = "STOP_LIVE_VIEWER,"+ str(self.hostname)
print "sending message ", message
self.senderComSocket.send(str(message), zmq.NOBLOCK)
socks = dict(self.poller.poll(self.socketResponseTimeout))
if self.senderComSocket in socks and socks[self.senderComSocket] == zmq.POLLIN:
try:
senderMessage = self.senderComSocket.recv()
print "answer to stop live viewer: ", senderMessage
self.log.debug("Received message from sender: " + str(senderMessage) )
if senderMessage == "STOP_LIVE_VIEWER":
self.log.info("Received confirmation from sender...")
else:
self.log.error("Received confirmation from sender...failed")
except KeyboardInterrupt:
self.log.error("KeyboardInterrupt: No message received from sender")
except Exception as e:
self.log.error("sending stop signal to sender...failed.")
self.log.debug("Error was: " + str(e))
# give the signal time to arrive
time.sleep(0.1)
self.log.debug("closing signal communication sockets...")
self.exchangeSocket.close(0)
self.senderComSocket.close(0)
self.log.debug("closing signal communication sockets...done")
try:
zmqContext.destroy()
self.log.debug("closing zmqContext...done.")
except:
self.log.error("closing zmqContext...failed.")
self.log.error(sys.exc_info())