diff --git a/bot/__init__.py b/bot/__init__.py
index 4507171..1555825 100644
--- a/bot/__init__.py
+++ b/bot/__init__.py
@@ -3,6 +3,7 @@ import configparser
import aria2p
import threading
from telegram.ext import Updater
+from telegram import Bot
import os
import time
@@ -72,5 +73,6 @@ except KeyError as e:
LOGGER.error("One or more env variables missing! Exiting now")
exit(1)
+bot = Bot(BOT_TOKEN)
updater = Updater(token=BOT_TOKEN, use_context=True)
dispatcher = updater.dispatcher
diff --git a/bot/helper/ext_utils/bot_utils.py b/bot/helper/ext_utils/bot_utils.py
index 273804f..c941d93 100644
--- a/bot/helper/ext_utils/bot_utils.py
+++ b/bot/helper/ext_utils/bot_utils.py
@@ -81,17 +81,16 @@ def get_download_str():
return result
-def get_readable_message(progress_list: list = None):
- if progress_list is None:
- with download_dict_lock:
- progress_list = list(download_dict.values())
+def get_readable_message():
+ with download_dict_lock:
+ progress_list = list(download_dict.values())
msg = ''
- for status in progress_list:
- msg += f'Name: {status.name()}\n' \
- f'Status: {status.status()}\n' \
- f'{get_progress_bar_string(status)} {status.progress()} of {status.size()}\n' \
- f'Speed: {status.speed()}\n' \
- f'ETA: {status.eta()}\n\n'
+ for download in progress_list:
+ msg += f'Name: {download.name()}\n' \
+ f'Status: {download.status()}\n' \
+ f'{get_progress_bar_string(download)} {download.progress()} of {download.size()}\n' \
+ f'Speed: {download.speed()}\n' \
+ f'ETA: {download.eta()}\n\n'
# LOGGER.info(msg)
return msg
diff --git a/bot/helper/ext_utils/exceptions.py b/bot/helper/ext_utils/exceptions.py
index e39e15a..0181b51 100644
--- a/bot/helper/ext_utils/exceptions.py
+++ b/bot/helper/ext_utils/exceptions.py
@@ -2,7 +2,7 @@ class DriveAuthError(Exception):
pass
-class KillThreadException(Exception):
+class MessageDeletedError(Exception):
""" Custom Exception class for killing thread as soon as they aren't needed"""
def __init__(self, message, error=None):
diff --git a/bot/helper/mirror_utils/aria2_download.py b/bot/helper/mirror_utils/aria2_download.py
new file mode 100644
index 0000000..9a19557
--- /dev/null
+++ b/bot/helper/mirror_utils/aria2_download.py
@@ -0,0 +1,90 @@
+from bot import DOWNLOAD_STATUS_UPDATE_INTERVAL, aria2
+from bot.helper.ext_utils.bot_utils import *
+from .download_helper import DownloadHelper
+from .download_status import DownloadStatus
+import threading
+from aria2p import API, ClientException
+import schedule
+import time
+
+
+class AriaDownloadHelper(DownloadHelper):
+
+ def __init__(self, listener):
+ super(AriaDownloadHelper, self).__init__(listener)
+ self.__is_torrent = False
+ self.gid = None
+ self.__scheduler = schedule.every(DOWNLOAD_STATUS_UPDATE_INTERVAL).seconds.do(self.__onDownloadProgress())
+
+ def __updater(self):
+ while True:
+ with self._resource_lock:
+ if self.__scheduler is None:
+ break
+ schedule.run_pending()
+ time.sleep(1)
+
+ def __onDownloadStarted(self, api, gid):
+ with self._resource_lock:
+ if self.gid == gid:
+ download = api.get_download(gid)
+ self.name = download.name
+ self.size = download.length
+ self._listener.onDownloadStarted()
+ self.should_update = True
+
+ def __onDownloadProgress(self):
+ with self._resource_lock:
+ download = aria2.get_download(self.gid)
+ self.progress = download.progress
+ self.progress_string = download.progress_string
+ self.eta_string = download.eta_string
+ self.eta = download.eta
+
+ def __onDownloadComplete(self, api: API, gid):
+ with self._resource_lock:
+ if self.gid == gid:
+ if self.__is_torrent:
+ self.__is_torrent = False
+ self.gid = api.get_download(gid).followed_by_ids[0]
+ LOGGER.info(f'Changed gid from {gid} to {self.gid}')
+ else:
+ self._listener.onDownloadComplete()
+ self.__scheduler = None
+
+ def __onDownloadPause(self, api, gid):
+ if self.gid == gid:
+ self._listener.onDownloadError('Download stopped by user!')
+
+ def __onDownloadStopped(self, api, gid):
+ if self.gid == gid:
+ self._listener.onDownloadError()
+
+ def __onDownloadError(self, api, gid):
+ with self._resource_lock:
+ if self.gid == gid:
+ download = api.get_download(gid)
+ error = download.error_message
+ self._listener.onDownloadError(error)
+
+ def add_download(self, link: str, path):
+ if is_magnet(link):
+ download = aria2.add_magnet(link, {'dir': path})
+ self.__is_torrent = True
+ else:
+ download = aria2.add_uris([link], {'dir': path})
+ if download.name.endswith('.torrent'):
+ self.__is_torrent = True
+ self.gid = download.gid
+ aria2.listen_to_notifications(threaded=True, on_download_start=self._listener.onDownloadStarted,
+ on_download_error=self.__onDownloadError,
+ on_download_complete=self.__onDownloadComplete)
+ threading.Thread(target=self.__updater).start()
+
+ def cancel_download(self):
+ # Returns None if successfully cancelled, else error string
+ download = aria2.get_download(self.gid)
+ try:
+ download.pause(force=True)
+ except ClientException:
+ return 'Unable to cancel download! Internal error.'
diff --git a/bot/helper/mirror_utils/download_helper.py b/bot/helper/mirror_utils/download_helper.py
new file mode 100644
index 0000000..f2ec29a
--- /dev/null
+++ b/bot/helper/mirror_utils/download_helper.py
@@ -0,0 +1,28 @@
+# An abstract class which will be inherited by the tool specific classes like aria2_helper or mega_download_helper
+import threading
+
+
+class MethodNotImplementedError(NotImplementedError):
+ def __int__(self):
+ super(self, 'Not implemented method')
+
+
+class DownloadHelper:
+ def __int__(self, listener):
+ self.name = '' # Name of the download; empty string if no download has been started
+ self.size = 0.0 # Size of the download
+ self.downloaded_bytes = 0.0 # Bytes downloaded
+ self.speed = 0.0 # Download speed in bytes per second
+ self.progress = 0.0
+ self.progress_string = '0.00%'
+ self.eta = 0 # Estimated time of download complete
+ self.eta_string = '0s'
+ self._listener = listener # A listener class which have event callbacks
+ self._resource_lock = threading.Lock()
+
+ def add_download(self, link: str, path):
+ raise MethodNotImplementedError
+
+ def cancel_download(self):
+ # Returns None if successfully cancelled, else error string
+ raise MethodNotImplementedError
diff --git a/bot/helper/mirror_utils/download_tools.py b/bot/helper/mirror_utils/download_tools.py
deleted file mode 100644
index f4ae591..0000000
--- a/bot/helper/mirror_utils/download_tools.py
+++ /dev/null
@@ -1,91 +0,0 @@
-from time import sleep
-from bot import DOWNLOAD_DIR, DOWNLOAD_STATUS_UPDATE_INTERVAL, aria2
-from .download_status import DownloadStatus
-from bot.helper.ext_utils.bot_utils import *
-from bot.helper.ext_utils.exceptions import KillThreadException
-
-
-class DownloadHelper:
-
- def __init__(self, listener=None):
- self.__listener = listener
- self.__is_torrent = False
-
- def add_download(self, link: str):
- if is_magnet(link):
- download = aria2.add_magnet(link, {'dir': DOWNLOAD_DIR + str(self.__listener.uid)})
- self.__is_torrent = True
- else:
- if link.endswith('.torrent'):
- self.__is_torrent = True
- download = aria2.add_uris([link], {'dir': DOWNLOAD_DIR + str(self.__listener.uid)})
- with download_dict_lock:
- download_dict[self.__listener.message.message_id] = DownloadStatus(download.gid,
- self.__listener.uid)
- self.__listener.onDownloadStarted(link)
- self.__update_download_status()
-
- def __get_download(self):
- return get_download(self.__listener.uid)
-
- def __get_followed_download_gid(self):
- download = self.__get_download()
- if len(download.followed_by_ids) != 0:
- return download.followed_by_ids[0]
- return None
-
- def __update_download_status(self):
- status_list = get_download_status_list()
- index = get_download_index(status_list, self.__get_download().gid)
- # This tracks if message exists or did it get replaced by other status message
- should_update = True
- if self.__is_torrent:
- # Waiting for the actual gid
- download = self.__get_download()
- while not download.is_complete: # Check every few seconds
- status_list = get_download_status_list()
- index = get_download_index(status_list, self.__get_download().gid)
- download = self.__get_download()
- if download.has_failed:
- self.__listener.onDownloadError(download.error_message, status_list, index)
- return
- if download.is_paused:
- self.__listener.onDownloadError("Download cancelled manually by user", status_list, index)
- return
- if should_update:
- try:
- self.__listener.onDownloadProgress(get_download_status_list(), index)
- except KillThreadException:
- should_update = False
- sleep(DOWNLOAD_STATUS_UPDATE_INTERVAL)
- new_gid = self.__get_followed_download_gid()
- with download_dict_lock:
- LOGGER.info(f"{download.name}: Changing GID {download.gid} to {new_gid}")
- download_dict[self.__listener.message.message_id] = DownloadStatus(new_gid,
- self.__listener.message.message_id)
-
- # Start tracking the actual download
- previous = None
- download = self.__get_download()
- while not download.is_complete:
- status_list = get_download_status_list()
- index = get_download_index(status_list, self.__get_download().gid)
- if download.has_failed:
- self.__listener.onDownloadError(self.__get_download().error_message, status_list, index)
- return
- if download.is_paused:
- self.__listener.onDownloadError("Download has been canceled", status_list, index)
- return
- if should_update:
- # TODO: Find a better way to differentiate between 2 list of objects
- progress_str_list = get_download_str()
- if progress_str_list != previous:
- try:
- self.__listener.onDownloadProgress(status_list, index)
- except KillThreadException:
- should_update = False
- previous = progress_str_list
- sleep(DOWNLOAD_STATUS_UPDATE_INTERVAL)
- download = self.__get_download()
-
- self.__listener.onDownloadComplete(status_list, index)
diff --git a/bot/helper/mirror_utils/gdriveTools.py b/bot/helper/mirror_utils/gdriveTools.py
index 897bade..dcc779e 100644
--- a/bot/helper/mirror_utils/gdriveTools.py
+++ b/bot/helper/mirror_utils/gdriveTools.py
@@ -8,7 +8,7 @@ import time
from bot import LOGGER, parent_id, DOWNLOAD_DIR, DOWNLOAD_STATUS_UPDATE_INTERVAL
from bot.helper.ext_utils.fs_utils import get_mime_type
from bot.helper.ext_utils.bot_utils import *
-from bot.helper.ext_utils.exceptions import KillThreadException
+from bot.helper.ext_utils.exceptions import MessageDeletedError
import threading
logging.getLogger('googleapiclient.discovery').setLevel(logging.ERROR)
@@ -64,7 +64,7 @@ class GoogleDriveHelper:
_list = get_download_status_list()
index = get_download_index(_list, get_download(self.__listener.message.message_id).gid)
self.__listener.onUploadProgress(_list, index)
- except KillThreadException as e:
+ except MessageDeletedError as e:
LOGGER.info(f'Stopped calling onDownloadProgress(): {str(e)}')
# TODO: Find a way to know if the Error is actually about message not found and not found
# self._should_update = False
diff --git a/bot/helper/mirror_utils/listeners.py b/bot/helper/mirror_utils/listeners.py
index 749aa2a..ca2d750 100644
--- a/bot/helper/mirror_utils/listeners.py
+++ b/bot/helper/mirror_utils/listeners.py
@@ -5,16 +5,16 @@ class MirrorListeners:
self.message = update.message
self.uid = self.message.message_id
- def onDownloadStarted(self, link: str):
+ def onDownloadStarted(self):
raise NotImplementedError
- def onDownloadProgress(self, progress_status_list: list, index: int):
+ def onDownloadProgress(self):
raise NotImplementedError
- def onDownloadComplete(self, progress_status_list: list, index: int):
+ def onDownloadComplete(self):
raise NotImplementedError
- def onDownloadError(self, error: str, progress_status_list: list, index: int):
+ def onDownloadError(self, error: str):
raise NotImplementedError
def onUploadStarted(self, progress_status_list: list, index: int):
diff --git a/bot/helper/telegram_helper/message_utils.py b/bot/helper/telegram_helper/message_utils.py
index fe70d61..9e3296b 100644
--- a/bot/helper/telegram_helper/message_utils.py
+++ b/bot/helper/telegram_helper/message_utils.py
@@ -1,7 +1,9 @@
from telegram.message import Message
from telegram.update import Update
import time
-from bot import AUTO_DELETE_MESSAGE_DURATION, LOGGER
+from bot import AUTO_DELETE_MESSAGE_DURATION, LOGGER, bot, \
+ status_reply_dict, status_reply_dict_lock, download_dict_lock, download_dict
+from bot.helper.ext_utils.bot_utils import get_readable_message
from telegram.error import TimedOut
@@ -11,15 +13,14 @@ def sendMessage(text: str, context, update: Update):
text=text, parse_mode='HTMl')
-def editMessage(text: str, context, message: Message):
+def editMessage(text: str, message: Message):
try:
- context.bot.edit_message_text(text=text, message_id=message.message_id,
- chat_id=message.chat.id,
- parse_mode='HTMl')
+ bot.edit_message_text(text=text, message_id=message.message_id,
+ chat_id=message.chat.id,
+ parse_mode='HTMl')
except TimedOut as e:
LOGGER.error(str(e))
- pass
-
+ pass
def deleteMessage(context, message: Message):
@@ -43,3 +44,10 @@ def auto_delete_message(context, cmd_message: Message, bot_message: Message):
deleteMessage(context, bot_message)
except AttributeError:
pass
+
+
+def update_all_messages():
+ msg = get_readable_message()
+ with status_reply_dict_lock:
+ for message in list(status_reply_dict.values()):
+ editMessage(msg, message)
diff --git a/bot/modules/mirror.py b/bot/modules/mirror.py
index bbe6d5a..2fccd03 100644
--- a/bot/modules/mirror.py
+++ b/bot/modules/mirror.py
@@ -1,52 +1,44 @@
from telegram.ext import CommandHandler, run_async
from telegram.error import BadRequest, TimedOut
-from bot.helper.mirror_utils import download_tools, gdriveTools, listeners
+from bot.helper.mirror_utils import aria2_download, gdriveTools, listeners
from bot import LOGGER, dispatcher, DOWNLOAD_DIR
from bot.helper.ext_utils import fs_utils, bot_utils
from bot import download_dict, status_reply_dict, status_reply_dict_lock, download_dict_lock
from bot.helper.telegram_helper.message_utils import *
from bot.helper.ext_utils.bot_utils import get_readable_message, MirrorStatus
-from bot.helper.ext_utils.exceptions import KillThreadException
+from bot.helper.ext_utils.exceptions import MessageDeletedError
from bot.helper.telegram_helper.filters import CustomFilters
from bot.helper.telegram_helper.bot_commands import BotCommands
import pathlib
-import threading
class MirrorListener(listeners.MirrorListeners):
def __init__(self, context, update, isTar=False):
super().__init__(context, update)
self.isTar = isTar
+ self.reply_message = None
- def onDownloadStarted(self, link):
- LOGGER.info(f"Adding link: {link}")
- self.reply_message = sendMessage(bot_utils.get_readable_message(), self.context, self.update)
+ def onDownloadStarted(self):
+ self.reply_message = sendMessage(bot_utils.get_readable_message(), self.context, self.update)
with status_reply_dict_lock:
status_reply_dict[self.update.effective_chat.id] = self.reply_message
- def onDownloadProgress(self, progress_status_list: list, index: int):
- msg = get_readable_message(progress_status_list)
- if progress_status_list[index].status() == MirrorStatus.STATUS_CANCELLED:
- editMessage(msg, self.context, self.reply_message)
- raise KillThreadException('Mirror cancelled by user')
- # LOGGER.info("Editing message")
- try:
- editMessage(msg, self.context, self.reply_message)
- except BadRequest:
- raise KillThreadException('Message deleted. Terminate thread')
+ def onDownloadProgress(self):
+ # We are handling this on our own!
+ pass
- def onDownloadComplete(self, progress_status_list, index: int):
- LOGGER.info(f"Download completed: {progress_status_list[index].name()}")
- if self.isTar:
- with download_dict_lock:
+ def onDownloadComplete(self):
+ with download_dict_lock:
+ LOGGER.info(f"Download completed: {download_dict[self.uid].name()}")
+ if self.isTar:
download_dict[self.uid].is_archiving = True
- try:
- path = fs_utils.tar(f'{DOWNLOAD_DIR}{self.uid}/{progress_status_list[index].name()}')
- except FileNotFoundError:
- self.onUploadError('Download cancelled!', progress_status_list, index)
- return
- else:
- path = f'{DOWNLOAD_DIR}{self.uid}/{progress_status_list[index].name()}'
+ try:
+ path = fs_utils.tar(f'{DOWNLOAD_DIR}{self.uid}/{download_dict[self.uid].name()}')
+ except FileNotFoundError:
+ self.onUploadError('Download cancelled!', download_dict, self.uid)
+ return
+ else:
+ path = f'{DOWNLOAD_DIR}{self.uid}/{download_dict[self.uid].name()}'
name = pathlib.PurePath(path).name
with download_dict_lock:
download_dict[self.uid].is_archiving = False
@@ -57,7 +49,7 @@ class MirrorListener(listeners.MirrorListeners):
download_dict[self.uid].upload_helper = gdrive
gdrive.upload(name)
- def onDownloadError(self, error, progress_status_list: list, index: int):
+ def onDownloadError(self, error):
LOGGER.error(error)
with status_reply_dict_lock:
if len(status_reply_dict) == 1:
@@ -70,21 +62,20 @@ class MirrorListener(listeners.MirrorListeners):
del status_reply_dict[self.update.effective_chat.id]
except KeyError:
pass
- with download_dict_lock:
- LOGGER.info(f"Deleting {progress_status_list[index].name()} from download_dict.")
- try:
- del download_dict[self.uid]
- except KeyError as e:
- LOGGER.info(str(e))
- pass
try:
- fs_utils.clean_download(progress_status_list[index].path())
+ with download_dict_lock:
+ LOGGER.info(f"Deleting folder: {download_dict[self.uid].path()}")
+ fs_utils.clean_download(download_dict[self.uid].path())
+ LOGGER.info(f"Deleting {download_dict[self.uid].name()} from download_dict.")
+ del download_dict[self.uid]
except FileNotFoundError:
pass
+ except KeyError as e:
+ LOGGER.info(str(e))
if self.message.from_user.username:
uname = f"@{self.message.from_user.username}"
else:
- uname = f'{self.message.from_user.first_name}'
+ uname = f'{self.message.from_user.first_name}'
msg = f"{uname} your download has been stopped due to: {error}"
sendMessage(msg, self.context, self.update)
@@ -127,7 +118,7 @@ class MirrorListener(listeners.MirrorListeners):
try:
editMessage(msg, self.context, self.reply_message)
except BadRequest as e:
- raise KillThreadException(str(e))
+ raise MessageDeletedError(str(e))
except TimedOut:
pass
@@ -160,10 +151,10 @@ def _mirror(update, context, isTar=False):
except BadRequest:
pass
listener = MirrorListener(context, update, isTar)
- aria = download_tools.DownloadHelper(listener)
- t = threading.Thread(target=aria.add_download, args=(link,))
- t.start()
-
+ aria = aria2_download.AriaDownloadHelper(listener)
+ with download_dict_lock:
+ download_dict[listener.uid] = aria
+ aria.add_download(link, f'{DOWNLOAD_DIR}/{listener.uid}/')
@run_async
def mirror(update, context):
@@ -175,7 +166,8 @@ def tar_mirror(update, context):
_mirror(update, context, True)
-mirror_handler = CommandHandler(BotCommands.MirrorCommand, mirror, filters=CustomFilters.authorized_chat | CustomFilters.authorized_user)
+mirror_handler = CommandHandler(BotCommands.MirrorCommand, mirror,
+ filters=CustomFilters.authorized_chat | CustomFilters.authorized_user)
tar_mirror_handler = CommandHandler(BotCommands.TarMirrorCommand, tar_mirror,
filters=CustomFilters.authorized_chat | CustomFilters.authorized_user)
dispatcher.add_handler(mirror_handler)