From 184c3c9a472050639f95246c75e67d54ae12e074 Mon Sep 17 00:00:00 2001 From: Pietro Incardona Date: Thu, 28 Dec 2017 01:25:47 +0100 Subject: [PATCH] Vcluster adding RECEIVE KNOWN --- src/VCluster/VCluster.hpp | 23 ++- src/VCluster/VCluster_base.hpp | 150 ++++++++++++++---- src/VCluster/VCluster_semantic_unit_tests.hpp | 87 +++++++++- src/VCluster/VCluster_unit_test_util.hpp | 55 ++++++- src/VCluster/VCluster_unit_tests.hpp | 15 +- 5 files changed, 287 insertions(+), 43 deletions(-) diff --git a/src/VCluster/VCluster.hpp b/src/VCluster/VCluster.hpp index d90c0fa..e96624e 100644 --- a/src/VCluster/VCluster.hpp +++ b/src/VCluster/VCluster.hpp @@ -135,11 +135,16 @@ class Vcluster: public Vcluster_base sz_recv_byte.get(i) = sz_recv.get(i) * sizeof(typename T::value_type); } else - std::cout << __FILE__ << ":" << __LINE__ << " Error " << demangle(typeid(T).name()) << " the type does not work with the option RECEIVE_KNOWN or NO_CHANGE_ELEMENTS" << std::endl; - } + {std::cout << __FILE__ << ":" << __LINE__ << " Error " << demangle(typeid(T).name()) << " the type does not work with the option or NO_CHANGE_ELEMENTS" << std::endl;} - Vcluster_base::sendrecvMultipleMessagesNBX(prc_send.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(), - prc_recv.size(),(size_t *)prc_recv.getPointer(),(size_t *)sz_recv_byte.getPointer(),msg_alloc_known,(void *)&bi); + Vcluster_base::sendrecvMultipleMessagesNBX(prc_send.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(), + prc_recv.size(),(size_t *)prc_recv.getPointer(),(size_t *)sz_recv_byte.getPointer(),msg_alloc_known,(void *)&bi); + } + else + { + Vcluster_base::sendrecvMultipleMessagesNBX(prc_send.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(), + prc_recv.size(),(size_t *)prc_recv.getPointer(),msg_alloc_known,(void *)&bi); + } } else { @@ -683,9 +688,10 @@ class Vcluster: public Vcluster_base openfpm::vector & prc_send, openfpm::vector & prc_recv, openfpm::vector & sz_recv, - openfpm::vector & sz_recv_byte) + openfpm::vector & sz_recv_byte, + size_t opt = NONE) { - prepare_send_buffer,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,NONE); + prepare_send_buffer,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt); // operation object op_ssend_recv_add opa; @@ -729,9 +735,10 @@ class Vcluster: public Vcluster_base S & recv, openfpm::vector & prc_send, openfpm::vector & prc_recv, - openfpm::vector & sz_recv) + openfpm::vector & sz_recv, + size_t opt = NONE) { - prepare_send_buffer,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,NONE); + prepare_send_buffer,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt); // operation object op_ssend_recv_add opa; diff --git a/src/VCluster/VCluster_base.hpp b/src/VCluster/VCluster_base.hpp index 6caf620..543f08b 100644 --- a/src/VCluster/VCluster_base.hpp +++ b/src/VCluster/VCluster_base.hpp @@ -502,6 +502,64 @@ public: NBX_cnt = (NBX_cnt + 1) % 1024; } + + /*! \brief Send and receive multiple messages + * + * It send multiple messages to a set of processors the and receive + * multiple messages from another set of processors, all the processor must call this + * function + * + * suppose the following situation the calling processor want to communicate + * * 2 vector of 100 integers to processor 1 + * * 1 vector of 50 integers to processor 6 + * * 1 vector of 48 integers to processor 7 + * * 1 vector of 70 integers to processor 8 + * + * \param prc list of processors you should communicate with [1,1,6,7,8] + * + * \param data vector containing the data to send [v=vector>, v.size()=4, T=vector], T at the moment + * is only tested for vectors of 0 or more generic elements (without pointers) + * + * \param msg_alloc This is a call-back with the purpose to allocate space + * for the incoming messages and give back a valid pointer, supposing that this call-back has been triggered by + * the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have + * the following 6 parameters + * in the call-back in order: + * * message size required to receive the message (100) + * * total message size to receive from all the processors (NBX does not provide this information) + * * the total number of processor want to communicate with you (NBX does not provide this information) + * * processor id (5) + * * ri request id (it is an id that goes from 0 to total_p, and is incremented + * every time message_alloc is called) + * * void pointer, parameter for additional data to pass to the call-back + * + * \param ptr_arg data passed to the call-back function specified + * + * \param opt options, only NONE supported + * + */ + template + void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > & prc, + openfpm::vector< T > & data, + void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), + void * ptr_arg, long int opt=NONE) + { +#ifdef SE_CLASS1 + checkType(); +#endif + // resize the pointer list + ptr_send.resize(prc.size()); + sz_send.resize(prc.size()); + + for (size_t i = 0 ; i < prc.size() ; i++) + { + ptr_send.get(i) = data.get(i).getPointer(); + sz_send.get(i) = data.get(i).size() * sizeof(typename T::value_type); + } + + sendrecvMultipleMessagesNBX(prc.size(),(size_t *)sz_send.getPointer(),(size_t *)prc.getPointer(),(void **)ptr_send.getPointer(),msg_alloc,ptr_arg,opt); + } + /*! \brief Send and receive multiple messages * * It send multiple messages to a set of processors the and receive @@ -544,7 +602,11 @@ public: * \param opt options, NONE (ignored in this moment) * */ - void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] , void * ptr[], size_t n_recv, size_t prc_recv[] , size_t sz_recv[] ,void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg, long int opt=NONE) + void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] , + void * ptr[], size_t n_recv, size_t prc_recv[] , + size_t sz_recv[] , + void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), + void * ptr_arg, long int opt=NONE) { // Allocate the buffers @@ -564,60 +626,87 @@ public: NBX_cnt = (NBX_cnt + 1) % 1024; } + openfpm::vector sz_recv_tmp; + /*! \brief Send and receive multiple messages * * It send multiple messages to a set of processors the and receive * multiple messages from another set of processors, all the processor must call this - * function + * function. In this particular case the receiver know from which processor is going + * to receive, but does not know the size. + * * * suppose the following situation the calling processor want to communicate - * * 2 vector of 100 integers to processor 1 - * * 1 vector of 50 integers to processor 6 - * * 1 vector of 48 integers to processor 7 - * * 1 vector of 70 integers to processor 8 + * * 2 messages of size 100 byte to processor 1 + * * 1 message of size 50 byte to processor 6 + * * 1 message of size 48 byte to processor 7 + * * 1 message of size 70 byte to processor 8 * - * \param prc list of processors you should communicate with [1,1,6,7,8] + * \param n_send number of send for this processor [4] * - * \param data vector containing the data to send [v=vector>, v.size()=4, T=vector], T at the moment - * is only tested for vectors of 0 or more generic elements (without pointers) + * \param prc list of processor with which it should communicate + * [1,1,6,7,8] * - * \param msg_alloc This is a call-back with the purpose to allocate space - * for the incoming messages and give back a valid pointer, supposing that this call-back has been triggered by + * \param sz the array contain the size of the message for each processor + * (zeros must not be presents) [100,100,50,48,70] + * + * \param ptr array that contain the pointers to the message to send + * + * \param msg_alloc This is a call-back with the purpose of allocate space + * for the incoming message and give back a valid pointer, supposing that this call-back has been triggered by * the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have * the following 6 parameters - * in the call-back in order: - * * message size required to receive the message (100) + * in the call-back are in order: + * * message size required to receive the message [100] * * total message size to receive from all the processors (NBX does not provide this information) * * the total number of processor want to communicate with you (NBX does not provide this information) - * * processor id (5) + * * processor id [5] * * ri request id (it is an id that goes from 0 to total_p, and is incremented * every time message_alloc is called) * * void pointer, parameter for additional data to pass to the call-back * * \param ptr_arg data passed to the call-back function specified * - * \param opt options, only NONE supported + * \param opt options, NONE (ignored in this moment) * */ - template void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > & prc, openfpm::vector< T > & data, void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg, long int opt=NONE) + void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] , + void * ptr[], size_t n_recv, size_t prc_recv[] , + void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), + void * ptr_arg, long int opt=NONE) { -#ifdef SE_CLASS1 - checkType(); -#endif - // resize the pointer list - ptr_send.resize(prc.size()); - sz_send.resize(prc.size()); + sz_recv_tmp.resize(n_recv); - for (size_t i = 0 ; i < prc.size() ; i++) + // First we understand the receive size for each processor + + for (size_t i = 0 ; i < n_send ; i++) + {send(prc[i],SEND_SPARSE + NBX_cnt,&sz[i],sizeof(size_t));} + + for (size_t i = 0 ; i < n_recv ; i++) + {recv(prc_recv[i],SEND_SPARSE + NBX_cnt,&sz_recv_tmp.get(i),sizeof(size_t));} + + execute(); + + // Circular counter + NBX_cnt = (NBX_cnt + 1) % 1024; + + // Allocate the buffers + + for (size_t i = 0 ; i < n_send ; i++) + {send(prc[i],SEND_SPARSE + NBX_cnt,ptr[i],sz[i]);} + + for (size_t i = 0 ; i < n_recv ; i++) { - ptr_send.get(i) = data.get(i).getPointer(); - sz_send.get(i) = data.get(i).size() * sizeof(typename T::value_type); - } + void * ptr_recv = msg_alloc(sz_recv_tmp.get(i),0,0,prc_recv[i],i,ptr_arg); - sendrecvMultipleMessagesNBX(prc.size(),(size_t *)sz_send.getPointer(),(size_t *)prc.getPointer(),(void **)ptr_send.getPointer(),msg_alloc,ptr_arg,opt); - } + recv(prc_recv[i],SEND_SPARSE + NBX_cnt,ptr_recv,sz_recv_tmp.get(i)); + } + execute(); + // Circular counter + NBX_cnt = (NBX_cnt + 1) % 1024; + } /*! \brief Send and receive multiple messages * @@ -659,7 +748,10 @@ public: * \param opt options, NONE (ignored in this moment) * */ - void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] , void * ptr[], void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg, long int opt = NONE) + void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] , + void * ptr[], + void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), + void * ptr_arg, long int opt = NONE) { if (stat.size() != 0 || req.size() != 0) std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " this function must be called when no other requests are in progress. Please remember that if you use function like max(),sum(),send(),recv() check that you did not miss to call the function execute() \n"; diff --git a/src/VCluster/VCluster_semantic_unit_tests.hpp b/src/VCluster/VCluster_semantic_unit_tests.hpp index 4e6e1de..8bbbfaa 100644 --- a/src/VCluster/VCluster_semantic_unit_tests.hpp +++ b/src/VCluster/VCluster_semantic_unit_tests.hpp @@ -637,7 +637,7 @@ BOOST_AUTO_TEST_CASE (Vcluster_semantic_struct_scatter) -BOOST_AUTO_TEST_CASE (Vcluster_semantic_sendrecv) +BOOST_AUTO_TEST_CASE (Vcluster_semantic_sendrecv_all_unknown) { openfpm::vector prc_recv2; openfpm::vector prc_recv3; @@ -738,6 +738,16 @@ BOOST_AUTO_TEST_CASE (Vcluster_semantic_sendrecv) BOOST_REQUIRE_EQUAL(match,true); } +} + + +BOOST_AUTO_TEST_CASE (Vcluster_semantic_sendrecv_receive_size_known) +{ + openfpm::vector prc_recv2; + openfpm::vector prc_recv3; + + openfpm::vector sz_recv2; + openfpm::vector sz_recv3; for (size_t i = 0 ; i < 100 ; i++) { @@ -806,6 +816,81 @@ BOOST_AUTO_TEST_CASE (Vcluster_semantic_sendrecv) } + +BOOST_AUTO_TEST_CASE (Vcluster_semantic_sendrecv_receive_known) +{ + openfpm::vector prc_recv2; + openfpm::vector prc_recv3; + + openfpm::vector sz_recv2; + openfpm::vector sz_recv3; + + for (size_t i = 0 ; i < 100 ; i++) + { + Vcluster & vcl = create_vcluster(); + + if (vcl.getProcessUnitID() == 0 && i == 0) + std::cout << "Semantic sendrecv test start" << std::endl; + + + if (vcl.getProcessingUnits() >= 32) + return; + + openfpm::vector prc_send; + + openfpm::vector> v1; + openfpm::vector v2; + openfpm::vector> v3; + + v1.resize(vcl.getProcessingUnits()); + + size_t nc = vcl.getProcessingUnits() / SSCATTER_MAX; + size_t nr = vcl.getProcessingUnits() - nc * SSCATTER_MAX; + nr = ((nr-1) * nr) / 2; + + size_t n_ele = nc * SSCATTER_MAX * (SSCATTER_MAX - 1) / 2 + nr; + + for(size_t i = 0 ; i < v1.size() ; i++) + { + for (size_t j = 0 ; j < i % SSCATTER_MAX ; j++) + v1.get(i).add(j); + + prc_send.add((i + vcl.getProcessUnitID()) % vcl.getProcessingUnits()); + } + + vcl.SSendRecv(v1,v2,prc_send,prc_recv2,sz_recv2,RECEIVE_KNOWN); + vcl.SSendRecv(v1,v3,prc_send,prc_recv3,sz_recv3); + + BOOST_REQUIRE_EQUAL(v2.size(),n_ele); + size_t nc_check = (vcl.getProcessingUnits()-1) / SSCATTER_MAX; + BOOST_REQUIRE_EQUAL(v3.size(),vcl.getProcessingUnits()-1-nc_check); + + bool match = true; + size_t s = 0; + + for (size_t i = 0 ; i < sz_recv2.size() ; i++) + { + for (size_t j = 0 ; j < sz_recv2.get(i); j++) + { + match &= v2.get(s+j) == j; + } + s += sz_recv2.get(i); + } + + BOOST_REQUIRE_EQUAL(match,true); + + for (size_t i = 0 ; i < v3.size() ; i++) + { + for (size_t j = 0 ; j < v3.get(i).size() ; j++) + { + match &= v3.get(i).get(j) == j; + } + } + + BOOST_REQUIRE_EQUAL(match,true); + } +} + BOOST_AUTO_TEST_CASE (Vcluster_semantic_struct_sendrecv) { for (size_t i = 0 ; i < 100 ; i++) diff --git a/src/VCluster/VCluster_unit_test_util.hpp b/src/VCluster/VCluster_unit_test_util.hpp index 6ed9463..5716a52 100644 --- a/src/VCluster/VCluster_unit_test_util.hpp +++ b/src/VCluster/VCluster_unit_test_util.hpp @@ -12,6 +12,9 @@ #include "VCluster_base.hpp" #include "Vector/vector_test_util.hpp" +#define RECEIVE_UNKNOWN 1 +#define RECEIVE_SIZE_UNKNOWN 2 + #define NBX 1 #define N_TRY 2 @@ -321,7 +324,7 @@ template void test_known() } } -template void test() +template void test(unsigned int opt) { Vcluster & vcl = create_vcluster(); @@ -388,10 +391,51 @@ template void test() recv_message.get(p_id).resize(j); } - // Send and receive - vcl.sendrecvMultipleMessagesNBX(prc,message,msg_alloc,&recv_message); - + if (opt == RECEIVE_UNKNOWN) + { + // Send and receive + vcl.sendrecvMultipleMessagesNBX(prc,message,msg_alloc,&recv_message); + } //! [dsde] + else if (opt == RECEIVE_SIZE_UNKNOWN) + { + openfpm::vector sz_send; + openfpm::vector ptr; + + openfpm::vector prc_recv; + + sz_send.resize(prc.size()); + ptr.resize(prc.size()); + + for (size_t i = 0 ; i < prc.size() ; i++) + { + sz_send.get(i) = message.get(i).size(); + ptr.get(i) = &message.get(i).get(0); + } + + // Calculate the receiving part + + // Check the message + for (size_t i = 0 ; i < 8 && i < n_proc ; i++) + { + long int p_id = vcl.getProcessUnitID() - i - 1; + if (p_id < 0) + {p_id += n_proc;} + else + {p_id = p_id % n_proc;} + + if (p_id != (long int)vcl.getProcessUnitID()) + { + prc_recv.add(p_id); + } + } + + //! [dsde] + + // Send and receive + vcl.sendrecvMultipleMessagesNBX(prc.size(),&sz_send.get(0),&prc.get(0), + &ptr.get(0),prc_recv.size(),&prc_recv.get(0),msg_alloc,&recv_message); + } #ifdef VERBOSE_TEST timer t; @@ -437,6 +481,9 @@ template void test() } } + if (opt == RECEIVE_SIZE_UNKNOWN) + {return;} + std::srand(create_vcluster().getProcessUnitID()); std::default_random_engine eg; std::uniform_int_distribution d(0,n_proc/8); diff --git a/src/VCluster/VCluster_unit_tests.hpp b/src/VCluster/VCluster_unit_tests.hpp index a8d84a7..f8a0dd0 100644 --- a/src/VCluster/VCluster_unit_tests.hpp +++ b/src/VCluster/VCluster_unit_tests.hpp @@ -181,7 +181,7 @@ BOOST_AUTO_TEST_CASE( VCluster_use_sendrecv) std::cout << "VCluster unit test start sendrecv" << "\n"; totp_check = false; - test(); + test(RECEIVE_UNKNOWN); totp_check = false; test_no_send_some_peer(); @@ -189,6 +189,19 @@ BOOST_AUTO_TEST_CASE( VCluster_use_sendrecv) std::cout << "VCluster unit test stop sendrecv" << "\n"; } +BOOST_AUTO_TEST_CASE( VCluster_use_sendrecv_size_known) +{ + std::cout << "VCluster unit test start sendrecv known size" << "\n"; + + totp_check = false; + test(RECEIVE_SIZE_UNKNOWN); + + totp_check = false; + test_no_send_some_peer(); + + std::cout << "VCluster unit test stop sendrecv known size" << "\n"; +} + BOOST_AUTO_TEST_CASE( VCluster_use_sendrecv_known) { std::cout << "VCluster unit test start known" << "\n"; -- GitLab