使用 Python 进行电子邮件抓取:附示例的完整指南

评论: 0

要让直接推广发挥作用,你需要一个坚实的基础--一个包含真实、最新电子邮件地址的数据库。这就是使用 Python 进行电子邮件搜刮的用武之地:一种以编程方式从网站收集地址的方法。

在本指南中,我们将介绍如何使用 Python 从零开始构建电子邮件刮擦,如何处理动态页面,如何过滤和验证收集到的地址,以及如何在实际营销或业务工作流中使用由此产生的数据。

如果您需要,这些材料会很有用:

  • 在没有现成服务的情况下,自己摸索如何用 Python 从网站上抓取电子邮件地址;
  • 自动创建用于通讯、客户关系管理或研究的邮件列表;
  • 将代码与实际用例连接起来--从提取到集成。

接下来,我们将了解如何使用 Python 将公开网页变成与可能成为您客户的人直接交流的渠道。

什么是电子邮件抓取?

这种扫描的核心是自动扫描 HTML 或动态页面,并在内容或属性中寻找与地址格式(如 username@domain.tld)相匹配的模式。然后对结果进行过滤、验证和保存。

使用 Python Email Scraper 的任务

它广泛应用于商业、营销、研究和日常流程自动化。当您需要从多个来源收集和整理大量公共信息时,它尤其有用。

使用 Python 进行电子邮件搜索的具体任务示例:

  • 为电子邮件营销活动建立联系人数据库;
  • 市场营销和线索生成;
  • 研究和分析公开的联系方式;
  • 填充和更新客户关系管理系统;
  • 监测竞争对手的活动;
  • 审核和验证自己的联系人数据。

如果您有兴趣为电子商务项目收集联系人数据,请参阅我们的以下指南 电子商务数据搜索.

基础知识:工具和准备

要使刮擦有效,您需要准备好环境并选择正确的工具。这些工具可以帮助您更快地检索数据、处理复杂或动态页面以及组织大型项目。

选择图书馆来抓取电子邮件地址

常用的 Python 工具

工具 使用
requests / httpx 获取静态页面
BeautifulSoup HTML 解析/元素搜索
re(正则表达式) 提取模式
lxml 更快的解析速度
Selenium / Playwright 处理 JavaScript 驱动的页面
Scrapy 大型爬行的全面框架

准备工作环境

  1. 创建虚拟环境(venv 或 virtualenv)。
  2. 安装依赖项:
    pip install requests beautifulsoup4 lxml
    pip install selenium  # if you need dynamic rendering
  3. (如果需要)设置浏览器驱动程序(ChromeDriver、GeckoDriver)。
  4. 准备一份起始 URL 或域的列表。
  5. 决定遍历策略--递归或有限。

要了解类似方法如何应用于其他平台,请查看我们的详细指南 使用 Python 搜刮 Reddit.

示例:使用 Python 进行电子邮件抓取 - 核心逻辑(伪代码)

# 1. Create an HTTP session with timeouts and retries
session = make_session()
# 2. Load the page
html = session.get(url)
# 3. Look for email addresses:
#    - via regex across the entire text
#    - via mailto: links in HTML
emails = extract_emails_from_text(html)
emails.update(find_mailto_links(html))
# 4. Return a unique list of addresses
return emails

为什么这样做?

  • 会话 + 重试 - 避免随机失败,并在出现错误时执行重复请求。
  • Regex + mailto: - 两种简单有效的路径。
  • BeautifulSoup 中的 lxml - 更快、更精确的 HTML 解析器。
  • 规范化 mailto: - 去掉所有多余内容(?subject=...),只保留地址。

扩展变体:多层爬行器

"""
Iterate over internal links within one domain and collect email addresses.
Highlights:
- Page limit (max_pages) to stop safely
- Verifying that a link belongs to the base domain
- Avoiding re-visits
- Optional respect for robots.txt
"""

from __future__ import annotations
from collections import deque
from typing import Set
from urllib.parse import urljoin, urlparse, urlsplit, urlunsplit
import time
import requests
from bs4 import BeautifulSoup
import lxml # Import lxml to ensure it's available for BeautifulSoup
from urllib import robotparser  # standard robots.txt parser
# We use functions from the previous block:
# - make_session()
# - scrape_emails_from_url()
import re

# General regular expression for email addresses
EMAIL_RE = re.compile(r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9.-]+")

def scrape_emails_from_url(url: str, session: requests.Session) -> Set[str]:
   """Collect email addresses from the given URL page."""
   emails: Set[str] = set()
   try:
       resp = session.get(url, timeout=getattr(session, "_default_timeout", 10.0))
       resp.raise_for_status()
       # Regular expression for email addresses
       # Note: this regex isn't perfect, but it's sufficient for typical cases
       email_pattern = re.compile(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}")
       emails.update(email_pattern.findall(resp.text))
   except requests.RequestException:
       pass
   return emails

def make_session() -> requests.Session:
   """Create and return a requests session with basic settings."""
   session = requests.Session()
   session.headers.update({
       "User-Agent": "EmailScraper/1.0",
       "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
       "Accept-Language": "en-US,en;q=0.9",
       # Don't force Accept-Encoding to avoid br issues without brotli
       "Connection": "keep-alive",
   })
   return session
def same_host(url: str, base_netloc: str) -> bool:
   """True if the link belongs to the same host (domain/subdomain)."""
   return urlparse(url).netloc == base_netloc
def load_robots(start_url: str, user_agent: str = "EmailScraper") -> robotparser.RobotFileParser:
   """Read robots.txt and return a parser for permission checks."""
   base = urlparse(start_url)
   robots_url = f"{base.scheme}://{base.netloc}/robots.txt"
   rp = robotparser.RobotFileParser()
   rp.set_url(robots_url)
   try:
       rp.read()
   except Exception:
       pass
   rp.useragent = user_agent
   return rp

def normalize_url(url: str, base: str | None = None) -> str | None:
   try:
       abs_url = urljoin(base, url) if base else url
       parts = urlsplit(abs_url)
       if parts.scheme not in ("http", "https"):
           return None
       host = parts.hostname
       if not host:
           return None
       host = host.lower()
       netloc = host
       if parts.port:
           netloc = f"{host}:{parts.port}"
       parts = parts._replace(fragment="")
       return urlunsplit((parts.scheme.lower(), netloc, parts.path or "/", parts.query, ""))
   except Exception:
       return None

def in_scope(url: str, base_host: str, include_subdomains: bool) -> bool:
   try:
       host = urlsplit(url).hostname
       if not host:
           return False
       host = host.lower()
       base_host = (base_host or "").lower()
       if include_subdomains:
           return host == base_host or host.endswith("." + base_host)
       else:
           return host == base_host
   except Exception:
       return False
def collect_emails_from_site(
   start_url: str,
   max_pages: int = 100,
   delay_sec: float = 0.5,
   respect_robots: bool = True,
   include_subdomains: bool = True,
) -> Set[str]:
   """
   Traverse pages within a domain and return unique email addresses.
   - max_pages: hard limit on visited pages.
   - delay_sec: polite pause between requests.
   - respect_robots: if True — checks access rules.
   - include_subdomains: if True — allows subdomains (www, etc.).
   """
   session = make_session()
   base_host = (urlparse(start_url).netloc or "").lower()
   visited: Set[str] = set()
   queue: deque[str] = deque()
   enqueued: Set[str] = set()
   all_emails: Set[str] = set()

   start_norm = normalize_url(start_url)
   if start_norm:
       queue.append(start_norm)
       enqueued.add(start_norm)

   rp = load_robots(start_url, user_agent="EmailScraper/1.0") if respect_robots else None

   while queue and len(visited) < max_pages:
       url = queue.popleft()
       if url in visited:
           continue

       # robots.txt check
       if respect_robots and rp is not None:
           try:
               if not rp.can_fetch("EmailScraper/1.0", url):
                   continue
           except Exception:
               pass

       # One request: used both for emails and links
       try:
           resp = session.get(url, timeout=10)
           resp.raise_for_status()
           html_text = resp.text or ""
       except requests.RequestException:
           continue

       visited.add(url)

       # Skip non-HTML pages
       ctype = resp.headers.get("Content-Type", "")
       if ctype and "text/html" not in ctype:
           continue

       # Collect emails
       for m in EMAIL_RE.findall(html_text):
           all_emails.add(m.lower())

       # Parse links
       soup = BeautifulSoup(html_text, "lxml")

       # Emails from mailto:
       for a in soup.find_all("a", href=True):
           href = a["href"].strip()
           if href.lower().startswith("mailto:"):
               addr_part = href[7:].split("?", 1)[0]
               for piece in addr_part.split(","):
                   email = piece.strip()
                   if EMAIL_RE.fullmatch(email):
                       all_emails.add(email.lower())

       for a in soup.find_all("a", href=True):
           href = a["href"].strip()
           if not href or href.startswith(("javascript:", "mailto:", "tel:", "data:")):
               continue
           next_url = normalize_url(href, base=url)
           if not next_url:
               continue
           if not in_scope(next_url, base_host, include_subdomains):
               continue
           if next_url not in visited and next_url not in enqueued:
               queue.append(next_url)
               enqueued.add(next_url)

       if delay_sec > 0:
           time.sleep(delay_sec)

   try:
       session.close()
   except Exception:
       pass
   return all_emails
if __name__ == "__main__":
   import argparse

parser = argparse.ArgumentParser(
   description="An email scraper that traverses pages within a site and prints discovered addresses."
)

parser.add_argument(
   "start_url",
   help="Starting URL, for example: https://example.com"
)

parser.add_argument(
   "--max-pages",
   type=int,
   default=100,
   dest="max_pages",
   help="Maximum number of pages to traverse (default: 100)"
)

parser.add_argument(
   "--delay",
   type=float,
   default=0.5,
   help="Delay between requests in seconds (default: 0.5)"
)

parser.add_argument(
   "--no-robots",
   action="store_true",
   help="Ignore robots.txt (use carefully)"
)

scope = parser.add_mutually_exclusive_group()

scope.add_argument(
   "--include-subdomains",
   dest="include_subdomains",
   action="store_true",
   default=True,
   help="Include subdomains (default)"
)

scope.add_argument(
   "--exact-host",
   dest="include_subdomains",
   action="store_false",
   help="Restrict traversal to the exact host (no subdomains)"
)

parser.add_argument(
   "--output",
   type=str,
   default=None,
   help="Optional: path to a file to save found email addresses (one per line)"

   args = parser.parse_args()

   emails = collect_emails_from_site(
       args.start_url,
       max_pages=args.max_pages,
       delay_sec=args.delay,
       respect_robots=not args.no_robots,
       include_subdomains=args.include_subdomains,
   )

   for e in sorted(emails):
       print(e)

   print(f"Found {len(emails)} unique emails.")

   if args.output:
       try:
           with open(args.output, "w", encoding="utf-8") as f:
               for e in sorted(emails):
                   f.write(e + "\n")
       except Exception as ex:
           print(f"Could not write the output file: {ex}")

如何运行和配置扩展脚本

main.py https://example.com
脚本参数
  • start_url - 开始遍历的起始 URL(如 https://example.com)。
  • --max-pages - 要遍历的最大页数。默认值:100。
  • --delay(延迟)--请求之间的延迟(以秒为单位),以减少服务器负载。默认值:0.5。
  • --no-robots--忽略 robots.txt 中的规则。小心使用,因为网站可能不允许自动遍历。
  • --include-subdomains(包含子域)--在遍历过程中包含子域。默认已启用。
  • --精确主机--限制遍历精确主机(无子域)。
  • --output - 保存找到的地址的文件路径(每行一个)。如果未提供,地址将打印到控制台。

处理混淆和动态内容

当你运行脚本时,事情并不总是那么简单:许多网站故意隐藏电子邮件地址,或者只在 JavaScript 渲染后才显示它们。以下是可能会出现的问题以及处理方法。

潜在问题

1. 混淆

网站通常会使用一些技术来隐藏地址,以防机器人入侵:

  • JavaScript 将地址的各个部分(如 user + "@" + domain.com)组合在一起;
  • 加密或编码字符串(如 Base64、HTML 实体);
  • 隐藏部分地址的 HTML 注释或插入内容;
  • 电子邮件为图像(文本图片),在这种情况下,脚本什么也看不到;
  • 字符替换:user [at] example [dot] com 和其他 "人类可读 "的形式(地址篡改)。

2.动态页面

现代网站经常通过 JavaScript 加载内容(如获取、AJAX)。普通的 requests.get() 可能会返回一个没有电子邮件内容的 "空" HTML 外壳。

克服这些障碍的方法

遇到此类网页时的实用方法:

  1. Selenium 或 Playwright

    启动浏览器,让页面 "加载",等待所需的元素,然后捕获完整的 HTML。如果电子邮件是在呈现后通过 JS 注入的,这种方法就会起作用。

  2. API 调用

    通常情况下,页面确实会从 API 中提取数据。检查网络请求(DevTools → Network),看看是否有以 JSON 格式返回电子邮件或联系信息的请求。如果有,最好直接使用 API。

  3. 解析内联 JS/脚本

    有时,地址会 "嵌入 "到 JavaScript 中(如 Base64 字符串或分割成若干部分)。您可以解释 JS,提取字符串,并解码地址。

  4. 如果电子邮件是图像

    下载图像并应用 OCR(光学字符识别),例如使用 Tesseract。这需要更多资源,但有时是必要的。

  5. 延迟和时间安排

    有些元素会在几秒后或特定事件(滚动、点击)发生后出现。这样做是有意义的:

    • 使用 sleep() 或等待选择器;
    • 尝试多次;
    • 采用 "找不到重试 "策略。

结论

通过应用本文讨论的使用 Python 进行电子邮件刮擦的技术,您可以让脚本在实际条件下可靠地工作。请记住,数据质量会直接影响后续营销活动的效果,因此值得从一开始就实施过滤、验证和保存为方便的格式。

评论:

0 评论