TangoServer.py 12.7 KB
Newer Older
1
2
#!/usr/bin/env python
#
Manuela Kuhn's avatar
Manuela Kuhn committed
3
4
5
6
7
import thread
import os
import socket
import subprocess
import psutil
8

Manuela Kuhn's avatar
Manuela Kuhn committed
9
10
11
12
13
14
15
16
17
18
PORT = 51000

BASEDIR = "/root/zeromq-data-transfer"
#BASEDIR = "/space/projects/zeromq-data-transfer"

CONFIGPATH = "/root/zeromq-data-transfer/conf"
#CONFIGPATH = "/space/projects/zeromq-data-transfer/conf"

LOGPATH = "/root/zeromq-data-transfer/logs"
#LOGPATH = "/space/projects/zeromq-data-transfer/logs"
19
20
21
22
23

#
# assume that the server listening to 7651 serves p09
#
port2BL = {
Manuela Kuhn's avatar
Manuela Kuhn committed
24
25
26
27
28
29
30
31
32
33
34
35
    "51000": "p00",
    "51001": "p01",
    "51002": "p02",
    "51003": "p03",
    "51004": "p04",
    "51005": "p05",
    "51006": "p06",
    "51007": "p07",
    "51008": "p08",
    "51009": "p09",
    "51010": "p10",
    "51011": "p11"
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
    }


class ZmqDT():
    '''
    this class holds getter/setter for all parameters
    and function members that control the operation.
    '''
    def __init__ (self, beamline):

        # Beamline is read-only, determined by portNo
        self.beamline = beamline
        self.procname = "zeromq-data-transfer_" + self.beamline

        # TangoDevices to talk with
        self.detectorDevice = None
        self.filewriterDevice = None

        # TODO replace TangoDevices with the following
        # IP of the EIGER Detector
        # self.eigerIp = None

        # Number of events stored to look for doubles
        self.historySize = None

        # Target to move the files into
        # e.g. /beamline/p11/current/raw
        self.localTarget = None

        # Flag describing if the data should be stored in localTarget
        self.storeData = None

        # Flag describing if the files should be removed from the source
        self.removeData = None

        # List of hosts allowed to connect to the data distribution
        self.whitelist = None


    def execMsg (self, msg):
        '''
Manuela Kuhn's avatar
Manuela Kuhn committed
77
        set filedir /gpfs/current/raw
78
79
          returns DONE
        get filedir
Manuela Kuhn's avatar
Manuela Kuhn committed
80
          returns /gpfs/current/raw
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
        do reset
          return DONE
        '''
        tokens = msg.split(' ')

        if len( tokens) == 0:
            return "ERROR"

        if tokens[0].lower() == 'set':
            if len( tokens) != 3:
                return "ERROR"

            return self.set(tokens[1], tokens[2])

        elif tokens[0].lower() == 'get':
            if len( tokens) != 2:
                return "ERROR"

            return self.get(tokens[1])

        elif tokens[0].lower() == 'do':
            if len( tokens) != 2:
                return "ERROR"

            return self.do(tokens[1])

        else:
            return "ERROR"


    def set (self, param, value):
        '''
Manuela Kuhn's avatar
Manuela Kuhn committed
113
        set a parameter, e.g.: set filedir /gpfs/current/raw/
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
        '''

        key = param.lower()

        if key == "detectordevice":
            self.detectorDevice = value
            return "DONE"

        elif key == "filewriterdevice":
            self.filewriterDevice = value
            return "DONE"

#        TODO replace detectordevice and filewriterDevice with eigerIP
#        elif key == "eigerIp":
#            self.eigerIP = value
#            return "DONE"

        elif key == "historysize":
            self.historySize = value
            return "DONE"

        elif key == "localtarget":
            self.localTarget = value
            return "DONE"

        elif key == "storedata":
            self.storeData = value
            return "DONE"

        elif key == "removedata":
            self.removeData = value
            return "DONE"

        elif key == "whitelist":
            self.whitelist = value
            return "DONE"

        else:
            return "ERROR"


    def get (self, param):
        '''
Manuela Kuhn's avatar
Manuela Kuhn committed
157
        return the value of a parameter, e.g.: get localtarget
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
        '''
        key = param.lower()

        if key == "detectordevice":
            self.detectorDevice   = value
            return "DONE"

        elif key == "filewriterdevice":
            self.filewriterDevice = value
            return "DONE"

#        TODO replace detectordevice and filewriterDevice with eigerIP
#        elif key == "eigerIP":
#            return self.eigerIp

        elif key == "historysize":
            return self.historySize

        elif key == "localtarget":
            return self.localTarget

        elif key == "storedata":
            return self.storeData

        elif key == "removedata":
            return self.removeData

        elif key == "whitelist":
            return self.whitelist

        else:
            return "ERROR"


    def do (self, cmd):
        '''
        executes commands
        '''
        key = cmd.lower()

        if key == "start":
            return self.start()

        elif key == "stop":
            return self.stop()

Manuela Kuhn's avatar
Manuela Kuhn committed
204
205
        elif key == "restart":
            return self.stop()
206

Manuela Kuhn's avatar
Manuela Kuhn committed
207
208
        elif key == "status":
            return self.stop()
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237

        else:
            return "ERROR"


    def start (self):
        '''
        start ...
        '''
        #
        # see, if all required params are there.
        #

        if (self.detectorDevice
            and self.filewriterDevice
            # TODO replace TangoDevices with the following
            #and self.eigerIp
            and self.historySize
            and self.localTarget
            and self.storeData
            and self.removeData
            and self.whitelist ):

            #
            # execute the start action ...
            #

            # write configfile
            # /etc/zeromq-data-transfer/P01.conf
Manuela Kuhn's avatar
Manuela Kuhn committed
238
            configFile = CONFIGPATH + os.sep + self.beamline + ".conf"
239
            with open(configFile, 'w') as f:
Manuela Kuhn's avatar
Manuela Kuhn committed
240
                f.write("logfilePath        = " + LOGPATH                                       + "\n")
241
242
                f.write("logfileName        = dataManager.log"                                  + "\n")
                f.write("logfileSize        = 10485760"                                         + "\n")
Manuela Kuhn's avatar
Manuela Kuhn committed
243
                f.write("procname           = " + self.procname                                 + "\n")
244
245
246
247
248
249
                f.write("comPort            = 50000"                                            + "\n")
                f.write("requestPort        = 50001"                                            + "\n")

    #            f.write("eventDetectorType  = HttpDetector"                                     + "\n")
                f.write("eventDetectorType  = InotifyxDetector"                                 + "\n")
                f.write('fixSubdirs         = ["commissioning", "current", "local"]'            + "\n")
Manuela Kuhn's avatar
Manuela Kuhn committed
250
                f.write("monitoredDir       = " + BASEDIR + "/data/source"                      + "\n")
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
                f.write("monitoredEventType = IN_CLOSE_WRITE"                                   + "\n")
                f.write('monitoredFormats   = [".tif", ".cbf"]'                                 + "\n")
                f.write("useCleanUp         = False"                                            + "\n")
                f.write("actionTime         = 150"                                              + "\n")
                f.write("timeTillClosed     = 2"                                                + "\n")

    #            f.write("dataFetcherType    = getFromHttp"                                      + "\n")
                f.write("dataFetcherType    = getFromFile"                                      + "\n")

                f.write("numberOfStreams    = 1"                                                + "\n")
                f.write("useDataStream      = False"                                            + "\n")
                f.write("chunkSize          = 10485760"                                         + "\n")


                f.write("detectorDevice     = " +  str(self.detectorDevice)                     + "\n")
                f.write("filewriterDevice   = " +  str(self.filewriterDevice)                   + "\n")
                # TODO replace TangoDevices with the following
                #f.write("eigerIp            = " +  str(self.eigerIp)                            + "\n")
                f.write("historySize        = " +  str(self.historySize)                        + "\n")
                f.write("localTarget        = " +  str(self.localTarget)                        + "\n")
                f.write("storeData          = " +  str(self.storeData)                          + "\n")
                f.write("removeData         = " +  str(self.removeData)                         + "\n")
                f.write("whitelist          = " +  str(self.whitelist)                          + "\n")

Manuela Kuhn's avatar
Manuela Kuhn committed
275
276
277
278
279
280
281
            # check if service is running
            # psutil version 4
            if self.procname in [psutil.Process(i).name() for i in psutil.pids()]:
            # psutil version 1
#            if self.procname in [psutil.Process(i).name for i in psutil.get_pid_list()]:
                print self.procname + " is already running"
                return "ERROR"
282

Manuela Kuhn's avatar
Manuela Kuhn committed
283
284
285
            # start service
            p = subprocess.call(["systemctl", "start", "zeromq-data-transfer@" + self.beamline + ".service"])
            print "returncode=", p
286

Manuela Kuhn's avatar
Manuela Kuhn committed
287
288
289
290
            if p == 0:
                return "DONE"
            else:
                return "ERROR"
291
292
293
294
295
296

        else:
            return "ERROR"


    def stop (self):
Manuela Kuhn's avatar
Manuela Kuhn committed
297
298
        # stop service
        p = subprocess.call(["systemctl", "stop", "zeromq-data-transfer@" + self.beamline + ".service"])
299
300
        return "DONE"

Manuela Kuhn's avatar
Manuela Kuhn committed
301
302
303
304
305
        if p == 0:
            return "DONE"
        else:
            return "ERROR"

306
307

    def restart (self):
Manuela Kuhn's avatar
Manuela Kuhn committed
308
309
310
311
312
313
314
315
316
317
318
        # stop service
        p = subprocess.call(["systemctl", "stop", "zeromq-data-transfer@" + self.beamline + ".service"])

        # start service
        p = subprocess.call(["systemctl", "start", "zeromq-data-transfer@" + self.beamline + ".service"])
        print "returncode=", p

        if p == 0:
            return "DONE"
        else:
            return "ERROR"
319
320
321


    def status (self):
Manuela Kuhn's avatar
Manuela Kuhn committed
322
323
324
325
326
327
328
        if self.procname in [psutil.Process(i).name() for i in psutil.pids()]:
        # psutil version 1
#            if self.procname in [psutil.Process(i).name for i in psutil.get_pid_list()]:
            print self.procname + " is already running"
            return "RUNNING"
        else:
            return "NOT RUNNING"
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446


class sockel (object):
    '''
    one socket for the port, accept() generates new sockets
    '''
    sckt = None

    def __init__ (self):
        self.conn   = None
        self.addr   = None
        self.host   = socket.gethostname()
        self.port   = PORT # TODO init variable

        if sockel.sckt is None:
            self.createSocket()

        self.conn, self.addr = sockel.sckt.accept()

        if not str(PORT) in port2BL.keys():
            raise Exception( "sockel.__init__: port %d not identified" % str(PORT))

        self.zmqDT = ZmqDT( port2BL[ str(PORT)])


    def createSocket (self):
        print "create the main socket"
        try:
            sockel.sckt = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        except Exception, e:
            print "socket() failed", e
            sys.exit()
        try:
            sockel.sckt.bind( (self.host, self.port))
        except Exception, e:
            raise Exception( "sockel.__init__: bind() failed %s" % str(e))

        print "bind( %s, %d) ok" % (self.host, self.port)
        sockel.sckt.listen(5)


    def close (self):
        #
        # close the 'accepted' socket only, not the main socket
        # because it may still be in use by another client
        #
        if not self.conn is None:
            self.conn.close()


    def finish (self):
        sockel.sckt.close()


    def recv (self):
        argout = None
        try:
            argout = self.conn.recv(1024)
        except Exception, e:
            print e
            argout = None

        print "recv (len %2d): %s" % (len( argout.strip()), argout.strip())

        return argout.strip()


    def send (self, msg):
        try:
            argout = self.conn.send( msg)
        except:
            argout = ""

        print "sent (len %2d): %s" % (argout, msg)

        return argout


def socketAcceptor ():
    # waits for new accepts on the original socket,
    # receives the newly created socket and
    # creates threads to handle each client separatly
    while True:
        s = sockel()
        print "socketAcceptor: new connect"
        thread.start_new_thread( socketServer, (s, ))


def socketServer (s):

    while True:

        msg = s.recv()

        if len(msg) == 0:
            print "received empty msg"
            continue

        elif msg.lower().find('bye') == 0:
            print "received 'bye', closing socket"
            s.close()
            break

        elif msg.find('exit') >= 0:
            s.close()
            s.finish()
            os._exit(1)

        reply = s.zmqDT.execMsg (msg)

        if s.send (reply) == 0:
            s.close()
            break


if __name__ == '__main__':
    socketAcceptor()