Implement speed and ETA for upload
Signed-off-by: lzzy12 <jhashivam2020@gmail.com>
This commit is contained in:
parent
2cf2cd4cd6
commit
1e3b5578d5
|
|
@ -1,9 +1,31 @@
|
||||||
from bot import download_dict
|
from bot import download_dict
|
||||||
from bot.helper.download_status import DownloadStatus
|
|
||||||
|
|
||||||
|
class MirrorStatus:
|
||||||
|
|
||||||
|
STATUS_UPLOADING = "Uploading"
|
||||||
|
STATUS_DOWNLOADING = "Downloading"
|
||||||
|
STATUS_WAITING = "Queued"
|
||||||
|
STATUS_FAILED = "Failed. Cleaning download"
|
||||||
|
STATUS_CANCELLED = "Cancelled"
|
||||||
|
|
||||||
|
|
||||||
PROGRESS_MAX_SIZE = 100 // 8
|
PROGRESS_MAX_SIZE = 100 // 8
|
||||||
PROGRESS_INCOMPLETE = ['▏', '▎', '▍', '▌', '▋', '▊', '▉']
|
PROGRESS_INCOMPLETE = ['▏', '▎', '▍', '▌', '▋', '▊', '▉']
|
||||||
|
|
||||||
|
SIZE_UNITS = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
|
||||||
|
|
||||||
|
|
||||||
|
def get_readable_file_size(size_in_bytes) -> str:
|
||||||
|
index = 0
|
||||||
|
while size_in_bytes >= 1024:
|
||||||
|
size_in_bytes /= 1024
|
||||||
|
index += 1
|
||||||
|
try:
|
||||||
|
return f'{round(size_in_bytes, 2)} {SIZE_UNITS[index]}'
|
||||||
|
except IndexError:
|
||||||
|
return 'File too large'
|
||||||
|
|
||||||
|
|
||||||
def get_download(message_id):
|
def get_download(message_id):
|
||||||
return download_dict[message_id].download()
|
return download_dict[message_id].download()
|
||||||
|
|
@ -13,7 +35,10 @@ def get_download_status_list():
|
||||||
return list(download_dict.values())
|
return list(download_dict.values())
|
||||||
|
|
||||||
|
|
||||||
def get_progress_bar_string(status: DownloadStatus):
|
def get_progress_bar_string(status):
|
||||||
|
if status.status() == MirrorStatus.STATUS_UPLOADING:
|
||||||
|
completed = status.uploaded_bytes/8
|
||||||
|
else:
|
||||||
completed = status.download().completed_length/8
|
completed = status.download().completed_length/8
|
||||||
total = status.download().total_length/8
|
total = status.download().total_length/8
|
||||||
if total == 0:
|
if total == 0:
|
||||||
|
|
@ -50,18 +75,8 @@ def get_readable_message(progress_list: list = download_dict.values()):
|
||||||
msg = ''
|
msg = ''
|
||||||
for status in progress_list:
|
for status in progress_list:
|
||||||
msg += f'<b>Name:</b> {status.name()}\n' \
|
msg += f'<b>Name:</b> {status.name()}\n' \
|
||||||
f'<b>status:</b> {status.status()}\n'
|
f'<b>status:</b> {status.status()}\n' \
|
||||||
|
f'<code>{get_progress_bar_string(status)}</code> {status.progress()} of {status.size()}\n' \
|
||||||
if status.status() == DownloadStatus.STATUS_DOWNLOADING:
|
|
||||||
msg += f'<code>{get_progress_bar_string(status)}</code> {status.progress()} of {status.size()}\n' \
|
|
||||||
f'<b>Speed:</b> {status.speed()}\n' \
|
f'<b>Speed:</b> {status.speed()}\n' \
|
||||||
f'<b>ETA:</b> {status.eta()}\n'
|
f'<b>ETA:</b> {status.eta()}\n'
|
||||||
msg += '\n'
|
|
||||||
return msg
|
return msg
|
||||||
|
|
||||||
|
|
||||||
# Custom Exception class for killing thread as soon as they aren't needed
|
|
||||||
class KillThreadException(Exception):
|
|
||||||
def __init__(self, message, error=None):
|
|
||||||
super().__init__(message)
|
|
||||||
self.error = error
|
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,5 @@
|
||||||
from bot import aria2, download_dict, DOWNLOAD_DIR
|
from bot import aria2, DOWNLOAD_DIR
|
||||||
|
from .bot_utils import get_readable_file_size, MirrorStatus
|
||||||
|
|
||||||
|
|
||||||
def get_download(gid):
|
def get_download(gid):
|
||||||
|
|
@ -6,26 +7,51 @@ def get_download(gid):
|
||||||
|
|
||||||
|
|
||||||
class DownloadStatus:
|
class DownloadStatus:
|
||||||
STATUS_UPLOADING = "Uploading"
|
|
||||||
STATUS_DOWNLOADING = "Downloading"
|
|
||||||
STATUS_WAITING = "Queued"
|
|
||||||
STATUS_FAILED = "Failed. Cleaning download"
|
|
||||||
STATUS_CANCELLED = "Cancelled"
|
|
||||||
|
|
||||||
def __init__(self, gid, message_id):
|
def __init__(self, gid, message_id):
|
||||||
self.__gid = gid
|
self.__gid = gid
|
||||||
self.__download = get_download(gid)
|
self.__download = get_download(gid)
|
||||||
self.__uid = message_id
|
self.__uid = message_id
|
||||||
|
self.uploaded_bytes = 0
|
||||||
|
self.upload_time = 0
|
||||||
|
|
||||||
def __update(self):
|
def __update(self):
|
||||||
self.__download = get_download(self.__gid)
|
self.__download = get_download(self.__gid)
|
||||||
|
|
||||||
def progress(self):
|
def progress(self):
|
||||||
|
"""
|
||||||
|
Calculates the progress of the mirror (upload or download)
|
||||||
|
:return: returns progress in percentage
|
||||||
|
"""
|
||||||
self.__update()
|
self.__update()
|
||||||
|
if self.status() == MirrorStatus.STATUS_UPLOADING:
|
||||||
|
return f'{round(self.upload_progress(), 2)}%'
|
||||||
return self.__download.progress_string()
|
return self.__download.progress_string()
|
||||||
|
|
||||||
|
def upload_progress(self):
|
||||||
|
return self.uploaded_bytes / self.download().total_length * 100
|
||||||
|
|
||||||
|
def __size(self):
|
||||||
|
"""
|
||||||
|
Gets total size of the mirror file/folder
|
||||||
|
:return: total size of mirror
|
||||||
|
"""
|
||||||
|
return self.download().total_length
|
||||||
|
|
||||||
|
def __upload_speed(self):
|
||||||
|
"""
|
||||||
|
Calculates upload speed in bytes/second
|
||||||
|
:return: Upload speed in Bytes/Seconds
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
return self.uploaded_bytes / self.upload_time
|
||||||
|
except ZeroDivisionError:
|
||||||
|
return 0
|
||||||
|
|
||||||
def speed(self):
|
def speed(self):
|
||||||
self.__update()
|
self.__update()
|
||||||
|
if self.status() == MirrorStatus.STATUS_UPLOADING:
|
||||||
|
return f'{get_readable_file_size(self.__upload_speed())}/s'
|
||||||
return self.__download.download_speed_string()
|
return self.__download.download_speed_string()
|
||||||
|
|
||||||
def name(self):
|
def name(self):
|
||||||
|
|
@ -39,23 +65,28 @@ class DownloadStatus:
|
||||||
|
|
||||||
def eta(self):
|
def eta(self):
|
||||||
self.__update()
|
self.__update()
|
||||||
|
if self.status() == MirrorStatus.STATUS_UPLOADING:
|
||||||
|
try:
|
||||||
|
return f'{round(self.__size() / self.__upload_speed(), 2)} seconds'
|
||||||
|
except ZeroDivisionError:
|
||||||
|
return '-'
|
||||||
return self.__download.eta_string()
|
return self.__download.eta_string()
|
||||||
|
|
||||||
def status(self):
|
def status(self):
|
||||||
self.__update()
|
self.__update()
|
||||||
status = None
|
status = None
|
||||||
if self.__download.is_waiting:
|
if self.__download.is_waiting:
|
||||||
status = DownloadStatus.STATUS_WAITING
|
status = MirrorStatus.STATUS_WAITING
|
||||||
elif self.download().is_paused:
|
elif self.download().is_paused:
|
||||||
status = DownloadStatus.STATUS_CANCELLED
|
status = MirrorStatus.STATUS_CANCELLED
|
||||||
elif self.__download.is_complete:
|
elif self.__download.is_complete:
|
||||||
# If download exists and is complete the it must be uploading
|
# If download exists and is complete the it must be uploading
|
||||||
# otherwise the gid would have been removed from the download_list
|
# otherwise the gid would have been removed from the download_list
|
||||||
status = DownloadStatus.STATUS_UPLOADING
|
status = MirrorStatus.STATUS_UPLOADING
|
||||||
elif self.__download.has_failed:
|
elif self.__download.has_failed:
|
||||||
status = DownloadStatus.STATUS_FAILED
|
status = MirrorStatus.STATUS_FAILED
|
||||||
elif self.__download.is_active:
|
elif self.__download.is_active:
|
||||||
status = DownloadStatus.STATUS_DOWNLOADING
|
status = MirrorStatus.STATUS_DOWNLOADING
|
||||||
return status
|
return status
|
||||||
|
|
||||||
def download(self):
|
def download(self):
|
||||||
|
|
|
||||||
|
|
@ -1,2 +1,8 @@
|
||||||
class DriveAuthError(Exception):
|
class DriveAuthError(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
# Custom Exception class for killing thread as soon as they aren't needed
|
||||||
|
class KillThreadException(Exception):
|
||||||
|
def __init__(self, message, error=None):
|
||||||
|
super().__init__(message)
|
||||||
|
self.error = error
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,7 @@ import os
|
||||||
from bot import LOGGER, parent_id, DOWNLOAD_DIR
|
from bot import LOGGER, parent_id, DOWNLOAD_DIR
|
||||||
from .fs_utils import get_mime_type
|
from .fs_utils import get_mime_type
|
||||||
from .bot_utils import *
|
from .bot_utils import *
|
||||||
|
import time
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
logging.getLogger('googleapiclient.discovery').setLevel(logging.ERROR)
|
logging.getLogger('googleapiclient.discovery').setLevel(logging.ERROR)
|
||||||
|
|
@ -25,6 +25,8 @@ class GoogleDriveHelper:
|
||||||
self.__G_DRIVE_BASE_DOWNLOAD_URL = "https://drive.google.com/uc?id={}&export=download"
|
self.__G_DRIVE_BASE_DOWNLOAD_URL = "https://drive.google.com/uc?id={}&export=download"
|
||||||
self.__listener = listener
|
self.__listener = listener
|
||||||
self.__service = self.authorize()
|
self.__service = self.authorize()
|
||||||
|
self.uploadedBytes = 0
|
||||||
|
self.start_time = 0
|
||||||
|
|
||||||
def upload_file(self, file_path, file_name, mime_type, parent_id):
|
def upload_file(self, file_path, file_name, mime_type, parent_id):
|
||||||
# File body description
|
# File body description
|
||||||
|
|
@ -47,11 +49,27 @@ class GoogleDriveHelper:
|
||||||
'withLink': True
|
'withLink': True
|
||||||
}
|
}
|
||||||
# Insert a file
|
# Insert a file
|
||||||
drive_file = self.__service.files().create(body=file_metadata, media_body=media_body).execute()
|
drive_file = self.__service.files().create(body=file_metadata, media_body=media_body)
|
||||||
|
response = None
|
||||||
|
_list = get_download_status_list()
|
||||||
|
index = get_download_index(_list, get_download(self.__listener.message.message_id).gid)
|
||||||
|
uploaded_bytes = 0
|
||||||
|
while response is None:
|
||||||
|
status, response = drive_file.next_chunk()
|
||||||
|
time_lapsed = time.time() - self.start_time
|
||||||
|
|
||||||
|
if status:
|
||||||
|
# The iconic formula of speed = distance / time :)
|
||||||
|
LOGGER.info(status.progress() * 100)
|
||||||
|
chunk_size = status.total_size*status.progress() - uploaded_bytes
|
||||||
|
uploaded_bytes = status.total_size*status.progress()
|
||||||
|
download_dict[self.__listener.uid].uploaded_bytes += chunk_size
|
||||||
|
download_dict[self.__listener.uid].upload_time = time_lapsed
|
||||||
|
self.__listener.onUploadProgress(_list, index)
|
||||||
# Insert new permissions
|
# Insert new permissions
|
||||||
self.__service.permissions().create(fileId=drive_file['id'], body=permissions).execute()
|
self.__service.permissions().create(fileId=response['id'], body=permissions).execute()
|
||||||
# Define file instance and get url for download
|
# Define file instance and get url for download
|
||||||
drive_file = self.__service.files().get(fileId=drive_file['id']).execute()
|
drive_file = self.__service.files().get(fileId=response['id']).execute()
|
||||||
download_url = self.__G_DRIVE_BASE_DOWNLOAD_URL.format(drive_file.get('id'))
|
download_url = self.__G_DRIVE_BASE_DOWNLOAD_URL.format(drive_file.get('id'))
|
||||||
return download_url
|
return download_url
|
||||||
|
|
||||||
|
|
@ -61,17 +79,20 @@ class GoogleDriveHelper:
|
||||||
self.__listener.onUploadStarted(_list, index)
|
self.__listener.onUploadStarted(_list, index)
|
||||||
file_dir = f"{DOWNLOAD_DIR}{self.__listener.message.message_id}"
|
file_dir = f"{DOWNLOAD_DIR}{self.__listener.message.message_id}"
|
||||||
file_path = f"{file_dir}/{file_name}"
|
file_path = f"{file_dir}/{file_name}"
|
||||||
link = None
|
|
||||||
LOGGER.info("Uploading File: " + file_name)
|
LOGGER.info("Uploading File: " + file_name)
|
||||||
|
self.start_time = time.time()
|
||||||
if os.path.isfile(file_path):
|
if os.path.isfile(file_path):
|
||||||
mime_type = get_mime_type(file_path)
|
|
||||||
try:
|
try:
|
||||||
|
mime_type = get_mime_type(file_path)
|
||||||
g_drive_link = self.upload_file(file_path, file_name, mime_type, parent_id)
|
g_drive_link = self.upload_file(file_path, file_name, mime_type, parent_id)
|
||||||
LOGGER.info("Uploaded To G-Drive: " + file_path)
|
LOGGER.info("Uploaded To G-Drive: " + file_path)
|
||||||
link = g_drive_link
|
link = g_drive_link
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
LOGGER.error(str(e))
|
LOGGER.error(str(e))
|
||||||
self.__listener.onUploadError(str(e), _list, index)
|
e_str = str(e).replace('<', '')
|
||||||
|
e_str = e_str.replace('>', '')
|
||||||
|
self.__listener.onUploadError(e_str, _list, index)
|
||||||
|
return
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
dir_id = self.create_directory(os.path.basename(os.path.abspath(file_name)), parent_id)
|
dir_id = self.create_directory(os.path.basename(os.path.abspath(file_name)), parent_id)
|
||||||
|
|
@ -80,7 +101,10 @@ class GoogleDriveHelper:
|
||||||
link = f"https://drive.google.com/folderview?id={dir_id}"
|
link = f"https://drive.google.com/folderview?id={dir_id}"
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
LOGGER.error(str(e))
|
LOGGER.error(str(e))
|
||||||
self.__listener.onUploadError(str(e), _list, index)
|
e_str = str(e).replace('<', '')
|
||||||
|
e_str = e_str.replace('>', '')
|
||||||
|
self.__listener.onUploadError(e_str, _list, index)
|
||||||
|
return
|
||||||
LOGGER.info(download_dict)
|
LOGGER.info(download_dict)
|
||||||
self.__listener.onUploadComplete(link, _list, index)
|
self.__listener.onUploadComplete(link, _list, index)
|
||||||
LOGGER.info("Deleting downloaded file/folder..")
|
LOGGER.info("Deleting downloaded file/folder..")
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@ class MirrorListeners:
|
||||||
self.context = context
|
self.context = context
|
||||||
self.update = update
|
self.update = update
|
||||||
self.message = update.message
|
self.message = update.message
|
||||||
|
self.uid = self.message.message_id
|
||||||
self.reply_message = reply_message
|
self.reply_message = reply_message
|
||||||
|
|
||||||
def onDownloadStarted(self, link: str):
|
def onDownloadStarted(self, link: str):
|
||||||
|
|
@ -20,6 +21,9 @@ class MirrorListeners:
|
||||||
def onUploadStarted(self, progress_status_list: list, index: int):
|
def onUploadStarted(self, progress_status_list: list, index: int):
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def onUploadProgress(self, progress: list, index: int):
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
def onUploadComplete(self, link: str, progress_status_list: list, index: int):
|
def onUploadComplete(self, link: str, progress_status_list: list, index: int):
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -5,8 +5,7 @@ from bot import LOGGER, dispatcher
|
||||||
from bot.helper import fs_utils
|
from bot.helper import fs_utils
|
||||||
from bot import download_dict, status_reply_dict
|
from bot import download_dict, status_reply_dict
|
||||||
from bot.helper.message_utils import *
|
from bot.helper.message_utils import *
|
||||||
from bot.helper.bot_utils import get_readable_message, KillThreadException
|
from bot.helper.bot_utils import get_readable_message, KillThreadException, MirrorStatus
|
||||||
from bot.helper.download_status import DownloadStatus
|
|
||||||
|
|
||||||
|
|
||||||
class MirrorListener(listeners.MirrorListeners):
|
class MirrorListener(listeners.MirrorListeners):
|
||||||
|
|
@ -17,7 +16,7 @@ class MirrorListener(listeners.MirrorListeners):
|
||||||
LOGGER.info("Adding link: " + link)
|
LOGGER.info("Adding link: " + link)
|
||||||
|
|
||||||
def onDownloadProgress(self, progress_status_list: list, index: int):
|
def onDownloadProgress(self, progress_status_list: list, index: int):
|
||||||
if progress_status_list[index].status() == DownloadStatus.STATUS_CANCELLED:
|
if progress_status_list[index].status() == MirrorStatus.STATUS_CANCELLED:
|
||||||
raise KillThreadException('Mirror cancelled by user')
|
raise KillThreadException('Mirror cancelled by user')
|
||||||
msg = get_readable_message(progress_status_list)
|
msg = get_readable_message(progress_status_list)
|
||||||
# LOGGER.info("Editing message")
|
# LOGGER.info("Editing message")
|
||||||
|
|
@ -65,6 +64,10 @@ class MirrorListener(listeners.MirrorListeners):
|
||||||
del download_dict[self.message.message_id]
|
del download_dict[self.message.message_id]
|
||||||
fs_utils.clean_download(progress_status[index].path())
|
fs_utils.clean_download(progress_status[index].path())
|
||||||
|
|
||||||
|
def onUploadProgress(self, progress: list, index: int):
|
||||||
|
msg = get_readable_message(progress)
|
||||||
|
editMessage(msg, self.context, self.reply_message)
|
||||||
|
|
||||||
|
|
||||||
@run_async
|
@run_async
|
||||||
def mirror(update, context):
|
def mirror(update, context):
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue