神經(jīng)網(wǎng)絡(luò)python源碼分享
神經(jīng)網(wǎng)絡(luò)的邏輯應(yīng)該都是熟知的了,在這里想說明一下交叉驗(yàn)證
交叉驗(yàn)證方法:

看圖大概就能理解了,大致就是先將數(shù)據(jù)集分成K份,對這K份中每一份都取不一樣的比例數(shù)據(jù)進(jìn)行訓(xùn)練和測試。得出K個(gè)誤差,將這K個(gè)誤差平均得到最終誤差
這第一個(gè)部分是BP神經(jīng)網(wǎng)絡(luò)的建立
參數(shù)選取參照論文:基于數(shù)據(jù)挖掘技術(shù)的股價(jià)指數(shù)分析與預(yù)測研究_胡林林
import math
import random
import tushare as ts
import pandas as pd
random.seed(0)
def getData(id,start,end):
df = ts.get_hist_data(id,start,end)
DATA=pd.DataFrame(columns=['rate1', 'rate2','rate3','pos1','pos2','pos3','amt1','amt2','amt3','MA20','MA5','r'])
P1 = pd.DataFrame(columns=['high','low','close','open','volume'])
DATA2=pd.DataFrame(columns=['R'])
DATA['MA20']=df['ma20']
DATA['MA5']=df['ma5']
P=df['close']
P1['high']=df['high']
P1['low']=df['low']
P1['close']=df['close']
P1['open']=df['open']
P1['volume']=df['volume']
DATA['rate1']=(P1['close'].shift(1)-P1['open'].shift(1))/P1['open'].shift(1)
DATA['rate2']=(P1['close'].shift(2)-P1['open'].shift(2))/P1['open'].shift(2)
DATA['rate3']=(P1['close'].shift(3)-P1['open'].shift(3))/P1['open'].shift(3)
DATA['pos1']=(P1['close'].shift(1)-P1['low'].shift(1))/(P1['high'].shift(1)-P1['low'].shift(1))
DATA['pos2']=(P1['close'].shift(2)-P1['low'].shift(2))/(P1['high'].shift(2)-P1['low'].shift(2))
DATA['pos3']=(P1['close'].shift(3)-P1['low'].shift(3))/(P1['high'].shift(3)-P1['low'].shift(3))
DATA['amt1']=P1['volume'].shift(1)/((P1['volume'].shift(1)+P1['volume'].shift(2)+P1['volume'].shift(3))/3)
DATA['amt2']=P1['volume'].shift(2)/((P1['volume'].shift(2)+P1['volume'].shift(3)+P1['volume'].shift(4))/3)
DATA['amt3']=P1['volume'].shift(3)/((P1['volume'].shift(3)+P1['volume'].shift(4)+P1['volume'].shift(5))/3)
templist=(P-P.shift(1))/P.shift(1)
tempDATA = []
for indextemp in templist:
tempDATA.append(1/(1+math.exp(-indextemp*100)))
DATA['r'] = tempDATA
DATA=DATA.dropna(axis=0)
DATA2['R']=DATA['r']
del DATA['r']
DATA=DATA.T
DATA2=DATA2.T
DATAlist=DATA.to_dict("list")
result = []
for key in DATAlist:
result.append(DATAlist[key])
DATAlist2=DATA2.to_dict("list")
result2 = []
for key in DATAlist2:
result2.append(DATAlist2[key])
return result
def getDataR(id,start,end):
df = ts.get_hist_data(id,start,end)
DATA=pd.DataFrame(columns=['rate1', 'rate2','rate3','pos1','pos2','pos3','amt1','amt2','amt3','MA20','MA5','r'])
P1 = pd.DataFrame(columns=['high','low','close','open','volume'])
DATA2=pd.DataFrame(columns=['R'])
DATA['MA20']=df['ma20'].shift(1)
DATA['MA5']=df['ma5'].shift(1)
P=df['close']
P1['high']=df['high']
P1['low']=df['low']
P1['close']=df['close']
P1['open']=df['open']
P1['volume']=df['volume']
DATA['rate1']=(P1['close'].shift(1)-P1['open'].shift(1))/P1['open'].shift(1)
DATA['rate2']=(P1['close'].shift(2)-P1['open'].shift(2))/P1['open'].shift(2)
DATA['rate3']=(P1['close'].shift(3)-P1['open'].shift(3))/P1['open'].shift(3)
DATA['pos1']=(P1['close'].shift(1)-P1['low'].shift(1))/(P1['high'].shift(1)-P1['low'].shift(1))
DATA['pos2']=(P1['close'].shift(2)-P1['low'].shift(2))/(P1['high'].shift(2)-P1['low'].shift(2))
DATA['pos3']=(P1['close'].shift(3)-P1['low'].shift(3))/(P1['high'].shift(3)-P1['low'].shift(3))
DATA['amt1']=P1['volume'].shift(1)/((P1['volume'].shift(1)+P1['volume'].shift(2)+P1['volume'].shift(3))/3)
DATA['amt2']=P1['volume'].shift(2)/((P1['volume'].shift(2)+P1['volume'].shift(3)+P1['volume'].shift(4))/3)
DATA['amt3']=P1['volume'].shift(3)/((P1['volume'].shift(3)+P1['volume'].shift(4)+P1['volume'].shift(5))/3)
templist=(P-P.shift(1))/P.shift(1)
tempDATA = []
for indextemp in templist:
tempDATA.append(1/(1+math.exp(-indextemp*100)))
DATA['r'] = tempDATA
DATA=DATA.dropna(axis=0)
DATA2['R']=DATA['r']
del DATA['r']
DATA=DATA.T
DATA2=DATA2.T
DATAlist=DATA.to_dict("list")
result = []
for key in DATAlist:
result.append(DATAlist[key])
DATAlist2=DATA2.to_dict("list")
result2 = []
for key in DATAlist2:
result2.append(DATAlist2[key])
return result2
def rand(a, b):
return (b - a) * random.random() + a
def make_matrix(m, n, fill=0.0):
mat = []
for i in range(m):
mat.append([fill] * n)
return mat
def sigmoid(x):
return 1.0 / (1.0 + math.exp(-x))
def sigmod_derivate(x):
return x * (1 - x)
class BPNeuralNetwork:
def __init__(self):
self.input_n = 0
self.hidden_n = 0
self.output_n = 0
self.input_cells = []
self.hidden_cells = []
self.output_cells = []
self.input_weights = []
self.output_weights = []
self.input_correction = []
self.output_correction = []
def setup(self, ni, nh, no):
self.input_n = ni + 1
self.hidden_n = nh
self.output_n = no
# init cells
self.input_cells = [1.0] * self.input_n
self.hidden_cells = [1.0] * self.hidden_n
self.output_cells = [1.0] * self.output_n
# init weights
self.input_weights = make_matrix(self.input_n, self.hidden_n)
self.output_weights = make_matrix(self.hidden_n, self.output_n)
# random activate
for i in range(self.input_n):
for h in range(self.hidden_n):
self.input_weights[i][h] = rand(-0.2, 0.2)
for h in range(self.hidden_n):
for o in range(self.output_n):
self.output_weights[h][o] = rand(-2.0, 2.0)
# init correction matrix
self.input_correction = make_matrix(self.input_n, self.hidden_n)
self.output_correction = make_matrix(self.hidden_n, self.output_n)
def predict(self, inputs):
# activate input layer
for i in range(self.input_n - 1):
self.input_cells[i] = inputs[i]
# activate hidden layer
for j in range(self.hidden_n):
total = 0.0
for i in range(self.input_n):
total += self.input_cells[i] * self.input_weights[i][j]
self.hidden_cells[j] = sigmoid(total)
# activate output layer
for k in range(self.output_n):
total = 0.0
for j in range(self.hidden_n):
total += self.hidden_cells[j] * self.output_weights[j][k]
self.output_cells[k] = sigmoid(total)
return self.output_cells[:]
def back_propagate(self, case, label, learn, correct):
# feed forward
self.predict(case)
# get output layer error
output_deltas = [0.0] * self.output_n
for o in range(self.output_n):
error = label[o] - self.output_cells[o]
output_deltas[o] = sigmod_derivate(self.output_cells[o]) * error
# get hidden layer error
hidden_deltas = [0.0] * self.hidden_n
for h in range(self.hidden_n):
error = 0.0
for o in range(self.output_n):
error += output_deltas[o] * self.output_weights[h][o]
hidden_deltas[h] = sigmod_derivate(self.hidden_cells[h]) * error
# update output weights
for h in range(self.hidden_n):
for o in range(self.output_n):
change = output_deltas[o] * self.hidden_cells[h]
self.output_weights[h][o] += learn * change + correct * self.output_correction[h][o]
self.output_correction[h][o] = change
# update input weights
for i in range(self.input_n):
for h in range(self.hidden_n):
change = hidden_deltas[h] * self.input_cells[i]
self.input_weights[i][h] += learn * change + correct * self.input_correction[i][h]
self.input_correction[i][h] = change
# get global error
error = 0.0
for o in range(len(label)):
error += 0.5 * (label[o] - self.output_cells[o]) ** 2
return error
def train(self, cases, labels, limit=10000, learn=0.05, correct=0.1):
for i in range(limit):
error = 0.0
for i in range(len(cases)):
label = labels[i]
case = cases[i]
error += self.back_propagate(case, label, learn, correct)
def test(self,id):
result=getData("000001", "2015-01-05", "2015-01-09")
result2=getDataR("000001", "2015-01-05", "2015-01-09")
self.setup(11, 5, 1)
self.train(result, result2, 10000, 0.05, 0.1)
for t in resulttest:
print(self.predict(t))
下面是選取14-15年數(shù)據(jù)進(jìn)行訓(xùn)練,16年數(shù)據(jù)作為測試集,調(diào)倉周期為20個(gè)交易日,大約1個(gè)月,對上證50中的股票進(jìn)行預(yù)測,選取預(yù)測的漲幅前10的股票買入,對每只股票分配一樣的資金,初步運(yùn)行沒有問題,但就是太慢了,等哪天有空了再運(yùn)行
import BPnet
import tushare as ts
import pandas as pd
import math
import xlrd
import datetime as dt
import time
#
#nn =BPnet.BPNeuralNetwork()
#nn.test('000001')
#for i in ts.get_sz50s()['code']:
holdList=pd.DataFrame(columns=['time','id','value'])
share=ts.get_sz50s()['code']
time2=ts.get_k_data('000001')['date']
newtime = time2[400:640]
newcount=0
for itime in newtime:
print(itime)
if newcount % 20 == 0:
sharelist = pd.DataFrame(columns=['time','id','value'])
for ishare in share:
backwardtime = time.strftime('%Y-%m-%d',time.localtime(time.mktime(time.strptime(itime,'%Y-%m-%d'))-432000*4))
trainData = BPnet.getData(ishare, '2014-05-22',itime)
trainDataR = BPnet.getDataR(ishare, '2014-05-22',itime)
testData = BPnet.getData(ishare, backwardtime,itime)
try:
print(testData)
testData = testData[-1]
print(testData)
nn = BPnet.BPNeuralNetwork()
nn.setup(11, 5, 1)
nn.train(trainData, trainDataR, 10000, 0.05, 0.1)
value = nn.predict(testData)
newlist= pd.DataFrame({'time':itime,"id":ishare,"value":value},index=["0"])
sharelist = sharelist.append(newlist,ignore_index=True)
except:
pass
sharelist=sharelist.sort(columns ='value',ascending=False)
sharelist = sharelist[:10]
holdList=holdList.append(sharelist,ignore_index=True)
newcount+=1
print(holdList)
總結(jié)
以上就是本文關(guān)于神經(jīng)網(wǎng)絡(luò)python源碼分享的全部內(nèi)容,希望對大家有所幫助。感興趣的朋友可以繼續(xù)參閱本站:
神經(jīng)網(wǎng)絡(luò)理論基礎(chǔ)及Python實(shí)現(xiàn)詳解
Python語言實(shí)現(xiàn)百度語音識(shí)別API的使用實(shí)例
Python通過matplotlib繪制動(dòng)畫簡單實(shí)例
如有不足之處,歡迎留言指出。感謝朋友們對本站的支持!
- Python編程實(shí)現(xiàn)的簡單神經(jīng)網(wǎng)絡(luò)算法示例
- python機(jī)器學(xué)習(xí)之神經(jīng)網(wǎng)絡(luò)(二)
- python機(jī)器學(xué)習(xí)之神經(jīng)網(wǎng)絡(luò)(一)
- Python實(shí)現(xiàn)感知器模型、兩層神經(jīng)網(wǎng)絡(luò)
- Python與人工神經(jīng)網(wǎng)絡(luò):使用神經(jīng)網(wǎng)絡(luò)識(shí)別手寫圖像介紹
- 神經(jīng)網(wǎng)絡(luò)理論基礎(chǔ)及Python實(shí)現(xiàn)詳解
- Python實(shí)現(xiàn)的人工神經(jīng)網(wǎng)絡(luò)算法示例【基于反向傳播算法】
- Python基于numpy靈活定義神經(jīng)網(wǎng)絡(luò)結(jié)構(gòu)的方法
- Python實(shí)現(xiàn)的遞歸神經(jīng)網(wǎng)絡(luò)簡單示例
- python實(shí)現(xiàn)簡單神經(jīng)網(wǎng)絡(luò)算法
相關(guān)文章
對python中arange()和linspace()的區(qū)別說明
這篇文章主要介紹了對python中arange()和linspace()的區(qū)別說明,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-05-05
Python與數(shù)據(jù)庫的交互問題小結(jié)
這篇文章主要介紹了Python與數(shù)據(jù)庫的交互,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-12-12
Python(TensorFlow框架)實(shí)現(xiàn)手寫數(shù)字識(shí)別系統(tǒng)的方法
這篇文章主要介紹了Python(TensorFlow框架)實(shí)現(xiàn)手寫數(shù)字識(shí)別系統(tǒng)的方法。小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-05-05
python找出一個(gè)列表中相同元素的多個(gè)索引實(shí)例
今天小編就為大家分享一篇python找出一個(gè)列表中相同元素的多個(gè)索引實(shí)例,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-06-06
Python實(shí)現(xiàn)在PDF中插入單圖像水印和平鋪圖像水印
這篇文章主要為大家詳細(xì)介紹了如何使用Python實(shí)現(xiàn)在PDF中插入單圖像水印和平鋪圖像水印,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2024-04-04
Python接口自動(dòng)化淺析登錄接口測試實(shí)戰(zhàn)
本文主要接好了python接口自動(dòng)化的接口概念、接口用例設(shè)計(jì)及登錄,跟隨本文章來進(jìn)行一個(gè)接口用例設(shè)計(jì)及登錄接口測試實(shí)戰(zhàn),有需要的朋友可以參考下2021-08-08
Python圖像濾波處理操作示例【基于ImageFilter類】
這篇文章主要介紹了Python圖像濾波處理操作,結(jié)合實(shí)例形式分析了Python基于ImageFilter類實(shí)現(xiàn)的濾波處理相關(guān)操作技巧,需要的朋友可以參考下2019-01-01

