Cómo scrapear datos de Twitter con un script de Python

Comentarios: 0

Crear un script en Python para extraer datos de Twitter es muy útil para recopilar información, como opiniones de usuarios o debates sobre temas específicos, que pueden ser de gran ayuda en marketing e investigación. La automatización mediante este tipo de scripts agiliza el proceso de recopilación, haciéndolo rápido y eficiente.

Paso 1: Instalaciones e importaciones

Hay 2 paquetes que debes instalar antes de empezar a escribir el código real. También necesitas un gestor de paquetes para paquetes Python (PIP) para instalar estos paquetes. Afortunadamente, una vez que instalas Python en tu máquina, PIP se instala también. Para instalar estos paquetes, sólo necesitas ejecutar el siguiente comando en tu Interfaz de Línea de Comandos (CLI).

pip install selenium-wire selenium undetected-chromedriver

Una vez completada la instalación, debe importar estos paquetes en su archivo Python como se muestra a continuación.

from seleniumwire import webdriver as wiredriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from time import sleep
import json
import undetected_chromedriver as uc
import random
Import ssl
  • Seleniumwire: mejora Selenium añadiendo la posibilidad de configurar proxies directamente, algo crucial para evitar bloqueos durante las actividades de scraping.
  • Selenium: facilita el raspado de datos con herramientas como ActionChains y "Keys" para simular acciones del navegador, "By" para la búsqueda de elementos, "WebDriverWait" y "expected_conditions" para la ejecución basada en condiciones.
  • Chromedriver no detectado: altera ChromeDriver para su uso con Selenium Wire para eludir los mecanismos de detección de bots en sitios web, reduciendo los riesgos de bloqueo.
  • time, random, json: bibliotecas estándar de Python para gestionar el tiempo de las operaciones y manejar datos en formato JSON.

Paso 2: Inicialización del proxy

Se ha establecido en varias ocasiones que el uso de un proxy durante el scraping es importante. Twitter es una de las plataformas de medios sociales que frunce el ceño en el raspado de datos y para estar seguro y evitar una prohibición, se debe utilizar un proxy.

Todo lo que tienes que hacer es proporcionar tu dirección proxy, el nombre de usuario y la contraseña del proxy y tu IP debería estar ahora enmascarada y protegida. Ejecutar un navegador headless, básicamente lo mismo que ejecutar un navegador sin interfaz, ayuda a acelerar el proceso de scraping, razón por la cual añadimos la bandera headless en las opciones.

# Especifique la dirección del servidor proxy con nombre de usuario y contraseña en una Lista de proxies
proxies = [
    "proxy_username:proxy_password@proxy_address:port_number",
]




# para obtener un proxy aleatorio
def get_proxy():
    return random.choice(proxies)


# Configurar las opciones de Chrome con el proxy y la autenticación
chrome_options = Options()
chrome_options.add_argument("--headless")


proxy = get_proxy()
proxy_options = {
    "proxy": {
        "http": f"http://{proxy}",
        "https": f"https://{proxy}",
    }
}

Paso 3: Cómo iniciar sesión en X/Twitter

Para raspar eficazmente los datos de Twitter utilizando Python, el script requiere credenciales de acceso a la cuenta de Twitter, incluidos el nombre de usuario y la contraseña.

Además, debe especificar una palabra clave de búsqueda. El script utiliza el comando https://twitter.com/search?q={search_keyword}&src=typed_query&f=top para construir una URL que permita la búsqueda de esta palabra clave en Twitter.

El siguiente paso consiste en crear una instancia de ChromeDriver, incorporando los detalles del proxy como opción. Esta configuración indica a ChromeDriver que utilice una dirección IP específica al cargar la página. Tras esta configuración, se carga la URL de búsqueda con estas configuraciones. Una vez cargada la página, debes iniciar sesión para acceder a los resultados de la búsqueda. Utilizando WebDriverWait, el script verifica que la página está completamente cargada comprobando la presencia del área de entrada del nombre de usuario. Si esta área no se carga, es aconsejable finalizar la instancia de ChromeDriver.

search_keyword = input("What topic on X/Twitter would you like to gather data on?\n").replace(' ', '%20')
constructed_url = f"https://twitter.com/search?q={search_keyword}&src=typed_query&f=top"


# indique aquí su nombre de usuario y contraseña de X/Twitter
x_username = "" 
x_password = ""


print(f'Opening {constructed_url} in Chrome...')


# Crear una instancia WebDriver con un controlador Chrome no detectado
driver = uc.Chrome(options=chrome_options, seleniumwire_options=proxy_options)


driver.get(constructed_url)


try:
    element = WebDriverWait(driver, 20).until(
        EC.presence_of_element_located((By.XPATH, "//div[@class='css-175oi2r r-1mmae3n r-1e084wir-13qz1uu']"))
    )
except Exception as err:
    print(f'WebDriver Wait Error: Most likely Network TimeOut: Details\n{err}')
    driver.quit()


#Iniciar sesión
if element:
    username_field = driver.find_element(By.XPATH, "//input[@class='r-30o5oe r-1dz5y72 r-13qz1uu r-1niwhzg r-17gur6a r-1yadl64 r-deolkf r-homxoj r-poiln3 r-7cikom r-1ny4l3l r-t60dpp r-fdjqy7']")
    username_field.send_keys(x_username)
    username_field..send_keys(Keys.ENTER)


    password_field = WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.XPATH, "//input[@class='r-30o5oe r-1dz5y72 r-13qz1uu r-1niwhzg r-17gur6a r-1yadl64 r-deolkf r-homxoj r-poiln3 r-7cikom r-1ny4l3l r-t60dpp r-fdjqy7']"))
    )
    password_field.send_keys(x_password)
    password_field.send_keys(Keys.ENTER)


    print("Sign In Successful...\n")


    sleep(10)

Paso 4: Extracción de los mejores resultados

Crea una variable de lista, results, para almacenar sistemáticamente todos los datos recogidos en formato de diccionario. A continuación, crea una función llamada scrape() para recopilar sistemáticamente una gran cantidad de datos de cada tuit, incluidos detalles cruciales como el nombre de usuario, el contenido de la publicación y métricas como los "me gusta" y las impresiones.

Se ha adoptado un enfoque proactivo para garantizar la uniformidad de las longitudes de las listas. La función min() garantiza que la longitud de cada lista coincida con la de las demás. Siguiendo esta metodología, garantizamos un enfoque sincronizado y estructurado para recopilar y procesar los datos de Twitter.

Cuando obtenemos los números de vanidad o las métricas, se devuelven como cadenas, no como números. Entonces, tenemos que convertir las cadenas en números utilizando convert_to_numeric() para que el resultado se pueda organizar por impresiones.

results = []


# Raspar
def scrape():
   display_names = driver.find_elements(By.XPATH,
                                        '//*[@class="css-175oi2r r-1wbh5a2 r-dnmrzs r-1ny4l3l r-1awozwy r-18u37iz"]/div[1]/div/a/div/div[1]/span/span')
   usernames = driver.find_elements(By.XPATH,
                                    '//*[@class="css-175oi2r r-1wbh5a2 r-dnmrzs r-1ny4l3l r-1awozwy r-18u37iz"]/div[2]/div/div[1]/a/div/span')
   posts = driver.find_elements(By.XPATH,
                                '//*[@class="css-146c3p1 r-8akbws r-krxsd3 r-dnmrzs r-1udh08x r-bcqeeo r-1ttztb7 r-qvutc0 r-37j5jr r-a023e6 r-rjixqe r-16dba41 r-bnwqim"]/span')
   comments = driver.find_elements(By.XPATH,
                                   '//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[1]/button/div/div[2]/span/span/span')
   retweets = driver.find_elements(By.XPATH,
                                   '//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[2]/button/div/div[2]/span/span/span')
   likes = driver.find_elements(By.XPATH,
                                '//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[3]/button/div/div[2]/span/span/span')
   impressions = driver.find_elements(By.XPATH,
                                      '//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[4]/a/div/div[2]/span/span/span')

   min_length = min(len(display_names), len(usernames), len(posts), len(comments), len(retweets), len(likes),
                    len(impressions))

   for each in range(min_length):
       results.append({
           'Username': usernames[each].text,
           'displayName': display_names[each].text,
           'Post': posts[each].text.rstrip("Show more"),
           'Comments': 0 if comments[each].text == "" else convert_to_numeric(comments[each].text),
           'Retweets': 0 if retweets[each].text == "" else convert_to_numeric(retweets[each].text),
           'Likes': 0 if likes[each].text == "" else convert_to_numeric(likes[each].text),
           'Impressions': 0 if impressions[each].text == "" else convert_to_numeric(impressions[each].text)
       })


def reorder_json_by_impressions(json_data):
   # Ordenar la lista JSON en el lugar basado en "Impresiones" en orden descendente
   json_data.sort(key=lambda x: int(x['Impressions']), reverse=True)


def organize_write_data(data: dict):
   output = json.dumps(data, indent=2, ensure_ascii=False).encode("ascii", "ignore").decode("utf-8")
   try:
       with open("result.json", 'w', encoding='utf-8') as file:
           file.write(output)
   except Exception as err:
       print(f"Error encountered: {err}")


def convert_to_numeric(value):
   multipliers = {'K': 10 ** 3, 'M': 10 ** 6, 'B': 10 ** 9}

   try:
       if value[-1] in multipliers:
           return int(float(value[:-1]) * multipliers[value[-1]])
       else:
           return int(value)
   except ValueError:
       # Gestionar el caso de que falle la conversión
       return None

Paso 5: Organizar los datos

Para organizar mejor los datos, creamos una función que toma los resultados y ordena los tuits en orden descendente utilizando el número de impresiones recogidas por cada tuit. Lógicamente, queremos ver primero el tuit con el mayor número de vanidades antes que los demás.

def reorder_json_by_impressions(json_data):
    # Ordenar la lista JSON en el lugar basado en "Impresiones" en orden descendente
    json_data.sort(key=lambda x:int(x['Impressions']), reverse=True)

Escribir en un archivo JSON

Un archivo JSON es la mejor manera de visualizar todos los datos recogidos. Escribir en un archivo JSON es como escribir en cualquier otro archivo en Python. La única diferencia es que necesitamos el módulo JSON para formatear correctamente los datos antes de escribirlos en el archivo.

Si el código se ejecutó correctamente, debería ver un archivo result.json en la estructura de archivos y en él debería estar el resultado como se muestra en la sección de abajo.

def organize_write_data(data:dict):
    output = json.dumps(data, indent=2, ensure_ascii=False).encode("ascii", "ignore").decode("utf-8")
    try:
        with open("result.json", 'w', encoding='utf-8') as file:
            file.write(output)
    except Exception as err:
        print(f"Error encountered: {err}") 

Paginación

Para comenzar la ejecución del código, necesitamos llamar a nuestras funciones secuencialmente para comenzar el raspado de datos. Creamos una referencia utilizando el módulo ActionChains dentro de Selenium para facilitar varias acciones de Selenium. Este módulo resulta fundamental para simular el desplazamiento hacia abajo en la página.

La primera ronda consiste en extraer datos de la página cargada en ese momento. Posteriormente, se inicia un bucle que itera cinco veces, durante el cual la página se desplaza hacia abajo, seguido de una pausa de cinco segundos antes de la siguiente iteración de raspado.

Los usuarios pueden ajustar el alcance del bucle, aumentándolo o disminuyéndolo para personalizar el volumen de datos raspados. Es crucial tener en cuenta que si no hay contenido adicional que mostrar, el script raspará persistentemente los mismos datos, dando lugar a redundancia. Para evitarlo, ajuste el rango del bucle en consecuencia para evitar el registro de datos redundantes.

actions = ActionChains(driver)
for i in range(5):
    actions.send_keys(Keys.END).perform()
    sleep(5)
    scrape()


reorder_json_by_impressions(results)
organize_write_data(results)


print(f"Scraping Information on {search_keyword} is done.")


driver.quit()

Código completo

from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from time import sleep
import json
import undetected_chromedriver as uc
import random
import ssl

ssl._create_default_https_context = ssl._create_stdlib_context


search_keyword = input("What topic on X/Twitter would you like to gather data on?\n").replace(' ', '%20')
constructed_url = f"https://twitter.com/search?q={search_keyword}&src=typed_query&f=top"

# indique aquí su nombre de usuario y contraseña de X/Twitter
x_username = ""
x_password = ""

print(f'Opening {constructed_url} in Chrome...')

# Especifique la dirección del servidor proxy con nombre de usuario y contraseña en una Lista de proxies
proxies = [
   "USERNAME:PASSWORD@IP:PORT",
]


# para obtener un proxy aleatorio
def get_proxy():
   return random.choice(proxies)


# Configurar las opciones de Chrome con el proxy y la autenticación
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument('--ignore-certificate-errors')
chrome_options.add_argument('--ignore-ssl-errors')

proxy = get_proxy()
proxy_options = {
   "proxy": {
       "http": f"http://{proxy}",
       "https": f"https://{proxy}",
   }
}

# Crear una instancia WebDriver con un controlador Chrome no detectado
driver = uc.Chrome(options=chrome_options, seleniumwire_options=proxy_options)

driver.get(constructed_url)

try:
   element = WebDriverWait(driver, 20).until(
       EC.presence_of_element_located((By.XPATH, "//div[@class='css-175oi2r r-1mmae3n r-1e084wi r-13qz1uu']"))
   )
except Exception as err:
   print(f'WebDriver Wait Error: Most likely Network TimeOut: Details\n{err}')
   driver.quit()

# Iniciar sesión
if element:
   username_field = driver.find_element(By.XPATH,
                                        "//input[@class='r-30o5oe r-1dz5y72 r-13qz1uu r-1niwhzg r-17gur6a r-1yadl64 r-deolkf r-homxoj r-poiln3 r-7cikom r-1ny4l3l r-t60dpp r-fdjqy7']")
   username_field.send_keys(x_username)
   username_field.send_keys(Keys.ENTER)

   password_field = WebDriverWait(driver, 10).until(
       EC.presence_of_element_located((By.XPATH,
                                       "//input[@class='r-30o5oe r-1dz5y72 r-13qz1uu r-1niwhzg r-17gur6a r-1yadl64 r-deolkf r-homxoj r-poiln3 r-7cikom r-1ny4l3l r-t60dpp r-fdjqy7']"))
   )
   password_field.send_keys(x_password)
   password_field.send_keys(Keys.ENTER)

   print("Sign In Successful...\n")

   sleep(10)

results = []


# Raspar
def scrape():
   display_names = driver.find_elements(By.XPATH,
                                        '//*[@class="css-175oi2r r-1wbh5a2 r-dnmrzs r-1ny4l3l r-1awozwy r-18u37iz"]/div[1]/div/a/div/div[1]/span/span')
   usernames = driver.find_elements(By.XPATH,
                                    '//*[@class="css-175oi2r r-1wbh5a2 r-dnmrzs r-1ny4l3l r-1awozwy r-18u37iz"]/div[2]/div/div[1]/a/div/span')
   posts = driver.find_elements(By.XPATH,
                                '//*[@class="css-146c3p1 r-8akbws r-krxsd3 r-dnmrzs r-1udh08x r-bcqeeo r-1ttztb7 r-qvutc0 r-37j5jr r-a023e6 r-rjixqe r-16dba41 r-bnwqim"]/span')
   comments = driver.find_elements(By.XPATH,
                                   '//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[1]/button/div/div[2]/span/span/span')
   retweets = driver.find_elements(By.XPATH,
                                   '//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[2]/button/div/div[2]/span/span/span')
   likes = driver.find_elements(By.XPATH,
                                '//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[3]/button/div/div[2]/span/span/span')
   impressions = driver.find_elements(By.XPATH,
                                      '//*[@class="css-175oi2r r-1kbdv8c r-18u37iz r-1wtj0ep r-1ye8kvj r-1s2bzr4"]/div[4]/a/div/div[2]/span/span/span')

   min_length = min(len(display_names), len(usernames), len(posts), len(comments), len(retweets), len(likes),
                    len(impressions))

   for each in range(min_length):
       results.append({
           'Username': usernames[each].text,
           'displayName': display_names[each].text,
           'Post': posts[each].text.rstrip("Show more"),
           'Comments': 0 if comments[each].text == "" else convert_to_numeric(comments[each].text),
           'Retweets': 0 if retweets[each].text == "" else convert_to_numeric(retweets[each].text),
           'Likes': 0 if likes[each].text == "" else convert_to_numeric(likes[each].text),
           'Impressions': 0 if impressions[each].text == "" else convert_to_numeric(impressions[each].text)
       })


def reorder_json_by_impressions(json_data):
   # Ordenar la lista JSON en el lugar basado en "Impresiones" en orden descendente
   json_data.sort(key=lambda x: int(x['Impressions']), reverse=True)


def organize_write_data(data: dict):
   output = json.dumps(data, indent=2, ensure_ascii=False).encode("ascii", "ignore").decode("utf-8")
   try:
       with open("result.json", 'w', encoding='utf-8') as file:
           file.write(output)
   except Exception as err:
       print(f"Error encountered: {err}")


def convert_to_numeric(value):
   multipliers = {'K': 10 ** 3, 'M': 10 ** 6, 'B': 10 ** 9}

   try:
       if value[-1] in multipliers:
           return int(float(value[:-1]) * multipliers[value[-1]])
       else:
           return int(value)
   except ValueError:
       # Handle the case where the conversion fails
       return None


actions = ActionChains(driver)
for i in range(5):
   actions.send_keys(Keys.END).perform()
   sleep(5)
   scrape()

reorder_json_by_impressions(results)
organize_write_data(results)

print(f"Scraping Information on {search_keyword} is done.")

driver.quit()

Resultados finales

Este es el aspecto que debería tener el archivo JSON una vez realizado el scraping:

[
  {
    "Username": "@LindaEvelyn_N",
    "displayName": "Linda Evelyn Namulindwa",
    "Post": "Still getting used to Ugandan local foods so I had Glovo deliver me a KFC Streetwise Spicy rice meal (2 pcs of chicken & jollof rice at Ugx 18,000)\n\nNot only was it fast but it also accepts all payment methods.\n\n#GlovoDeliversKFC\n#ItsFingerLinkingGood",
    "Comments": 105,
    "Retweets": 148,
    "Likes": 1500,
    "Impressions": 66000
  },
  {
    "Username": "@GymCheff",
    "displayName": "The Gym Chef",
    "Post": "Delicious High Protein KFC Zinger Rice Box!",
    "Comments": 1,
    "Retweets": 68,
    "Likes": 363,
    "Impressions": 49000
  }
]

La guía descrita puede utilizarse para obtener datos sobre diversos temas de interés, lo que facilita los estudios de análisis del sentimiento público, seguimiento de tendencias, monitorización y gestión de la reputación. Python, a su vez, simplifica el proceso de recopilación automática de datos con su amplia gama de módulos y funciones incorporados. Estas herramientas son esenciales para configurar proxies, gestionar el desplazamiento de páginas y organizar eficazmente la información recopilada.

Comentarios:

0 Comentarios