Python讀取文件的四種方式的實例詳解
故事背景:最近在處理Wikipedia的數(shù)據(jù)時發(fā)現(xiàn)由于數(shù)據(jù)量過大,之前的文件讀取和數(shù)據(jù)處理方法幾乎不可用,或耗時非常久。今天學校安排統(tǒng)一核酸檢查,剛好和文件讀取的過程非常相似。正好借此機會和大家一起從頭梳理一下幾種文件讀取方法。
故事設定:現(xiàn)在學校要求對所有同學進行核酸采集,每位同學先在宿舍內(nèi)等候防護人員(以下簡稱“大白”)叫號,叫到自己時去停車場排隊等候大白對自己進行采集,采集完之后的樣本由大白統(tǒng)一有序收集并儲存。
名詞解釋:
- 學生:所有的學生是一個大文件,每個學生是其中的一行數(shù)據(jù)
- 宿舍:硬盤
- 停車場:內(nèi)存
- 核酸采集:數(shù)據(jù)處理
- 樣本:處理后的數(shù)據(jù)
- 大白:程序
學生數(shù)量特別少的情況
當學生數(shù)量特別少時,可以考慮將所有學生統(tǒng)一叫到停車場等候,再依次進行核酸采集。

方法一:簡單情況
此時的程序可以模擬為:
import time
from typing import List
def pick_all_students(dorm: str) -> List[str]:
with open(dorm, "rt", encoding="utf8") as fin:
students = fin.readlines()
return students
def pick_sample(student: str) -> str:
time.sleep(0.01)
sample = f"{student.strip()}'s sample"
return sample
def process(dorm: str, sample_storeroom: str) -> None:
with open(sample_storeroom, "wt", encoding="utf8") as fout:
students = pick_all_students(dorm)
for student in students:
sample = pick_sample(student)
fout.write(f"{sample}\n")
fout.flush()
if __name__ == "__main__":
process(
"student_names.txt",
"sample_storeroom.txt"
)
注意,在第19行中,大白一次性把所有同學都叫到了停車場中。這種做法在學生比較少時做起來很快,但是如果學生特別多,停車場裝不下怎么辦?
停車場空間不夠時怎么辦?

方法二:邊讀邊處理
一般來說,由于停車場空間有限,我們不會采用一次性把所有學生都叫到停車場中,而是會一個一個地處理,這樣可以節(jié)約內(nèi)存空間。
import time
from typing import Iterator
def pick_one_student(dorm: str) -> Iterator[str]:
with open(dorm, "rt", encoding="utf8") as fin:
for student in fin:
yield student
def pick_sample(student: str) -> str:
time.sleep(0.01)
sample = f"{student.strip()}'s sample"
return sample
def process(dorm: str, sample_storeroom: str) -> None:
with open(sample_storeroom, "wt", encoding="utf8") as fout:
for student in pick_one_student(dorm):
sample = pick_sample(student)
fout.write(f"{sample}\n")
fout.flush()
if __name__ == "__main__":
process(
"student_names.txt",
"sample_storeroom.txt"
)
這里pick_one_student函數(shù)中的返回值是用yield返回的,一次只會返回一名同學。
不過,這種做法雖然確保了停車場不會滿員,但是這種做法在人數(shù)特別多的時候就不再適合了。雖然可以保證完成任務,但由于每次只能采集一個同學,程序的執(zhí)行并不高。特別是當你的CPU有多個核時,會浪費機器性能,出現(xiàn)一核有難,其它圍觀的現(xiàn)象。

怎么加快執(zhí)行效率?
大家可能也已經(jīng)注意到了,剛剛我們的場景中,不論采用哪種方法,都只有一名大白在工作。那我們能不能加派人手,從而提高效率呢?
答案當然是可行的。我們現(xiàn)在先考慮增加兩名大白,使得一名大白專注于叫號,安排學生進入停車場,另外一名大白專注于采集核酸,最后一名大白用于存儲核酸樣本。

方法三
import time
from multiprocessing import Queue, Process
from typing import Iterator
def pick_student(stu_queue: Queue, dorm: str) -> Iterator[str]:
print("pick_student: started")
picked_num = 0
with open(dorm, "rt", encoding="utf8") as fin:
for student in fin:
stu_queue.put(student)
picked_num += 1
if picked_num % 500 == 0:
print(f"pick_student: {picked_num}")
# end signal
stu_queue.put(None)
print("pick_student: finished")
def pick_sample(student: str) -> str:
time.sleep(0.01)
sample = f"{student.strip()}'s sample"
return sample
def process(stu_queue: Queue, store_queue: Queue) -> None:
print("process: started")
process_num = 0
while True:
student = stu_queue.get()
if student is not None:
sample = pick_sample(student)
store_queue.put(sample)
process_num += 1
if process_num % 500 == 0:
print(f"process: {process_num}")
else:
break
# end signal
store_queue.put(None)
print("process: finished")
def store_sample(store_queue: Queue, sample_storeroom: str) -> None:
print("store_sample: started")
store_num = 0
with open(sample_storeroom, "wt", encoding="utf8") as fout:
while True:
sample = store_queue.get()
if sample is not None:
fout.write(f"{sample}\n")
fout.flush()
store_num += 1
if store_num % 500 == 0:
print(f"store_sample: {store_num}")
else:
break
print("store_sample: finished")
if __name__ == "__main__":
dorm = "student_names.txt"
sample_storeroom = "sample_storeroom.txt"
stu_queue = Queue()
store_queue = Queue()
store_p = Process(target=store_sample, args=(store_queue, sample_storeroom), daemon=True)
store_p.start()
process_p = Process(target=process, args=(stu_queue, store_queue), daemon=True)
process_p.start()
read_p = Process(target=pick_student, args=(stu_queue, dorm), daemon=True)
read_p.start()
store_p.join()
這份代碼中,我們引入了多進程的思路,將每個大白看作一個進程,并使用了隊列Queue作為進程間通信的媒介。stu_queue表示學生叫號進停車場的隊列,store_queue表示已經(jīng)采集過的待存儲核酸樣本的隊列。
此外,為了控制進程的停止,我們在pick_student和 process函數(shù)的最后都向各自隊列中添加了None作為結(jié)束標志符。
假設有1w名學生(student_names.txt文件有1w行),經(jīng)過測試后發(fā)現(xiàn)上述方法的時間如下:
- 方法一:1m40.716s
- 方法二:1m40.717s
- 方法三:1m41.097s
咦?不是做了分工嗎?怎么速度還變慢了?經(jīng)筆者觀察,這是因為叫號的大白速度太快了(文件讀取速度快)通常是TA已經(jīng)齊活了,另外倆人還在吭哧吭哧干活呢,體現(xiàn)不出來分工的優(yōu)勢。如果這個時候我們對法二和法三的叫號做延時操作,每個學生叫號之后停滯10ms再叫下一位學生,則方法三的處理時間幾乎不變,而方法二的時間則會延長至3m21.345s。
怎么加快處理速度?
上面提到,大白采核酸的時間較長,往往上一個人的核酸還沒采完,下一個人就已經(jīng)在后面等著了。我們能不能提高核酸采集這個動作(數(shù)據(jù)處理)的速度呢?其實一名大白執(zhí)行一次核酸采集的時間我們幾乎無法再縮短了,但是我們可以通過增加人手的方式,來達到這個目的。就像去銀行辦業(yè)務,如果開放的窗口越多,那么每個人等待的時間就會越短。這里我們也采取類似的策略,增加核酸采集的窗口。

import time
from multiprocessing import Queue, Process, cpu_count
from typing import Iterator
def pick_student(stu_queue: Queue, dorm: str, num_workers: int) -> Iterator[str]:
print("pick_student: started")
picked_num = 0
with open(dorm, "rt", encoding="utf8") as fin:
for student in fin:
stu_queue.put(student)
picked_num += 1
if picked_num % 500 == 0:
print(f"pick_student: {picked_num}")
# end signal
for _ in range(num_workers):
stu_queue.put(None)
print("pick_student: finished")
def pick_sample(student: str) -> str:
time.sleep(0.01)
sample = f"{student.strip()}'s sample"
return sample
def process(stu_queue: Queue, store_queue: Queue) -> None:
print("process: started")
process_num = 0
while True:
student = stu_queue.get()
if student is not None:
sample = pick_sample(student)
store_queue.put(sample)
process_num += 1
if process_num % 500 == 0:
print(f"process: {process_num}")
else:
break
print("process: finished")
def store_sample(store_queue: Queue, sample_storeroom: str) -> None:
print("store_sample: started")
store_num = 0
with open(sample_storeroom, "wt", encoding="utf8") as fout:
while True:
sample = store_queue.get()
if sample is not None:
fout.write(f"{sample}\n")
fout.flush()
store_num += 1
if store_num % 500 == 0:
print(f"store_sample: {store_num}")
else:
break
print("store_sample: finished")
if __name__ == "__main__":
dorm = "student_names.txt"
sample_storeroom = "sample_storeroom.txt"
num_process = max(1, cpu_count() - 1)
maxsize = 10 * num_process
stu_queue = Queue(maxsize=maxsize)
store_queue = Queue(maxsize=maxsize)
store_p = Process(target=store_sample, args=(store_queue, sample_storeroom), daemon=True)
store_p.start()
process_workers = []
for _ in range(num_process):
process_p = Process(target=process, args=(stu_queue, store_queue), daemon=True)
process_p.start()
process_workers.append(process_p)
read_p = Process(target=pick_student, args=(stu_queue, dorm, num_process), daemon=True)
read_p.start()
for worker in process_workers:
worker.join()
# end signal
store_queue.put(None)
store_p.join()總耗時 0m4.160s !我們來具體看看其中的細節(jié)部分:
首先我們將CPU核數(shù) - 3作為采核酸的大白數(shù)量。這里減3是為其它工作進程保留了一些資源,你也可以根據(jù)自己的具體情況做調(diào)整
這次我們在 Queue中增加了 maxsize參數(shù),這個參數(shù)是限制隊列的最大長度,這個參數(shù)通常與你的實際內(nèi)存情況有關(guān)。如果數(shù)據(jù)特別多時要考慮做些調(diào)整。這里我采用10倍的工作進程數(shù)目作為隊列的長度
注意這里pick_student函數(shù)中要為每個后續(xù)的工作進程都添加一個結(jié)束標志,因此最后會有個for循環(huán)
我們把之前放在process函數(shù)中的結(jié)束標志提取出來,放在了最外側(cè),使得所有工作進程均結(jié)束之后再關(guān)閉最后的store_p進程
結(jié)語
總結(jié)來說,如果你的數(shù)據(jù)集特別小,用法一;通常情況下用法二;數(shù)據(jù)集特別大時用法四。
以上就是Python讀取文件的四種方式的實例詳解的詳細內(nèi)容,更多關(guān)于Python讀取文件的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
關(guān)于pandas.date_range()的用法及說明
這篇文章主要介紹了關(guān)于pandas.date_range()的用法及說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-07-07
Pytorch之上/下采樣函數(shù)torch.nn.functional.interpolate插值詳解
這篇文章主要介紹了Pytorch之上/下采樣函數(shù)torch.nn.functional.interpolate插值,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2025-04-04
Pytorch轉(zhuǎn)keras的有效方法,以FlowNet為例講解
這篇文章主要介紹了Pytorch轉(zhuǎn)keras的有效方法,以FlowNet為例講解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-05-05

