Web Scraping Project: Build a Production Scraper

This capstone article brings together every concept from the series into a single, production-grade web scraper. You will build a complete scraper that extracts product listings from a multi-page e-commerce site, handles both static HTML and JavaScript-rendered content, implements rate limiting and deduplication, recovers from errors, and saves data to SQLite. This is not pseudocode; it is a runnable, deployable scraper that applies best practices from all prior articles. After completing this project, you will have a template for scraping any website and a scraper ready for production use.

Every technique in this article comes from hard-won experience. This scraper embodies lessons learned from a decade of production scraping: resilience, compliance, efficiency, and maintainability.

Project Overview: Product Price Monitor Scraper

You will build a scraper that extracts product listings from an e-commerce site. The scraper will:

  1. Respect rate limits (a minimum 2-second delay between requests, plus random jitter).
  2. Handle pagination (multiple pages of products).
  3. Parse JavaScript-rendered content (using Playwright when the use_playwright flag is set).
  4. Check robots.txt and respect crawl rules.
  5. Store data in SQLite with deduplication.
  6. Log all actions and errors.
  7. Resume from checkpoints if interrupted.
  8. Implement retry logic with exponential backoff.

Complete Production Scraper Code

Here is the full, working scraper:

import requests
from bs4 import BeautifulSoup
from playwright.sync_api import sync_playwright
import sqlite3
import logging
import time
import json
import os
from datetime import datetime
from urllib.robotparser import RobotFileParser
from urllib.parse import urljoin
import random

# === LOGGING SETUP ===
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("product_scraper.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# === DATABASE CLASS ===
class ProductDatabase:
    def __init__(self, db_file="products.db"):
        self.db_file = db_file
        self.conn = sqlite3.connect(db_file)
        # Row objects allow access by column name
        self.conn.row_factory = sqlite3.Row
        self.create_table()

    def create_table(self):
        cursor = self.conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS products (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                url TEXT UNIQUE NOT NULL,
                title TEXT NOT NULL,
                price REAL,
                category TEXT,
                description TEXT,
                scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        self.conn.commit()
        logger.info(f"Database ready: {self.db_file}")

    def insert_or_update(self, records):
        cursor = self.conn.cursor()
        inserted = 0
        updated = 0

        for record in records:
            try:
                cursor.execute("""
                    INSERT INTO products (url, title, price, category, description)
                    VALUES (?, ?, ?, ?, ?)
                """, (
                    record["url"],
                    record["title"],
                    record.get("price"),
                    record.get("category"),
                    record.get("description")
                ))
                inserted += 1
            except sqlite3.IntegrityError:
                # The UNIQUE constraint on url fired: refresh the existing row
                cursor.execute("""
                    UPDATE products
                    SET title = ?, price = ?, category = ?, description = ?,
                        updated_at = CURRENT_TIMESTAMP
                    WHERE url = ?
                """, (
                    record["title"],
                    record.get("price"),
                    record.get("category"),
                    record.get("description"),
                    record["url"]
                ))
                updated += 1

        self.conn.commit()
        logger.info(f"Database: inserted {inserted}, updated {updated}")

    def count(self):
        cursor = self.conn.cursor()
        cursor.execute("SELECT COUNT(*) as count FROM products")
        return cursor.fetchone()["count"]

    def close(self):
        self.conn.close()

# === CIRCUIT BREAKER ===
class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=60):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "closed"

    def record_success(self):
        if self.state == "half_open":
            logger.info("Circuit breaker closed (service recovered)")
        self.state = "closed"
        self.failure_count = 0

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            logger.warning(f"Circuit breaker open (failures: {self.failure_count})")
            self.state = "open"

    def can_request(self):
        if self.state == "closed":
            return True
        elif self.state == "open":
            elapsed = time.time() - self.last_failure_time
            if elapsed > self.reset_timeout:
                logger.info("Circuit breaker half-open (testing recovery)")
                self.state = "half_open"
                return True
            return False
        return self.state == "half_open"

# === ROBOTS.TXT CHECKER ===
class RobotsChecker:
    def __init__(self, domain):
        self.domain = domain
        self.rp = RobotFileParser()
        self.rp.set_url(urljoin(domain, "/robots.txt"))
        try:
            self.rp.read()
            logger.info(f"Loaded robots.txt from {domain}")
        except Exception as e:
            # A parser that never loaded reports every URL as disallowed,
            # so the scraper stops instead of crawling unchecked.
            logger.warning(f"Could not load robots.txt: {e}")

    def can_fetch(self, url, user_agent="ProductScraper/1.0"):
        return self.rp.can_fetch(user_agent, url)

# === MAIN SCRAPER CLASS ===
class ProductScraper:
    def __init__(self, start_url, domain, use_playwright=False):
        self.start_url = start_url
        self.domain = domain
        self.use_playwright = use_playwright

        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/91.0"
        })

        self.db = ProductDatabase()
        self.circuit_breaker = CircuitBreaker(failure_threshold=3, reset_timeout=30)
        self.robots = RobotsChecker(domain)

        self.checkpoint_file = "scraper_checkpoint.json"
        self.checkpoint = self.load_checkpoint()

        self.last_request_time = None
        self.min_delay = 2

    def load_checkpoint(self):
        if os.path.exists(self.checkpoint_file):
            try:
                with open(self.checkpoint_file, "r") as f:
                    checkpoint = json.load(f)
                logger.info(f"Loaded checkpoint: page {checkpoint.get('last_page')}")
                return checkpoint
            except Exception as e:
                logger.error(f"Could not load checkpoint: {e}")
        return {"last_page": 0, "records_scraped": 0}

    def save_checkpoint(self, page, records_count):
        self.checkpoint = {
            "last_page": page,
            "records_scraped": records_count,
            "timestamp": datetime.now().isoformat()
        }
        with open(self.checkpoint_file, "w") as f:
            json.dump(self.checkpoint, f, indent=2)

    def enforce_rate_limit(self):
        if self.last_request_time:
            elapsed = time.time() - self.last_request_time
            delay = self.min_delay + random.uniform(0, 0.5)
            sleep_time = delay - elapsed
            if sleep_time > 0:
                time.sleep(sleep_time)
        self.last_request_time = time.time()

    def fetch_page_static(self, url, max_retries=3):
        """Fetch static HTML with retry logic."""

        if not self.circuit_breaker.can_request():
            logger.warning(f"Circuit breaker open; skipping {url}")
            return None

        for attempt in range(max_retries):
            try:
                self.enforce_rate_limit()
                logger.info(f"Fetching (attempt {attempt + 1}): {url}")
                response = self.session.get(url, timeout=10)

                if response.status_code == 200:
                    self.circuit_breaker.record_success()
                    return response
                elif response.status_code == 429:
                    wait = 5 + (2 ** attempt)
                    logger.warning(f"Rate limited. Waiting {wait}s...")
                    time.sleep(wait)
                else:
                    logger.error(f"HTTP {response.status_code}")
                    response.raise_for_status()

            except requests.exceptions.RequestException as e:
                logger.error(f"Request failed: {e}")
                self.circuit_breaker.record_failure()
                if attempt < max_retries - 1:
                    wait = 2 ** attempt
                    logger.info(f"Retrying in {wait}s...")
                    time.sleep(wait)

        return None

    def fetch_page_dynamic(self, url):
        """Fetch JavaScript-rendered page with Playwright."""

        # Apply the same circuit breaker and rate limit as the static path
        if not self.circuit_breaker.can_request():
            logger.warning(f"Circuit breaker open; skipping {url}")
            return None

        try:
            self.enforce_rate_limit()
            logger.info(f"Fetching (Playwright): {url}")
            with sync_playwright() as p:
                browser = p.chromium.launch(headless=True)
                page = browser.new_page()
                page.goto(url, wait_until="networkidle")

                # Wait for products to load
                page.wait_for_selector("div.product", timeout=10000)

                html = page.content()
                browser.close()
            self.circuit_breaker.record_success()
            return html

        except Exception as e:
            logger.error(f"Playwright error: {e}")
            self.circuit_breaker.record_failure()
            return None

    def extract_products(self, soup):
        """Extract product data from page."""

        products = []
        items = soup.select("div.product")

        for item in items:
            try:
                title_elem = item.select_one("h2.product-title")
                price_elem = item.select_one("span.price")
                category_elem = item.select_one("span.category")
                desc_elem = item.select_one("p.description")
                link_elem = item.select_one("a.product-link")

                # Title and link are required; skip incomplete items
                if not title_elem or not link_elem:
                    continue

                title = title_elem.get_text(strip=True)
                price = price_elem.get_text(strip=True) if price_elem else None
                category = category_elem.get_text(strip=True) if category_elem else None
                description = desc_elem.get_text(strip=True) if desc_elem else None
                url = urljoin(self.domain, link_elem.get("href"))

                # Parse price; a missing or non-numeric price is stored as NULL
                try:
                    price_float = float(price.replace("$", "").replace(",", ""))
                except (AttributeError, ValueError):
                    price_float = None

                products.append({
                    "title": title,
                    "price": price_float,
                    "category": category,
                    "description": description,
                    "url": url
                })

            except AttributeError:
                continue

        return products

    def scrape(self, max_pages=10):
        """Main scraping loop."""

        start_page = self.checkpoint.get("last_page", 0) + 1
        logger.info(f"Starting from page {start_page}")

        for page_num in range(start_page, start_page + max_pages):
            # Build page URL
            page_url = f"{self.start_url}?page={page_num}"

            # Check robots.txt
            if not self.robots.can_fetch(page_url):
                logger.warning(f"robots.txt disallows: {page_url}")
                break

            # Fetch page (static or dynamic)
            if self.use_playwright:
                html = self.fetch_page_dynamic(page_url)
            else:
                response = self.fetch_page_static(page_url)
                html = response.text if response else None

            if not html:
                logger.error(f"Could not fetch page {page_num}")
                continue

            # Parse and extract
            soup = BeautifulSoup(html, "html.parser")
            products = self.extract_products(soup)

            if not products:
                logger.info(f"Page {page_num} has no products. Stopping.")
                break

            # Save to database
            self.db.insert_or_update(products)

            # Save checkpoint
            total_records = self.db.count()
            self.save_checkpoint(page_num, total_records)

            logger.info(f"Page {page_num}: {len(products)} products ({total_records} total)")

        logger.info(f"Scraping complete. Total products: {self.db.count()}")
        self.db.close()

# === MAIN EXECUTION ===
if __name__ == "__main__":
    # Example: scrape a static HTML site
    scraper = ProductScraper(
        start_url="https://example.com/products",
        domain="https://example.com",
        use_playwright=False  # Set to True for JavaScript-heavy sites
    )
    scraper.scrape(max_pages=10)

    # For JavaScript-heavy sites:
    # scraper = ProductScraper(
    #     start_url="https://example.com/products",
    #     domain="https://example.com",
    #     use_playwright=True
    # )
    # scraper.scrape(max_pages=5)  # Fewer pages due to Playwright overhead

Running the Scraper

Save the code as product_scraper.py and run:

python product_scraper.py

The scraper will:

  1. Check robots.txt and log permission status.
  2. Fetch pages one at a time with 2-second delays (configurable).
  3. Extract product data from each page.
  4. Insert or update records in SQLite.
  5. Log all actions to both console and product_scraper.log.
  6. Save checkpoints after each page (resume-capable).
  7. Retry failed requests with exponential backoff.
  8. Use a circuit breaker to skip requests while the site is persistently failing.
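
After a run, the stored rows can be checked directly in SQLite. The sketch below assumes the products table defined earlier. summarize is a hypothetical helper, not part of the scraper, and the demo rows are made up so the block runs without a prior scrape.

```python
import sqlite3


def summarize(conn, limit=5):
    """Return (total_rows, cheapest_rows) for the products table.

    Hypothetical helper; assumes the schema created by ProductDatabase.
    """
    total = conn.execute("SELECT COUNT(*) FROM products").fetchone()[0]
    cheapest = conn.execute(
        "SELECT title, price, url FROM products "
        "WHERE price IS NOT NULL ORDER BY price ASC LIMIT ?",
        (limit,),
    ).fetchall()
    return total, cheapest


# Demo on an in-memory database with made-up rows.
demo = sqlite3.connect(":memory:")
demo.execute(
    "CREATE TABLE products (id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "url TEXT UNIQUE NOT NULL, title TEXT NOT NULL, price REAL)"
)
demo.executemany(
    "INSERT INTO products (url, title, price) VALUES (?, ?, ?)",
    [
        ("https://example.com/p/1", "Sample A", 19.99),
        ("https://example.com/p/2", "Sample B", 5.50),
        ("https://example.com/p/3", "Sample C", None),  # price failed to parse
    ],
)
total, cheapest = summarize(demo)
print(f"{total} rows; cheapest: {[row[0] for row in cheapest]}")
```

To inspect real output, pass sqlite3.connect("products.db") instead of the demo connection. A high share of rows with a NULL price usually means the price selector or the price parsing needs attention.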

Adapting the Scraper for Other Sites

To scrape a different site:

  1. Identify selectors: Open the target page in DevTools (F12), inspect product elements, and extract CSS selectors.
  2. Update CSS selectors: Replace .product, .product-title, etc. with selectors from the target site.
  3. Adjust rate limit: Most sites tolerate 1-3 second delays. Start conservative.
  4. Determine if Playwright is needed: Open the page in a browser and view the page source. If the product data appears in the raw HTML source, use static fetching. If it appears only after JavaScript runs, use Playwright.
  5. Check robots.txt and ToS: Verify you have permission.
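
Step 4 can be checked programmatically. The sketch below uses only the standard library and tests whether any element with a given class appears in the raw HTML. ClassCounter and appears_in_static_html are hypothetical names, and the class name "product" is the one assumed by the scraper above.

```python
from html.parser import HTMLParser


class ClassCounter(HTMLParser):
    """Count start tags that carry a given CSS class."""

    def __init__(self, css_class):
        super().__init__()
        self.css_class = css_class
        self.count = 0

    def handle_starttag(self, tag, attrs):
        # attrs is a list of (name, value) pairs; value may be None
        classes = (dict(attrs).get("class") or "").split()
        if self.css_class in classes:
            self.count += 1


def appears_in_static_html(html, css_class="product"):
    """Return True if at least one element with css_class is in the raw HTML."""
    counter = ClassCounter(css_class)
    counter.feed(html)
    return counter.count > 0


# Made-up snippets: a server-rendered page and an empty JavaScript shell.
static_page = '<div class="product featured"><h2>Item</h2></div>'
js_shell = '<div id="app"></div><script src="bundle.js"></script>'
print(appears_in_static_html(static_page))
print(appears_in_static_html(js_shell))
```

Feed it the response.text of a plain requests fetch. If it returns False while the browser clearly shows products, the content is probably rendered by JavaScript and use_playwright=True is the safer setting.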

Example for a different site (Twitter/X):

# Replace the extract_products method.
# Note: insert_or_update() and the products table expect "title" and "url"
# keys, so the storage layer must be adapted to these fields as well.
def extract_products(self, soup):
    tweets = []
    for item in soup.select("article"):
        try:
            text = item.select_one("[data-testid='tweetText']")
            timestamp = item.select_one("time")
            url = item.select_one("a[href*='/status/']")

            if text and url:
                tweets.append({
                    "text": text.get_text(strip=True),
                    "timestamp": timestamp.get("datetime") if timestamp else None,
                    "url": url.get("href")
                })
        except AttributeError:
            continue
    return tweets

Key Takeaways

  • A production scraper combines HTTP fetching, HTML parsing, rate limiting, error handling, and data storage.
  • Use databases (SQLite) for deduplication and queries; use checkpoints for resumption.
  • Respect robots.txt, rate limits, and Terms of Service.
  • Implement circuit breakers and retry logic for resilience.
  • Log comprehensively for debugging.
  • Test on a small dataset before scaling to thousands of pages.
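
The robots.txt takeaway can be tried offline before touching a live site. The sketch below feeds made-up rules to the standard library's RobotFileParser through parse() instead of fetching them. The rules, the URLs, and the build_parser name are illustrative assumptions.

```python
from urllib.robotparser import RobotFileParser

# Made-up robots.txt rules for illustration only.
SAMPLE_ROBOTS = """\
User-agent: *
Disallow: /checkout/
Crawl-delay: 2
"""


def build_parser(robots_text):
    """Build a RobotFileParser from robots.txt text without any network access."""
    rp = RobotFileParser()
    rp.parse(robots_text.splitlines())
    return rp


rp = build_parser(SAMPLE_ROBOTS)
agent = "ProductScraper/1.0"
allowed = rp.can_fetch(agent, "https://example.com/products?page=2")
blocked = rp.can_fetch(agent, "https://example.com/checkout/cart")
delay = rp.crawl_delay(agent)  # None when the file sets no Crawl-delay
print(allowed, blocked, delay)
```

If a site publishes a Crawl-delay, one reasonable choice is to use the larger of that value and the scraper's own min_delay.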

Frequently Asked Questions

How do I schedule this scraper to run daily?

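Use cron (Linux/macOS) or Task Scheduler (Windows). The scraper writes products.db, product_scraper.log, and scraper_checkpoint.json to the current working directory, so change into the project directory first. Each run resumes from the page after the one stored in scraper_checkpoint.json, so delete that file between runs if every run should start again at page 1.

# crontab -e
0 2 * * * cd /path/to && /usr/bin/python3 product_scraper.py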

Can I scrape multiple sites in parallel?

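Yes, but carefully. Give each worker its own session, circuit breaker, and SQLite connection: by default a sqlite3 connection can only be used from the thread that created it, and the rate-limit and circuit-breaker state apply per site. Use a process pool for true parallelism. Limit it to 2-4 parallel processes to keep the load on each site low and avoid detection.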

How do I export data to CSV or JSON?

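import csv
import json
import sqlite3

# Open the scraper's database; sqlite3.Row lets each row convert to a dict
conn = sqlite3.connect("products.db")
conn.row_factory = sqlite3.Row
cursor = conn.cursor()

# Export SQLite to CSV (select only the columns listed in fieldnames)
cursor.execute("SELECT title, price, url FROM products")
with open("products.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.DictWriter(f, fieldnames=["title", "price", "url"])
    writer.writeheader()
    writer.writerows(dict(row) for row in cursor.fetchall())

# Export to JSON (all columns)
cursor.execute("SELECT * FROM products")
with open("products.json", "w", encoding="utf-8") as f:
    json.dump([dict(row) for row in cursor.fetchall()], f, indent=2)

conn.close()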

What if the site changes its HTML structure?

The selectors will break. Log which selectors failed, manually inspect the new HTML, update selectors, and redeploy. Consider adding a selector fallback (try multiple selectors for the same data).
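
A selector fallback can be sketched as a small helper that tries candidates in order and reports which one matched, so the log shows when the primary selector has stopped working. select_first is a hypothetical name, and every selector in the list except span.price is a made-up alternative.

```python
def select_first(item, selectors):
    """Return (element, selector) for the first selector that matches.

    `item` is any object with a select_one(css) method, such as a
    BeautifulSoup Tag. Returns (None, None) when nothing matches.
    Hypothetical helper, not part of the scraper above.
    """
    for selector in selectors:
        element = item.select_one(selector)
        if element is not None:
            return element, selector
    return None, None


# Candidates in order of preference; the alternatives are illustrative.
PRICE_SELECTORS = ["span.price", "span.product-price", "[data-price]"]
```

Inside extract_products, price_elem, used = select_first(item, PRICE_SELECTORS) would replace the single select_one call. Logging a warning whenever used is not the first entry gives early notice that the site layout has changed.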

How much data can I scrape before hitting rate limits?

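It depends on the site. Start with 1-2 second delays. If you get 429 errors, increase the delay. As a rough rule of thumb, most sites tolerate 10,000-100,000 pages/day with proper spacing and rotating headers. Note that a single worker at a 2-second delay tops out at 43,200 pages/day, so the upper end of that range requires shorter delays or several workers.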
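
The relationship between delay and daily volume is simple arithmetic. The sketch below gives an upper bound for one sequential worker; it ignores response time, retries, and jitter, so real throughput will be lower. Both function names are hypothetical.

```python
SECONDS_PER_DAY = 24 * 60 * 60  # 86,400


def max_pages_per_day(delay_seconds):
    """Upper bound on pages one sequential worker can fetch in a day."""
    if delay_seconds <= 0:
        raise ValueError("delay_seconds must be positive")
    return int(SECONDS_PER_DAY / delay_seconds)


def delay_for_target(pages_per_day):
    """Delay in seconds that one worker needs to reach a daily page target."""
    if pages_per_day <= 0:
        raise ValueError("pages_per_day must be positive")
    return SECONDS_PER_DAY / pages_per_day


print(max_pages_per_day(2))       # 43200
print(delay_for_target(10_000))   # 8.64
```

With the scraper's default min_delay of 2 seconds, a target of 10,000 pages/day fits comfortably, while 100,000 pages/day would need a delay below one second or several workers.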

Further Reading