Web Scraping Project: Build a Production Scraper
This capstone article brings together every concept from the series into a single, production-grade web scraper. You will build a complete scraper that extracts product listings from a multi-page e-commerce site, handles both static HTML and JavaScript-rendered content, implements rate limiting and deduplication, recovers from errors, and saves data to SQLite. This is not pseudocode; it is a runnable, deployable scraper that applies best practices from all prior articles. After completing this project, you will have a template for scraping any website and a scraper ready for production use.
Every technique in this article comes from hard-won experience. This scraper embodies lessons learned from a decade of production scraping: resilience, compliance, efficiency, and maintainability.
Project Overview: Product Price Monitor Scraper
You will build a scraper that extracts product listings from an e-commerce site. The scraper will:
- Respect rate limits (a minimum 2-second delay plus random jitter between requests).
- Handle pagination (multiple pages of products).
- Detect and parse JavaScript-rendered content (using Playwright when needed).
- Check robots.txt and respect crawl rules.
- Store data in SQLite with deduplication.
- Log all actions and errors.
- Resume from checkpoints if interrupted.
- Implement retry logic with exponential backoff.
Complete Production Scraper Code
Here is the full, working scraper:
import requests
from bs4 import BeautifulSoup
from playwright.sync_api import sync_playwright
import sqlite3
import logging
import time
import json
import os
from datetime import datetime
from urllib.robotparser import RobotFileParser
from urllib.parse import urljoin
import random
# === LOGGING SETUP ===
# Send every record at INFO level and above to both a log file
# (product_scraper.log) and the console, using one shared line format.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
    handlers=[logging.FileHandler("product_scraper.log"), logging.StreamHandler()],
)

# Module-level logger used by every class below.
logger = logging.getLogger(__name__)
# === DATABASE CLASS ===
class ProductDatabase:
    """SQLite-backed store for scraped products, deduplicated by URL.

    The ``products`` table has a UNIQUE constraint on ``url``; writing a
    record whose URL already exists updates that row instead of adding a
    second one.
    """

    def __init__(self, db_file="products.db"):
        """Open (or create) the database at *db_file* and ensure the table exists."""
        self.db_file = db_file
        self.conn = sqlite3.connect(db_file)
        # sqlite3.Row allows column access by name, e.g. row["count"].
        self.conn.row_factory = sqlite3.Row
        self.create_table()

    def create_table(self):
        """Create the ``products`` table if it is not already present."""
        cursor = self.conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS products (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                url TEXT UNIQUE NOT NULL,
                title TEXT NOT NULL,
                price REAL,
                category TEXT,
                description TEXT,
                scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        self.conn.commit()
        # Use the attribute: the bare name ``db_file`` exists only inside
        # __init__, so referencing it here raised NameError.
        logger.info(f"Database ready: {self.db_file}")

    def insert_or_update(self, records):
        """Insert each record, or update the existing row with the same URL.

        Args:
            records: iterable of dicts. ``"url"`` and ``"title"`` are
                required keys; ``"price"``, ``"category"`` and
                ``"description"`` are optional and default to NULL.

        Raises:
            KeyError: if a record lacks ``"url"`` or ``"title"``.

        The whole batch is committed together; if any record raises an
        unexpected error the batch is rolled back and the error re-raised.
        """
        cursor = self.conn.cursor()
        inserted = 0
        updated = 0
        try:
            for record in records:
                try:
                    cursor.execute("""
                        INSERT INTO products (url, title, price, category, description)
                        VALUES (?, ?, ?, ?, ?)
                    """, (
                        record["url"],
                        record["title"],
                        record.get("price"),
                        record.get("category"),
                        record.get("description"),
                    ))
                    inserted += 1
                except sqlite3.IntegrityError:
                    # The INSERT was rejected by a constraint (normally the
                    # UNIQUE url): refresh the existing row instead.
                    cursor.execute("""
                        UPDATE products
                        SET title = ?, price = ?, category = ?, description = ?,
                            updated_at = CURRENT_TIMESTAMP
                        WHERE url = ?
                    """, (
                        record["title"],
                        record.get("price"),
                        record.get("category"),
                        record.get("description"),
                        record["url"],
                    ))
                    updated += 1
        except Exception:
            # Do not leave a half-written batch pending in the transaction.
            self.conn.rollback()
            raise
        self.conn.commit()
        logger.info(f"Database: inserted {inserted}, updated {updated}")

    def count(self):
        """Return the number of rows in the ``products`` table."""
        cursor = self.conn.cursor()
        cursor.execute("SELECT COUNT(*) as count FROM products")
        return cursor.fetchone()["count"]

    def close(self):
        """Close the underlying SQLite connection."""
        self.conn.close()
# === CIRCUIT BREAKER ===
class CircuitBreaker:
    """Refuse requests after repeated failures, then probe for recovery.

    ``state`` is one of three strings:
      * ``"closed"``    - requests are allowed.
      * ``"open"``      - requests are refused until more than
                          ``reset_timeout`` seconds have passed since the
                          last recorded failure.
      * ``"half_open"`` - requests are allowed again; the next recorded
                          success closes the breaker.
    """

    def __init__(self, failure_threshold=5, reset_timeout=60):
        self.state = "closed"
        self.failure_count = 0
        self.last_failure_time = None
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout

    def record_success(self):
        """Reset the failure counter; close the breaker if it was half-open."""
        recovering = self.state == "half_open"
        if recovering:
            logger.info("Circuit breaker closed (service recovered)")
            self.state = "closed"
        self.failure_count = 0

    def record_failure(self):
        """Count one failure and open the breaker once the threshold is reached."""
        self.last_failure_time = time.time()
        self.failure_count += 1
        if self.failure_count < self.failure_threshold:
            return
        logger.warning(f"Circuit breaker open (failures: {self.failure_count})")
        self.state = "open"

    def can_request(self):
        """Return True if a request may be attempted right now.

        An open breaker switches to half-open (and returns True) once the
        reset timeout has elapsed since the last failure.
        """
        if self.state == "open":
            since_failure = time.time() - self.last_failure_time
            if since_failure <= self.reset_timeout:
                return False
            logger.info("Circuit breaker half-open (testing recovery)")
            self.state = "half_open"
            return True
        return self.state in ("closed", "half_open")
# === ROBOTS.TXT CHECKER ===
class RobotsChecker:
    """Load a site's robots.txt once and answer per-URL permission queries."""

    def __init__(self, domain):
        """Fetch ``<domain>/robots.txt``; a load failure is logged, not raised."""
        self.domain = domain
        robots_url = urljoin(domain, "/robots.txt")
        parser = RobotFileParser()
        parser.set_url(robots_url)
        self.rp = parser
        try:
            parser.read()
        except Exception as e:
            logger.warning(f"Could not load robots.txt: {e}")
        else:
            logger.info(f"Loaded robots.txt from {domain}")

    def can_fetch(self, url, user_agent="ProductScraper/1.0"):
        """Return whether the parsed rules allow *user_agent* to fetch *url*."""
        allowed = self.rp.can_fetch(user_agent, url)
        return allowed
# === MAIN SCRAPER CLASS ===
class ProductScraper:
    """Paginated product scraper with rate limiting, retries and checkpoints.

    Pages are requested as ``<start_url>?page=N``, parsed for
    ``div.product`` elements, and written to a ProductDatabase. After each
    stored page a JSON checkpoint is saved so a later run resumes at the
    following page.
    """

    def __init__(self, start_url, domain, use_playwright=False):
        """Set up the HTTP session, database, circuit breaker and robots check.

        Args:
            start_url: listing URL; ``page=N`` is appended as a query parameter.
            domain: site root, used for robots.txt and to resolve relative links.
            use_playwright: fetch with a headless browser instead of requests.
        """
        self.start_url = start_url
        self.domain = domain
        self.use_playwright = use_playwright
        self.session = requests.Session()
        # NOTE(review): requests are sent with this browser-like User-Agent,
        # while RobotsChecker.can_fetch evaluates rules for its default
        # "ProductScraper/1.0" agent - confirm which identity is intended.
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/91.0"
        })
        self.db = ProductDatabase()
        self.circuit_breaker = CircuitBreaker(failure_threshold=3, reset_timeout=30)
        self.robots = RobotsChecker(domain)
        self.checkpoint_file = "scraper_checkpoint.json"
        self.checkpoint = self.load_checkpoint()
        self.last_request_time = None
        self.min_delay = 2

    def load_checkpoint(self):
        """Return the saved checkpoint dict, or a fresh one starting at page 0.

        An unreadable, malformed, or non-object checkpoint file is logged
        and ignored rather than raised.
        """
        if os.path.exists(self.checkpoint_file):
            try:
                with open(self.checkpoint_file, "r") as f:
                    checkpoint = json.load(f)
                if not isinstance(checkpoint, dict):
                    raise ValueError("checkpoint is not a JSON object")
            except (OSError, ValueError) as e:
                logger.error(f"Could not load checkpoint: {e}")
            else:
                logger.info(f"Loaded checkpoint: page {checkpoint.get('last_page')}")
                return checkpoint
        return {"last_page": 0, "records_scraped": 0}

    def save_checkpoint(self, page, records_count):
        """Persist the last completed page and the current record count."""
        self.checkpoint = {
            "last_page": page,
            "records_scraped": records_count,
            "timestamp": datetime.now().isoformat()
        }
        # Write to a temporary file and rename it into place so an
        # interruption mid-write cannot leave a truncated checkpoint.
        tmp_file = f"{self.checkpoint_file}.tmp"
        with open(tmp_file, "w") as f:
            json.dump(self.checkpoint, f, indent=2)
        os.replace(tmp_file, self.checkpoint_file)

    def enforce_rate_limit(self):
        """Sleep so that requests are at least ``min_delay`` (+ jitter) apart."""
        if self.last_request_time is not None:
            elapsed = time.time() - self.last_request_time
            # Up to 0.5 s of random jitter on top of the minimum delay.
            delay = self.min_delay + random.uniform(0, 0.5)
            sleep_time = delay - elapsed
            if sleep_time > 0:
                time.sleep(sleep_time)
        self.last_request_time = time.time()

    def fetch_page_static(self, url, max_retries=3):
        """Fetch *url* with requests, retrying with exponential backoff.

        Returns:
            The response on HTTP 200; None if the circuit breaker refuses
            the request or every attempt fails.
        """
        if not self.circuit_breaker.can_request():
            logger.warning(f"Circuit breaker open; skipping {url}")
            return None
        for attempt in range(max_retries):
            try:
                self.enforce_rate_limit()
                logger.info(f"Fetching (attempt {attempt + 1}): {url}")
                response = self.session.get(url, timeout=10)
                if response.status_code == 200:
                    self.circuit_breaker.record_success()
                    return response
                elif response.status_code == 429:
                    # Rate limited: back off longer than for ordinary errors.
                    wait = 5 + (2 ** attempt)
                    logger.warning(f"Rate limited. Waiting {wait}s...")
                    time.sleep(wait)
                else:
                    logger.error(f"HTTP {response.status_code}")
                    # Raises for 4xx/5xx so the handler below counts it.
                    response.raise_for_status()
            except requests.exceptions.RequestException as e:
                logger.error(f"Request failed: {e}")
                self.circuit_breaker.record_failure()
                if attempt < max_retries - 1:
                    wait = 2 ** attempt
                    logger.info(f"Retrying in {wait}s...")
                    time.sleep(wait)
        return None

    def fetch_page_dynamic(self, url):
        """Fetch a JavaScript-rendered page with Playwright.

        Applies the same circuit-breaker check and rate limit as the static
        path. Any Playwright error, including a timeout while waiting for
        ``div.product``, is logged and recorded as a failure.

        Returns:
            The rendered HTML string, or None on refusal or failure.
        """
        if not self.circuit_breaker.can_request():
            logger.warning(f"Circuit breaker open; skipping {url}")
            return None
        try:
            self.enforce_rate_limit()
            logger.info(f"Fetching (Playwright): {url}")
            with sync_playwright() as p:
                browser = p.chromium.launch(headless=True)
                try:
                    page = browser.new_page()
                    page.goto(url, wait_until="networkidle")
                    # Wait for products to load
                    page.wait_for_selector("div.product", timeout=10000)
                    html = page.content()
                finally:
                    # Close the browser even when navigation or waiting fails.
                    browser.close()
            self.circuit_breaker.record_success()
            return html
        except Exception as e:
            logger.error(f"Playwright error: {e}")
            self.circuit_breaker.record_failure()
            return None

    @staticmethod
    def _parse_price(price_text):
        """Convert text such as "$1,299.99" to a float; None if absent or unparseable."""
        if not price_text:
            return None
        try:
            return float(price_text.replace("$", "").replace(",", ""))
        except ValueError:
            return None

    def _parse_product(self, item):
        """Build one product dict from a ``div.product`` element, or None to skip it."""
        title_elem = item.select_one("h2.product-title")
        link_elem = item.select_one("a.product-link")
        if title_elem is None or link_elem is None:
            return None
        href = link_elem.get("href")
        if not href:
            # Without an href, urljoin would return the bare domain and all
            # such products would collapse onto a single database row.
            return None
        price_elem = item.select_one("span.price")
        category_elem = item.select_one("span.category")
        desc_elem = item.select_one("p.description")
        price_text = price_elem.get_text(strip=True) if price_elem else None
        return {
            "title": title_elem.get_text(strip=True),
            "price": self._parse_price(price_text),
            "category": category_elem.get_text(strip=True) if category_elem else None,
            "description": desc_elem.get_text(strip=True) if desc_elem else None,
            "url": urljoin(self.domain, href),
        }

    def extract_products(self, soup):
        """Extract product data from page.

        Returns:
            A list of dicts with keys ``title``, ``price``, ``category``,
            ``description`` and ``url``. Elements lacking a title or a link
            are skipped.
        """
        products = []
        for item in soup.select("div.product"):
            try:
                product = self._parse_product(item)
            except AttributeError as e:
                # Best effort: skip one malformed element, but say so.
                logger.warning(f"Skipping malformed product element: {e}")
                continue
            if product is not None:
                products.append(product)
        return products

    def _build_page_url(self, page_num):
        """Return the listing URL for *page_num*, keeping any existing query string."""
        separator = "&" if "?" in self.start_url else "?"
        return f"{self.start_url}{separator}page={page_num}"

    def scrape(self, max_pages=10):
        """Main scraping loop.

        Starts at the page after the checkpoint and processes at most
        *max_pages* pages. Stops early when robots.txt disallows a page, a
        page yields no products, or the circuit breaker is open. A page
        that fails to fetch is otherwise skipped; note that a later
        checkpoint then moves past it, so it is not retried on resume.
        The database connection is closed when the loop ends, including
        when it ends with an exception.
        """
        start_page = self.checkpoint.get("last_page", 0) + 1
        logger.info(f"Starting from page {start_page}")
        try:
            for page_num in range(start_page, start_page + max_pages):
                page_url = self._build_page_url(page_num)

                # Check robots.txt
                if not self.robots.can_fetch(page_url):
                    logger.warning(f"robots.txt disallows: {page_url}")
                    break

                # Fetch page (static or dynamic)
                if self.use_playwright:
                    html = self.fetch_page_dynamic(page_url)
                else:
                    response = self.fetch_page_static(page_url)
                    html = response.text if response is not None else None
                if not html:
                    logger.error(f"Could not fetch page {page_num}")
                    if self.circuit_breaker.state == "open":
                        # Every further fetch would be refused immediately.
                        logger.error("Circuit breaker is open; stopping this run.")
                        break
                    continue

                # Parse and extract
                soup = BeautifulSoup(html, "html.parser")
                products = self.extract_products(soup)
                if not products:
                    logger.info(f"Page {page_num} has no products. Stopping.")
                    break

                # Save to database, then checkpoint the completed page
                self.db.insert_or_update(products)
                total_records = self.db.count()
                self.save_checkpoint(page_num, total_records)
                logger.info(f"Page {page_num}: {len(products)} products ({total_records} total)")
            logger.info(f"Scraping complete. Total products: {self.db.count()}")
        finally:
            self.db.close()
# === MAIN EXECUTION ===
if __name__ == "__main__":
    # Default run: plain HTTP fetching, suitable for server-rendered listings.
    scraper = ProductScraper(
        "https://example.com/products",
        "https://example.com",
        use_playwright=False,
    )
    scraper.scrape(max_pages=10)

    # For JavaScript-heavy sites, construct the scraper with
    # use_playwright=True and request fewer pages (for example max_pages=5),
    # because every page then goes through a headless browser:
    #
    #     scraper = ProductScraper(
    #         "https://example.com/products",
    #         "https://example.com",
    #         use_playwright=True,
    #     )
    #     scraper.scrape(max_pages=5)
Running the Scraper
Save the code as product_scraper.py and run:
python product_scraper.py
The scraper will:
- Check robots.txt and log permission status.
- Fetch pages one at a time with 2-second delays (configurable).
- Extract product data from each page.
- Insert or update records in SQLite.
- Log all actions to both console and product_scraper.log.
- Save checkpoints after each page (resume-capable).
- Retry failed requests with exponential backoff.
- Use circuit breaker to stop if the site is persistently down.
Adapting the Scraper for Other Sites
To scrape a different site:
- Identify selectors: Open the target page in DevTools (F12), inspect product elements, and extract CSS selectors.
- Update CSS selectors: Replace .product, .product-title, etc. with selectors from the target site.
- Adjust rate limit: Most sites tolerate 1-3 second delays. Start conservative.
- Determine if Playwright is needed: Open the page in a browser and view the page source. If the product data is already present in the raw HTML, use static fetching. If it only appears after JavaScript runs, use Playwright.
- Check robots.txt and ToS: Verify you have permission.
Example for a different site (Twitter/X):
# Replace extract_products method
def extract_products(self, soup):
    """Extract tweets from a page, shaped so the rest of the scraper can store them.

    Returns:
        A list of dicts with keys ``title``, ``text``, ``timestamp`` and
        ``url``. ``title`` duplicates the tweet text because
        ProductDatabase.insert_or_update reads ``record["title"]`` and
        ``record["url"]``; ``text`` and ``timestamp`` are not columns of
        the products table and are ignored when storing. Articles without
        tweet text or a status link are skipped.
    """
    tweets = []
    for item in soup.select("article"):
        text = item.select_one("[data-testid='tweetText']")
        link = item.select_one("a[href*='/status/']")
        href = link.get("href") if link is not None else None
        if text is None or not href:
            continue
        timestamp = item.select_one("time")
        body = text.get_text(strip=True)
        tweets.append({
            "title": body,
            "text": body,
            "timestamp": timestamp.get("datetime") if timestamp is not None else None,
            # Resolve relative links against the site root, as the product
            # extractor does, so the stored URL is absolute.
            "url": urljoin(self.domain, href),
        })
    return tweets
Key Takeaways
- A production scraper combines HTTP fetching, HTML parsing, rate limiting, error handling, and data storage.
- Use databases (SQLite) for deduplication and queries; use checkpoints for resumption.
- Respect robots.txt, rate limits, and Terms of Service.
- Implement circuit breakers and retry logic for resilience.
- Log comprehensively for debugging.
- Test on a small dataset before scaling to thousands of pages.
Frequently Asked Questions
How do I schedule this scraper to run daily?
Use cron (Linux/macOS) or Task Scheduler (Windows):
# crontab -e
# Run daily at 02:00. The scraper opens products.db, scraper_checkpoint.json
# and product_scraper.log by relative path, so change into the project
# directory first; cron does not start jobs there.
0 2 * * * cd /path/to && /usr/bin/python3 /path/to/product_scraper.py
Can I scrape multiple sites in parallel?
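Yes, but carefully. Give each worker its own scraper instance: by default a sqlite3 connection can only be used from the thread that created it, and the session, rate limiter, and circuit breaker hold per-site state that should not be shared. Workers can still write to the same SQLite database file through separate connections. Use a process pool for true parallelism. Limit to 2-4 parallel processes to avoid detection.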
How do I export data to CSV or JSON?
import csv
import json

# `db` is assumed to be an open ProductDatabase; its connection uses
# sqlite3.Row, so every row can be converted with dict(row).
cursor = db.conn.cursor()

# Export SQLite to CSV
# Select exactly the exported columns and pass real dicts: DictWriter
# rejects keys that are not listed in fieldnames, and sqlite3.Row objects
# are not dicts.
fieldnames = ["title", "price", "url"]
cursor.execute("SELECT title, price, url FROM products")
with open("products.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(dict(row) for row in cursor.fetchall())

# Export to JSON
# Run the query before opening the file so a failed query does not leave
# an empty products.json behind.
cursor.execute("SELECT * FROM products")
rows = [dict(row) for row in cursor.fetchall()]
with open("products.json", "w", encoding="utf-8") as f:
    json.dump(rows, f, indent=2)
What if the site changes its HTML structure?
The selectors will break. Log which selectors failed, manually inspect the new HTML, update selectors, and redeploy. Consider adding a selector fallback (try multiple selectors for the same data).
How much data can I scrape before hitting rate limits?
It depends on the site. Start with 1-2 second delays. If you get 429 errors, increase delay. Most sites tolerate 10,000-100,000 pages/day with proper spacing and rotating headers.
Further Reading
- All previous articles in this series — Full reference for each technique used here.
- Scrapy Framework — Production-grade scraping framework (alternative to manual scraping).
- Selenium vs Playwright — Comparison of browser automation tools.
- Web Scraping Ethics and Law — Comprehensive guide to legal and ethical scraping.