JEMRIS  2.8.2
open-source MRI simulations
mpi_Model.h
/*
 * JEMRIS Copyright (C)
 *                      2006-2018  Tony Stoecker
 *                      2007-2018  Kaveh Vahedipour
 *                      2009-2018  Daniel Pflugfelder
 *
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */


#ifndef _MPI_MODEL_H_
#define _MPI_MODEL_H_

#include <iostream>
#include <fstream>   // ofstream for the progress file
#include <iomanip>   // setw/setfill for the progress bar
#include "config.h"

using namespace std;
#define MPICH_IGNORE_CXX_SEEK

#include <mpi.h>

#include "Signal.h"
#include "Declarations.h"
#include "Sample.h"
#include "MultiPoolSample.h"

#ifdef HAVE_MPI_THREADS
#include <pthread.h>
#endif

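/* Overview: rank 0 is the master. It divides the sample and scatters an
 * initial spin packet to each slave (mpi_devide_and_send_sample); the slaves
 * then request further packets (REQUEST_SPINS), return their partial signals,
 * and receive either a new packet or a stop notice (SEND_NO_SPINS == 0).
 * The message tags (REQUEST_SPINS, SEND_SAMPLE, ...) come from the included
 * JEMRIS headers. */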
void mpi_send_paket_signal(Signal* pSig, int CoilID);
void mpi_recv_paket_signal(Signal* pSig, int SlaveID, int CoilID);

/*****************************************************************************/
/* Build an MPI datatype describing one spin: a contiguous block of
   GetNoOfSpinProps() doubles.                                               */
inline MPI_Datatype MPIspindata () {

    int NUM_DATA = World::instance()->GetNoOfSpinProps();

    MPI_Datatype MPI_SPINDATA;
    MPI_Datatype type = MPI_DOUBLE;
    MPI_Aint     disp = 0;

    // MPI_Type_struct was removed in MPI-3.0; use its standard replacement
    MPI_Type_create_struct(1, &NUM_DATA, &disp, &type, &MPI_SPINDATA);
    MPI_Type_commit(&MPI_SPINDATA);

    return MPI_SPINDATA;

}

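/* Usage sketch (illustrative): the returned type treats one spin as a block
 * of NUM_DATA doubles, so a whole packet travels with a single count, e.g.
 *
 *   MPI_Datatype spin = MPIspindata();
 *   MPI_Send(buf, NoSpins, spin, dest, SEND_SAMPLE, MPI_COMM_WORLD);
 *   MPI_Type_free(&spin);  // note: the callers below never free the type
 */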
/*****************************************************************************/
/* Progress counting; run by an extra thread of the master process. */
#ifdef HAVE_MPI_THREADS
/*****************************************************************************/

void* CountProgress(void* arg) {

    int TotalNoSpins = *((int*) arg);
    int SpinsDone    = TotalNoSpins - *(((int*) arg) + 1);
    int NowDone      = 0;
    int progress_percent = -1;
    static string bars   = "***************************************************";
    static string blancs(51, ' ');

    MPI_Status status;

    while (SpinsDone != TotalNoSpins) {
        MPI_Recv(&NowDone, 1, MPI_INT, MPI_ANY_SOURCE, SPINS_PROGRESS, MPI_COMM_WORLD, &status);
        SpinsDone += NowDone;

        // update progress counter
        int progr = (100*(SpinsDone+1)/TotalNoSpins);
        if (progr != progress_percent) {
            progress_percent = progr;
            ofstream fout(".jemris_progress.out", ios::out);
            fout << progr;
            fout.close();

            cout << "\rSimulating | ";
            cout << bars.substr(0, progr/2) << " " << blancs.substr(0, 50-progr/2)
                 << "| " << setw(3) << setfill(' ') << progr << "% done";

            flush(cout);
        }
    }

    return NULL;
}
#endif

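/* Master side: split the sample, scatter the initial packets, then serve
 * packet requests from the slaves until all spins are distributed. */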
/*****************************************************************************/
void mpi_devide_and_send_sample (Sample* pSam, CoilArray* RxCA) {

#ifdef HAVE_MPI_THREADS
    // start the progress-counter thread on the master process
    pthread_t counter_thread;
    int       errcode;                     /* holds pthread error code */

    int buffer[2];
    buffer[0] = pSam->GetSize();
    buffer[1] = pSam->SpinsLeft();

    errcode = pthread_create(&counter_thread,   /* thread struct             */
                             NULL,              /* default thread attributes */
                             CountProgress,     /* start routine             */
                             (void*) &buffer);  /* argument to routine       */

    if (errcode) {
        cout << "thread creation failed !! exit." << endl;
        exit(-1);
    }
#endif

    int size, ilen;
    std::string hm (MPI_MAX_PROCESSOR_NAME, ' ');

    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Get_processor_name(&hm[0], &ilen);

    cout << endl << hm.c_str() << " -> Master Process: send MR sample (" << pSam->GetSize()
         << " spins) to " << size-1 << " slave(s)" << endl << endl << flush;

    std::vector<int> sendcount (size);  // number of spins per slave
    std::vector<int> displs    (size);  // displacement of each slave's block
    int     recvbuf;
    double* recvdummy = 0;              // dummy receive buffer

    // no spin data for the master:
    displs[0]    = 0;
    sendcount[0] = 0;

    pSam->GetScatterVectors(sendcount.data(), displs.data(), size);

    // scatter the send counts:
    MPI_Scatter(sendcount.data(), 1, MPI_INT, &recvbuf, 1, MPI_INT, 0, MPI_COMM_WORLD);


    //MODIF
    /*int ierr, my_id, num_procs;
    ierr = MPI_Comm_rank(MPI_COMM_WORLD, &my_id);
    ierr = MPI_Comm_size(MPI_COMM_WORLD, &num_procs);

    printf("I'm process %i out of %i processes\n", my_id, num_procs);*/

    int  ii;
    long TotalSpinNumber = pSam->GetSize(), nextSpinMem = -1;
    for (ii=0; ii<size; ii++) {
        //cout<<ii<<" Sendcount "<<sendcount[ii]<<" Displs "<<displs[ii]<<endl;
        if (ii>0) {
            long displ = displs[ii];  // promote to long: the slaves receive MPI_LONG
            MPI_Send(&TotalSpinNumber, 1, MPI_LONG, ii, SEND_TOTAL_NO_SPINS, MPI_COMM_WORLD);
            MPI_Send(&displ,           1, MPI_LONG, ii, SEND_BEGIN_SPIN,     MPI_COMM_WORLD);
            //cout<<0<<" sent spin index information to "<<ii<<" : "<<TotalSpinNumber<<" "<<displs[ii]<<endl;
        }
    }
    //MODIF***


    // broadcast number of individual spin properties:
    long NProps = pSam->GetNProps();
    MPI_Bcast(&NProps, 1, MPI_LONG, 0, MPI_COMM_WORLD);

    /* >> required for multi-pool samples with exchange */
    long hsize = pSam->GetHelperSize();
    MPI_Bcast(&hsize, 1, MPI_LONG, 0, MPI_COMM_WORLD);
    MPI_Bcast(pSam->GetHelper(), (int) hsize, MPI_DOUBLE, 0, MPI_COMM_WORLD);

    long dsize = pSam->GetSampleDimsSize();
    MPI_Bcast(&dsize, 1, MPI_LONG, 0, MPI_COMM_WORLD);
    MPI_Bcast(&(pSam->GetSampleDims()[0]), (int) dsize, MPI_INT, 0, MPI_COMM_WORLD);

    int csize = pSam->GetNoSpinCompartments();
    MPI_Bcast(&csize, 1, MPI_INT, 0, MPI_COMM_WORLD);
    /* << required for multi-pool samples with exchange */

    MPI_Datatype MPI_SPINDATA = MPIspindata();

    // scatter the initial spin packets:
    MPI_Scatterv(pSam->GetSpinsData(), sendcount.data(), displs.data(),
                 MPI_SPINDATA, &recvdummy, 0, MPI_SPINDATA, 0, MPI_COMM_WORLD);

    // broadcast resolution (needed for position randomness):
    MPI_Bcast(pSam->GetResolution(), 3, MPI_DOUBLE, 0, MPI_COMM_WORLD);

    // now listen for packet requests:
    int SlavesDone = 0;
    while (SlavesDone < size - 1) {

        int SlaveID;
        int NoSpins;
        int NextSpinToSend;
        MPI_Status status;
        MPI_Recv(&SlaveID, 1, MPI_INT, MPI_ANY_SOURCE, REQUEST_SPINS, MPI_COMM_WORLD, &status);

        // get the next spin packet to send:
        pSam->GetNextPacket(NoSpins, NextSpinToSend, SlaveID);


        //MODIF
        // inform the requesting slave of the beginning index of its next packet
        for (ii=0; ii<size; ii++) {
            //cout<<ii<<" sendcount "<<NoSpins<<" displs "<<NextSpinToSend<<endl;
            if (ii>0 && NextSpinToSend != nextSpinMem) {
                long nextSpin = NextSpinToSend;  // promote to long: the slave receives MPI_LONG
                MPI_Send(&nextSpin, 1, MPI_LONG, SlaveID, SEND_BEGIN_SPIN, MPI_COMM_WORLD);
                nextSpinMem = NextSpinToSend;
                if (nextSpinMem < 0) nextSpinMem -= 1;
            }
        }
        //MODIF***


        if (NoSpins == 0)
            SlavesDone++;

        // receive signal
        for (unsigned int i=0; i < RxCA->GetSize(); i++)
            mpi_recv_paket_signal(RxCA->GetCoil(i)->GetSignal(), SlaveID, i);

        // dump temporary signal (restart information)
        pSam->DumpRestartInfo(RxCA);

        // now send NoSpins
        MPI_Send(&NoSpins, 1, MPI_INT, SlaveID, SEND_NO_SPINS, MPI_COMM_WORLD);
        if (NoSpins > 0)
            MPI_Send(&(pSam->GetSpinsData())[NextSpinToSend*pSam->GetNProps()],
                     NoSpins, MPI_SPINDATA, SlaveID, SEND_SAMPLE, MPI_COMM_WORLD);

#ifndef HAVE_MPI_THREADS

        // without threads: write the progress bar each time new spins are sent:
        static int progress_percent = -1;
        static int spinsdone = 0;
        spinsdone += sendcount[SlaveID];
        sendcount[SlaveID] = NoSpins;

        // update progress counter (pjemris without threads support)
        World* pW = World::instance();
        int progr = (100*(spinsdone+1)/pW->TotalSpinNumber);
        // case of restart: set progress to 100 at the end:
        if (SlavesDone == (size-1)) progr = 100;

        if (progr != progress_percent) {
            progress_percent = progr;
            ofstream fout(".jemris_progress.out", ios::out);
            fout << progr;
            fout.close();
        }
#endif

    } // end while (SlavesDone < size-1)

    flush(cout);

#ifdef HAVE_MPI_THREADS
    /* join the progress-counter thread: */
    int* status;  /* holds return code */
    errcode = pthread_join(counter_thread, (void**) &status);
    if (errcode) {
        cout << "Joining of threads failed! (so what... ;) )" << endl;
        // exit(-1);
    }
#endif

    return;

}

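/* Slave side. Note that the receive functions below must mirror the master's
 * sequence of collective calls (MPI_Scatter, MPI_Bcast, MPI_Scatterv) and
 * point-to-point tags exactly, otherwise the ranks deadlock. */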
/*
 * Receive the first portion of the sample from the MPI master process.
 * @return pointer to the new (sub)sample.
 * Deletion of the (sub)sample has to be done elsewhere!
 * (This is e.g. done by the Simulator class)
 */
Sample* mpi_receive_sample(int sender, int tag) {

    long NPoints;
    World* pW = World::instance();

    // get number of spins:
    int nospins;
    MPI_Scatter(NULL, 1, MPI_INT, &nospins, 1, MPI_INT, 0, MPI_COMM_WORLD);

    //MODIF
    long TotalSpinNumber = 0, beginTraj = 0;
    MPI_Status status;
    MPI_Recv(&TotalSpinNumber, 1, MPI_LONG, 0, SEND_TOTAL_NO_SPINS, MPI_COMM_WORLD, &status);
    MPI_Recv(&beginTraj,       1, MPI_LONG, 0, SEND_BEGIN_SPIN,     MPI_COMM_WORLD, &status);
    pW->setTrajLoading(beginTraj, TotalSpinNumber);
    //cout<<pW->m_myRank<<" received spin index information: "<<TotalSpinNumber<<" "<<beginTraj<<endl;
    //MODIF***

    NPoints = (long) nospins;

    // get number of physical properties per spin:
    long NProps;
    MPI_Bcast(&NProps, 1, MPI_LONG, 0, MPI_COMM_WORLD);

    Sample* pSam = new Sample();
    pSam->CreateSpins(NProps, NPoints);
    pW->SetNoOfSpinProps(pSam->GetNProps());

    /* >> required for multi-pool samples with exchange */
    long hsize = 0;
    MPI_Bcast(&hsize, 1, MPI_LONG, 0, MPI_COMM_WORLD);
    pSam->CreateHelper(hsize);

    MPI_Bcast(pSam->GetHelper(), (int) hsize, MPI_DOUBLE, 0, MPI_COMM_WORLD);
    pW->InitHelper(pSam->GetHelperSize());
    pSam->GetHelper(pW->Helper());

    long dsize = 0;
    MPI_Bcast(&dsize, 1, MPI_LONG, 0, MPI_COMM_WORLD);
    pSam->CreateDims(dsize);

    MPI_Bcast(&(pSam->GetSampleDims()[0]), (int) dsize, MPI_INT, 0, MPI_COMM_WORLD);

    int csize = 0;
    MPI_Bcast(&csize, 1, MPI_INT, 0, MPI_COMM_WORLD);
    pSam->SetNoSpinCompartments(csize);
    /* << required for multi-pool samples with exchange */


    MPI_Datatype MPI_SPINDATA = MPIspindata();

    // get the sample:
    MPI_Scatterv(NULL, NULL, NULL, MPI_SPINDATA, pSam->GetSpinsData(), nospins, MPI_SPINDATA, 0, MPI_COMM_WORLD);

    // get resolution (needed for position randomness):
    MPI_Bcast(pSam->GetResolution(), 3, MPI_DOUBLE, 0, MPI_COMM_WORLD);

    int ilen; std::string hm (MPI_MAX_PROCESSOR_NAME, ' ');
    MPI_Get_processor_name(&hm[0], &ilen);
    //MODIF
    //cout << endl << hm.c_str() << " -> slave " << setw(2) << pW->m_myRank << ": received "
    //     << NPoints << " spins for simulation ..." << endl << flush;
    //MODIF***

    return pSam;

}

/*****************************************************************************/
/* Request and receive the next sample packet from the MPI master process.
   Returns false if no spins are left to simulate.                          */
bool mpi_recieve_sample_paket(Sample* samp, CoilArray* RxCA) {

    World* pW = World::instance();
    int myRank  = pW->m_myRank;
    int NoSpins = 0;

    // backup dimensions
    vector<size_t> d = samp->GetSampleDims();

    // request new spins:
    MPI_Send(&myRank, 1, MPI_INT, 0, REQUEST_SPINS, MPI_COMM_WORLD);

    // send signal
    for (unsigned int i=0; i < RxCA->GetSize(); i++)
        mpi_send_paket_signal(RxCA->GetCoil(i)->GetSignal(), i);

    // receive number of spins
    MPI_Status status;
    MPI_Recv(&NoSpins, 1, MPI_INT, 0, SEND_NO_SPINS, MPI_COMM_WORLD, &status);

    //MODIF
    long TotalSpinNumber = pW->getTrajNumber(), beginTraj = 0;
    //MPI_Recv(&TotalSpinNumber,1,MPI_LONG,0,SEND_TOTAL_NO_SPINS,MPI_COMM_WORLD,&status);
    MPI_Recv(&beginTraj, 1, MPI_LONG, 0, SEND_BEGIN_SPIN, MPI_COMM_WORLD, &status);
    pW->setTrajLoading(beginTraj, TotalSpinNumber);
    //MODIF***

    // spins left?
    if (NoSpins == 0) return false;

    // prepare the sample structure
    samp->ClearSpins();
    samp->CreateSpins(NoSpins);
    samp->SetSampleDims(d);

    // receive the spin packet
    MPI_Datatype MPI_SPINDATA = MPIspindata();
    MPI_Recv(samp->GetSpinsData(), NoSpins, MPI_SPINDATA, 0, SEND_SAMPLE, MPI_COMM_WORLD, &status);

    int ilen; std::string hm (MPI_MAX_PROCESSOR_NAME, ' ');
    MPI_Get_processor_name(&hm[0], &ilen);
    //MODIF
    //cout << endl << hm.c_str() << " -> slave " << setw(2) << pW->m_myRank << ": received "
    //     << NoSpins << " spins for simulation ..." << endl << flush;
    //MODIF***
    /*
    cout << endl << "SLAVE: " << setw(2) << pW->m_myRank
         << " , #Pools= " << samp->GetNoSpinCompartments() << " , SampleDims=" << samp->GetSampleDimsSize()
         << " , SampleProps= " << samp->GetNProps() << " , NextPackageSize=" << samp->GetSize()
         << " , ExMatDim=" << samp->GetHelperSize() << endl << endl;
    // */
    return true;

}
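
/* Typical slave-side flow (a sketch; in JEMRIS this loop is presumably driven
 * by the Simulator class):
 *
 *   Sample* pSam = mpi_receive_sample(0, SEND_SAMPLE);   // first portion
 *   do {
 *       // ... simulate the current packet and record the coil signals ...
 *   } while (mpi_recieve_sample_paket(pSam, RxCA));      // false: no spins left
 */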
/*****************************************************************************/
void mpi_send_paket_signal (Signal* pSig, const int CoilID) {

    int tsize = (int) pSig->Repo()->Size();

    // the time points are identical on all slaves; only slave 1 sends them
    if (World::instance()->m_myRank == 1)
        MPI_Send(pSig->Repo()->Times(), (int) pSig->Repo()->Samples(), MPI_DOUBLE, 0, SIG_TP + CoilID*4, MPI_COMM_WORLD);

    MPI_Send(pSig->Repo()->Data(), tsize, MPI_DOUBLE, 0, SIG_MX + CoilID*4, MPI_COMM_WORLD);

}

/*****************************************************************************/
void mpi_recv_paket_signal (Signal* pSig, const int SlaveId, const int CoilID) {

    int     tsize = (int) pSig->Repo()->Size();
    double* data  = pSig->Repo()->Data();
    std::vector<double> tmp (tsize);
    MPI_Status status;

    if (SlaveId == 1)
        MPI_Recv(pSig->Repo()->Times(), (int) pSig->Repo()->Samples(), MPI_DOUBLE, SlaveId, SIG_TP + CoilID*4, MPI_COMM_WORLD, &status);

    MPI_Recv(&tmp[0], tsize, MPI_DOUBLE, SlaveId, SIG_MX + CoilID*4, MPI_COMM_WORLD, &status);

    // accumulate the partial signals of all slaves
    for (int i = 0; i < tsize; i++)
        data[i] += tmp[i];

}

#endif