Comment récupérer des données Twitter à l'aide d'un script python

Commentaires: 0

La création d'un script Python pour récupérer les données de Twitter est en effet utile pour recueillir des informations, telles que des avis d'utilisateurs ou des discussions sur des sujets spécifiques, qui peuvent être d'une grande aide pour le marketing et la recherche. L'automatisation à l'aide de ces scripts rationalise le processus de collecte, le rendant rapide et efficace.

Étape 1 : Installations et importations

Il y a deux paquets que vous devez installer avant de commencer à écrire le code proprement dit. Vous avez également besoin d'un gestionnaire de paquets Python (PIP) pour installer ces paquets. Heureusement, une fois que vous avez installé Python sur votre machine, PIP est également installé. Pour installer ces paquets, il vous suffit d'exécuter la commande ci-dessous dans votre interface de ligne de commande (CLI).

pip install selenium-wire selenium undetected-chromedriver

Une fois l'installation terminée, vous devez importer ces paquets dans votre fichier Python, comme indiqué ci-dessous.

from seleniumwire import webdriver as wiredriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from time import sleep
import json
import undetected_chromedriver as uc
import random
Import ssl
  • Seleniumwire : améliore Selenium en ajoutant la possibilité de configurer directement les proxies, ce qui est crucial pour éviter les blocages lors des activités de scraping.
  • Selenium : facilite le scraping de données grâce à des outils tels que ActionChains et "Keys" pour simuler les actions du navigateur, "By" pour la recherche d'éléments, "WebDriverWait" et "expected_conditions" pour l'exécution basée sur des conditions.
  • Undetected Chromedriver : modifie ChromeDriver pour l'utiliser avec Selenium Wire afin de contourner les mécanismes de détection des robots sur les sites web, réduisant ainsi les risques de blocage.
  • time, random, json : bibliothèques Python standard pour gérer la durée des opérations et traiter les données au format JSON.

Étape 2 : Initialisation du proxy

Il a été établi à plusieurs reprises qu'il est important d'utiliser un proxy pendant le scraping. Twitter est l'une des plateformes de médias sociaux qui désapprouve le scraping de données et, pour être sûr et éviter une interdiction, vous devriez utiliser un proxy.

Tout ce que vous avez à faire est de fournir votre adresse proxy, votre nom d'utilisateur proxy et votre mot de passe et votre IP devrait maintenant être masquée et protégée. L'exécution d'un navigateur sans tête, ce qui revient à exécuter un navigateur sans interface, permet d'accélérer le processus de scraping, c'est pourquoi nous avons ajouté le drapeau headless dans les options.

# Spécifier l'adresse du serveur proxy avec le nom d'utilisateur et le mot de passe dans une liste de proxies
proxies = [
    "proxy_username:proxy_password@proxy_address:port_number",
]




# pour obtenir un proxy aléatoire
def get_proxy():
    return random.choice(proxies)


# Configurer les options de Chrome avec le proxy et l'authentification
chrome_options = Options()
chrome_options.add_argument("--headless")


proxy = get_proxy()
proxy_options = {
    "proxy": {
        "http": f"http://{proxy}",
        "https": f"https://{proxy}",
    }
}

Étape 3 : Comment se connecter à X/Twitter

Pour extraire efficacement les données de Twitter à l'aide de Python, le script a besoin des informations d'accès au compte Twitter, notamment le nom d'utilisateur et le mot de passe.

En outre, vous devez spécifier un mot-clé de recherche. Le script utilise la commande https://twitter.com/search?q={search_keyword}&src=typed_query&f=top pour construire une URL qui permet la recherche de ce mot clé sur Twitter.

L'étape suivante consiste à créer une instance de ChromeDriver, en incorporant les détails du proxy en tant qu'option. Cette configuration indique à ChromeDriver d'utiliser une adresse IP spécifique lors du chargement de la page. Après cette configuration, l'URL de recherche est chargée avec ces configurations. Une fois la page chargée, vous devez vous connecter pour accéder aux résultats de la recherche. À l'aide de WebDriverWait, le script vérifie que la page est entièrement chargée en contrôlant la présence de la zone d'entrée du nom d'utilisateur. Si cette zone ne se charge pas, il est conseillé de terminer l'instance de ChromeDriver.

search_keyword = input("What topic on X/Twitter would you like to gather data on?\n").replace(' ', '%20')
constructed_url = f"https://twitter.com/search?q={search_keyword}&src=typed_query&f=top"


# indiquez votre nom d'utilisateur et votre mot de passe X/Twitter ici
x_username = "" 
x_password = ""


print(f'Opening {constructed_url} in Chrome...')


# Créer une instance de WebDriver avec un pilote chrome non détecté
driver = uc.Chrome(options=chrome_options, seleniumwire_options=proxy_options)


driver.get(constructed_url)


try:
    element = WebDriverWait(driver, 20).until(
        EC.presence_of_element_located((By.XPATH, "//div[@class='css-175oi2r r-1mmae3n r-1e084wir-13qz1uu']"))
    )
except Exception as err:
    print(f'WebDriver Wait Error: Most likely Network TimeOut: Details\n{err}')
    driver.quit()


#S'inscrire
if element:
    username_field = driver.find_element(By.XPATH, "//input[@class='r-30o5oe r-1dz5y72 r-13qz1uu r-1niwhzg r-17gur6a r-1yadl64 r-deolkf r-homxoj r-poiln3 r-7cikom r-1ny4l3l r-t60dpp r-fdjqy7']")
    username_field.send_keys(x_username)
    username_field..send_keys(Keys.ENTER)


    password_field = WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.XPATH, "//input[@class='r-30o5oe r-1dz5y72 r-13qz1uu r-1niwhzg r-17gur6a r-1yadl64 r-deolkf r-homxoj r-poiln3 r-7cikom r-1ny4l3l r-t60dpp r-fdjqy7']"))
    )
    password_field.send_keys(x_password)
    password_field.send_keys(Keys.ENTER)


    print("Sign In Successful...\n")


    sleep(10)

Étape 4 : Extraire les meilleurs résultats

Créez une variable de liste, results, pour stocker systématiquement toutes les données glanées dans le format des dictionnaires. Ensuite, créez une fonction nommée scrape() pour collecter systématiquement une multitude de données pour chaque tweet, comprenant des détails cruciaux comme le nom d'affichage, le nom d'utilisateur, le contenu du message et des métriques telles que les likes et les impressions.

Une approche proactive a été adoptée pour garantir l'uniformité des longueurs des listes. La fonction min() permet de s'assurer que la longueur de chaque liste est alignée sur les autres. En adhérant à cette méthodologie, nous garantissons une approche synchronisée et structurée de la collecte et du traitement des données Twitter.

Lorsque nous récupérons les numéros de vanité/métriques, ils sont renvoyés sous forme de chaînes et non de nombres. Nous devons donc convertir les chaînes en nombres en utilisant convert_to_numeric() afin que le résultat puisse être organisé par impressions.

results = []


# Grattage
def scrape():
   display_names = driver.find_elements(By.XPATH,
                                        '//*[@class="css-175oi2r r-1wbh5a2 r-dnmrzs r-1ny4l3l r-1awozwy r-18u37iz"]/div[1]/div/a/div/div[1]/span/span')
   usernames = driver.find_elements(By.XPATH,
                                    '//*[@class="css-175oi2r r-1wbh5a2 r-dnmrzs r-1ny4l3l r-1awozwy r-18u37iz"]/div[2]/div/div[1]/a/div/span')
   posts = driver.find_elements(By.XPATH,
                                '//*[@class="css-146c3p1 r-8akbws r-krxsd3 r-dnmrzs r-1udh08x r-bcqeeo r-1ttztb7 r-qvutc0 r-37j5jr r-a023e6 r-rjixqe r-16dba41 r-bnwqim"]/span')
   comments = driver.find_elements(By.XPATH,
                                   '//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[1]/button/div/div[2]/span/span/span')
   retweets = driver.find_elements(By.XPATH,
                                   '//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[2]/button/div/div[2]/span/span/span')
   likes = driver.find_elements(By.XPATH,
                                '//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[3]/button/div/div[2]/span/span/span')
   impressions = driver.find_elements(By.XPATH,
                                      '//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[4]/a/div/div[2]/span/span/span')

   min_length = min(len(display_names), len(usernames), len(posts), len(comments), len(retweets), len(likes),
                    len(impressions))

   for each in range(min_length):
       results.append({
           'Username': usernames[each].text,
           'displayName': display_names[each].text,
           'Post': posts[each].text.rstrip("Show more"),
           'Comments': 0 if comments[each].text == "" else convert_to_numeric(comments[each].text),
           'Retweets': 0 if retweets[each].text == "" else convert_to_numeric(retweets[each].text),
           'Likes': 0 if likes[each].text == "" else convert_to_numeric(likes[each].text),
           'Impressions': 0 if impressions[each].text == "" else convert_to_numeric(impressions[each].text)
       })


def reorder_json_by_impressions(json_data):
   # Trier la liste JSON sur place sur la base des "Impressions" dans l'ordre décroissant
   json_data.sort(key=lambda x: int(x['Impressions']), reverse=True)


def organize_write_data(data: dict):
   output = json.dumps(data, indent=2, ensure_ascii=False).encode("ascii", "ignore").decode("utf-8")
   try:
       with open("result.json", 'w', encoding='utf-8') as file:
           file.write(output)
   except Exception as err:
       print(f"Error encountered: {err}")


def convert_to_numeric(value):
   multipliers = {'K': 10 ** 3, 'M': 10 ** 6, 'B': 10 ** 9}

   try:
       if value[-1] in multipliers:
           return int(float(value[:-1]) * multipliers[value[-1]])
       else:
           return int(value)
   except ValueError:
       # Gérer le cas où la conversion échoue
       return None

Étape 5 : Organiser les données

Pour mieux organiser les données, nous avons créé une fonction qui prend les résultats et trie les tweets par ordre décroissant en utilisant le nombre d'impressions recueillies par chaque tweet. Logiquement, nous voulons voir le tweet avec le plus grand nombre d'impressions avant les autres.

def reorder_json_by_impressions(json_data):
    # Trier la liste JSON sur place sur la base des "Impressions" dans l'ordre décroissant
    json_data.sort(key=lambda x:int(x['Impressions']), reverse=True)

Écrire dans un fichier JSON

Un fichier JSON est le meilleur moyen de visualiser toutes les données collectées. L'écriture dans un fichier JSON est identique à l'écriture dans n'importe quel autre fichier en Python. La seule différence est que nous avons besoin du module JSON pour formater correctement les données avant qu'elles ne soient écrites dans le fichier.

Si le code s'est exécuté correctement, vous devriez voir un fichier result.json dans la structure du fichier et dans celui-ci devrait se trouver le résultat comme indiqué dans la section ci-dessous.

def organize_write_data(data:dict):
    output = json.dumps(data, indent=2, ensure_ascii=False).encode("ascii", "ignore").decode("utf-8")
    try:
        with open("result.json", 'w', encoding='utf-8') as file:
            file.write(output)
    except Exception as err:
        print(f"Error encountered: {err}") 

Pagination

Pour commencer l'exécution du code, nous devons appeler nos fonctions de manière séquentielle afin de commencer le scraping de données. Nous créons une référence à l'aide du module ActionChains de Selenium pour faciliter diverses actions Selenium. Ce module s'avère essentiel pour simuler le défilement vers le bas de la page.

Le premier cycle consiste à extraire des données de la page actuellement chargée. Ensuite, une boucle est lancée, itérant cinq fois, au cours desquelles la page défile vers le bas, suivie d'une pause de cinq secondes avant l'itération de scraping suivante.

Les utilisateurs peuvent ajuster la portée de la boucle, en l'augmentant ou en la diminuant pour personnaliser le volume de données récupérées. Il est essentiel de noter que s'il n'y a pas de contenu supplémentaire à afficher, le script récupérera constamment les mêmes données, ce qui entraînera une redondance. Pour éviter cela, ajustez la plage de la boucle en conséquence afin d'éviter l'enregistrement de données redondantes.

actions = ActionChains(driver)
for i in range(5):
    actions.send_keys(Keys.END).perform()
    sleep(5)
    scrape()


reorder_json_by_impressions(results)
organize_write_data(results)


print(f"Scraping Information on {search_keyword} is done.")


driver.quit()

Code complet

from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from time import sleep
import json
import undetected_chromedriver as uc
import random
import ssl

ssl._create_default_https_context = ssl._create_stdlib_context


search_keyword = input("What topic on X/Twitter would you like to gather data on?\n").replace(' ', '%20')
constructed_url = f"https://twitter.com/search?q={search_keyword}&src=typed_query&f=top"

# indiquez votre nom d'utilisateur et votre mot de passe X/Twitter ici
x_username = ""
x_password = ""

print(f'Opening {constructed_url} in Chrome...')

# Spécifier l'adresse du serveur proxy avec le nom d'utilisateur et le mot de passe dans une liste de proxies
proxies = [
   "USERNAME:PASSWORD@IP:PORT",
]


# pour obtenir un proxy aléatoire
def get_proxy():
   return random.choice(proxies)


# Configurer les options de Chrome avec le proxy et l'authentification
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument('--ignore-certificate-errors')
chrome_options.add_argument('--ignore-ssl-errors')

proxy = get_proxy()
proxy_options = {
   "proxy": {
       "http": f"http://{proxy}",
       "https": f"https://{proxy}",
   }
}

# Créer une instance de WebDriver avec un pilote chrome non détecté
driver = uc.Chrome(options=chrome_options, seleniumwire_options=proxy_options)

driver.get(constructed_url)

try:
   element = WebDriverWait(driver, 20).until(
       EC.presence_of_element_located((By.XPATH, "//div[@class='css-175oi2r r-1mmae3n r-1e084wi r-13qz1uu']"))
   )
except Exception as err:
   print(f'WebDriver Wait Error: Most likely Network TimeOut: Details\n{err}')
   driver.quit()

# S'inscrire
if element:
   username_field = driver.find_element(By.XPATH,
                                        "//input[@class='r-30o5oe r-1dz5y72 r-13qz1uu r-1niwhzg r-17gur6a r-1yadl64 r-deolkf r-homxoj r-poiln3 r-7cikom r-1ny4l3l r-t60dpp r-fdjqy7']")
   username_field.send_keys(x_username)
   username_field.send_keys(Keys.ENTER)

   password_field = WebDriverWait(driver, 10).until(
       EC.presence_of_element_located((By.XPATH,
                                       "//input[@class='r-30o5oe r-1dz5y72 r-13qz1uu r-1niwhzg r-17gur6a r-1yadl64 r-deolkf r-homxoj r-poiln3 r-7cikom r-1ny4l3l r-t60dpp r-fdjqy7']"))
   )
   password_field.send_keys(x_password)
   password_field.send_keys(Keys.ENTER)

   print("Sign In Successful...\n")

   sleep(10)

results = []


# Scrape
def scrape():
   display_names = driver.find_elements(By.XPATH,
                                        '//*[@class="css-175oi2r r-1wbh5a2 r-dnmrzs r-1ny4l3l r-1awozwy r-18u37iz"]/div[1]/div/a/div/div[1]/span/span')
   usernames = driver.find_elements(By.XPATH,
                                    '//*[@class="css-175oi2r r-1wbh5a2 r-dnmrzs r-1ny4l3l r-1awozwy r-18u37iz"]/div[2]/div/div[1]/a/div/span')
   posts = driver.find_elements(By.XPATH,
                                '//*[@class="css-146c3p1 r-8akbws r-krxsd3 r-dnmrzs r-1udh08x r-bcqeeo r-1ttztb7 r-qvutc0 r-37j5jr r-a023e6 r-rjixqe r-16dba41 r-bnwqim"]/span')
   comments = driver.find_elements(By.XPATH,
                                   '//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[1]/button/div/div[2]/span/span/span')
   retweets = driver.find_elements(By.XPATH,
                                   '//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[2]/button/div/div[2]/span/span/span')
   likes = driver.find_elements(By.XPATH,
                                '//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[3]/button/div/div[2]/span/span/span')
   impressions = driver.find_elements(By.XPATH,
                                      '//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[4]/a/div/div[2]/span/span/span')

   min_length = min(len(display_names), len(usernames), len(posts), len(comments), len(retweets), len(likes),
                    len(impressions))

   for each in range(min_length):
       results.append({
           'Username': usernames[each].text,
           'displayName': display_names[each].text,
           'Post': posts[each].text.rstrip("Show more"),
           'Comments': 0 if comments[each].text == "" else convert_to_numeric(comments[each].text),
           'Retweets': 0 if retweets[each].text == "" else convert_to_numeric(retweets[each].text),
           'Likes': 0 if likes[each].text == "" else convert_to_numeric(likes[each].text),
           'Impressions': 0 if impressions[each].text == "" else convert_to_numeric(impressions[each].text)
       })


def reorder_json_by_impressions(json_data):
   # Trier la liste JSON sur place sur la base des "Impressions" dans l'ordre décroissant
   json_data.sort(key=lambda x: int(x['Impressions']), reverse=True)


def organize_write_data(data: dict):
   output = json.dumps(data, indent=2, ensure_ascii=False).encode("ascii", "ignore").decode("utf-8")
   try:
       with open("result.json", 'w', encoding='utf-8') as file:
           file.write(output)
   except Exception as err:
       print(f"Error encountered: {err}")


def convert_to_numeric(value):
   multipliers = {'K': 10 ** 3, 'M': 10 ** 6, 'B': 10 ** 9}

   try:
       if value[-1] in multipliers:
           return int(float(value[:-1]) * multipliers[value[-1]])
       else:
           return int(value)
   except ValueError:
       # Gérer le cas où la conversion échoue
       return None


actions = ActionChains(driver)
for i in range(5):
   actions.send_keys(Keys.END).perform()
   sleep(5)
   scrape()

reorder_json_by_impressions(results)
organize_write_data(results)

print(f"Scraping Information on {search_keyword} is done.")

driver.quit()

Résultats finaux

Voici à quoi devrait ressembler le fichier JSON une fois le scraping effectué :

[
  {
    "Username": "@LindaEvelyn_N",
    "displayName": "Linda Evelyn Namulindwa",
    "Post": "Still getting used to Ugandan local foods so I had Glovo deliver me a KFC Streetwise Spicy rice meal (2 pcs of chicken & jollof rice at Ugx 18,000)\n\nNot only was it fast but it also accepts all payment methods.\n\n#GlovoDeliversKFC\n#ItsFingerLinkingGood",
    "Comments": 105,
    "Retweets": 148,
    "Likes": 1500,
    "Impressions": 66000
  },
  {
    "Username": "@GymCheff",
    "displayName": "The Gym Chef",
    "Post": "Delicious High Protein KFC Zinger Rice Box!",
    "Comments": 1,
    "Retweets": 68,
    "Likes": 363,
    "Impressions": 49000
  }
]

Le guide décrit peut être utilisé pour récupérer des données sur divers sujets d'intérêt, facilitant ainsi les études sur l'analyse des sentiments du public, le suivi des tendances, la surveillance et la gestion de la réputation. Python, quant à lui, simplifie le processus de collecte automatique de données grâce à son large éventail de modules et de fonctions intégrés. Ces outils sont essentiels pour configurer les proxys, gérer le défilement des pages et organiser efficacement les informations collectées.

Commentaires:

0 Commentaires