Commit cba7b8f2 authored by Pietro Incardona

Refactoring of mappings and ghost_get for vector_dist

parent d28562ea
@@ -45,6 +45,7 @@ ChangeLog
INSTALL
NEWS
README
**/vtk/Makefile
**/src/Makefile
./Makefile
Makefile.in
@@ -71,3 +72,4 @@ config.sub
depcomp
install-sh
missing
install_dir
openfpm_data @ b8378915
Subproject commit 5e0680ec6f7e0e39269356315571ac0580caad35
Subproject commit b8378915a8c9f49d02e7deebbe2ef1ab76fcc054
openfpm_devices @ 52aa2cff
Subproject commit 919638297d5c92749400c69c4a271a2d742cf594
Subproject commit 52aa2cffd29ad7e4fc306e4f58fd2be2e596fd81
@@ -86,7 +86,7 @@ private:
Vcluster & v_cl;
// definition of the send vector for position
typedef openfpm::vector<Point<dim,St>,ExtPreAlloc<Memory>> send_pos_vector;
typedef openfpm::vector<Point<dim,St>,ExtPreAlloc<Memory>,openfpm::grow_policy_identity> send_pos_vector;
//////////////////////////////
// COMMUNICATION variables
@@ -96,7 +96,10 @@ private:
openfpm::vector<size_t> p_map_req;
//! For each near processor, outgoing particle id and shift vector
openfpm::vector<openfpm::vector<std::pair<size_t,size_t>>> opart;
openfpm::vector<openfpm::vector<size_t>> opart;
//! For each near processor, particle shift vector
openfpm::vector<openfpm::vector<size_t>> oshift;
//! For each adjacent processor the size of the ghost sending buffer
openfpm::vector<size_t> ghost_prc_sz;
@@ -110,32 +113,40 @@ private:
//! For each adjacent processor it stores the size of the receiving message in bytes
openfpm::vector<size_t> recv_sz;
//! For each adjacent processot it store the received message
//! For each adjacent processor it stores the received message for ghost get
openfpm::vector<HeapMemory> recv_mem_gg;
//! Receive buffer for global communication
HeapMemory hp_recv;
//! For each message contain the processor from which processor come from
openfpm::vector<size_t> v_proc;
//! For each processor it stores the received message for the global map
openfpm::vector<HeapMemory> recv_mem_gm;
//! Total size of the received buffer
size_t recv_cnt;
/*! \brief It stores, for each processor, the position and property vectors of the particles
*
*
*/
struct pos_prop
{
//! position vector
openfpm::vector<Point<dim,St>,PreAllocHeapMemory<2>,openfpm::grow_policy_identity> pos;
//! properties vector
openfpm::vector<prop,PreAllocHeapMemory<2>,openfpm::grow_policy_identity> prp;
};
/*! \brief Label particles for mappings
*
* \param lbl_p Labeled particles
* \param p_map processor map (for each processor 1=comunication required 0=no communication)
* \param prc_sz processor send buffer size (number of particles)
* \param prc_sz For each processor the number of particles to send
* \param opart id of the particles to send
*
*/
void labelParticleMap(openfpm::vector<size_t> & lbl_p, openfpm::vector<size_t> & p_map, openfpm::vector<size_t> & prc_sz, openfpm::vector<size_t> & opart)
void labelParticleProcessor(openfpm::vector<openfpm::vector<size_t>> & lbl_p, openfpm::vector<size_t> & prc_sz, openfpm::vector<size_t> & opart)
{
// reset lbl_p
lbl_p.resize(v_cl.getProcessingUnits());
for (size_t i = 0 ; i < lbl_p.size() ; i++)
lbl_p.get(i).clear();
// resize the label buffer
prc_sz.resize(v_cl.getProcessingUnits());
p_map.resize(v_cl.getProcessingUnits());
lbl_p.resize(v_pos.size());
auto it = v_pos.getIterator();
@@ -149,14 +160,11 @@ private:
size_t p_id = dec.processorIDBC(v_pos.get(key));
lbl_p.get(key) = p_id;
// Particle toe move
// Particle to move
if (p_id != v_cl.getProcessUnitID())
{
p_map.get(p_id) = 1;
prc_sz.get(p_id)++;
lbl_p.get(p_id).add(key);
opart.add(key);
}
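For context, a minimal sketch of the per-processor labeling that labelParticleProcessor now performs, written with std:: containers instead of the openfpm types; label_particles, processor_of, n_procs and my_rank are illustrative names, not part of the library:

    #include <cstddef>
    #include <vector>

    // For each processor, collect the ids of the particles that have to be
    // sent there; particles already owned by this rank stay where they are.
    std::vector<std::vector<std::size_t>> label_particles(std::size_t n_procs,
                                                          std::size_t my_rank,
                                                          const std::vector<std::size_t>& processor_of)
    {
        std::vector<std::vector<std::size_t>> lbl(n_procs);
        for (std::size_t key = 0; key < processor_of.size(); key++)
        {
            std::size_t p_id = processor_of[key];   // destination of particle key
            if (p_id != my_rank)                    // particle has to move
                lbl[p_id].push_back(key);
        }
        return lbl;                                 // lbl[p].size() plays the role of prc_sz
    }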
@@ -170,8 +178,14 @@ private:
*
* It counts the number of particles to send to each processor and saves their ids
*
* \param prc_sz For each processor the number of particles to send
* \param opart ids of the particles to send
* \param shift_id shift correction id
*
* \see nn_prcs::getShiftvectors()
*
*/
void labelParticles()
void labelParticlesGhost()
{
// Buffer that contain the number of elements to send for each processor
ghost_prc_sz.clear();
@@ -181,6 +195,10 @@ private:
opart.clear();
opart.resize(dec.getNNProcessors());
// Buffer that contain for each processor the id of the shift vector
oshift.clear();
oshift.resize(dec.getNNProcessors());
// Iterate over all particles
auto it = v_pos.getIterator();
while (it.isNext())
@@ -199,7 +217,8 @@ private:
// add particle to communicate
ghost_prc_sz.get(p_id)++;
opart.get(p_id).add(std::pair<size_t,size_t>(key,vp_id.get(i).second));
opart.get(p_id).add(key);
oshift.get(p_id).add(vp_id.get(i).second);
}
++it;
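The ghost labeling now keeps two parallel vectors (opart and oshift) instead of one vector of pairs. A small sketch of the same structure-of-arrays idea, using std containers and an illustrative record_ghost helper:

    #include <cstddef>
    #include <utility>
    #include <vector>

    // Before: one entry per ghost particle, packed as a pair.
    std::vector<std::pair<std::size_t, std::size_t>> opart_pairs;

    // After: two parallel vectors, indexed the same way.
    std::vector<std::size_t> opart;   // particle ids to send
    std::vector<std::size_t> oshift;  // shift-vector id used for each of them

    void record_ghost(std::size_t key, std::size_t shift_id)
    {
        // old style: opart_pairs.push_back({key, shift_id});
        opart.push_back(key);          // entry j of opart ...
        oshift.push_back(shift_id);    // ... matches entry j of oshift
    }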
@@ -314,7 +333,7 @@ private:
* \param prAlloc_pos Memory object for the send buffer
*
*/
void fill_send_pos_buf(openfpm::vector<send_pos_vector> & g_pos_send, ExtPreAlloc<Memory> * prAlloc_pos)
void fill_send_ghost_pos_buf(openfpm::vector<send_pos_vector> & g_pos_send, ExtPreAlloc<Memory> * prAlloc_pos)
{
// get the shift vectors
const openfpm::vector<Point<dim,St>> & shifts = dec.getShiftVectors();
@@ -335,8 +354,8 @@ private:
{
for (size_t j = 0 ; j < opart.get(i).size() ; j++)
{
Point<dim,St> s = v_pos.get(opart.get(i).get(j).first);
s -= shifts.get(opart.get(i).get(j).second);
Point<dim,St> s = v_pos.get(opart.get(i).get(j));
s -= shifts.get(oshift.get(i).get(j));
g_pos_send.get(i).set(j,s);
}
}
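A compact sketch of the position-filling step above: each ghost copy is the particle position translated by minus the shift vector selected through oshift. Plain arrays stand in for Point<dim,St>, and fill_ghost_positions is an illustrative name:

    #include <array>
    #include <cstddef>
    #include <vector>

    constexpr std::size_t dim = 3;
    using point = std::array<double, dim>;

    // Build the send positions: s = v_pos[opart[j]] - shifts[oshift[j]].
    std::vector<point> fill_ghost_positions(const std::vector<point>& v_pos,
                                            const std::vector<point>& shifts,
                                            const std::vector<std::size_t>& opart,
                                            const std::vector<std::size_t>& oshift)
    {
        std::vector<point> send(opart.size());
        for (std::size_t j = 0; j < opart.size(); j++)
            for (std::size_t d = 0; d < dim; d++)
                send[j][d] = v_pos[opart[j]][d] - shifts[oshift[j]][d];
        return send;
    }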
@@ -352,7 +371,7 @@ private:
* \param prAlloc_prp Memory object for the send buffer
*
*/
template<typename send_vector,typename prp_object, int... prp> void fill_send_prp_buf(openfpm::vector<send_vector> & g_send_prp, ExtPreAlloc<Memory> * prAlloc_prp)
template<typename send_vector,typename prp_object, int... prp> void fill_send_ghost_prp_buf(openfpm::vector<send_vector> & g_send_prp, ExtPreAlloc<Memory> * prAlloc_prp)
{
// create a number of send buffers equal to the near processors
g_send_prp.resize(ghost_prc_sz.size());
@@ -376,7 +395,58 @@ private:
typedef encapc<1,prp_object,typename openfpm::vector<prp_object>::memory_conf> encap_dst;
// Copy only the selected properties
object_si_d<encap_src,encap_dst,OBJ_ENCAP,prp...>(v_prp.get(opart.get(i).get(j).first),g_send_prp.get(i).get(j));
object_si_d<encap_src,encap_dst,OBJ_ENCAP,prp...>(v_prp.get(opart.get(i).get(j)),g_send_prp.get(i).get(j));
}
}
}
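object_si_d is used here to copy only the requested properties into the smaller send object. A hand-written analogue of that selection, with hypothetical full_prop and sent_prop types:

    #include <cstddef>
    #include <vector>

    struct full_prop { double velocity[3]; double force[3]; int flag; };
    struct sent_prop { double velocity[3]; };   // only what the ghost needs

    // Copy just the selected members, leaving the rest out of the send buffer.
    inline void select_for_send(const full_prop& src, sent_prop& dst)
    {
        for (int d = 0; d < 3; d++)
            dst.velocity[d] = src.velocity[d];
    }

    void fill_prp_send(const std::vector<full_prop>& v_prp,
                       const std::vector<std::size_t>& opart,
                       std::vector<sent_prop>& g_send_prp)
    {
        g_send_prp.resize(opart.size());
        for (std::size_t j = 0; j < opart.size(); j++)
            select_for_send(v_prp[opart[j]], g_send_prp[j]);
    }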
/*! \brief allocate and fill the send buffer for the map function
*
* \param prc_r List of the processor ranks involved in the send
* \param prc_sz_r For each processor in the list, the size of the message to send
* \param pb send buffer
*
*/
void fill_send_map_buf(openfpm::vector<size_t> & prc_r, openfpm::vector<size_t> & prc_sz_r, openfpm::vector<pos_prop> & pb)
{
pb.resize(prc_r.size());
for (size_t i = 0 ; i < prc_r.size() ; i++)
{
// Calculate the sizes required to store the particle positions and properties to communicate
size_t s1 = openfpm::vector<Point<dim,St>,HeapMemory,openfpm::grow_policy_identity>::calculateMem(prc_sz_r.get(i),0);
size_t s2 = openfpm::vector<prop,HeapMemory,openfpm::grow_policy_identity>::calculateMem(prc_sz_r.get(i),0);
// Preallocate the memory
size_t sz[2] = {s1,s2};
PreAllocHeapMemory<2> * mem = new PreAllocHeapMemory<2>(sz);
// Set the memory allocator
pb.get(i).pos.setMemory(*mem);
pb.get(i).prp.setMemory(*mem);
// set the size and allocate; using mem guarantees that pos and prp are contiguous
pb.get(i).pos.resize(prc_sz_r.get(i));
pb.get(i).prp.resize(prc_sz_r.get(i));
}
// Run through all the particles and fill the sending buffer
for (size_t i = 0 ; i < opart.size() ; i++)
{
auto it = opart.get(i).getIterator();
size_t lbl = p_map_req.get(i);
while (it.isNext())
{
size_t key = it.get();
size_t id = opart.get(i).get(key);
pb.get(lbl).pos.set(key,v_pos.get(id));
pb.get(lbl).prp.set(key,v_prp.get(id));
++it;
}
}
}
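fill_send_map_buf preallocates a single block per destination so that positions and properties end up contiguous. A simplified analogue that packs both into one byte buffer (pack_for_processor, point3 and prop3 are placeholders, and both input vectors are assumed to have the same length):

    #include <cstddef>
    #include <cstring>
    #include <vector>

    struct point3 { double x[3]; };
    struct prop3  { double v[3]; };

    // Pack n particles as [ all positions | all properties ] in one buffer,
    // mirroring the "pos and prp are contiguous" layout used by the send path.
    std::vector<unsigned char> pack_for_processor(const std::vector<point3>& pos,
                                                  const std::vector<prop3>& prp)
    {
        const std::size_t n  = pos.size();          // == prp.size() by assumption
        const std::size_t s1 = n * sizeof(point3);
        const std::size_t s2 = n * sizeof(prop3);
        std::vector<unsigned char> buf(s1 + s2);
        std::memcpy(buf.data(),      pos.data(), s1);
        std::memcpy(buf.data() + s1, prp.data(), s2);
        return buf;
    }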
@@ -388,7 +458,7 @@ private:
* \tparam prp set of properties to send
*
*/
template<typename send_vector,typename prp_object, int... prp> void process_received_prp()
template<typename send_vector,typename prp_object, int... prp> void process_received_ghost_prp()
{
// Mark the ghost part
g_m = v_prp.size();
@@ -418,7 +488,7 @@ private:
/*! \brief This function process the received data for the properties and populate the ghost
*
*/
void process_received_pos()
void process_received_ghost_pos()
{
// Process the received data (recv_mem_gg != 0 if you have data)
for (size_t i = 0 ; i < dec.getNNProcessors() && recv_mem_gg.size() != 0 ; i++)
@@ -443,6 +513,64 @@ private:
}
}
/*! \brief Process the received particles
*
* \param out_part list of the outgoing particles
*
*/
void process_received_map(openfpm::vector<size_t> & out_part)
{
size_t o_p_id = 0;
for (size_t i = 0 ; i < recv_mem_gm.size() ; i++)
{
// Get the number of elements
size_t n_ele = recv_mem_gm.get(i).size() / (sizeof(Point<dim,St>) + sizeof(prop));
// Pointer of the received positions for each near processor
void * ptr_pos = (unsigned char *)recv_mem_gm.get(i).getPointer();
// Pointer of the received properties for each near processor
void * ptr_prp = (unsigned char *)recv_mem_gm.get(i).getPointer() + n_ele * sizeof(Point<dim,St>);
PtrMemory * ptr1 = new PtrMemory(ptr_pos,n_ele * sizeof(Point<dim,St>));
PtrMemory * ptr2 = new PtrMemory(ptr_prp,n_ele * sizeof(prop));
// create vector representations over a piece of memory already allocated
openfpm::vector<Point<dim,St>,PtrMemory,openfpm::grow_policy_identity> vpos;
openfpm::vector<prop,PtrMemory,openfpm::grow_policy_identity> vprp;
vpos.setMemory(*ptr1);
vprp.setMemory(*ptr2);
vpos.resize(n_ele);
vprp.resize(n_ele);
// Add the received particles to v_pos and v_prp
size_t j = 0;
for ( ; j < vpos.size() && o_p_id < out_part.size() ; j++, o_p_id++)
{
v_pos.set(out_part.get(o_p_id),vpos.get(j));
v_prp.set(out_part.get(o_p_id),vprp.get(j));
}
for ( ; j < vpos.size(); j++)
{
v_pos.add();
v_pos.set(v_pos.size()-1,vpos.get(j));
v_prp.add();
v_prp.set(v_prp.size()-1,vprp.get(j));
}
}
// remove the out-going particles from the vectors
v_pos.remove(out_part,o_p_id);
v_prp.remove(out_part,o_p_id);
}
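The receive path of map() first reuses the slots freed by out-going particles and only then appends; any holes still left over are removed afterwards by the v_pos.remove / v_prp.remove calls above. A generic sketch of that overwrite-then-append policy, with std::vector standing in for the openfpm vectors:

    #include <cstddef>
    #include <vector>

    // Place received elements: first into the holes listed in out_part,
    // then appended at the end once the holes are used up.
    template <typename T>
    void place_received(std::vector<T>& v,
                        const std::vector<std::size_t>& out_part,
                        const std::vector<T>& received,
                        std::size_t& o_p_id)          // running hole index
    {
        std::size_t j = 0;
        for (; j < received.size() && o_p_id < out_part.size(); j++, o_p_id++)
            v[out_part[o_p_id]] = received[j];        // overwrite a departed particle
        for (; j < received.size(); j++)
            v.push_back(received[j]);                 // no holes left: append
    }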
/*! \brief Calculate send buffers total size and allocation
*
* \tparam prp_object object containing only the properties to send
@@ -453,16 +581,16 @@ private:
* \param pap_pos allocation sequence for the position buffer
*
*/
template<typename prp_object> void calc_send_buf(size_t & size_byte_prp, size_t & size_byte_pos, std::vector<size_t> & pap_prp, std::vector<size_t> & pap_pos)
template<typename prp_object> void calc_send_ghost_buf(size_t & size_byte_prp, size_t & size_byte_pos, std::vector<size_t> & pap_prp, std::vector<size_t> & pap_pos)
{
// Calculate the total size required for the sending buffer
for ( size_t i = 0 ; i < ghost_prc_sz.size() ; i++ )
{
size_t alloc_ele = openfpm::vector<prp_object>::calculateMem(ghost_prc_sz.get(i),0);
size_t alloc_ele = openfpm::vector<prp_object,HeapMemory,openfpm::grow_policy_identity>::calculateMem(ghost_prc_sz.get(i),0);
pap_prp.push_back(alloc_ele);
size_byte_prp += alloc_ele;
alloc_ele = openfpm::vector<Point<dim,St>>::calculateMem(ghost_prc_sz.get(i),0);
alloc_ele = openfpm::vector<Point<dim,St>,HeapMemory,openfpm::grow_policy_identity>::calculateMem(ghost_prc_sz.get(i),0);
pap_pos.push_back(alloc_ele);
size_byte_pos += alloc_ele;
}
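With grow_policy_identity the per-processor slices are sized exactly, so the loop above only has to accumulate exact byte counts and record them as an allocation sequence. A simplified version of that accounting, using plain sizeof arithmetic in place of calculateMem (point3 and prp_obj are placeholder types):

    #include <cstddef>
    #include <vector>

    struct point3  { double x[3]; };
    struct prp_obj { double v[3]; };

    // For each near processor, record the slice size and accumulate the total,
    // so one allocation can later be carved into per-processor pieces.
    void calc_send_sizes(const std::vector<std::size_t>& ghost_prc_sz,
                         std::size_t& size_byte_prp, std::size_t& size_byte_pos,
                         std::vector<std::size_t>& pap_prp,
                         std::vector<std::size_t>& pap_pos)
    {
        for (std::size_t i = 0; i < ghost_prc_sz.size(); i++)
        {
            std::size_t a_prp = ghost_prc_sz[i] * sizeof(prp_obj);
            pap_prp.push_back(a_prp);
            size_byte_prp += a_prp;

            std::size_t a_pos = ghost_prc_sz[i] * sizeof(point3);
            pap_pos.push_back(a_pos);
            size_byte_pos += a_pos;
        }
    }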
@@ -479,7 +607,7 @@ public:
*
*/
vector_dist(size_t np, Box<dim,St> box, const size_t (& bc)[dim] ,const Ghost<dim,St> & g)
:dec(*global_v_cluster),v_cl(*global_v_cluster),recv_cnt(0)
:dec(*global_v_cluster),v_cl(*global_v_cluster)
{
#ifdef SE_CLASS2
check_new(this,8,VECTOR_DIST_EVENT,4);
@@ -578,18 +706,6 @@ public:
return v_prp.template get<id>(vec_key.getKey());
}
/*! \brief It store for each processor the position and properties vector of the particles
*
*
*/
struct pos_prop
{
//! position vector
openfpm::vector<Point<dim,St>,PreAllocHeapMemory<2>,openfpm::grow_policy_identity> pos;
//! properties vector
openfpm::vector<prop,PreAllocHeapMemory<2>,openfpm::grow_policy_identity> prp;
};
/*! \brief It moves all the particles that do not belong to the local processor to the respective processor
*
* In general this function is called after moving the particles to move the
@@ -599,18 +715,12 @@ public:
*/
void map()
{
// Labeling particles
openfpm::vector<size_t> lbl_p;
// outgoing particles-id
openfpm::vector<size_t> opart;
openfpm::vector<size_t> out_part;
// Processor communication size
openfpm::vector<size_t> prc_sz(v_cl.getProcessingUnits());
// Contain the map of the processors, this processors should communicate with
openfpm::vector<size_t> p_map(v_cl.getProcessingUnits());
// It contains the list of the processors this processor should communicate with
openfpm::vector<size_t> p_list;
@@ -619,7 +729,7 @@ public:
v_prp.resize(g_m);
// Contains the processor id of each particle (basically where it has to go)
labelParticleMap(lbl_p,p_map,prc_sz,opart);
labelParticleProcessor(opart,prc_sz,out_part);
// Calculate the sending buffer size for each processor, put this information in
// a contiguous buffer
@@ -629,7 +739,7 @@ public:
for (size_t i = 0 ; i < v_cl.getProcessingUnits() ; i++)
{
if (p_map.get(i) == 1)
if (prc_sz.get(i) != 0)
{
p_map_req.get(i) = prc_r.size();
prc_r.add(i);
@@ -639,56 +749,10 @@ public:
// Allocate the send buffers
openfpm::vector<pos_prop> pb(prc_r.size());
for (size_t i = 0 ; i < prc_r.size() ; i++)
{
// Create the size required to store the particles position and properties to communicate
size_t s1 = openfpm::vector<Point<dim,St>,HeapMemory,openfpm::grow_policy_identity>::calculateMem(prc_sz_r.get(i),0);
size_t s2 = openfpm::vector<prop,HeapMemory,openfpm::grow_policy_identity>::calculateMem(prc_sz_r.get(i),0);
// Preallocate the memory
size_t sz[2] = {s1,s2};
PreAllocHeapMemory<2> * mem = new PreAllocHeapMemory<2>(sz);
// Set the memory allocator
pb.get(i).pos.setMemory(*mem);
pb.get(i).prp.setMemory(*mem);
// set the size and allocate, using mem warant that pos and prp is contiguous
pb.get(i).pos.resize(prc_sz_r.get(i));
pb.get(i).prp.resize(prc_sz_r.get(i));
}
// Run through all the particles and fill the sending buffer
openfpm::vector<size_t> prc_cnt(prc_r.size());
prc_cnt.fill(0);
auto it = lbl_p.getIterator();
while (it.isNext())
{
auto key = it.get();
size_t lbl = lbl_p.get(key);
if (lbl == v_cl.getProcessUnitID())
{
++it;
continue;
}
lbl = p_map_req.get(lbl);
pb.get(lbl).pos.set(prc_cnt.get(lbl),v_pos.get(key));
pb.get(lbl).prp.set(prc_cnt.get(lbl),v_prp.get(key));
openfpm::vector<pos_prop> pb;
prc_cnt.get(lbl)++;
// Add processors and add size
++it;
}
// fill the send buffers
fill_send_map_buf(prc_r,prc_sz_r,pb);
// Create the set of pointers
openfpm::vector<void *> ptr(prc_r.size());
@@ -705,65 +769,12 @@ public:
// Send and receive the particles
v_proc.clear();
recv_cnt = 0;
v_cl.sendrecvMultipleMessagesPCX(prc_sz_r.size(),&p_map.get(0), (size_t *)prc_sz_r.getPointer(), (size_t *)prc_r.getPointer() , (void **)ptr.getPointer() , vector_dist::message_alloc_map, this ,NEED_ALL_SIZE);
recv_mem_gm.clear();
v_cl.sendrecvMultipleMessagesNBX(prc_sz_r.size(), (size_t *)prc_sz_r.getPointer(), (size_t *)prc_r.getPointer() , (void **)ptr.getPointer() , vector_dist::message_alloc_map, this ,NEED_ALL_SIZE);
// Process the incoming particles
size_t total_element = 0;
size_t o_p_id = 0;
for (size_t i = 0 ; i < v_proc.size() ; i++)
{
// Get the number of elements
size_t n_ele = v_proc.get(i) / (sizeof(Point<dim,St>) + sizeof(prop));
// Pointer of the received positions for each near processor
void * ptr_pos = ((unsigned char *)hp_recv.getPointer()) + (total_element * (sizeof(Point<dim,St>) + sizeof(prop)));
// Pointer of the received properties for each near processor
void * ptr_prp = ((unsigned char *)hp_recv.getPointer()) + (total_element * (sizeof(Point<dim,St>) + sizeof(prop))) + n_ele * sizeof(Point<dim,St>);
PtrMemory * ptr1 = new PtrMemory(ptr_pos,n_ele * sizeof(Point<dim,St>));
PtrMemory * ptr2 = new PtrMemory(ptr_prp,n_ele * sizeof(prop));
// create vector representation to a piece of memory already allocated
openfpm::vector<Point<dim,St>,PtrMemory,openfpm::grow_policy_identity> vpos;
openfpm::vector<prop,PtrMemory,openfpm::grow_policy_identity> vprp;
vpos.setMemory(*ptr1);
vprp.setMemory(*ptr2);
vpos.resize(n_ele);
vprp.resize(n_ele);
// Add the received particles to v_pos and v_prp
size_t j = 0;
for ( ; j < vpos.size() && o_p_id < opart.size() ; j++, o_p_id++)
{
v_pos.set(opart.get(o_p_id),vpos.get(j));
v_prp.set(opart.get(o_p_id),vprp.get(j));
}
for ( ; j < vpos.size(); j++)
{
v_pos.add();
v_pos.set(v_pos.size()-1,vpos.get(j));
v_prp.add();
v_prp.set(v_prp.size()-1,vprp.get(j));
}
// increment the total number of element counter
total_element += n_ele;
}
// remove the (out-going particles) in the vector
v_pos.remove(opart,o_p_id);
v_prp.remove(opart,o_p_id);
process_received_map(out_part);
// mark the ghost part
@@ -782,14 +793,14 @@ public:
typedef object<typename object_creator<typename prop::type,prp...>::type> prp_object;
// send vector for each processor
typedef openfpm::vector<prp_object,ExtPreAlloc<Memory>> send_vector;
typedef openfpm::vector<prp_object,ExtPreAlloc<Memory>,openfpm::grow_policy_identity> send_vector;
// reset the ghost part
v_pos.resize(g_m);
v_prp.resize(g_m);
// Label all the particles
labelParticles();
labelParticlesGhost();
// Calculate memory and allocation for the send buffers
@@ -801,7 +812,7 @@ public:
std::vector<size_t> pap_prp;
std::vector<size_t> pap_pos;
calc_send_buf<prp_object>(size_byte_prp,size_byte_pos,pap_prp,pap_pos);
calc_send_ghost_buf<prp_object>(size_byte_prp,size_byte_pos,pap_prp,pap_pos);
// Create memory for the send buffer
@@ -812,7 +823,7 @@ public:
ExtPreAlloc<Memory> * prAlloc_prp = new ExtPreAlloc<Memory>(pap_prp,g_prp_mem);
openfpm::vector<send_vector> g_send_prp;
fill_send_prp_buf<send_vector,prp_object,prp...>(g_send_prp,prAlloc_prp);
fill_send_ghost_prp_buf<send_vector,prp_object,prp...>(g_send_prp,prAlloc_prp);
// Create and fill the send buffer for the particle position
@@ -821,7 +832,7 @@ public:
if (opt != NO_POSITION)
{
prAlloc_pos = new ExtPreAlloc<Memory>(pap_pos,g_pos_mem);
fill_send_pos_buf(g_pos_send,prAlloc_pos);
fill_send_ghost_pos_buf(g_pos_send,prAlloc_pos);
}
// Create processor list
@@ -831,14 +842,14 @@ public:
// Send/receive the particle properties information
v_cl.sendrecvMultipleMessagesNBX(prc,g_send_prp,msg_alloc_ghost_get,this);
process_received_prp<send_vector,prp_object,prp...>();
process_received_ghost_prp<send_vector,prp_object,prp...>();
if (opt != NO_POSITION)
{
// Send/receive the particle position information
v_cl.sendrecvMultipleMessagesNBX(prc,g_pos_send,msg_alloc_ghost_get,this);
process_received_pos();
process_received_ghost_pos();
}
add_loc_particles_bc();
@@ -890,20 +901,10 @@ public:
// cast the pointer
vector_dist<dim,St,prop,Decomposition,Memory> * vd = static_cast<vector_dist<dim,St,prop,Decomposition,Memory> *>(ptr);
// Resize the receive buffer, and the size of each message buffer
vd->hp_recv.resize(total_msg);
vd->v_proc.resize(total_p);
// Return the receive pointer
void * recv_ptr = (unsigned char *)vd->hp_recv.getPointer() + vd->recv_cnt;
// increment the receive pointer
vd->recv_cnt += msg_i;
// Save the processor message size
vd->v_proc.get(ri) = msg_i;
vd->recv_mem_gm.resize(vd->v_cl.getProcessingUnits());
vd->recv_mem_gm.get(i).resize(msg_i);
return recv_ptr;
return vd->recv_mem_gm.get(i).getPointer();
}
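The rewritten callback keeps one receive buffer per processor instead of a single packed buffer with running offsets. A sketch of such an allocation callback, with std containers and placeholder names (the real signature is whatever sendrecvMultipleMessagesNBX expects):

    #include <cstddef>
    #include <vector>

    // One resizable byte buffer per processor; the callback hands the
    // communication layer a pointer into the buffer of the sending rank.
    std::vector<std::vector<unsigned char>> recv_bufs;

    void* message_alloc(std::size_t msg_i, std::size_t i, std::size_t n_procs)
    {
        recv_bufs.resize(n_procs);        // make sure the slot exists
        recv_bufs[i].resize(msg_i);       // size it to this message
        return recv_bufs[i].data();       // receive directly in place
    }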