MPI 并行编程入门 中国科学院计算机网络信息中心超级计算中心
聚合通信 定义 三种通信方式 聚合函数列表 同步 广播 收集 散发 全散发收集 归约
定义 communicator 1 3 4 5 0 2 一个通信器的所有进程参与, 所有进程都调用聚合通信函数 MPI 系统保证聚合通信函数与点对点调用不会混淆 聚合通信不需要消息标号 聚合通信函数都为阻塞式函数 聚合通信的功能 : 通信 同步 计算等
三种通信方式 一对多 多对一 多对多
聚合函数列表 MPI_Barrier MPI_Bcast MPI_Gather/MPI_Gatherv MPI_Allgather/MPI_Allgatherv MPI_Scatter/MPI_Scatterv MPI_Alltoall/MPI_Alltoallv MPI_Reduce/MPI_Allreduce/MPI_Reduce_scatter MPI_Scan
同步 该函数用于进程同步, 即一个进程调用该函数后需等 待通信器内所有进程调用该函数后返回
Sample - Fortran CALL MPI_COMM_RANK(COMM,RANK,IERR) IF(RANK.EQ.0) THEN CALL WORK0( ) ELSE CALL WORK1( ) CALL MPI_BARRIER(COMM,IERR) CALL WORK2( ) CALL MPI_COMM_RANK(COMM,RANK,IERR) IF(RANK.EQ.0) THEN CALL WORK0( ) CALL MPI_BARRIER(COMM,IERR) ELSE CALL WORK1( ) CALL WORK2( )
广播 processes data A0 broadcast A0 A0 A0 A0 (a) A0 A0
广播
广播 通信器中 root 进程将自己 buffer 内的数据发给通信器内所有进程 非 root 进程用自己的 buffer 接收数据
Sample - C #include<mpi.h> int main (int argc, char *argv[]) { int rank; double param; MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD,&rank); if(rank==5) param=23.0; MPI_Bcast(¶m,1,MPI_DOUBLE,5,MPI_COMM_WORLD); printf("p:%d after broadcast parameter is %f\n",rank,param); } MPI_Finalize(); Program Output P:0 after broadcast parameter is 23.000000 P:6 after broadcast parameter is 23.000000 P:5 after broadcast parameter is 23.000000 P:2 after broadcast parameter is 23.000000 P:3 after broadcast parameter is 23.000000 P:7 after broadcast parameter is 23.000000 P:1 after broadcast parameter is 23.000000 P:4 after broadcast parameter is 23.000000
收集 & 散发 A0 A1 A2 A3 A4 A5 scatter A0 A1 gather A2 A3 A4 (b) A5
ROOT 收集 (MPI_Gather)
收集 (MPI_Gather) 所有进程 ( 包括根进程 ) 将 sendbuf 的数据传输给根进程 ; 根进程按着进程号顺序依次接收到 recvbuf 发送与接收的数据类型相同 ;sendcount 和 recvcount 相同 非根进程接收消息缓冲区被忽略, 但需要提供
收集 (MPI_Gatherv) A B C D B A C D
收集 (MPI_Gatherv) 每个进程发送的数据个数不同 根进程接收不一定连续存放 数组 recvcounts 和 displs 的元素个数等于进程总数, 并与进程顺序对应 数据的个数与位移都以 recvtype 为单位
Sample Fortran INTEGER A(100),RBUF(10000),SIZE,ROOT,RTYPE,RANK,RECS(100),DISP(100) CALL MPI_COMM_SIZE(COMM,SIZE,IERR) CALL MPI_COMM_RANK(COMM,RANK,IERR) IF(100*SIZE.GT. 10000) THEN PRINT*, NOT ENOUGH RECEIVING BUF CALL MPI_FINALIZE(IERR) ELSE 进程 i 向进程 0 发送 100-i 个整型数, 每隔 100 个整型数依次存储消息 ROOT=0 IF(RANK.EQ.0) THEN DO I=0,SIZE-1 RECS(I)=100-I 0 1 3 4 DISP(I)=I*100 ENDDO ENDIF CALL MPI_GATHERV(A,100-RANK,MPI_INTEGER,RBUF,RECS,DISP,MPI_INTERGER,ROOT,COMM,IERR) ENDIF
收集 (MPI_Allgather) A0 A0 B0 C0 D0 E0 F0 B0 C0 allgather A0 A0 B0 B0 C0 D0 C0 D0 E0 E0 F0 F0 D0 A0 B0 C0 D0 E0 F0 E0 A0 B0 C0 D0 E0 F0 F0 (c) A0 B0 C0 D0 E0 F0
收集 (MPI_Allgather)
收集 (MPI_Allgather)
收集 (MPI_Allgatherv)
散发 (MPI_Scatter)
散发 (MPI_Scatter) 根进程有 np 个数据块, 每块包含 sendcount 个类型为 sendtype 的数据 ; 根进程将这些数据块按着进程号顺序依次散发到各个进程 ( 包含根进程 ) 的 recvbuf 发送与接收的数据类型相同 ;sendcount 和 recvcount 相同 非根进程发送消息缓冲区被忽略, 但需要提供
散发 (MPI_Scatterv) B A C D A B C D
散发 (MPI_Scatterv) 根进程向各个进程发送的数据个数不等 根进程散发各个进程的数据, 其缓存区不一定连续 数组 recvcounts 和 displs 的元素个数等于进程总数, 并与进程顺序对应 数据的个数与位移都以 sendtype 为单位
Sample - C #include <mpi.h> int main (int argc, char *argv[]) { int rank,size,i,j; double param[400],mine; int sndcnt,revcnt; MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD,&rank); MPI_Comm_size(MPI_COMM_WORLD,&size); revcnt=1; if(rank==3) { for(i=0;i<size;i++) param[i]=23.0+i; sndcnt=1; } MPI_Scatter(param,sndcnt,MPI_DOUBLE,&mine,revcnt,MPI_DOUBLE,3,MPI_COMM_WORLD); printf("p:%d mine is %f\n",rank,mine); MPI_Finalize(); } 根进程向所有进程次序分发 1 个数组元素 Program Output P:0 mine is 23.000000 P:1 mine is 24.000000 P:2 mine is 25.000000 P:3 mine is 26.000000
全散发收集 (MPI_Alltoall) A0 A1 A2 A3 A4 A5 A0 B0 C0 D0 E0 F0 B0 C0 B1 C1 B2 B3 C2 C3 B4 C4 B5 C5 alltoall A1 A2 B1 B2 C1 C2 D1 D2 E1 E2 F1 F2 D0 D1 D2 D3 D4 D5 A3 B3 C3 D3 E3 F3 E0 E1 E2 E3 E4 E5 A4 B4 C4 D4 E4 F4 F0 F1 F2 F3 F4 D5 (d) A5 B5 C5 D5 E5 D5
全散发收集 (MPI_Alltoall) A B C D E F G H I J K L MN O P A B C D E F G H I J K L MN O P A E I M B F J N C G K O D HL P
全散发收集 (MPI_Alltoall) Do I=0,NPROCS-1 CALL MPI_SCATTER(SENDBUF(I), SENDCOUNT, SENDTYPE, + RECVBUF+I*RECVCOUNT*extent(RECVTYPE), RECVCOUNT, RECVTYPE, I, COMM, IERR) ENDDO
全散发收集 (MPI_Alltoallv) 任意行散发, 参照 sdispls 任意列收集, 参照 rdispls
全散发收集 (MPI_Alltoallv) 每个进程如同根进程一样, 执行一次 MPI_Scatterv 发送 Do I=0,NPROCS-1 CALL MPI_SCATTERV(SENDBUF(I), SENDCOUNTS, SDISPLS, SENDTYPE, + RECVBUF+RDISPLS(I)*extent(RECVTYPE), RECVCOUNTS(I), RECVTYPE, I, ) ENDDO 每个进程如同根进程一样, 执行一次 MPI_Gatherv 接收 Do I=0,NPROCS-1 CALL MPI_GATHERV(SENDBUF+SDISPLS(I)*extent(RECVTYPE), SENDCOUNTS(I), + SENDTYPE, RECVBUF(I), RECVCOUNTS, RDISPLS, RECVTYPE, I, ) ENDDO
归约
归约 (MPI_Reduce) 各进程提供数据 (sendbuf,count,datatype) 归约结果存放在 root 进程的缓冲区 recvbuf
归约
归约
归约
Sample - C #include <mpi.h> /* Run with 16 processes */ int main (int argc, char *argv[]) { 数对的归约操作 int rank, root=7; struct { double value; int rank; } in, out; MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD,&rank); in.value=rank+1; in.rank=rank; MPI_Reduce(&in,&out,1,MPI_DOUBLE_INT,MPI_MAXLOC,root,MPI_COMM_WORLD); if(rank==root) printf("p :%d max=%lf at rank %d\n",rank,out.value,out.rank); MPI_Reduce(&in,&out,1,MPI_DOUBLE_INT,MPI_MINLOC,root,MPI_COMM_WORLD); if(rank==root) printf("p :%d min=%lf at rank %d\n",rank,out.value,out.rank); MPI_Finalize(); } (1.000000,0) (2.000000,1) (16.000000,15) Program Output P:7 max = 16.000000 at rank 15 P:7 min = 1.000000 at rank 0
Sample - Fortran PROGRAM MaxMin C Run with 8 processes INCLUDE 'mpif.h' INTEGER err, rank, size integer in(2),out(2) CALL MPI_INIT(err) CALL MPI_COMM_RANK(MPI_WORLD_COMM,rank,err) CALL MPI_COMM_SIZE(MPI_WORLD_COMM,size,err) in(1)=rank+1 in(2)=rank call MPI_REDUCE(in,out,1,MPI_2INTEGER,MPI_MAXLOC, 7,MPI_COMM_WORLD,err) if(rank.eq.7) print *,"P:",rank," max=",out(1)," at rank ",out(2) call MPI_REDUCE(in,out,1,MPI_2INTEGER,MPI_MINLOC, 2,MPI_COMM_WORLD,err) if(rank.eq.2) print *,"P:",rank," min=",out(1)," at rank ",out(2) CALL MPI_FINALIZE(err) END Program Output P:2 min=1 at rank 0 P:7 max=8 at rank 7
全归约 (MPI_Allreduce)
归约散发 (MPI_Reduce_scatter)
归约散发 (MPI_Reduce_scatter)
前缀归约 (MPI_Scan)
归约 创建新运算 func 是用户提供的用于完成运算的外部函数 commute 用来指明所定义的运算是否满足交换律 一个运算创建后和 MPI 预定义的运算一样使用
归约 用户自定义函数 invec 与 inoutvec 分别指出要被归约的数据所在缓冲的首地址 datatype 指出归约对象的数据类型 len 给出了 invec 与 inoutvec 中包含的元素个数 ( 相当于归约函数中的 count) 函数返回时, 运算结果储存在 inoutvec 中
归约
Sample - Fortran PROGRAM UserOP C Run with 8 processes INCLUDE 'mpif.h' INTEGER err, rank, size integer source, reslt EXTERNAL digit LOGICAL commute INTEGER myop CALL MPI_INIT(err) CALL MPI_COMM_RANK(MPI_WORLD_COMM,rank,err) CALL MPI_COMM_SIZE(MPI_WORLD_COMM,size,err) commute= true call MPI_OP_CREATE(digit,commute,myop,err) source=(rank+1)**2 call MPI_BARRIER(MPI_COM_WORLD,err) call MPI_SCAN(source,reslt,1,MPI_INTEGER,myop,MPI_COMM_WORLD,err) print *,"P:",rank," my result is ",reslt CALL MPI_OP_FREE(myop,err) CALL MPI_FINALIZE(err) END integer function digit(in,inout,len,type) integer len,type integer in(len),inout(len) do i=1,len inout(i)=mod((in(i)+inout(i)),10) end do digit=5 end source={ 1,4,9,16,25,36,49,64} Program Output P:6 my result is 0 P:5 my result is 1 P:7 my result is 4 P:1 my result is 5 P:3 my result is 0 P:2 my result is 4 P:4 my result is 5 P:0 my result is 1