python3實(shí)現(xiàn)ftp服務(wù)功能(客戶端)
本文實(shí)例為大家分享了python3實(shí)現(xiàn)ftp服務(wù)功能的具體代碼,供大家參考,具體內(nèi)容如下
客戶端 main代碼:
#Author by Andy
#_*_ coding:utf-8 _*_
'''
This program is used to create a ftp client
'''
import socket,os,json,time,hashlib,sys
class Ftp_client(object):
def __init__(self):
self.client = socket.socket()
def help(self):
msg = '''useage:
ls
pwd
cd dir(example: / .. . /var)
put filename
rm filename
get filename
mkdir directory name
'''
print(msg)
def connect(self,addr,port):
self.client.connect((addr,port))
def auth(self):
m = hashlib.md5()
username = input("請(qǐng)輸入用戶名:").strip()
m.update(input("請(qǐng)輸入密碼:").strip().encode())
password = m.hexdigest()
user_info = {
'action':'auth',
'username':username,
'password':password}
self.client.send(json.dumps(user_info).encode('utf-8'))
server_response = self.client.recv(1024).decode()
# print(server_response)
return server_response
def interactive(self):
while True:
msg = input(">>>:").strip()
if not msg:
print("不能發(fā)送空內(nèi)容!")
continue
cmd = msg.split()[0]
if hasattr(self,cmd):
func = getattr(self,cmd)
func(msg)
else:
self.help()
continue
def put(self,*args):
cmd_split = args[0].split()
if len(cmd_split) > 1:
filename = cmd_split[1]
if os.path.isfile(filename):
filesize = os.stat(filename).st_size
file_info = {
"action":"put",
"filename":filename,
"size":filesize,
"overriding":'True'
}
self.client.send( json.dumps(file_info).encode('utf-8') )
#防止粘包,等待服務(wù)器確認(rèn)。
request_code = {
'200': 'Ready to recceive data!',
'210': 'Not ready to received data!'
}
server_response = self.client.recv(1024).decode()
if server_response == '200':
f = open(filename,"rb")
send_size = 0
start_time = time.time()
for line in f:
self.client.send(line)
send_size += len(line)
send_percentage = int((send_size / filesize) * 100)
while True:
progress = ('\r已上傳%sMB(%s%%)' % (round(send_size / 102400, 2), send_percentage)).encode(
'utf-8')
os.write(1, progress)
sys.stdout.flush()
time.sleep(0.0001)
break
else:
end_time = time.time()
time_use = int(end_time - start_time)
print("\nFile %s has been sent successfully!" % filename)
print('\n平均下載速度%s MB/s' % (round(round(send_size / 102400, 2) / time_use, 2)))
f.close()
else:
print("Sever isn't ready to receive data!")
time.sleep(10)
start_time = time.time()
f = open(filename, "rb")
send_size = 0
for line in f:
self.client.send(line)
send_size += len(line)
# print(send_size)
while True:
send_percentage = int((send_size / filesize) * 100)
progress = ('\r已上傳%sMB(%s%%)' % (round(send_size / 102400, 2), send_percentage)).encode(
'utf-8')
os.write(1, progress)
sys.stdout.flush()
# time.sleep(0.0001)
break
else:
end_time = time.time()
time_use = int(end_time - start_time)
print("File %s has been sent successfully!" % filename)
print('\n平均下載速度%s MB/s' % (round(round(send_size / 102400, 2) / time_use, 2)))
f.close()
else:
print("File %s is not exit!" %filename)
else:
self.help()
def ls(self,*args):
cmd_split = args[0].split()
# print(cmd_split)
if len(cmd_split) > 1:
path = cmd_split[1]
elif len(cmd_split) == 1:
path = '.'
request_info = {
'action': 'ls',
'path': path
}
self.client.send(json.dumps(request_info).encode('utf-8'))
sever_response = self.client.recv(1024).decode()
print(sever_response)
def pwd(self,*args):
cmd_split = args[0].split()
if len(cmd_split) == 1:
request_info = {
'action': 'pwd',
}
self.client.send(json.dumps(request_info).encode("utf-8"))
server_response = self.client.recv(1024).decode()
print(server_response)
else:
self.help()
def get(self,*args):
cmd_split = args[0].split()
if len(cmd_split) > 1:
filename = cmd_split[1]
file_info = {
"action": "get",
"filename": filename,
"overriding": 'True'
}
self.client.send(json.dumps(file_info).encode('utf-8'))
server_response = self.client.recv(1024).decode() #服務(wù)器反饋文件是否存在
self.client.send('0'.encode('utf-8'))
if server_response == '0':
file_size = int(self.client.recv(1024).decode())
# print(file_size)
self.client.send('0'.encode('utf-8')) #確認(rèn)開始傳輸數(shù)據(jù)
if os.path.isfile(filename):
filename = filename+'.new'
f = open(filename,'wb')
receive_size = 0
m = hashlib.md5()
start_time = time.time()
while receive_size < file_size:
if file_size - receive_size > 1024: # 還需接收不止1次
size = 1024
else:
size = file_size - receive_size
data = self.client.recv(size)
m.update(data)
receive_size += len(data)
data_percent=int((receive_size / file_size) * 100)
f.write(data)
progress = ('\r已下載%sMB(%s%%)' %(round(receive_size/102400,2),data_percent)).encode('utf-8')
os.write(1,progress)
sys.stdout.flush()
time.sleep(0.0001)
else:
end_time = time.time()
time_use = int(end_time - start_time)
print('\n平均下載速度%s MB/s'%(round(round(receive_size/102400,2)/time_use,2)))
Md5_server = self.client.recv(1024).decode()
Md5_client = m.hexdigest()
print('文件校驗(yàn)中,請(qǐng)稍候...')
time.sleep(0.3)
if Md5_server == Md5_client:
print('文件正常。')
else:
print('文件與服務(wù)器MD5值不符,請(qǐng)確認(rèn)!')
else:
print('File not found!')
client.interactive()
else:
self.help()
def rm(self,*args):
cmd_split = args[0].split()
if len(cmd_split) > 1:
filename = cmd_split[1]
request_info = {
'action':'rm',
'filename': filename,
'prompt':'Y'
}
self.client.send(json.dumps(request_info).encode("utf-8"))
server_response = self.client.recv(10240).decode()
request_code = {
'0':'confirm to deleted',
'1':'cancel to deleted'
}
if server_response == '0':
confirm = input("請(qǐng)確認(rèn)是否真的刪除該文件:")
if confirm == 'Y' or confirm == 'y':
self.client.send('0'.encode("utf-8"))
print(self.client.recv(1024).decode())
else:
self.client.send('1'.encode("utf-8"))
print(self.client.recv(1024).decode())
else:
print('File not found!')
client.interactive()
else:
self.help()
def cd(self,*args):
cmd_split = args[0].split()
if len(cmd_split) > 1:
path = cmd_split[1]
elif len(cmd_split) == 1:
path = '.'
request_info = {
'action':'cd',
'path':path
}
self.client.send(json.dumps(request_info).encode("utf-8"))
server_response = self.client.recv(10240).decode()
print(server_response)
def mkdir(self,*args):
request_code = {
'0': 'Directory has been made!',
'1': 'Directory is aleady exist!'
}
cmd_split = args[0].split()
if len(cmd_split) > 1:
dir_name = cmd_split[1]
request_info = {
'action':'mkdir',
'dir_name': dir_name
}
self.client.send(json.dumps(request_info).encode("utf-8"))
server_response = self.client.recv(1024).decode()
if server_response == '0':
print('Directory has been made!')
else:
print('Directory is aleady exist!')
else:
self.help()
# def touch(self,*args):
def run():
client = Ftp_client()
# client.connect('10.1.2.3',6969)
Addr = input("請(qǐng)輸入服務(wù)器IP:").strip()
Port = int(input("請(qǐng)輸入端口號(hào):").strip())
client.connect(Addr,Port)
while True:
if client.auth() == '0':
print("Welcome.....")
client.interactive()
break
else:
print("用戶名或密碼錯(cuò)誤!")
continue
目錄結(jié)構(gòu):

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
- python3實(shí)現(xiàn)ftp服務(wù)功能(服務(wù)端 For Linux)
- 詳解Python下ftp上傳文件linux服務(wù)器
- Python FTP兩個(gè)文件夾間的同步實(shí)例代碼
- Python基于FTP模塊實(shí)現(xiàn)ftp文件上傳操作示例
- python2.7實(shí)現(xiàn)FTP文件下載功能
- Python實(shí)現(xiàn)的FTP通信客戶端與服務(wù)器端功能示例
- 1 行 Python 代碼快速實(shí)現(xiàn) FTP 服務(wù)器
- Python搭建FTP服務(wù)器的方法示例
- Python實(shí)現(xiàn)基于多線程、多用戶的FTP服務(wù)器與客戶端功能完整實(shí)例
- python實(shí)現(xiàn)FTP服務(wù)器服務(wù)的方法
- 詳解Python3的TFTP文件傳輸
相關(guān)文章
python用sqlacodegen根據(jù)已有數(shù)據(jù)庫(表)結(jié)構(gòu)生成對(duì)應(yīng)SQLAlchemy模型
本文介紹了如何使用sqlacodegen獲取數(shù)據(jù)庫所有表的模型類,然后使用ORM技術(shù)進(jìn)行CRUD操作,有此需求的朋友可以了解下本文2021-06-06
pyecharts繪制各種數(shù)據(jù)可視化圖表案例附效果+代碼
這篇文章主要介紹了pyecharts繪制各種數(shù)據(jù)可視化圖表案例并附效果和代碼,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,感興趣的小伙伴可以參考一下2022-06-06
Python使用Opencv實(shí)現(xiàn)邊緣檢測(cè)以及輪廓檢測(cè)的實(shí)現(xiàn)
這篇文章主要介紹了Python使用Opencv實(shí)現(xiàn)邊緣檢測(cè)以及輪廓檢測(cè)的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-12-12
使用PyTorch常見4個(gè)錯(cuò)誤解決示例詳解
這篇文章主要為大家介紹了使用PyTorch常見4個(gè)錯(cuò)誤解決示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-10-10
Python創(chuàng)建模塊及模塊導(dǎo)入的方法
這篇文章主要介紹了Python創(chuàng)建模塊及模塊導(dǎo)入的方法,實(shí)例分析了模塊的定義、導(dǎo)入及模塊屬性的使用技巧,并附帶說明了包的概念與用法,需要的朋友可以參考下2015-05-05
python在windows下創(chuàng)建隱藏窗口子進(jìn)程的方法
這篇文章主要介紹了python在windows下創(chuàng)建隱藏窗口子進(jìn)程的方法,涉及Python使用subprocess模塊操作進(jìn)程的相關(guān)技巧,需要的朋友可以參考下2015-06-06

