#!/usr/bin/python3 import pytumblr import os from dotenv import load_dotenv from typing import Dict, List import logging from time import sleep class Tumblr: def __init__(self): self.client = self.auth() while not self.set_info(): logging.info("sleeping for 10 minutes") sleep(600) def set_info(self) -> bool: info = self.client.info() if 'meta' in info: if 'status' in info['meta']: if info['meta']['status'] == 429: logging.error(f"Rate limited. try again at some point: {info['errors']}") return False self.url = info.get("user").get("name") self.num_posts = info.get("user").get("blogs")[0].get('posts') return True def auth(self) -> pytumblr.TumblrRestClient: # tumblr requires four variables to authenticate for some reason ck = os.environ.get('CONSUMER_KEY') cs = os.environ.get('CONSUMER_SECRET') ok = os.environ.get('OAUTH_TOKEN') oss = os.environ.get('OAUTH_SECRET') return pytumblr.TumblrRestClient(ck, cs, ok, oss) def get_specific_post(self, id: str|int) -> Dict: return self.client.posts(self.url, id=id)['posts'][0] def is_empty_reblog(self, post: dict) -> bool: logging.debug(f"checking if post {post['id']} is empty...") try: if post['trail'][-1].get('blog').get('name') != self.url: # filter out posts where i added the last reblog if post['tags'] == []: # filter out posts where i added tags return True return False except Exception as e: logging.warning(f"Failed to check if reblog empty for post{post['id']}: {e}") return False def delete_post(self, id: str|int): try: logging.debug(f"Deleting post with id {id}") self.client.delete_post(self.url, id) except Exception as e: logging.warning(f"Failed to delete post {id}: {e}") def delete_posts(self, posts: List): for p in posts: self.delete_post(p) def del_empty_streaming(self): tot_deleted = 0 offset = 0 while offset < self.num_posts: batch = self.client.posts(self.url, offset=offset).get('posts') if batch: batch_deleted = 0 for p in batch: if self.is_empty_reblog(p): tot_deleted += 1 batch_deleted += 1 self.delete_post(p['id']) logging.info(f"posts {offset} through {offset+len(batch)} parsed, deleting {batch_deleted} empty reblogs.") offset += batch_deleted else: # no poasts found prolly timeout logging.info(f"batch was empty; assuming timeout and sleeping") sleep(600) print(f"Done. deleted {tot_deleted} empty reblogs.") def main(): load_dotenv() logging.basicConfig(level=logging.INFO) acc = Tumblr() acc.del_empty_streaming() if __name__ == "__main__": main()