Commit 184c3c9a authored by incardon's avatar incardon

Vcluster adding RECEIVE KNOWN

parent f551f4b7
......@@ -135,11 +135,16 @@ class Vcluster: public Vcluster_base
sz_recv_byte.get(i) = sz_recv.get(i) * sizeof(typename T::value_type);
}
else
std::cout << __FILE__ << ":" << __LINE__ << " Error " << demangle(typeid(T).name()) << " the type does not work with the option RECEIVE_KNOWN or NO_CHANGE_ELEMENTS" << std::endl;
}
{std::cout << __FILE__ << ":" << __LINE__ << " Error " << demangle(typeid(T).name()) << " the type does not work with the option or NO_CHANGE_ELEMENTS" << std::endl;}
Vcluster_base::sendrecvMultipleMessagesNBX(prc_send.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(),
prc_recv.size(),(size_t *)prc_recv.getPointer(),(size_t *)sz_recv_byte.getPointer(),msg_alloc_known,(void *)&bi);
Vcluster_base::sendrecvMultipleMessagesNBX(prc_send.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(),
prc_recv.size(),(size_t *)prc_recv.getPointer(),(size_t *)sz_recv_byte.getPointer(),msg_alloc_known,(void *)&bi);
}
else
{
Vcluster_base::sendrecvMultipleMessagesNBX(prc_send.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(),
prc_recv.size(),(size_t *)prc_recv.getPointer(),msg_alloc_known,(void *)&bi);
}
}
else
{
......@@ -683,9 +688,10 @@ class Vcluster: public Vcluster_base
openfpm::vector<size_t> & prc_send,
openfpm::vector<size_t> & prc_recv,
openfpm::vector<size_t> & sz_recv,
openfpm::vector<size_t> & sz_recv_byte)
openfpm::vector<size_t> & sz_recv_byte,
size_t opt = NONE)
{
prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,NONE);
prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
// operation object
op_ssend_recv_add<void> opa;
......@@ -729,9 +735,10 @@ class Vcluster: public Vcluster_base
S & recv,
openfpm::vector<size_t> & prc_send,
openfpm::vector<size_t> & prc_recv,
openfpm::vector<size_t> & sz_recv)
openfpm::vector<size_t> & sz_recv,
size_t opt = NONE)
{
prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,NONE);
prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
// operation object
op_ssend_recv_add<void> opa;
......
......@@ -502,6 +502,64 @@ public:
NBX_cnt = (NBX_cnt + 1) % 1024;
}
/*! \brief Send and receive multiple messages (vector-of-buffers front end)
*
* Convenience overload: for every entry of prc it takes the matching buffer in
* data, records its raw pointer and its size in bytes
* (number of elements * sizeof(T::value_type)) and then forwards everything to
* the pointer-array overload of sendrecvMultipleMessagesNBX. All the processors
* must call this function.
*
* Example: the calling processor wants to send
* * 2 vectors of 100 integers to processor 1
* * 1 vector of 50 integers to processor 6
* * 1 vector of 48 integers to processor 7
* * 1 vector of 70 integers to processor 8
*
* \param prc list of destination processors, one entry per message [1,1,6,7,8]
*
* \param data the messages to send, one buffer per entry of prc
*        [v=vector<vector<int>>, v.size()=5, T=vector<int>]. At the moment T is
*        only tested for vectors of 0 or more generic elements (without pointers)
*
* \param msg_alloc call-back used to allocate space for an incoming message; it
*        must give back a valid pointer. Supposing the call-back is triggered by
*        processor 5 that wants to send a message of 100 byte, it receives, in order:
*        * message size required to receive the message (100)
*        * total message size to receive from all the processors (NBX does not provide this information)
*        * the total number of processors that want to communicate with you (NBX does not provide this information)
*        * processor id (5)
*        * ri request id (it is an id that goes from 0 to total_p, and is incremented
*          every time message_alloc is called)
*        * void pointer, parameter for additional data to pass to the call-back
*
* \param ptr_arg data passed to the call-back function specified
*
* \param opt options, only NONE supported (forwarded unchanged)
*
*/
template<typename T>
void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > & prc,
                                 openfpm::vector< T > & data,
                                 void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *),
                                 void * ptr_arg, long int opt=NONE)
{
#ifdef SE_CLASS1
    checkType<typename T::value_type>();
#endif

    // one (size,pointer) slot for every outgoing message
    sz_send.resize(prc.size());
    ptr_send.resize(prc.size());

    for (size_t msg = 0 ; msg < prc.size() ; ++msg)
    {
        // size is expressed in bytes, not in elements
        sz_send.get(msg) = sizeof(typename T::value_type) * data.get(msg).size();
        ptr_send.get(msg) = data.get(msg).getPointer();
    }

    // delegate to the raw-array overload
    sendrecvMultipleMessagesNBX(prc.size(),
                                (size_t *)sz_send.getPointer(),
                                (size_t *)prc.getPointer(),
                                (void **)ptr_send.getPointer(),
                                msg_alloc,
                                ptr_arg,
                                opt);
}
/*! \brief Send and receive multiple messages
*
* It send multiple messages to a set of processors the and receive
......@@ -544,7 +602,11 @@ public:
* \param opt options, NONE (ignored in this moment)
*
*/
void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] , void * ptr[], size_t n_recv, size_t prc_recv[] , size_t sz_recv[] ,void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg, long int opt=NONE)
void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] ,
void * ptr[], size_t n_recv, size_t prc_recv[] ,
size_t sz_recv[] ,
void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *),
void * ptr_arg, long int opt=NONE)
{
// Allocate the buffers
......@@ -564,60 +626,87 @@ public:
NBX_cnt = (NBX_cnt + 1) % 1024;
}
openfpm::vector<size_t> sz_recv_tmp;
/*! \brief Send and receive multiple messages
*
* It send multiple messages to a set of processors the and receive
* multiple messages from another set of processors, all the processor must call this
* function
* function. In this particular case the receiver know from which processor is going
* to receive, but does not know the size.
*
*
* suppose the following situation the calling processor want to communicate
* * 2 vector of 100 integers to processor 1
* * 1 vector of 50 integers to processor 6
* * 1 vector of 48 integers to processor 7
* * 1 vector of 70 integers to processor 8
* * 2 messages of size 100 byte to processor 1
* * 1 message of size 50 byte to processor 6
* * 1 message of size 48 byte to processor 7
* * 1 message of size 70 byte to processor 8
*
* \param prc list of processors you should communicate with [1,1,6,7,8]
* \param n_send number of send for this processor [4]
*
* \param data vector containing the data to send [v=vector<vector<int>>, v.size()=4, T=vector<int>], T at the moment
* is only tested for vectors of 0 or more generic elements (without pointers)
* \param prc list of processor with which it should communicate
* [1,1,6,7,8]
*
* \param msg_alloc This is a call-back with the purpose to allocate space
* for the incoming messages and give back a valid pointer, supposing that this call-back has been triggered by
* \param sz the array contain the size of the message for each processor
* (zeros must not be presents) [100,100,50,48,70]
*
* \param ptr array that contain the pointers to the message to send
*
* \param msg_alloc This is a call-back with the purpose of allocate space
* for the incoming message and give back a valid pointer, supposing that this call-back has been triggered by
* the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have
* the following 6 parameters
* in the call-back in order:
* * message size required to receive the message (100)
* in the call-back are in order:
* * message size required to receive the message [100]
* * total message size to receive from all the processors (NBX does not provide this information)
* * the total number of processor want to communicate with you (NBX does not provide this information)
* * processor id (5)
* * processor id [5]
* * ri request id (it is an id that goes from 0 to total_p, and is incremented
* every time message_alloc is called)
* * void pointer, parameter for additional data to pass to the call-back
*
* \param ptr_arg data passed to the call-back function specified
*
* \param opt options, only NONE supported
* \param opt options, NONE (ignored in this moment)
*
*/
template<typename T> void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > & prc, openfpm::vector< T > & data, void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg, long int opt=NONE)
void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] ,
void * ptr[], size_t n_recv, size_t prc_recv[] ,
void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *),
void * ptr_arg, long int opt=NONE)
{
#ifdef SE_CLASS1
checkType<typename T::value_type>();
#endif
// resize the pointer list
ptr_send.resize(prc.size());
sz_send.resize(prc.size());
sz_recv_tmp.resize(n_recv);
for (size_t i = 0 ; i < prc.size() ; i++)
// First we understand the receive size for each processor
for (size_t i = 0 ; i < n_send ; i++)
{send(prc[i],SEND_SPARSE + NBX_cnt,&sz[i],sizeof(size_t));}
for (size_t i = 0 ; i < n_recv ; i++)
{recv(prc_recv[i],SEND_SPARSE + NBX_cnt,&sz_recv_tmp.get(i),sizeof(size_t));}
execute();
// Circular counter
NBX_cnt = (NBX_cnt + 1) % 1024;
// Allocate the buffers
for (size_t i = 0 ; i < n_send ; i++)
{send(prc[i],SEND_SPARSE + NBX_cnt,ptr[i],sz[i]);}
for (size_t i = 0 ; i < n_recv ; i++)
{
ptr_send.get(i) = data.get(i).getPointer();
sz_send.get(i) = data.get(i).size() * sizeof(typename T::value_type);
}
void * ptr_recv = msg_alloc(sz_recv_tmp.get(i),0,0,prc_recv[i],i,ptr_arg);
sendrecvMultipleMessagesNBX(prc.size(),(size_t *)sz_send.getPointer(),(size_t *)prc.getPointer(),(void **)ptr_send.getPointer(),msg_alloc,ptr_arg,opt);
}
recv(prc_recv[i],SEND_SPARSE + NBX_cnt,ptr_recv,sz_recv_tmp.get(i));
}
execute();
// Circular counter
NBX_cnt = (NBX_cnt + 1) % 1024;
}
/*! \brief Send and receive multiple messages
*
......@@ -659,7 +748,10 @@ public:
* \param opt options, NONE (ignored in this moment)
*
*/
void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] , void * ptr[], void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg, long int opt = NONE)
void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] ,
void * ptr[],
void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *),
void * ptr_arg, long int opt = NONE)
{
if (stat.size() != 0 || req.size() != 0)
std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " this function must be called when no other requests are in progress. Please remember that if you use function like max(),sum(),send(),recv() check that you did not miss to call the function execute() \n";
......
......@@ -637,7 +637,7 @@ BOOST_AUTO_TEST_CASE (Vcluster_semantic_struct_scatter)
BOOST_AUTO_TEST_CASE (Vcluster_semantic_sendrecv)
BOOST_AUTO_TEST_CASE (Vcluster_semantic_sendrecv_all_unknown)
{
openfpm::vector<size_t> prc_recv2;
openfpm::vector<size_t> prc_recv3;
......@@ -738,6 +738,16 @@ BOOST_AUTO_TEST_CASE (Vcluster_semantic_sendrecv)
BOOST_REQUIRE_EQUAL(match,true);
}
}
BOOST_AUTO_TEST_CASE (Vcluster_semantic_sendrecv_receive_size_known)
{
openfpm::vector<size_t> prc_recv2;
openfpm::vector<size_t> prc_recv3;
openfpm::vector<size_t> sz_recv2;
openfpm::vector<size_t> sz_recv3;
for (size_t i = 0 ; i < 100 ; i++)
{
......@@ -806,6 +816,81 @@ BOOST_AUTO_TEST_CASE (Vcluster_semantic_sendrecv)
}
// Semantic SSendRecv test with the RECEIVE_KNOWN option.
// For 100 rounds every processor prepares one message per processor
// (message k holds the values 0 .. (k % SSCATTER_MAX)-1), exchanges them twice
// (flat receive into v2 with RECEIVE_KNOWN, vector-of-vectors receive into v3
// with the default option) and checks the received sizes and contents.
// The receive lists live outside the loop, so they are shared across rounds.
BOOST_AUTO_TEST_CASE (Vcluster_semantic_sendrecv_receive_known)
{
	openfpm::vector<size_t> prc_recv2;
	openfpm::vector<size_t> prc_recv3;

	openfpm::vector<size_t> sz_recv2;
	openfpm::vector<size_t> sz_recv3;

	for (size_t round = 0 ; round < 100 ; round++)
	{
		Vcluster & vcl = create_vcluster();
		const size_t rank = vcl.getProcessUnitID();

		if (rank == 0 && round == 0)
		{std::cout << "Semantic sendrecv test start" << std::endl;}

		const size_t n_proc = vcl.getProcessingUnits();

		// the test is skipped on 32 or more processors
		if (n_proc >= 32)
		{return;}

		openfpm::vector<size_t> prc_send;

		openfpm::vector<openfpm::vector<size_t>> v1;
		openfpm::vector<size_t> v2;
		openfpm::vector<openfpm::vector<size_t>> v3;

		v1.resize(n_proc);

		// expected total number of elements received in the flat vector v2
		const size_t n_cycles = n_proc / SSCATTER_MAX;
		const size_t rest = n_proc - n_cycles * SSCATTER_MAX;
		const size_t tail = ((rest-1) * rest) / 2;

		const size_t n_ele = n_cycles * SSCATTER_MAX * (SSCATTER_MAX - 1) / 2 + tail;

		// build the messages and the destination list
		for (size_t k = 0 ; k < v1.size() ; k++)
		{
			const size_t len = k % SSCATTER_MAX;

			for (size_t val = 0 ; val < len ; val++)
			{v1.get(k).add(val);}

			prc_send.add((k + rank) % n_proc);
		}

		vcl.SSendRecv(v1,v2,prc_send,prc_recv2,sz_recv2,RECEIVE_KNOWN);
		vcl.SSendRecv(v1,v3,prc_send,prc_recv3,sz_recv3);

		BOOST_REQUIRE_EQUAL(v2.size(),n_ele);

		const size_t nc_check = (n_proc-1) / SSCATTER_MAX;
		BOOST_REQUIRE_EQUAL(v3.size(),n_proc-1-nc_check);

		bool match = true;

		// v2: walk the flat buffer message by message using the received sizes
		size_t offset = 0;
		for (size_t m = 0 ; m < sz_recv2.size() ; m++)
		{
			const size_t m_len = sz_recv2.get(m);

			for (size_t e = 0 ; e < m_len ; e++)
			{
				if (v2.get(offset+e) != e)
				{match = false;}
			}

			offset += m_len;
		}

		BOOST_REQUIRE_EQUAL(match,true);

		// v3: every received vector must contain 0,1,2,...
		for (size_t m = 0 ; m < v3.size() ; m++)
		{
			for (size_t e = 0 ; e < v3.get(m).size() ; e++)
			{
				if (v3.get(m).get(e) != e)
				{match = false;}
			}
		}

		BOOST_REQUIRE_EQUAL(match,true);
	}
}
BOOST_AUTO_TEST_CASE (Vcluster_semantic_struct_sendrecv)
{
for (size_t i = 0 ; i < 100 ; i++)
......
......@@ -12,6 +12,9 @@
#include "VCluster_base.hpp"
#include "Vector/vector_test_util.hpp"
#define RECEIVE_UNKNOWN 1
#define RECEIVE_SIZE_UNKNOWN 2
#define NBX 1
#define N_TRY 2
......@@ -321,7 +324,7 @@ template<unsigned int ip> void test_known()
}
}
template<unsigned int ip> void test()
template<unsigned int ip> void test(unsigned int opt)
{
Vcluster & vcl = create_vcluster();
......@@ -388,10 +391,51 @@ template<unsigned int ip> void test()
recv_message.get(p_id).resize(j);
}
// Send and receive
vcl.sendrecvMultipleMessagesNBX(prc,message,msg_alloc,&recv_message);
if (opt == RECEIVE_UNKNOWN)
{
// Send and receive
vcl.sendrecvMultipleMessagesNBX(prc,message,msg_alloc,&recv_message);
}
//! [dsde]
else if (opt == RECEIVE_SIZE_UNKNOWN)
{
openfpm::vector<size_t> sz_send;
openfpm::vector<void *> ptr;
openfpm::vector<size_t> prc_recv;
sz_send.resize(prc.size());
ptr.resize(prc.size());
for (size_t i = 0 ; i < prc.size() ; i++)
{
sz_send.get(i) = message.get(i).size();
ptr.get(i) = &message.get(i).get(0);
}
// Calculate the receiving part
// Check the message
for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
{
long int p_id = vcl.getProcessUnitID() - i - 1;
if (p_id < 0)
{p_id += n_proc;}
else
{p_id = p_id % n_proc;}
if (p_id != (long int)vcl.getProcessUnitID())
{
prc_recv.add(p_id);
}
}
//! [dsde]
// Send and receive
vcl.sendrecvMultipleMessagesNBX(prc.size(),&sz_send.get(0),&prc.get(0),
&ptr.get(0),prc_recv.size(),&prc_recv.get(0),msg_alloc,&recv_message);
}
#ifdef VERBOSE_TEST
timer t;
......@@ -437,6 +481,9 @@ template<unsigned int ip> void test()
}
}
if (opt == RECEIVE_SIZE_UNKNOWN)
{return;}
std::srand(create_vcluster().getProcessUnitID());
std::default_random_engine eg;
std::uniform_int_distribution<int> d(0,n_proc/8);
......
......@@ -181,7 +181,7 @@ BOOST_AUTO_TEST_CASE( VCluster_use_sendrecv)
std::cout << "VCluster unit test start sendrecv" << "\n";
totp_check = false;
test<NBX>();
test<NBX>(RECEIVE_UNKNOWN);
totp_check = false;
test_no_send_some_peer<NBX>();
......@@ -189,6 +189,19 @@ BOOST_AUTO_TEST_CASE( VCluster_use_sendrecv)
std::cout << "VCluster unit test stop sendrecv" << "\n";
}
// Unit test: runs test<NBX> with the RECEIVE_SIZE_UNKNOWN option, followed by
// test_no_send_some_peer<NBX>, printing a start/stop banner around them.
// NOTE(review): the test-case name and the banners say "known size" while the
// option passed is RECEIVE_SIZE_UNKNOWN — confirm which wording is intended.
BOOST_AUTO_TEST_CASE( VCluster_use_sendrecv_size_known)
{
std::cout << "VCluster unit test start sendrecv known size" << "\n";
// totp_check is defined outside this block; it is cleared before each sub-test
totp_check = false;
test<NBX>(RECEIVE_SIZE_UNKNOWN);
totp_check = false;
test_no_send_some_peer<NBX>();
std::cout << "VCluster unit test stop sendrecv known size" << "\n";
}
BOOST_AUTO_TEST_CASE( VCluster_use_sendrecv_known)
{
std::cout << "VCluster unit test start known" << "\n";
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment