Source code for parfive.downloader

import os
import signal
import asyncio
import logging
import pathlib
import warnings
import threading
import contextlib
import urllib.parse
from typing import Union, Callable, Optional
from functools import reduce

try:
    from typing import Literal  # Added in Python 3.8
except ImportError:
    from typing_extensions import Literal  # type: ignore

from functools import partial

import aiohttp
from tqdm import tqdm as tqdm_std
from tqdm.auto import tqdm as tqdm_auto

import parfive
from .config import DownloaderConfig, SessionConfig
from .results import Results
from .utils import (
    FailedDownload,
    MultiPartDownloadError,
    Token,
    _QueueList,
    cancel_task,
    default_name,
    get_filepath,
    get_ftp_size,
    get_http_size,
    remove_file,
    run_task_in_thread,
)

try:
    import aioftp
except ImportError:  # pragma: nocover
    aioftp = None


__all__ = ["Downloader"]


class Downloader:
    """
    Download files in parallel.

    Parameters
    ----------
    max_conn
        The number of parallel download slots.
    max_splits
        The maximum number of splits to use to download a file (server dependent).
    progress
        If `True` show a main progress bar showing how many of the total files
        have been downloaded. If `False`, no progress bars will be shown at all.
    overwrite
        Determine how to handle downloading if a file already exists with the
        same name. If `False` the file download will be skipped and the path
        returned to the existing file, if `True` the file will be downloaded
        and the existing file will be overwritten, if `'unique'` the filename
        will be modified to be unique.
    config
        A config object containing more complex settings for this
        ``Downloader`` instance.
    """

    def __init__(
        self,
        max_conn: int = 5,
        max_splits: int = 5,
        progress: bool = True,
        overwrite: Union[bool, Literal["unique"]] = False,
        config: Optional[SessionConfig] = None,
    ):
        self.config = DownloaderConfig(
            max_conn=max_conn,
            max_splits=max_splits,
            progress=progress,
            overwrite=overwrite,
            config=config,
        )
        self._init_queues()

        # Configure progress bars
        self.tqdm = tqdm_auto
        if self.config.notebook is not None:
            if self.config.notebook is True:
                from tqdm.notebook import tqdm as tqdm_notebook

                self.tqdm = tqdm_notebook
            elif self.config.notebook is False:
                self.tqdm = tqdm_std
            else:
                raise ValueError(
                    "The notebook keyword argument should be one of None, True or False."
                )

        self._configure_logging()

    def _init_queues(self):
        # Setup queues
        self.http_queue = _QueueList()
        self.ftp_queue = _QueueList()

    def _generate_tokens(self):
        # Create a Queue with max_conn tokens
        queue = asyncio.Queue(maxsize=self.config.max_conn)
        for i in range(self.config.max_conn):
            queue.put_nowait(Token(i + 1))
        return queue

    def _configure_logging(self):  # pragma: no cover
        if self.config.log_level is None:
            return

        sh = logging.StreamHandler()
        sh.setLevel(self.config.log_level)
        formatter = logging.Formatter("%(name)s - %(levelname)s - %(message)s")
        sh.setFormatter(formatter)

        parfive.log.addHandler(sh)
        parfive.log.setLevel(self.config.log_level)

        aiohttp_logger = logging.getLogger("aiohttp.client")
        aioftp_logger = logging.getLogger("aioftp.client")

        aioftp_logger.addHandler(sh)
        aioftp_logger.setLevel(self.config.log_level)

        aiohttp_logger.addHandler(sh)
        aiohttp_logger.setLevel(self.config.log_level)

        parfive.log.debug("Configured parfive to run with debug logging...")

    @property
    def queued_downloads(self):
        """
        The total number of files already queued for download.
        """
        return len(self.http_queue) + len(self.ftp_queue)
    def enqueue_file(
        self,
        url: str,
        path: Optional[Union[str, os.PathLike]] = None,
        filename: Optional[
            Union[str, Callable[[str, Optional[aiohttp.ClientResponse]], os.PathLike]]
        ] = None,
        overwrite: Optional[Union[bool, Literal["unique"]]] = None,
        **kwargs,
    ):
        """
        Add a file to the download queue.

        Parameters
        ----------
        url
            The URL to retrieve.
        path
            The directory to retrieve the file into, if `None` defaults to the
            current directory.
        filename
            The filename to save the file as. Can also be a callable which
            takes two arguments, the url and the response object from opening
            that URL, and returns the filename. (Note, for FTP downloads the
            response will be ``None``.) If `None` the HTTP headers will be
            read for the filename, or the last segment of the URL will be
            used.
        overwrite
            Determine how to handle downloading if a file already exists with
            the same name. If `False` the file download will be skipped and
            the path returned to the existing file, if `True` the file will be
            downloaded and the existing file will be overwritten, if
            `'unique'` the filename will be modified to be unique. If `None`
            the value set when constructing the `~parfive.Downloader` object
            will be used.
        kwargs : `dict`
            Extra keyword arguments are passed to
            `aiohttp.ClientSession.request` or `aioftp.Client.context`
            depending on the protocol.
        """
        overwrite = overwrite or self.config.overwrite

        if path is None and filename is None:
            raise ValueError("Either path or filename must be specified.")
        elif path is None:
            path = "./"

        path = pathlib.Path(path)
        filepath: Callable[[str, Optional[aiohttp.ClientResponse]], os.PathLike]
        if not filename:
            filepath = partial(default_name, path)
        elif callable(filename):
            filepath = filename
        else:
            # Define a function because get_file expects a callback
            def filepath(url, resp):
                return path / filename

        scheme = urllib.parse.urlparse(url).scheme

        if scheme in ("http", "https"):
            get_file = partial(
                self._get_http, url=url, filepath_partial=filepath, overwrite=overwrite, **kwargs
            )
            self.http_queue.append(get_file)
        elif scheme == "ftp":
            if aioftp is None:
                raise ValueError("The aioftp package must be installed to download over FTP.")
            get_file = partial(
                self._get_ftp, url=url, filepath_partial=filepath, overwrite=overwrite, **kwargs
            )
            self.ftp_queue.append(get_file)
        else:
            raise ValueError("URL must start with either 'http' or 'ftp'.")
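    # A minimal usage sketch for ``enqueue_file`` (not part of parfive itself;
    # the URLs and directory below are hypothetical placeholders):
    #
    #     dl = Downloader(max_conn=2)
    #     dl.enqueue_file("https://example.com/a.fits", path="./data")
    #     dl.enqueue_file("ftp://example.com/pub/b.fits", path="./data", filename="b_renamed.fits")
    #     results = dl.download()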
    @staticmethod
    def _add_shutdown_signals(loop, task):
        if os.name == "nt":
            return
        if threading.current_thread() != threading.main_thread():
            warnings.warn(
                "This download has been started in a thread which is not the main thread. "
                "You will not be able to interrupt the download.",
                UserWarning,
            )
            return
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, task.cancel)

    def _run_in_loop(self, coro):
        """
        Take a coroutine and figure out where to run it and how to cancel it.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None

        # If we already have a loop and it's already running then we should
        # make a new loop (as we are probably in a Jupyter Notebook)
        should_run_in_thread = loop and loop.is_running()

        # If we don't already have a loop, make a new one
        if should_run_in_thread or loop is None:
            loop = asyncio.new_event_loop()

        # Wrap up the coroutine in a task so we can cancel it later
        task = loop.create_task(coro)

        # Add handlers for shutdown signals
        self._add_shutdown_signals(loop, task)

        # Execute the task
        if should_run_in_thread:
            return run_task_in_thread(loop, task)

        return loop.run_until_complete(task)
    async def run_download(self):
        """
        Download all files in the queue.

        Returns
        -------
        `parfive.Results`
            A list of files downloaded.
        """
        tasks = set()
        with self._get_main_pb(self.queued_downloads) as main_pb:
            try:
                if len(self.http_queue):
                    tasks.add(asyncio.create_task(self._run_http_download(main_pb)))
                if len(self.ftp_queue):
                    tasks.add(asyncio.create_task(self._run_ftp_download(main_pb)))

                dl_results = await asyncio.gather(*tasks, return_exceptions=True)
            except asyncio.CancelledError:
                for task in tasks:
                    task.cancel()
                dl_results = await asyncio.gather(*tasks, return_exceptions=True)
            finally:
                results_obj = self._format_results(dl_results, main_pb)
                return results_obj
    def _format_results(self, retvals, main_pb):
        # Squash all nested lists into a single flat list
        if retvals and isinstance(retvals[0], list):
            retvals = list(reduce(list.__add__, retvals))

        errors = sum([isinstance(i, FailedDownload) for i in retvals])
        if errors:
            total_files = self.queued_downloads
            message = f"{errors}/{total_files} files failed to download. Please check `.errors` for details"
            if main_pb:
                main_pb.write(message)
            else:
                parfive.log.info(message)

        results = Results()

        # Iterate through the results and store any failed download errors in
        # the errors list of the results object.
        for res in retvals:
            if isinstance(res, FailedDownload):
                results.add_error(res.filepath_partial, res.url, res.exception)
                parfive.log.info(
                    "%s failed to download with exception\n%s", res.url, res.exception
                )
            elif isinstance(res, Exception):
                raise res
            else:
                requested_url, filepath = res
                results.append(path=filepath, url=requested_url)

        return results
    def download(self):
        """
        Download all files in the queue.

        Returns
        -------
        `parfive.Results`
            A list of files downloaded.

        Notes
        -----
        This is a synchronous version of `~parfive.Downloader.run_download`; an
        `asyncio` event loop will be created to run the download (in its own
        thread if a loop is already running).
        """
        return self._run_in_loop(self.run_download())
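    # Hedged sketch, not part of parfive: inside an already-running event loop
    # (e.g. a Jupyter notebook or another coroutine) the asynchronous
    # ``run_download`` can be awaited directly instead of calling ``download``:
    #
    #     results = await dl.run_download()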
    def retry(self, results: Results):
        """
        Retry any failed downloads in a results object.

        .. note::
            This will start a new event loop.

        Parameters
        ----------
        results : `parfive.Results`
            A previous results object; the ``.errors`` property will be read
            and the downloads retried.

        Returns
        -------
        `parfive.Results`
            A modified version of the input ``results`` with all the errors
            from this download attempt and any new files appended to the list
            of file paths.
        """
        # Reset the queues
        self._init_queues()

        for err in results.errors:
            self.enqueue_file(err.url, filename=err.filepath_partial)

        new_res = self.download()

        results += new_res
        results._errors = new_res._errors

        return results
    @classmethod
    def simple_download(cls, urls, *, path="./", overwrite=None):
        """
        Download a series of URLs to a single destination.

        Parameters
        ----------
        urls : iterable
            A sequence of URLs to download.
        path : `pathlib.Path`, optional
            The destination directory for the downloaded files. Defaults to
            the current directory.
        overwrite : `bool`, optional
            Overwrite the files at the destination directory. If `False` the
            URL will not be downloaded if a file with the corresponding
            filename already exists.

        Returns
        -------
        `parfive.Results`
            A list of files downloaded.
        """
        dl = cls()

        for url in urls:
            dl.enqueue_file(url, path=path, overwrite=overwrite)

        return dl.download()
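    # Hedged sketch of the one-shot classmethod above (placeholder URLs and path):
    #
    #     results = Downloader.simple_download(
    #         ["https://example.com/a.dat", "https://example.com/b.dat"],
    #         path="./data",
    #     )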
    def _get_main_pb(self, total):
        """
        Return the tqdm instance if we want it, else return a contextmanager
        that just returns None.
        """
        if self.config.progress:
            return self.tqdm(total=total, unit="file", desc="Files Downloaded", position=0)
        else:
            return contextlib.contextmanager(lambda: iter([None]))()

    async def _run_http_download(self, main_pb):
        async with self.config.aiohttp_client_session() as session:
            futures = await self._run_from_queue(
                self.http_queue.generate_queue(),
                self._generate_tokens(),
                main_pb,
                session=session,
            )

            try:
                # Wait for all the coroutines to finish
                done, _ = await asyncio.wait(futures)
            except asyncio.CancelledError:
                for task in futures:
                    task.cancel()

            return await asyncio.gather(*futures, return_exceptions=True)

    async def _run_ftp_download(self, main_pb):
        futures = await self._run_from_queue(
            self.ftp_queue.generate_queue(),
            self._generate_tokens(),
            main_pb,
        )
        try:
            # Wait for all the coroutines to finish
            done, _ = await asyncio.wait(futures)
        except asyncio.CancelledError:
            for task in futures:
                task.cancel()

        return await asyncio.gather(*futures, return_exceptions=True)

    async def _run_from_queue(self, queue, tokens, main_pb, *, session=None):
        futures = []
        try:
            while not queue.empty():
                get_file = await queue.get()
                token = await tokens.get()
                file_pb = self.tqdm if self.config.file_progress else False
                future = asyncio.create_task(get_file(session, token=token, file_pb=file_pb))

                def callback(token, future, main_pb):
                    try:
                        tokens.put_nowait(token)
                        # Update the main progressbar
                        if main_pb and not future.exception():
                            main_pb.update(1)
                    except asyncio.CancelledError:
                        return

                future.add_done_callback(partial(callback, token, main_pb=main_pb))
                futures.append(future)

        except asyncio.CancelledError:
            for task in futures:
                task.cancel()

        return futures

    async def _get_http(
        self,
        session,
        *,
        url,
        filepath_partial,
        chunksize=None,
        file_pb=None,
        token,
        overwrite,
        max_splits=None,
        **kwargs,
    ):
        """
        Read the file from the given url into the filename given by ``filepath_partial``.

        Parameters
        ----------
        session : `aiohttp.ClientSession`
            The `aiohttp.ClientSession` to use to retrieve the files.
        url : `str`
            The url to retrieve.
        filepath_partial : `callable`
            A function to call which returns the filepath to save the url to.
            Takes two arguments ``resp, url``.
        chunksize : `int`
            The number of bytes to read into the file at a time.
        file_pb : `tqdm.tqdm` or `False`
            Should progress bars be displayed for each file downloaded.
        token : `parfive.downloader.Token`
            A token for this download slot.
        overwrite : `bool`
            Overwrite the file if it already exists.
        max_splits : `int`, optional
            Maximum number of concurrent connections per file.
        kwargs : `dict`
            Extra keyword arguments are passed to `aiohttp.ClientSession.get`.

        Returns
        -------
        `str`
            The name of the file saved.
""" if chunksize is None: chunksize = 1024 if max_splits is None: max_splits = self.config.max_splits # Define filepath and writer here as we use them in the except block filepath = writer = None tasks = [] try: scheme = urllib.parse.urlparse(url).scheme if scheme == "http": kwargs["proxy"] = self.config.http_proxy elif scheme == "https": kwargs["proxy"] = self.config.https_proxy async with session.get(url, timeout=self.config.timeouts, **kwargs) as resp: parfive.log.debug( "%s request made to %s with headers=%s", resp.request_info.method, resp.request_info.url, resp.request_info.headers, ) parfive.log.debug( "%s Response received from %s with headers=%s", resp.status, resp.request_info.url, resp.headers, ) if resp.status < 200 or resp.status >= 300: raise FailedDownload(filepath_partial, url, resp) else: filepath, skip = get_filepath(filepath_partial(resp, url), overwrite) if skip: parfive.log.debug( "File %s already exists and overwrite is False; skipping download.", filepath, ) return url, str(filepath) if callable(file_pb): file_pb = file_pb( position=token.n, unit="B", unit_scale=True, desc=filepath.name, leave=False, total=get_http_size(resp), ) else: file_pb = None # This queue will contain the downloaded chunks and their offsets # as tuples: (offset, chunk) downloaded_chunk_queue = asyncio.Queue() writer = asyncio.create_task( self._write_worker(downloaded_chunk_queue, file_pb, filepath) ) if ( not self.config.env.disable_range and max_splits and resp.headers.get("Accept-Ranges", None) == "bytes" and "Content-length" in resp.headers ): content_length = int(resp.headers["Content-length"]) split_length = max(1, content_length // max_splits) ranges = [ [start, start + split_length] for start in range(0, content_length, split_length) ] # let the last part download everything ranges[-1][1] = "" for _range in ranges: tasks.append( asyncio.create_task( self._http_download_worker( session, url, chunksize, _range, downloaded_chunk_queue, **kwargs, ) ) ) else: tasks.append( asyncio.create_task( self._http_download_worker( session, url, chunksize, None, downloaded_chunk_queue, **kwargs, ) ) ) # Close the initial request here before we start transferring data. # run all the download workers await asyncio.gather(*tasks) # join() waits till all the items in the queue have been processed await downloaded_chunk_queue.join() for callback in self.config.done_callbacks: callback(filepath, url, None) return url, str(filepath) except (Exception, asyncio.CancelledError) as e: for task in tasks: task.cancel() # We have to cancel the writer here before we try and remove the # file so it's closed (otherwise windows gets angry). if writer is not None: await cancel_task(writer) # Set writer to None so we don't cancel it twice. writer = None # If filepath is None then the exception occurred before the request # computed the filepath, so we have no file to cleanup if filepath is not None: remove_file(filepath) for callback in self.config.done_callbacks: callback(filepath, url, e) raise FailedDownload(filepath_partial, url, e) finally: if writer is not None: writer.cancel() if isinstance(file_pb, self.tqdm): file_pb.close() async def _write_worker(self, queue, file_pb, filepath): """ Worker for writing the downloaded chunk to the file. The downloaded chunk is put into a asyncio Queue by a download worker. This worker gets the chunk from the queue and write it to the file using the specified offset of the chunk. 
        Parameters
        ----------
        queue : `asyncio.Queue`
            Queue for chunks.
        file_pb : `tqdm.tqdm` or `False`
            Should progress bars be displayed for each file downloaded.
        filepath : `pathlib.Path`
            Path to which the file should be downloaded.
        """
        if self.config.use_aiofiles:
            await self._async_write_worker(queue, file_pb, filepath)
        else:
            await self._blocking_write_worker(queue, file_pb, filepath)

    async def _async_write_worker(self, queue, file_pb, filepath):
        import aiofiles

        async with aiofiles.open(filepath, mode="wb") as f:
            while True:
                offset, chunk = await queue.get()

                await f.seek(offset)
                await f.write(chunk)
                await f.flush()

                # Update the progressbar for file
                if file_pb is not None:
                    file_pb.update(len(chunk))

                queue.task_done()

    async def _blocking_write_worker(self, queue, file_pb, filepath):
        with open(filepath, "wb") as f:
            while True:
                offset, chunk = await queue.get()
                f.seek(offset)
                f.write(chunk)
                f.flush()

                # Update the progressbar for file
                if file_pb is not None:
                    file_pb.update(len(chunk))

                queue.task_done()

    async def _http_download_worker(self, session, url, chunksize, http_range, queue, **kwargs):
        """
        Worker for downloading chunks from http urls.

        This function downloads the chunk from the specified http range and
        puts the chunk in the asyncio Queue. If no range is specified, then
        the whole file is downloaded via chunks and put in the queue.

        Parameters
        ----------
        session : `aiohttp.ClientSession`
            The `aiohttp.ClientSession` to use to retrieve the files.
        url : `str`
            The url to retrieve.
        chunksize : `int`
            The number of bytes to read into the file at a time.
        http_range : (`int`, `int`) or `None`
            Start and end bytes of the file. If `None`, then no ``Range``
            header is specified in the request and the whole file will be
            downloaded.
        queue : `asyncio.Queue`
            Queue to put the downloaded chunks in.
        kwargs : `dict`
            Extra keyword arguments are passed to `aiohttp.ClientSession.get`.
        """
        headers = kwargs.pop("headers", {})
        if http_range:
            headers["Range"] = "bytes={}-{}".format(*http_range)
            # init offset to start of range
            offset, _ = http_range
        else:
            offset = 0

        async with session.get(
            url, timeout=self.config.timeouts, headers=headers, **kwargs
        ) as resp:
            parfive.log.debug(
                "%s request made for download to %s with headers=%s",
                resp.request_info.method,
                resp.request_info.url,
                resp.request_info.headers,
            )
            parfive.log.debug(
                "%s Response received from %s with headers=%s",
                resp.status,
                resp.request_info.url,
                resp.headers,
            )
            if resp.status < 200 or resp.status >= 300:
                raise MultiPartDownloadError(resp)

            while True:
                chunk = await resp.content.read(chunksize)
                if not chunk:
                    break
                await queue.put((offset, chunk))
                offset += len(chunk)

    async def _get_ftp(
        self,
        session=None,
        *,
        url,
        filepath_partial,
        file_pb=None,
        token,
        overwrite,
        **kwargs,
    ):
        """
        Read the file from the given url into the filename given by ``filepath_partial``.

        Parameters
        ----------
        session : `None`
            A placeholder for API compatibility with ``_get_http``.
        url : `str`
            The url to retrieve.
        filepath_partial : `callable`
            A function to call which returns the filepath to save the url to.
            Takes two arguments ``resp, url``.
        file_pb : `tqdm.tqdm` or `False`
            Should progress bars be displayed for each file downloaded.
        token : `parfive.downloader.Token`
            A token for this download slot.
        overwrite : `bool`
            Whether to overwrite the file if it already exists.
        kwargs : `dict`
            Extra keyword arguments are passed to `aioftp.Client.context`.

        Returns
        -------
        `str`
            The name of the file saved.
""" filepath = writer = None parse = urllib.parse.urlparse(url) try: async with aioftp.Client.context(parse.hostname, **kwargs) as client: parfive.log.debug("Connected to ftp server %s", parse.hostname) if parse.username and parse.password: parfive.log.debug( "Explicitly Logging in with %s:%s", parse.username, parse.password ) await client.login(parse.username, parse.password) # This has to be done before we start streaming the file: filepath, skip = get_filepath(filepath_partial(None, url), overwrite) if skip: parfive.log.debug( "File %s already exists and overwrite is False; skipping download.", filepath, ) return url, str(filepath) if callable(file_pb): total_size = await get_ftp_size(client, parse.path) file_pb = file_pb( position=token.n, unit="B", unit_scale=True, desc=filepath.name, leave=False, total=total_size, ) else: file_pb = None parfive.log.debug("Downloading file %s from %s", parse.path, parse.hostname) async with client.download_stream(parse.path) as stream: downloaded_chunks_queue = asyncio.Queue() download_workers = [] writer = asyncio.create_task( self._write_worker(downloaded_chunks_queue, file_pb, filepath) ) download_workers.append( asyncio.create_task( self._ftp_download_worker(stream, downloaded_chunks_queue) ) ) await asyncio.gather(*download_workers) await downloaded_chunks_queue.join() for callback in self.config.done_callbacks: callback(filepath, url, None) return url, str(filepath) except (Exception, asyncio.CancelledError) as e: if writer is not None: await cancel_task(writer) writer = None # If filepath is None then the exception occurred before the request # computed the filepath, so we have no file to cleanup if filepath is not None: remove_file(filepath) filepath = None for callback in self.config.done_callbacks: callback(filepath, url, e) raise FailedDownload(filepath_partial, url, e) finally: # Just make sure we close the file. if writer is not None: writer.cancel() if isinstance(file_pb, self.tqdm): file_pb.close() async def _ftp_download_worker(self, stream, queue): """ Similar to `Downloader._http_download_worker`. See that function's documentation for more info. Parameters ---------- stream: `aioftp.StreamIO` Stream of the file to be downloaded. queue: `asyncio.Queue` Queue to put the download chunks. """ offset = 0 async for chunk in stream.iter_by_block(): # Write this chunk to the output file. await queue.put((offset, chunk)) offset += len(chunk)