Skip to content

Commit

Permalink
style: syntax overhaul
Browse files Browse the repository at this point in the history
  • Loading branch information
wzyboy committed Jul 9, 2023
1 parent 1377058 commit 88284fc
Showing 1 changed file with 44 additions and 60 deletions.
104 changes: 44 additions & 60 deletions ash.py
Expand Up @@ -10,6 +10,8 @@
from urllib.parse import urlsplit
from collections.abc import Mapping

from collections.abc import Iterator

import flask
import requests
from elasticsearch import Elasticsearch
Expand All @@ -22,10 +24,7 @@ class DefaultConfig:
T_EXTERNAL_TWEETS = False


app = flask.Flask(
__name__,
static_url_path='/tweet/static'
)
app = flask.Flask(__name__, static_url_path='/tweet/static')
app.config.from_object(DefaultConfig)
try:
app.config.from_object('config.Config')
Expand All @@ -35,7 +34,6 @@ class DefaultConfig:

# Set up external Tweets support
if app.config.get('T_EXTERNAL_TWEETS'):

# https://developer.twitter.com/en/docs/basics/authentication/api-reference/token
resp = requests.post(
'https://api.twitter.com/oauth2/token',
Expand All @@ -51,7 +49,7 @@ class DefaultConfig:
app.config['T_TWITTER_TOKEN'] = bearer_token


def toot_to_tweet(status):
def toot_to_tweet(status: dict) -> dict:
'''Transform toot to be compatible with tweet-interface'''
# Status is a tweet
if status.get('user'):
Expand Down Expand Up @@ -82,44 +80,41 @@ def toot_to_tweet(status):

class TweetsDatabase(Mapping):

def __init__(self, es_host, es_index):
def __init__(self, es_host: str, es_index: str) -> None:
self.es = Elasticsearch(es_host)
self.es_index = es_index

def _search(self, **kwargs):
def _search(self, **kwargs) -> Iterator[dict]:
if not kwargs.get('index'):
kwargs['index'] = self.es_index
hits = self.es.search(**kwargs)['hits']['hits']
tweets = []
for hit in hits:
tweet = hit['_source']
tweet['@index'] = hit['_index']
tweet = toot_to_tweet(tweet)
tweets.append(tweet)
return tweets
yield tweet

def __getitem__(self, tweet_id):
def __getitem__(self, tweet_id: str | int) -> dict:
resp = self._search(
query={
'term': {
'_id': tweet_id
}
})
if len(resp) == 0:
raise KeyError(f'Tweet ID {tweet_id} not found')
else:
tweet = resp[0]
return tweet
try:
return next(resp)
except StopIteration:
raise KeyError(f'Tweet ID {tweet_id} not found') from None

def __iter__(self):
def __iter__(self) -> Iterator[int]:
resp = self._search(
sort=['@timestamp'],
#size=1000,
)
for tweet in resp:
yield tweet['id']

def __reversed__(self):
def __reversed__(self) -> Iterator[int]:
resp = self._search(
sort=[{
'@timestamp': {'order': 'desc'}
Expand All @@ -129,10 +124,10 @@ def __reversed__(self):
for tweet in resp:
yield tweet['id']

def __len__(self):
def __len__(self) -> int:
return self.es.count(index=self.es_index)['count']

def search(self, *, keyword=None, user_screen_name=None, index=None, limit=100):
def search(self, *, keyword=None, user_screen_name=None, index=None, limit=100) -> Iterator[dict]:
keyword_query = {
'simple_query_string': {
'query': keyword,
Expand All @@ -142,7 +137,7 @@ def search(self, *, keyword=None, user_screen_name=None, index=None, limit=100):
}
if user_screen_name and '@' in user_screen_name: # Mastodon
screen_name_field = 'account.fqn.keyword'
else:
else: # Twitter
screen_name_field = 'user.screen_name.keyword'
user_query = {
'term': {
Expand All @@ -166,7 +161,7 @@ def search(self, *, keyword=None, user_screen_name=None, index=None, limit=100):
)
return resp

def get_users(self):
def get_users(self) -> Iterator[dict]:
agg_name_twitter = 'user_screen_names'
agg_name_mastodon = 'account_fqn'
resp = self.es.search(
Expand All @@ -186,16 +181,14 @@ def get_users(self):
},
)
buckets = resp['aggregations'][agg_name_twitter]['buckets'] + resp['aggregations'][agg_name_mastodon]['buckets']
users = [
{
for bucket in buckets:
user = {
'screen_name': bucket['key'],
'tweets_count': bucket['doc_count']
}
for bucket in buckets
]
return users
yield user

def get_indexes(self):
def get_indexes(self) -> Iterator[dict]:
agg_name = 'index_names'
resp = self.es.search(
index=self.es_index,
Expand All @@ -208,17 +201,15 @@ def get_indexes(self):
}
},
)
indexes = [
{
for bucket in resp['aggregations'][agg_name]['buckets']:
index = {
'name': bucket['key'],
'tweets_count': bucket['doc_count']
}
for bucket in resp['aggregations'][agg_name]['buckets']
]
return indexes
yield index


def get_tdb():
def get_tdb() -> TweetsDatabase:
if not hasattr(flask.g, 'tdb'):
flask.g.tdb = TweetsDatabase(
app.config['T_ES_HOST'],
Expand All @@ -228,16 +219,15 @@ def get_tdb():


@app.template_global('get_tweet_link')
def get_tweet_link(screen_name, tweet_id, original_link=False):
def get_tweet_link(screen_name: str, tweet_id: str | int, original_link: bool = False) -> str:
if original_link:
return f'https://twitter.com/{screen_name}/status/{tweet_id}'
else:
return flask.url_for('get_tweet', tweet_id=tweet_id, ext='html')


@app.template_filter('format_tweet_text')
def format_tweet_text(tweet):

def format_tweet_text(tweet: dict) -> str:
try:
tweet_text = tweet['full_text']
except KeyError:
Expand Down Expand Up @@ -286,15 +276,13 @@ def format_tweet_text(tweet):
# true and has a valid "retweeted_status". Tweets that are ingested via
# Twitter Archive always has "retweeted" set to false (identical to a
# "traditional" RT.
retweeted_status = tweet.get('retweeted_status')
if retweeted_status:
if retweeted_status := tweet.get('retweeted_status'):
link = get_tweet_link('status', retweeted_status['id'])
a = f'<a href="{link}">RT</a>'
tweet_text = tweet_text.replace('RT', a, 1)

# Format reblogged toot
reblogged_status = tweet.get('reblog')
if reblogged_status:
if reblogged_status := tweet.get('reblog'):
status_link = reblogged_status['url']
author = reblogged_status['account']['fqn']
author_link = reblogged_status['account']['url']
Expand All @@ -305,7 +293,7 @@ def format_tweet_text(tweet):


@app.template_filter('format_created_at')
def format_created_at(timestamp, fmt):
def format_created_at(timestamp: str, fmt: str) -> str:
try:
dt = datetime.strptime(timestamp, '%a %b %d %H:%M:%S %z %Y')
except ValueError:
Expand All @@ -317,7 +305,7 @@ def format_created_at(timestamp, fmt):


@app.template_filter('in_reply_to_link')
def in_reply_to_link(tweet):
def in_reply_to_link(tweet: dict) -> str:
if tweet.get('account'): # Mastodon
# If this is a self-thread, return local link
if tweet['in_reply_to_account_id'] == tweet['account']['id']:
Expand All @@ -329,7 +317,7 @@ def in_reply_to_link(tweet):
return get_tweet_link('status', tweet['in_reply_to_status_id'])


def replace_media_url(url):
def replace_media_url(url: str) -> str:
if app.config['T_MEDIA_FROM'] == 'direct':
return url
elif app.config['T_MEDIA_FROM'] == 'filesystem':
Expand All @@ -343,6 +331,8 @@ def replace_media_url(url):
return url.replace(orig, repl)
else:
return url
else:
return url


@app.route('/')
Expand All @@ -352,11 +342,9 @@ def root():

@app.route('/tweet/')
def index():

tdb = get_tdb()
total_tweets = len(tdb)
default_user = app.config.get('T_DEFAULT_USER')
if default_user:
if default_user := app.config.get('T_DEFAULT_USER'):
latest_tweets = tdb.search(keyword='*', user_screen_name=default_user, limit=10)
else:
latest_tweets = [tdb[tid] for tid in itertools.islice(reversed(tdb), 10)]
Expand All @@ -372,10 +360,8 @@ def index():


@lru_cache(maxsize=1024)
def fetch_tweet(tweet_id):

def fetch_tweet(tweet_id: int | str) -> dict:
token = app.config['T_TWITTER_TOKEN']

resp = requests.get(
'https://api.twitter.com/1.1/statuses/show.json',
headers={
Expand All @@ -395,7 +381,6 @@ def fetch_tweet(tweet_id):

@app.route('/tweet/<tweet_id>.<ext>')
def get_tweet(tweet_id, ext):

if ext not in ('txt', 'json', 'html'):
flask.abort(404)

Expand Down Expand Up @@ -424,7 +409,7 @@ def get_tweet(tweet_id, ext):

# HTML output

# Extract list images
# Extract media
images = []
videos = []
try:
Expand All @@ -434,7 +419,7 @@ def get_tweet(tweet_id, ext):
entities = tweet['entities']
media = entities.get('media', [])
for m in media:
# type = video
# type is video
if m.get('type') == 'video':
variants = m['video_info']['variants']
hq_variant = max(variants, key=lambda v: v.get('bitrate', -1))
Expand All @@ -444,7 +429,7 @@ def get_tweet(tweet_id, ext):
videos.append({
'url': media_url,
})
# type = photo
# type is photo
elif m.get('type') == 'photo':
media_url = m['media_url_https']
if not _is_external_tweet:
Expand All @@ -453,7 +438,7 @@ def get_tweet(tweet_id, ext):
'url': media_url,
'description': m.get('description', '')
})
# type = unknown
# type is unknown
else:
pass

Expand All @@ -470,12 +455,12 @@ def get_tweet(tweet_id, ext):


@app.route('/tweet/media/<path:fs_path>')
def get_media_from_filesystem(fs_path):
def get_media_from_filesystem(fs_path: str):
return flask.send_from_directory(app.config['T_MEDIA_FS_PATH'], fs_path)


@app.route('/tweet/search.<ext>')
def search_tweet(ext):
def search_tweet(ext: str):
if ext not in ('html', 'txt', 'json'):
flask.abort(404)

Expand All @@ -491,9 +476,8 @@ def search_tweet(ext):
indexes = tdb.get_indexes()

user = flask.request.args.get('u', '')
keyword = flask.request.args.get('q', '')
index = flask.request.args.get('i', '')
if keyword:
if keyword := flask.request.args.get('q', ''):
tweets = tdb.search(
keyword=keyword,
user_screen_name=user,
Expand Down

0 comments on commit 88284fc

Please sign in to comment.