Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# API to ingest data into a data transfer unit
__version__ = '0.0.1'
import zmq
import socket
import logging
import json
import errno
import os
import cPickle
import traceback
class dataIngest():
# return error code
def __init__(self, useLog = False, context = None):
if useLog:
self.log = logging.getLogger("dataIngestAPI")
else:
class loggingFunction:
def out(self, x, exc_info = None):
if exc_info:
print x, traceback.format_exc()
else:
print x
def __init__(self):
self.debug = lambda x, exc_info=None: self.out(x, exc_info)
self.info = lambda x, exc_info=None: self.out(x, exc_info)
self.warning = lambda x, exc_info=None: self.out(x, exc_info)
self.error = lambda x, exc_info=None: self.out(x, exc_info)
self.critical = lambda x, exc_info=None: self.out(x, exc_info)
self.log = loggingFunction()
# ZMQ applications always start by creating a context,
# and then using that for creating sockets
# (source: ZeroMQ, Messaging for Many Applications by Pieter Hintjens)
if context:
self.context = context
self.extContext = True
else:
self.context = zmq.Context()
self.extContext = False
self.signalHost = "zitpcx19282"
# has to be the same port as configured in dataManager.conf as eventPort
self.eventPort = "50003"
# has to be the same port as configured in dataManager.conf as fixedStreamPort #TODO change that to a different port
self.dataPort = "50100"
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
self.eventSocket = None
self.dataSocket = None
self.openFile = False
self.filePart = None
self.responseTimeout = 1000
self.__createSocket()
def __createSocket(self):
# To send file open and file close notification, a communication socket is needed
self.signalSocket = self.context.socket(zmq.REQ)
# time to wait for the sender to give a confirmation of the signal
# self.signalSocket.RCVTIMEO = self.responseTimeout
connectionStr = "tcp://" + str(self.signalHost) + ":" + str(self.signalPort)
try:
self.signalSocket.connect(connectionStr)
self.log.info("signalSocket started (connect) for '" + connectionStr + "'")
except Exception as e:
self.log.error("Failed to start signalSocket (connect): '" + connectionStr + "'", exc_info=True)
raise
# using a Poller to implement the signalSocket timeout (in older ZMQ version there is no option RCVTIMEO)
self.poller = zmq.Poller()
self.poller.register(self.signalSocket, zmq.POLLIN)
self.eventSocket = self.context.socket(zmq.PUSH)
connectionStr = "tcp://localhost:" + str(self.eventPort)
try:
self.eventSocket.connect(connectionStr)
self.log.info("eventSocket started (connect) for '" + connectionStr + "'")
except:
self.log.error("Failed to start eventSocket (connect): '" + connectionStr + "'", exc_info=True)
raise
self.dataSocket = self.context.socket(zmq.PUSH)
connectionStr = "tcp://localhost:" + str(self.dataPort)
try:
self.dataSocket.connect(connectionStr)
self.log.info("dataSocket started (connect) for '" + connectionStr + "'")
except:
self.log.error("Failed to start dataSocket (connect): '" + connectionStr + "'", exc_info=True)
raise
# return error code
def createFile(self, filename):
if self.openFile and self.openFile != filename:
raise Exception("File " + str(filename) + " already opened.")
# send notification to receiver
self.log.info("Sending signal to open a new file.")
message = self.signalSocket.recv()
self.log.debug("Received responce: " + str(message))
self.filename = filename
self.filePart = 0
def write(self, data):
# send event to eventDetector
message = {
"filename" : self.filename,
"filePart" : self.filePart
}
self.eventSocket.send(cPickle.dumps(message))
# send data to ZMQ-Queue
self.dataSocket.send(data)
self.filePart += 1
# return error code
def closeFile(self):
# send close-signal to signal socket
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
self.signalSocket.send(sendMessage)
self.log.info("Sending signal to close the file.")
recvMessage = self.signalSocket.recv()
if recvMessage != sendMessage:
self.log.debug("recieved message: " + str(recvMessage))
self.log.debug("send message: " + str(sendMessage))
raise Exception("Something went wrong while notifying to close the file")
# send close-signal to event Detector
self.eventSocket.send(sendMessage)
self.log.debug("Sending signal to close the file.")
self.openFile = None
self.filePart = None
##
#
# Send signal that the displayer is quitting, close ZMQ connections, destoying context
#
##
def stop(self):
try:
if self.signalSocket:
self.log.info("closing eventSocket...")
self.signalSocket.close(linger=0)
self.signalSocket = None
if self.eventSocket:
self.log.info("closing eventSocket...")
self.eventSocket.close(linger=0)
self.eventSocket = None
if self.dataSocket:
self.log.info("closing dataSocket...")
self.dataSocket.close(linger=0)
self.dataSocket = None
except:
self.log.error("closing ZMQ Sockets...failed.", exc_info=True)
# if the context was created inside this class,
# it has to be destroyed also within the class
if not self.extContext and self.context:
try:
self.log.info("Closing ZMQ context...")
self.context.destroy()
self.context = None
self.log.info("Closing ZMQ context...done.")
except:
self.log.error("Closing ZMQ context...failed.", exc_info=True)
def __exit__(self):
self.stop()
def __del__(self):
self.stop()