Writing a Python script to scrape Twitter data is a practical way to gather insights such as user comments and the discussion around a specific topic, which is valuable for marketing and research. Automating the collection with a script makes the process fast and efficient.
Before writing the actual code, you need to install a few packages. You will also need the Python package manager (pip) to install them; fortunately, pip comes installed alongside Python. To install the packages, simply run the following command in your command-line interface (CLI).
pip install selenium-wire selenium undetected-chromedriver
Once the installation is complete, import these packages into your Python file as shown below.
from seleniumwire import webdriver as wiredriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from time import sleep
import json
import undetected_chromedriver as uc
import random
import ssl
We have shown many times how important it is to use proxies when scraping. Twitter is one of the social media platforms that does not allow data scraping, so to stay safe and avoid a ban you should use a proxy.
All you need to do is supply the proxy address, username, and password, and your real IP address will be masked and protected. Running the browser headless (essentially running a browser without a visible interface) helps speed up the scraping process, which is why the headless flag is added to the Chrome options.
# Specify the proxy server address with username and password in the proxies list
proxies = [
"proxy_username:proxy_password@proxy_address:port_number",
]
# Function to pick a random proxy
def get_proxy():
return random.choice(proxies)
# Set up Chrome options with the proxy and authentication
chrome_options = Options()
chrome_options.add_argument("--headless")
proxy = get_proxy()
proxy_options = {
"proxy": {
"http": f"http://{proxy}",
"https": f"https://{proxy}",
}
}
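Before pointing the scraper at Twitter, you may want to confirm that traffic really goes through the proxy. The sketch below is optional and not part of the original script; it assumes the chrome_options and proxy_options defined above and uses httpbin.org/ip as an arbitrary IP-echo endpoint.
# Optional sanity check: open an IP-echo page through the proxy and print
# the address the site sees. It should be the proxy's IP, not yours.
test_driver = uc.Chrome(options=chrome_options, seleniumwire_options=proxy_options)
test_driver.get("https://httpbin.org/ip")
print(test_driver.find_element(By.TAG_NAME, "body").text)
test_driver.quit()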
To scrape Twitter data effectively with Python, the script needs the access credentials of a Twitter account, namely a username and password.
You also have to specify a search keyword. The script builds a search URL of the form https://twitter.com/search?q={search_keyword}&src=typed_query&f=top to search Twitter for that keyword.
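For example, with a hypothetical search term such as "data analysis", the space is replaced with %20 and the resulting URL targets the Top tab of the search results:
# Hypothetical keyword, just to illustrate how the URL is built
example_keyword = "data analysis".replace(' ', '%20')   # -> "data%20analysis"
example_url = f"https://twitter.com/search?q={example_keyword}&src=typed_query&f=top"
# https://twitter.com/search?q=data%20analysis&src=typed_query&f=top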
The next step is to create a ChromeDriver instance and pass the proxy details as an option, which directs ChromeDriver to use that specific IP address when loading pages. With the setup in place, the search URL is loaded using these configurations. Once the page loads, you must sign in to access the search results. The script uses WebDriverWait to verify that the page has fully loaded by checking for the presence of the username input area; if that element fails to load, it is advisable to terminate the ChromeDriver instance.
search_keyword = input("What topic on X/Twitter would you like to gather data on?\n").replace(' ', '%20')
constructed_url = f"https://twitter.com/search?q={search_keyword}&src=typed_query&f=top"
# Provide your X/Twitter username and password here
x_username = ""
x_password = ""
print(f'Opening {constructed_url} in Chrome...')
# Create a WebDriver instance using undetected-chromedriver
driver = uc.Chrome(options=chrome_options, seleniumwire_options=proxy_options)
driver.get(constructed_url)
try:
element = WebDriverWait(driver, 20).until(
EC.presence_of_element_located((By.XPATH, "//div[@class='css-175oi2r r-1mmae3n r-1e084wir-13qz1uu']"))
)
except Exception as err:
print(f'WebDriver Wait Error: Most likely Network TimeOut: Details\n{err}')
driver.quit()
# Sign in
if element:
username_field = driver.find_element(By.XPATH, "//input[@class='r-30o5oe r-1dz5y72 r-13qz1uu r-1niwhzg r-17gur6a r-1yadl64 r-deolkf r-homxoj r-poiln3 r-7cikom r-1ny4l3l r-t60dpp r-fdjqy7']")
username_field.send_keys(x_username)
    username_field.send_keys(Keys.ENTER)
password_field = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.XPATH, "//input[@class='r-30o5oe r-1dz5y72 r-13qz1uu r-1niwhzg r-17gur6a r-1yadl64 r-deolkf r-homxoj r-poiln3 r-7cikom r-1ny4l3l r-t60dpp r-fdjqy7']"))
)
password_field.send_keys(x_password)
password_field.send_keys(Keys.ENTER)
print("Sign In Successful...\n")
sleep(10)
Create a list variable named results to systematically store all of the collected data as dictionaries. Then define a function named scrape() that gathers a wealth of data for each tweet, including key details such as the display name, username, and post content, along with metrics such as likes and impressions.
To keep the element lists consistent in length, a proactive approach is used: the min() function caps iteration at the length of the shortest list, so the Twitter data is collected and processed in a synchronized, structured way.
When the vanity metrics are scraped, they come back as strings rather than numbers, so convert_to_numeric() is used to turn those strings into integers, which makes it possible to sort the results by impressions.
results = []
# Scrape
def scrape():
display_names = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1wbh5a2 r-dnmrzs r-1ny4l3l r-1awozwy r-18u37iz"]/div[1]/div/a/div/div[1]/span/span')
usernames = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1wbh5a2 r-dnmrzs r-1ny4l3l r-1awozwy r-18u37iz"]/div[2]/div/div[1]/a/div/span')
posts = driver.find_elements(By.XPATH,
'//*[@class="css-146c3p1 r-8akbws r-krxsd3 r-dnmrzs r-1udh08x r-bcqeeo r-1ttztb7 r-qvutc0 r-37j5jr r-a023e6 r-rjixqe r-16dba41 r-bnwqim"]/span')
comments = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[1]/button/div/div[2]/span/span/span')
retweets = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[2]/button/div/div[2]/span/span/span')
likes = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[3]/button/div/div[2]/span/span/span')
impressions = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[4]/a/div/div[2]/span/span/span')
min_length = min(len(display_names), len(usernames), len(posts), len(comments), len(retweets), len(likes),
len(impressions))
for each in range(min_length):
results.append({
'Username': usernames[each].text,
'displayName': display_names[each].text,
            'Post': posts[each].text.removesuffix("Show more"),  # removesuffix (Python 3.9+) drops the trailing "Show more" label
'Comments': 0 if comments[each].text == "" else convert_to_numeric(comments[each].text),
'Retweets': 0 if retweets[each].text == "" else convert_to_numeric(retweets[each].text),
'Likes': 0 if likes[each].text == "" else convert_to_numeric(likes[each].text),
'Impressions': 0 if impressions[each].text == "" else convert_to_numeric(impressions[each].text)
})
def reorder_json_by_impressions(json_data):
    # Sort the JSON list in place by 'Impressions' in descending order
json_data.sort(key=lambda x: int(x['Impressions']), reverse=True)
def organize_write_data(data: dict):
output = json.dumps(data, indent=2, ensure_ascii=False).encode("ascii", "ignore").decode("utf-8")
try:
with open("result.json", 'w', encoding='utf-8') as file:
file.write(output)
except Exception as err:
print(f"Error encountered: {err}")
def convert_to_numeric(value):
multipliers = {'K': 10 ** 3, 'M': 10 ** 6, 'B': 10 ** 9}
try:
if value[-1] in multipliers:
return int(float(value[:-1]) * multipliers[value[-1]])
else:
return int(value)
except ValueError:
        # Handle cases where the conversion fails
return None
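As a quick illustration, convert_to_numeric() turns the abbreviated counts Twitter displays into plain integers:
# Examples of the conversion (the last value fails and yields None)
print(convert_to_numeric("1.5K"))   # 1500
print(convert_to_numeric("66K"))    # 66000
print(convert_to_numeric("2M"))     # 2000000
print(convert_to_numeric("348"))    # 348
print(convert_to_numeric("N/A"))    # None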
To organize the data better, a function takes the results and sorts them in descending order by the number of impressions each tweet has gathered. Logically, we want to see the tweets with the highest vanity metrics first, followed by the rest.
def reorder_json_by_impressions(json_data):
    # Sort the JSON list in place by 'Impressions' in descending order
json_data.sort(key=lambda x:int(x['Impressions']), reverse=True)
A JSON file is a convenient way to view all of the collected data. Writing a JSON file works like writing any other file in Python; the only difference is that the json module is needed to format the data properly before it is written out.
If the code runs correctly, a result.json file appears in the project directory, containing results like the ones shown at the end of this guide.
def organize_write_data(data:dict):
output = json.dumps(data, indent=2, ensure_ascii=False).encode("ascii", "ignore").decode("utf-8")
try:
with open("result.json", 'w', encoding='utf-8') as file:
file.write(output)
except Exception as err:
print(f"Error encountered: {err}")
To start execution, the functions are called in sequence to begin scraping. An ActionChains object from Selenium is created to drive various browser actions; it is essential here for simulating scrolling down the page.
The first round scrapes the data from the currently loaded page. A loop then runs five iterations, each of which scrolls the page down and pauses for five seconds before the next scrape iteration.
You can increase or decrease the loop's range to control how much data is scraped. Keep in mind that if there is no more content to load, the script keeps scraping the same data, which creates redundancy. Adjust the loop range accordingly to avoid redundant records; a small deduplication sketch follows the snippet below.
actions = ActionChains(driver)
for i in range(5):
actions.send_keys(Keys.END).perform()
sleep(5)
scrape()
reorder_json_by_impressions(results)
organize_write_data(results)
print(f"Scraping Information on {search_keyword} is done.")
driver.quit()
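If the loop runs past the point where new tweets stop loading, the same posts are collected more than once. A simple guard, sketched below under the assumption that the username and post text together identify a tweet well enough, is to drop duplicates before sorting and writing (this helper is not part of the original script):
# Remove duplicate records collected across scroll iterations;
# a record counts as a duplicate when its username and post text repeat.
def deduplicate(records):
    seen = set()
    unique_records = []
    for record in records:
        key = (record['Username'], record['Post'])
        if key not in seen:
            seen.add(key)
            unique_records.append(record)
    return unique_records

results = deduplicate(results)
Call it on results before reorder_json_by_impressions() if you raise the loop count. The complete script is shown below for reference.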
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from time import sleep
import json
import undetected_chromedriver as uc
import random
import ssl
ssl._create_default_https_context = ssl._create_stdlib_context
search_keyword = input("What topic on X/Twitter would you like to gather data on?\n").replace(' ', '%20')
constructed_url = f"https://twitter.com/search?q={search_keyword}&src=typed_query&f=top"
# Provide your X/Twitter username and password here
x_username = ""
x_password = ""
print(f'Opening {constructed_url} in Chrome...')
# Specify the proxy server address with username and password in the proxies list
proxies = [
"USERNAME:PASSWORD@IP:PORT",
]
# Function to pick a random proxy
def get_proxy():
return random.choice(proxies)
# Set up Chrome options with the proxy and authentication
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument('--ignore-certificate-errors')
chrome_options.add_argument('--ignore-ssl-errors')
proxy = get_proxy()
proxy_options = {
"proxy": {
"http": f"http://{proxy}",
"https": f"https://{proxy}",
}
}
# Create a WebDriver instance using undetected-chromedriver
driver = uc.Chrome(options=chrome_options, seleniumwire_options=proxy_options)
driver.get(constructed_url)
try:
element = WebDriverWait(driver, 20).until(
EC.presence_of_element_located((By.XPATH, "//div[@class='css-175oi2r r-1mmae3n r-1e084wi r-13qz1uu']"))
)
except Exception as err:
print(f'WebDriver Wait Error: Most likely Network TimeOut: Details\n{err}')
driver.quit()
# Sign in
if element:
username_field = driver.find_element(By.XPATH,
"//input[@class='r-30o5oe r-1dz5y72 r-13qz1uu r-1niwhzg r-17gur6a r-1yadl64 r-deolkf r-homxoj r-poiln3 r-7cikom r-1ny4l3l r-t60dpp r-fdjqy7']")
username_field.send_keys(x_username)
username_field.send_keys(Keys.ENTER)
password_field = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.XPATH,
"//input[@class='r-30o5oe r-1dz5y72 r-13qz1uu r-1niwhzg r-17gur6a r-1yadl64 r-deolkf r-homxoj r-poiln3 r-7cikom r-1ny4l3l r-t60dpp r-fdjqy7']"))
)
password_field.send_keys(x_password)
password_field.send_keys(Keys.ENTER)
print("Sign In Successful...\n")
sleep(10)
results = []
# Scrape
def scrape():
display_names = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1wbh5a2 r-dnmrzs r-1ny4l3l r-1awozwy r-18u37iz"]/div[1]/div/a/div/div[1]/span/span')
usernames = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1wbh5a2 r-dnmrzs r-1ny4l3l r-1awozwy r-18u37iz"]/div[2]/div/div[1]/a/div/span')
posts = driver.find_elements(By.XPATH,
'//*[@class="css-146c3p1 r-8akbws r-krxsd3 r-dnmrzs r-1udh08x r-bcqeeo r-1ttztb7 r-qvutc0 r-37j5jr r-a023e6 r-rjixqe r-16dba41 r-bnwqim"]/span')
comments = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[1]/button/div/div[2]/span/span/span')
retweets = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[2]/button/div/div[2]/span/span/span')
likes = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[3]/button/div/div[2]/span/span/span')
impressions = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[4]/a/div/div[2]/span/span/span')
min_length = min(len(display_names), len(usernames), len(posts), len(comments), len(retweets), len(likes),
len(impressions))
for each in range(min_length):
results.append({
'Username': usernames[each].text,
'displayName': display_names[each].text,
            'Post': posts[each].text.removesuffix("Show more"),  # removesuffix (Python 3.9+) drops the trailing "Show more" label
'Comments': 0 if comments[each].text == "" else convert_to_numeric(comments[each].text),
'Retweets': 0 if retweets[each].text == "" else convert_to_numeric(retweets[each].text),
'Likes': 0 if likes[each].text == "" else convert_to_numeric(likes[each].text),
'Impressions': 0 if impressions[each].text == "" else convert_to_numeric(impressions[each].text)
})
def reorder_json_by_impressions(json_data):
    # Sort the JSON list in place by 'Impressions' in descending order
json_data.sort(key=lambda x: int(x['Impressions']), reverse=True)
def organize_write_data(data: dict):
output = json.dumps(data, indent=2, ensure_ascii=False).encode("ascii", "ignore").decode("utf-8")
try:
with open("result.json", 'w', encoding='utf-8') as file:
file.write(output)
except Exception as err:
print(f"Error encountered: {err}")
def convert_to_numeric(value):
multipliers = {'K': 10 ** 3, 'M': 10 ** 6, 'B': 10 ** 9}
try:
if value[-1] in multipliers:
return int(float(value[:-1]) * multipliers[value[-1]])
else:
return int(value)
except ValueError:
        # Handle cases where the conversion fails
return None
actions = ActionChains(driver)
for i in range(5):
actions.send_keys(Keys.END).perform()
sleep(5)
scrape()
reorder_json_by_impressions(results)
organize_write_data(results)
print(f"Scraping Information on {search_keyword} is done.")
driver.quit()
Here is the JSON file after the scrape has finished:
[
{
"Username": "@LindaEvelyn_N",
"displayName": "Linda Evelyn Namulindwa",
"Post": "Still getting used to Ugandan local foods so I had Glovo deliver me a KFC Streetwise Spicy rice meal (2 pcs of chicken & jollof rice at Ugx 18,000)\n\nNot only was it fast but it also accepts all payment methods.\n\n#GlovoDeliversKFC\n#ItsFingerLinkingGood",
"Comments": 105,
"Retweets": 148,
"Likes": 1500,
"Impressions": 66000
},
{
"Username": "@GymCheff",
"displayName": "The Gym Chef",
"Post": "Delicious High Protein KFC Zinger Rice Box!",
"Comments": 1,
"Retweets": 68,
"Likes": 363,
"Impressions": 49000
}
]
This guide can be used to gather data on a wide range of topics, supporting research into public sentiment analysis, trend tracking, monitoring, and reputation management. Python, with its large collection of modules and functions, simplifies the automated data-collection process; these tools are essential for configuring proxies, managing page scrolling, and organizing the collected information effectively.