hdx.freshness.utils.retrieval
Utility to download and hash resources. Uses asyncio. Note that the purpose of asyncio is to help with IO-bound rather than CPU-bound code (for which multiprocessing is more suitable as it leverages multiple CPUs). Asyncio allows you to structure your code so that when one piece of linear single-threaded code (coroutine) is waiting for something to happen another can take over and use the CPU. While conceptually similar to threading, the difference is that with asyncio, it is the task of the developer rather than the OS to decide when to switch to the next task.
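The switching behaviour described above can be seen in a minimal, standard-library-only sketch: two coroutines share one thread, and control changes hands only at explicit await points.

```python
import asyncio

log = []

async def worker(name: str, delay: float) -> None:
    # Runs until the await, then yields control to the event loop
    log.append(f"{name} start")
    await asyncio.sleep(delay)  # simulated IO wait
    log.append(f"{name} done")

async def main() -> None:
    # Both workers run concurrently on the same thread; the worker
    # with the shorter wait finishes first
    await asyncio.gather(worker("a", 0.02), worker("b", 0.01))

asyncio.run(main())
print(log)  # ['a start', 'b start', 'b done', 'a done']
```

Note that "b done" is logged before "a done" even though worker "a" was scheduled first: while "a" waits, "b" gets the CPU.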
Retrieval Objects
class Retrieval()
Retrieval class for downloading and hashing resources.
Arguments:
user_agent
str - User agent string to use when downloading
url_ignore
Optional[str] - Parts of url to ignore for special xlsx handling
fetch
async def fetch(metadata: Tuple, session: Union[aiohttp.ClientSession,
RateLimiter]) -> Tuple
Asynchronous code to download a resource and hash it. Returns a tuple with resource information including hashes.
Arguments:
metadata
Tuple - Resource to be checked
session
Union[aiohttp.ClientSession, RateLimiter] - Session to use for requests
Returns:
Tuple - Resource information including hashes
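The hashing side of fetch can be illustrated independently of aiohttp: hashing a body in chunks (as when streaming a download) yields the same digest as hashing it whole. A minimal sketch using hashlib; the choice of MD5 here is an assumption for illustration.

```python
import hashlib
from typing import Iterable

def hash_chunks(chunks: Iterable[bytes]) -> str:
    # Feed each downloaded chunk into the digest incrementally,
    # as a streaming download would
    md5 = hashlib.md5()
    for chunk in chunks:
        md5.update(chunk)
    return md5.hexdigest()

# Chunked hashing matches hashing the concatenated body
streamed = hash_chunks([b"he", b"llo"])
whole = hashlib.md5(b"hello").hexdigest()
print(streamed == whole)  # True
```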
check_urls
async def check_urls(resources_to_check: List[Tuple],
loop: uvloop.Loop) -> Dict[str, Tuple]
Asynchronous code to download resources and hash them. Return dictionary with resources information including hashes.
Arguments:
resources_to_check
List[Tuple] - List of resources to be checked
loop
uvloop.Loop - Event loop to use
Returns:
Dict[str, Tuple]: Resources information including hashes
retrieve
def retrieve(resources_to_check: List[Tuple]) -> Dict[str, Tuple]
Download resources and hash them. Return dictionary with resources information including hashes.
Arguments:
resources_to_check
List[Tuple] - List of resources to be checked
Returns:
Dict[str, Tuple]: Resources information including hashes
hdx.freshness.utils.retry
Utility to retry HTTP requests with exponential backoff interval
FailedRequest Objects
class FailedRequest(Exception)
A wrapper for all possible exceptions during an HTTP request
Arguments:
raised
str - Exception type
message
str - Exception message
code
str - HTTP status code
url
str - URL that was requested
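A wrapper of this shape can be reconstructed as a plain exception subclass carrying the four fields above. This is a sketch, not the actual implementation:

```python
class FailedRequest(Exception):
    # Sketch of an exception wrapper carrying the fields documented above
    def __init__(self, raised: str = "", message: str = "",
                 code: str = "", url: str = ""):
        self.raised = raised
        self.message = message
        self.code = code
        self.url = url
        super().__init__(f"{raised}: {message} (code={code}, url={url})")

# Usage: wrap whatever the HTTP layer raised so callers get one
# exception type with full context
try:
    raise FailedRequest(raised="TimeoutError", message="request timed out",
                        code="408", url="https://example.com/data.xlsx")
except FailedRequest as err:
    caught = (err.code, err.url)
    print(caught)  # ('408', 'https://example.com/data.xlsx')
```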
send_http
async def send_http(
session: aiohttp.ClientSession,
method: str,
url: str,
*,
retries: int = 1,
interval: int = 1,
backoff: int = 2,
http_status_codes_to_retry: List[int] = HTTP_STATUS_CODES_TO_RETRY,
fn: Callable[[ClientResponse], Any] = lambda x: x,
**kwargs: Any)
Send an HTTP request and implement retry logic
Arguments:
session
aiohttp.ClientSession - A client aiohttp session object
method
str - Method to use, e.g. "get"
url
str - URL for the request
retries
int - Number of times to retry in case of failure
interval
float - Time to wait before retries
backoff
int - Multiply interval by this factor after each failure
http_status_codes_to_retry
List[int] - List of status codes to retry
fn
Callable[[ClientResponse], Any] - Function to call on successful connection
**kwargs
Any - Keyword arguments to pass through to the request
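The interplay of retries, interval and backoff can be sketched with a hypothetical helper (not the actual send_http implementation): after each failure it sleeps for interval seconds, then multiplies interval by backoff so the waits grow exponentially.

```python
import asyncio

async def send_with_retry(do_request, retries: int = 3,
                          interval: float = 1.0, backoff: int = 2,
                          waits: list = None):
    # Sketch of the retry loop: after each failure, sleep `interval`
    # seconds, then multiply `interval` by `backoff`
    for attempt in range(retries + 1):
        try:
            return await do_request()
        except Exception:
            if attempt == retries:
                raise  # retries exhausted: let the caller handle it
            if waits is not None:
                waits.append(interval)  # record the wait for inspection
            await asyncio.sleep(interval)
            interval *= backoff

calls = {"n": 0}

async def flaky():
    # Fails twice, then succeeds
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("simulated failure")
    return "ok"

waits: list = []
result = asyncio.run(send_with_retry(flaky, retries=3, interval=0.01,
                                     backoff=2, waits=waits))
print(result, waits)  # ok [0.01, 0.02]
```

The two recorded waits (0.01 then 0.02) show the backoff factor of 2 applied after each failed attempt.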
hdx.freshness.utils.ratelimiter
aiohttp rate limiting: limit connections per timeframe to host (from https://quentin.pradet.me/blog/how-do-you-rate-limit-calls-with-aiohttp.html)
RateLimiter Objects
class RateLimiter()
Use like this: session = RateLimiter(session) ... async with await session.get(url) as response
Arguments:
session
aiohttp.ClientSession - aiohttp session to use for requests
RATE
requests per second
get
async def get(url: str, *args: Any, **kwargs: Any) -> _RequestContextManager
Asynchronous code to download a resource after waiting for a token
Arguments:
url
str - URL to download
*args
Any - Positional arguments to pass to the underlying get
**kwargs
Any - Keyword arguments to pass to the underlying get
Returns:
_RequestContextManager - Context manager yielding the aiohttp.ClientResponse
wait_for_token
async def wait_for_token(host: str) -> None
Asynchronous code to sleep until a token is available for the given host
Arguments:
host
str - Host (server)
Returns:
None
add_new_tokens
def add_new_tokens(host: str) -> None
Adds new tokens
Arguments:
host
str - Host (server)
Returns:
None
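Together, wait_for_token and add_new_tokens implement a per-host token bucket. A self-contained sketch of that pattern follows; the class layout, RATE value and refill interval here are illustrative assumptions, not the actual implementation.

```python
import asyncio
import time

class TokenBucket:
    # Hypothetical sketch of the per-host token-bucket pattern: each
    # host gets a bucket that refills at RATE tokens per second; a
    # request consumes one token or sleeps until one is available
    RATE = 10        # requests per second (illustrative value)
    MAX_TOKENS = 10  # bucket capacity (illustrative value)

    def __init__(self) -> None:
        self.tokens = {}      # host -> available tokens
        self.updated_at = {}  # host -> last refill time

    async def wait_for_token(self, host: str) -> None:
        # Sleep until a token is available for this host, then consume one
        if host not in self.tokens:
            self.tokens[host] = self.MAX_TOKENS
            self.updated_at[host] = time.monotonic()
        while self.tokens[host] < 1:
            self.add_new_tokens(host)
            await asyncio.sleep(0.05)
        self.tokens[host] -= 1

    def add_new_tokens(self, host: str) -> None:
        # Refill the host's bucket in proportion to elapsed time
        now = time.monotonic()
        new_tokens = (now - self.updated_at[host]) * self.RATE
        if new_tokens >= 1:
            self.tokens[host] = min(self.tokens[host] + new_tokens,
                                    self.MAX_TOKENS)
            self.updated_at[host] = now

async def demo() -> float:
    bucket = TokenBucket()
    for _ in range(3):  # three quick requests to the same host
        await bucket.wait_for_token("example.com")
    return bucket.tokens["example.com"]

remaining = asyncio.run(demo())
print(remaining)  # 7
```

Three requests in quick succession drain three tokens from a full bucket of ten; once the bucket empties, callers sleep in wait_for_token until add_new_tokens has refilled it.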