Commit c9fae2a0 authored by incardon

Fixing NBX and adding send and recv on vcluster

parent 52da4051
#include "ComUnit.hpp"
#define SERVICE_TAG 0xFFFFFFF
/*! \brief Send some data globally to one processor when the receiving side
 * does not know that a message is coming
 *
 * Send some data globally to one processor when the receiving side
 * does not know that a message is coming
 *
 * \warning if you already called this function with the same p, the previous request is overwritten
 *
 * \param p is the processor number
 * \param buf is the buffer pointer
 * \param sz is the size of the communication
 *
 */
bool ComUnit::SentToU(size_t p, void * buf, size_t sz)
{
	// before completing the communication we have to notify the other
	// processor that we have some data to send
	if (p >= comReq.size())
	{
		std::cerr << "Error: file: " << __FILE__ << " line: " << __LINE__ << " processor " << p << " does not exist" << "\n";
		return false;
	}

	// record the pending request; the payload itself is exchanged later, in wait()
	comReq[p] = 1;
	sizeReq[p] = sz;

	return true;
}
/*! \brief Send some data locally (to its neighborhood) to one processor
 *
 * Send some data locally to one processor
 *
 * \param buf is the buffer pointer
 * \param sz is the size of the communication
 *
 */
bool ComUnit::SendToNU(void * buf, size_t sz)
{
	return true;
}
/*! \brief Send some data globally to one processor when the receiving side
 * knows that a message is coming
 *
 * Send some data globally to one processor when the receiving side
 * knows that a message is coming
 *
 * \warning if you already called this function with the same p, the previous request is overwritten
 *
 * \param p is the processor number
 * \param buf is the buffer pointer
 * \param sz is the size of the communication
 *
 */
bool ComUnit::SendTo(size_t p, void * buf, size_t sz)
{
	// queue a non-blocking send; the matching receive is already posted on
	// the other side (tag 0 here, keeping SERVICE_TAG for the service traffic)
	reqs.push_back(MPI_Request());
	MPI_Isend(buf,(int)sz,MPI_BYTE,(int)p,0,MPI_COMM_WORLD,&reqs.back());
	return true;
}
/*! \brief Wait for all communication to complete
 *
 * Wait for all communication to complete
 *
 * \return true if no error occurs
 *
 */
bool ComUnit::wait()
{
	// Here we have two types of communication to handle
	// Type 1: one side does not know that the other side wants to communicate
	// Type 2: the other side knows that we want to communicate
	// The reqs structure holds the communications of type 2,
	// comReq and sizeReq store the requests of type 1

	// For type 2 we already have a recv coupled to each send, so just wait for completion
	stat.resize(reqs.size());
	MPI_Waitall((int)reqs.size(),&reqs[0],&stat[0]);

	// For type 1 we have to reduce-scatter comReq: this returns, for each
	// processor K, the number of processors that need to communicate with K
	unsigned int n_recv = 0;
	std::vector<int> recvcounts(comReq.size(),1);
	MPI_Reduce_scatter(&comReq[0],&n_recv,&recvcounts[0],MPI_UNSIGNED,MPI_SUM,MPI_COMM_WORLD);

	//! For the number of incoming requests queue an MPI_Irecv with MPI_ANY_SOURCE;
	//! it is going to receive the length of the message that each processor needs
	//! to communicate
	std::vector<size_t> in_len(n_recv);
	std::vector<MPI_Request> len_reqs;
	for (unsigned int i = 0 ; i < n_recv ; i++)
	{
		len_reqs.push_back(MPI_Request());
		MPI_Irecv(&in_len[i],(int)sizeof(size_t),MPI_BYTE,MPI_ANY_SOURCE,SERVICE_TAG,MPI_COMM_WORLD,&len_reqs.back());
	}
	//! For the number of outgoing requests queue an MPI_Isend sending the length
	//! of the message the processor needs to communicate
	for (size_t p = 0 ; p < comReq.size() ; p++)
	{
		if (comReq[p] == 0)
			continue;
		len_reqs.push_back(MPI_Request());
		MPI_Isend(&sizeReq[p],(int)sizeof(size_t),MPI_BYTE,(int)p,SERVICE_TAG,MPI_COMM_WORLD,&len_reqs.back());
	}
	//! wait for all length-exchange requests to complete
	std::vector<MPI_Status> len_stat(len_reqs.size());
	MPI_Waitall((int)len_reqs.size(),&len_reqs[0],&len_stat[0]);

	//! finally the data itself has to be sent and received with the same
	//! Isend/Irecv pattern; the payload buffers are not stored by this unit
	//! yet, so that exchange is still missing here

	return true;
}
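The reduce-scatter with a recvcounts array of all ones is the key step of the
sketch above: each processor contributes a 0/1 row of the request matrix and
gets back the sum of its own column, i.e. how many processors want to talk to
it. A minimal self-contained illustration (a hypothetical standalone file, not
part of this commit):

// count_senders.cpp - sketch of the reduce-scatter counting step;
// compile with mpic++, run with mpirun -n <N>
#include <mpi.h>
#include <vector>
#include <iostream>

int main(int argc, char ** argv)
{
	MPI_Init(&argc,&argv);

	int rank, nproc;
	MPI_Comm_rank(MPI_COMM_WORLD,&rank);
	MPI_Comm_size(MPI_COMM_WORLD,&nproc);

	// each rank wants to send one message, to the next rank in a ring
	std::vector<unsigned int> comReq(nproc,0);
	comReq[(rank + 1) % nproc] = 1;

	// every rank keeps exactly one element of the element-wise sum
	std::vector<int> recvcounts(nproc,1);
	unsigned int n_senders = 0;
	MPI_Reduce_scatter(&comReq[0],&n_senders,&recvcounts[0],MPI_UNSIGNED,MPI_SUM,MPI_COMM_WORLD);

	// in the ring pattern every rank is the target of exactly one sender
	std::cout << "rank " << rank << " will receive from " << n_senders << " rank(s)\n";

	MPI_Finalize();
	return 0;
}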
#ifndef COM_UNIT_HPP
#define COM_UNIT_HPP
#include <mpi.h>
#include <vector>
/*! \brief This is the abstraction of the communication
 * unit for the virtual cluster
 *
 * This is the abstraction of the communication
 * unit of the virtual cluster
 *
 * When this unit is returned back, you must ensure that no other thread
 * is issuing MPI calls
 *
 */
class ComUnit
{
	// if this processor needs to communicate with processor K
	// it puts 1 at position K
	std::vector<unsigned int> comReq;

	// if this processor needs to communicate a message of length m
	// to processor K, it puts m at position K
	std::vector<size_t> sizeReq;

	// list of all pending requests
	std::vector<MPI_Request> reqs;

	// list of the status of all the requests
	std::vector<MPI_Status> stat;

public:

	//! Send data to a processor that does not expect it
	bool SentToU(size_t p, void * buf, size_t sz);

	//! Send data to the neighborhood
	bool SendToNU(void * buf, size_t sz);

	//! Send data to a processor that expects it
	bool SendTo(size_t p, void * buf, size_t sz);

	//! wait for all communication to complete
	bool wait();
};
#endif
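A hypothetical call sequence for ComUnit (a sketch: it assumes the vectors are
sized to the number of processors by whoever constructs the unit):

#include "ComUnit.hpp"

void example(ComUnit & cu, size_t target, void * data, size_t len)
{
	// notify an unaware receiver that a message of length len is coming ...
	cu.SentToU(target,data,len);

	// ... or send directly when the receiver already posted its recv
	cu.SendTo(target,data,len);

	// complete all outstanding communications
	cu.wait();
}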
#ifndef MPI_IALLREDUCEW_HPP
#define MPI_IALLREDUCEW_HPP
#include <mpi.h>
#include <iostream>
#include <typeinfo>
/*! \brief Set of wrapping classes for MPI_Iallreduce
 *
 * The purpose of these classes is to choose the right MPI call based on the type we want to reduce
 *
 */
/*! \brief General reduction
 *
 * \tparam T any type
 *
 */
template<typename T> class MPI_IallreduceW
{
public:
static inline void reduce(T & buf,MPI_Op op, MPI_Request & req)
{
std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " cannot recognize " << typeid(T).name() << "\n";
}
};
/*! \brief specialization for integer
*
*/
template<> class MPI_IallreduceW<int>
{
public:
static inline void reduce(int & buf,MPI_Op op, MPI_Request & req)
{
MPI_Iallreduce(MPI_IN_PLACE, &buf, 1,MPI_INT, op, MPI_COMM_WORLD,&req);
}
};
/*! \brief specialization for unsigned integer
*
*/
template<> class MPI_IallreduceW<unsigned int>
{
public:
static inline void reduce(unsigned int & buf,MPI_Op op, MPI_Request & req)
{
MPI_Iallreduce(MPI_IN_PLACE, &buf, 1,MPI_UNSIGNED, op, MPI_COMM_WORLD,&req);
}
};
/*! \brief specialization for short
*
*/
template<> class MPI_IallreduceW<short>
{
public:
static inline void reduce(short & buf,MPI_Op op, MPI_Request & req)
{
MPI_Iallreduce(MPI_IN_PLACE, &buf, 1,MPI_SHORT, op, MPI_COMM_WORLD,&req);
}
};
/*! \brief specialization for unsigned short
 *
 */
template<> class MPI_IallreduceW<unsigned short>
{
public:
static inline void reduce(unsigned short & buf,MPI_Op op, MPI_Request & req)
{
MPI_Iallreduce(MPI_IN_PLACE, &buf, 1,MPI_UNSIGNED_SHORT, op, MPI_COMM_WORLD,&req);
}
};
/*! \brief specialization for char
*
*/
template<> class MPI_IallreduceW<char>
{
public:
static inline void reduce(char & buf,MPI_Op op, MPI_Request & req)
{
MPI_Iallreduce(MPI_IN_PLACE, &buf, 1,MPI_CHAR, op, MPI_COMM_WORLD,&req);
}
};
/*! \brief specialization for unsigned char
 *
 */
template<> class MPI_IallreduceW<unsigned char>
{
public:
static inline void reduce(unsigned char & buf,MPI_Op op, MPI_Request & req)
{
MPI_Iallreduce(MPI_IN_PLACE, &buf, 1,MPI_UNSIGNED_CHAR, op, MPI_COMM_WORLD,&req);
}
};
/*! \brief specialization for size_t
*
*/
template<> class MPI_IallreduceW<size_t>
{
public:
static inline void reduce(size_t & buf,MPI_Op op, MPI_Request & req)
{
MPI_Iallreduce(MPI_IN_PLACE, &buf, 1,MPI_UNSIGNED_LONG, op, MPI_COMM_WORLD,&req);
}
};
/*! \brief specialization for long int
 *
 */
template<> class MPI_IallreduceW<long int>
{
public:
static inline void reduce(long int & buf,MPI_Op op, MPI_Request & req)
{
MPI_Iallreduce(MPI_IN_PLACE, &buf, 1,MPI_LONG, op, MPI_COMM_WORLD,&req);
}
};
/*! \brief specialization for float
*
*/
template<> class MPI_IallreduceW<float>
{
public:
static inline void reduce(float & buf,MPI_Op op, MPI_Request & req)
{
MPI_Iallreduce(MPI_IN_PLACE, &buf, 1,MPI_FLOAT, op, MPI_COMM_WORLD,&req);
}
};
/*! \brief specialization for double
*
*/
template<> class MPI_IallreduceW<double>
{
public:
static inline void reduce(double & buf,MPI_Op op, MPI_Request & req)
{
MPI_Iallreduce(MPI_IN_PLACE, &buf, 1,MPI_DOUBLE, op, MPI_COMM_WORLD,&req);
}
};
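A minimal usage sketch for these wrappers (a hypothetical driver, assuming the
header is named MPI_IallreduceW.hpp): the template argument selects the MPI
datatype, and the caller completes the non-blocking reduction with MPI_Wait.

// iallreduce_example.cpp - compile with mpic++, run with mpirun -n <N>
#include <mpi.h>
#include <iostream>
#include "MPI_IallreduceW.hpp"

int main(int argc, char ** argv)
{
	MPI_Init(&argc,&argv);

	int rank;
	MPI_Comm_rank(MPI_COMM_WORLD,&rank);

	// every rank contributes its rank number; the wrapper selects MPI_INT
	int val = rank;
	MPI_Request req;
	MPI_IallreduceW<int>::reduce(val,MPI_SUM,req);

	// the reduction is non-blocking, so complete it before reading val
	MPI_Wait(&req,MPI_STATUS_IGNORE);
	std::cout << "rank " << rank << " sum = " << val << "\n";

	MPI_Finalize();
	return 0;
}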
////////////////// Specialization for vectors ///////////////
/*! \brief specialization for vector integer
*
*/
/*template<> class MPI_IallreduceW<openfpm::vector<int>>
{
public:
static inline void reduce(openfpm::vector<int> & buf,MPI_Op op, MPI_Request & req)
{
MPI_Iallreduce(MPI_IN_PLACE, &buf.get(0), buf.size(),MPI_INT, op, MPI_COMM_WORLD,&req);
}
};*/
/*! \brief specialization for vector short
*
*/
/*template<> class MPI_IallreduceW<openfpm::vector<short>>
{
public:
static inline void reduce(openfpm::vector<short> & buf,MPI_Op op, MPI_Request & req)
{
MPI_Iallreduce(MPI_IN_PLACE, &buf.get(0), buf.size(),MPI_SHORT, op, MPI_COMM_WORLD,&req);
}
};*/
/*! \brief specialization for vector char
*
*/
/*template<> class MPI_IallreduceW<openfpm::vector<char>>
{
public:
static inline void reduce(openfpm::vector<char> & buf,MPI_Op op, MPI_Request & req)
{
MPI_Iallreduce(MPI_IN_PLACE, &buf.get(0), buf.size(),MPI_CHAR, op, MPI_COMM_WORLD,&req);
}
};*/
/*! \brief specialization for vector size_t
*
*/
/*template<> class MPI_IallreduceW<openfpm::vector<size_t>>
{
public:
static inline void reduce(openfpm::vector<size_t> & buf,MPI_Op op, MPI_Request & req)
{
MPI_Iallreduce(MPI_IN_PLACE, &buf.get(0), buf.size(),MPI_UNSIGNED_LONG, op, MPI_COMM_WORLD,&req);
}
};*/
/*! \brief specialization for vector float
*
*/
/*template<> class MPI_IallreduceW<openfpm::vector<float>>
{
public:
static inline void reduce(openfpm::vector<float> & buf,MPI_Op op, MPI_Request & req)
{
MPI_Iallreduce(MPI_IN_PLACE, &buf.get(0), buf.size(),MPI_FLOAT, op, MPI_COMM_WORLD,&req);
}
};*/
/*! \brief specialization for vector double
*
*/
/*template<> class MPI_IallreduceW<openfpm::vector<double>>
{
public:
static inline void reduce(openfpm::vector<double> & buf,MPI_Op op, MPI_Request & req)
{
MPI_Iallreduce(MPI_IN_PLACE, &buf.get(0), buf.size(),MPI_DOUBLE, op, MPI_COMM_WORLD,&req);
}
};*/
#endif
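The vector specializations above are commented out because they depend on
openfpm::vector, which is not available to this unit yet. The same idea with
std::vector (a sketch under that substitution, not the author's container):

#include <vector>  // needed for the std::vector variant below

/*! \brief hypothetical vector specialization using std::vector
 *
 */
template<> class MPI_IallreduceW<std::vector<double>>
{
public:
	static inline void reduce(std::vector<double> & buf,MPI_Op op, MPI_Request & req)
	{
		// reduce element-wise across all processors, in place
		MPI_Iallreduce(MPI_IN_PLACE, &buf[0], (int)buf.size(),MPI_DOUBLE, op, MPI_COMM_WORLD,&req);
	}
};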
@@ -97,7 +97,7 @@ CONFIG_CLEAN_VPATH_FILES =
 am__installdirs = "$(DESTDIR)$(bindir)"
 PROGRAMS = $(bin_PROGRAMS)
 am_vcluster_OBJECTS = vcluster-main.$(OBJEXT) \
-	vcluster-VCluster.$(OBJEXT)
+	vcluster-VCluster.$(OBJEXT) vcluster-HeapMemory.$(OBJEXT)
 vcluster_OBJECTS = $(am_vcluster_OBJECTS)
 am__DEPENDENCIES_1 =
 am__DEPENDENCIES_2 = $(am__DEPENDENCIES_1) $(am__DEPENDENCIES_1)
@@ -284,7 +284,7 @@ top_build_prefix = ../
 top_builddir = ..
 top_srcdir = ..
 LINKLIBS = $(DEFAULT_LIB) $(PTHREAD_LIBS) $(OPT_LIBS) $(BOOST_LDFLAGS)
-vcluster_SOURCES = main.cpp VCluster.cpp
+vcluster_SOURCES = main.cpp VCluster.cpp ../../OpenFPM_devices/src/memory/HeapMemory.cpp
 vcluster_CXXFLAGS = $(INCLUDES_PATH) $(BOOST_CPPFLAGS)
 vcluster_CFLAGS = $(CUDA_CFLAGS)
 vcluster_LDADD = $(LINKLIBS)
@@ -375,6 +375,7 @@ mostlyclean-compile:
 distclean-compile:
 	-rm -f *.tab.c
+include ./$(DEPDIR)/vcluster-HeapMemory.Po
 include ./$(DEPDIR)/vcluster-VCluster.Po
 include ./$(DEPDIR)/vcluster-main.Po
@@ -420,6 +421,20 @@ vcluster-VCluster.obj: VCluster.cpp
 #	DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) \
 #	$(AM_V_CXX_no)$(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(vcluster_CXXFLAGS) $(CXXFLAGS) -c -o vcluster-VCluster.obj `if test -f 'VCluster.cpp'; then $(CYGPATH_W) 'VCluster.cpp'; else $(CYGPATH_W) '$(srcdir)/VCluster.cpp'; fi`
+vcluster-HeapMemory.o: ../../OpenFPM_devices/src/memory/HeapMemory.cpp
+	$(AM_V_CXX)$(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(vcluster_CXXFLAGS) $(CXXFLAGS) -MT vcluster-HeapMemory.o -MD -MP -MF $(DEPDIR)/vcluster-HeapMemory.Tpo -c -o vcluster-HeapMemory.o `test -f '../../OpenFPM_devices/src/memory/HeapMemory.cpp' || echo '$(srcdir)/'`../../OpenFPM_devices/src/memory/HeapMemory.cpp
+	$(AM_V_at)$(am__mv) $(DEPDIR)/vcluster-HeapMemory.Tpo $(DEPDIR)/vcluster-HeapMemory.Po
+#	$(AM_V_CXX)source='../../OpenFPM_devices/src/memory/HeapMemory.cpp' object='vcluster-HeapMemory.o' libtool=no \
+#	DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) \
+#	$(AM_V_CXX_no)$(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(vcluster_CXXFLAGS) $(CXXFLAGS) -c -o vcluster-HeapMemory.o `test -f '../../OpenFPM_devices/src/memory/HeapMemory.cpp' || echo '$(srcdir)/'`../../OpenFPM_devices/src/memory/HeapMemory.cpp
+vcluster-HeapMemory.obj: ../../OpenFPM_devices/src/memory/HeapMemory.cpp
+	$(AM_V_CXX)$(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(vcluster_CXXFLAGS) $(CXXFLAGS) -MT vcluster-HeapMemory.obj -MD -MP -MF $(DEPDIR)/vcluster-HeapMemory.Tpo -c -o vcluster-HeapMemory.obj `if test -f '../../OpenFPM_devices/src/memory/HeapMemory.cpp'; then $(CYGPATH_W) '../../OpenFPM_devices/src/memory/HeapMemory.cpp'; else $(CYGPATH_W) '$(srcdir)/../../OpenFPM_devices/src/memory/HeapMemory.cpp'; fi`
+	$(AM_V_at)$(am__mv) $(DEPDIR)/vcluster-HeapMemory.Tpo $(DEPDIR)/vcluster-HeapMemory.Po
+#	$(AM_V_CXX)source='../../OpenFPM_devices/src/memory/HeapMemory.cpp' object='vcluster-HeapMemory.obj' libtool=no \
+#	DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) \
+#	$(AM_V_CXX_no)$(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) $(vcluster_CXXFLAGS) $(CXXFLAGS) -c -o vcluster-HeapMemory.obj `if test -f '../../OpenFPM_devices/src/memory/HeapMemory.cpp'; then $(CYGPATH_W) '../../OpenFPM_devices/src/memory/HeapMemory.cpp'; else $(CYGPATH_W) '$(srcdir)/../../OpenFPM_devices/src/memory/HeapMemory.cpp'; fi`
 ID: $(am__tagged_files)
 	$(am__define_uniq_tagged_files); mkid -fID $$unique
 tags: tags-am
@@ -2,7 +2,7 @@
 LINKLIBS = $(DEFAULT_LIB) $(PTHREAD_LIBS) $(OPT_LIBS) $(BOOST_LDFLAGS)
 bin_PROGRAMS = vcluster
-vcluster_SOURCES = main.cpp VCluster.cpp
+vcluster_SOURCES = main.cpp VCluster.cpp ../../OpenFPM_devices/src/memory/HeapMemory.cpp
 vcluster_CXXFLAGS = $(INCLUDES_PATH) $(BOOST_CPPFLAGS)
 vcluster_CFLAGS = $(CUDA_CFLAGS)
 vcluster_LDADD = $(LINKLIBS)
//
// Project : ThreadPool
// File : TThread.cc
// Author : Ronald Kriemann
// Purpose : baseclass for a thread-able class
//
// arch-tag: aec71576-f8d5-4573-ad31-f3dbddc59934
//
#include <unistd.h>
#include <signal.h>
#include <time.h>
#include <sched.h>
#include <string.h>
#include <iostream>
#include <cmath>
#include "TThread.h"
//
// routine to call TThread::run() method
//
extern "C"
void *
_run_thread ( void *arg )
{
if (arg != NULL)
{
((TThread*) arg)->run();
((TThread*) arg)->reset_running();
}// if
return NULL;
}
////////////////////////////////////////////
//
// constructor and destructor
//
TThread::TThread ( const int thread_no )
: _running( false ), _thread_no(thread_no)
{
}
TThread::~TThread ()
{
// request cancellation of the thread if running
if ( _running )
cancel();
}
////////////////////////////////////////////
//
// access local data
//
void
TThread::set_thread_no ( const int n )
{
_thread_no = n;
}
////////////////////////////////////////////
//
// thread management
//
//
// create thread (actually start it)
//
void
TThread::create ( const bool detached, const bool sscope )
{
if ( ! _running )
{
int status;
if ((status = pthread_attr_init( & _thread_attr )) != 0)
{
std::cerr << "(TThread) create : pthread_attr_init ("
<< strerror( status ) << ")" << std::endl;
return;
}// if
if ( detached )
{
// detach the created thread from the calling thread
if ((status = pthread_attr_setdetachstate( & _thread_attr,
PTHREAD_CREATE_DETACHED )) != 0)
{
std::cerr << "(TThread) create : pthread_attr_setdetachstate ("
<< strerror( status ) << ")" << std::endl;
return;
}// if
}// if
if ( sscope )
{
// use system-wide scheduling for thread
if ((status = pthread_attr_setscope( & _thread_attr, PTHREAD_SCOPE_SYSTEM )) != 0 )
{
std::cerr << "(TThread) create : pthread_attr_setscope ("
<< strerror( status ) << ")" << std::endl;
return;
}// if
}// if
#if defined(_POSIX_THREAD_PRIORITY_SCHEDULING) && defined(SUNOS)
//
// adjust thread-scheduling for Solaris
//
struct sched_param t_param;
t_param.sched_priority = sched_get_priority_min( SCHED_RR );
if ((status = pthread_attr_setschedpolicy( & _thread_attr, SCHED_RR )) != 0)
std::cerr << "(TThread) create : pthread_attr_setschedpolicy ("
<< strerror( status ) << ")" << std::endl;
if ((status = pthread_attr_setschedparam( & _thread_attr, & t_param )) != 0)
std::cerr << "(TThread) create : pthread_attr_setschedparam ("
<< strerror( status ) << ")" << std::endl;
if ((status = pthread_attr_setinheritsched( & _thread_attr, PTHREAD_EXPLICIT_SCHED )) != 0)
std::cerr << "(TThread) create : pthread_attr_setinheritsched ("
<< strerror( status ) << ")" << std::endl;
#endif
#ifdef HPUX
// on HP-UX we increase the stack-size for a stable behaviour
// (need much memory for this !!!)
pthread_attr_setstacksize( & _thread_attr, 32 * 1024 * 1024 );
#endif
if ((status = pthread_create( & _thread_id, & _thread_attr, _run_thread, this )) != 0)
std::cerr << "(TThread) create : pthread_create ("
<< strerror( status ) << ")" << std::endl;
else
_running = true;
}// if
else
std::cout << "(TThread) create : thread is already running" << std::endl;
}
//
// detach thread
//
void
TThread::detach ()
{
if ( _running )
{
int status;
// detach thread
if ((status = pthread_detach( _thread_id )) != 0)
std::cerr << "(TThread) detach : pthread_detach ("
<< strerror( status ) << ")" << std::endl;
}// if
}
//
// synchronise with thread (wait until finished)
//
void
TThread::join ()
{
if ( _running )
{
int status;
// wait for thread to finish
if ((status = pthread_join( _thread_id, NULL )) != 0)
std::cerr << "(TThread) join : pthread_join ("
<< strerror( status ) << ")" << std::endl;
_running = false;
}// if
}
//
// request cancellation of thread
//
void
TThread::cancel ()
{
if ( _running )
{
int status;
if ((status = pthread_cancel( _thread_id )) != 0)
std::cerr << "(TThread) cancel : pthread_cancel ("
<< strerror( status ) << ")" << std::endl;
}// if
}
////////////////////////////////////////////
//
// functions to be called by a thread itself
//
//
// terminate thread
//
void
TThread::exit ()
{
if ( _running && (pthread_self() == _thread_id))
{
void * ret_val = NULL;
// pthread_exit does not return, so clear the flag first
_running = false;
pthread_exit( ret_val );
}// if
}
//
// put thread to sleep (seconds, with nanosecond resolution)
//
void
TThread::sleep ( const double sec )
{
if ( _running )
{
struct timespec interval;
if ( sec <= 0.0 )
{
interval.tv_sec = 0;
interval.tv_nsec = 0;
}// if
else
{
interval.tv_sec = time_t( std::floor( sec ) );
// convert the fractional part of the second to nanoseconds
interval.tv_nsec = long( (sec - interval.tv_sec) * 1e9 );
}// else
nanosleep( & interval, 0 );
}// if
}
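A hypothetical subclass showing the intended usage (a sketch: it assumes
TThread.h declares a virtual run() method, as the _run_thread trampoline
above implies):

// thread_example.cpp
#include <iostream>
#include "TThread.h"

class PrintThread : public TThread
{
public:
    PrintThread ( const int no ) : TThread( no ) {}

    // body executed by _run_thread once the thread is created
    virtual void run ()
    {
        std::cout << "hello from thread " << thread_no() << std::endl;
    }
};

int main ()
{
    PrintThread t( 0 );

    t.create( false, false );  // joinable, process-local scheduling scope
    t.join();                  // wait until run() has finished

    return 0;
}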
#ifndef __TTHREAD_HH
#define __TTHREAD_HH
//
// Project : ThreadPool
// File : TThread.hh
// Author : Ronald Kriemann
// Purpose : baseclass for a thread-able class
//
// arch-tag: d09c570a-520a-48ce-b612-a813b50e87b4
//
#include <stdio.h>
#include <pthread.h>
#ifdef HAVE_CUDA
#include <cuda.h>
#endif
////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////
//
// baseclass for all threaded classes
// - defines basic interface
//
////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////
class TThread
{
protected:
// thread-specific things
pthread_t _thread_id;
pthread_attr_t _thread_attr;
// is the thread running or not
bool _running;
// number of this thread
int _thread_no;
public:
////////////////////////////////////////////
//
// constructor and destructor
//
TThread ( const int thread_no = -1 );
virtual ~TThread ();
////////////////////////////////////////////
//
// access local data
//
int thread_no () const { return _thread_no; }
int proc_no () const { return _thread_no; }
void set_thread_no ( const int n );
// compare if given proc-no is local one