Commit 3f37237e authored by incardon

Adding optimized communications

parent a463ae1d
......@@ -35,6 +35,9 @@
#define SEND_RECV_BASE 8192
#define GATHER_BASE 24576
#define RECEIVE_KNOWN 4
#define KNOWN_ELEMENT_OR_BYTE 8
// number of vcluster instances
extern size_t n_vcluster;
// Global MPI initialization
......@@ -290,9 +293,6 @@ class Vcluster
//! vector of the size of send buffers
openfpm::vector<size_t> sz_send;
//! sending map
openfpm::vector<size_t> map;
//! Receive buffers
openfpm::vector<BHeapMemory> recv_buf;
......@@ -522,65 +522,135 @@ public:
MPI_IallreduceW<T>::reduce(num,MPI_MIN,req.last());
}
/*! \brief Send and receive multiple messages
*
* It sends multiple messages to a set of processors and receives
* multiple messages from another set of processors; all the processors must call this
* function. In this particular case the receiver knows from which processors it is going
* to receive.
*
*
* suppose the following situation: the calling processor wants to communicate
* * 2 messages of size 100 bytes to processor 1
* * 1 message of size 50 bytes to processor 6
* * 1 message of size 48 bytes to processor 7
* * 1 message of size 70 bytes to processor 8
*
* \param prc list of processors with which it should communicate
* [1,1,6,7,8]
*
* \param data vector containing the data to send [v=vector<vector<int>>, v.size()=4, T=vector<int>], T at the moment
* is only tested for vectors of 0 or more generic elements (without pointers)
*
* \param prc_recv list of processors from which this processor is going to receive
*
* \param recv_sz size of the message to receive from each processor in prc_recv
*
* \param msg_alloc This is a call-back with the purpose of allocating space
* for the incoming message and giving back a valid pointer. Supposing that this call-back has been triggered by
* the processor with id 5 that wants to communicate a message of size 100 bytes, the 6 parameters
* passed to the call-back are in order:
* * message size required to receive the message [100]
* * total message size to receive from all the processors (NBX does not provide this information)
* * the total number of processors that want to communicate with you (NBX does not provide this information)
* * processor id [5]
* * ri request id (it is an id that goes from 0 to total_p, and is incremented
* every time message_alloc is called)
* * void pointer, parameter for additional data to pass to the call-back
*
* \param ptr_arg data passed to the specified call-back function
*
* \param opt options, NONE (ignored at the moment)
*
*/
template<typename T> void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > & prc, openfpm::vector< T > & data, void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg, long int opt=NONE)
template<typename T> void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > & prc, openfpm::vector< T > & data, openfpm::vector< size_t > prc_recv, openfpm::vector< size_t > & recv_sz ,void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg, long int opt=NONE)
{
#ifdef DEBUG
checkType<typename T::value_type>();
#endif
// resize the pointer list
ptr_send.resize(prc.size());
sz_send.resize(prc.size());
// Allocate the buffers
for (size_t i = 0 ; i < prc.size() ; i++)
send(prc.get(i),SEND_SPARSE + NBX_cnt,data.get(i).getPointer(),data.get(i).size());
for (size_t i = 0 ; i < prc_recv.size() ; i++)
{
ptr_send.get(i) = data.get(i).getPointer();
sz_send.get(i) = data.get(i).size() * sizeof(typename T::value_type);
void * ptr_recv = msg_alloc(recv_sz.get(i),0,0,prc_recv.get(i),i,ptr_arg);
recv(prc_recv.get(i),SEND_SPARSE + NBX_cnt,ptr_recv,recv_sz.get(i));
}
sendrecvMultipleMessagesNBX(prc.size(),(size_t *)sz_send.getPointer(),(size_t *)prc.getPointer(),(void **)ptr_send.getPointer(),msg_alloc,ptr_arg,opt);
execute();
// Circular counter
NBX_cnt = (NBX_cnt + 1) % 1024;
}
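/*! \brief Usage sketch for the overload above
*
* A minimal sketch, assuming a Vcluster instance vcl and a user-defined call-back
* msg_alloc_example (neither is defined in this file); the receive buffers are owned
* by the caller and reached through ptr_arg
*
* \code
* // resize the ri-th caller-owned buffer to msg_sz bytes and return a pointer to it
* static void * msg_alloc_example(size_t msg_sz, size_t total, size_t np, size_t prc_id, size_t ri, void * ptr)
* {
* 	openfpm::vector<openfpm::vector<unsigned char>> & bufs = *(openfpm::vector<openfpm::vector<unsigned char>> *)ptr;
* 	bufs.get(ri).resize(msg_sz);
* 	return bufs.get(ri).getPointer();
* }
*
* openfpm::vector<size_t> prc;                    // destinations, e.g. {1,1,6,7,8}
* openfpm::vector<openfpm::vector<int>> data;     // one payload per destination
* openfpm::vector<size_t> prc_recv;               // sources known in advance
* openfpm::vector<size_t> recv_sz;                // expected size from each source
* openfpm::vector<openfpm::vector<unsigned char>> bufs;
* bufs.resize(prc_recv.size());
*
* vcl.sendrecvMultipleMessagesNBX(prc,data,prc_recv,recv_sz,msg_alloc_example,(void *)&bufs);
* \endcode
*/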
/*! \brief Send and receive multiple messages
*
* It sends multiple messages to a set of processors and receives
* multiple messages from another set of processors; all the processors must call this
* function. In this particular case the receiver knows from which processors it is going
* to receive.
*
* suppose the following situation: the calling processor wants to communicate
* * 2 messages of size 100 bytes to processor 1
* * 1 message of size 50 bytes to processor 6
* * 1 message of size 48 bytes to processor 7
* * 1 message of size 70 bytes to processor 8
*
* \param n_send number of sends this processor performs [5]
*
* \param prc list of processors with which it should communicate
* [1,1,6,7,8]
*
* \param sz array containing the size of the message for each processor
* (zeros must not be present) [100,100,50,48,70]
*
* \param ptr array containing the pointers to the messages to send
*
* \param n_recv number of receives this processor performs
*
* \param prc_recv list of processors from which this processor is going to receive
*
* \param sz_recv array containing the size of the message to receive from each processor in prc_recv
*
* \param msg_alloc This is a call-back with the purpose of allocating space
* for the incoming message and giving back a valid pointer. Supposing that this call-back has been triggered by
* the processor with id 5 that wants to communicate a message of size 100 bytes, the 6 parameters
* passed to the call-back are in order:
* * message size required to receive the message [100]
* * total message size to receive from all the processors (NBX does not provide this information)
* * the total number of processors that want to communicate with you (NBX does not provide this information)
* * processor id [5]
* * ri request id (it is an id that goes from 0 to total_p, and is incremented
* every time message_alloc is called)
* * void pointer, parameter for additional data to pass to the call-back
*
* \param ptr_arg data passed to the specified call-back function
*
* \param opt options, NONE (ignored at the moment)
*
*/
void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] , void * ptr[], size_t n_recv, size_t prc_recv[] , size_t sz_recv[] ,void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg, long int opt=NONE)
{
// Allocate the buffers
for (size_t i = 0 ; i < n_send ; i++)
send(prc[i],SEND_SPARSE + NBX_cnt,ptr[i],sz[i]);
for (size_t i = 0 ; i < n_recv ; i++)
{
void * ptr_recv = msg_alloc(sz_recv[i],0,0,prc_recv[i],i,ptr_arg);
recv(prc_recv[i],SEND_SPARSE + NBX_cnt,ptr_recv,sz_recv[i]);
}
execute();
// Circular counter
NBX_cnt = (NBX_cnt + 1) % 1024;
}
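/*! \brief Usage sketch for the raw-pointer overload above
*
* A minimal sketch, assuming a Vcluster instance vcl; the buffers and sizes are
* illustrative. One message of 100 bytes is sent to processor 1 and a reply of
* known size is received from it; the call-back simply hands back the buffer
* passed through ptr_arg
*
* \code
* char out[100];
* char in[64];
* size_t sz[1]    = {100};
* size_t prc[1]   = {1};
* void * ptr[1]   = {out};
* size_t prc_r[1] = {1};
* size_t sz_r[1]  = {sizeof(in)};
*
* // captureless lambda converts to the required function pointer type
* auto alloc = [](size_t, size_t, size_t, size_t, size_t, void * p) -> void * {return p;};
*
* vcl.sendrecvMultipleMessagesNBX(1,sz,prc,ptr,1,prc_r,sz_r,alloc,(void *)in);
* \endcode
*/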
/*! \brief Send and receive multiple messages
*
* It sends multiple messages to a set of processors and receives
* multiple messages from another set of processors; all the processors must call this
* function
*
* suppose the following situation: the calling processor wants to communicate
* * 2 vectors of 100 integers to processor 1
......@@ -598,10 +668,10 @@ public:
* the processor with id 5 that wants to communicate a message of size 100 bytes, the 6 parameters
* passed to the call-back are in order:
* * message size required to receive the message [100]
* * total message size to receive from all the processors (NBX does not provide this information)
* * the total number of processors that want to communicate with you (NBX does not provide this information)
* * processor id [5]
* * ri request id (it is an id that goes from 0 to total_p, and is incremented
* every time message_alloc is called)
* * void pointer, parameter for additional data to pass to the call-back
......@@ -611,21 +681,11 @@ public:
* \param opt options, only NONE supported
*
*/
template<typename T> void sendrecvMultipleMessagesPCX(openfpm::vector< size_t > & prc, openfpm::vector< T > & data, void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg, long int opt=NONE)
template<typename T> void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > & prc, openfpm::vector< T > & data, void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg, long int opt=NONE)
{
#ifdef DEBUG
checkType<typename T::value_type>();
#endif
// resize map with the number of processors
map.resize(size);
// reset the sending buffer
map.fill(0);
// create sending map
for (size_t i = 0 ; i < prc.size() ; i++)
{map.get(prc.get(i)) = 1;}
// resize the pointer list
ptr_send.resize(prc.size());
sz_send.resize(prc.size());
......@@ -636,14 +696,16 @@ public:
sz_send.get(i) = data.get(i).size() * sizeof(typename T::value_type);
}
sendrecvMultipleMessagesPCX(prc.size(),(size_t *)map.getPointer(),(size_t *)sz_send.getPointer(),(size_t *)prc.getPointer(),(void **)ptr_send.getPointer(),msg_alloc,ptr_arg,opt);
sendrecvMultipleMessagesNBX(prc.size(),(size_t *)sz_send.getPointer(),(size_t *)prc.getPointer(),(void **)ptr_send.getPointer(),msg_alloc,ptr_arg,opt);
}
/*! \brief Send and receive multiple messages
*
* It sends multiple messages to a set of processors and receives
* multiple messages from another set of processors; all the processors must call this
* function
*
* suppose the following situation: the calling processor wants to communicate
* * 2 messages of size 100 bytes to processor 1
......@@ -682,7 +744,7 @@ public:
void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] , void * ptr[], void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg, long int opt = NONE)
{
if (stat.size() != 0 || req.size() != 0)
std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " this function must be called when no other requests are in progress \n";
std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " this function must be called when no other requests are in progress. Please remember that id you use function like max(),sum(),send(),recv() check that you did not miss to call the function execute() \n";
stat.clear();
......@@ -784,152 +846,6 @@ public:
NBX_cnt = (NBX_cnt + 1) % 1024;
}
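/*! \brief Usage sketch for the unknown-receiver overload above
*
* A minimal sketch, assuming a Vcluster instance vcl, hypothetical send buffers
* buf_for_1 and buf_for_6, and a caller-owned list of receive buffers reached
* through ptr_arg. Here the sources and sizes are not known in advance, so every
* receive buffer is allocated inside the call-back
*
* \code
* // grow the list of receive buffers on every incoming message
* static void * msg_alloc_unknown(size_t msg_sz, size_t total, size_t np, size_t prc_id, size_t ri, void * ptr)
* {
* 	openfpm::vector<openfpm::vector<unsigned char>> & bufs = *(openfpm::vector<openfpm::vector<unsigned char>> *)ptr;
* 	bufs.add();
* 	bufs.last().resize(msg_sz);
* 	return bufs.last().getPointer();
* }
*
* size_t sz[2]  = {100,50};
* size_t prc[2] = {1,6};
* void * ptr[2] = {buf_for_1,buf_for_6};    // hypothetical message buffers
* openfpm::vector<openfpm::vector<unsigned char>> bufs;
*
* vcl.sendrecvMultipleMessagesNBX(2,sz,prc,ptr,msg_alloc_unknown,(void *)&bufs);
* \endcode
*/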
/*! \brief Send and receive multiple messages
*
* It sends multiple messages to a set of processors and receives
* multiple messages from another set of processors; all the processors must call this
* function. NBX is more performant than PCX with more processors (1000+)
*
* suppose the following situation: the calling processor wants to communicate
* * 2 messages of size 100 bytes to processor 1
* * 1 message of size 50 bytes to processor 6
* * 1 message of size 48 bytes to processor 7
* * 1 message of size 70 bytes to processor 8
*
* \param n_send number of sends this processor performs [5]
*
* \param prc list of processors with which it should communicate
* [1,1,6,7,8]
*
* \param map array containing, for each processor, the number of messages the
* calling processor wants to communicate to it
* [0 2 0 0 0 0 1 1 1]
*
* \param sz array containing the size of the message for each processor
* (zeros must not be present) [100,100,50,48,70]
*
* \param ptr array containing the pointers to the messages to send
*
* \param msg_alloc This is a call-back with the purpose of allocating space
* for the incoming message and giving back a valid pointer. Supposing that this call-back has been triggered by
* the processor with id 5 that wants to communicate a message of size 100 bytes, the 6 parameters
* passed to the call-back are in order:
* * message size required to receive the message [100]
* * total message size to receive from all the processors
* * the total number of processors that want to communicate with the calling processor
* * processor id [5]
* * ri request id (it is an id that goes from 0 to total_p, and is incremented
* every time message_alloc is called)
* * void pointer, parameter for additional data to pass to the call-back
*
* \param ptr_arg pointer passed to the call-back function
*
* \param opt options, NONE (ignored at the moment)
*
*/
void sendrecvMultipleMessagesPCX(size_t n_send, size_t * map, size_t sz[], size_t prc[] , void * ptr[], void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg, long int opt)
{
if (stat.size() != 0 || req.size() != 0)
std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " this function must be called when no other requests are in progress \n";
stat.clear();
req.clear();
req.add();
stat.add();
proc_com.resize(1);
MPI_SAFE_CALL(MPI_Ireduce_scatter(map,&proc_com.get(0),&map_scatter.get(0),MPI_UNSIGNED_LONG,MPI_SUM,MPI_COMM_WORLD,&req.last()));
MPI_SAFE_CALL(MPI_Waitall(req.size(),&req.get(0),&stat.get(0)));
// Remove the executed request
req.clear();
stat.clear();
// Allocate the temporary buffer to store the message size for each processor
size_t n_proc_com = proc_com.get(0);
proc_com.resize(n_proc_com * 2);
// queue n_proc_com MPI_Irecv with source ANY_SOURCE to get
// the message length from each processor and
// send the message length to each processor
for (size_t i = 0 ; i < n_proc_com ; i++)
{
req.add();
MPI_SAFE_CALL(MPI_Irecv(&proc_com.get(i),1,MPI_UNSIGNED_LONG,MPI_ANY_SOURCE,MSG_LENGTH,MPI_COMM_WORLD,&req.last()));
}
for (size_t i = 0 ; i < n_send ; i++)
{
req.add();
MPI_SAFE_CALL(MPI_Isend(&sz[i],1,MPI_UNSIGNED_LONG,prc[i],MSG_LENGTH,MPI_COMM_WORLD,&req.last()));
}
stat.resize(req.size());
if (req.size() != 0) {MPI_SAFE_CALL(MPI_Waitall(req.size(),&req.get(0),&stat.get(0)));}
// Use proc_com to get the ids of the processors that are trying to communicate
for (size_t i = 0 ; i < n_proc_com ; i++)
{
proc_com.get(n_proc_com+i) = stat.get(i).MPI_SOURCE;
}
// Remove the executed request
req.clear();
stat.clear();
// Calculate the total size of the message
size_t total_msg = 0;
for (size_t i = 0 ; i < n_proc_com ; i++)
{
total_msg += proc_com.get(i);
}
// Receive the message
for (size_t i = 0 ; i < n_proc_com ; i++)
{
void * ptr = msg_alloc(proc_com.get(i),total_msg,n_proc_com,proc_com.get(n_proc_com+i),i,ptr_arg);
req.add();
#ifdef SE_CLASS2
check_valid(ptr,proc_com.get(i));
#endif
MPI_SAFE_CALL(MPI_Irecv(ptr,proc_com.get(i),MPI_BYTE,proc_com.get(i+n_proc_com),MSG_SEND_RECV,MPI_COMM_WORLD,&req.last()));
}
// Send all the messages this processor must send
for (size_t i = 0 ; i < n_send ; i++)
{
req.add();
#ifdef SE_CLASS2
check_valid(ptr[i],sz[i]);
#endif
MPI_SAFE_CALL(MPI_Isend(ptr[i],sz[i],MPI_BYTE,prc[i],MSG_SEND_RECV,MPI_COMM_WORLD,&req.last()));
}
stat.resize(req.size());
if (req.size() != 0) {MPI_SAFE_CALL(MPI_Waitall(req.size(),&req.get(0),&stat.get(0)));}
// Remove the executed request
req.clear();
stat.clear();
}
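/*! \brief Usage sketch for sendrecvMultipleMessagesPCX
*
* A minimal sketch, assuming a Vcluster instance vcl running on 9 processors,
* hypothetical message buffers m0..m4, and the call-back msg_alloc_example with
* its bufs vector from the sketch earlier in this file. It reproduces the
* documented example: the map entry of each destination counts how many messages
* go to it, giving [0 2 0 0 0 0 1 1 1]
*
* \code
* size_t prc[5] = {1,1,6,7,8};
* size_t sz[5]  = {100,100,50,48,70};
* void * ptr[5] = {m0,m1,m2,m3,m4};          // hypothetical message buffers
*
* openfpm::vector<size_t> map;
* map.resize(vcl.getProcessingUnits());      // one slot per processor
* map.fill(0);
* for (size_t i = 0 ; i < 5 ; i++)
* 	map.get(prc[i])++;
*
* vcl.sendrecvMultipleMessagesPCX(5,(size_t *)map.getPointer(),sz,prc,ptr,msg_alloc_example,(void *)&bufs,NONE);
* \endcode
*/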
/*! \brief Send data to a processor
*
* \warning In order to avoid deadlock every send must be coupled with a recv
......@@ -1000,11 +916,11 @@ public:
*
* \warning In order to avoid deadlock every recv must be coupled with a send
* in case you want to send data without knowledge from the other side
* consider using sendrecvMultipleMessagesNBX
*
* \warning operations are asynchronous, execute must be called to ensure they are executed
*
* \see sendrecvMultipleMessagesNBX
*
* \param proc processor id
* \param tag id
......@@ -1031,11 +947,11 @@ public:
*
* \warning In order to avoid deadlock every recv must be coupled with a send
* in case you want to send data without knowledge from the other side
* consider using sendrecvMultipleMessagesNBX
*
* \warning operations are asynchronous, execute must be called to ensure they are executed
*
* \see sendrecvMultipleMessagesNBX
*
* \param proc processor id
* \param tag id
......@@ -1171,6 +1087,19 @@ static inline void openfpm_init(int *argc, char ***argv)
#endif
init_global_v_cluster_private(argc,argv);
#ifdef SE_CLASS1
std::cout << "OpenFPM is compiled with debug mode LEVEL:1. Remember to remove SE_CLASS1 when you go in production" << std::endl;
#endif
#ifdef SE_CLASS2
std::cout << "OpenFPM is compiled with debug mode LEVEL:2. Remember to remove SE_CLASS2 when you go in production" << std::endl;
#endif
ofp_initialized = true;
}
......
......@@ -9,12 +9,14 @@
private:
template<bool result, typename T, typename S>
struct unpack_selector_with_prp
{
template<typename op, int ... prp> static void call_unpack(S & recv, openfpm::vector<BHeapMemory> & recv_buf, openfpm::vector<size_t> * sz, op & op_param)
template<typename op, int ... prp> static void call_unpack(S & recv, openfpm::vector<BHeapMemory> & recv_buf, openfpm::vector<size_t> * sz, openfpm::vector<size_t> * sz_byte, op & op_param)
{
if (sz_byte != NULL)
sz_byte->resize(recv_buf.size());
for (size_t i = 0 ; i < recv_buf.size() ; i++)
{
T unp;
......@@ -33,8 +35,13 @@ private:
size_t recv_size_new = recv.size();
if (sz_byte != NULL)
sz_byte->get(i) = recv_buf.get(i).size();
if (sz != NULL)
sz->get(i) = recv_size_new - recv_size_old;
mem.decRef();
delete &mem;
}
}
};
......@@ -43,8 +50,11 @@ private:
template<typename T, typename S>
struct unpack_selector_with_prp<true,T,S>
{
template<typename op, unsigned int ... prp> static void call_unpack(S & recv, openfpm::vector<BHeapMemory> & recv_buf, openfpm::vector<size_t> * sz, op & op_param)
template<typename op, unsigned int ... prp> static void call_unpack(S & recv, openfpm::vector<BHeapMemory> & recv_buf, openfpm::vector<size_t> * sz, openfpm::vector<size_t> * sz_byte, op & op_param)
{
if (sz_byte != NULL)
sz_byte->resize(recv_buf.size());
for (size_t i = 0 ; i < recv_buf.size() ; i++)
{
// calculate the number of received elements
......@@ -69,6 +79,8 @@ private:
size_t recv_size_new = recv.size();
if (sz_byte != NULL)
sz_byte->get(i) = recv_buf.get(i).size();
if (sz != NULL)
sz->get(i) = recv_size_new - recv_size_old;
}
......@@ -92,11 +104,11 @@ private:
Packer<T,HeapMemory>::template pack<prp...>(mem,send,sts);
}
template<typename op, typename T, typename S> inline static void call_unpack(S & recv, openfpm::vector<BHeapMemory> & recv_buf, openfpm::vector<size_t> * sz, op & op_param)
template<typename op, typename T, typename S> inline static void call_unpack(S & recv, openfpm::vector<BHeapMemory> & recv_buf, openfpm::vector<size_t> * sz, openfpm::vector<size_t> * sz_byte, op & op_param)
{
const bool result = has_pack_gen<typename T::value_type>::value == false && is_vector<T>::value == true;
//const bool result = has_pack<typename T::value_type>::type::value == false && has_pack_agg<typename T::value_type>::result::value == false && is_vector<T>::value == true;
unpack_selector_with_prp<result, T, S>::template call_unpack<op,prp...>(recv, recv_buf, sz, op_param);
unpack_selector_with_prp<result, T, S>::template call_unpack<op,prp...>(recv, recv_buf, sz, sz_byte, op_param);
}
};
......@@ -136,10 +148,10 @@ private:
}
}
static void unpacking(S & recv, openfpm::vector<BHeapMemory> & recv_buf, openfpm::vector<size_t> * sz, op & op_param)
static void unpacking(S & recv, openfpm::vector<BHeapMemory> & recv_buf, openfpm::vector<size_t> * sz, openfpm::vector<size_t> * sz_byte, op & op_param)
{
typedef index_tuple<prp...> ind_prop_to_pack;
call_serialize_variadic<ind_prop_to_pack>::template call_unpack<op,T,S>(recv, recv_buf, sz, op_param);
call_serialize_variadic<ind_prop_to_pack>::template call_unpack<op,T,S>(recv, recv_buf, sz, sz_byte, op_param);
}
};
......@@ -149,17 +161,16 @@ private:
template<int ... prp>
struct index_gen<index_tuple<prp...>>
{
template<typename op, typename T, typename S> inline static void process_recv(Vcluster & vcl, S & recv, openfpm::vector<size_t> * sz_recv, op & op_param)
template<typename op, typename T, typename S> inline static void process_recv(Vcluster & vcl, S & recv, openfpm::vector<size_t> * sz_recv, openfpm::vector<size_t> * sz_recv_byte, op & op_param)
{
vcl.process_receive_buffer_with_prp<op,T,S,prp...>(recv,sz_recv,op_param);
vcl.process_receive_buffer_with_prp<op,T,S,prp...>(recv,sz_recv,sz_recv_byte,op_param);
}
};
template<typename op, typename T, typename S> void prepare_send_buffer(openfpm::vector<T> & send, S & recv, openfpm::vector<size_t> & prc_send, openfpm::vector<size_t> & prc_recv, openfpm::vector<size_t> & sz_recv)
template<typename op, typename T, typename S> void prepare_send_buffer(openfpm::vector<T> & send, S & recv, openfpm::vector<size_t> & prc_send, openfpm::vector<size_t> & prc_recv, openfpm::vector<size_t> & sz_recv, size_t opt)
{
prc_recv.clear();
sz_recv.clear();
openfpm::vector<size_t> sz_recv_byte(sz_recv.size());
// Reset the receive buffer
reset_recv_buf();
......@@ -173,8 +184,7 @@ template<typename op, typename T, typename S> void prepare_send_buffer(openfpm::
// Prepare the sending buffer
openfpm::vector<const void *> send_buf;
openfpm::vector<size_t> sz_byte;
openfpm::vector<size_t> send_sz_byte;
size_t tot_size = 0;
......@@ -183,7 +193,7 @@ template<typename op, typename T, typename S> void prepare_send_buffer(openfpm::
size_t req = 0;
//Pack requesting
pack_unpack_cond_with_prp<has_max_prop<T, has_value_type<T>::value>::value,op, T, S>::packingRequest(send.get(i), req, sz_byte);
pack_unpack_cond_with_prp<has_max_prop<T, has_value_type<T>::value>::value,op, T, S>::packingRequest(send.get(i), req, send_sz_byte);
tot_size += req;
}
......@@ -199,17 +209,41 @@ template<typename op, typename T, typename S> void prepare_send_buffer(openfpm::
Pack_stat sts;
pack_unpack_cond_with_prp<has_max_prop<T, has_value_type<T>::value>::value, op, T, S>::packing(mem, send.get(i), sts, send_buf);
}
// receive information
base_info bi(&recv_buf,prc_recv,sz_recv);
base_info bi(&recv_buf,prc_recv,sz_recv_byte);
// Send and recv multiple messages
sendrecvMultipleMessagesNBX(prc_send.size(),(size_t *)sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi);
if (opt & RECEIVE_KNOWN)
{
// We are passing the number of elements but not the bytes, calculate the bytes
if (opt & KNOWN_ELEMENT_OR_BYTE)
{
// We know the number of elements, convert to bytes (ONLY if it is possible)
if (has_pack_gen<typename T::value_type>::value == false && is_vector<T>::value == true)
{
for (size_t i = 0 ; i < sz_recv.size() ; i++)
sz_recv_byte.get(i) = sz_recv.get(i) * sizeof(typename T::value_type);
}
else
std::cout << __FILE__ << ":" << __LINE__ << " Error " << demangle(typeid(T).name()) << " the type does not work with the option RECEIVE_KNOWN or NO_CHANGE_ELEMENTS" << std::endl;
}
sendrecvMultipleMessagesNBX(prc_send.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(),
prc_recv.size(),(size_t *)prc_recv.getPointer(),(size_t *)sz_recv_byte.getPointer(),msg_alloc_known,(void *)&bi);
}
else
{
prc_recv.clear();
sendrecvMultipleMessagesNBX(prc_send.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi);
}
// Reorder the buffer
reorder_buffer(prc_recv,sz_recv);
reorder_buffer(prc_recv,sz_recv_byte);
mem.decRef();
delete &mem;
}
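/*! \brief Sketch: how the new opt flags combine
*
* A minimal sketch (assumption: the public semantic send/receive entry point that
* forwards opt to prepare_send_buffer is not shown in this diff). The flags are
* bit-masks and are meant to be OR-combined
*
* \code
* // receive sizes are known and expressed in number of elements
* size_t opt = RECEIVE_KNOWN | KNOWN_ELEMENT_OR_BYTE;
*
* // receive sizes are known and already expressed in bytes
* size_t opt_bytes = RECEIVE_KNOWN;
* \endcode
*/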
......@@ -279,6 +313,38 @@ static void * msg_alloc(size_t msg_i ,size_t total_msg, size_t total_p, size_t i
return rinfo.recv_buf->last().getPointer();
}
/*! \brief Call-back to allocate buffer to receive data
*
* \param msg_i size required to receive the message from i
* \param total_msg total size to receive from all the processors
* \param total_p the total number of processors that want to communicate with you
* \param i processor id
* \param ri request id (it is an id that goes from 0 to total_p, and is unique
* every time message_alloc is called)
* \param ptr a pointer to the vector_dist structure
*
* \return the pointer where to store the message for the processor i
*
*/
static void * msg_alloc_known(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, void * ptr)
{
base_info & rinfo = *(base_info *)ptr;