ft: bulk of code

This commit is contained in:
2026-04-27 19:33:19 +02:00
parent 6b390463ca
commit bbd7f59f0b

94
main.py Normal file
View File

@@ -0,0 +1,94 @@
#!/usr/bin/python3
import pytumblr
import os
from dotenv import load_dotenv
from typing import Dict, List
import logging
from time import sleep
class Tumblr:
def __init__(self):
self.client = self.auth()
while not self.set_info():
logging.info("sleeping for 10 minutes")
sleep(600)
def set_info(self) -> bool:
info = self.client.info()
if 'meta' in info:
if 'status' in info['meta']:
if info['meta']['status'] == 429:
logging.error(f"Rate limited. try again at some point: {info['errors']}")
return False
self.url = info.get("user").get("name")
self.num_posts = info.get("user").get("blogs")[0].get('posts')
return True
def auth(self) -> pytumblr.TumblrRestClient:
# tumblr requires four variables to authenticate for some reason
ck = os.environ.get('CONSUMER_KEY')
cs = os.environ.get('CONSUMER_SECRET')
ok = os.environ.get('OAUTH_TOKEN')
oss = os.environ.get('OAUTH_SECRET')
return pytumblr.TumblrRestClient(ck, cs, ok, oss)
def get_specific_post(self, id: str|int) -> Dict:
return self.client.posts(self.url, id=id)['posts'][0]
def is_empty_reblog(self, post: dict) -> bool:
logging.debug(f"checking if post {post['id']} is empty...")
try:
if post['trail'][-1].get('blog').get('name') != self.url: # filter out posts where i added the last reblog
if post['tags'] == []: # filter out posts where i added tags
return True
return False
except Exception as e:
logging.warning(f"Failed to check if reblog empty for post{post['id']}: {e}")
return False
def delete_post(self, id: str|int):
try:
logging.debug(f"Deleting post with id {id}")
self.client.delete_post(self.url, id)
except Exception as e:
logging.warning(f"Failed to delete post {id}: {e}")
def delete_posts(self, posts: List):
for p in posts:
self.delete_post(p)
def del_empty_streaming(self):
tot_deleted = 0
offset = 0
while offset < self.num_posts:
batch = self.client.posts(self.url, offset=offset).get('posts')
if batch:
batch_deleted = 0
for p in batch:
if self.is_empty_reblog(p):
tot_deleted += 1
batch_deleted += 1
self.delete_post(p['id'])
logging.info(f"posts {offset} through {offset+len(batch)} parsed, deleting {batch_deleted} empty reblogs.")
offset += batch_deleted
else:
# no poasts found prolly timeout
logging.info(f"batch was empty; assuming timeout and sleeping")
sleep(600)
print(f"Done. deleted {tot_deleted} empty reblogs.")
def main():
load_dotenv()
logging.basicConfig(level=logging.INFO)
acc = Tumblr()
acc.del_empty_streaming()
if __name__ == "__main__":
main()