feat(api): Parallelize image downloads
This commit is contained in:
@@ -3,42 +3,18 @@ from typing import Optional, Tuple
|
|||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
import backoff
|
import backoff
|
||||||
from pydantic import TypeAdapter
|
from pydantic import TypeAdapter
|
||||||
from .config import Config, CacheTypes
|
|
||||||
from .logs import logger
|
from .logs import logger
|
||||||
from eliot import start_action
|
from eliot import start_action
|
||||||
from dotenv import load_dotenv
|
|
||||||
from aiohttp import ClientResponseError
|
from aiohttp import ClientResponseError
|
||||||
from aiohttp_client_cache.session import CachedSession
|
from aiohttp_client_cache.session import CachedSession
|
||||||
from aiohttp_client_cache import FileBackend, RedisBackend
|
|
||||||
from .models import Story
|
from .models import Story
|
||||||
from .exceptions import PartNotFoundError, StoryNotFoundError
|
from .exceptions import PartNotFoundError, StoryNotFoundError
|
||||||
|
from .vars import headers, cache
|
||||||
|
|
||||||
load_dotenv(override=True)
|
|
||||||
|
|
||||||
config = Config()
|
|
||||||
story_ta = TypeAdapter(Story)
|
story_ta = TypeAdapter(Story)
|
||||||
|
|
||||||
# --- #
|
# --- #
|
||||||
|
|
||||||
headers = {
|
|
||||||
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"
|
|
||||||
}
|
|
||||||
|
|
||||||
if config.USE_CACHE:
|
|
||||||
match config.CACHE_TYPE:
|
|
||||||
case CacheTypes.file:
|
|
||||||
cache = FileBackend(use_temp=True, expire_after=43200) # 12 hours
|
|
||||||
case CacheTypes.redis:
|
|
||||||
cache = RedisBackend(
|
|
||||||
cache_name="wpd-aiohttp-cache",
|
|
||||||
address=config.REDIS_CONNECTION_URL,
|
|
||||||
expire_after=43200, # 12 hours
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
cache = None
|
|
||||||
|
|
||||||
logger.info(f"Using {cache=}")
|
|
||||||
|
|
||||||
|
|
||||||
async def fetch_cookies(username: str, password: str) -> dict:
|
async def fetch_cookies(username: str, password: str) -> dict:
|
||||||
# source: https://github.com/TheOnlyWayUp/WP-DM-Export/blob/dd4c7c51cb43f2108e0f63fc10a66cd24a740e4e/src/API/src/main.py#L25-L58
|
# source: https://github.com/TheOnlyWayUp/WP-DM-Export/blob/dd4c7c51cb43f2108e0f63fc10a66cd24a740e4e/src/API/src/main.py#L25-L58
|
||||||
@@ -148,18 +124,3 @@ async def fetch_story_content_zip(
|
|||||||
bytes_stream = BytesIO(await response.read())
|
bytes_stream = BytesIO(await response.read())
|
||||||
|
|
||||||
return bytes_stream
|
return bytes_stream
|
||||||
|
|
||||||
|
|
||||||
@backoff.on_exception(backoff.expo, ClientResponseError, max_time=15)
|
|
||||||
async def fetch_image(url: str, should_cache: bool = False) -> bytes:
|
|
||||||
"""Fetch image bytes."""
|
|
||||||
with start_action(action_type="api_fetch_image", url=url):
|
|
||||||
async with CachedSession(
|
|
||||||
headers=headers, cache=cache if should_cache else None
|
|
||||||
) as session: # Don't cache images.
|
|
||||||
async with session.get(url) as response:
|
|
||||||
response.raise_for_status()
|
|
||||||
|
|
||||||
body = await response.read()
|
|
||||||
|
|
||||||
return body
|
|
||||||
|
|||||||
@@ -1,4 +1,11 @@
|
|||||||
|
from typing import List, Tuple
|
||||||
|
from aiohttp import ClientSession
|
||||||
from bs4 import BeautifulSoup, Tag
|
from bs4 import BeautifulSoup, Tag
|
||||||
|
from itertools import batched, chain
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
from .vars import headers
|
||||||
|
from eliot import start_action
|
||||||
|
|
||||||
|
|
||||||
def clean_tree(title: str, id: int, body: str) -> BeautifulSoup:
|
def clean_tree(title: str, id: int, body: str) -> BeautifulSoup:
|
||||||
@@ -48,3 +55,27 @@ def clean_tree(title: str, id: int, body: str) -> BeautifulSoup:
|
|||||||
insert_at.append(br_tag)
|
insert_at.append(br_tag)
|
||||||
|
|
||||||
return new_soup
|
return new_soup
|
||||||
|
|
||||||
|
|
||||||
|
async def fetch_image(url: str) -> bytes | None:
|
||||||
|
"""Fetch image bytes."""
|
||||||
|
with start_action(action_type="api_fetch_image", url=url):
|
||||||
|
async with ClientSession(headers=headers) as session: # Don't cache images.
|
||||||
|
async with session.get(url) as response:
|
||||||
|
if not response.ok:
|
||||||
|
return None
|
||||||
|
|
||||||
|
body = await response.read()
|
||||||
|
|
||||||
|
return body
|
||||||
|
|
||||||
|
|
||||||
|
async def download_tree_images(tree: BeautifulSoup) -> Tuple[bytes]:
|
||||||
|
image_urls = [img["src"] for img in tree.find_all("img")]
|
||||||
|
downloaded_images: List[bytes] = list(
|
||||||
|
chain(
|
||||||
|
await asyncio.gather(*[fetch_image(url) for url in chunk])
|
||||||
|
for chunk in batched(image_urls, 3)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return downloaded_images
|
||||||
|
|||||||
@@ -0,0 +1,28 @@
|
|||||||
|
from .config import Config, CacheTypes
|
||||||
|
from aiohttp_client_cache.session import CachedSession
|
||||||
|
from aiohttp_client_cache import FileBackend, RedisBackend
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
from .logs import logger
|
||||||
|
|
||||||
|
headers = {
|
||||||
|
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"
|
||||||
|
}
|
||||||
|
|
||||||
|
load_dotenv(override=True)
|
||||||
|
|
||||||
|
config = Config()
|
||||||
|
|
||||||
|
if config.USE_CACHE:
|
||||||
|
match config.CACHE_TYPE:
|
||||||
|
case CacheTypes.file:
|
||||||
|
cache = FileBackend(use_temp=True, expire_after=43200) # 12 hours
|
||||||
|
case CacheTypes.redis:
|
||||||
|
cache = RedisBackend(
|
||||||
|
cache_name="wpd-aiohttp-cache",
|
||||||
|
address=config.REDIS_CONNECTION_URL,
|
||||||
|
expire_after=43200, # 12 hours
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
cache = None
|
||||||
|
|
||||||
|
logger.info(f"Using {cache=}")
|
||||||
Reference in New Issue
Block a user