# getFromFile.py
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>'

import cPickle
import errno
import logging
import os
import shutil
import sys
import traceback

import zmq

from send_helpers import __sendToTargets, DataHandlingError


def setup (log, prop):

    if ( not prop.has_key("fixSubdirs") or
18
        not prop.has_key("storeData") ):
19
20
21
22
23
24

        log.error ("Configuration of wrong format")
        log.debug ("dataFetcherProp="+ str(prop))
        return False

    else:
25
        prop["timeout"]    = -1 #10
26
        prop["removeFlag"] = False
27
        return True
Manuela Kuhn's avatar
Manuela Kuhn committed
28
29


30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def getMetadata (log, metadata, chunkSize, localTarget = None):
    """Resolve the source/target paths of a file event and extend its metadata.

    Args:
        log:         logger.
        metadata:    fileEvent dict with at least "filename", "sourcePath"
                     and "relativePath". It is updated in place with
                     "filesize", "fileModTime", "fileCreateTime" and
                     "chunkSize".
        chunkSize:   value stored as metadata["chunkSize"] (sendData reads
                     the file in pieces of this size).
        localTarget: optional base directory of the local target. If it is
                     not given (or empty), no target file is computed.

    Returns:
        (sourceFile, targetFile, metadata). targetFile is None when no
        localTarget was given.

    Raises:
        Whatever the key lookup or os.stat raises (e.g. KeyError for a
        malformed event, OSError if the source file is missing), after
        logging it.
    """
    #extract fileEvent metadata
    try:
        #TODO validate metadata dict
        filename     = metadata["filename"]
        sourcePath   = metadata["sourcePath"]
        relativePath = metadata["relativePath"]
    except Exception:
        log.error("Invalid fileEvent message received.", exc_info=True)
        log.debug("metadata=" + str(metadata))
        raise

    # Plain concatenation instead of os.path.join on purpose: relativePath
    # may start with a separator (e.g. os.sep + "local" + os.sep + "raw"),
    # and os.path.join would then drop sourcePath.
    #   sourcePath = "C:\dir", filename = "img.tiff"  -->  'C:\\dir\\img.tiff'
    sourceFilePath = os.path.normpath(sourcePath + os.sep + relativePath)
    sourceFile     = os.path.join(sourceFilePath, filename)

    if localTarget:
        targetFilePath = os.path.normpath(localTarget + os.sep + relativePath)
        targetFile     = os.path.join(targetFilePath, filename)
    else:
        targetFile     = None

    try:
        log.debug("get filesize for '" + str(sourceFile) + "'...")
        # A single stat call: cheaper than three, and size and times come
        # from the same snapshot of the file.
        fileStat       = os.stat(sourceFile)
        filesize       = fileStat.st_size
        fileModTime    = fileStat.st_mtime
        fileCreateTime = fileStat.st_ctime

        log.debug("filesize(%s) = %s" % (sourceFile, str(filesize)))
        log.debug("fileModTime(%s) = %s" % (sourceFile, str(fileModTime)))
    except Exception:
        log.error("Unable to create metadata dictionary.", exc_info=True)
        raise

    try:
        log.debug("create metadata for source file...")
        metadata[ "filesize"    ]   = filesize
        metadata[ "fileModTime" ]   = fileModTime
        metadata[ "fileCreateTime"] = fileCreateTime
        metadata[ "chunkSize"   ]   = chunkSize

        log.debug("metadata = " + str(metadata))
    except Exception:
        log.error("Unable to assemble multi-part message.", exc_info=True)
        raise

    return sourceFile, targetFile, metadata


def sendData (log, targets, sourceFile, targetFile, metadata, openConnections, context, prop):
    """Send sourceFile chunk-wise to every target of type "data".

    Each chunk is sent as a two-part message: the pickled metadata (a copy
    of ``metadata`` extended by "chunkNumber"), followed by up to
    metadata["chunkSize"] raw bytes of the file.

    Args:
        log:             logger.
        targets:         list of targets. Entry [2] of each target is its
                         type; only targets of type "data" are used here.
        sourceFile:      path of the file to send.
        targetFile:      passed through to __sendToTargets.
        metadata:        file metadata as built by getMetadata (must
                         contain "chunkSize").
        openConnections: passed through to __sendToTargets.
        context:         passed through to __sendToTargets.
        prop:            data-fetcher properties. prop["removeFlag"] is set
                         to True when there are no data targets, or when
                         every chunk was packed and sent without error.
                         Otherwise it stays False, so finishDataHandling
                         neither moves nor deletes the source file.

    Raises:
        Errors from opening or closing the source file, after logging them.
    """
    targets_data = [i for i in targets if i[2] == "data"]

    if not targets_data:
        prop["removeFlag"] = True
        return

    prop["removeFlag"] = False
    chunkSize          = metadata[ "chunkSize" ]

    chunkNumber        = 0
    sendError          = False

    # open source file (it is read chunk by chunk below)
    try:
        log.debug("Opening '" + str(sourceFile) + "'...")
        fileDescriptor = open(str(sourceFile), "rb")
    except:
        log.error("Unable to read source file '" + str(sourceFile) + "'.", exc_info=True)
        raise

    log.debug("Passing multipart-message for file " + str(sourceFile) + "...")
    # try/finally so the file is closed even if reading a chunk raises
    try:
        while True:
            #read next chunk from file
            fileContent = fileDescriptor.read(chunkSize)

            #detect if end of file has been reached
            if not fileContent:
                break

            try:
                #assemble metadata for zmq-message
                chunkMetadata = metadata.copy()
                chunkMetadata["chunkNumber"] = chunkNumber

                chunkPayload = [cPickle.dumps(chunkMetadata), fileContent]
            except Exception:
                log.error("Unable to pack multipart-message for file " + str(sourceFile), exc_info=True)
                # Do not send an unbound or stale payload (previous chunk).
                # Flag the failure so the source file is not removed.
                sendError = True
            else:
                #send message to data targets
                try:
                    __sendToTargets(log, targets_data, sourceFile, targetFile, openConnections, None, chunkPayload, context)
                    log.debug("Passing multipart-message for file " + str(sourceFile) + " (chunk " + str(chunkNumber) + ")...done.")
                except (DataHandlingError, Exception):
                    log.error("Unable to send multipart-message for file " + str(sourceFile) + " (chunk " + str(chunkNumber) + ")", exc_info=True)
                    # Any undelivered chunk must keep the source file alive.
                    sendError = True

            chunkNumber += 1
    finally:
        try:
            log.debug("Closing '" + str(sourceFile) + "'...")
            fileDescriptor.close()
        except Exception:
            log.error("Unable to close source file '" + str(sourceFile) + "'.", exc_info=True)
            raise

    if not sendError:
        prop["removeFlag"] = True


def __dataHandling(log, sourceFile, targetFile, actionFunction, metadata, prop):
    """Copy or move sourceFile to targetFile, creating the target dir if allowed.

    If the first attempt fails with ENOENT ("No such file or directory"),
    the target directory may be created and the action retried once. This
    does not happen when metadata["relativePath"] itself, or its parent
    directory, is listed in prop["fixSubdirs"]; those directories must
    already exist.

    Args:
        log:            logger.
        sourceFile:     path of the file to copy/move.
        targetFile:     destination path.
        actionFunction: callable(src, dst), e.g. shutil.copy or shutil.move.
        metadata:       fileEvent dict; "relativePath" is read.
        prop:           data-fetcher properties; "fixSubdirs" is read.

    Raises:
        Any exception from actionFunction or os.makedirs, after logging it.
        This includes a failure of the retry, which was previously
        swallowed and let callers report success.
    """
    errorMsg = ("Unable to copy/move file '" + str(sourceFile) + "' to '"
                + str(targetFile) + "'")

    try:
        actionFunction(sourceFile, targetFile)
    except (IOError, OSError) as e:

        # errno.ENOENT == "No such file or directory"
        if e.errno != errno.ENOENT:
            log.error(errorMsg, exc_info=True)
            raise

        relativePath = metadata["relativePath"]
        subdir       = os.path.split(relativePath)[0]

        # NOTE(review): a relativePath with a leading separator (as built in
        # this file's __main__ test) never matches plain names in fixSubdirs.
        # Confirm the expected format.
        if relativePath in prop["fixSubdirs"]:
            log.error(errorMsg + ": Directory " + relativePath + " is not available.", exc_info=True)
            raise
        elif subdir in prop["fixSubdirs"]:
            log.error(errorMsg + ": Directory " + subdir + " is not available.", exc_info=True)
            raise

        # create the missing target directory and retry once
        targetPath = None
        try:
            targetPath = os.path.split(targetFile)[0]
            try:
                os.makedirs(targetPath)
                log.info("New target directory created: " + str(targetPath))
            except OSError as dirError:
                # The directory may already exist (e.g. created concurrently);
                # that is fine, anything else is an error.
                if dirError.errno != errno.EEXIST or not os.path.isdir(targetPath):
                    raise
            actionFunction(sourceFile, targetFile)
        except Exception:
            log.error(errorMsg, exc_info=True)
            log.debug("targetPath:" + str(targetPath))
            raise
    except Exception:
        log.error(errorMsg, exc_info=True)
        raise


def finishDataHandling (log, targets, sourceFile, targetFile, metadata, openConnections, context, prop):
    """Store and/or remove the source file after its data was sent.

    Depending on prop["storeData"] and prop["removeFlag"]:
      * storeData and removeFlag: move sourceFile to targetFile
      * storeData only:           copy sourceFile to targetFile
      * removeFlag only:          delete sourceFile, reset removeFlag to False
      * neither:                  do nothing (no metadata is sent either)

    If the move/copy fails, the function returns without sending metadata.
    Otherwise the metadata is sent to all targets whose type (entry [2])
    is "metadata". Errors while sending or deleting are logged, not raised.
    """
    targets_metadata = [i for i in targets if i[2] == "metadata"]

    if prop["storeData"] and prop["removeFlag"]:
        # move file
        try:
            __dataHandling(log, sourceFile, targetFile, shutil.move, metadata, prop)
            log.info("Moving file '" + str(sourceFile) + "' ...success.")
        except Exception:
            # __dataHandling logs its own failures
            return

    elif prop["storeData"]:
        # copy file
        # (does not preserve file owner, group or ACLs)
        try:
            __dataHandling(log, sourceFile, targetFile, shutil.copy, metadata, prop)
            log.info("Copying file '" + str(sourceFile) + "' ...success.")
        except Exception:
            return

    elif prop["removeFlag"]:
        # remove file
        try:
            os.remove(sourceFile)
            log.info("Removing file '" + str(sourceFile) + "' ...success.")
        except Exception:
            log.error("Unable to remove file " + str(sourceFile), exc_info=True)

        prop["removeFlag"] = False

    else:
        return

    #send message to metadata targets (shared by all three branches above)
    if targets_metadata:
        try:
            __sendToTargets(log, targets_metadata, sourceFile, targetFile, openConnections, metadata, None, context, prop["timeout"])
            log.debug("Passing metadata multipart-message for file " + str(sourceFile) + "...done.")
        except Exception:
            log.error("Unable to send metadata multipart-message for file " + str(sourceFile), exc_info=True)


def clean (prop):
    """Release resources held by this data fetcher (currently a no-op).

    setup() in this module only writes keys into ``prop`` and acquires
    nothing, so there is nothing to release. Presumably the function exists
    so that every data fetcher offers the same setup/clean interface.

    Args:
        prop: data-fetcher properties (unused).
    """
    pass


if __name__ == '__main__':
    # Manual end-to-end test: send a sample file to two local PULL sockets
    # and log the metadata of what they receive.
    import time
    from shutil import copyfile

    try:
        BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) ))))
    except NameError:
        # __file__ is not always defined; fall back to the script path
        BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.abspath ( sys.argv[0] ) ))))
    # single parenthesized string: identical output on Python 2 and 3
    print("BASE_PATH " + BASE_PATH)
    SHARED_PATH = os.path.join(BASE_PATH, "src", "shared")

    if SHARED_PATH not in sys.path:
        sys.path.append ( SHARED_PATH )
    del SHARED_PATH

    import helpers

    logfile = os.path.join(BASE_PATH, "logs", "getFromFile.log")
    logsize = 10485760

    # Get the log Configuration for the listener
    h1, h2 = helpers.getLogHandlers(logfile, logsize, verbose=True, onScreenLogLevel="debug")

    # Create log and set handler to queue handle
    root = logging.getLogger()
    root.setLevel(logging.DEBUG) # Log level = DEBUG
    root.addHandler(h1)
    root.addHandler(h2)

    receivingPort    = "6005"
    receivingPort2   = "6006"
    extIp            = "0.0.0.0"

    # defined before the try block because clean() needs it in `finally`
    dataFetcherProp = {
            "type"       : "getFromFile",
            "fixSubdirs" : ["commissioning", "current", "local"],
            "storeData"  : False
            }

    context          = zmq.Context.instance()

    receivingSocket  = context.socket(zmq.PULL)
    connectionStr    = "tcp://{ip}:{port}".format( ip=extIp, port=receivingPort )
    receivingSocket.bind(connectionStr)
    logging.info("=== receivingSocket connected to " + connectionStr)

    receivingSocket2 = context.socket(zmq.PULL)
    connectionStr    = "tcp://{ip}:{port}".format( ip=extIp, port=receivingPort2 )
    receivingSocket2.bind(connectionStr)
    logging.info("=== receivingSocket2 connected to " + connectionStr)

    # Everything that can fail runs inside try/finally so the sockets and
    # the context are always released.
    try:
        prework_sourceFile = os.path.join(BASE_PATH, "test_file.cbf")
        prework_targetFile = os.path.join(BASE_PATH, "data", "source", "local", "raw", "100.cbf")

        copyfile(prework_sourceFile, prework_targetFile)
        time.sleep(0.5)

        workload = {
                "sourcePath"  : BASE_PATH + os.sep +"data" + os.sep + "source",
                "relativePath": os.sep + "local" + os.sep + "raw",
                "filename"    : "100.cbf"
                }
        targets = [['localhost:' + receivingPort, 1, "data"], ['localhost:' + receivingPort2, 0, "data"]]

        chunkSize       = 10485760  # = 1024*1024*10 = 10 MiB
        # not used while getMetadata is called with localTarget = None
        localTarget     = os.path.join(BASE_PATH, "data", "target")
        openConnections = dict()

        logging.debug("openConnections before function call: " + str(openConnections))

        setup(logging, dataFetcherProp)

        sourceFile, targetFile, metadata = getMetadata (logging, workload, chunkSize, localTarget = None)
        sendData(logging, targets, sourceFile, targetFile, metadata, openConnections, context, dataFetcherProp)

        # Previously called with only four arguments, which raised a TypeError.
        finishDataHandling(logging, targets, sourceFile, targetFile, metadata, openConnections, context, dataFetcherProp)

        logging.debug("openConnections after function call: " + str(openConnections))

        # NOTE(review): the sockets are bound to all interfaces (extIp) and the
        # first frame is unpickled. cPickle.loads on network data can execute
        # arbitrary code, so only run this test on a trusted network.
        recv_message = receivingSocket.recv_multipart()
        logging.info("=== received: " + str(cPickle.loads(recv_message[0])))
        recv_message = receivingSocket2.recv_multipart()
        logging.info("=== received 2: " + str(cPickle.loads(recv_message[0])))
    except KeyboardInterrupt:
        pass
    finally:
        receivingSocket.close(0)
        receivingSocket2.close(0)
        clean(dataFetcherProp)
        context.destroy()