如何使用 python 脚本抓取 Twitter 数据

评论: 0

创建一个 Python 脚本来刮擦 Twitter 数据确实有助于收集洞察力,如用户评论或围绕特定主题的讨论,这对营销和研究大有帮助。使用此类脚本进行自动化可简化收集过程,使其快速高效。

步骤 1:安装和导入

在开始编写实际代码之前,您必须安装 2 个软件包。您还需要一个 Python 软件包管理器 (PIP) 来安装这些软件包。幸运的是,一旦您在机器上安装了 Python,PIP 也就安装好了。要安装这些软件包,只需在命令行界面 (CLI) 中运行以下命令即可。

pip install selenium-wire selenium undetected-chromedriver

安装完成后,必须将这些软件包导入 Python 文件,如下图所示。

from seleniumwire import webdriver as wiredriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from time import sleep
import json
import undetected_chromedriver as uc
import random
Import ssl
  • Seleniumwire:通过添加直接配置代理的功能来增强 Selenium,这对于避免在刮擦活动中出现阻塞至关重要。
  • Selenium:利用 ActionChains 和 "Keys"(用于模拟浏览器操作)、"By"(用于元素搜索)、"WebDriverWait"(用于基于条件的执行)和 "expected_conditions"(用于基于条件的执行)等工具,为数据搜刮提供便利。
  • 未检测到的 Chromedriver:改变 ChromeDriver 与 Selenium Wire 一起使用,以规避网站上的僵尸检测机制,降低拦截风险。
  • time、random、json:用于管理操作定时和处理 JSON 格式数据的标准 Python 库。

步骤 2:代理初始化

我们已经多次证明,在搜索过程中使用代理非常重要。Twitter 是不允许数据搜刮的社交媒体平台之一,为了安全起见,避免被禁言,您应该使用代理。

你只需提供代理地址、代理用户名和密码,你的 IP 就会被屏蔽和保护起来。运行无头浏览器(基本上等同于运行无界面浏览器)有助于加快搜索过程,这也是我们在选项中添加无头标志的原因。

# 在代理列表中指定带有用户名和密码的代理服务器地址
proxies = [
    "proxy_username:proxy_password@proxy_address:port_number",
]




# 函数获取随机代理
def get_proxy():
    return random.choice(proxies)


# Set up Chrome options with the proxy and authentication
chrome_options = Options()
chrome_options.add_argument("--headless")


proxy = get_proxy()
proxy_options = {
    "proxy": {
        "http": f"http://{proxy}",
        "https": f"https://{proxy}",
    }
}

步骤 3:如何登录 X/Twitter

要使用 Python 有效地抓取 Twitter 数据,脚本需要 Twitter 账户的访问凭证,包括用户名和密码。

此外,您还必须指定一个搜索关键字。脚本会使用 https://twitter.com/search?q={search_keyword}&src=typed_query&f=top 命令构建一个 URL,以便在 Twitter 上搜索该关键词。

下一步是创建一个 ChromeDriver 实例,并将代理详细信息作为一个选项。这一设置将引导 ChromeDriver 在加载页面时使用特定的 IP 地址。设置完成后,搜索 URL 将根据这些配置加载。加载页面后,必须登录才能访问搜索结果。脚本使用 WebDriverWait,通过检查是否存在用户名输入区域来验证页面是否已完全加载。如果该区域无法加载,建议终止 ChromeDriver 实例。

search_keyword = input("What topic on X/Twitter would you like to gather data on?\n").replace(' ', '%20')
constructed_url = f"https://twitter.com/search?q={search_keyword}&src=typed_query&f=top"


# 在此提供您的 X/Twitter 用户名和密码
x_username = "" 
x_password = ""


print(f'Opening {constructed_url} in Chrome...')


# 使用未检测到的 Chrome 驱动程序创建 WebDriver 实例
driver = uc.Chrome(options=chrome_options, seleniumwire_options=proxy_options)


driver.get(constructed_url)


try:
    element = WebDriverWait(driver, 20).until(
        EC.presence_of_element_located((By.XPATH, "//div[@class='css-175oi2r r-1mmae3n r-1e084wir-13qz1uu']"))
    )
except Exception as err:
    print(f'WebDriver Wait Error: Most likely Network TimeOut: Details\n{err}')
    driver.quit()


#登录
if element:
    username_field = driver.find_element(By.XPATH, "//input[@class='r-30o5oe r-1dz5y72 r-13qz1uu r-1niwhzg r-17gur6a r-1yadl64 r-deolkf r-homxoj r-poiln3 r-7cikom r-1ny4l3l r-t60dpp r-fdjqy7']")
    username_field.send_keys(x_username)
    username_field..send_keys(Keys.ENTER)


    password_field = WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.XPATH, "//input[@class='r-30o5oe r-1dz5y72 r-13qz1uu r-1niwhzg r-17gur6a r-1yadl64 r-deolkf r-homxoj r-poiln3 r-7cikom r-1ny4l3l r-t60dpp r-fdjqy7']"))
    )
    password_field.send_keys(x_password)
    password_field.send_keys(Keys.ENTER)


    print("Sign In Successful...\n")


    sleep(10)

步骤 4:提取最重要的结果

创建一个名为 results 的列表变量,以字典格式系统地存储所有收集到的数据。随后,建立一个名为 scrape() 的函数,系统地收集每条推文的大量数据,包括显示名称、用户名、文章内容等关键细节,以及赞数和印象数等指标。

为了保证列表长度的一致性,我们采用了一种积极的方法。min() 函数确保每个列表的长度与其他列表一致。通过采用这种方法,我们确保以同步和结构化的方式收集和处理 Twitter 数据。

当我们抓取虚荣心号码/指标时,它们会以字符串而非数字的形式返回。然后,我们需要使用 convert_to_numeric()将字符串转换为数字,这样就可以按印象对结果进行整理。

results = []


# 刮削
def scrape():
   display_names = driver.find_elements(By.XPATH,
                                        '//*[@class="css-175oi2r r-1wbh5a2 r-dnmrzs r-1ny4l3l r-1awozwy r-18u37iz"]/div[1]/div/a/div/div[1]/span/span')
   usernames = driver.find_elements(By.XPATH,
                                    '//*[@class="css-175oi2r r-1wbh5a2 r-dnmrzs r-1ny4l3l r-1awozwy r-18u37iz"]/div[2]/div/div[1]/a/div/span')
   posts = driver.find_elements(By.XPATH,
                                '//*[@class="css-146c3p1 r-8akbws r-krxsd3 r-dnmrzs r-1udh08x r-bcqeeo r-1ttztb7 r-qvutc0 r-37j5jr r-a023e6 r-rjixqe r-16dba41 r-bnwqim"]/span')
   comments = driver.find_elements(By.XPATH,
                                   '//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[1]/button/div/div[2]/span/span/span')
   retweets = driver.find_elements(By.XPATH,
                                   '//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[2]/button/div/div[2]/span/span/span')
   likes = driver.find_elements(By.XPATH,
                                '//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[3]/button/div/div[2]/span/span/span')
   impressions = driver.find_elements(By.XPATH,
                                      '//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[4]/a/div/div[2]/span/span/span')

   min_length = min(len(display_names), len(usernames), len(posts), len(comments), len(retweets), len(likes),
                    len(impressions))

   for each in range(min_length):
       results.append({
           'Username': usernames[each].text,
           'displayName': display_names[each].text,
           'Post': posts[each].text.rstrip("Show more"),
           'Comments': 0 if comments[each].text == "" else convert_to_numeric(comments[each].text),
           'Retweets': 0 if retweets[each].text == "" else convert_to_numeric(retweets[each].text),
           'Likes': 0 if likes[each].text == "" else convert_to_numeric(likes[each].text),
           'Impressions': 0 if impressions[each].text == "" else convert_to_numeric(impressions[each].text)
       })


def reorder_json_by_impressions(json_data):
   # 根据 "印象 "以降序对 JSON 列表进行就地排序
   json_data.sort(key=lambda x: int(x['Impressions']), reverse=True)


def organize_write_data(data: dict):
   output = json.dumps(data, indent=2, ensure_ascii=False).encode("ascii", "ignore").decode("utf-8")
   try:
       with open("result.json", 'w', encoding='utf-8') as file:
           file.write(output)
   except Exception as err:
       print(f"Error encountered: {err}")


def convert_to_numeric(value):
   multipliers = {'K': 10 ** 3, 'M': 10 ** 6, 'B': 10 ** 9}

   try:
       if value[-1] in multipliers:
           return int(float(value[:-1]) * multipliers[value[-1]])
       else:
           return int(value)
   except ValueError:
       # 处理转换失败的情况
       return None

步骤 5:整理数据

为了更好地组织数据,我们创建了一个函数,该函数可以获取结果,并根据每条推文收集的印象数按降序排列。从逻辑上讲,我们希望先看到虚荣心最高的推文,然后再看到其他推文。

def reorder_json_by_impressions(json_data):
    # 根据 "印象 "以降序对 JSON 列表进行就地排序
    json_data.sort(key=lambda x:int(x['Impressions']), reverse=True)

写入 JSON 文件

JSON 文件是将收集到的所有数据可视化的最佳方式。写入 JSON 文件就像在 Python 中写入其他文件一样。唯一不同的是,在将数据写入文件之前,我们需要 JSON 模块来正确格式化数据。

如果代码运行正确,文件结构中就会出现 result.json 文件,其中就会出现下图所示的结果。

def organize_write_data(data:dict):
    output = json.dumps(data, indent=2, ensure_ascii=False).encode("ascii", "ignore").decode("utf-8")
    try:
        with open("result.json", 'w', encoding='utf-8') as file:
            file.write(output)
    except Exception as err:
        print(f"Error encountered: {err}") 

分页

要开始执行代码,我们需要依次调用函数以开始数据刮擦。我们使用 Selenium 中的 ActionChains 模块创建一个引用,以方便各种 Selenium 操作。该模块对于模拟页面向下滚动至关重要。

第一轮是从当前加载的页面中抓取数据。随后,启动一个循环,迭代五次,期间页面向下滚动,然后在下一次刮擦迭代之前暂停五秒钟。

用户可以调整循环的范围,增加或减少循环的范围,以定制刮擦的数据量。需要注意的是,如果没有其他内容需要显示,脚本就会持续搜刮相同的数据,从而造成冗余。为避免出现这种情况,请相应调整循环范围,以避免冗余数据记录。

actions = ActionChains(driver)
for i in range(5):
    actions.send_keys(Keys.END).perform()
    sleep(5)
    scrape()


reorder_json_by_impressions(results)
organize_write_data(results)


print(f"Scraping Information on {search_keyword} is done.")


driver.quit()

完整代码

from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from time import sleep
import json
import undetected_chromedriver as uc
import random
import ssl

ssl._create_default_https_context = ssl._create_stdlib_context


search_keyword = input("What topic on X/Twitter would you like to gather data on?\n").replace(' ', '%20')
constructed_url = f"https://twitter.com/search?q={search_keyword}&src=typed_query&f=top"

# 在此提供您的 X/Twitter 用户名和密码
x_username = ""
x_password = ""

print(f'Opening {constructed_url} in Chrome...')

# 在代理列表中指定带有用户名和密码的代理服务器地址
proxies = [
   "USERNAME:PASSWORD@IP:PORT",
]


# 函数获取随机代理
def get_proxy():
   return random.choice(proxies)


# 使用代理和身份验证设置 Chrome 浏览器选项
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument('--ignore-certificate-errors')
chrome_options.add_argument('--ignore-ssl-errors')

proxy = get_proxy()
proxy_options = {
   "proxy": {
       "http": f"http://{proxy}",
       "https": f"https://{proxy}",
   }
}

# 使用未检测到的 Chrome 驱动程序创建 WebDriver 实例
driver = uc.Chrome(options=chrome_options, seleniumwire_options=proxy_options)

driver.get(constructed_url)

try:
   element = WebDriverWait(driver, 20).until(
       EC.presence_of_element_located((By.XPATH, "//div[@class='css-175oi2r r-1mmae3n r-1e084wi r-13qz1uu']"))
   )
except Exception as err:
   print(f'WebDriver Wait Error: Most likely Network TimeOut: Details\n{err}')
   driver.quit()

# 登录
if element:
   username_field = driver.find_element(By.XPATH,
                                        "//input[@class='r-30o5oe r-1dz5y72 r-13qz1uu r-1niwhzg r-17gur6a r-1yadl64 r-deolkf r-homxoj r-poiln3 r-7cikom r-1ny4l3l r-t60dpp r-fdjqy7']")
   username_field.send_keys(x_username)
   username_field.send_keys(Keys.ENTER)

   password_field = WebDriverWait(driver, 10).until(
       EC.presence_of_element_located((By.XPATH,
                                       "//input[@class='r-30o5oe r-1dz5y72 r-13qz1uu r-1niwhzg r-17gur6a r-1yadl64 r-deolkf r-homxoj r-poiln3 r-7cikom r-1ny4l3l r-t60dpp r-fdjqy7']"))
   )
   password_field.send_keys(x_password)
   password_field.send_keys(Keys.ENTER)

   print("Sign In Successful...\n")

   sleep(10)

results = []


# 刮削
def scrape():
   display_names = driver.find_elements(By.XPATH,
                                        '//*[@class="css-175oi2r r-1wbh5a2 r-dnmrzs r-1ny4l3l r-1awozwy r-18u37iz"]/div[1]/div/a/div/div[1]/span/span')
   usernames = driver.find_elements(By.XPATH,
                                    '//*[@class="css-175oi2r r-1wbh5a2 r-dnmrzs r-1ny4l3l r-1awozwy r-18u37iz"]/div[2]/div/div[1]/a/div/span')
   posts = driver.find_elements(By.XPATH,
                                '//*[@class="css-146c3p1 r-8akbws r-krxsd3 r-dnmrzs r-1udh08x r-bcqeeo r-1ttztb7 r-qvutc0 r-37j5jr r-a023e6 r-rjixqe r-16dba41 r-bnwqim"]/span')
   comments = driver.find_elements(By.XPATH,
                                   '//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[1]/button/div/div[2]/span/span/span')
   retweets = driver.find_elements(By.XPATH,
                                   '//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[2]/button/div/div[2]/span/span/span')
   likes = driver.find_elements(By.XPATH,
                                '//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[3]/button/div/div[2]/span/span/span')
   impressions = driver.find_elements(By.XPATH,
                                      '//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[4]/a/div/div[2]/span/span/span')

   min_length = min(len(display_names), len(usernames), len(posts), len(comments), len(retweets), len(likes),
                    len(impressions))

   for each in range(min_length):
       results.append({
           'Username': usernames[each].text,
           'displayName': display_names[each].text,
           'Post': posts[each].text.rstrip("Show more"),
           'Comments': 0 if comments[each].text == "" else convert_to_numeric(comments[each].text),
           'Retweets': 0 if retweets[each].text == "" else convert_to_numeric(retweets[each].text),
           'Likes': 0 if likes[each].text == "" else convert_to_numeric(likes[each].text),
           'Impressions': 0 if impressions[each].text == "" else convert_to_numeric(impressions[each].text)
       })


def reorder_json_by_impressions(json_data):
   # 根据 "印象 "以降序对 JSON 列表进行就地排序
   json_data.sort(key=lambda x: int(x['Impressions']), reverse=True)


def organize_write_data(data: dict):
   output = json.dumps(data, indent=2, ensure_ascii=False).encode("ascii", "ignore").decode("utf-8")
   try:
       with open("result.json", 'w', encoding='utf-8') as file:
           file.write(output)
   except Exception as err:
       print(f"Error encountered: {err}")


def convert_to_numeric(value):
   multipliers = {'K': 10 ** 3, 'M': 10 ** 6, 'B': 10 ** 9}

   try:
       if value[-1] in multipliers:
           return int(float(value[:-1]) * multipliers[value[-1]])
       else:
           return int(value)
   except ValueError:
       # 处理转换失败的情况
       return None


actions = ActionChains(driver)
for i in range(5):
   actions.send_keys(Keys.END).perform()
   sleep(5)
   scrape()

reorder_json_by_impressions(results)
organize_write_data(results)

print(f"Scraping Information on {search_keyword} is done.")

driver.quit()

最终结果

下面是扫描完成后的 JSON 文件:

[
  {
    "Username": "@LindaEvelyn_N",
    "displayName": "Linda Evelyn Namulindwa",
    "Post": "Still getting used to Ugandan local foods so I had Glovo deliver me a KFC Streetwise Spicy rice meal (2 pcs of chicken & jollof rice at Ugx 18,000)\n\nNot only was it fast but it also accepts all payment methods.\n\n#GlovoDeliversKFC\n#ItsFingerLinkingGood",
    "Comments": 105,
    "Retweets": 148,
    "Likes": 1500,
    "Impressions": 66000
  },
  {
    "Username": "@GymCheff",
    "displayName": "The Gym Chef",
    "Post": "Delicious High Protein KFC Zinger Rice Box!",
    "Comments": 1,
    "Retweets": 68,
    "Likes": 363,
    "Impressions": 49000
  }
]

该指南可用于搜索各种相关主题的数据,为公众情绪分析、趋势跟踪、监测和声誉管理方面的研究提供便利。而 Python 则利用其大量内置模块和函数简化了自动数据收集过程。这些工具对于配置代理、管理页面滚动和有效组织收集到的信息至关重要。

评论:

0 评论