diff --git a/main.py b/main.py new file mode 100644 index 0000000..25bc84b --- /dev/null +++ b/main.py @@ -0,0 +1,94 @@ +#!/usr/bin/python3 + +import pytumblr +import os +from dotenv import load_dotenv +from typing import Dict, List +import logging +from time import sleep + +class Tumblr: + def __init__(self): + self.client = self.auth() + while not self.set_info(): + logging.info("sleeping for 10 minutes") + sleep(600) + + + def set_info(self) -> bool: + info = self.client.info() + if 'meta' in info: + if 'status' in info['meta']: + if info['meta']['status'] == 429: + logging.error(f"Rate limited. try again at some point: {info['errors']}") + return False + self.url = info.get("user").get("name") + self.num_posts = info.get("user").get("blogs")[0].get('posts') + return True + + + def auth(self) -> pytumblr.TumblrRestClient: + # tumblr requires four variables to authenticate for some reason + ck = os.environ.get('CONSUMER_KEY') + cs = os.environ.get('CONSUMER_SECRET') + ok = os.environ.get('OAUTH_TOKEN') + oss = os.environ.get('OAUTH_SECRET') + + return pytumblr.TumblrRestClient(ck, cs, ok, oss) + + def get_specific_post(self, id: str|int) -> Dict: + return self.client.posts(self.url, id=id)['posts'][0] + + def is_empty_reblog(self, post: dict) -> bool: + logging.debug(f"checking if post {post['id']} is empty...") + try: + if post['trail'][-1].get('blog').get('name') != self.url: # filter out posts where i added the last reblog + if post['tags'] == []: # filter out posts where i added tags + return True + return False + except Exception as e: + logging.warning(f"Failed to check if reblog empty for post{post['id']}: {e}") + return False + + def delete_post(self, id: str|int): + try: + logging.debug(f"Deleting post with id {id}") + self.client.delete_post(self.url, id) + except Exception as e: + logging.warning(f"Failed to delete post {id}: {e}") + + def delete_posts(self, posts: List): + for p in posts: + self.delete_post(p) + + def del_empty_streaming(self): + tot_deleted = 0 + offset = 0 + while offset < self.num_posts: + batch = self.client.posts(self.url, offset=offset).get('posts') + if batch: + batch_deleted = 0 + for p in batch: + if self.is_empty_reblog(p): + tot_deleted += 1 + batch_deleted += 1 + self.delete_post(p['id']) + logging.info(f"posts {offset} through {offset+len(batch)} parsed, deleting {batch_deleted} empty reblogs.") + offset += batch_deleted + else: + # no poasts found prolly timeout + logging.info(f"batch was empty; assuming timeout and sleeping") + sleep(600) + print(f"Done. deleted {tot_deleted} empty reblogs.") + + +def main(): + load_dotenv() + logging.basicConfig(level=logging.INFO) + + acc = Tumblr() + acc.del_empty_streaming() + + +if __name__ == "__main__": + main()