TangoServer.py 14.8 KB
Newer Older
1
2
#!/usr/bin/env python
#
3
import threading
Manuela Kuhn's avatar
Manuela Kuhn committed
4
import os
5
import sys
Manuela Kuhn's avatar
Manuela Kuhn committed
6
7
import socket
import subprocess
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import logging
from multiprocessing import Queue

# Locate the project root by going three dirname() steps up from this file
# and make the project's shared modules (src/shared) importable.
try:
    BASE_PATH = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
except NameError:
    # __file__ is not defined in every execution context; fall back to the
    # path of the script that was started.
    BASE_PATH = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(sys.argv[0]))))
SHARED_PATH = os.path.join(BASE_PATH, "src", "shared")
CONFIG_PATH = os.path.join(BASE_PATH, "conf")

if SHARED_PATH not in sys.path:
    sys.path.append(SHARED_PATH)

# Only BASE_PATH is used further down; do not leak the helper names.
del SHARED_PATH
del CONFIG_PATH

import helpers
from logutils.queue import QueueHandler


27

Manuela Kuhn's avatar
Manuela Kuhn committed
28
29
30
31
32
33
34
35
36
37
# TCP port the control socket binds to; also the key (as a string) used to
# look up the served beamline in port2BL.
PORT = 51000

# Installation directory; ZmqDT.start() derives the monitored source
# directory (<BASEDIR>/data/source) from it.
BASEDIR = "/root/zeromq-data-transfer"
#BASEDIR = "/space/projects/zeromq-data-transfer"

# Directory ZmqDT.start() writes the per-beamline "<beamline>.conf" into.
CONFIGPATH = "/root/zeromq-data-transfer/conf"
#CONFIGPATH = "/space/projects/zeromq-data-transfer/conf"

# Value written as "logfilePath" into the generated config file.
LOGPATH = "/root/zeromq-data-transfer/logs"
#LOGPATH = "/space/projects/zeromq-data-transfer/logs"
38
39
40
41
42

#
# maps the port the server listens on (as a string) to the beamline it
# serves, e.g. the server listening on 51009 serves p09
#
# Ports 51000..51011 (as strings) map to the beamlines p00..p11.
port2BL = dict(
    (str(51000 + number), "p{0:02d}".format(number))
    for number in range(12)
)


class ZmqDT(object):
    '''
    Holds the parameters of the data transfer of one beamline and the
    commands that control its systemd service.

    Parameters are accessed with set()/get(), commands are run with do();
    execMsg() parses a text message and dispatches to one of the three.
    The answers are strings: "DONE", "ERROR", a status, or - for get() -
    the stored value (None as long as the parameter was never set).
    '''

    # Attributes a client may set/get. The parameter name used in a message
    # is the lower-cased attribute name; the attribute name itself is the
    # key written to the config file.
    _PARAM_ATTRS = (
        "detectorDevice",     # TangoDevice to talk with
        "filewriterDevice",   # TangoDevice to talk with
        # TODO replace TangoDevices with eigerIp (IP of the EIGER Detector)
        "historySize",        # Number of events stored to look for doubles
        "localTarget",        # Target to move the files into,
                              # e.g. /beamline/p11/current/raw
        "storeData",          # Flag: store the data in localTarget
        "removeData",         # Flag: remove the files from the source
        "whitelist",          # Hosts allowed to connect to the data distribution
    )
    _PARAM_LOOKUP = dict((name.lower(), name) for name in _PARAM_ATTRS)

    def __init__ (self, beamline):
        '''
        beamline: name of the beamline, e.g. "p00"; used to build the
                  process name, the config file name and the service name
        '''
        # Beamline is read-only, determined by portNo
        self.beamline = beamline
        self.procname = "zeromq-data-transfer_" + self.beamline

        # Nothing is configured until the client sets it
        for name in self._PARAM_ATTRS:
            setattr(self, name, None)


    def execMsg (self, msg):
        '''
        Parses a message and executes it. Examples:

        set localtarget /gpfs/current/raw
          returns DONE
        get localtarget
          returns /gpfs/current/raw
        do start
          returns DONE

        Anything that can not be parsed returns ERROR.
        '''
        # Split on any run of whitespace; the value of a 'set' is the rest
        # of the message and may itself contain blanks.
        tokens = msg.split(None, 2)

        if not tokens:
            return "ERROR"

        action = tokens[0].lower()

        if action == 'set':
            if len(tokens) < 3:
                return "ERROR"
            return self.set(tokens[1], tokens[2])

        if action == 'get':
            if len(tokens) != 2:
                return "ERROR"
            return self.get(tokens[1])

        if action == 'do':
            if len(tokens) != 2:
                return "ERROR"
            return self.do(tokens[1])

        return "ERROR"


    def set (self, param, value):
        '''
        set a parameter, e.g.: set localtarget /gpfs/current/raw/

        returns DONE, or ERROR if the parameter is unknown
        (the parameter name is case-insensitive)
        '''
        name = self._PARAM_LOOKUP.get(param.lower())

        if name is None:
            return "ERROR"

        setattr(self, name, value)
        return "DONE"


    def get (self, param):
        '''
        return the value of a parameter, e.g.: get localtarget

        returns the stored value (None if it was never set), or ERROR if
        the parameter is unknown (the parameter name is case-insensitive)
        '''
        name = self._PARAM_LOOKUP.get(param.lower())

        if name is None:
            return "ERROR"

        return getattr(self, name)


    def do (self, cmd):
        '''
        executes commands: start, stop, restart, status

        returns the answer of the command, or ERROR if it is unknown
        '''
        key = cmd.lower()

        if key == "start":
            return self.start()

        elif key == "stop":
            return self.stop()

        elif key == "restart":
            return self.restart()

        elif key == "status":
            return self.status()

        else:
            return "ERROR"


    def start (self):
        '''
        writes the config file of the beamline and starts the service

        returns DONE on success; ERROR if a required parameter is not set,
        the service is already running or systemctl reports a failure
        '''
        # see, if all required params are there.
        if not self._allParamsSet():
            return "ERROR"

        self._writeConfig()

        # check if service is running
        if self.status() == "RUNNING":
            return "ERROR"

        # start service
        p = subprocess.call(["systemctl", "start", self._serviceName()])

        return "DONE" if p == 0 else "ERROR"


    def stop (self):
        '''
        stops the service

        returns DONE if systemctl exits with 0, ERROR otherwise
        '''
        p = subprocess.call(["systemctl", "stop", self._serviceName()])

        return "DONE" if p == 0 else "ERROR"


    def restart (self):
        '''
        stops and then starts the service

        returns the answer of start(); a failed stop shows up there because
        start() refuses to start a service that is still running
        '''
        self.stop()

        return self.start()


    def status (self):
        '''
        asks systemctl whether the service is active

        returns RUNNING if systemctl exits with 0, NOT RUNNING otherwise
        '''
        p = subprocess.call(["systemctl", "is-active", self._serviceName()])

        return "RUNNING" if p == 0 else "NOT RUNNING"


    def _serviceName (self):
        ''' name of the systemd service instance of this beamline '''
        return "zeromq-data-transfer@" + self.beamline + ".service"


    def _allParamsSet (self):
        ''' True if every parameter holds a non-empty value '''
        return all(getattr(self, name) for name in self._PARAM_ATTRS)


    def _writeConfig (self):
        ''' writes CONFIGPATH/<beamline>.conf from the fixed and the set parameters '''
        settings = [
            ("logfilePath",        LOGPATH),
            ("logfileName",        "dataManager.log"),
            ("logfileSize",        "10485760"),
            ("procname",           self.procname),
            ("comPort",            "50000"),
            ("requestPort",        "50001"),

            # TODO alternative: eventDetectorType = HttpDetector
            ("eventDetectorType",  "InotifyxDetector"),
            ("fixSubdirs",         '["commissioning", "current", "local"]'),
            ("monitoredDir",       BASEDIR + "/data/source"),
            ("monitoredEventType", "IN_CLOSE_WRITE"),
            ("monitoredFormats",   '[".tif", ".cbf"]'),
            ("useCleanUp",         "False"),
            ("actionTime",         "150"),
            ("timeTillClosed",     "2"),

            # TODO alternative: dataFetcherType = getFromHttp
            ("dataFetcherType",    "getFromFile"),

            ("numberOfStreams",    "1"),
            ("useDataStream",      "False"),
            ("chunkSize",          "10485760"),
        ]
        settings.extend((name, str(getattr(self, name))) for name in self._PARAM_ATTRS)

        configFile = CONFIGPATH + os.sep + self.beamline + ".conf"
        with open(configFile, 'w') as f:
            for key, value in settings:
                # keys are padded to the width of the longest one (18)
                f.write("{0:<18} = {1}\n".format(key, value))
336
337


338
class socketCom (object):
    '''
    one socket for the port, accept() generates new sockets

    Every accepted connection is served in its own thread by
    socketServer(). The methods working on a connection take it as the
    optional argument conn; without it they fall back to self.conn, the
    most recently accepted connection.
    '''

    def __init__ (self, logQueue):
        '''
        logQueue: queue the log records are put into
        '''
        self.conn   = None
        self.addr   = None
        # defined up front, so that finish() and __del__ also work if the
        # creation of the socket fails
        self.sckt   = None
        self.host   = socket.gethostname()
        self.port   = PORT # TODO init variable

        self.log = self.getLogger(logQueue)

        self.createSocket()


    def getLogger (self, queue):
        ''' returns the logger "socketCom" writing into the queue '''
        # Create log and set handler to queue handle
        h = QueueHandler(queue) # Just the one handler needed
        logger = logging.getLogger("socketCom")
        logger.propagate = False
        logger.addHandler(h)
        logger.setLevel(logging.DEBUG)

        return logger


    def createSocket (self):
        '''
        creates the listening socket and binds it to (self.host, self.port)

        exits if the socket can not be created, re-raises if bind fails
        '''
        try:
            self.sckt = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.log.debug("Socket created.")
        except Exception:
            self.log.error("Creation of socket failed", exc_info=True)
            sys.exit()

        try:
            self.sckt.bind( (self.host, self.port))
            self.log.info("Start socket (bind):  {h}, {p}".format(h=self.host, p=self.port))
        except Exception:
            self.log.error("Failed to start socket (bind).", exc_info=True)
            raise

        self.sckt.listen(5)


    def run (self):
        ''' accepts connections forever, starting one thread per client '''
        while True:
            conn, addr = self.sckt.accept()

            # kept for callers reading the attributes; the thread gets its
            # own reference, because the next accept() overwrites them
            self.conn, self.addr = conn, addr

            threading.Thread(target=self.socketServer, args=(conn,)).start()


    def socketServer (self, conn=None):
        '''
        serves one client until it says 'bye', the connection breaks or a
        message containing 'exit' shuts the whole server down

        conn: the accepted socket (default: self.conn)
        '''
        if conn is None:
            conn = self.conn

        if str(PORT) not in port2BL:
            raise Exception("Port {p} not identified".format(p=PORT))

        # local reference: every client thread works on its own instance
        zmqDT = ZmqDT(port2BL[str(PORT)])
        self.zmqDT = zmqDT

        while True:

            msg = self.recv(conn)

            if msg is None:
                # connection closed by the client or broken; without this
                # the loop would spin on empty reads forever
                self.close(conn)
                break

            elif len(msg) == 0:
                self.log.debug("Received empty msg")
                continue

            elif msg.lower().find('bye') == 0:
                self.log.debug("Received 'bye'")
                self.close(conn)
                break

            # NOTE(review): matches 'exit' anywhere in the message (also
            # inside a value of a 'set') - confirm that this is intended
            elif msg.find('exit') >= 0:
                self.log.debug("Received 'exit'")
                self.close(conn)
                self.finish()
                sys.exit(1)

            # a reply may be None (parameter not set yet); only strings
            # can be sent
            reply = str(zmqDT.execMsg (msg))

            if self.send (reply, conn) == 0:
                self.close(conn)
                break


    def close (self, conn=None):
        '''
        closes an accepted connection (default: self.conn)
        '''
        #
        # close the 'accepted' socket only, not the main socket
        # because it may still be in use by another client
        #
        if conn is None:
            conn = self.conn

        if conn is not None:
            self.log.info("Closing connection")
            conn.close()

        if conn is self.conn:
            self.conn = None


    def finish (self):
        ''' closes the listening socket '''
        sckt = getattr(self, "sckt", None)

        if sckt:
            self.log.info("Closing Socket")
            sckt.close()
            self.sckt = None


    def recv (self, conn=None):
        '''
        receives one message (up to 1024 bytes) from the connection
        (default: self.conn)

        returns the message stripped of surrounding whitespace, or None if
        the connection was closed by the peer or receiving failed
        '''
        if conn is None:
            conn = self.conn

        try:
            data = conn.recv(1024)
        except Exception:
            self.log.error("Receiving failed", exc_info=True)
            return None

        # an empty read on a stream socket means the peer has closed
        if not data:
            self.log.debug("Connection closed by peer")
            return None

        msg = data.strip()
        self.log.debug("Recv (len {l: <2}): {m}".format(l=len(msg), m=msg))

        return msg


    def send (self, msg, conn=None):
        '''
        sends msg over the connection (default: self.conn)

        returns the number of bytes sent, 0 if sending failed
        '''
        if conn is None:
            conn = self.conn

        try:
            argout = conn.send(msg)
        except Exception:
            self.log.error("Sending failed", exc_info=True)
            argout = 0

        self.log.debug("Send (len {l: <2}): {m}".format(l=argout, m=msg))

        return argout


    def __enter__ (self):
        return self


    def __exit__ (self, exc_type=None, exc_value=None, traceback=None):
        print("exit")
        self.finish()


    def __del__ (self):
        print("del")
        self.finish()


476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
class TangoServer():
    '''
    Sets up the logging (queue, listener, logger) and then runs the
    socket server; creating an instance blocks in socketCom.run().
    '''
    def __init__(self):
        screenLevel = "debug"
        beVerbose   = True
        logfileSize = 10485760
        logfilePath = BASE_PATH + os.sep + "logs" + os.sep + "tangoServer.log"

        # Queue all log records are passed through
        self.logQueue = Queue(-1)

        # Handlers the listener hands the queued records to
        handlers = helpers.getLogHandlers(logfilePath, logfileSize, beVerbose, screenLevel)

        if screenLevel:
            # two handlers are returned when logging to screen is enabled
            first, second = handlers
            self.logQueueListener = helpers.CustomQueueListener(self.logQueue, first, second)
        else:
            self.logQueueListener = helpers.CustomQueueListener(self.logQueue, handlers)

        self.logQueueListener.start()

        # Logger of this class, writing into the queue
        self.log = self.getLogger(self.logQueue)

        self.log.info("Init")

        # The server waits for new accepts on the original socket,
        # receives the newly created socket and
        # creates threads to handle each client separately
        server = socketCom(self.logQueue)

        server.run()


    def getLogger (self, queue):
        ''' returns the logger "TangoServer" writing into the queue '''
        log = logging.getLogger("TangoServer")
        log.propagate = False
        # Just the one handler needed: it puts the records into the queue
        log.addHandler(QueueHandler(queue))
        log.setLevel(logging.DEBUG)
        log.debug("getLogger (TangoServer)")

        return log


525
if __name__ == '__main__':
    # Creating the server blocks: its constructor runs the accept loop.
    server = TangoServer()
527