dataTransferAPI.py 22.4 KB
Newer Older
Manuela Kuhn's avatar
Manuela Kuhn committed
1
2
# API to communicate with a data transfer unit

Manuela Kuhn's avatar
Manuela Kuhn committed
3
__version__ = '2.1.4'
Manuela Kuhn's avatar
Manuela Kuhn committed
4
5
6
7
8
9
10
11

import zmq
import socket
import logging
import json
import errno
import os
import cPickle
Manuela Kuhn's avatar
Manuela Kuhn committed
12
import traceback
13
from zmq.auth.thread import ThreadAuthenticator
Manuela Kuhn's avatar
Manuela Kuhn committed
14
15


Manuela Kuhn's avatar
Manuela Kuhn committed
16
class loggingFunction:
Manuela Kuhn's avatar
Manuela Kuhn committed
17
    def out (self, x, exc_info = None):
Manuela Kuhn's avatar
Manuela Kuhn committed
18
19
20
21
        if exc_info:
            print x, traceback.format_exc()
        else:
            print x
Manuela Kuhn's avatar
Manuela Kuhn committed
22
    def __init__ (self):
Manuela Kuhn's avatar
Manuela Kuhn committed
23
24
25
26
27
28
29
        self.debug    = lambda x, exc_info=None: self.out(x, exc_info)
        self.info     = lambda x, exc_info=None: self.out(x, exc_info)
        self.warning  = lambda x, exc_info=None: self.out(x, exc_info)
        self.error    = lambda x, exc_info=None: self.out(x, exc_info)
        self.critical = lambda x, exc_info=None: self.out(x, exc_info)


30
31
32
33
34
35
36
37
38
39
40
class noLoggingFunction:
    def out (self, x, exc_info = None):
        pass
    def __init__ (self):
        self.debug    = lambda x, exc_info=None: self.out(x, exc_info)
        self.info     = lambda x, exc_info=None: self.out(x, exc_info)
        self.warning  = lambda x, exc_info=None: self.out(x, exc_info)
        self.error    = lambda x, exc_info=None: self.out(x, exc_info)
        self.critical = lambda x, exc_info=None: self.out(x, exc_info)


41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
class NotSupported(Exception):
    pass

class FormatError(Exception):
    pass

class ConnectionFailed(Exception):
    pass

class VersionError(Exception):
    pass

class AuthenticationFailed(Exception):
    pass

class CommunicationFailed(Exception):
    pass

class DataSavingError(Exception):
    pass


Manuela Kuhn's avatar
Manuela Kuhn committed
63
class dataTransfer():
Manuela Kuhn's avatar
Manuela Kuhn committed
64
    def __init__ (self, connectionType, signalHost = None, useLog = False, context = None):
Manuela Kuhn's avatar
Manuela Kuhn committed
65
66
67

        if useLog:
            self.log = logging.getLogger("dataTransferAPI")
68
69
        elif useLog == None:
            self.log = noLoggingFunction()
Manuela Kuhn's avatar
Manuela Kuhn committed
70
71
72
73
74
75
76
        else:
            self.log = loggingFunction()

        # ZMQ applications always start by creating a context,
        # and then using that for creating sockets
        # (source: ZeroMQ, Messaging for Many Applications by Pieter Hintjens)
        if context:
77
78
79
80
81
            self.context    = context
            self.extContext = True
        else:
            self.context    = zmq.Context()
            self.extContext = False
Manuela Kuhn's avatar
Manuela Kuhn committed
82
83
84


        self.signalHost            = signalHost
85
86
87
88
89
90
91
92
        self.signalPort            = "50000"
        self.requestPort           = "50001"
        self.dataHost              = None
        self.dataPort              = None

        self.signalSocket          = None
        self.dataSocket            = None
        self.requestSocket         = None
93

94
        self.poller                = zmq.Poller()
95

96
97
        self.auth                  = None

98
99
        self.targets               = None

100
        self.supportedConnections = ["stream", "streamMetadata", "queryNext", "queryMetadata"]
101

Manuela Kuhn's avatar
Manuela Kuhn committed
102
        self.signalExchanged       = None
103
104
105
106

        self.streamStarted         = None
        self.queryNextStarted      = None

Manuela Kuhn's avatar
Manuela Kuhn committed
107
108
        self.socketResponseTimeout = 1000

109
110
111
        if connectionType in self.supportedConnections:
            self.connectionType = connectionType
        else:
112
            raise NotSupported("Chosen type of connection is not supported.")
Manuela Kuhn's avatar
Manuela Kuhn committed
113
114
115


    # targets: [host, port, prio] or [[host, port, prio], ...]
Manuela Kuhn's avatar
Manuela Kuhn committed
116
    def initiate (self, targets):
Manuela Kuhn's avatar
Manuela Kuhn committed
117
118
119

        if type(targets) != list:
            self.stop()
120
            raise FormatError("Argument 'targets' must be list.")
Manuela Kuhn's avatar
Manuela Kuhn committed
121

122
        if not self.context:
123
124
            self.context    = zmq.Context()
            self.extContext = False
Manuela Kuhn's avatar
Manuela Kuhn committed
125
126
127
128

        signal = None
        # Signal exchange
        if self.connectionType == "stream":
Manuela Kuhn's avatar
Manuela Kuhn committed
129
            signalPort = self.signalPort
Manuela Kuhn's avatar
Manuela Kuhn committed
130
            signal     = "START_STREAM"
131
132
133
        elif self.connectionType == "streamMetadata":
            signalPort = self.signalPort
            signal     = "START_STREAM_METADATA"
Manuela Kuhn's avatar
Manuela Kuhn committed
134
        elif self.connectionType == "queryNext":
Manuela Kuhn's avatar
Manuela Kuhn committed
135
            signalPort = self.signalPort
Manuela Kuhn's avatar
Manuela Kuhn committed
136
            signal     = "START_QUERY_NEXT"
137
138
139
        elif self.connectionType == "queryMetadata":
            signalPort = self.signalPort
            signal     = "START_QUERY_METADATA"
Manuela Kuhn's avatar
Manuela Kuhn committed
140
141
142
143
144
145
146
147

        self.log.debug("Create socket for signal exchange...")


        if self.signalHost:
            self.__createSignalSocket(signalPort)
        else:
            self.stop()
148
            raise ConnectionFailed("No host to send signal to specified." )
Manuela Kuhn's avatar
Manuela Kuhn committed
149

Manuela Kuhn's avatar
Manuela Kuhn committed
150
        self.targets = []
Manuela Kuhn's avatar
Manuela Kuhn committed
151
        # [host, port, prio]
Manuela Kuhn's avatar
Manuela Kuhn committed
152
153
154
        if len(targets) == 3 and type(targets[0]) != list and type(targets[1]) != list and type(targets[2]) != list:
            host, port, prio = targets
            self.targets = [[host + ":" + port, prio]]
Manuela Kuhn's avatar
Manuela Kuhn committed
155
156
        # [[host, port, prio], ...]
        else:
Manuela Kuhn's avatar
Manuela Kuhn committed
157
            for t in targets:
158
                if type(t) == list and len(t) == 3:
Manuela Kuhn's avatar
Manuela Kuhn committed
159
160
                    host, port, prio = t
                    self.targets.append([host + ":" + port, prio])
Manuela Kuhn's avatar
Manuela Kuhn committed
161
162
                else:
                    self.stop()
Manuela Kuhn's avatar
Manuela Kuhn committed
163
                    self.log.debug("targets=" + str(targets))
164
                    raise FormatError("Argument 'targets' is of wrong format.")
Manuela Kuhn's avatar
Manuela Kuhn committed
165
166
167
168
169
170
171
172
173
174

#        if type(dataPort) == list:
#            self.dataHost = str([socket.gethostname() for i in dataPort])
#        else:
#            self.dataHost = socket.gethostname()

        message = self.__sendSignal(signal)

        if message and message == "VERSION_CONFLICT":
            self.stop()
175
            raise VersionError("Versions are conflicting.")
Manuela Kuhn's avatar
Manuela Kuhn committed
176
177
178

        elif message and message == "NO_VALID_HOST":
            self.stop()
179
            raise AuthenticationFailed("Host is not allowed to connect.")
Manuela Kuhn's avatar
Manuela Kuhn committed
180
181
182

        elif message and message == "CONNECTION_ALREADY_OPEN":
            self.stop()
183
            raise CommunicationFailed("Connection is already open.")
Manuela Kuhn's avatar
Manuela Kuhn committed
184
185
186

        elif message and message == "NO_VALID_SIGNAL":
            self.stop()
187
            raise CommunicationFailed("Connection type is not supported for this kind of sender.")
Manuela Kuhn's avatar
Manuela Kuhn committed
188
189
190
191

        # if there was no response or the response was of the wrong format, the receiver should be shut down
        elif message and message.startswith(signal):
            self.log.info("Received confirmation ...")
Manuela Kuhn's avatar
Manuela Kuhn committed
192
            self.signalExchanged = signal
Manuela Kuhn's avatar
Manuela Kuhn committed
193
194

        else:
195
            raise CommunicationFailed("Sending start signal ...failed.")
Manuela Kuhn's avatar
Manuela Kuhn committed
196
197


Manuela Kuhn's avatar
Manuela Kuhn committed
198
    def __createSignalSocket (self, signalPort):
Manuela Kuhn's avatar
Manuela Kuhn committed
199
200
201
202
203
204
205
206
207
208
209

        # To send a notification that a Displayer is up and running, a communication socket is needed
        # create socket to exchange signals with Sender
        self.signalSocket = self.context.socket(zmq.REQ)

        # time to wait for the sender to give a confirmation of the signal
#        self.signalSocket.RCVTIMEO = self.socketResponseTimeout
        connectionStr = "tcp://" + str(self.signalHost) + ":" + str(signalPort)
        try:
            self.signalSocket.connect(connectionStr)
            self.log.info("signalSocket started (connect) for '" + connectionStr + "'")
210
211
        except:
            self.log.error("Failed to start signalSocket (connect): '" + connectionStr + "'")
Manuela Kuhn's avatar
Manuela Kuhn committed
212
213
214
215
216
217
            raise

        # using a Poller to implement the signalSocket timeout (in older ZMQ version there is no option RCVTIMEO)
        self.poller.register(self.signalSocket, zmq.POLLIN)


218

Manuela Kuhn's avatar
Manuela Kuhn committed
219
    def __sendSignal (self, signal):
Manuela Kuhn's avatar
Manuela Kuhn committed
220

Manuela Kuhn's avatar
Manuela Kuhn committed
221
222
223
        if not signal:
            return

Manuela Kuhn's avatar
Manuela Kuhn committed
224
225
226
        # Send the signal that the communication infrastructure should be established
        self.log.info("Sending Signal")

Manuela Kuhn's avatar
Manuela Kuhn committed
227
        sendMessage = [__version__,  signal]
Manuela Kuhn's avatar
Manuela Kuhn committed
228
229
230
231
232
233
234
235
236

        trg = cPickle.dumps(self.targets)
        sendMessage.append(trg)

#        sendMessage = [__version__, signal, self.dataHost, self.dataPort]

        self.log.debug("Signal: " + str(sendMessage))
        try:
            self.signalSocket.send_multipart(sendMessage)
Manuela Kuhn's avatar
Manuela Kuhn committed
237
        except:
238
            self.log.error("Could not send signal")
Manuela Kuhn's avatar
Manuela Kuhn committed
239
240
241
242
243
            raise

        message = None
        try:
            socks = dict(self.poller.poll(self.socketResponseTimeout))
Manuela Kuhn's avatar
Manuela Kuhn committed
244
        except:
245
            self.log.error("Could not poll for new message")
Manuela Kuhn's avatar
Manuela Kuhn committed
246
247
248
249
250
251
252
253
254
255
            raise


        # if there was a response
        if self.signalSocket in socks and socks[self.signalSocket] == zmq.POLLIN:
            try:
                #  Get the reply.
                message = self.signalSocket.recv()
                self.log.info("Received answer to signal: " + str(message) )

Manuela Kuhn's avatar
Manuela Kuhn committed
256
            except:
257
                self.log.error("Could not receive answer to signal")
Manuela Kuhn's avatar
Manuela Kuhn committed
258
259
260
261
262
                raise

        return message


263
264
265
266
    def start (self, dataSocket = False, whitelist = None):

        # Receive data only from whitelisted nodes
        if whitelist:
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
            if type(whitelist) == list:
                self.auth = ThreadAuthenticator(self.context)
                self.auth.start()
                for host in whitelist:
                    try:
                        if host == "localhost":
                            ip = [socket.gethostbyname(host)]
                        else:
                            hostname, tmp, ip = socket.gethostbyaddr(host)

                        self.log.debug("Allowing host " + host + " (" + str(ip[0]) + ")")
                        self.auth.allow(ip[0])
                    except:
                        self.log.error("Error was: ", exc_info=True)
                        raise AuthenticationFailed("Could not get IP of host " + host)
            else:
                raise FormatError("Whitelist has to be a list of IPs")

Manuela Kuhn's avatar
Manuela Kuhn committed
285

286
        socketIdToConnect = self.streamStarted or self.queryNextStarted
Manuela Kuhn's avatar
Manuela Kuhn committed
287

288
289
290
        if socketIdToConnect:
            self.log.info("Reopening already started connection.")
        else:
Manuela Kuhn's avatar
Manuela Kuhn committed
291

292
            ip   = "0.0.0.0"           #TODO use IP of hostname?
Manuela Kuhn's avatar
Manuela Kuhn committed
293

294
295
            host = ""
            port = ""
Manuela Kuhn's avatar
Manuela Kuhn committed
296

297
298
299
300
301
302
303
304
305
306
307
308
309
310
            if dataSocket:
                if type(dataSocket) == list:
                    socketIdToConnect = dataSocket[0] + ":" + dataSocket[1]
                    host = dataSocket[0]
                    ip   = socket.gethostbyaddr(host)[2][0]
                    port = dataSocket[1]
                else:
                    port = str(dataSocket)

                    host = socket.gethostname()
                    socketId = host + ":" + port
                    ipFromHost = socket.gethostbyaddr(host)[2]
                    if len(ipFromHost) == 1:
                        ip = ipFromHost[0]
Manuela Kuhn's avatar
Manuela Kuhn committed
311

312
313
            elif len(self.targets) == 1:
                host, port = self.targets[0][0].split(":")
Manuela Kuhn's avatar
Manuela Kuhn committed
314
315
316
317
                ipFromHost = socket.gethostbyaddr(host)[2]
                if len(ipFromHost) == 1:
                    ip = ipFromHost[0]

318
319
            else:
                raise FormatError("Multipe possible ports. Please choose which one to use.")
Manuela Kuhn's avatar
Manuela Kuhn committed
320

321
322
323
            socketId = host + ":" + port
            socketIdToConnect = ip + ":" + port
#            socketIdToConnect = "[" + ip + "]:" + port
Manuela Kuhn's avatar
Manuela Kuhn committed
324

Manuela Kuhn's avatar
Manuela Kuhn committed
325

Manuela Kuhn's avatar
Manuela Kuhn committed
326
327
        self.dataSocket = self.context.socket(zmq.PULL)
        # An additional socket is needed to establish the data retriving mechanism
Manuela Kuhn's avatar
Manuela Kuhn committed
328
        connectionStr = "tcp://" + socketIdToConnect
329
        self.dataSocket.zap_domain = b'global'
330

Manuela Kuhn's avatar
Manuela Kuhn committed
331
        try:
332
#            self.dataSocket.ipv6 = True
Manuela Kuhn's avatar
Manuela Kuhn committed
333
            self.dataSocket.bind(connectionStr)
334
#            self.dataSocket.bind("tcp://[2003:ce:5bc0:a600:fa16:54ff:fef4:9fc0]:50102")
335
            self.log.info("Data socket of type " + self.connectionType + " started (bind) for '" + connectionStr + "'")
Manuela Kuhn's avatar
Manuela Kuhn committed
336
337
        except:
            self.log.error("Failed to start Socket of type " + self.connectionType + " (bind): '" + connectionStr + "'", exc_info=True)
338
            raise
Manuela Kuhn's avatar
Manuela Kuhn committed
339

340
        self.poller.register(self.dataSocket, zmq.POLLIN)
Manuela Kuhn's avatar
Manuela Kuhn committed
341

342
        if self.connectionType in ["queryNext", "queryMetadata"]:
Manuela Kuhn's avatar
Manuela Kuhn committed
343
344
345

            self.requestSocket = self.context.socket(zmq.PUSH)
            # An additional socket is needed to establish the data retriving mechanism
Manuela Kuhn's avatar
Manuela Kuhn committed
346
            connectionStr = "tcp://" + self.signalHost + ":" + self.requestPort
Manuela Kuhn's avatar
Manuela Kuhn committed
347
348
            try:
                self.requestSocket.connect(connectionStr)
349
                self.log.info("Request socket started (connect) for '" + connectionStr + "'")
Manuela Kuhn's avatar
Manuela Kuhn committed
350
351
            except:
                self.log.error("Failed to start Socket of type " + self.connectionType + " (connect): '" + connectionStr + "'", exc_info=True)
352
                raise
Manuela Kuhn's avatar
Manuela Kuhn committed
353

Manuela Kuhn's avatar
Manuela Kuhn committed
354
            self.queryNextStarted = socketId
Manuela Kuhn's avatar
Manuela Kuhn committed
355
356
357
358
359
360
361
362
363
364
        else:
            self.streamStarted    = socketId


    ##
    #
    # Receives or queries for new files depending on the connection initialized
    #
    # returns either
    #   the newest file
365
    #       (if connection type "queryNext" or "stream" was choosen)
Manuela Kuhn's avatar
Manuela Kuhn committed
366
    #   the path of the newest file
367
    #       (if connection type "queryMetadata" or "streamMetadata" was choosen)
Manuela Kuhn's avatar
Manuela Kuhn committed
368
369
    #
    ##
370
    def get (self, timeout=None):
Manuela Kuhn's avatar
Manuela Kuhn committed
371
372
373

        if not self.streamStarted and not self.queryNextStarted:
            self.log.info("Could not communicate, no connection was initialized.")
Manuela Kuhn's avatar
Manuela Kuhn committed
374
            return None, None
Manuela Kuhn's avatar
Manuela Kuhn committed
375
376
377
378
379
380
381

        if self.queryNextStarted :

            sendMessage = ["NEXT", self.queryNextStarted]
            try:
                self.requestSocket.send_multipart(sendMessage)
            except Exception as e:
Manuela Kuhn's avatar
Manuela Kuhn committed
382
383
                self.log.error("Could not send request to requestSocket", exc_info=True)
                return None, None
Manuela Kuhn's avatar
Manuela Kuhn committed
384

385
386
387
388
389
390
391
392
393
394
395
396
397
398
        while True:
            # receive data
            if timeout:
                try:
                    socks = dict(self.poller.poll(timeout))
                except:
                    self.log.error("Could not poll for new message")
                    raise
            else:
                try:
                    socks = dict(self.poller.poll())
                except:
                    self.log.error("Could not poll for new message")
                    raise
Manuela Kuhn's avatar
Manuela Kuhn committed
399

400
401
            # if there was a response
            if self.dataSocket in socks and socks[self.dataSocket] == zmq.POLLIN:
402

403
404
405
                try:
                    multipartMessage = self.dataSocket.recv_multipart()
                except:
root's avatar
root committed
406
                    self.log.error("Receiving data..failed.", exc_info=True)
407
408
                    return [None, None]

root's avatar
root committed
409
410

                if multipartMessage[0] == b"ALIVE_TEST":
411
412
413
                    continue
                elif len(multipartMessage) < 2:
                    self.log.error("Received mutipart-message is too short. Either config or file content is missing.")
root's avatar
root committed
414
                    self.log.debug("multipartMessage=" + str(mutipartMessage)[:100])
415
416
417
418
419
420
421
422
                    return [None, None]

                # extract multipart message
                try:
                    metadata = cPickle.loads(multipartMessage[0])
                except:
                    self.log.error("Could not extract metadata from the multipart-message.", exc_info=True)
                    metadata = None
Manuela Kuhn's avatar
Manuela Kuhn committed
423

424
                #TODO validate multipartMessage (like correct dict-values for metadata)
Manuela Kuhn's avatar
Manuela Kuhn committed
425

426
427
428
429
430
                try:
                    payload = multipartMessage[1]
                except:
                    self.log.warning("An empty file was received within the multipart-message", exc_info=True)
                    payload = None
Manuela Kuhn's avatar
Manuela Kuhn committed
431

432
433
434
                return [metadata, payload]
            else:
                self.log.warning("Could not receive data in the given time.")
Manuela Kuhn's avatar
Manuela Kuhn committed
435

436
437
438
439
440
                if self.queryNextStarted :
                    try:
                        self.requestSocket.send_multipart(["CANCEL", self.queryNextStarted])
                    except Exception as e:
                        self.log.error("Could not cancel the next query", exc_info=True)
Manuela Kuhn's avatar
Manuela Kuhn committed
441

442
                return [None, None]
Manuela Kuhn's avatar
Manuela Kuhn committed
443
444


Manuela Kuhn's avatar
Manuela Kuhn committed
445
    def store (self, targetBasePath, dataObject):
Manuela Kuhn's avatar
Manuela Kuhn committed
446
447

        if type(dataObject) is not list and len(dataObject) != 2:
448
            raise FormatError("Wrong input type for 'store'")
Manuela Kuhn's avatar
Manuela Kuhn committed
449
450
451
452
453

        payloadMetadata   = dataObject[0]
        payload           = dataObject[1]


Manuela Kuhn's avatar
Manuela Kuhn committed
454
        if type(payloadMetadata) is not dict:
455
            raise FormatError("payload: Wrong input format in 'store'")
Manuela Kuhn's avatar
Manuela Kuhn committed
456
457
458
459

        #save all chunks to file
        while True:

Manuela Kuhn's avatar
Manuela Kuhn committed
460
            #TODO check if payload != cPickle.dumps(None) ?
Manuela Kuhn's avatar
Manuela Kuhn committed
461
462
463
464
465
466
467
468
469
            if payloadMetadata and payload:
                #append to file
                try:
                    self.log.debug("append to file based on multipart-message...")
                    #TODO: save message to file using a thread (avoids blocking)
                    #TODO: instead of open/close file for each chunk recyle the file-descriptor for all chunks opened
                    self.__appendChunksToFile(targetBasePath, payloadMetadata, payload)
                    self.log.debug("append to file based on multipart-message...success.")
                except KeyboardInterrupt:
Manuela Kuhn's avatar
Manuela Kuhn committed
470
                    self.log.info("KeyboardInterrupt detected. Unable to append multipart-content to file.")
Manuela Kuhn's avatar
Manuela Kuhn committed
471
472
                    break
                except Exception, e:
Manuela Kuhn's avatar
Manuela Kuhn committed
473
474
                    self.log.error("Unable to append multipart-content to file.", exc_info=True)
                    self.log.debug("Append to file based on multipart-message...failed.")
Manuela Kuhn's avatar
Manuela Kuhn committed
475
476
477
478

                if len(payload) < payloadMetadata["chunkSize"] :
                    #indicated end of file. Leave loop
                    filename    = self.generateTargetFilepath(targetBasePath, payloadMetadata)
479
                    fileModTime = payloadMetadata["fileModTime"]
Manuela Kuhn's avatar
Manuela Kuhn committed
480
481
482
483
484
485

                    self.log.info("New file with modification time " + str(fileModTime) + " received and saved: " + str(filename))
                    break

            try:
                [payloadMetadata, payload] = self.get()
Manuela Kuhn's avatar
Manuela Kuhn committed
486
487
            except:
                self.log.error("Getting data failed.", exc_info=True)
Manuela Kuhn's avatar
Manuela Kuhn committed
488
489
490
                break


Manuela Kuhn's avatar
Manuela Kuhn committed
491
    def __appendChunksToFile (self, targetBasePath, configDict, payload):
Manuela Kuhn's avatar
Manuela Kuhn committed
492
493
494
495
496
497
498
499
500
501
502
503
504

        #generate target filepath
        targetFilepath = self.generateTargetFilepath(targetBasePath, configDict)
        self.log.debug("new file is going to be created at: " + targetFilepath)


        #append payload to file
        try:
            newFile = open(targetFilepath, "a")
        except IOError, e:
            # errno.ENOENT == "No such file or directory"
            if e.errno == errno.ENOENT:
                try:
505
                    #TODO do not create commissioning, current, local
Manuela Kuhn's avatar
Manuela Kuhn committed
506
507
508
509
                    targetPath = self.__generateTargetPath(targetBasePath, configDict)
                    os.makedirs(targetPath)
                    newFile = open(targetFilepath, "w")
                    self.log.info("New target directory created: " + str(targetPath))
Manuela Kuhn's avatar
Manuela Kuhn committed
510
                except:
511
                    self.log.error("Unable to save payload to file: '" + targetFilepath + "'")
Manuela Kuhn's avatar
Manuela Kuhn committed
512
                    self.log.debug("targetPath:" + str(targetPath))
Manuela Kuhn's avatar
Manuela Kuhn committed
513
                    raise
Manuela Kuhn's avatar
Manuela Kuhn committed
514
            else:
515
516
                self.log.error("Failed to append payload to file: '" + targetFilepath + "'")
                raise
Manuela Kuhn's avatar
Manuela Kuhn committed
517
        except:
518
519
520
            self.log.error("Failed to append payload to file: '" + targetFilepath + "'")
#            self.log.debug("e.errno = " + str(e.errno) + "        errno.EEXIST==" + str(errno.EEXIST))
            raise
Manuela Kuhn's avatar
Manuela Kuhn committed
521
522
523
524

        #only write data if a payload exist
        try:
            if payload != None:
Manuela Kuhn's avatar
Manuela Kuhn committed
525
                newFile.write(payload)
Manuela Kuhn's avatar
Manuela Kuhn committed
526
            newFile.close()
Manuela Kuhn's avatar
Manuela Kuhn committed
527
        except:
528
            self.log.error("Unable to append data to file.")
Manuela Kuhn's avatar
Manuela Kuhn committed
529
            raise
Manuela Kuhn's avatar
Manuela Kuhn committed
530
531


Manuela Kuhn's avatar
Manuela Kuhn committed
532
    def generateTargetFilepath (self, basePath, configDict):
Manuela Kuhn's avatar
Manuela Kuhn committed
533
534
535
536
        """
        generates full path where target file will saved to.

        """
537
538
539
        if not configDict:
            return None

Manuela Kuhn's avatar
Manuela Kuhn committed
540
        filename     = configDict["filename"]
541
542
543
        #TODO This is due to Windows path names, check if there has do be done anything additionally to work
        # e.g. check sourcePath if it's a windows path
        relativePath = configDict["relativePath"].replace('\\', os.sep)
Manuela Kuhn's avatar
Manuela Kuhn committed
544
545
546
547
548
549
550
551
552
553
554

        if relativePath is '' or relativePath is None:
            targetPath = basePath
        else:
            targetPath = os.path.normpath(basePath + os.sep + relativePath)

        filepath =  os.path.join(targetPath, filename)

        return filepath


Manuela Kuhn's avatar
Manuela Kuhn committed
555
    def __generateTargetPath (self, basePath, configDict):
Manuela Kuhn's avatar
Manuela Kuhn committed
556
557
558
559
        """
        generates path where target file will saved to.

        """
560
561
562
        #TODO This is due to Windows path names, check if there has do be done anything additionally to work
        # e.g. check sourcePath if it's a windows path
        relativePath = configDict["relativePath"].replace('\\', os.sep)
Manuela Kuhn's avatar
Manuela Kuhn committed
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577

        # if the relative path starts with a slash path.join will consider it as absolute path
        if relativePath.startswith("/"):
            relativePath = relativePath[1:]

        targetPath = os.path.join(basePath, relativePath)

        return targetPath


    ##
    #
    # Send signal that the displayer is quitting, close ZMQ connections, destoying context
    #
    ##
Manuela Kuhn's avatar
Manuela Kuhn committed
578
    def stop (self):
Manuela Kuhn's avatar
Manuela Kuhn committed
579
580
581
582
        if self.signalSocket and self.signalExchanged:
            self.log.info("Sending close signal")
            signal = None
            if self.streamStarted or ( "STREAM" in self.signalExchanged):
Manuela Kuhn's avatar
Manuela Kuhn committed
583
                signal = "STOP_STREAM"
584
            elif self.queryNextStarted or ( "QUERY" in self.signalExchanged):
Manuela Kuhn's avatar
Manuela Kuhn committed
585
586
                signal = "STOP_QUERY_NEXT"

Manuela Kuhn's avatar
Manuela Kuhn committed
587

Manuela Kuhn's avatar
Manuela Kuhn committed
588
589
590
            message = self.__sendSignal(signal)
            #TODO need to check correctness of signal?

591
592
593
            self.streamStarted    = None
            self.queryNextStarted = None

Manuela Kuhn's avatar
Manuela Kuhn committed
594
595
596
597
598
599
600
601
602
        try:
            if self.signalSocket:
                self.log.info("closing signalSocket...")
                self.signalSocket.close(linger=0)
                self.signalSocket = None
            if self.dataSocket:
                self.log.info("closing dataSocket...")
                self.dataSocket.close(linger=0)
                self.dataSocket = None
603
604
605
606
            if self.requestSocket:
                self.log.info("closing requestSocket...")
                self.requestSocket.close(linger=0)
                self.requestSocket = None
Manuela Kuhn's avatar
Manuela Kuhn committed
607
608
        except:
            self.log.error("closing ZMQ Sockets...failed.", exc_info=True)
Manuela Kuhn's avatar
Manuela Kuhn committed
609

610
611
612
613
614
615
616
617
        if self.auth:
            try:
                self.auth.stop()
                self.auth = None
                self.log.info("Stopping authentication thread...done.")
            except:
                self.log.error("Stopping authentication thread...done.", exc_info=True)

Manuela Kuhn's avatar
Manuela Kuhn committed
618
619
        # if the context was created inside this class,
        # it has to be destroyed also within the class
620
        if not self.extContext and self.context:
Manuela Kuhn's avatar
Manuela Kuhn committed
621
            try:
622
                self.log.info("Closing ZMQ context...")
Manuela Kuhn's avatar
Manuela Kuhn committed
623
                self.context.destroy(0)
624
625
                self.context = None
                self.log.info("Closing ZMQ context...done.")
Manuela Kuhn's avatar
Manuela Kuhn committed
626
            except:
627
                self.log.error("Closing ZMQ context...failed.", exc_info=True)
Manuela Kuhn's avatar
Manuela Kuhn committed
628
629


Manuela Kuhn's avatar
Manuela Kuhn committed
630
    def __exit__ (self):
Manuela Kuhn's avatar
Manuela Kuhn committed
631
632
633
        self.stop()


Manuela Kuhn's avatar
Manuela Kuhn committed
634
    def __del__ (self):
Manuela Kuhn's avatar
Manuela Kuhn committed
635
636
637
        self.stop()