hdx.freshness.utils.retrieval

Utility to download and hash resources using asyncio. Note that asyncio helps with IO-bound rather than CPU-bound code (for which multiprocessing is more suitable, as it can use multiple CPUs). Asyncio lets you structure your code so that while one piece of linear single-threaded code (a coroutine) is waiting for something to happen, another can take over and use the CPU. Although conceptually similar to threading, the difference is that with asyncio the developer, rather than the OS, decides when to switch to the next task.
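The cooperative switching described above can be illustrated with a minimal sketch. The names worker and main are hypothetical and not part of this module; asyncio.sleep stands in for a network wait.

```python
import asyncio
from typing import List


async def worker(name: str, delay: float, log: List[str]) -> str:
    log.append(f"{name} start")
    # The await is the explicit switch point: control returns to the event
    # loop, which can run another coroutine while this one waits.
    await asyncio.sleep(delay)
    log.append(f"{name} end")
    return name


async def main() -> List[str]:
    log: List[str] = []
    # Both workers run on one thread; "b" waits less, so it finishes first.
    await asyncio.gather(worker("a", 0.05, log), worker("b", 0.01, log))
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Both coroutines start before either finishes, which is the behaviour that makes asyncio a good fit for many concurrent downloads.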

Retrieval Objects

class Retrieval()

Retrieval class for downloading and hashing resources.

Arguments:

  • user_agent str - User agent string to use when downloading
  • url_ignore Optional[str] - Parts of url to ignore for special xlsx handling

fetch

async def fetch(metadata: Tuple, session: Union[aiohttp.ClientSession,
                                                RateLimiter]) -> Tuple

Asynchronous code to download a resource and hash it. Returns a tuple with resource information including hashes.

Arguments:

  • metadata Tuple - Resource to be checked
  • session Union[aiohttp.ClientSession, RateLimiter] - session to use for requests

Returns:

  • Tuple - Resource information including hash
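As a rough illustration of the hashing half of fetch, the sketch below hashes a body incrementally as chunks arrive. It is not the module's implementation: fake_chunks and hash_stream are hypothetical helpers, the chunk source stands in for a real response stream, and SHA-256 is an assumption (the hash actually used is not stated here).

```python
import asyncio
import hashlib
from typing import AsyncIterator, Iterable


async def fake_chunks(parts: Iterable[bytes]) -> AsyncIterator[bytes]:
    """Hypothetical stand-in for a streamed response body."""
    for part in parts:
        await asyncio.sleep(0)  # yield to the event loop, as a network read would
        yield part


async def hash_stream(chunks: AsyncIterator[bytes]) -> str:
    """Hash chunk by chunk so the whole resource never sits in memory."""
    digest = hashlib.sha256()  # assumption: the real module may use another hash
    async for chunk in chunks:
        digest.update(chunk)
    return digest.hexdigest()


if __name__ == "__main__":
    print(asyncio.run(hash_stream(fake_chunks([b"ab", b"c"]))))
```

Hashing incrementally gives the same digest as hashing the full body at once, while keeping memory use bounded for large resources.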

check_urls

async def check_urls(resources_to_check: List[Tuple],
                     loop: uvloop.Loop) -> Dict[str, Tuple]

Asynchronous code to download resources and hash them. Returns a dictionary of resource information including hashes.

Arguments:

  • resources_to_check List[Tuple] - List of resources to be checked
  • loop uvloop.Loop - Event loop to use

Returns:

  • Dict[str, Tuple] - Resource information including hashes
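One plausible shape for this kind of fan-out is to run one coroutine per resource and collect the results into a dictionary. The sketch below assumes a hypothetical (url, resource_id) tuple layout and a hypothetical fake_fetch stand-in; the real tuple contents and result format are not documented here.

```python
import asyncio
from typing import Dict, List, Tuple


async def fake_fetch(metadata: Tuple[str, str]) -> Tuple[str, str, str]:
    """Hypothetical stand-in for fetch: returns (resource_id, url, status)."""
    url, resource_id = metadata
    await asyncio.sleep(0)  # placeholder for the download and hashing work
    return resource_id, url, "ok"


async def check_all(resources: List[Tuple[str, str]]) -> Dict[str, Tuple]:
    # Schedule every fetch at once; gather returns results in input order.
    results = await asyncio.gather(*(fake_fetch(r) for r in resources))
    # Key each result by its resource id (assumed to be the first element).
    return {result[0]: result for result in results}


if __name__ == "__main__":
    print(asyncio.run(check_all([("https://example.org/a.csv", "r1")])))
```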

retrieve

def retrieve(resources_to_check: List[Tuple]) -> Dict[str, Tuple]

Download resources and hash them. Returns a dictionary of resource information including hashes.

Arguments:

  • resources_to_check List[Tuple] - List of resources to be checked

Returns:

  • Dict[str, Tuple] - Resource information including hashes
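A synchronous entry point like this typically creates an event loop, runs the asynchronous code to completion, and closes the loop. The sketch below shows that pattern with the standard asyncio loop rather than uvloop; check_urls_stub and retrieve_sketch are hypothetical names, and the stub only echoes its input.

```python
import asyncio
from typing import Dict, List, Tuple


async def check_urls_stub(
    resources: List[Tuple], loop: asyncio.AbstractEventLoop
) -> Dict[str, Tuple]:
    """Hypothetical stand-in for check_urls; keys results by the first element."""
    await asyncio.sleep(0)
    return {str(resource[0]): resource for resource in resources}


def retrieve_sketch(resources: List[Tuple]) -> Dict[str, Tuple]:
    # Assumption: a plain asyncio loop is used here in place of uvloop.
    loop = asyncio.new_event_loop()
    try:
        # Block the calling (synchronous) code until all coroutines finish.
        return loop.run_until_complete(check_urls_stub(resources, loop))
    finally:
        loop.close()  # always release the loop, even if a download fails


if __name__ == "__main__":
    print(retrieve_sketch([("r1", "https://example.org/a.csv")]))
```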

hdx.freshness.utils.retry

Utility to retry HTTP requests with an exponentially increasing backoff interval.

FailedRequest Objects

class FailedRequest(Exception)

A wrapper for all possible exceptions during an HTTP request

Arguments:

  • raised str - Exception type
  • message str - Exception message
  • code str - HTTP status code
  • url str - URL that was requested

send_http

async def send_http(
        session: aiohttp.ClientSession,
        method: str,
        url: str,
        *,
        retries: int = 1,
        interval: int = 1,
        backoff: int = 2,
        http_status_codes_to_retry: List[int] = HTTP_STATUS_CODES_TO_RETRY,
        fn: Callable[[ClientResponse], Any] = lambda x: x,
        **kwargs: Any)

Send an HTTP request, retrying on failure with an exponentially increasing interval

Arguments:

  • session aiohttp.ClientSession - A client aiohttp session object
  • method str - Method to use, e.g. "get"
  • url str - URL for the request
  • retries int - Number of times to retry in case of failure
  • interval float - Time to wait before retries
  • backoff int - Multiply interval by this factor after each failure
  • http_status_codes_to_retry List[int] - List of status codes to retry
  • fn Callable[[ClientResponse], Any] - Function to call on successful connection
  • **kwargs Any - Additional keyword arguments
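The interaction of retries, interval, and backoff can be sketched without any HTTP library. In the sketch below, retry_with_backoff is a hypothetical function, the request is any coroutine returning a status code, the default status codes are an assumption (the contents of HTTP_STATUS_CODES_TO_RETRY are not listed here), and treating retries as attempts after the first one is also an assumption.

```python
import asyncio
from typing import Awaitable, Callable, Tuple


async def retry_with_backoff(
    request: Callable[[], Awaitable[int]],
    *,
    retries: int = 1,
    interval: float = 1,
    backoff: int = 2,
    codes_to_retry: Tuple[int, ...] = (500, 502, 503, 504),  # assumed values
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> int:
    delay = interval
    attempts_left = retries
    while True:
        status = await request()
        if status not in codes_to_retry:
            return status  # success, or a failure not worth retrying
        if attempts_left <= 0:
            raise RuntimeError(f"giving up after status {status}")
        attempts_left -= 1
        await sleep(delay)  # wait before the next attempt
        delay *= backoff    # exponential growth: interval, interval*backoff, ...
```

With interval=1 and backoff=2, the waits between attempts are 1, 2, 4, ... seconds, which eases pressure on a struggling server. The injectable sleep parameter exists only to make the sketch easy to exercise without real delays.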

hdx.freshness.utils.ratelimiter

aiohttp rate limiting: limit connections per timeframe to host (from https://quentin.pradet.me/blog/how-do-you-rate-limit-calls-with-aiohttp.html)

RateLimiter Objects

class RateLimiter()

Use like this: session = RateLimiter(session) ... async with await session.get(url) as response

Arguments:

  • session aiohttp.ClientSession - aiohttp session to use for requests

RATE

requests per second

get

async def get(url: str, *args: Any, **kwargs: Any) -> _RequestContextManager

Asynchronous code to download a resource after waiting for a token

Arguments:

  • url str - URL to download
  • *args Any
  • **kwargs Any

Returns:

  • _RequestContextManager - Context manager that yields the aiohttp.ClientResponse for the request

wait_for_token

async def wait_for_token(host: str) -> None

Asynchronous code that sleeps if the host has already been connected to

Arguments:

  • host str - Host (server)

Returns:

None

add_new_tokens

def add_new_tokens(host: str) -> None

Adds new tokens for the host

Arguments:

  • host str - Host (server)

Returns:

None
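The wait_for_token and add_new_tokens pair suggests a token bucket kept per host. The sketch below is one way such a bucket could work, not the module's actual code: the class name HostTokenBucket, the RATE and MAX_TOKENS values, the injectable clock, and the internal dictionaries are all assumptions.

```python
import asyncio
import time
from typing import Callable, Dict


class HostTokenBucket:
    RATE = 10        # tokens added per second (assumed value)
    MAX_TOKENS = 10  # bucket capacity (assumed value)

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self.clock = clock
        self.tokens: Dict[str, float] = {}
        self.updated_at: Dict[str, float] = {}

    async def wait_for_token(self, host: str) -> None:
        if host not in self.tokens:
            # First contact with this host: start with a full bucket.
            self.tokens[host] = self.MAX_TOKENS
            self.updated_at[host] = self.clock()
        while self.tokens[host] < 1:
            self.add_new_tokens(host)
            if self.tokens[host] < 1:
                await asyncio.sleep(1 / self.RATE)  # let other coroutines run
        self.tokens[host] -= 1  # spend one token for this request

    def add_new_tokens(self, host: str) -> None:
        now = self.clock()
        new_tokens = (now - self.updated_at[host]) * self.RATE
        # Only update once at least one whole token is available.
        if self.tokens[host] + new_tokens >= 1:
            self.tokens[host] = min(self.tokens[host] + new_tokens, self.MAX_TOKENS)
            self.updated_at[host] = now
```

Keeping a separate bucket per host means a slow or heavily used server does not throttle requests to unrelated hosts.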