Source code for seedr_client.seedr_handler

import os
import re
import json
import aria2p
import requests
import tempfile
import subprocess
from time import sleep
from random import randrange
from torrentool.api import Torrent
from .errors import (
    InvalidLogin,
    InvalidToken,
    LoginRequired,
    InvalidTorrent,
    DriveLimit,
    BadLeeching,
)


[docs]class SeedrHandler: def __init__( self, email=None, password=None, access_token=None, aria2c_secret=None, download_directory=".", ): self.rate_limit = 1 self.email = email self.password = password self.access_token = access_token self.base_folder_url = "https://www.seedr.cc/api/folder" self.base_oauth_url = "https://www.seedr.cc/oauth_test" # This is the list of file types Seedr client is not allowed to download, # alter this to your requirements, or leave blank if you want Seedr clients # should download all files types # TODO Replace this argument from environment instead self.exclude_file_type = ["jpg", "png", "txt", "exe"] self.get_token() self.magnet_regex = re.compile(r"magnet:\?xt=urn:[a-z0-9]+:[a-zA-Z0-9]{32}") self.torrent_regex = re.compile(r".*torrent$") self.drive_size = 0 self.get_drive() # This finishes init of the Seedr client, by setting up the drive size sleep(self.rate_limit) self.aria2 = None self.aria2c_secret = aria2c_secret self.seedr_download_options = {"user_agent": "Mozilla/5.0"} self.download_directory = download_directory
[docs] @staticmethod def contains_bad_token(response_text): return any( fail_resp in response_text for fail_resp in ["invalid_token", "expired_token"] )
[docs] @staticmethod def bytes_to_mb_gb(byte): """ A simple function that converts Bytes to Megabytes if value is less than 1 Gigabyte else returns value in Gigabytes :param byte: The value in Bytes to be converted :type byte: int :return: The converted value in MB or GB based on logic :rtype: str """ mb = byte / (1024**2) if mb >= 1024: gb = mb / 1024 return f"{round(gb, 2)} GB" return f"{round(mb, 2)} MB"
[docs] def list_contents(self, response_json): """ This method takes the response provided by the api request and sanitizes it, to only returns the relevant details pertaining to the folder structure :param response_json: The response from the api requests passed in json format :type response_json: dict :return: The folder structure of the folder pertaining to the api response :rtype: dict """ content = { "folders": [ { "folder_id": folder["id"], "folder_name": folder["name"], "size": self.bytes_to_mb_gb(folder["size"]), } for folder in response_json["folders"] ], "files": [ { "folder_file_id": file["folder_file_id"], "file_name": file["name"], "size": self.bytes_to_mb_gb(file["size"]), "folder_path": response_json["fullname"], } for file in response_json["files"] ], } return content
[docs] @staticmethod def is_op_success(response_text): """ Checks if there is a success code in the response provided by the api call and returns the appropriate boolean :param response_text: The response from the api call in text format :type response_text: str :return: Returns true if success, false otherwise :rtype: bool """ if json.loads(response_text)["result"]: return True else: return False
[docs] def is_login_success(self): """ Checks if the access token is valid :return: Returns True if the token is valid, False if it isn't :rtype: bool """ response = requests.get( f"{self.base_folder_url}?access_token={self.access_token}" ) if self.contains_bad_token(response_text=response.text): return False else: return True
[docs] def is_request_failed(self, response_text, requested_on): """ Checks for any failed codes in the response and if there is any, then it raises the appropriate error code with detail, else returns false :param response_text: The text response received by Seedr api to the request made by the previous method :type response_text: str :param requested_on: A string that is either "file" or "folder" based on what kinda file system operations was going on in the previous method :type requested_on: str :return: Returns False if no fail code :rtype: bool """ if "access_denied" in response_text: # TODO add a way to notify when this happens raise FileNotFoundError(f"{requested_on.title()} not found in drive") elif self.contains_bad_token(response_text=response_text): # TODO add a way to notify when this happens raise InvalidToken("Invalid/Expired access token.") else: return False
[docs] def get_token(self): """ Gets the access token if email and password is passed during init, else if token itself was provided instead verifies that the access token is valid and if an error occurs during the previous process or if none of the required details are passed during init of client raises an error. """ if self.email and self.password: data = { "grant_type": "password", "client_id": "seedr_chrome", "type": "login", "username": self.email, "password": self.password, } response = requests.post(f"{self.base_oauth_url}/token.php", data=data) if "access_token" in response.text: self.access_token = json.loads(response.text)["access_token"] else: # TODO add a way to notify when this happens raise InvalidLogin("Invalid username and password combination.") elif self.access_token: if not self.is_login_success(): # TODO add a way to notify when this happens raise InvalidToken("Invalid/Expired access token.") else: # TODO add a way to notify when this happens raise LoginRequired("Account login or token is required.")
[docs] def get_drive(self): """ Gets details about the drive's space, active torrents, and all downloaded files and folders Note: This is the same as the get_folder method except seedr api does not ask for a folder id, additionally we are able top display a few additional details, including active torrents :return: A dictionary containing, available and used drive space, active torrents and its details, downloaded folders and files and their details :rtype: dict """ response = requests.get( f"{self.base_folder_url}?access_token={self.access_token}" ) if self.contains_bad_token(response_text=response.text): # TODO add a way to notify when this happens raise InvalidToken("Invalid/Expired access token.") else: response_json = json.loads(response.text) drive = { "space": { "total": self.bytes_to_mb_gb(response_json["space_max"]), "used": self.bytes_to_mb_gb(response_json["space_used"]), }, "parent_folder_id": response_json["folder_id"] if response_json["parent"] == -1 else None, "torrents": [ { "name": torrent["name"], "torrent_id": torrent["id"], "progress": torrent["progress"], "progress_url": torrent["progress_url"], } for torrent in response_json["torrents"] ], } drive = drive | self.list_contents(response_json=response_json) self.drive_size = response_json["space_max"] return drive
[docs] def get_folder(self, folder_id): """ This method lists all the content of the folder as a dictionary associated with the folder id that is passed if it exists :param folder_id: The unique id associated with the folder that you need the info about :type folder_id: int :return: A dictionary contains a list of all files and folders in the associated folder :rtype: dict """ response = requests.get( f"{self.base_folder_url}/{str(folder_id)}?access_token={self.access_token}" ) response_text = response.text if not self.is_request_failed( response_text=response_text, requested_on="folder" ): response_json = json.loads(response_text) if folder_id == response_json["folder_id"]: folder = {"folder_name": response_json["fullname"]} folder = folder | self.list_contents(response_json=response_json) return folder else: # TODO add a way to notify when this happens raise LookupError( "Provided folder id does not match the received folder id" )
[docs] def get_file(self, folder_file_id): """ This method returns the name and the download url associated with the file id that is passed :param folder_file_id: The unique id associated with the file you need the info about :type folder_file_id: int :return: A dictionary containing file name and download url :rtype: dict """ data = { "access_token": self.access_token, "func": "fetch_file", "folder_file_id": str(folder_file_id), } response = requests.post(f"{self.base_oauth_url}/resource.php", data=data) response_text = response.text if not self.is_request_failed(response_text=response_text, requested_on="file"): response_json = json.loads(response_text) file = {"name": response_json["name"], "download_url": response_json["url"]} return file
[docs] def add_torrent( self, torrent=None, wishlist_id=None, folder_id=-1, check_size=True ): """ This function allows you to pass either torrent file or magnet uri or a wishlist id, to start downloading by Seedr. Note: If magnet uri is passed instead of a torrent file which is the preferred method, and if the peers in the torrent is lower than 3 or if the torrent is completely dead then this function might not be able to add the torrent due to the underlying logic which requires the meta info of the torrent. This is a drawback of this approach as this helps avoid wasting time checking if there is enough storage space, but can't use the Seedr's cache, which works even if there is no seeders. If there is enough requests to correct this approach I will add a timeout logic to the method in next iteration. For now try supplying torrents file instead of the magnet uri. :param torrent: The torrent file or magnet uri that you want to add to be leeched/downloaded :type torrent: str :param wishlist_id: The wishlist id that you want to add from your Seedr wishlist :type wishlist_id: int :param folder_id: The folder you want the torrent to be downloaded to. Defaults to parent. :type folder_id: int :param check_size: Used to inform function if checking of Seedr drive space with torrent size is required or not. By default, it checks the torrent size with drive size and raise error if torrent size is larger than drive size. :type check_size: bool :return: If successful returns the name, ID and progress url(returns completed incase the download has already completed and there is no url to present) of the torrent. :rtype: dict """ if torrent: if check_size: if self.torrent_regex.match(torrent): pass elif self.magnet_regex.match(torrent): # TODO Add a timeout wrapper, which will jump right to adding the torrent instead temp_torrent_file_path = os.path.join( tempfile.gettempdir(), f"temp{randrange(1, 10**4):04}.torrent" ) subprocess.run( ["ih2torrent", "--file", temp_torrent_file_path, torrent] ) torrent = temp_torrent_file_path else: raise InvalidTorrent( f"The torrent passed is invalid, please verify it fix it.\nTorrent/Magnet: {torrent}" ) torrent_info = Torrent.from_file(torrent) if self.drive_size >= torrent_info.total_size: torrent_magnet_uri = torrent_info.magnet_link else: print(self.drive_size, torrent_info.total_size) raise DriveLimit( "The torrent is larger than the total available space in the drive" ) else: if self.torrent_regex.match(torrent): torrent_info = Torrent.from_file(torrent) torrent_magnet_uri = torrent_info.magnet_link elif self.magnet_regex.match(torrent): torrent_magnet_uri = torrent else: raise InvalidTorrent( f"The torrent passed is invalid, please verify it fix it.\nTorrent/Magnet: {torrent}" ) elif wishlist_id: torrent_magnet_uri = None else: raise TypeError( "add_torrent() is missing an argument. At least one argument needs to be passed" ) data = { "access_token": self.access_token, "func": "add_torrent", "torrent_magnet": torrent_magnet_uri, "wishlist_id": wishlist_id, "folder_id": folder_id, } response = requests.post(f"{self.base_oauth_url}/resource.php", data=data) response_json = json.loads(response.text) sleep(self.rate_limit) current_drive_content = self.get_drive() torrents_active = current_drive_content["torrents"] progress_url = None if response_json["result"]: for torrent in torrents_active: if torrent["torrent_id"] == response_json["user_torrent_id"]: progress_url = torrent["progress_url"] if progress_url is None: for folder in current_drive_content["folders"]: if folder["folder_name"] == response_json["title"]: progress_url = "completed" return { "torrent_id": response_json["user_torrent_id"], "file_name": response_json["title"], "progress_url": progress_url, } else: raise BadLeeching( f"The provided Torrent couldn't be leeched/downloaded to the drive.\n {data=}" )
[docs] def download_folder(self, folder_id, builtin_downloader=True): content = self.get_folder(folder_id=folder_id) sleep(self.rate_limit) download_list = content["files"] next_list_folders = content["folders"] while True: if next_list_folders: temp_list_folders = [] for folder in next_list_folders: subfolder_content = self.get_folder(folder_id=folder["folder_id"]) sleep(self.rate_limit) temp_list_folders += subfolder_content["folders"] download_list += subfolder_content["files"] next_list_folders = temp_list_folders else: break # This loop checks if there is any file that matches the list of extensions that are to be excluded and if # present will remove them from the list temp_download_list = [] for item in download_list: temp_item_ext = item["file_name"].split(".")[-1] if temp_item_ext not in self.exclude_file_type: item["folder_path"] = os.path.join( self.download_directory, *item["folder_path"].split("/"), "" ) temp_download_list.append(item) download_list = temp_download_list for i in range(len(download_list)): download_list[i]["download_url"] = self.get_file( download_list[i]["folder_file_id"] )["download_url"] sleep(self.rate_limit) if not builtin_downloader: return download_list download_list = sorted(download_list, key=lambda d: d["size"]) # Only runs if aria2p client hasn't already been initiated if not self.aria2: self.aria2 = aria2p.API( aria2p.Client( host="http://localhost", port=6800, secret=self.aria2c_secret ) ) download_queue = [] # Adds each url obtained from previous steps one by one with priorities given for smallest files first for i in range(len(download_list)): download_adder = self.aria2.add( uri=download_list[i]["download_url"], # TODO make the directory data dynamic, such that each file is downloaded to the appropriate folder options={"dir": download_list[i]["folder_path"]}, position=i, ) download_queue.append(download_adder[0].gid) while True: # TODO FUTURE show progress real time temp_download_queue = [] for gid in download_queue: download_info = self.aria2.get_download(gid=gid) if not download_info.is_complete: temp_download_queue.append(gid) download_queue = temp_download_queue if len(download_queue) == 0: break sleep(5) # TODO return parent directory instead return True
[docs] def delete_folder(self, folder_id): """ This method deletes the folder associated with the id that is passed and returns True on success :param folder_id: The id of the folder you wish to delete :type folder_id: int :return: Returns True or False based on success :rtype: bool """ data = { "access_token": self.access_token, "func": "delete", "delete_arr": '[{"type":"folder","id":"' + str(folder_id) + '"}]', } # TODO Correct this stub to follow DRY # TODO add a conditional statement to prevent deletion of parent folder response = requests.post(f"{self.base_oauth_url}/resource.php", data=data) response_text = response.text if not self.is_request_failed( response_text=response_text, requested_on="folder" ): return self.is_op_success(response_text=response_text)
[docs] def delete_file(self, folder_file_id): """ This method deletes the file associated with the id that is passed and returns True on success :param folder_file_id: The id of the file you wish to delete :type folder_file_id: int :return: Returns True or False based on success :rtype: bool """ data = { "access_token": self.access_token, "func": "delete", "delete_arr": '[{"type":"file","id":"' + str(folder_file_id) + '"}]', } # TODO Correct this stub to follow DRY response = requests.post(f"{self.base_oauth_url}/resource.php", data=data) response_text = response.text if not self.is_request_failed(response_text=response_text, requested_on="file"): return self.is_op_success(response_text=response_text)
[docs] def delete_torrent(self, torrent_id): """ This method deletes any active torrent associated with the id that is passed and returns True on success :param torrent_id: The id of the active torrent you wish to delete :type torrent_id: int :return: Returns True or False based on success :rtype: bool """ data = { "access_token": self.access_token, "func": "delete", "delete_arr": f'[{{"type":"torrent","id":{torrent_id}}}]', } response = requests.post(f"{self.base_oauth_url}/resource.php", data=data) response_text = response.text if not self.is_request_failed( response_text=response_text, requested_on="torrent" ): return self.is_op_success(response_text=response_text)
[docs] def delete_all(self): """ This method deletes all the content within the drive, whether it downloaded to the drive or being downloaded. :return: Returns True if the method clears the whole drive successfully, False otherwise. :rtype: bool """ content = self.get_drive() for folder in content["folders"]: self.delete_folder(folder["folder_id"]) sleep(self.rate_limit) for file in content["files"]: self.delete_file(file["folder_file_id"]) sleep(self.rate_limit) for torrent in content["torrents"]: self.delete_torrent(torrent["torrent_id"]) sleep(self.rate_limit) content = self.get_drive() if content["space"]["used"] == "0.0 GB": return True return False