diff --git a/src/api/src/create_book.py b/src/api/src/create_book.py index 7df2f2f..bd2daa2 100644 --- a/src/api/src/create_book.py +++ b/src/api/src/create_book.py @@ -2,7 +2,6 @@ from __future__ import annotations from typing import List, Optional, Tuple, cast from typing_extensions import TypedDict import re -import json import logging import tempfile import unicodedata @@ -127,16 +126,77 @@ def smart_trim(text: str, max_length: int = 400) -> str: return to_return -def clean_part_text(text: str) -> str: - """Remove unnecessary newlines from Text""" - soup = BeautifulSoup(text, "lxml") +def generate_clean_part_html(part: Part, content: str) -> bs4.Tag: + chapter_title = part["title"] + chapter_id = part["id"] - for br in soup.find_all("br"): + clean = BeautifulSoup( + f""" +
+

{chapter_title}

+
+ """, + "html.parser", + ) # html.parser doesn't create / tags automatically + + html = BeautifulSoup(content, "lxml") + for br in html.find_all("br"): # Check if no content after br if not br.next_sibling or br.next_sibling.name in ["br", None]: br.decompose() - return str(soup) + section = cast(bs4.Tag, clean.find("section")) + if not section: + raise Exception() + + for child in html.find_all("p"): + for p_child in list(child.children): + if not p_child: + continue + if isinstance(p_child, bs4.element.Tag): + if p_child.name == "br": + p_child.decompose() + elif p_child.name == "img": + src = p_child["src"] + img_tag = clean.new_tag("img") + img_tag["src"] = src + break_tag = clean.new_tag("br") + section.append(img_tag) + section.append(break_tag) + elif p_child.name == "b": + content = p_child.text + p_tag = clean.new_tag("p") + bold_tag = clean.new_tag("b") + bold_content = clean.new_string(content) + + bold_tag.append(bold_content) + p_tag.append(bold_tag) + + section.append(p_tag) + + elif p_child.name == "i": + content = p_child.text + p_tag = clean.new_tag("p") + italic_tag = clean.new_tag("i") + italic_content = clean.new_string(content) + + italic_tag.append(italic_content) + p_tag.append(italic_tag) + + section.append(p_tag) + + elif isinstance(p_child, bs4.element.NavigableString): + content = p_child.text + p_tag = clean.new_tag("p") + p_content = clean.new_string(content) + p_tag.append(p_content) + section.append(p_tag) + + if not list(child.children): + # Some p tags only contain brs, once brs are removed, they are empty and can be removed as well. + child.decompose() + + return section def slugify(value, allow_unicode=False) -> str: @@ -269,8 +329,8 @@ class PartNotFoundError(StoryNotFoundError): ... @backoff.on_exception(backoff.expo, ClientResponseError, max_time=15) async def fetch_story_from_partId( part_id: int, cookies: Optional[dict] = None -) -> Tuple[str, Story]: - """Return a Story ID from a Part ID.""" +) -> Tuple[int, Story]: + """Fetch Story ID from Part ID.""" with start_action(action_type="api_fetch_storyFromPartId"): async with CachedSession( headers=headers, cache=None if cookies else cache @@ -288,12 +348,12 @@ async def fetch_story_from_partId( response.raise_for_status() - return str(body["groupId"]), story_ta.validate_python(body["group"]) + return int(body["groupId"]), story_ta.validate_python(body["group"]) @backoff.on_exception(backoff.expo, ClientResponseError, max_time=15) async def fetch_story(story_id: int, cookies: Optional[dict] = None) -> Story: - """Taking a story_id, return its information from the Wattpad API.""" + """Fetch Story metadata using a Story ID.""" with start_action(action_type="api_fetch_story", story_id=story_id): async with CachedSession( headers=headers, cookies=cookies, cache=None if cookies else cache @@ -315,29 +375,25 @@ async def fetch_story(story_id: int, cookies: Optional[dict] = None) -> Story: @backoff.on_exception(backoff.expo, ClientResponseError, max_time=15) -async def fetch_part_content(part_id: int, cookies: Optional[dict] = None) -> str: - """Return the HTML Content of a Part.""" - with start_action(action_type="api_fetch_partContent", part_id=part_id): +async def fetch_story_content_zip( + story_id: int, cookies: Optional[dict] = None +) -> BytesIO: + """Return a BytesIO stream of a .zip file containing each part's HTML content.""" + with start_action(action_type="api_fetch_storyZip", story_id=story_id): async with CachedSession( - headers=headers, cookies=cookies, cache=None if cookies else cache + headers=headers, + cookies=cookies, + cache=None if cookies else cache, ) as session: async with session.get( - f"https://www.wattpad.com/apiv2/?m=storytext&id={part_id}" + f"https://www.wattpad.com/apiv2/?m=storytext&group_id={story_id}&output=zip" ) as response: - body = await response.text() - - if response.status == 400: - data = json.loads(body) - match data.get("code"): - case 463: # ""Could not find any parts for that story"" - logger.info( - f"{part_id=} for text not found on Wattpad, returning." - ) - raise PartNotFoundError() - response.raise_for_status() - return body + bytes_object = await response.read() + bytes_stream = BytesIO(bytes_object) + + return bytes_stream @backoff.on_exception(backoff.expo, ClientResponseError, max_time=15) @@ -397,7 +453,9 @@ class EPUBGenerator: cover_chapter.set_content('') self.epub.add_item(cover_chapter) - async def add_chapters(self, contents: List[str], download_images: bool = False): + async def add_chapters( + self, contents: List[bs4.Tag], download_images: bool = False + ): """Add chapters to the Epub, downloading images if necessary. Sets the table of contents and spine.""" chapters: List[epub.EpubHtml] = [] @@ -412,8 +470,9 @@ class EPUBGenerator: uid=str(part["id"]).encode(), ) + str_content = content.prettify() if download_images: - soup = BeautifulSoup(content, "lxml") + soup = content async with CachedSession( headers=headers, cache=None @@ -432,11 +491,11 @@ class EPUBGenerator: self.epub.add_item(img) # Fetch image and pack - content = content.replace( + str_content = str_content.replace( str(image["src"]), f"static/{cidx}/{idx}.jpeg" ) - chapter.set_content(content) + chapter.set_content(str_content) self.epub.add_item(chapter) chapters.append(chapter) @@ -535,7 +594,7 @@ class PDFGenerator: with open("./pdf/book.html") as reader: self.template = reader.read() - async def genernate_cover_and_copyright_html( + async def generate_cover_and_copyright_html( self, ) -> str: """Generate Cover and Copyright file, fetch copyright image (cached), use self.cover for cover.""" @@ -618,75 +677,6 @@ id="copyright-license-image">""".format( return about_author - def generate_clean_part_html(self, part: Part, content: str): - chapter_title = part["title"] - chapter_id = part["id"] - - clean = BeautifulSoup( - f""" -
-

{chapter_title}

-
- """, - "html.parser", - ) # html.parser doesn't create / tags automatically - html = BeautifulSoup(content, "lxml") - - section = clean.find("section") - if not section: - raise Exception() - - for child in html.find_all("p"): - for p_child in list(child.children): - if not p_child: - continue - if isinstance(p_child, bs4.element.Tag): - if p_child.name == "br": - p_child.decompose() - elif p_child.name == "img": - src = p_child["src"] - img_tag = clean.new_tag("img") - img_tag["src"] = src - break_tag = clean.new_tag("br") - section.append(img_tag) - section.append(break_tag) - elif p_child.name == "b": - content = p_child.text - p_tag = clean.new_tag("p") - bold_tag = clean.new_tag("b") - bold_content = clean.new_string(content) - - bold_tag.append(bold_content) - p_tag.append(bold_tag) - - section.append(p_tag) - - elif p_child.name == "i": - content = p_child.text - p_tag = clean.new_tag("p") - italic_tag = clean.new_tag("i") - italic_content = clean.new_string(content) - - italic_tag.append(italic_content) - p_tag.append(italic_tag) - - section.append(p_tag) - - elif isinstance(p_child, bs4.element.NavigableString): - content = p_child.text - p_tag = clean.new_tag("p") - p_content = clean.new_string(content) - p_tag.append(p_content) - section.append(p_tag) - - if not list(child.children): - # Some p tags only contain brs, once brs are removed, they are empty and can be removed as well. - child.decompose() - - insert_point = cast(bs4.Tag, self.tree.find("div", {"id": "book"})) - insert_point.append(section) - return str(clean) - def generate_toc(self): ids = [part["id"] for part in self.data["parts"]] clean = BeautifulSoup( @@ -711,17 +701,21 @@ id="copyright-license-image">""".format( insert_point.append(clean) return str(clean) - async def add_chapters(self, contents: List[str], download_images: bool = False): + async def add_chapters( + self, contents: List[bs4.Tag], download_images: bool = False + ): """Add chapters to the PDF, downloading images if necessary. Also add Cover, Copyright, and About the Author pages.""" # # Cover and Copyright Page - await self.genernate_cover_and_copyright_html() + await self.generate_cover_and_copyright_html() await self.generate_about_author_chapter() self.tree = BeautifulSoup(self.template) self.generate_toc() for part, content in zip(self.data["parts"], contents): - self.generate_clean_part_html(part, content) + insert_point = cast(bs4.Tag, self.tree.find("div", {"id": "book"})) + insert_point.append(content) + yield part["title"] # # About the Author page @@ -786,10 +780,6 @@ id="copyright-license-image">""".format( ) ) - # Close files and delete them from tmp - for chapter in chapters: - chapter.file.close() - def dump(self) -> BytesIO: self.file.seek(0) buffer = BytesIO(self.file.read()) diff --git a/src/api/src/main.py b/src/api/src/main.py index 372af1b..76b5fdb 100644 --- a/src/api/src/main.py +++ b/src/api/src/main.py @@ -4,6 +4,7 @@ from typing import Optional import asyncio from pathlib import Path from enum import Enum +from zipfile import ZipFile from eliot import start_action from aiohttp import ClientResponseError from fastapi import FastAPI, Request @@ -19,12 +20,12 @@ from create_book import ( PDFGenerator, fetch_story, fetch_story_from_partId, - fetch_part_content, + fetch_story_content_zip, fetch_image, fetch_cookies, WattpadError, StoryNotFoundError, - clean_part_text, + generate_clean_part_html, slugify, logger, ) @@ -180,9 +181,13 @@ async def handle_download( logger.info(f"Retrieved story metadata and cover ({story_id=})") + story_zip = await fetch_story_content_zip(story_id, cookies) + archive = ZipFile(story_zip, "r") + part_contents = [ - f"

{part['title']}

" - + (clean_part_text(await fetch_part_content(part["id"], cookies=cookies))) + generate_clean_part_html( + part, archive.read(str(part["id"])).decode("utf-8") + ) for part in metadata["parts"] ]