A criação de um script Python para extrair dados do Twitter é, de facto, útil para recolher informações, como comentários de utilizadores ou discussões sobre tópicos específicos, que podem ajudar muito no marketing e na investigação. A automatização utilizando estes scripts simplifica o processo de recolha, tornando-o rápido e eficiente.
Existem 2 pacotes que tem de instalar antes de começar a escrever o código propriamente dito. Também precisa de um gestor de pacotes para pacotes Python (PIP) para instalar estes pacotes. Felizmente, assim que instala o Python na sua máquina, o PIP também é instalado. Para instalar estes pacotes, só precisa de executar o comando abaixo na sua interface de linha de comandos (CLI).
pip install selenium-wire selenium undetected-chromedriver
Quando a instalação estiver concluída, deve importar estes pacotes para o seu ficheiro Python, como se mostra abaixo.
from seleniumwire import webdriver as wiredriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from time import sleep
import json
import undetected_chromedriver as uc
import random
Import ssl
Foi estabelecido várias vezes que a utilização de um proxy durante a recolha de dados é importante. O Twitter é uma das plataformas de redes sociais que desaprova a recolha de dados e, para estar seguro e evitar uma proibição, deve utilizar um proxy.
Tudo o que tem de fazer é fornecer o seu endereço de proxy, o nome de utilizador e a palavra-passe do proxy e o seu IP deverá estar agora mascarado e protegido. Executar um browser sem cabeça, basicamente o mesmo que executar um browser sem uma interface, ajuda a acelerar o processo de recolha de dados, razão pela qual adicionámos o sinalizador sem cabeça nas opções.
# Especificar o endereço do servidor proxy com nome de utilizador e palavra-passe numa Lista de proxies
proxies = [
"proxy_username:proxy_password@proxy_address:port_number",
]
# para obter um proxy aleatório
def get_proxy():
return random.choice(proxies)
# Configurar as opções do Chrome com o proxy e a autenticação
chrome_options = Options()
chrome_options.add_argument("--headless")
proxy = get_proxy()
proxy_options = {
"proxy": {
"http": f"http://{proxy}",
"https": f"https://{proxy}",
}
}
Para extrair efetivamente os dados do Twitter usando Python, o script requer credenciais de acesso para a conta do Twitter, incluindo o nome de usuário e a senha.
Além disso, é necessário especificar uma palavra-chave de pesquisa. O script usa o comando https://twitter.com/search?q={search_keyword}&src=typed_query&f=top para construir um URL que permite a busca por essa palavra-chave no Twitter.
A próxima etapa envolve a criação de uma instância do ChromeDriver, incorporando detalhes de proxy como uma opção. Essa configuração direciona o ChromeDriver para usar um endereço IP específico ao carregar a página. Após esta configuração, o URL de pesquisa é carregado com estas configurações. Depois de a página ser carregada, é necessário iniciar sessão para aceder aos resultados da pesquisa. Usando WebDriverWait, o script verifica se a página está totalmente carregada, verificando a presença da área de entrada do nome de usuário. Se essa área não for carregada, é aconselhável encerrar a instância do ChromeDriver.
search_keyword = input("What topic on X/Twitter would you like to gather data on?\n").replace(' ', '%20')
constructed_url = f"https://twitter.com/search?q={search_keyword}&src=typed_query&f=top"
# forneça o seu nome de utilizador e palavra-passe X/Twitter aqui
x_username = ""
x_password = ""
print(f'Opening {constructed_url} in Chrome...')
# Criar uma instância WebDriver com um controlador Chrome não detectado
driver = uc.Chrome(options=chrome_options, seleniumwire_options=proxy_options)
driver.get(constructed_url)
try:
element = WebDriverWait(driver, 20).until(
EC.presence_of_element_located((By.XPATH, "//div[@class='css-175oi2r r-1mmae3n r-1e084wir-13qz1uu']"))
)
except Exception as err:
print(f'WebDriver Wait Error: Most likely Network TimeOut: Details\n{err}')
driver.quit()
#Iniciar sessão
if element:
username_field = driver.find_element(By.XPATH, "//input[@class='r-30o5oe r-1dz5y72 r-13qz1uu r-1niwhzg r-17gur6a r-1yadl64 r-deolkf r-homxoj r-poiln3 r-7cikom r-1ny4l3l r-t60dpp r-fdjqy7']")
username_field.send_keys(x_username)
username_field..send_keys(Keys.ENTER)
password_field = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.XPATH, "//input[@class='r-30o5oe r-1dz5y72 r-13qz1uu r-1niwhzg r-17gur6a r-1yadl64 r-deolkf r-homxoj r-poiln3 r-7cikom r-1ny4l3l r-t60dpp r-fdjqy7']"))
)
password_field.send_keys(x_password)
password_field.send_keys(Keys.ENTER)
print("Sign In Successful...\n")
sleep(10)
Crie uma variável de lista, resultados, para armazenar sistematicamente todos os dados coletados no formato de dicionários. Posteriormente, estabeleça uma função chamada scrape() para coletar sistematicamente uma grande quantidade de dados para cada tweet, abrangendo detalhes cruciais como o nome de exibição, o nome de usuário, o conteúdo da postagem e métricas como curtidas e impressões.
Uma abordagem proativa foi adotada para garantir uniformidade nos comprimentos das listas. A função min() garante que o comprimento de cada lista esteja alinhado com os outros. Ao aderir a essa metodologia, garantimos uma abordagem sincronizada e estruturada para coletar e processar dados do Twitter.
Quando coletamos os números/métricas de vaidade, eles são retornados como strings e não como números. Então, precisamos converter as strings em números usando convert_to_numeric() para que o resultado possa ser organizado por impressões.
results = []
# Raspagem
def scrape():
display_names = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1wbh5a2 r-dnmrzs r-1ny4l3l r-1awozwy r-18u37iz"]/div[1]/div/a/div/div[1]/span/span')
usernames = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1wbh5a2 r-dnmrzs r-1ny4l3l r-1awozwy r-18u37iz"]/div[2]/div/div[1]/a/div/span')
posts = driver.find_elements(By.XPATH,
'//*[@class="css-146c3p1 r-8akbws r-krxsd3 r-dnmrzs r-1udh08x r-bcqeeo r-1ttztb7 r-qvutc0 r-37j5jr r-a023e6 r-rjixqe r-16dba41 r-bnwqim"]/span')
comments = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[1]/button/div/div[2]/span/span/span')
retweets = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[2]/button/div/div[2]/span/span/span')
likes = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[3]/button/div/div[2]/span/span/span')
impressions = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[4]/a/div/div[2]/span/span/span')
min_length = min(len(display_names), len(usernames), len(posts), len(comments), len(retweets), len(likes),
len(impressions))
for each in range(min_length):
results.append({
'Username': usernames[each].text,
'displayName': display_names[each].text,
'Post': posts[each].text.rstrip("Show more"),
'Comments': 0 if comments[each].text == "" else convert_to_numeric(comments[each].text),
'Retweets': 0 if retweets[each].text == "" else convert_to_numeric(retweets[each].text),
'Likes': 0 if likes[each].text == "" else convert_to_numeric(likes[each].text),
'Impressions': 0 if impressions[each].text == "" else convert_to_numeric(impressions[each].text)
})
def reorder_json_by_impressions(json_data):
# Ordenar a lista JSON no local com base em "Impressões" por ordem descendente
json_data.sort(key=lambda x: int(x['Impressions']), reverse=True)
def organize_write_data(data: dict):
output = json.dumps(data, indent=2, ensure_ascii=False).encode("ascii", "ignore").decode("utf-8")
try:
with open("result.json", 'w', encoding='utf-8') as file:
file.write(output)
except Exception as err:
print(f"Error encountered: {err}")
def convert_to_numeric(value):
multipliers = {'K': 10 ** 3, 'M': 10 ** 6, 'B': 10 ** 9}
try:
if value[-1] in multipliers:
return int(float(value[:-1]) * multipliers[value[-1]])
else:
return int(value)
except ValueError:
# Tratar o caso em que a conversão falha
return None
Para organizar melhor os dados, criamos uma função que pega os resultados e classifica os tweets em ordem decrescente usando o número de impressões coletadas por cada tweet. Logicamente, queremos ver o tweet com o maior número de vaidade primeiro, antes dos outros.
def reorder_json_by_impressions(json_data):
# Ordenar a lista JSON no local com base em "Impressões" por ordem descendente
json_data.sort(key=lambda x:int(x['Impressions']), reverse=True)
Um ficheiro JSON é a melhor forma de visualizar todos os dados recolhidos. Escrever para um arquivo JSON é como escrever para qualquer outro arquivo em Python. A única diferença é que precisamos do módulo JSON para formatar corretamente os dados antes de serem gravados no arquivo.
Se o código foi executado corretamente, você deve ver um arquivo result.json na estrutura do arquivo e nele deve estar o resultado, conforme mostrado na seção abaixo.
def organize_write_data(data:dict):
output = json.dumps(data, indent=2, ensure_ascii=False).encode("ascii", "ignore").decode("utf-8")
try:
with open("result.json", 'w', encoding='utf-8') as file:
file.write(output)
except Exception as err:
print(f"Error encountered: {err}")
Para iniciar a execução do código, precisamos chamar nossas funções sequencialmente para começar a raspagem de dados. Criamos uma referência usando o módulo ActionChains no Selenium para facilitar várias ações do Selenium. Esse módulo é essencial para simular a rolagem para baixo na página.
A primeira rodada envolve a coleta de dados da página atualmente carregada. Posteriormente, um loop é iniciado, iterando cinco vezes, durante o qual a página é rolada para baixo, seguida por uma pausa de cinco segundos antes da próxima iteração de raspagem.
Os utilizadores podem ajustar o intervalo do ciclo, aumentando-o ou diminuindo-o para personalizar o volume de dados recolhidos. É crucial observar que, se não houver conteúdo adicional a ser exibido, o script coletará persistentemente os mesmos dados, resultando em redundância. Para evitar isso, ajuste o intervalo de loop de acordo para evitar a gravação de dados redundantes.
actions = ActionChains(driver)
for i in range(5):
actions.send_keys(Keys.END).perform()
sleep(5)
scrape()
reorder_json_by_impressions(results)
organize_write_data(results)
print(f"Scraping Information on {search_keyword} is done.")
driver.quit()
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from time import sleep
import json
import undetected_chromedriver as uc
import random
import ssl
ssl._create_default_https_context = ssl._create_stdlib_context
search_keyword = input("What topic on X/Twitter would you like to gather data on?\n").replace(' ', '%20')
constructed_url = f"https://twitter.com/search?q={search_keyword}&src=typed_query&f=top"
# forneça o seu nome de utilizador e palavra-passe X/Twitter aqui
x_username = ""
x_password = ""
print(f'Opening {constructed_url} in Chrome...')
# Especificar o endereço do servidor proxy com nome de utilizador e palavra-passe numa Lista de proxies
proxies = [
"USERNAME:PASSWORD@IP:PORT",
]
# para obter um proxy aleatório
def get_proxy():
return random.choice(proxies)
# Configurar as opções do Chrome com o proxy e a autenticação
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument('--ignore-certificate-errors')
chrome_options.add_argument('--ignore-ssl-errors')
proxy = get_proxy()
proxy_options = {
"proxy": {
"http": f"http://{proxy}",
"https": f"https://{proxy}",
}
}
# Criar uma instância WebDriver com um controlador chrome não detectado
driver = uc.Chrome(options=chrome_options, seleniumwire_options=proxy_options)
driver.get(constructed_url)
try:
element = WebDriverWait(driver, 20).until(
EC.presence_of_element_located((By.XPATH, "//div[@class='css-175oi2r r-1mmae3n r-1e084wi r-13qz1uu']"))
)
except Exception as err:
print(f'WebDriver Wait Error: Most likely Network TimeOut: Details\n{err}')
driver.quit()
# Iniciar sessão
if element:
username_field = driver.find_element(By.XPATH,
"//input[@class='r-30o5oe r-1dz5y72 r-13qz1uu r-1niwhzg r-17gur6a r-1yadl64 r-deolkf r-homxoj r-poiln3 r-7cikom r-1ny4l3l r-t60dpp r-fdjqy7']")
username_field.send_keys(x_username)
username_field.send_keys(Keys.ENTER)
password_field = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.XPATH,
"//input[@class='r-30o5oe r-1dz5y72 r-13qz1uu r-1niwhzg r-17gur6a r-1yadl64 r-deolkf r-homxoj r-poiln3 r-7cikom r-1ny4l3l r-t60dpp r-fdjqy7']"))
)
password_field.send_keys(x_password)
password_field.send_keys(Keys.ENTER)
print("Sign In Successful...\n")
sleep(10)
results = []
# Raspagem
def scrape():
display_names = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1wbh5a2 r-dnmrzs r-1ny4l3l r-1awozwy r-18u37iz"]/div[1]/div/a/div/div[1]/span/span')
usernames = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1wbh5a2 r-dnmrzs r-1ny4l3l r-1awozwy r-18u37iz"]/div[2]/div/div[1]/a/div/span')
posts = driver.find_elements(By.XPATH,
'//*[@class="css-146c3p1 r-8akbws r-krxsd3 r-dnmrzs r-1udh08x r-bcqeeo r-1ttztb7 r-qvutc0 r-37j5jr r-a023e6 r-rjixqe r-16dba41 r-bnwqim"]/span')
comments = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[1]/button/div/div[2]/span/span/span')
retweets = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[2]/button/div/div[2]/span/span/span')
likes = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[3]/button/div/div[2]/span/span/span')
impressions = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[4]/a/div/div[2]/span/span/span')
min_length = min(len(display_names), len(usernames), len(posts), len(comments), len(retweets), len(likes),
len(impressions))
for each in range(min_length):
results.append({
'Username': usernames[each].text,
'displayName': display_names[each].text,
'Post': posts[each].text.rstrip("Show more"),
'Comments': 0 if comments[each].text == "" else convert_to_numeric(comments[each].text),
'Retweets': 0 if retweets[each].text == "" else convert_to_numeric(retweets[each].text),
'Likes': 0 if likes[each].text == "" else convert_to_numeric(likes[each].text),
'Impressions': 0 if impressions[each].text == "" else convert_to_numeric(impressions[each].text)
})
def reorder_json_by_impressions(json_data):
# Ordenar a lista JSON no local com base em "Impressões" por ordem descendente
json_data.sort(key=lambda x: int(x['Impressions']), reverse=True)
def organize_write_data(data: dict):
output = json.dumps(data, indent=2, ensure_ascii=False).encode("ascii", "ignore").decode("utf-8")
try:
with open("result.json", 'w', encoding='utf-8') as file:
file.write(output)
except Exception as err:
print(f"Error encountered: {err}")
def convert_to_numeric(value):
multipliers = {'K': 10 ** 3, 'M': 10 ** 6, 'B': 10 ** 9}
try:
if value[-1] in multipliers:
return int(float(value[:-1]) * multipliers[value[-1]])
else:
return int(value)
except ValueError:
# Tratar o caso em que a conversão falha
return None
actions = ActionChains(driver)
for i in range(5):
actions.send_keys(Keys.END).perform()
sleep(5)
scrape()
reorder_json_by_impressions(results)
organize_write_data(results)
print(f"Scraping Information on {search_keyword} is done.")
driver.quit()
Eis como deve ficar o ficheiro JSON depois de terminada a recolha de dados:
[
{
"Username": "@LindaEvelyn_N",
"displayName": "Linda Evelyn Namulindwa",
"Post": "Still getting used to Ugandan local foods so I had Glovo deliver me a KFC Streetwise Spicy rice meal (2 pcs of chicken & jollof rice at Ugx 18,000)\n\nNot only was it fast but it also accepts all payment methods.\n\n#GlovoDeliversKFC\n#ItsFingerLinkingGood",
"Comments": 105,
"Retweets": 148,
"Likes": 1500,
"Impressions": 66000
},
{
"Username": "@GymCheff",
"displayName": "The Gym Chef",
"Post": "Delicious High Protein KFC Zinger Rice Box!",
"Comments": 1,
"Retweets": 68,
"Likes": 363,
"Impressions": 49000
}
]
O guia delineado pode ser utilizado para recolher dados sobre vários tópicos de interesse, facilitando os estudos de análise do sentimento do público, acompanhamento de tendências, monitorização e gestão da reputação. O Python, por sua vez, simplifica o processo de recolha automática de dados com a sua extensa gama de módulos e funções incorporados. Estas ferramentas são essenciais para configurar proxies, gerir a deslocação da página e organizar eficazmente a informação recolhida.
Мы получили вашу заявку!
Ответ будет отправлен на почту в ближайшее время.
С уважением proxy-seller.com!
Comentários: 0