From 713c8071fb04361782dd49c8c1c2ab6eb5ed0226 Mon Sep 17 00:00:00 2001 From: TheOnlyWayUp Date: Wed, 4 Jun 2025 11:48:02 +0000 Subject: [PATCH] api: Make create_book a module --- src/api/src/create_book.py | 782 ------------------ src/api/src/create_book/__init__.py | 10 + src/api/src/create_book/config.py | 45 + src/api/src/create_book/create_book.py | 165 ++++ src/api/src/create_book/exceptions.py | 11 + .../src/create_book/generators/__init__.py | 2 + src/api/src/create_book/generators/epub.py | 115 +++ src/api/src/create_book/generators/pdf.py | 286 +++++++ src/api/src/create_book/logs.py | 18 + src/api/src/create_book/models.py | 42 + src/api/src/create_book/utils.py | 108 +++ 11 files changed, 802 insertions(+), 782 deletions(-) delete mode 100644 src/api/src/create_book.py create mode 100644 src/api/src/create_book/__init__.py create mode 100644 src/api/src/create_book/config.py create mode 100644 src/api/src/create_book/create_book.py create mode 100644 src/api/src/create_book/exceptions.py create mode 100644 src/api/src/create_book/generators/__init__.py create mode 100644 src/api/src/create_book/generators/epub.py create mode 100644 src/api/src/create_book/generators/pdf.py create mode 100644 src/api/src/create_book/logs.py create mode 100644 src/api/src/create_book/models.py create mode 100644 src/api/src/create_book/utils.py diff --git a/src/api/src/create_book.py b/src/api/src/create_book.py deleted file mode 100644 index 8a90a0b..0000000 --- a/src/api/src/create_book.py +++ /dev/null @@ -1,782 +0,0 @@ -from __future__ import annotations -from typing import List, Optional, Tuple, cast -from typing_extensions import TypedDict -import re -import logging -import tempfile -import unicodedata -from os import environ -from io import BytesIO -from enum import Enum -from base64 import b64encode -import bs4 -import backoff -from weasyprint import HTML, CSS, default_url_fetcher -from weasyprint.text.fonts import FontConfiguration -from ebooklib import epub -from 
exiftool import ExifTool -from eliot import to_file, start_action -from eliot.stdlib import EliotHandler -from bs4 import BeautifulSoup -from dotenv import load_dotenv -from pydantic import TypeAdapter, model_validator, field_validator -from pydantic_settings import BaseSettings -from aiohttp import ClientResponseError -from aiohttp_client_cache.session import CachedSession -from aiohttp_client_cache import FileBackend, RedisBackend - -load_dotenv(override=True) - -handler = EliotHandler() - -logging.getLogger("fastapi").setLevel(logging.INFO) -logging.getLogger("fastapi").addHandler(handler) - -exiftool_logger = logging.getLogger("exiftool") -exiftool_logger.addHandler(handler) - -logger = logging.Logger("wpd") -logger.addHandler(handler) - -if environ.get("DEBUG"): - to_file(open("eliot.log", "wb")) - - -# --- # - - -class CacheTypes(Enum): - file = "file" - redis = "redis" - - -class Config(BaseSettings): - USE_CACHE: bool = True - CACHE_TYPE: CacheTypes = CacheTypes.file - REDIS_CONNECTION_URL: str = "" - - @field_validator("USE_CACHE", mode="before") - def validate_use_cache(cls, value): - # Return default if value is an empty string - if value == "": - return True # Default value for USE_CACHE - return value - - @field_validator("CACHE_TYPE", mode="before") - def validate_cache_type(cls, value): - # Thanks https://stackoverflow.com/a/78157474 - if value == "": - return "file" - return value - - @model_validator(mode="after") - def prevent_mismatched_redis_url(self): - match self.CACHE_TYPE: - case CacheTypes.file: - if self.REDIS_CONNECTION_URL: - raise ValueError( - "REDIS_CONNECTION_URL provided when File cache selected. To use Redis as a cache, set CACHE_TYPE=redis." - ) - case CacheTypes.redis: - if not self.REDIS_CONNECTION_URL: - raise ValueError( - "REDIS_CONNECTION_URL not provided when Redis cache selected. To use File cache, set CACHE_TYPE=file." 
- ) - return self - - -config = Config() - -# --- # - -headers = { - "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36" -} - -if config.USE_CACHE: - match config.CACHE_TYPE: - case CacheTypes.file: - cache = FileBackend(use_temp=True, expire_after=43200) # 12 hours - case CacheTypes.redis: - cache = RedisBackend( - cache_name="wpd-aiohttp-cache", - address=config.REDIS_CONNECTION_URL, - expire_after=43200, # 12 hours - ) -else: - cache = None - -logger.info(f"Using {cache=}") - -# --- Utilities --- # - - -def smart_trim(text: str, max_length: int = 400) -> str: - """Truncate a string intelligently at newlines. Coherence and max-length adherence.""" - chunks = [t for t in text.split("\n") if t] - - to_return = "" - for chunk in chunks: - if len(to_return) + len(chunk) < max_length: - to_return = chunk + "
" - else: - to_return = to_return.rstrip("
") - break - - return to_return - - -def generate_clean_part_html(part: Part, content: str) -> bs4.Tag: - """Rebuild HTML Structure for a Part.""" - chapter_title = part["title"] - chapter_id = part["id"] - - clean = BeautifulSoup( - f""" -
-

{chapter_title}

-
- """, - "html.parser", - ) # html.parser doesn't create / tags automatically - - html = BeautifulSoup(content, "lxml") - for br in html.find_all("br"): - # Check if no content after br - if not br.next_sibling or br.next_sibling.name in ["br", None]: - br.decompose() - - section = cast(bs4.Tag, clean.find("section")) - if not section: - raise Exception() - - for child in html.find_all("p"): - current_paragraph = clean.new_tag("p") - - # Attempt to carry over paragraph styling - current_paragraph["style"] = child.get("style", "text-align: left;") - - for p_child in list(child.children): - if not p_child: - continue - if isinstance(p_child, bs4.element.Tag): - if p_child.name == "br": - p_child.decompose() - elif p_child.name == "img": - src = p_child["src"] - img_tag = clean.new_tag("img") - img_tag["src"] = src - section.append(img_tag) - section.append(clean.new_tag("br")) - elif p_child.name in ["b", "i"]: - styled_tag = clean.new_tag(p_child.name) - styled_content = clean.new_string(p_child.text) - styled_tag.append(styled_content) - current_paragraph.append(styled_tag) - else: - # Append any other tags as-is - current_paragraph.append(p_child) - elif isinstance(p_child, bs4.element.NavigableString): - content = clean.new_string(p_child) - current_paragraph.append(content) - - if current_paragraph.contents: - section.append(current_paragraph) - - if not list(child.children): - # Some p tags only contain brs, once brs are removed, they are empty and can be removed as well. - child.decompose() - - return section - - -def slugify(value, allow_unicode=False) -> str: - """ - Taken from https://github.com/django/django/blob/master/django/utils/text.py - Convert to ASCII if 'allow_unicode' is False. Convert spaces or repeated - dashes to single dashes. Remove characters that aren't alphanumerics, - underscores, or hyphens. Convert to lowercase. Also strip leading and - trailing whitespace, dashes, and underscores. - - Thanks https://stackoverflow.com/a/295466. 
- """ - value = str(value) - if allow_unicode: - value = unicodedata.normalize("NFKC", value) - else: - value = ( - unicodedata.normalize("NFKD", value) - .encode("ascii", "ignore") - .decode("ascii") - ) - value = re.sub(r"[^\w\s-]", "", value.lower()) - return re.sub(r"[-\s]+", "-", value).strip("-_") - - -async def fetch_cookies(username: str, password: str) -> dict: - # source: https://github.com/TheOnlyWayUp/WP-DM-Export/blob/dd4c7c51cb43f2108e0f63fc10a66cd24a740e4e/src/API/src/main.py#L25-L58 - """Retrieves authorization cookies from Wattpad by logging in with user creds. - - Args: - username (str): Username. - password (str): Password. - - Raises: - ValueError: Bad status code. - ValueError: No cookies returned. - - Returns: - dict: Authorization cookies. - """ - with start_action(action_type="api_fetch_cookies"): - async with CachedSession(headers=headers, cache=None) as session: - async with session.post( - "https://www.wattpad.com/auth/login?nextUrl=%2F&_data=routes%2Fauth.login", - data={ - "username": username.lower(), - "password": password, - }, # the username.lower() is for caching - ) as response: - if response.status != 204: - raise ValueError("Not a 204.") - - cookies = { - k: v.value - for k, v in response.cookies.items() # Thanks https://stackoverflow.com/a/32281245 - } - - if not cookies: - raise ValueError("No cookies.") - - return cookies - - -# --- Models --- # - - -class CopyrightData(TypedDict): - name: str - statement: str - freedoms: str - printing: str - image_url: Optional[str] - - -class Language(TypedDict): - name: str - - -class User(TypedDict): - username: str - avatar: str - description: str - - -class Part(TypedDict): - id: int - title: str - - -class Story(TypedDict): - id: str - title: str - createDate: str - modifyDate: str - language: Language - user: User - description: str - cover: str - completed: bool - tags: List[str] - mature: bool - url: str - parts: List[Part] - isPaywalled: bool - copyright: int - - -story_ta = 
TypeAdapter(Story) - - -# --- Exceptions --- # - - -class WattpadError(Exception): - """Base Exception class for Wattpad related errors.""" - - -class StoryNotFoundError(WattpadError): - """Display the "This story was not found" error to the user.""" - - ... - - -class PartNotFoundError(StoryNotFoundError): ... - - -# --- API Calls --- # - - -@backoff.on_exception(backoff.expo, ClientResponseError, max_time=15) -async def fetch_story_from_partId( - part_id: int, cookies: Optional[dict] = None -) -> Tuple[int, Story]: - """Fetch Story metadata from a Part ID.""" - with start_action(action_type="api_fetch_storyFromPartId"): - async with CachedSession( - headers=headers, cache=None if cookies else cache - ) as session: # Don't cache requests with Cookies. - async with session.get( - f"https://www.wattpad.com/api/v3/story_parts/{part_id}?fields=groupId,group(tags,id,title,createDate,modifyDate,language(name),description,completed,mature,url,isPaywalled,user(username,avatar,description),parts(id,title),cover,copyright)" - ) as response: - body = await response.json() - - if response.status == 400: - match body.get("error_code"): - case 1020: # "Story part not found" - logger.info(f"{part_id=} not found on Wattpad, returning.") - raise PartNotFoundError() - - response.raise_for_status() - - return int(body["groupId"]), story_ta.validate_python(body["group"]) - - -@backoff.on_exception(backoff.expo, ClientResponseError, max_time=15) -async def fetch_story(story_id: int, cookies: Optional[dict] = None) -> Story: - """Fetch Story metadata from a Story ID.""" - with start_action(action_type="api_fetch_story", story_id=story_id): - async with CachedSession( - headers=headers, cookies=cookies, cache=None if cookies else cache - ) as session: - async with session.get( - 
f"https://www.wattpad.com/api/v3/stories/{story_id}?fields=tags,id,title,createDate,modifyDate,language(name),description,completed,mature,url,isPaywalled,user(username,avatar,description),parts(id,title),cover,copyright" - ) as response: - body = await response.json() - - if response.status == 400: - match body.get("error_code"): - case 1017: # "Story not found" - logger.info(f"{story_id=} not found on Wattpad, returning.") - raise StoryNotFoundError() - - response.raise_for_status() - - return story_ta.validate_python(body) - - -@backoff.on_exception(backoff.expo, ClientResponseError, max_time=15) -async def fetch_story_content_zip( - story_id: int, cookies: Optional[dict] = None -) -> BytesIO: - """BytesIO Stream of an Archive of Part Contents for a Story.""" - with start_action(action_type="api_fetch_storyZip", story_id=story_id): - async with CachedSession( - headers=headers, - cookies=cookies, - cache=None if cookies else cache, - ) as session: - async with session.get( - f"https://www.wattpad.com/apiv2/?m=storytext&group_id={story_id}&output=zip" - ) as response: - response.raise_for_status() - - bytes_stream = BytesIO(await response.read()) - - return bytes_stream - - -@backoff.on_exception(backoff.expo, ClientResponseError, max_time=15) -async def fetch_image(url: str, should_cache: bool = False) -> bytes: - """Fetch image bytes.""" - with start_action(action_type="api_fetch_image", url=url): - async with CachedSession( - headers=headers, cache=cache if should_cache else None - ) as session: # Don't cache images. - async with session.get(url) as response: - response.raise_for_status() - - body = await response.read() - - return body - - -# --- Generation --- # - - -class EPUBGenerator: - """EPUB Generation utilities""" - - def __init__(self, data: Story, cover: bytes): - """Initialize EPUBGenerator. 
Create epub.EpubBook() and set metadata and cover.""" - self.epub = epub.EpubBook() - self.data = data - self.cover = cover - - # set metadata, defined in https://www.dublincore.org/specifications/dublin-core/dcmi-terms/#section-2 - self.epub.add_author(data["user"]["username"]) - - self.epub.add_metadata("DC", "title", data["title"]) - self.epub.add_metadata("DC", "description", data["description"]) - self.epub.add_metadata("DC", "date", data["createDate"]) - self.epub.add_metadata("DC", "modified", data["modifyDate"]) - self.epub.add_metadata("DC", "language", data["language"]["name"]) - - self.epub.add_metadata( - None, "meta", "", {"name": "tags", "content": ", ".join(data["tags"])} - ) - self.epub.add_metadata( - None, "meta", "", {"name": "mature", "content": str(int(data["mature"]))} - ) - self.epub.add_metadata( - None, - "meta", - "", - {"name": "completed", "content": str(int(data["completed"]))}, - ) - - # Set cover - self.epub.set_cover("cover.jpg", cover) - cover_chapter = epub.EpubHtml( - file_name="titlepage.xhtml", # Standard for cover page - ) - cover_chapter.set_content('') - self.epub.add_item(cover_chapter) - - async def add_chapters( - self, contents: List[bs4.Tag], download_images: bool = False - ): - """Add chapters to the Epub, downloading images if necessary. Sets the table of contents and spine.""" - chapters: List[epub.EpubHtml] = [] - - for cidx, (part, content) in enumerate(zip(self.data["parts"], contents)): - title = part["title"] - - # Thanks https://eu17.proxysite.com/process.php?d=5VyWYcoQl%2BVF0BYOuOavtvjOloFUZz2BJ%2Fepiusk6Nz7PV%2B9i8rs7cFviGftrBNll%2B0a3qO7UiDkTt4qwCa0fDES&b=1 - chapter = epub.EpubHtml( - title=title, - file_name=f"{cidx}_{part['id']}.xhtml", # See issue #30 - lang=self.data["language"]["name"], - uid=str(part["id"]).encode(), - ) - - str_content = content.prettify() - if download_images: - soup = content - - async with CachedSession( - headers=headers, cache=None - ) as session: # Don't cache images. 
- for idx, image in enumerate(soup.find_all("img")): - if not image["src"]: - continue - # Find all image tags and filter for those with sources - - async with session.get(image["src"]) as response: - img = epub.EpubImage( - media_type="image/jpeg", - content=await response.read(), - file_name=f"static/{cidx}/{idx}.jpeg", - ) - self.epub.add_item(img) - # Fetch image and pack - - str_content = str_content.replace( - str(image["src"]), f"static/{cidx}/{idx}.jpeg" - ) - - chapter.set_content(str_content) - self.epub.add_item(chapter) - - chapters.append(chapter) - - yield title - - self.epub.toc = chapters - - # Thanks https://github.com/aerkalov/ebooklib/blob/master/samples/09_create_image/create.py - self.epub.add_item(epub.EpubNcx()) - self.epub.add_item(epub.EpubNav()) - - # create spine - self.epub.spine = ["nav"] + chapters - - def dump(self) -> BytesIO: - # Thanks https://stackoverflow.com/a/75398222 - buffer = BytesIO() - epub.write_epub(buffer, self.epub) - - buffer.seek(0) - - return buffer - - -class PDFGenerator: - """PDF Generation utilities""" - - def __init__(self, data: Story, cover: bytes): - """Initialize PDGenerator, create PDF Temporary file.""" - self.data = data - self.file = tempfile.NamedTemporaryFile(suffix=".pdf", delete=True) - self.cover = cover - self.content: str = "" - self.copyright = { - 1: { - "name": "All Rights Reserved", - "statement": "©️ {published_year} by {username}. All Rights Reserved.", - "freedoms": "No reuse, redistribution, or modification without permission.", - "printing": "Not allowed without explicit permission.", - "image_url": None, - }, - 2: { - "name": "Public Domain", - "statement": "This work is in the public domain. 
Originally published in {published_year} by {username}.", - "freedoms": "Free to use for any purpose without permission.", - "printing": "Allowed for personal or commercial purposes.", - "image_url": "http://mirrors.creativecommons.org/presskit/buttons/88x31/png/cc-zero.png", - }, - 3: { - "name": "Creative Commons Attribution (CC-BY)", - "statement": "©️ {published_year} by {username}. This work is licensed under a Creative Commons Attribution 4.0 International License.", - "freedoms": "Allows reuse, redistribution, and modification with credit to the author.", - "printing": "Allowed with proper credit.", - "image_url": "https://mirrors.creativecommons.org/presskit/buttons/88x31/png/by.png", - }, - 4: { - "name": "CC Attribution NonCommercial (CC-BY-NC)", - "statement": "©️ {published_year} by {username}. This work is licensed under a Creative Commons Attribution-NonCommercial 4.0 International License.", - "freedoms": "Allows reuse and modification for non-commercial purposes with credit.", - "printing": "Allowed for non-commercial purposes with proper credit.", - "image_url": "http://mirrors.creativecommons.org/presskit/buttons/88x31/png/by-nc.png", - }, - 5: { - "name": "CC Attribution NonCommercial NoDerivs (CC-BY-NC-ND)", - "statement": "©️ {published_year} by {username}. This work is licensed under a Creative Commons Attribution-NonCommercial-NoDerivs 4.0 International License.", - "freedoms": "Allows sharing in original form for non-commercial purposes with credit; no modifications allowed.", - "printing": "Allowed for non-commercial purposes in original form with proper credit.", - "image_url": "http://mirrors.creativecommons.org/presskit/buttons/88x31/png/by-nc-nd.png", - }, - 6: { - "name": "CC Attribution NonCommercial ShareAlike (CC-BY-NC-SA)", - "statement": "©️ {published_year} by {username}. 
This work is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.", - "freedoms": "Allows reuse and modification for non-commercial purposes under the same license, with credit.", - "printing": "Allowed for non-commercial purposes with proper credit under the same license.", - "image_url": "http://mirrors.creativecommons.org/presskit/buttons/88x31/png/by-nc-sa.png", - }, - 7: { - "name": "CC Attribution ShareAlike (CC-BY-SA)", - "statement": "©️ {published_year} by {username}. This work is licensed under a Creative Commons Attribution-ShareAlike 4.0 International License.", - "freedoms": "Allows reuse and modification for any purpose under the same license, with credit.", - "printing": "Allowed with proper credit under the same license.", - "image_url": "https://mirrors.creativecommons.org/presskit/buttons/88x31/png/by-sa.png", - }, - 8: { - "name": "CC Attribution NoDerivs (CC-BY-ND)", - "statement": "©️ {published_year} by {username}. This work is licensed under a Creative Commons Attribution-NoDerivs 4.0 International License.", - "freedoms": "Allows sharing in original form for any purpose with credit; no modifications allowed.", - "printing": "Allowed in original form with proper credit.", - "image_url": "https://mirrors.creativecommons.org/presskit/buttons/88x31/png/by-nd.png", - }, - } - - with open("./pdf/stylesheet.css") as reader: - self.stylesheet = reader.read() - with open("./pdf/book.html") as reader: - self.template = reader.read() - - async def generate_cover_and_copyright_html( - self, - ) -> str: - """Generate Cover and Copyright file, fetch copyright image (cached), use self.cover for cover.""" - - copyright_data = self.copyright[self.data["copyright"]] - - template = self.template - about_copyright = ( - template.replace( - "{statement}", - copyright_data["statement"].format( - username=self.data["user"]["username"], - published_year=self.data["createDate"].split("-", 2)[0], - ), - ) - 
.replace("{author}", self.data["user"]["username"]) - .replace("{freedoms}", copyright_data["freedoms"]) - .replace( - "{printing}", - copyright_data["printing"], - ) - .replace("{book_id}", self.data["id"]) - .replace("{book_title}", self.data["title"]) - ) - - copyright_image = ( - await fetch_image(copyright_data["image_url"], should_cache=True) - if copyright_data["image_url"] - else None - ) - image_block = ( - """""".format( - image_url=f"data:image/jpg;base64,{b64encode(copyright_image).decode()}", - name=copyright_data["name"], - ) - if copyright_image - else "" - ) - about_copyright = ( - about_copyright.replace( - "{copyright_image}", - image_block, - ) - if image_block - else about_copyright.replace("{copyright_image}", "") - ) - about_copyright = about_copyright.replace( - "{cover}", f"data:image/jpg;base64,{b64encode(self.cover).decode()}" - ) - - self.template = about_copyright - return about_copyright - - async def generate_about_author_chapter(self) -> str: - """Generate About the Author file, fetch avatar.""" - author_avatar = ( - await fetch_image( - self.data["user"]["avatar"].replace("128", "512") - ) # Increase image resolution - if self.data["user"]["avatar"] - else None - ) - about_author = self.template.replace( - "{username}", self.data["user"]["username"] - ).replace("{description}", smart_trim(self.data["user"]["description"])) - - about_author = ( - about_author.replace( - "{avatar}", - f""" - Author's profile picture""", - ) - if author_avatar - else about_author.replace("{avatar}", "") - ) - - self.template = about_author - return about_author - - def generate_toc(self): - ids = [part["id"] for part in self.data["parts"]] - clean = BeautifulSoup( - """ -
-

Table of Contents

- -
- """, - "html.parser", - ) # html.parser doesn't create / tags automatically - - ul = cast(bs4.Tag, clean.find("ul")) - for part_id in ids: - li = clean.new_tag("li") - a = clean.new_tag("a") - a["href"] = f"#{part_id}" - li.append(a) - ul.append(li) - - insert_point = cast(bs4.Tag, self.tree.find("div", {"id": "book"})) - insert_point.append(clean) - return str(clean) - - async def add_chapters( - self, contents: List[bs4.Tag], download_images: bool = False - ): - """Add chapters to the PDF, downloading images if necessary. Also add Cover, Copyright, and About the Author pages.""" - - # # Cover and Copyright Page - await self.generate_cover_and_copyright_html() - await self.generate_about_author_chapter() - self.tree = BeautifulSoup(self.template, "lxml") - - self.generate_toc() - for part, content in zip(self.data["parts"], contents): - insert_point = cast(bs4.Tag, self.tree.find("div", {"id": "book"})) - insert_point.append(content) - - yield part["title"] - - # # About the Author page - # about_author_html = await self.generate_about_author_chapter() - - # chapters.insert(0, cover_and_copyright_html) - # chapters.append(about_author_html) - - with start_action( - action_type="generate_pdf", - output_filename=self.file.name, - title=self.data["title"], - ): - # PDF Generation with wkhtmltopdf, written to self.file - - # At this stage, we have a bunch of HTML Files representing all the chapters that need to be generated. PDFKit handles ToC generation, so that's not included. 
- - font_config = FontConfiguration() - - stylesheet_obj = CSS(string=self.stylesheet, font_config=font_config) - - html_obj = HTML(string=str(self.tree)) - html_obj.write_pdf( - self.file.name, stylesheets=[stylesheet_obj], font_config=font_config - ) - - with start_action(action_type="add_metadata") as action: - # Metadata generation with Exiftool - clean_description = ( - self.data["description"].strip().replace("\n", "$/") - ) # exiftool doesn't parse \ns correctly, they support $/ for the same instead. ` ` is another option. - - action.log(f"clean_description: {clean_description}") - - metadata = { - "Author": self.data["user"]["username"], - "Title": self.data["title"], - "Subject": clean_description, - "CreationDate": self.data["createDate"], - "ModDate": self.data["modifyDate"], - "Keywords": ",".join(self.data["tags"]), - "Language": self.data["language"]["name"], - "Completed": self.data["completed"], - "MatureContent": self.data["mature"], - "Producer": "Dhanush Rambhatla (TheOnlyWayUp - https://rambhat.la) and WattpadDownloader", - } # As per https://exiftool.org/TagNames/PDF.html - - action.log(f"options: {metadata}") - - with ExifTool( - config_file="../exiftool.config", logger=exiftool_logger - ) as et: - # Custom configuration adds Completed and MatureContent tags. 
- # exiftool logger logs executed command - et.execute( - *( - [f"-{key}={value}" for key, value in metadata.items()] - + [ - "-overwrite_original", - self.file.file.name, - ] - ) - ) - - def dump(self) -> BytesIO: - self.file.seek(0) - buffer = BytesIO(self.file.read()) - self.file.close() - - return buffer - - -# ------ # diff --git a/src/api/src/create_book/__init__.py b/src/api/src/create_book/__init__.py new file mode 100644 index 0000000..5450f75 --- /dev/null +++ b/src/api/src/create_book/__init__.py @@ -0,0 +1,10 @@ +from .create_book import ( + fetch_story, + fetch_story_from_partId, + fetch_story_content_zip, + fetch_image, + fetch_cookies, +) +from generators import PDFGenerator, EPUBGenerator +from exceptions import WattpadError, StoryNotFoundError, PartNotFoundError +from utils import generate_clean_part_html, slugify, logger diff --git a/src/api/src/create_book/config.py b/src/api/src/create_book/config.py new file mode 100644 index 0000000..efddf2e --- /dev/null +++ b/src/api/src/create_book/config.py @@ -0,0 +1,45 @@ +from enum import Enum +from pydantic import model_validator, field_validator +from pydantic_settings import BaseSettings + + +class CacheTypes(Enum): + file = "file" + redis = "redis" + + +class Config(BaseSettings): + # Values can be overriden by envvars. 
+ + USE_CACHE: bool = True + CACHE_TYPE: CacheTypes = CacheTypes.file + REDIS_CONNECTION_URL: str = "" + + @field_validator("USE_CACHE", mode="before") + def validate_use_cache(cls, value): + # Return default if value is an empty string + if value == "": + return True # Default value for USE_CACHE + return value + + @field_validator("CACHE_TYPE", mode="before") + def validate_cache_type(cls, value): + # Thanks https://stackoverflow.com/a/78157474 + if value == "": + return "file" + return value + + @model_validator(mode="after") + def prevent_mismatched_redis_url(self): + match self.CACHE_TYPE: + case CacheTypes.file: + if self.REDIS_CONNECTION_URL: + raise ValueError( + "REDIS_CONNECTION_URL provided when File cache selected. To use Redis as a cache, set CACHE_TYPE=redis." + ) + case CacheTypes.redis: + if not self.REDIS_CONNECTION_URL: + raise ValueError( + "REDIS_CONNECTION_URL not provided when Redis cache selected. To use File cache, set CACHE_TYPE=file." + ) + return self diff --git a/src/api/src/create_book/create_book.py b/src/api/src/create_book/create_book.py new file mode 100644 index 0000000..c1e7f9e --- /dev/null +++ b/src/api/src/create_book/create_book.py @@ -0,0 +1,165 @@ +from __future__ import annotations +from typing import Optional, Tuple +from io import BytesIO +import backoff +from pydantic import TypeAdapter +from config import Config, CacheTypes +from logs import logger +from eliot import start_action +from dotenv import load_dotenv +from aiohttp import ClientResponseError +from aiohttp_client_cache.session import CachedSession +from aiohttp_client_cache import FileBackend, RedisBackend +from models import Story +from exceptions import PartNotFoundError, StoryNotFoundError + +load_dotenv(override=True) + +config = Config() +story_ta = TypeAdapter(Story) + +# --- # + +headers = { + "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36" +} + +if config.USE_CACHE: + match 
config.CACHE_TYPE: + case CacheTypes.file: + cache = FileBackend(use_temp=True, expire_after=43200) # 12 hours + case CacheTypes.redis: + cache = RedisBackend( + cache_name="wpd-aiohttp-cache", + address=config.REDIS_CONNECTION_URL, + expire_after=43200, # 12 hours + ) +else: + cache = None + +logger.info(f"Using {cache=}") + + +async def fetch_cookies(username: str, password: str) -> dict: + # source: https://github.com/TheOnlyWayUp/WP-DM-Export/blob/dd4c7c51cb43f2108e0f63fc10a66cd24a740e4e/src/API/src/main.py#L25-L58 + """Retrieves authorization cookies from Wattpad by logging in with user creds. + + Args: + username (str): Username. + password (str): Password. + + Raises: + ValueError: Bad status code. + ValueError: No cookies returned. + + Returns: + dict: Authorization cookies. + """ + with start_action(action_type="api_fetch_cookies"): + async with CachedSession(headers=headers, cache=None) as session: + async with session.post( + "https://www.wattpad.com/auth/login?nextUrl=%2F&_data=routes%2Fauth.login", + data={ + "username": username.lower(), + "password": password, + }, # the username.lower() is for caching + ) as response: + if response.status != 204: + raise ValueError("Not a 204.") + + cookies = { + k: v.value + for k, v in response.cookies.items() # Thanks https://stackoverflow.com/a/32281245 + } + + if not cookies: + raise ValueError("No cookies.") + + return cookies + + +# --- API Calls --- # + + +@backoff.on_exception(backoff.expo, ClientResponseError, max_time=15) +async def fetch_story_from_partId( + part_id: int, cookies: Optional[dict] = None +) -> Tuple[int, Story]: + """Fetch Story metadata from a Part ID.""" + with start_action(action_type="api_fetch_storyFromPartId"): + async with CachedSession( + headers=headers, cache=None if cookies else cache + ) as session: # Don't cache requests with Cookies. 
+ async with session.get( + f"https://www.wattpad.com/api/v3/story_parts/{part_id}?fields=groupId,group(tags,id,title,createDate,modifyDate,language(name),description,completed,mature,url,isPaywalled,user(username,avatar,description),parts(id,title),cover,copyright)" + ) as response: + body = await response.json() + + if response.status == 400: + match body.get("error_code"): + case 1020: # "Story part not found" + logger.info(f"{part_id=} not found on Wattpad, returning.") + raise PartNotFoundError() + + response.raise_for_status() + + return int(body["groupId"]), story_ta.validate_python(body["group"]) + + +@backoff.on_exception(backoff.expo, ClientResponseError, max_time=15) +async def fetch_story(story_id: int, cookies: Optional[dict] = None) -> Story: + """Fetch Story metadata from a Story ID.""" + with start_action(action_type="api_fetch_story", story_id=story_id): + async with CachedSession( + headers=headers, cookies=cookies, cache=None if cookies else cache + ) as session: + async with session.get( + f"https://www.wattpad.com/api/v3/stories/{story_id}?fields=tags,id,title,createDate,modifyDate,language(name),description,completed,mature,url,isPaywalled,user(username,avatar,description),parts(id,title),cover,copyright" + ) as response: + body = await response.json() + + if response.status == 400: + match body.get("error_code"): + case 1017: # "Story not found" + logger.info(f"{story_id=} not found on Wattpad, returning.") + raise StoryNotFoundError() + + response.raise_for_status() + + return story_ta.validate_python(body) + + +@backoff.on_exception(backoff.expo, ClientResponseError, max_time=15) +async def fetch_story_content_zip( + story_id: int, cookies: Optional[dict] = None +) -> BytesIO: + """BytesIO Stream of an Archive of Part Contents for a Story.""" + with start_action(action_type="api_fetch_storyZip", story_id=story_id): + async with CachedSession( + headers=headers, + cookies=cookies, + cache=None if cookies else cache, + ) as session: + async 
with session.get( + f"https://www.wattpad.com/apiv2/?m=storytext&group_id={story_id}&output=zip" + ) as response: + response.raise_for_status() + + bytes_stream = BytesIO(await response.read()) + + return bytes_stream + + +@backoff.on_exception(backoff.expo, ClientResponseError, max_time=15) +async def fetch_image(url: str, should_cache: bool = False) -> bytes: + """Fetch image bytes.""" + with start_action(action_type="api_fetch_image", url=url): + async with CachedSession( + headers=headers, cache=cache if should_cache else None + ) as session: # Don't cache images. + async with session.get(url) as response: + response.raise_for_status() + + body = await response.read() + + return body diff --git a/src/api/src/create_book/exceptions.py b/src/api/src/create_book/exceptions.py new file mode 100644 index 0000000..50225d7 --- /dev/null +++ b/src/api/src/create_book/exceptions.py @@ -0,0 +1,11 @@ +class WattpadError(Exception): + """Base Exception class for Wattpad related errors.""" + + +class StoryNotFoundError(WattpadError): + """Display the "This story was not found" error to the user.""" + + ... + + +class PartNotFoundError(StoryNotFoundError): ... 
import copy

# Extra HTTP headers sent when downloading chapter images (none by default).
headers = {}


class EPUBGenerator:
    """Build an EPUB file for a Wattpad story.

    Usage: construct with the story metadata and cover bytes, drain
    :meth:`add_chapters` (an async generator), then call :meth:`dump`.
    """

    def __init__(self, data: "Story", cover: bytes):
        """Create the underlying ``epub.EpubBook`` and set metadata and cover.

        Args:
            data: Story metadata (title, author, dates, language, tags, parts).
            cover: Raw bytes of the cover image, stored as ``cover.jpg``.
        """
        self.epub = epub.EpubBook()
        self.data = data
        self.cover = cover

        # Metadata terms are defined in
        # https://www.dublincore.org/specifications/dublin-core/dcmi-terms/#section-2
        self.epub.add_author(data["user"]["username"])

        self.epub.add_metadata("DC", "title", data["title"])
        self.epub.add_metadata("DC", "description", data["description"])
        self.epub.add_metadata("DC", "date", data["createDate"])
        self.epub.add_metadata("DC", "modified", data["modifyDate"])
        # NOTE(review): this is the language *name* taken from the API, not a
        # language code; confirm readers accept it.
        self.epub.add_metadata("DC", "language", data["language"]["name"])

        self.epub.add_metadata(
            None, "meta", "", {"name": "tags", "content": ", ".join(data["tags"])}
        )
        self.epub.add_metadata(
            None, "meta", "", {"name": "mature", "content": str(int(data["mature"]))}
        )
        self.epub.add_metadata(
            None,
            "meta",
            "",
            {"name": "completed", "content": str(int(data["completed"]))},
        )

        # Cover image plus a dedicated cover page.
        self.epub.set_cover("cover.jpg", cover)
        cover_chapter = epub.EpubHtml(
            file_name="titlepage.xhtml",  # Standard for cover page
        )
        # NOTE(review): the markup of this literal was stripped from the patch
        # (it arrived as an empty string). Reconstructed as a bare image that
        # points at the cover registered above; confirm against the original.
        cover_chapter.set_content('<img src="cover.jpg">')
        self.epub.add_item(cover_chapter)

    async def add_chapters(
        self, contents: "List[bs4.Tag]", download_images: bool = False
    ):
        """Add one chapter per story part, then set the table of contents and spine.

        This is an async generator: it yields each chapter title right after
        the chapter has been added. The TOC, NCX/Nav items and spine are only
        set once the generator has been fully consumed.

        Args:
            contents: Chapter markup, in the same order as ``data["parts"]``.
            download_images: When true, every ``<img>`` with a ``src`` is
                downloaded and packed into the EPUB, and the tag is pointed at
                the packed file.
        """
        chapters: "List[epub.EpubHtml]" = []

        for cidx, (part, content) in enumerate(zip(self.data["parts"], contents)):
            title = part["title"]

            chapter = epub.EpubHtml(
                title=title,
                file_name=f"{cidx}_{part['id']}.xhtml",  # See issue #30
                lang=self.data["language"]["name"],
                uid=str(part["id"]).encode(),
            )

            if download_images:  # ! TODO : Download images elsewhere
                # Work on a copy so the caller's tree keeps its remote URLs.
                content = copy.copy(content)
                await self._embed_images(content, cidx)

            # Serialise *after* the src attributes were rewritten. Patching the
            # prettified string instead misses URLs containing "&", because
            # the serialiser escapes them to "&amp;".
            chapter.set_content(content.prettify())
            self.epub.add_item(chapter)

            chapters.append(chapter)

            yield title

        self.epub.toc = chapters

        # Thanks https://github.com/aerkalov/ebooklib/blob/master/samples/09_create_image/create.py
        self.epub.add_item(epub.EpubNcx())
        self.epub.add_item(epub.EpubNav())

        # create spine
        self.epub.spine = ["nav"] + chapters

    async def _embed_images(self, soup: "bs4.Tag", chapter_index: int) -> None:
        """Download every image in *soup*, pack it, and repoint its ``src``."""
        async with CachedSession(
            headers=headers, cache=None
        ) as session:  # Don't cache images.
            for idx, image in enumerate(soup.find_all("img")):
                src = image.get("src")  # .get(): <img> without src must not raise
                if not src:
                    continue

                async with session.get(src) as response:
                    if response.status >= 400:
                        # Best effort: keep the remote URL rather than packing
                        # an error page as if it were a JPEG.
                        continue
                    payload = await response.read()

                file_name = f"static/{chapter_index}/{idx}.jpeg"
                self.epub.add_item(
                    epub.EpubImage(
                        media_type="image/jpeg",
                        content=payload,
                        file_name=file_name,
                    )
                )
                image["src"] = file_name

    def dump(self) -> BytesIO:
        """Serialise the book and return it as a ``BytesIO`` rewound to the start."""
        # Thanks https://stackoverflow.com/a/75398222
        buffer = BytesIO()
        epub.write_epub(buffer, self.epub)

        buffer.seek(0)

        return buffer
import re


async def fetch_image(*args, **kwargs):
    """Placeholder for the image downloader used by :class:`PDFGenerator`.

    Not implemented yet: any code path that needs a remote image (license
    badge, author avatar) raises ``NotImplementedError`` until this is wired
    up to a real downloader.
    """
    # TODO
    raise NotImplementedError()


# License table keyed by the integer found in ``Story["copyright"]``.
# "statement" is a ``str.format`` template with {published_year} and {username}.
# All badge URLs use https; the original mixed http and https for the same host.
_COPYRIGHT_LICENSES = {
    1: {
        "name": "All Rights Reserved",
        "statement": "©️ {published_year} by {username}. All Rights Reserved.",
        "freedoms": "No reuse, redistribution, or modification without permission.",
        "printing": "Not allowed without explicit permission.",
        "image_url": None,
    },
    2: {
        "name": "Public Domain",
        "statement": "This work is in the public domain. Originally published in {published_year} by {username}.",
        "freedoms": "Free to use for any purpose without permission.",
        "printing": "Allowed for personal or commercial purposes.",
        "image_url": "https://mirrors.creativecommons.org/presskit/buttons/88x31/png/cc-zero.png",
    },
    3: {
        "name": "Creative Commons Attribution (CC-BY)",
        "statement": "©️ {published_year} by {username}. This work is licensed under a Creative Commons Attribution 4.0 International License.",
        "freedoms": "Allows reuse, redistribution, and modification with credit to the author.",
        "printing": "Allowed with proper credit.",
        "image_url": "https://mirrors.creativecommons.org/presskit/buttons/88x31/png/by.png",
    },
    4: {
        "name": "CC Attribution NonCommercial (CC-BY-NC)",
        "statement": "©️ {published_year} by {username}. This work is licensed under a Creative Commons Attribution-NonCommercial 4.0 International License.",
        "freedoms": "Allows reuse and modification for non-commercial purposes with credit.",
        "printing": "Allowed for non-commercial purposes with proper credit.",
        "image_url": "https://mirrors.creativecommons.org/presskit/buttons/88x31/png/by-nc.png",
    },
    5: {
        "name": "CC Attribution NonCommercial NoDerivs (CC-BY-NC-ND)",
        "statement": "©️ {published_year} by {username}. This work is licensed under a Creative Commons Attribution-NonCommercial-NoDerivs 4.0 International License.",
        "freedoms": "Allows sharing in original form for non-commercial purposes with credit; no modifications allowed.",
        "printing": "Allowed for non-commercial purposes in original form with proper credit.",
        "image_url": "https://mirrors.creativecommons.org/presskit/buttons/88x31/png/by-nc-nd.png",
    },
    6: {
        "name": "CC Attribution NonCommercial ShareAlike (CC-BY-NC-SA)",
        "statement": "©️ {published_year} by {username}. This work is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.",
        "freedoms": "Allows reuse and modification for non-commercial purposes under the same license, with credit.",
        "printing": "Allowed for non-commercial purposes with proper credit under the same license.",
        "image_url": "https://mirrors.creativecommons.org/presskit/buttons/88x31/png/by-nc-sa.png",
    },
    7: {
        "name": "CC Attribution ShareAlike (CC-BY-SA)",
        "statement": "©️ {published_year} by {username}. This work is licensed under a Creative Commons Attribution-ShareAlike 4.0 International License.",
        "freedoms": "Allows reuse and modification for any purpose under the same license, with credit.",
        "printing": "Allowed with proper credit under the same license.",
        "image_url": "https://mirrors.creativecommons.org/presskit/buttons/88x31/png/by-sa.png",
    },
    8: {
        "name": "CC Attribution NoDerivs (CC-BY-ND)",
        "statement": "©️ {published_year} by {username}. This work is licensed under a Creative Commons Attribution-NoDerivs 4.0 International License.",
        "freedoms": "Allows sharing in original form for any purpose with credit; no modifications allowed.",
        "printing": "Allowed in original form with proper credit.",
        "image_url": "https://mirrors.creativecommons.org/presskit/buttons/88x31/png/by-nd.png",
    },
}


class PDFGenerator:
    """Build a PDF for a Wattpad story with WeasyPrint and tag it with ExifTool.

    Usage: construct with the story metadata and cover bytes, drain
    :meth:`add_chapters` (an async generator), then call :meth:`dump`.
    """

    def __init__(self, data: "Story", cover: bytes):
        """Initialize PDFGenerator and create the temporary PDF file.

        Args:
            data: Story metadata.
            cover: Raw bytes of the cover image.

        Reads ``./pdf/stylesheet.css`` and ``./pdf/book.html`` relative to the
        current working directory.
        """
        self.data = data
        self.file = tempfile.NamedTemporaryFile(suffix=".pdf", delete=True)
        self.cover = cover
        self.content: str = ""
        # Per-instance shallow copy so callers can still treat it as their own dict.
        self.copyright = dict(_COPYRIGHT_LICENSES)

        # Explicit encoding: the platform default is not guaranteed to be UTF-8.
        with open("./pdf/stylesheet.css", encoding="utf-8") as reader:
            self.stylesheet = reader.read()
        with open("./pdf/book.html", encoding="utf-8") as reader:
            self.template = reader.read()

    @staticmethod
    def _upscale_avatar_url(url: str) -> str:
        """Swap the stand-alone ``128`` size segment of an avatar URL for ``512``.

        Only the first ``128`` that is not glued to other letters or digits is
        replaced, so user names or numeric ids that merely contain "128" are
        left untouched. A blanket ``str.replace`` would corrupt such URLs.
        """
        return re.sub(r"(?<![0-9A-Za-z])128(?![0-9A-Za-z])", "512", url, count=1)

    async def generate_cover_and_copyright_html(
        self,
    ) -> str:
        """Fill the cover and copyright placeholders of the book template.

        Fetches the license badge (cached) when the license has one and uses
        ``self.cover`` for the cover. The result replaces ``self.template`` and
        is also returned.
        """
        # Unknown license codes fall back to "All Rights Reserved" instead of
        # aborting the whole export with a KeyError.
        copyright_data = self.copyright.get(self.data["copyright"], self.copyright[1])
        username = self.data["user"]["username"]

        # NOTE(review): username/title are inserted into HTML unescaped.
        about_copyright = (
            self.template.replace(
                "{statement}",
                copyright_data["statement"].format(
                    username=username,
                    published_year=self.data["createDate"].split("-", 2)[0],
                ),
            )
            .replace("{author}", username)
            .replace("{freedoms}", copyright_data["freedoms"])
            .replace("{printing}", copyright_data["printing"])
            .replace("{book_id}", str(self.data["id"]))
            .replace("{book_title}", self.data["title"])
        )

        copyright_image = (
            await fetch_image(copyright_data["image_url"], should_cache=True)
            if copyright_data["image_url"]
            else None
        )
        if copyright_image:
            # NOTE(review): the markup of this literal was stripped from the
            # patch; reconstructed from its two format fields (image_url, name).
            # The badges are PNG files, hence image/png.
            image_block = '<img src="{image_url}" alt="{name}">'.format(
                image_url=f"data:image/png;base64,{b64encode(copyright_image).decode()}",
                name=copyright_data["name"],
            )
        else:
            image_block = ""

        about_copyright = about_copyright.replace("{copyright_image}", image_block)
        about_copyright = about_copyright.replace(
            "{cover}", f"data:image/jpg;base64,{b64encode(self.cover).decode()}"
        )

        self.template = about_copyright
        return about_copyright

    async def generate_about_author_chapter(self) -> str:
        """Fill the "About the Author" placeholders, fetching the avatar if any.

        The result replaces ``self.template`` and is also returned.
        """
        avatar_url = self.data["user"]["avatar"]
        author_avatar = (
            await fetch_image(
                self._upscale_avatar_url(avatar_url)
            )  # Increase image resolution
            if avatar_url
            else None
        )

        # NOTE(review): username/description are inserted into HTML unescaped.
        about_author = self.template.replace(
            "{username}", self.data["user"]["username"]
        ).replace("{description}", smart_trim(self.data["user"]["description"]))

        if author_avatar:
            # NOTE(review): markup reconstructed; only the alt text survived
            # extraction of this literal.
            avatar_block = (
                f'<img src="data:image/jpg;base64,{b64encode(author_avatar).decode()}"'
                ' alt="Author\'s profile picture">'
            )
        else:
            avatar_block = ""
        about_author = about_author.replace("{avatar}", avatar_block)

        self.template = about_author
        return about_author

    def generate_toc(self):
        """Append a table of contents to ``div#book`` and return its HTML.

        Requires ``self.tree`` to exist (it is created in :meth:`add_chapters`).
        """
        # NOTE(review): the template markup was stripped from the patch; only
        # the heading text survived. Rebuilt with the <ul> the code below needs;
        # restore any ids/classes the stylesheet relies on.
        clean = BeautifulSoup(
            """
            <section>
              <h1>Table of Contents</h1>
              <ul></ul>
            </section>
            """,
            "html.parser",
        )  # html.parser doesn't create <html>/<body> tags automatically

        ul = cast(bs4.Tag, clean.find("ul"))
        for part in self.data["parts"]:
            li = clean.new_tag("li")
            # The anchor carries only an href; presumably the stylesheet renders
            # its label (e.g. via target-text()). TODO confirm.
            a = clean.new_tag("a")
            a["href"] = f"#{part['id']}"
            li.append(a)
            ul.append(li)

        insert_point = cast(bs4.Tag, self.tree.find("div", {"id": "book"}))
        insert_point.append(clean)
        return str(clean)

    async def add_chapters(
        self, contents: "List[bs4.Tag]", download_images: bool = False
    ):
        """Assemble the book HTML, render it to PDF and write the metadata.

        This is an async generator: it yields each part title as the part is
        inserted. Rendering and metadata tagging only happen once the generator
        has been fully consumed.

        Args:
            contents: Chapter markup, in the same order as ``data["parts"]``.
            download_images: Accepted for parity with the EPUB generator; this
                method does not read it.
        """
        # Cover, copyright and about-the-author sections are filled into the
        # template before it is parsed.
        await self.generate_cover_and_copyright_html()
        await self.generate_about_author_chapter()
        self.tree = BeautifulSoup(self.template, "lxml")

        self.generate_toc()
        book = cast(bs4.Tag, self.tree.find("div", {"id": "book"}))
        for part, content in zip(self.data["parts"], contents):
            book.append(content)

            yield part["title"]

        with start_action(
            action_type="generate_pdf",
            output_filename=self.file.name,
            title=self.data["title"],
        ):
            # Render the assembled document with WeasyPrint into self.file.
            font_config = FontConfiguration()

            stylesheet_obj = CSS(string=self.stylesheet, font_config=font_config)

            html_obj = HTML(string=str(self.tree))
            html_obj.write_pdf(
                self.file.name, stylesheets=[stylesheet_obj], font_config=font_config
            )

        with start_action(action_type="add_metadata") as action:
            # Metadata generation with Exiftool
            clean_description = (
                self.data["description"].strip().replace("\n", "$/")
            )  # exiftool does not take raw newlines in values; "$/" stands in for them.

            action.log(f"clean_description: {clean_description}")

            metadata = {
                "Author": self.data["user"]["username"],
                "Title": self.data["title"],
                "Subject": clean_description,
                "CreationDate": self.data["createDate"],
                "ModDate": self.data["modifyDate"],
                "Keywords": ",".join(self.data["tags"]),
                "Language": self.data["language"]["name"],
                "Completed": self.data["completed"],
                "MatureContent": self.data["mature"],
                "Producer": "Dhanush Rambhatla (TheOnlyWayUp - https://rambhat.la) and WattpadDownloader",
            }  # As per https://exiftool.org/TagNames/PDF.html

            action.log(f"options: {metadata}")

            with ExifTool(
                config_file="../exiftool.config", logger=exiftool_logger
            ) as et:
                # Custom configuration adds Completed and MatureContent tags.
                # exiftool logger logs executed command
                et.execute(
                    *(
                        [f"-{key}={value}" for key, value in metadata.items()]
                        + [
                            "-overwrite_original",
                            # Use the wrapper's path. On Python <= 3.11 the
                            # inner file object's .name is the integer fd.
                            self.file.name,
                        ]
                    )
                )

    def dump(self) -> BytesIO:
        """Return the finished PDF as a ``BytesIO`` and delete the temp file.

        The file is re-read by path, not through the handle opened in
        ``__init__``: ``exiftool -overwrite_original`` replaces the file on
        disk, so the old handle would still see the PDF without metadata.
        """
        try:
            with open(self.file.name, "rb") as reader:
                buffer = BytesIO(reader.read())
        finally:
            # Closing the NamedTemporaryFile removes it from disk.
            self.file.close()

        return buffer


# --- src/api/src/create_book/models.py ---


class CopyrightData(TypedDict):
    """One entry of the license table used on the PDF copyright page."""

    name: str
    statement: str
    freedoms: str
    printing: str
    image_url: Optional[str]


class Language(TypedDict):
    """Language of a story."""

    name: str


class User(TypedDict):
    """Author of a story."""

    username: str
    avatar: str
    description: str


class Part(TypedDict):
    """A single chapter (part) of a story."""

    id: int
    title: str


class Story(TypedDict):
    """Story metadata consumed by the EPUB and PDF generators."""

    id: str
    title: str
    createDate: str
    modifyDate: str
    language: Language
    user: User
    description: str
    cover: str
    completed: bool
    tags: List[str]
    mature: bool
    url: str
    parts: List[Part]
    isPaywalled: bool
    copyright: int
def smart_trim(text: str, max_length: int = 400, separator: str = "<br />") -> str:
    """Truncate *text* at paragraph boundaries so it stays under *max_length*.

    The text is split on newlines and empty lines are dropped. Whole lines are
    kept, in order, for as long as the accumulated result (separators included)
    plus the next line stays below *max_length*. The kept lines are joined
    with *separator*; there is no trailing separator.

    Args:
        text: Text to shorten. Empty or ``None`` yields ``""``.
        max_length: Exclusive upper bound on the accumulated length.
        separator: String placed between kept lines.
            NOTE(review): the original literal was stripped from the patch;
            ``"<br />"`` is reconstructed and should be confirmed.

    Returns:
        The shortened text, or ``""`` if even the first line does not fit.
    """
    if not text:
        return ""

    kept = []
    used = 0  # length of the kept lines, each counted with its separator
    for chunk in text.split("\n"):
        if not chunk:
            continue
        if used + len(chunk) >= max_length:
            break
        # Accumulate. The original re-assigned the result on every line, so
        # only a single paragraph ever survived.
        kept.append(chunk)
        used += len(chunk) + len(separator)

    # join() adds no trailing separator, so nothing has to be stripped.
    # str.rstrip(separator) would strip a *character set* and eat trailing
    # letters such as "b" or "r".
    return separator.join(kept)


def generate_clean_part_html(part: "Part", content: str) -> "bs4.Tag":
    """Rebuild a part's HTML as a clean ``<section>`` headed by its title.

    Args:
        part: The part's ``id`` and ``title``.
        content: Raw HTML of the part.

    Returns:
        The ``<section>`` tag holding the title and the rebuilt paragraphs.
    """
    chapter_title = part["title"]
    chapter_id = part["id"]

    # NOTE(review): the template markup was stripped from the patch; only
    # {chapter_title} survived. Rebuilt with the <section> looked up below and
    # a heading carrying the part id; restore the original attributes/classes.
    clean = BeautifulSoup(
        f"""
        <section>
          <h1 id="{chapter_id}">{chapter_title}</h1>
        </section>
        """,
        "html.parser",
    )  # html.parser doesn't create <html>/<body> tags automatically

    html = BeautifulSoup(content, "lxml")
    for br in html.find_all("br"):
        # Check if no content after br
        if not br.next_sibling or br.next_sibling.name in ["br", None]:
            br.decompose()

    section = cast(bs4.Tag, clean.find("section"))
    if not section:
        raise Exception("part template does not contain a <section> element")

    for child in html.find_all("p"):
        current_paragraph = clean.new_tag("p")

        # Attempt to carry over paragraph styling
        current_paragraph["style"] = child.get("style", "text-align: left;")

        # Iterate over a copy: children are removed or moved inside the loop.
        for p_child in list(child.children):
            if not p_child:
                continue
            if isinstance(p_child, bs4.element.Tag):
                if p_child.name == "br":
                    p_child.decompose()
                elif p_child.name == "img":
                    src = p_child.get("src")  # tolerate <img> without src
                    if src:
                        # Images go straight into the section, followed by a
                        # line break, not into the paragraph.
                        img_tag = clean.new_tag("img")
                        img_tag["src"] = src
                        section.append(img_tag)
                        section.append(clean.new_tag("br"))
                elif p_child.name in ["b", "i"]:
                    # Keep bold/italic but flatten whatever is nested inside.
                    styled_tag = clean.new_tag(p_child.name)
                    styled_tag.append(clean.new_string(p_child.text))
                    current_paragraph.append(styled_tag)
                else:
                    # Append any other tags as-is
                    current_paragraph.append(p_child)
            elif isinstance(p_child, bs4.element.NavigableString):
                # Separate name: the original re-bound the `content` parameter.
                text_node = clean.new_string(p_child)
                current_paragraph.append(text_node)

        if current_paragraph.contents:
            section.append(current_paragraph)

        if not list(child.children):
            # Some p tags only contain brs, once brs are removed, they are empty and can be removed as well.
            child.decompose()

    return section


def slugify(value: object, allow_unicode: bool = False) -> str:
    """
    Taken from https://github.com/django/django/blob/master/django/utils/text.py
    Convert to ASCII if 'allow_unicode' is False. Convert spaces or repeated
    dashes to single dashes. Remove characters that aren't alphanumerics,
    underscores, or hyphens. Convert to lowercase. Also strip leading and
    trailing whitespace, dashes, and underscores.

    Thanks https://stackoverflow.com/a/295466.
    """
    value = str(value)
    if allow_unicode:
        value = unicodedata.normalize("NFKC", value)
    else:
        # Decompose accented characters, then drop whatever is not ASCII.
        value = (
            unicodedata.normalize("NFKD", value)
            .encode("ascii", "ignore")
            .decode("ascii")
        )
    value = re.sub(r"[^\w\s-]", "", value.lower())
    return re.sub(r"[-\s]+", "-", value).strip("-_")