Commit 7545a755 authored by incardon's avatar incardon

Changing names of some files

parent 3f37237e
@@ -2,18 +2,18 @@
LINKLIBS = $(DEFAULT_LIB) $(PTHREAD_LIBS) $(OPT_LIBS) $(HDF5_LDFLAGS) $(HDF5_LIBS) $(BOOST_LDFLAGS)
noinst_PROGRAMS = vcluster
-vcluster_SOURCES = main.cpp VCluster.cpp ../../openfpm_devices/src/memory/HeapMemory.cpp ../../openfpm_devices/src/memory/PtrMemory.cpp
+vcluster_SOURCES = main.cpp VCluster/VCluster.cpp ../../openfpm_devices/src/memory/HeapMemory.cpp ../../openfpm_devices/src/memory/PtrMemory.cpp
vcluster_CXXFLAGS = $(AM_CXXFLAGS) $(INCLUDES_PATH) $(BOOST_CPPFLAGS)
vcluster_CFLAGS = $(CUDA_CFLAGS)
vcluster_LDADD = $(LINKLIBS)
lib_LIBRARIES = libvcluster.a
-libvcluster_a_SOURCES = VCluster.cpp
+libvcluster_a_SOURCES = VCluster/VCluster.cpp
libvcluster_a_CXXFLAGS = $(AM_CXXFLAGS) $(INCLUDES_PATH) $(BOOST_CPPFLAGS)
libvcluster_a_CFLAGS =
nobase_include_HEADERS = MPI_wrapper/MPI_IallreduceW.hpp MPI_wrapper/MPI_IrecvW.hpp MPI_wrapper/MPI_IsendW.hpp MPI_wrapper/MPI_util.hpp MPI_wrapper/MPI_IAllGather.hpp \
-VCluster_semantic.ipp VCluster.hpp VCluster_object.hpp \
+VCluster/VCluster_base.hpp VCluster/VCluster.hpp \
util/Vcluster_log.hpp
.cu.o :
/*
* VCluster.hpp
*
* Created on: Feb 8, 2016
* Author: Pietro Incardona
*/
#ifndef VCLUSTER_HPP
#define VCLUSTER_HPP
#include "VCluster_base.hpp"
#include "VCluster_meta_function.hpp"
/*! \brief Implementation of VCluster class
*
* This class implements communication functions such as summation, minimum and maximum across
* processors, as well as Dynamic Sparse Data Exchange (DSDE)
*
* ## Vcluster Min max sum
* \snippet VCluster_unit_tests.hpp max min sum
*
* ## Vcluster all gather
* \snippet VCluster_unit_test_util.hpp allGather numbers
*
* ## Dynamic sparse data exchange with complex objects
* \snippet VCluster_semantic_unit_tests.hpp dsde with complex objects1
*
* ## Dynamic sparse data exchange with buffers
* \snippet VCluster_unit_test_util.hpp dsde
* \snippet VCluster_unit_test_util.hpp message alloc
*
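* A minimal usage sketch (illustrative only; it assumes an already initialized
* Vcluster instance named vcl, whose queued collectives are run by execute()):
* \code
* float f = 1.0;
* vcl.sum(f);     // queue an asynchronous reduction (sum) across processors
* vcl.execute();  // execute all queued communications
* // here f contains the sum of the initial values of all processors
* \endcode
*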
*/
class Vcluster: public Vcluster_base
{
template<typename T>
struct index_gen {};
//! Process the receive buffer using the specified properties (meta-function)
template<int ... prp>
struct index_gen<index_tuple<prp...>>
{
//! Process the receive buffer
template<typename op, typename T, typename S> inline static void process_recv(Vcluster & vcl, S & recv, openfpm::vector<size_t> * sz_recv, openfpm::vector<size_t> * sz_recv_byte, op & op_param)
{
vcl.process_receive_buffer_with_prp<op,T,S,prp...>(recv,sz_recv,sz_recv_byte,op_param);
}
};
/*! \brief Prepare the send buffer and send the message to other processors
*
* \tparam op Operation to execute in merging the receiving data
* \tparam T sending object
* \tparam S receiving object
*
* \note T and S do not need to be the same type, but S.operation(T) must be defined; the flexibility
* of the operation is provided by op
*
* \param send sending buffer
* \param recv receiving object
* \param prc_send each object T in the vector send is sent to one processor specified in this list.
*        This means that prc_send.size() == send.size()
* \param prc_recv list of processors from which we receive (output); in case of RECEIVE_KNOWN it must be filled
* \param sz_recv size of each received message (output); in case of RECEIVE_KNOWN it must be filled
* \param opt options; RECEIVE_KNOWN enables communication patterns with lower latency
*
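* A sketch of how the public semantic primitives invoke this helper (see SSendRecv below):
* \code
* prepare_send_buffer<op_ssend_recv_add<void>,T,S>(send,recv,prc_send,prc_recv,sz_recv,opt);
* \endcode
*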
*/
template<typename op, typename T, typename S> void prepare_send_buffer(openfpm::vector<T> & send, S & recv, openfpm::vector<size_t> & prc_send, openfpm::vector<size_t> & prc_recv, openfpm::vector<size_t> & sz_recv, size_t opt)
{
openfpm::vector<size_t> sz_recv_byte(sz_recv.size());
// Reset the receive buffer
reset_recv_buf();
#ifdef SE_CLASS1
if (send.size() != prc_send.size())
std::cerr << __FILE__ << ":" << __LINE__ << " Error, the number of processors involved \"prc.size()\" must match the number of sending buffers \"send.size()\" " << std::endl;
#endif
// Prepare the sending buffer
openfpm::vector<const void *> send_buf;
openfpm::vector<size_t> send_sz_byte;
size_t tot_size = 0;
for (size_t i = 0; i < send.size() ; i++)
{
size_t req = 0;
//Pack requesting
pack_unpack_cond_with_prp<has_max_prop<T, has_value_type<T>::value>::value,op, T, S>::packingRequest(send.get(i), req, send_sz_byte);
tot_size += req;
}
HeapMemory pmem;
ExtPreAlloc<HeapMemory> & mem = *(new ExtPreAlloc<HeapMemory>(tot_size,pmem));
mem.incRef();
for (size_t i = 0; i < send.size() ; i++)
{
//Packing
Pack_stat sts;
pack_unpack_cond_with_prp<has_max_prop<T, has_value_type<T>::value>::value, op, T, S>::packing(mem, send.get(i), sts, send_buf);
}
// receive information
base_info bi(&recv_buf,prc_recv,sz_recv_byte);
// Send and recv multiple messages
if (opt & RECEIVE_KNOWN)
{
// We are passing the number of elements but not the bytes, so calculate the bytes
if (opt & KNOWN_ELEMENT_OR_BYTE)
{
// We know the number of elements, convert to bytes (ONLY if it is possible)
if (has_pack_gen<typename T::value_type>::value == false && is_vector<T>::value == true)
{
for (size_t i = 0 ; i < sz_recv.size() ; i++)
sz_recv_byte.get(i) = sz_recv.get(i) * sizeof(typename T::value_type);
}
else
std::cout << __FILE__ << ":" << __LINE__ << " Error " << demangle(typeid(T).name()) << " the type does not work with the option RECEIVE_KNOWN or NO_CHANGE_ELEMENTS" << std::endl;
}
Vcluster_base::sendrecvMultipleMessagesNBX(prc_send.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(),
prc_recv.size(),(size_t *)prc_recv.getPointer(),(size_t *)sz_recv_byte.getPointer(),msg_alloc_known,(void *)&bi);
}
else
{
prc_recv.clear();
sendrecvMultipleMessagesNBX(prc_send.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi);
}
// Reorder the buffer
reorder_buffer(prc_recv,sz_recv_byte);
mem.decRef();
delete &mem;
}
/*! \brief Reset the receive buffer
*
*
*/
void reset_recv_buf()
{
for (size_t i = 0 ; i < recv_buf.size() ; i++)
recv_buf.get(i).resize(0);
recv_buf.resize(0);
}
/*! \brief Base info
*
* \param recv_buf receive buffers
* \param prc processors involved
* \param sz size of each received message
*
*/
struct base_info
{
//! Receive buffer
openfpm::vector<BHeapMemory> * recv_buf;
//! receiving processor list
openfpm::vector<size_t> & prc;
//! size of each message
openfpm::vector<size_t> & sz;
//! constructor
base_info(openfpm::vector<BHeapMemory> * recv_buf, openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz)
:recv_buf(recv_buf),prc(prc),sz(sz)
{}
};
/*! \brief Call-back to allocate buffer to receive data
*
* \param msg_i size required to receive the message from i
* \param total_msg total size to receive from all the processors
* \param total_p the total number of processor that want to communicate with you
* \param i processor id
* \param ri request id (an id that goes from 0 to total_p and is unique
*        for each call of this callback)
* \param ptr a pointer to the base_info structure
*
* \return the pointer where to store the message for the processor i
*
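* A sketch of how the NBX engine is expected to use this callback (illustrative;
* the real call sites are in Vcluster_base::sendrecvMultipleMessagesNBX):
* \code
* // void * buf = msg_alloc(msg_size,tot_size,n_send_proc,src_proc,req_id,&bi);
* // MPI_Irecv(buf,msg_size,MPI_BYTE,src_proc,tag,MPI_COMM_WORLD,&req);
* \endcode
*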
*/
static void * msg_alloc(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, void * ptr)
{
base_info & rinfo = *(base_info *)ptr;
if (rinfo.recv_buf == NULL)
{
std::cerr << __FILE__ << ":" << __LINE__ << " Internal error: this processor is not supposed to receive\n";
return NULL;
}
rinfo.recv_buf->resize(ri+1);
rinfo.recv_buf->get(ri).resize(msg_i);
// Receive info
rinfo.prc.add(i);
rinfo.sz.add(msg_i);
// return the pointer
return rinfo.recv_buf->last().getPointer();
}
/*! \brief Call-back to allocate buffer to receive data
*
* \param msg_i size required to receive the message from i
* \param total_msg total size to receive from all the processors
* \param total_p the total number of processor that want to communicate with you
* \param i processor id
* \param ri request id (an id that goes from 0 to total_p and is unique
*        for each call of this callback)
* \param ptr a pointer to the base_info structure
*
* \return the pointer where to store the message for the processor i
*
*/
static void * msg_alloc_known(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, void * ptr)
{
base_info & rinfo = *(base_info *)ptr;
if (rinfo.recv_buf == NULL)
{
std::cerr << __FILE__ << ":" << __LINE__ << " Internal error: this processor is not supposed to receive\n";
return NULL;
}
rinfo.recv_buf->resize(ri+1);
rinfo.recv_buf->get(ri).resize(msg_i);
// return the pointer
return rinfo.recv_buf->last().getPointer();
}
/*! \brief Process the receive buffer
*
* \tparam T type of sending object
* \tparam S type of receiving object
* \tparam prp properties to receive
*
* \param recv receive object
*
*/
template<typename op, typename T, typename S, unsigned int ... prp > void process_receive_buffer_with_prp(S & recv, openfpm::vector<size_t> * sz, openfpm::vector<size_t> * sz_byte, op & op_param)
{
if (sz != NULL)
sz->resize(recv_buf.size());
pack_unpack_cond_with_prp<has_max_prop<T, has_value_type<T>::value>::value,op, T, S, prp... >::unpacking(recv, recv_buf, sz, sz_byte, op_param);
}
public:
/*! \brief Constructor
*
* \param argc main number of arguments
* \param argv main set of arguments
*
*/
Vcluster(int *argc, char ***argv)
:Vcluster_base(argc,argv)
{
}
/*! \brief Semantic Gather, gather the data from all processors into one node
*
* Semantic communications differ from the normal ones. In general they
* follow this model:
*
* Gather(T,S,root,op=add);
*
* "Gather" indicate the communication pattern, or how the information flow
* T is the object to send, S is the object that will receive the data.
* In order to work S must implement the interface S.add(T).
*
* ### Example send a vector of structures, and merge all together in one vector
* \snippet VCluster_semantic_unit_tests.hpp Gather the data on master
*
* ### Example send a vector of structures with complex elements, and merge all together in one vector
* \snippet VCluster_semantic_unit_tests.hpp Gather the data on master complex
*
* \tparam T type of sending object
* \tparam S type of receiving object
*
* \param send Object to send
* \param recv Object to receive
* \param root which node should collect the information
*
* \return true if the function completed successfully
*
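* A minimal sketch (illustrative), gathering one number per processor on processor 0:
* \code
* openfpm::vector<float> v;
* v.add((float)vcl.getProcessUnitID());
* openfpm::vector<float> collect;
* vcl.SGather(v,collect,0);
* // on processor 0, collect now holds one entry per processor
* \endcode
*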
*/
template<typename T, typename S> bool SGather(T & send, S & recv,size_t root)
{
openfpm::vector<size_t> prc;
openfpm::vector<size_t> sz;
return SGather(send,recv,prc,sz,root);
}
//! Metafunction that maps each property index to itself (used to enumerate the properties to pack)
template<size_t index, size_t N> struct MetaFuncOrd {
enum { value = index };
};
/*! \brief Semantic Gather, gather the data from all processors into one node
*
* Semantic communications differ from the normal ones. In general they
* follow this model:
*
* Gather(T,S,root,op=add);
*
* "Gather" indicate the communication pattern, or how the information flow
* T is the object to send, S is the object that will receive the data.
* In order to work S must implement the interface S.add(T).
*
* ### Example send a vector of structures, and merge all together in one vector
* \snippet VCluster_semantic_unit_tests.hpp Gather the data on master
*
* ### Example send a vector of structures with complex elements, and merge all together in one vector
* \snippet VCluster_semantic_unit_tests.hpp Gather the data on master complex
*
* \tparam T type of sending object
* \tparam S type of receiving object
*
* \param send Object to send
* \param recv Object to receive
* \param root which node should collect the information
* \param prc processors from which we received the information
* \param sz size of the received information for each processor
*
* \return true if the function completed successfully
*
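* A minimal sketch (illustrative) that also retrieves the source processors and sizes:
* \code
* openfpm::vector<size_t> prc,sz;
* vcl.SGather(v,collect,prc,sz,0);
* // on the root, prc.get(i) is the source of the i-th received chunk of sz.get(i) elements
* \endcode
*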
*/
template<typename T, typename S> bool SGather(T & send, S & recv, openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz,size_t root)
{
// Reset the receive buffer
reset_recv_buf();
// If we are on master collect the information
if (getProcessUnitID() == root)
{
#ifdef DEBUG
std::cout << "Inside root " << root << std::endl;
#endif
// send buffer (the master does not send anything) so send_req and send_buf
// remain buffers of size 0
openfpm::vector<size_t> send_req;
// receive information
base_info bi(&recv_buf,prc,sz);
// Send and recv multiple messages
sendrecvMultipleMessagesNBX(send_req.size(),NULL,NULL,NULL,msg_alloc,&bi);
// we generate the list of the properties to pack
typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
// operation object
op_ssend_recv_add<void> opa;
index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S>(*this,recv,&sz,NULL,opa);
recv.add(send);
prc.add(root);
sz.add(send.size());
}
else
{
#ifdef DEBUG
std::cout << "Inside slave " << getProcessUnitID() << std::endl;
#endif
// the slave sends one message (the packed send object) to the root
openfpm::vector<size_t> send_prc;
send_prc.add(root);
openfpm::vector<size_t> sz; // send size in bytes (note: this shadows the output parameter, which is not used on the senders)
openfpm::vector<const void *> send_buf;
//Pack requesting
size_t tot_size = 0;
pack_unpack_cond_with_prp<has_max_prop<T, has_value_type<T>::value>::value,op_ssend_recv_add<void>, T, S>::packingRequest(send, tot_size, sz);
HeapMemory pmem;
ExtPreAlloc<HeapMemory> & mem = *(new ExtPreAlloc<HeapMemory>(tot_size,pmem));
mem.incRef();
//Packing
Pack_stat sts;
pack_unpack_cond_with_prp<has_max_prop<T, has_value_type<T>::value>::value,op_ssend_recv_add<void>, T, S>::packing(mem, send, sts, send_buf);
// receive information
base_info bi(NULL,prc,sz);
// Send and recv multiple messages
sendrecvMultipleMessagesNBX(send_prc.size(),(size_t *)sz.getPointer(),(size_t *)send_prc.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi,NONE);
mem.decRef();
delete &mem;
}
return true;
}
/*! \brief Semantic Scatter, scatter the data from one processor to the other node
*
* Semantic communications differ from the normal ones. In general they
* follow this model:
*
* Scatter(T,S,...,op=add);
*
* "Scatter" indicate the communication pattern, or how the information flow
* T is the object to send, S is the object that will receive the data.
* In order to work S must implement the interface S.add(T).
*
* ### Example scatter a vector of structures to other processors
* \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master
*
* \tparam T type of sending object
* \tparam S type of receiving object
*
* \param send Object to send
* \param recv Object to receive
* \param prc processors involved in the scatter
* \param sz size of each chunk
* \param root which processor should scatter the information
*
* \return true if the function completed successfully
*
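* A minimal sketch (illustrative) where root 0 splits a vector into per-processor chunks:
* \code
* openfpm::vector<float> data;     // filled on the root
* openfpm::vector<float> chunk;    // receives this processor's part
* openfpm::vector<size_t> prc,sz;  // destinations and chunk sizes, filled on the root
* vcl.SScatter(data,chunk,prc,sz,0);
* \endcode
*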
*/
template<typename T, typename S> bool SScatter(T & send, S & recv, openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz, size_t root)
{
// Reset the receive buffer
reset_recv_buf();
// If we are on master scatter the information
if (getProcessUnitID() == root)
{
// Prepare the sending buffer
openfpm::vector<const void *> send_buf;
openfpm::vector<size_t> sz_byte;
sz_byte.resize(sz.size());
size_t ptr = 0;
for (size_t i = 0; i < sz.size() ; i++)
{
send_buf.add((char *)send.getPointer() + sizeof(typename T::value_type)*ptr );
sz_byte.get(i) = sz.get(i) * sizeof(typename T::value_type);
ptr += sz.get(i);
}
// receive information
base_info bi(&recv_buf,prc,sz);
// Send and recv multiple messages
sendrecvMultipleMessagesNBX(prc.size(),(size_t *)sz_byte.getPointer(),(size_t *)prc.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi);
// we generate the list of the properties to pack
typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
// operation object
op_ssend_recv_add<void> opa;
index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S>(*this,recv,NULL,NULL,opa);
}
else
{
// The non-root processors only receive
openfpm::vector<size_t> send_req;
// receive information
base_info bi(&recv_buf,prc,sz);
// Send and recv multiple messages
sendrecvMultipleMessagesNBX(send_req.size(),NULL,NULL,NULL,msg_alloc,&bi);
// we generate the list of the properties to pack
typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
// operation object
op_ssend_recv_add<void> opa;
index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S>(*this,recv,NULL,NULL,opa);
}
return true;
}
/*! \brief Reorder the receiving buffer
*
* \param prc list of the receiving processors
* \param sz_recv size in bytes of each received message
*
*/
void reorder_buffer(openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz_recv)
{
struct recv_buff_reorder
{
//! processor
size_t proc;
//! position in the receive list
size_t pos;
//! default constructor
recv_buff_reorder()
:proc(0),pos(0)
{};
//! needed to reorder
bool operator<(const recv_buff_reorder & rd) const
{
return proc < rd.proc;
}
};
openfpm::vector<recv_buff_reorder> rcv;
rcv.resize(recv_buf.size());
for (size_t i = 0 ; i < rcv.size() ; i++)
{
rcv.get(i).proc = prc.get(i);
rcv.get(i).pos = i;
}
// we sort based on processor
rcv.sort();
openfpm::vector<BHeapMemory> recv_ord;
recv_ord.resize(rcv.size());
openfpm::vector<size_t> prc_ord;
prc_ord.resize(rcv.size());
openfpm::vector<size_t> sz_recv_ord;
sz_recv_ord.resize(rcv.size());
// Now we reorder rcv
for (size_t i = 0 ; i < rcv.size() ; i++)
{
recv_ord.get(i).swap(recv_buf.get(rcv.get(i).pos));
prc_ord.get(i) = rcv.get(i).proc;
sz_recv_ord.get(i) = sz_recv.get(rcv.get(i).pos);
}
// move rcv into recv
recv_buf.swap(recv_ord);
prc.swap(prc_ord);
sz_recv.swap(sz_recv_ord);
// reorder prc_recv and recv_sz
}
/*! \brief Semantic Send and receive, send the data to processors and receive from the other processors
*
* Semantic communications differ from the normal ones. In general they
* follow this model:
*
* SSendRecv(T,S,...,op=add);
*
* "SendRecv" indicate the communication pattern, or how the information flow
* T is the object to send, S is the object that will receive the data.
* In order to work S must implement the interface S.add(T).
*
* ### Example scatter a vector of structures to other processors
* \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master
*
* \tparam T type of sending object
* \tparam S type of receiving object
*
* \param send vector of objects to send, one entry per destination processor
* \param recv object where the received data is merged
* \param prc_send list of destination processors, prc_send.size() == send.size()
* \param prc_recv list of processors from which data was received (output)
* \param sz_recv size of each received message (output)
* \param opt options (default NONE)
*
* \return true if the function completed successfully
*
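* A minimal DSDE sketch (illustrative): each processor sends one vector per
* destination and merges whatever it receives into recv:
* \code
* openfpm::vector<openfpm::vector<float>> send; // send.get(i) goes to prc_send.get(i)
* openfpm::vector<size_t> prc_send;
* openfpm::vector<float> recv;                  // received chunks are appended here
* openfpm::vector<size_t> prc_recv,sz_recv;     // filled as output
* vcl.SSendRecv(send,recv,prc_send,prc_recv,sz_recv);
* \endcode
*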
*/
template<typename T, typename S> bool SSendRecv(openfpm::vector<T> & send, S & recv, openfpm::vector<size_t> & prc_send, openfpm::vector<size_t> & prc_recv, openfpm::vector<size_t> & sz_recv, size_t opt = NONE)
{
prepare_send_buffer<op_ssend_recv_add<void>,T,S>(send,recv,prc_send,prc_recv,sz_recv,opt);
// we generate the list of the properties to pack
typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
op_ssend_recv_add<void> opa;
index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S>(*this,recv,&sz_recv,NULL,opa);
return true;
}
/*! \brief Semantic Send and receive, send the data to processors and receive from the other processors
*
* Semantic communications differ from the normal ones. In general they
* follow this model:
*
* SSendRecv(T,S,...,op=add);
*
* "SendRecv" indicate the communication pattern, or how the information flow
* T is the object to send, S is the object that will receive the data.
* In order to work S must implement the interface S.add<prp...>(T).
*
* ### Example scatter a vector of structures to other processors
* \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master
*
* \tparam T type of sending object
* \tparam S type of receiving object
* \tparam prp properties for merging
*
* \param send vector of objects to send, one entry per destination processor
* \param recv object where the received data is merged
* \param prc_send list of destination processors, prc_send.size() == send.size()
* \param prc_recv list of processors from which data was received (output)
* \param sz_recv size in elements of each received message (output)
* \param sz_recv_byte size in bytes of each received message (output)
*
* \return true if the function completed successfully
*
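* A minimal sketch (illustrative; part_vec is a hypothetical vector-of-aggregates type):
* \code
* // merge only properties 0 and 1 of the received elements into recv
* vcl.SSendRecvP<part_vec,part_vec,0,1>(send,recv,prc_send,prc_recv,sz_recv,sz_recv_byte);
* \endcode
*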
*/
template<typename T, typename S, int ... prp> bool SSendRecvP(openfpm::vector<T> & send, S & recv, openfpm::vector<size_t> & prc_send, openfpm::vector<size_t> & prc_recv, openfpm::vector<size_t> & sz_recv, openfpm::vector<size_t> & sz_recv_byte)
{
prepare_send_buffer<op_ssend_recv_add<void>,T,S>(send,recv,prc_send,prc_recv,sz_recv,NONE);
// operation object
op_ssend_recv_add<void> opa;
// process the received information
process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,prp...>(recv,&sz_recv,&sz_recv_byte,opa);
return true;
}
/*! \brief Semantic Send and receive, send the data to processors and receive from the other processors
*
* Semantic communications differ from the normal ones. In general they
* follow this model:
*
* SSendRecv(T,S,...,op=add);
*
* "SendRecv" indicate the communication pattern, or how the information flow
* T is the object to send, S is the object that will receive the data.
* In order to work S must implement the interface S.add<prp...>(T).
*
* ### Example scatter a vector of structures to other processors
* \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master
*
* \tparam T type of sending object
* \tparam S type of receiving object
* \tparam prp properties for merging
*
* \param send vector of objects to send, one entry per destination processor
* \param recv object where the received data is merged
* \param prc_send list of destination processors, prc_send.size() == send.size()
* \param prc_recv list of processors from which data was received (output)
* \param sz_recv size of each received message (output)
*
* \return true if the function completed successfully
*
*/
template<typename T, typename S, int ... prp> bool SSendRecvP(openfpm::vector<T> & send, S & recv, openfpm::vector<size_t> & prc_send, openfpm::vector<size_t> & prc_recv, openfpm::vector<size_t> & sz_recv)
{
prepare_send_buffer<op_ssend_recv_add<void>,T,S>(send,recv,prc_send,prc_recv,sz_recv,NONE);
// operation object
op_ssend_recv_add<void> opa;
// process the received information
process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,prp...>(recv,&sz_recv,NULL,opa);
return true;
}
/*! \brief Semantic Send and receive, send the data to processors and receive from the other processors
*
* Semantic communications differ from the normal ones. In general they
* follow this model:
*
* SSendRecv(T,S,...,op=add);
*
* "SendRecv" indicate the communication pattern, or how the information flow
* T is the object to send, S is the object that will receive the data.