Commit 8d4d2abd authored by Pietro Incardona's avatar Pietro Incardona

updated VCluster

parent 2eb8e431
......@@ -7,6 +7,8 @@
* Author: Pietro Incardona
*/
private:
/*! \brief Reset the receive buffer
*
*
......@@ -17,6 +19,25 @@ void reset_recv_buf()
recv_buf.get(i).resize(0);
}
/*! \brief Base info
*
* \param recv_buf receive buffers
* \param prc processors involved
* \param size of the received data
*
*/
struct base_info
{
openfpm::vector<BHeapMemory> * recv_buf;
openfpm::vector<size_t> & prc;
openfpm::vector<size_t> & sz;
// constructor
base_info(openfpm::vector<BHeapMemory> * recv_buf, openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz)
:recv_buf(recv_buf),prc(prc),sz(sz)
{}
};
/*! \brief Call-back to allocate buffer to receive data
*
* \param msg_i size required to receive the message from i
......@@ -32,19 +53,57 @@ void reset_recv_buf()
*/
static void * msg_alloc(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, void * ptr)
{
openfpm::vector<BHeapMemory> * recv_buf = (openfpm::vector<BHeapMemory> *)ptr;
base_info & rinfo = *(base_info *)ptr;
if (recv_buf == NULL)
if (rinfo.recv_buf == NULL)
std::cerr << __FILE__ << ":" << __LINE__ << " Internal error this processor is not suppose to receive\n";
recv_buf->resize(ri+1);
rinfo.recv_buf->resize(ri+1);
recv_buf->get(ri).resize(msg_i);
rinfo.recv_buf->get(ri).resize(msg_i);
// Receive info
rinfo.prc.add(i);
rinfo.sz.add(msg_i);
// return the pointer
return recv_buf->last().getPointer();
return rinfo.recv_buf->last().getPointer();
}
/*! \brief Process the receive buffer
*
* \tparam T type of sending object
* \tparam S type of receiving object
*
* \param recv receive object
*
*/
template<typename T, typename S> void process_receive_buffer(S & recv)
{
for (size_t i = 0 ; i < recv_buf.size() ; i++)
{
// for each received buffer create a memory reppresentation
// calculate the number of received elements
size_t n_ele = recv_buf.get(i).size() / sizeof(typename T::value_type);
// add the received particles to the vector
PtrMemory * ptr1 = new PtrMemory(recv_buf.get(i).getPointer(),recv_buf.get(i).size());
// create vector representation to a piece of memory already allocated
openfpm::vector<typename T::value_type,PtrMemory,openfpm::grow_policy_identity> v2;
v2.setMemory(*ptr1);
// resize with the number of elements
v2.resize(n_ele);
// Merge the information
recv.add(v2);
}
}
public:
/*! \brief Semantic Gather, gather the data from all processors into one node
*
* Semantic communication differ from the normal one. They in general
......@@ -62,6 +121,9 @@ static void * msg_alloc(size_t msg_i ,size_t total_msg, size_t total_p, size_t i
* ### Example send a vector of structures, and merge all together in one vector
* \snippet VCluster_semantic_unit_tests.hpp Gather the data on master complex
*
* \tparam T type of sending object
* \tparam S type of receiving object
*
* \param Object to send
* \param Object to receive
* \param root witch node should collect the information
......@@ -69,7 +131,44 @@ static void * msg_alloc(size_t msg_i ,size_t total_msg, size_t total_p, size_t i
* \return true if the function completed succefully
*
*/
template<typename T, typename S> bool SGather(T & send, S & recv,size_t root, int debug = 0)
template<typename T, typename S> bool SGather(T & send, S & recv,size_t root)
{
openfpm::vector<size_t> prc;
openfpm::vector<size_t> sz;
return SGather(send,recv,prc,sz,root);
}
/*! \brief Semantic Gather, gather the data from all processors into one node
*
* Semantic communication differ from the normal one. They in general
* follow the following model.
*
* Gather(T,S,root,op=add);
*
* "Gather" indicate the communication pattern, or how the information flow
* T is the object to send, S is the object that will receive the data.
* In order to work S must implement the interface S.add(T).
*
* ### Example send a vector of structures, and merge all together in one vector
* \snippet VCluster_semantic_unit_tests.hpp Gather the data on master
*
* ### Example send a vector of structures, and merge all together in one vector
* \snippet VCluster_semantic_unit_tests.hpp Gather the data on master complex
*
* \tparam T type of sending object
* \tparam S type of receiving object
*
* \param Object to send
* \param Object to receive
* \param root witch node should collect the information
* \param prc processors from witch we received the information
* \param sz size of the received information for each processor
*
* \return true if the function completed succefully
*
*/
template<typename T, typename S> bool SGather(T & send, S & recv, openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz,size_t root)
{
// Reset the receive buffer
reset_recv_buf();
......@@ -81,31 +180,22 @@ template<typename T, typename S> bool SGather(T & send, S & recv,size_t root, in
// remain buffer with size 0
openfpm::vector<size_t> send_req;
// receive information
base_info bi(&recv_buf,prc,sz);
// Send and recv multiple messages
sendrecvMultipleMessagesNBX(send_req.size(),NULL,NULL,NULL,msg_alloc,&recv_buf);
sendrecvMultipleMessagesNBX(send_req.size(),NULL,NULL,NULL,msg_alloc,&bi);
// Convert the received byte into number of elements
for (size_t i = 0 ; i < sz.size() ; i++)
sz.get(i) /= sizeof(typename T::value_type);
// process the received information
process_receive_buffer<T,S>(recv);
for (size_t i = 0 ; i < recv_buf.size() ; i++)
{
// for each received buffer create a memory reppresentation
// calculate the number of received elements
size_t n_ele = recv_buf.get(i).size() / sizeof(typename T::value_type);
// add the received particles to the vector
PtrMemory * ptr1 = new PtrMemory(recv_buf.get(i).getPointer(),recv_buf.get(i).size());
// create vector representation to a piece of memory already allocated
openfpm::vector<typename T::value_type,PtrMemory,openfpm::grow_policy_identity> v2;
v2.setMemory(*ptr1);
// resize with the number of elements
v2.resize(n_ele);
// Merge the information
recv.add(v2);
}
recv.add(send);
prc.add(root);
sz.add(send.size());
}
else
{
......@@ -113,14 +203,95 @@ template<typename T, typename S> bool SGather(T & send, S & recv,size_t root, in
// remain buffer with size 0
openfpm::vector<size_t> send_prc;
send_prc.add(root);
openfpm::vector<void *> send_buf;
openfpm::vector<const void *> send_buf;
send_buf.add(send.getPointer());
openfpm::vector<size_t> sz;
sz.add(send.size()*sizeof(typename T::value_type));
// receive information
base_info bi(NULL,prc,sz);
// Send and recv multiple messages
sendrecvMultipleMessagesNBX(send_prc.size(),(size_t *)sz.getPointer(),(size_t *)send_prc.getPointer(),(void **)send_buf.getPointer(),msg_alloc,NULL);
sendrecvMultipleMessagesNBX(send_prc.size(),(size_t *)sz.getPointer(),(size_t *)send_prc.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi);
}
return true;
}
/*! \brief Semantic Scatter, scatter the data from one processor to the other node
*
* Semantic communication differ from the normal one. They in general
* follow the following model.
*
* Scatter(T,S,...,op=add);
*
* "Scatter" indicate the communication pattern, or how the information flow
* T is the object to send, S is the object that will receive the data.
* In order to work S must implement the interface S.add(T).
*
* ### Example scatter a vector of structures, to other processors
* \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master
*
* \tparam T type of sending object
* \tparam S type of receiving object
*
* \param Object to send
* \param Object to receive
* \param prc processor involved in the scatter
* \param sz size of each chunks
* \param root which processor should scatter the information
*
* \return true if the function completed succefully
*
*/
template<typename T, typename S> bool SScatter(T & send, S & recv, openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz, size_t root)
{
// Reset the receive buffer
reset_recv_buf();
// If we are on master scatter the information
if (getProcessUnitID() == root)
{
// Prepare the sending buffer
openfpm::vector<const void *> send_buf;
openfpm::vector<size_t> sz_byte;
sz_byte.resize(sz.size());
size_t ptr = 0;
for (size_t i = 0; i < sz.size() ; i++)
{
send_buf.add((char *)send.getPointer() + sizeof(typename T::value_type)*ptr );
sz_byte.get(i) = sz.get(i) * sizeof(typename T::value_type);
ptr += sz.get(i);
}
// receive information
base_info bi(&recv_buf,prc,sz);
// Send and recv multiple messages
sendrecvMultipleMessagesNBX(prc.size(),(size_t *)sz_byte.getPointer(),(size_t *)prc.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi);
// process the received information
process_receive_buffer<T,S>(recv);
}
else
{
// The non-root receive
openfpm::vector<size_t> send_req;
// receive information
base_info bi(&recv_buf,prc,sz);
// Send and recv multiple messages
sendrecvMultipleMessagesNBX(send_req.size(),NULL,NULL,NULL,msg_alloc,&bi);
process_receive_buffer<T,S>(recv);
}
return true;
}
......@@ -93,6 +93,102 @@ BOOST_AUTO_TEST_CASE (Vcluster_semantic_struct_gather)
}
}
#define SSCATTER_MAX 7
BOOST_AUTO_TEST_CASE (Vcluster_semantic_scatter)
{
for (size_t i = 0 ; i < 100 ; i++)
{
Vcluster & vcl = *global_v_cluster;
if (vcl.getProcessingUnits() >= 32)
return;
size_t nc = vcl.getProcessingUnits() / SSCATTER_MAX;
size_t nr = vcl.getProcessingUnits() - nc * SSCATTER_MAX;
nr = ((nr-1) * nr) / 2;
size_t n_elements = nc * SSCATTER_MAX * (SSCATTER_MAX - 1) / 2 + nr;
openfpm::vector<size_t> v1;
v1.resize(n_elements);
for(size_t i = 0 ; i < n_elements ; i++)
v1.get(i) = 5;
openfpm::vector<size_t> v2;
openfpm::vector<size_t> prc;
openfpm::vector<size_t> sz;
// Scatter pattern
for (size_t i = 0 ; i < vcl.getProcessingUnits() ; i++)
{
sz.add(i % SSCATTER_MAX);
prc.add(i);
}
vcl.SScatter(v1,v2,prc,sz,(i%vcl.getProcessingUnits()));
BOOST_REQUIRE_EQUAL(v2.size(),vcl.getProcessUnitID() % SSCATTER_MAX);
bool is_five = true;
for (size_t i = 0 ; i < v2.size() ; i++)
is_five &= (v2.get(i) == 5);
BOOST_REQUIRE_EQUAL(is_five,true);
}
}
BOOST_AUTO_TEST_CASE (Vcluster_semantic_struct_scatter)
{
for (size_t i = 0 ; i < 100 ; i++)
{
Vcluster & vcl = *global_v_cluster;
if (vcl.getProcessingUnits() >= 32)
return;
size_t nc = vcl.getProcessingUnits() / SSCATTER_MAX;
size_t nr = vcl.getProcessingUnits() - nc * SSCATTER_MAX;
nr = ((nr-1) * nr) / 2;
size_t n_elements = nc * SSCATTER_MAX * (SSCATTER_MAX - 1) / 2 + nr;
openfpm::vector<size_t> v1;
v1.resize(n_elements);
for(size_t i = 0 ; i < n_elements ; i++)
v1.get(i) = 5;
openfpm::vector<size_t> v2;
openfpm::vector<size_t> prc;
openfpm::vector<size_t> sz;
// Scatter pattern
for (size_t i = 0 ; i < vcl.getProcessingUnits() ; i++)
{
sz.add(i % SSCATTER_MAX);
prc.add(i);
}
vcl.SScatter(v1,v2,prc,sz,(i%vcl.getProcessingUnits()));
if (vcl.getProcessUnitID() == (i%vcl.getProcessingUnits()))
{
BOOST_REQUIRE_EQUAL(v2.size(),vcl.getProcessUnitID() % SSCATTER_MAX);
bool is_five = true;
for (size_t i = 0 ; i < v2.size() ; i++)
is_five &= (v2.get(i) == 5);
BOOST_REQUIRE_EQUAL(is_five,true);
}
}
}
BOOST_AUTO_TEST_SUITE_END()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment