La creazione di uno script Python per lo scraping dei dati di Twitter è utile per raccogliere informazioni, come le recensioni degli utenti o le discussioni su argomenti specifici, che possono essere di grande aiuto per il marketing e la ricerca. L'automazione di tali script semplifica il processo di raccolta, rendendolo rapido ed efficiente.
Ci sono due pacchetti da installare prima di iniziare a scrivere il codice vero e proprio. Per installare questi pacchetti è necessario un gestore di pacchetti Python (PIP). Fortunatamente, una volta installato Python sulla macchina, viene installato anche PIP. Per installare questi pacchetti, è sufficiente eseguire il comando seguente nell'interfaccia a riga di comando (CLI).
pip install selenium-wire selenium undetected-chromedriver
Una volta completata l'installazione, è necessario importare questi pacchetti nel proprio file Python, come mostrato di seguito.
from seleniumwire import webdriver as wiredriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from time import sleep
import json
import undetected_chromedriver as uc
import random
Import ssl
È stato stabilito più volte che l'uso di un proxy durante lo scraping è importante. Twitter è una delle piattaforme di social media che non vede di buon occhio lo scraping dei dati e per essere sicuri ed evitare un divieto, è necessario utilizzare un proxy.
È sufficiente fornire l'indirizzo del proxy, il nome utente e la password del proxy e il vostro IP sarà ora mascherato e protetto. L'esecuzione di un browser headless, in pratica come l'esecuzione di un browser senza interfaccia, aiuta a velocizzare il processo di scraping, motivo per cui abbiamo aggiunto il flag headless nelle opzioni.
# Specificare l'indirizzo del server proxy con nome utente e password in un Elenco di proxy
proxies = [
"proxy_username:proxy_password@proxy_address:port_number",
]
# per ottenere un proxy casuale
def get_proxy():
return random.choice(proxies)
# Impostare le opzioni di Chrome con il proxy e l'autenticazione
chrome_options = Options()
chrome_options.add_argument("--headless")
proxy = get_proxy()
proxy_options = {
"proxy": {
"http": f"http://{proxy}",
"https": f"https://{proxy}",
}
}
Per eseguire efficacemente lo scraping dei dati di Twitter utilizzando Python, lo script richiede le credenziali di accesso all'account Twitter, compresi il nome utente e la password.
Inoltre, è necessario specificare una parola chiave di ricerca. Lo script utilizza il comando https://twitter.com/search?q={search_keyword}&src=typed_query&f=top per costruire un URL che abilita la ricerca di questa parola chiave su Twitter.
Il passo successivo consiste nel creare un'istanza di ChromeDriver, incorporando i dettagli del proxy come opzione. Questa configurazione indica a ChromeDriver di utilizzare un indirizzo IP specifico durante il caricamento della pagina. Dopo questa impostazione, l'URL di ricerca viene caricato con queste configurazioni. Una volta caricata la pagina, è necessario effettuare il login per accedere ai risultati della ricerca. Utilizzando WebDriverWait, lo script verifica che la pagina sia completamente caricata controllando la presenza dell'area di inserimento del nome utente. Se quest'area non viene caricata, è consigliabile terminare l'istanza di ChromeDriver.
search_keyword = input("What topic on X/Twitter would you like to gather data on?\n").replace(' ', '%20')
constructed_url = f"https://twitter.com/search?q={search_keyword}&src=typed_query&f=top"
# fornire qui il nome utente e la password di X/Twitter
x_username = ""
x_password = ""
print(f'Opening {constructed_url} in Chrome...')
# Creare un'istanza di WebDriver con il driver di chrome non rilevato
driver = uc.Chrome(options=chrome_options, seleniumwire_options=proxy_options)
driver.get(constructed_url)
try:
element = WebDriverWait(driver, 20).until(
EC.presence_of_element_located((By.XPATH, "//div[@class='css-175oi2r r-1mmae3n r-1e084wir-13qz1uu']"))
)
except Exception as err:
print(f'WebDriver Wait Error: Most likely Network TimeOut: Details\n{err}')
driver.quit()
#Accedi
if element:
username_field = driver.find_element(By.XPATH, "//input[@class='r-30o5oe r-1dz5y72 r-13qz1uu r-1niwhzg r-17gur6a r-1yadl64 r-deolkf r-homxoj r-poiln3 r-7cikom r-1ny4l3l r-t60dpp r-fdjqy7']")
username_field.send_keys(x_username)
username_field..send_keys(Keys.ENTER)
password_field = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.XPATH, "//input[@class='r-30o5oe r-1dz5y72 r-13qz1uu r-1niwhzg r-17gur6a r-1yadl64 r-deolkf r-homxoj r-poiln3 r-7cikom r-1ny4l3l r-t60dpp r-fdjqy7']"))
)
password_field.send_keys(x_password)
password_field.send_keys(Keys.ENTER)
print("Sign In Successful...\n")
sleep(10)
Creare una variabile elenco, results, per memorizzare sistematicamente tutti i dati raccolti nel formato di un dizionario. Successivamente, creare una funzione denominata scrape() per raccogliere sistematicamente una serie di dati per ogni tweet, includendo dettagli cruciali come il nome visualizzato, il nome utente, il contenuto del post e metriche come i like e le impressioni.
È stato adottato un approccio proattivo per garantire l'uniformità delle lunghezze degli elenchi. La funzione min() assicura che la lunghezza di ciascun elenco sia allineata a quella degli altri. Seguendo questa metodologia, garantiamo un approccio sincronizzato e strutturato alla raccolta e all'elaborazione dei dati di Twitter.
Quando si esegue lo scraping dei numeri/metriche di vanità, questi vengono restituiti come stringhe e non come numeri. Quindi, dobbiamo convertire le stringhe in numeri usando convert_to_numeric(), in modo che il risultato possa essere organizzato per impressioni.
results = []
# Raschiare
def scrape():
display_names = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1wbh5a2 r-dnmrzs r-1ny4l3l r-1awozwy r-18u37iz"]/div[1]/div/a/div/div[1]/span/span')
usernames = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1wbh5a2 r-dnmrzs r-1ny4l3l r-1awozwy r-18u37iz"]/div[2]/div/div[1]/a/div/span')
posts = driver.find_elements(By.XPATH,
'//*[@class="css-146c3p1 r-8akbws r-krxsd3 r-dnmrzs r-1udh08x r-bcqeeo r-1ttztb7 r-qvutc0 r-37j5jr r-a023e6 r-rjixqe r-16dba41 r-bnwqim"]/span')
comments = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[1]/button/div/div[2]/span/span/span')
retweets = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[2]/button/div/div[2]/span/span/span')
likes = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[3]/button/div/div[2]/span/span/span')
impressions = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[4]/a/div/div[2]/span/span/span')
min_length = min(len(display_names), len(usernames), len(posts), len(comments), len(retweets), len(likes),
len(impressions))
for each in range(min_length):
results.append({
'Username': usernames[each].text,
'displayName': display_names[each].text,
'Post': posts[each].text.rstrip("Show more"),
'Comments': 0 if comments[each].text == "" else convert_to_numeric(comments[each].text),
'Retweets': 0 if retweets[each].text == "" else convert_to_numeric(retweets[each].text),
'Likes': 0 if likes[each].text == "" else convert_to_numeric(likes[each].text),
'Impressions': 0 if impressions[each].text == "" else convert_to_numeric(impressions[each].text)
})
def reorder_json_by_impressions(json_data):
# Ordinare l'elenco JSON in base a 'Impressioni' in ordine decrescente.
json_data.sort(key=lambda x: int(x['Impressions']), reverse=True)
def organize_write_data(data: dict):
output = json.dumps(data, indent=2, ensure_ascii=False).encode("ascii", "ignore").decode("utf-8")
try:
with open("result.json", 'w', encoding='utf-8') as file:
file.write(output)
except Exception as err:
print(f"Error encountered: {err}")
def convert_to_numeric(value):
multipliers = {'K': 10 ** 3, 'M': 10 ** 6, 'B': 10 ** 9}
try:
if value[-1] in multipliers:
return int(float(value[:-1]) * multipliers[value[-1]])
else:
return int(value)
except ValueError:
# Gestire il caso in cui la conversione non riesce
return None
Per organizzare meglio i dati, abbiamo creato una funzione che prende i risultati e ordina i tweet in ordine decrescente utilizzando il numero di impressioni raccolte da ciascun tweet. Logicamente, vogliamo vedere prima il tweet con il numero di vanità più alto e poi gli altri.
def reorder_json_by_impressions(json_data):
# Ordinare l'elenco JSON in base a 'Impressioni' in ordine decrescente.
json_data.sort(key=lambda x:int(x['Impressions']), reverse=True)
Un file JSON è il modo migliore per visualizzare tutti i dati raccolti. Scrivere su un file JSON è come scrivere su qualsiasi altro file in Python. L'unica differenza è che abbiamo bisogno del modulo JSON per formattare correttamente i dati prima che vengano scritti sul file.
Se il codice è stato eseguito correttamente, si dovrebbe vedere un file result.json nella struttura dei file e al suo interno dovrebbe esserci il risultato mostrato nella sezione sottostante.
def organize_write_data(data:dict):
output = json.dumps(data, indent=2, ensure_ascii=False).encode("ascii", "ignore").decode("utf-8")
try:
with open("result.json", 'w', encoding='utf-8') as file:
file.write(output)
except Exception as err:
print(f"Error encountered: {err}")
Per iniziare l'esecuzione del codice, dobbiamo chiamare le nostre funzioni in sequenza per iniziare lo scraping dei dati. Creiamo un riferimento utilizzando il modulo ActionChains di Selenium per facilitare le varie azioni di Selenium. Questo modulo si rivela fondamentale per simulare lo scorrimento della pagina.
Il primo ciclo prevede lo scraping dei dati dalla pagina attualmente caricata. Successivamente, viene avviato un ciclo che itera cinque volte, durante il quale la pagina viene fatta scorrere verso il basso, seguito da una pausa di cinque secondi prima della successiva iterazione di scraping.
Gli utenti possono regolare l'intervallo del ciclo, aumentandolo o diminuendolo per personalizzare il volume di dati scrapati. È fondamentale notare che se non ci sono contenuti aggiuntivi da visualizzare, lo script continuerà a eseguire lo scraping degli stessi dati, con conseguente ridondanza. Per evitare ciò, regolare l'intervallo del ciclo di conseguenza per evitare la registrazione di dati ridondanti.
actions = ActionChains(driver)
for i in range(5):
actions.send_keys(Keys.END).perform()
sleep(5)
scrape()
reorder_json_by_impressions(results)
organize_write_data(results)
print(f"Scraping Information on {search_keyword} is done.")
driver.quit()
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from time import sleep
import json
import undetected_chromedriver as uc
import random
import ssl
ssl._create_default_https_context = ssl._create_stdlib_context
search_keyword = input("What topic on X/Twitter would you like to gather data on?\n").replace(' ', '%20')
constructed_url = f"https://twitter.com/search?q={search_keyword}&src=typed_query&f=top"
# fornire qui il nome utente e la password di X/Twitter
x_username = ""
x_password = ""
print(f'Opening {constructed_url} in Chrome...')
# Specificare l'indirizzo del server proxy con nome utente e password in un Elenco di proxy
proxies = [
"USERNAME:PASSWORD@IP:PORT",
]
# per ottenere un proxy casuale
def get_proxy():
return random.choice(proxies)
# Impostare le opzioni di Chrome con il proxy e l'autenticazione
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument('--ignore-certificate-errors')
chrome_options.add_argument('--ignore-ssl-errors')
proxy = get_proxy()
proxy_options = {
"proxy": {
"http": f"http://{proxy}",
"https": f"https://{proxy}",
}
}
# Creare un'istanza di WebDriver con il driver di chrome non rilevato
driver = uc.Chrome(options=chrome_options, seleniumwire_options=proxy_options)
driver.get(constructed_url)
try:
element = WebDriverWait(driver, 20).until(
EC.presence_of_element_located((By.XPATH, "//div[@class='css-175oi2r r-1mmae3n r-1e084wi r-13qz1uu']"))
)
except Exception as err:
print(f'WebDriver Wait Error: Most likely Network TimeOut: Details\n{err}')
driver.quit()
# Accedi
if element:
username_field = driver.find_element(By.XPATH,
"//input[@class='r-30o5oe r-1dz5y72 r-13qz1uu r-1niwhzg r-17gur6a r-1yadl64 r-deolkf r-homxoj r-poiln3 r-7cikom r-1ny4l3l r-t60dpp r-fdjqy7']")
username_field.send_keys(x_username)
username_field.send_keys(Keys.ENTER)
password_field = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.XPATH,
"//input[@class='r-30o5oe r-1dz5y72 r-13qz1uu r-1niwhzg r-17gur6a r-1yadl64 r-deolkf r-homxoj r-poiln3 r-7cikom r-1ny4l3l r-t60dpp r-fdjqy7']"))
)
password_field.send_keys(x_password)
password_field.send_keys(Keys.ENTER)
print("Sign In Successful...\n")
sleep(10)
results = []
# Raschiare
def scrape():
display_names = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1wbh5a2 r-dnmrzs r-1ny4l3l r-1awozwy r-18u37iz"]/div[1]/div/a/div/div[1]/span/span')
usernames = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1wbh5a2 r-dnmrzs r-1ny4l3l r-1awozwy r-18u37iz"]/div[2]/div/div[1]/a/div/span')
posts = driver.find_elements(By.XPATH,
'//*[@class="css-146c3p1 r-8akbws r-krxsd3 r-dnmrzs r-1udh08x r-bcqeeo r-1ttztb7 r-qvutc0 r-37j5jr r-a023e6 r-rjixqe r-16dba41 r-bnwqim"]/span')
comments = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[1]/button/div/div[2]/span/span/span')
retweets = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[2]/button/div/div[2]/span/span/span')
likes = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[3]/button/div/div[2]/span/span/span')
impressions = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[4]/a/div/div[2]/span/span/span')
min_length = min(len(display_names), len(usernames), len(posts), len(comments), len(retweets), len(likes),
len(impressions))
for each in range(min_length):
results.append({
'Username': usernames[each].text,
'displayName': display_names[each].text,
'Post': posts[each].text.rstrip("Show more"),
'Comments': 0 if comments[each].text == "" else convert_to_numeric(comments[each].text),
'Retweets': 0 if retweets[each].text == "" else convert_to_numeric(retweets[each].text),
'Likes': 0 if likes[each].text == "" else convert_to_numeric(likes[each].text),
'Impressions': 0 if impressions[each].text == "" else convert_to_numeric(impressions[each].text)
})
def reorder_json_by_impressions(json_data):
# Ordinare l'elenco JSON in base a 'Impressioni' in ordine decrescente.
json_data.sort(key=lambda x: int(x['Impressions']), reverse=True)
def organize_write_data(data: dict):
output = json.dumps(data, indent=2, ensure_ascii=False).encode("ascii", "ignore").decode("utf-8")
try:
with open("result.json", 'w', encoding='utf-8') as file:
file.write(output)
except Exception as err:
print(f"Error encountered: {err}")
def convert_to_numeric(value):
multipliers = {'K': 10 ** 3, 'M': 10 ** 6, 'B': 10 ** 9}
try:
if value[-1] in multipliers:
return int(float(value[:-1]) * multipliers[value[-1]])
else:
return int(value)
except ValueError:
# Handle the case where the conversion fails
return None
actions = ActionChains(driver)
for i in range(5):
actions.send_keys(Keys.END).perform()
sleep(5)
scrape()
reorder_json_by_impressions(results)
organize_write_data(results)
print(f"Scraping Information on {search_keyword} is done.")
driver.quit()
Ecco come dovrebbe essere il file JSON al termine dello scraping:
[
{
"Username": "@LindaEvelyn_N",
"displayName": "Linda Evelyn Namulindwa",
"Post": "Still getting used to Ugandan local foods so I had Glovo deliver me a KFC Streetwise Spicy rice meal (2 pcs of chicken & jollof rice at Ugx 18,000)\n\nNot only was it fast but it also accepts all payment methods.\n\n#GlovoDeliversKFC\n#ItsFingerLinkingGood",
"Comments": 105,
"Retweets": 148,
"Likes": 1500,
"Impressions": 66000
},
{
"Username": "@GymCheff",
"displayName": "The Gym Chef",
"Post": "Delicious High Protein KFC Zinger Rice Box!",
"Comments": 1,
"Retweets": 68,
"Likes": 363,
"Impressions": 49000
}
]
La guida descritta può essere utilizzata per raccogliere dati su vari argomenti di interesse, facilitando gli studi sull'analisi del sentiment pubblico, il monitoraggio delle tendenze e la gestione della reputazione. Python, a sua volta, semplifica il processo di raccolta automatica dei dati grazie alla sua vasta gamma di moduli e funzioni integrate. Questi strumenti sono essenziali per configurare i proxy, gestire lo scorrimento delle pagine e organizzare efficacemente le informazioni raccolte.
Мы получили вашу заявку!
Ответ будет отправлен на почту в ближайшее время.
С уважением proxy-seller.com!
Commenti: 0