JEMRIS  2.8.1
open-source MRI simulations
mpi_Model.h
Go to the documentation of this file.
1 
5 /*
6  * JEMRIS Copyright (C)
7  * 2006-2015 Tony Stoecker
8  * 2007-2015 Kaveh Vahedipour
9  * 2009-2015 Daniel Pflugfelder
10  *
11  *
12  * This program is free software; you can redistribute it and/or modify
13  * it under the terms of the GNU General Public License as published by
14  * the Free Software Foundation; either version 2 of the License, or
15  * (at your option) any later version.
16  *
17  * This program is distributed in the hope that it will be useful,
18  * but WITHOUT ANY WARRANTY; without even the implied warranty of
19  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20  * GNU General Public License for more details.
21  *
22  * You should have received a copy of the GNU General Public License
23  * along with this program; if not, write to the Free Software
24  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
25  */
26 
27 
28 #ifndef _MPI_MODEL_H_
29 #define _MPI_MODEL_H_
30 
31 #include<iostream>
32 #include "config.h"
33 
34 using namespace std;
35 #define MPICH_IGNORE_CXX_SEEK
36 
37 #include <mpi.h>
38 
39 #include "Signal.h"
40 #include "Declarations.h"
41 #include "Sample.h"
42 
43 #ifdef HAVE_MPI_THREADS
44  #include <pthread.h>
45 #endif
46 
47 void mpi_send_paket_signal(Signal* pSig,int CoilID);
48 void mpi_recv_paket_signal(Signal* pSig,int SlaveID,int CoilID);
49 
50 /*****************************************************************************/
51 inline MPI_Datatype MPIspindata () {
52 
53  int NUM_DATA = World::instance()->GetNoOfSpinProps();
54 
55  MPI_Datatype MPI_SPINDATA ;
56  MPI_Datatype type = MPI_DOUBLE;
57  MPI_Aint disp = 0;
58 
59  MPI_Type_struct( 1, &NUM_DATA, &disp, &type, &MPI_SPINDATA);
60  MPI_Type_commit( &MPI_SPINDATA);
61 
62  return MPI_SPINDATA;
63 
64 }
65 
66 /*****************************************************************************/
67 /* function that does the progress counting; called by an extra thread from the master process */
68 #ifdef HAVE_MPI_THREADS
69 /*************************************************************************/
70 
71 void* CountProgress(void * arg) {
72 
73  int TotalNoSpins = *((int *) arg);
74  int SpinsDone = TotalNoSpins - *(((int *) arg )+1);
75  int NowDone=0;
76  int progress_percent = -1;
77  static string bars = "***************************************************";
78  static string blancs = " ";
79 
80  MPI_Status status;
81 
82  while (SpinsDone != TotalNoSpins) {
83  MPI_Recv(&NowDone,1,MPI_INT,MPI_ANY_SOURCE,SPINS_PROGRESS,MPI_COMM_WORLD,&status);
84  SpinsDone += NowDone;
85 
86  //update progress counter
87  int progr = (100*(SpinsDone+1)/TotalNoSpins);
88  if ( progr != progress_percent) {
89  progress_percent = progr;
90  ofstream fout(".jemris_progress.out" , ios::out);
91  fout << progr;
92  fout.close();
93 
94  cout << "\rSimulating | ";
95  cout << bars.substr(0, progr/2) << " " << blancs.substr(0, 50-progr/2) << "| " <<setw(3) << setfill(' ') << progr << "% done";
96 
97  flush(cout);
98 
99  }
100  }
101 
102  return new void*;
103 
104 }
105 #endif
106 
107 /*****************************************************************************/
108 void mpi_devide_and_send_sample (Sample* pSam, CoilArray* RxCA ) {
109 
110 #ifdef HAVE_MPI_THREADS
111  // copy + paste thread example
112  pthread_t counter_thread;
113  int errcode; /* holds pthread error code */
114 
115  int buffer[2];
116  buffer[0] = pSam->GetSize();
117  buffer[1] = pSam->SpinsLeft();
118 
119  errcode=pthread_create(&counter_thread, /* thread struct */
120  NULL, /* default thread attributes */
121  CountProgress, /* start routine */
122  (void *) &buffer);
123 
124  if (errcode) { /* arg to routine */
125  cout << "thread creation failed !! exit."<< endl;
126  exit (-1);
127  }
128  // end copy + paste thread example
129 #endif
130 
131  int size, ilen;
132  std::string hm (MPI_MAX_PROCESSOR_NAME,' ');
133 
134  MPI_Comm_size(MPI_COMM_WORLD, &size);
135  MPI_Get_processor_name(&hm[0], &ilen);
136 
137  cout << endl << hm.c_str() << " -> Master Process: send MR sample (" << pSam->GetSize()
138  << " spins) to " << size-1 << " slave(s)"<< endl << endl << flush;
139 
140  std::vector<int> sendcount (size); // array with NumberOfSpins[slaveid]
141  std::vector<int> displs (size);
142  int recvbuf;
143  double* recvdummy = 0; // dummy buffer
144 
145  // no spin data for master:
146  displs[0] = 0;
147  sendcount[0] = 0;
148 
149  pSam->GetScatterVectors(sendcount.data(), displs.data(), size);
150 
151  // scatter sendcounts:
152  MPI_Scatter (sendcount.data(), 1, MPI_INT, &recvbuf, 1, MPI_INT, 0, MPI_COMM_WORLD);
153 
154  // broadcast number of individual spin prioperties:
155  long NProps = pSam->GetNProps();
156  MPI_Bcast (&NProps, 1, MPI_LONG, 0, MPI_COMM_WORLD);
157 
158  long hsize = pSam->GetHelperSize();
159 
160  MPI_Bcast (&hsize, 1, MPI_LONG, 0, MPI_COMM_WORLD);
161  MPI_Bcast (pSam->GetHelper(), hsize, MPI_DOUBLE, 0, MPI_COMM_WORLD);
162  int csize = pSam->GetNoSpinCompartments();
163  MPI_Bcast (&csize , 1, MPI_INT, 0, MPI_COMM_WORLD);
164 
165  MPI_Datatype MPI_SPINDATA = MPIspindata();
166 
167  //scatter sendcounts:
168  MPI_Scatterv (pSam->GetSpinsData(), sendcount.data(), displs.data(),
169  MPI_SPINDATA, &recvdummy, 0, MPI_SPINDATA, 0, MPI_COMM_WORLD);
170  // broadcast resolution:
171  MPI_Bcast (pSam->GetResolution(),3,MPI_DOUBLE,0, MPI_COMM_WORLD);
172 
173  // now listen for paket requests:
174  int SlavesDone=0;
175  while (SlavesDone < size - 1) {
176 
177  int SlaveID;
178  int NoSpins;
179  int NextSpinToSend;
180  MPI_Status status;
181  MPI_Recv(&SlaveID, 1, MPI_INT, MPI_ANY_SOURCE,
182  REQUEST_SPINS, MPI_COMM_WORLD, &status);
183 
184  //get next spin paket to send:
185  pSam->GetNextPacket(NoSpins,NextSpinToSend,SlaveID);
186 
187  if (NoSpins == 0)
188  SlavesDone++;
189 
190  // receive signal
191  for (unsigned int i=0; i < RxCA->GetSize(); i++)
192  mpi_recv_paket_signal(RxCA->GetCoil(i)->GetSignal(),SlaveID,i);
193 
194  // dump temp signal
195  pSam->DumpRestartInfo(RxCA);
196 
197  // now send NoSpins
198  MPI_Send(&NoSpins,1,MPI_INT,SlaveID,SEND_NO_SPINS, MPI_COMM_WORLD);
199  if (NoSpins > 0)
200  MPI_Send(&(pSam->GetSpinsData())[NextSpinToSend*pSam->GetNProps()],
201  NoSpins, MPI_SPINDATA,SlaveID,SEND_SAMPLE, MPI_COMM_WORLD);
202 
203 #ifndef HAVE_MPI_THREADS
204 
205  // without threads: write progress bar each time new spins are sent:
206  static int progress_percent = -1;
207  static int spinsdone = 0;
208  spinsdone += sendcount[SlaveID];
209  sendcount[SlaveID] = NoSpins;
210 
211  //update progress counter (pjemris without threads support)
212  World* pW = World::instance();
213  int progr = (100*(spinsdone+1)/pW->TotalSpinNumber);
214  // case of restart: set progress to 100 at end:
215  if (SlavesDone==(size-1)) progr=100;
216 
217  if (progr != progress_percent) {
218  progress_percent = progr;
219  ofstream fout(".jemris_progress.out" , ios::out);
220  fout << progr;
221  fout.close();
222  }
223 #endif
224 
225  } // end while (SlavesDone < size -1)
226 
227  flush (cout);
228 
229 #ifdef HAVE_MPI_THREADS
230  /* join threads: */
231  int *status; /* holds return code */
232  errcode = pthread_join(counter_thread,(void **) &status);
233  if (errcode) {
234  cout << "Joining of threads failed! (so what... ;) )"<<endl;
235  // exit(-1);
236  }
237 #endif
238 
239  return;
240 
241 }
242 
243 /*
244  * Receive first portion of sample from MPI master process.
245  * @return pointer to the new (sub)sample.
246  * Deletion of the (sub)sample has to be done elsewhere!
247  * (This is e.g. done by the Simulator class)
248  */
249 Sample* mpi_receive_sample(int sender, int tag){
250 
251  long NPoints;
252 
253  // get number of spins:
254  int nospins;
255  MPI_Scatter(NULL,1,MPI_INT,&nospins,1,MPI_INT,0, MPI_COMM_WORLD);
256  Sample* pSam = new Sample();
257 
258  NPoints= (long) nospins;
259  //get number of physical properties per spin
260  long NProps;
261  MPI_Bcast (&NProps,1,MPI_LONG,0, MPI_COMM_WORLD);
262 
263  pSam->CreateSpins(NProps, NPoints);
264 
265  long hsize = 0;
266  MPI_Bcast (&hsize, 1, MPI_LONG,0, MPI_COMM_WORLD);
267 
268  pSam->CreateHelper(hsize);
269  MPI_Bcast (pSam->GetHelper(),(int)hsize,MPI_DOUBLE,0, MPI_COMM_WORLD);
270 
271  int csize = 0;
272  MPI_Bcast (&csize, 1, MPI_INT,0, MPI_COMM_WORLD);
273 
274  pSam->SetNoSpinCompartments(csize);
277 
278  MPI_Datatype MPI_SPINDATA = MPIspindata();
279  // get sample:
280  MPI_Scatterv (NULL,NULL,NULL,MPI_SPINDATA,pSam->GetSpinsData(),nospins,MPI_SPINDATA,0, MPI_COMM_WORLD);
281  //get resolution (needed for position randomness)
282  MPI_Bcast (pSam->GetResolution(),3,MPI_DOUBLE,0, MPI_COMM_WORLD);
283 
284  int ilen; std::string hm (MPI_MAX_PROCESSOR_NAME, ' ');
285  MPI_Get_processor_name(&hm[0],&ilen);
286  //cout << hostname << " -> slave " << setw(2) << MPI::COMM_WORLD.Get_rank() << ": received "
287  // << NPoints << " spins for simulation ..." << endl << flush;
288 
289  return pSam;
290 
291 }
292 
293 /*****************************************************************************/
299 
300  World* pW = World::instance();
301  int myRank = pW->m_myRank;
302  int NoSpins = 0;
303 
304  // request new spins:
305  MPI_Send(&myRank,1,MPI_INT,0,REQUEST_SPINS,MPI_COMM_WORLD);
306 
307  // send signal
308  for (unsigned int i=0; i < RxCA->GetSize(); i++)
309  mpi_send_paket_signal(RxCA->GetCoil(i)->GetSignal(),i);
310 
311  // Receive number of spins
312  MPI_Status status;
313  MPI_Recv(&NoSpins,1,MPI_INT,0,SEND_NO_SPINS,MPI_COMM_WORLD,&status);
314 
315  // Spins left?
316  if (NoSpins == 0) return false;
317 
318  // prepare sample structure
319  samp->ClearSpins();
320  samp->CreateSpins(NoSpins);
321 
322  // Receive
323  MPI_Datatype MPI_SPINDATA = MPIspindata();
324  MPI_Recv(samp->GetSpinsData(),NoSpins,MPI_SPINDATA,0,SEND_SAMPLE,MPI_COMM_WORLD,&status);
325  int ilen; std::string hm (MPI_MAX_PROCESSOR_NAME, ' ');
326  MPI_Get_processor_name(&hm[0],&ilen);
327  //cout << hostname << " -> slave " << setw(2) << MPI::COMM_WORLD.Get_rank() << ": received "
328  // << NoSpins << " spins for simulation ..." << endl << flush;
329 
330  return true;
331 
332 }
333 /*****************************************************************************/
334 void mpi_send_paket_signal (Signal* pSig, const int CoilID) {
335 
336  int tsize = (int) pSig->Repo()->Size();
337 
338  if (World::instance()->m_myRank == 1)
339  MPI_Send(pSig->Repo()->Times(), (int) pSig->Repo()->Samples(), MPI_DOUBLE, 0, SIG_TP + (CoilID)*4,MPI_COMM_WORLD);
340 
341  MPI_Send(pSig->Repo()->Data(), tsize, MPI_DOUBLE, 0, SIG_MX + (CoilID)*4,MPI_COMM_WORLD);
342 
343 
344 }
345 
346 /*****************************************************************************/
347 void mpi_recv_paket_signal(Signal *pSig, const int SlaveId, const int CoilID) {
348 
349  int tsize = (int) pSig->Repo()->Size();
350  double* data = pSig->Repo()->Data();
351  std::vector<double> tmp (tsize);
352  MPI_Status status;
353 
354  if (SlaveId == 1)
355  MPI_Recv(pSig->Repo()->Times(), (int) pSig->Repo()->Samples(), MPI_DOUBLE, SlaveId, SIG_TP + (CoilID)*4,MPI_COMM_WORLD,&status);
356 
357  MPI_Recv(&tmp[0], tsize, MPI_DOUBLE, SlaveId, SIG_MX + (CoilID)*4,MPI_COMM_WORLD,&status);
358 
359  for (int i = 0; i < tsize; i++)
360  data[i] += tmp[i];
361 
362 }
363 
364 #endif
Implementation of JEMRIS Declarations.
static World * instance()
Get sole instance of the sequence tree.
Definition: World.cpp:34
Singleton with information about the simulation of the current spin.
Definition: World.h:51
void DumpRestartInfo(CoilArray *RxCA)
Utility function for restart: dump information to restart jemris after crash.
Definition: Sample.cpp:505
Signal * GetSignal()
Get the received signal of this coil.
Definition: Coil.h:116
long Size()
Size of bulk data (i.e. number of elements of m_data)
Definition: Signal.h:71
void GetNextPacket(int &noSpins, int &NextSpinToSend, int SlaveId)
utility function to send sample in parallel mode: get next spin packet to be sent. (beginning index + no_spins to send)
Definition: Sample.cpp:441
void SetNoOfSpinProps(int n)
Set number of spinproperties.
Definition: World.cpp:82
int GetNoOfSpinProps()
Get number of spin properties.
Definition: World.h:85
long TotalSpinNumber
Total number of spins.
Definition: World.h:135
const long Samples() const
Number of samples.
Definition: Signal.h:107
double * GetSpinsData()
Definition: Sample.h:427
Coil configuration and sensitivities.
Definition: CoilArray.h:40
Implementation of JEMRIS Sample.
Coil * GetCoil(unsigned channel)
Get a particular coil.
Definition: CoilArray.cpp:250
double * Data()
Reference to data repository.
Definition: Signal.h:83
Implementation of JEMRIS Signal.
Repository * Repo()
Definition: Signal.h:306
bool mpi_recieve_sample_paket(Sample *samp, CoilArray *RxCA)
Definition: mpi_Model.h:298
double * GetResolution()
Get grid resolution.
Definition: Sample.h:382
int SpinsLeft()
Definition: Sample.cpp:565
double * Times()
Reference to time point repository.
Definition: Signal.h:95
void ClearSpins()
delete the spin structure
Definition: Sample.cpp:80
int m_myRank
MPI rank of this process. if m_myRank<0 process is serial jemris.
Definition: World.h:160
unsigned int GetSize()
Get the number of channels.
Definition: CoilArray.h:93
size_t GetNProps() const
Definition: Sample.h:352
void CreateSpins(const size_t size)
create the spin structure
Definition: Sample.cpp:88
size_t GetSize() const
Definition: Sample.cpp:303
void SetNoOfCompartments(int n)
Set number of compartments to.
Definition: World.cpp:110
void GetScatterVectors(int *sendcount, int *displ, int size)
utility function to send sample in parallel mode: initial samples are sent via mpi_scatterv; get need...
Definition: Sample.cpp:349
The Sample is the object to simulate. It contains the spins.
Definition: Sample.h:282
The signal store and IO.
Definition: Signal.h:249

-- last change 17.06.2016 | Tony Stoecker | Imprint --