diff --git a/src/VCluster/VCluster.hpp b/src/VCluster/VCluster.hpp
index d90c0faa076f7c5f65bf7f24123498b286f9e741..e96624e27cb46f127d5bb948010618390bf70905 100644
--- a/src/VCluster/VCluster.hpp
+++ b/src/VCluster/VCluster.hpp
@@ -135,11 +135,16 @@ class Vcluster: public Vcluster_base
                 sz_recv_byte.get(i) = sz_recv.get(i) * sizeof(typename T::value_type);
             }
             else
-                std::cout << __FILE__ << ":" << __LINE__ << " Error " << demangle(typeid(T).name()) << " the type does not work with the option RECEIVE_KNOWN or NO_CHANGE_ELEMENTS" << std::endl;
-        }
+            {std::cout << __FILE__ << ":" << __LINE__ << " Error " << demangle(typeid(T).name()) << " the type does not work with the option RECEIVE_KNOWN or NO_CHANGE_ELEMENTS" << std::endl;}
 
-        Vcluster_base::sendrecvMultipleMessagesNBX(prc_send.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(),
-                                                   prc_recv.size(),(size_t *)prc_recv.getPointer(),(size_t *)sz_recv_byte.getPointer(),msg_alloc_known,(void *)&bi);
+            Vcluster_base::sendrecvMultipleMessagesNBX(prc_send.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(),
+                                                       prc_recv.size(),(size_t *)prc_recv.getPointer(),(size_t *)sz_recv_byte.getPointer(),msg_alloc_known,(void *)&bi);
+        }
+        else
+        {
+            Vcluster_base::sendrecvMultipleMessagesNBX(prc_send.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(),
+                                                       prc_recv.size(),(size_t *)prc_recv.getPointer(),msg_alloc_known,(void *)&bi);
+        }
     }
     else
     {
@@ -683,9 +688,10 @@ class Vcluster: public Vcluster_base
                   openfpm::vector<size_t> & prc_send,
                   openfpm::vector<size_t> & prc_recv,
                   openfpm::vector<size_t> & sz_recv,
-                  openfpm::vector<size_t> & sz_recv_byte)
+                  openfpm::vector<size_t> & sz_recv_byte,
+                  size_t opt = NONE)
     {
-        prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,NONE);
+        prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
 
         // operation object
         op_ssend_recv_add<void> opa;
@@ -729,9 +735,10 @@ class Vcluster: public Vcluster_base
                   S & recv,
                   openfpm::vector<size_t> & prc_send,
                   openfpm::vector<size_t> & prc_recv,
-                  openfpm::vector<size_t> & sz_recv)
+                  openfpm::vector<size_t> & sz_recv,
+                  size_t opt = NONE)
     {
-        prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,NONE);
+        prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
 
         // operation object
         op_ssend_recv_add<void> opa;
diff --git a/src/VCluster/VCluster_base.hpp b/src/VCluster/VCluster_base.hpp
index 6caf62041af4d56fa7b5bccaede6ebdb4639af5c..543f08b2652b516f4af07016cc1f8bab726cba11 100644
--- a/src/VCluster/VCluster_base.hpp
+++ b/src/VCluster/VCluster_base.hpp
@@ -502,6 +502,64 @@ public:
         NBX_cnt = (NBX_cnt + 1) % 1024;
     }
 
+
+    /*! \brief Send and receive multiple messages
+     *
+     * It sends multiple messages to a set of processors and receives
+     * multiple messages from another set of processors; all the processors must call this
+     * function
+     *
+     * Suppose the following situation: the calling processor wants to communicate
+     * * 2 vectors of 100 integers to processor 1
+     * * 1 vector of 50 integers to processor 6
+     * * 1 vector of 48 integers to processor 7
+     * * 1 vector of 70 integers to processor 8
+     *
+     * \param prc list of processors you should communicate with [1,1,6,7,8]
+     *
+     * \param data vector containing the data to send [v=vector<vector<int>>, v.size()=4, T=vector<int>], T at the moment
+     * is only tested for vectors of 0 or more generic elements (without pointers)
+     *
+     * \param msg_alloc This is a call-back with the purpose of allocating space
+     * for the incoming messages and giving back a valid pointer. Supposing that this call-back has been triggered by
+     * the processor with id 5 that wants to communicate a message of 100 bytes to me, the call-back will have
+     * the following 6 parameters
+     * in the call-back, in order:
+     * * message size required to receive the message (100)
+     * * total message size to receive from all the processors (NBX does not provide this information)
+     * * the total number of processors that want to communicate with you (NBX does not provide this information)
+     * * processor id (5)
+     * * ri request id (it is an id that goes from 0 to total_p, and is incremented
+     *   every time message_alloc is called)
+     * * void pointer, parameter for additional data to pass to the call-back
+     *
+     * \param ptr_arg data passed to the call-back function specified
+     *
+     * \param opt options, only NONE supported
+     *
+     */
+    template<typename T>
+    void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > & prc,
+                                     openfpm::vector< T > & data,
+                                     void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *),
+                                     void * ptr_arg, long int opt=NONE)
+    {
+#ifdef SE_CLASS1
+        checkType<typename T::value_type>();
+#endif
+        // resize the pointer list
+        ptr_send.resize(prc.size());
+        sz_send.resize(prc.size());
+
+        for (size_t i = 0 ; i < prc.size() ; i++)
+        {
+            ptr_send.get(i) = data.get(i).getPointer();
+            sz_send.get(i) = data.get(i).size() * sizeof(typename T::value_type);
+        }
+
+        sendrecvMultipleMessagesNBX(prc.size(),(size_t *)sz_send.getPointer(),(size_t *)prc.getPointer(),(void **)ptr_send.getPointer(),msg_alloc,ptr_arg,opt);
+    }
+
     /*! \brief Send and receive multiple messages
      *
      * It send multiple messages to a set of processors the and receive
@@ -544,7 +602,11 @@ public:
      * \param opt options, NONE (ignored in this moment)
      *
     */
-    void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] , void * ptr[], size_t n_recv, size_t prc_recv[] , size_t sz_recv[] ,void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg, long int opt=NONE)
+    void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] ,
+                                     void * ptr[], size_t n_recv, size_t prc_recv[] ,
+                                     size_t sz_recv[] ,
+                                     void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *),
+                                     void * ptr_arg, long int opt=NONE)
     {
 
         // Allocate the buffers
@@ -564,60 +626,87 @@ public:
         NBX_cnt = (NBX_cnt + 1) % 1024;
     }
 
+    openfpm::vector<size_t> sz_recv_tmp;
+
     /*! \brief Send and receive multiple messages
      *
      * It send multiple messages to a set of processors the and receive
      * multiple messages from another set of processors, all the processor must call this
-     * function
+     * function. In this particular case the receiver knows from which processors it is going
+     * to receive, but does not know the sizes.
+     *
     *
     * suppose the following situation the calling processor want to communicate
-     * * 2 vector of 100 integers to processor 1
-     * * 1 vector of 50 integers to processor 6
-     * * 1 vector of 48 integers to processor 7
-     * * 1 vector of 70 integers to processor 8
+     * * 2 messages of size 100 bytes to processor 1
+     * * 1 message of size 50 bytes to processor 6
+     * * 1 message of size 48 bytes to processor 7
+     * * 1 message of size 70 bytes to processor 8
     *
-     * \param prc list of processors you should communicate with [1,1,6,7,8]
+     * \param n_send number of sends for this processor [4]
     *
-     * \param data vector containing the data to send [v=vector<vector<int>>, v.size()=4, T=vector<int>], T at the moment
-     * is only tested for vectors of 0 or more generic elements (without pointers)
+     * \param prc list of processors with which it should communicate
+     *        [1,1,6,7,8]
     *
-     * \param msg_alloc This is a call-back with the purpose to allocate space
-     * for the incoming messages and give back a valid pointer, supposing that this call-back has been triggered by
+     * \param sz the array containing the size of the message for each processor
+     *        (zeros must not be present) [100,100,50,48,70]
+     *
+     * \param ptr array that contains the pointers to the messages to send
+     *
+     * \param msg_alloc This is a call-back with the purpose of allocating space
+     * for the incoming message and giving back a valid pointer, supposing that this call-back has been triggered by
     * the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have
     * the following 6 parameters
-     * in the call-back in order:
-     * * message size required to receive the message (100)
+     * in the call-back, in order:
+     * * message size required to receive the message [100]
     * * total message size to receive from all the processors (NBX does not provide this information)
     * * the total number of processor want to communicate with you (NBX does not provide this information)
-     * * processor id (5)
+     * * processor id [5]
     * * ri request id (it is an id that goes from 0 to total_p, and is incremented
     * every time message_alloc is called)
     * * void pointer, parameter for additional data to pass to the call-back
     *
     * \param ptr_arg data passed to the call-back function specified
     *
-     * \param opt options, only NONE supported
+     * \param opt options, NONE (ignored at the moment)
     *
     */
-    template<typename T> void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > & prc, openfpm::vector< T > & data, void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg, long int opt=NONE)
+    void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] ,
+                                     void * ptr[], size_t n_recv, size_t prc_recv[] ,
+                                     void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *),
+                                     void * ptr_arg, long int opt=NONE)
     {
-#ifdef SE_CLASS1
-        checkType<typename T::value_type>();
-#endif
-        // resize the pointer list
-        ptr_send.resize(prc.size());
-        sz_send.resize(prc.size());
+        sz_recv_tmp.resize(n_recv);
 
-        for (size_t i = 0 ; i < prc.size() ; i++)
+        // First we exchange the message sizes, so each receiver knows how much to allocate
+
+        for (size_t i = 0 ; i < n_send ; i++)
+        {send(prc[i],SEND_SPARSE + NBX_cnt,&sz[i],sizeof(size_t));}
+
+        for (size_t i = 0 ; i < n_recv ; i++)
+        {recv(prc_recv[i],SEND_SPARSE + NBX_cnt,&sz_recv_tmp.get(i),sizeof(size_t));}
+
+        execute();
+
+        // Circular counter
+        NBX_cnt = (NBX_cnt + 1) % 1024;
+
+        // Allocate the buffers
+
+        for (size_t i = 0 ; i < n_send ; i++)
+        {send(prc[i],SEND_SPARSE + NBX_cnt,ptr[i],sz[i]);}
+
+        for (size_t i = 0 ; i < n_recv ; i++)
         {
-            ptr_send.get(i) = data.get(i).getPointer();
-            sz_send.get(i) = data.get(i).size() * sizeof(typename T::value_type);
-        }
+            void * ptr_recv = msg_alloc(sz_recv_tmp.get(i),0,0,prc_recv[i],i,ptr_arg);
 
-        sendrecvMultipleMessagesNBX(prc.size(),(size_t *)sz_send.getPointer(),(size_t *)prc.getPointer(),(void **)ptr_send.getPointer(),msg_alloc,ptr_arg,opt);
-    }
+            recv(prc_recv[i],SEND_SPARSE + NBX_cnt,ptr_recv,sz_recv_tmp.get(i));
+        }
+        execute();
+        // Circular counter
+        NBX_cnt = (NBX_cnt + 1) % 1024;
+    }
 
     /*! \brief Send and receive multiple messages
     *
@@ -659,7 +748,10 @@ public:
     * \param opt options, NONE (ignored in this moment)
     *
     */
-    void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] , void * ptr[], void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg, long int opt = NONE)
+    void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] ,
+                                     void * ptr[],
+                                     void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *),
+                                     void * ptr_arg, long int opt = NONE)
     {
         if (stat.size() != 0 || req.size() != 0)
             std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " this function must be called when no other requests are in progress. Please remember that if you use function like max(),sum(),send(),recv() check that you did not miss to call the function execute() \n";
diff --git a/src/VCluster/VCluster_semantic_unit_tests.hpp b/src/VCluster/VCluster_semantic_unit_tests.hpp
index 4e6e1de44cdb3c9d7fb521e9fe61290968f241db..8bbbfaa3cfe7e22740e4806993485f3000882c7b 100644
--- a/src/VCluster/VCluster_semantic_unit_tests.hpp
+++ b/src/VCluster/VCluster_semantic_unit_tests.hpp
@@ -637,7 +637,7 @@ BOOST_AUTO_TEST_CASE (Vcluster_semantic_struct_scatter)
-BOOST_AUTO_TEST_CASE (Vcluster_semantic_sendrecv)
+BOOST_AUTO_TEST_CASE (Vcluster_semantic_sendrecv_all_unknown)
 {
     openfpm::vector<size_t> prc_recv2;
     openfpm::vector<size_t> prc_recv3;
 
     openfpm::vector<size_t> sz_recv2;
     openfpm::vector<size_t> sz_recv3;
@@ -738,6 +738,16 @@ BOOST_AUTO_TEST_CASE (Vcluster_semantic_sendrecv)
 
         BOOST_REQUIRE_EQUAL(match,true);
     }
+}
+
+
+BOOST_AUTO_TEST_CASE (Vcluster_semantic_sendrecv_receive_size_known)
+{
+    openfpm::vector<size_t> prc_recv2;
+    openfpm::vector<size_t> prc_recv3;
+
+    openfpm::vector<size_t> sz_recv2;
+    openfpm::vector<size_t> sz_recv3;
 
     for (size_t i = 0 ; i < 100 ; i++)
     {
@@ -806,6 +816,81 @@ BOOST_AUTO_TEST_CASE (Vcluster_semantic_sendrecv)
 }
 
+
+BOOST_AUTO_TEST_CASE (Vcluster_semantic_sendrecv_receive_known)
+{
+    openfpm::vector<size_t> prc_recv2;
+    openfpm::vector<size_t> prc_recv3;
+
+    openfpm::vector<size_t> sz_recv2;
+    openfpm::vector<size_t> sz_recv3;
+
+    for (size_t i = 0 ; i < 100 ; i++)
+    {
+        Vcluster & vcl = create_vcluster();
+
+        if (vcl.getProcessUnitID() == 0 && i == 0)
+            std::cout << "Semantic sendrecv test start" << std::endl;
+
+
+        if (vcl.getProcessingUnits() >= 32)
+            return;
+
+        openfpm::vector<size_t> prc_send;
+
+        openfpm::vector<openfpm::vector<size_t>> v1;
+        openfpm::vector<size_t> v2;
+        openfpm::vector<openfpm::vector<size_t>> v3;
+
+        v1.resize(vcl.getProcessingUnits());
+
+        size_t nc = vcl.getProcessingUnits() / SSCATTER_MAX;
+        size_t nr = vcl.getProcessingUnits() - nc * SSCATTER_MAX;
+        nr = ((nr-1) * nr) / 2;
+
+        size_t n_ele = nc * SSCATTER_MAX * (SSCATTER_MAX - 1) / 2 + nr;
+
+        for(size_t i = 0 ; i < v1.size() ; i++)
+        {
+            for (size_t j = 0 ; j < i % SSCATTER_MAX ; j++)
+                v1.get(i).add(j);
+
+            prc_send.add((i + vcl.getProcessUnitID()) % vcl.getProcessingUnits());
+        }
+
+        vcl.SSendRecv(v1,v2,prc_send,prc_recv2,sz_recv2,RECEIVE_KNOWN);
+        vcl.SSendRecv(v1,v3,prc_send,prc_recv3,sz_recv3);
+
+        BOOST_REQUIRE_EQUAL(v2.size(),n_ele);
+        size_t nc_check = (vcl.getProcessingUnits()-1) / SSCATTER_MAX;
+        BOOST_REQUIRE_EQUAL(v3.size(),vcl.getProcessingUnits()-1-nc_check);
+
+        bool match = true;
+        size_t s = 0;
+
+        for (size_t i = 0 ; i < sz_recv2.size() ; i++)
+        {
+            for (size_t j = 0 ; j < sz_recv2.get(i); j++)
+            {
+                match &= v2.get(s+j) == j;
+            }
+            s += sz_recv2.get(i);
+        }
+
+        BOOST_REQUIRE_EQUAL(match,true);
+
+        for (size_t i = 0 ; i < v3.size() ; i++)
+        {
+            for (size_t j = 0 ; j < v3.get(i).size() ; j++)
+            {
+                match &= v3.get(i).get(j) == j;
+            }
+        }
+
+        BOOST_REQUIRE_EQUAL(match,true);
+    }
+}
+
 BOOST_AUTO_TEST_CASE (Vcluster_semantic_struct_sendrecv)
 {
     for (size_t i = 0 ; i < 100 ; i++)
diff --git a/src/VCluster/VCluster_unit_test_util.hpp b/src/VCluster/VCluster_unit_test_util.hpp
index 6ed9463a5e90284dd3bd680b4fb1033e17cfea31..5716a522abc1ea8f3c3ff32d80ce6c0dda37adf4 100644
--- a/src/VCluster/VCluster_unit_test_util.hpp
+++ b/src/VCluster/VCluster_unit_test_util.hpp
@@ -12,6 +12,9 @@
 #include "VCluster_base.hpp"
 #include "Vector/vector_test_util.hpp"
 
+#define RECEIVE_UNKNOWN 1
+#define RECEIVE_SIZE_UNKNOWN 2
+
 #define NBX 1
 #define N_TRY 2
@@ -321,7 +324,7 @@ template<unsigned int ip> void test_known()
     }
 }
 
-template<unsigned int ip> void test()
+template<unsigned int ip> void test(unsigned int opt)
 {
     Vcluster & vcl = create_vcluster();
 
@@ -388,10 +391,51 @@ template<unsigned int ip> void test()
         recv_message.get(p_id).resize(j);
     }
 
-    // Send and receive
-    vcl.sendrecvMultipleMessagesNBX(prc,message,msg_alloc,&recv_message);
-
+    if (opt == RECEIVE_UNKNOWN)
+    {
+        // Send and receive
+        vcl.sendrecvMultipleMessagesNBX(prc,message,msg_alloc,&recv_message);
+    }
     //! [dsde]
+    else if (opt == RECEIVE_SIZE_UNKNOWN)
+    {
+        openfpm::vector<size_t> sz_send;
+        openfpm::vector<void *> ptr;
+
+        openfpm::vector<size_t> prc_recv;
+
+        sz_send.resize(prc.size());
+        ptr.resize(prc.size());
+
+        for (size_t i = 0 ; i < prc.size() ; i++)
+        {
+            sz_send.get(i) = message.get(i).size();
+            ptr.get(i) = &message.get(i).get(0);
+        }
+
+        // Calculate the receiving part
+
+        // Build the list of processors from which a message is expected
+        for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
+        {
+            long int p_id = vcl.getProcessUnitID() - i - 1;
+            if (p_id < 0)
+            {p_id += n_proc;}
+            else
+            {p_id = p_id % n_proc;}
+
+            if (p_id != (long int)vcl.getProcessUnitID())
+            {
+                prc_recv.add(p_id);
+            }
+        }
+
+        //! [dsde]
+
+        // Send and receive
+        vcl.sendrecvMultipleMessagesNBX(prc.size(),&sz_send.get(0),&prc.get(0),
+                                        &ptr.get(0),prc_recv.size(),&prc_recv.get(0),msg_alloc,&recv_message);
+    }
 
 #ifdef VERBOSE_TEST
     timer t;
@@ -437,6 +481,9 @@ template<unsigned int ip> void test()
     }
     }
+    if (opt == RECEIVE_SIZE_UNKNOWN)
+    {return;}
+
     std::srand(create_vcluster().getProcessUnitID());
     std::default_random_engine eg;
     std::uniform_int_distribution d(0,n_proc/8);
diff --git a/src/VCluster/VCluster_unit_tests.hpp b/src/VCluster/VCluster_unit_tests.hpp
index a8d84a745ac959dbf7f185623355f6bcb177323a..f8a0dd041542df6ffbfa34c8f6e0fbe2e962db68 100644
--- a/src/VCluster/VCluster_unit_tests.hpp
+++ b/src/VCluster/VCluster_unit_tests.hpp
@@ -181,7 +181,7 @@ BOOST_AUTO_TEST_CASE( VCluster_use_sendrecv)
     std::cout << "VCluster unit test start sendrecv" << "\n";
 
     totp_check = false;
-    test<NBX>();
+    test<NBX>(RECEIVE_UNKNOWN);
 
     totp_check = false;
     test_no_send_some_peer();
@@ -189,6 +189,19 @@
     std::cout << "VCluster unit test stop sendrecv" << "\n";
 }
 
+BOOST_AUTO_TEST_CASE( VCluster_use_sendrecv_size_known)
+{
+    std::cout << "VCluster unit test start sendrecv known size" << "\n";
+
+    totp_check = false;
+    test<NBX>(RECEIVE_SIZE_UNKNOWN);
+
+    totp_check = false;
+    test_no_send_some_peer();
+
+    std::cout << "VCluster unit test stop sendrecv known size" << "\n";
+}
+
 BOOST_AUTO_TEST_CASE( VCluster_use_sendrecv_known)
 {
     std::cout << "VCluster unit test start known" << "\n";
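
Usage sketch (illustrative only, not part of the patch): the overload added in VCluster_base.hpp above, sendrecvMultipleMessagesNBX(n_send, sz, prc, ptr, n_recv, prc_recv, msg_alloc, ptr_arg), covers the case where each processor already knows which processors it will receive from but not how much data; the sizes are exchanged internally before the payloads are received through the documented 6-parameter call-back. The snippet below mirrors the RECEIVE_SIZE_UNKNOWN branch added to test() in VCluster_unit_test_util.hpp; the names example_msg_alloc and example_known_receiver_unknown_size are hypothetical, and a Vcluster initialized by the surrounding application (as in the unit tests) is assumed.

#include "VCluster/VCluster.hpp"

// Call-back with the 6-parameter signature documented above: allocate space for
// one incoming message and return a valid pointer to it. Here ptr_arg is assumed
// to point to a vector of per-sender receive buffers.
static void * example_msg_alloc(size_t msg_sz, size_t, size_t, size_t, size_t ri, void * ptr_arg)
{
    openfpm::vector<openfpm::vector<unsigned char>> & buf =
        *static_cast<openfpm::vector<openfpm::vector<unsigned char>> *>(ptr_arg);

    buf.get(ri).resize(msg_sz);          // ri runs from 0 to n_recv-1
    return buf.get(ri).getPointer();
}

static void example_known_receiver_unknown_size()
{
    Vcluster & vcl = create_vcluster();

    if (vcl.getProcessingUnits() < 2)
        return;

    // ring pattern: send one message to the next processor,
    // receive one message from the previous processor
    size_t dst = (vcl.getProcessUnitID() + 1) % vcl.getProcessingUnits();
    size_t src = (vcl.getProcessUnitID() + vcl.getProcessingUnits() - 1) % vcl.getProcessingUnits();

    openfpm::vector<unsigned char> message;
    message.resize(100 + vcl.getProcessUnitID());   // sizes differ across processors on purpose

    size_t sz[1]  = {message.size()};
    size_t prc[1] = {dst};
    void * ptr[1] = {message.getPointer()};

    size_t prc_recv[1] = {src};
    openfpm::vector<openfpm::vector<unsigned char>> recv_buf;
    recv_buf.resize(1);

    // receivers are known, sizes are not: the overload first exchanges the sizes,
    // then calls example_msg_alloc and receives the payload into the returned buffers
    vcl.sendrecvMultipleMessagesNBX(1,sz,prc,ptr,1,prc_recv,example_msg_alloc,&recv_buf);
}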