Python基于socket實(shí)現(xiàn)TCP客戶端和服務(wù)端
一、基于socket實(shí)現(xiàn)的TCP客戶端
import socket
?
# 建立socket對象
# 參數(shù)一表示IP地址類型(AF_INET為IPV4,AF_INET6為IPV6),參數(shù)二表示連接的類型(SOCK_STREAM表示TCP形式,SOCK_DGRAM表示UDP形式)
client_socket=socket.socket(socket.AF_INET,socket.SOCK_STREAM) ?# 代表(ipv4,TCP)
?
# 連接服務(wù)器(元組中填目標(biāo)ip地址和端口號)
client_socket.connect(('127.0.0.1',7777))
# 準(zhǔn)備數(shù)據(jù),需要轉(zhuǎn)換為二進(jìn)制數(shù)據(jù),encode()中填寫的是本地的字符串編碼格式,mac、linux填utf-8
data='hello'.encode('gbk')
# 向服務(wù)器發(fā)送數(shù)據(jù)
client_socket.send(data)
# 接收數(shù)據(jù),必須要指定接收數(shù)據(jù)的大小,單位字節(jié),最大4096,即4k
recv_data=client_socket.recv(1024)
# 接收的數(shù)據(jù)要進(jìn)行decode()解碼,發(fā)送的時(shí)候用啥編碼就填啥編碼
recv_data=recv_data.decode('gbk')
print(recv_data)
# 關(guān)閉連接
client_socket.close()二、基于socket實(shí)現(xiàn)的TCP服務(wù)端
import socket
# 建立socket對象
server_socket=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
# 如果服務(wù)器是一次性的,如果服務(wù)器結(jié)束后馬上重新啟動(dòng)會(huì)出現(xiàn)一個(gè)錯(cuò)誤,原因是地址和端口沒有被釋放
# OSError: [Errno 48] Address already in use
# 如果想馬上釋放,要設(shè)置一下socket選項(xiàng)
server_socket.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,True)
# 綁定IP和端口,如果在綁定IP時(shí)沒有給定IP,默認(rèn)是綁定本地IP
server_socket.bind(('',7777))
# 設(shè)置監(jiān)聽(最大監(jiān)聽數(shù)),設(shè)置完后服務(wù)端會(huì)進(jìn)入被動(dòng)模式,不能主動(dòng)連接客戶端,只能被動(dòng)地等待客戶端的連接
server_socket.listen(128)
# 等待客戶端連接,連接上后,函數(shù)會(huì)返回客戶端的Socket對象和地址信息
client_socket,ip_port=server_socket.accept()
print(f'客戶端{(lán)ip_port[0]}使用端口{ip_port[1]}連接成功...')
# 接收客戶端數(shù)據(jù)
data=client_socket.recv(1024)
# 查看客戶端發(fā)送的數(shù)據(jù)長度
if len(data)!=0:
? ? data=data.decode('gbk')
? ? print(f'客戶端{(lán)ip_port[0]}使用端口{ip_port[1]}發(fā)送是數(shù)據(jù)是{data}')
else:
? ? print(f'客戶端{(lán)ip_port[0]}使用端口{ip_port[1]}關(guān)閉了連接')
?
# 給客戶端發(fā)送數(shù)據(jù)
data='你好'.encode('gbk')
client_socket.send(data)
# 關(guān)閉客戶端
client_socket.close()
# 關(guān)閉服務(wù)端
server_socket.close()三、socket實(shí)現(xiàn)的多任務(wù)版TCP服務(wù)端
import socket
import threading
?
?
def client_task(client_socket,ip_port):
? ? print(ip_port,'加入連接')
? ? # 持續(xù)接收客戶端的消息
? ? while True:
? ? ? ? data=client_socket.recv(1024).decode('gbk')
? ? ? ? if len(data)!=0:
? ? ? ? ? ? print(f'客戶端{(lán)ip_port[0]}發(fā)來的信息是{data}')
? ? ? ? else:
? ? ? ? ? ? print(f'客戶端{(lán)ip_port[0]}已經(jīng)斷開連接')
? ? ? ? ? ? break
? ? ? ? send_data=('Hello--'+data).encode('gbk')
? ? ? ? client_socket.send(send_data)
?
?
if __name__ == '__main__':
? ? server_socket=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
? ? server_socket.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,True)
? ? server_socket.bind(('',7777))
? ? server_socket.listen(128)
? ? # 循環(huán)接收客戶連接
? ? while True:
? ? ? ? client_socket,ip_port=server_socket.accept() ? ?# 會(huì)一直等待接收連接
? ? ? ? t_client=threading.Thread(target=client_task,args=(client_socket,ip_port))
? ? ? ? t_client.setDaemon(True)
? ? ? ? t_client.start()1、面向?qū)ο蟀姹?/h3>
'''
基于socket實(shí)現(xiàn)的多任務(wù)版TCP服務(wù)端(面向?qū)ο螅?
'''
?
import socket
import threading
?
class SocketServer(object):
? ? def __init__(self, port):
? ? ? ? self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
? ? ? ? self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
? ? ? ? self.server_socket.bind(('', port))
? ? ? ? self.server_socket.listen(128)
?
?
? ? def start(self):
? ? ? ? # 循環(huán)接收客戶連接
? ? ? ? while True:
? ? ? ? ? ? client_socket, ip_port = self.server_socket.accept() ?# 會(huì)一直等待接收連接
? ? ? ? ? ? t_client = threading.Thread(target=self.client_task, args=(client_socket, ip_port))
? ? ? ? ? ? t_client.setDaemon(True)
? ? ? ? ? ? t_client.start()
?
? ? def client_task(self,client_socket, ip_port):
? ? ? ? print(ip_port, '加入連接')
? ? ? ? # 持續(xù)接收客戶端的消息
? ? ? ? while True:
? ? ? ? ? ? data = client_socket.recv(1024).decode('gbk')
? ? ? ? ? ? if len(data) != 0:
? ? ? ? ? ? ? ? print(f'客戶端{(lán)ip_port[0]}發(fā)來的信息是{data}')
? ? ? ? ? ? else:
? ? ? ? ? ? ? ? print(f'客戶端{(lán)ip_port[0]}已經(jīng)斷開連接')
? ? ? ? ? ? ? ? break
? ? ? ? ? ? send_data = ('Hello--' + data).encode('gbk')
? ? ? ? ? ? client_socket.send(send_data)
?
?
if __name__ == '__main__':
? ?server_socket=SocketServer(7777)
? ?server_socket.start()
'''
基于socket實(shí)現(xiàn)的多任務(wù)版TCP服務(wù)端(面向?qū)ο螅?
'''
?
import socket
import threading
?
class SocketServer(object):
? ? def __init__(self, port):
? ? ? ? self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
? ? ? ? self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
? ? ? ? self.server_socket.bind(('', port))
? ? ? ? self.server_socket.listen(128)
?
?
? ? def start(self):
? ? ? ? # 循環(huán)接收客戶連接
? ? ? ? while True:
? ? ? ? ? ? client_socket, ip_port = self.server_socket.accept() ?# 會(huì)一直等待接收連接
? ? ? ? ? ? t_client = threading.Thread(target=self.client_task, args=(client_socket, ip_port))
? ? ? ? ? ? t_client.setDaemon(True)
? ? ? ? ? ? t_client.start()
?
? ? def client_task(self,client_socket, ip_port):
? ? ? ? print(ip_port, '加入連接')
? ? ? ? # 持續(xù)接收客戶端的消息
? ? ? ? while True:
? ? ? ? ? ? data = client_socket.recv(1024).decode('gbk')
? ? ? ? ? ? if len(data) != 0:
? ? ? ? ? ? ? ? print(f'客戶端{(lán)ip_port[0]}發(fā)來的信息是{data}')
? ? ? ? ? ? else:
? ? ? ? ? ? ? ? print(f'客戶端{(lán)ip_port[0]}已經(jīng)斷開連接')
? ? ? ? ? ? ? ? break
? ? ? ? ? ? send_data = ('Hello--' + data).encode('gbk')
? ? ? ? ? ? client_socket.send(send_data)
?
?
if __name__ == '__main__':
? ?server_socket=SocketServer(7777)
? ?server_socket.start()到此這篇關(guān)于Python基于socket實(shí)現(xiàn)TCP客戶端和服務(wù)端的文章就介紹到這了,更多相關(guān)socket實(shí)現(xiàn) TCP客戶端和服務(wù)端內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python復(fù)制Word內(nèi)容并使用格式設(shè)字體與大小實(shí)例代碼
這篇文章主要介紹了Python復(fù)制Word內(nèi)容并使用格式設(shè)字體與大小實(shí)例代碼,小編覺得還是挺不錯(cuò)的,具有一定借鑒價(jià)值,需要的朋友可以參考下2018-01-01
Windows下Python的Django框架環(huán)境部署及應(yīng)用編寫入門
這篇文章主要介紹了Windows下Python的Django框架環(huán)境部署及程序編寫入門,Django在Python的框架中算是一個(gè)重量級的MVC框架,本文將從程序部署開始講到hellow world web應(yīng)用的編寫,需要的朋友可以參考下2016-03-03
python實(shí)例小練習(xí)之Turtle繪制南方的雪花
Turtle庫是Python語言中一個(gè)很流行的繪制圖像的函數(shù)庫,想象一個(gè)小烏龜,在一個(gè)橫軸為x、縱軸為y的坐標(biāo)系原點(diǎn),(0,0)位置開始,它根據(jù)一組函數(shù)指令的控制,在這個(gè)平面坐標(biāo)系中移動(dòng),從而在它爬行的路徑上繪制了圖形2021-09-09
Python中super().__init__()測試以及理解
__init__()一般用來創(chuàng)建對象的實(shí)例變量,或一次性操作,super()用于調(diào)用父類的方法,可用來解決多重繼承問題,下面這篇文章主要給大家介紹了關(guān)于Python中super().__init__()測試及理解的相關(guān)資料,需要的朋友可以參考下2021-12-12
使用Anaconda3建立虛擬獨(dú)立的python2.7環(huán)境方法
今天小編就為大家分享一篇使用Anaconda3建立虛擬獨(dú)立的python2.7環(huán)境方法,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-06-06

