/*
 * Vcluster.hpp
 *
 *  Created on: Feb 8, 2016
 *      Author: Pietro Incardona
 */

#ifndef VCLUSTER_HPP
#define VCLUSTER_HPP

#include <signal.h>

#include "VCluster_base.hpp"
#include "VCluster_meta_function.hpp"
#include "util/math_util_complex.hpp"

void bt_sighandler(int sig, siginfo_t * info, void * ctx);

/*! \brief Implementation of the VCluster class
 *
 * This class implements communication functions such as summation, minimum and maximum
 * across processors, and the Dynamic Sparse Data Exchange (DSDE)
 *
 * ## Vcluster Min max sum
 * \snippet VCluster_unit_tests.hpp max min sum
 *
 * ## Vcluster all gather
 * \snippet VCluster_unit_test_util.hpp allGather numbers
 *
 * ## Dynamic sparse data exchange with complex objects
 * \snippet VCluster_semantic_unit_tests.hpp dsde with complex objects1
 *
 * ## Dynamic sparse data exchange with buffers
 * \snippet VCluster_unit_test_util.hpp dsde
 * \snippet VCluster_unit_test_util.hpp message alloc
 *
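 * ## Minimal usage sketch
 *
 * A hedged example of the collective interface (assuming openfpm_init has
 * already been called; sum and execute are the queued collectives referred
 * to above):
 *
 * \code
 * Vcluster<> & vcl = create_vcluster();
 * float f = vcl.getProcessUnitID();
 * vcl.sum(f);      // queue an in-place summation across processors
 * vcl.execute();   // run the queued collectives; f now holds the sum
 * \endcode
 *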
 */
template<typename InternalMemory = HeapMemory>
class Vcluster: public Vcluster_base<InternalMemory>
{
	typedef Vcluster_base<InternalMemory> self_base;

	template<typename T>
	struct index_gen {};

	//! Process the receive buffer using the specified properties (meta-function)
	template<int ... prp>
	struct index_gen<index_tuple<prp...>>
	{
		//! Process the receive buffer
		template<typename op,
		         typename T,
				 typename S,
				 template <typename> class layout_base = memory_traits_lin>
		inline static void process_recv(Vcluster & vcl, S & recv, openfpm::vector<size_t> * sz_recv,
				                        openfpm::vector<size_t> * sz_recv_byte, op & op_param,size_t opt)
		{
			if (opt == MPI_GPU_DIRECT && !std::is_same<InternalMemory,CudaMemory>::value)
			{
				// In order to activate this option, InternalMemory must be CudaMemory

				std::cout << __FILE__ << ":" << __LINE__ << " error: in order to use MPI_GPU_DIRECT, VCluster must use CudaMemory internally; the most probable" <<
						                                    " cause of this problem is that you are using the MPI_GPU_DIRECT option with a non-GPU data-structure" << std::endl;
			}

			vcl.process_receive_buffer_with_prp<op,T,S,layout_base,prp...>(recv,sz_recv,sz_recv_byte,op_param,opt);
		}
	};

	/*! \brief Prepare the send buffer and send the message to other processors
	 *
	 * \tparam op Operation to execute in merging the receiving data
	 * \tparam T sending object
	 * \tparam S receiving object
	 *
	 * \note T and S must not be the same object, but S.operation(T) must be defined. The flexibility
	 * of the operation is given by op
	 *
	 * \param send sending buffer
	 * \param recv receiving object
	 * \param prc_send each object T in the vector send is sent to one processor specified in this list.
	 *                 This means that prc_send.size() == send.size()
	 * \param prc_recv list of processors from which we receive (output); in case of RECEIVE_KNOWN it must be filled
	 * \param sz_recv size of each received message (output); in case of RECEIVE_KNOWN it must be filled
	 * \param opt options; using RECEIVE_KNOWN enables patterns with lower latency
	 *
	 */
	template<typename op, typename T, typename S, template <typename> class layout_base> void prepare_send_buffer(openfpm::vector<T> & send,
			                                                               S & recv,
																		   openfpm::vector<size_t> & prc_send,
																		   openfpm::vector<size_t> & prc_recv,
																		   openfpm::vector<size_t> & sz_recv,
																		   size_t opt)
	{
		openfpm::vector<size_t> sz_recv_byte(sz_recv.size());

		// Reset the receive buffer
		reset_recv_buf();

	#ifdef SE_CLASS1

		if (send.size() != prc_send.size())
			std::cerr << __FILE__ << ":" << __LINE__ << " Error, the number of processors involved \"prc.size()\" must match the number of sending buffers \"send.size()\" " << std::endl;

	#endif

		// Prepare the sending buffer
		openfpm::vector<const void *> send_buf;
		openfpm::vector<size_t> send_sz_byte;
		openfpm::vector<size_t> prc_send_;

		size_t tot_size = 0;

		for (size_t i = 0; i < send.size() ; i++)
		{
			size_t req = 0;

			//Pack requesting
			pack_unpack_cond_with_prp<has_max_prop<T, has_value_type<T>::value>::value,op, T, S, layout_base>::packingRequest(send.get(i), req, send_sz_byte);
			tot_size += req;
		}

		pack_unpack_cond_with_prp_inte_lin<T>::construct_prc(prc_send,prc_send_);

		HeapMemory pmem;

		ExtPreAlloc<HeapMemory> & mem = *(new ExtPreAlloc<HeapMemory>(tot_size,pmem));
		mem.incRef();

		for (size_t i = 0; i < send.size() ; i++)
		{
			//Packing

			Pack_stat sts;

			pack_unpack_cond_with_prp<has_max_prop<T, has_value_type<T>::value>::value, op, T, S, layout_base>::packing(mem, send.get(i), sts, send_buf,opt);
		}

		// receive information
		base_info<InternalMemory> bi(&this->recv_buf,prc_recv,sz_recv_byte,this->tags,opt);

		// Send and recv multiple messages
		if (opt & RECEIVE_KNOWN)
		{
			// If we are passing the number of elements and not the bytes, calculate the bytes
			if (opt & KNOWN_ELEMENT_OR_BYTE)
			{
				// We know the number of elements: convert to bytes (ONLY if possible)
				if (has_pack_gen<typename T::value_type>::value == false && is_vector<T>::value == true)
				{
					for (size_t i = 0 ; i < sz_recv.size() ; i++)
						sz_recv_byte.get(i) = sz_recv.get(i) * sizeof(typename T::value_type);
				}
				else
				{std::cout << __FILE__ << ":" << __LINE__ << " Error " << demangle(typeid(T).name()) << " the type does not work with the option KNOWN_ELEMENT_OR_BYTE" << std::endl;}

				self_base::sendrecvMultipleMessagesNBX(prc_send.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(),
											prc_recv.size(),(size_t *)prc_recv.getPointer(),(size_t *)sz_recv_byte.getPointer(),msg_alloc_known,(void *)&bi);
			}
			else
			{
				self_base::sendrecvMultipleMessagesNBX(prc_send.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(),
											prc_recv.size(),(size_t *)prc_recv.getPointer(),msg_alloc_known,(void *)&bi);
				sz_recv_byte = self_base::sz_recv_tmp;
			}
		}
		else
		{
			self_base::tags.clear();
			prc_recv.clear();
			self_base::sendrecvMultipleMessagesNBX(prc_send_.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send_.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi);
		}

		// Reorder the buffer
		reorder_buffer(prc_recv,self_base::tags,sz_recv_byte);

		mem.decRef();
		delete &mem;
	}


	/*! \brief Reset the receive buffer
	 *
	 *
	 */
	void reset_recv_buf()
	{
		for (size_t i = 0 ; i < self_base::recv_buf.size() ; i++)
		{self_base::recv_buf.get(i).resize(0);}

		self_base::recv_buf.resize(0);
	}

	/*! \brief Base info
	 *
	 * \param recv_buf receive buffers
	 * \param prc processors involved
	 * \param sz size of each received message
	 * \param tags message tags
	 * \param opt options
	 *
	 */
	template<typename Memory>
	struct base_info
	{
		//! Receive buffer
		openfpm::vector_fr<BMemory<Memory>> * recv_buf;
		//! receiving processor list
		openfpm::vector<size_t> & prc;
		//! size of each message
		openfpm::vector<size_t> & sz;
		//! tags
		openfpm::vector<size_t> &tags;

		//! options
		size_t opt;

		//! constructor
		base_info(openfpm::vector_fr<BMemory<Memory>> * recv_buf, openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz, openfpm::vector<size_t> & tags,size_t opt)
		:recv_buf(recv_buf),prc(prc),sz(sz),tags(tags),opt(opt)
		{}
	};

	/*! \brief Call-back to allocate buffer to receive data
	 *
	 * \param msg_i size required to receive the message from i
	 * \param total_msg total size to receive from all the processors
	 * \param total_p the total number of processors that want to communicate with you
	 * \param i processor id
	 * \param ri request id (it is an id that goes from 0 to total_p, and is unique
	 *           every time message_alloc is called)
	 * \param tag message tag
	 * \param ptr a pointer to the base_info structure
	 *
	 * \return the pointer where to store the message for the processor i
	 *
	 */
	static void * msg_alloc(size_t msg_i, size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void * ptr)
	{
		base_info<InternalMemory> & rinfo = *(base_info<InternalMemory> *)ptr;

		if (rinfo.recv_buf == NULL)
		{
			std::cerr << __FILE__ << ":" << __LINE__ << " Internal error: this processor is not supposed to receive\n";
			return NULL;
		}

		rinfo.recv_buf->resize(ri+1);

		rinfo.recv_buf->get(ri).resize(msg_i);

		// Receive info
		rinfo.prc.add(i);
		rinfo.sz.add(msg_i);
		rinfo.tags.add(tag);

		// return the pointer

		// If GPU direct is activated, use the CUDA buffer directly
		if (rinfo.opt & MPI_GPU_DIRECT)
		{
#if defined(MPIX_CUDA_AWARE_SUPPORT) && MPIX_CUDA_AWARE_SUPPORT
			return rinfo.recv_buf->last().getDevicePointer();
#else
			return rinfo.recv_buf->last().getPointer();
#endif
		}

		return rinfo.recv_buf->last().getPointer();
	}


	/*! \brief Call-back to allocate buffer to receive data
	 *
	 * \param msg_i size required to receive the message from i
	 * \param total_msg total size to receive from all the processors
	 * \param total_p the total number of processors that want to communicate with you
	 * \param i processor id
	 * \param ri request id (it is an id that goes from 0 to total_p, and is unique
	 *           every time message_alloc is called)
	 * \param tag message tag
	 * \param ptr a pointer to the base_info structure
	 *
	 * \return the pointer where to store the message for the processor i
	 *
	 */
	static void * msg_alloc_known(size_t msg_i, size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void * ptr)
	{
		base_info<InternalMemory> & rinfo = *(base_info<InternalMemory> *)ptr;

		if (rinfo.recv_buf == NULL)
		{
			std::cerr << __FILE__ << ":" << __LINE__ << " Internal error: this processor is not supposed to receive\n";
			return NULL;
		}

		rinfo.recv_buf->resize(ri+1);

		rinfo.recv_buf->get(ri).resize(msg_i);

		// return the pointer
		return rinfo.recv_buf->last().getPointer();
	}
	
	/*! \brief Process the receive buffer
	 *
	 * \tparam op operation to do in merging the received data
	 * \tparam T type of sending object
	 * \tparam S type of receiving object
	 * \tparam prp properties to receive
	 *
	 * \param recv receive object
	 * \param sz vector that stores how many elements have been added per processor to S
	 * \param sz_byte bytes received on a per-processor basis
	 * \param op_param operation to do in merging the received information with recv
	 *
	 */
	template<typename op, typename T, typename S, template <typename> class layout_base ,unsigned int ... prp >
	void process_receive_buffer_with_prp(S & recv,
			                             openfpm::vector<size_t> * sz,
										 openfpm::vector<size_t> * sz_byte,
										 op & op_param,
										 size_t opt)
	{
		if (sz != NULL)
		{sz->resize(self_base::recv_buf.size());}

		pack_unpack_cond_with_prp<has_max_prop<T, has_value_type<T>::value>::value,op, T, S, layout_base, prp... >::unpacking(recv, self_base::recv_buf, sz, sz_byte, op_param,opt);
	}

	public:

	/*! \brief Constructor
	 *
	 * \param argc main number of arguments
	 * \param argv main set of arguments
	 *
	 */
	Vcluster(int *argc, char ***argv)
	:Vcluster_base<InternalMemory>(argc,argv)
	{
	}

	/*! \brief Semantic Gather, gather the data from all processors into one node
	 *
	 * Semantic communications differ from the normal ones. In general they
	 * follow the model:
	 *
	 * Gather(T,S,root,op=add);
	 *
	 * "Gather" indicates the communication pattern, or how the information flows.
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work, S must implement the interface S.add(T).
	 *
	 * ### Example send a vector of structures, and merge all together in one vector
	 * \snippet VCluster_semantic_unit_tests.hpp Gather the data on master
	 *
	 * ### Example send a vector of complex structures, and merge all together in one vector
	 * \snippet VCluster_semantic_unit_tests.hpp Gather the data on master complex
	 *
	 * \tparam T type of sending object
	 * \tparam S type of receiving object
	 *
	 * \param send Object to send
	 * \param recv Object to receive
	 * \param root which node should collect the information
	 *
	 * \return true if the function completed successfully
	 *
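	 * A minimal sketch (a hedged example rather than the snippets above; it
	 * assumes each processor contributes one element, gathered on root 0):
	 *
	 * \code
	 * Vcluster<> & vcl = create_vcluster();
	 * openfpm::vector<size_t> send;
	 * send.add(vcl.getProcessUnitID());  // one value per processor
	 * openfpm::vector<size_t> recv;      // on root: one element per processor
	 * vcl.SGather(send,recv,0);
	 * \endcode
	 *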
	 */
	template<typename T, typename S, template <typename> class layout_base=memory_traits_lin> bool SGather(T & send, S & recv,size_t root)
	{
		openfpm::vector<size_t> prc;
		openfpm::vector<size_t> sz;

		return SGather<T,S,layout_base>(send,recv,prc,sz,root);
	}

	//! metafunction
	template<size_t index, size_t N> struct MetaFuncOrd {
	   enum { value = index };
	};

	/*! \brief Semantic Gather, gather the data from all processors into one node
	 *
	 * Semantic communications differ from the normal ones. In general they
	 * follow the model:
	 *
	 * Gather(T,S,root,op=add);
	 *
	 * "Gather" indicates the communication pattern, or how the information flows.
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work, S must implement the interface S.add(T).
	 *
	 * ### Example send a vector of structures, and merge all together in one vector
	 * \snippet VCluster_semantic_unit_tests.hpp Gather the data on master
	 *
	 * ### Example send a vector of complex structures, and merge all together in one vector
	 * \snippet VCluster_semantic_unit_tests.hpp Gather the data on master complex
	 *
	 * \tparam T type of sending object
	 * \tparam S type of receiving object
	 *
	 * \param send Object to send
	 * \param recv Object to receive
	 * \param root which node should collect the information
	 * \param prc processors from which we received the information
	 * \param sz size of the received information for each processor
	 *
	 * \return true if the function completed successfully
	 *
	 */
	template<typename T,
	         typename S,
			 template <typename> class layout_base = memory_traits_lin>
	bool SGather(T & send,
			     S & recv,
				 openfpm::vector<size_t> & prc,
				 openfpm::vector<size_t> & sz,
				 size_t root)
	{
#ifdef SE_CLASS1
		if (&send == (T *)&recv)
		{std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " in SGather the sending object and the receiving object must be different" << std::endl;}
#endif

426
		// Reset the receive buffer
		reset_recv_buf();

		// If we are on master collect the information
		if (self_base::getProcessUnitID() == root)
		{
			// send buffer (the master does not send anything), so send_req and send_buf
			// remain buffers of size 0
			openfpm::vector<size_t> send_req;

			self_base::tags.clear();

			// receive information
			base_info<InternalMemory> bi(&this->recv_buf,prc,sz,this->tags,0);

			// Send and recv multiple messages
			self_base::sendrecvMultipleMessagesNBX(send_req.size(),NULL,NULL,NULL,msg_alloc,&bi);

			// we generate the list of the properties to unpack
			typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;

			// operation object
			op_ssend_recv_add<void> opa;

			// Reorder the buffer
			reorder_buffer(prc,self_base::tags,sz);

			index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,&sz,NULL,opa,0);

			recv.add(send);
			prc.add(root);
			sz.add(send.size());
		}
		else
		{
			// non-root: pack the local data and send it to the root
			openfpm::vector<size_t> send_prc;
			openfpm::vector<size_t> send_prc_;
			send_prc.add(root);

			openfpm::vector<size_t> sz;

			openfpm::vector<const void *> send_buf;
				
			//Pack requesting

			size_t tot_size = 0;

			pack_unpack_cond_with_prp<has_max_prop<T, has_value_type<T>::value>::value,op_ssend_recv_add<void>, T, S, layout_base>::packingRequest(send, tot_size, sz);

			HeapMemory pmem;

			ExtPreAlloc<HeapMemory> & mem = *(new ExtPreAlloc<HeapMemory>(tot_size,pmem));
			mem.incRef();

			//Packing

			Pack_stat sts;
			
			pack_unpack_cond_with_prp<has_max_prop<T, has_value_type<T>::value>::value,op_ssend_recv_add<void>, T, S, layout_base>::packing(mem, send, sts, send_buf);

			pack_unpack_cond_with_prp_inte_lin<T>::construct_prc(send_prc,send_prc_);

			self_base::tags.clear();

			// receive information
			base_info<InternalMemory> bi(NULL,prc,sz,self_base::tags,0);

			// Send and recv multiple messages
			self_base::sendrecvMultipleMessagesNBX(send_prc_.size(),(size_t *)sz.getPointer(),(size_t *)send_prc_.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi,NONE);

			mem.decRef();
			delete &mem;
		}
		
		return true;
	}

	/*! \brief Semantic Scatter, scatter the data from one processor to the other node
	 *
	 * Semantic communications differ from the normal ones. In general they
	 * follow the model:
	 *
	 * Scatter(T,S,...,op=add);
	 *
	 * "Scatter" indicates the communication pattern, or how the information flows.
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work, S must implement the interface S.add(T).
	 *
	 * ### Example scatter a vector of structures, to other processors
	 * \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master
	 *
	 * \tparam T type of sending object
	 * \tparam S type of receiving object
	 *
	 * \param send Object to send
	 * \param recv Object to receive
	 * \param prc processors involved in the scatter
	 * \param sz size of each chunk
	 * \param root which processor should scatter the information
	 *
	 * \return true if the function completed successfully
	 *
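	 * A minimal sketch (hedged; only the root needs meaningful send, prc and
	 * sz, and sz.get(i) elements are delivered to processor prc.get(i)):
	 *
	 * \code
	 * Vcluster<> & vcl = create_vcluster();
	 * openfpm::vector<float> send;   // filled on root with all the chunks
	 * openfpm::vector<float> recv;   // each processor receives its chunk here
	 * openfpm::vector<size_t> prc;   // destination processor of each chunk
	 * openfpm::vector<size_t> sz;    // number of elements in each chunk
	 * vcl.SScatter(send,recv,prc,sz,0);
	 * \endcode
	 *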
	 */
	template<typename T, typename S, template <typename> class layout_base=memory_traits_lin> bool SScatter(T & send, S & recv, openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz, size_t root)
	{
		// Reset the receive buffer
		reset_recv_buf();

		// If we are on master scatter the information
		if (self_base::getProcessUnitID() == root)
		{
			// Prepare the sending buffer
			openfpm::vector<const void *> send_buf;


			openfpm::vector<size_t> sz_byte;
			sz_byte.resize(sz.size());

			size_t ptr = 0;

			for (size_t i = 0; i < sz.size() ; i++)
			{
				send_buf.add((char *)send.getPointer() + sizeof(typename T::value_type)*ptr );
				sz_byte.get(i) = sz.get(i) * sizeof(typename T::value_type);
				ptr += sz.get(i);
			}

			self_base::tags.clear();

			// receive information
			base_info<InternalMemory> bi(&this->recv_buf,prc,sz,this->tags,0);

			// Send and recv multiple messages
			self_base::sendrecvMultipleMessagesNBX(prc.size(),(size_t *)sz_byte.getPointer(),(size_t *)prc.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi);

			// we generate the list of the properties to pack
			typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;

			// operation object
			op_ssend_recv_add<void> opa;

			index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,NULL,NULL,opa,0);
		}
		else
		{
			// The non-root receive
			openfpm::vector<size_t> send_req;

			self_base::tags.clear();

			// receive information
			base_info<InternalMemory> bi(&this->recv_buf,prc,sz,this->tags,0);

			// Send and recv multiple messages
			self_base::sendrecvMultipleMessagesNBX(send_req.size(),NULL,NULL,NULL,msg_alloc,&bi);

			// we generate the list of the properties to pack
			typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;

			// operation object
			op_ssend_recv_add<void> opa;

			index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,NULL,NULL,opa,0);
		}

		return true;
	}
	
	/*! \brief reorder the receiving buffer
	 *
	 * \param prc list of the receiving processors
	 * \param tags list of message tags
	 * \param sz_recv list of sizes of the receiving messages (in bytes)
	 *
	 */
	void reorder_buffer(openfpm::vector<size_t> & prc, const openfpm::vector<size_t> & tags, openfpm::vector<size_t> & sz_recv)
	{

		struct recv_buff_reorder
		{
			//! processor
			size_t proc;

			//! message tag
			size_t tag;

			//! position in the receive list
			size_t pos;

			//! default constructor
			recv_buff_reorder()
			:proc(0),tag(0),pos(0)
			{};

			//! needed to reorder
			bool operator<(const recv_buff_reorder & rd) const
			{
				if (proc == rd.proc)
				{return tag < rd.tag;}

				return (proc < rd.proc);
			}
		};

		openfpm::vector<recv_buff_reorder> rcv;

		rcv.resize(self_base::recv_buf.size());

		for (size_t i = 0 ; i < rcv.size() ; i++)
		{
			rcv.get(i).proc = prc.get(i);
			if (i < tags.size())
			{rcv.get(i).tag = tags.get(i);}
			else
			{rcv.get(i).tag = (unsigned int)-1;}
			rcv.get(i).pos = i;
		}

		// we sort based on processor, then tag
		rcv.sort();

		openfpm::vector_fr<BMemory<InternalMemory>> recv_ord;
		recv_ord.resize(rcv.size());

		openfpm::vector<size_t> prc_ord;
		prc_ord.resize(rcv.size());

		openfpm::vector<size_t> sz_recv_ord;
		sz_recv_ord.resize(rcv.size());

		// Now we reorder rcv
		for (size_t i = 0 ; i < rcv.size() ; i++)
		{
			recv_ord.get(i).swap(self_base::recv_buf.get(rcv.get(i).pos));
			prc_ord.get(i) = rcv.get(i).proc;
			sz_recv_ord.get(i) = sz_recv.get(rcv.get(i).pos);
		}

		// move rcv into recv
		// Now we swap back to recv_buf in an ordered way
		for (size_t i = 0 ; i < rcv.size() ; i++)
		{
			self_base::recv_buf.get(i).swap(recv_ord.get(i));
		}

		prc.swap(prc_ord);
		sz_recv.swap(sz_recv_ord);

		// reorder prc_recv and recv_sz
	}

	/*! \brief Semantic Send and receive, send the data to processors and receive from the other processors
	 *
	 * Semantic communications differ from the normal ones. In general they
	 * follow the model:
	 *
	 * SSendRecv(T,S,...,op=add);
	 *
	 * "SendRecv" indicates the communication pattern, or how the information flows.
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work, S must implement the interface S.add(T).
	 *
	 * ### Example scatter a vector of structures, to other processors
	 * \snippet VCluster_semantic_unit_tests.hpp dsde with complex objects1
	 *
	 * \tparam T type of sending object
	 * \tparam S type of receiving object
	 *
	 * \param send Object to send
	 * \param recv Object to receive
	 * \param prc_send destination processors
	 * \param prc_recv list of the receiving processors
	 * \param sz_recv number of elements added
	 * \param opt options
	 *
	 * \return true if the function completed successfully
	 *
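	 * A minimal sketch (hedged; send.get(i) is delivered to processor
	 * prc_send.get(i), while prc_recv and sz_recv are filled as outputs):
	 *
	 * \code
	 * Vcluster<> & vcl = create_vcluster();
	 * openfpm::vector<openfpm::vector<float>> send; // one message per target
	 * openfpm::vector<size_t> prc_send;             // target of each message
	 * openfpm::vector<float> recv;                  // received data merged here
	 * openfpm::vector<size_t> prc_recv;             // output: source processors
	 * openfpm::vector<size_t> sz_recv;              // output: elements per source
	 * vcl.SSendRecv(send,recv,prc_send,prc_recv,sz_recv);
	 * \endcode
	 *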
	 */
	template<typename T,
	         typename S,
			 template <typename> class layout_base = memory_traits_lin>
	bool SSendRecv(openfpm::vector<T> & send,
			       S & recv,
				   openfpm::vector<size_t> & prc_send,
				   openfpm::vector<size_t> & prc_recv,
				   openfpm::vector<size_t> & sz_recv,
				   size_t opt = NONE)
	{
		prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);

		// we generate the list of the properties to pack
		typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;

		op_ssend_recv_add<void> opa;

		index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,&sz_recv,NULL,opa,opt);

		return true;
	}


	/*! \brief Semantic Send and receive, send the data to processors and receive from the other processors
	 *
	 * Semantic communications differ from the normal ones. In general they
	 * follow the model:
	 *
	 * SSendRecv(T,S,...,op=add);
	 *
	 * "SendRecv" indicates the communication pattern, or how the information flows.
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work, S must implement the interface S.add<prp...>(T).
	 *
	 * ### Example scatter a vector of structures, to other processors
	 * \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master
	 *
	 * \tparam T type of sending object
	 * \tparam S type of receiving object
	 * \tparam prp properties for merging
	 *
	 * \param send Object to send
	 * \param recv Object to receive
	 * \param prc_send destination processors
	 * \param prc_recv processors from which we received
	 * \param sz_recv number of elements added per processor
	 * \param sz_recv_byte message received from each processor in byte
	 *
	 * \return true if the function completed successfully
	 *
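	 * A minimal sketch (hedged; part_type and the property indexes 0,1 are
	 * illustrative placeholders, not names defined in this header):
	 *
	 * \code
	 * // merge only properties 0 and 1 of the received elements into recv
	 * vcl.SSendRecvP<part_type,decltype(recv),memory_traits_lin,0,1>
	 *               (send,recv,prc_send,prc_recv,sz_recv,sz_recv_byte);
	 * \endcode
	 *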
	 */
	template<typename T, typename S, template <typename> class layout_base, int ... prp> bool SSendRecvP(openfpm::vector<T> & send,
			                                                      S & recv,
																  openfpm::vector<size_t> & prc_send,
																  openfpm::vector<size_t> & prc_recv,
																  openfpm::vector<size_t> & sz_recv,
																  openfpm::vector<size_t> & sz_recv_byte,
																  size_t opt = NONE)
	{
		prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);

		// operation object
		op_ssend_recv_add<void> opa;

		// process the received information
		process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv,&sz_recv,&sz_recv_byte,opa,opt);

		return true;
	}


	/*! \brief Semantic Send and receive, send the data to processors and receive from the other processors
	 *
	 * Semantic communications differ from the normal ones. In general they
	 * follow the model:
	 *
	 * SSendRecv(T,S,...,op=add);
	 *
	 * "SendRecv" indicates the communication pattern, or how the information flows.
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work, S must implement the interface S.add<prp...>(T).
	 *
	 * ### Example scatter a vector of structures, to other processors
	 * \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master
	 *
	 * \tparam T type of sending object
	 * \tparam S type of receiving object
	 * \tparam prp properties for merging
	 *
	 * \param send Object to send
	 * \param recv Object to receive
	 * \param prc_send destination processors
	 * \param prc_recv list of the processors from which we receive
	 * \param sz_recv number of elements added per processors
	 *
	 * \return true if the function completed successfully
	 *
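	 * A minimal sketch (hedged; identical to the overload above except that
	 * no per-processor byte count is returned):
	 *
	 * \code
	 * vcl.SSendRecvP<part_type,decltype(recv),memory_traits_lin,0,1>
	 *               (send,recv,prc_send,prc_recv,sz_recv);
	 * \endcode
	 *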
	 */
	template<typename T, typename S, template <typename> class layout_base, int ... prp>
	bool SSendRecvP(openfpm::vector<T> & send,
			        S & recv,
					openfpm::vector<size_t> & prc_send,
			    	openfpm::vector<size_t> & prc_recv,
					openfpm::vector<size_t> & sz_recv,
					size_t opt = NONE)
	{
		prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);

		// operation object
		op_ssend_recv_add<void> opa;

		// process the received information
		process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv,&sz_recv,NULL,opa,opt);

		return true;
	}

	/*! \brief Semantic Send and receive, send the data to processors and receive from the other processors
	 *
	 * Semantic communications differ from the normal ones. In general they
	 * follow the model:
	 *
	 * SSendRecv(T,S,...,op=add);
	 *
	 * "SendRecv" indicates the communication pattern, or how the information flows.
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work, S must implement the interface S.add<prp...>(T).
	 *
	 * ### Example scatter a vector of structures, to other processors
	 * \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master
	 *
	 * \tparam op type of operation
	 * \tparam T type of sending object
	 * \tparam S type of receiving object
	 * \tparam prp properties for merging
	 *
	 * \param send Object to send
	 * \param recv Object to receive
	 * \param prc_send destination processors
	 * \param op_param operation object (operation to do in merging the information)
	 * \param recv_sz size of each receiving buffer. This parameter is an output;
	 *        with RECEIVE_KNOWN you must feed this parameter
	 * \param prc_recv from which processor we receive messages
	 *        with RECEIVE_KNOWN you must feed this parameter
	 * \param opt options; the default is NONE, another is RECEIVE_KNOWN. In this case each
	 *        processor is assumed to know from which processors it receives, and the size
	 *        of each message; in such a case prc_recv and recv_sz are no longer outputs
	 *        but must be given as input.
	 *
	 *
	 * \return true if the function completed successfully
	 *
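	 * A minimal sketch (hedged; op_ssend_recv_add<void> is the append-merge
	 * operation used elsewhere in this header, part_type is a placeholder):
	 *
	 * \code
	 * op_ssend_recv_add<void> opa;
	 * vcl.SSendRecvP_op<op_ssend_recv_add<void>,part_type,decltype(recv),
	 *                   memory_traits_lin,0,1>
	 *                  (send,recv,prc_send,opa,prc_recv,recv_sz);
	 * \endcode
	 *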
	 */
	template<typename op,
	         typename T,
			 typename S,
			 template <typename> class layout_base,
			 int ... prp>
	bool SSendRecvP_op(openfpm::vector<T> & send,
			           S & recv,
					   openfpm::vector<size_t> & prc_send,
					   op & op_param,
					   openfpm::vector<size_t> & prc_recv,
					   openfpm::vector<size_t> & recv_sz,
				 	   size_t opt = NONE)
	{
		prepare_send_buffer<op,T,S,layout_base>(send,recv,prc_send,prc_recv,recv_sz,opt);

		// process the received information
		process_receive_buffer_with_prp<op,T,S,layout_base,prp...>(recv,NULL,NULL,op_param,opt);

		return true;
	}

};



// Function to initialize the global VCluster //

extern Vcluster<> * global_v_cluster_private_heap;
extern Vcluster<CudaMemory> * global_v_cluster_private_cuda;

/*! \brief Initialize a global instance of Runtime Virtual Cluster Machine
 *
 * Initialize a global instance of Runtime Virtual Cluster Machine
 *
 */

static inline void init_global_v_cluster_private(int *argc, char ***argv)
{
	if (global_v_cluster_private_heap == NULL)
	{global_v_cluster_private_heap = new Vcluster<>(argc,argv);}

	if (global_v_cluster_private_cuda == NULL)
	{global_v_cluster_private_cuda = new Vcluster<CudaMemory>(argc,argv);}
}

static inline void delete_global_v_cluster_private()
{
	delete global_v_cluster_private_heap;
	delete global_v_cluster_private_cuda;
}

template<typename Memory>
struct get_vcl
{
	static Vcluster<Memory> & get()
	{
		return *global_v_cluster_private_heap;
	}
};

template<>
struct get_vcl<CudaMemory>
{
	static Vcluster<CudaMemory> & get()
	{
		return *global_v_cluster_private_cuda;
	}
};

template<typename Memory = HeapMemory>
static inline Vcluster<Memory> & create_vcluster()
{
	if (global_v_cluster_private_heap == NULL)
	{std::cerr << __FILE__ << ":" << __LINE__ << " Error: you must call openfpm_init before using any distributed data structures" << std::endl;}

	return get_vcl<Memory>::get();
}



/*! \brief Check if the library has been initialized
 *
 * \return true if the library has been initialized
 *
 */
static inline bool is_openfpm_init()
{
	return ofp_initialized;
}


/*! \brief Initialize the library
 *
 * This function MUST be called before any other function
 *
 */
static inline void openfpm_init(int *argc, char ***argv)
{
#ifdef HAVE_PETSC

	PetscInitialize(argc,argv,NULL,NULL);

#endif

	init_global_v_cluster_private(argc,argv);

#ifdef SE_CLASS1
	std::cout << "OpenFPM is compiled with debug mode LEVEL:1. Remember to remove SE_CLASS1 when you go in production" << std::endl;
#endif

#ifdef SE_CLASS2
	std::cout << "OpenFPM is compiled with debug mode LEVEL:2. Remember to remove SE_CLASS2 when you go in production" << std::endl;
#endif

#ifdef SE_CLASS3
	std::cout << "OpenFPM is compiled with debug mode LEVEL:3. Remember to remove SE_CLASS3 when you go in production" << std::endl;
#endif

	// install segmentation fault signal handler

	struct sigaction sa;

	sa.sa_sigaction = bt_sighandler;
	sigemptyset(&sa.sa_mask);
	sa.sa_flags = SA_RESTART | SA_SIGINFO;   // SA_SIGINFO is required when sa_sigaction is used

	sigaction(SIGSEGV, &sa, NULL);

	if (*argc != 0)
		program_name = std::string(*argv[0]);

	// Initialize math pre-computation tables
	openfpm::math::init_getFactorization();

	ofp_initialized = true;
}
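
/* A minimal lifecycle sketch (hedged; the calls below are the ones defined in
 * this header, the computation in between is a placeholder):
 *
 *   openfpm_init(&argc,&argv);             // must precede any distributed work
 *   Vcluster<> & vcl = create_vcluster();
 *   size_t rank = vcl.getProcessUnitID();
 *   // ... distributed computation ...
 *   openfpm_finalize();                    // must be called at the end
 */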


/*! \brief Finalize the library
 *
 * This function MUST be called at the end of the program
 *
 */
static inline void openfpm_finalize()
{
#ifdef HAVE_PETSC

	PetscFinalize();

#endif

	delete_global_v_cluster_private();
	ofp_initialized = false;
}


#endif