/*
 * Vcluster.hpp
 *
 *  Created on: Feb 8, 2016
 *      Author: Pietro Incardona
 */

#ifndef VCLUSTER_HPP
#define VCLUSTER_HPP

#include <signal.h>

#include "VCluster_base.hpp"
#include "VCluster_meta_function.hpp"
#include "util/math_util_complex.hpp"

#ifdef CUDA_GPU
extern CudaMemory mem_tmp;
#endif

void bt_sighandler(int sig, siginfo_t * info, void * ctx);

/*! \brief Implementation of VCluster class
 *
 * This class implements communication functions such as summation, minimum and maximum across
 * processors, as well as Dynamic Sparse Data Exchange (DSDE)
 *
 * ## Vcluster Min max sum
 * \snippet VCluster_unit_tests.hpp max min sum
 *
 * ## Vcluster all gather
 * \snippet VCluster_unit_test_util.hpp allGather numbers
 *
 * ## Dynamic sparse data exchange with complex objects
 * \snippet VCluster_semantic_unit_tests.hpp dsde with complex objects1
 *
 * ## Dynamic sparse data exchange with buffers
 * \snippet VCluster_unit_test_util.hpp dsde
 * \snippet VCluster_unit_test_util.hpp message alloc
 *
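 * ## Minimal usage sketch
 *
 * The snippet below is an illustrative sketch (not one of the unit-test snippets above); it assumes
 * the library is initialized with openfpm_init() and the global instance is obtained with
 * create_vcluster(), both defined later in this header, and that the reduction interface
 * (sum/execute) is provided by Vcluster_base.
 *
 * \code
 * openfpm_init(&argc,&argv);
 *
 * Vcluster<> & v_cl = create_vcluster();
 *
 * size_t id = v_cl.getProcessUnitID();    // rank of this processor
 *
 * float local = (float)id;
 * v_cl.sum(local);                        // queue a sum reduction across processors
 * v_cl.execute();                         // execute the queued communications
 *
 * openfpm_finalize();
 * \endcode
 *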
 */
template<typename InternalMemory = HeapMemory>
class Vcluster: public Vcluster_base<InternalMemory>
{
	typedef Vcluster_base<InternalMemory> self_base;

	template<typename T>
	struct index_gen {};

	//! Process the receive buffer using the specified properties (meta-function)
	template<int ... prp>
	struct index_gen<index_tuple<prp...>>
	{
		//! Process the receive buffer
		template<typename op,
		         typename T,
				 typename S,
				 template <typename> class layout_base = memory_traits_lin>
		inline static void process_recv(Vcluster & vcl, S & recv, openfpm::vector<size_t> * sz_recv,
				                        openfpm::vector<size_t> * sz_recv_byte, op & op_param,size_t opt)
		{
			if (opt == MPI_GPU_DIRECT && !std::is_same<InternalMemory,CudaMemory>::value)
			{
				// In order to have this option activated, InternalMemory must be CudaMemory

				std::cout << __FILE__ << ":" << __LINE__ << " error: in order to have MPI_GPU_DIRECT VCluster must use CudaMemory internally, the most probable" <<
						                                    " cause of this problem is that you are using MPI_GPU_DIRECT option with a non-GPU data-structure" << std::endl;
			}

			vcl.process_receive_buffer_with_prp<op,T,S,layout_base,prp...>(recv,sz_recv,sz_recv_byte,op_param,opt);
		}
	};

	/*! \brief Prepare the send buffer and send the message to other processors
	 *
	 * \tparam op Operation to execute in merging the receiving data
	 * \tparam T sending object
	 * \tparam S receiving object
	 *
	 * \note T and S must not be the same object, but S.operation(T) must be defined; the flexibility
	 * of the operation is defined by op
	 *
	 * \param send sending buffer
	 * \param recv receiving object
	 * \param prc_send each object T in the vector send is sent to one processor specified in this list.
	 *                 This means that prc_send.size() == send.size()
	 * \param prc_recv list of processors from which we receive (output), in case of RECEIVE_KNOWN must be filled
	 * \param sz_recv size of each receiving message (output), in case of RECEIVE_KNOWN must be filled
	 * \param opt Options; using RECEIVE_KNOWN enables communication patterns with lower latency
	 *
	 */
	template<typename op, typename T, typename S, template <typename> class layout_base> void prepare_send_buffer(openfpm::vector<T> & send,
			                                                               S & recv,
																		   openfpm::vector<size_t> & prc_send,
																		   openfpm::vector<size_t> & prc_recv,
																		   openfpm::vector<size_t> & sz_recv,
																		   size_t opt)
	{
		openfpm::vector<size_t> sz_recv_byte(sz_recv.size());

		// Reset the receive buffer
		reset_recv_buf();

	#ifdef SE_CLASS1

		if (send.size() != prc_send.size())
			std::cerr << __FILE__ << ":" << __LINE__ << " Error, the number of processor involved \"prc.size()\" must match the number of sending buffers \"send.size()\" " << std::endl;

	#endif

		// Prepare the sending buffer
		openfpm::vector<const void *> send_buf;
		openfpm::vector<size_t> send_sz_byte;
		openfpm::vector<size_t> prc_send_;

		size_t tot_size = 0;

		for (size_t i = 0; i < send.size() ; i++)
		{
			size_t req = 0;

			//Pack requesting
			pack_unpack_cond_with_prp<has_max_prop<T, has_value_type<T>::value>::value,op, T, S, layout_base>::packingRequest(send.get(i), req, send_sz_byte);
			tot_size += req;
		}

		pack_unpack_cond_with_prp_inte_lin<T>::construct_prc(prc_send,prc_send_);

		HeapMemory pmem;

		ExtPreAlloc<HeapMemory> & mem = *(new ExtPreAlloc<HeapMemory>(tot_size,pmem));
		mem.incRef();

		for (size_t i = 0; i < send.size() ; i++)
		{
			//Packing

			Pack_stat sts;

			pack_unpack_cond_with_prp<has_max_prop<T, has_value_type<T>::value>::value, op, T, S, layout_base>::packing(mem, send.get(i), sts, send_buf,opt);
		}

		// receive information
		base_info<InternalMemory> bi(&this->recv_buf,prc_recv,sz_recv_byte,this->tags,opt);

		// Send and recv multiple messages
		if (opt & RECEIVE_KNOWN)
		{
			// If we are passing the number of elements and not the bytes, calculate the bytes
			if (opt & KNOWN_ELEMENT_OR_BYTE)
			{
				// We know the number of elements, convert to bytes (ONLY if it is possible)
				if (has_pack_gen<typename T::value_type>::value == false && is_vector<T>::value == true)
				{
					for (size_t i = 0 ; i < sz_recv.size() ; i++)
						sz_recv_byte.get(i) = sz_recv.get(i) * sizeof(typename T::value_type);
				}
				else
				{std::cout << __FILE__ << ":" << __LINE__ << " Error " << demangle(typeid(T).name()) << " the type does not work with the option or NO_CHANGE_ELEMENTS" << std::endl;}

				self_base::sendrecvMultipleMessagesNBX(prc_send.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(),
											prc_recv.size(),(size_t *)prc_recv.getPointer(),(size_t *)sz_recv_byte.getPointer(),msg_alloc_known,(void *)&bi);
			}
			else
			{
				self_base::sendrecvMultipleMessagesNBX(prc_send.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(),
											prc_recv.size(),(size_t *)prc_recv.getPointer(),msg_alloc_known,(void *)&bi);
				sz_recv_byte = self_base::sz_recv_tmp;
			}
		}
		else
		{
			self_base::tags.clear();
			prc_recv.clear();
			self_base::sendrecvMultipleMessagesNBX(prc_send_.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send_.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi);
		}

		// Reorder the buffer
		reorder_buffer(prc_recv,self_base::tags,sz_recv_byte);

		mem.decRef();
		delete &mem;
	}


	/*! \brief Reset the receive buffer
	 *
	 *
	 */
	void reset_recv_buf()
	{
		for (size_t i = 0 ; i < self_base::recv_buf.size() ; i++)
		{self_base::recv_buf.get(i).resize(0);}

		self_base::recv_buf.resize(0);
	}

	/*! \brief Base info
	 *
	 * \param recv_buf receive buffers
	 * \param prc processors involved
	 * \param sz size of the received data
	 * \param tags message tags
	 * \param opt options
	 *
	 */
	template<typename Memory>
	struct base_info
	{
		//! Receive buffer
		openfpm::vector_fr<BMemory<Memory>> * recv_buf;
		//! receiving processor list
		openfpm::vector<size_t> & prc;
		//! size of each message
		openfpm::vector<size_t> & sz;
		//! tags
		openfpm::vector<size_t> &tags;

		//! options
		size_t opt;

		//! constructor
		base_info(openfpm::vector_fr<BMemory<Memory>> * recv_buf, openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz, openfpm::vector<size_t> & tags,size_t opt)
		:recv_buf(recv_buf),prc(prc),sz(sz),tags(tags),opt(opt)
		{}
	};

	/*! \brief Call-back to allocate buffer to receive data
	 *
	 * \param msg_i size required to receive the message from i
	 * \param total_msg total size to receive from all the processors
	 * \param total_p the total number of processors that want to communicate with you
	 * \param i processor id
	 * \param ri request id (it is an id that goes from 0 to total_p, and is unique
	 *           every time message_alloc is called)
	 * \param tag tag of the message
	 * \param ptr a pointer to the base_info structure
	 *
	 * \return the pointer where to store the message for the processor i
	 *
	 */
	static void * msg_alloc(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void * ptr)
	{
		base_info<InternalMemory> & rinfo = *(base_info<InternalMemory> *)ptr;

		if (rinfo.recv_buf == NULL)
		{
			std::cerr << __FILE__ << ":" << __LINE__ << " Internal error, this processor is not supposed to receive\n";
			return NULL;
		}

		rinfo.recv_buf->resize(ri+1);

		rinfo.recv_buf->get(ri).resize(msg_i);

		// Receive info
		rinfo.prc.add(i);
		rinfo.sz.add(msg_i);
		rinfo.tags.add(tag);

		// return the pointer

		// If we have GPU direct activated use directly the cuda buffer
		if (rinfo.opt & MPI_GPU_DIRECT)
		{
#if defined(MPIX_CUDA_AWARE_SUPPORT) && MPIX_CUDA_AWARE_SUPPORT
			return rinfo.recv_buf->last().getDevicePointer();
#else
			return rinfo.recv_buf->last().getPointer();
#endif
		}

		return rinfo.recv_buf->last().getPointer();
	}


	/*! \brief Call-back to allocate buffer to receive data
	 *
	 * \param msg_i size required to receive the message from i
	 * \param total_msg total size to receive from all the processors
	 * \param total_p the total number of processors that want to communicate with you
	 * \param i processor id
	 * \param ri request id (it is an id that goes from 0 to total_p, and is unique
	 *           every time message_alloc is called)
	 * \param tag tag of the message
	 * \param ptr a pointer to the base_info structure
	 *
	 * \return the pointer where to store the message for the processor i
	 *
	 */
	static void * msg_alloc_known(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void * ptr)
	{
		base_info<InternalMemory> & rinfo = *(base_info<InternalMemory> *)ptr;

		if (rinfo.recv_buf == NULL)
		{
			std::cerr << __FILE__ << ":" << __LINE__ << " Internal error, this processor is not supposed to receive\n";
			return NULL;
		}

		rinfo.recv_buf->resize(ri+1);

		rinfo.recv_buf->get(ri).resize(msg_i);

		// return the pointer
		return rinfo.recv_buf->last().getPointer();
	}
	
	/*! \brief Process the receive buffer
	 *
	 * \tparam op operation to do in merging the received data
	 * \tparam T type of sending object
	 * \tparam S type of receiving object
	 * \tparam prp properties to receive
	 *
	 * \param recv receive object
	 * \param sz vector that stores how many elements have been added per processor to S
	 * \param sz_byte bytes received on a per processor basis
	 * \param op_param operation to do in merging the received information with recv
	 *
	 */
	template<typename op, typename T, typename S, template <typename> class layout_base ,unsigned int ... prp >
	void process_receive_buffer_with_prp(S & recv,
			                             openfpm::vector<size_t> * sz,
										 openfpm::vector<size_t> * sz_byte,
										 op & op_param,
										 size_t opt)
	{
		if (sz != NULL)
		{sz->resize(self_base::recv_buf.size());}

		pack_unpack_cond_with_prp<has_max_prop<T, has_value_type<T>::value>::value,op, T, S, layout_base, prp... >::unpacking(recv, self_base::recv_buf, sz, sz_byte, op_param,opt);
	}

	public:

	/*! \brief Constructor
	 *
	 * \param argc main number of arguments
	 * \param argv main set of arguments
	 * \param ext_comm external communicator to use (default MPI_COMM_WORLD)
	 *
	 */
	Vcluster(int *argc, char ***argv,MPI_Comm ext_comm = MPI_COMM_WORLD)
	:Vcluster_base<InternalMemory>(argc,argv,ext_comm)
	{
	}

	/*! \brief Semantic Gather, gather the data from all processors into one node
	 *
	 * Semantic communications differ from the normal ones. In general they
	 * follow this model:
	 *
	 * Gather(T,S,root,op=add);
	 *
	 * "Gather" indicates the communication pattern, or how the information flows.
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work, S must implement the interface S.add(T).
	 *
	 * ### Example send a vector of structures, and merge all together in one vector
	 * \snippet VCluster_semantic_unit_tests.hpp Gather the data on master
	 *
	 * ### Example send a vector of structures, and merge all together in one vector
	 * \snippet VCluster_semantic_unit_tests.hpp Gather the data on master complex
	 *
	 * \tparam T type of sending object
	 * \tparam S type of receiving object
	 *
	 * \param send Object to send
	 * \param recv Object to receive
	 * \param root which node should collect the information
	 *
	 * \return true if the function completed successfully
	 *
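	 * ### Example (illustrative sketch, not one of the snippets above; it assumes the global instance returned by create_vcluster())
	 * \code
	 * auto & v_cl = create_vcluster();
	 *
	 * openfpm::vector<float> send;                 // local contribution of this processor
	 * send.add((float)v_cl.getProcessUnitID());
	 *
	 * openfpm::vector<float> recv;                 // on the root it will contain all the elements
	 * v_cl.SGather(send,recv,0);                   // processor 0 collects the data
	 * \endcode
	 *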
	 */
	template<typename T, typename S, template <typename> class layout_base=memory_traits_lin> bool SGather(T & send, S & recv,size_t root)
	{
		openfpm::vector<size_t> prc;
		openfpm::vector<size_t> sz;

		return SGather<T,S,layout_base>(send,recv,prc,sz,root);
	}

	//! Identity metafunction, used by generate_indexes to enumerate the property indexes
	template<size_t index, size_t N> struct MetaFuncOrd {
	   enum { value = index };
	};

	/*! \brief Semantic Gather, gather the data from all processors into one node
	 *
	 * Semantic communications differ from the normal ones. In general they
	 * follow this model:
	 *
	 * Gather(T,S,root,op=add);
	 *
	 * "Gather" indicates the communication pattern, or how the information flows.
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work, S must implement the interface S.add(T).
	 *
	 * ### Example send a vector of structures, and merge all together in one vector
	 * \snippet VCluster_semantic_unit_tests.hpp Gather the data on master
	 *
	 * ### Example send a vector of structures, and merge all together in one vector
	 * \snippet VCluster_semantic_unit_tests.hpp Gather the data on master complex
	 *
	 * \tparam T type of sending object
	 * \tparam S type of receiving object
	 *
	 * \param send Object to send
	 * \param recv Object to receive
	 * \param root which node should collect the information
	 * \param prc processors from which we received the information
	 * \param sz size of the received information for each processor
	 *
	 * \return true if the function completed successfully
	 *
	 */
	template<typename T,
	         typename S,
			 template <typename> class layout_base = memory_traits_lin>
	bool SGather(T & send,
			     S & recv,
				 openfpm::vector<size_t> & prc,
				 openfpm::vector<size_t> & sz,
				 size_t root)
	{
#ifdef SE_CLASS1
		if (&send == (T *)&recv)
		{std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " using SGather in general the sending object and the receiving object must be different" << std::endl;}
#endif

		// Reset the receive buffer
		reset_recv_buf();

		// If we are on master collect the information
		if (self_base::getProcessUnitID() == root)
		{
			// send buffer (master does not send anything) so send req and send_buf
			// remain buffer with size 0
			openfpm::vector<size_t> send_req;

			self_base::tags.clear();

			// receive information
			base_info<InternalMemory> bi(&this->recv_buf,prc,sz,this->tags,0);

			// Send and recv multiple messages
			self_base::sendrecvMultipleMessagesNBX(send_req.size(),NULL,NULL,NULL,msg_alloc,&bi);

			// we generate the list of the properties to unpack
			typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;

			// operation object
			op_ssend_recv_add<void> opa;

			// Reorder the buffer
			reorder_buffer(prc,self_base::tags,sz);

			index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,&sz,NULL,opa,0);

			recv.add(send);
			prc.add(root);
			sz.add(send.size());
		}
		else
		{
			// we are not the root: pack the local data and send it to the root processor
			openfpm::vector<size_t> send_prc;
			openfpm::vector<size_t> send_prc_;
			send_prc.add(root);

			openfpm::vector<size_t> sz;

			openfpm::vector<const void *> send_buf;
				
			//Pack requesting

			size_t tot_size = 0;

			pack_unpack_cond_with_prp<has_max_prop<T, has_value_type<T>::value>::value,op_ssend_recv_add<void>, T, S, layout_base>::packingRequest(send, tot_size, sz);

			HeapMemory pmem;

			ExtPreAlloc<HeapMemory> & mem = *(new ExtPreAlloc<HeapMemory>(tot_size,pmem));
			mem.incRef();

			//Packing

			Pack_stat sts;
			
			pack_unpack_cond_with_prp<has_max_prop<T, has_value_type<T>::value>::value,op_ssend_recv_add<void>, T, S, layout_base>::packing(mem, send, sts, send_buf);

			pack_unpack_cond_with_prp_inte_lin<T>::construct_prc(send_prc,send_prc_);

			self_base::tags.clear();

			// receive information
			base_info<InternalMemory> bi(NULL,prc,sz,self_base::tags,0);

			// Send and recv multiple messages
			self_base::sendrecvMultipleMessagesNBX(send_prc_.size(),(size_t *)sz.getPointer(),(size_t *)send_prc_.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi,NONE);

			mem.decRef();
			delete &mem;
		}
		
		return true;
	}

	/*! \brief Semantic Scatter, scatter the data from one processor to the other nodes
	 *
	 * Semantic communications differ from the normal ones. In general they
	 * follow this model:
	 *
	 * Scatter(T,S,...,op=add);
	 *
	 * "Scatter" indicates the communication pattern, or how the information flows.
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work, S must implement the interface S.add(T).
	 *
	 * ### Example scatter a vector of structures, to other processors
	 * \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master
	 *
	 * \tparam T type of sending object
	 * \tparam S type of receiving object
	 *
	 * \param send Object to send
	 * \param recv Object to receive
	 * \param prc processors involved in the scatter
	 * \param sz size of each chunk
	 * \param root which processor should scatter the information
	 *
	 * \return true if the function completed successfully
	 *
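	 * ### Example (illustrative sketch; on the root, prc and sz are filled by the caller to describe the chunk assigned to each processor)
	 * \code
	 * auto & v_cl = create_vcluster();
	 *
	 * openfpm::vector<float> send;                 // significant only on the root
	 * openfpm::vector<float> recv;
	 * openfpm::vector<size_t> prc;                 // destination processor of each chunk
	 * openfpm::vector<size_t> sz;                  // number of elements in each chunk
	 *
	 * if (v_cl.getProcessUnitID() == 0)
	 * {
	 * 	for (size_t i = 0 ; i < v_cl.getProcessingUnits() ; i++)
	 * 	{
	 * 		send.add((float)i);              // one element for each processor
	 * 		prc.add(i);
	 * 		sz.add(1);
	 * 	}
	 * }
	 *
	 * v_cl.SScatter(send,recv,prc,sz,0);           // each processor receives its chunk in recv
	 * \endcode
	 *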
	 */
	template<typename T, typename S, template <typename> class layout_base=memory_traits_lin> bool SScatter(T & send, S & recv, openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz, size_t root)
	{
		// Reset the receive buffer
		reset_recv_buf();

		// If we are on master scatter the information
		if (self_base::getProcessUnitID() == root)
		{
			// Prepare the sending buffer
			openfpm::vector<const void *> send_buf;


			openfpm::vector<size_t> sz_byte;
			sz_byte.resize(sz.size());

			size_t ptr = 0;

			for (size_t i = 0; i < sz.size() ; i++)
			{
				send_buf.add((char *)send.getPointer() + sizeof(typename T::value_type)*ptr );
				sz_byte.get(i) = sz.get(i) * sizeof(typename T::value_type);
				ptr += sz.get(i);
			}

			self_base::tags.clear();

			// receive information
			base_info<InternalMemory> bi(&this->recv_buf,prc,sz,this->tags,0);

			// Send and recv multiple messages
			self_base::sendrecvMultipleMessagesNBX(prc.size(),(size_t *)sz_byte.getPointer(),(size_t *)prc.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi);

			// we generate the list of the properties to pack
			typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;

			// operation object
			op_ssend_recv_add<void> opa;

			index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,NULL,NULL,opa,0);
		}
		else
		{
			// The non-root receive
			openfpm::vector<size_t> send_req;

			self_base::tags.clear();

			// receive information
			base_info<InternalMemory> bi(&this->recv_buf,prc,sz,this->tags,0);

			// Send and recv multiple messages
			self_base::sendrecvMultipleMessagesNBX(send_req.size(),NULL,NULL,NULL,msg_alloc,&bi);

			// we generate the list of the properties to pack
			typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;

			// operation object
			op_ssend_recv_add<void> opa;

			index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,NULL,NULL,opa,0);
		}

		return true;
	}
	
	/*! \brief reorder the receiving buffer
	 *
	 * \param prc list of the receiving processors
	 * \param tags list of the message tags, one per receiving processor
	 * \param sz_recv list of sizes of the receiving messages (in bytes)
	 *
	 */
	void reorder_buffer(openfpm::vector<size_t> & prc, const openfpm::vector<size_t> & tags, openfpm::vector<size_t> & sz_recv)
	{

		struct recv_buff_reorder
		{
			//! processor
			size_t proc;

			//! message tag
			size_t tag;

			//! position in the receive list
			size_t pos;

			//! default constructor
			recv_buff_reorder()
			:proc(0),tag(0),pos(0)
			{};

			//! needed to reorder
			bool operator<(const recv_buff_reorder & rd) const
			{
				if (proc == rd.proc)
				{return tag < rd.tag;}

				return (proc < rd.proc);
			}
		};

		openfpm::vector<recv_buff_reorder> rcv;

		rcv.resize(self_base::recv_buf.size());

		for (size_t i = 0 ; i < rcv.size() ; i++)
		{
			rcv.get(i).proc = prc.get(i);
			if (i < tags.size())
			{rcv.get(i).tag = tags.get(i);}
			else
			{rcv.get(i).tag = (unsigned int)-1;}
			rcv.get(i).pos = i;
		}

		// we sort based on processor (and tag within the same processor)
		rcv.sort();

		openfpm::vector_fr<BMemory<InternalMemory>> recv_ord;
		recv_ord.resize(rcv.size());

		openfpm::vector<size_t> prc_ord;
		prc_ord.resize(rcv.size());

		openfpm::vector<size_t> sz_recv_ord;
		sz_recv_ord.resize(rcv.size());

		// Now we reorder rcv
		for (size_t i = 0 ; i < rcv.size() ; i++)
		{
			recv_ord.get(i).swap(self_base::recv_buf.get(rcv.get(i).pos));
			prc_ord.get(i) = rcv.get(i).proc;
			sz_recv_ord.get(i) = sz_recv.get(rcv.get(i).pos);
		}

		// move rcv into recv
		// Now we swap back to recv_buf in an ordered way
		for (size_t i = 0 ; i < rcv.size() ; i++)
		{
			self_base::recv_buf.get(i).swap(recv_ord.get(i));
		}

		prc.swap(prc_ord);
		sz_recv.swap(sz_recv_ord);

		// reorder prc_recv and recv_sz
	}

	/*! \brief Semantic Send and receive, send the data to processors and receive from the other processors
	 *
	 * Semantic communications differ from the normal ones. In general they
	 * follow this model:
	 *
	 * SSendRecv(T,S,...,op=add);
	 *
	 * "SendRecv" indicates the communication pattern, or how the information flows.
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work, S must implement the interface S.add(T).
	 *
	 * ### Example: dynamic sparse data exchange with complex objects
	 * \snippet VCluster_semantic_unit_tests.hpp dsde with complex objects1
	 *
	 * \tparam T type of sending object
	 * \tparam S type of receiving object
	 *
	 * \param send Object to send
	 * \param recv Object to receive
	 * \param prc_send destination processors
	 * \param prc_recv list of the receiving processors
	 * \param sz_recv number of elements added
	 * \param opt options
	 *
	 * \return true if the function completed successfully
	 *
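	 * ### Example (illustrative sketch of a ring exchange, assuming the global instance returned by create_vcluster())
	 * \code
	 * auto & v_cl = create_vcluster();
	 *
	 * // one vector of elements for each destination processor
	 * openfpm::vector<openfpm::vector<float>> send;
	 * openfpm::vector<size_t> prc_send;
	 *
	 * size_t next = (v_cl.getProcessUnitID() + 1) % v_cl.getProcessingUnits();
	 * send.add();
	 * send.last().add(1.0f);
	 * prc_send.add(next);                          // send one element to the next processor
	 *
	 * openfpm::vector<float> recv;                 // received elements are appended here
	 * openfpm::vector<size_t> prc_recv;            // filled with the source processors
	 * openfpm::vector<size_t> sz_recv;             // filled with the number of received elements
	 *
	 * v_cl.SSendRecv(send,recv,prc_send,prc_recv,sz_recv);
	 * \endcode
	 *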
	 */
	template<typename T,
	         typename S,
			 template <typename> class layout_base = memory_traits_lin>
	bool SSendRecv(openfpm::vector<T> & send,
			       S & recv,
				   openfpm::vector<size_t> & prc_send,
				   openfpm::vector<size_t> & prc_recv,
				   openfpm::vector<size_t> & sz_recv,
				   size_t opt = NONE)
	{
		prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);

		// we generate the list of the properties to pack
		typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;

		op_ssend_recv_add<void> opa;

		index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,&sz_recv,NULL,opa,opt);

		return true;
	}


	/*! \brief Semantic Send and receive, send the data to processors and receive from the other processors
	 *
	 * Semantic communications differ from the normal ones. In general they
	 * follow this model:
	 *
	 * SSendRecv(T,S,...,op=add);
	 *
	 * "SendRecv" indicates the communication pattern, or how the information flows.
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work, S must implement the interface S.add<prp...>(T).
	 *
	 * ### Example scatter a vector of structures, to other processors
	 * \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master
	 *
	 * \tparam T type of sending object
	 * \tparam S type of receiving object
	 * \tparam prp properties for merging
	 *
	 * \param send Object to send
	 * \param recv Object to receive
	 * \param prc_send destination processors
	 * \param prc_recv processors from which we received
	 * \param sz_recv number of elements added per processor
	 * \param sz_recv_byte size of the message received from each processor in bytes
	 *
	 * \return true if the function completed successfully
	 *
	 */
	template<typename T, typename S, template <typename> class layout_base, int ... prp> bool SSendRecvP(openfpm::vector<T> & send,
			                                                      S & recv,
																  openfpm::vector<size_t> & prc_send,
																  openfpm::vector<size_t> & prc_recv,
																  openfpm::vector<size_t> & sz_recv,
																  openfpm::vector<size_t> & sz_recv_byte,
																  size_t opt = NONE)
	{
		prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);

		// operation object
		op_ssend_recv_add<void> opa;

		// process the received information
		process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv,&sz_recv,&sz_recv_byte,opa,opt);

		return true;
	}


	/*! \brief Semantic Send and receive, send the data to processors and receive from the other processors
	 *
	 * Semantic communications differ from the normal ones. In general they
	 * follow this model:
	 *
	 * SSendRecv(T,S,...,op=add);
	 *
	 * "SendRecv" indicates the communication pattern, or how the information flows.
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work, S must implement the interface S.add<prp...>(T).
	 *
	 * ### Example scatter a vector of structures, to other processors
	 * \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master
	 *
	 * \tparam T type of sending object
	 * \tparam S type of receiving object
	 * \tparam prp properties for merging
	 *
	 * \param send Object to send
	 * \param recv Object to receive
	 * \param prc_send destination processors
	 * \param prc_recv list of the processors from which we receive
	 * \param sz_recv number of elements added per processor
	 *
	 * \return true if the function completed successfully
	 *
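	 * ### Example (illustrative sketch; the aggregate type "part" is hypothetical and only its properties 0 and 1 are merged)
	 * \code
	 * typedef aggregate<float,float> part;
	 *
	 * auto & v_cl = create_vcluster();
	 *
	 * openfpm::vector<openfpm::vector<part>> send; // one vector per destination processor
	 * openfpm::vector<part> recv;
	 * openfpm::vector<size_t> prc_send;
	 * openfpm::vector<size_t> prc_recv;
	 * openfpm::vector<size_t> sz_recv;
	 *
	 * // ... fill send and prc_send ...
	 *
	 * v_cl.SSendRecvP<openfpm::vector<part>,decltype(recv),memory_traits_lin,0,1>
	 *                (send,recv,prc_send,prc_recv,sz_recv);
	 * \endcode
	 *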
	 */
	template<typename T, typename S, template <typename> class layout_base, int ... prp>
	bool SSendRecvP(openfpm::vector<T> & send,
			        S & recv,
					openfpm::vector<size_t> & prc_send,
			    	openfpm::vector<size_t> & prc_recv,
					openfpm::vector<size_t> & sz_recv,
					size_t opt = NONE)
	{
		prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);

		// operation object
		op_ssend_recv_add<void> opa;

		// process the received information
		process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv,&sz_recv,NULL,opa,opt);

		return true;
	}

	/*! \brief Semantic Send and receive, send the data to processors and receive from the other processors
	 *
	 * Semantic communications differ from the normal ones. In general they
	 * follow this model:
	 *
	 * SSendRecv(T,S,...,op=add);
	 *
	 * "SendRecv" indicates the communication pattern, or how the information flows.
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work, S must implement the interface S.add<prp...>(T).
	 *
	 * ### Example scatter a vector of structures, to other processors
	 * \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master
	 *
	 * \tparam op type of operation
	 * \tparam T type of sending object
	 * \tparam S type of receiving object
	 * \tparam prp properties for merging
	 *
	 * \param send Object to send
	 * \param recv Object to receive
	 * \param prc_send destination processors
	 * \param op_param operation object (operation to do in merging the information)
	 * \param recv_sz size of each receiving buffer. This parameter is an output;
	 *        with RECEIVE_KNOWN you must feed this parameter
	 * \param prc_recv from which processor we receive messages
	 *        with RECEIVE_KNOWN you must feed this parameter
	 * \param opt options, the default is NONE; another is RECEIVE_KNOWN. In this case each
	 *        processor is assumed to know from which processors it receives and the size of
	 *        each message. In such a case prc_recv and recv_sz are no longer output parameters
	 *        but must be provided as input.
	 *
	 *
	 * \return true if the function completed successfully
	 *
	 */
	template<typename op,
	         typename T,
			 typename S,
			 template <typename> class layout_base,
			 int ... prp>
	bool SSendRecvP_op(openfpm::vector<T> & send,
			           S & recv,
					   openfpm::vector<size_t> & prc_send,
					   op & op_param,
					   openfpm::vector<size_t> & prc_recv,
					   openfpm::vector<size_t> & recv_sz,
				 	   size_t opt = NONE)
	{
		prepare_send_buffer<op,T,S,layout_base>(send,recv,prc_send,prc_recv,recv_sz,opt);

		// process the received information
		process_receive_buffer_with_prp<op,T,S,layout_base,prp...>(recv,NULL,NULL,op_param,opt);

		return true;
	}

};

enum init_options
{
	none = 0x0,
	in_situ_visualization = 0x1,
};

extern init_options global_option;

// Function to initialize the global VCluster //

extern Vcluster<> * global_v_cluster_private_heap;
extern Vcluster<CudaMemory> * global_v_cluster_private_cuda;

static inline void delete_global_v_cluster_private()
{
        delete global_v_cluster_private_heap;
        delete global_v_cluster_private_cuda;
}


/*! \brief Finalize the library
 *
 * This function MUST be called at the end of the program
 *
 */
static inline void openfpm_finalize()
{
        if (global_option == init_options::in_situ_visualization)
        {
                MPI_Request bar_req;
                MPI_Ibarrier(MPI_COMM_WORLD,&bar_req);
        }

#ifdef HAVE_PETSC

        PetscFinalize();

#endif

        delete_global_v_cluster_private();
        ofp_initialized = false;

#ifdef CUDA_GPU

        // Release memory
        mem_tmp.destroy();
        mem_tmp.decRef();

#endif
}


/*! \brief Initialize a global instance of Runtime Virtual Cluster Machine
 *
 * Initialize a global instance of Runtime Virtual Cluster Machine
 *
 */

static inline void init_global_v_cluster_private(int *argc, char ***argv, init_options option)
{
	global_option = option;
	if (option == init_options::in_situ_visualization)
	{
		int flag;
		MPI_Initialized(&flag);

		if (flag == false)
		{MPI_Init(argc,argv);}

		MPI_Comm com_compute;

		int rank;
		MPI_Comm_rank(MPI_COMM_WORLD,&rank);

		if (rank == 0)
		{MPI_Comm_split(MPI_COMM_WORLD, MPI_UNDEFINED,rank, &com_compute);}
		else
		{MPI_Comm_split(MPI_COMM_WORLD,0,rank, &com_compute);}

		if (rank != 0 )
		{
			if (global_v_cluster_private_heap == NULL)
			{global_v_cluster_private_heap = new Vcluster<>(argc,argv,com_compute);}

			if (global_v_cluster_private_cuda == NULL)
			{global_v_cluster_private_cuda = new Vcluster<CudaMemory>(argc,argv,com_compute);}
		}
		else
		{
			int flag = false;
			MPI_Request bar_req;
			MPI_Ibarrier(MPI_COMM_WORLD,&bar_req);
			//! barrier status
			MPI_Status bar_stat;

			while(flag == false)
			{
				std::cout << "I am node " << rank << std::endl;
				sleep(1);
				MPI_SAFE_CALL(MPI_Test(&bar_req,&flag,&bar_stat));
			}

			openfpm_finalize();
			exit(0);
		}
	}
	else
	{
		if (global_v_cluster_private_heap == NULL)
		{global_v_cluster_private_heap = new Vcluster<>(argc,argv);}

		if (global_v_cluster_private_cuda == NULL)
		{global_v_cluster_private_cuda = new Vcluster<CudaMemory>(argc,argv);}
	}
}


template<typename Memory>
struct get_vcl
{
	static Vcluster<Memory> & get()
	{
		return *global_v_cluster_private_heap;
	}
};

template<>
struct get_vcl<CudaMemory>
{
	static Vcluster<CudaMemory> & get()
	{
		return *global_v_cluster_private_cuda;
	}
};

template<typename Memory = HeapMemory>
static inline Vcluster<Memory> & create_vcluster()
{
	if (global_v_cluster_private_heap == NULL)
	{std::cerr << __FILE__ << ":" << __LINE__ << " Error you must call openfpm_init before using any distributed data structures" << std::endl;}

	return get_vcl<Memory>::get();
}



/*! \brief Check if the library has been initialized
 *
 * \return true if the library has been initialized
 *
 */
static inline bool is_openfpm_init()
{
	return ofp_initialized;
}


/*! \brief Initialize the library
 *
 * This function MUST be called before any other function
 *
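 * A typical (illustrative) call sequence, using the create_vcluster() helper defined above, is:
 *
 * \code
 * int main(int argc, char* argv[])
 * {
 * 	openfpm_init(&argc,&argv);
 *
 * 	Vcluster<> & v_cl = create_vcluster();
 * 	// ... distributed work driven by v_cl ...
 *
 * 	openfpm_finalize();
 * 	return 0;
 * }
 * \endcode
 *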
 */
static inline void openfpm_init(int *argc, char ***argv, init_options option = init_options::none )
{
#ifdef HAVE_PETSC

	PetscInitialize(argc,argv,NULL,NULL);

#endif

	init_global_v_cluster_private(argc,argv,option);

#ifdef SE_CLASS1
	std::cout << "OpenFPM is compiled with debug mode LEVEL:1. Remember to remove SE_CLASS1 when you go in production" << std::endl;
#endif

#ifdef SE_CLASS2
	std::cout << "OpenFPM is compiled with debug mode LEVEL:2. Remember to remove SE_CLASS2 when you go in production" << std::endl;
#endif

#ifdef SE_CLASS3
	std::cout << "OpenFPM is compiled with debug mode LEVEL:3. Remember to remove SE_CLASS3 when you go in production" << std::endl;
#endif

	// install segmentation fault signal handler

	struct sigaction sa;

	sa.sa_sigaction = bt_sighandler;
	sigemptyset(&sa.sa_mask);
	sa.sa_flags = SA_RESTART;

	sigaction(SIGSEGV, &sa, NULL);

	if (*argc != 0)
		program_name = std::string(*argv[0]);

	// Initialize math pre-computation tables
	openfpm::math::init_getFactorization();

	ofp_initialized = true;

#ifdef CUDA_GPU

	// Initialize temporal memory
	mem_tmp.incRef();

#endif
}




#endif