Commit d1123fca authored by Aryaman Gupta's avatar Aryaman Gupta

Basic communication of timestep with remote client

parent 8e5ec9fc
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
NEWS 100644 → 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
This diff is collapsed.
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
......@@ -3,7 +3,7 @@ LINKLIBS = $(DEFAULT_LIB) $(PTHREAD_LIBS) $(OPT_LIBS) $(HDF5_LDFLAGS) $(HDF5_LIB
noinst_PROGRAMS = vcluster_test
vcluster_test_SOURCES = main.cpp VCluster/VCluster.cpp ../../openfpm_devices/src/memory/HeapMemory.cpp ../../openfpm_devices/src/memory/PtrMemory.cpp ../../openfpm_devices/src/Memleak_check.cpp
vcluster_test_CXXFLAGS = $(AM_CXXFLAGS) $(INCLUDES_PATH) $(BOOST_CPPFLAGS)
vcluster_test_CXXFLAGS = $(BOOST_CPPFLAGS) $(AM_CXXFLAGS) $(INCLUDES_PATH)
vcluster_test_CFLAGS = $(CUDA_CFLAGS)
vcluster_test_LDADD = $(LINKLIBS)
......
......@@ -20,6 +20,11 @@ size_t tot_recv = 0;
std::string program_name;
int phase = 0;
MPI_Request startMsg;
MPI_Request configMsg;
MPI_Request timeStepMsg;
// Segmentation fault signal handler
void bt_sighandler(int sig, siginfo_t * info, void * ctx_p)
{
......@@ -32,3 +37,50 @@ void bt_sighandler(int sig, siginfo_t * info, void * ctx_p)
exit(0);
}
void doInSituVis(int timeStep)
{
int world_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
if(phase!=0 && phase!=1 && world_rank == 1) {
MPI_Isend(&timeStep, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, &timeStepMsg);
printf("%d: I have sent the timestep %d\n", world_rank, timeStep);
}
int recvdNum;
int flag = 0;
int data;
if(phase == 0) {
printf("In phase 0!%d\n", world_rank);
MPI_Ibcast(&recvdNum, 1, MPI_INT, 0, MPI_COMM_WORLD, &startMsg);
phase = 1;
}
if(phase == 1) {
printf("In phase 1!%d\n", world_rank);
MPI_Test(&startMsg, &flag, MPI_STATUS_IGNORE);
printf("Ran the test!%d\n", world_rank);
if(flag!=0) {
printf("%d:I know In Situ Visualization has started! \n", world_rank);
phase = 2;
}
}
if(phase == 2) {
MPI_Ibcast(&data, 1, MPI_INT, 0, MPI_COMM_WORLD, &configMsg);
phase = 3;
}
if(phase == 3) {
MPI_Test(&configMsg, &flag, MPI_STATUS_IGNORE);
if(flag!=0) {
printf("%d: I have received the Config Change: %d\n", world_rank, data);
// Do stuff with the config change
phase = 2;
}
}
}
\ No newline at end of file
......@@ -11,6 +11,11 @@
#include <signal.h>
#include "VCluster_base.hpp"
#include "VCluster_meta_function.hpp"
#include <zmq.hpp>
extern MPI_Request startMsg;
extern MPI_Request configMsg;
extern MPI_Request timeStepMsg;
void bt_sighandler(int sig, siginfo_t * info, void * ctx);
......@@ -850,12 +855,18 @@ static inline void openfpm_finalize()
ofp_initialized = false;
}
void doInSituVis(int timeStep);
/*! \brief Initialize a global instance of Runtime Virtual Cluster Machine
*
* Initialize a global instance of Runtime Virtual Cluster Machine
*
*/
static inline void init_global_v_cluster_private(int *argc, char ***argv, init_options option)
{
global_option = option;
......@@ -884,18 +895,142 @@ static inline void init_global_v_cluster_private(int *argc, char ***argv, init_o
}
else
{
printf("I am node 0\n");
int flag = false;
MPI_Request bar_req;
MPI_Ibarrier(MPI_COMM_WORLD,&bar_req);
//! barrier status
MPI_Status bar_stat;
// char processor_name[MPI_MAX_PROCESSOR_NAME];
// int name_len;
// MPI_Get_processor_name(processor_name, &name_len);
//
// printf("Hello world again from processor %s, rank %d out of %d processors\n",
// processor_name, world_rank, world_size);
// Prepare our context and socket
zmq::context_t context (1);
zmq::socket_t socket (context, ZMQ_REP);
socket.bind ("tcp://*:5555");
zmq::message_t request;
zmq::message_t configChange;
zmq::socket_t requester (context, ZMQ_REQ);
requester.connect ("tcp://localhost:5560");
int r = socket.recv (&request);
std::cout << "0: Received message from client and returned " << r << std::endl;
int number = 10;
int errBcast = MPI_Ibcast(&number, 1, MPI_INT, 0, MPI_COMM_WORLD, &startMsg);
printf("0: Started In Situ Visualization!\n");
zmq::message_t reply (5);
memcpy (reply.data (), "World", 5);
socket.send (reply);
// Initial messaging (setting up of in situ) finished
// sleep(1);
//
// zmq::message_t sendRequest(3);
// int a = 1;
// std::string b = std::to_string(a);
// std::cout << "The string is:" << b << std::endl;
// memcpy(sendRequest.data(), (&b), 3);
// requester.send(sendRequest);
// std::cout << "Sent\n";
//
// zmq::message_t recvdReply;
// requester.recv(&recvdReply);
// std::cout<< "I am after receive\n";
//
// std::string recvdText = std::string(static_cast<char*>(recvdReply.data()), recvdReply.size());
//
//
// std::cout << recvdText;
bool MPIrecv = false;
bool timeStepSent = false;
bool TCPsent = false;
int timeStep = -1;
int flagTimeStep = 0;
while(flag == false)
{
if(MPIrecv == false) {
//A receive has not previously been actvated
MPI_Irecv(&timeStep, 1, MPI_INT, 1, 0, MPI_COMM_WORLD, &timeStepMsg);
MPIrecv = true;
}
else {
// A receive is active and requires testing
MPI_Test(&timeStepMsg, &flagTimeStep, MPI_STATUS_IGNORE);
// printf("The flag status is: %d\n", flagTimeStep);
if(flagTimeStep != 0) {
// Timestep has been received and needs to be communicated to Client
printf("I have received the timestep!\n");
zmq::message_t tStepMsg(8);
std::string tStep = std::to_string(timeStep);
memcpy(tStepMsg.data(), tStep.c_str(), 8);
if(timeStepSent) {
// Already sent, therefore, first receive, then send again
zmq::message_t tempReply(1);
int ret = requester.recv(&tempReply, ZMQ_NOBLOCK);
if(ret == 1) {
printf("Received a response!\n");
requester.send(tStepMsg, ZMQ_DONTWAIT);
printf("Sending the timestep!\n");
}
}
else {
// Sending for the first time
requester.send(tStepMsg, ZMQ_DONTWAIT);
printf("Sending the timestep for the first time!\n");
timeStepSent = true;
}
MPIrecv = false;
}
}
// Timestep conveying part is now over
//Receive the config change
int ret = socket.recv(&configChange, ZMQ_NOBLOCK);
// std::cout<<"Return was: "<<ret<<"\n";
//
// ret = socket.recv(&configChange, ZMQ_NOBLOCK);
// std::cout<<"Return was: "<<ret<<"\n";
//
// ret = socket.recv(&configChange, ZMQ_NOBLOCK);
// std::cout<<"Return was: "<<ret<<"\n";
if(ret == 1) {
// A config change was actually received
std::string recvdText = std::string(static_cast<char*>(configChange.data()), configChange.size());
std::cout << "I have now received " << recvdText <<"\n";
int tempNum = 15;
MPI_Ibcast(&tempNum, 1, MPI_INT, 0, MPI_COMM_WORLD, &configMsg);
printf("0: Broadcasted a configuration message!\n");
zmq::message_t tempSend (1);
memcpy (tempSend.data (), "a", 1);
socket.send (tempSend);
}
MPI_SAFE_CALL(MPI_Test(&bar_req,&flag,&bar_stat));
}
/* while(flag == false)
{
std::cout << "I am node " << rank << std::endl;
sleep(1);
MPI_SAFE_CALL(MPI_Test(&bar_req,&flag,&bar_stat));
}
}*/
openfpm_finalize();
exit(0);
......
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
File mode changed from 100644 to 100755
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment