加入星計(jì)劃,您可以享受以下權(quán)益:

  • 創(chuàng)作內(nèi)容快速變現(xiàn)
  • 行業(yè)影響力擴(kuò)散
  • 作品版權(quán)保護(hù)
  • 300W+ 專(zhuān)業(yè)用戶(hù)
  • 1.5W+ 優(yōu)質(zhì)創(chuàng)作者
  • 5000+ 長(zhǎng)期合作伙伴
立即加入
  • 正文
    • 1. 多進(jìn)程并發(fā)服務(wù)器
    • 2. 多進(jìn)程并發(fā)服務(wù)器代碼實(shí)現(xiàn)
    • 3. 多線(xiàn)程并發(fā)服務(wù)器
    • 4. 多線(xiàn)程并發(fā)服務(wù)器代碼實(shí)現(xiàn)
    • 5. 擴(kuò)展:Socket API封裝
  • 相關(guān)推薦
  • 電子產(chǎn)業(yè)圖譜
申請(qǐng)入駐 產(chǎn)業(yè)圖譜

TCP并發(fā)服務(wù)器(多進(jìn)程與多線(xiàn)程)

10/10 14:40
691
閱讀需 29 分鐘
加入交流群
掃碼加入
獲取工程師必備禮包
參與熱點(diǎn)資訊討論

1. 多進(jìn)程并發(fā)服務(wù)器

我們?cè)谏弦还?jié)寫(xiě)的TCP服務(wù)器只能處理單連接,在代碼實(shí)現(xiàn)時(shí),多進(jìn)程并發(fā)服務(wù)器與非并發(fā)服務(wù)器在創(chuàng)建監(jiān)聽(tīng)套接字、綁定、監(jiān)聽(tīng)這幾個(gè)步驟是一樣的,但是在接收連接請(qǐng)求的時(shí)候,多進(jìn)程并發(fā)服務(wù)器是這樣實(shí)現(xiàn)的:父進(jìn)程負(fù)責(zé)接受連接請(qǐng)求,一旦連接成功,將會(huì)創(chuàng)建一個(gè)子進(jìn)程與客戶(hù)端通信。示意圖如下:

(1)什么是并發(fā)

單核CPU → 多進(jìn)程/線(xiàn)程并發(fā) → 時(shí)間片輪轉(zhuǎn)

并發(fā) → 某一個(gè)時(shí)間片/點(diǎn)所能處理的任務(wù)數(shù)

服務(wù)器并發(fā):服務(wù)器在某個(gè)時(shí)間點(diǎn)/片所能處理的連接數(shù)所能接收的client連接越多,并發(fā)量越大

(2)多進(jìn)程并發(fā)服務(wù)器需要注意的幾個(gè)要點(diǎn)

使用多進(jìn)程的方式來(lái)解決服務(wù)器處理多連接的問(wèn)題,需要注意下面幾點(diǎn):

共享:讀時(shí)共享、寫(xiě)時(shí)復(fù)制。有血緣關(guān)系的進(jìn)程間將會(huì)共享

文件描述符

內(nèi)存映射區(qū)mmap

父進(jìn)程扮演什么角色?

等待接受客戶(hù)端連接accept()

有連接的時(shí)候通過(guò)fork()創(chuàng)建一個(gè)子進(jìn)程。父進(jìn)程只負(fù)責(zé)等待客戶(hù)端連接,即通過(guò)accept()阻塞等待連接請(qǐng)求,一旦有連接請(qǐng)求,馬上通過(guò)fork()創(chuàng)建一個(gè)子進(jìn)程,子進(jìn)程通過(guò)共享父進(jìn)程的文件描述符來(lái)實(shí)現(xiàn)和client通信。

將用于通信的文件描述符關(guān)閉。accept()接受連接請(qǐng)求后會(huì)返回一個(gè)用于通信的文件描述符,而父進(jìn)程的職責(zé)是等待連接并fork()創(chuàng)建用于通信的子進(jìn)程,所以對(duì)于父進(jìn)程來(lái)說(shuō),用于通信的文件描述符是沒(méi)有用處的,關(guān)閉該文件描述符來(lái)節(jié)省開(kāi)銷(xiāo)。我們知道,文件描述符是有上限的,最多1024個(gè)(0-1023),如果不關(guān)閉的話(huà),每次fork()一個(gè)子進(jìn)程都要浪費(fèi)一個(gè)文件描述符,如果進(jìn)程多了,可能文件描述符就不夠用了。

子進(jìn)程扮演什么角色?

通信。通過(guò)共享的父進(jìn)程accept()返回的文件描述符來(lái)與客戶(hù)端通信。

將用于監(jiān)聽(tīng)的文件描述符關(guān)閉。同樣是為了節(jié)省資源,子進(jìn)程被fork()出來(lái)后也會(huì)擁有一個(gè)用于監(jiān)聽(tīng)的文件描述符(因?yàn)樽舆M(jìn)程是對(duì)父進(jìn)程的拷貝),但是子進(jìn)程的作用是與客戶(hù)端通信,所以用于監(jiān)聽(tīng)的文件描述符對(duì)子進(jìn)程而言并無(wú)用處,關(guān)閉以節(jié)省資源。

創(chuàng)建的子進(jìn)程個(gè)數(shù)有限制嗎?

硬件限制

文件描述符默認(rèn)上限1024

子進(jìn)程資源回收

wait/waitpid

使用信號(hào)回收

signal

sigaction

捕捉信號(hào)SIGCHLD

(3)讀時(shí)共享寫(xiě)時(shí)復(fù)制詳解

首先看圖

如果父子進(jìn)程都只是讀數(shù)據(jù),那么他們都通過(guò)虛擬地址去訪(fǎng)問(wèn)1號(hào)物理地址的內(nèi)容,如果此時(shí)父進(jìn)程修改了數(shù)據(jù)a=8,那么父進(jìn)程會(huì)先復(fù)制一份數(shù)據(jù)到2號(hào)內(nèi)存,然后修改2號(hào)內(nèi)存的數(shù)據(jù),父進(jìn)程再讀的時(shí)候就去2號(hào)內(nèi)存讀,而子進(jìn)程依然去1號(hào)內(nèi)存讀。如果子進(jìn)程也要修改這個(gè)全局變量,那么子進(jìn)程也會(huì)拷貝一份數(shù)據(jù)到內(nèi)存3,然后修改內(nèi)存3的數(shù)據(jù),子進(jìn)程訪(fǎng)問(wèn)數(shù)據(jù)時(shí)會(huì)訪(fǎng)問(wèn)內(nèi)存3的數(shù)據(jù)。(多個(gè)子進(jìn)程就會(huì)拷貝多份)

2. 多進(jìn)程并發(fā)服務(wù)器代碼實(shí)現(xiàn)

#include <stdio.h>#include <unistd.h>#include <stdlib.h>#include <sys/types.h>#include <string.h>#include <sys/socket.h>#include <arpa/inet.h>#include <ctype.h>#include <signal.h>#include <sys/wait.h>#include <errno.h>
// 進(jìn)程回收函數(shù)void recyle(int num){    pid_t pid;    while( (pid = waitpid(-1, NULL, WNOHANG)) > 0 )    {        printf("child died , pid = %dn", pid);    }}
int main(int argc, const char* argv[]){    if(argc < 2)    {        printf("eg: ./a.out portn");        exit(1);    }    struct sockaddr_in serv_addr;    socklen_t serv_len = sizeof(serv_addr);    int port = atoi(argv[1]);
    // 創(chuàng)建套接字    int lfd = socket(AF_INET, SOCK_STREAM, 0);    // 初始化服務(wù)器 sockaddr_in     memset(&serv_addr, 0, serv_len);    serv_addr.sin_family = AF_INET;                   // 地址族     serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);    // 監(jiān)聽(tīng)本機(jī)所有的IP    serv_addr.sin_port = htons(port);            // 設(shè)置端口     // 綁定IP和端口    bind(lfd, (struct sockaddr*)&serv_addr, serv_len);
    // 設(shè)置同時(shí)監(jiān)聽(tīng)的最大個(gè)數(shù)    listen(lfd, 36);    printf("Start accept ......n");
    // 使用信號(hào)回收子進(jìn)程pcb //這個(gè)子進(jìn)程回收機(jī)制會(huì)被子進(jìn)程復(fù)制    struct sigaction act;    act.sa_handler = recyle;    act.sa_flags = 0;    sigemptyset(&act.sa_mask);    sigaction(SIGCHLD, &act, NULL);
    struct sockaddr_in client_addr;    socklen_t cli_len = sizeof(client_addr);    while(1)    {        // 父進(jìn)程接收連接請(qǐng)求        // accept阻塞的時(shí)候被信號(hào)中斷, 處理信號(hào)對(duì)應(yīng)的操作之后(比如子進(jìn)程終止,收到信號(hào)后去回收子進(jìn)程)        // 回來(lái)之后不阻塞了, 直接返回-1, 這時(shí)候 errno==EINTR        int cfd = accept(lfd, (struct sockaddr*)&client_addr, &cli_len);        //解決方法就是,在一個(gè)循環(huán)中判斷,如果accept阻塞過(guò)程中被信號(hào)打斷        //也就是返回值-1且errno == EINTR,那么再一次調(diào)用accept        //這樣accept會(huì)再次回到阻塞狀態(tài),并且返回值不是-1,也就不會(huì)進(jìn)入循環(huán)        //等到再次被信號(hào)打斷的時(shí)候才會(huì)再次進(jìn)入循環(huán)        /*這里的cfd雖然只定義了一個(gè),但是在每個(gè)子進(jìn)程中都會(huì)有一個(gè)拷貝,并且修改一個(gè)子進(jìn)程的cfd不會(huì)影響其它子進(jìn)程*/        while(cfd == -1 && errno == EINTR)        {            cfd = accept(lfd, (struct sockaddr*)&client_addr, &cli_len);        }        printf("connect sucessfuln");        // 創(chuàng)建子進(jìn)程        pid_t pid = fork();        if(pid == 0)        {            close(lfd);            // child process            // 通信            char ip[64];            while(1)            {                // client ip port                printf("client IP: %s, port: %dn",                        inet_ntop(AF_INET, &client_addr.sin_addr.s_addr, ip, sizeof(ip)),                       ntohs(client_addr.sin_port));                char buf[1024];                int len = read(cfd, buf, sizeof(buf));                if(len == -1)                {                    perror("read error");                    exit(1);                }                else if(len == 0)                {                    printf("客戶(hù)端斷開(kāi)了連接n");                    close(cfd);                    break;                }                else                {                    printf("recv buf: %sn", buf);                    write(cfd, buf, len);                }            }            // 干掉子進(jìn)程            return 0;
        }        else if(pid > 0)        {            // parent process            close(cfd);        }    }
    close(lfd);    return 0;}

3. 多線(xiàn)程并發(fā)服務(wù)器

多線(xiàn)程并發(fā)服務(wù)器示意圖如下:

在多進(jìn)程模型中,fork得到的子進(jìn)程會(huì)復(fù)制父進(jìn)程的文件描述符cfd等信息,每個(gè)進(jìn)程的cfd都是自己的,操作互不影響。但是線(xiàn)程不同,現(xiàn)在只有主線(xiàn)程的cfd,多個(gè)線(xiàn)程間的信息是共享的,假如說(shuō)傳遞給每個(gè)子線(xiàn)程的cfd都是同一個(gè),那么線(xiàn)程1修改該文件描述符指向的內(nèi)容會(huì)影響到線(xiàn)程2的通信,因?yàn)樗鼈児蚕磉@一個(gè)文件描述符。所以這里需要建立一個(gè)文件描述符數(shù)組,每個(gè)子線(xiàn)程對(duì)應(yīng)數(shù)組中的一個(gè)文件描述符。

另外連接主線(xiàn)程的client是哪一個(gè),也就是說(shuō)哪個(gè)client對(duì)應(yīng)和哪個(gè)子線(xiàn)程通信,這也需要把和子線(xiàn)程通信的client的ip和port傳給和該client通信的子線(xiàn)程,這樣子線(xiàn)程才能知道通信的客戶(hù)端的ip和port。

于是我們需要?jiǎng)?chuàng)建一個(gè)結(jié)構(gòu)體數(shù)組,每個(gè)子線(xiàn)程對(duì)應(yīng)結(jié)構(gòu)體數(shù)組中的一個(gè)成員,而結(jié)構(gòu)體數(shù)組中的每個(gè)成員將作為參數(shù)傳遞給子進(jìn)程的回調(diào)函數(shù)。

歸根到底就是因?yàn)?,進(jìn)程是獨(dú)立的,線(xiàn)程是共享的。

線(xiàn)程共享下面的資源:

全局?jǐn)?shù)據(jù)區(qū)

堆區(qū)

一塊有效內(nèi)存的地址,比如說(shuō)把線(xiàn)程1的一塊內(nèi)存的地址傳給線(xiàn)程2,那么線(xiàn)程2也可以操作這塊內(nèi)存。

4. 多線(xiàn)程并發(fā)服務(wù)器代碼實(shí)現(xiàn)

#include <stdio.h>#include <unistd.h>#include <stdlib.h>#include <sys/types.h>#include <string.h>#include <sys/socket.h>#include <arpa/inet.h>#include <ctype.h>#include <pthread.h>
// 自定義數(shù)據(jù)結(jié)構(gòu) //把線(xiàn)程處理函數(shù)所需要的信息封裝進(jìn)來(lái)typedef struct SockInfo{    int fd; // 文件描述符    struct sockaddr_in addr; //ip地址結(jié)構(gòu)體    pthread_t id; //線(xiàn)程id}SockInfo;
// 子線(xiàn)程處理函數(shù)void* worker(void* arg){    char ip[64];    char buf[1024];    SockInfo* info = (SockInfo*)arg;    // 通信    while(1)    {        printf("Client IP: %s, port: %dn",               inet_ntop(AF_INET, &info->addr.sin_addr.s_addr, ip, sizeof(ip)),               ntohs(info->addr.sin_port));        int len = read(info->fd, buf, sizeof(buf));        if(len == -1)        {            perror("read error");            pthread_exit(NULL); //只退出子線(xiàn)程        //exit(1); //exit會(huì)把主線(xiàn)程也一塊退出        }        else if(len == 0)        {            printf("客戶(hù)端已經(jīng)斷開(kāi)了連接n");            close(info->fd);            break;        }        else        {            printf("recv buf: %sn", buf);            write(info->fd, buf, len);        }    }    return NULL;}
int main(int argc, const char* argv[]){    if(argc < 2)    {        printf("eg: ./a.out portn");        exit(1);    }    struct sockaddr_in serv_addr;    socklen_t serv_len = sizeof(serv_addr);    int port = atoi(argv[1]);
    // 創(chuàng)建套接字    int lfd = socket(AF_INET, SOCK_STREAM, 0);    // 初始化服務(wù)器 sockaddr_in     memset(&serv_addr, 0, serv_len);    serv_addr.sin_family = AF_INET;                   // 地址族     serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);    // 監(jiān)聽(tīng)本機(jī)所有的IP    serv_addr.sin_port = htons(port);            // 設(shè)置端口     // 綁定IP和端口    bind(lfd, (struct sockaddr*)&serv_addr, serv_len);
    // 設(shè)置同時(shí)監(jiān)聽(tīng)的最大個(gè)數(shù)    listen(lfd, 36);    printf("Start accept ......n");
    int i = 0;    SockInfo info[256]; //每個(gè)線(xiàn)程對(duì)應(yīng)數(shù)組的一個(gè)元素,最多256個(gè)線(xiàn)程    // 規(guī)定 fd == -1  說(shuō)明這是一個(gè)無(wú)效文件描述符,也就是說(shuō)這個(gè)文件描述符是空閑的,沒(méi)被占用    for(i=0; i<sizeof(info)/sizeof(info[0]); ++i)    {        info[i].fd = -1; //所有文件描述符全部初始化為-1    }
    socklen_t cli_len = sizeof(struct sockaddr_in);    while(1)    {        // 選一個(gè)沒(méi)有被使用的, 最小的數(shù)組元素        //因?yàn)橛锌赡芪覀兪褂玫奈募枋龇麑?duì)應(yīng)數(shù)組下標(biāo)i已經(jīng)累加到了100,但是前面        //99個(gè)都已經(jīng)被釋放了(斷開(kāi)連接了),我們最好選用一個(gè)當(dāng)前空閑的數(shù)組下標(biāo)最小        //的文件描述符,以合理利用資源        for(i=0; i<256; ++i)        {            if(info[i].fd == -1)            {                break; //這樣就能把數(shù)組下標(biāo)最小的fd找出來(lái),并確保i指向它,直接break出去            }        }        if(i == 256) //整個(gè)數(shù)組都被用完了,直接break出while循環(huán)        {            break;        }        // 主線(xiàn)程 - 等待接受連接請(qǐng)求        info[i].fd = accept(lfd, (struct sockaddr*)&info[i].addr, &cli_len); //第二個(gè)參數(shù)是傳出參數(shù),        //傳出客戶(hù)端ip信息(struct sockaddr*)類(lèi)型
        // 創(chuàng)建子線(xiàn)程 - 通信        pthread_create(&info[i].id, NULL, worker, &info[i]);        // 設(shè)置線(xiàn)程分離 //這樣子線(xiàn)程終止的時(shí)候會(huì)自動(dòng)釋放,就不需要主線(xiàn)程去釋放了        pthread_detach(info[i].id);    }
    close(lfd);
    // 只退出主線(xiàn)程 //對(duì)子線(xiàn)程無(wú)影響,子線(xiàn)程可以繼續(xù)通信    pthread_exit(NULL);    return 0;}

5. 擴(kuò)展:Socket API封裝

#include <stdlib.h>#include <stdio.h>#include <unistd.h>#include <errno.h>#include <sys/socket.h>
void perr_exit(const char *s){        perror(s);        exit(-1);}
//也可以在vim下按2K跳轉(zhuǎn)到man文檔中的accept函數(shù),因?yàn)閙an文檔跳轉(zhuǎn)不區(qū)分大小寫(xiě)int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr){        int n;
again:        if ((n = accept(fd, sa, salenptr)) < 0)     {        //ECONNABORTED 發(fā)生在重傳(一定次數(shù))失敗后,強(qiáng)制關(guān)閉套接字        //EINTR 進(jìn)程被信號(hào)中斷 //如果accept函數(shù)在阻塞時(shí)被信號(hào)打斷,處理完信號(hào)           //返回時(shí)就不會(huì)在阻塞了,而是直接返回-1        if ((errno == ECONNABORTED) || (errno == EINTR))        {        goto again; //如果accept阻塞時(shí)被信號(hào)打斷了,需要在執(zhí)行一次accept繼續(xù)阻塞        }        else        {        perr_exit("accept error");        }        }        return n;}
int Bind(int fd, const struct sockaddr *sa, socklen_t salen){    int n;
        if ((n = bind(fd, sa, salen)) < 0)    {        perr_exit("bind error");    }
    return n;}
int Connect(int fd, const struct sockaddr *sa, socklen_t salen){    int n;    n = connect(fd, sa, salen);        if (n < 0)     {        perr_exit("connect error");    }
    return n;}
int Listen(int fd, int backlog){    int n;
        if ((n = listen(fd, backlog)) < 0)    {        perr_exit("listen error");    }
    return n;}
int Socket(int family, int type, int protocol){        int n;
        if ((n = socket(family, type, protocol)) < 0)    {        perr_exit("socket error");    }
        return n;}
ssize_t Read(int fd, void *ptr, size_t nbytes){        ssize_t n;
again:        if ( (n = read(fd, ptr, nbytes)) == -1)     {        if (errno == EINTR)                goto again; //如果read被信號(hào)中斷了,應(yīng)該讓它繼續(xù)去read等待讀數(shù)據(jù) (read阻塞時(shí))        else                return -1;        }
        return n;}
ssize_t Write(int fd, const void *ptr, size_t nbytes){        ssize_t n;
again:        if ((n = write(fd, ptr, nbytes)) == -1)     {        if (errno == EINTR)                goto again;        else                return -1;        }        return n;}
int Close(int fd){    int n;        if ((n = close(fd)) == -1)                perr_exit("close error");
    return n;}
/*參三: 應(yīng)該讀取的字節(jié)數(shù)*/     //一直讀到n字節(jié)數(shù)才會(huì)返回,否則阻塞等待                     //socket 4096  readn(cfd, buf, 4096)   nleft = 4096-1500ssize_t Readn(int fd, void *vptr, size_t n){        size_t  nleft;              //usigned int 剩余未讀取的字節(jié)數(shù)        ssize_t nread;              //int 實(shí)際讀到的字節(jié)數(shù)        char   *ptr;
        ptr = vptr;        nleft = n;                  //n 未讀取字節(jié)數(shù)
        while (nleft > 0)     {        if ((nread = read(fd, ptr, nleft)) < 0)         {        if (errno == EINTR)            {        nread = 0;            }        else            {        return -1;            }        }         else if (nread == 0)        {        break;        }
        nleft -= nread;   //nleft = nleft - nread         ptr += nread;        }        return n - nleft;}
ssize_t Writen(int fd, const void *vptr, size_t n){        size_t nleft;        ssize_t nwritten;        const char *ptr;
        ptr = vptr;        nleft = n;        while (nleft > 0)     {        if ( (nwritten = write(fd, ptr, nleft)) <= 0)         {        if (nwritten < 0 && errno == EINTR)                nwritten = 0;        else                return -1;        }        nleft -= nwritten;        ptr += nwritten;        }        return n;}
static ssize_t my_read(int fd, char *ptr) //靜態(tài)函數(shù)保證了讀完第一個(gè)100字節(jié)才去讀下一個(gè)100字節(jié),而不是每次調(diào)用都讀100字節(jié){        static int read_cnt; //改變量存在靜態(tài)數(shù)據(jù)區(qū),下次調(diào)用my_read函數(shù)的時(shí)候,read_cnt會(huì)保留上次的值        static char *read_ptr;        static char read_buf[100];                //因?yàn)檫@里的變量都是static的,所以并非每次調(diào)用my_read都會(huì)讀100字節(jié),而是讀完100字節(jié)再去讀下一個(gè)100字節(jié)        if (read_cnt <= 0) { again:        if ( (read_cnt = read(fd, read_buf, sizeof(read_buf))) < 0)    //"hellon"        {        if (errno == EINTR)                goto again;        return -1;        }         else if (read_cnt == 0)        return 0;
        read_ptr = read_buf;        }        read_cnt--; //在上次調(diào)用結(jié)束的值基礎(chǔ)上--,保證了讀完100字節(jié)再去讀下一個(gè)100字節(jié)        *ptr = *read_ptr++;
        return 1;}
/*readline --- fgets*/    //傳出參數(shù) vptrssize_t Readline(int fd, void *vptr, size_t maxlen){        ssize_t n, rc;        char    c, *ptr;        ptr = vptr;
        for (n = 1; n < maxlen; n++)     {        if ((rc = my_read(fd, &c)) == 1)    //ptr[] = hellon        {        *ptr++ = c;        if (c == 'n') //先讀100個(gè)字節(jié),依次遍歷,遇到 'n' 說(shuō)明一行讀完了                break;        }         else if (rc == 0)         {        *ptr = 0;        return n-1;        }         else        return -1;        }        *ptr = 0;
        return n;}

 

相關(guān)推薦

電子產(chǎn)業(yè)圖譜

Linux、C、C++、Python、Matlab,機(jī)器人運(yùn)動(dòng)控制、多機(jī)器人協(xié)作,智能優(yōu)化算法,貝葉斯濾波與卡爾曼濾波估計(jì)、多傳感器信息融合,機(jī)器學(xué)習(xí),人工智能。