TangoServer.py 15.6 KB
Newer Older
1
2
#!/usr/bin/env python
#
3
import threading
Manuela Kuhn's avatar
Manuela Kuhn committed
4
import os
5
import sys
Manuela Kuhn's avatar
Manuela Kuhn committed
6
7
import socket
import subprocess
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import logging
from multiprocessing import Queue

try:
    BASE_PATH   = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) )))
except:
    BASE_PATH   = os.path.dirname ( os.path.dirname ( os.path.dirname ( os.path.abspath ( sys.argv[0] ) )))
SHARED_PATH = BASE_PATH + os.sep + "src" + os.sep + "shared"
CONFIG_PATH = BASE_PATH + os.sep + "conf"

if not SHARED_PATH in sys.path:
    sys.path.append ( SHARED_PATH )
del SHARED_PATH
del CONFIG_PATH

import helpers
from logutils.queue import QueueHandler


27

Manuela Kuhn's avatar
Manuela Kuhn committed
28
29
30
31
32
33
34
35
36
37
PORT = 51000

BASEDIR = "/root/zeromq-data-transfer"
#BASEDIR = "/space/projects/zeromq-data-transfer"

CONFIGPATH = "/root/zeromq-data-transfer/conf"
#CONFIGPATH = "/space/projects/zeromq-data-transfer/conf"

LOGPATH = "/root/zeromq-data-transfer/logs"
#LOGPATH = "/space/projects/zeromq-data-transfer/logs"
38
39
40
41
42

#
# assume that the server listening to 7651 serves p09
#
port2BL = {
Manuela Kuhn's avatar
Manuela Kuhn committed
43
44
45
46
47
48
49
50
51
52
53
54
    "51000": "p00",
    "51001": "p01",
    "51002": "p02",
    "51003": "p03",
    "51004": "p04",
    "51005": "p05",
    "51006": "p06",
    "51007": "p07",
    "51008": "p08",
    "51009": "p09",
    "51010": "p10",
    "51011": "p11"
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
    }


class ZmqDT():
    '''
    this class holds getter/setter for all parameters
    and function members that control the operation.
    '''
    def __init__ (self, beamline):

        # Beamline is read-only, determined by portNo
        self.beamline = beamline
        self.procname = "zeromq-data-transfer_" + self.beamline

        # TangoDevices to talk with
        self.detectorDevice = None
        self.filewriterDevice = None

        # TODO replace TangoDevices with the following
        # IP of the EIGER Detector
        # self.eigerIp = None

        # Number of events stored to look for doubles
        self.historySize = None

        # Target to move the files into
        # e.g. /beamline/p11/current/raw
        self.localTarget = None

        # Flag describing if the data should be stored in localTarget
        self.storeData = None

        # Flag describing if the files should be removed from the source
        self.removeData = None

        # List of hosts allowed to connect to the data distribution
        self.whitelist = None


    def execMsg (self, msg):
        '''
Manuela Kuhn's avatar
Manuela Kuhn committed
96
        set filedir /gpfs/current/raw
97
98
          returns DONE
        get filedir
Manuela Kuhn's avatar
Manuela Kuhn committed
99
          returns /gpfs/current/raw
100
101
102
        do reset
          return DONE
        '''
Manuela Kuhn's avatar
Manuela Kuhn committed
103
        tokens = msg.split(' ', 2)
104

Manuela Kuhn's avatar
Manuela Kuhn committed
105
        if len(tokens) == 0:
106
107
108
            return "ERROR"

        if tokens[0].lower() == 'set':
Manuela Kuhn's avatar
Manuela Kuhn committed
109
            if len( tokens) < 3:
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
                return "ERROR"

            return self.set(tokens[1], tokens[2])

        elif tokens[0].lower() == 'get':
            if len( tokens) != 2:
                return "ERROR"

            return self.get(tokens[1])

        elif tokens[0].lower() == 'do':
            if len( tokens) != 2:
                return "ERROR"

            return self.do(tokens[1])

        else:
            return "ERROR"


    def set (self, param, value):
        '''
Manuela Kuhn's avatar
Manuela Kuhn committed
132
        set a parameter, e.g.: set filedir /gpfs/current/raw/
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
        '''

        key = param.lower()

        if key == "detectordevice":
            self.detectorDevice = value
            return "DONE"

        elif key == "filewriterdevice":
            self.filewriterDevice = value
            return "DONE"

#        TODO replace detectordevice and filewriterDevice with eigerIP
#        elif key == "eigerIp":
#            self.eigerIP = value
#            return "DONE"

        elif key == "historysize":
            self.historySize = value
            return "DONE"

        elif key == "localtarget":
            self.localTarget = value
            return "DONE"

        elif key == "storedata":
            self.storeData = value
            return "DONE"

        elif key == "removedata":
            self.removeData = value
            return "DONE"

        elif key == "whitelist":
            self.whitelist = value
            return "DONE"

        else:
            return "ERROR"


    def get (self, param):
        '''
Manuela Kuhn's avatar
Manuela Kuhn committed
176
        return the value of a parameter, e.g.: get localtarget
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
        '''
        key = param.lower()

        if key == "detectordevice":
            self.detectorDevice   = value
            return "DONE"

        elif key == "filewriterdevice":
            self.filewriterDevice = value
            return "DONE"

#        TODO replace detectordevice and filewriterDevice with eigerIP
#        elif key == "eigerIP":
#            return self.eigerIp

        elif key == "historysize":
            return self.historySize

        elif key == "localtarget":
            return self.localTarget

        elif key == "storedata":
            return self.storeData

        elif key == "removedata":
            return self.removeData

        elif key == "whitelist":
            return self.whitelist

        else:
            return "ERROR"


    def do (self, cmd):
        '''
        executes commands
        '''
        key = cmd.lower()

        if key == "start":
            return self.start()

        elif key == "stop":
            return self.stop()

Manuela Kuhn's avatar
Manuela Kuhn committed
223
        elif key == "restart":
Manuela Kuhn's avatar
Manuela Kuhn committed
224
            return self.restart()
225

Manuela Kuhn's avatar
Manuela Kuhn committed
226
        elif key == "status":
Manuela Kuhn's avatar
Manuela Kuhn committed
227
            return self.status()
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256

        else:
            return "ERROR"


    def start (self):
        '''
        start ...
        '''
        #
        # see, if all required params are there.
        #

        if (self.detectorDevice
            and self.filewriterDevice
            # TODO replace TangoDevices with the following
            #and self.eigerIp
            and self.historySize
            and self.localTarget
            and self.storeData
            and self.removeData
            and self.whitelist ):

            #
            # execute the start action ...
            #

            # write configfile
            # /etc/zeromq-data-transfer/P01.conf
Manuela Kuhn's avatar
Manuela Kuhn committed
257
            configFile = CONFIGPATH + os.sep + self.beamline + ".conf"
258
            with open(configFile, 'w') as f:
Manuela Kuhn's avatar
Manuela Kuhn committed
259
                f.write("logfilePath        = " + LOGPATH                                       + "\n")
260
261
                f.write("logfileName        = dataManager.log"                                  + "\n")
                f.write("logfileSize        = 10485760"                                         + "\n")
Manuela Kuhn's avatar
Manuela Kuhn committed
262
                f.write("procname           = " + self.procname                                 + "\n")
263
264
265
266
267
268
                f.write("comPort            = 50000"                                            + "\n")
                f.write("requestPort        = 50001"                                            + "\n")

    #            f.write("eventDetectorType  = HttpDetector"                                     + "\n")
                f.write("eventDetectorType  = InotifyxDetector"                                 + "\n")
                f.write('fixSubdirs         = ["commissioning", "current", "local"]'            + "\n")
Manuela Kuhn's avatar
Manuela Kuhn committed
269
                f.write("monitoredDir       = " + BASEDIR + "/data/source"                      + "\n")
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
                f.write("monitoredEventType = IN_CLOSE_WRITE"                                   + "\n")
                f.write('monitoredFormats   = [".tif", ".cbf"]'                                 + "\n")
                f.write("useCleanUp         = False"                                            + "\n")
                f.write("actionTime         = 150"                                              + "\n")
                f.write("timeTillClosed     = 2"                                                + "\n")

    #            f.write("dataFetcherType    = getFromHttp"                                      + "\n")
                f.write("dataFetcherType    = getFromFile"                                      + "\n")

                f.write("numberOfStreams    = 1"                                                + "\n")
                f.write("useDataStream      = False"                                            + "\n")
                f.write("chunkSize          = 10485760"                                         + "\n")


                f.write("detectorDevice     = " +  str(self.detectorDevice)                     + "\n")
                f.write("filewriterDevice   = " +  str(self.filewriterDevice)                   + "\n")
                # TODO replace TangoDevices with the following
                #f.write("eigerIp            = " +  str(self.eigerIp)                            + "\n")
                f.write("historySize        = " +  str(self.historySize)                        + "\n")
                f.write("localTarget        = " +  str(self.localTarget)                        + "\n")
                f.write("storeData          = " +  str(self.storeData)                          + "\n")
                f.write("removeData         = " +  str(self.removeData)                         + "\n")
                f.write("whitelist          = " +  str(self.whitelist)                          + "\n")

Manuela Kuhn's avatar
Manuela Kuhn committed
294
            # check if service is running
295
            if self.status() == "RUNNING":
Manuela Kuhn's avatar
Manuela Kuhn committed
296
                return "ERROR"
297

Manuela Kuhn's avatar
Manuela Kuhn committed
298
299
            # start service
            p = subprocess.call(["systemctl", "start", "zeromq-data-transfer@" + self.beamline + ".service"])
300

Manuela Kuhn's avatar
Manuela Kuhn committed
301
302
303
304
            if p == 0:
                return "DONE"
            else:
                return "ERROR"
305
306
307
308
309
310

        else:
            return "ERROR"


    def stop (self):
Manuela Kuhn's avatar
Manuela Kuhn committed
311
312
        # stop service
        p = subprocess.call(["systemctl", "stop", "zeromq-data-transfer@" + self.beamline + ".service"])
313
314
        return "DONE"

Manuela Kuhn's avatar
Manuela Kuhn committed
315
316
317
318
319
        if p == 0:
            return "DONE"
        else:
            return "ERROR"

320
321

    def restart (self):
Manuela Kuhn's avatar
Manuela Kuhn committed
322
        # stop service
323
        self.stop()
Manuela Kuhn's avatar
Manuela Kuhn committed
324
325

        # start service
326
        self.start()
327
328
329


    def status (self):
330
331
332
        p = subprocess.call(["systemctl", "is-active", "zeromq-data-transfer@" + self.beamline + ".service"])

        if p == 0:
Manuela Kuhn's avatar
Manuela Kuhn committed
333
334
335
            return "RUNNING"
        else:
            return "NOT RUNNING"
336
337


338
class socketServer (object):
339
340
341
342
    '''
    one socket for the port, accept() generates new sockets
    '''

343
    def __init__ (self, logQueue):
344
        self.logQueue = logQueue
345

346
        self.log      = self.getLogger(logQueue)
347

348
349
350
351
352
        self.host     = socket.gethostname()
        self.port     = PORT # TODO init variable
        self.conns    = []
        self.bl       = port2BL[str(PORT)]
        self.socket   = None
353

354
355
356
357
        if not str(PORT) in port2BL.keys():
            raise Exception("Port {p} not identified".format(p=PORT))

        self.createSocket()
358

359
360
361
    def getLogger (self, queue):
        # Create log and set handler to queue handle
        h = QueueHandler(queue) # Just the one handler needed
362
        logger = logging.getLogger("socketServer")
363
364
365
366
367
368
        logger.propagate = False
        logger.addHandler(h)
        logger.setLevel(logging.DEBUG)

        return logger

369
    def createSocket (self):
370

371
        try:
372
            self.sckt = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
373
            self.log.debug("Socket created.")
374
        except Exception:
375
            self.log.error("Creation of socket failed", exc_info=True)
376
            sys.exit()
377

378
        try:
379
            self.sckt.bind( (self.host, self.port))
380
381
382
383
            self.log.info("Start socket (bind):  {h}, {p}".format(h=self.host, p=self.port))
        except Exception:
            self.log.error("Failed to start socket (bind).", exc_info=True)
            raise
384
385
386
387
388
389

        self.sckt.listen(5)


    def run (self):
        while True:
390
391
392
393
394
395
396
397
398
            try:
                conn, addr = self.sckt.accept()

                threading.Thread(target=socketCom, args=(self.logQueue, self.bl, conn, addr)).start()
            except KeyboardInterrupt:
                break
            except Exception, e:
                self.log.error("Stopped due to unknown error", exc_info=True)
                break
399
400


401
402
403
404
405
    def finish (self):
        if self.sckt:
            self.log.info("Closing Socket")
            self.sckt.close()
            self.sckt = None
406
407


408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
    def __exit__ (self):
        self.finish()


    def __del__ (self):
        self.finish()


class socketCom ():
    def __init__ (self, logQueue, bl, conn, addr):
        self.id    = threading.current_thread()

        self.log   = self.getLogger(logQueue)

        self.zmqDT = ZmqDT(bl)
        self.conn  = conn
        self.addr  = addr

        self.run()


    def getLogger (self, queue):
        # Create log and set handler to queue handle
        h = QueueHandler(queue) # Just the one handler needed
        logger = logging.getLogger("socketCom_" + str(self.id))
        logger.propagate = False
        logger.addHandler(h)
        logger.setLevel(logging.DEBUG)

        return logger
438

439
440

    def run (self):
441
442
443
444
445
446

        while True:

            msg = self.recv()

            if len(msg) == 0:
447
                self.log.debug("Received empty msg")
448
449
450
                continue

            elif msg.lower().find('bye') == 0:
451
                self.log.debug("Received 'bye'")
452
453
454
455
                self.close()
                break

            elif msg.find('exit') >= 0:
456
                self.log.debug("Received 'exit'")
457
458
459
460
                self.close()
                sys.exit(1)

            reply = self.zmqDT.execMsg (msg)
461

462
463
464
            if self.send (reply) == 0:
                self.close()
                break
465
466
467
468
469
470
471
472
473
474


    def recv (self):
        argout = None
        try:
            argout = self.conn.recv(1024)
        except Exception, e:
            print e
            argout = None

475
        self.log.debug("Recv (len {l: <2}): {m}".format(l=len(argout.strip()), m=argout.strip()))
476
477
478
479
480
481

        return argout.strip()


    def send (self, msg):
        try:
482
            argout = self.conn.send(msg)
483
484
485
        except:
            argout = ""

486
        self.log.debug("Send (len {l: <2}): {m}".format(l=argout, m=msg))
487
488
489
490

        return argout


491
492
493
494
495
496
497
498
499
500
501
    def close (self):
        #
        # close the 'accepted' socket only, not the main socket
        # because it may still be in use by another client
        #
        if self.conn:
            self.log.info("Closing connection")
            self.conn.close()
            self.conn = None


502
    def __exit__ (self):
503
        self.close()
504
505
506


    def __del__ (self):
507
        self.close()
508
509


510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
class TangoServer():
    def __init__(self):
        onScreen = "debug"
        verbose  = True
        logfile  = BASE_PATH + os.sep + "logs" + os.sep + "tangoServer.log"
        logsize  = 10485760

        # Get queue
        self.logQueue    = Queue(-1)

        # Get the log Configuration for the lisener
        if onScreen:
            h1, h2 = helpers.getLogHandlers(logfile, logsize, verbose, onScreen)

            # Start queue listener using the stream handler above.
            self.logQueueListener = helpers.CustomQueueListener(self.logQueue, h1, h2)
        else:
            h1 = helpers.getLogHandlers(logfile, logsize, verbose, onScreen)

            # Start queue listener using the stream handler above
            self.logQueueListener = helpers.CustomQueueListener(self.logQueue, h1)

        self.logQueueListener.start()

        # Create log and set handler to queue handle
        self.log = self.getLogger(self.logQueue)

        self.log.info("Init")

539
540
541
        # waits for new accepts on the original socket,
        # receives the newly created socket and
        # creates threads to handle each client separatly
542
        s = socketServer(self.logQueue)
543
544
545

        s.run()

546
547
548
549
550
551
552
553
554
555
556
557

    def getLogger (self, queue):
        # Create log and set handler to queue handle
        h = QueueHandler(queue) # Just the one handler needed
        logger = logging.getLogger("TangoServer")
        logger.propagate = False
        logger.addHandler(h)
        logger.setLevel(logging.DEBUG)

        return logger


558
if __name__ == '__main__':
559
    t = TangoServer()
560