Skip to content
Snippets Groups Projects
Commit c5798ab1 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Fixed DataManager for Windows

parent 911aae65
No related branches found
No related tags found
No related merge requests found
......@@ -7,11 +7,14 @@ import os
import logging
import sys
import json
import cPickle
from multiprocessing import Process, freeze_support
import ConfigParser
#BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) ))
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.abspath ( sys.argv[0] ) ))
try:
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.realpath ( __file__ ) ))
except:
BASE_PATH = os.path.dirname ( os.path.dirname ( os.path.abspath ( sys.argv[0] ) ))
CONFIG_PATH = BASE_PATH + os.sep + "conf"
import shared.helpers as helpers
......@@ -256,9 +259,75 @@ class Sender():
def __exit__(self):
self.stop()
# def __del__(self):
# self.stop()
# cannot be defined in "if __name__ == '__main__'" because then it is unbound
# see https://docs.python.org/2/library/multiprocessing.html#windows
class Test_Receiver_Stream():
def __init__(self, comPort, fixedRecvPort, receivingPort, receivingPort2, logConfig):
if logConfig:
logfile = BASE_PATH + os.sep + "logs" + os.sep + "dataManager_test_.log"
helpers.initLogging(logfile, verbose=True, onScreenLogLevel="debug")
context = zmq.Context.instance()
self.comSocket = context.socket(zmq.REQ)
connectionStr = "tcp://localhost:" + comPort
self.comSocket.connect(connectionStr)
logging.info("=== comSocket connected to " + connectionStr)
self.fixedRecvSocket = context.socket(zmq.PULL)
connectionStr = "tcp://0.0.0.0:" + fixedRecvPort
self.fixedRecvSocket.bind(connectionStr)
logging.info("=== fixedRecvSocket connected to " + connectionStr)
self.receivingSocket = context.socket(zmq.PULL)
connectionStr = "tcp://0.0.0.0:" + receivingPort
self.receivingSocket.bind(connectionStr)
logging.info("=== receivingSocket connected to " + connectionStr)
self.receivingSocket2 = context.socket(zmq.PULL)
connectionStr = "tcp://0.0.0.0:" + receivingPort2
self.receivingSocket2.bind(connectionStr)
logging.info("=== receivingSocket2 connected to " + connectionStr)
self.sendSignal("START_STREAM", receivingPort, 1)
self.sendSignal("START_STREAM", receivingPort2, 0)
self.run()
def sendSignal(self, signal, ports, prio = None):
logging.info("=== sendSignal : " + signal + ", " + str(ports))
sendMessage = ["0.0.1", signal]
targets = []
if type(ports) == list:
for port in ports:
targets.append(["localhost:" + port, prio])
else:
targets.append(["localhost:" + ports, prio])
targets = cPickle.dumps(targets)
sendMessage.append(targets)
self.comSocket.send_multipart(sendMessage)
receivedMessage = self.comSocket.recv()
logging.info("=== Responce : " + receivedMessage )
def run(self):
try:
while True:
recv_message = self.fixedRecvSocket.recv_multipart()
logging.info("=== received fixed: " + str(cPickle.loads(recv_message[0])))
recv_message = self.receivingSocket.recv_multipart()
logging.info("=== received: " + str(cPickle.loads(recv_message[0])))
recv_message = self.receivingSocket2.recv_multipart()
logging.info("=== received 2: " + str(cPickle.loads(recv_message[0])))
except KeyboardInterrupt:
pass
def __exit__(self):
self.receivingSocket.close(0)
self.receivingSocket2.close(0)
context.destroy()
if __name__ == '__main__':
......@@ -268,7 +337,6 @@ if __name__ == '__main__':
if test:
import time
import cPickle
from shutil import copyfile
from subprocess import call
......@@ -277,75 +345,16 @@ if __name__ == '__main__':
#enable logging
helpers.initLogging(logfile, verbose=True, onScreenLogLevel="debug")
class Test_Receiver_Stream():
def __init__(self, comPort, fixedRecvPort, receivingPort, receivingPort2):
context = zmq.Context.instance()
self.comSocket = context.socket(zmq.REQ)
connectionStr = "tcp://zitpcx19282:" + comPort
self.comSocket.connect(connectionStr)
logging.info("=== comSocket connected to " + connectionStr)
self.fixedRecvSocket = context.socket(zmq.PULL)
connectionStr = "tcp://0.0.0.0:" + fixedRecvPort
self.fixedRecvSocket.bind(connectionStr)
logging.info("=== fixedRecvSocket connected to " + connectionStr)
self.receivingSocket = context.socket(zmq.PULL)
connectionStr = "tcp://0.0.0.0:" + receivingPort
self.receivingSocket.bind(connectionStr)
logging.info("=== receivingSocket connected to " + connectionStr)
self.receivingSocket2 = context.socket(zmq.PULL)
connectionStr = "tcp://0.0.0.0:" + receivingPort2
self.receivingSocket2.bind(connectionStr)
logging.info("=== receivingSocket2 connected to " + connectionStr)
self.sendSignal("START_STREAM", receivingPort, 1)
self.sendSignal("START_STREAM", receivingPort2, 0)
self.run()
def sendSignal(self, signal, ports, prio = None):
logging.info("=== sendSignal : " + signal + ", " + str(ports))
sendMessage = ["0.0.1", signal]
targets = []
if type(ports) == list:
for port in ports:
targets.append(["zitpcx19282:" + port, prio])
else:
targets.append(["zitpcx19282:" + ports, prio])
targets = cPickle.dumps(targets)
sendMessage.append(targets)
self.comSocket.send_multipart(sendMessage)
receivedMessage = self.comSocket.recv()
logging.info("=== Responce : " + receivedMessage )
def run(self):
try:
while True:
recv_message = self.fixedRecvSocket.recv_multipart()
logging.info("=== received fixed: " + str(cPickle.loads(recv_message[0])))
recv_message = self.receivingSocket.recv_multipart()
logging.info("=== received: " + str(cPickle.loads(recv_message[0])))
recv_message = self.receivingSocket2.recv_multipart()
logging.info("=== received 2: " + str(cPickle.loads(recv_message[0])))
except KeyboardInterrupt:
pass
def __exit__(self):
self.receivingSocket.close(0)
self.receivingSocket2.close(0)
context.destroy()
comPort = "50000"
fixedRecvPort = "50100"
receivingPort = "50101"
receivingPort2 = "50102"
testPr = Process ( target = Test_Receiver_Stream, args = (comPort, fixedRecvPort, receivingPort, receivingPort2))
logConfig = "test"
testPr = Process ( target = Test_Receiver_Stream, args = (comPort, fixedRecvPort, receivingPort, receivingPort2, logConfig))
testPr.start()
logging.debug("test receiver started")
sourceFile = BASE_PATH + os.sep + "test_file.cbf"
targetFileBase = BASE_PATH + os.sep + "data" + os.sep + "source" + os.sep + "local" + os.sep + "raw" + os.sep
......@@ -359,7 +368,7 @@ if __name__ == '__main__':
i = 100
try:
if sender:
while i <= 110:
while i <= 105:
if test:
time.sleep(0.5)
targetFile = targetFileBase + str(i) + ".cbf"
......@@ -375,6 +384,7 @@ if __name__ == '__main__':
logging.error("Exception detected: " + str(e))
finally:
if test:
time.sleep(3)
testPr.terminate()
for number in range(100, i):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment