Commit 91850fe8 authored by incardon's avatar incardon
Browse files

Fixing Async

parent 422bc7d5
Pipeline #3320 passed with stages
in 8 minutes and 11 seconds
......@@ -163,13 +163,15 @@ class Vcluster_base
//! request id
size_t rid[NQUEUE];
//! NBX comunication on queue (-1 mean 0, 0 mean 1, 1 mean 2, .... )
unsigned int NBX_prc_qcnt = -1;
//! Is the barrier request reached
unsigned int NBX_prc_qcnt = 0;
bool NBX_prc_reached_bar_req[NQUEUE];
////// Status variables for NBX send with known/unknown processors
unsigned int NBX_prc_cnt_base = 0;
int NBX_prc_cnt_base = 0;
size_t NBX_prc_n_send[NQUEUE];
size_t * NBX_prc_prc[NQUEUE];
void ** NBX_prc_ptr[NQUEUE];
......@@ -823,6 +825,7 @@ public:
void * ptr_arg,
long int opt=NONE)
{
NBX_prc_qcnt++;
if (NBX_prc_qcnt >= NQUEUE)
{
std::cout << __FILE__ << ":" << __LINE__ << " error you can queue at most " << NQUEUE << " asychronous communication functions " << std::endl;
......@@ -844,7 +847,6 @@ public:
NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_KNOWN;
if (NBX_prc_qcnt == 0)
{NBX_prc_cnt_base = NBX_cnt;}
NBX_prc_qcnt++;
}
/*! \brief Send and receive multiple messages
......@@ -891,17 +893,18 @@ public:
#ifdef SE_CLASS1
checkType<typename T::value_type>();
#endif
// resize the pointer list
ptr_send[NBX_prc_qcnt].resize(prc.size());
sz_send[NBX_prc_qcnt].resize(prc.size());
ptr_send[NBX_prc_qcnt+1].resize(prc.size());
sz_send[NBX_prc_qcnt+1].resize(prc.size());
for (size_t i = 0 ; i < prc.size() ; i++)
{
ptr_send[NBX_prc_qcnt].get(i) = data.get(i).getPointer();
sz_send[NBX_prc_qcnt].get(i) = data.get(i).size() * sizeof(typename T::value_type);
ptr_send[NBX_prc_qcnt+1].get(i) = data.get(i).getPointer();
sz_send[NBX_prc_qcnt+1].get(i) = data.get(i).size() * sizeof(typename T::value_type);
}
sendrecvMultipleMessagesNBX(prc.size(),(size_t *)sz_send[NBX_prc_qcnt].getPointer(),(size_t *)prc.getPointer(),(void **)ptr_send[NBX_prc_qcnt].getPointer(),msg_alloc,ptr_arg,opt);
sendrecvMultipleMessagesNBX(prc.size(),(size_t *)sz_send[NBX_prc_qcnt+1].getPointer(),(size_t *)prc.getPointer(),(void **)ptr_send[NBX_prc_qcnt+1].getPointer(),msg_alloc,ptr_arg,opt);
}
/*! \brief Send and receive multiple messages asynchronous version
......@@ -953,16 +956,16 @@ public:
checkType<typename T::value_type>();
#endif
// resize the pointer list
ptr_send[NBX_prc_qcnt].resize(prc.size());
sz_send[NBX_prc_qcnt].resize(prc.size());
ptr_send[NBX_prc_qcnt+1].resize(prc.size());
sz_send[NBX_prc_qcnt+1].resize(prc.size());
for (size_t i = 0 ; i < prc.size() ; i++)
{
ptr_send[NBX_prc_qcnt].get(i) = data.get(i).getPointer();
sz_send[NBX_prc_qcnt].get(i) = data.get(i).size() * sizeof(typename T::value_type);
ptr_send[NBX_prc_qcnt+1].get(i) = data.get(i).getPointer();
sz_send[NBX_prc_qcnt+1].get(i) = data.get(i).size() * sizeof(typename T::value_type);
}
sendrecvMultipleMessagesNBXAsync(prc.size(),(size_t *)sz_send[NBX_prc_qcnt].getPointer(),(size_t *)prc.getPointer(),(void **)ptr_send[NBX_prc_qcnt].getPointer(),msg_alloc,ptr_arg,opt);
sendrecvMultipleMessagesNBXAsync(prc.size(),(size_t *)sz_send[NBX_prc_qcnt+1].getPointer(),(size_t *)prc.getPointer(),(void **)ptr_send[NBX_prc_qcnt+1].getPointer(),msg_alloc,ptr_arg,opt);
}
/*! \brief Send and receive multiple messages
......@@ -1092,6 +1095,7 @@ public:
size_t sz_recv[] ,void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t, size_t,void *),
void * ptr_arg, long int opt=NONE)
{
NBX_prc_qcnt++;
if (NBX_prc_qcnt >= NQUEUE)
{
std::cout << __FILE__ << ":" << __LINE__ << " error you can queue at most " << NQUEUE << " asychronous communication functions " << std::endl;
......@@ -1113,7 +1117,6 @@ public:
NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_KNOWN;
if (NBX_prc_qcnt == 0)
{NBX_prc_cnt_base = NBX_cnt;}
NBX_prc_qcnt++;
}
openfpm::vector<size_t> sz_recv_tmp;
......@@ -1255,6 +1258,7 @@ public:
void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
void * ptr_arg, long int opt=NONE)
{
NBX_prc_qcnt++;
if (NBX_prc_qcnt >= NQUEUE)
{
std::cout << __FILE__ << ":" << __LINE__ << " error you can queue at most " << NQUEUE << " asychronous communication functions " << std::endl;
......@@ -1285,7 +1289,6 @@ public:
NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_KNOWN_PRC;
if (NBX_prc_qcnt == 0)
{NBX_prc_cnt_base = NBX_cnt;}
NBX_prc_qcnt++;
}
/*! \brief Send and receive multiple messages
......@@ -1339,6 +1342,7 @@ public:
#endif
NBX_prc_qcnt++;
if (NBX_prc_qcnt != 0)
{
std::cout << __FILE__ << ":" << __LINE__ << " error there are some asynchronous call running you have to complete them before go back to synchronous" << std::endl;
......@@ -1381,6 +1385,7 @@ public:
// Circular counter
NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
NBX_prc_qcnt = -1;
#ifdef VCLUSTER_PERF_REPORT
nbx_timer.stop();
......@@ -1437,6 +1442,7 @@ public:
void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
void * ptr_arg, long int opt = NONE)
{
NBX_prc_qcnt++;
queue_all_sends(n_send,sz,prc,ptr);
this->NBX_prc_ptr_arg[NBX_prc_qcnt] = ptr_arg;
......@@ -1450,7 +1456,6 @@ public:
log.start(10);
if (NBX_prc_qcnt == 0)
{NBX_prc_cnt_base = NBX_cnt;}
NBX_prc_qcnt++;
return;
}
......@@ -1527,7 +1532,7 @@ public:
}
NBX_prc_qcnt = 0;
NBX_prc_qcnt = -1;
return;
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment