詳解Tensorflow數(shù)據(jù)讀取有三種方式(next_batch)
Tensorflow數(shù)據(jù)讀取有三種方式:
- Preloaded data: 預(yù)加載數(shù)據(jù)
- Feeding: Python產(chǎn)生數(shù)據(jù),再把數(shù)據(jù)喂給后端。
- Reading from file: 從文件中直接讀取
這三種有讀取方式有什么區(qū)別呢? 我們首先要知道TensorFlow(TF)是怎么樣工作的。
TF的核心是用C++寫的,這樣的好處是運(yùn)行快,缺點(diǎn)是調(diào)用不靈活。而Python恰好相反,所以結(jié)合兩種語(yǔ)言的優(yōu)勢(shì)。涉及計(jì)算的核心算子和運(yùn)行框架是用C++寫的,并提供API給Python。Python調(diào)用這些API,設(shè)計(jì)訓(xùn)練模型(Graph),再將設(shè)計(jì)好的Graph給后端去執(zhí)行。簡(jiǎn)而言之,Python的角色是Design,C++是Run。
一、預(yù)加載數(shù)據(jù):
import tensorflow as tf # 設(shè)計(jì)Graph x1 = tf.constant([2, 3, 4]) x2 = tf.constant([4, 0, 1]) y = tf.add(x1, x2) # 打開一個(gè)session --> 計(jì)算y with tf.Session() as sess: print sess.run(y)
二、python產(chǎn)生數(shù)據(jù),再將數(shù)據(jù)喂給后端
import tensorflow as tf
# 設(shè)計(jì)Graph
x1 = tf.placeholder(tf.int16)
x2 = tf.placeholder(tf.int16)
y = tf.add(x1, x2)
# 用Python產(chǎn)生數(shù)據(jù)
li1 = [2, 3, 4]
li2 = [4, 0, 1]
# 打開一個(gè)session --> 喂數(shù)據(jù) --> 計(jì)算y
with tf.Session() as sess:
print sess.run(y, feed_dict={x1: li1, x2: li2})
說(shuō)明:在這里x1, x2只是占位符,沒(méi)有具體的值,那么運(yùn)行的時(shí)候去哪取值呢?這時(shí)候就要用到sess.run()中的feed_dict參數(shù),將Python產(chǎn)生的數(shù)據(jù)喂給后端,并計(jì)算y。
這兩種方案的缺點(diǎn):
1、預(yù)加載:將數(shù)據(jù)直接內(nèi)嵌到Graph中,再把Graph傳入Session中運(yùn)行。當(dāng)數(shù)據(jù)量比較大時(shí),Graph的傳輸會(huì)遇到效率問(wèn)題。
2、用占位符替代數(shù)據(jù),待運(yùn)行的時(shí)候填充數(shù)據(jù)。
前兩種方法很方便,但是遇到大型數(shù)據(jù)的時(shí)候就會(huì)很吃力,即使是Feeding,中間環(huán)節(jié)的增加也是不小的開銷,比如數(shù)據(jù)類型轉(zhuǎn)換等等。最優(yōu)的方案就是在Graph定義好文件讀取的方法,讓TF自己去從文件中讀取數(shù)據(jù),并解碼成可使用的樣本集。
三、從文件中讀取,簡(jiǎn)單來(lái)說(shuō)就是將數(shù)據(jù)讀取模塊的圖搭好

1、準(zhǔn)備數(shù)據(jù),構(gòu)造三個(gè)文件,A.csv,B.csv,C.csv
$ echo -e "Alpha1,A1\nAlpha2,A2\nAlpha3,A3" > A.csv $ echo -e "Bee1,B1\nBee2,B2\nBee3,B3" > B.csv $ echo -e "Sea1,C1\nSea2,C2\nSea3,C3" > C.csv
2、單個(gè)Reader,單個(gè)樣本
#-*- coding:utf-8 -*-
import tensorflow as tf
# 生成一個(gè)先入先出隊(duì)列和一個(gè)QueueRunner,生成文件名隊(duì)列
filenames = ['A.csv', 'B.csv', 'C.csv']
filename_queue = tf.train.string_input_producer(filenames, shuffle=False)
# 定義Reader
reader = tf.TextLineReader()
key, value = reader.read(filename_queue)
# 定義Decoder
example, label = tf.decode_csv(value, record_defaults=[['null'], ['null']])
#example_batch, label_batch = tf.train.shuffle_batch([example,label], batch_size=1, capacity=200, min_after_dequeue=100, num_threads=2)
# 運(yùn)行Graph
with tf.Session() as sess:
coord = tf.train.Coordinator() #創(chuàng)建一個(gè)協(xié)調(diào)器,管理線程
threads = tf.train.start_queue_runners(coord=coord) #啟動(dòng)QueueRunner, 此時(shí)文件名隊(duì)列已經(jīng)進(jìn)隊(duì)。
for i in range(10):
print example.eval(),label.eval()
coord.request_stop()
coord.join(threads)
說(shuō)明:這里沒(méi)有使用tf.train.shuffle_batch,會(huì)導(dǎo)致生成的樣本和label之間對(duì)應(yīng)不上,亂序了。生成結(jié)果如下:
Alpha1 A2
Alpha3 B1
Bee2 B3
Sea1 C2
Sea3 A1
Alpha2 A3
Bee1 B2
Bee3 C1
Sea2 C3
Alpha1 A2
解決方案:用tf.train.shuffle_batch,那么生成的結(jié)果就能夠?qū)?yīng)上。
#-*- coding:utf-8 -*-
import tensorflow as tf
# 生成一個(gè)先入先出隊(duì)列和一個(gè)QueueRunner,生成文件名隊(duì)列
filenames = ['A.csv', 'B.csv', 'C.csv']
filename_queue = tf.train.string_input_producer(filenames, shuffle=False)
# 定義Reader
reader = tf.TextLineReader()
key, value = reader.read(filename_queue)
# 定義Decoder
example, label = tf.decode_csv(value, record_defaults=[['null'], ['null']])
example_batch, label_batch = tf.train.shuffle_batch([example,label], batch_size=1, capacity=200, min_after_dequeue=100, num_threads=2)
# 運(yùn)行Graph
with tf.Session() as sess:
coord = tf.train.Coordinator() #創(chuàng)建一個(gè)協(xié)調(diào)器,管理線程
threads = tf.train.start_queue_runners(coord=coord) #啟動(dòng)QueueRunner, 此時(shí)文件名隊(duì)列已經(jīng)進(jìn)隊(duì)。
for i in range(10):
e_val,l_val = sess.run([example_batch, label_batch])
print e_val,l_val
coord.request_stop()
coord.join(threads)
3、單個(gè)Reader,多個(gè)樣本,主要也是通過(guò)tf.train.shuffle_batch來(lái)實(shí)現(xiàn)
#-*- coding:utf-8 -*-
import tensorflow as tf
filenames = ['A.csv', 'B.csv', 'C.csv']
filename_queue = tf.train.string_input_producer(filenames, shuffle=False)
reader = tf.TextLineReader()
key, value = reader.read(filename_queue)
example, label = tf.decode_csv(value, record_defaults=[['null'], ['null']])
# 使用tf.train.batch()會(huì)多加了一個(gè)樣本隊(duì)列和一個(gè)QueueRunner。
#Decoder解后數(shù)據(jù)會(huì)進(jìn)入這個(gè)隊(duì)列,再批量出隊(duì)。
# 雖然這里只有一個(gè)Reader,但可以設(shè)置多線程,相應(yīng)增加線程數(shù)會(huì)提高讀取速度,但并不是線程越多越好。
example_batch, label_batch = tf.train.batch(
[example, label], batch_size=5)
with tf.Session() as sess:
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
for i in range(10):
e_val,l_val = sess.run([example_batch,label_batch])
print e_val,l_val
coord.request_stop()
coord.join(threads)
說(shuō)明:下面這種寫法,提取出來(lái)的batch_size個(gè)樣本,特征和label之間也是不同步的
#-*- coding:utf-8 -*-
import tensorflow as tf
filenames = ['A.csv', 'B.csv', 'C.csv']
filename_queue = tf.train.string_input_producer(filenames, shuffle=False)
reader = tf.TextLineReader()
key, value = reader.read(filename_queue)
example, label = tf.decode_csv(value, record_defaults=[['null'], ['null']])
# 使用tf.train.batch()會(huì)多加了一個(gè)樣本隊(duì)列和一個(gè)QueueRunner。
#Decoder解后數(shù)據(jù)會(huì)進(jìn)入這個(gè)隊(duì)列,再批量出隊(duì)。
# 雖然這里只有一個(gè)Reader,但可以設(shè)置多線程,相應(yīng)增加線程數(shù)會(huì)提高讀取速度,但并不是線程越多越好。
example_batch, label_batch = tf.train.batch(
[example, label], batch_size=5)
with tf.Session() as sess:
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
for i in range(10):
print example_batch.eval(), label_batch.eval()
coord.request_stop()
coord.join(threads)
說(shuō)明:輸出結(jié)果如下:可以看出feature和label之間是不對(duì)應(yīng)的
['Alpha1' 'Alpha2' 'Alpha3' 'Bee1' 'Bee2'] ['B3' 'C1' 'C2' 'C3' 'A1']
['Alpha2' 'Alpha3' 'Bee1' 'Bee2' 'Bee3'] ['C1' 'C2' 'C3' 'A1' 'A2']
['Alpha3' 'Bee1' 'Bee2' 'Bee3' 'Sea1'] ['C2' 'C3' 'A1' 'A2' 'A3']
4、多個(gè)reader,多個(gè)樣本
#-*- coding:utf-8 -*-
import tensorflow as tf
filenames = ['A.csv', 'B.csv', 'C.csv']
filename_queue = tf.train.string_input_producer(filenames, shuffle=False)
reader = tf.TextLineReader()
key, value = reader.read(filename_queue)
record_defaults = [['null'], ['null']]
#定義了多種解碼器,每個(gè)解碼器跟一個(gè)reader相連
example_list = [tf.decode_csv(value, record_defaults=record_defaults)
for _ in range(2)] # Reader設(shè)置為2
# 使用tf.train.batch_join(),可以使用多個(gè)reader,并行讀取數(shù)據(jù)。每個(gè)Reader使用一個(gè)線程。
example_batch, label_batch = tf.train.batch_join(
example_list, batch_size=5)
with tf.Session() as sess:
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
for i in range(10):
e_val,l_val = sess.run([example_batch,label_batch])
print e_val,l_val
coord.request_stop()
coord.join(threads)
tf.train.batch與tf.train.shuffle_batch函數(shù)是單個(gè)Reader讀取,但是可以多線程。tf.train.batch_join與tf.train.shuffle_batch_join可設(shè)置多Reader讀取,每個(gè)Reader使用一個(gè)線程。至于兩種方法的效率,單Reader時(shí),2個(gè)線程就達(dá)到了速度的極限。多Reader時(shí),2個(gè)Reader就達(dá)到了極限。所以并不是線程越多越快,甚至更多的線程反而會(huì)使效率下降。
5、迭代控制,設(shè)置epoch參數(shù),指定我們的樣本在訓(xùn)練的時(shí)候只能被用多少輪
#-*- coding:utf-8 -*-
import tensorflow as tf
filenames = ['A.csv', 'B.csv', 'C.csv']
#num_epoch: 設(shè)置迭代數(shù)
filename_queue = tf.train.string_input_producer(filenames, shuffle=False,num_epochs=3)
reader = tf.TextLineReader()
key, value = reader.read(filename_queue)
record_defaults = [['null'], ['null']]
#定義了多種解碼器,每個(gè)解碼器跟一個(gè)reader相連
example_list = [tf.decode_csv(value, record_defaults=record_defaults)
for _ in range(2)] # Reader設(shè)置為2
# 使用tf.train.batch_join(),可以使用多個(gè)reader,并行讀取數(shù)據(jù)。每個(gè)Reader使用一個(gè)線程。
example_batch, label_batch = tf.train.batch_join(
example_list, batch_size=1)
#初始化本地變量
init_local_op = tf.initialize_local_variables()
with tf.Session() as sess:
sess.run(init_local_op)
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
try:
while not coord.should_stop():
e_val,l_val = sess.run([example_batch,label_batch])
print e_val,l_val
except tf.errors.OutOfRangeError:
print('Epochs Complete!')
finally:
coord.request_stop()
coord.join(threads)
coord.request_stop()
coord.join(threads)
在迭代控制中,記得添加tf.initialize_local_variables(),官網(wǎng)教程沒(méi)有說(shuō)明,但是如果不初始化,運(yùn)行就會(huì)報(bào)錯(cuò)。
對(duì)于傳統(tǒng)的機(jī)器學(xué)習(xí)而言,比方說(shuō)分類問(wèn)題,[x1 x2 x3]是feature。對(duì)于二分類問(wèn)題,label經(jīng)過(guò)one-hot編碼之后就會(huì)是[0,1]或者[1,0]。一般情況下,我們會(huì)考慮將數(shù)據(jù)組織在csv文件中,一行代表一個(gè)sample。然后使用隊(duì)列的方式去讀取數(shù)據(jù)

說(shuō)明:對(duì)于該數(shù)據(jù),前三列代表的是feature,因?yàn)槭欠诸悊?wèn)題,后兩列就是經(jīng)過(guò)one-hot編碼之后得到的label
使用隊(duì)列讀取該csv文件的代碼如下:
#-*- coding:utf-8 -*-
import tensorflow as tf
# 生成一個(gè)先入先出隊(duì)列和一個(gè)QueueRunner,生成文件名隊(duì)列
filenames = ['A.csv']
filename_queue = tf.train.string_input_producer(filenames, shuffle=False)
# 定義Reader
reader = tf.TextLineReader()
key, value = reader.read(filename_queue)
# 定義Decoder
record_defaults = [[1], [1], [1], [1], [1]]
col1, col2, col3, col4, col5 = tf.decode_csv(value,record_defaults=record_defaults)
features = tf.pack([col1, col2, col3])
label = tf.pack([col4,col5])
example_batch, label_batch = tf.train.shuffle_batch([features,label], batch_size=2, capacity=200, min_after_dequeue=100, num_threads=2)
# 運(yùn)行Graph
with tf.Session() as sess:
coord = tf.train.Coordinator() #創(chuàng)建一個(gè)協(xié)調(diào)器,管理線程
threads = tf.train.start_queue_runners(coord=coord) #啟動(dòng)QueueRunner, 此時(shí)文件名隊(duì)列已經(jīng)進(jìn)隊(duì)。
for i in range(10):
e_val,l_val = sess.run([example_batch, label_batch])
print e_val,l_val
coord.request_stop()
coord.join(threads)
輸出結(jié)果如下:

說(shuō)明:
record_defaults = [[1], [1], [1], [1], [1]]
代表解析的模板,每個(gè)樣本有5列,在數(shù)據(jù)中是默認(rèn)用‘,'隔開的,然后解析的標(biāo)準(zhǔn)是[1],也即每一列的數(shù)值都解析為整型。[1.0]就是解析為浮點(diǎn),['null']解析為string類型
二、此處給出了幾種不同的next_batch方法,該文章只是做出代碼片段的解釋,以備以后查看:
def next_batch(self, batch_size, fake_data=False):
"""Return the next `batch_size` examples from this data set."""
if fake_data:
fake_image = [1] * 784
if self.one_hot:
fake_label = [1] + [0] * 9
else:
fake_label = 0
return [fake_image for _ in xrange(batch_size)], [
fake_label for _ in xrange(batch_size)
]
start = self._index_in_epoch
self._index_in_epoch += batch_size
if self._index_in_epoch > self._num_examples: # epoch中的句子下標(biāo)是否大于所有語(yǔ)料的個(gè)數(shù),如果為True,開始新一輪的遍歷
# Finished epoch
self._epochs_completed += 1
# Shuffle the data
perm = numpy.arange(self._num_examples) # arange函數(shù)用于創(chuàng)建等差數(shù)組
numpy.random.shuffle(perm) # 打亂
self._images = self._images[perm]
self._labels = self._labels[perm]
# Start next epoch
start = 0
self._index_in_epoch = batch_size
assert batch_size <= self._num_examples
end = self._index_in_epoch
return self._images[start:end], self._labels[start:end]
該段代碼摘自mnist.py文件,從代碼第12行start = self._index_in_epoch開始解釋,_index_in_epoch-1是上一次batch個(gè)圖片中最后一張圖片的下邊,這次epoch第一張圖片的下標(biāo)是從 _index_in_epoch開始,最后一張圖片的下標(biāo)是_index_in_epoch+batch, 如果 _index_in_epoch 大于語(yǔ)料中圖片的個(gè)數(shù),表示這個(gè)epoch是不合適的,就算是完成了語(yǔ)料的一遍的遍歷,所以應(yīng)該對(duì)圖片洗牌然后開始新一輪的語(yǔ)料組成batch開始
def ptb_iterator(raw_data, batch_size, num_steps):
"""Iterate on the raw PTB data.
This generates batch_size pointers into the raw PTB data, and allows
minibatch iteration along these pointers.
Args:
raw_data: one of the raw data outputs from ptb_raw_data.
batch_size: int, the batch size.
num_steps: int, the number of unrolls.
Yields:
Pairs of the batched data, each a matrix of shape [batch_size, num_steps].
The second element of the tuple is the same data time-shifted to the
right by one.
Raises:
ValueError: if batch_size or num_steps are too high.
"""
raw_data = np.array(raw_data, dtype=np.int32)
data_len = len(raw_data)
batch_len = data_len // batch_size #有多少個(gè)batch
data = np.zeros([batch_size, batch_len], dtype=np.int32) # batch_len 有多少個(gè)單詞
for i in range(batch_size): # batch_size 有多少個(gè)batch
data[i] = raw_data[batch_len * i:batch_len * (i + 1)]
epoch_size = (batch_len - 1) // num_steps # batch_len 是指一個(gè)batch中有多少個(gè)句子
#epoch_size = ((len(data) // model.batch_size) - 1) // model.num_steps # // 表示整數(shù)除法
if epoch_size == 0:
raise ValueError("epoch_size == 0, decrease batch_size or num_steps")
for i in range(epoch_size):
x = data[:, i*num_steps:(i+1)*num_steps]
y = data[:, i*num_steps+1:(i+1)*num_steps+1]
yield (x, y)
第三種方式:
def next(self, batch_size):
""" Return a batch of data. When dataset end is reached, start over.
"""
if self.batch_id == len(self.data):
self.batch_id = 0
batch_data = (self.data[self.batch_id:min(self.batch_id +
batch_size, len(self.data))])
batch_labels = (self.labels[self.batch_id:min(self.batch_id +
batch_size, len(self.data))])
batch_seqlen = (self.seqlen[self.batch_id:min(self.batch_id +
batch_size, len(self.data))])
self.batch_id = min(self.batch_id + batch_size, len(self.data))
return batch_data, batch_labels, batch_seqlen
第四種方式:
def batch_iter(sourceData, batch_size, num_epochs, shuffle=True):
data = np.array(sourceData) # 將sourceData轉(zhuǎn)換為array存儲(chǔ)
data_size = len(sourceData)
num_batches_per_epoch = int(len(sourceData) / batch_size) + 1
for epoch in range(num_epochs):
# Shuffle the data at each epoch
if shuffle:
shuffle_indices = np.random.permutation(np.arange(data_size))
shuffled_data = sourceData[shuffle_indices]
else:
shuffled_data = sourceData
for batch_num in range(num_batches_per_epoch):
start_index = batch_num * batch_size
end_index = min((batch_num + 1) * batch_size, data_size)
yield shuffled_data[start_index:end_index]
迭代器的用法,具體學(xué)習(xí)Python迭代器的用法
另外需要注意的是,前三種方式只是所有語(yǔ)料遍歷一次,而最后一種方法是,所有語(yǔ)料遍歷了num_epochs次
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
python實(shí)現(xiàn)MySQL指定表增量同步數(shù)據(jù)到clickhouse的腳本
這篇文章主要介紹了python實(shí)現(xiàn)MySQL指定表增量同步數(shù)據(jù)到clickhouse的腳本,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-02-02
Python requests獲取網(wǎng)頁(yè)常用方法解析
這篇文章主要介紹了Python requests獲取網(wǎng)頁(yè)常用方法解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-02-02
Python urlencode和unquote函數(shù)使用實(shí)例解析
這篇文章主要介紹了Python urlencode和unquote函數(shù)使用實(shí)例解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-03-03
詳解PyQt5 GUI 接收UDP數(shù)據(jù)并動(dòng)態(tài)繪圖的過(guò)程(多線程間信號(hào)傳遞)
這篇文章主要介紹了PyQt5 GUI 接收UDP數(shù)據(jù)并動(dòng)態(tài)繪圖(多線程間信號(hào)傳遞),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-09-09
keras 讀取多標(biāo)簽圖像數(shù)據(jù)方式
這篇文章主要介紹了keras 讀取多標(biāo)簽圖像數(shù)據(jù)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-06-06
如何通過(guò)神經(jīng)網(wǎng)絡(luò)實(shí)現(xiàn)線性回歸的擬合
這篇文章主要介紹了如何通過(guò)神經(jīng)網(wǎng)絡(luò)實(shí)現(xiàn)線性回歸的擬合問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-05-05
Python的pytest測(cè)試框架中fixture的使用詳解
這篇文章主要介紹了pytest中fixture的使用詳解,pytest是一個(gè)非常成熟的全功能的Python測(cè)試框架,能夠支持簡(jiǎn)單的單元測(cè)試和復(fù)雜的功能測(cè)試,還可以用來(lái)做selenium/appnium等自動(dòng)化測(cè)試、接口自動(dòng)化測(cè)試,需要的朋友可以參考下2023-07-07
使用python將大量數(shù)據(jù)導(dǎo)出到Excel中的小技巧分享
今天小編就為大家分享一篇使用python將大量數(shù)據(jù)導(dǎo)出到Excel中的小技巧心得,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-06-06

