Merge branch 'feature/#31-zip-downloading' into feature/#29-pdf-downloads

This commit is contained in:
TheOnlyWayUp
2024-12-22 10:58:28 +00:00
2 changed files with 107 additions and 112 deletions
+98 -108
View File
@@ -2,7 +2,6 @@ from __future__ import annotations
from typing import List, Optional, Tuple, cast
from typing_extensions import TypedDict
import re
import json
import logging
import tempfile
import unicodedata
@@ -127,16 +126,77 @@ def smart_trim(text: str, max_length: int = 400) -> str:
return to_return
def clean_part_text(text: str) -> str:
"""Remove unnecessary newlines from Text"""
soup = BeautifulSoup(text, "lxml")
def generate_clean_part_html(part: Part, content: str) -> bs4.Tag:
chapter_title = part["title"]
chapter_id = part["id"]
for br in soup.find_all("br"):
clean = BeautifulSoup(
f"""
<section id="section_{chapter_id}" class="chapitre">
<h1 id="{chapter_id}" class="chapter-title">{chapter_title}</h1>
</section>
""",
"html.parser",
) # html.parser doesn't create <html>/<body> tags automatically
html = BeautifulSoup(content, "lxml")
for br in html.find_all("br"):
# Check if no content after br
if not br.next_sibling or br.next_sibling.name in ["br", None]:
br.decompose()
return str(soup)
section = cast(bs4.Tag, clean.find("section"))
if not section:
raise Exception()
for child in html.find_all("p"):
for p_child in list(child.children):
if not p_child:
continue
if isinstance(p_child, bs4.element.Tag):
if p_child.name == "br":
p_child.decompose()
elif p_child.name == "img":
src = p_child["src"]
img_tag = clean.new_tag("img")
img_tag["src"] = src
break_tag = clean.new_tag("br")
section.append(img_tag)
section.append(break_tag)
elif p_child.name == "b":
content = p_child.text
p_tag = clean.new_tag("p")
bold_tag = clean.new_tag("b")
bold_content = clean.new_string(content)
bold_tag.append(bold_content)
p_tag.append(bold_tag)
section.append(p_tag)
elif p_child.name == "i":
content = p_child.text
p_tag = clean.new_tag("p")
italic_tag = clean.new_tag("i")
italic_content = clean.new_string(content)
italic_tag.append(italic_content)
p_tag.append(italic_tag)
section.append(p_tag)
elif isinstance(p_child, bs4.element.NavigableString):
content = p_child.text
p_tag = clean.new_tag("p")
p_content = clean.new_string(content)
p_tag.append(p_content)
section.append(p_tag)
if not list(child.children):
# Some p tags only contain brs, once brs are removed, they are empty and can be removed as well.
child.decompose()
return section
def slugify(value, allow_unicode=False) -> str:
@@ -269,8 +329,8 @@ class PartNotFoundError(StoryNotFoundError): ...
@backoff.on_exception(backoff.expo, ClientResponseError, max_time=15)
async def fetch_story_from_partId(
part_id: int, cookies: Optional[dict] = None
) -> Tuple[str, Story]:
"""Return a Story ID from a Part ID."""
) -> Tuple[int, Story]:
"""Fetch Story ID from Part ID."""
with start_action(action_type="api_fetch_storyFromPartId"):
async with CachedSession(
headers=headers, cache=None if cookies else cache
@@ -288,12 +348,12 @@ async def fetch_story_from_partId(
response.raise_for_status()
return str(body["groupId"]), story_ta.validate_python(body["group"])
return int(body["groupId"]), story_ta.validate_python(body["group"])
@backoff.on_exception(backoff.expo, ClientResponseError, max_time=15)
async def fetch_story(story_id: int, cookies: Optional[dict] = None) -> Story:
"""Taking a story_id, return its information from the Wattpad API."""
"""Fetch Story metadata using a Story ID."""
with start_action(action_type="api_fetch_story", story_id=story_id):
async with CachedSession(
headers=headers, cookies=cookies, cache=None if cookies else cache
@@ -315,29 +375,25 @@ async def fetch_story(story_id: int, cookies: Optional[dict] = None) -> Story:
@backoff.on_exception(backoff.expo, ClientResponseError, max_time=15)
async def fetch_part_content(part_id: int, cookies: Optional[dict] = None) -> str:
"""Return the HTML Content of a Part."""
with start_action(action_type="api_fetch_partContent", part_id=part_id):
async def fetch_story_content_zip(
story_id: int, cookies: Optional[dict] = None
) -> BytesIO:
"""Return a BytesIO stream of a .zip file containing each part's HTML content."""
with start_action(action_type="api_fetch_storyZip", story_id=story_id):
async with CachedSession(
headers=headers, cookies=cookies, cache=None if cookies else cache
headers=headers,
cookies=cookies,
cache=None if cookies else cache,
) as session:
async with session.get(
f"https://www.wattpad.com/apiv2/?m=storytext&id={part_id}"
f"https://www.wattpad.com/apiv2/?m=storytext&group_id={story_id}&output=zip"
) as response:
body = await response.text()
if response.status == 400:
data = json.loads(body)
match data.get("code"):
case 463: # ""Could not find any parts for that story""
logger.info(
f"{part_id=} for text not found on Wattpad, returning."
)
raise PartNotFoundError()
response.raise_for_status()
return body
bytes_object = await response.read()
bytes_stream = BytesIO(bytes_object)
return bytes_stream
@backoff.on_exception(backoff.expo, ClientResponseError, max_time=15)
@@ -397,7 +453,9 @@ class EPUBGenerator:
cover_chapter.set_content('<img src="cover.jpg">')
self.epub.add_item(cover_chapter)
async def add_chapters(self, contents: List[str], download_images: bool = False):
async def add_chapters(
self, contents: List[bs4.Tag], download_images: bool = False
):
"""Add chapters to the Epub, downloading images if necessary. Sets the table of contents and spine."""
chapters: List[epub.EpubHtml] = []
@@ -412,8 +470,9 @@ class EPUBGenerator:
uid=str(part["id"]).encode(),
)
str_content = content.prettify()
if download_images:
soup = BeautifulSoup(content, "lxml")
soup = content
async with CachedSession(
headers=headers, cache=None
@@ -432,11 +491,11 @@ class EPUBGenerator:
self.epub.add_item(img)
# Fetch image and pack
content = content.replace(
str_content = str_content.replace(
str(image["src"]), f"static/{cidx}/{idx}.jpeg"
)
chapter.set_content(content)
chapter.set_content(str_content)
self.epub.add_item(chapter)
chapters.append(chapter)
@@ -535,7 +594,7 @@ class PDFGenerator:
with open("./pdf/book.html") as reader:
self.template = reader.read()
async def genernate_cover_and_copyright_html(
async def generate_cover_and_copyright_html(
self,
) -> str:
"""Generate Cover and Copyright file, fetch copyright image (cached), use self.cover for cover."""
@@ -618,75 +677,6 @@ id="copyright-license-image">""".format(
return about_author
def generate_clean_part_html(self, part: Part, content: str):
chapter_title = part["title"]
chapter_id = part["id"]
clean = BeautifulSoup(
f"""
<section id="section_{chapter_id}" class="chapitre">
<h1 id="{chapter_id}" class="chapter-title">{chapter_title}</h1>
</section>
""",
"html.parser",
) # html.parser doesn't create <html>/<body> tags automatically
html = BeautifulSoup(content, "lxml")
section = clean.find("section")
if not section:
raise Exception()
for child in html.find_all("p"):
for p_child in list(child.children):
if not p_child:
continue
if isinstance(p_child, bs4.element.Tag):
if p_child.name == "br":
p_child.decompose()
elif p_child.name == "img":
src = p_child["src"]
img_tag = clean.new_tag("img")
img_tag["src"] = src
break_tag = clean.new_tag("br")
section.append(img_tag)
section.append(break_tag)
elif p_child.name == "b":
content = p_child.text
p_tag = clean.new_tag("p")
bold_tag = clean.new_tag("b")
bold_content = clean.new_string(content)
bold_tag.append(bold_content)
p_tag.append(bold_tag)
section.append(p_tag)
elif p_child.name == "i":
content = p_child.text
p_tag = clean.new_tag("p")
italic_tag = clean.new_tag("i")
italic_content = clean.new_string(content)
italic_tag.append(italic_content)
p_tag.append(italic_tag)
section.append(p_tag)
elif isinstance(p_child, bs4.element.NavigableString):
content = p_child.text
p_tag = clean.new_tag("p")
p_content = clean.new_string(content)
p_tag.append(p_content)
section.append(p_tag)
if not list(child.children):
# Some p tags only contain brs, once brs are removed, they are empty and can be removed as well.
child.decompose()
insert_point = cast(bs4.Tag, self.tree.find("div", {"id": "book"}))
insert_point.append(section)
return str(clean)
def generate_toc(self):
ids = [part["id"] for part in self.data["parts"]]
clean = BeautifulSoup(
@@ -711,17 +701,21 @@ id="copyright-license-image">""".format(
insert_point.append(clean)
return str(clean)
async def add_chapters(self, contents: List[str], download_images: bool = False):
async def add_chapters(
self, contents: List[bs4.Tag], download_images: bool = False
):
"""Add chapters to the PDF, downloading images if necessary. Also add Cover, Copyright, and About the Author pages."""
# # Cover and Copyright Page
await self.genernate_cover_and_copyright_html()
await self.generate_cover_and_copyright_html()
await self.generate_about_author_chapter()
self.tree = BeautifulSoup(self.template)
self.generate_toc()
for part, content in zip(self.data["parts"], contents):
self.generate_clean_part_html(part, content)
insert_point = cast(bs4.Tag, self.tree.find("div", {"id": "book"}))
insert_point.append(content)
yield part["title"]
# # About the Author page
@@ -786,10 +780,6 @@ id="copyright-license-image">""".format(
)
)
# Close files and delete them from tmp
for chapter in chapters:
chapter.file.close()
def dump(self) -> BytesIO:
self.file.seek(0)
buffer = BytesIO(self.file.read())
+9 -4
View File
@@ -4,6 +4,7 @@ from typing import Optional
import asyncio
from pathlib import Path
from enum import Enum
from zipfile import ZipFile
from eliot import start_action
from aiohttp import ClientResponseError
from fastapi import FastAPI, Request
@@ -19,12 +20,12 @@ from create_book import (
PDFGenerator,
fetch_story,
fetch_story_from_partId,
fetch_part_content,
fetch_story_content_zip,
fetch_image,
fetch_cookies,
WattpadError,
StoryNotFoundError,
clean_part_text,
generate_clean_part_html,
slugify,
logger,
)
@@ -180,9 +181,13 @@ async def handle_download(
logger.info(f"Retrieved story metadata and cover ({story_id=})")
story_zip = await fetch_story_content_zip(story_id, cookies)
archive = ZipFile(story_zip, "r")
part_contents = [
f"<h1>{part['title']}</h1>"
+ (clean_part_text(await fetch_part_content(part["id"], cookies=cookies)))
generate_clean_part_html(
part, archive.read(str(part["id"])).decode("utf-8")
)
for part in metadata["parts"]
]