|
PaCO++
0.05
|
#include <stdio.h>
#include <mpi.h>
#include <algorithm>
#include "Schedule.h"
#include "Internal.h"
#include "DistributionBloc.h"
#include <vector>
#include <iostream>
Go to the source code of this file.
Functions | |
| bool | cmp_rank (const LocalData_t &a, const LocalData_t &b) |
| void | computeReceiveBlock1D (const GlobalData_t &gd, const LocalData_t &dd, const Topology_t &stopo, const Topology_t &dtopo, const ParisBlock_param_t *param, vector< LocalData_t > &vOut) |
| void | computeSendBlock1D (const GlobalData_t &gd, const LocalData_t &sd, const Topology_t &stopo, const Topology_t &dtopo, const ParisBlock_param_t *param, vector< LocalData_t > &vOut) |
| void | doSchedule (const GlobalData_t &gd, const LocalData_t &ld, const Topology_t &ctopo, vector< LocalData_t > &sched_send, vector< LocalData_t > &sched_recv, void *comm) |
bool cmp_rank(const LocalData_t &a, const LocalData_t &b)
Definition at line 27 of file Schedule.cc.
Referenced by doSchedule().
{
    // Ordering predicate for schedule entries. Note that, despite the
    // function's name, the field compared is `start`, not `rank`:
    // `a` is ordered before `b` exactly when a.start is smaller than b.start.
    return b.start > a.start;
}
void computeReceiveBlock1D(const GlobalData_t &gd, const LocalData_t &dd, const Topology_t &stopo, const Topology_t &dtopo, const ParisBlock_param_t *param, vector< LocalData_t > &vOut)
Definition at line 135 of file Schedule.cc.
References BlockNumberOfElementProc(), blockSize(), getProcRangeInf(), getProcRangeSup(), NumberOfBlockProc(), and OwnerBlock().
Referenced by computeReceiveDataBlock1DServer(), and computeSendDataBlock1DClient().
{
    // Appends to vOut one entry per piece that the destination block `dd`
    // receives. Each entry records the source rank, the global start index,
    // the element count and the address inside dd's buffer where the piece
    // goes. Existing content of vOut is left untouched.
#ifdef DEBUG_INTERNAL
    cerr << "\nIn compute Receive Schedule--------------------\n";
    fprintf(stderr, "stopo: %ld\tdtopo: %ld\n",stopo.total, dtopo.total);
    fprintf(stderr, "gd.len %ld\tdd.start %d\tdd.len %d\n", gd.len, dd.start, dd.len);
#endif

    // Same node count on both sides: the local block is scheduled unchanged.
    if (stopo.total == dtopo.total) {
        vOut.push_back(dd);
#ifdef DEBUG_INTERNAL
        fprintf(stderr, " rank:%d start:%d len:%d base:%p\n", dd.rank, dd.start, dd.len, dd.base);
#endif
        return;
    }

    // Append mode: make room for one additional entry per source node
    // (in block mode there is at most one message from each source node).
    vOut.reserve(vOut.size()+stopo.total);
    unsigned srcBlockSize = blockSize(gd.len, stopo.total, param);

    if (gd.cyclic != 0) {
        // Block-cyclic distribution: walk the blocks held by this
        // destination node and look up the source node owning each of them.
        unsigned dstBlockSize = blockSize(gd.len, dtopo.total, param);
        unsigned dstCycleSize = dstBlockSize * dtopo.total;
        unsigned blockCount = NumberOfBlockProc(gd.len, dtopo.total, dstBlockSize, dd.rank);
        for (unsigned blk = 0; blk < blockCount; blk++) {
            unsigned globalBlock = blk * dtopo.total + dd.rank; // global block id
            unsigned owner = OwnerBlock(globalBlock, stopo.total);
            vOut.push_back(LocalData_t());
            LocalData_t& piece = vOut.back();
            piece.rank = owner;
            piece.start = dstCycleSize*blk + dd.rank*dstBlockSize;
            piece.len = BlockNumberOfElementProc(gd.len, dd.rank, dtopo.total, dstBlockSize, blk);
            piece.base = dd.base + blk * dstBlockSize * gd.unit_size;
#ifdef DEBUG_INTERNAL
            fprintf(stderr, " r2: from:%d start:%d len:%d base:%p\n", piece.rank, piece.start, piece.len, piece.base);
#endif
        }
        return;
    }

    // Plain block distribution: intersect [recvLow, recvHigh) with every
    // source block in the range returned by getProcRangeInf/getProcRangeSup.
    unsigned long recvLow = dd.start;
    unsigned long recvHigh = recvLow + dd.len;
    unsigned firstSrc = getProcRangeInf(recvLow, srcBlockSize);
    unsigned lastSrc = getProcRangeSup(recvHigh, srcBlockSize);
#ifdef DEBUG_INTERNAL
    fprintf(stderr, " loop from %d to %d width stotal: %ld\n", firstSrc, lastSrc, stopo.total);
#endif
    for (unsigned src = firstSrc; src <= lastSrc; src++) {
        vOut.push_back(LocalData_t());
        LocalData_t& piece = vOut.back();
        piece.rank = src;
        unsigned blockLow = src*srcBlockSize;
        unsigned blockHigh = (src+1)*srcBlockSize;
        piece.start = std::max<unsigned long>(recvLow, blockLow);
        unsigned pieceEnd = std::min<unsigned long>(recvHigh, blockHigh);
        piece.len = pieceEnd - piece.start;
        // Offset into dd's buffer, in units of gd.unit_size per element.
        piece.base = dd.base + ((piece.start - dd.start) * gd.unit_size);
#ifdef DEBUG_INTERNAL
        fprintf(stderr, " r1: from:%d start:%d len:%d base:%p\n", piece.rank, piece.start, piece.len, piece.base);
#endif
    }
}

void computeSendBlock1D(const GlobalData_t &gd, const LocalData_t &sd, const Topology_t &stopo, const Topology_t &dtopo, const ParisBlock_param_t *param, vector< LocalData_t > &vOut)
Definition at line 38 of file Schedule.cc.
References BlockNumberOfElementProc(), blockSize(), computeBlockBounds(), getProcRangeInf(), getProcRangeSup(), NumberOfBlockProc(), and OwnerBlock().
Referenced by computeReceiveDataBlock1DServer(), and computeSendDataBlock1DClient().
{
    // Appends to vOut one entry per piece that the source block `sd` sends.
    // Each entry records the destination rank, the global start index, the
    // element count and the address inside sd's buffer where the piece
    // begins. Existing content of vOut is left untouched.
#ifdef DEBUG_INTERNAL
    cerr << "\nIn compute Send Schedule--------------------\n";
    fprintf(stderr, "stopo: %ld\tdtopo: %ld\n",stopo.total, dtopo.total);
    fprintf(stderr, "gd.len %ld\tgd.cyclic: %ld\tsd.start %d\tsd.len %d\n", gd.len, gd.cyclic, sd.start, sd.len);
#endif

    if (stopo.total != dtopo.total) {
        // Append mode: make room for one additional entry per destination
        // node (in block mode there is at most one message to each of them).
        vOut.reserve(vOut.size() + dtopo.total);
        unsigned srcBlockSize = blockSize(gd.len, stopo.total, param);

        if (gd.cyclic != 0) {
            // Block-cyclic distribution: walk the blocks held by this source
            // node and look up the destination node owning each of them.
            unsigned srcCycleSize = srcBlockSize * stopo.total;
            unsigned blockCount = NumberOfBlockProc(gd.len, stopo.total, srcBlockSize, sd.rank);
            for (unsigned blk = 0; blk < blockCount; blk++) {
                unsigned globalBlock = blk * stopo.total + sd.rank; // global block id
                unsigned owner = OwnerBlock(globalBlock, dtopo.total);
                vOut.push_back(LocalData_t());
                LocalData_t& piece = vOut.back();
                piece.rank = owner;
                piece.start = (srcCycleSize*blk) + (sd.rank*srcBlockSize);
                piece.len = BlockNumberOfElementProc(gd.len, sd.rank, stopo.total, srcBlockSize, blk);
                piece.base = sd.base + ( blk * srcBlockSize * gd.unit_size );
#ifdef DEBUG_INTERNAL
                fprintf(stderr, " s2: to:%d start:%d len:%d base:%p\n", piece.rank, piece.start, piece.len, piece.base);
#endif
            }
        } else {
            // Standard block redistribution: intersect the bounds of the
            // local source block with every destination block in the range
            // returned by getProcRangeInf/getProcRangeSup.
            unsigned long sendLow, sendHigh;
            computeBlockBounds(&sendLow, &sendHigh, gd.len, sd.rank, stopo.total, srcBlockSize, 0);
            unsigned dstBlockSize = blockSize(gd.len, dtopo.total, param);
            unsigned firstDst = getProcRangeInf(sendLow, dstBlockSize);
            unsigned lastDst = getProcRangeSup(sendHigh, dstBlockSize);
#ifdef DEBUG_INTERNAL
            fprintf(stderr, " loop from %d to %d width dtotal: %ld\n", firstDst, lastDst, dtopo.total);
#endif
            for (unsigned dst = firstDst; dst <= lastDst; dst++) {
                vOut.push_back(LocalData_t());
                LocalData_t& piece = vOut.back();
                piece.rank = dst;
                unsigned blockLow = dst*dstBlockSize;
                unsigned blockHigh = (dst+1)*dstBlockSize;
                piece.start = std::max<unsigned long>(sendLow, blockLow);
                unsigned pieceEnd = std::min<unsigned long>(sendHigh, blockHigh);
                piece.len = pieceEnd - piece.start;
                // Offset into sd's buffer, in units of gd.unit_size per element.
                piece.base = sd.base + ((piece.start - sd.start) * gd.unit_size);
#ifdef DEBUG_INTERNAL
                fprintf(stderr, " s1: to:%d start:%d len:%d base:%p\n", piece.rank, piece.start, piece.len, piece.base);
#endif
            }
        }
    } else {
        // Same node count on both sides: the local block is scheduled unchanged.
        vOut.push_back(sd);
#ifdef DEBUG_INTERNAL
        fprintf(stderr, " rank:%d start:%d len:%d base:%p\n", sd.rank, sd.start, sd.len, sd.base);
#endif
    }
#ifdef DEBUG_INTERNAL
    cerr << "\nIn compute Send Schedule-------------------- done\n";
#endif
}

void doSchedule(const GlobalData_t &gd, const LocalData_t &ld, const Topology_t &ctopo, vector< LocalData_t > &sched_send, vector< LocalData_t > &sched_recv, void *comm)
Definition at line 227 of file Schedule.cc.
Referenced by computeReceiveDataBlock1DServer(), and computeSendDataBlock1DClient().
{
    // Executes a communication schedule.
    //
    //  gd          global description; gd.unit_size is the per-element byte count
    //  ld          local description; ld.rank identifies the calling process
    //  ctopo       topology passed to getProcId() to map schedule ranks to MPI ranks
    //  sched_send  pieces to send (sorted in place by cmp_rank)
    //  sched_recv  pieces to receive (sorted in place by cmp_rank)
    //  comm        pointer to the MPI_Comm to communicate on
    //
    // Entries whose peer is the calling process are copied with memcpy;
    // all others go through non-blocking MPI calls (tag 51) that are
    // completed before the function returns. Errors are reported on cerr.
#ifdef DEBUG_INTERNAL
    cerr << "\nIn doSchedule--------------------\n";
#endif
    MPI_Comm mpi_comm = *(MPI_Comm*) comm;
#ifdef DEBUG_COMM
    fprintf(stderr," MPI_COMM_WORLD=%d mpi_comm=%d\n", MPI_COMM_WORLD, mpi_comm);
#endif

    // Nothing scheduled in either direction: nothing to do.
    if (sched_send.empty() && sched_recv.empty()) {
        return;
    }

    // Heap-backed request/status storage (standard C++, unlike variable-length
    // arrays, and valid even when one of the schedules is empty). Requests
    // start out as MPI_REQUEST_NULL so no slot ever holds an undefined value.
    vector<MPI_Request> sreq(sched_send.size(), MPI_REQUEST_NULL);
    vector<MPI_Request> rreq(sched_recv.size(), MPI_REQUEST_NULL);
    vector<MPI_Status> sstat(sched_send.size());
    vector<MPI_Status> rstat(sched_recv.size());
    unsigned si = 0; // number of send requests successfully posted
    unsigned ri = 0; // number of receive requests successfully posted

    // Entries exchanged with ourselves; they point into the (already sorted)
    // schedules, which are not resized afterwards.
    vector<LocalData_t*> local_recv;
    vector<LocalData_t*> local_send;

    // Sort both schedules with the same predicate; local entries are later
    // paired by position.
    std::stable_sort(sched_send.begin(), sched_send.end(), cmp_rank);
    std::stable_sort(sched_recv.begin(), sched_recv.end(), cmp_rank);

    // Post asynchronous MPI receives
#ifdef DEBUG_COMM
    cerr << " #sched_recv: " << sched_recv.size() << endl;
#endif
    for (unsigned i = 0; i < sched_recv.size(); i++) {
        unsigned from = getProcId(sched_recv[i].rank, ctopo);
        if (from == ld.rank) {
#ifdef DEBUG_COMM
            fprintf(stderr, " recv: schedr no=%d start=%d len=%d from=%d LOCAL\n", i,
                    sched_recv[i].start, sched_recv[i].len, from);
#endif
            local_recv.push_back(&sched_recv[i]);
        } else {
#ifdef DEBUG_COMM
            fprintf(stderr, " recv: schedr no=%d start=%d len=%d from=%d base=%p\n", i,
                    sched_recv[i].start, sched_recv[i].len, from, sched_recv[i].base);
#endif
            int err = MPI_Irecv(sched_recv[i].base, sched_recv[i].len*gd.unit_size,
                                MPI_BYTE, from, 51, mpi_comm, &rreq[ri]);
            if (err != MPI_SUCCESS) {
                cerr << "EROR IN MPI_Irecv: return value is "<<err<<endl;
            } else {
                // Only count requests that were actually posted, so that
                // MPI_Waitall never sees a failed request.
                ri++;
            }
        }
    }

    // Send data via MPI
#ifdef DEBUG_COMM
    cerr << " #sched_send: " << sched_send.size() << endl;
#endif
    for (unsigned i = 0; i < sched_send.size(); i++) {
        unsigned to = getProcId(sched_send[i].rank, ctopo);
        if (to == ld.rank) {
#ifdef DEBUG_COMM
            fprintf(stderr, " send: scheds no=%d start=%d len=%d to=%d LOCAL\n", i,
                    sched_send[i].start, sched_send[i].len, to);
#endif
            local_send.push_back(&sched_send[i]);
        } else {
#ifdef DEBUG_COMM
            fprintf(stderr, " send: scheds no=%d start=%d len=%d to=%d base=%p\n", i,
                    sched_send[i].start, sched_send[i].len, to, sched_send[i].base);
#endif
            int err = MPI_Isend(sched_send[i].base, sched_send[i].len*gd.unit_size,
                                MPI_BYTE, to, 51, mpi_comm, &sreq[si]);
            if (err != MPI_SUCCESS) {
                cerr << "EROR IN MPI_Isend: return value is "<<err<<endl;
            } else {
                si++;
            }
        }
    }

    // Do local communication via memcpy
    if (local_recv.size() != local_send.size()) {
        cerr << "Error: local recv & send have different size: " << local_recv.size() << " " << local_send.size() << endl;
    }
    // Only pairs that exist on both sides can be copied; going past the
    // shorter list would index out of bounds.
    const unsigned nlocal = (local_recv.size() < local_send.size())
                                ? local_recv.size() : local_send.size();
    for (unsigned i = 0; i < nlocal; i++) {
        if (local_recv[i]->len != local_send[i]->len) {
            cerr << "Error: local recv & send have different len for i= "<<i<< " :" << local_recv[i]->len << " " << local_send[i]->len << endl;
        }
#ifdef DEBUG_COMM
        fprintf(stderr, " local: scheds no=%d start=%d len=%d\n", i,
                local_send[i]->start, local_send[i]->len);
#endif
        // On a length mismatch copy the smaller amount so neither buffer
        // is overrun.
        memcpy(local_recv[i]->base, local_send[i]->base,
               ((local_recv[i]->len < local_send[i]->len) ? local_recv[i]->len
                                                          : local_send[i]->len) * gd.unit_size);
    }

    // Wait for all posted sends & receives
#ifdef DEBUG_INTERNAL
    cerr << "WAITING local communications to end...\n";
#endif
    if (si > 0) {
        int err = MPI_Waitall(si, &sreq[0], &sstat[0]);
        if (err != MPI_SUCCESS) {
            cerr << "EROR IN MPI_WaitAll for send: return value is "<<err<<endl;
        }
    }
    if (ri > 0) {
        int err = MPI_Waitall(ri, &rreq[0], &rstat[0]);
        if (err != MPI_SUCCESS) {
            cerr << "EROR IN MPI_WaitAll for recv: return value is "<<err<<endl;
        }
    }
#ifdef DEBUG_INTERNAL
    cerr << "WAITING local communications to end...ok \n";
#endif
}