Commit c8ddf7f3 authored by Pietro Incardona's avatar Pietro Incardona

Adding semantic communications

parent 0e7112c7
......@@ -160,6 +160,9 @@ class Vcluster
// sending map
openfpm::vector<size_t> map;
// Receive buffers
openfpm::vector<HeapMemory> recv_buf;
// barrier request
MPI_Request bar_req;
// barrier status
......@@ -347,56 +350,6 @@ public:
MPI_IallreduceW<T>::reduce(num,MPI_MAX,req.last());
}
/*! \brief Send and receive multiple messages
*
* It send multiple messages to a set of processors the and receive
* multiple messages from another set of processors, all the processor must call this
* function, NBX is more performant than PCX with more processors (1000+)
*
* suppose the following situation the calling processor want to communicate
* * 2 vector of 100 integers to processor 1
* * 1 vector of 50 integers to processor 6
* * 1 vector of 48 integers to processor 7
* * 1 vector of 70 integers to processor 8
*
* \param prc list of processors you should communicate with [1,1,6,7,8]
*
* \param v vector containing the data to send [v=vector<vector<int>>, v.size()=4, T=vector<int>], T at the moment
* is only tested for vectors of 0 or more generic elements (without pointers)
*
* \param msg_alloc This is a call-back with the purpose to allocate space
* for the incoming messages and give back a valid pointer, supposing that this call-back has been triggered by
* the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have
* the following 6 parameters
* in the call-back in order:
* * message size required to receive the message (100)
* * total message size to receive from all the processors (NBX does not provide this information)
* * the total number of processor want to communicate with you (NBX does not provide this information)
* * processor id (5)
* * ri request id (it is an id that goes from 0 to total_p, and is incremented
* every time message_alloc is called)
* * void pointer, parameter for additional data to pass to the call-back
*
* \param opt options, only NONE supported
*
*/
template<typename T> void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > & prc, openfpm::vector< T * > & data, void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg, long int opt=NONE)
{
#ifdef DEBUG
checkType<typename T::value_type>();
#endif
// resize the pointer list
ptr_send.resize(prc.size());
sz_send.resize(prc.size());
for (size_t i = 0 ; i < prc.size() ; i++)
{
ptr_send.get(i) = data.get(i)->getPointer();
sz_send.get(i) = data.get(i)->size() * sizeof(typename T::value_type);
}
sendrecvMultipleMessagesNBX(prc.size(),(size_t *)sz_send.getPointer(),(size_t *)prc.getPointer(),(void **)ptr_send.getPointer(),msg_alloc,ptr_arg,opt);
}
/*! \brief Send and receive multiple messages
*
......@@ -548,7 +501,7 @@ public:
* \param opt options, NONE (ignored in this moment)
*
*/
void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] , void * ptr[], void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg, long int opt)
void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] , void * ptr[], void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg, long int opt = NONE)
{
if (stat.size() != 0 || req.size() != 0)
std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " this function must be called when no other requests are in progress \n";
......@@ -981,6 +934,8 @@ public:
}
/////////////////////// Semantic communication ///////////////////////
#include "VCluster_semantic.ipp"
};
// Function to initialize the global VCluster //
......
/*
* VCluster_semantic.hpp
*
* Implementation of semantic communications
*
* Created on: Feb 8, 2016
* Author: Pietro Incardona
*/
/*! \brief Call-back to allocate buffer to receive data
*
* \param msg_i size required to receive the message from i
* \param total_msg total size to receive from all the processors
* \param total_p the total number of processor that want to communicate with you
* \param i processor id
* \param ri request id (it is an id that goes from 0 to total_p, and is unique
* every time message_alloc is called)
* \param ptr a pointer to the vector_dist structure
*
* \return the pointer where to store the message for the processor i
*
*/
static void * msg_alloc(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, void * ptr)
{
openfpm::vector<HeapMemory> * recv_buf = (openfpm::vector<HeapMemory> *)ptr;
if (recv_buf == NULL)
std::cerr << __FILE__ << ":" << __LINE__ << " Internal error this processor is not suppose to receive\n";
// We need one more slot
recv_buf->add();
recv_buf->last().resize(msg_i);
// return the pointer
return recv_buf->last().getPointer();
}
/*! \brief Semantic Gather, gather the data from all processors into one node
*
* Semantic communication differ from the normal one. They in general
* follow the following model.
*
* Gather(T,S,root,op=add);
*
* "Gather" indicate the communication pattern, or how the information flow
* T is the object to send, S is the object that will receive the data.
* In order to work S must implement the interface S.add(T).
*
* ### Example send a vector of structures, and merge all together in one vector
* \snippet VCluster_semantic_unit_tests.hpp Gather the data on master
*
* ### Example send a vector of structures, and merge all together in one vector
* \snippet VCluster_semantic_unit_tests.hpp Gather the data on master complex
*
* \param Object to send
* \param Object to receive
* \param root witch node should collect the information
*
* \return true if the function completed succefully
*
*/
template<typename T, typename S> bool SGather(T & send, S & recv,size_t root)
{
// If we are on master collect the information
if (getProcessUnitID() == root)
{
// send buffer (master does not send anything) so send req and send_buf
// remain buffer with size 0
openfpm::vector<size_t> send_req;
// Send and recv multiple messages
sendrecvMultipleMessagesNBX(send_req.size(),NULL,NULL,NULL,msg_alloc,&recv_buf);
for (size_t i = 0 ; i < recv_buf.size() ; i++)
{
// for each received buffer create a memory reppresentation
// calculate the number of received elements
size_t n_ele = recv_buf.get(i).size() / sizeof(typename T::value_type);
// add the received particles to the vector
PtrMemory * ptr1 = new PtrMemory(recv_buf.get(i).getPointer(),recv_buf.get(i).size());
// create vector representation to a piece of memory already allocated
openfpm::vector<typename T::value_type,PtrMemory,openfpm::grow_policy_identity> v2;
v2.setMemory(*ptr1);
// resize with the number of elements
v2.resize(n_ele);
// Merge the information
recv.add(v2);
}
}
else
{
// send buffer (master does not send anything) so send req and send_buf
// remain buffer with size 0
openfpm::vector<size_t> send_prc;
send_prc.add(0);
openfpm::vector<void *> send_buf;
send_buf.add(send.getPointer());
openfpm::vector<size_t> sz;
sz.add(send.size()*sizeof(typename T::value_type));
// Send and recv multiple messages
sendrecvMultipleMessagesNBX(send_prc.size(),(size_t *)sz.getPointer(),(size_t *)send_prc.getPointer(),(void **)send_buf.getPointer(),msg_alloc,NULL);
}
return true;
}
/*
* VCluster_semantic_unit_test.hpp
*
* Created on: Feb 8, 2016
* Author: i-bird
*/
#ifndef OPENFPM_VCLUSTER_SRC_VCLUSTER_SEMANTIC_UNIT_TESTS_HPP_
#define OPENFPM_VCLUSTER_SRC_VCLUSTER_SEMANTIC_UNIT_TESTS_HPP_
struct A
{
size_t a;
float b;
double c;
};
BOOST_AUTO_TEST_SUITE( VCluster_semantic_test )
BOOST_AUTO_TEST_CASE (Vcluster_semantic_gather)
{
Vcluster & vcl = *global_v_cluster;
if (vcl.getProcessingUnits() >= 32)
return;
//! [Gather the data on master]
openfpm::vector<size_t> v1;
v1.resize(vcl.getProcessUnitID());
for(size_t i = 0 ; i < vcl.getProcessUnitID() ; i++)
v1.get(i) = 5;
openfpm::vector<size_t> v2;
vcl.SGather(v1,v2,0);
//! [Gather the data on master]
if (vcl.getProcessUnitID() == 0)
{
size_t n = vcl.getProcessingUnits();
BOOST_REQUIRE_EQUAL(v2.size(),n*(n-1)/2);
bool is_five = true;
for (size_t i = 0 ; i < v2.size() ; i++)
is_five &= (v2.get(i) == 5);
BOOST_REQUIRE_EQUAL(is_five,true);
}
}
BOOST_AUTO_TEST_CASE (Vcluster_semantic_struct_gather)
{
Vcluster & vcl = *global_v_cluster;
if (vcl.getProcessingUnits() >= 32)
return;
//! [Gather the data on master complex]
openfpm::vector<A> v1;
v1.resize(vcl.getProcessUnitID());
for(size_t i = 0 ; i < vcl.getProcessUnitID() ; i++)
{
v1.get(i).a = 5;
v1.get(i).b = 10.0;
v1.get(i).c = 11.0;
}
openfpm::vector<A> v2;
vcl.SGather(v1,v2,0);
//! [Gather the data on master complex]
if (vcl.getProcessUnitID() == 0)
{
size_t n = vcl.getProcessingUnits();
BOOST_REQUIRE_EQUAL(v2.size(),n*(n-1)/2);
bool is_correct = true;
for (size_t i = 0 ; i < v2.size() ; i++)
{
is_correct &= (v2.get(i).a == 5);
is_correct &= (v2.get(i).b == 10.0);
is_correct &= (v2.get(i).c == 11.0);
}
BOOST_REQUIRE_EQUAL(is_correct,true);
}
}
BOOST_AUTO_TEST_SUITE_END()
#endif /* OPENFPM_VCLUSTER_SRC_VCLUSTER_SEMANTIC_UNIT_TESTS_HPP_ */
......@@ -9,3 +9,4 @@
#include "unit_test_init_cleanup.hpp"
#include "VCluster_unit_tests.hpp"
#include "VCluster_semantic_unit_tests.hpp"
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment