Commit 57a5391e authored by incardon's avatar incardon
Browse files

Renaming the old pattern PCX and creating the NBX

parent 028ce9f4
...@@ -191,7 +191,7 @@ CUDA_CFLAGS = -I -I -I/usr/local/cuda-5.5/include ...@@ -191,7 +191,7 @@ CUDA_CFLAGS = -I -I -I/usr/local/cuda-5.5/include
CUDA_LIBS = -L -L -L/usr/local/cuda-5.5/lib64 -lcuda -lcudart CUDA_LIBS = -L -L -L/usr/local/cuda-5.5/lib64 -lcuda -lcudart
CXX = mpic++ CXX = mpic++
CXXDEPMODE = depmode=gcc3 CXXDEPMODE = depmode=gcc3
CXXFLAGS = --std=c++11 -march=native -mtune=native -Wno-unused-local-typedefs -Wextra -Wno-unused-parameter -Wall -O3 -g3 -funroll-loops CXXFLAGS = --std=c++11 -march=native -mtune=native -Wno-unused-local-typedefs -Wextra -Wno-unused-parameter -g3 -Wall -O0
CYGPATH_W = echo CYGPATH_W = echo
DEFAULT_LIB = -lrt DEFAULT_LIB = -lrt
DEFS = -DHAVE_CONFIG_H DEFS = -DHAVE_CONFIG_H
...@@ -213,7 +213,7 @@ LTLIBOBJS = ...@@ -213,7 +213,7 @@ LTLIBOBJS =
MAKEINFO = ${SHELL} /home/i-bird/Desktop/MOSAIC/OpenFPM_project/OpenFPM_vcluster/missing makeinfo MAKEINFO = ${SHELL} /home/i-bird/Desktop/MOSAIC/OpenFPM_project/OpenFPM_vcluster/missing makeinfo
MKDIR_P = /usr/bin/mkdir -p MKDIR_P = /usr/bin/mkdir -p
NVCC = /usr/local/cuda-5.5/bin/nvcc NVCC = /usr/local/cuda-5.5/bin/nvcc
NVCCFLAGS = -O3 NVCCFLAGS = -g -O0
NVCC_EXIST = yes NVCC_EXIST = yes
OBJEXT = o OBJEXT = o
PACKAGE = full-package-name PACKAGE = full-package-name
......
...@@ -7,10 +7,11 @@ ...@@ -7,10 +7,11 @@
#include "Vector/map_vector.hpp" #include "Vector/map_vector.hpp"
#include "MPI_IallreduceW.hpp" #include "MPI_IallreduceW.hpp"
#include <exception> #include <exception>
#include <Vector/map_vector.hpp> #include "Vector/map_vector.hpp"
#define MSG_LENGTH 1024 #define MSG_LENGTH 1024
#define MSG_SEND_RECV 1025 #define MSG_SEND_RECV 1025
#define SEND_SPARSE 1026
#define NONE 1 #define NONE 1
#define NEED_ALL_SIZE 2 #define NEED_ALL_SIZE 2
...@@ -241,21 +242,105 @@ public: ...@@ -241,21 +242,105 @@ public:
// sending map // sending map
openfpm::vector<size_t> map; openfpm::vector<size_t> map;
// Distributed processor graph
MPI_Comm proc_comm_graph;
/*! \brief
*
* Set the near processors of this processor
*
*/
openfpm::vector<size_t> NN_proc;
void setLocality(openfpm::vector<size_t> NN_proc)
{
	// Number of sources in the graph, and the source processor ids
	size_t sources = NN_proc.size();
	openfpm::vector<int> src_proc;

	// number of destinations in the graph, and the destination processor ids
	size_t dest = NN_proc.size();
	openfpm::vector<int> dest_proc;

	// Make room for the copy of the neighborhood list: the original code
	// wrote through this->NN_proc.get(i) on a never-resized vector, an
	// out-of-bounds access
	this->NN_proc.resize(NN_proc.size());

	// insert in sources and out destinations (the neighborhood graph is symmetric)
	for (size_t i = 0 ; i < NN_proc.size() ; i++)
	{
		src_proc.add(NN_proc.get(i));
		dest_proc.add(NN_proc.get(i));

		// Copy the vector
		this->NN_proc.get(i) = NN_proc.get(i);
	}

	// Do not take the address of element 0 of an empty vector
	int * src_ptr = (src_proc.size() != 0) ? &src_proc.get(0) : (int *)NULL;
	int * dst_ptr = (dest_proc.size() != 0) ? &dest_proc.get(0) : (int *)NULL;

	// Create the distributed-graph communicator restricted to the neighborhood
	// (wrapped in MPI_SAFE_CALL for consistency with the other MPI calls in this class)
	MPI_SAFE_CALL(MPI_Dist_graph_create_adjacent(MPI_COMM_WORLD,sources,src_ptr,(const int *)MPI_UNWEIGHTED,dest,dst_ptr,(const int *)MPI_UNWEIGHTED,MPI_INFO_NULL,true,&proc_comm_graph));
}
/*! \brief Send and receive multiple messages within local processors
*
* It send multiple messages and receive
* other multiple messages, all the processor must call this
* function
*
* \param prc list of processors with which it should communicate
*
* \param v vector containing the data to send
*
* \param msg_alloc This is a call-back with the purpose of allocate space
* for the incoming message and give back a valid pointer, the 6 parameters
* in the call-back are in order:
* 1) message size required to receive from i
* 2) total message size to receive from all the processors
* 3) the total number of processor want to communicate with you
* 4) processor id
* 5) ri request id (it is an id that goes from 0 to total_p, and is unique
* every time message_alloc is called)
* 6) void pointer parameter for additional data to pass to the call-back
*
* \param opt options, NONE or NEED_ALL_SIZE, with NEED_ALL_SIZE the allocation
* callback will not be called until all the message size will be
* gathered, [useful for example when you want to allocate one big buffer
* to gather all the messages]
*
*/
template<typename T> void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > & prc, openfpm::vector< T > & data, void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg, long int opt=NONE)
{
	// One entry per destination processor
	size_t n_dest = prc.size();

	// resize the pointer and size lists accordingly
	ptr_send.resize(n_dest);
	sz_send.resize(n_dest);

	// For each destination record the raw payload pointer and its size in bytes
	size_t i = 0;
	while (i < n_dest)
	{
		ptr_send.get(i) = data.get(i).getPointer();
		sz_send.get(i) = sizeof(typename T::value_type) * data.get(i).size();
		i++;
	}

	// Delegate to the low-level (raw pointer) NBX primitive
	sendrecvMultipleMessagesNBX(n_dest,(size_t *)sz_send.getPointer(),(size_t *)prc.getPointer(),(void **)ptr_send.getPointer(),msg_alloc,ptr_arg,opt);
}
/*! \brief Send and receive multiple messages /*! \brief Send and receive multiple messages
* *
* It send multiple (to more than one) messages and receive * It send multiple messages and receive
* other multiple messages, all the processor must call this * other multiple messages, all the processor must call this
* function * function
* *
* \param prc list of processors with which it should communicate * \param prc list of processors with which it should communicate
* *
* \param nn_prc near processors
*
* \param v vector containing the data to send * \param v vector containing the data to send
* *
* \param msg_alloc This is a call-back with the purpose of allocate space * \param msg_alloc This is a call-back with the purpose of allocate space
* for the incoming message and give back a valid pointer, the 3 parameters * for the incoming message and give back a valid pointer, the 6 parameters
* in the call-back are , total message to receive, i processor id from witch * in the call-back are in order:
* to receive * 1) message size required to receive from i
* 2) total message size to receive from all the processors
* 3) the total number of processor want to communicate with you
* 4) processor id
* 5) ri request id (it is an id that goes from 0 to total_p, and is unique
* every time message_alloc is called)
* 6) void pointer parameter for additional data to pass to the call-back
* *
* \param opt options, NONE or NEED_ALL_SIZE, with NEED_ALL_SIZE the allocation * \param opt options, NONE or NEED_ALL_SIZE, with NEED_ALL_SIZE the allocation
* callback will not be called until all the message size will be * callback will not be called until all the message size will be
...@@ -264,7 +349,7 @@ public: ...@@ -264,7 +349,7 @@ public:
* *
*/ */
template<typename T> void sendrecvMultipleMessages(openfpm::vector< size_t > & prc, openfpm::vector< T > & data, void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg, long int opt=NONE) template<typename T> void sendrecvMultipleMessagesPCX(openfpm::vector< size_t > & prc, openfpm::vector< T > & data, void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg, long int opt=NONE)
{ {
// resize map with the number of processors // resize map with the number of processors
map.resize(size); map.resize(size);
...@@ -286,7 +371,109 @@ public: ...@@ -286,7 +371,109 @@ public:
sz_send.get(i) = data.get(i).size() * sizeof(typename T::value_type); sz_send.get(i) = data.get(i).size() * sizeof(typename T::value_type);
} }
sendrecvMultipleMessages(prc.size(),(size_t *)map.getPointer(),(size_t *)sz_send.getPointer(),(size_t *)prc.getPointer(),(void **)ptr_send.getPointer(),msg_alloc,ptr_arg,opt); sendrecvMultipleMessagesPCX(prc.size(),(size_t *)map.getPointer(),(size_t *)sz_send.getPointer(),(size_t *)prc.getPointer(),(void **)ptr_send.getPointer(),msg_alloc,ptr_arg,opt);
}
/*! \brief Send and receive multiple messages local
*
* It sends multiple messages to the near processors and receives
* other multiple messages from them; all the processors must call this
* function
*
* \param n_send number of send this processor must do
*
* \param sz the array contain the size of the message for each processor
* (zeros must be omitted)
*
* [Example] for the previous pattern 5 10 15 4 means processor 1
* message size 5 bytes, processor 6 message size 10, ......
*
* \param prc list of processor with which it should communicate
* [Example] for the previous case should be
* 1 6 7 8 (prc and mp contain the same information in different
* format, giving both reduce the computation)
*
* \param ptr array that contain the messages pointers
*
* \param msg_alloc This is a call-back with the purpose of allocate space
* for the incoming message and give back a valid pointer, the 6 parameters
* in the call-back are in order:
* 1) message size required to receive from i
* 2) total message size to receive from all the processors
* 3) the total number of processor want to communicate with you
* 4) processor id
* 5) ri request id (it is an id that goes from 0 to total_p, and is unique
* every time message_alloc is called)
* 6) void pointer parameter for additional data to pass to the call-back
*
* \param opt options, NONE (ignored in this moment)
*
*/
void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] , void * ptr[], void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg, long int opt)
{
	req.clear();

	// Post one non-blocking synchronous send per destination. MPI_Issend
	// completes only once the matching receive has started, which is what
	// lets the asynchronous barrier below detect global termination (NBX)
	for (size_t i = 0 ; i < n_send ; i++)
	{
		req.add();
		MPI_SAFE_CALL(MPI_Issend(ptr[i], sz[i], MPI_BYTE, prc[i], SEND_SPARSE, MPI_COMM_WORLD,&req.last()));
	}

	// request id handed to the allocation call-back, unique per received message
	size_t rid = 0;

	// becomes true when the async barrier has completed on all processors
	int flag = false;

	// flag that notifies that this processor reached the barrier, and the barrier request
	bool reached_bar_req = false;
	MPI_Request bar_req;

	// Probe/receive loop: drain incoming messages until every processor
	// has completed its sends and passed the asynchronous barrier
	do
	{
		MPI_Status stat_t;
		// renamed from 'stat': the original local shadowed the member 'stat' cleared below
		int have_msg = false;
		MPI_SAFE_CALL(MPI_Iprobe(MPI_ANY_SOURCE,SEND_SPARSE,MPI_COMM_WORLD,&have_msg,&stat_t));

		// If I have received a message
		if (have_msg == true)
		{
			// Get the message size
			int msize;
			MPI_SAFE_CALL(MPI_Get_count(&stat_t,MPI_BYTE,&msize));

			// Get the pointer to receive the message into (renamed from
			// 'ptr', which shadowed the parameter ptr[])
			void * ptr_recv = msg_alloc(msize,0,0,stat_t.MPI_SOURCE,rid,ptr_arg);
			rid++;

			MPI_SAFE_CALL(MPI_Recv(ptr_recv,msize,MPI_BYTE,stat_t.MPI_SOURCE,SEND_SPARSE,MPI_COMM_WORLD,&stat_t));
		}

		// Once all the local MPI_Issend completed, enter the non-blocking barrier
		if (reached_bar_req == false)
		{
			// renamed from 'flag': the original local shadowed the loop-exit flag
			int send_done = false;

			// Guard the empty-send case: &req.get(0) on an empty vector is out of bounds
			if (req.size() == 0)
				send_done = true;
			else
				MPI_SAFE_CALL(MPI_Testall(req.size(),&req.get(0),&send_done,MPI_STATUSES_IGNORE));

			// If all sends have been matched, signal it with an async barrier
			if (send_done == true)
			{MPI_SAFE_CALL(MPI_Ibarrier(MPI_COMM_WORLD,&bar_req));reached_bar_req = true;}
		}

		// Check if all processors reached the async barrier.
		// MPI_Test takes a single status, so the correct constant is
		// MPI_STATUS_IGNORE (the original passed MPI_STATUSES_IGNORE);
		// also wrapped in MPI_SAFE_CALL like every other call here
		if (reached_bar_req)
		{MPI_SAFE_CALL(MPI_Test(&bar_req,&flag,MPI_STATUS_IGNORE));}

	} while (flag == false);

	// Remove the executed requests and statuses
	req.clear();
	stat.clear();
}
/*! \brief Send and receive multiple messages /*! \brief Send and receive multiple messages
...@@ -328,7 +515,7 @@ public: ...@@ -328,7 +515,7 @@ public:
* *
*/ */
void sendrecvMultipleMessages(size_t n_send, size_t * map, size_t sz[], size_t prc[] , void * ptr[], void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg, long int opt) void sendrecvMultipleMessagesPCX(size_t n_send, size_t * map, size_t sz[], size_t prc[] , void * ptr[], void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg, long int opt)
{ {
req.clear(); req.clear();
req.add(); req.add();
......
...@@ -13,55 +13,10 @@ ...@@ -13,55 +13,10 @@
#include <boost/test/included/unit_test.hpp> #include <boost/test/included/unit_test.hpp>
#include "timer.hpp" #include "timer.hpp"
#include <random> #include <random>
#include "VCluster_unit_test_util.hpp"
#define VERBOSE_TEST
#define N_TRY 2
#define N_LOOP 67108864
#define BUFF_STEP 524288
BOOST_AUTO_TEST_SUITE( VCluster_test ) BOOST_AUTO_TEST_SUITE( VCluster_test )
size_t global_step = 0;
size_t global_rank;
// Alloc the buffer to receive the messages
void * msg_alloc(size_t msg_i ,size_t total_msg, size_t total_p, size_t i,size_t ri, void * ptr)
{
openfpm::vector<openfpm::vector<unsigned char>> * v = static_cast<openfpm::vector<openfpm::vector<unsigned char>> *>(ptr);
if (global_v_cluster->getProcessingUnits() <= 8)
BOOST_REQUIRE_EQUAL(total_p,global_v_cluster->getProcessingUnits()-1);
else
BOOST_REQUIRE_EQUAL(total_p,8);
BOOST_REQUIRE_EQUAL(msg_i, global_step);
v->get(i).resize(msg_i);
return &(v->get(i).get(0));
}
// Alloc the buffer to receive the messages
size_t id = 0;
openfpm::vector<size_t> prc_recv;
void * msg_alloc2(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, void * ptr)
{
openfpm::vector<openfpm::vector<unsigned char>> * v = static_cast<openfpm::vector<openfpm::vector<unsigned char>> *>(ptr);
v->resize(total_p);
prc_recv.resize(total_p);
BOOST_REQUIRE_EQUAL(msg_i, global_step);
id++;
v->get(id-1).resize(msg_i);
prc_recv.get(id-1) = i;
return &(v->get(id-1).get(0));
}
BOOST_AUTO_TEST_CASE( VCluster_use_reductions) BOOST_AUTO_TEST_CASE( VCluster_use_reductions)
{ {
init_global_v_cluster(&boost::unit_test::framework::master_test_suite().argc,&boost::unit_test::framework::master_test_suite().argv); init_global_v_cluster(&boost::unit_test::framework::master_test_suite().argc,&boost::unit_test::framework::master_test_suite().argv);
...@@ -151,310 +106,10 @@ BOOST_AUTO_TEST_CASE( VCluster_use_sendrecv) ...@@ -151,310 +106,10 @@ BOOST_AUTO_TEST_CASE( VCluster_use_sendrecv)
std::cout << "VCluster unit test start" << "\n"; std::cout << "VCluster unit test start" << "\n";
init_global_v_cluster(&boost::unit_test::framework::master_test_suite().argc,&boost::unit_test::framework::master_test_suite().argv); init_global_v_cluster(&boost::unit_test::framework::master_test_suite().argc,&boost::unit_test::framework::master_test_suite().argv);
Vcluster & vcl = *global_v_cluster; totp_check = true;
test<PCX>();
// send/recv messages totp_check = false;
test<NBX>();
global_rank = vcl.getProcessUnitID();
size_t n_proc = vcl.getProcessingUnits();
// Checking short communication pattern
for (size_t s = 0 ; s < N_TRY ; s++)
{
for (size_t j = 32 ; j < N_LOOP ; j*=2)
{
global_step = j;
// send message
openfpm::vector<openfpm::vector<unsigned char>> message;
// recv message
openfpm::vector<openfpm::vector<unsigned char>> recv_message(n_proc);
openfpm::vector<size_t> prc;
for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
{
size_t p_id = (i + 1 + vcl.getProcessUnitID()) % n_proc;
if (p_id != vcl.getProcessUnitID())
{
prc.add(p_id);
message.add();
std::ostringstream msg;
msg << "Hello from " << vcl.getProcessUnitID() << " to " << p_id;
std::string str(msg.str());
message.last().resize(j);
memset(message.last().getPointer(),0,j);
std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0)));
}
}
recv_message.resize(n_proc);
// The pattern is not really random preallocate the receive buffer
for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
{
long int p_id = vcl.getProcessUnitID() - i - 1;
if (p_id < 0)
p_id += n_proc;
else
p_id = p_id % n_proc;
if (p_id != (long int)vcl.getProcessUnitID())
recv_message.get(p_id).resize(j);
}
#ifdef VERBOSE_TEST
timer t;
t.start();
#endif
vcl.sendrecvMultipleMessages(prc,message,msg_alloc,&recv_message);
#ifdef VERBOSE_TEST
t.stop();
double clk = t.getwct();
double clk_max = clk;
size_t size_send_recv = 2 * j * (prc.size());
vcl.reduce(size_send_recv);
vcl.max(clk_max);
vcl.execute();
if (vcl.getProcessUnitID() == 0)
std::cout << "(Short pattern: )Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max <<"\n";
#endif
// Check the message
for (size_t i = 0 ; i < 8 && i < n_proc ; i++)
{
long int p_id = vcl.getProcessUnitID() - i - 1;
if (p_id < 0)
p_id += n_proc;
else
p_id = p_id % n_proc;
if (p_id != (long int)vcl.getProcessUnitID())
{
std::ostringstream msg;
msg << "Hello from " << p_id << " to " << vcl.getProcessUnitID();
std::string str(msg.str());
BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(p_id).get(0))),true);
}
else
{
BOOST_REQUIRE_EQUAL(0,recv_message.get(p_id).size());
}
}
}
std::srand(global_v_cluster->getProcessUnitID());
std::default_random_engine eg;
std::uniform_int_distribution<int> d(0,n_proc/8);
// Check random pattern (maximum 16 processors)
for (size_t j = 32 ; j < N_LOOP && n_proc < 16 ; j*=2)
{
global_step = j;
// original send
openfpm::vector<size_t> o_send;
// send message
openfpm::vector<openfpm::vector<unsigned char>> message;
// recv message
openfpm::vector<openfpm::vector<unsigned char>> recv_message;
openfpm::vector<size_t> prc;
for (size_t i = 0 ; i < n_proc ; i++)
{
// randomly with witch processor communicate
if (d(eg) == 0)
{
prc.add(i);
o_send.add(i);
message.add();
std::ostringstream msg;
msg << "Hello from " << vcl.getProcessUnitID() << " to " << i;
std::string str(msg.str());
message.last().resize(str.size());
std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0)));
message.last().resize(j);
}
}
id = 0;
prc_recv.clear();
#ifdef VERBOSE_TEST
timer t;
t.start();
#endif
vcl.sendrecvMultipleMessages(prc,message,msg_alloc2,&recv_message);
#ifdef VERBOSE_TEST
t.stop();
double clk = t.getwct();
double clk_max = clk;
size_t size_send_recv = (prc.size() + recv_message.size()) * j;
vcl.reduce(size_send_recv);
vcl.reduce(clk);
vcl.max(clk_max);
vcl.execute();
clk /= vcl.getProcessingUnits();
if (vcl.getProcessUnitID() == 0)
std::cout << "(Random Pattern: ) Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max << "\n";
#endif
// Check the message
for (size_t i = 0 ; i < recv_message.size() ; i++)
{
std::ostringstream msg;
msg << "Hello from " << prc_recv.get(i) << " to " << vcl.getProcessUnitID();
std::string str(msg.str());
BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(i).get(0))),true);
}
// Reply back
// Create the message
prc.clear();
message.clear();
for (size_t i = 0 ; i < prc_recv.size() ; i++)
{
prc.add(prc_recv.get(i));
message.add();
std::ostringstream msg;
msg << "Hey from " << vcl.getProcessUnitID() << " to " << prc_recv.get(i);
std::string str(msg.str());
message.last().resize(str.size());
std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0)));
message.last().resize(j);
}
id = 0;
prc_recv.clear();
recv_message.clear();
vcl.sendrecvMultipleMessages(prc,message,msg_alloc2,&recv_message);
// Check if the received hey message match the original send
BOOST_REQUIRE_EQUAL(o_send.size(),prc_recv.size());
for (size_t i = 0 ; i < o_send.size() ; i++)
{
size_t j = 0;
for ( ; j < prc_recv.size() ; j++)
{
if (o_send.get(i) == prc_recv.get(j))
{
// found the message check it
std::ostringstream msg;
msg << "Hey from " << prc_recv.get(i) << " to " << vcl.getProcessUnitID();
std::string str(msg.str());
BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(i).get(0))),true);
break;
}
}
// Check that we find always a match
BOOST_REQUIRE_EQUAL(j != prc_recv.size(),true);
}
}
// Check long communication pattern
for (size_t j = 32 ; j < N_LOOP ; j*=2)
{
global_step = j;
// Processor step
long int ps = n_proc / (8 + 1);
// send message
openfpm::vector<openfpm::vector<unsigned char>> message;
// recv message
openfpm::vector<openfpm::vector<unsigned char>> recv_message(n_proc);
openfpm::vector<size_t> prc;