pandas apply多線程實現(xiàn)代碼
一、多線程化選擇
并行化一個代碼有兩大選擇:multithread 和 multiprocess。
Multithread,多線程,同一個進程(process)可以開啟多個線程執(zhí)行計算。每個線程代表了一個 CPU 核心,這么多線程可以訪問同樣的內存地址(所謂共享內存),實現(xiàn)了線程之間的通訊,算是最簡單的并行模型。
Multiprocess,多進程,則相當于同時開啟多個 Python 解釋器,每個解釋器有自己獨有的數(shù)據,自然不會有數(shù)據沖突。
二、并行化思想
并行化的基本思路是把 dataframe 用 np.array_split 方法切割成多個子 dataframe。再調用 Pool.map 函數(shù)并行地執(zhí)行。注意到順序執(zhí)行的 pandas.DataFrame.apply 是如何轉化成 Pool.map 然后并行執(zhí)行的。
Pool 對象是一組并行的進程,開源Pool類
開源Pool類定義
def Pool(self, processes=None, initializer=None, initargs=(),
maxtasksperchild=None):
'''Returns a process pool object'''
from .pool import Pool
return Pool(processes, initializer, initargs, maxtasksperchild,
context=self.get_context())
設置進程初始化函數(shù)
def init_process(global_vars): global a a = global_vars
設置進程初始化函數(shù)
Pool(processes=8,initializer=init_process,initargs=(a,))
其中,指定產生 8 個進程,每個進程的初始化需運行 init_process函數(shù),其參數(shù)為一個 singleton tuple a. 利用 init_process 和 initargs,我們可以方便的設定需要在進程間共享的全局變量(這里是 a)。
with 關鍵詞是 context manager,避免寫很繁瑣的處理開關進程的邏輯。
with Pool(processes=8,initializer=init_process,initargs=(a,)) as pool:
result_parts = pool.map(apply_f,df_parts)
三、多線程化應用
多線程時間比較和多線程的幾種apply應用
import numpy as np
import pandas as pd
import time
from multiprocessing import Pool
def f(row):
#直接對某列進行操作
return sum(row)+a
def f1_1(row):
#對某一列進行操作,我這里的columns=range(0,2),此處是對第0列進行操作
return row[0]**2
def f1_2(row1):
#對某一列進行操作,我這里的columns=range(0,2),此處是對第0列進行操作
return row1**2
def f2_1(row):
#對某兩列進行操作,我這里的columns=range(0,2),此處是對第0,2列進行操作
return pd.Series([row[0]**2,row[1]**2],index=['1_1','1_2'])
def f2_2(row1,row2):
#對某兩列進行操作,我這里的columns=range(0,2),此處是對第0,2列進行操作
return pd.Series([row1**2,row2**2],index=['2_1','2_2'])
def apply_f(df):
return df.apply(f,axis=1)
def apply_f1_1(df):
return df.apply(f1_1,axis=1)
def apply_f1_2(df):
return df[0].apply(f1_2)
def apply_f2_1(df):
return df.apply(f2_1,axis=1)
def apply_f2_2(df):
return df.apply(lambda row :f2_2(row[0],row[1]),axis=1)
def init_process(global_vars):
global a
a = global_vars
def time_compare():
'''直接調用和多線程調用時間對比'''
a = 2
np.random.seed(0)
df = pd.DataFrame(np.random.rand(10**5,2),columns=range(0,2))
print(df.columns)
t1= time.time()
result_serial = df.apply(f,axis=1)
t2 = time.time()
print("Serial time =",t2-t1)
print(result_serial.head())
df_parts=np.array_split(df,20)
print(len(df_parts),type(df_parts[0]))
with Pool(processes=8,initializer=init_process,initargs=(a,)) as pool:
#with Pool(processes=8) as pool:
result_parts = pool.map(apply_f,df_parts)
result_parallel= pd.concat(result_parts)
t3 = time.time()
print("Parallel time =",t3-t2)
print(result_parallel.head())
def apply_fun():
'''多種apply函數(shù)的調用'''
a = 2
np.random.seed(0)
df = pd.DataFrame(np.random.rand(10**5,2),columns=range(0,2))
print(df.columns)
df_parts=np.array_split(df,20)
print(len(df_parts),type(df_parts[0]))
with Pool(processes=8,initializer=init_process,initargs=(a,)) as pool:
#with Pool(processes=8) as pool:
res_part0 = pool.map(apply_f,df_parts)
res_part1 = pool.map(apply_f1_1,df_parts)
res_part2 = pool.map(apply_f1_2,df_parts)
res_part3 = pool.map(apply_f2_1,df_parts)
res_part4 = pool.map(apply_f2_2,df_parts)
res_parallel0 = pd.concat(res_part0)
res_parallel1 = pd.concat(res_part1)
res_parallel2 = pd.concat(res_part2)
res_parallel3 = pd.concat(res_part3)
res_parallel4 = pd.concat(res_part4)
print("f:\n",res_parallel0.head())
print("f1:\n",res_parallel1.head())
print("f2:\n",res_parallel2.head())
print("f3:\n",res_parallel3.head())
print("f4:\n",res_parallel4.head())
df=pd.concat([df,res_parallel0],axis=1)
df=pd.concat([df,res_parallel1],axis=1)
df=pd.concat([df,res_parallel2],axis=1)
df=pd.concat([df,res_parallel3],axis=1)
df=pd.concat([df,res_parallel4],axis=1)
print(df.head())
if __name__ == '__main__':
time_compare()
apply_fun()
參考網址
https://blog.fangzhou.me/posts/20170702-python-parallelism/
https://docs.python.org/3.7/library/multiprocessing.html
到此這篇關于pandas apply多線程實現(xiàn)代碼的文章就介紹到這了,更多相關pandas apply多線程內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
python利用Appium實現(xiàn)自動控制移動設備并提取數(shù)據功能
這篇文章主要介紹了python利用Appium自動控制移動設備并提取數(shù)據,本文以控制抖音app滑動并獲取抖音短視頻發(fā)布者為例,通過實例代碼給大家介紹的非常詳細,需要的朋友可以參考下2021-09-09
Python中l(wèi)ambda表達式的使用詳解(完整通透版)
這篇文章主要介紹了Python中l(wèi)ambda表達式使用的相關資料,包括其基本語法、常見應用場景(如排序、map、filter、reduce函數(shù)結合使用)以及如何在函數(shù)內部或一次性使用,通過代碼介紹的非常詳細,需要的朋友可以參考下2024-12-12
Mac PyCharm中的.gitignore 安裝設置教程
這篇文章主要介紹了Mac PyCharm中的.gitignore 安裝設置教程,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-04-04

