Commit 83903de6 authored by incardon

Fixing prop serialization

parent b84c798c
@@ -19,6 +19,7 @@
#include "util/Vcluster_log.hpp"
#include "memory/BHeapMemory.hpp"
#include "Packer_Unpacker/has_max_prop.hpp"
#include "data_type/aggregate.hpp"
#ifdef HAVE_PETSC
#include <petscvec.h>
@@ -48,18 +49,73 @@ template<typename T> void assign(T * ptr1, T * ptr2)
*ptr1 = *ptr2;
};
//! Operation functor: add the received chunk to the destination (sr selects the memory/growth strategy)
template<bool sr>
struct op_ssend_recv_add_sr
{
template<typename T, typename D, typename S, int ... prp> static void execute(D & recv,S & v2, size_t i)
{
// Merge the information
recv.template add_prp<typename T::value_type,PtrMemory,openfpm::grow_policy_identity,openfpm::vect_isel<typename T::value_type>::value,prp...>(v2);
}
};
//! Specialization for sr == true: add using heap memory with a doubling growth policy
template<>
struct op_ssend_recv_add_sr<true>
{
template<typename T, typename D, typename S, int ... prp> static void execute(D & recv,S & v2, size_t i)
{
// Merge the information
recv.template add_prp<typename T::value_type,HeapMemory,openfpm::grow_policy_double,openfpm::vect_isel<typename T::value_type>::value, prp...>(v2);
}
};
//! Operation object that adds the received data to the destination
template<typename op>
struct op_ssend_recv_add
{
template<bool sr, typename T, typename D, typename S, int ... prp> static void execute(D & recv,S & v2, size_t i)
{
// Merge the information
op_ssend_recv_add_sr<sr>::template execute<T,D,S,prp...>(recv,v2,i);
}
};
//! Operation object that merges the received data according to the op merge policy
template<template<typename,typename> class op>
struct op_ssend_recv_merge
{
//! position list (one vector per processor) used by the merge
openfpm::vector<openfpm::vector<aggregate<size_t,size_t>>> & opart;
op_ssend_recv_merge(openfpm::vector<openfpm::vector<aggregate<size_t,size_t>>> & opart)
:opart(opart)
{}
template<bool sr, typename T, typename D, typename S, int ... prp> void execute(D & recv,S & v2,size_t i)
{
// Merge the information
recv.template merge_prp_v<op,typename T::value_type, PtrMemory, openfpm::grow_policy_identity, prp...>(v2,opart.get(i));
}
};
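/* Editor's sketch (not part of this commit): the dispatch pattern used by the
 * operation functors above, in miniature. A boolean template parameter picks a
 * compile-time variant of execute(), exactly as op_ssend_recv_add_sr does with
 * its two memory strategies. All names below are illustrative.
 */
#include <vector>

template<bool append>
struct op_sketch
{
	template<typename V>
	static void execute(V & recv, const V & chunk, size_t)
	{
		// append == false: overwrite the destination with the chunk
		recv = chunk;
	}
};

template<>
struct op_sketch<true>
{
	template<typename V>
	static void execute(V & recv, const V & chunk, size_t)
	{
		// append == true: grow the destination with the received chunk
		recv.insert(recv.end(), chunk.begin(), chunk.end());
	}
};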
//////////////////////////////////////////////////
//! temporary buffer for reductions
union red
{
//! char
char c;
//! unsigned char
unsigned char uc;
//! signed short
short s;
//! unsigned short
unsigned short us;
//! integer
int i;
//! unsigned integer
unsigned int ui;
//! float
float f;
//! double
double d;
};
@@ -103,6 +159,7 @@ class exec_exception: public std::exception
class Vcluster
{
//! log file
Vcluster_log log;
//! NBX has a potential pitfall that must be addressed,
@@ -121,20 +178,20 @@ class Vcluster
//! messages come from other send or subsequent NBX procedures
size_t NBX_cnt;
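/* Editor's sketch (not part of this commit): the standard cure for the NBX
 * pitfall described above is to fold the round counter into the MPI tag, so a
 * receive posted in round k can never match a message sent in round k+1.
 * SEND_SPARSE_SK and the helper below are illustrative, not the library's API.
 */
#include <mpi.h>

static const int SEND_SPARSE_SK = 8192;

inline void issend_tagged(void * buf, int sz, int dest, size_t nbx_round,
                          MPI_Request * req)
{
	// one distinct tag per NBX round
	int tag = SEND_SPARSE_SK + (int)(nbx_round % 1024);

	MPI_Issend(buf, sz, MPI_BYTE, dest, tag, MPI_COMM_WORLD, req);
}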
//! temporary vector used for meta-communication
//! (or meta-data before the real communication)
openfpm::vector<size_t> proc_com;
//! vector that contains the scatter map (it is basically an array of one)
openfpm::vector<int> map_scatter;
//! vector of MPI requests
openfpm::vector<MPI_Request> req;
//! vector of MPI status
openfpm::vector<MPI_Status> stat;
//! vector of functions to execute after all the requests have been performed
std::vector<int> post_exe;
// Object array
@@ -142,15 +199,15 @@ class Vcluster
// Single objects
//! number of processes
int size;
//! actual rank
int rank;
//! number of processing unit per process
int numPE = 1;
/*! This buffer is a temporary buffer for reductions
*
* MPI_Iallreduce does not accept recv and send buffer to be the same
* r is used to overcome this problem (is given as second parameter)
@@ -159,16 +216,16 @@
*/
std::vector<red> r;
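/* Editor's sketch (not part of this commit): what the scratch vector r is for.
 * MPI forbids aliasing the send and receive buffers of an (I)allreduce unless
 * MPI_IN_PLACE is used, so the local value is copied into a red slot first and
 * that slot is handed to MPI as the send buffer.
 */
#include <mpi.h>

inline int sum_over_procs(int local)
{
	red scratch;          // one slot of the reduction scratch buffer
	scratch.i = local;    // the send side must differ from the recv side

	int global = 0;
	MPI_Request request;
	MPI_Iallreduce(&scratch.i, &global, 1, MPI_INT, MPI_SUM,
	               MPI_COMM_WORLD, &request);
	MPI_Wait(&request, MPI_STATUS_IGNORE);

	return global;
}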
//! vector of pointers of send buffers
openfpm::vector<void *> ptr_send;
//! vector of the size of send buffers
openfpm::vector<size_t> sz_send;
//! sending map
openfpm::vector<size_t> map;
//! Receive buffers
openfpm::vector<BHeapMemory> recv_buf;
// barrier request
@@ -9,90 +9,78 @@
private:
//! Structures that do an unpack, depending on the existence of max_prop inside 'send'
template<bool result, typename T, typename S>
struct unpack_selector_with_prp
{
template<typename op, int ... prp> static void call_unpack(S & recv, openfpm::vector<BHeapMemory> & recv_buf, openfpm::vector<size_t> * sz, op & op_param)
{
#ifdef DEBUG
std::cout << "Sz.size(): " << sz->size() << std::endl;
#endif
for (size_t i = 0 ; i < recv_buf.size() ; i++)
{
#ifdef DEBUG
std::cout << "Recv_buf.get(i).size(): " << recv_buf.get(i).size() << std::endl;
#endif
T unp;
ExtPreAlloc<HeapMemory> & mem = *(new ExtPreAlloc<HeapMemory>(recv_buf.get(i).size(),recv_buf.get(i)));
mem.incRef();
Unpack_stat ps;
Unpacker<T,HeapMemory>::template unpack<>(mem, unp, ps);
size_t recv_size_old = recv.size();
// Merge the information
op_param.template execute<true,T,decltype(recv),decltype(unp),prp...>(recv,unp,i);
size_t recv_size_new = recv.size();
if (sz != NULL)
sz->get(i) = recv_size_new - recv_size_old;
}
}
};
//! Specialization: the receive buffer can be interpreted directly as elements (no unpack step needed)
template<typename T, typename S>
struct unpack_selector_with_prp<true,T,S>
{
template<typename op, unsigned int ... prp> static void call_unpack(S & recv, openfpm::vector<BHeapMemory> & recv_buf, openfpm::vector<size_t> * sz, op & op_param)
{
for (size_t i = 0 ; i < recv_buf.size() ; i++)
{
/*ExtPreAlloc<HeapMemory> & mem = *(new ExtPreAlloc<HeapMemory>(recv_buf.get(i).size(),recv_buf.get(i)));
mem.incRef();
Unpack_stat ps;
size_t n_ele = 0;
Unpacker<size_t, HeapMemory>::unpack(mem,n_ele,ps);*/
// calculate the number of received elements
size_t n_ele = recv_buf.get(i).size() / sizeof(typename T::value_type);
// add the received particles to the vector
PtrMemory * ptr1 = new PtrMemory(recv_buf.get(i).getPointer(),recv_buf.get(i).size());
// create vector representation to a piece of memory already allocated
openfpm::vector<typename T::value_type,PtrMemory,typename memory_traits_lin<typename T::value_type>::type, memory_traits_lin,openfpm::grow_policy_identity> v2;
v2.setMemory(*ptr1);
// resize with the number of elements
v2.resize(n_ele);
// Merge the information
size_t recv_size_old = recv.size();
op_param.template execute<false,T,decltype(recv),decltype(v2),prp...>(recv,v2,i);
size_t recv_size_new = recv.size();
if (sz != NULL)
sz->get(i) = recv_size_new - recv_size_old;
}
}
};
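/* Editor's sketch (not part of this commit): the zero-copy idea behind the
 * specialization above, reduced to plain pointers. When the element type needs
 * no pack/unpack step the received bytes already are the elements, so the
 * element count is just bytes / sizeof(element).
 */
#include <cstddef>
#include <cstring>
#include <type_traits>
#include <vector>

template<typename T>
std::vector<T> elements_from_bytes(const void * bytes, size_t n_bytes)
{
	static_assert(std::is_trivially_copyable<T>::value,
	              "raw reinterpretation requires a trivially copyable type");

	size_t n_ele = n_bytes / sizeof(T);   // same arithmetic as above

	std::vector<T> out(n_ele);
	std::memcpy(out.data(), bytes, n_ele * sizeof(T));
	return out;
}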
template<typename T>
struct call_serialize_variadic {};
@@ -110,19 +98,17 @@ private:
Packer<T,HeapMemory>::template pack<prp...>(mem,send,sts);
}
template<typename op, typename T, typename S> inline static void call_unpack(S & recv, openfpm::vector<BHeapMemory> & recv_buf, openfpm::vector<size_t> * sz, op & op_param)
{
const bool result = has_pack_gen<typename T::value_type>::value == false && is_vector<T>::value == true;
//const bool result = has_pack<typename T::value_type>::type::value == false && has_pack_agg<typename T::value_type>::result::value == false && is_vector<T>::value == true;
unpack_selector_with_prp<result, T, S>::template call_unpack<op,prp...>(recv, recv_buf, sz, op_param);
}
};
// Structures that do a pack request, depending on the existence of max_prop inside 'send'
//! There is max_prop inside
template<bool cond, typename op, typename T, typename S, unsigned int ... prp>
struct pack_unpack_cond_with_prp
{
static void packingRequest(T & send, size_t & tot_size, openfpm::vector<size_t> & sz)
{
@@ -133,7 +119,7 @@ private:
#ifdef DEBUG
std::cout << "Inside SGather pack request (has prp) (vector case) " << std::endl;
#endif
sz.add(send.size()*sizeof(typename T::value_type));
}
else
{
@@ -144,7 +130,7 @@ private:
sz.add(tot_size);
}
}
static void packing(ExtPreAlloc<HeapMemory> & mem, T & send, Pack_stat & sts, openfpm::vector<const void *> & send_buf)
{
typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
@@ -163,76 +149,85 @@ private:
std::cout << "Inside SGather pack (has prp) (general case) " << std::endl;
#endif
send_buf.add(mem.getPointerEnd());
call_serialize_variadic<ind_prop_to_pack>::call_pack(mem,send,sts);
}
}
static void unpacking(S & recv, openfpm::vector<BHeapMemory> & recv_buf, openfpm::vector<size_t> * sz, op & op_param)
{
typedef index_tuple<prp...> ind_prop_to_pack;
call_serialize_variadic<ind_prop_to_pack>::template call_unpack<op,T,S>(recv, recv_buf, sz, op_param);
}
};
template<typename T>
struct index_gen {};
template<int ... prp>
struct index_gen<index_tuple<prp...>>
{
template<typename op, typename T, typename S> inline static void process_recv(Vcluster & vcl, S & recv, openfpm::vector<size_t> * sz_recv, op & op_param)
{
vcl.process_receive_buffer_with_prp<op,T,S,prp...>(recv,sz_recv,op_param);
}
};
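/* Editor's sketch (not part of this commit): how the index_gen specialization
 * above recovers a property pack. generate_indexes produces the type
 * index_tuple<0,1,...,N-1>; partially specializing on index_tuple<prp...> is
 * what turns that type back into a usable prp... pack. Standalone version:
 */
template<int ...> struct index_tuple_sk {};

template<typename Tup> struct expand_sk;

template<int ... prp>
struct expand_sk<index_tuple_sk<prp...>>
{
	// the pack is available again here, e.g. to count the properties
	static const int n_props = sizeof...(prp);
};

// expand_sk<index_tuple_sk<0,1,2>>::n_props evaluates to 3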
template<typename op, typename T, typename S> void prepare_send_buffer(openfpm::vector<T> & send, S & recv, openfpm::vector<size_t> & prc_send, openfpm::vector<size_t> & prc_recv, openfpm::vector<size_t> & sz_recv)
{
prc_recv.clear();
sz_recv.clear();
// Reset the receive buffer
reset_recv_buf();
#ifdef SE_CLASS1
if (send.size() != prc_send.size())
std::cerr << __FILE__ << ":" << __LINE__ << " Error, the number of processor involved \"prc.size()\" must match the number of sending buffers \"send.size()\" " << std::endl;
#endif
// Prepare the sending buffer
openfpm::vector<const void *> send_buf;
openfpm::vector<size_t> sz_byte;
size_t tot_size = 0;
for (size_t i = 0; i < send.size() ; i++)
{
size_t req = 0;
//Pack requesting
pack_unpack_cond_with_prp<has_max_prop<T, has_value_type<T>::value>::value,op, T, S>::packingRequest(send.get(i), req, sz_byte);
tot_size += req;
}
HeapMemory pmem;
ExtPreAlloc<HeapMemory> & mem = *(new ExtPreAlloc<HeapMemory>(tot_size,pmem));
mem.incRef();
for (size_t i = 0; i < send.size() ; i++)
{
//Packing
Pack_stat sts;
pack_unpack_cond_with_prp<has_max_prop<T, has_value_type<T>::value>::value, op, T, S>::packing(mem, send.get(i), sts, send_buf);
}
// receive information
base_info bi(&recv_buf,prc_recv,sz_recv);
// Send and recv multiple messages
sendrecvMultipleMessagesNBX(prc_send.size(),(size_t *)sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi);
// Reorder the buffer
reorder_buffer(prc_recv,sz_recv);
}
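/* Editor's sketch (not part of this commit): the two-pass protocol used by
 * prepare_send_buffer above, with std::vector standing in for ExtPreAlloc.
 * Pass 1 collects the per-message sizes, a single allocation covers all
 * messages, and pass 2 serializes each message and records where it starts.
 */
#include <cstring>
#include <vector>

inline void two_pass_pack(const std::vector<std::vector<char>> & msgs,
                          std::vector<char> & arena,
                          std::vector<const void *> & ptrs,
                          std::vector<size_t> & sizes)
{
	// pass 1: size request
	size_t tot = 0;
	for (size_t i = 0 ; i < msgs.size() ; i++)
	{
		sizes.push_back(msgs[i].size());
		tot += msgs[i].size();
	}

	arena.resize(tot);   // one allocation for every message

	// pass 2: pack and record the start of each message
	size_t off = 0;
	for (size_t i = 0 ; i < msgs.size() ; i++)
	{
		std::memcpy(arena.data() + off, msgs[i].data(), msgs[i].size());
		ptrs.push_back(arena.data() + off);
		off += msgs[i].size();
	}
}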
/*! \brief Reset the receive buffer
@@ -305,20 +300,23 @@ static void * msg_alloc(size_t msg_i ,size_t total_msg, size_t total_p, size_t i
*
* \tparam op type of the operation applied to the received data
* \tparam T type of sending object
* \tparam S type of receiving object
* \tparam prp properties to receive
*
* \param recv receive object
* \param sz vector that will store the size of each received message (can be NULL)
* \param op_param operation object used to fold the received data into recv
*
*/
template<typename op, typename T, typename S, unsigned int ... prp > void process_receive_buffer_with_prp(S & recv, openfpm::vector<size_t> * sz, op & op_param)
{
if (sz != NULL)
sz->resize(recv_buf.size());
pack_unpack_cond_with_prp<has_max_prop<T, has_value_type<T>::value>::value,op, T, S, prp... >::unpacking(recv, recv_buf, sz, op_param);
}
public:
/*! \brief Semantic Gather, gather the data from all processors into one node
*
* Semantic communications differ from the normal ones. They in general
@@ -408,8 +406,13 @@ template<typename T, typename S> bool SGather(T & send, S & recv, openfpm::vecto
// Send and recv multiple messages
sendrecvMultipleMessagesNBX(send_req.size(),NULL,NULL,NULL,msg_alloc,&bi);
// we generate the list of the properties to pack
typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
// operation object
op_ssend_recv_add<void> opa;
index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S>(*this,recv,&sz,opa);
recv.add(send);
prc.add(root);
@@ -433,7 +436,7 @@ template<typename T, typename S> bool SGather(T & send, S & recv, openfpm::vecto
size_t tot_size = 0;
pack_unpack_cond_with_prp<has_max_prop<T, has_value_type<T>::value>::value,op_ssend_recv_add<void>, T, S>::packingRequest(send, tot_size, sz);
HeapMemory pmem;
@@ -444,7 +447,7 @@ template<typename T, typename S> bool SGather(T & send, S & recv, openfpm::vecto
Pack_stat sts;
pack_unpack_cond_with_prp<has_max_prop<T, has_value_type<T>::value>::value,op_ssend_recv_add<void>, T, S>::packing(mem, send, sts, send_buf);
// receive information
base_info bi(NULL,prc,sz);
@@ -512,8 +515,13 @@ template<typename T, typename S> bool SScatter(T & send, S & recv, openfpm::vect
// Send and recv multiple messages
sendrecvMultipleMessagesNBX(prc.size(),(size_t *)sz_byte.getPointer(),(size_t *)prc.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi);
// we generate the list of the properties to pack
typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
// operation object
op_ssend_recv_add<void> opa;
index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S>(*this,recv,NULL,opa);
}
else
{
@@ -526,12 +534,81 @@ template<typename T, typename S> bool SScatter(T & send, S & recv, openfpm::vect
// Send and recv multiple messages
sendrecvMultipleMessagesNBX(send_req.size(),NULL,NULL,NULL,msg_alloc,&bi);
// we generate the list of the properties to pack
typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
// operation object
op_ssend_recv_add<void> opa;
index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S>(*this,recv,NULL,opa);
}
return true;
}
/*! \brief Reorder the receiving buffer
*
* \param prc list of the receiving processors
* \param sz_recv size of each received message
*
*/
void reorder_buffer(openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz_recv)
{
struct recv_buff_reorder
{
//! processor
size_t proc;
//! position in the receive list
size_t pos;
//! default constructor
recv_buff_reorder() {};
//! needed to reorder
bool operator<(const recv_buff_reorder & rd) const
{
return proc < rd.proc;
}
};
openfpm::vector<recv_buff_reorder> rcv;
rcv.resize(recv_buf.size());
for (size_t i = 0 ; i < rcv.size() ; i++)
{
rcv.get(i).proc = prc.get(i);
rcv.get(i).pos = i;
}
// we sort based on processor
rcv.sort();
openfpm::vector<BHeapMemory> recv_ord;
recv_ord.resize(rcv.size());
openfpm::vector<size_t> prc_ord;
prc_ord.resize(rcv.size());
openfpm::vector<size_t> sz_recv_ord;
sz_recv_ord.resize(rcv.size());
// Now we reorder rcv
for (size_t i = 0 ; i < rcv.size() ; i++)
{
recv_ord.get(i).swap(recv_buf.get(rcv.get(i).pos));
prc_ord.get(i) = rcv.get(i).proc;
sz_recv_ord.get(i) = sz_recv.get(rcv.get(i).pos);
}
// move rcv into recv
recv_buf.swap(recv_ord);
prc.swap(prc_ord);
sz_recv.swap(sz_recv_ord);
// prc and sz_recv are now reordered by processor
}
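/* Editor's sketch (not part of this commit): reorder_buffer's pattern in
 * standard C++. Sorting (key, original position) pairs and then gathering
 * through pos reorders several parallel arrays by one key while moving the
 * payload only once, via swap.
 */
#include <algorithm>
#include <vector>

inline void reorder_by_proc(std::vector<size_t> & proc,
                            std::vector<std::vector<char>> & bufs)
{
	struct entry { size_t proc; size_t pos; };

	std::vector<entry> idx(proc.size());
	for (size_t i = 0 ; i < idx.size() ; i++)
		idx[i] = entry{proc[i], i};

	std::sort(idx.begin(), idx.end(),
	          [](const entry & a, const entry & b) { return a.proc < b.proc; });

	std::vector<size_t> proc_ord(idx.size());
	std::vector<std::vector<char>> bufs_ord(idx.size());

	for (size_t i = 0 ; i < idx.size() ; i++)
	{
		proc_ord[i] = proc[idx[i].pos];
		bufs_ord[i].swap(bufs[idx[i].pos]);   // swap, no payload copy
	}

	proc.swap(proc_ord);
	bufs.swap(bufs_ord);
}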
/*! \brief Semantic Send and receive, send the data to processors and receive from the other processors
*
@@ -561,55 +638,92 @@ template<typename T, typename S> bool SScatter(T & send, S & recv, openfpm::vect
*/
template<typename T, typename S> bool SSendRecv(openfpm::vector<T> & send, S & recv, openfpm::vector<size_t> & prc_send, openfpm::vector<size_t> & prc_recv, openfpm::vector<size_t> & sz_recv)
{
prepare_send_buffer<op_ssend_recv_add<void>,T,S>(send,recv,prc_send,prc_recv,sz_recv);
// we generate the list of the properties to pack
typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
op_ssend_recv_add<void> opa;
index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S>(*this,recv,&sz_recv,opa);
return true;
}
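/* Editor's sketch (not part of this commit): a hypothetical SSendRecv call.
 * Each send.get(i) travels to processor prc_send.get(i); everything received
 * is appended to recv, while prc_recv and sz_recv report the source and the
 * number of elements of each chunk. create_vcluster() is assumed to return
 * the cluster singleton.
 */
Vcluster & vcl = create_vcluster();

openfpm::vector<openfpm::vector<float>> send;   // one buffer per destination
openfpm::vector<float> recv;                    // received data, concatenated
openfpm::vector<size_t> prc_send;               // destination of send.get(i)
openfpm::vector<size_t> prc_recv;               // filled: source of each chunk
openfpm::vector<size_t> sz_recv;                // filled: elements per chunk

// ... fill send and prc_send ...

vcl.SSendRecv(send, recv, prc_send, prc_recv, sz_recv);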