dataTransferAPI.py 24.9 KB
Newer Older
Manuela Kuhn's avatar
Manuela Kuhn committed
1
2
# API to communicate with a data transfer unit

Manuela Kuhn's avatar
Manuela Kuhn committed
3
__version__ = '2.3.0'
Manuela Kuhn's avatar
Manuela Kuhn committed
4
5
6
7
8
9
10
11

import zmq
import socket
import logging
import json
import errno
import os
import cPickle
Manuela Kuhn's avatar
Manuela Kuhn committed
12
import traceback
13
from zmq.auth.thread import ThreadAuthenticator
Manuela Kuhn's avatar
Manuela Kuhn committed
14
15


Manuela Kuhn's avatar
Manuela Kuhn committed
16
class loggingFunction:
Manuela Kuhn's avatar
Manuela Kuhn committed
17
    def out (self, x, exc_info = None):
Manuela Kuhn's avatar
Manuela Kuhn committed
18
19
20
21
        if exc_info:
            print x, traceback.format_exc()
        else:
            print x
Manuela Kuhn's avatar
Manuela Kuhn committed
22
    def __init__ (self):
Manuela Kuhn's avatar
Manuela Kuhn committed
23
24
25
26
27
28
29
        self.debug    = lambda x, exc_info=None: self.out(x, exc_info)
        self.info     = lambda x, exc_info=None: self.out(x, exc_info)
        self.warning  = lambda x, exc_info=None: self.out(x, exc_info)
        self.error    = lambda x, exc_info=None: self.out(x, exc_info)
        self.critical = lambda x, exc_info=None: self.out(x, exc_info)


30
31
32
33
34
35
36
37
38
39
40
class noLoggingFunction:
    def out (self, x, exc_info = None):
        pass
    def __init__ (self):
        self.debug    = lambda x, exc_info=None: self.out(x, exc_info)
        self.info     = lambda x, exc_info=None: self.out(x, exc_info)
        self.warning  = lambda x, exc_info=None: self.out(x, exc_info)
        self.error    = lambda x, exc_info=None: self.out(x, exc_info)
        self.critical = lambda x, exc_info=None: self.out(x, exc_info)


41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
class NotSupported(Exception):
    pass

class FormatError(Exception):
    pass

class ConnectionFailed(Exception):
    pass

class VersionError(Exception):
    pass

class AuthenticationFailed(Exception):
    pass

class CommunicationFailed(Exception):
    pass

class DataSavingError(Exception):
    pass


Manuela Kuhn's avatar
Manuela Kuhn committed
63
class dataTransfer():
Manuela Kuhn's avatar
Manuela Kuhn committed
64
    def __init__ (self, connectionType, signalHost = None, useLog = False, context = None):
Manuela Kuhn's avatar
Manuela Kuhn committed
65
66
67

        if useLog:
            self.log = logging.getLogger("dataTransferAPI")
68
69
        elif useLog == None:
            self.log = noLoggingFunction()
Manuela Kuhn's avatar
Manuela Kuhn committed
70
71
72
73
74
75
76
        else:
            self.log = loggingFunction()

        # ZMQ applications always start by creating a context,
        # and then using that for creating sockets
        # (source: ZeroMQ, Messaging for Many Applications by Pieter Hintjens)
        if context:
77
78
79
80
81
            self.context    = context
            self.extContext = True
        else:
            self.context    = zmq.Context()
            self.extContext = False
Manuela Kuhn's avatar
Manuela Kuhn committed
82
83
84


        self.signalHost            = signalHost
85
86
87
88
89
90
91
92
        self.signalPort            = "50000"
        self.requestPort           = "50001"
        self.dataHost              = None
        self.dataPort              = None

        self.signalSocket          = None
        self.dataSocket            = None
        self.requestSocket         = None
93

94
        self.poller                = zmq.Poller()
95

96
97
        self.auth                  = None

98
99
        self.targets               = None

100
        self.supportedConnections = ["stream", "streamMetadata", "queryNext", "queryMetadata"]
101

Manuela Kuhn's avatar
Manuela Kuhn committed
102
        self.signalExchanged       = None
103
104
105
106

        self.streamStarted         = None
        self.queryNextStarted      = None

Manuela Kuhn's avatar
Manuela Kuhn committed
107
108
        self.socketResponseTimeout = 1000

109
110
111
        if connectionType in self.supportedConnections:
            self.connectionType = connectionType
        else:
112
            raise NotSupported("Chosen type of connection is not supported.")
Manuela Kuhn's avatar
Manuela Kuhn committed
113
114
115


    # targets: [host, port, prio] or [[host, port, prio], ...]
Manuela Kuhn's avatar
Manuela Kuhn committed
116
    def initiate (self, targets):
Manuela Kuhn's avatar
Manuela Kuhn committed
117
118
119

        if type(targets) != list:
            self.stop()
120
            raise FormatError("Argument 'targets' must be list.")
Manuela Kuhn's avatar
Manuela Kuhn committed
121

122
        if not self.context:
123
124
            self.context    = zmq.Context()
            self.extContext = False
Manuela Kuhn's avatar
Manuela Kuhn committed
125
126
127
128

        signal = None
        # Signal exchange
        if self.connectionType == "stream":
Manuela Kuhn's avatar
Manuela Kuhn committed
129
            signalPort = self.signalPort
Manuela Kuhn's avatar
Manuela Kuhn committed
130
            signal     = "START_STREAM"
131
132
133
        elif self.connectionType == "streamMetadata":
            signalPort = self.signalPort
            signal     = "START_STREAM_METADATA"
Manuela Kuhn's avatar
Manuela Kuhn committed
134
        elif self.connectionType == "queryNext":
Manuela Kuhn's avatar
Manuela Kuhn committed
135
            signalPort = self.signalPort
Manuela Kuhn's avatar
Manuela Kuhn committed
136
            signal     = "START_QUERY_NEXT"
137
138
139
        elif self.connectionType == "queryMetadata":
            signalPort = self.signalPort
            signal     = "START_QUERY_METADATA"
Manuela Kuhn's avatar
Manuela Kuhn committed
140
141
142
143
144
145
146
147

        self.log.debug("Create socket for signal exchange...")


        if self.signalHost:
            self.__createSignalSocket(signalPort)
        else:
            self.stop()
148
            raise ConnectionFailed("No host to send signal to specified." )
Manuela Kuhn's avatar
Manuela Kuhn committed
149

150
        self.__setTargets (targets)
Manuela Kuhn's avatar
Manuela Kuhn committed
151
152
153
154
155

        message = self.__sendSignal(signal)

        if message and message == "VERSION_CONFLICT":
            self.stop()
156
            raise VersionError("Versions are conflicting.")
Manuela Kuhn's avatar
Manuela Kuhn committed
157
158
159

        elif message and message == "NO_VALID_HOST":
            self.stop()
160
            raise AuthenticationFailed("Host is not allowed to connect.")
Manuela Kuhn's avatar
Manuela Kuhn committed
161
162
163

        elif message and message == "CONNECTION_ALREADY_OPEN":
            self.stop()
164
            raise CommunicationFailed("Connection is already open.")
Manuela Kuhn's avatar
Manuela Kuhn committed
165
166
167

        elif message and message == "NO_VALID_SIGNAL":
            self.stop()
168
            raise CommunicationFailed("Connection type is not supported for this kind of sender.")
Manuela Kuhn's avatar
Manuela Kuhn committed
169
170
171
172

        # if there was no response or the response was of the wrong format, the receiver should be shut down
        elif message and message.startswith(signal):
            self.log.info("Received confirmation ...")
Manuela Kuhn's avatar
Manuela Kuhn committed
173
            self.signalExchanged = signal
Manuela Kuhn's avatar
Manuela Kuhn committed
174
175

        else:
176
            raise CommunicationFailed("Sending start signal ...failed.")
Manuela Kuhn's avatar
Manuela Kuhn committed
177
178


Manuela Kuhn's avatar
Manuela Kuhn committed
179
    def __createSignalSocket (self, signalPort):
Manuela Kuhn's avatar
Manuela Kuhn committed
180
181
182
183
184
185
186
187
188
189
190

        # To send a notification that a Displayer is up and running, a communication socket is needed
        # create socket to exchange signals with Sender
        self.signalSocket = self.context.socket(zmq.REQ)

        # time to wait for the sender to give a confirmation of the signal
#        self.signalSocket.RCVTIMEO = self.socketResponseTimeout
        connectionStr = "tcp://" + str(self.signalHost) + ":" + str(signalPort)
        try:
            self.signalSocket.connect(connectionStr)
            self.log.info("signalSocket started (connect) for '" + connectionStr + "'")
191
192
        except:
            self.log.error("Failed to start signalSocket (connect): '" + connectionStr + "'")
Manuela Kuhn's avatar
Manuela Kuhn committed
193
194
195
196
197
198
            raise

        # using a Poller to implement the signalSocket timeout (in older ZMQ version there is no option RCVTIMEO)
        self.poller.register(self.signalSocket, zmq.POLLIN)


199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
    def __setTargets (self, targets):
        self.targets = []

        # [host, port, prio]
        if len(targets) == 3 and type(targets[0]) != list and type(targets[1]) != list and type(targets[2]) != list:
            host, port, prio = targets
            self.targets = [[host + ":" + port, prio, [""]]]

        # [host, port, prio, suffixes]
        elif len(targets) == 4 and type(targets[0]) != list and type(targets[1]) != list and type(targets[2]) != list and type(targets[3]) == list:
            host, port, prio, suffixes = targets
            self.targets = [[host + ":" + port, prio, suffixes]]

        # [[host, port, prio], ...] or [[host, port, prio, suffixes], ...]
        else:
            for t in targets:
                if type(t) == list and len(t) == 3:
                    host, port, prio = t
                    self.targets.append([host + ":" + port, prio, [""]])
                elif type(t) == list and len(t) == 4 and type(t[3]):
                    host, port, prio, suffixes = t
                    self.targets.append([host + ":" + port, prio, suffixes])
                else:
                    self.stop()
                    self.log.debug("targets=" + str(targets))
                    raise FormatError("Argument 'targets' is of wrong format.")

226

Manuela Kuhn's avatar
Manuela Kuhn committed
227
    def __sendSignal (self, signal):
Manuela Kuhn's avatar
Manuela Kuhn committed
228

Manuela Kuhn's avatar
Manuela Kuhn committed
229
230
231
        if not signal:
            return

Manuela Kuhn's avatar
Manuela Kuhn committed
232
233
234
        # Send the signal that the communication infrastructure should be established
        self.log.info("Sending Signal")

Manuela Kuhn's avatar
Manuela Kuhn committed
235
        sendMessage = [__version__,  signal]
Manuela Kuhn's avatar
Manuela Kuhn committed
236
237
238
239
240
241
242
243
244

        trg = cPickle.dumps(self.targets)
        sendMessage.append(trg)

#        sendMessage = [__version__, signal, self.dataHost, self.dataPort]

        self.log.debug("Signal: " + str(sendMessage))
        try:
            self.signalSocket.send_multipart(sendMessage)
Manuela Kuhn's avatar
Manuela Kuhn committed
245
        except:
246
            self.log.error("Could not send signal")
Manuela Kuhn's avatar
Manuela Kuhn committed
247
248
249
250
251
            raise

        message = None
        try:
            socks = dict(self.poller.poll(self.socketResponseTimeout))
Manuela Kuhn's avatar
Manuela Kuhn committed
252
        except:
253
            self.log.error("Could not poll for new message")
Manuela Kuhn's avatar
Manuela Kuhn committed
254
255
256
257
258
259
260
261
262
263
            raise


        # if there was a response
        if self.signalSocket in socks and socks[self.signalSocket] == zmq.POLLIN:
            try:
                #  Get the reply.
                message = self.signalSocket.recv()
                self.log.info("Received answer to signal: " + str(message) )

Manuela Kuhn's avatar
Manuela Kuhn committed
264
            except:
265
                self.log.error("Could not receive answer to signal")
Manuela Kuhn's avatar
Manuela Kuhn committed
266
267
268
269
270
                raise

        return message


271
272
273
274
    def start (self, dataSocket = False, whitelist = None):

        # Receive data only from whitelisted nodes
        if whitelist:
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
            if type(whitelist) == list:
                self.auth = ThreadAuthenticator(self.context)
                self.auth.start()
                for host in whitelist:
                    try:
                        if host == "localhost":
                            ip = [socket.gethostbyname(host)]
                        else:
                            hostname, tmp, ip = socket.gethostbyaddr(host)

                        self.log.debug("Allowing host " + host + " (" + str(ip[0]) + ")")
                        self.auth.allow(ip[0])
                    except:
                        self.log.error("Error was: ", exc_info=True)
                        raise AuthenticationFailed("Could not get IP of host " + host)
            else:
                raise FormatError("Whitelist has to be a list of IPs")

Manuela Kuhn's avatar
Manuela Kuhn committed
293

294
        socketIdToConnect = self.streamStarted or self.queryNextStarted
Manuela Kuhn's avatar
Manuela Kuhn committed
295

296
297
298
        if socketIdToConnect:
            self.log.info("Reopening already started connection.")
        else:
Manuela Kuhn's avatar
Manuela Kuhn committed
299

300
            ip   = "0.0.0.0"           #TODO use IP of hostname?
Manuela Kuhn's avatar
Manuela Kuhn committed
301

302
303
            host = ""
            port = ""
Manuela Kuhn's avatar
Manuela Kuhn committed
304

305
306
307
308
309
310
311
312
313
314
315
316
317
318
            if dataSocket:
                if type(dataSocket) == list:
                    socketIdToConnect = dataSocket[0] + ":" + dataSocket[1]
                    host = dataSocket[0]
                    ip   = socket.gethostbyaddr(host)[2][0]
                    port = dataSocket[1]
                else:
                    port = str(dataSocket)

                    host = socket.gethostname()
                    socketId = host + ":" + port
                    ipFromHost = socket.gethostbyaddr(host)[2]
                    if len(ipFromHost) == 1:
                        ip = ipFromHost[0]
Manuela Kuhn's avatar
Manuela Kuhn committed
319

320
321
            elif len(self.targets) == 1:
                host, port = self.targets[0][0].split(":")
Manuela Kuhn's avatar
Manuela Kuhn committed
322
323
324
325
                ipFromHost = socket.gethostbyaddr(host)[2]
                if len(ipFromHost) == 1:
                    ip = ipFromHost[0]

326
327
            else:
                raise FormatError("Multipe possible ports. Please choose which one to use.")
Manuela Kuhn's avatar
Manuela Kuhn committed
328

329
330
331
            socketId = host + ":" + port
            socketIdToConnect = ip + ":" + port
#            socketIdToConnect = "[" + ip + "]:" + port
Manuela Kuhn's avatar
Manuela Kuhn committed
332

Manuela Kuhn's avatar
Manuela Kuhn committed
333

Manuela Kuhn's avatar
Manuela Kuhn committed
334
335
        self.dataSocket = self.context.socket(zmq.PULL)
        # An additional socket is needed to establish the data retriving mechanism
Manuela Kuhn's avatar
Manuela Kuhn committed
336
        connectionStr = "tcp://" + socketIdToConnect
337
338
        if whitelist:
            self.dataSocket.zap_domain = b'global'
339

Manuela Kuhn's avatar
Manuela Kuhn committed
340
        try:
341
#            self.dataSocket.ipv6 = True
Manuela Kuhn's avatar
Manuela Kuhn committed
342
            self.dataSocket.bind(connectionStr)
343
#            self.dataSocket.bind("tcp://[2003:ce:5bc0:a600:fa16:54ff:fef4:9fc0]:50102")
344
            self.log.info("Data socket of type " + self.connectionType + " started (bind) for '" + connectionStr + "'")
Manuela Kuhn's avatar
Manuela Kuhn committed
345
346
        except:
            self.log.error("Failed to start Socket of type " + self.connectionType + " (bind): '" + connectionStr + "'", exc_info=True)
347
            raise
Manuela Kuhn's avatar
Manuela Kuhn committed
348

349
        self.poller.register(self.dataSocket, zmq.POLLIN)
Manuela Kuhn's avatar
Manuela Kuhn committed
350

351
        if self.connectionType in ["queryNext", "queryMetadata"]:
Manuela Kuhn's avatar
Manuela Kuhn committed
352
353
354

            self.requestSocket = self.context.socket(zmq.PUSH)
            # An additional socket is needed to establish the data retriving mechanism
Manuela Kuhn's avatar
Manuela Kuhn committed
355
            connectionStr = "tcp://" + self.signalHost + ":" + self.requestPort
Manuela Kuhn's avatar
Manuela Kuhn committed
356
357
            try:
                self.requestSocket.connect(connectionStr)
358
                self.log.info("Request socket started (connect) for '" + connectionStr + "'")
Manuela Kuhn's avatar
Manuela Kuhn committed
359
360
            except:
                self.log.error("Failed to start Socket of type " + self.connectionType + " (connect): '" + connectionStr + "'", exc_info=True)
361
                raise
Manuela Kuhn's avatar
Manuela Kuhn committed
362

Manuela Kuhn's avatar
Manuela Kuhn committed
363
            self.queryNextStarted = socketId
Manuela Kuhn's avatar
Manuela Kuhn committed
364
365
366
367
368
369
370
371
372
373
        else:
            self.streamStarted    = socketId


    ##
    #
    # Receives or queries for new files depending on the connection initialized
    #
    # returns either
    #   the newest file
374
    #       (if connection type "queryNext" or "stream" was choosen)
Manuela Kuhn's avatar
Manuela Kuhn committed
375
    #   the path of the newest file
376
    #       (if connection type "queryMetadata" or "streamMetadata" was choosen)
Manuela Kuhn's avatar
Manuela Kuhn committed
377
378
    #
    ##
379
    def get (self, timeout=None):
Manuela Kuhn's avatar
Manuela Kuhn committed
380
381
382

        if not self.streamStarted and not self.queryNextStarted:
            self.log.info("Could not communicate, no connection was initialized.")
Manuela Kuhn's avatar
Manuela Kuhn committed
383
            return None, None
Manuela Kuhn's avatar
Manuela Kuhn committed
384
385
386
387
388
389
390

        if self.queryNextStarted :

            sendMessage = ["NEXT", self.queryNextStarted]
            try:
                self.requestSocket.send_multipart(sendMessage)
            except Exception as e:
Manuela Kuhn's avatar
Manuela Kuhn committed
391
392
                self.log.error("Could not send request to requestSocket", exc_info=True)
                return None, None
Manuela Kuhn's avatar
Manuela Kuhn committed
393

394
395
396
397
398
399
400
401
402
403
404
405
406
407
        while True:
            # receive data
            if timeout:
                try:
                    socks = dict(self.poller.poll(timeout))
                except:
                    self.log.error("Could not poll for new message")
                    raise
            else:
                try:
                    socks = dict(self.poller.poll())
                except:
                    self.log.error("Could not poll for new message")
                    raise
Manuela Kuhn's avatar
Manuela Kuhn committed
408

409
410
            # if there was a response
            if self.dataSocket in socks and socks[self.dataSocket] == zmq.POLLIN:
411

412
413
414
                try:
                    multipartMessage = self.dataSocket.recv_multipart()
                except:
root's avatar
root committed
415
                    self.log.error("Receiving data..failed.", exc_info=True)
416
417
                    return [None, None]

root's avatar
root committed
418
419

                if multipartMessage[0] == b"ALIVE_TEST":
420
421
422
                    continue
                elif len(multipartMessage) < 2:
                    self.log.error("Received mutipart-message is too short. Either config or file content is missing.")
root's avatar
root committed
423
                    self.log.debug("multipartMessage=" + str(mutipartMessage)[:100])
424
425
426
427
428
429
430
431
                    return [None, None]

                # extract multipart message
                try:
                    metadata = cPickle.loads(multipartMessage[0])
                except:
                    self.log.error("Could not extract metadata from the multipart-message.", exc_info=True)
                    metadata = None
Manuela Kuhn's avatar
Manuela Kuhn committed
432

433
                #TODO validate multipartMessage (like correct dict-values for metadata)
Manuela Kuhn's avatar
Manuela Kuhn committed
434

435
436
437
438
439
                try:
                    payload = multipartMessage[1]
                except:
                    self.log.warning("An empty file was received within the multipart-message", exc_info=True)
                    payload = None
Manuela Kuhn's avatar
Manuela Kuhn committed
440

441
442
443
                return [metadata, payload]
            else:
                self.log.warning("Could not receive data in the given time.")
Manuela Kuhn's avatar
Manuela Kuhn committed
444

445
446
447
448
449
                if self.queryNextStarted :
                    try:
                        self.requestSocket.send_multipart(["CANCEL", self.queryNextStarted])
                    except Exception as e:
                        self.log.error("Could not cancel the next query", exc_info=True)
Manuela Kuhn's avatar
Manuela Kuhn committed
450

451
                return [None, None]
Manuela Kuhn's avatar
Manuela Kuhn committed
452
453


Manuela Kuhn's avatar
Manuela Kuhn committed
454
    def store (self, targetBasePath, dataObject):
Manuela Kuhn's avatar
Manuela Kuhn committed
455
456

        if type(dataObject) is not list and len(dataObject) != 2:
457
            raise FormatError("Wrong input type for 'store'")
Manuela Kuhn's avatar
Manuela Kuhn committed
458
459
460
461
462

        payloadMetadata   = dataObject[0]
        payload           = dataObject[1]


Manuela Kuhn's avatar
Manuela Kuhn committed
463
        if type(payloadMetadata) is not dict:
464
            raise FormatError("payload: Wrong input format in 'store'")
Manuela Kuhn's avatar
Manuela Kuhn committed
465
466
467
468

        #save all chunks to file
        while True:

Manuela Kuhn's avatar
Manuela Kuhn committed
469
            #TODO check if payload != cPickle.dumps(None) ?
Manuela Kuhn's avatar
Manuela Kuhn committed
470
471
472
473
474
475
476
477
478
            if payloadMetadata and payload:
                #append to file
                try:
                    self.log.debug("append to file based on multipart-message...")
                    #TODO: save message to file using a thread (avoids blocking)
                    #TODO: instead of open/close file for each chunk recyle the file-descriptor for all chunks opened
                    self.__appendChunksToFile(targetBasePath, payloadMetadata, payload)
                    self.log.debug("append to file based on multipart-message...success.")
                except KeyboardInterrupt:
Manuela Kuhn's avatar
Manuela Kuhn committed
479
                    self.log.info("KeyboardInterrupt detected. Unable to append multipart-content to file.")
Manuela Kuhn's avatar
Manuela Kuhn committed
480
481
                    break
                except Exception, e:
Manuela Kuhn's avatar
Manuela Kuhn committed
482
483
                    self.log.error("Unable to append multipart-content to file.", exc_info=True)
                    self.log.debug("Append to file based on multipart-message...failed.")
Manuela Kuhn's avatar
Manuela Kuhn committed
484
485
486
487

                if len(payload) < payloadMetadata["chunkSize"] :
                    #indicated end of file. Leave loop
                    filename    = self.generateTargetFilepath(targetBasePath, payloadMetadata)
488
                    fileModTime = payloadMetadata["fileModTime"]
Manuela Kuhn's avatar
Manuela Kuhn committed
489
490
491
492
493
494

                    self.log.info("New file with modification time " + str(fileModTime) + " received and saved: " + str(filename))
                    break

            try:
                [payloadMetadata, payload] = self.get()
Manuela Kuhn's avatar
Manuela Kuhn committed
495
496
            except:
                self.log.error("Getting data failed.", exc_info=True)
Manuela Kuhn's avatar
Manuela Kuhn committed
497
498
499
                break


Manuela Kuhn's avatar
Manuela Kuhn committed
500
    def __appendChunksToFile (self, targetBasePath, configDict, payload):
Manuela Kuhn's avatar
Manuela Kuhn committed
501
502
503
504
505
506
507
508
509
510
511
512
513

        #generate target filepath
        targetFilepath = self.generateTargetFilepath(targetBasePath, configDict)
        self.log.debug("new file is going to be created at: " + targetFilepath)


        #append payload to file
        try:
            newFile = open(targetFilepath, "a")
        except IOError, e:
            # errno.ENOENT == "No such file or directory"
            if e.errno == errno.ENOENT:
                try:
514
                    #TODO do not create commissioning, current, local
Manuela Kuhn's avatar
Manuela Kuhn committed
515
516
517
518
                    targetPath = self.__generateTargetPath(targetBasePath, configDict)
                    os.makedirs(targetPath)
                    newFile = open(targetFilepath, "w")
                    self.log.info("New target directory created: " + str(targetPath))
Manuela Kuhn's avatar
Manuela Kuhn committed
519
                except:
520
                    self.log.error("Unable to save payload to file: '" + targetFilepath + "'")
Manuela Kuhn's avatar
Manuela Kuhn committed
521
                    self.log.debug("targetPath:" + str(targetPath))
Manuela Kuhn's avatar
Manuela Kuhn committed
522
                    raise
Manuela Kuhn's avatar
Manuela Kuhn committed
523
            else:
524
525
                self.log.error("Failed to append payload to file: '" + targetFilepath + "'")
                raise
Manuela Kuhn's avatar
Manuela Kuhn committed
526
        except:
527
528
529
            self.log.error("Failed to append payload to file: '" + targetFilepath + "'")
#            self.log.debug("e.errno = " + str(e.errno) + "        errno.EEXIST==" + str(errno.EEXIST))
            raise
Manuela Kuhn's avatar
Manuela Kuhn committed
530
531
532
533

        #only write data if a payload exist
        try:
            if payload != None:
Manuela Kuhn's avatar
Manuela Kuhn committed
534
                newFile.write(payload)
Manuela Kuhn's avatar
Manuela Kuhn committed
535
            newFile.close()
Manuela Kuhn's avatar
Manuela Kuhn committed
536
        except:
537
            self.log.error("Unable to append data to file.")
Manuela Kuhn's avatar
Manuela Kuhn committed
538
            raise
Manuela Kuhn's avatar
Manuela Kuhn committed
539
540


Manuela Kuhn's avatar
Manuela Kuhn committed
541
    def generateTargetFilepath (self, basePath, configDict):
Manuela Kuhn's avatar
Manuela Kuhn committed
542
543
544
545
        """
        generates full path where target file will saved to.

        """
546
547
548
        if not configDict:
            return None

Manuela Kuhn's avatar
Manuela Kuhn committed
549
        filename     = configDict["filename"]
550
551
552
        #TODO This is due to Windows path names, check if there has do be done anything additionally to work
        # e.g. check sourcePath if it's a windows path
        relativePath = configDict["relativePath"].replace('\\', os.sep)
Manuela Kuhn's avatar
Manuela Kuhn committed
553
554
555
556
557
558
559
560
561
562
563

        if relativePath is '' or relativePath is None:
            targetPath = basePath
        else:
            targetPath = os.path.normpath(basePath + os.sep + relativePath)

        filepath =  os.path.join(targetPath, filename)

        return filepath


Manuela Kuhn's avatar
Manuela Kuhn committed
564
    def __generateTargetPath (self, basePath, configDict):
Manuela Kuhn's avatar
Manuela Kuhn committed
565
566
567
568
        """
        generates path where target file will saved to.

        """
569
570
571
        #TODO This is due to Windows path names, check if there has do be done anything additionally to work
        # e.g. check sourcePath if it's a windows path
        relativePath = configDict["relativePath"].replace('\\', os.sep)
Manuela Kuhn's avatar
Manuela Kuhn committed
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586

        # if the relative path starts with a slash path.join will consider it as absolute path
        if relativePath.startswith("/"):
            relativePath = relativePath[1:]

        targetPath = os.path.join(basePath, relativePath)

        return targetPath


    ##
    #
    # Send signal that the displayer is quitting, close ZMQ connections, destoying context
    #
    ##
Manuela Kuhn's avatar
Manuela Kuhn committed
587
    def stop (self):
Manuela Kuhn's avatar
Manuela Kuhn committed
588
589
590
591
        if self.signalSocket and self.signalExchanged:
            self.log.info("Sending close signal")
            signal = None
            if self.streamStarted or ( "STREAM" in self.signalExchanged):
Manuela Kuhn's avatar
Manuela Kuhn committed
592
                signal = "STOP_STREAM"
593
            elif self.queryNextStarted or ( "QUERY" in self.signalExchanged):
Manuela Kuhn's avatar
Manuela Kuhn committed
594
595
                signal = "STOP_QUERY_NEXT"

Manuela Kuhn's avatar
Manuela Kuhn committed
596

Manuela Kuhn's avatar
Manuela Kuhn committed
597
598
599
            message = self.__sendSignal(signal)
            #TODO need to check correctness of signal?

600
601
602
            self.streamStarted    = None
            self.queryNextStarted = None

Manuela Kuhn's avatar
Manuela Kuhn committed
603
604
605
606
607
608
609
610
611
        try:
            if self.signalSocket:
                self.log.info("closing signalSocket...")
                self.signalSocket.close(linger=0)
                self.signalSocket = None
            if self.dataSocket:
                self.log.info("closing dataSocket...")
                self.dataSocket.close(linger=0)
                self.dataSocket = None
612
613
614
615
            if self.requestSocket:
                self.log.info("closing requestSocket...")
                self.requestSocket.close(linger=0)
                self.requestSocket = None
Manuela Kuhn's avatar
Manuela Kuhn committed
616
617
        except:
            self.log.error("closing ZMQ Sockets...failed.", exc_info=True)
Manuela Kuhn's avatar
Manuela Kuhn committed
618

619
620
621
622
623
624
625
626
        if self.auth:
            try:
                self.auth.stop()
                self.auth = None
                self.log.info("Stopping authentication thread...done.")
            except:
                self.log.error("Stopping authentication thread...done.", exc_info=True)

Manuela Kuhn's avatar
Manuela Kuhn committed
627
628
        # if the context was created inside this class,
        # it has to be destroyed also within the class
629
        if not self.extContext and self.context:
Manuela Kuhn's avatar
Manuela Kuhn committed
630
            try:
631
                self.log.info("Closing ZMQ context...")
Manuela Kuhn's avatar
Manuela Kuhn committed
632
                self.context.destroy(0)
633
634
                self.context = None
                self.log.info("Closing ZMQ context...done.")
Manuela Kuhn's avatar
Manuela Kuhn committed
635
            except:
636
                self.log.error("Closing ZMQ context...failed.", exc_info=True)
Manuela Kuhn's avatar
Manuela Kuhn committed
637
638


639
    def forceStop (self, targets):
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697

        if type(targets) != list:
            self.stop()
            raise FormatError("Argument 'targets' must be list.")

        if not self.context:
            self.context    = zmq.Context()
            self.extContext = False

        signal = None
        # Signal exchange
        if self.connectionType == "stream":
            signalPort = self.signalPort
            signal     = "STOP_STREAM"
        elif self.connectionType == "streamMetadata":
            signalPort = self.signalPort
            signal     = "STOP_STREAM_METADATA"
        elif self.connectionType == "queryNext":
            signalPort = self.signalPort
            signal     = "STOP_QUERY_NEXT"
        elif self.connectionType == "queryMetadata":
            signalPort = self.signalPort
            signal     = "STOP_QUERY_METADATA"

        self.log.debug("Create socket for signal exchange...")


        if self.signalHost and not self.signalSocket:
            self.__createSignalSocket(signalPort)
        elif not self.signalHost:
            self.stop()
            raise ConnectionFailed("No host to send signal to specified." )

        self.__setTargets (targets)

        message = self.__sendSignal(signal)

        if message and message == "VERSION_CONFLICT":
            self.stop()
            raise VersionError("Versions are conflicting.")

        elif message and message == "NO_VALID_HOST":
            self.stop()
            raise AuthenticationFailed("Host is not allowed to connect.")

        elif message and message == "CONNECTION_ALREADY_OPEN":
            self.stop()
            raise CommunicationFailed("Connection is already open.")

        elif message and message == "NO_VALID_SIGNAL":
            self.stop()
            raise CommunicationFailed("Connection type is not supported for this kind of sender.")

        # if there was no response or the response was of the wrong format, the receiver should be shut down
        elif message and message.startswith(signal):
            self.log.info("Received confirmation ...")


Manuela Kuhn's avatar
Manuela Kuhn committed
698
    def __exit__ (self):
Manuela Kuhn's avatar
Manuela Kuhn committed
699
700
701
        self.stop()


Manuela Kuhn's avatar
Manuela Kuhn committed
702
    def __del__ (self):
Manuela Kuhn's avatar
Manuela Kuhn committed
703
704
705
        self.stop()