diff --git a/src/VCluster.hpp b/src/VCluster.hpp
index 7a113545d9dfaf37140efc5ff89e1c94d9cc15e2..51b056fe8ff9fbe10defe226821a5bb13b4f110e 100644
--- a/src/VCluster.hpp
+++ b/src/VCluster.hpp
@@ -160,6 +160,9 @@ class Vcluster
 	// sending map
 	openfpm::vector map;
 
+	// Receive buffers
+	openfpm::vector recv_buf;
+
 	// barrier request
 	MPI_Request bar_req;
 	// barrier status
@@ -347,56 +350,6 @@ public:
 		MPI_IallreduceW::reduce(num,MPI_MAX,req.last());
 	}
 
-	/*! \brief Send and receive multiple messages
-	 *
-	 * It send multiple messages to a set of processors the and receive
-	 * multiple messages from another set of processors, all the processor must call this
-	 * function, NBX is more performant than PCX with more processors (1000+)
-	 *
-	 * suppose the following situation the calling processor want to communicate
-	 * * 2 vector of 100 integers to processor 1
-	 * * 1 vector of 50 integers to processor 6
-	 * * 1 vector of 48 integers to processor 7
-	 * * 1 vector of 70 integers to processor 8
-	 *
-	 * \param prc list of processors you should communicate with [1,1,6,7,8]
-	 *
-	 * \param v vector containing the data to send [v=vector<vector<int>>, v.size()=4, T=vector<int>], T at the moment
-	 *        is only tested for vectors of 0 or more generic elements (without pointers)
-	 *
-	 * \param msg_alloc This is a call-back with the purpose to allocate space
-	 *        for the incoming messages and give back a valid pointer, supposing that this call-back has been triggered by
-	 *        the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have
-	 *        the following 6 parameters
-	 *        in the call-back in order:
-	 *        * message size required to receive the message (100)
-	 *        * total message size to receive from all the processors (NBX does not provide this information)
-	 *        * the total number of processor want to communicate with you (NBX does not provide this information)
-	 *        * processor id (5)
-	 *        * ri request id (it is an id that goes from 0 to total_p, and is incremented
-	 *          every time message_alloc is called)
-	 *        * void pointer, parameter for additional data to pass to the call-back
-	 *
-	 * \param opt options, only NONE supported
-	 *
-	 */
-	template<typename T> void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > & prc, openfpm::vector< T * > & data, void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg, long int opt=NONE)
-	{
-#ifdef DEBUG
-		checkType<typename T::value_type>();
-#endif
-		// resize the pointer list
-		ptr_send.resize(prc.size());
-		sz_send.resize(prc.size());
-
-		for (size_t i = 0 ; i < prc.size() ; i++)
-		{
-			ptr_send.get(i) = data.get(i)->getPointer();
-			sz_send.get(i) = data.get(i)->size() * sizeof(typename T::value_type);
-		}
-
-		sendrecvMultipleMessagesNBX(prc.size(),(size_t *)sz_send.getPointer(),(size_t *)prc.getPointer(),(void **)ptr_send.getPointer(),msg_alloc,ptr_arg,opt);
-	}
-
 
 	/*! \brief Send and receive multiple messages
 	 *
@@ -548,7 +501,7 @@ public:
 	 * \param opt options, NONE (ignored in this moment)
 	 *
 	 */
-	void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] , void * ptr[], void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg, long int opt)
+	void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] , void * ptr[], void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg, long int opt = NONE)
 	{
 		if (stat.size() != 0 || req.size() != 0)
 			std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " this function must be called when no other requests are in progress \n";
@@ -981,6 +934,8 @@ public:
 	}
 
+	/////////////////////// Semantic communication ///////////////////////
+	#include "VCluster_semantic.ipp"
 };
 
 // Function to initialize the global VCluster
 //
diff --git a/src/VCluster_semantic.ipp b/src/VCluster_semantic.ipp
new file mode 100644
index 0000000000000000000000000000000000000000..7ef505af67cbf6bf477a23c2b68c260c1c702950
--- /dev/null
+++ b/src/VCluster_semantic.ipp
@@ -0,0 +1,112 @@
+/*
+ * VCluster_semantic.ipp
+ *
+ * Implementation of semantic communications
+ *
+ *  Created on: Feb 8, 2016
+ *      Author: Pietro Incardona
+ */
+
+/*! \brief Call-back to allocate buffer to receive data
+ *
+ * \param msg_i size required to receive the message from i
+ * \param total_msg total size to receive from all the processors
+ * \param total_p the total number of processors that want to communicate with you
+ * \param i processor id
+ * \param ri request id (an id that goes from 0 to total_p, incremented
+ *        every time message_alloc is called, so it is unique per message)
+ * \param ptr a pointer to the vector of receive buffers (recv_buf)
+ *
+ * \return the pointer where to store the message for the processor i
+ *
+ */
+static void * msg_alloc(size_t msg_i, size_t total_msg, size_t total_p, size_t i, size_t ri, void * ptr)
+{
+	openfpm::vector * recv_buf = (openfpm::vector *)ptr;
+
+	if (recv_buf == NULL)
+		std::cerr << __FILE__ << ":" << __LINE__ << " Internal error: this processor is not supposed to receive\n";
+
+	// We need one more slot
+	recv_buf->add();
+
+	recv_buf->last().resize(msg_i);
+
+	// return the pointer
+	return recv_buf->last().getPointer();
+}
+
+/*! \brief Semantic Gather, gather the data from all processors into one node
+ *
+ * Semantic communications differ from the normal ones: in general
+ * they follow the model
+ *
+ * Gather(T,S,root,op=add);
+ *
+ * "Gather" indicates the communication pattern, or how the information flows;
+ * T is the object to send, S is the object that will receive the data.
+ * In order to work S must implement the interface S.add(T).
+ *
+ * ### Example: send a vector of integers and merge them all in one vector on the master
+ * \snippet VCluster_semantic_unit_tests.hpp Gather the data on master
+ *
+ * ### Example: send a vector of structures and merge them all in one vector on the master
+ * \snippet VCluster_semantic_unit_tests.hpp Gather the data on master complex
+ *
+ * \param send object to send
+ * \param recv object where the data is received
+ * \param root which node should collect the information
+ *
+ * \return true if the function completed successfully
+ *
+ */
+template<typename T, typename S> bool SGather(T & send, S & recv, size_t root)
+{
+	// If we are on master collect the information
+	if (getProcessUnitID() == root)
+	{
+		// the master does not send anything, so the send request
+		// list stays empty
+		openfpm::vector<size_t> send_req;
+
+		// Send and recv multiple messages
+		sendrecvMultipleMessagesNBX(send_req.size(),NULL,NULL,NULL,msg_alloc,&recv_buf);
+
+		for (size_t i = 0 ; i < recv_buf.size() ; i++)
+		{
+			// for each received buffer create a memory representation and
+			// calculate the number of received elements
+			size_t n_ele = recv_buf.get(i).size() / sizeof(typename T::value_type);
+
+			// add the received elements to the output vector
+			PtrMemory * ptr1 = new PtrMemory(recv_buf.get(i).getPointer(),recv_buf.get(i).size());
+
+			// create a vector representation on top of the memory already allocated
+			openfpm::vector v2;
+
+			v2.setMemory(*ptr1);
+
+			// resize with the number of elements
+			v2.resize(n_ele);
+
+			// Merge the information
+			recv.add(v2);
+		}
+	}
+	else
+	{
+		// we are not the root: prepare a single message containing
+		// the local buffer and send it to the root
+		openfpm::vector<size_t> send_prc;
+		send_prc.add(root);
+		openfpm::vector<void *> send_buf;
+		send_buf.add(send.getPointer());
+		openfpm::vector<size_t> sz;
+		sz.add(send.size()*sizeof(typename T::value_type));
+
+		// Send and recv multiple messages
+		sendrecvMultipleMessagesNBX(send_prc.size(),(size_t *)sz.getPointer(),(size_t *)send_prc.getPointer(),(void **)send_buf.getPointer(),msg_alloc,NULL);
+	}
+
+	return true;
+}
diff --git a/src/VCluster_semantic_unit_tests.hpp b/src/VCluster_semantic_unit_tests.hpp
new file mode 100644
index 0000000000000000000000000000000000000000..0b8e69edf2aa140e518dc2c15996e589c4d34df3
--- /dev/null
+++ b/src/VCluster_semantic_unit_tests.hpp
@@ -0,0 +1,101 @@
+/*
+ * VCluster_semantic_unit_tests.hpp
+ *
+ *  Created on: Feb 8, 2016
+ *      Author: i-bird
+ */
+
+#ifndef OPENFPM_VCLUSTER_SRC_VCLUSTER_SEMANTIC_UNIT_TESTS_HPP_
+#define OPENFPM_VCLUSTER_SRC_VCLUSTER_SEMANTIC_UNIT_TESTS_HPP_
+
+struct A
+{
+	size_t a;
+	float b;
+	double c;
+};
+
+
+BOOST_AUTO_TEST_SUITE( VCluster_semantic_test )
+
+BOOST_AUTO_TEST_CASE (Vcluster_semantic_gather)
+{
+	Vcluster & vcl = *global_v_cluster;
+
+	if (vcl.getProcessingUnits() >= 32)
+		return;
+
+	//! [Gather the data on master]
+
+	openfpm::vector<size_t> v1;
+	v1.resize(vcl.getProcessUnitID());
+
+	for(size_t i = 0 ; i < vcl.getProcessUnitID() ; i++)
+		v1.get(i) = 5;
+
+	openfpm::vector<size_t> v2;
+
+	vcl.SGather(v1,v2,0);
+
+	//! [Gather the data on master]
+
+	if (vcl.getProcessUnitID() == 0)
+	{
+		size_t n = vcl.getProcessingUnits();
+		BOOST_REQUIRE_EQUAL(v2.size(),n*(n-1)/2);
+
+		bool is_five = true;
+		for (size_t i = 0 ; i < v2.size() ; i++)
+			is_five &= (v2.get(i) == 5);
+
+		BOOST_REQUIRE_EQUAL(is_five,true);
+	}
+}
+
+
+BOOST_AUTO_TEST_CASE (Vcluster_semantic_struct_gather)
+{
+	Vcluster & vcl = *global_v_cluster;
+
+	if (vcl.getProcessingUnits() >= 32)
+		return;
+
+	//! [Gather the data on master complex]
+
+	openfpm::vector<A> v1;
+	v1.resize(vcl.getProcessUnitID());
+
+	for(size_t i = 0 ; i < vcl.getProcessUnitID() ; i++)
+	{
+		v1.get(i).a = 5;
+		v1.get(i).b = 10.0;
+		v1.get(i).c = 11.0;
+	}
+
+	openfpm::vector<A> v2;
+
+	vcl.SGather(v1,v2,0);
+
+	//! [Gather the data on master complex]
+
+	if (vcl.getProcessUnitID() == 0)
+	{
+		size_t n = vcl.getProcessingUnits();
+		BOOST_REQUIRE_EQUAL(v2.size(),n*(n-1)/2);
+
+		bool is_correct = true;
+		for (size_t i = 0 ; i < v2.size() ; i++)
+		{
+			is_correct &= (v2.get(i).a == 5);
+			is_correct &= (v2.get(i).b == 10.0);
+			is_correct &= (v2.get(i).c == 11.0);
+		}
+
+		BOOST_REQUIRE_EQUAL(is_correct,true);
+	}
+}
+
+
+BOOST_AUTO_TEST_SUITE_END()
+
+#endif /* OPENFPM_VCLUSTER_SRC_VCLUSTER_SEMANTIC_UNIT_TESTS_HPP_ */
diff --git a/src/main.cpp b/src/main.cpp
index 5f9a8caac55cdfe0b006822d4416933fc528e423..042076eb4764840a5601b37ea9d6013698d6ecac 100755
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -9,3 +9,4 @@
 
 #include "unit_test_init_cleanup.hpp"
 #include "VCluster_unit_tests.hpp"
+#include "VCluster_semantic_unit_tests.hpp"
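
For reference, a minimal caller-side sketch of the new SGather primitive. This is hypothetical application code, not part of the patch: it mirrors the pattern used by the unit tests above, the names example_SGather, local and collected are illustrative only, and it assumes a Vcluster that has already been initialized and exposed through global_v_cluster, as in the test harness.

    #include "VCluster.hpp"

    // Hypothetical example routine: every processor contributes ten copies of its
    // own rank, and processor 0 collects them all in one vector.
    void example_SGather()
    {
    	Vcluster & vcl = *global_v_cluster;

    	openfpm::vector<size_t> local;
    	local.resize(10);
    	for (size_t i = 0 ; i < local.size() ; i++)
    		local.get(i) = vcl.getProcessUnitID();

    	// Semantic gather on processor 0: on the root, "collected" is extended once per
    	// sending processor via S.add(T); note that this version does not append the
    	// root's own "local" buffer, so the root ends up with 10*(N-1) elements,
    	// where N is the number of processors. On the other processors "collected"
    	// stays empty.
    	openfpm::vector<size_t> collected;
    	vcl.SGather(local,collected,0);
    }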