NCCL Source Code Analysis: Initialization and How ncclUniqueId Is Generated
NCCL
NCCL is NVIDIA's open-source GPU communication library. It supports both collective and point-to-point communication.
Here is a demo from the official documentation:
#include <stdio.h>
#include <stdlib.h>
#include "cuda_runtime.h"
#include "nccl.h"
#include "mpi.h"
#include <unistd.h>
#include <stdint.h>
#define MPICHECK(cmd) do { \
int e = cmd; \
if( e != MPI_SUCCESS ) { \
printf("Failed: MPI error %s:%d '%d'\n", \
__FILE__,__LINE__, e); \
exit(EXIT_FAILURE); \
} \
} while(0)
#define CUDACHECK(cmd) do { \
cudaError_t e = cmd; \
if( e != cudaSuccess ) { \
printf("Failed: Cuda error %s:%d '%s'\n", \
__FILE__,__LINE__,cudaGetErrorString(e)); \
exit(EXIT_FAILURE); \
} \
} while(0)
#define NCCLCHECK(cmd) do { \
ncclResult_t r = cmd; \
if (r!= ncclSuccess) { \
printf("Failed, NCCL error %s:%d '%s'\n", \
__FILE__,__LINE__,ncclGetErrorString(r)); \
exit(EXIT_FAILURE); \
} \
} while(0)
static uint64_t getHostHash(const char* string) {
// Based on DJB2a, result = result * 33 ^ char
uint64_t result = 5381;
for (int c = 0; string[c] != '\0'; c++){
result = ((result << 5) + result) ^ string[c];
}
return result;
}
static void getHostName(char* hostname, int maxlen) {
gethostname(hostname, maxlen);
for (int i=0; i< maxlen; i++) {
if (hostname[i] == '.') {
hostname[i] = '\0';
return;
}
}
}
int main(int argc, char* argv[])
{
int size = 32*1024*1024;
int myRank, nRanks, localRank = 0;
//initializing MPI
MPICHECK(MPI_Init(&argc, &argv));
MPICHECK(MPI_Comm_rank(MPI_COMM_WORLD, &myRank));
MPICHECK(MPI_Comm_size(MPI_COMM_WORLD, &nRanks));
//calculating localRank which is used in selecting a GPU
uint64_t hostHashs[nRanks];
char hostname[1024];
getHostName(hostname, 1024);
hostHashs[myRank] = getHostHash(hostname);
MPICHECK(MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, hostHashs, sizeof(uint64_t), MPI_BYTE, MPI_COMM_WORLD));
for (int p=0; p<nRanks; p++) {
if (p == myRank) break;
if (hostHashs[p] == hostHashs[myRank]) localRank++;
}
//each process is using two GPUs
int nDev = 2;
float** sendbuff = (float**)malloc(nDev * sizeof(float*));
float** recvbuff = (float**)malloc(nDev * sizeof(float*));
cudaStream_t* s = (cudaStream_t*)malloc(sizeof(cudaStream_t)*nDev);
//picking GPUs based on localRank
for (int i = 0; i < nDev; ++i) {
CUDACHECK(cudaSetDevice(localRank*nDev + i));
CUDACHECK(cudaMalloc(sendbuff + i, size * sizeof(float)));
CUDACHECK(cudaMalloc(recvbuff + i, size * sizeof(float)));
CUDACHECK(cudaMemset(sendbuff[i], 1, size * sizeof(float)));
CUDACHECK(cudaMemset(recvbuff[i], 0, size * sizeof(float)));
CUDACHECK(cudaStreamCreate(s+i));
}
ncclUniqueId id;
ncclComm_t comms[nDev];
//generating NCCL unique ID at one process and broadcasting it to all
if (myRank == 0) ncclGetUniqueId(&id);
MPICHECK(MPI_Bcast((void *)&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD));
//initializing NCCL, group API is required around ncclCommInitRank as it is
//called across multiple GPUs in each thread/process
NCCLCHECK(ncclGroupStart());
for (int i=0; i<nDev; i++) {
CUDACHECK(cudaSetDevice(localRank*nDev + i));
NCCLCHECK(ncclCommInitRank(comms+i, nRanks*nDev, id, myRank*nDev + i));
}
NCCLCHECK(ncclGroupEnd());
//calling NCCL communication API. Group API is required when using
//multiple devices per thread/process
NCCLCHECK(ncclGroupStart());
for (int i=0; i<nDev; i++)
NCCLCHECK(ncclAllReduce((const void*)sendbuff[i], (void*)recvbuff[i], size, ncclFloat, ncclSum,
comms[i], s[i]));
NCCLCHECK(ncclGroupEnd());
//synchronizing on CUDA stream to complete NCCL communication
for (int i=0; i<nDev; i++)
CUDACHECK(cudaStreamSynchronize(s[i]));
//freeing device memory
for (int i=0; i<nDev; i++) {
CUDACHECK(cudaFree(sendbuff[i]));
CUDACHECK(cudaFree(recvbuff[i]));
}
//finalizing NCCL
for (int i=0; i<nDev; i++) {
ncclCommDestroy(comms[i]);
}
//finalizing MPI
MPICHECK(MPI_Finalize());
printf("[MPI Rank %d] Success \n", myRank);
return 0;
}
In the example above, rank 0 calls ncclGetUniqueId to obtain an Id and then broadcasts it to the other ranks through MPI. Next, let's look at how the UniqueId is produced.
ncclResult_t ncclGetUniqueId(ncclUniqueId* out) {
NCCLCHECK(ncclInit());
NCCLCHECK(PtrCheck(out, "GetUniqueId", "out"));
return bootstrapGetUniqueId(out);
}
Start with ncclInit.
It first runs initEnv to set up environment variables.
It then runs initNet to initialize the networks NCCL needs. There are two: the bootstrap network and the data communication network. The bootstrap network is used during initialization to exchange small pieces of information, such as each machine's IP and port. Because the data volume is tiny and the exchange happens essentially once at startup, bootstrap uses TCP. The data network carries the actual payload, so it prefers RDMA (and GDR where GDR is supported).
ncclResult_t initNet() {
// Always initialize bootstrap network
NCCLCHECK(bootstrapNetInit());
NCCLCHECK(initNetPlugin(&ncclNet, &ncclCollNet));
if (ncclNet != NULL) return ncclSuccess;
if (initNet(&ncclNetIb) == ncclSuccess) {
ncclNet = &ncclNetIb;
} else {
NCCLCHECK(initNet(&ncclNetSocket));
ncclNet = &ncclNetSocket;
}
return ncclSuccess;
}
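bootstrapNetInit initializes the bootstrap network. It calls findInterfaces to walk all network interfaces on the machine, uses prefixList to decide which ones to use, and records the usable ones: ifa_name goes into the global bootstrapNetIfNames and the IP address into the global bootstrapNetIfAddrs. By default every interface except docker and lo is eligible.
For example, the test machine has three NICs, xgbe0, xgbe1 and xgbe2, so those three interface names and their IP addresses are saved. NCCL also provides the NCCL_SOCKET_IFNAME environment variable for choosing an interface by name: export NCCL_SOCKET_IFNAME=xgbe0 selects xgbe0. Under the hood this is just the prefixList matching.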
static int findInterfaces(const char* prefixList, char* names, union socketAddress *addrs, int sock_family, int maxIfNameSize, int maxIfs) {
struct netIf userIfs[MAX_IFS];
bool searchNot = prefixList && prefixList[0] == '^';
if (searchNot) prefixList++;
bool searchExact = prefixList && prefixList[0] == '=';
if (searchExact) prefixList++;
int nUserIfs = parseStringList(prefixList, userIfs, MAX_IFS);
int found = 0;
struct ifaddrs *interfaces, *interface;
getifaddrs(&interfaces);
for (interface = interfaces; interface && found < maxIfs; interface = interface->ifa_next) {
if (interface->ifa_addr == NULL) continue;
int family = interface->ifa_addr->sa_family;
if (family != AF_INET && family != AF_INET6)
continue;
if (sock_family != -1 && family != sock_family)
continue;
if (family == AF_INET6) {
struct sockaddr_in6* sa = (struct sockaddr_in6*)(interface->ifa_addr);
if (IN6_IS_ADDR_LOOPBACK(&sa->sin6_addr)) continue;
}
if (!(matchIfList(interface->ifa_name, -1, userIfs, nUserIfs, searchExact) ^ searchNot)) {
continue;
}
bool duplicate = false;
for (int i = 0; i < found; i++) {
if (strcmp(interface->ifa_name, names+i*maxIfNameSize) == 0) { duplicate = true; break; }
}
if (!duplicate) {
strncpy(names+found*maxIfNameSize, interface->ifa_name, maxIfNameSize);
int salen = (family == AF_INET) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6);
memcpy(addrs+found, interface->ifa_addr, salen);
found++;
}
}
freeifaddrs(interfaces);
return found;
}
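The '^' and '=' handling at the top of findInterfaces can be sketched in isolation. The function below is a hypothetical helper written for this article, not NCCL's parseStringList or matchIfList. It assumes a plain comma-separated list of names and ignores the ":port" suffix that the IB variant also accepts.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical helper, not an NCCL function. Returns true when ifName is
 * selected by spec: a comma-separated list with an optional leading '^'
 * (exclude the listed names) and/or '=' (exact match instead of prefix). */
static bool ifaceSelected(const char* spec, const char* ifName) {
  assert(ifName != NULL);
  if (spec == NULL) return true;            /* no filter: accept everything */
  bool searchNot = (spec[0] == '^');
  if (searchNot) spec++;
  bool searchExact = (spec[0] == '=');
  if (searchExact) spec++;

  bool matched = (spec[0] == '\0');         /* an empty list matches everything */
  size_t nameLen = strlen(ifName);
  const char* p = spec;
  while (*p != '\0' && !matched) {
    size_t len = strcspn(p, ",");           /* length of the current entry */
    if (len > 0) {
      if (searchExact) matched = (len == nameLen && strncmp(p, ifName, len) == 0);
      else             matched = (len <= nameLen && strncmp(p, ifName, len) == 0);
    }
    p += len;
    if (*p == ',') p++;                     /* skip the separator */
  }
  return matched != searchNot;              /* same effect as matched ^ searchNot */
}
```

Under these assumptions, "xgbe" selects xgbe0, xgbe1 and xgbe2, "=xgbe" selects none of them, and "^docker,lo" selects everything whose name does not start with docker or lo.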
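Next comes initialization of the data communication network.
The ncclNet_t struct is a set of function pointers (init, send, receive, and so on). Each transport, such as socket or IB, provides its own ncclNet_t, namely ncclNetSocket and ncclNetIb. Initializing the communication network means checking in turn which transport is usable and assigning it to the global ncclNet.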
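The "table of function pointers, try each in order" pattern can be illustrated with a much smaller struct. Everything below is a simplified stand-in: netBackend, the two init stubs and selectBackend are hypothetical names for this sketch, and the real ncclNet_t has many more entries.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, trimmed-down stand-in for ncclNet_t: a name and an init hook. */
typedef struct {
  const char* name;
  int (*init)(void);      /* returns 0 on success, non-zero when unusable */
} netBackend;

static int ibInitUnavailable(void) { return 1; }  /* pretend no IB device exists */
static int socketInit(void)        { return 0; }  /* plain sockets work here */

static const netBackend backendIb     = { "IB",     ibInitUnavailable };
static const netBackend backendSocket = { "Socket", socketInit };

/* Try the candidates in priority order; return the first whose init succeeds,
 * or NULL when none of them is usable. */
static const netBackend* selectBackend(const netBackend* const* candidates, size_t n) {
  assert(candidates != NULL);
  for (size_t i = 0; i < n; i++) {
    if (candidates[i] != NULL && candidates[i]->init() == 0) return candidates[i];
  }
  return NULL;
}
```

Passing { &backendIb, &backendSocket } yields the socket backend because the IB stub reports failure, which mirrors the IB-then-socket fallback in initNet.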
First initNetPlugin runs and looks for libnccl-net.so. The test environment does not have this .so, so it returns immediately.
Then NCCL tries the IB network:
It first calls the init function of ncclNetIb, which is ncclIbInit.
ncclResult_t ncclIbInit(ncclDebugLogger_t logFunction) {
static int shownIbHcaEnv = 0;
if(wrap_ibv_symbols() != ncclSuccess) { return ncclInternalError; }
if (ncclParamIbDisable()) return ncclInternalError;
if (ncclNIbDevs == -1) {
pthread_mutex_lock(&ncclIbLock);
wrap_ibv_fork_init();
if (ncclNIbDevs == -1) {
ncclNIbDevs = 0;
if (findInterfaces(ncclIbIfName, &ncclIbIfAddr, MAX_IF_NAME_SIZE, 1) != 1) {
WARN("NET/IB : No IP interface found.");
return ncclInternalError;
}
// Detect IB cards
int nIbDevs;
struct ibv_device** devices;
// Check if user defined which IB device:port to use
char* userIbEnv = getenv("NCCL_IB_HCA");
if (userIbEnv != NULL && shownIbHcaEnv++ == 0) INFO(NCCL_NET|NCCL_ENV, "NCCL_IB_HCA set to %s", userIbEnv);
struct netIf userIfs[MAX_IB_DEVS];
bool searchNot = userIbEnv && userIbEnv[0] == '^';
if (searchNot) userIbEnv++;
bool searchExact = userIbEnv && userIbEnv[0] == '=';
if (searchExact) userIbEnv++;
int nUserIfs = parseStringList(userIbEnv, userIfs, MAX_IB_DEVS);
if (ncclSuccess != wrap_ibv_get_device_list(&devices, &nIbDevs)) return ncclInternalError;
for (int d=0; d<nIbDevs && ncclNIbDevs<MAX_IB_DEVS; d++) {
struct ibv_context * context;
if (ncclSuccess != wrap_ibv_open_device(&context, devices[d]) || context == NULL) {
WARN("NET/IB : Unable to open device %s", devices[d]->name);
continue;
}
int nPorts = 0;
struct ibv_device_attr devAttr;
memset(&devAttr, 0, sizeof(devAttr));
if (ncclSuccess != wrap_ibv_query_device(context, &devAttr)) {
WARN("NET/IB : Unable to query device %s", devices[d]->name);
if (ncclSuccess != wrap_ibv_close_device(context)) { return ncclInternalError; }
continue;
}
for (int port = 1; port <= devAttr.phys_port_cnt; port++) {
struct ibv_port_attr portAttr;
if (ncclSuccess != wrap_ibv_query_port(context, port, &portAttr)) {
WARN("NET/IB : Unable to query port %d", port);
continue;
}
if (portAttr.state != IBV_PORT_ACTIVE) continue;
if (portAttr.link_layer != IBV_LINK_LAYER_INFINIBAND
&& portAttr.link_layer != IBV_LINK_LAYER_ETHERNET) continue;
// check against user specified HCAs/ports
if (! (matchIfList(devices[d]->name, port, userIfs, nUserIfs, searchExact) ^ searchNot)) {
continue;
}
TRACE(NCCL_INIT|NCCL_NET,"NET/IB: [%d] %s:%d/%s ", d, devices[d]->name, port,
portAttr.link_layer == IBV_LINK_LAYER_INFINIBAND ? "IB" : "RoCE");
ncclIbDevs[ncclNIbDevs].device = d;
ncclIbDevs[ncclNIbDevs].guid = devAttr.sys_image_guid;
ncclIbDevs[ncclNIbDevs].port = port;
ncclIbDevs[ncclNIbDevs].link = portAttr.link_layer;
ncclIbDevs[ncclNIbDevs].speed = ncclIbSpeed(portAttr.active_speed) * ncclIbWidth(portAttr.active_width);
ncclIbDevs[ncclNIbDevs].context = context;
strncpy(ncclIbDevs[ncclNIbDevs].devName, devices[d]->name, MAXNAMESIZE);
NCCLCHECK(ncclIbGetPciPath(ncclIbDevs[ncclNIbDevs].devName, &ncclIbDevs[ncclNIbDevs].pciPath, &ncclIbDevs[ncclNIbDevs].realPort));
ncclIbDevs[ncclNIbDevs].maxQp = devAttr.max_qp;
ncclNIbDevs++;
nPorts++;
pthread_create(&ncclIbAsyncThread, NULL, ncclIbAsyncThreadMain, context);
}
if (nPorts == 0 && ncclSuccess != wrap_ibv_close_device(context)) { return ncclInternalError; }
}
if (nIbDevs && (ncclSuccess != wrap_ibv_free_device_list(devices))) { return ncclInternalError; };
}
if (ncclNIbDevs == 0) {
INFO(NCCL_INIT|NCCL_NET, "NET/IB : No device found.");
} else {
char line[1024];
line[0] = '\0';
for (int d=0; d<ncclNIbDevs; d++) {
snprintf(line+strlen(line), 1023-strlen(line), " [%d]%s:%d/%s", d, ncclIbDevs[d].devName,
ncclIbDevs[d].port, ncclIbDevs[d].link == IBV_LINK_LAYER_INFINIBAND ? "IB" : "RoCE");
}
line[1023] = '\0';
char addrline[1024];
INFO(NCCL_INIT|NCCL_NET, "NET/IB : Using%s ; OOB %s:%s", line, ncclIbIfName, socketToString(&ncclIbIfAddr.sa, addrline));
}
pthread_mutex_unlock(&ncclIbLock);
}
return ncclSuccess;
}
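First, on the third line of the function, wrap_ibv_symbols loads the shared library libibverbs.so and resolves the functions it exports.
Then wrap_ibv_fork_init guards against fork() causing read/write errors on the RDMA NIC.
As discussed later, the IB network also uses a socket for out-of-band traffic, so findInterfaces is called here too, to pick one usable interface and save it in ncclIbIfAddr.
ibv_get_device_list fetches all RDMA devices into devices. The code iterates over each device and, because an HCA can have several physical ports, over each physical port of that device, querying each port's attributes.
The relevant information is then stored in the global ncclIbDevs: which port of which device, whether the link is IB or RoCE, the device's PCI path, maxQp, the device name, and so on. Note that there is an environment variable analogous to the bootstrap network's NCCL_SOCKET_IFNAME, called NCCL_IB_HCA, which selects which IB HCA to use.
That completes initialization. In one sentence: NCCL discovers all usable IB NICs and ordinary Ethernet NICs on the machine and records them.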
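Then UniqueId generation begins. bootstrapGetUniqueId, called from ncclGetUniqueId above, hands the work to bootstrapCreateRoot: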
ncclResult_t bootstrapCreateRoot(ncclUniqueId* id, bool idFromEnv) {
ncclNetHandle_t* netHandle = (ncclNetHandle_t*) id;
void* listenComm;
NCCLCHECK(bootstrapNetListen(idFromEnv ? dontCareIf : 0, netHandle, &listenComm));
pthread_t thread;
pthread_create(&thread, NULL, bootstrapRoot, listenComm);
return ncclSuccess;
}
ncclNetHandle_t is also a character array. bootstrapNetListen runs next.
static ncclResult_t bootstrapNetListen(int dev, ncclNetHandle_t* netHandle, void** listenComm) {
union socketAddress* connectAddr = (union socketAddress*) netHandle;
static_assert(sizeof(union socketAddress) < NCCL_NET_HANDLE_MAXSIZE, "union socketAddress size is too large");
// if dev >= 0, listen based on dev
if (dev >= 0) {
NCCLCHECK(bootstrapNetGetSocketAddr(dev, connectAddr));
} else if (dev == findSubnetIf) {
...
} // Otherwise, handle stores a local address
struct bootstrapNetComm* comm;
NCCLCHECK(bootstrapNetNewComm(&comm));
NCCLCHECK(createListenSocket(&comm->fd, connectAddr));
*listenComm = comm;
return ncclSuccess;
}
Let's go through these three functions in turn. bootstrapNetGetSocketAddr obtains a usable IP address.
static ncclResult_t bootstrapNetGetSocketAddr(int dev, union socketAddress* addr) {
if (dev >= bootstrapNetIfs) return ncclInternalError;
memcpy(addr, bootstrapNetIfAddrs+dev, sizeof(*addr));
return ncclSuccess;
}
Here dev is 0, and bootstrapNetIfs is the number of usable interfaces found when the bootstrap network was initialized, so this call returns the address of usable interface 0.
Then bootstrapNetNewComm creates a bootstrapNetComm. A bootstrapNetComm is really just an fd, and bootstrapNetNewComm simply allocates one.
struct bootstrapNetComm {
int fd;
};
createListenSocket starts the socket server.
static ncclResult_t createListenSocket(int *fd, union socketAddress *localAddr) {
/* IPv4/IPv6 support */
int family = localAddr->sa.sa_family;
int salen = (family == AF_INET) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6);
/* Create socket and bind it to a port */
int sockfd = socket(family, SOCK_STREAM, 0);
if (sockfd == -1) {
WARN("Net : Socket creation failed : %s", strerror(errno));
return ncclSystemError;
}
if (socketToPort(&localAddr->sa)) {
// Port is forced by env. Make sure we get the port.
int opt = 1;
#if defined(SO_REUSEPORT)
SYSCHECK(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)), "setsockopt");
#else
SYSCHECK(setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)), "setsockopt");
#endif
}
// localAddr port should be 0 (Any port)
SYSCHECK(bind(sockfd, &localAddr->sa, salen), "bind");
/* Get the assigned Port */
socklen_t size = salen;
SYSCHECK(getsockname(sockfd, &localAddr->sa, &size), "getsockname");
#ifdef ENABLE_TRACE
char line[1024];
TRACE(NCCL_INIT|NCCL_NET,"Listening on socket %s", socketToString(&localAddr->sa, line));
#endif
/* Put the socket in listen mode
* NB: The backlog will be silently truncated to the value in /proc/sys/net/core/somaxconn
*/
SYSCHECK(listen(sockfd, 16384), "listen");
*fd = sockfd;
return ncclSuccess;
}
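This creates the listening fd. The IP is given by localAddr and the port is initially 0, so bind picks a random free port, and getsockname(sockfd, &localAddr->sa, &size) writes the IP and port back into localAddr. localAddr here is the UniqueId.
At this point the UniqueId has been produced. It is simply the IP and port of the current machine.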
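The idea that the id is nothing more than a socket address stored in an opaque byte buffer can be reproduced with plain POSIX sockets. This is a minimal sketch and not NCCL code: opaqueId, createIdFromListenSocket and portFromId are hypothetical names, the 128-byte size is chosen to mirror NCCL_UNIQUE_ID_BYTES, and the socket binds to loopback over IPv4 only to keep the example self-contained.

```c
#include <assert.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

#define HANDLE_BYTES 128   /* assumption: mirrors NCCL_UNIQUE_ID_BYTES */

/* Hypothetical stand-in for ncclUniqueId: an opaque byte buffer. */
typedef struct { char internal[HANDLE_BYTES]; } opaqueId;

/* Binds a TCP listen socket on 127.0.0.1 with port 0, reads back the port
 * the kernel chose, and stores the resulting address bytes in id.
 * Returns the listening fd, or -1 on failure. */
static int createIdFromListenSocket(opaqueId* id) {
  assert(id != NULL);
  struct sockaddr_in addr;
  memset(&addr, 0, sizeof(addr));
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  addr.sin_port = 0;                        /* 0 = let the kernel pick a port */

  int fd = socket(AF_INET, SOCK_STREAM, 0);
  if (fd == -1) return -1;
  socklen_t len = sizeof(addr);
  if (bind(fd, (struct sockaddr*)&addr, len) != 0 ||
      getsockname(fd, (struct sockaddr*)&addr, &len) != 0 ||
      listen(fd, 16) != 0) {
    close(fd);
    return -1;
  }
  memset(id, 0, sizeof(*id));
  memcpy(id->internal, &addr, sizeof(addr)); /* the id is just the address */
  return fd;
}

/* What a peer could do after receiving the id bytes: recover the port. */
static int portFromId(const opaqueId* id) {
  assert(id != NULL);
  struct sockaddr_in addr;
  memcpy(&addr, id->internal, sizeof(addr));
  return ntohs(addr.sin_port);
}
```

A peer that receives the buffer, for instance through the MPI_Bcast in the demo, can copy the bytes back into a sockaddr and connect to it, which is why broadcasting the id is enough for the other ranks to find rank 0.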