Python使用Selenium模擬瀏覽器自動操作功能
概述
在進行網(wǎng)站爬取數(shù)據(jù)的時候,會發(fā)現(xiàn)很多網(wǎng)站都進行了反爬蟲的處理,如JS加密,Ajax加密,反Debug等方法,通過請求獲取數(shù)據(jù)和頁面展示的內(nèi)容完全不同,這時候就用到Selenium技術(shù),來模擬瀏覽器的操作,然后獲取數(shù)據(jù)。本文以一個簡單的小例子,簡述Python搭配Tkinter和Selenium進行瀏覽器的模擬操作,僅供學(xué)習(xí)分享使用,如有不足之處,還請指正。
什么是Selenium?
Selenium是一個用于Web應(yīng)用程序測試的工具,Selenium測試直接運行在瀏覽器中,就像真正的用戶在操作一樣。支持的瀏覽器包括IE(7, 8, 9, 10, 11),Mozilla Firefox,Safari,Google Chrome,Opera等。Selenium支持多種操作系統(tǒng),如Windows、Linux、IOS等,如果需要支持Android,則需要特殊的selenium,本文主要以IE11瀏覽器為例。
安裝Selenium
通過pip install selenium 進行安裝即可,如果速度慢,則可以使用國內(nèi)的鏡像進行安裝。
涉及知識點
程序雖小,除了需要掌握的Html ,JavaScript,CSS等基礎(chǔ)知識外,本例涉及的Python相關(guān)知識點還是蠻多的,具體如下:
- Selenium相關(guān):
Selenium進行元素定位,主要有ID,Name,ClassName,Css Selector,Partial LinkText,LinkText,XPath,TagName等8種方式。
Selenium獲取單一元素(如:find_element_by_xpath)和獲取元素數(shù)組(如:find_elements_by_xpath)兩種方式。
Selenium元素定位后,可以給元素進行賦值和取值,或者進行相應(yīng)的事件操作(如:click)。
- 線程(Thread)相關(guān):
為了防止前臺頁面卡主,本文用到了線程進行后臺操作,如果要定義一個新的線程,只需要定義一個類并繼承threading.Thread,然后重寫run方法即可。
在使用線程的過程中,為了保證線程的同步,本例用到了線程鎖,如:threading.Lock()。
- 隊列(queue)相關(guān):
本例將Selenium執(zhí)行的過程信息,保存到對列中,并通過線程輸出到頁面顯示。queue默認先進先出方式。
對列通過put進行壓棧,通過get進行出棧。通過qsize()用于獲取當前對列元素個數(shù)。
- 日志(logging.Logger)相關(guān):
為了保存Selenium執(zhí)行過程中的日志,本例用到了日志模塊,為Pyhton自帶的模塊,不需要額外安裝。
Python的日志共六種級別,分別是:NOTSET,DEBUG,INFO,WARN,ERROR,F(xiàn)ATAL,CRITICAL。
示例效果圖
本例主要針對某一配置好的商品ID進行輪詢,監(jiān)控是否有貨,有貨則加入購物車,無貨則繼續(xù)輪詢,如下圖所示:

核心代碼
本例最核心的代碼,就是利用Selenium進行網(wǎng)站的模擬操作,如下所示:
class Smoking:
"""定義Smoking類"""
# 瀏覽器驅(qū)動
__driver: webdriver = None
# 配置幫助類
__cfg_info: dict = {}
# 日志幫助類
__log_helper: LogHelper = None
# 主程序目錄
__work_path: str = ''
# 是否正在運行
__running: bool = False
# 無貨
__no_stock = 'Currently Out of Stock'
# 線程等待秒數(shù)
__wait_sec = 2
def __init__(self, work_path, cfg_info, log_helper: LogHelper):
"""初始化"""
self.__cfg_info = cfg_info
self.__log_helper = log_helper
self.__work_path = work_path
self.__wait_sec = int(cfg_info['wait_sec'])
# 如果小于2,則等于2
self.__wait_sec = (2 if self.__wait_sec < 2 else self.__wait_sec)
def checkIsExistsById(self, id):
"""通過ID判斷是否存在"""
try:
i = 0
while self.__running and i < 3:
if len(self.__driver.find_elements_by_id(id)) > 0:
break
else:
time.sleep(self.__wait_sec)
i = i + 1
return len(self.__driver.find_elements_by_id(id)) > 0
except BaseException as e:
return False
def checkIsExistsByName(self, name):
"""通過名稱判斷是否存在"""
try:
i = 0
while self.__running and i < 3:
if len(self.__driver.find_elements_by_name(name)) > 0:
break
else:
time.sleep(self.__wait_sec)
i = i + 1
return len(self.__driver.find_elements_by_name(name)) > 0
except BaseException as e:
return False
def checkIsExistsByPath(self, path):
"""通過xpath判斷是否存在"""
try:
i = 0
while self.__running and i < 3:
if len(self.__driver.find_elements_by_xpath(path)) > 0:
break
else:
time.sleep(self.__wait_sec)
i = i + 1
return len(self.__driver.find_elements_by_xpath(path)) > 0
except BaseException as e:
return False
def checkIsExistsByClass(self, cls):
"""通過class名稱判斷是否存在"""
try:
i = 0
while self.__running and i < 3:
if len(self.__driver.find_elements_by_class_name(cls)) > 0:
break
else:
time.sleep(self.__wait_sec)
i = i + 1
return len(self.__driver.find_elements_by_class_name(cls)) > 0
except BaseException as e:
return False
def checkIsExistsByLinkText(self, link_text):
"""判斷LinkText是否存在"""
try:
i = 0
while self.__running and i < 3:
if len(self.__driver.find_elements_by_link_text(link_text)) > 0:
break
else:
time.sleep(self.__wait_sec)
i = i + 1
return len(self.__driver.find_elements_by_link_text(link_text)) > 0
except BaseException as e:
return False
def checkIsExistsByPartialLinkText(self, link_text):
"""判斷包含LinkText是否存在"""
try:
i = 0
while self.__running and i < 3:
if len(self.__driver.find_elements_by_partial_link_text(link_text)) > 0:
break
else:
time.sleep(self.__wait_sec)
i = i + 1
return len(self.__driver.find_elements_by_partial_link_text(link_text)) > 0
except BaseException as e:
return False
# def waiting(self, *locator):
# """等待完成"""
# # self.__driver.switch_to.window(self.__driver.window_handles[1])
# Wait(self.__driver, 60).until(EC.visibility_of_element_located(locator))
def login(self, username, password):
"""登錄"""
# 5. 點擊鏈接跳轉(zhuǎn)到登錄頁面
self.__driver.find_element_by_link_text('賬戶登錄').click()
# 6. 輸入賬號密碼
# 判斷是否加載完成
# self.waiting((By.ID, "email"))
if self.checkIsExistsById('email'):
self.__driver.find_element_by_id('email').send_keys(username)
self.__driver.find_element_by_id('password').send_keys(password)
# 7. 點擊登錄按鈕
self.__driver.find_element_by_id('sign-in').click()
def working(self, item_id):
"""工作狀態(tài)"""
while self.__running:
try:
# 正常獲取信息
if self.checkIsExistsById('string'):
self.__driver.find_element_by_id('string').clear()
self.__driver.find_element_by_id('string').send_keys(item_id)
self.__driver.find_element_by_id('string').send_keys(Keys.ENTER)
# 判斷是否查詢到商品
xpath = "http://div[@class='specialty-header search']/div[@class='specialty-description']/div[" \
"@class='gt-450']/span[2] "
if self.checkIsExistsByPath(xpath):
count = int(self.__driver.find_element_by_xpath(xpath).text)
if count < 1:
time.sleep(self.__wait_sec)
self.__log_helper.put('沒有查詢到item id =' + item_id + '對應(yīng)的信息')
continue
else:
time.sleep(self.__wait_sec)
self.__log_helper.put('沒有查詢到item id2 =' + item_id + '對應(yīng)的信息')
continue
# 判斷當前庫存是否有貨
xpath1 = "http://div[@class='product-list']/div[@class='product']/div[@class='price-and-detail']/div[" \
"@class='price']/span[@class='noStock'] "
if self.checkIsExistsByPath(xpath1):
txt = self.__driver.find_element_by_xpath(xpath1).text
if txt == self.__no_stock:
# 當前無貨
time.sleep(self.__wait_sec)
self.__log_helper.put('查詢一次' + item_id + ',無貨')
continue
# 鏈接path1
xpath2 = "http://div[@class='product-list']/div[@class='product']/div[@class='imgDiv']/a"
# 判斷是否加載完畢
# self.waiting((By.CLASS_NAME, "imgDiv"))
if self.checkIsExistsByPath(xpath2):
self.__driver.find_element_by_xpath(xpath2).click()
time.sleep(self.__wait_sec)
# 加入購物車
if self.checkIsExistsByClass('add-to-cart'):
self.__driver.find_element_by_class_name('add-to-cart').click()
self.__log_helper.put('加入購物車成功,商品item-id:' + item_id)
break
else:
self.__log_helper.put('未找到加入購物車按鈕')
else:
self.__log_helper.put('沒有查詢到,可能是商品編碼不對,或者已下架')
except BaseException as e:
self.__log_helper.put(e)
def startRun(self):
"""運行起來"""
try:
self.__running = True
url: str = self.__cfg_info['url']
username = self.__cfg_info['username']
password = self.__cfg_info['password']
item_id = self.__cfg_info['item_id']
if url is None or len(url) == 0 or username is None or len(username) == 0 or password is None or len(
password) == 0 or item_id is None or len(item_id) == 0:
self.__log_helper.put('配置信息不全,請檢查config.cfg文件是否為空,然后再重啟')
return
if self.__driver is None:
options = webdriver.IeOptions()
options.add_argument('encoding=UTF-8')
options.add_argument('Accept= text / css, * / *')
options.add_argument('Accept - Language= zh - Hans - CN, zh - Hans;q = 0.5')
options.add_argument('Accept - Encoding= gzip, deflate')
options.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko')
# 2. 定義瀏覽器驅(qū)動對象
self.__driver = webdriver.Ie(executable_path=self.__work_path + r'\IEDriverServer.exe', options=options)
self.run(url, username, password, item_id)
except BaseException as e:
self.__log_helper.put('運行過程中出錯,請重新打開再試')
def run(self, url, username, password, item_id):
"""運行起來"""
# 3. 訪問網(wǎng)站
self.__driver.get(url)
# 4. 最大化窗口
self.__driver.maximize_window()
if self.checkIsExistsByLinkText('賬戶登錄'):
# 判斷是否登錄:未登錄
self.login(username, password)
if self.checkIsExistsByPartialLinkText('歡迎回來'):
# 判斷是否登錄:已登錄
self.__log_helper.put('登錄成功,下一步開始工作了')
self.working(item_id)
else:
self.__log_helper.put('登錄失敗,請設(shè)置賬號密碼')
def stop(self):
"""停止"""
try:
self.__running = False
# 如果驅(qū)動不為空,則關(guān)閉
self.close_browser_nicely(self.__driver)
if self.__driver is not None:
self.__driver.quit()
# 關(guān)閉后切要為None,否則啟動報錯
self.__driver = None
except BaseException as e:
print('Stop Failure')
finally:
self.__driver = None
def close_browser_nicely(self, browser):
try:
browser.execute_script("window.onunload=null; window.onbeforeunload=null")
except Exception as err:
print("Fail to execute_script:'window.onunload=null; window.onbeforeunload=null'")
socket.setdefaulttimeout(10)
try:
browser.quit()
print("Close browser and firefox by calling quit()")
except Exception as err:
print("Fail to quit from browser, error-type:%s, reason:%s" % (type(err), str(err)))
socket.setdefaulttimeout(30)
其他輔助類
日志類(LogHelper),代碼如下:
class LogHelper:
"""日志幫助類"""
__queue: queue.Queue = None # 隊列
__logging: logging.Logger = None # 日志
__running: bool = False # 是否記錄日志
def __init__(self, log_path):
"""初始化類"""
self.__queue = queue.Queue(1000)
self.init_log(log_path)
def put(self, value):
"""添加數(shù)據(jù)"""
# 記錄日志
self.__logging.info(value)
# 添加到隊列
if self.__queue.qsize() < self.__queue.maxsize:
self.__queue.put(value)
def get(self):
"""獲取數(shù)據(jù)"""
if self.__queue.qsize() > 0:
try:
return self.__queue.get(block=False)
except BaseException as e:
return None
else:
return None
def init_log(self, log_path):
"""初始化日志"""
self.__logging = logging.getLogger()
self.__logging.setLevel(logging.INFO)
# 日志
rq = time.strftime('%Y%m%d%H%M', time.localtime(time.time()))
log_name = log_path + rq + '.log'
logfile = log_name
# if not os.path.exists(logfile):
# # 創(chuàng)建空文件
# open(logfile, mode='r')
fh = logging.FileHandler(logfile, mode='a', encoding='UTF-8')
fh.setLevel(logging.DEBUG) # 輸出到file的log等級的開關(guān)
# 第三步,定義handler的輸出格式
formatter = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s")
fh.setFormatter(formatter)
# 第四步,將logger添加到handler里面
self.__logging.addHandler(fh)
def get_running(self):
# 獲取當前記錄日志的狀態(tài)
return self.__running
def set_running(self, v: bool):
# 設(shè)置當前記錄日志的狀態(tài)
self.__running = v
配置類(ConfigHelper)
class ConfigHelper:
"""初始化數(shù)據(jù)類"""
__config_dir = None
__dic_cfg = {}
def __init__(self, config_dir):
"""初始化"""
self.__config_dir = config_dir
def ReadConfigInfo(self):
"""得到配置項"""
parser = ConfigParser()
parser.read(self.__config_dir + r"\config.cfg")
section = parser.sections()[0]
items = parser.items(section)
self.__dic_cfg.clear()
for item in items:
self.__dic_cfg.__setitem__(item[0], item[1])
def getConfigInfo(self):
"""獲取配置信息"""
if len(self.__dic_cfg) == 0:
self.ReadConfigInfo()
return self.__dic_cfg
線程類(MyThread)
class MyThread(threading.Thread):
"""后臺監(jiān)控線程"""
def __init__(self, tid, name, smoking: Smoking, log_helper: LogHelper):
"""線程初始化"""
threading.Thread.__init__(self)
self.threadID = tid
self.name = name
self.smoking = smoking
self.log_helper = log_helper
def run(self):
print("開啟線程: " + self.name)
self.log_helper.put("開啟線程: " + self.name)
# 獲取鎖,用于線程同步
# lock = threading.Lock()
# lock.acquire()
self.smoking.startRun()
# 釋放鎖,開啟下一個線程
# lock.release()
print("結(jié)束線程: " + self.name)
self.log_helper.put("結(jié)束線程: " + self.name)
備注
俠客行 [唐:李白]趙客縵胡纓,吳鉤霜雪明。銀鞍照白馬,颯沓如流星。
十步殺一人,千里不留行。事了拂衣去,深藏身與名。
閑過信陵飲,脫劍膝前橫。將炙啖朱亥,持觴勸侯嬴。
三杯吐然諾,五岳倒為輕。眼花耳熱后,意氣素霓生。
救趙揮金槌,邯鄲先震驚。千秋二壯士,烜赫大梁城。
縱死俠骨香,不慚世上英。誰能書閣下,白首太玄經(jīng)。
到此這篇關(guān)于Python使用Selenium模擬瀏覽器自動操作的文章就介紹到這了,更多相關(guān)Python模擬瀏覽器自動操作內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python?reversed函數(shù)用法小結(jié)
reversed函數(shù)是Python中的內(nèi)置函數(shù)之一,是對給定的序列返回一個逆序序列的迭代器,需要通過遍歷/list/next()等方法獲取作用后的值,本文給大家介紹Python?reversed函數(shù)及用法,感興趣的朋友一起看看吧2023-10-10
NumPy實現(xiàn)ndarray多維數(shù)組操作
NumPy一個非常重要的作用就是可以進行多維數(shù)組的操作,這篇文章主要介紹了NumPy實現(xiàn)ndarray多維數(shù)組操作,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-05-05
只需要100行Python代碼就可以實現(xiàn)的貪吃蛇小游戲
貪吃蛇小游戲相信80、90后小時候肯定都玩過,那么你知道如果通過Python來實現(xiàn)嗎?今天就來教大家,文中有非常詳細的代碼示例,對正在學(xué)習(xí)python的小伙伴們很有幫助,需要的朋友可以參考下2021-05-05

