From db0dcb0d8ba6173bd61f423a3097784d94442500 Mon Sep 17 00:00:00 2001 From: lzzy12 Date: Sun, 10 Nov 2019 19:07:22 +0530 Subject: [PATCH] [DO NOT DEPLOY] Incomplete save Signed-off-by: lzzy12 --- bot/__init__.py | 2 + bot/helper/ext_utils/bot_utils.py | 19 ++--- bot/helper/ext_utils/exceptions.py | 2 +- bot/helper/mirror_utils/aria2_download.py | 90 ++++++++++++++++++++ bot/helper/mirror_utils/download_helper.py | 28 +++++++ bot/helper/mirror_utils/download_tools.py | 91 --------------------- bot/helper/mirror_utils/gdriveTools.py | 4 +- bot/helper/mirror_utils/listeners.py | 8 +- bot/helper/telegram_helper/message_utils.py | 22 +++-- bot/modules/mirror.py | 78 ++++++++---------- 10 files changed, 186 insertions(+), 158 deletions(-) create mode 100644 bot/helper/mirror_utils/aria2_download.py create mode 100644 bot/helper/mirror_utils/download_helper.py delete mode 100644 bot/helper/mirror_utils/download_tools.py diff --git a/bot/__init__.py b/bot/__init__.py index 4507171..1555825 100644 --- a/bot/__init__.py +++ b/bot/__init__.py @@ -3,6 +3,7 @@ import configparser import aria2p import threading from telegram.ext import Updater +from telegram import Bot import os import time @@ -72,5 +73,6 @@ except KeyError as e: LOGGER.error("One or more env variables missing! Exiting now") exit(1) +bot = Bot(BOT_TOKEN) updater = Updater(token=BOT_TOKEN, use_context=True) dispatcher = updater.dispatcher diff --git a/bot/helper/ext_utils/bot_utils.py b/bot/helper/ext_utils/bot_utils.py index 273804f..c941d93 100644 --- a/bot/helper/ext_utils/bot_utils.py +++ b/bot/helper/ext_utils/bot_utils.py @@ -81,17 +81,16 @@ def get_download_str(): return result -def get_readable_message(progress_list: list = None): - if progress_list is None: - with download_dict_lock: - progress_list = list(download_dict.values()) +def get_readable_message(): + with download_dict_lock: + progress_list = list(download_dict.values()) msg = '' - for status in progress_list: - msg += f'Name: {status.name()}\n' \ - f'Status: {status.status()}\n' \ - f'{get_progress_bar_string(status)} {status.progress()} of {status.size()}\n' \ - f'Speed: {status.speed()}\n' \ - f'ETA: {status.eta()}\n\n' + for download in progress_list: + msg += f'Name: {download.name()}\n' \ + f'Status: {download.status()}\n' \ + f'{get_progress_bar_string(download)} {download.progress()} of {download.size()}\n' \ + f'Speed: {download.speed()}\n' \ + f'ETA: {download.eta()}\n\n' # LOGGER.info(msg) return msg diff --git a/bot/helper/ext_utils/exceptions.py b/bot/helper/ext_utils/exceptions.py index e39e15a..0181b51 100644 --- a/bot/helper/ext_utils/exceptions.py +++ b/bot/helper/ext_utils/exceptions.py @@ -2,7 +2,7 @@ class DriveAuthError(Exception): pass -class KillThreadException(Exception): +class MessageDeletedError(Exception): """ Custom Exception class for killing thread as soon as they aren't needed""" def __init__(self, message, error=None): diff --git a/bot/helper/mirror_utils/aria2_download.py b/bot/helper/mirror_utils/aria2_download.py new file mode 100644 index 0000000..9a19557 --- /dev/null +++ b/bot/helper/mirror_utils/aria2_download.py @@ -0,0 +1,90 @@ +from bot import DOWNLOAD_STATUS_UPDATE_INTERVAL, aria2 +from bot.helper.ext_utils.bot_utils import * +from .download_helper import DownloadHelper +from .download_status import DownloadStatus +import threading +from aria2p import API, ClientException +import schedule +import time + + +class AriaDownloadHelper(DownloadHelper): + + def __init__(self, listener): + super(AriaDownloadHelper, self).__init__(listener) + self.__is_torrent = False + self.gid = None + self.__scheduler = schedule.every(DOWNLOAD_STATUS_UPDATE_INTERVAL).seconds.do(self.__onDownloadProgress()) + + def __updater(self): + while True: + with self._resource_lock: + if self.__scheduler is None: + break + schedule.run_pending() + time.sleep(1) + + def __onDownloadStarted(self, api, gid): + with self._resource_lock: + if self.gid == gid: + download = api.get_download(gid) + self.name = download.name + self.size = download.length + self._listener.onDownloadStarted() + self.should_update = True + + def __onDownloadProgress(self): + with self._resource_lock: + download = aria2.get_download(self.gid) + self.progress = download.progress + self.progress_string = download.progress_string + self.eta_string = download.eta_string + self.eta = download.eta + + def __onDownloadComplete(self, api: API, gid): + with self._resource_lock: + if self.gid == gid: + if self.__is_torrent: + self.__is_torrent = False + self.gid = api.get_download(gid).followed_by_ids[0] + LOGGER.info(f'Changed gid from {gid} to {self.gid}') + else: + self._listener.onDownloadComplete() + self.__scheduler = None + + def __onDownloadPause(self, api, gid): + if self.gid == gid: + self._listener.onDownloadError('Download stopped by user!') + + def __onDownloadStopped(self, api, gid): + if self.gid == gid: + self._listener.onDownloadError() + + def __onDownloadError(self, api, gid): + with self._resource_lock: + if self.gid == gid: + download = api.get_download(gid) + error = download.error_message + self._listener.onDownloadError(error) + + def add_download(self, link: str, path): + if is_magnet(link): + download = aria2.add_magnet(link, {'dir': path}) + self.__is_torrent = True + else: + download = aria2.add_uris([link], {'dir': path}) + if download.name.endswith('.torrent'): + self.__is_torrent = True + self.gid = download.gid + aria2.listen_to_notifications(threaded=True, on_download_start=self._listener.onDownloadStarted, + on_download_error=self.__onDownloadError, + on_download_complete=self.__onDownloadComplete) + threading.Thread(target=self.__updater).start() + + def cancel_download(self): + # Returns None if successfully cancelled, else error string + download = aria2.get_download(self.gid) + try: + download.pause(force=True) + except ClientException: + return 'Unable to cancel download! Internal error.' diff --git a/bot/helper/mirror_utils/download_helper.py b/bot/helper/mirror_utils/download_helper.py new file mode 100644 index 0000000..f2ec29a --- /dev/null +++ b/bot/helper/mirror_utils/download_helper.py @@ -0,0 +1,28 @@ +# An abstract class which will be inherited by the tool specific classes like aria2_helper or mega_download_helper +import threading + + +class MethodNotImplementedError(NotImplementedError): + def __int__(self): + super(self, 'Not implemented method') + + +class DownloadHelper: + def __int__(self, listener): + self.name = '' # Name of the download; empty string if no download has been started + self.size = 0.0 # Size of the download + self.downloaded_bytes = 0.0 # Bytes downloaded + self.speed = 0.0 # Download speed in bytes per second + self.progress = 0.0 + self.progress_string = '0.00%' + self.eta = 0 # Estimated time of download complete + self.eta_string = '0s' + self._listener = listener # A listener class which have event callbacks + self._resource_lock = threading.Lock() + + def add_download(self, link: str, path): + raise MethodNotImplementedError + + def cancel_download(self): + # Returns None if successfully cancelled, else error string + raise MethodNotImplementedError diff --git a/bot/helper/mirror_utils/download_tools.py b/bot/helper/mirror_utils/download_tools.py deleted file mode 100644 index f4ae591..0000000 --- a/bot/helper/mirror_utils/download_tools.py +++ /dev/null @@ -1,91 +0,0 @@ -from time import sleep -from bot import DOWNLOAD_DIR, DOWNLOAD_STATUS_UPDATE_INTERVAL, aria2 -from .download_status import DownloadStatus -from bot.helper.ext_utils.bot_utils import * -from bot.helper.ext_utils.exceptions import KillThreadException - - -class DownloadHelper: - - def __init__(self, listener=None): - self.__listener = listener - self.__is_torrent = False - - def add_download(self, link: str): - if is_magnet(link): - download = aria2.add_magnet(link, {'dir': DOWNLOAD_DIR + str(self.__listener.uid)}) - self.__is_torrent = True - else: - if link.endswith('.torrent'): - self.__is_torrent = True - download = aria2.add_uris([link], {'dir': DOWNLOAD_DIR + str(self.__listener.uid)}) - with download_dict_lock: - download_dict[self.__listener.message.message_id] = DownloadStatus(download.gid, - self.__listener.uid) - self.__listener.onDownloadStarted(link) - self.__update_download_status() - - def __get_download(self): - return get_download(self.__listener.uid) - - def __get_followed_download_gid(self): - download = self.__get_download() - if len(download.followed_by_ids) != 0: - return download.followed_by_ids[0] - return None - - def __update_download_status(self): - status_list = get_download_status_list() - index = get_download_index(status_list, self.__get_download().gid) - # This tracks if message exists or did it get replaced by other status message - should_update = True - if self.__is_torrent: - # Waiting for the actual gid - download = self.__get_download() - while not download.is_complete: # Check every few seconds - status_list = get_download_status_list() - index = get_download_index(status_list, self.__get_download().gid) - download = self.__get_download() - if download.has_failed: - self.__listener.onDownloadError(download.error_message, status_list, index) - return - if download.is_paused: - self.__listener.onDownloadError("Download cancelled manually by user", status_list, index) - return - if should_update: - try: - self.__listener.onDownloadProgress(get_download_status_list(), index) - except KillThreadException: - should_update = False - sleep(DOWNLOAD_STATUS_UPDATE_INTERVAL) - new_gid = self.__get_followed_download_gid() - with download_dict_lock: - LOGGER.info(f"{download.name}: Changing GID {download.gid} to {new_gid}") - download_dict[self.__listener.message.message_id] = DownloadStatus(new_gid, - self.__listener.message.message_id) - - # Start tracking the actual download - previous = None - download = self.__get_download() - while not download.is_complete: - status_list = get_download_status_list() - index = get_download_index(status_list, self.__get_download().gid) - if download.has_failed: - self.__listener.onDownloadError(self.__get_download().error_message, status_list, index) - return - if download.is_paused: - self.__listener.onDownloadError("Download has been canceled", status_list, index) - return - if should_update: - # TODO: Find a better way to differentiate between 2 list of objects - progress_str_list = get_download_str() - if progress_str_list != previous: - try: - self.__listener.onDownloadProgress(status_list, index) - except KillThreadException: - should_update = False - previous = progress_str_list - sleep(DOWNLOAD_STATUS_UPDATE_INTERVAL) - download = self.__get_download() - - self.__listener.onDownloadComplete(status_list, index) diff --git a/bot/helper/mirror_utils/gdriveTools.py b/bot/helper/mirror_utils/gdriveTools.py index 897bade..dcc779e 100644 --- a/bot/helper/mirror_utils/gdriveTools.py +++ b/bot/helper/mirror_utils/gdriveTools.py @@ -8,7 +8,7 @@ import time from bot import LOGGER, parent_id, DOWNLOAD_DIR, DOWNLOAD_STATUS_UPDATE_INTERVAL from bot.helper.ext_utils.fs_utils import get_mime_type from bot.helper.ext_utils.bot_utils import * -from bot.helper.ext_utils.exceptions import KillThreadException +from bot.helper.ext_utils.exceptions import MessageDeletedError import threading logging.getLogger('googleapiclient.discovery').setLevel(logging.ERROR) @@ -64,7 +64,7 @@ class GoogleDriveHelper: _list = get_download_status_list() index = get_download_index(_list, get_download(self.__listener.message.message_id).gid) self.__listener.onUploadProgress(_list, index) - except KillThreadException as e: + except MessageDeletedError as e: LOGGER.info(f'Stopped calling onDownloadProgress(): {str(e)}') # TODO: Find a way to know if the Error is actually about message not found and not found # self._should_update = False diff --git a/bot/helper/mirror_utils/listeners.py b/bot/helper/mirror_utils/listeners.py index 749aa2a..ca2d750 100644 --- a/bot/helper/mirror_utils/listeners.py +++ b/bot/helper/mirror_utils/listeners.py @@ -5,16 +5,16 @@ class MirrorListeners: self.message = update.message self.uid = self.message.message_id - def onDownloadStarted(self, link: str): + def onDownloadStarted(self): raise NotImplementedError - def onDownloadProgress(self, progress_status_list: list, index: int): + def onDownloadProgress(self): raise NotImplementedError - def onDownloadComplete(self, progress_status_list: list, index: int): + def onDownloadComplete(self): raise NotImplementedError - def onDownloadError(self, error: str, progress_status_list: list, index: int): + def onDownloadError(self, error: str): raise NotImplementedError def onUploadStarted(self, progress_status_list: list, index: int): diff --git a/bot/helper/telegram_helper/message_utils.py b/bot/helper/telegram_helper/message_utils.py index fe70d61..9e3296b 100644 --- a/bot/helper/telegram_helper/message_utils.py +++ b/bot/helper/telegram_helper/message_utils.py @@ -1,7 +1,9 @@ from telegram.message import Message from telegram.update import Update import time -from bot import AUTO_DELETE_MESSAGE_DURATION, LOGGER +from bot import AUTO_DELETE_MESSAGE_DURATION, LOGGER, bot, \ + status_reply_dict, status_reply_dict_lock, download_dict_lock, download_dict +from bot.helper.ext_utils.bot_utils import get_readable_message from telegram.error import TimedOut @@ -11,15 +13,14 @@ def sendMessage(text: str, context, update: Update): text=text, parse_mode='HTMl') -def editMessage(text: str, context, message: Message): +def editMessage(text: str, message: Message): try: - context.bot.edit_message_text(text=text, message_id=message.message_id, - chat_id=message.chat.id, - parse_mode='HTMl') + bot.edit_message_text(text=text, message_id=message.message_id, + chat_id=message.chat.id, + parse_mode='HTMl') except TimedOut as e: LOGGER.error(str(e)) - pass - + pass def deleteMessage(context, message: Message): @@ -43,3 +44,10 @@ def auto_delete_message(context, cmd_message: Message, bot_message: Message): deleteMessage(context, bot_message) except AttributeError: pass + + +def update_all_messages(): + msg = get_readable_message() + with status_reply_dict_lock: + for message in list(status_reply_dict.values()): + editMessage(msg, message) diff --git a/bot/modules/mirror.py b/bot/modules/mirror.py index bbe6d5a..2fccd03 100644 --- a/bot/modules/mirror.py +++ b/bot/modules/mirror.py @@ -1,52 +1,44 @@ from telegram.ext import CommandHandler, run_async from telegram.error import BadRequest, TimedOut -from bot.helper.mirror_utils import download_tools, gdriveTools, listeners +from bot.helper.mirror_utils import aria2_download, gdriveTools, listeners from bot import LOGGER, dispatcher, DOWNLOAD_DIR from bot.helper.ext_utils import fs_utils, bot_utils from bot import download_dict, status_reply_dict, status_reply_dict_lock, download_dict_lock from bot.helper.telegram_helper.message_utils import * from bot.helper.ext_utils.bot_utils import get_readable_message, MirrorStatus -from bot.helper.ext_utils.exceptions import KillThreadException +from bot.helper.ext_utils.exceptions import MessageDeletedError from bot.helper.telegram_helper.filters import CustomFilters from bot.helper.telegram_helper.bot_commands import BotCommands import pathlib -import threading class MirrorListener(listeners.MirrorListeners): def __init__(self, context, update, isTar=False): super().__init__(context, update) self.isTar = isTar + self.reply_message = None - def onDownloadStarted(self, link): - LOGGER.info(f"Adding link: {link}") - self.reply_message = sendMessage(bot_utils.get_readable_message(), self.context, self.update) + def onDownloadStarted(self): + self.reply_message = sendMessage(bot_utils.get_readable_message(), self.context, self.update) with status_reply_dict_lock: status_reply_dict[self.update.effective_chat.id] = self.reply_message - def onDownloadProgress(self, progress_status_list: list, index: int): - msg = get_readable_message(progress_status_list) - if progress_status_list[index].status() == MirrorStatus.STATUS_CANCELLED: - editMessage(msg, self.context, self.reply_message) - raise KillThreadException('Mirror cancelled by user') - # LOGGER.info("Editing message") - try: - editMessage(msg, self.context, self.reply_message) - except BadRequest: - raise KillThreadException('Message deleted. Terminate thread') + def onDownloadProgress(self): + # We are handling this on our own! + pass - def onDownloadComplete(self, progress_status_list, index: int): - LOGGER.info(f"Download completed: {progress_status_list[index].name()}") - if self.isTar: - with download_dict_lock: + def onDownloadComplete(self): + with download_dict_lock: + LOGGER.info(f"Download completed: {download_dict[self.uid].name()}") + if self.isTar: download_dict[self.uid].is_archiving = True - try: - path = fs_utils.tar(f'{DOWNLOAD_DIR}{self.uid}/{progress_status_list[index].name()}') - except FileNotFoundError: - self.onUploadError('Download cancelled!', progress_status_list, index) - return - else: - path = f'{DOWNLOAD_DIR}{self.uid}/{progress_status_list[index].name()}' + try: + path = fs_utils.tar(f'{DOWNLOAD_DIR}{self.uid}/{download_dict[self.uid].name()}') + except FileNotFoundError: + self.onUploadError('Download cancelled!', download_dict, self.uid) + return + else: + path = f'{DOWNLOAD_DIR}{self.uid}/{download_dict[self.uid].name()}' name = pathlib.PurePath(path).name with download_dict_lock: download_dict[self.uid].is_archiving = False @@ -57,7 +49,7 @@ class MirrorListener(listeners.MirrorListeners): download_dict[self.uid].upload_helper = gdrive gdrive.upload(name) - def onDownloadError(self, error, progress_status_list: list, index: int): + def onDownloadError(self, error): LOGGER.error(error) with status_reply_dict_lock: if len(status_reply_dict) == 1: @@ -70,21 +62,20 @@ class MirrorListener(listeners.MirrorListeners): del status_reply_dict[self.update.effective_chat.id] except KeyError: pass - with download_dict_lock: - LOGGER.info(f"Deleting {progress_status_list[index].name()} from download_dict.") - try: - del download_dict[self.uid] - except KeyError as e: - LOGGER.info(str(e)) - pass try: - fs_utils.clean_download(progress_status_list[index].path()) + with download_dict_lock: + LOGGER.info(f"Deleting folder: {download_dict[self.uid].path()}") + fs_utils.clean_download(download_dict[self.uid].path()) + LOGGER.info(f"Deleting {download_dict[self.uid].name()} from download_dict.") + del download_dict[self.uid] except FileNotFoundError: pass + except KeyError as e: + LOGGER.info(str(e)) if self.message.from_user.username: uname = f"@{self.message.from_user.username}" else: - uname = f'{self.message.from_user.first_name}' + uname = f'{self.message.from_user.first_name}' msg = f"{uname} your download has been stopped due to: {error}" sendMessage(msg, self.context, self.update) @@ -127,7 +118,7 @@ class MirrorListener(listeners.MirrorListeners): try: editMessage(msg, self.context, self.reply_message) except BadRequest as e: - raise KillThreadException(str(e)) + raise MessageDeletedError(str(e)) except TimedOut: pass @@ -160,10 +151,10 @@ def _mirror(update, context, isTar=False): except BadRequest: pass listener = MirrorListener(context, update, isTar) - aria = download_tools.DownloadHelper(listener) - t = threading.Thread(target=aria.add_download, args=(link,)) - t.start() - + aria = aria2_download.AriaDownloadHelper(listener) + with download_dict_lock: + download_dict[listener.uid] = aria + aria.add_download(link, f'{DOWNLOAD_DIR}/{listener.uid}/') @run_async def mirror(update, context): @@ -175,7 +166,8 @@ def tar_mirror(update, context): _mirror(update, context, True) -mirror_handler = CommandHandler(BotCommands.MirrorCommand, mirror, filters=CustomFilters.authorized_chat | CustomFilters.authorized_user) +mirror_handler = CommandHandler(BotCommands.MirrorCommand, mirror, + filters=CustomFilters.authorized_chat | CustomFilters.authorized_user) tar_mirror_handler = CommandHandler(BotCommands.TarMirrorCommand, tar_mirror, filters=CustomFilters.authorized_chat | CustomFilters.authorized_user) dispatcher.add_handler(mirror_handler)