Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
__author__ = 'Manuela Kuhn <manuela.kuhn@desy.de>'
import zmq
def __sendToTargets(log, targets, sourceFile, openConnections, payload, context, properties):
    """Send one multipart message to every target, opening sockets on demand.

    For each target a PUSH socket is looked up in ``openConnections``; if
    none is registered yet, one is created from ``context``, connected to
    ``"tcp://" + str(target)`` and stored in ``openConnections`` so later
    calls reuse it.

    Targets with priority 0 get a tracked, zero-copy send and the function
    blocks until the tracker reports the message as done. All other
    priorities are sent with ``zmq.NOBLOCK`` and are not waited for.

    Args:
        log: Logger-like object; only ``log.info(message)`` is called.
        targets: Iterable of ``(target, prio, sendType)`` triples.
            ``sendType`` is unpacked but not used here.
        sourceFile: Identifier of the file being sent; only used in log
            messages.
        openConnections: Mapping of target to socket. It is modified in
            place when a new socket has to be opened.
        payload: Message parts handed unchanged to ``send_multipart``.
        context: Object providing ``socket(zmq.PUSH)``.
        properties: Not used; kept so existing callers keep working.

    Raises:
        Nothing is caught here: any exception raised by the socket calls
        propagates and stops processing of the remaining targets.
        NOTE(review): per the pyzmq documentation a ``zmq.NOBLOCK`` send can
        raise ``zmq.Again`` when the message cannot be queued -- confirm
        that the caller handles this.
    """
    for target, prio, sendType in targets:
        # The endpoint is built with str(target); use the same text in the
        # log messages so a non-string target cannot raise a TypeError after
        # the data has already been handed to the socket.
        targetStr = str(target)

        # socket not known: open, connect and register it
        if target not in openConnections:
            connectionStr = "tcp://" + targetStr
            socket = context.socket(zmq.PUSH)
            socket.connect(connectionStr)
            log.info("Start socket (connect): '" + connectionStr + "'")
            openConnections[target] = socket

        connection = openConnections[target]

        # send data to the data stream to store it in the storage system
        if prio == 0:
            # track=True returns a tracker that tells when the message has
            # actually been sent
            tracker = connection.send_multipart(payload, copy=False, track=True)
            log.info("Sending message part from file " + str(sourceFile) +
                     " to '" + targetStr + "' with priority " + str(prio))

            # block until the priority-0 message has left
            if not tracker.done:
                log.info("Message part from file " + str(sourceFile) +
                         " has not been sent yet, waiting...")
                tracker.wait()
                log.info("Message part from file " + str(sourceFile) +
                         " has not been sent yet, waiting...done")
        else:
            # lower priorities: non-blocking send without waiting for it
            connection.send_multipart(payload, zmq.NOBLOCK)
            log.info("Sending message part from file " + str(sourceFile) +
                     " to " + targetStr)