Stworzenie skryptu Python do skrobania danych z Twittera jest rzeczywiście przydatne do zbierania spostrzeżeń, takich jak opinie użytkowników lub dyskusje na określone tematy, co może znacznie pomóc w marketingu i badaniach. Automatyzacja przy użyciu takich skryptów usprawnia proces zbierania danych, czyniąc go szybkim i wydajnym.
Istnieją 2 pakiety, które należy zainstalować przed rozpoczęciem pisania właściwego kodu. Do zainstalowania tych pakietów potrzebny jest również menedżer pakietów Pythona (PIP). Na szczęście po zainstalowaniu Pythona na komputerze instalowany jest również PIP. Aby zainstalować te pakiety, wystarczy uruchomić poniższe polecenie w interfejsie wiersza poleceń (CLI).
pip install selenium-wire selenium undetected-chromedriver
Po zakończeniu instalacji należy zaimportować te pakiety do pliku Pythona, jak pokazano poniżej.
from seleniumwire import webdriver as wiredriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from time import sleep
import json
import undetected_chromedriver as uc
import random
Import ssl
Kilkakrotnie ustalono, że korzystanie z serwera proxy podczas skrobania jest ważne. Twitter jest jedną z platform mediów społecznościowych, która nie pochwala skrobania danych i aby być bezpiecznym i uniknąć bana, należy korzystać z serwera proxy.
Wszystko, co musisz zrobić, to podać adres proxy, nazwę użytkownika proxy i hasło, a twoje IP powinno być teraz zamaskowane i chronione. Uruchamianie przeglądarki bezgłowej, w zasadzie tak samo jak uruchamianie przeglądarki bez interfejsu, pomaga przyspieszyć proces skrobania, dlatego też dodaliśmy flagę headless w opcjach.
# Określ adres serwera proxy z nazwą użytkownika i hasłem na liście serwerów proxy.
proxies = [
"proxy_username:proxy_password@proxy_address:port_number",
]
# funkcja do uzyskania losowego proxy
def get_proxy():
return random.choice(proxies)
# Skonfiguruj opcje Chrome z proxy i uwierzytelnianiem
chrome_options = Options()
chrome_options.add_argument("--headless")
proxy = get_proxy()
proxy_options = {
"proxy": {
"http": f"http://{proxy}",
"https": f"https://{proxy}",
}
}
Aby skutecznie skrobać dane z Twittera za pomocą Pythona, skrypt wymaga poświadczeń dostępu do konta na Twitterze, w tym nazwy użytkownika i hasła.
Dodatkowo należy określić słowo kluczowe wyszukiwania. Skrypt używa polecenia https://twitter.com/search?q={search_keyword}&src=typed_query&f=top do skonstruowania adresu URL, który umożliwia wyszukiwanie tego słowa kluczowego na Twitterze.
Następnym krokiem jest utworzenie instancji ChromeDriver, włączając szczegóły proxy jako opcję. Ta konfiguracja kieruje ChromeDriver do używania określonego adresu IP podczas ładowania strony. Po tej konfiguracji adres URL wyszukiwania jest ładowany z tymi konfiguracjami. Po załadowaniu strony należy się zalogować, aby uzyskać dostęp do wyników wyszukiwania. Korzystając z WebDriverWait, skrypt weryfikuje, czy strona jest w pełni załadowana, sprawdzając obecność obszaru wprowadzania nazwy użytkownika. Jeśli obszar ten nie zostanie załadowany, zaleca się zamknięcie instancji ChromeDriver.
search_keyword = input("What topic on X/Twitter would you like to gather data on?\n").replace(' ', '%20')
constructed_url = f"https://twitter.com/search?q={search_keyword}&src=typed_query&f=top"
# podaj tutaj swoją nazwę użytkownika i hasło do serwisu X/Twitter
x_username = ""
x_password = ""
print(f'Opening {constructed_url} in Chrome...')
# Tworzenie instancji WebDriver z niewykrytym sterownikiem chrome
driver = uc.Chrome(options=chrome_options, seleniumwire_options=proxy_options)
driver.get(constructed_url)
try:
element = WebDriverWait(driver, 20).until(
EC.presence_of_element_located((By.XPATH, "//div[@class='css-175oi2r r-1mmae3n r-1e084wir-13qz1uu']"))
)
except Exception as err:
print(f'WebDriver Wait Error: Most likely Network TimeOut: Details\n{err}')
driver.quit()
#Zaloguj się
if element:
username_field = driver.find_element(By.XPATH, "//input[@class='r-30o5oe r-1dz5y72 r-13qz1uu r-1niwhzg r-17gur6a r-1yadl64 r-deolkf r-homxoj r-poiln3 r-7cikom r-1ny4l3l r-t60dpp r-fdjqy7']")
username_field.send_keys(x_username)
username_field..send_keys(Keys.ENTER)
password_field = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.XPATH, "//input[@class='r-30o5oe r-1dz5y72 r-13qz1uu r-1niwhzg r-17gur6a r-1yadl64 r-deolkf r-homxoj r-poiln3 r-7cikom r-1ny4l3l r-t60dpp r-fdjqy7']"))
)
password_field.send_keys(x_password)
password_field.send_keys(Keys.ENTER)
print("Sign In Successful...\n")
sleep(10)
Utwórz zmienną listy, results, aby systematycznie przechowywać wszystkie zebrane dane w formacie słowników. Następnie utwórz funkcję o nazwie scrape(), aby systematycznie zbierać bogactwo danych dla każdego tweeta, obejmując kluczowe szczegóły, takie jak nazwa wyświetlana, nazwa użytkownika, treść posta i wskaźniki, takie jak polubienia i wyświetlenia.
Przyjęto proaktywne podejście, aby zagwarantować jednolitość długości list. Funkcja min() zapewnia, że długość każdej listy jest zgodna z pozostałymi. Przestrzegając tej metodologii, zapewniamy zsynchronizowane i ustrukturyzowane podejście do gromadzenia i przetwarzania danych z Twittera.
Kiedy zeskrobujemy numery/metryki vanity, są one zwracane jako ciągi znaków, a nie jako liczby. Następnie musimy przekonwertować ciągi na liczby za pomocą convert_to_numeric(), aby wynik mógł być uporządkowany według wyświetleń.
results = []
# Scrape
def scrape():
display_names = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1wbh5a2 r-dnmrzs r-1ny4l3l r-1awozwy r-18u37iz"]/div[1]/div/a/div/div[1]/span/span')
usernames = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1wbh5a2 r-dnmrzs r-1ny4l3l r-1awozwy r-18u37iz"]/div[2]/div/div[1]/a/div/span')
posts = driver.find_elements(By.XPATH,
'//*[@class="css-146c3p1 r-8akbws r-krxsd3 r-dnmrzs r-1udh08x r-bcqeeo r-1ttztb7 r-qvutc0 r-37j5jr r-a023e6 r-rjixqe r-16dba41 r-bnwqim"]/span')
comments = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[1]/button/div/div[2]/span/span/span')
retweets = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[2]/button/div/div[2]/span/span/span')
likes = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[3]/button/div/div[2]/span/span/span')
impressions = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[4]/a/div/div[2]/span/span/span')
min_length = min(len(display_names), len(usernames), len(posts), len(comments), len(retweets), len(likes),
len(impressions))
for each in range(min_length):
results.append({
'Username': usernames[each].text,
'displayName': display_names[each].text,
'Post': posts[each].text.rstrip("Show more"),
'Comments': 0 if comments[each].text == "" else convert_to_numeric(comments[each].text),
'Retweets': 0 if retweets[each].text == "" else convert_to_numeric(retweets[each].text),
'Likes': 0 if likes[each].text == "" else convert_to_numeric(likes[each].text),
'Impressions': 0 if impressions[each].text == "" else convert_to_numeric(impressions[each].text)
})
def reorder_json_by_impressions(json_data):
# Posortuj listę JSON w miejscu na podstawie "Impressions" w porządku malejącym.
json_data.sort(key=lambda x: int(x['Impressions']), reverse=True)
def organize_write_data(data: dict):
output = json.dumps(data, indent=2, ensure_ascii=False).encode("ascii", "ignore").decode("utf-8")
try:
with open("result.json", 'w', encoding='utf-8') as file:
file.write(output)
except Exception as err:
print(f"Error encountered: {err}")
def convert_to_numeric(value):
multipliers = {'K': 10 ** 3, 'M': 10 ** 6, 'B': 10 ** 9}
try:
if value[-1] in multipliers:
return int(float(value[:-1]) * multipliers[value[-1]])
else:
return int(value)
except ValueError:
# Obsługa przypadku, gdy konwersja nie powiedzie się
return None
Aby lepiej uporządkować dane, stworzyliśmy funkcję, która pobiera wyniki i sortuje tweety w kolejności malejącej na podstawie liczby wyświetleń zebranych przez każdy tweet. Logicznie rzecz biorąc, chcemy zobaczyć tweet z najwyższą liczbą wyświetleń przed innymi.
def reorder_json_by_impressions(json_data):
# Posortuj listę JSON w miejscu na podstawie "Impressions" w porządku malejącym.
json_data.sort(key=lambda x:int(x['Impressions']), reverse=True)
Plik JSON to najlepszy sposób na wizualizację wszystkich zebranych danych. Zapis do pliku JSON jest taki sam jak zapis do dowolnego innego pliku w Pythonie. Jedyną różnicą jest to, że potrzebujemy modułu JSON do prawidłowego sformatowania danych przed ich zapisaniem do pliku.
Jeśli kod został uruchomiony poprawnie, w strukturze plików powinien pojawić się plik result.json, a w nim wynik pokazany w poniższej sekcji.
def organize_write_data(data:dict):
output = json.dumps(data, indent=2, ensure_ascii=False).encode("ascii", "ignore").decode("utf-8")
try:
with open("result.json", 'w', encoding='utf-8') as file:
file.write(output)
except Exception as err:
print(f"Error encountered: {err}")
Aby rozpocząć wykonywanie kodu, musimy sekwencyjnie wywoływać nasze funkcje w celu rozpoczęcia skrobania danych. Tworzymy referencję za pomocą modułu ActionChains w Selenium, aby ułatwić różne działania Selenium. Moduł ten okazuje się kluczowy dla symulacji przewijania strony w dół.
Pierwsza runda obejmuje pobieranie danych z aktualnie załadowanej strony. Następnie inicjowana jest pętla, iterująca pięć razy, podczas której strona jest przewijana w dół, po czym następuje pięciosekundowa przerwa przed kolejną iteracją skrobania.
Użytkownicy mogą dostosować zakres pętli, zwiększając go lub zmniejszając, aby dostosować ilość skrobanych danych. Ważne jest, aby pamiętać, że jeśli nie ma dodatkowej zawartości do wyświetlenia, skrypt będzie stale pobierał te same dane, co spowoduje redundancję. Aby temu zapobiec, należy odpowiednio dostosować zakres pętli, aby uniknąć nadmiarowego rejestrowania danych.
actions = ActionChains(driver)
for i in range(5):
actions.send_keys(Keys.END).perform()
sleep(5)
scrape()
reorder_json_by_impressions(results)
organize_write_data(results)
print(f"Scraping Information on {search_keyword} is done.")
driver.quit()
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from time import sleep
import json
import undetected_chromedriver as uc
import random
import ssl
ssl._create_default_https_context = ssl._create_stdlib_context
search_keyword = input("What topic on X/Twitter would you like to gather data on?\n").replace(' ', '%20')
constructed_url = f"https://twitter.com/search?q={search_keyword}&src=typed_query&f=top"
# podaj tutaj swoją nazwę użytkownika i hasło do serwisu X/Twitter
x_username = ""
x_password = ""
print(f'Opening {constructed_url} in Chrome...')
# Określ adres serwera proxy z nazwą użytkownika i hasłem na liście serwerów proxy.
proxies = [
"USERNAME:PASSWORD@IP:PORT",
]
# funkcja do uzyskania losowego proxy
def get_proxy():
return random.choice(proxies)
# Skonfiguruj opcje Chrome z proxy i uwierzytelnianiem
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument('--ignore-certificate-errors')
chrome_options.add_argument('--ignore-ssl-errors')
proxy = get_proxy()
proxy_options = {
"proxy": {
"http": f"http://{proxy}",
"https": f"https://{proxy}",
}
}
# Tworzenie instancji WebDriver z niewykrytym sterownikiem chrome
driver = uc.Chrome(options=chrome_options, seleniumwire_options=proxy_options)
driver.get(constructed_url)
try:
element = WebDriverWait(driver, 20).until(
EC.presence_of_element_located((By.XPATH, "//div[@class='css-175oi2r r-1mmae3n r-1e084wi r-13qz1uu']"))
)
except Exception as err:
print(f'WebDriver Wait Error: Most likely Network TimeOut: Details\n{err}')
driver.quit()
# Zaloguj się
if element:
username_field = driver.find_element(By.XPATH,
"//input[@class='r-30o5oe r-1dz5y72 r-13qz1uu r-1niwhzg r-17gur6a r-1yadl64 r-deolkf r-homxoj r-poiln3 r-7cikom r-1ny4l3l r-t60dpp r-fdjqy7']")
username_field.send_keys(x_username)
username_field.send_keys(Keys.ENTER)
password_field = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.XPATH,
"//input[@class='r-30o5oe r-1dz5y72 r-13qz1uu r-1niwhzg r-17gur6a r-1yadl64 r-deolkf r-homxoj r-poiln3 r-7cikom r-1ny4l3l r-t60dpp r-fdjqy7']"))
)
password_field.send_keys(x_password)
password_field.send_keys(Keys.ENTER)
print("Sign In Successful...\n")
sleep(10)
results = []
# Scrape
def scrape():
display_names = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1wbh5a2 r-dnmrzs r-1ny4l3l r-1awozwy r-18u37iz"]/div[1]/div/a/div/div[1]/span/span')
usernames = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1wbh5a2 r-dnmrzs r-1ny4l3l r-1awozwy r-18u37iz"]/div[2]/div/div[1]/a/div/span')
posts = driver.find_elements(By.XPATH,
'//*[@class="css-146c3p1 r-8akbws r-krxsd3 r-dnmrzs r-1udh08x r-bcqeeo r-1ttztb7 r-qvutc0 r-37j5jr r-a023e6 r-rjixqe r-16dba41 r-bnwqim"]/span')
comments = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[1]/button/div/div[2]/span/span/span')
retweets = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[2]/button/div/div[2]/span/span/span')
likes = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[3]/button/div/div[2]/span/span/span')
impressions = driver.find_elements(By.XPATH,
'//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[4]/a/div/div[2]/span/span/span')
min_length = min(len(display_names), len(usernames), len(posts), len(comments), len(retweets), len(likes),
len(impressions))
for each in range(min_length):
results.append({
'Username': usernames[each].text,
'displayName': display_names[each].text,
'Post': posts[each].text.rstrip("Show more"),
'Comments': 0 if comments[each].text == "" else convert_to_numeric(comments[each].text),
'Retweets': 0 if retweets[each].text == "" else convert_to_numeric(retweets[each].text),
'Likes': 0 if likes[each].text == "" else convert_to_numeric(likes[each].text),
'Impressions': 0 if impressions[each].text == "" else convert_to_numeric(impressions[each].text)
})
def reorder_json_by_impressions(json_data):
# Posortuj listę JSON w miejscu na podstawie "Impressions" w porządku malejącym.
json_data.sort(key=lambda x: int(x['Impressions']), reverse=True)
def organize_write_data(data: dict):
output = json.dumps(data, indent=2, ensure_ascii=False).encode("ascii", "ignore").decode("utf-8")
try:
with open("result.json", 'w', encoding='utf-8') as file:
file.write(output)
except Exception as err:
print(f"Error encountered: {err}")
def convert_to_numeric(value):
multipliers = {'K': 10 ** 3, 'M': 10 ** 6, 'B': 10 ** 9}
try:
if value[-1] in multipliers:
return int(float(value[:-1]) * multipliers[value[-1]])
else:
return int(value)
except ValueError:
# Obsługa przypadku, gdy konwersja nie powiedzie się
return None
actions = ActionChains(driver)
for i in range(5):
actions.send_keys(Keys.END).perform()
sleep(5)
scrape()
reorder_json_by_impressions(results)
organize_write_data(results)
print(f"Scraping Information on {search_keyword} is done.")
driver.quit()
Oto jak powinien wyglądać plik JSON po zakończeniu skrobania:
[
{
"Username": "@LindaEvelyn_N",
"displayName": "Linda Evelyn Namulindwa",
"Post": "Still getting used to Ugandan local foods so I had Glovo deliver me a KFC Streetwise Spicy rice meal (2 pcs of chicken & jollof rice at Ugx 18,000)\n\nNot only was it fast but it also accepts all payment methods.\n\n#GlovoDeliversKFC\n#ItsFingerLinkingGood",
"Comments": 105,
"Retweets": 148,
"Likes": 1500,
"Impressions": 66000
},
{
"Username": "@GymCheff",
"displayName": "The Gym Chef",
"Post": "Delicious High Protein KFC Zinger Rice Box!",
"Comments": 1,
"Retweets": 68,
"Likes": 363,
"Impressions": 49000
}
]
Przedstawiony przewodnik można wykorzystać do skrobania danych na różne interesujące tematy, ułatwiając badania w zakresie analizy nastrojów społecznych, śledzenia trendów, monitorowania i zarządzania reputacją. Python z kolei upraszcza proces automatycznego gromadzenia danych dzięki szerokiej gamie wbudowanych modułów i funkcji. Narzędzia te są niezbędne do konfigurowania serwerów proxy, zarządzania przewijaniem stron i efektywnego organizowania zebranych informacji.
Komentarze: 0