"""Source code for grobber.url_pool."""
import logging
from asyncio import Lock
from datetime import datetime, timedelta
from typing import List, Optional

from . import locals
from .exceptions import GrobberException
from .request import Request
# Module-level logger shared by everything in this module.
log = logging.getLogger(__name__)
class UrlPool:
    """Pool of possible urls which provides easy access to a working one.

    The currently selected url (and its expiry) is persisted to the
    database so several instances/processes can share one choice.

    Attributes:
        name (str): Name given to the pool
        urls (List[str]): List of the possible urls
        strip_slash (bool): Whether or not trailing slashes should be removed
        ttl (timedelta): Time until the current url expires
    """

    def __init__(self, name: str, urls: List[str], *, strip_slash: bool = True, ttl: int = 3600) -> None:
        # Internal state: the currently selected url and when it expires.
        self._url: Optional[str] = None
        self._next_update: Optional[datetime] = None

        self.name = name
        # Defensive copy: update_url reorders this list in place and must
        # not mutate the list object the caller passed in.
        self.urls = list(urls)
        self.strip_slash = strip_slash
        self.ttl = timedelta(seconds=ttl)

        # Created lazily in the _lock property so the Lock is built when
        # an event loop is actually in use.
        self.__lock: Optional[Lock] = None

    def __repr__(self) -> str:
        return f"UrlPool({self.name!r}, {self.urls!r}, strip_slash={self.strip_slash}, ttl={self.ttl})"

    def __str__(self) -> str:
        # The database-style identifier, e.g. "MYPOOL_URL".
        return self._id

    @property
    def name(self) -> str:
        """Name of the pool."""
        return self._name

    @name.setter
    def name(self, value: str) -> None:
        self._name = value
        # Identifier derived from the name, used by __str__.
        self._id = value.upper() + "_URL"

    @property
    def _lock(self) -> Lock:
        """Lazily-created lock serialising url selection/refresh."""
        if not self.__lock:
            self.__lock = Lock()

        return self.__lock

    @property
    def needs_update(self) -> bool:
        """Whether the current url is outdated."""
        return (not self._next_update) or datetime.now() > self._next_update

    @property
    async def url(self) -> str:
        """Current url.

        If the cached url is stale, first tries the database copy; if that
        is also stale, searches for a working url and uploads the result.
        """
        async with self._lock:
            if self.needs_update:
                await self.fetch()

                # Still stale after consulting the database -> find a new one.
                if self.needs_update:
                    log.debug(f"searching new url for {self}")
                    await self.update_url()
                    self._next_update = datetime.now() + self.ttl
                    await self.upload()

        return self.prepare_url(self._url)

    async def fetch(self) -> None:
        """Get the current url from the database."""
        # find_one with a non-dict filter matches on _id (== self.name).
        doc = await locals.url_pool_collection.find_one(self.name)
        if not doc:
            log.debug(f"creating pool for {self}")
        else:
            log.debug(f"{self} initialising from database {doc}")
            self._url = doc["url"]
            self._next_update = doc["next_update"]

    async def upload(self) -> None:
        """Upload the current url to the database."""
        await locals.url_pool_collection.update_one(dict(_id=self.name),
                                                    {"$set": dict(url=self._url, next_update=self._next_update)},
                                                    upsert=True)

    def prepare_url(self, url: str) -> str:
        """Prepare an url to be used as the current url.

        This function is performed for all urls returned by `url`
        """
        if self.strip_slash:
            url = url.rstrip("/")

        return url

    async def update_url(self) -> None:
        """Search for a working url.

        This is automatically called.

        Raises:
            GrobberException: If none of the pool's urls responded.
        """
        requests = [Request(url, allow_redirects=True) for url in self.urls]
        req = await Request.first(requests)
        if req:
            # Follow redirects: store the final url the request landed on.
            self._url = str((await req.head_response).url)
            log.debug(f"{req} successful, moving to front! ({self._url})")
            # Move the winning url to the front so it is tried first next time.
            self.urls.insert(0, self.urls.pop(requests.index(req)))
        else:
            raise GrobberException(f"{self} No working url found {requests}")