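"""Scrape all the replies to a tweet.

Loads the tweet's permalink in a Selenium-driven browser, scrolls until the
page stops growing, parses the rendered HTML with BeautifulSoup, and writes
the de-duplicated replies to a JSON file.
"""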
import argparse
import json
import re
from dataclasses import dataclass
from pathlib import Path
from time import sleep, time
from typing import Dict, List

from bs4 import BeautifulSoup
from selenium import webdriver
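

# One scraped tweet: the rendered text, the short human-readable time shown
# on the page, the machine-readable datetime attribute, and the author's
# display name and @handle.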
@dataclass
class Tweet:
    text: str
    time: str
    time_tag: str
    author: str
    author_handle: str
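

# NOTE: the class_ strings below are Twitter's auto-generated CSS class names,
# copied from the rendered page at the time of writing; they are tied to a
# particular frontend build and are likely to break when Twitter ships a new one.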
def parse(page_source, tweets: List):
    """Extract tweets from the rendered page HTML and append them to `tweets`."""
    soup = BeautifulSoup(page_source, "html.parser")
    for article in soup.find_all("article"):
        curr_tweet = None
        author_tag = article.find(
            "a",
            class_="css-4rbku5 css-18t94o4 css-1dbjc4n r-1loqt21 r-1wbh5a2 r-dnmrzs r-1ny4l3l",
        )
        text_tag = article.find(
            "div",
            class_="css-901oao r-jwli3a r-1qd0xha r-a023e6 r-16dba41 r-ad9z0x r-bcqeeo r-bnwqim r-qvutc0",
        )
        time_tag = article.find("time")
        if (
            author_tag is not None
            and time_tag is not None
            and text_tag is not None
        ):
            author = author_tag["href"].split("@")[0]
            time_text = time_tag.get_text()
            timestamp = time_tag["datetime"]
            tweet_text = text_tag.get_text()
            author_text = author_tag.get_text()
            # quick fix for Scandinavian unicode letters: keep the strings as
            # UTF-8 bytes for now; they are decoded back during cleanup in main()
            author_text = author_text.encode("utf-8")
            tweet_text = tweet_text.encode("utf-8")
            curr_tweet = Tweet(
                author=author_text,
                author_handle=author,
                text=tweet_text,
                time_tag=timestamp,
                time=time_text,
            )
        if curr_tweet is not None:
            tweets.append(curr_tweet)
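

# Pipeline: load the tweet's page, scroll until the document height stops
# growing, parse the visible <article> elements after each successful scroll,
# then de-duplicate and write the result as JSON.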
def main(args: Dict):
    output_file = args["output"]
    url = args["url"]
    print("Starting - loading URL")
    # create a new Safari session
    driver = webdriver.Safari()
    driver.implicitly_wait(30)
    driver.get(url)
    print("Now waiting for some time....")
    # try:
    #     replies_present = EC.presence_of_element_located((By.TAG_NAME, 'article'))
    #     WebDriverWait(driver, 10).until(replies_present)
    # except TimeoutException:
    #     print("Timeout!")
    #     exit()
    sleep(1)
    tweets = []
    print("Starting scroll")
    # Scroll to bottom to make sure we get all tweets
    pre_scroll_height = driver.execute_script("return document.body.scrollHeight;")
    run_time, max_run_time = 0, 2
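    # Keep scrolling until the page stops growing: every scroll that changes
    # the document height resets the timer; once no new content has loaded
    # for max_run_time seconds in a row, assume we have reached the bottom.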
    while True:
        iteration_start = time()
        # Scroll webpage, the 100 allows for a more 'aggressive' scroll
        driver.execute_script("window.scrollTo(0, 100*document.body.scrollHeight);")
        post_scroll_height = driver.execute_script(
            "return document.body.scrollHeight;"
        )
        scrolled = post_scroll_height != pre_scroll_height
        timed_out = run_time >= max_run_time
        if scrolled:
            run_time = 0
            pre_scroll_height = post_scroll_height
            parse(driver.page_source, tweets)
        elif not timed_out:
            run_time += time() - iteration_start
        else:
            break
    # parse one final time in case the last scroll loaded anything new
    parse(driver.page_source, tweets)
    # end the Selenium browser session
    driver.quit()
    # Data cleanup - remove duplicates and clean author names
    outdict = [d.__dict__ for d in tweets]
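    # De-duplicate by tweet text: building a dict keyed on "text" keeps one
    # entry per distinct text (the last one seen wins).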
    unique_tweets = list({each["text"]: each for each in outdict}.values())
    for d in unique_tweets:
        d["author"] = d["author"].decode("utf-8")
        d["text"] = d["text"].decode("utf-8")
        # strip "/" and "@" from the handle, then drop the "@handle" suffix
        # from the display name
        d["author_handle"] = re.sub("[/@]", "", d["author_handle"])
        d["author"] = d["author"].replace("@" + d["author_handle"], "")
    print(json.dumps(unique_tweets, indent=4, ensure_ascii=False))
    print(f"FOUND {len(unique_tweets)} TWEETS")
    with output_file.open("w", encoding="utf-8") as f:
        json.dump(unique_tweets, f, ensure_ascii=False, indent=4)


def parse_args():
    parser = argparse.ArgumentParser(description='Scrape all replies from a tweet.')
    parser.add_argument('--url', type=str,
                        default="http://twitter.com/francesarnold/status/1260227823580491776",
                        help='URL to scrape')
    parser.add_argument('--output',
                        default="./output/test.json",
                        type=str,
                        help='File to output JSON to.')
    args = parser.parse_args()
    argdict = vars(args)
    argdict["output"] = Path(argdict["output"])
    # parents=True so nested output directories are created as needed
    argdict["output"].parent.mkdir(parents=True, exist_ok=True)
    return argdict


if __name__ == '__main__':
    args = parse_args()
    main(args)
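
# Example invocation (assumes Safari's "Allow Remote Automation" setting is
# enabled so that webdriver.Safari() can start a session):
#   python scrape_twitter_selenium.py \
#       --url https://twitter.com/francesarnold/status/1260227823580491776 \
#       --output ./output/replies.json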