VCluster.hpp 28.6 KB
Newer Older
incardon's avatar
incardon committed
1
2
3
4
5
6
7
8
9
10
/*
 * Vcluster.hpp
 *
 *  Created on: Feb 8, 2016
 *      Author: Pietro Incardona
 */

#ifndef VCLUSTER_HPP
#define VCLUSTER_HPP

11
#include <signal.h>
incardon's avatar
incardon committed
12

incardon's avatar
incardon committed
13
14
#include "VCluster_base.hpp"
#include "VCluster_meta_function.hpp"
incardon's avatar
incardon committed
15
#include "util/math_util_complex.hpp"
incardon's avatar
incardon committed
16

17
void bt_sighandler(int sig, siginfo_t * info, void * ctx);
incardon's avatar
incardon committed
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

/*! \brief Implementation of VCluster class
 *
 * This class implements communication functions, like summation, minimum and maximum across
 * processors, or Dynamic Sparse Data Exchange (DSDE)
 *
 * ## Vcluster Min max sum
 * \snippet VCluster_unit_tests.hpp max min sum
 *
 * ## Vcluster all gather
 * \snippet VCluster_unit_test_util.hpp allGather numbers
 *
 * ## Dynamic sparse data exchange with complex objects
 * \snippet VCluster_semantic_unit_tests.hpp dsde with complex objects1
 *
 * ## Dynamic sparse data exchange with buffers
 * \snippet VCluster_unit_test_util.hpp dsde
 * \snippet VCluster_unit_test_util.hpp message alloc
 *
 */
class Vcluster: public Vcluster_base
{
	//! Primary template, intentionally empty: only the index_tuple specialization is usable
	template<typename T>
	struct index_gen
	{};

	//! Specialization that expands the indexes stored in an index_tuple into a property list
	template<int ... prp>
	struct index_gen<index_tuple<prp...>>
	{
		/*! \brief Process the receive buffer
		 *
		 * Forwards the call to Vcluster::process_receive_buffer_with_prp, appending
		 * the properties prp... to the template arguments
		 *
		 * \tparam op operation used to merge the received data
		 * \tparam T type of sending object
		 * \tparam S type of receiving object
		 * \tparam layout_base memory layout
		 *
		 * \param vcl Vcluster that owns the receive buffer
		 * \param recv receiving object
		 * \param sz_recv forwarded unchanged (can be NULL)
		 * \param sz_recv_byte forwarded unchanged (can be NULL)
		 * \param op_param operation object
		 *
		 */
		template<typename op, typename T, typename S, template <typename> class layout_base>
		inline static void process_recv(Vcluster & vcl,
		                                S & recv,
		                                openfpm::vector<size_t> * sz_recv,
		                                openfpm::vector<size_t> * sz_recv_byte,
		                                op & op_param)
		{
			vcl.process_receive_buffer_with_prp<op,T,S,layout_base, prp...>(recv,sz_recv,sz_recv_byte,op_param);
		}
	};

54
	/*! \brief Prepare the send buffer and send the message to other processors
incardon's avatar
incardon committed
55
56
57
58
59
60
61
62
63
	 *
	 * \tparam op Operation to execute in merging the receiving data
	 * \tparam T sending object
	 * \tparam S receiving object
	 *
	 * \note T and S do not need to be the same type, but S.operation(T) must be defined. The flexibility
	 * of the operation is defined by op
	 *
	 * \param send sending buffer
64
	 * \param recv receiving object
incardon's avatar
incardon committed
65
66
67
	 * \param prc_send each object T in the vector send is sent to one processor specified in this list.
	 *                 This mean that prc_send.size() == send.size()
	 * \param prc_recv list of processor from where we receive (output), in case of RECEIVE_KNOWN must be filled
68
	 * \param sz_recv size of each receiving message (output), in case of RECEIVE_KNOWN must be filled
incardon's avatar
incardon committed
69
70
71
	 * \param opt Options. RECEIVE_KNOWN enables patterns with less latency; in that case prc_recv and sz_recv must be filled by the caller
	 *
	 */
incardon's avatar
incardon committed
72
	template<typename op, typename T, typename S, template <typename> class layout_base> void prepare_send_buffer(openfpm::vector<T> & send,
73
74
75
76
77
			                                                               S & recv,
																		   openfpm::vector<size_t> & prc_send,
																		   openfpm::vector<size_t> & prc_recv,
																		   openfpm::vector<size_t> & sz_recv,
																		   size_t opt)
incardon's avatar
incardon committed
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
	{
		openfpm::vector<size_t> sz_recv_byte(sz_recv.size());

		// Reset the receive buffer
		reset_recv_buf();

	#ifdef SE_CLASS1

		if (send.size() != prc_send.size())
			std::cerr << __FILE__ << ":" << __LINE__ << " Error, the number of processor involved \"prc.size()\" must match the number of sending buffers \"send.size()\" " << std::endl;

	#endif

		// Prepare the sending buffer
		openfpm::vector<const void *> send_buf;
		openfpm::vector<size_t> send_sz_byte;
incardon's avatar
incardon committed
94
		openfpm::vector<size_t> prc_send_;
incardon's avatar
incardon committed
95
96
97
98
99
100
101
102

		size_t tot_size = 0;

		for (size_t i = 0; i < send.size() ; i++)
		{
			size_t req = 0;

			//Pack requesting
incardon's avatar
incardon committed
103
			pack_unpack_cond_with_prp<has_max_prop<T, has_value_type<T>::value>::value,op, T, S, layout_base>::packingRequest(send.get(i), req, send_sz_byte);
incardon's avatar
incardon committed
104
105
106
			tot_size += req;
		}

incardon's avatar
incardon committed
107
108
		pack_unpack_cond_with_prp_inte_lin<T>::construct_prc(prc_send,prc_send_);

incardon's avatar
incardon committed
109
110
111
112
113
114
115
116
117
118
119
		HeapMemory pmem;

		ExtPreAlloc<HeapMemory> & mem = *(new ExtPreAlloc<HeapMemory>(tot_size,pmem));
		mem.incRef();

		for (size_t i = 0; i < send.size() ; i++)
		{
			//Packing

			Pack_stat sts;

incardon's avatar
incardon committed
120
			pack_unpack_cond_with_prp<has_max_prop<T, has_value_type<T>::value>::value, op, T, S, layout_base>::packing(mem, send.get(i), sts, send_buf);
incardon's avatar
incardon committed
121
122
		}

incardon's avatar
incardon committed
123
124
		tags.clear();

incardon's avatar
incardon committed
125
		// receive information
incardon's avatar
incardon committed
126
		base_info bi(&recv_buf,prc_recv,sz_recv_byte,tags);
incardon's avatar
incardon committed
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149

		// Send and recv multiple messages
		if (opt & RECEIVE_KNOWN)
		{
			// We we are passing the number of element but not the byte, calculate the byte
			if (opt & KNOWN_ELEMENT_OR_BYTE)
			{
				// We know the number of element convert to byte (ONLY if it is possible)
				if (has_pack_gen<typename T::value_type>::value == false && is_vector<T>::value == true)
				{
					for (size_t i = 0 ; i < sz_recv.size() ; i++)
						sz_recv_byte.get(i) = sz_recv.get(i) * sizeof(typename T::value_type);
				}
				else
					std::cout << __FILE__ << ":" << __LINE__ << " Error " << demangle(typeid(T).name()) << " the type does not work with the option RECEIVE_KNOWN or NO_CHANGE_ELEMENTS" << std::endl;
			}

			Vcluster_base::sendrecvMultipleMessagesNBX(prc_send.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(),
										prc_recv.size(),(size_t *)prc_recv.getPointer(),(size_t *)sz_recv_byte.getPointer(),msg_alloc_known,(void *)&bi);
		}
		else
		{
			prc_recv.clear();
incardon's avatar
incardon committed
150
			sendrecvMultipleMessagesNBX(prc_send_.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send_.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi);
incardon's avatar
incardon committed
151
152
153
		}

		// Reorder the buffer
incardon's avatar
incardon committed
154
		reorder_buffer(prc_recv,tags,sz_recv_byte);
incardon's avatar
incardon committed
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187

		mem.decRef();
		delete &mem;
	}


	/*! \brief Reset the receive buffer
	 *
	 *
	 */
	void reset_recv_buf()
	{
		// shrink every per-message buffer to zero ...
		const size_t n_buf = recv_buf.size();

		for (size_t b = 0 ; b < n_buf ; b++)
		{recv_buf.get(b).resize(0);}

		// ... and then drop the list of buffers itself
		recv_buf.resize(0);
	}

	/*! \brief Base info
	 *
	 * \param recv_buf receive buffers
	 * \param prc processors involved
	 * \param sz size of the received data
	 * \param tags tags of the received messages
	 *
	 */
	struct base_info
	{
		//! Receive buffer
		openfpm::vector<BHeapMemory> * recv_buf;
		//! receiving processor list
		openfpm::vector<size_t> & prc;
		//! size of each message
		openfpm::vector<size_t> & sz;
incardon's avatar
incardon committed
188
189
		//! tags
		openfpm::vector<size_t> &tags;
incardon's avatar
incardon committed
190
191

		//! constructor
incardon's avatar
incardon committed
192
193
		base_info(openfpm::vector<BHeapMemory> * recv_buf, openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz, openfpm::vector<size_t> & tags)
		:recv_buf(recv_buf),prc(prc),sz(sz),tags(tags)
incardon's avatar
incardon committed
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
		{}
	};

	/*! \brief Call-back to allocate buffer to receive data
	 *
	 * \param msg_i size required to receive the message from i
	 * \param total_msg total size to receive from all the processors
	 * \param total_p the total number of processor that want to communicate with you
	 * \param i processor id
	 * \param ri request id (it is an id that goes from 0 to total_p, and is unique
	 *           every time message_alloc is called)
	 * \param tag tag of the message
	 * \param ptr a pointer to the base_info structure that collects the receive information
	 *
	 * \return the pointer where to store the message for the processor i
	 *
	 */
incardon's avatar
incardon committed
210
	static void * msg_alloc(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void * ptr)
incardon's avatar
incardon committed
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
	{
		base_info & rinfo = *(base_info *)ptr;

		if (rinfo.recv_buf == NULL)
		{
			std::cerr << __FILE__ << ":" << __LINE__ << " Internal error this processor is not suppose to receive\n";
			return NULL;
		}

		rinfo.recv_buf->resize(ri+1);

		rinfo.recv_buf->get(ri).resize(msg_i);

		// Receive info
		rinfo.prc.add(i);
		rinfo.sz.add(msg_i);
incardon's avatar
incardon committed
227
		rinfo.tags.add(tag);
incardon's avatar
incardon committed
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246

		// return the pointer
		return rinfo.recv_buf->last().getPointer();
	}


	/*! \brief Call-back to allocate buffer to receive data
	 *
	 * \param msg_i size required to receive the message from i
	 * \param total_msg total size to receive from all the processors
	 * \param total_p the total number of processor that want to communicate with you
	 * \param i processor id
	 * \param ri request id (it is an id that goes from 0 to total_p, and is unique
	 *           every time message_alloc is called)
	 * \param tag tag of the message (not used by this call-back)
	 * \param ptr a pointer to the base_info structure that collects the receive information
	 *
	 * \return the pointer where to store the message for the processor i
	 *
	 */
incardon's avatar
incardon committed
247
	static void * msg_alloc_known(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void * ptr)
incardon's avatar
incardon committed
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
	{
		base_info & rinfo = *(base_info *)ptr;

		if (rinfo.recv_buf == NULL)
		{
			std::cerr << __FILE__ << ":" << __LINE__ << " Internal error this processor is not suppose to receive\n";
			return NULL;
		}

		rinfo.recv_buf->resize(ri+1);

		rinfo.recv_buf->get(ri).resize(msg_i);

		// return the pointer
		return rinfo.recv_buf->last().getPointer();
	}
	
	/*! \brief Process the receive buffer
	 *
267
	 * \tparam op operation to do in merging the received data
incardon's avatar
incardon committed
268
269
270
271
272
	 * \tparam T type of sending object
	 * \tparam S type of receiving object
	 * \tparam prp properties to receive
	 *
	 * \param recv receive object
273
274
275
	 * \param sz vector that store how many element has been added per processors on S
	 * \param sz_byte byte received on a per processor base
	 * \param op_param operation to do in merging the received information with recv
incardon's avatar
incardon committed
276
277
	 *
	 */
incardon's avatar
incardon committed
278
	template<typename op, typename T, typename S, template<typename> class layout_base, unsigned int ... prp >
279
280
281
282
	void process_receive_buffer_with_prp(S & recv,
			                             openfpm::vector<size_t> * sz,
										 openfpm::vector<size_t> * sz_byte,
										 op & op_param)
incardon's avatar
incardon committed
283
284
285
286
	{
		if (sz != NULL)
			sz->resize(recv_buf.size());

incardon's avatar
incardon committed
287
		pack_unpack_cond_with_prp<has_max_prop<T, has_value_type<T>::value>::value,op, T, S, layout_base, prp... >::unpacking(recv, recv_buf, sz, sz_byte, op_param);
incardon's avatar
incardon committed
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
	}

	public:

	/*! \brief Constructor
	 *
	 * \param argc main number of arguments
	 * \param argv main set of arguments
	 *
	 */
	Vcluster(int *argc, char ***argv)
	:Vcluster_base(argc,argv)	// all the work is done by the base class
	{}

	/*! \brief Semantic Gather, gather the data from all processors into one node
	 *
	 * Semantic communication differ from the normal one. They in general
	 * follow the following model.
	 *
	 * Gather(T,S,root,op=add);
	 *
	 * "Gather" indicate the communication pattern, or how the information flow
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work S must implement the interface S.add(T).
	 *
	 * ### Example send a vector of structures, and merge all together in one vector
	 * \snippet VCluster_semantic_unit_tests.hpp Gather the data on master
	 *
	 * ### Example send a vector of structures, and merge all together in one vector
	 * \snippet VCluster_semantic_unit_tests.hpp Gather the data on master complex
	 *
	 * \tparam T type of sending object
	 * \tparam S type of receiving object
	 *
323
324
	 * \param send Object to send
	 * \param recv Object to receive
incardon's avatar
incardon committed
325
326
327
328
329
330
331
332
333
334
335
336
337
	 * \param root which node should collect the information
	 *
	 * \return true if the function completed successfully
	 *
	 */
	template<typename T, typename S> bool SGather(T & send, S & recv,size_t root)
	{
		openfpm::vector<size_t> prc;
		openfpm::vector<size_t> sz;

		return SGather(send,recv,prc,sz,root);
	}

338
	//! metafunction
incardon's avatar
incardon committed
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
	template<size_t index, size_t N>
	struct MetaFuncOrd
	{
		//! the meta-function maps every index into itself (N is not used)
		enum
		{
			value = index
		};
	};

	/*! \brief Semantic Gather, gather the data from all processors into one node
	 *
	 * Semantic communication differ from the normal one. They in general
	 * follow the following model.
	 *
	 * Gather(T,S,root,op=add);
	 *
	 * "Gather" indicate the communication pattern, or how the information flow
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work S must implement the interface S.add(T).
	 *
	 * ### Example send a vector of structures, and merge all together in one vector
	 * \snippet VCluster_semantic_unit_tests.hpp Gather the data on master
	 *
	 * ### Example send a vector of structures, and merge all together in one vector
	 * \snippet VCluster_semantic_unit_tests.hpp Gather the data on master complex
	 *
	 * \tparam T type of sending object
	 * \tparam S type of receiving object
	 *
363
364
	 * \param send Object to send
	 * \param recv Object to receive
incardon's avatar
incardon committed
365
366
367
368
369
370
371
	 * \param root which node should collect the information
	 * \param prc processors from which we received the information
	 * \param sz size of the received information for each processor
	 *
	 * \return true if the function completed successfully
	 *
	 */
incardon's avatar
incardon committed
372
	template<typename T, typename S, template <typename> class layout_base=memory_traits_lin> bool SGather(T & send,
373
374
375
376
			                                      S & recv,
												  openfpm::vector<size_t> & prc,
												  openfpm::vector<size_t> & sz,
												  size_t root)
incardon's avatar
incardon committed
377
	{
incardon's avatar
incardon committed
378
#ifdef SE_CLASS1
incardon's avatar
incardon committed
379
		if (&send == (T *)&recv)
incardon's avatar
incardon committed
380
381
382
		{std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " using SGather in general the sending object and the receiving object must be different" << std::endl;}
#endif

incardon's avatar
incardon committed
383
384
385
386
387
388
389
390
391
392
		// Reset the receive buffer
		reset_recv_buf();

		// If we are on master collect the information
		if (getProcessUnitID() == root)
		{
			// send buffer (master does not send anything) so send req and send_buf
			// remain buffer with size 0
			openfpm::vector<size_t> send_req;

incardon's avatar
incardon committed
393
394
			tags.clear();

incardon's avatar
incardon committed
395
			// receive information
incardon's avatar
incardon committed
396
			base_info bi(&recv_buf,prc,sz,tags);
incardon's avatar
incardon committed
397
398
399
400
401
402
403
404
405
406

			// Send and recv multiple messages
			sendrecvMultipleMessagesNBX(send_req.size(),NULL,NULL,NULL,msg_alloc,&bi);

			// we generate the list of the properties to pack
			typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;

			// operation object
			op_ssend_recv_add<void> opa;

incardon's avatar
incardon committed
407
			index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,&sz,NULL,opa);
incardon's avatar
incardon committed
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427

			recv.add(send);
			prc.add(root);
			sz.add(send.size());
		}
		else
		{
			// send buffer (master does not send anything) so send req and send_buf
			// remain buffer with size 0
			openfpm::vector<size_t> send_prc;
			send_prc.add(root);

			openfpm::vector<size_t> sz;

			openfpm::vector<const void *> send_buf;
				
			//Pack requesting

			size_t tot_size = 0;

incardon's avatar
incardon committed
428
			pack_unpack_cond_with_prp<has_max_prop<T, has_value_type<T>::value>::value,op_ssend_recv_add<void>, T, S, layout_base>::packingRequest(send, tot_size, sz);
incardon's avatar
incardon committed
429
430
431
432
433
434
435
436
437
438

			HeapMemory pmem;

			ExtPreAlloc<HeapMemory> & mem = *(new ExtPreAlloc<HeapMemory>(tot_size,pmem));
			mem.incRef();

			//Packing

			Pack_stat sts;
			
incardon's avatar
incardon committed
439
			pack_unpack_cond_with_prp<has_max_prop<T, has_value_type<T>::value>::value,op_ssend_recv_add<void>, T, S, layout_base>::packing(mem, send, sts, send_buf);
incardon's avatar
incardon committed
440

incardon's avatar
incardon committed
441
442
			tags.clear();

incardon's avatar
incardon committed
443
			// receive information
incardon's avatar
incardon committed
444
			base_info bi(NULL,prc,sz,tags);
incardon's avatar
incardon committed
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472

			// Send and recv multiple messages
			sendrecvMultipleMessagesNBX(send_prc.size(),(size_t *)sz.getPointer(),(size_t *)send_prc.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi,NONE);

			mem.decRef();
			delete &mem;
		}
		
		return true;
	}

	/*! \brief Semantic Scatter, scatter the data from one processor to the other node
	 *
	 * Semantic communication differ from the normal one. They in general
	 * follow the following model.
	 *
	 * Scatter(T,S,...,op=add);
	 *
	 * "Scatter" indicate the communication pattern, or how the information flow
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work S must implement the interface S.add(T).
	 *
	 * ### Example scatter a vector of structures, to other processors
	 * \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master
	 *
	 * \tparam T type of sending object
	 * \tparam S type of receiving object
	 *
473
474
	 * \param send Object to send
	 * \param recv Object to receive
incardon's avatar
incardon committed
475
476
477
478
479
480
481
	 * \param prc processor involved in the scatter
	 * \param sz size of each chunks
	 * \param root which processor should scatter the information
	 *
	 * \return true if the function completed successfully
	 *
	 */
incardon's avatar
incardon committed
482
	template<typename T, typename S, template <typename> class layout_base=memory_traits_lin> bool SScatter(T & send, S & recv, openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz, size_t root)
incardon's avatar
incardon committed
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
	{
		// Reset the receive buffer
		reset_recv_buf();

		// If we are on master scatter the information
		if (getProcessUnitID() == root)
		{
			// Prepare the sending buffer
			openfpm::vector<const void *> send_buf;


			openfpm::vector<size_t> sz_byte;
			sz_byte.resize(sz.size());

			size_t ptr = 0;

			for (size_t i = 0; i < sz.size() ; i++)
			{
				send_buf.add((char *)send.getPointer() + sizeof(typename T::value_type)*ptr );
				sz_byte.get(i) = sz.get(i) * sizeof(typename T::value_type);
				ptr += sz.get(i);
			}

incardon's avatar
incardon committed
506
507
			tags.clear();

incardon's avatar
incardon committed
508
			// receive information
incardon's avatar
incardon committed
509
			base_info bi(&recv_buf,prc,sz,tags);
incardon's avatar
incardon committed
510
511
512
513
514
515
516
517
518
519

			// Send and recv multiple messages
			sendrecvMultipleMessagesNBX(prc.size(),(size_t *)sz_byte.getPointer(),(size_t *)prc.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi);

			// we generate the list of the properties to pack
			typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;

			// operation object
			op_ssend_recv_add<void> opa;

incardon's avatar
incardon committed
520
			index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,NULL,NULL,opa);
incardon's avatar
incardon committed
521
522
523
524
525
526
		}
		else
		{
			// The non-root receive
			openfpm::vector<size_t> send_req;

incardon's avatar
incardon committed
527
528
			tags.clear();

incardon's avatar
incardon committed
529
			// receive information
incardon's avatar
incardon committed
530
			base_info bi(&recv_buf,prc,sz,tags);
incardon's avatar
incardon committed
531
532
533
534
535
536
537
538
539
540

			// Send and recv multiple messages
			sendrecvMultipleMessagesNBX(send_req.size(),NULL,NULL,NULL,msg_alloc,&bi);

			// we generate the list of the properties to pack
			typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;

			// operation object
			op_ssend_recv_add<void> opa;

incardon's avatar
incardon committed
541
			index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,NULL,NULL,opa);
incardon's avatar
incardon committed
542
543
544
545
546
547
548
549
		}

		return true;
	}
	
	/*! \brief reorder the receiving buffer
	 *
	 * \param prc list of the receiving processors
	 * \param tags list of the tags of the received messages
	 * \param sz_recv list of size of the receiving messages (in byte)
incardon's avatar
incardon committed
551
552
	 *
	 */
incardon's avatar
incardon committed
553
	void reorder_buffer(openfpm::vector<size_t> & prc, openfpm::vector<size_t> tags, openfpm::vector<size_t> & sz_recv)
incardon's avatar
incardon committed
554
555
556
557
558
559
560
	{

		struct recv_buff_reorder
		{
			//! processor
			size_t proc;

incardon's avatar
incardon committed
561
562
			size_t tag;

incardon's avatar
incardon committed
563
564
565
566
567
			//! position in the receive list
			size_t pos;

			//! default constructor
			recv_buff_reorder()
incardon's avatar
incardon committed
568
			:proc(0),tag(0),pos(0)
incardon's avatar
incardon committed
569
570
571
572
573
			{};

			//! needed to reorder
			bool operator<(const recv_buff_reorder & rd) const
			{
incardon's avatar
incardon committed
574
575
576
577
				if (proc == rd.proc)
				{return tag < rd.tag;}

				return (proc < rd.proc);
incardon's avatar
incardon committed
578
579
580
581
582
583
584
585
586
587
			}
		};

		openfpm::vector<recv_buff_reorder> rcv;

		rcv.resize(recv_buf.size());

		for (size_t i = 0 ; i < rcv.size() ; i++)
		{
			rcv.get(i).proc = prc.get(i);
incardon's avatar
incardon committed
588
			rcv.get(i).tag = tags.get(i);
incardon's avatar
incardon committed
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
			rcv.get(i).pos = i;
		}

		// we sort based on processor
		rcv.sort();

		openfpm::vector<BHeapMemory> recv_ord;
		recv_ord.resize(rcv.size());

		openfpm::vector<size_t> prc_ord;
		prc_ord.resize(rcv.size());

		openfpm::vector<size_t> sz_recv_ord;
		sz_recv_ord.resize(rcv.size());

		// Now we reorder rcv
		for (size_t i = 0 ; i < rcv.size() ; i++)
		{
			recv_ord.get(i).swap(recv_buf.get(rcv.get(i).pos));
			prc_ord.get(i) = rcv.get(i).proc;
			sz_recv_ord.get(i) = sz_recv.get(rcv.get(i).pos);
		}

		// move rcv into recv
		recv_buf.swap(recv_ord);
		prc.swap(prc_ord);
		sz_recv.swap(sz_recv_ord);

		// reorder prc_recv and recv_sz
	}

	/*! \brief Semantic Send and receive, send the data to processors and receive from the other processors
	 *
	 * Semantic communication differ from the normal one. They in general
	 * follow the following model.
	 *
	 * Recv(T,S,...,op=add);
	 *
	 * "SendRecv" indicate the communication pattern, or how the information flow
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work S must implement the interface S.add(T).
	 *
	 * ### Example scatter a vector of structures, to other processors
632
	 * \snippet VCluster_semantic_unit_tests.hpp dsde with complex objects1
incardon's avatar
incardon committed
633
634
635
636
	 *
	 * \tparam T type of sending object
	 * \tparam S type of receiving object
	 *
637
638
639
640
641
642
	 * \param send Object to send
	 * \param recv Object to receive
	 * \param prc_send destination processors
	 * \param prc_recv list of the receiving processors
	 * \param sz_recv number of elements added
	 * \param opt options
incardon's avatar
incardon committed
643
644
645
646
	 *
	 * \return true if the function completed successfully
	 *
	 */
incardon's avatar
incardon committed
647
648
649
650
651
652
653
	template<typename T, typename S, template<typename> class layout_base = memory_traits_lin >
	bool SSendRecv(openfpm::vector<T> & send,
			       S & recv,
			       openfpm::vector<size_t> & prc_send,
			       openfpm::vector<size_t> & prc_recv,
			       openfpm::vector<size_t> & sz_recv,
			       size_t opt = NONE)
incardon's avatar
incardon committed
654
	{
incardon's avatar
incardon committed
655
		prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
incardon's avatar
incardon committed
656
657
658
659
660
661

		// we generate the list of the properties to pack
		typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;

		op_ssend_recv_add<void> opa;

incardon's avatar
incardon committed
662
		index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,&sz_recv,NULL,opa);
incardon's avatar
incardon committed
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685

		return true;
	}


	/*! \brief Semantic Send and receive, send the data to processors and receive from the other processors
	 *
	 * Semantic communication differ from the normal one. They in general
	 * follow the following model.
	 *
	 * SSendRecv(T,S,...,op=add);
	 *
	 * "SendRecv" indicate the communication pattern, or how the information flow
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work S must implement the interface S.add<prp...>(T).
	 *
	 * ### Example scatter a vector of structures, to other processors
	 * \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master
	 *
	 * \tparam T type of sending object
	 * \tparam S type of receiving object
	 * \tparam prp properties for merging
	 *
686
687
688
689
690
691
	 * \param send Object to send
	 * \param recv Object to receive
	 * \param prc_send destination processors
	 * \param prc_recv processors from which we received
	 * \param sz_recv number of elements added per processor
	 * \param sz_recv_byte message received from each processor in byte
incardon's avatar
incardon committed
692
	 *
693
	 * \return true if the function completed successfully
incardon's avatar
incardon committed
694
695
	 *
	 */
incardon's avatar
incardon committed
696
	template<typename T, typename S, template<typename> class layout_base, int ... prp> bool SSendRecvP(openfpm::vector<T> & send,
697
698
699
700
701
			                                                      S & recv,
																  openfpm::vector<size_t> & prc_send,
																  openfpm::vector<size_t> & prc_recv,
																  openfpm::vector<size_t> & sz_recv,
																  openfpm::vector<size_t> & sz_recv_byte)
incardon's avatar
incardon committed
702
	{
incardon's avatar
incardon committed
703
		prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,NONE);
incardon's avatar
incardon committed
704
705
706
707
708

		// operation object
		op_ssend_recv_add<void> opa;

		// process the received information
incardon's avatar
incardon committed
709
		process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv,&sz_recv,&sz_recv_byte,opa);
incardon's avatar
incardon committed
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732

		return true;
	}


	/*! \brief Semantic Send and receive, send the data to processors and receive from the other processors
	 *
	 * Semantic communication differ from the normal one. They in general
	 * follow the following model.
	 *
	 * SSendRecv(T,S,...,op=add);
	 *
	 * "SendRecv" indicate the communication pattern, or how the information flow
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work S must implement the interface S.add<prp...>(T).
	 *
	 * ### Example scatter a vector of structures, to other processors
	 * \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master
	 *
	 * \tparam T type of sending object
	 * \tparam S type of receiving object
	 * \tparam prp properties for merging
	 *
733
734
735
736
737
	 * \param send Object to send
	 * \param recv Object to receive
	 * \param prc_send destination processors
	 * \param prc_recv list of the processors from which we receive
	 * \param sz_recv number of elements added per processors
incardon's avatar
incardon committed
738
739
740
741
	 *
	 * \return true if the function completed successfully
	 *
	 */
incardon's avatar
incardon committed
742
	template<typename T, typename S, template<typename> class layout_base, int ... prp> bool SSendRecvP(openfpm::vector<T> & send,
743
744
745
746
			                                                      S & recv,
																  openfpm::vector<size_t> & prc_send,
																  openfpm::vector<size_t> & prc_recv,
																  openfpm::vector<size_t> & sz_recv)
incardon's avatar
incardon committed
747
	{
incardon's avatar
incardon committed
748
		prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,NONE);
incardon's avatar
incardon committed
749
750
751
752
753

		// operation object
		op_ssend_recv_add<void> opa;

		// process the received information
incardon's avatar
incardon committed
754
		process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv,&sz_recv,NULL,opa);
incardon's avatar
incardon committed
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777

		return true;
	}

	/*! \brief Semantic Send and receive, send the data to processors and receive from the other processors
	 *
	 * Semantic communication differ from the normal one. They in general
	 * follow the following model.
	 *
	 * SSendRecv(T,S,...,op=add);
	 *
	 * "SendRecv" indicate the communication pattern, or how the information flow
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work S must implement the interface S.add<prp...>(T).
	 *
	 * ### Example scatter a vector of structures, to other processors
	 * \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master
	 *
	 * \tparam op type of operation
	 * \tparam T type of sending object
	 * \tparam S type of receiving object
	 * \tparam prp properties for merging
	 *
778
779
780
781
782
	 * \param send Object to send
	 * \param recv Object to receive
	 * \param prc_send destination processors
	 * \param op_param operation object (operation to do in merging the information)
	 * \param recv_sz size of each receiving buffer. This parameters are output
incardon's avatar
incardon committed
783
784
785
786
787
788
789
790
791
	 *        with RECEIVE_KNOWN you must feed this parameter
	 * \param prc_recv from which processor we receive messages
	 *        with RECEIVE_KNOWN you must feed this parameter
	 * \param opt options default is NONE, another is RECEIVE_KNOWN. In this case each
	 *        processor is assumed to know from which processor receive, and the size of
	 *        the message. in such case prc_recv and sz_recv are not anymore parameters
	 *        but must be input.
	 *
	 *
792
	 * \return true if the function completed successfully
incardon's avatar
incardon committed
793
794
	 *
	 */
incardon's avatar
incardon committed
795
	template<typename op, typename T, typename S, template<typename>class layout_base , int ... prp > bool SSendRecvP_op(openfpm::vector<T> & send,
796
797
798
799
800
801
			                                                                      S & recv,
																				  openfpm::vector<size_t> & prc_send,
																				  op & op_param,
																				  openfpm::vector<size_t> & prc_recv,
																				  openfpm::vector<size_t> & recv_sz,
																				  size_t opt = NONE)
incardon's avatar
incardon committed
802
	{
incardon's avatar
incardon committed
803
		prepare_send_buffer<op,T,S,layout_base>(send,recv,prc_send,prc_recv,recv_sz,opt);
incardon's avatar
incardon committed
804
805

		// process the received information
incardon's avatar
incardon committed
806
		process_receive_buffer_with_prp<op,T,S,layout_base,prp...>(recv,NULL,NULL,op_param);
incardon's avatar
incardon committed
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880

		return true;
	}

};



// Function to initialize the global VCluster //

//! Pointer to the global Vcluster instance. It is only declared here (extern);
//! init_global_v_cluster_private() allocates it when it is still NULL and
//! delete_global_v_cluster_private() deletes it
extern Vcluster * global_v_cluster_private;

/*! \brief Create the global Vcluster instance if it does not exist yet
 *
 * When global_v_cluster_private is still NULL a new Vcluster is allocated from
 * the given program arguments. If an instance is already present the call does
 * nothing.
 *
 * \param argc pointer to the argument count, forwarded to the Vcluster constructor
 * \param argv pointer to the argument vector, forwarded to the Vcluster constructor
 *
 */
static inline void init_global_v_cluster_private(int *argc, char ***argv)
{
	// an instance already exists: nothing to create
	if (global_v_cluster_private != NULL)
		return;

	global_v_cluster_private = new Vcluster(argc,argv);
}

/*! \brief Destroy the global Vcluster instance
 *
 * The pointer is reset to NULL after the deletion. init_global_v_cluster_private()
 * allocates a new instance only when the pointer is NULL, so without the reset a
 * later re-initialization would keep using the deleted object, and a second call
 * to this function would delete it twice. Deleting a NULL pointer is a no-op, so
 * calling this function repeatedly is harmless.
 *
 */
static inline void delete_global_v_cluster_private()
{
	delete global_v_cluster_private;
	global_v_cluster_private = NULL;
}

/*! \brief Get a reference to the global Vcluster instance
 *
 * With SE_CLASS1 defined an error is printed on std::cerr when the global
 * instance has not been created; the pointer is dereferenced in any case.
 *
 * \return a reference to the object pointed by global_v_cluster_private
 *
 */
static inline Vcluster & create_vcluster()
{
#ifdef SE_CLASS1

	if (!global_v_cluster_private)
	{
		std::cerr << __FILE__ << ":" << __LINE__ << " Error you must call openfpm_init before using any distributed data structures";
	}

#endif

	Vcluster & v_cl = *global_v_cluster_private;

	return v_cl;
}



/*! \brief Check if the library has been initialized
 *
 * \return true if the library has been initialized
 *
 */
static inline bool is_openfpm_init()
{
	return ofp_initialized;
}

/*! \brief Initialize the library
 *
 * This function MUST be called before any other function
 *
 */
static inline void openfpm_init(int *argc, char ***argv)
{
#ifdef HAVE_PETSC

	PetscInitialize(argc,argv,NULL,NULL);

#endif

	init_global_v_cluster_private(argc,argv);

#ifdef SE_CLASS1
	std::cout << "OpenFPM is compiled with debug mode LEVEL:1. Remember to remove SE_CLASS1 when you go in production" << std::endl;
#endif

#ifdef SE_CLASS2
	std::cout << "OpenFPM is compiled with debug mode LEVEL:2. Remember to remove SE_CLASS2 when you go in production" << std::endl;
881
#endif
incardon's avatar
incardon committed
882

883
884
#ifdef SE_CLASS3
	std::cout << "OpenFPM is compiled with debug mode LEVEL:3. Remember to remove SE_CLASS3 when you go in production" << std::endl;
incardon's avatar
incardon committed
885
886
#endif

887
888
889
890
891
892
893
894
895
896
897
898
899
	// install segmentation fault signal handler

	struct sigaction sa;

	sa.sa_sigaction = bt_sighandler;
	sigemptyset(&sa.sa_mask);
	sa.sa_flags = SA_RESTART;

	sigaction(SIGSEGV, &sa, NULL);

	if (*argc != 0)
		program_name = std::string(*argv[0]);

incardon's avatar
incardon committed
900
901
902
	// Initialize math pre-computation tables
	openfpm::math::init_getFactorization();

incardon's avatar
incardon committed
903
904
905
	ofp_initialized = true;
}

906

incardon's avatar
incardon committed
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
/*! \brief Finalize the library
 *
 * This function MUST be called at the end of the program
 *
 * It finalizes PETSc (only when compiled with HAVE_PETSC), deletes the global
 * Vcluster instance and marks the library as not initialized, in this order.
 *
 */
static inline void openfpm_finalize()
{
#ifdef HAVE_PETSC

	// shut down PETSc before the global Vcluster is destroyed
	PetscFinalize();

#endif

	// destroy the global Vcluster instance
	delete_global_v_cluster_private();
	// from now on is_openfpm_init() returns false
	ofp_initialized = false;
}


#endif