Node.js分布式爬蟲的架構(gòu)設(shè)計和實現(xiàn)(從安裝到存儲實戰(zhàn))
單機爬蟲一天爬 1 萬條,分布式爬蟲一小時爬 100 萬條。這就是架構(gòu)的力量。
?? 開場:一個讓我加班到凌晨的需求
產(chǎn)品經(jīng)理說:“我們需要爬取全網(wǎng) 500 萬條商品數(shù)據(jù),明天要。”
我看了看我的單機爬蟲,算了一下:
- 每條數(shù)據(jù)平均耗時 2 秒(包括請求、解析、存儲)
- 500 萬 × 2 秒 = 1000 萬秒 ≈ 115 天
“明天要?你在逗我?”
然后我開始研究分布式爬蟲。
一周后,我搭建了一個 10 臺機器的爬蟲集群,每臺機器跑 10 個 Worker。
100 個 Worker 并行工作,500 萬條數(shù)據(jù),12 小時搞定。
這就是分布式的魅力。
??? 分布式爬蟲架構(gòu)
整體架構(gòu)圖
┌─────────────────────────────────────────────────────────────┐
│ Master 節(jié)點 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 任務(wù)調(diào)度器 │ │ URL 管理器 │ │ 監(jiān)控面板 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Redis 消息隊列 │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 待爬取隊列 │ │ 已爬取集合 │ │ 失敗隊列 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
┌─────────────────┼─────────────────┐
▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ Worker 1 │ │ Worker 2 │ │ Worker N │
│ ┌─────────┐ │ │ ┌─────────┐ │ │ ┌─────────┐ │
│ │ 爬取器 │ │ │ │ 爬取器 │ │ │ │ 爬取器 │ │
│ │ 解析器 │ │ │ │ 解析器 │ │ │ │ 解析器 │ │
│ │ 存儲器 │ │ │ │ 存儲器 │ │ │ │ 存儲器 │ │
│ └─────────┘ │ │ └─────────┘ │ │ └─────────┘ │
└───────────────┘ └───────────────┘ └───────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ MongoDB 數(shù)據(jù)存儲 │
└─────────────────────────────────────────────────────────────┘
核心組件
- Master 節(jié)點:負(fù)責(zé)任務(wù)調(diào)度、URL 管理、監(jiān)控
- Redis 隊列:任務(wù)分發(fā)、去重、失敗重試
- Worker 節(jié)點:實際執(zhí)行爬取任務(wù)
- MongoDB:存儲爬取的數(shù)據(jù)
??? 環(huán)境準(zhǔn)備
安裝依賴
npm install bull # 基于 Redis 的任務(wù)隊列 npm install ioredis # Redis 客戶端 npm install mongoose # MongoDB ODM npm install puppeteer # 無頭瀏覽器 npm install p-limit # 并發(fā)控制 npm install chalk # 終端彩色輸出 npm install express # 監(jiān)控 API
Docker 啟動 Redis 和 MongoDB
# docker-compose.yml
version: "3.8"
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
mongodb:
image: mongo:6
ports:
- "27017:27017"
volumes:
- mongo_data:/data/db
environment:
MONGO_INITDB_ROOT_USERNAME: root
MONGO_INITDB_ROOT_PASSWORD: password
volumes:
redis_data:
mongo_data:
docker-compose up -d
?? 任務(wù)隊列實現(xiàn)
使用 Bull 創(chuàng)建隊列
// src/queue/crawlerQueue.js
const Queue = require("bull")
const chalk = require("chalk")
// 創(chuàng)建爬蟲任務(wù)隊列
const crawlerQueue = new Queue("crawler", {
redis: {
host: process.env.REDIS_HOST || "localhost",
port: process.env.REDIS_PORT || 6379,
},
defaultJobOptions: {
attempts: 3, // 失敗重試 3 次
backoff: {
type: "exponential", // 指數(shù)退避
delay: 2000, // 初始延遲 2 秒
},
removeOnComplete: 100, // 保留最近 100 個完成的任務(wù)
removeOnFail: 1000, // 保留最近 1000 個失敗的任務(wù)
},
})
// 隊列事件監(jiān)聽
crawlerQueue.on("completed", (job, result) => {
console.log(chalk.green(`? 任務(wù)完成: ${job.id} - ${job.data.url}`))
})
crawlerQueue.on("failed", (job, err) => {
console.log(chalk.red(`? 任務(wù)失敗: ${job.id} - ${err.message}`))
})
crawlerQueue.on("stalled", (job) => {
console.log(chalk.yellow(`?? 任務(wù)卡住: ${job.id}`))
})
module.exports = { crawlerQueue }
添加任務(wù)到隊列
// src/queue/producer.js
const { crawlerQueue } = require("./crawlerQueue")
const chalk = require("chalk")
/**
* 批量添加爬取任務(wù)
* @param {string[]} urls - URL 列表
* @param {object} options - 任務(wù)選項
*/
async function addCrawlTasks(urls, options = {}) {
console.log(chalk.blue(`?? 添加 ${urls.length} 個任務(wù)到隊列...`))
const jobs = urls.map((url) => ({
name: "crawl",
data: {
url,
...options,
},
opts: {
priority: options.priority || 0,
delay: options.delay || 0,
},
}))
await crawlerQueue.addBulk(jobs)
console.log(chalk.green(`? 任務(wù)添加完成`))
}
/**
* 添加單個任務(wù)
*/
async function addCrawlTask(url, options = {}) {
const job = await crawlerQueue.add("crawl", {
url,
...options,
})
return job
}
/**
* 獲取隊列狀態(tài)
*/
async function getQueueStats() {
const [waiting, active, completed, failed, delayed] = await Promise.all([
crawlerQueue.getWaitingCount(),
crawlerQueue.getActiveCount(),
crawlerQueue.getCompletedCount(),
crawlerQueue.getFailedCount(),
crawlerQueue.getDelayedCount(),
])
return { waiting, active, completed, failed, delayed }
}
module.exports = {
addCrawlTasks,
addCrawlTask,
getQueueStats,
}
?? Worker 實現(xiàn)
爬蟲 Worker
// src/worker/crawlerWorker.js
const { crawlerQueue } = require("../queue/crawlerQueue")
const puppeteer = require("puppeteer")
const { saveToMongo } = require("../storage/mongodb")
const { checkDuplicate, markAsCrawled } = require("../utils/dedup")
const chalk = require("chalk")
let browser = null
/**
* 初始化瀏覽器
*/
async function initBrowser() {
if (!browser) {
browser = await puppeteer.launch({
headless: "new",
args: [
"--no-sandbox",
"--disable-setuid-sandbox",
"--disable-dev-shm-usage",
],
})
console.log(chalk.green("?? 瀏覽器已啟動"))
}
return browser
}
/**
* 處理爬取任務(wù)
*/
async function processCrawlJob(job) {
const { url, type = "default" } = job.data
// 1. 檢查是否已爬?。ㄈブ兀?
const isDuplicate = await checkDuplicate(url)
if (isDuplicate) {
console.log(chalk.yellow(`?? 跳過已爬取: ${url}`))
return { skipped: true, url }
}
// 2. 初始化瀏覽器
const browser = await initBrowser()
const page = await browser.newPage()
try {
// 3. 設(shè)置頁面
await page.setViewport({ width: 1920, height: 1080 })
await page.setUserAgent(
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
)
// 4. 訪問頁面
await page.goto(url, {
waitUntil: "networkidle2",
timeout: 30000,
})
// 5. 根據(jù)類型選擇解析器
let data
switch (type) {
case "product":
data = await parseProductPage(page)
break
case "article":
data = await parseArticlePage(page)
break
default:
data = await parseDefaultPage(page)
}
// 6. 保存數(shù)據(jù)
await saveToMongo(type, {
...data,
url,
crawledAt: new Date(),
})
// 7. 標(biāo)記為已爬取
await markAsCrawled(url)
// 8. 更新進度
await job.progress(100)
return { success: true, url, data }
} finally {
await page.close()
}
}
/**
* 解析商品頁面
*/
async function parseProductPage(page) {
return page.evaluate(() => {
return {
title: document.querySelector(".product-title")?.textContent?.trim(),
price: document.querySelector(".product-price")?.textContent?.trim(),
description: document.querySelector(".product-desc")?.textContent?.trim(),
images: Array.from(document.querySelectorAll(".product-image img")).map(
(img) => img.src
),
specs: Array.from(document.querySelectorAll(".spec-item")).map(
(item) => ({
name: item.querySelector(".spec-name")?.textContent?.trim(),
value: item.querySelector(".spec-value")?.textContent?.trim(),
})
),
}
})
}
/**
* 解析文章頁面
*/
async function parseArticlePage(page) {
return page.evaluate(() => {
return {
title: document.querySelector("h1")?.textContent?.trim(),
author: document.querySelector(".author")?.textContent?.trim(),
content: document.querySelector(".article-content")?.innerHTML,
publishDate: document.querySelector(".publish-date")?.textContent?.trim(),
tags: Array.from(document.querySelectorAll(".tag")).map((tag) =>
tag.textContent?.trim()
),
}
})
}
/**
* 默認(rèn)解析器
*/
async function parseDefaultPage(page) {
return page.evaluate(() => {
return {
title: document.title,
content: document.body.innerText.slice(0, 5000),
}
})
}
/**
* 啟動 Worker
*/
function startWorker(concurrency = 5) {
console.log(chalk.blue(`?? 啟動 Worker,并發(fā)數(shù): ${concurrency}`))
crawlerQueue.process("crawl", concurrency, async (job) => {
return processCrawlJob(job)
})
}
// 優(yōu)雅退出
process.on("SIGTERM", async () => {
console.log(chalk.yellow("?? 收到退出信號,正在關(guān)閉..."))
await crawlerQueue.close()
if (browser) await browser.close()
process.exit(0)
})
module.exports = { startWorker }
?? URL 去重
使用 Redis 實現(xiàn)去重
// src/utils/dedup.js
const Redis = require("ioredis")
const crypto = require("crypto")
const redis = new Redis({
host: process.env.REDIS_HOST || "localhost",
port: process.env.REDIS_PORT || 6379,
})
const CRAWLED_SET = "crawler:crawled"
const PENDING_SET = "crawler:pending"
/**
* 生成 URL 的哈希值
*/
function hashUrl(url) {
return crypto.createHash("md5").update(url).digest("hex")
}
/**
* 檢查 URL 是否已爬取
*/
async function checkDuplicate(url) {
const hash = hashUrl(url)
return redis.sismember(CRAWLED_SET, hash)
}
/**
* 標(biāo)記 URL 為已爬取
*/
async function markAsCrawled(url) {
const hash = hashUrl(url)
await redis.sadd(CRAWLED_SET, hash)
}
/**
* 批量檢查去重
*/
async function filterDuplicates(urls) {
const pipeline = redis.pipeline()
urls.forEach((url) => {
pipeline.sismember(CRAWLED_SET, hashUrl(url))
})
const results = await pipeline.exec()
return urls.filter((url, index) => {
const [err, isMember] = results[index]
return !isMember
})
}
/**
* 使用布隆過濾器(更省內(nèi)存,適合海量數(shù)據(jù))
*/
async function checkWithBloomFilter(url) {
const hash = hashUrl(url)
// Redis 4.0+ 支持 BF.EXISTS 命令
// 需要安裝 RedisBloom 模塊
return redis.call("BF.EXISTS", "crawler:bloom", hash)
}
async function addToBloomFilter(url) {
const hash = hashUrl(url)
return redis.call("BF.ADD", "crawler:bloom", hash)
}
module.exports = {
checkDuplicate,
markAsCrawled,
filterDuplicates,
checkWithBloomFilter,
addToBloomFilter,
}
?? 數(shù)據(jù)存儲
MongoDB 存儲
// src/storage/mongodb.js
const mongoose = require("mongoose")
const chalk = require("chalk")
// 連接 MongoDB
mongoose.connect(process.env.MONGO_URI || "mongodb://localhost:27017/crawler", {
useNewUrlParser: true,
useUnifiedTopology: true,
})
mongoose.connection.on("connected", () => {
console.log(chalk.green("?? MongoDB 已連接"))
})
// 定義 Schema
const productSchema = new mongoose.Schema(
{
url: { type: String, required: true, unique: true },
title: String,
price: String,
description: String,
images: [String],
specs: [
{
name: String,
value: String,
},
],
crawledAt: { type: Date, default: Date.now },
},
{ timestamps: true }
)
const articleSchema = new mongoose.Schema(
{
url: { type: String, required: true, unique: true },
title: String,
author: String,
content: String,
publishDate: String,
tags: [String],
crawledAt: { type: Date, default: Date.now },
},
{ timestamps: true }
)
// 創(chuàng)建索引
productSchema.index({ crawledAt: -1 })
articleSchema.index({ crawledAt: -1 })
const Product = mongoose.model("Product", productSchema)
const Article = mongoose.model("Article", articleSchema)
/**
* 保存數(shù)據(jù)到 MongoDB
*/
async function saveToMongo(type, data) {
const Model = type === "product" ? Product : Article
try {
await Model.findOneAndUpdate({ url: data.url }, data, {
upsert: true,
new: true,
})
} catch (error) {
if (error.code !== 11000) {
// 忽略重復(fù)鍵錯誤
throw error
}
}
}
/**
* 批量保存
*/
async function bulkSave(type, dataList) {
const Model = type === "product" ? Product : Article
const operations = dataList.map((data) => ({
updateOne: {
filter: { url: data.url },
update: { $set: data },
upsert: true,
},
}))
await Model.bulkWrite(operations)
}
module.exports = {
Product,
Article,
saveToMongo,
bulkSave,
}
?? 監(jiān)控面板
簡單的監(jiān)控 API
// src/monitor/server.js
const express = require("express")
const { getQueueStats } = require("../queue/producer")
const { crawlerQueue } = require("../queue/crawlerQueue")
const chalk = require("chalk")
const app = express()
// 隊列狀態(tài)
app.get("/api/stats", async (req, res) => {
const stats = await getQueueStats()
res.json(stats)
})
// 最近的任務(wù)
app.get("/api/jobs/recent", async (req, res) => {
const [completed, failed] = await Promise.all([
crawlerQueue.getCompleted(0, 10),
crawlerQueue.getFailed(0, 10),
])
res.json({
completed: completed.map((job) => ({
id: job.id,
url: job.data.url,
finishedOn: job.finishedOn,
})),
failed: failed.map((job) => ({
id: job.id,
url: job.data.url,
failedReason: job.failedReason,
})),
})
})
// 暫停/恢復(fù)隊列
app.post("/api/queue/pause", async (req, res) => {
await crawlerQueue.pause()
res.json({ status: "paused" })
})
app.post("/api/queue/resume", async (req, res) => {
await crawlerQueue.resume()
res.json({ status: "resumed" })
})
// 清空隊列
app.post("/api/queue/clean", async (req, res) => {
await crawlerQueue.clean(0, "completed")
await crawlerQueue.clean(0, "failed")
res.json({ status: "cleaned" })
})
// 啟動服務(wù)
const PORT = process.env.MONITOR_PORT || 3000
app.listen(PORT, () => {
console.log(chalk.blue(`?? 監(jiān)控面板運行在 http://localhost:${PORT}`))
})
?? 啟動腳本
Master 節(jié)點
// src/master.js
const { addCrawlTasks, getQueueStats } = require("./queue/producer")
const chalk = require("chalk")
async function main() {
console.log(chalk.blue("\n?? Master 節(jié)點啟動\n"))
// 示例:添加一批 URL
const urls = [
"https://example.com/product/1",
"https://example.com/product/2",
"https://example.com/product/3",
// ... 更多 URL
]
await addCrawlTasks(urls, { type: "product" })
// 定時打印隊列狀態(tài)
setInterval(async () => {
const stats = await getQueueStats()
console.log(
chalk.cyan(`
?? 隊列狀態(tài):
等待中: ${stats.waiting}
處理中: ${stats.active}
已完成: ${stats.completed}
已失敗: ${stats.failed}
`)
)
}, 5000)
}
main().catch(console.error)
Worker 節(jié)點
// src/worker.js
const { startWorker } = require("./worker/crawlerWorker")
// 從環(huán)境變量獲取并發(fā)數(shù)
const concurrency = parseInt(process.env.CONCURRENCY) || 5
startWorker(concurrency)
啟動命令
# 啟動 Master(添加任務(wù)) node src/master.js # 啟動 Worker(可以啟動多個) CONCURRENCY=10 node src/worker.js # 啟動監(jiān)控 node src/monitor/server.js
?? 性能優(yōu)化技巧
1. 連接池復(fù)用
// 復(fù)用瀏覽器實例
const browserPool = []
const MAX_BROWSERS = 5
async function getBrowser() {
if (browserPool.length < MAX_BROWSERS) {
const browser = await puppeteer.launch({ headless: "new" })
browserPool.push(browser)
return browser
}
// 輪詢使用
return browserPool[Math.floor(Math.random() * browserPool.length)]
}
2. 批量操作
// 批量寫入數(shù)據(jù)庫
const buffer = []
const BATCH_SIZE = 100
async function addToBuffer(data) {
buffer.push(data)
if (buffer.length >= BATCH_SIZE) {
await bulkSave("product", buffer)
buffer.length = 0
}
}
3. 內(nèi)存管理
// 定期清理瀏覽器內(nèi)存
setInterval(async () => {
for (const browser of browserPool) {
const pages = await browser.pages()
for (const page of pages) {
if (page.url() === "about:blank") continue
await page.close()
}
}
}, 60000)
總結(jié)
到此這篇關(guān)于Node.js分布式爬蟲的架構(gòu)設(shè)計和實現(xiàn)(從安裝到存儲實戰(zhàn))的文章就介紹到這了,更多相關(guān)Node.js分布式爬蟲內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
利用Node.js和MySQL實現(xiàn)創(chuàng)建API服務(wù)器
這篇文章主要為大家詳細(xì)介紹了如何使用Node.js和MySQL創(chuàng)建API服務(wù)器的步驟,這也是從前端邁向全棧的一個開始,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以了解下2024-01-01
win7下安裝配置node.js+express開發(fā)環(huán)境
windows7下安裝nodejs及框架express,從誕生至今一直被熱捧,筆者最近也裝了個環(huán)境打算了解一下。安裝步驟簡單比較簡單,這里分享給大家,希望大家能夠喜歡。2015-12-12

