Commit 7c68ec7f authored by incardon

Adapt SSendRecv for GPU

parent faa1d114
......@@ -91,6 +91,7 @@ class Vcluster: public Vcluster_base
// Prepare the sending buffer
openfpm::vector<const void *> send_buf;
openfpm::vector<size_t> send_sz_byte;
openfpm::vector<size_t> prc_send_;
size_t tot_size = 0;
......@@ -103,6 +104,8 @@ class Vcluster: public Vcluster_base
tot_size += req;
}
pack_unpack_cond_with_prp_inte_lin<T>::construct_prc(prc_send,prc_send_);
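// editorial note: for memory_traits_inte (SoA) types construct_prc replicates
// each destination processor once per property, since every property travels
// in its own message; for linear layouts prc_send_ is simply a copy of prc_send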
HeapMemory pmem;
ExtPreAlloc<HeapMemory> & mem = *(new ExtPreAlloc<HeapMemory>(tot_size,pmem));
......@@ -117,8 +120,10 @@ class Vcluster: public Vcluster_base
pack_unpack_cond_with_prp<has_max_prop<T, has_value_type<T>::value>::value, op, T, S, layout_base>::packing(mem, send.get(i), sts, send_buf);
}
tags.clear();
// receive information
base_info bi(&recv_buf,prc_recv,sz_recv_byte);
base_info bi(&recv_buf,prc_recv,sz_recv_byte,tags);
// Send and recv multiple messages
if (opt & RECEIVE_KNOWN)
......@@ -142,11 +147,11 @@ class Vcluster: public Vcluster_base
else
{
prc_recv.clear();
sendrecvMultipleMessagesNBX(prc_send.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi);
sendrecvMultipleMessagesNBX(prc_send_.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send_.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi);
}
// Reorder the buffer
reorder_buffer(prc_recv,sz_recv_byte);
reorder_buffer(prc_recv,tags,sz_recv_byte);
mem.decRef();
delete &mem;
......@@ -180,10 +185,12 @@ class Vcluster: public Vcluster_base
openfpm::vector<size_t> & prc;
//! size of each message
openfpm::vector<size_t> & sz;
//! tags
openfpm::vector<size_t> &tags;
//! constructor
base_info(openfpm::vector<BHeapMemory> * recv_buf, openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz)
:recv_buf(recv_buf),prc(prc),sz(sz)
base_info(openfpm::vector<BHeapMemory> * recv_buf, openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz, openfpm::vector<size_t> & tags)
:recv_buf(recv_buf),prc(prc),sz(sz),tags(tags)
{}
};
......@@ -200,7 +207,7 @@ class Vcluster: public Vcluster_base
* \return the pointer where to store the message for the processor i
*
*/
static void * msg_alloc(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, void * ptr)
static void * msg_alloc(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void * ptr)
{
base_info & rinfo = *(base_info *)ptr;
......@@ -217,6 +224,7 @@ class Vcluster: public Vcluster_base
// Receive info
rinfo.prc.add(i);
rinfo.sz.add(msg_i);
rinfo.tags.add(tag);
// return the pointer
return rinfo.recv_buf->last().getPointer();
......@@ -236,7 +244,7 @@ class Vcluster: public Vcluster_base
* \return the pointer where to store the message for the processor i
*
*/
static void * msg_alloc_known(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, void * ptr)
static void * msg_alloc_known(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void * ptr)
{
base_info & rinfo = *(base_info *)ptr;
......@@ -382,8 +390,10 @@ class Vcluster: public Vcluster_base
// remaining buffers with size 0
openfpm::vector<size_t> send_req;
tags.clear();
// receive information
base_info bi(&recv_buf,prc,sz);
base_info bi(&recv_buf,prc,sz,tags);
// Send and recv multiple messages
sendrecvMultipleMessagesNBX(send_req.size(),NULL,NULL,NULL,msg_alloc,&bi);
......@@ -428,8 +438,10 @@ class Vcluster: public Vcluster_base
pack_unpack_cond_with_prp<has_max_prop<T, has_value_type<T>::value>::value,op_ssend_recv_add<void>, T, S, layout_base>::packing(mem, send, sts, send_buf);
tags.clear();
// receive information
base_info bi(NULL,prc,sz);
base_info bi(NULL,prc,sz,tags);
// Send and recv multiple messages
sendrecvMultipleMessagesNBX(send_prc.size(),(size_t *)sz.getPointer(),(size_t *)send_prc.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi,NONE);
......@@ -491,8 +503,10 @@ class Vcluster: public Vcluster_base
ptr += sz.get(i);
}
tags.clear();
// receive information
base_info bi(&recv_buf,prc,sz);
base_info bi(&recv_buf,prc,sz,tags);
// Send and recv multiple messages
sendrecvMultipleMessagesNBX(prc.size(),(size_t *)sz_byte.getPointer(),(size_t *)prc.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi);
......@@ -510,8 +524,10 @@ class Vcluster: public Vcluster_base
// The non-root receive
openfpm::vector<size_t> send_req;
tags.clear();
// receive information
base_info bi(&recv_buf,prc,sz);
base_info bi(&recv_buf,prc,sz,tags);
// Send and recv multiple messages
sendrecvMultipleMessagesNBX(send_req.size(),NULL,NULL,NULL,msg_alloc,&bi);
......@@ -534,7 +550,7 @@ class Vcluster: public Vcluster_base
* \param sz_recv list of size of the receiving messages (in byte)
*
*/
void reorder_buffer(openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz_recv)
void reorder_buffer(openfpm::vector<size_t> & prc, openfpm::vector<size_t> tags, openfpm::vector<size_t> & sz_recv)
{
struct recv_buff_reorder
......@@ -542,18 +558,23 @@ class Vcluster: public Vcluster_base
//! processor
size_t proc;
//! tag of the message
size_t tag;
//! position in the receive list
size_t pos;
//! default constructor
recv_buff_reorder()
:proc(0),pos(0)
:proc(0),tag(0),pos(0)
{};
//! needed to reorder
bool operator<(const recv_buff_reorder & rd) const
{
return proc < rd.proc;
if (proc == rd.proc)
{return tag < rd.tag;}
return (proc < rd.proc);
}
};
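// editorial note: with per-property messages the same processor can now appear
// several times in the receive list, so the sort falls back to the tag to keep
// the buffers coming from one processor in send order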
......@@ -564,6 +585,7 @@ class Vcluster: public Vcluster_base
for (size_t i = 0 ; i < rcv.size() ; i++)
{
rcv.get(i).proc = prc.get(i);
rcv.get(i).tag = tags.get(i);
rcv.get(i).pos = i;
}
......
......@@ -184,6 +184,9 @@ protected:
//! Receive buffers
openfpm::vector<BHeapMemory> recv_buf;
//! tags of the received messages
openfpm::vector<size_t> tags;
public:
// Finalize the MPI program
......@@ -480,18 +483,18 @@ public:
openfpm::vector< T > & data,
openfpm::vector< size_t > prc_recv,
openfpm::vector< size_t > & recv_sz ,
void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *),
void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
void * ptr_arg,
long int opt=NONE)
{
// Allocate the buffers
for (size_t i = 0 ; i < prc.size() ; i++)
send(prc.get(i),SEND_SPARSE + NBX_cnt,data.get(i).getPointer(),data.get(i).size());
{send(prc.get(i),SEND_SPARSE + NBX_cnt,data.get(i).getPointer(),data.get(i).size());}
for (size_t i = 0 ; i < prc_recv.size() ; i++)
{
void * ptr_recv = msg_alloc(recv_sz.get(i),0,0,prc_recv.get(i),i,ptr_arg);
void * ptr_recv = msg_alloc(recv_sz.get(i),0,0,prc_recv.get(i),i,SEND_SPARSE + NBX_cnt,ptr_arg);
recv(prc_recv.get(i),SEND_SPARSE + NBX_cnt,ptr_recv,recv_sz.get(i));
}
......@@ -544,7 +547,11 @@ public:
* \param opt options, NONE (ignored in this moment)
*
*/
void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] , void * ptr[], size_t n_recv, size_t prc_recv[] , size_t sz_recv[] ,void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg, long int opt=NONE)
void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[],
size_t prc[] , void * ptr[],
size_t n_recv, size_t prc_recv[] ,
size_t sz_recv[] ,void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t, size_t,void *),
void * ptr_arg, long int opt=NONE)
{
// Allocate the buffers
......@@ -553,7 +560,7 @@ public:
for (size_t i = 0 ; i < n_recv ; i++)
{
void * ptr_recv = msg_alloc(sz_recv[i],0,0,prc_recv[i],i,ptr_arg);
void * ptr_recv = msg_alloc(sz_recv[i],0,0,prc_recv[i],i,SEND_SPARSE + NBX_cnt,ptr_arg);
recv(prc_recv[i],SEND_SPARSE + NBX_cnt,ptr_recv,sz_recv[i]);
}
......@@ -599,7 +606,10 @@ public:
* \param opt options, only NONE supported
*
*/
template<typename T> void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > & prc, openfpm::vector< T > & data, void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg, long int opt=NONE)
template<typename T>
void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > & prc, openfpm::vector< T > & data,
void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
void * ptr_arg, long int opt=NONE)
{
#ifdef SE_CLASS1
checkType<typename T::value_type>();
......@@ -659,7 +669,10 @@ public:
* \param opt options, NONE (ignored in this moment)
*
*/
void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] , void * ptr[], void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg, long int opt = NONE)
void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[],
size_t prc[] , void * ptr[],
void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
void * ptr_arg, long int opt = NONE)
{
if (stat.size() != 0 || req.size() != 0)
std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " this function must be called when no other requests are in progress. Please remember that if you use function like max(),sum(),send(),recv() check that you did not miss to call the function execute() \n";
......@@ -680,7 +693,7 @@ public:
#endif
tot_sent += sz[i];
MPI_SAFE_CALL(MPI_Issend(ptr[i], sz[i], MPI_BYTE, prc[i], SEND_SPARSE + NBX_cnt, MPI_COMM_WORLD,&req.last()));
MPI_SAFE_CALL(MPI_Issend(ptr[i], sz[i], MPI_BYTE, prc[i], SEND_SPARSE + NBX_cnt*131072 + i, MPI_COMM_WORLD,&req.last()));
log.logSend(prc[i]);
}
}
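// editorial note: each message in this NBX round is sent with tag
// SEND_SPARSE + NBX_cnt*131072 + i, encoding both the communication round and
// the index of the send; this assumes at most 131072 sends per round and an
// MPI tag upper bound (MPI_TAG_UB) large enough to hold these values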
......@@ -701,33 +714,38 @@ public:
MPI_Status stat_t;
int stat = false;
MPI_SAFE_CALL(MPI_Iprobe(MPI_ANY_SOURCE,SEND_SPARSE + NBX_cnt,MPI_COMM_WORLD,&stat,&stat_t));
MPI_SAFE_CALL(MPI_Iprobe(MPI_ANY_SOURCE,MPI_ANY_TAG/*SEND_SPARSE + NBX_cnt*/,MPI_COMM_WORLD,&stat,&stat_t));
// If I have an incoming message and it is related to this NBX communication
if (stat == true)
{
// Get the message size
int msize;
// Get the message tag and size
MPI_SAFE_CALL(MPI_Get_count(&stat_t,MPI_BYTE,&msize));
// Get the pointer to receive the message
void * ptr = msg_alloc(msize,0,0,stat_t.MPI_SOURCE,rid,ptr_arg);
// Check whether the TAG comes from one of our send TAGs
if (stat_t.MPI_TAG >= (int)(SEND_SPARSE + NBX_cnt*131072) && stat_t.MPI_TAG < (int)(SEND_SPARSE + (NBX_cnt + 1)*131072))
{
// Get the pointer to receive the message
void * ptr = msg_alloc(msize,0,0,stat_t.MPI_SOURCE,rid,stat_t.MPI_TAG,ptr_arg);
// Log the receiving request
log.logRecv(stat_t);
// Log the receiving request
log.logRecv(stat_t);
rid++;
rid++;
// Check the pointer
// Check the pointer
#ifdef SE_CLASS2
check_valid(ptr,msize);
check_valid(ptr,msize);
#endif
tot_recv += msize;
MPI_SAFE_CALL(MPI_Recv(ptr,msize,MPI_BYTE,stat_t.MPI_SOURCE,SEND_SPARSE+NBX_cnt,MPI_COMM_WORLD,&stat_t));
tot_recv += msize;
MPI_SAFE_CALL(MPI_Recv(ptr,msize,MPI_BYTE,stat_t.MPI_SOURCE,stat_t.MPI_TAG,MPI_COMM_WORLD,&stat_t));
#ifdef SE_CLASS2
check_valid(ptr,msize);
check_valid(ptr,msize);
#endif
}
}
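// editorial sketch (not part of the commit): a tag accepted by the range
// check above can be decoded as
//   size_t round = (stat_t.MPI_TAG - SEND_SPARSE) / 131072; // NBX round
//   size_t idx   = (stat_t.MPI_TAG - SEND_SPARSE) % 131072; // index of the send
// under the encoding used by the MPI_Issend call above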
// Check the status of all the MPI_Issend requests and call the barrier if finished
......
......@@ -116,6 +116,60 @@ struct unpack_each_prop_buffer
}
};
/*! \brief this class is a functor for "for_each" algorithm
*
* This class is a functor for the "for_each" algorithm. The operator()
* is called for each element of the boost::vector. It is mainly used to
* process the receive buffers in case of a memory_traits_inte layout receive
*
* \tparam sT source vector type
* \tparam layout_base memory layout
*
*/
template<typename sT, template<typename> class layout_base>
struct process_receive_mem_traits_inte
{
//! index of the receive buffer for the current property
size_t i;
//! Receive buffer
openfpm::vector<BHeapMemory> & recv_buf;
//! Fake vector that maps over the received memory
openfpm::vector<typename sT::value_type,PtrMemory,typename layout_base<typename sT::value_type>::type,layout_base,openfpm::grow_policy_identity> & v2;
//! number of received elements
size_t n_ele = 0;
/*! \brief constructor
*
* \param v2 fake vector to map over the receive buffers
* \param recv_buf receive buffers
* \param i index of the first receive buffer to process
*
*/
inline process_receive_mem_traits_inte(openfpm::vector<typename sT::value_type,PtrMemory,typename layout_base<typename sT::value_type>::type,layout_base,openfpm::grow_policy_identity> & v2,
openfpm::vector<BHeapMemory> & recv_buf,
size_t i)
:i(i),recv_buf(recv_buf),v2(v2)
{};
//! It maps the receive buffer of each property into the fake vector
template<typename T>
inline void operator()(T& t)
{
typedef typename boost::mpl::at<typename sT::value_type::type,T>::type type_prp;
// calculate the number of received elements (every property buffer carries
// the same number of elements, so the last computed value is the one kept)
this->n_ele = recv_buf.get(i).size() / sizeof(type_prp);
// map the received memory as the buffer of this property
PtrMemory * ptr1 = new PtrMemory(recv_buf.get(i).getPointer(),recv_buf.get(i).size());
v2.template setMemory<T::value>(*ptr1);
++i;
}
};
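// editorial note: this functor is driven by unpack_selector_with_prp_lin
// below, via boost::mpl::for_each_ref over the property range [0, max_prop);
// the fake vector is then resized to prmti.n_ele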
template<bool inte_or_lin,typename T, typename S, template<typename> class layout_base>
struct unpack_selector_with_prp_lin
{
......@@ -126,19 +180,14 @@ struct unpack_selector_with_prp_lin
op & op_param,
size_t i)
{
// calculate the number of received elements
size_t n_ele = recv_buf.get(i).size() / sizeof(typename T::value_type);
// add the received particles to the vector
PtrMemory * ptr1 = new PtrMemory(recv_buf.get(i).getPointer(),recv_buf.get(i).size());
// create vector representation to a piece of memory already allocated
openfpm::vector<typename T::value_type,PtrMemory,typename layout_base<typename T::value_type>::type,layout_base,openfpm::grow_policy_identity> v2;
v2.setMemory(*ptr1);
process_receive_mem_traits_inte<T,layout_base> prmti(v2,recv_buf,i);
// resize with the number of elements
v2.resize(n_ele);
boost::mpl::for_each_ref<boost::mpl::range_c<int,0,T::value_type::max_prop>>(prmti);
v2.resize(prmti.n_ele);
// Merge the information
......@@ -287,6 +336,44 @@ struct set_buf_pointer_for_each_prop
}
};
/*! \brief this class is a functor for "for_each" algorithm
*
* This class is a functor for the "for_each" algorithm. The operator()
* is called for each element of the boost::vector. It is mainly used to
* calculate the size in bytes of each property buffer
*
* \tparam sT source vector type
*
*/
template<typename sT>
struct set_buf_size_for_each_prop
{
//! source vector
sT & v;
//! sizes in bytes, one entry per property
openfpm::vector<size_t> & sz;
/*! \brief constructor
*
* \param v source vector
* \param sz vector where to accumulate the sizes
*
*/
inline set_buf_size_for_each_prop(sT & v, openfpm::vector<size_t> & sz)
:v(v),sz(sz)
{};
//! It computes the buffer size (in bytes) of each property
template<typename T>
inline void operator()(T& t) const
{
typedef typename boost::mpl::at<typename sT::value_type::type,T>::type type_prp;
sz.add(sizeof(type_prp)*v.size());
}
};
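// example (editorial): for aggregate<float,float[3]> with v.size() == 100,
// sz receives two entries: 100*sizeof(float) and 100*sizeof(float[3])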
template<typename T, bool impl = is_multiple_buffer_each_prp<T>::value >
struct pack_unpack_cond_with_prp_inte_lin
{
......@@ -294,6 +381,19 @@ struct pack_unpack_cond_with_prp_inte_lin
{
send_buf.add(send.getPointer());
}
static void set_size_buffers(T & send, openfpm::vector<size_t> & sz)
{
sz.add(send.size()*sizeof(typename T::value_type));
}
static void construct_prc(openfpm::vector<size_t> & prc_send, openfpm::vector<size_t> & prc_send_)
{
for (size_t i = 0 ; i < prc_send.size() ; i++)
{
prc_send_.add(prc_send.get(i));
}
}
};
// memory_traits_inte
......@@ -306,6 +406,22 @@ struct pack_unpack_cond_with_prp_inte_lin<T,true>
boost::mpl::for_each_ref<boost::mpl::range_c<int,0,T::value_type::max_prop>>(sbp);
}
static void set_size_buffers(T & send, openfpm::vector<size_t> & sz)
{
set_buf_size_for_each_prop<T> sbp(send,sz);
boost::mpl::for_each_ref<boost::mpl::range_c<int,0,T::value_type::max_prop>>(sbp);
}
static void construct_prc(openfpm::vector<size_t> & prc_send, openfpm::vector<size_t> & prc_send_)
{
for (size_t i = 0 ; i < prc_send.size() ; i++)
{
for (size_t j = 0 ; j < T::value_type::max_prop ; j++)
{prc_send_.add(prc_send.get(i));}
}
}
};
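// example (editorial): for a type with max_prop == 2 and prc_send = {3,7},
// construct_prc fills prc_send_ = {3,3,7,7}: one destination entry per
// property buffer, matching the per-property send buffers produced by
// set_buf_pointer_for_each_prop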
//! There is max_prop inside
......@@ -316,9 +432,8 @@ struct pack_unpack_cond_with_prp
{
typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
if (has_pack_gen<typename T::value_type>::value == false && is_vector<T>::value == true)
//if (has_pack<typename T::value_type>::type::value == false && has_pack_agg<typename T::value_type>::result::value == false && is_vector<T>::value == true)
{
sz.add(send.size()*sizeof(typename T::value_type));
pack_unpack_cond_with_prp_inte_lin<T>::set_size_buffers(send,sz);
}
else
{
......@@ -361,10 +476,15 @@ template<bool sr>
struct op_ssend_recv_add_sr
{
//! Add data
template<typename T, typename D, typename S, template<typename> class layout_base, int ... prp> static void execute(D & recv,S & v2, size_t i)
template<typename T, typename D, typename S, template<typename> class layout_base, int ... prp>
static void execute(D & recv,S & v2, size_t i)
{
// Merge the information
recv.template add_prp<typename T::value_type,PtrMemory,openfpm::grow_policy_identity,openfpm::vect_isel<typename T::value_type>::value,prp...>(v2);
recv.template add_prp<typename T::value_type,
PtrMemory,
openfpm::grow_policy_identity,
openfpm::vect_isel<typename T::value_type>::value,
prp...>(v2);
}
};
......@@ -376,7 +496,7 @@ struct op_ssend_recv_add_sr<true>
template<typename T, typename D, typename S, template<typename> class layout_base , int ... prp> static void execute(D & recv,S & v2, size_t i)
{
// Merge the information
recv.template add_prp<typename T::value_type,HeapMemory,openfpm::grow_policy_double,openfpm::vect_isel<typename T::value_type>::value, prp...>(v2);
recv.template add_prp<typename T::value_type,HeapMemory,typename T::grow_policy,openfpm::vect_isel<typename T::value_type>::value, prp...>(v2);
}
};
......
......@@ -1486,8 +1486,9 @@ BOOST_AUTO_TEST_CASE( Vcluster_semantic_ssend_recv_layout_switch )
if (v_cl.size() > 10) {return;}
openfpm::vector<openfpm::vector_gpu<aggregate<float,float[3]>>> vd;
openfpm::vector<openfpm::vector_gpu_single<aggregate<float,float[3]>>> vd;
openfpm::vector_gpu<aggregate<float,float[3]>> collect;
openfpm::vector_gpu<aggregate<float,float[3]>> collect2;
openfpm::vector<size_t> prc_send;
openfpm::vector<size_t> prc_recv;
openfpm::vector<size_t> sz_recv;
......@@ -1506,52 +1507,46 @@ BOOST_AUTO_TEST_CASE( Vcluster_semantic_ssend_recv_layout_switch )
vd.get(i).template get<1>(j)[1] = 400000 + 10000*i + v_cl.rank()*100 + j;
vd.get(i).template get<1>(j)[2] = 400000 + 10000*i + v_cl.rank()*100 + j;
}
prc_send.add(i);
}
v_cl.SSendRecv<openfpm::vector_gpu<aggregate<float,float[3]>>,decltype(collect),memory_traits_inte>(vd,collect,prc_send, prc_recv,sz_recv);
}
v_cl.SSendRecv<openfpm::vector_gpu_single<aggregate<float,float[3]>>,decltype(collect),memory_traits_inte>(vd,collect,prc_send, prc_recv,sz_recv);
v_cl.SSendRecvP<openfpm::vector_gpu_single<aggregate<float,float[3]>>,decltype(collect),memory_traits_inte,0,1>(vd,collect2,prc_send, prc_recv,sz_recv);
// now we check what we received
// collect must have 100 * v_cl.size()
BOOST_REQUIRE_EQUAL(collect.size(),100*v_cl.size());
BOOST_REQUIRE_EQUAL(collect2.size(),100*v_cl.size());
// check what we received
bool match = true;
for (size_t i = 0 ; i < v_cl.size() ; i++)
{
for (size_t j = 0 ; j < 100 ; j++)
{
match &= collect.template get<0>(i*100 +j) == v_cl.rank()*10000 + i*100 + j;
match &= collect.template get<1>(i*100 +j)[0] == 400000 + v_cl.rank()*10000 + i*100 + j;
match &= collect.template get<1>(i*100 +j)[1] == 400000 + v_cl.rank()*10000 + i*100 + j;
match &= collect.template get<1>(i*100 +j)[2] == 400000 + v_cl.rank()*10000 + i*100 + j;
match &= collect2.template get<0>(i*100 +j) == v_cl.rank()*10000 + i*100 + j;
match &= collect2.template get<1>(i*100 +j)[0] == 400000 + v_cl.rank()*10000 + i*100 + j;
match &= collect2.template get<1>(i*100 +j)[1] == 400000 + v_cl.rank()*10000 + i*100 + j;
match &= collect2.template get<1>(i*100 +j)[2] == 400000 + v_cl.rank()*10000 + i*100 + j;
}
if (match == false){break;}
}
BOOST_REQUIRE_EQUAL(match,true);
}
/*BOOST_AUTO_TEST_CASE (Vcluster_semantic_bench_all_all)
{
Vcluster & vcl = create_vcluster();
if (vcl.getProcessingUnits() >= 32)
return;
openfpm::vector<size_t> prc_recv2;
openfpm::vector<size_t> prc_recv3;
openfpm::vector<size_t> prc_send;
openfpm::vector<size_t> sz_recv2;
openfpm::vector<size_t> sz_recv3;
openfpm::vector<openfpm::vector<Box<3,size_t>>> v1;
openfpm::vector<Box<3,size_t>> v2;
openfpm::vector<openfpm::vector<Box<3,size_t>>> v3;
v1.resize(vcl.getProcessingUnits());
for(size_t i = 0 ; i < v1.size() ; i++)
{
for (size_t j = 0 ; j < 1000000 ; j++)
{
Box<3,size_t> b({j,j,j},{j,j,j});
v1.get(i).add(b);
}
prc_send.add(i);
}
timer comm_time;
comm_time.start();
vcl.SSendRecv(v1,v2,prc_send,prc_recv2,sz_recv2);
comm_time.stop();
std::cout << "Communication time " << comm_time.getwct() << std::endl;
std::cout << "Total sent: " << tot_sent << " Tot recv: " << tot_recv << std::endl;
std::cout << "END" << std::endl;
}*/
BOOST_AUTO_TEST_SUITE_END()
......
......@@ -37,7 +37,7 @@ int mod(int x, int m) {
//! [message alloc]
void * msg_alloc(size_t msg_i ,size_t total_msg, size_t total_p, size_t i,size_t ri, void * ptr)
void * msg_alloc(size_t msg_i ,size_t total_msg, size_t total_p, size_t i,size_t ri, size_t tag, void * ptr)
{
// convert the void pointer argument into a pointer to receiving buffers
openfpm::vector<openfpm::vector<unsigned char>> * v = static_cast<openfpm::vector<openfpm::vector<unsigned char>> *>(ptr);
......@@ -85,7 +85,7 @@ void * msg_alloc2(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size
return &(v->get(id-1).get(0));
}
void * msg_alloc3(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, void * ptr)
void * msg_alloc3(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void * ptr)
{
openfpm::vector<openfpm::vector<unsigned char>> * v = static_cast<openfpm::vector<openfpm::vector<unsigned char>> *>(ptr);
......@@ -100,12 +100,12 @@ void * msg_alloc3(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size
return &(v->last().get(0));
}
template<unsigned int ip, typename T> void commFunc(Vcluster & vcl,openfpm::vector< size_t > & prc, openfpm::vector< T > & data, void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg)
template<unsigned int ip, typename T> void commFunc(Vcluster & vcl,openfpm::vector< size_t > & prc, openfpm::vector< T > & data, void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg)
{
vcl.sendrecvMultipleMessagesNBX(prc,data,msg_alloc,ptr_arg);
}
template<unsigned int ip, typename T> void commFunc_null_odd(Vcluster & vcl,openfpm::vector< size_t > & prc, openfpm::vector< T > & data, void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg)
template<unsigned int ip, typename T> void commFunc_null_odd(Vcluster & vcl,openfpm::vector< size_t > & prc, openfpm::vector< T > & data, void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg)
{
if (vcl.getProcessUnitID() % 2 == 0)
vcl.sendrecvMultipleMessagesNBX(prc,data,msg_alloc,ptr_arg);
......