OOFEM  2.4
OOFEM.org - Object Oriented Finite Element Solver
dyncombuff.C
Go to the documentation of this file.
1 /*
2  *
3  * ##### ##### ###### ###### ### ###
4  * ## ## ## ## ## ## ## ### ##
5  * ## ## ## ## #### #### ## # ##
6  * ## ## ## ## ## ## ## ##
7  * ## ## ## ## ## ## ## ##
8  * ##### ##### ## ###### ## ##
9  *
10  *
11  * OOFEM : Object Oriented Finite Element Code
12  *
13  * Copyright (C) 1993 - 2013 Borek Patzak
14  *
15  *
16  *
17  * Czech Technical University, Faculty of Civil Engineering,
18  * Department of Structural Mechanics, 166 29 Prague, Czech Republic
19  *
20  * This library is free software; you can redistribute it and/or
21  * modify it under the terms of the GNU Lesser General Public
22  * License as published by the Free Software Foundation; either
23  * version 2.1 of the License, or (at your option) any later version.
24  *
25  * This program is distributed in the hope that it will be useful,
26  * but WITHOUT ANY WARRANTY; without even the implied warranty of
27  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
28  * Lesser General Public License for more details.
29  *
30  * You should have received a copy of the GNU Lesser General Public
31  * License along with this library; if not, write to the Free Software
32  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
33  */
34 
35 #include <list>
36 #include <algorithm>
37 
38 #include "dyncombuff.h"
39 #include "mathfem.h"
40 #include "error.h"
41 
42 namespace oofem {
44 {
45  this->EOF_Flag = false;
46  this->number = num;
47  // reserve space for packet header
48  curr_pos += givePackSize(comm, MPI_INT, 2);
49 }
50 
51 
53 {
54  this->EOF_Flag = false;
55  this->number = num;
56  // reserve space for packet header
57  curr_pos += givePackSize(comm, MPI_INT, 2);
58 }
59 
60 
62 { }
63 
64 
65 void
67 {
69  this->EOF_Flag = false;
70  // reserve space for packet header
71  curr_pos += givePackSize(comm, MPI_INT, 2);
72 }
73 
74 int
75 CommunicationPacket :: iSend(MPI_Comm communicator, int dest, int tag)
76 {
77  this->packHeader(communicator);
78  return ( MPI_Isend(this->buff, this->curr_pos, MPI_PACKED, dest, tag,
79  communicator, & this->request) == MPI_SUCCESS );
80 }
81 
82 
83 int
84 CommunicationPacket :: iRecv(MPI_Comm communicator, int source, int tag, int count)
85 {
86  if ( count ) {
87  if ( count >= this->size ) {
88  // reallocate itself
89  if ( this->resize(count) == 0 ) {
90  return 0;
91  }
92  }
93  }
94 
95  return ( MPI_Irecv(this->buff, this->size, MPI_PACKED, source, tag,
96  communicator, & this->request) == MPI_SUCCESS );
97 }
98 
99 
100 int
102  int flag;
103  MPI_Status status;
104 
105  MPI_Test(& this->request, & flag, & status);
106  return flag;
107 }
108 
109 int
111 {
112  MPI_Status status;
113 
114  return ( MPI_Wait(& this->request, & status) == MPI_SUCCESS );
115 }
116 
117 
118 int
120 {
121  int _arry [ 2 ];
122  int _res, _pos = 0;
123 
124  _arry [ 0 ] = this->number;
125  _arry [ 1 ] = this->EOF_Flag;
126 
127  _res = MPI_Pack(_arry, 2, MPI_INT, this->buff, size, & _pos, comm);
128 
129  return ( _res == MPI_SUCCESS );
130 }
131 
132 int
134 {
135  int _arry [ 2 ];
136  int _res, _pos = 0;
137 
138  _res = MPI_Unpack(this->buff, this->size, & _pos, _arry, 2, MPI_INT, comm);
139  this->number = _arry [ 0 ];
140  this->EOF_Flag = _arry [ 1 ];
141 
142  return ( _res == MPI_SUCCESS );
143 }
144 
145 
146 
147 /********DynamicCommunicationBuffer***************/
148 
150 
152  CommunicationBuffer(comm, size, dynamic), packet_list()
153 {
154  number_of_packets = 0;
155  mode = DCB_null;
156  completed = false;
157  /*
158  * // allocate first send/receive packet
159  * active_packet = this->allocateNewPacket (++number_of_packets);
160  * packet_list.push_back(active_packet);
161  */
162 }
163 
164 
166  CommunicationBuffer(comm, dynamic), packet_list()
167 {
168  number_of_packets = 0;
169  mode = DCB_null;
170  completed = false;
171  /*
172  * // allocate first send/receive packet
173  * active_packet = this->allocateNewPacket (++number_of_packets);
174  * packet_list.push_back(active_packet);
175  */
176 }
177 
180 {
181  this->clear();
182 }
183 
184 void
186 {
187  completed = false;
188  this->clear();
189 }
190 
191 void
193 {
194  this->clear();
195 
196  if ( !active_packet ) {
198  packet_list.push_back(active_packet);
199  }
200 }
201 
202 void
204 {
205  recvIt = packet_list.begin();
206  this->popNewRecvPacket();
207 }
208 
209 /*
210  * int
211  * DynamicCommunicationBuffer::write (int* src, int n)
212  * {
213  * int _result=1;
214  * int start_indx=0, end_indx, _size;
215  *
216  * do {
217  * _size = this->giveFitSize(MPI_INT, active_packet -> giveAvailableSpace(), n);
218  * end_indx = start_indx + _size;
219  *
220  * if (_size) _result &= active_packet -> write (communicator, src+start_indx,_size, MPI_INT);
221  * if (end_indx == n) break;
222  * // active packet full, allocate a new one
223  * active_packet = this->allocateNewPacket (++number_of_packets);
224  * packet_list.push_back(active_packet);
225  * start_indx = end_indx;
226  * } while (1);
227  *
228  * return _result;
229  * }
230  *
231  *
232  * int
233  * DynamicCommunicationBuffer::read (int* dest, int n)
234  * {
235  * int _result=1;
236  * int start_indx=0, end_indx, _size;
237  *
238  * do {
239  * _size = this->giveFitSize(MPI_INT, active_packet -> giveAvailableSpace(), n);
240  * end_indx = start_indx + _size;
241  *
242  * if (_size) _result &= active_packet->read (communicator,dest+start_indx,_size, MPI_INT);
243  * if (end_indx == n) break;
244  * // active packet exhausted, pop a new one
245  * this->popNewRecvPacket();
246  * start_indx = end_indx;
247  * } while (1);
248  *
249  * return _result;
250  * }
251  */
252 
253 int
255 {
256  int result = 1;
257 
260 
261  active_rank = dest;
262  active_tag = tag;
263  for ( auto &packet: packet_list ) {
264  result &= packet->iSend(communicator, dest, tag);
265  }
266 
267  /*
268  * int _myrank;
269  * MPI_Comm_rank (communicator, &_myrank);
270  * fprintf (stderr,"[%d] sending to [%d] %d packets for tag %d\n", _myrank, dest, number_of_packets, tag);
271  * (*(packet_list.begin()))->dump();
272  */
273  mode = DCB_send;
274  completed = false;
275  return result;
276 }
277 
278 
279 
280 int
281 DynamicCommunicationBuffer :: iRecv(int source, int tag, int count)
282 {
283  this->init();
284  number_of_packets = 0;
285  // receive first packet, but it is probably not the last one
286  // create new first packet and init its receive
288  active_rank = source;
289  active_tag = tag;
290  mode = DCB_receive;
291  completed = false;
292  return active_packet->iRecv(communicator, source, tag);
293 }
294 
296 {
297  /*
298  * int _myrank;
299  * MPI_Comm_rank (communicator, &_myrank);
300  */
301  if ( completed ) {
302  return 1;
303  }
304 
305  if ( active_packet->testCompletion() ) {
306  // active packet received, add it to the pool and unpack header info
308 
309  //fprintf (stderr, "[%d] received from [%d] packet no. %d\n", _myrank, active_rank, active_packet->getNumber());
310 
312  if ( active_packet->hasEOFFlag() ) {
313  // last packet received; init for unpacking
314  this->initForUnpacking();
315 
316  //fprintf (stderr,"[%d] received from [%d] %d packets for tag %d\n", _myrank, active_rank, number_of_packets, active_tag);
317  //active_packet->dump();
318  completed = true;
319  return 1;
320  } else {
321  // received next packet, but it is not the last one
322  // create new packet and init its receive
325  return 0;
326  }
327  } else {
328  // active packet not yet received
329  return 0;
330  }
331 }
332 
334 {
335  int result = 1;
336 
337  if ( completed ) {
338  return 1;
339  }
340 
341  for ( auto &packet: packet_list ) {
342  result &= packet->testCompletion();
343  }
344 
345  completed = result;
346  return result;
347 }
348 
349 int
351 {
352  if ( mode == DCB_send ) {
353  return this->sendCompleted();
354  } else if ( mode == DCB_receive ) {
355  return this->receiveCompleted();
356  } else {
357  return 0;
358  }
359 }
360 
361 int
363 {
364  if ( mode == DCB_send ) {
365  while ( !this->sendCompleted() ) { }
366 
367  ;
368  return 1;
369  } else if ( mode == DCB_receive ) {
370  while ( !this->receiveCompleted() ) { }
371 
372  ;
373  return 1;
374  }
375 
376  return 0;
377 }
378 
379 
380 int
381 DynamicCommunicationBuffer :: giveFitSize(MPI_Datatype type, int availableSpace, int arrySize)
382 {
383  int arrySpace, guessSize;
384  MPI_Pack_size(arrySize, type, communicator, & arrySpace);
385  if ( availableSpace >= arrySpace ) {
386  return arrySize;
387  }
388 
389  guessSize = ( int ) floor( ( ( double ) arrySize / ( double ) arrySpace ) * availableSpace ) + 1;
390  do {
391  guessSize--;
392  MPI_Pack_size(guessSize, type, communicator, & arrySpace);
393  } while ( ( availableSpace < arrySpace ) && ( guessSize > 0 ) );
394 
395  return guessSize;
396 }
397 
400 {
402  result->init(communicator);
403  result->setNumber(n);
404  return result;
405 }
406 
407 void
409 {
411 }
412 
413 void
415 {
416  for ( auto &packet: packet_list ) {
417  this->freePacket(packet);
418  }
419 
420  packet_list.clear();
421  active_packet = NULL;
422  number_of_packets = 0;
423 }
424 
425 
426 void
428 {
429  active_packet = ( * recvIt );
430  ++recvIt;
431  if ( active_packet == NULL ) {
432  OOFEM_ERROR("no more packets received");
433  }
434 
435  //active_packet->init(communicator);
436 }
437 
438 void
440 {
441  packet_list.push_back(p);
442 }
443 
444 
445 int
447 {
448  OOFEM_ERROR("not implemented");
449  return 0;
450 }
451 
453 
456 {
457  CommunicationPacket *result;
458 
459  if ( available_packets.empty() ) {
460  // allocate new packet
461  if ( ( result = new CommunicationPacket(comm, 0) ) == NULL ) {
462  OOFEM_ERROR("allocation of new packed failed");
463  }
464 
465  allocatedPackets++;
466  } else {
467  result = available_packets.front();
468  available_packets.pop_front();
469  freePackets--;
470  }
471 
472 #ifdef DEBUG
473  // add packet into list of leased packets
474  leased_packets.push_back(result);
475 #endif
476 
477  leasedPackets++;
478  return result;
479 }
480 
481 void
483 {
484 #ifdef DEBUG
485  std :: list< CommunicationPacket * > :: iterator it = std :: find(leased_packets.begin(), leased_packets.end(), p);
486  if ( it != leased_packets.end() ) {
487  // found previously leased one
488  leased_packets.erase(it);
489  available_packets.push_back(p);
490  } else {
491  OOFEM_ERROR("request to push strange packet (not allocated by pool)");
492  }
493 
494 #else
495  available_packets.push_back(p);
496 #endif
497 
498  leasedPackets--;
499  freePackets++;
500 }
501 
502 void
504 {
505  if ( !leased_packets.empty() ) {
506  OOFEM_WARNING("some packets still leased");
507  }
508 
509  for ( auto &packet: available_packets ) {
510  if ( packet ) {
511  delete packet;
512  }
513  }
514 
515  available_packets.clear();
516  allocatedPackets = leasedPackets = freePackets = 0;
517 }
518 
519 
520 void
522 {
523  OOFEM_LOG_INFO("CommunicationPacketPool: allocated %d packets\n(packet size: %d, %d leased, %d free)\n",
524  allocatedPackets, __CommunicationPacket_DEFAULT_SIZE, leasedPackets, freePackets);
525 }
526 } // end namespace oofem
int testCompletion()
Tests if the operation identified by this->request is complete.
Definition: dyncombuff.C:350
int packHeader(MPI_Comm)
Packs packet header info at receiver beginning.
Definition: dyncombuff.C:119
virtual int iRecv(int source, int tag, int count=0)
Starts standard mode, nonblocking receive.
Definition: dyncombuff.C:281
virtual void init()
Initializes buffer to empty state.
Definition: combuff.C:114
CommunicationPacket * active_packet
Active packet.
Definition: dyncombuff.h:165
CommunicationPacket * allocateNewPacket(int)
Definition: dyncombuff.C:399
virtual ~CommunicationPacket()
Destructor.
Definition: dyncombuff.C:61
int max(int i, int j)
Returns the larger of two given integers.
Definition: mathfem.h:71
static CommunicationPacketPool packetPool
Static packet pool.
Definition: dyncombuff.h:173
virtual ~DynamicCommunicationBuffer()
Destructor.
Definition: dyncombuff.C:179
int givePackSize(MPI_Comm communicator, MPI_Datatype type, int size)
Returns pack size required to pack array of given type and size (c-style).
Definition: combuff.C:218
ComBuff_BYTE_TYPE * buff
Buffer. Dynamically allocated.
Definition: combuff.h:59
int iRecv(MPI_Comm communicator, int source, int tag, int count=0)
Starts standard mode, nonblocking receive.
Definition: dyncombuff.C:84
virtual int testCompletion()
Tests if the operation identified by this->request is complete.
Definition: dyncombuff.C:101
virtual void initForPacking()
Initialize for packing.
Definition: dyncombuff.C:192
virtual int iSend(int dest, int tag)
Starts standard mode, nonblocking send.
Definition: dyncombuff.C:254
virtual int waitCompletion()
Waits until a completion of a nonblocking communication.
Definition: dyncombuff.C:110
enum oofem::DynamicCommunicationBuffer::DCB_Mode mode
DynamicCommunicationBuffer(MPI_Comm comm, int size, bool dynamic=0)
Constructor. Creates buffer of given size, using given communicator for packing.
Definition: dyncombuff.C:151
MPI_Request request
MPI request handle.
Definition: combuff.h:65
void pushNewRecvPacket(CommunicationPacket *)
Definition: dyncombuff.C:439
int active_tag
Active rank and tag (send by initSend,initReceive, and initExchange).
Definition: dyncombuff.h:167
std::list< CommunicationPacket * >::iterator recvIt
Iterator to iterate over received packets.
Definition: dyncombuff.h:163
int resize(int newSize)
Resizes buffer to given size.
Definition: combuff.C:78
bool completed
Communication completion flag.
Definition: dyncombuff.h:175
std::list< CommunicationPacket * > packet_list
Definition: dyncombuff.h:161
virtual int bcast(int root)
Initializes broadcast over collaborating processes.
Definition: dyncombuff.C:446
Class CommunicationPacket represents a data packet that is used to implement dynamic communicator...
Definition: dyncombuff.h:60
#define OOFEM_LOG_INFO(...)
Definition: logger.h:127
Class CommunicationBuffer provides abstraction for communication buffer.
Definition: combuff.h:208
#define OOFEM_ERROR(...)
Definition: error.h:61
int size
Size and current position in buffer in bytes (sizeof(char)).
Definition: combuff.h:55
int unpackHeader(MPI_Comm)
Definition: dyncombuff.C:133
#define __CommunicationPacket_DEFAULT_SIZE
Definition: dyncombuff.h:47
void pushPacket(CommunicationPacket *)
Definition: dyncombuff.C:482
void setNumber(int _num)
Definition: dyncombuff.h:123
void freePacket(CommunicationPacket *)
Definition: dyncombuff.C:408
virtual int waitCompletion()
Waits until a completion of a nonblocking communication.
Definition: dyncombuff.C:362
int giveFitSize(MPI_Datatype type, int availableSpace, int arrySize)
Definition: dyncombuff.C:381
int iSend(MPI_Comm communicator, int dest, int tag)
Starts standard mode, nonblocking send.
Definition: dyncombuff.C:75
CommunicationPacket(MPI_Comm comm, int size, int num)
Constructor. Creates buffer of given size, using given communicator for packing.
Definition: dyncombuff.C:43
the oofem namespace is to define a context or scope in which all oofem names are defined.
CommunicationPacket * popPacket(MPI_Comm)
Definition: dyncombuff.C:455
virtual void init(MPI_Comm comm)
Initializes buffer to empty state.
Definition: dyncombuff.C:66
#define OOFEM_WARNING(...)
Definition: error.h:62
virtual void initForUnpacking()
Initialize for Unpacking (data already received).
Definition: dyncombuff.C:203
virtual void init()
Initializes buffer to empty state.
Definition: dyncombuff.C:185

This page is part of the OOFEM documentation. Copyright (c) 2011 Borek Patzak
Project e-mail: info@oofem.org
Generated at Tue Jan 2 2018 20:07:28 for OOFEM by doxygen 1.8.11 written by Dimitri van Heesch, © 1997-2011