vector_dist_comm.hpp 36.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
/*
 * vector_dist_comm.hpp
 *
 *  Created on: Aug 18, 2016
 *      Author: i-bird
 */

#ifndef SRC_VECTOR_VECTOR_DIST_COMM_HPP_
#define SRC_VECTOR_VECTOR_DIST_COMM_HPP_

#define SKIP_LABELLING 512
12
#define KEEP_PROPERTIES 512
13 14 15

#define NO_POSITION 1
#define WITH_POSITION 2
16
#define NO_CHANGE_ELEMENTS 4
17

18 19
#define BIND_DEC_TO_GHOST 1

incardon's avatar
incardon committed
20 21 22 23 24 25 26 27 28 29 30 31 32
/*! \brief Translate the ghost_get/ghost_put options into communication options
 *
 * \param opt option flags given to ghost_get/ghost_put
 *
 * \return RECEIVE_KNOWN | KNOWN_ELEMENT_OR_BYTE when NO_CHANGE_ELEMENTS is set
 *         in opt, NONE otherwise
 *
 */
inline static size_t compute_options(size_t opt)
{
	// Without NO_CHANGE_ELEMENTS no special communication option is requested
	if (!(opt & NO_CHANGE_ELEMENTS))
		return NONE;

	return RECEIVE_KNOWN | KNOWN_ELEMENT_OR_BYTE;
}

33 34 35 36 37 38 39 40 41 42 43 44
/*! \brief This class is a helper for the communication of vector_dist
 *
 * \tparam dim Dimensionality of the space where the elements live
 * \tparam St type of space float, double ...
 * \tparam prop properties stored by each vector element, in OpenFPM data structure format
 * \tparam Decomposition Decomposition strategy to use, e.g. CartDecomposition ...
 * \tparam Memory Memory pool where the information is stored, e.g. HeapMemory ...
 *
 * \see vector_dist
 *
 */

incardon's avatar
incardon committed
45 46 47 48 49 50 51
template<unsigned int dim,
         typename St,
		 typename prop,
		 typename layout,
		 template <typename> class layout_base,
		 typename Decomposition = CartDecomposition<dim,St>,
		 typename Memory = HeapMemory>
52 53
class vector_dist_comm
{
incardon's avatar
incardon committed
54 55 56
	//! Number of units for each sub-domain
	size_t v_sub_unit_factor = 64;

57 58 59
	//! definition of the send vector for position
	typedef openfpm::vector<Point<dim, St>, Memory> send_pos_vector;

60 61 62 63 64 65 66 67 68
	//! VCluster
	Vcluster & v_cl;

	//! Domain decomposition
	Decomposition dec;

	//! It maps the processor id to the communication request used in the map procedure
	openfpm::vector<size_t> p_map_req;

Pietro Incardona's avatar
Pietro Incardona committed
69
	//! For each near processor, outgoing particle id
70 71 72 73 74
	//! \warning opart is assumed to be an ordered list
	//! first id particle id
	//! second id shift id
	//! third id is the processor id
	openfpm::vector<aggregate<size_t,size_t,size_t>> m_opart;
75

incardon's avatar
incardon committed
76
	//! Per processor ordered particles id for ghost_get (see prc_g_opart)
77 78 79
	//! For each processor the internal vector store the id of the
	//! particles that must be communicated to the other processors
	openfpm::vector<openfpm::vector<aggregate<size_t,size_t>>> g_opart;
80

incardon's avatar
incardon committed
81 82 83
	//! Per processor number of particle g_opart_sz.get(i) = g_opart.get(i).size()
	openfpm::vector<size_t> g_opart_sz;

84
	//! processor rank list of g_opart
incardon's avatar
incardon committed
85 86
	openfpm::vector<size_t> prc_g_opart;

87 88 89
	//! It stores the list of processors that communicated with us (local processor)
	//! during the last ghost get
	openfpm::vector<size_t> prc_recv_get;
Pietro Incardona's avatar
Pietro Incardona committed
90

91
	//! the same as prc_recv_get but for put
Pietro Incardona's avatar
Pietro Incardona committed
92 93
	openfpm::vector<size_t> prc_recv_put;

94 95
	//! the same as prc_recv_get but for map
	openfpm::vector<size_t> prc_recv_map;
Pietro Incardona's avatar
Pietro Incardona committed
96

97 98 99
	//! It stores the number of elements added for each processor that communicated with us (local processor)
	//! during the last ghost get
	openfpm::vector<size_t> recv_sz_get;
100 101
	//! Conversion to byte of recv_sz_get
	openfpm::vector<size_t> recv_sz_get_byte;
incardon's avatar
incardon committed
102

103

104
	//! The same as recv_sz_get but for put
Pietro Incardona's avatar
Pietro Incardona committed
105 106
	openfpm::vector<size_t> recv_sz_put;

107 108
	//! The same as recv_sz_get but for map
	openfpm::vector<size_t> recv_sz_map;
109

Pietro Incardona's avatar
Pietro Incardona committed
110 111 112 113
	//! Local ghost marker: index, within the ghost particles, from which the
	//! locally replicated ghost particles start
	size_t lg_m;

114 115 116 117 118 119
	//! Sending buffer
	openfpm::vector<HeapMemory> hsmem;

	//! Receiving buffer
	openfpm::vector<HeapMemory> hrmem;

120
	//! Copy policy for process_map_particle: the whole property element is copied
	struct proc_without_prp
	{
		/*! \brief Store the full property element of one particle in a send buffer
		 *
		 * \param lbl index of the destination buffer inside m_prp
		 * \param cnt position inside that buffer where the element is written
		 * \param id index of the source particle inside v_prp
		 * \param v_prp source properties
		 * \param m_prp collection of send buffers, one per destination
		 *
		 */
		template<typename T1, typename T2> inline static void proc(size_t lbl, size_t cnt, size_t id, T1 & v_prp, T2 & m_prp)
		{
			auto && dst_buffer = m_prp.get(lbl);

			dst_buffer.set(cnt, v_prp.get(id));
		}
	};

130
	//! process the particle with properties
131 132 133
	template<typename prp_object, int ... prp>
	struct proc_with_prp
	{
134
		//! process the particle
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
		template<typename T1, typename T2> inline static void proc(size_t lbl, size_t cnt, size_t id, T1 & v_prp, T2 & m_prp)
		{
			// source object type
			typedef encapc<1, prop, typename openfpm::vector<prop>::layout_type> encap_src;
			// destination object type
			typedef encapc<1, prp_object, typename openfpm::vector<prp_object>::layout_type> encap_dst;

			// Copy only the selected properties
			object_si_d<encap_src, encap_dst, OBJ_ENCAP, prp...>(v_prp.get(id), m_prp.get(lbl).get(cnt));
		}
	};

	/*! \brief Process the i-th outgoing particle listed in m_opart
	 *
	 * When the processor id stored for the particle (read as a signed value) is
	 * not negative, position and properties are written in the send buffers of
	 * that processor. In every case the slot of the particle is then overwritten
	 * with a particle taken from the end of the vectors (see get_end_valid)
	 *
	 * \tparam proc_class policy used to copy the properties (proc_without_prp or proc_with_prp)
	 *
	 * \param i index inside m_opart
	 * \param end cursor on m_opart used by get_end_valid (updated)
	 * \param id_end cursor on the particle vectors used by get_end_valid (updated)
	 * \param m_pos send buffers for the positions
	 * \param m_prp send buffers for the properties
	 * \param v_pos particle positions
	 * \param v_prp particle properties
	 * \param cnt for each send buffer the number of elements already written
	 *
	 */
	template<typename proc_class, typename T1, typename T2, typename T3, typename T4> inline void process_map_particle(size_t i, long int & end, long int & id_end, T1 & m_pos, T2 & m_prp, T3 & v_pos, T4 & v_prp, openfpm::vector<size_t> & cnt)
	{
		const long int dest_prc = m_opart.template get<2>(i);
		const size_t part = m_opart.template get<0>(i);

		if (dest_prc >= 0)
		{
			// send buffer associated to the destination processor
			const size_t buf = p_map_req.get(dest_prc);

			m_pos.get(buf).set(cnt.get(buf), v_pos.get(part));
			proc_class::proc(buf, cnt.get(buf), part, v_prp, m_prp);

			++cnt.get(buf);
		}

		// fill the slot left by the particle with a particle from the end
		const long int last_valid = get_end_valid(end, id_end);

		if (last_valid > 0 && (long int)part < last_valid)
		{
			v_pos.set(part, v_pos.get(last_valid));
			v_prp.set(part, v_prp.get(last_valid));
		}
	}

184 185 186
	/*! \brief Return a valid particle starting from the end and tracing back
	 *
	 * end_id is moved one position back, then both cursors keep moving back as
	 * long as the particle pointed by end_id is the one listed in m_opart at
	 * position end
	 *
	 * \param end cursor on m_opart (updated)
	 * \param end_id cursor on the particle vector (updated)
	 *
	 * \return the final value of end_id
	 *
	 * NOTE(review): end_id can become negative while the return type is size_t;
	 * the callers in this file store the result back in a long int
	 *
	 */
	inline size_t get_end_valid(long int & end, long int & end_id)
	{
		--end_id;

		for ( ; end >= 0 && end_id >= 0 ; --end_id, --end)
		{
			// the candidate is not listed in m_opart at this position: stop here
			if ((long int)m_opart.template get<0>(end) != end_id)
				break;
		}

		return end_id;
	}

Pietro Incardona's avatar
Pietro Incardona committed
205
	//! Flag that indicates that the function createShiftBox() has been called
206 207
	bool is_shift_box_created = false;

Pietro Incardona's avatar
Pietro Incardona committed
208
	//! this map is used to check if a combination is already present
209 210
	std::unordered_map<size_t, size_t> map_cmb;

Pietro Incardona's avatar
Pietro Incardona committed
211 212
	//! The boxes touching the border of the domain are divided in groups (first vector)
	//! each group contain internal ghost coming from sub-domains of the same section
incardon's avatar
incardon committed
213
	openfpm::vector_std<openfpm::vector_std<Box<dim, St>>> box_f;
214

Pietro Incardona's avatar
Pietro Incardona committed
215
	//! Store the sector for each group (previous vector)
216 217
	openfpm::vector_std<comb<dim>> box_cmb;

Pietro Incardona's avatar
Pietro Incardona committed
218
	//! Id of the local particle to replicate for ghost_get
219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266
	openfpm::vector<aggregate<size_t,size_t>> o_part_loc;

	/*! \brief Group the local internal ghost boxes by sector (shift combination)
	 *
	 * Fills box_f, box_cmb and map_cmb. The work is done only on the first call,
	 * later calls return immediately
	 *
	 */
	void createShiftBox()
	{
		if (is_shift_box_created)
			return;

		// Local particles coming from periodic boundaries: only the boxes
		// touching the border count, the others are filtered out
		for (size_t sub = 0; sub < dec.getNLocalSub(); sub++)
		{
			const size_t n_ighost = dec.getLocalNIGhost(sub);

			for (size_t g = 0; g < n_ighost; g++)
			{
				// A combination made only of zeros (n_zero() == dim) means that the
				// ghost does not come from an out of border sub-domain
				if (dec.getLocalIGhostPos(sub, g).n_zero() == dim)
					continue;

				const size_t sector = dec.getLocalIGhostPos(sub, g).lin();
				auto found = map_cmb.find(sector);

				if (found != map_cmb.end())
				{
					// group already present, append the box to it
					box_f.get(found->second).add(dec.getLocalIGhostBox(sub, g));
					continue;
				}

				// first box for this combination, open a new group
				box_f.add();
				box_f.last().add(dec.getLocalIGhostBox(sub, g));
				box_cmb.add(dec.getLocalIGhostPos(sub, g));
				map_cmb[sector] = box_f.size() - 1;
			}
		}

		is_shift_box_created = true;
	}

	/*! \brief Create the local ghost particles from the list stored in o_part_loc
	 *
	 * For each entry of o_part_loc a shifted copy of the position is appended to
	 * v_pos. v_prp is not grown: the properties are written in the existing
	 * element at index lg_m + (entry index)
	 *
	 * \param v_pos vector of particle positions
	 * \param v_prp vector of particle properties
	 *
	 */
	void local_ghost_from_opart(openfpm::vector<Point<dim, St>> & v_pos, openfpm::vector<prop> & v_prp)
	{
		// shift vectors of the decomposition
		const openfpm::vector<Point<dim, St>> & shifts = dec.getShiftVectors();

		for (size_t r = 0 ; r < o_part_loc.size() ; ++r)
		{
			const size_t shift_id = o_part_loc.template get<1>(r);
			const size_t src = o_part_loc.template get<0>(r);

			// replica of the source particle moved by the shift vector
			Point<dim, St> replica = v_pos.get(src);
			replica -= shifts.get(shift_id);

			v_pos.add(replica);

			v_prp.get(lg_m + r) = v_prp.get(src);
		}
	}

	/*! \brief Create the local ghost particles using the boxes stored in box_f
	 *
	 * o_part_loc is rebuilt: for every particle before g_m and for every group of
	 * boxes, when the particle is inside a box of the group a shifted replica is
	 * appended to v_pos/v_prp and the pair (particle, shift id) is recorded
	 *
	 * \param v_pos vector of particle positions
	 * \param v_prp vector of particle properties
	 * \param g_m ghost marker
	 *
	 */
	void local_ghost_from_dec(openfpm::vector<Point<dim, St>> & v_pos, openfpm::vector<prop> & v_prp, size_t g_m)
	{
		o_part_loc.clear();

		// shift vectors of the decomposition
		const openfpm::vector<Point<dim, St>> & shifts = dec.getShiftVectors();

		// Visit the particles up to the ghost marker
		for (auto it = v_pos.getIteratorTo(g_m) ; it.isNext() ; ++it)
		{
			auto key = it.get();

			for (size_t grp = 0; grp < box_f.size(); grp++)
			{
				for (size_t b = 0; b < box_f.get(grp).size(); b++)
				{
					if (!box_f.get(grp).get(b).isInside(v_pos.get(key)))
						continue;

					const size_t lin_id = box_cmb.get(grp).lin();

					// remember which particle has been replicated and with which shift
					o_part_loc.add();
					o_part_loc.last().template get<0>() = key;
					o_part_loc.last().template get<1>() = lin_id;

					// replica of the particle moved by the shift vector
					Point<dim, St> replica = v_pos.get(key);
					replica -= shifts.get(lin_id);

					v_pos.add(replica);
					v_prp.add();
					v_prp.last() = v_prp.get(key);

					// Boxes of the same group can overlap: stop at the first match,
					// otherwise the particle would be duplicated. Overlap across
					// groups is fine (and needed) because each group has a different
					// shift, producing non overlapping particles
					break;
				}
			}
		}
	}

	/*! \brief Add local particles based on the boundary conditions
	 *
	 * In order to understand what this function use the following
	 *
	 \verbatim

	 [1,1]
	 +---------+------------------------+---------+
	 | (1,-1)  |                        | (1,1)   |
	 |   |     |    (1,0) --> 7         |   |     |
	 |   v     |                        |   v     |
	 |   6     |                        |   8     |
	 +--------------------------------------------+
	 |         |                        |         |
	 |         |                        |         |
	 |         |                        |         |
	 | (-1,0)  |                        | (1,0)   |
	 |    |    |                        |   |     |
	 |    v    |      (0,0) --> 4       |   v     |
	 |    3    |                        |   5     |
	 |         |                        |         |
 B	 |         |                        |     A   |
 *	 |         |                        |    *    |
	 |         |                        |         |
	 |         |                        |         |
	 |         |                        |         |
	 +--------------------------------------------+
	 | (-1,-1) |                        | (-1,1)  |
	 |    |    |   (-1,0) --> 1         |    |    |
	 |    v    |                        |    v    |
	 |    0    |                        |    2    |
	 +---------+------------------------+---------+


	 \endverbatim

	 *
	 *  The box is the domain, while all boxes at the border (so not (0,0) ) are the
	 *  ghost part at the border of the domain. If a particle A is in the position in figure
	 *  a particle B must be created. This function duplicate the particle A, if A and B are
	 *  local
	 *
	 * \param v_pos vector of particle of positions
	 * \param v_prp vector of particle properties
	 * \param g_m ghost marker
	 * \param opt options
	 *
	 */
	void add_loc_particles_bc(openfpm::vector<Point<dim, St>> & v_pos, openfpm::vector<prop> & v_prp ,size_t & g_m, size_t opt)
	{
		// Make sure the shift boxes exist
		createShiftBox();

		const bool skip_labelling = (opt & SKIP_LABELLING) != 0;

		// When the particles are labelled again, the local ghost marker is the
		// current size of the property vector
		if (!skip_labelling)
			lg_m = v_prp.size();

		// No box touching the border, nothing to replicate
		if (box_f.size() == 0)
			return;

		if (skip_labelling)
			local_ghost_from_opart(v_pos,v_prp);
		else
			local_ghost_from_dec(v_pos,v_prp,g_m);
	}

	/*! \brief This function fill the send buffer for the particle position after the particles has been label with labelParticles
	 *
	 * \param v_pos vector of particle positions
	 * \param g_pos_send Send buffer to fill
	 *
	 */
424
	void fill_send_ghost_pos_buf(openfpm::vector<Point<dim, St>> & v_pos,openfpm::vector<send_pos_vector> & g_pos_send)
425 426 427 428 429
	{
		// get the shift vectors
		const openfpm::vector<Point<dim, St>> & shifts = dec.getShiftVectors();

		// create a number of send buffers equal to the near processors
430
		g_pos_send.resize(g_opart.size());
431 432 433

		resize_retained_buffer(hsmem,g_pos_send.size());

434 435
		for (size_t i = 0; i < g_pos_send.size(); i++)
		{
436 437 438 439 440 441 442 443
			// Buffer must retained and survive the destruction of the
			// vector
			if (hsmem.get(i).ref() == 0)
				hsmem.get(i).incRef();

			// Set the memory for retain the send buffer
			g_pos_send.get(i).setMemory(hsmem.get(i));

444
			// resize the sending vector (No allocation is produced)
445
			g_pos_send.get(i).resize(g_opart.get(i).size());
446 447 448
		}

		// Fill the send buffer
449
		for (size_t i = 0; i < g_opart.size(); i++)
450
		{
451
			for (size_t j = 0; j < g_opart.get(i).size(); j++)
452
			{
453 454
				Point<dim, St> s = v_pos.get(g_opart.get(i).template get<0>(j));
				s -= shifts.get(g_opart.get(i).template get<1>(j));
455 456 457 458 459
				g_pos_send.get(i).set(j, s);
			}
		}
	}

Pietro Incardona's avatar
Pietro Incardona committed
460 461 462 463 464 465 466 467 468 469 470
	/*! \brief This function fill the send buffer for ghost_put
	 *
	 * \tparam send_vector type used to send data
	 * \tparam prp_object object containing only the properties to send
	 * \tparam prp set of properties to send
	 *
	 * \param v_prp vector of particle properties
	 * \param g_send_prp Send buffer to fill
	 * \param g_m ghost marker
	 *
	 */
471
	template<typename send_vector, typename prp_object, int ... prp> void fill_send_ghost_put_prp_buf(openfpm::vector<prop> & v_prp, openfpm::vector<send_vector> & g_send_prp, size_t & g_m)
Pietro Incardona's avatar
Pietro Incardona committed
472 473 474
	{
		// create a number of send buffers equal to the near processors
		// from which we received
475
		g_send_prp.resize(prc_recv_get.size());
476 477 478

		resize_retained_buffer(hsmem,g_send_prp.size());

Pietro Incardona's avatar
Pietro Incardona committed
479 480
		for (size_t i = 0; i < g_send_prp.size(); i++)
		{
481 482 483 484 485 486 487 488
			// Buffer must retained and survive the destruction of the
			// vector
			if (hsmem.get(i).ref() == 0)
				hsmem.get(i).incRef();

			// Set the memory for retain the send buffer
			g_send_prp.get(i).setMemory(hsmem.get(i));

Pietro Incardona's avatar
Pietro Incardona committed
489
			// resize the sending vector (No allocation is produced)
490
			g_send_prp.get(i).resize(recv_sz_get.get(i));
Pietro Incardona's avatar
Pietro Incardona committed
491 492 493 494 495
		}

		size_t accum = g_m;

		// Fill the send buffer
496
		for (size_t i = 0; i < prc_recv_get.size(); i++)
Pietro Incardona's avatar
Pietro Incardona committed
497 498
		{
			size_t j2 = 0;
499
			for (size_t j = accum; j < accum + recv_sz_get.get(i); j++)
Pietro Incardona's avatar
Pietro Incardona committed
500 501 502 503 504 505 506 507 508 509 510 511
			{
				// source object type
				typedef encapc<1, prop, typename openfpm::vector<prop>::layout_type> encap_src;
				// destination object type
				typedef encapc<1, prp_object, typename openfpm::vector<prp_object>::layout_type> encap_dst;

				// Copy only the selected properties
				object_si_d<encap_src, encap_dst, OBJ_ENCAP, prp...>(v_prp.get(j), g_send_prp.get(i).get(j2));

				j2++;
			}

512
			accum = accum + recv_sz_get.get(i);
Pietro Incardona's avatar
Pietro Incardona committed
513 514 515
		}
	}

516 517 518 519 520 521 522 523 524 525 526 527 528 529 530
	/*! \brief Resize a vector of retained buffers to nbf elements
	 *
	 * The buffers that are dropped by the resize (index >= nbf) have their
	 * reference counter decremented before the resize
	 *
	 * \param rt_buf vector of retained buffers to resize
	 * \param nbf new number of buffers
	 *
	 */
	void resize_retained_buffer(openfpm::vector<HeapMemory> & rt_buf, size_t nbf)
	{
		// Release all the buffer that are going to be deleted
		for (size_t i = nbf ; i < rt_buf.size() ; i++)
		{
			rt_buf.get(i).decRef();
		}

		// Resize the vector received as argument. The previous code resized
		// hsmem regardless of rt_buf, which was equivalent only because every
		// caller in this file passes hsmem
		rt_buf.resize(nbf);
	}

531 532 533 534 535 536 537 538 539 540
	/*! \brief This function fill the send buffer for properties after the particles has been label with labelParticles
	 *
	 * \tparam send_vector type used to send data
	 * \tparam prp_object object containing only the properties to send
	 * \tparam prp set of properties to send
	 *
	 * \param v_prp vector of particle properties
	 * \param g_send_prp Send buffer to fill
	 *
	 */
541
	template<typename send_vector, typename prp_object, int ... prp> void fill_send_ghost_prp_buf(openfpm::vector<prop> & v_prp, openfpm::vector<send_vector> & g_send_prp)
542 543
	{
		// create a number of send buffers equal to the near processors
544
		g_send_prp.resize(g_opart.size());
545 546 547

		resize_retained_buffer(hsmem,g_send_prp.size());

548 549
		for (size_t i = 0; i < g_send_prp.size(); i++)
		{
550 551 552 553 554 555 556 557
			// Buffer must retained and survive the destruction of the
			// vector
			if (hsmem.get(i).ref() == 0)
				hsmem.get(i).incRef();

			// Set the memory for retain the send buffer
			g_send_prp.get(i).setMemory(hsmem.get(i));

558
			// resize the sending vector (No allocation is produced)
559
			g_send_prp.get(i).resize(g_opart.get(i).size());
560 561 562
		}

		// Fill the send buffer
563
		for (size_t i = 0; i < g_opart.size(); i++)
564
		{
565
			for (size_t j = 0; j < g_opart.get(i).size(); j++)
566 567 568 569 570 571 572
			{
				// source object type
				typedef encapc<1, prop, typename openfpm::vector<prop>::layout_type> encap_src;
				// destination object type
				typedef encapc<1, prp_object, typename openfpm::vector<prp_object>::layout_type> encap_dst;

				// Copy only the selected properties
573
				object_si_d<encap_src, encap_dst, OBJ_ENCAP, prp...>(v_prp.get(g_opart.get(i).template get<0>(j)), g_send_prp.get(i).get(j));
574 575 576 577 578 579 580 581 582
			}
		}
	}

	/*! \brief allocate and fill the send buffer for the map function
	 *
	 * \param v_pos vector of particle positions
	 * \param v_prp vector of particles properties
	 * \param prc_sz_r For each processor in the list the size of the message to send
583 584
	 * \param m_pos sending buffer for position
	 * \param m_prp sending buffer for properties
585 586
	 *
	 */
587
	void fill_send_map_buf(openfpm::vector<Point<dim, St>> & v_pos, openfpm::vector<prop> & v_prp, openfpm::vector<size_t> & prc_sz_r, openfpm::vector<openfpm::vector<Point<dim,St>>> & m_pos, openfpm::vector<openfpm::vector<prop>> & m_prp)
588
	{
589 590 591
		m_prp.resize(prc_sz_r.size());
		m_pos.resize(prc_sz_r.size());
		openfpm::vector<size_t> cnt(prc_sz_r.size());
592

593
		for (size_t i = 0; i < prc_sz_r.size() ; i++)
594 595
		{
			// set the size and allocate, using mem warant that pos and prp is contiguous
596 597 598
			m_pos.get(i).resize(prc_sz_r.get(i));
			m_prp.get(i).resize(prc_sz_r.get(i));
			cnt.get(i) = 0;
599 600
		}

601 602
		// end vector point
		long int id_end = v_pos.size();
603

604 605 606 607 608
		// end opart point
		long int end = m_opart.size()-1;

		// Run through all the particles and fill the sending buffer
		for (size_t i = 0; i < m_opart.size(); i++)
609
		{
610
			process_map_particle<proc_without_prp>(i,end,id_end,m_pos,m_prp,v_pos,v_prp,cnt);
611
		}
612 613 614

		v_pos.resize(v_pos.size() - m_opart.size());
		v_prp.resize(v_prp.size() - m_opart.size());
615 616
	}

617

618
	/*! \brief allocate and fill the send buffer for the map function
incardon's avatar
incardon committed
619 620 621
	 *
	 * \tparam prp_object object type to send
	 * \tparam prp properties to send
622 623 624
	 *
	 * \param v_pos vector of particle positions
	 * \param v_prp vector of particle properties
incardon's avatar
incardon committed
625
	 * \param prc_sz_r number of particles to send for each processor
626 627
	 * \param m_pos sending buffer for position
	 * \param m_prp sending buffer for properties
628 629
	 *
	 */
630
	template<typename prp_object,int ... prp> void fill_send_map_buf_list(openfpm::vector<Point<dim, St>> & v_pos, openfpm::vector<prop> & v_prp, openfpm::vector<size_t> & prc_sz_r, openfpm::vector<openfpm::vector<Point<dim,St>>> & m_pos, openfpm::vector<openfpm::vector<prp_object>> & m_prp)
631
	{
632 633 634
		m_prp.resize(prc_sz_r.size());
		m_pos.resize(prc_sz_r.size());
		openfpm::vector<size_t> cnt(prc_sz_r.size());
635

636
		for (size_t i = 0; i < prc_sz_r.size(); i++)
637 638
		{
			// set the size and allocate, using mem warant that pos and prp is contiguous
639 640
			m_pos.get(i).resize(prc_sz_r.get(i));
			m_prp.get(i).resize(prc_sz_r.get(i));
641
			cnt.get(i) = 0;
642 643
		}

644 645 646 647 648
		// end vector point
		long int id_end = v_pos.size();

		// end opart point
		long int end = m_opart.size()-1;
649

650 651
		// Run through all the particles and fill the sending buffer
		for (size_t i = 0; i < m_opart.size(); i++)
652
		{
653
			process_map_particle<proc_with_prp<prp_object,prp...>>(i,end,id_end,m_pos,m_prp,v_pos,v_prp,cnt);
654
		}
655

656 657
		v_pos.resize(v_pos.size() - m_opart.size());
		v_prp.resize(v_prp.size() - m_opart.size());
658 659 660 661 662 663 664 665 666
	}

	/*! \brief Label particles for mappings
	 *
	 * \param v_pos vector of particle positions
	 * \param lbl_p Particle labeled
	 * \param prc_sz For each processor the number of particles to send
	 *
	 */
667
	template<typename obp> void labelParticleProcessor(openfpm::vector<Point<dim, St>> & v_pos, openfpm::vector<aggregate<size_t,size_t,size_t>> & lbl_p, openfpm::vector<size_t> & prc_sz)
668 669
	{
		// reset lbl_p
670
		lbl_p.clear();
671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697

		// resize the label buffer
		prc_sz.resize(v_cl.getProcessingUnits());

		auto it = v_pos.getIterator();

		// Label all the particles with the processor id where they should go
		while (it.isNext())
		{
			auto key = it.get();

			// Apply the boundary conditions
			dec.applyPointBC(v_pos.get(key));

			size_t p_id = 0;

			// Check if the particle is inside the domain
			if (dec.getDomain().isInside(v_pos.get(key)) == true)
				p_id = dec.processorIDBC(v_pos.get(key));
			else
				p_id = obp::out(key, v_cl.getProcessUnitID());

			// Particle to move
			if (p_id != v_cl.getProcessUnitID())
			{
				if ((long int) p_id != -1)
				{
Pietro Incardona's avatar
Pietro Incardona committed
698
					prc_sz.get(p_id)++;
699 700 701
					lbl_p.add();
					lbl_p.last().template get<0>() = key;
					lbl_p.last().template get<2>() = p_id;
702
				}
703 704 705 706 707 708
				else
				{
					lbl_p.add();
					lbl_p.last().template get<0>() = key;
					lbl_p.last().template get<2>() = p_id;
				}
709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724
			}

			// Add processors and add size

			++it;
		}
	}

	/*! \brief Label the particles
	 *
	 * It count the number of particle to send to each processors and save its ids
	 *
	 * \see nn_prcs::getShiftvectors()
	 *
	 * \param v_pos vector of particle positions
	 * \param v_prp vector of particle properties
incardon's avatar
incardon committed
725
	 * \param prc for each particle it label the processor id (the owner of the particle, or where it should go the particle)
726 727 728
	 * \param g_m ghost marker
	 *
	 */
729
	void labelParticlesGhost(openfpm::vector<Point<dim, St>> & v_pos, openfpm::vector<prop> & v_prp, openfpm::vector<size_t> & prc, size_t & g_m)
730 731
	{
		// Buffer that contain for each processor the id of the particle to send
732 733
		g_opart.clear();
		g_opart.resize(dec.getNNProcessors());
incardon's avatar
incardon committed
734
		prc_g_opart.clear();
735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751

		// Iterate over all particles
		auto it = v_pos.getIteratorTo(g_m);
		while (it.isNext())
		{
			auto key = it.get();

			// Given a particle, it return which processor require it (first id) and shift id, second id
			// For an explanation about shifts vectors please consult getShiftVector in ie_ghost
			const openfpm::vector<std::pair<size_t, size_t>> & vp_id = dec.template ghost_processorID_pair<typename Decomposition::lc_processor_id, typename Decomposition::shift_id>(v_pos.get(key), UNIQUE);

			for (size_t i = 0; i < vp_id.size(); i++)
			{
				// processor id
				size_t p_id = vp_id.get(i).first;

				// add particle to communicate
752 753 754
				g_opart.get(p_id).add();
				g_opart.get(p_id).last().template get<0>() = key;
				g_opart.get(p_id).last().template get<1>() = vp_id.get(i).second;
755 756 757 758
			}

			++it;
		}
759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774

		// remove all zero entry and construct prc (the list of the sending processors)
		openfpm::vector<openfpm::vector<aggregate<size_t,size_t>>> g_opart_f;

		// count the non zero element
		for (size_t i = 0 ; i < g_opart.size() ; i++)
		{
			if (g_opart.get(i).size() != 0)
			{
				g_opart_f.add();
				g_opart.get(i).swap(g_opart_f.last());
				prc.add(dec.IDtoProc(i));
			}
		}

		g_opart.swap(g_opart_f);
775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792
	}

	/*! \brief Call-back to allocate buffer to receive incoming elements (particles)
	 *
	 * \param msg_i size required to receive the message from i
	 * \param total_msg total size to receive from all the processors
	 * \param total_p the total number of processor that want to communicate with you
	 * \param i processor id
	 * \param ri request id (it is an id that goes from 0 to total_p, and is unique
	 *           every time message_alloc is called)
	 * \param ptr a pointer to the vector_dist structure
	 *
	 * \return the pointer where to store the message for the processor i
	 *
	 */
	static void * message_alloc_map(size_t msg_i, size_t total_msg, size_t total_p, size_t i, size_t ri, void * ptr)
	{
		// cast the pointer
incardon's avatar
incardon committed
793
		vector_dist_comm<dim, St, prop,layout,layout_base, Decomposition, Memory> * vd = static_cast<vector_dist_comm<dim, St, prop, layout, layout_base, Decomposition, Memory> *>(ptr);
794 795 796 797 798 799 800 801 802

		vd->recv_mem_gm.resize(vd->v_cl.getProcessingUnits());
		vd->recv_mem_gm.get(i).resize(msg_i);

		return vd->recv_mem_gm.get(i).getPointer();
	}

public:

incardon's avatar
incardon committed
803 804 805 806 807
	/*! \brief Copy Constructor
	 *
	 * \param v vector to copy
	 *
	 */
incardon's avatar
incardon committed
808
	vector_dist_comm(const vector_dist_comm<dim,St,prop,layout,layout_base,Decomposition,Memory> & v)
incardon's avatar
incardon committed
809
	:v_cl(create_vcluster()),dec(create_vcluster()),lg_m(0)
incardon's avatar
incardon committed
810 811 812 813 814
	{
		this->operator=(v);
	}


815 816 817 818 819 820
	/*! \brief Constructor
	 *
	 * \param dec Domain decompositon
	 *
	 */
	vector_dist_comm(const Decomposition & dec)
incardon's avatar
incardon committed
821
	:v_cl(create_vcluster()),dec(dec),lg_m(0)
822 823 824 825 826 827 828 829 830 831
	{

	}

	/*! \brief Constructor from a temporary decomposition
	 *
	 * \param dec Domain decomposition, forwarded as an rvalue to the constructor
	 *            of the internal decomposition
	 *
	 */
	vector_dist_comm(Decomposition && dec)
	// A named rvalue reference is an lvalue: without the cast the copy
	// constructor of Decomposition would be selected. The cast is what
	// std::move does and does not require any additional header. When
	// Decomposition has no move constructor the copy constructor is still used
	:v_cl(create_vcluster()),dec(static_cast<Decomposition &&>(dec)),lg_m(0)
	{
	}

	/*! \brief Constructor
	 *
	 */
	vector_dist_comm()
incardon's avatar
incardon committed
841
	:v_cl(create_vcluster()),dec(create_vcluster()),lg_m(0)
842 843 844
	{
	}

845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861
	/*! \brief Destructor
	 *
	 * Release the retained send buffers. Every buffer in hsmem is expected to
	 * have a reference counter equal to 1, otherwise an error is printed and
	 * the buffer is left untouched
	 *
	 */
	~vector_dist_comm()
	{
		for (size_t b = 0 ; b < hsmem.size() ; ++b)
		{
			auto && mem = hsmem.get(b);

			if (mem.ref() != 1)
			{
				std::cout << __FILE__ << ":" << __LINE__ << " internal error memory is in an invalid state " << std::endl;
				continue;
			}

			mem.decRef();
		}
	}

incardon's avatar
incardon committed
862
	/*! \brief Get the number of minimum sub-domain per processor
863 864 865 866
	 *
	 * \return minimum number
	 *
	 */
incardon's avatar
incardon committed
867 868 869 870 871 872 873 874 875 876 877
	size_t getDecompositionGranularity()
	{
		return v_sub_unit_factor;
	}

	/*! \brief Set the minimum number of sub-domain per processor
	 *
	 * \param n_sub
	 *
	 */
	void setDecompositionGranularity(size_t n_sub)
878
	{
incardon's avatar
incardon committed
879
		this->v_sub_unit_factor = n_sub;
880 881 882 883 884 885 886
	}

	/*! \brief Initialize the decomposition
	 *
	 * \param box domain
	 * \param bc boundary conditions
	 * \param g ghost extension
887
	 * \param opt additional options
888 889
	 *
	 */
890 891 892 893 894
	void init_decomposition(Box<dim,St> & box,
							const size_t (& bc)[dim],
							const Ghost<dim,St> & g,
							size_t opt,
							const grid_sm<dim,void> & gdist)
895 896
	{
		size_t div[dim];
897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912

		if (opt & BIND_DEC_TO_GHOST)
		{
			// padding
			size_t pad = 0;

			// CellDecomposer
			CellDecomposer_sm<dim,St,shift<dim,St>> cd_sm;

			// Calculate the divisions for the symmetric Cell-lists
			cl_param_calculateSym<dim,St>(box,cd_sm,g,pad);

			for (size_t i = 0 ; i < dim ; i++)
				div[i] = cd_sm.getDiv()[i] - 2*pad;
		}
		else
913
		{
914 915 916 917
			// Create a valid decomposition of the space
			// Get the number of processor and calculate the number of sub-domain
			// for decomposition
			size_t n_proc = v_cl.getProcessingUnits();
incardon's avatar
incardon committed
918
			size_t n_sub = n_proc * getDecompositionGranularity();
919 920 921 922 923 924 925 926

			// Calculate the maximum number (before merging) of sub-domain on
			// each dimension

			for (size_t i = 0; i < dim; i++)
			{
				div[i] = openfpm::math::round_big_2(pow(n_sub, 1.0 / dim));
			}
927 928 929
		}

		// Create the sub-domains
930
		dec.setParameters(div, box, bc, g, gdist);
931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949
		dec.decompose();
	}

	/*! \brief Synchronize the properties and the positions of the ghost particles
	 *
	 * \tparam prp list of properties to synchronize
	 *
	 * \param v_pos vector of position to update
	 * \param v_prp vector of properties to update
	 * \param g_m marker between real and ghost particles
	 * \param opt options. Unless NO_POSITION is set the positions are sent
	 *        too. With SKIP_LABELLING the ghost part of the properties is not
	 *        reset, the particles are not labelled again and the
	 *        communication options are the ones given by compute_options
	 *
	 */
	template<int ... prp> inline void ghost_get_(openfpm::vector<Point<dim, St>> & v_pos, openfpm::vector<prop> & v_prp, size_t & g_m, size_t opt = WITH_POSITION)
	{
		// Sending property object
		typedef object<typename object_creator<typename prop::type, prp...>::type> prp_object;

		// send vector for each processor
		typedef openfpm::vector<prp_object> send_vector;

		// the option flags are read one time here, opt is not changed below
		const bool skip_label = (opt & SKIP_LABELLING) != 0;
		const bool send_pos = (opt & NO_POSITION) == 0;

		// reset the ghost part of the positions
		if (send_pos)
			v_pos.resize(g_m);

		if (!skip_label)
		{
			// reset the ghost part of the properties
			v_prp.resize(g_m);

			// Label all the particles
			labelParticlesGhost(v_pos,v_prp,prc_g_opart,g_m);
		}

		// Send and receive ghost particle information
		{
			openfpm::vector<send_vector> g_send_prp;
			fill_send_ghost_prp_buf<send_vector, prp_object, prp...>(v_prp,g_send_prp);

			// if there are no properties skip
			// SSendRecvP send everything when we do not give properties

			if (sizeof...(prp) != 0)
			{
				if (skip_label)
				{
					size_t comm_opt = compute_options(opt);
					op_ssend_gg_recv_merge opm(g_m);
					v_cl.SSendRecvP_op<op_ssend_gg_recv_merge,send_vector,decltype(v_prp),prp...>(g_send_prp,v_prp,prc_g_opart,opm,prc_recv_get,recv_sz_get,comm_opt);
				}
				else
				{
					v_cl.SSendRecvP<send_vector,decltype(v_prp),prp...>(g_send_prp,v_prp,prc_g_opart,prc_recv_get,recv_sz_get,recv_sz_get_byte);
				}

				// fill g_opart_sz with the number of elements sent to each processor
				const size_t n_prc = prc_g_opart.size();
				g_opart_sz.resize(n_prc);

				for (size_t p = 0 ; p < n_prc ; ++p)
					g_opart_sz.get(p) = g_send_prp.get(p).size();
			}
		}

		if (send_pos)
		{
			// Sending buffer for the ghost particles position
			openfpm::vector<send_pos_vector> g_pos_send;

			fill_send_ghost_pos_buf(v_pos,g_pos_send);

			if (skip_label)
			{
				size_t comm_opt = compute_options(opt);
				v_cl.SSendRecv(g_pos_send,v_pos,prc_g_opart,prc_recv_get,recv_sz_get,comm_opt);
			}
			else
			{
				prc_recv_get.clear();
				recv_sz_get.clear();
				v_cl.SSendRecv(g_pos_send,v_pos,prc_g_opart,prc_recv_get,recv_sz_get);
			}

			// fill g_opart_sz with the number of elements sent to each processor
			const size_t n_prc = prc_g_opart.size();
			g_opart_sz.resize(n_prc);

			for (size_t p = 0 ; p < n_prc ; ++p)
				g_opart_sz.get(p) = g_pos_send.get(p).size();
		}

		// Important to ensure that the number of particles in v_prp must be equal to v_pos
		// Note that if we do not give properties sizeof...(prp) == 0 in general at this point
		// v_prp.size() != v_pos.size()
		if (!skip_label)
			v_prp.resize(v_pos.size());

		add_loc_particles_bc(v_pos,v_prp,g_m,opt);
	}


	/*! \brief Move all the particles that do not belong to the local processor to the respective processor
	 *
	 * In general this function is called after moving the particles to move the
	 * elements out the local processor. Or just after initialization if each processor
	 * contain non local particles
	 *
	 * The policy used to label the particles is fixed to KillParticle, only
	 * the properties listed in prp are communicated
	 *
	 * \tparam prp properties to communicate
	 *
	 * \param v_pos vector of particle positions
	 * \param v_prp vector of particle properties
	 * \param g_m ghost marker, on return it is set to the new size of v_pos
	 *
	 */
	template<unsigned int ... prp> void map_list_(openfpm::vector<Point<dim, St>> & v_pos, openfpm::vector<prop> & v_prp, size_t & g_m)
	{
		typedef KillParticle obp;

		// Sending property object
		typedef object<typename object_creator<typename prop::type, prp...>::type> prp_object;

		const size_t n_proc = v_cl.getProcessingUnits();

		// Processor communication size
		openfpm::vector<size_t> prc_sz(n_proc);

		// map completely reset the ghost part
		v_pos.resize(g_m);
		v_prp.resize(g_m);

		// Contain the processor id of each particle (basically where they have to go)
		labelParticleProcessor<obp>(v_pos,m_opart, prc_sz);

		// Calculate the sending buffer size for each processor, put this information in
		// a contiguous buffer
		p_map_req.resize(n_proc);
		openfpm::vector<size_t> prc_sz_r;
		openfpm::vector<size_t> prc_r;

		for (size_t p = 0; p < n_proc; ++p)
		{
			// nothing to send to this processor
			if (prc_sz.get(p) == 0)
				continue;

			p_map_req.get(p) = prc_r.size();
			prc_r.add(p);
			prc_sz_r.add(prc_sz.get(p));
		}

		//! position vector
		openfpm::vector<openfpm::vector<Point<dim, St>>> m_pos;
		//! properties vector
		openfpm::vector<openfpm::vector<prp_object>> m_prp;

		fill_send_map_buf_list<prp_object,prp...>(v_pos,v_prp,prc_sz_r, m_pos, m_prp);

		v_cl.SSendRecv(m_pos,v_pos,prc_r,prc_recv_map,recv_sz_map);
		v_cl.SSendRecvP<openfpm::vector<prp_object>,decltype(v_prp),prp...>(m_prp,v_prp,prc_r,prc_recv_map,recv_sz_map);

		// mark the ghost part
		g_m = v_pos.size();
	}

	/*! \brief Move all the particles that do not belong to the local processor to the respective processor
	 *
	 * In general this function is called after moving the particles to move the
	 * elements out the local processor. Or just after initialization if each processor
	 * contain non local particles
	 *
	 * All the properties of the particles are communicated
	 *
	 * \tparam obp out of bound policy, it is given to labelParticleProcessor
	 *         (KillParticle by default)
	 *
	 * \param v_pos vector of particle positions
	 * \param v_prp vector of particle properties
	 * \param g_m ghost marker, on return it is set to the new size of v_pos
	 *
	 */
	template<typename obp = KillParticle> void map_(openfpm::vector<Point<dim, St>> & v_pos, openfpm::vector<prop> & v_prp, size_t & g_m)
	{
		const size_t n_proc = v_cl.getProcessingUnits();

		// Processor communication size
		openfpm::vector<size_t> prc_sz(n_proc);

		// map completely reset the ghost part
		v_pos.resize(g_m);
		v_prp.resize(g_m);

		// Contain the processor id of each particle (basically where they have to go)
		labelParticleProcessor<obp>(v_pos,m_opart, prc_sz);

		// Calculate the sending buffer size for each processor, put this information in
		// a contiguous buffer
		p_map_req.resize(n_proc);
		openfpm::vector<size_t> prc_sz_r;
		openfpm::vector<size_t> prc_r;

		for (size_t p = 0; p < n_proc; ++p)
		{
			// nothing to send to this processor
			if (prc_sz.get(p) == 0)
				continue;

			p_map_req.get(p) = prc_r.size();
			prc_r.add(p);
			prc_sz_r.add(prc_sz.get(p));
		}

		//! position vector
		openfpm::vector<openfpm::vector<Point<dim, St>>> m_pos;
		//! properties vector
		openfpm::vector<openfpm::vector<prop>> m_prp;

		fill_send_map_buf(v_pos,v_prp, prc_sz_r, m_pos, m_prp);

		v_cl.SSendRecv(m_pos,v_pos,prc_r,prc_recv_map,recv_sz_map);
		v_cl.SSendRecv(m_prp,v_prp,prc_r,prc_recv_map,recv_sz_map);

		// mark the ghost part
		g_m = v_pos.size();
	}

	/*! \brief Get the decomposition
	 *
	 * \return a reference to the internal decomposition, it can be modified
	 *
	 */
	inline Decomposition & getDecomposition()
	{
		return this->dec;
	}

incardon's avatar
incardon committed
1158 1159 1160 1161 1162 1163 1164 1165 1166 1167
	/*! \brief Get the decomposition (constant version)
	 *
	 * \return a constant reference to the internal decomposition
	 *
	 */
	inline const Decomposition & getDecomposition() const
	{
		return this->dec;
	}

1168 1169 1170 1171
	/*! \brief Copy a vector
	 *
	 * \param vc vector to copy
	 *
Pietro Incardona's avatar
Pietro Incardona committed
1172 1173
	 * \return iteself
	 *
1174
	 */
incardon's avatar
incardon committed
1175
	vector_dist_comm<dim,St,prop,layout,layout_base,Decomposition,Memory> & operator=(const vector_dist_comm<dim,St,prop,layout,layout_base,Decomposition,Memory> & vc)
1176 1177 1178 1179 1180 1181 1182 1183 1184 1185
	{
		dec = vc.dec;

		return *this;
	}

	/*! \brief Copy a vector
	 *
	 * \param vc vector to copy
	 *
Pietro Incardona's avatar
Pietro Incardona committed
1186 1187
	 * \return itself
	 *
1188
	 */
incardon's avatar
incardon committed
1189
	vector_dist_comm<dim,St,prop,layout,layout_base,Decomposition,Memory> & operator=(vector_dist_comm<dim,St,prop,layout,layout_base,Decomposition,Memory> && vc)
1190
	{
1191
		dec = vc.dec;
1192 1193 1194

		return *this;
	}
Pietro Incardona's avatar
Pietro Incardona committed
1195 1196 1197 1198 1199 1200 1201 1202 1203

	/*! \brief Ghost put
	 *
	 * \tparam op operation to apply
	 * \tparam prp set of properties
	 *
	 * \param v_pos vector of particle positions
	 * \param v_prp vector od particle properties
	 * \param g_m ghost marker
incardon's avatar
incardon committed
1204
	 * \param opt options
Pietro Incardona's avatar
Pietro Incardona committed
1205 1206
	 *
	 */
incardon's avatar
incardon committed
1207
	template<template<typename,typename> class op, int ... prp> void ghost_put_(openfpm::vector<Point<dim, St>> & v_pos, openfpm::vector<prop> & v_prp, size_t & g_m, size_t opt)
Pietro Incardona's avatar
Pietro Incardona committed
1208 1209 1210 1211 1212
	{
		// Sending property object
		typedef object<typename object_creator<typename prop::type, prp...>::type> prp_object;

		// send vector for each processor
1213
		typedef openfpm::vector<prp_object> send_vector;
Pietro Incardona's avatar
Pietro Incardona committed
1214 1215

		openfpm::vector<send_vector> g_send_prp;
1216
		fill_send_ghost_put_prp_buf<send_vector, prp_object, prp...>(v_prp,g_send_prp,g_m);
Pietro Incardona's avatar
Pietro Incardona committed
1217

1218
		// Send and receive ghost particle information
incardon's avatar
incardon committed
1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230
		if (opt & NO_CHANGE_ELEMENTS)
		{
			size_t opt_ = compute_options(opt);

			op_ssend_recv_merge<op> opm(g_opart);
			v_cl.SSendRecvP_op<op_ssend_recv_merge<op>,send_vector,decltype(v_prp),prp...>(g_send_prp,v_prp,prc_recv_get,opm,prc_g_opart,g_opart_sz,opt_);
		}
		else
		{
			op_ssend_recv_merge<op> opm(g_opart);
			v_cl.SSendRecvP_op<op_ssend_recv_merge<op>,send_vector,decltype(v_prp),prp...>(g_send_prp,v_prp,prc_recv_get,opm,prc_recv_put,recv_sz_put);
		}
1231 1232 1233 1234 1235 1236

		// process also the local replicated particles

		size_t i2 = 0;


incardon's avatar
incardon committed
1237
		if (lg_m < v_prp.size() && v_prp.size() - lg_m != o_part_loc.size())
incardon's avatar
incardon committed
1238 1239
		{
			std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " Local ghost particles = " << v_prp.size() - lg_m << " != " << o_part_loc.size() << std::endl;
1240
			std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " Check that you did a ghost_get before a ghost_put" << std::endl;
incardon's avatar
incardon committed
1241
		}
1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252


		for (size_t i = lg_m ; i < v_prp.size() ; i++)
		{
			auto dst = v_prp.get(o_part_loc.template get<0>(i2));
			auto src = v_prp.get(i);
			copy_cpu_encap_encap_op_prp<op,decltype(v_prp.get(0)),decltype(v_prp.get(0)),prp...> cp(src,dst);

			boost::mpl::for_each_ref< boost::mpl::range_c<int,0,sizeof...(prp)> >(cp);
			i2++;
		}
Pietro Incardona's avatar
Pietro Incardona committed
1253
	}
1254 1255 1256 1257
};


#endif /* SRC_VECTOR_VECTOR_DIST_COMM_HPP_ */