Linux進程間通信之管道如何實現(xiàn)進程池
一、管道的特點
只能用于具有共同祖先的進程之間進行通信,通常,一個管道由一個進程創(chuàng)建,然后該進程調用fork創(chuàng)建子進程,此后父子進程就可以使用該管道進行通信
管道面向字節(jié)流,即管道不曉得自己里面的內(nèi)容,只是一味按照父子進程之間的協(xié)調進行傳輸信息,父子進程在讀取其中的內(nèi)容時是不看內(nèi)容是否有\n和\0等含有特殊意義的內(nèi)容
因為管道的本質是一種內(nèi)存級文件,所以管道的生命周期伴隨著進程的退出而結束
一般而言,,內(nèi)核會對管道操作進行同步與互斥,同步是指多個進程或線程在訪問共享資源或進行特定操作時,按照一定的順序或規(guī)則進行協(xié)調,以確保它們之間的操作能夠正確、有序地執(zhí)行,互斥是指在同一時刻,只允許一個進程或線程訪問共享資源,以避免多個進程或線程同時訪問導致的數(shù)據(jù)不一致或沖突問題
管道為半雙工通道,只能單向傳遞信息,需要雙向通信就要建立兩個管道
我們在命令行中使用的|就是匿名通道
二、進程池
1、概念
我們知道在我們創(chuàng)建子進程的時候要調用fork函數(shù),這是一個系統(tǒng)調用接口,所以會對系統(tǒng)產(chǎn)生成本,如果我們一次創(chuàng)建很多個進程,那么系統(tǒng)會變得很累,所以我們引入池的概念,進程池可以保證在我們需要使用進程的情況下,由于提前創(chuàng)建了子進程,我們直接分配就行了,避免了我們需要大量進程的情況下操作系統(tǒng)很吃力的情況,對提前創(chuàng)建好的這些子進程進行先描述后組織的

2、用管道實現(xiàn)一個簡易進程池
(一)頭文件、宏、全局變量和main函數(shù)
#include <iostream>
#include <vector>
#include <string>
#include <unistd.h>
#include "task.hpp"
#include <sys/stat.h>
#include <sys/wait.h>
#include <cstdio>
#define PROCESSNUM 10
std::vector<task_t> tasks;
int main()
{
//加載任務
LoadTask(&tasks);
//定義一個vector管理所有的管道,channel是描述,channels是組織
std::vector<channel> channels;
//初始化
InitProcessPool(&channels);
//開始進行
StartProcessPool(channels);
//清理
CleanProcessPool(channels);
return 0;
}(二)初始化函數(shù)InitProcessPool
初始化函數(shù)里有一個重要的點就是,我們的子進程是循環(huán)創(chuàng)建的,所以在創(chuàng)建第一個子進程時沒有問題,但是創(chuàng)建第二個子進程開始,因為剛創(chuàng)建出的第二個子進程與父進程是一樣的,此時都作為寫端連接著一個管道,我們在圖中用綠色的線標注出來了,第三個子進程又可以成為第一二個管道的寫端,以此類推,每個子進程后創(chuàng)建的子進程都會是上個信道的寫端,這與我們想要父進程寫,子進程讀的要求相悖,所以我們初始化的另一個目的就是將這些多余的連接全部斷開,也就是圖中彩色的線全部斷開,進而保證只有父進程在寫端

- task.hpp
#pragma once
#include <iostream>
#include <vector>
//定義一個函數(shù)指針task_t指向返回值為void,沒有參數(shù)的函數(shù)
typedef void (*task_t)();
void task1()
{
std::cout << "this is task1 running" << std::endl;
}
void task2()
{
std::cout << "this is task2 running" << std::endl;
}
void task3()
{
std::cout << "this is task3 running" << std::endl;
}
void task4()
{
std::cout << "this is task4 running" << std::endl;
}
//加載任務函數(shù),將任務pushback到vector中
void LoadTask(std::vector<task_t> *tasks)
{
tasks->push_back(task1);
tasks->push_back(task2);
tasks->push_back(task3);
tasks->push_back(task4);
}- test.cpp
class channel
{
public:
// 描述父進程的fd,對應子進程的pid,子進程的名字
channel(int cmdfd, int slaverid, const std::string &processname)
:_cmdfd(cmdfd),_slaverid(slaverid),_processname(processname)
{}
int _cmdfd;
pid_t _slaverid;
std::string _processname;
};
void slaver()
{
while(true)
{
// 用于存儲從標準輸入讀取的命令碼
int cmdcode = 0;
// 從標準輸入(管道)讀取數(shù)據(jù),嘗試讀取sizeof(int)字節(jié)的數(shù)據(jù)到cmdcode中
// 如果父進程不給子進程發(fā)送數(shù)據(jù)子進程就會進入阻塞等待
int n = read(0, &cmdcode, sizeof(int));
if(n == sizeof(int))
{
// read的返回值與sizeof(int)相等,就輸出子進程pid和獲得命令碼
// 如果命令碼有效就調用task任務,無效就退出
std::cout <<"slaver say@ get a command: "<< getpid() << " : cmdcode: "
<< cmdcode << std::endl;
if(cmdcode >= 0 && cmdcode < tasks.size()) tasks[cmdcode]();
}
else break;
}
}
void InitProcessPool(std::vector<channel>* channels)
{
//用于存儲之前創(chuàng)建的管道的寫端文件描述符
//目的是讓后續(xù)創(chuàng)建的子進程可以關閉這些舊的寫端文件描述符,避免資源泄漏
std::vector<int> oldfds;
//循環(huán)創(chuàng)建子進程
for(int i = 0; i < PROCESSNUM; i++)
{
int pipefd[2] = {0};
int n = pipe(pipefd);
if(n < 0)
{
return;
}
pid_t id = fork();
if(id < 0)
{
return;
}
if(id == 0)
{
//打印子進程pid,打印并關閉上一個管道寫端文件描述符
std::cout << "child : " << getpid() << " close history fd: ";
for(auto fd : oldfds)
{
std::cout << fd << " ";
close(fd);
}
std::cout << std::endl;
//關閉寫端通道
close(pipefd[1]);
//將當前管道的讀端文件描述符復制到標準輸入
//這樣子進程就可以通過標準輸入從管道讀取數(shù)據(jù)
dup2(pipefd[0],0);
// 讀取完關閉管道讀端
close(pipefd[0]);
// 子進程主要業(yè)務
slaver();
//打印子進程要退出了
std::cout << "process : " << getpid() << " quit" << std::endl;
exit(0);
}
//父進程開始
//關閉讀端
close(pipefd[0]);
//將當前channel信息添加到channels進行組織
std::string name = "process-" + std::to_string(i);
channels->push_back(channel(pipefd[1],id,name));
//添加這個寫端的文件描述符,方便后面的進程關閉它
oldfds.push_back(pipefd[1]);
sleep(1);
}
}(三)執(zhí)行函數(shù)StartProcessPool
//打印一個選擇任務的菜單
void Menu()
{
std::cout << "################################################" << std::endl;
std::cout << "# 1. 任務一 2. 任務二 #" << std::endl;
std::cout << "# 3. 任務三 4. 任務四 #" << std::endl;
std::cout << "# 0. 退出 #" << std::endl;
std::cout << "#################################################" << std::endl;
}
void StartProcessPool(std::vector<channel>* channels)
{
while(true)
{
int select = 0;
Menu();
sleep(1);
//輸入選項
std::cout << "Please Enter>> ";
std::cin >> select;
if(select <= 0 || select >= 5) break;
//將控制碼也就是選擇的數(shù)字select1234轉化為0123,因為vector下標從0開始,所以要-1
int cmdcode = select - 1;
//通過管道寫入信息,等待slaver()讀取
write(channels[select]._cmdfd, &cmdcode, sizeof(cmdcode));
sleep(1);
}
}(四)清理函數(shù)CleanProcessPool
void CleanProcessPool(std::vector<channel> &channels)
{
//每個channel對象的左邊為父進程的fd,右邊為子進程fd,斷開父進程fd,然后進程等待
//父進程斷開后子進程會在管道中讀到0,即文件結束,然后子進程就會終止
//然后被父進程回收
for(const auto &c : channels){
close(c._cmdfd);
waitpid(c._slaverid, nullptr, 0);
}
}三、進程池其他問題
1、描述整個過程
首先啟動進程,將任務函數(shù)“上膛”到vector中,然后進行初始化,創(chuàng)建出第一個子進程,第一個子進程執(zhí)行常規(guī)操作,比如將寫端關閉,將當前管道讀端文件描述符復制到標準輸入以來獲取標準輸入的數(shù)據(jù),然后就是等待父進程發(fā)送信息,在此同時,父進程也不閑著,將當前讀端關閉,然后描述channel進而pushback到channels中進行組織,然后在oldfds中存下管道寫端對應的fd,方便后面子進程的斷開,然后創(chuàng)建第二個子進程,第二個子進程執(zhí)行和第一個子進程差不多的操作,唯一的區(qū)別就是要將oldfds里面的寫端全部斷開,然后以此類推
2、細節(jié)處理
開始創(chuàng)建第一個子進程并形成管道時,父進程的讀端fd==3寫端fd==4,到后面就會關閉讀端,第二次創(chuàng)建時父進程的讀端fd==3寫端fd==5,以此類推,父進程的讀端將一直為3,而寫端遞增
創(chuàng)建完成的子進程在父進程發(fā)送信息之前都處于阻塞狀態(tài),一旦父進程發(fā)送信息,比如說上面我們提到的指定某個管道或者指定某個任務
3、標準的制定
一種良好的編程習慣對于一個程序員來說是一件非常好的事情,對于我們main函數(shù)中的這三個函數(shù)參數(shù),我們發(fā)現(xiàn)它們遵守著一定的規(guī)則
const &:當我們只進行輸入不要輸出內(nèi)容的時候*:當我們要輸出內(nèi)容的時候,類似于輸出型參數(shù)&:當我們既要輸入又要輸出的時候
總結
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
在Apache服務器上利用Varnish優(yōu)化移動端訪問的方法
這篇文章主要介紹了在Apach服務器上利用Varnish優(yōu)化移動端訪問的方法,包括清除緩存等常用操作的介紹,需要的朋友可以參考下2015-06-06
linux服務器上安裝Anaconda與pytorch的詳細過程
這篇文章主要介紹了linux服務器上安裝Anaconda與pytorch的詳細過程,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-10-10
Apache FlinkCEP 實現(xiàn)超時狀態(tài)監(jiān)控的步驟詳解
這篇文章主要介紹了Apache FlinkCEP 實現(xiàn)超時狀態(tài)監(jiān)控的步驟,本文給大家介紹的非常詳細,具有一定的參考借鑒價值,需要的朋友可以參考下2019-10-10
apache啟動報錯:httpd: apr_sockaddr_info_get() failed
httpd: Could not reliably determine the server's fully qualified domain name, using 127.0.0.1 for ServerName2013-02-02

