Fixed potential race conditions in GoogleDriveHelper class

* Also simplified code a bit
Signed-off-by: lzzy12 <jhashivam2020@gmail.com>
This commit is contained in:
lzzy12 2019-12-26 15:28:27 +05:30
parent 820d72f317
commit 20dae25dc9
2 changed files with 27 additions and 30 deletions

View File

@ -4,7 +4,8 @@ from google.auth.transport.requests import Request
from googleapiclient.http import MediaFileUpload from googleapiclient.http import MediaFileUpload
import pickle import pickle
import os import os
from bot import LOGGER, parent_id, DOWNLOAD_DIR, DOWNLOAD_STATUS_UPDATE_INTERVAL, IS_TEAM_DRIVE, INDEX_URL import threading
from bot import LOGGER, parent_id, DOWNLOAD_DIR, IS_TEAM_DRIVE, INDEX_URL
from bot.helper.ext_utils.fs_utils import get_mime_type from bot.helper.ext_utils.fs_utils import get_mime_type
from bot.helper.ext_utils.bot_utils import * from bot.helper.ext_utils.bot_utils import *
@ -23,20 +24,22 @@ class GoogleDriveHelper:
self.__G_DRIVE_BASE_DOWNLOAD_URL = "https://drive.google.com/uc?id={}&export=download" self.__G_DRIVE_BASE_DOWNLOAD_URL = "https://drive.google.com/uc?id={}&export=download"
self.__listener = listener self.__listener = listener
self.__service = self.authorize() self.__service = self.authorize()
self._file_uploaded_bytes = 0 self.__uploaded_bytes = 0
self.uploaded_bytes = 0
self.start_time = 0 self.start_time = 0
self.total_time = 0
self._should_update = True
self.is_uploading = True self.is_uploading = True
self.is_cancelled = False self.is_cancelled = False
self.status = None
self.updater = None
self.name = name self.name = name
self.resource_lock = threading.Lock()
def cancel(self): def cancel(self):
self.is_cancelled = True with self.resource_lock:
self.is_uploading = False self.is_cancelled = True
self.is_uploading = False
@property
def uploaded_bytes(self):
with self.resource_lock:
return self.__uploaded_bytes
def speed(self): def speed(self):
""" """
@ -44,18 +47,11 @@ class GoogleDriveHelper:
:return: Upload speed in bytes/second :return: Upload speed in bytes/second
""" """
try: try:
return self.uploaded_bytes / self.total_time with self.resource_lock:
return self.__uploaded_bytes / (time.time() - self.start_time)
except ZeroDivisionError: except ZeroDivisionError:
return 0 return 0
def _on_upload_progress(self):
if self.status is not None:
chunk_size = self.status.total_size * self.status.progress() - self._file_uploaded_bytes
self._file_uploaded_bytes = self.status.total_size * self.status.progress()
LOGGER.info(f'Chunk size: {get_readable_file_size(chunk_size)}')
self.uploaded_bytes += chunk_size
self.total_time += DOWNLOAD_STATUS_UPDATE_INTERVAL
def __upload_empty_file(self, path, file_name, mime_type, parent_id=None): def __upload_empty_file(self, path, file_name, mime_type, parent_id=None):
media_body = MediaFileUpload(path, media_body = MediaFileUpload(path,
mimetype=mime_type, mimetype=mime_type,
@ -109,11 +105,16 @@ class GoogleDriveHelper:
drive_file = self.__service.files().create(supportsTeamDrives=True, drive_file = self.__service.files().create(supportsTeamDrives=True,
body=file_metadata, media_body=media_body) body=file_metadata, media_body=media_body)
response = None response = None
last_uploaded = 0
while response is None: while response is None:
if self.is_cancelled: if self.is_cancelled:
return None return None
self.status, response = drive_file.next_chunk() status, response = drive_file.next_chunk()
self._file_uploaded_bytes = 0 if status is not None:
with self.resource_lock:
chunk_size = status.total_size * status.progress() - last_uploaded
last_uploaded = status.total_size * status.progress()
self.__uploaded_bytes += chunk_size
# Insert new permissions # Insert new permissions
if not IS_TEAM_DRIVE: if not IS_TEAM_DRIVE:
self.__set_permission(response['id']) self.__set_permission(response['id'])
@ -128,7 +129,6 @@ class GoogleDriveHelper:
file_path = f"{file_dir}/{file_name}" file_path = f"{file_dir}/{file_name}"
LOGGER.info("Uploading File: " + file_path) LOGGER.info("Uploading File: " + file_path)
self.start_time = time.time() self.start_time = time.time()
self.updater = setInterval(5, self._on_upload_progress)
if os.path.isfile(file_path): if os.path.isfile(file_path):
try: try:
mime_type = get_mime_type(file_path) mime_type = get_mime_type(file_path)
@ -142,8 +142,6 @@ class GoogleDriveHelper:
e_str = e_str.replace('>', '') e_str = e_str.replace('>', '')
self.__listener.onUploadError(e_str) self.__listener.onUploadError(e_str)
return return
finally:
self.updater.cancel()
else: else:
try: try:
dir_id = self.create_directory(os.path.basename(os.path.abspath(file_name)), parent_id) dir_id = self.create_directory(os.path.basename(os.path.abspath(file_name)), parent_id)
@ -158,8 +156,6 @@ class GoogleDriveHelper:
e_str = e_str.replace('>', '') e_str = e_str.replace('>', '')
self.__listener.onUploadError(e_str) self.__listener.onUploadError(e_str)
return return
finally:
self.updater.cancel()
LOGGER.info(download_dict) LOGGER.info(download_dict)
self.__listener.onUploadComplete(link) self.__listener.onUploadComplete(link)
LOGGER.info("Deleting downloaded file/folder..") LOGGER.info("Deleting downloaded file/folder..")

View File

@ -6,8 +6,9 @@ from bot.helper.ext_utils.fs_utils import clean_download
from bot.helper.telegram_helper.bot_commands import BotCommands from bot.helper.telegram_helper.bot_commands import BotCommands
from time import sleep from time import sleep
@run_async @run_async
def cancel_mirror(bot,update): def cancel_mirror(bot, update):
mirror_message = update.message.reply_to_message mirror_message = update.message.reply_to_message
with download_dict_lock: with download_dict_lock:
keys = download_dict.keys() keys = download_dict.keys()
@ -26,11 +27,11 @@ def cancel_mirror(bot,update):
aria2.pause(downloads) aria2.pause(downloads)
aria2.pause([download]) aria2.pause([download])
elif dl.status() == "Uploading": elif dl.status() == "Uploading":
sendMessage("Upload in Progress, Dont Cancel it.",bot,update) sendMessage("Upload in Progress, Don't Cancel it.", bot, update)
return return
else: else:
dl._listener.onDownloadError("Download stopped by user!") dl._listener.onDownloadError("Download stopped by user!")
sleep(1) #Wait a Second For Aria2 To free Resources. sleep(1) # Wait a Second For Aria2 To free Resources.
clean_download(f'{DOWNLOAD_DIR}{mirror_message.message_id}/') clean_download(f'{DOWNLOAD_DIR}{mirror_message.message_id}/')
@ -42,8 +43,8 @@ def cancel_all(update, bot):
dlDetails._listener.onDownloadError("Download Manually Cancelled By user.") dlDetails._listener.onDownloadError("Download Manually Cancelled By user.")
aria2.pause([dlDetails.download()]) aria2.pause([dlDetails.download()])
delete_all_messages() delete_all_messages()
sendMessage('Cancelled all downloads!',update,bot) sendMessage('Cancelled all downloads!', update, bot)
sleep(0.5)#Wait a Second For Aria2 To free Resources. sleep(0.5) # Wait a Second For Aria2 To free Resources.
clean_download(DOWNLOAD_DIR) clean_download(DOWNLOAD_DIR)