Commit 81e452b8 authored by Yaroslav

Merge branch 'master' into gather_with_packer

parents fedede3d a874640c
@@ -2,6 +2,7 @@
# Make a directory in /tmp/openfpm_data
echo "$PATH"
echo "Directory: $1"
echo "Machine: $2"
@@ -16,13 +17,11 @@ git clone git@ppmcore.mpi-cbg.de:/incardon/openfpm_data.git openfpm_data
cd "$1/openfpm_vcluster"
if [ "$2" == "gin" ]
then
if [ "$2" == "gin" ]; then
echo "Compiling on gin\n"
module load gcc/4.9.2
module load openmpi/1.8.1
elif [ "$2" == "wetcluster" ]
then
elif [ "$2" == "wetcluster" ]; then
echo "Compiling on wetcluster"
## produce the module path
@@ -59,8 +58,7 @@ exit(0)\n"
# if [ $? -ne 0 ]; then exit 1 ; fi
# bsub -o output_run32.%J -K -n 128 "module load openmpi/1.8.1 ; module load gcc/4.9.2; mpirun -np 128 ./src/vcluster"
# if [ $? -ne 0 ]; then exit 1 ; fi
elif [ "$2" == "taurus" ]
then
elif [ "$2" == "taurus" ]; then
echo "Compiling on taurus"
echo "$PATH"
@@ -92,6 +90,9 @@ then
if [ $? -ne 0 ]; then exit 1 ; fi
else
source $HOME/.bashrc
echo "$PATH"
echo "Compiling general"
sh ./autogen.sh
sh ./configure CXX=mpic++
@@ -99,6 +100,8 @@ else
mpirun -np 2 ./src/vcluster
if [ $? -ne 0 ]; then exit 1 ; fi
mpirun -np 3 ./src/vcluster
if [ $? -ne 0 ]; then exit 1 ; fi
mpirun -np 4 ./src/vcluster
if [ $? -ne 0 ]; then exit 1 ; fi
fi
......
@@ -19,6 +19,7 @@ m4_ifdef([AX_BOOST_IOSTREAMS],,[m4_include([m4/ax_boost_iostreams.m4])])
m4_ifdef([AX_BOOST_PROGRAM_OPTIONS],,[m4_include([m4/ax_boost_program_options.m4])])
m4_ifdef([AX_BOOST_UNIT_TEST_FRAMEWORK],,[m4_include([m4/ax_boost_unit_test_framework.m4])])
case $host_os in
*cygwin*)
# Do something specific for cygwin
@@ -101,6 +102,7 @@ else
NVCCFLAGS="$NVCCFLAGS -O3 "
fi
####### include openfpm_devices include path
INCLUDES_PATH+=" -I/usr/local/include -I. -Iconfig -I../../openfpm_devices/src -I../../openfpm_data/src"
......
LINKLIBS = $(DEFAULT_LIB) $(PTHREAD_LIBS) $(OPT_LIBS) $(BOOST_LDFLAGS)
LINKLIBS = $(DEFAULT_LIB) $(PTHREAD_LIBS) $(OPT_LIBS) $(HDF5_LDFLAGS) $(HDF5_LIBS) $(BOOST_LDFLAGS)
noinst_PROGRAMS = vcluster
vcluster_SOURCES = main.cpp VCluster.cpp ../../openfpm_devices/src/memory/HeapMemory.cpp ../../openfpm_devices/src/memory/PtrMemory.cpp
vcluster_CXXFLAGS = $(INCLUDES_PATH) $(BOOST_CPPFLAGS)
vcluster_CXXFLAGS = $(INCLUDES_PATH) $(BOOST_CPPFLAGS)
vcluster_CFLAGS = $(CUDA_CFLAGS)
vcluster_LDADD = $(LINKLIBS)
@@ -18,3 +18,7 @@ util/Vcluster_log.hpp
.cu.o :
$(NVCC) $(NVCCFLAGS) -o $@ -c $<
test: vcluster
source $(HOME)/openfpm_vars && cd .. && mpirun -np 3 ./src/vcluster && mpirun -np 4 ./src/vcluster
@@ -4,3 +4,4 @@ Vcluster * global_v_cluster_private = NULL;
// number of vcluster instances
size_t n_vcluster = 0;
bool ofp_initialized = false;
@@ -19,6 +19,10 @@
#include "util/Vcluster_log.hpp"
#include "memory/BHeapMemory.hpp"
#ifdef HAVE_PETSC
#include <petscvec.h>
#endif
#define MSG_LENGTH 1024
#define MSG_SEND_RECV 1025
#define SEND_SPARSE 4096
@@ -33,6 +37,8 @@
extern size_t n_vcluster;
// Global MPI initialization
extern bool global_mpi_init;
// initialization flag
extern bool ofp_initialized;
///////////////////// Post functions /////////////
@@ -949,19 +955,13 @@ public:
*/
void execute()
{
int err = 0;
// nothing to wait for if there are no pending requests
if (req.size() == 0)
return;
// Wait for all the requests
stat.resize(req.size());
err = MPI_Waitall(req.size(),&req.get(0),&stat.get(0));
// MPI error get the message and abort MPI
if (err != MPI_SUCCESS)
MPI_Abort(MPI_COMM_WORLD,1);
MPI_SAFE_CALL(MPI_Waitall(req.size(),&req.get(0),&stat.get(0)));
// Remove executed request and status
req.clear();
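
The hand-written error check above is replaced by MPI_SAFE_CALL, whose definition is not part of this diff. A minimal sketch of such a wrapper macro, assuming it keeps the abort-on-failure behaviour of the code it replaces:

    #include <mpi.h>
    #include <iostream>

    // Hypothetical sketch: the real MPI_SAFE_CALL is defined elsewhere in
    // openfpm_vcluster. It runs an MPI call and aborts the communicator
    // whenever the call does not return MPI_SUCCESS.
    #define MPI_SAFE_CALL(call)                                      \
    {                                                                \
        int err = (call);                                            \
        if (err != MPI_SUCCESS)                                      \
        {                                                            \
            std::cerr << "MPI error " << err << " at " << __FILE__   \
                      << ":" << __LINE__ << std::endl;               \
            MPI_Abort(MPI_COMM_WORLD,1);                             \
        }                                                            \
    }
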
@@ -1006,6 +1006,16 @@ static inline Vcluster & create_vcluster()
return *global_v_cluster_private;
}
/*! \brief Check if the library has been initialized
*
* \return true if the library has been initialized
*
*/
static inline bool is_openfpm_init()
{
return ofp_initialized;
}
/*! \brief Initialize the library
*
* This function MUST be called before any other function
@@ -1013,7 +1023,14 @@ static inline Vcluster & create_vcluster()
*/
static inline void openfpm_init(int *argc, char ***argv)
{
#ifdef HAVE_PETSC
PetscInitialize(argc,argv,NULL,NULL);
#endif
init_global_v_cluster_private(argc,argv);
ofp_initialized = true;
}
/*! \brief Finalize the library
@@ -1023,7 +1040,14 @@ static inline void openfpm_init(int *argc, char ***argv)
*/
static inline void openfpm_finalize()
{
#ifdef HAVE_PETSC
PetscFinalize();
#endif
delete_global_v_cluster_private();
ofp_initialized = false;
}
#endif
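
Together, the additions above define the library lifecycle: openfpm_init brings up PETSc (when enabled) and the global Vcluster, is_openfpm_init reports the state, and openfpm_finalize tears everything down. A minimal usage sketch built only from the functions shown in this diff; the include name is an assumption:

    #include <iostream>
    #include "VCluster.hpp"   // assumed include for this header

    int main(int argc, char* argv[])
    {
        // initialize the library (and PETSc, when compiled with HAVE_PETSC)
        openfpm_init(&argc,&argv);

        if (is_openfpm_init())
        {
            // get (or lazily create) the global Vcluster instance
            Vcluster & vcl = create_vcluster();
            std::cout << "Rank " << vcl.getProcessUnitID()
                      << " of " << vcl.getProcessingUnits() << std::endl;
        }

        // finalize PETSc (if enabled) and release the global Vcluster
        openfpm_finalize();
        return 0;
    }
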
......
@@ -17,6 +17,8 @@ void reset_recv_buf()
{
for (size_t i = 0 ; i < recv_buf.size() ; i++)
recv_buf.get(i).resize(0);
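// also empty the outer vector, so buffers from a previous communication are not picked up again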
recv_buf.resize(0);
}
/*! \brief Base info
@@ -78,8 +80,11 @@ static void * msg_alloc(size_t msg_i ,size_t total_msg, size_t total_p, size_t i
* \param recv receive object
* \param sz if not NULL, filled on return with the number of elements received in each buffer
*
*/
template<typename T, typename S> void process_receive_buffer(S & recv)
template<typename T, typename S> void process_receive_buffer(S & recv, openfpm::vector<size_t> * sz = NULL)
{
if (sz != NULL)
sz->resize(recv_buf.size());
for (size_t i = 0 ; i < recv_buf.size() ; i++)
{
// for each received buffer create a memory representation
@@ -90,7 +95,7 @@ template<typename T, typename S> void process_receive_buffer(S & recv)
PtrMemory * ptr1 = new PtrMemory(recv_buf.get(i).getPointer(),recv_buf.get(i).size());
// create vector representation to a piece of memory already allocated
openfpm::vector<typename T::value_type,PtrMemory,openfpm::grow_policy_identity> v2;
openfpm::vector<typename T::value_type,PtrMemory,typename memory_traits_lin<typename T::value_type>::type, memory_traits_lin,openfpm::grow_policy_identity> v2;
v2.setMemory(*ptr1);
@@ -99,6 +104,9 @@ template<typename T, typename S> void process_receive_buffer(S & recv)
// Merge the information
recv.add(v2);
if (sz != NULL)
sz->get(i) = v2.size();
}
}
@@ -191,7 +199,7 @@ template<typename T, typename S> bool SGather(T & send, S & recv, openfpm::vecto
sz.get(i) /= sizeof(typename T::value_type);
// process the received information
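// passing &sz lets process_receive_buffer record the exact number of elements merged from each received buffer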
process_receive_buffer<T,S>(recv);
process_receive_buffer<T,S>(recv,&sz);
recv.add(send);
prc.add(root);
@@ -275,7 +283,7 @@ template<typename T, typename S> bool SScatter(T & send, S & recv, openfpm::vect
sendrecvMultipleMessagesNBX(prc.size(),(size_t *)sz_byte.getPointer(),(size_t *)prc.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi);
// process the received information
process_receive_buffer<T,S>(recv);
process_receive_buffer<T,S>(recv,NULL);
}
else
{
@@ -288,10 +296,71 @@ template<typename T, typename S> bool SScatter(T & send, S & recv, openfpm::vect
// Send and recv multiple messages
sendrecvMultipleMessagesNBX(send_req.size(),NULL,NULL,NULL,msg_alloc,&bi);
process_receive_buffer<T,S>(recv);
process_receive_buffer<T,S>(recv,NULL);
}
return true;
}
/*! \brief Semantic send and receive: send data to a set of processors and receive data from other processors
*
* Semantic communications differ from the normal ones. In general they
* follow this model:
*
* SSendRecv(T,S,...,op=add);
*
* "SendRecv" indicates the communication pattern, or how the information flows.
* T is the object to send, S is the object that will receive the data.
* In order to work, S must implement the interface S.add(T).
*
* ### Example scatter a vector of structures, to other processors
* \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master
*
* \tparam T type of sending object
* \tparam S type of receiving object
*
* \param send vector of objects to send, one per destination processor
* \param recv object that will accumulate the received data
* \param prc_send processors to send to
* \param prc_recv filled on return with the processors data was received from
* \param sz_recv filled on return with the number of elements received from each processor
*
* \return true if the function completed successfully
*
*/
template<typename T, typename S> bool SSendRecv(openfpm::vector<T> & send, S & recv, openfpm::vector<size_t> & prc_send, openfpm::vector<size_t> & prc_recv, openfpm::vector<size_t> & sz_recv)
{
// Reset the receive buffer
reset_recv_buf();
#ifdef SE_CLASS1
if (send.size() != prc_send.size())
std::cerr << __FILE__ << ":" << __LINE__ << " Error, the number of processors involved \"prc_send.size()\" must match the number of sending buffers \"send.size()\" " << std::endl;
#endif
// Prepare the sending buffer
openfpm::vector<const void *> send_buf;
openfpm::vector<size_t> sz_byte;
sz_byte.resize(send.size());
for (size_t i = 0; i < send.size() ; i++)
{
send_buf.add((char *)send.get(i).getPointer());
sz_byte.get(i) = send.get(i).size() * sizeof(typename T::value_type);
}
// receive information
base_info bi(&recv_buf,prc_recv,sz_recv);
// Send and recv multiple messages
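// msg_alloc is called back once per incoming message to allocate its buffer; bi carries the output vectors (prc_recv, sz_recv) that it fills in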
sendrecvMultipleMessagesNBX(prc_send.size(),(size_t *)sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi);
// process the received information
process_receive_buffer<T,S>(recv,&sz_recv);
return true;
}
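
A compact usage sketch of the new SSendRecv, modelled on the unit tests below; the payload (every rank sends its own id to every rank) is purely illustrative:

    Vcluster & vcl = create_vcluster();

    openfpm::vector<openfpm::vector<size_t>> send;  // one buffer per destination
    openfpm::vector<size_t> prc_send;               // destination ranks
    openfpm::vector<size_t> recv;                   // flattened received elements
    openfpm::vector<size_t> prc_recv;               // filled: source ranks
    openfpm::vector<size_t> sz_recv;                // filled: elements per source

    send.resize(vcl.getProcessingUnits());
    for (size_t i = 0 ; i < send.size() ; i++)
    {
        send.get(i).add(vcl.getProcessUnitID());    // payload: our own rank id
        prc_send.add(i);                            // buffer i goes to rank i
    }

    vcl.SSendRecv(send,recv,prc_send,prc_recv,sz_recv);
    // recv should now hold one element per rank, and every sz_recv entry should be 1
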
@@ -234,6 +234,173 @@ BOOST_AUTO_TEST_CASE (Vcluster_semantic_gather_ptst)
BOOST_AUTO_TEST_CASE (Vcluster_semantic_sendrecv)
{
for (size_t i = 0 ; i < 100 ; i++)
{
Vcluster & vcl = create_vcluster();
if (vcl.getProcessingUnits() >= 32)
return;
openfpm::vector<size_t> prc_recv2;
openfpm::vector<size_t> prc_recv3;
openfpm::vector<size_t> prc_send;
openfpm::vector<size_t> sz_recv2;
openfpm::vector<size_t> sz_recv3;
openfpm::vector<openfpm::vector<size_t>> v1;
openfpm::vector<size_t> v2;
openfpm::vector<openfpm::vector<size_t>> v3;
v1.resize(vcl.getProcessingUnits());
size_t nc = vcl.getProcessingUnits() / SSCATTER_MAX;
size_t nr = vcl.getProcessingUnits() - nc * SSCATTER_MAX;
nr = ((nr-1) * nr) / 2;
size_t n_ele = nc * SSCATTER_MAX * (SSCATTER_MAX - 1) / 2 + nr;
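// Added note: rank i sends i % SSCATTER_MAX elements, so the expected flattened
// total is nc full cycles summing 0+1+...+(SSCATTER_MAX-1) plus the tail sum over
// the remaining ranks; nc, nr and n_ele above compute exactly that.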
for(size_t i = 0 ; i < v1.size() ; i++)
{
for (size_t j = 0 ; j < i % SSCATTER_MAX ; j++)
v1.get(i).add(j);
prc_send.add((i + vcl.getProcessUnitID()) % vcl.getProcessingUnits());
}
vcl.SSendRecv(v1,v2,prc_send,prc_recv2,sz_recv2);
vcl.SSendRecv(v1,v3,prc_send,prc_recv3,sz_recv3);
BOOST_REQUIRE_EQUAL(v2.size(),n_ele);
BOOST_REQUIRE_EQUAL(v3.size(),vcl.getProcessingUnits()-1-nc);
bool match = true;
size_t s = 0;
for (size_t i = 0 ; i < sz_recv2.size() ; i++)
{
for (size_t j = 0 ; j < sz_recv2.get(i) % SSCATTER_MAX ; j++)
{
match &= v2.get(s+j) == j;
}
s += sz_recv2.get(i) % SSCATTER_MAX;
}
BOOST_REQUIRE_EQUAL(match,true);
for (size_t i = 0 ; i < sz_recv3.size() ; i++)
{
for (size_t j = 0 ; j < sz_recv3.get(i) % SSCATTER_MAX ; j++)
{
match &= v3.get(i).get(j) == j;
}
}
BOOST_REQUIRE_EQUAL(match,true);
}
}
BOOST_AUTO_TEST_CASE (Vcluster_semantic_struct_sendrecv)
{
for (size_t i = 0 ; i < 100 ; i++)
{
Vcluster & vcl = create_vcluster();
if (vcl.getProcessingUnits() >= 32)
return;
openfpm::vector<size_t> prc_recv2;
openfpm::vector<size_t> prc_recv3;
openfpm::vector<size_t> prc_send;
openfpm::vector<size_t> sz_recv2;
openfpm::vector<size_t> sz_recv3;
openfpm::vector<openfpm::vector<Box<3,size_t>>> v1;
openfpm::vector<Box<3,size_t>> v2;
openfpm::vector<openfpm::vector<Box<3,size_t>>> v3;
v1.resize(vcl.getProcessingUnits());
size_t nc = vcl.getProcessingUnits() / SSCATTER_MAX;
size_t nr = vcl.getProcessingUnits() - nc * SSCATTER_MAX;
nr = ((nr-1) * nr) / 2;
size_t n_ele = nc * SSCATTER_MAX * (SSCATTER_MAX - 1) / 2 + nr;
for(size_t i = 0 ; i < v1.size() ; i++)
{
for (size_t j = 0 ; j < i % SSCATTER_MAX ; j++)
{
Box<3,size_t> b({j,j,j},{j,j,j});
v1.get(i).add(b);
}
prc_send.add((i + vcl.getProcessUnitID()) % vcl.getProcessingUnits());
}
vcl.SSendRecv(v1,v2,prc_send,prc_recv2,sz_recv2);
vcl.SSendRecv(v1,v3,prc_send,prc_recv3,sz_recv3);
BOOST_REQUIRE_EQUAL(v2.size(),n_ele);
BOOST_REQUIRE_EQUAL(v3.size(),vcl.getProcessingUnits()-1-nc);
bool match = true;
size_t s = 0;
for (size_t i = 0 ; i < sz_recv2.size() ; i++)
{
for (size_t j = 0 ; j < sz_recv2.get(i) % SSCATTER_MAX ; j++)
{
Box<3,size_t> b({j,j,j},{j,j,j});
Box<3,size_t> bt = v2.get(s+j);
match &= bt == b;
}
s += sz_recv2.get(i) % SSCATTER_MAX;
}
BOOST_REQUIRE_EQUAL(match,true);
for (size_t i = 0 ; i < sz_recv3.size() ; i++)
{
for (size_t j = 0 ; j < sz_recv3.get(i) % SSCATTER_MAX ; j++)
{
Box<3,size_t> b({j,j,j},{j,j,j});
Box<3,size_t> bt = v3.get(i).get(j);
match &= bt == b;
}
}
BOOST_REQUIRE_EQUAL(match,true);
}
// Send and receive 0 and check
{
Vcluster & vcl = create_vcluster();
openfpm::vector<size_t> prc_recv2;
openfpm::vector<size_t> prc_send;
openfpm::vector<size_t> sz_recv2;
openfpm::vector<openfpm::vector<Box<3,size_t>>> v1;
openfpm::vector<Box<3,size_t>> v2;
v1.resize(vcl.getProcessingUnits());
for(size_t i = 0 ; i < v1.size() ; i++)
{
prc_send.add((i + vcl.getProcessUnitID()) % vcl.getProcessingUnits());
}
vcl.SSendRecv(v1,v2,prc_send,prc_recv2,sz_recv2);
BOOST_REQUIRE_EQUAL(v2.size(),0ul);
BOOST_REQUIRE_EQUAL(prc_recv2.size(),0ul);
BOOST_REQUIRE_EQUAL(sz_recv2.size(),0ul);
}
}
BOOST_AUTO_TEST_SUITE_END()
#endif /* OPENFPM_VCLUSTER_SRC_VCLUSTER_SEMANTIC_UNIT_TESTS_HPP_ */