feat(api): Add parsing engine (#68 - @theonlywayup, @aaronbendaniel)

Add parsing engine
This commit is contained in:
Dhanush R
2025-06-11 13:55:20 +05:30
committed by GitHub
38 changed files with 1337 additions and 1657 deletions
+2 -2
View File
@@ -1,8 +1,8 @@
__pycache__
venv
*epub
*pdf
*html
*.pdf
# *html
data
*ipynb
build
+2 -2
View File
@@ -9,7 +9,7 @@ COPY src/frontend/. .
RUN npm run build
# Thanks https://stackoverflow.com/q/76988450
FROM python:3.10-slim
FROM python:3.13-slim
WORKDIR /app
@@ -38,7 +38,7 @@ WORKDIR /app
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/
COPY src/api/requirements.txt requirements.txt
COPY src/api/exiftool.config exiftool.config
COPY src/api/src/create_book/generators/pdf/exiftool.config exiftool.config
RUN uv pip install -r requirements.txt --system
COPY --from=0 /build/build /app/src/build
COPY src/api/src src
+1 -1
View File
@@ -1 +1 @@
3.10
3.13
+10 -3
View File
@@ -3,7 +3,7 @@ name = "api"
version = "0.1.0"
description = "Wattpad Downloader API"
readme = "../../README.md"
requires-python = ">=3.10"
requires-python = ">=3.13"
dependencies = [
"aiohttp>=3.9.1",
"rich>=13.9.4",
@@ -19,10 +19,17 @@ dependencies = [
"uvicorn>=0.32.1",
"pyexiftool>=0.5.6",
"weasyprint>=63.0",
"jinja2>=3.1.6",
]
[tool.ruff.lint]
ignore = ['E402']
ignore = ['E402'] # module import not at top of file
[tool.uv.sources]
aiohttp-client-cache = { git = "https://github.com/TheOnlyWayUp/aiohttp-client-cache.git", rev = "keydb-ttl" }
aiohttp-client-cache = { git = "https://github.com/TheOnlyWayUp/aiohttp-client-cache.git", rev = "keydb-ttl" } # Fork which leverages keydb's EXPIREMEMBER feature for TTLs on Hash members.
[dependency-groups]
dev = [
"ipykernel>=6.29.5",
"ruff>=0.11.12",
]
+1
View File
@@ -31,6 +31,7 @@ frozenlist==1.4.1
h11==0.14.0
idna==3.6
itsdangerous==2.2.0
jinja2==3.1.6
jmespath==1.0.1
lxml==5.3.0
markdown-it-py==3.0.0
-782
View File
@@ -1,782 +0,0 @@
from __future__ import annotations
from typing import List, Optional, Tuple, cast
from typing_extensions import TypedDict
import re
import logging
import tempfile
import unicodedata
from os import environ
from io import BytesIO
from enum import Enum
from base64 import b64encode
import bs4
import backoff
from weasyprint import HTML, CSS, default_url_fetcher
from weasyprint.text.fonts import FontConfiguration
from ebooklib import epub
from exiftool import ExifTool
from eliot import to_file, start_action
from eliot.stdlib import EliotHandler
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from pydantic import TypeAdapter, model_validator, field_validator
from pydantic_settings import BaseSettings
from aiohttp import ClientResponseError
from aiohttp_client_cache.session import CachedSession
from aiohttp_client_cache import FileBackend, RedisBackend
load_dotenv(override=True)
handler = EliotHandler()
logging.getLogger("fastapi").setLevel(logging.INFO)
logging.getLogger("fastapi").addHandler(handler)
exiftool_logger = logging.getLogger("exiftool")
exiftool_logger.addHandler(handler)
logger = logging.Logger("wpd")
logger.addHandler(handler)
if environ.get("DEBUG"):
to_file(open("eliot.log", "wb"))
# --- #
class CacheTypes(Enum):
file = "file"
redis = "redis"
class Config(BaseSettings):
USE_CACHE: bool = True
CACHE_TYPE: CacheTypes = CacheTypes.file
REDIS_CONNECTION_URL: str = ""
@field_validator("USE_CACHE", mode="before")
def validate_use_cache(cls, value):
# Return default if value is an empty string
if value == "":
return True # Default value for USE_CACHE
return value
@field_validator("CACHE_TYPE", mode="before")
def validate_cache_type(cls, value):
# Thanks https://stackoverflow.com/a/78157474
if value == "":
return "file"
return value
@model_validator(mode="after")
def prevent_mismatched_redis_url(self):
match self.CACHE_TYPE:
case CacheTypes.file:
if self.REDIS_CONNECTION_URL:
raise ValueError(
"REDIS_CONNECTION_URL provided when File cache selected. To use Redis as a cache, set CACHE_TYPE=redis."
)
case CacheTypes.redis:
if not self.REDIS_CONNECTION_URL:
raise ValueError(
"REDIS_CONNECTION_URL not provided when Redis cache selected. To use File cache, set CACHE_TYPE=file."
)
return self
config = Config()
# --- #
headers = {
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"
}
if config.USE_CACHE:
match config.CACHE_TYPE:
case CacheTypes.file:
cache = FileBackend(use_temp=True, expire_after=43200) # 12 hours
case CacheTypes.redis:
cache = RedisBackend(
cache_name="wpd-aiohttp-cache",
address=config.REDIS_CONNECTION_URL,
expire_after=43200, # 12 hours
)
else:
cache = None
logger.info(f"Using {cache=}")
# --- Utilities --- #
def smart_trim(text: str, max_length: int = 400) -> str:
"""Truncate a string intelligently at newlines. Coherence and max-length adherence."""
chunks = [t for t in text.split("\n") if t]
to_return = ""
for chunk in chunks:
if len(to_return) + len(chunk) < max_length:
to_return = chunk + "<br />"
else:
to_return = to_return.rstrip("<br />")
break
return to_return
def generate_clean_part_html(part: Part, content: str) -> bs4.Tag:
"""Rebuild HTML Structure for a Part."""
chapter_title = part["title"]
chapter_id = part["id"]
clean = BeautifulSoup(
f"""
<section id="section_{chapter_id}" class="chapitre">
<h1 id="{chapter_id}" class="chapter-title">{chapter_title}</h1>
</section>
""",
"html.parser",
) # html.parser doesn't create <html>/<body> tags automatically
html = BeautifulSoup(content, "lxml")
for br in html.find_all("br"):
# Check if no content after br
if not br.next_sibling or br.next_sibling.name in ["br", None]:
br.decompose()
section = cast(bs4.Tag, clean.find("section"))
if not section:
raise Exception()
for child in html.find_all("p"):
current_paragraph = clean.new_tag("p")
# Attempt to carry over paragraph styling
current_paragraph["style"] = child.get("style", "text-align: left;")
for p_child in list(child.children):
if not p_child:
continue
if isinstance(p_child, bs4.element.Tag):
if p_child.name == "br":
p_child.decompose()
elif p_child.name == "img":
src = p_child["src"]
img_tag = clean.new_tag("img")
img_tag["src"] = src
section.append(img_tag)
section.append(clean.new_tag("br"))
elif p_child.name in ["b", "i"]:
styled_tag = clean.new_tag(p_child.name)
styled_content = clean.new_string(p_child.text)
styled_tag.append(styled_content)
current_paragraph.append(styled_tag)
else:
# Append any other tags as-is
current_paragraph.append(p_child)
elif isinstance(p_child, bs4.element.NavigableString):
content = clean.new_string(p_child)
current_paragraph.append(content)
if current_paragraph.contents:
section.append(current_paragraph)
if not list(child.children):
# Some p tags only contain brs, once brs are removed, they are empty and can be removed as well.
child.decompose()
return section
def slugify(value, allow_unicode=False) -> str:
"""
Taken from https://github.com/django/django/blob/master/django/utils/text.py
Convert to ASCII if 'allow_unicode' is False. Convert spaces or repeated
dashes to single dashes. Remove characters that aren't alphanumerics,
underscores, or hyphens. Convert to lowercase. Also strip leading and
trailing whitespace, dashes, and underscores.
Thanks https://stackoverflow.com/a/295466.
"""
value = str(value)
if allow_unicode:
value = unicodedata.normalize("NFKC", value)
else:
value = (
unicodedata.normalize("NFKD", value)
.encode("ascii", "ignore")
.decode("ascii")
)
value = re.sub(r"[^\w\s-]", "", value.lower())
return re.sub(r"[-\s]+", "-", value).strip("-_")
async def fetch_cookies(username: str, password: str) -> dict:
# source: https://github.com/TheOnlyWayUp/WP-DM-Export/blob/dd4c7c51cb43f2108e0f63fc10a66cd24a740e4e/src/API/src/main.py#L25-L58
"""Retrieves authorization cookies from Wattpad by logging in with user creds.
Args:
username (str): Username.
password (str): Password.
Raises:
ValueError: Bad status code.
ValueError: No cookies returned.
Returns:
dict: Authorization cookies.
"""
with start_action(action_type="api_fetch_cookies"):
async with CachedSession(headers=headers, cache=None) as session:
async with session.post(
"https://www.wattpad.com/auth/login?nextUrl=%2F&_data=routes%2Fauth.login",
data={
"username": username.lower(),
"password": password,
}, # the username.lower() is for caching
) as response:
if response.status != 204:
raise ValueError("Not a 204.")
cookies = {
k: v.value
for k, v in response.cookies.items() # Thanks https://stackoverflow.com/a/32281245
}
if not cookies:
raise ValueError("No cookies.")
return cookies
# --- Models --- #
class CopyrightData(TypedDict):
name: str
statement: str
freedoms: str
printing: str
image_url: Optional[str]
class Language(TypedDict):
name: str
class User(TypedDict):
username: str
avatar: str
description: str
class Part(TypedDict):
id: int
title: str
class Story(TypedDict):
id: str
title: str
createDate: str
modifyDate: str
language: Language
user: User
description: str
cover: str
completed: bool
tags: List[str]
mature: bool
url: str
parts: List[Part]
isPaywalled: bool
copyright: int
story_ta = TypeAdapter(Story)
# --- Exceptions --- #
class WattpadError(Exception):
"""Base Exception class for Wattpad related errors."""
class StoryNotFoundError(WattpadError):
"""Display the "This story was not found" error to the user."""
...
class PartNotFoundError(StoryNotFoundError): ...
# --- API Calls --- #
@backoff.on_exception(backoff.expo, ClientResponseError, max_time=15)
async def fetch_story_from_partId(
part_id: int, cookies: Optional[dict] = None
) -> Tuple[int, Story]:
"""Fetch Story metadata from a Part ID."""
with start_action(action_type="api_fetch_storyFromPartId"):
async with CachedSession(
headers=headers, cache=None if cookies else cache
) as session: # Don't cache requests with Cookies.
async with session.get(
f"https://www.wattpad.com/api/v3/story_parts/{part_id}?fields=groupId,group(tags,id,title,createDate,modifyDate,language(name),description,completed,mature,url,isPaywalled,user(username,avatar,description),parts(id,title),cover,copyright)"
) as response:
body = await response.json()
if response.status == 400:
match body.get("error_code"):
case 1020: # "Story part not found"
logger.info(f"{part_id=} not found on Wattpad, returning.")
raise PartNotFoundError()
response.raise_for_status()
return int(body["groupId"]), story_ta.validate_python(body["group"])
@backoff.on_exception(backoff.expo, ClientResponseError, max_time=15)
async def fetch_story(story_id: int, cookies: Optional[dict] = None) -> Story:
"""Fetch Story metadata from a Story ID."""
with start_action(action_type="api_fetch_story", story_id=story_id):
async with CachedSession(
headers=headers, cookies=cookies, cache=None if cookies else cache
) as session:
async with session.get(
f"https://www.wattpad.com/api/v3/stories/{story_id}?fields=tags,id,title,createDate,modifyDate,language(name),description,completed,mature,url,isPaywalled,user(username,avatar,description),parts(id,title),cover,copyright"
) as response:
body = await response.json()
if response.status == 400:
match body.get("error_code"):
case 1017: # "Story not found"
logger.info(f"{story_id=} not found on Wattpad, returning.")
raise StoryNotFoundError()
response.raise_for_status()
return story_ta.validate_python(body)
@backoff.on_exception(backoff.expo, ClientResponseError, max_time=15)
async def fetch_story_content_zip(
story_id: int, cookies: Optional[dict] = None
) -> BytesIO:
"""BytesIO Stream of an Archive of Part Contents for a Story."""
with start_action(action_type="api_fetch_storyZip", story_id=story_id):
async with CachedSession(
headers=headers,
cookies=cookies,
cache=None if cookies else cache,
) as session:
async with session.get(
f"https://www.wattpad.com/apiv2/?m=storytext&group_id={story_id}&output=zip"
) as response:
response.raise_for_status()
bytes_stream = BytesIO(await response.read())
return bytes_stream
@backoff.on_exception(backoff.expo, ClientResponseError, max_time=15)
async def fetch_image(url: str, should_cache: bool = False) -> bytes:
"""Fetch image bytes."""
with start_action(action_type="api_fetch_image", url=url):
async with CachedSession(
headers=headers, cache=cache if should_cache else None
) as session: # Don't cache images.
async with session.get(url) as response:
response.raise_for_status()
body = await response.read()
return body
# --- Generation --- #
class EPUBGenerator:
"""EPUB Generation utilities"""
def __init__(self, data: Story, cover: bytes):
"""Initialize EPUBGenerator. Create epub.EpubBook() and set metadata and cover."""
self.epub = epub.EpubBook()
self.data = data
self.cover = cover
# set metadata, defined in https://www.dublincore.org/specifications/dublin-core/dcmi-terms/#section-2
self.epub.add_author(data["user"]["username"])
self.epub.add_metadata("DC", "title", data["title"])
self.epub.add_metadata("DC", "description", data["description"])
self.epub.add_metadata("DC", "date", data["createDate"])
self.epub.add_metadata("DC", "modified", data["modifyDate"])
self.epub.add_metadata("DC", "language", data["language"]["name"])
self.epub.add_metadata(
None, "meta", "", {"name": "tags", "content": ", ".join(data["tags"])}
)
self.epub.add_metadata(
None, "meta", "", {"name": "mature", "content": str(int(data["mature"]))}
)
self.epub.add_metadata(
None,
"meta",
"",
{"name": "completed", "content": str(int(data["completed"]))},
)
# Set cover
self.epub.set_cover("cover.jpg", cover)
cover_chapter = epub.EpubHtml(
file_name="titlepage.xhtml", # Standard for cover page
)
cover_chapter.set_content('<img src="cover.jpg">')
self.epub.add_item(cover_chapter)
async def add_chapters(
self, contents: List[bs4.Tag], download_images: bool = False
):
"""Add chapters to the Epub, downloading images if necessary. Sets the table of contents and spine."""
chapters: List[epub.EpubHtml] = []
for cidx, (part, content) in enumerate(zip(self.data["parts"], contents)):
title = part["title"]
# Thanks https://eu17.proxysite.com/process.php?d=5VyWYcoQl%2BVF0BYOuOavtvjOloFUZz2BJ%2Fepiusk6Nz7PV%2B9i8rs7cFviGftrBNll%2B0a3qO7UiDkTt4qwCa0fDES&b=1
chapter = epub.EpubHtml(
title=title,
file_name=f"{cidx}_{part['id']}.xhtml", # See issue #30
lang=self.data["language"]["name"],
uid=str(part["id"]).encode(),
)
str_content = content.prettify()
if download_images:
soup = content
async with CachedSession(
headers=headers, cache=None
) as session: # Don't cache images.
for idx, image in enumerate(soup.find_all("img")):
if not image["src"]:
continue
# Find all image tags and filter for those with sources
async with session.get(image["src"]) as response:
img = epub.EpubImage(
media_type="image/jpeg",
content=await response.read(),
file_name=f"static/{cidx}/{idx}.jpeg",
)
self.epub.add_item(img)
# Fetch image and pack
str_content = str_content.replace(
str(image["src"]), f"static/{cidx}/{idx}.jpeg"
)
chapter.set_content(str_content)
self.epub.add_item(chapter)
chapters.append(chapter)
yield title
self.epub.toc = chapters
# Thanks https://github.com/aerkalov/ebooklib/blob/master/samples/09_create_image/create.py
self.epub.add_item(epub.EpubNcx())
self.epub.add_item(epub.EpubNav())
# create spine
self.epub.spine = ["nav"] + chapters
def dump(self) -> BytesIO:
# Thanks https://stackoverflow.com/a/75398222
buffer = BytesIO()
epub.write_epub(buffer, self.epub)
buffer.seek(0)
return buffer
class PDFGenerator:
"""PDF Generation utilities"""
def __init__(self, data: Story, cover: bytes):
"""Initialize PDGenerator, create PDF Temporary file."""
self.data = data
self.file = tempfile.NamedTemporaryFile(suffix=".pdf", delete=True)
self.cover = cover
self.content: str = ""
self.copyright = {
1: {
"name": "All Rights Reserved",
"statement": "©️ {published_year} by {username}. All Rights Reserved.",
"freedoms": "No reuse, redistribution, or modification without permission.",
"printing": "Not allowed without explicit permission.",
"image_url": None,
},
2: {
"name": "Public Domain",
"statement": "This work is in the public domain. Originally published in {published_year} by {username}.",
"freedoms": "Free to use for any purpose without permission.",
"printing": "Allowed for personal or commercial purposes.",
"image_url": "http://mirrors.creativecommons.org/presskit/buttons/88x31/png/cc-zero.png",
},
3: {
"name": "Creative Commons Attribution (CC-BY)",
"statement": "©️ {published_year} by {username}. This work is licensed under a Creative Commons Attribution 4.0 International License.",
"freedoms": "Allows reuse, redistribution, and modification with credit to the author.",
"printing": "Allowed with proper credit.",
"image_url": "https://mirrors.creativecommons.org/presskit/buttons/88x31/png/by.png",
},
4: {
"name": "CC Attribution NonCommercial (CC-BY-NC)",
"statement": "©️ {published_year} by {username}. This work is licensed under a Creative Commons Attribution-NonCommercial 4.0 International License.",
"freedoms": "Allows reuse and modification for non-commercial purposes with credit.",
"printing": "Allowed for non-commercial purposes with proper credit.",
"image_url": "http://mirrors.creativecommons.org/presskit/buttons/88x31/png/by-nc.png",
},
5: {
"name": "CC Attribution NonCommercial NoDerivs (CC-BY-NC-ND)",
"statement": "©️ {published_year} by {username}. This work is licensed under a Creative Commons Attribution-NonCommercial-NoDerivs 4.0 International License.",
"freedoms": "Allows sharing in original form for non-commercial purposes with credit; no modifications allowed.",
"printing": "Allowed for non-commercial purposes in original form with proper credit.",
"image_url": "http://mirrors.creativecommons.org/presskit/buttons/88x31/png/by-nc-nd.png",
},
6: {
"name": "CC Attribution NonCommercial ShareAlike (CC-BY-NC-SA)",
"statement": "©️ {published_year} by {username}. This work is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.",
"freedoms": "Allows reuse and modification for non-commercial purposes under the same license, with credit.",
"printing": "Allowed for non-commercial purposes with proper credit under the same license.",
"image_url": "http://mirrors.creativecommons.org/presskit/buttons/88x31/png/by-nc-sa.png",
},
7: {
"name": "CC Attribution ShareAlike (CC-BY-SA)",
"statement": "©️ {published_year} by {username}. This work is licensed under a Creative Commons Attribution-ShareAlike 4.0 International License.",
"freedoms": "Allows reuse and modification for any purpose under the same license, with credit.",
"printing": "Allowed with proper credit under the same license.",
"image_url": "https://mirrors.creativecommons.org/presskit/buttons/88x31/png/by-sa.png",
},
8: {
"name": "CC Attribution NoDerivs (CC-BY-ND)",
"statement": "©️ {published_year} by {username}. This work is licensed under a Creative Commons Attribution-NoDerivs 4.0 International License.",
"freedoms": "Allows sharing in original form for any purpose with credit; no modifications allowed.",
"printing": "Allowed in original form with proper credit.",
"image_url": "https://mirrors.creativecommons.org/presskit/buttons/88x31/png/by-nd.png",
},
}
with open("./pdf/stylesheet.css") as reader:
self.stylesheet = reader.read()
with open("./pdf/book.html") as reader:
self.template = reader.read()
async def generate_cover_and_copyright_html(
self,
) -> str:
"""Generate Cover and Copyright file, fetch copyright image (cached), use self.cover for cover."""
copyright_data = self.copyright[self.data["copyright"]]
template = self.template
about_copyright = (
template.replace(
"{statement}",
copyright_data["statement"].format(
username=self.data["user"]["username"],
published_year=self.data["createDate"].split("-", 2)[0],
),
)
.replace("{author}", self.data["user"]["username"])
.replace("{freedoms}", copyright_data["freedoms"])
.replace(
"{printing}",
copyright_data["printing"],
)
.replace("{book_id}", self.data["id"])
.replace("{book_title}", self.data["title"])
)
copyright_image = (
await fetch_image(copyright_data["image_url"], should_cache=True)
if copyright_data["image_url"]
else None
)
image_block = (
"""<img src="{image_url}"
alt="{name}"
width="88"
height="31"
id="copyright-license-image">""".format(
image_url=f"data:image/jpg;base64,{b64encode(copyright_image).decode()}",
name=copyright_data["name"],
)
if copyright_image
else ""
)
about_copyright = (
about_copyright.replace(
"{copyright_image}",
image_block,
)
if image_block
else about_copyright.replace("{copyright_image}", "")
)
about_copyright = about_copyright.replace(
"{cover}", f"data:image/jpg;base64,{b64encode(self.cover).decode()}"
)
self.template = about_copyright
return about_copyright
async def generate_about_author_chapter(self) -> str:
"""Generate About the Author file, fetch avatar."""
author_avatar = (
await fetch_image(
self.data["user"]["avatar"].replace("128", "512")
) # Increase image resolution
if self.data["user"]["avatar"]
else None
)
about_author = self.template.replace(
"{username}", self.data["user"]["username"]
).replace("{description}", smart_trim(self.data["user"]["description"]))
about_author = (
about_author.replace(
"{avatar}",
f"""
<img src="data:image/jpg;base64,{b64encode(author_avatar).decode()}" alt="Author's profile picture" id="author-profile-picture">""",
)
if author_avatar
else about_author.replace("{avatar}", "")
)
self.template = about_author
return about_author
def generate_toc(self):
ids = [part["id"] for part in self.data["parts"]]
clean = BeautifulSoup(
"""
<section id="contents" class="toc">
<h1>Table of Contents</h1>
<ul></ul>
</section>
""",
"html.parser",
) # html.parser doesn't create <html>/<body> tags automatically
ul = cast(bs4.Tag, clean.find("ul"))
for part_id in ids:
li = clean.new_tag("li")
a = clean.new_tag("a")
a["href"] = f"#{part_id}"
li.append(a)
ul.append(li)
insert_point = cast(bs4.Tag, self.tree.find("div", {"id": "book"}))
insert_point.append(clean)
return str(clean)
async def add_chapters(
self, contents: List[bs4.Tag], download_images: bool = False
):
"""Add chapters to the PDF, downloading images if necessary. Also add Cover, Copyright, and About the Author pages."""
# # Cover and Copyright Page
await self.generate_cover_and_copyright_html()
await self.generate_about_author_chapter()
self.tree = BeautifulSoup(self.template, "lxml")
self.generate_toc()
for part, content in zip(self.data["parts"], contents):
insert_point = cast(bs4.Tag, self.tree.find("div", {"id": "book"}))
insert_point.append(content)
yield part["title"]
# # About the Author page
# about_author_html = await self.generate_about_author_chapter()
# chapters.insert(0, cover_and_copyright_html)
# chapters.append(about_author_html)
with start_action(
action_type="generate_pdf",
output_filename=self.file.name,
title=self.data["title"],
):
# PDF Generation with wkhtmltopdf, written to self.file
# At this stage, we have a bunch of HTML Files representing all the chapters that need to be generated. PDFKit handles ToC generation, so that's not included.
font_config = FontConfiguration()
stylesheet_obj = CSS(string=self.stylesheet, font_config=font_config)
html_obj = HTML(string=str(self.tree))
html_obj.write_pdf(
self.file.name, stylesheets=[stylesheet_obj], font_config=font_config
)
with start_action(action_type="add_metadata") as action:
# Metadata generation with Exiftool
clean_description = (
self.data["description"].strip().replace("\n", "$/")
) # exiftool doesn't parse \ns correctly, they support $/ for the same instead. `&#xa;` is another option.
action.log(f"clean_description: {clean_description}")
metadata = {
"Author": self.data["user"]["username"],
"Title": self.data["title"],
"Subject": clean_description,
"CreationDate": self.data["createDate"],
"ModDate": self.data["modifyDate"],
"Keywords": ",".join(self.data["tags"]),
"Language": self.data["language"]["name"],
"Completed": self.data["completed"],
"MatureContent": self.data["mature"],
"Producer": "Dhanush Rambhatla (TheOnlyWayUp - https://rambhat.la) and WattpadDownloader",
} # As per https://exiftool.org/TagNames/PDF.html
action.log(f"options: {metadata}")
with ExifTool(
config_file="../exiftool.config", logger=exiftool_logger
) as et:
# Custom configuration adds Completed and MatureContent tags.
# exiftool logger logs executed command
et.execute(
*(
[f"-{key}={value}" for key, value in metadata.items()]
+ [
"-overwrite_original",
self.file.file.name,
]
)
)
def dump(self) -> BytesIO:
self.file.seek(0)
buffer = BytesIO(self.file.read())
self.file.close()
return buffer
# ------ #
+13
View File
@@ -0,0 +1,13 @@
# ruff: noqa: F401
from .create_book import (
fetch_cookies,
fetch_story,
fetch_story_content_zip,
fetch_story_from_partId,
)
from .exceptions import PartNotFoundError, StoryNotFoundError, WattpadError
from .generators import EPUBGenerator, PDFGenerator
from .logs import logger
from .parser import fetch_image
from .utils import slugify
+46
View File
@@ -0,0 +1,46 @@
from enum import Enum
from pydantic import field_validator, model_validator
from pydantic_settings import BaseSettings
class CacheTypes(Enum):
file = "file"
redis = "redis"
class Config(BaseSettings):
# Values can be overriden by envvars.
USE_CACHE: bool = True
CACHE_TYPE: CacheTypes = CacheTypes.file
REDIS_CONNECTION_URL: str = ""
@field_validator("USE_CACHE", mode="before")
def validate_use_cache(cls, value):
# Return default if value is an empty string
if value == "":
return True # Default value for USE_CACHE
return value
@field_validator("CACHE_TYPE", mode="before")
def validate_cache_type(cls, value):
# Thanks https://stackoverflow.com/a/78157474
if value == "":
return "file"
return value
@model_validator(mode="after")
def prevent_mismatched_redis_url(self):
match self.CACHE_TYPE:
case CacheTypes.file:
if self.REDIS_CONNECTION_URL:
raise ValueError(
"REDIS_CONNECTION_URL provided when File cache selected. To use Redis as a cache, set CACHE_TYPE=redis."
)
case CacheTypes.redis:
if not self.REDIS_CONNECTION_URL:
raise ValueError(
"REDIS_CONNECTION_URL not provided when Redis cache selected. To use File cache, set CACHE_TYPE=file."
)
return self
+129
View File
@@ -0,0 +1,129 @@
from __future__ import annotations
from io import BytesIO
from typing import Optional
import backoff
from aiohttp import ClientResponseError
from aiohttp_client_cache.session import CachedSession
from eliot import start_action
from pydantic import TypeAdapter
from .exceptions import PartNotFoundError, StoryNotFoundError
from .logs import logger
from .models import Story
from .vars import cache, headers
story_ta = TypeAdapter(Story)
# --- #
async def fetch_cookies(username: str, password: str) -> dict:
# source: https://github.com/TheOnlyWayUp/WP-DM-Export/blob/dd4c7c51cb43f2108e0f63fc10a66cd24a740e4e/src/API/src/main.py#L25-L58
"""Retrieves authorization cookies from Wattpad by logging in with user creds.
Args:
username (str): Username.
password (str): Password.
Raises:
ValueError: Bad status code.
ValueError: No cookies returned.
Returns:
dict: Authorization cookies.
"""
with start_action(action_type="api_fetch_cookies"):
async with CachedSession(headers=headers, cache=None) as session:
async with session.post(
"https://www.wattpad.com/auth/login?nextUrl=%2F&_data=routes%2Fauth.login",
data={
"username": username.lower(),
"password": password,
}, # the username.lower() is for caching
) as response:
if response.status != 204:
raise ValueError("Not a 204.")
cookies = {
k: v.value
for k, v in response.cookies.items() # Thanks https://stackoverflow.com/a/32281245
}
if not cookies:
raise ValueError("No cookies.")
return cookies
# --- API Calls --- #
@backoff.on_exception(backoff.expo, ClientResponseError, max_time=15)
async def fetch_story_from_partId(
part_id: int, cookies: Optional[dict] = None
) -> tuple[int, Story]:
"""Fetch Story metadata from a Part ID."""
with start_action(action_type="api_fetch_storyFromPartId"):
async with CachedSession(
headers=headers, cache=None if cookies else cache
) as session: # Don't cache requests with Cookies.
async with session.get(
f"https://www.wattpad.com/api/v3/story_parts/{part_id}?fields=groupId,group(tags,id,title,createDate,modifyDate,language(name),description,completed,mature,url,isPaywalled,user(username,avatar,description),parts(id,title),cover,copyright)"
) as response:
body = await response.json()
if response.status == 400:
match body.get("error_code"):
case 1020: # "Story part not found"
logger.info(f"{part_id=} not found on Wattpad, returning.")
raise PartNotFoundError()
response.raise_for_status()
return int(body["groupId"]), story_ta.validate_python(body["group"])
@backoff.on_exception(backoff.expo, ClientResponseError, max_time=15)
async def fetch_story(story_id: int, cookies: Optional[dict] = None) -> Story:
"""Fetch Story metadata from a Story ID."""
with start_action(action_type="api_fetch_story", story_id=story_id):
async with CachedSession(
headers=headers, cookies=cookies, cache=None if cookies else cache
) as session:
async with session.get(
f"https://www.wattpad.com/api/v3/stories/{story_id}?fields=tags,id,title,createDate,modifyDate,language(name),description,completed,mature,url,isPaywalled,user(username,avatar,description),parts(id,title),cover,copyright"
) as response:
body = await response.json()
if response.status == 400:
match body.get("error_code"):
case 1017: # "Story not found"
logger.info(f"{story_id=} not found on Wattpad, returning.")
raise StoryNotFoundError()
response.raise_for_status()
return story_ta.validate_python(body)
@backoff.on_exception(backoff.expo, ClientResponseError, max_time=15)
async def fetch_story_content_zip(
story_id: int, cookies: Optional[dict] = None
) -> BytesIO:
"""BytesIO Stream of an Archive of Part Contents for a Story."""
with start_action(action_type="api_fetch_storyZip", story_id=story_id):
async with CachedSession(
headers=headers,
cookies=cookies,
cache=None if cookies else cache,
) as session:
async with session.get(
f"https://www.wattpad.com/apiv2/?m=storytext&group_id={story_id}&output=zip"
) as response:
response.raise_for_status()
bytes_stream = BytesIO(await response.read())
return bytes_stream
+12
View File
@@ -0,0 +1,12 @@
class WattpadError(Exception):
"""Base Exception class for Wattpad related errors."""
class StoryNotFoundError(WattpadError):
"""Display the "This story was not found" error to the user."""
...
class PartNotFoundError(StoryNotFoundError):
...
@@ -0,0 +1,4 @@
# ruff: noqa: F401
from .epub import EPUBGenerator
from .pdf import PDFGenerator
+108
View File
@@ -0,0 +1,108 @@
from io import BytesIO
from bs4 import BeautifulSoup
from ebooklib import epub
from ..models import Story
from .types import AbstractGenerator
class EPUBGenerator(AbstractGenerator):
def __init__(
self,
metadata: Story,
part_trees: list[BeautifulSoup],
cover: bytes,
images: list[list[bytes | None]],
):
self.story = metadata
self.parts = part_trees
self.cover = cover
self.images = images
self.book: epub.EpubBook = epub.EpubBook()
def add_metadata(self):
"""Add metadata to epub."""
self.book.add_author(self.story["user"]["username"])
self.book.add_metadata("DC", "title", self.story["title"])
self.book.add_metadata("DC", "description", self.story["description"])
self.book.add_metadata("DC", "date", self.story["createDate"])
self.book.add_metadata("DC", "modified", self.story["modifyDate"])
self.book.add_metadata("DC", "language", self.story["language"]["name"])
self.book.add_metadata(
None, "meta", "", {"name": "tags", "content": ", ".join(self.story["tags"])}
)
self.book.add_metadata(
None,
"meta",
"",
{"name": "mature", "content": str(int(self.story["mature"]))},
)
self.book.add_metadata(
None,
"meta",
"",
{"name": "completed", "content": str(int(self.story["completed"]))},
)
def add_cover(self):
"""Add cover to epub."""
self.book.set_cover("cover.jpg", self.cover)
cover_chapter = epub.EpubHtml(
file_name="titlepage.xhtml", # Standard for cover page
)
cover_chapter.set_content('<img src="cover.jpg">')
self.book.add_item(cover_chapter)
def add_chapters(self):
"""Add chapters to epub, replacing references to image urls to static image paths if images are provided during initialization."""
chapters = []
for idx, (part, tree) in enumerate(zip(self.story["parts"], self.parts)):
chapter = epub.EpubHtml(
title=part["title"], file_name=f"{idx}_{part['id']}.xhtml"
)
if self.images:
for img_idx, (img_data, img_tag) in enumerate(
zip(self.images[idx], tree.find_all("img"))
):
path = f"static/{idx}_{part['id']}/{img_idx}.jpeg"
img = epub.EpubImage(
media_type="image/jpeg", content=img_data, file_name=path
)
self.book.add_item(img)
img_tag["src"] = path
chapter.set_content(tree.prettify())
self.book.add_item(chapter)
chapters.append(chapter)
# ! Review, are these needed? #11
self.book.toc = chapters
# Thanks https://github.com/aerkalov/ebooklib/blob/master/samples/09_create_image/create.py
self.book.add_item(epub.EpubNcx())
self.book.add_item(epub.EpubNav())
# create spine
self.book.spine = ["nav"] + chapters
def compile(self):
self.add_metadata()
self.add_cover()
self.add_chapters()
return True
def dump(self) -> BytesIO:
# Thanks https://stackoverflow.com/a/75398222
buffer = BytesIO()
epub.write_epub(buffer, self.book)
buffer.seek(0)
return buffer
+208
View File
@@ -0,0 +1,208 @@
from base64 import b64encode
from io import BytesIO
from pathlib import Path
from tempfile import NamedTemporaryFile, _TemporaryFileWrapper
from bs4 import BeautifulSoup
from exiftool import ExifTool
from jinja2 import Template
from weasyprint import CSS, HTML
from weasyprint.text.fonts import FontConfiguration
from ..models import Story
from .types import AbstractGenerator
DATA_PATH = Path(__file__).parent / "pdf"
ASSET_PATH = DATA_PATH / "assets"
COPYRIGHT_DATA = {
1: {
"name": "All Rights Reserved",
"statement": "©️ {published_year} by {username}. All Rights Reserved.",
"freedoms": "No reuse, redistribution, or modification without permission.",
"printing": "Not allowed without explicit permission.",
"asset": None,
},
2: {
"name": "Public Domain",
"statement": "This work is in the public domain. Originally published in {published_year} by {username}.",
"freedoms": "Free to use for any purpose without permission.",
"printing": "Allowed for personal or commercial purposes.",
"asset": ASSET_PATH / "cc-zero.png",
},
3: {
"name": "Creative Commons Attribution (CC-BY)",
"statement": "©️ {published_year} by {username}. This work is licensed under a Creative Commons Attribution 4.0 International License.",
"freedoms": "Allows reuse, redistribution, and modification with credit to the author.",
"printing": "Allowed with proper credit.",
"asset": ASSET_PATH / "by.png",
},
4: {
"name": "CC Attribution NonCommercial (CC-BY-NC)",
"statement": "©️ {published_year} by {username}. This work is licensed under a Creative Commons Attribution-NonCommercial 4.0 International License.",
"freedoms": "Allows reuse and modification for non-commercial purposes with credit.",
"printing": "Allowed for non-commercial purposes with proper credit.",
"asset": ASSET_PATH / "by-nc.png",
},
5: {
"name": "CC Attribution NonCommercial NoDerivs (CC-BY-NC-ND)",
"statement": "©️ {published_year} by {username}. This work is licensed under a Creative Commons Attribution-NonCommercial-NoDerivs 4.0 International License.",
"freedoms": "Allows sharing in original form for non-commercial purposes with credit; no modifications allowed.",
"printing": "Allowed for non-commercial purposes in original form with proper credit.",
"asset": ASSET_PATH / "by-nc-nd.png",
},
6: {
"name": "CC Attribution NonCommercial ShareAlike (CC-BY-NC-SA)",
"statement": "©️ {published_year} by {username}. This work is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.",
"freedoms": "Allows reuse and modification for non-commercial purposes under the same license, with credit.",
"printing": "Allowed for non-commercial purposes with proper credit under the same license.",
"asset": ASSET_PATH / "by-nc-sa.png",
},
7: {
"name": "CC Attribution ShareAlike (CC-BY-SA)",
"statement": "©️ {published_year} by {username}. This work is licensed under a Creative Commons Attribution-ShareAlike 4.0 International License.",
"freedoms": "Allows reuse and modification for any purpose under the same license, with credit.",
"printing": "Allowed with proper credit under the same license.",
"asset": ASSET_PATH / "by-sa.png",
},
8: {
"name": "CC Attribution NoDerivs (CC-BY-ND)",
"statement": "©️ {published_year} by {username}. This work is licensed under a Creative Commons Attribution-NoDerivs 4.0 International License.",
"freedoms": "Allows sharing in original form for any purpose with credit; no modifications allowed.",
"printing": "Allowed in original form with proper credit.",
"asset": ASSET_PATH / "by-nd.png",
},
} # Maps Wattpad Copyright IDs to their corresponding data.
with open(DATA_PATH / "stylesheet.css") as reader:
STYLESHEET = reader.read()
with open(DATA_PATH / "book.html") as reader:
TEMPLATE = reader.read()
class PDFGenerator(AbstractGenerator):
def __init__(
self,
metadata: Story,
part_trees: list[BeautifulSoup],
cover: bytes,
images: list[list[bytes | None]],
author_image: bytes,
):
self.story = metadata
self.parts = part_trees
self.cover = cover
self.images = images
self.author = author_image
self.book: _TemporaryFileWrapper = NamedTemporaryFile(suffix=".pdf")
self.content = TEMPLATE
def generate_chapters(self) -> dict[int, str]:
"""Return a dictionary of part_ids to content trees, with image URLs replaced with base64 encoded images if provided during initialization."""
data: dict[int, str] = {}
for idx, (part, tree) in enumerate(zip(self.story["parts"], self.parts)):
if self.images:
for img_idx, (img_data, img_tag) in enumerate(
zip(self.images[idx], tree.find_all("img"))
):
if not img_data:
continue
img_tag["src"] = (
f"data:image/jpg;base64,{b64encode(img_data).decode()}"
)
data[part["id"]] = tree.prettify()
return data
def populate_template(self, parts: dict[int, str]):
"""Populate HTML Template with Story data."""
copyright = COPYRIGHT_DATA[self.story["copyright"]]
data = {
"statement": copyright["statement"].format(
username=self.story["user"]["username"],
published_year=self.story["createDate"].split("-", 2)[0],
),
"author": self.story["user"]["username"],
"freedoms": copyright["freedoms"],
"printing": copyright["printing"],
"book_id": self.story["id"],
"book_title": self.story["title"],
"cover": f"data:image/jpg;base64,{b64encode(self.cover).decode()}",
"username": self.story["user"]["username"],
"description": self.story["description"],
"avatar": b64encode(self.author).decode(),
"copyright": {
"data": (
b64encode(copyright["asset"].read_bytes()).decode()
if copyright["asset"]
else ""
),
"name": copyright["name"],
},
"parts": parts,
}
self.content: str = Template(self.content).render(data)
def generate_pdf(self):
"""Generate and write the PDF to a temporary file (self.book)."""
font_config = FontConfiguration()
stylesheet_obj = CSS(string=STYLESHEET, font_config=font_config)
html_obj = HTML(string=self.content)
html_obj.write_pdf(
self.book.name, stylesheets=[stylesheet_obj], font_config=font_config
)
def add_metadata(self):
"""Write metadata to generated PDF file at self.book, using ExifTool."""
clean_description = (
self.story["description"].strip().replace("\n", "$/")
) # exiftool doesn't parse \ns correctly, they support $/ for the same instead. `&#xa;` is another option.
metadata = {
"Author": self.story["user"]["username"],
"Title": self.story["title"],
"Subject": clean_description,
"CreationDate": self.story["createDate"],
"ModDate": self.story["modifyDate"],
"Keywords": ",".join(self.story["tags"]),
"Language": self.story["language"]["name"],
"Completed": self.story["completed"],
"MatureContent": self.story["mature"],
"Producer": "Dhanush Rambhatla (TheOnlyWayUp - https://rambhat.la) and WattpadDownloader",
} # As per https://exiftool.org/TagNames/PDF.html
with ExifTool(config_file=DATA_PATH / "exiftool.config") as et:
# Custom configuration adds Completed and MatureContent tags.
# exiftool logger logs executed command
et.execute(
*(
[f"-{key}={value}" for key, value in metadata.items()]
+ [
"-overwrite_original",
self.book.file.name,
]
)
)
def compile(self):
parts = self.generate_chapters()
self.populate_template(parts)
self.generate_pdf()
self.add_metadata()
return True
def dump(self) -> BytesIO:
self.book.seek(0)
buffer = BytesIO(self.book.read())
self.book.close()
return buffer
Binary file not shown.

After

Width:  |  Height:  |  Size: 20 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 22 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 17 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 16 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 17 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 12 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.3 KiB

@@ -0,0 +1,73 @@
<!DOCTYPE html>
<html lang="{{ langcode }}">
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{{ book_title }}</title>
<section class="fullpage">
<img src="{{ cover }}" alt="Cover">
</section>
<div id="copyright-container">
<h1 id="copyright-notice">Copyright Notice</h1>
<h2 id="copyright-title">{{ book_title }}</h2>
<p id="copyright-author">By {{ author }}</p>
<div id="copyright-separator"></div>
<p id="copyright-ex-libris">Ex Libris Sapientiae</p>
<div id="copyright-separator"></div>
{% if copyright.data %}
<img src="data:image/jpg;base64,{{copyright.data}}"
alt="{{copyright.name}}"
width="88"
height="31"
id="copyright-license-image">
{% endif %}
<p id="copyright-copyright">{{ statement }}</p>
<p id="copyright-rights">{{ freedoms }}</p>
<p id="copyright-printing">Printing: {{ printing }}</p>
<p id="book-link">
ID: {{ book_id }}.
<a href="https://wattpad.com/story/{{ book_id }}" target="_blank" id="copyright-link">View this Book Online</a>
</p>
</div>
<div id="book">
<section id="contents" class="toc">
<h1>Table of Contents</h1>
<ul>
{% for part_id in parts %}
<li><a href="#{{part_id}}"></a></li>
{% endfor %}
</ul>
</section>
{% for part_id in parts %}
{{parts[part_id] | safe}}
{% endfor %}
</div>
<h1>About the Author</h1>
<div id="author-container">
<div id="author-about">
<img src="data:image/jpg;base64,{{avatar}}" alt="{{author}}'s profile picture" id="author-profile-picture">
<h2 id="author-name">
<a href="https://wattpad.com/user/{{ username }}" id="author-link">{{ username }}</a>
</h2>
<hr id="author-divider">
<p id="author-bio">
{{ description }}
</p>
</div>
</div>
</html>
@@ -205,6 +205,8 @@ section {
#contents a {
color: inherit;
text-decoration: none;
display: flex;
justify-content: space-between;
}
#contents a::before {
content: target-counter(attr(href), h2-counter) '. ' target-text(attr(href));
@@ -389,6 +391,14 @@ a:hover {
}
#book-link {
font-size: 14px;
color: #666;
margin: 8px 0;
text-align: center;
}
#copyright-separator {
width: 100%;
max-width: 400px;
@@ -0,0 +1,47 @@
from io import BytesIO
from tempfile import _TemporaryFileWrapper
from typing import Literal
from bs4 import BeautifulSoup
from ebooklib.epub import EpubBook
from ..models import Story
class AbstractGenerator:
"""Compile parsed part trees to a file.
Args:
metadata (Story): Story Metadata.
part_trees (List[BeautifulSoup]): Parsed part trees.
cover (bytes): Cover image.
images (List[List[bytes | None]]): An array of images for each chapter, if images have been downloaded.
"""
def __init__(
self,
metadata: Story,
part_trees: list[BeautifulSoup],
cover: bytes,
images: list[list[bytes | None]],
):
self.story = metadata
self.parts = part_trees
self.cover = cover
self.images = images
self.book: EpubBook | _TemporaryFileWrapper = None # type: ignore
def compile(self) -> Literal[True]:
"""Compile the part trees into the corresponding in-memory representation of the generator format.
Returns:
Literal[True]: Compiled successfully.
"""
return True
def dump(self) -> BytesIO:
"""Return a Buffer of the compiled file."""
buffer = BytesIO()
return buffer
+19
View File
@@ -0,0 +1,19 @@
import logging
from os import environ
from eliot import to_file
from eliot.stdlib import EliotHandler
handler = EliotHandler()
logging.getLogger("fastapi").setLevel(logging.INFO)
logging.getLogger("fastapi").addHandler(handler)
exiftool_logger = logging.getLogger("exiftool")
exiftool_logger.addHandler(handler)
logger = logging.Logger("wpd")
logger.addHandler(handler)
if environ.get("DEBUG"):
to_file(open("eliot.log", "wb"))
+42
View File
@@ -0,0 +1,42 @@
from typing import Optional, TypedDict
class CopyrightData(TypedDict):
name: str
statement: str
freedoms: str
printing: str
image_url: Optional[str]
class Language(TypedDict):
name: str
class User(TypedDict):
username: str
avatar: str
description: str
class Part(TypedDict):
id: int
title: str
class Story(TypedDict):
id: str
title: str
createDate: str
modifyDate: str
language: Language
user: User
description: str
cover: str
completed: bool
tags: list[str]
mature: bool
url: str
parts: list[Part]
isPaywalled: bool
copyright: int
+86
View File
@@ -0,0 +1,86 @@
import asyncio
from itertools import batched
from typing import cast
from aiohttp import ClientSession
from bs4 import BeautifulSoup, Tag
from eliot import start_action
from .vars import headers
def clean_tree(title: str, id: int, body: str) -> BeautifulSoup:
original_soup = BeautifulSoup(body, features="lxml")
new_soup = BeautifulSoup(
f"""
<h1 class="chapter-title" id={id}>{title}</h1>
<section class="chapter-body"></section>
""",
features="html.parser", # head/body tags aren't generated
)
insert_at = cast(Tag, new_soup.find("section"))
children = cast(Tag, original_soup.find("body")).children
for tag in cast(list[Tag], list(children)):
if tag.name != "p": # Casted to lower
continue
style = tag.attrs.get("style")
for child in cast(list[Tag], tag.children):
# tag is a <p> enclosing either text, media, or a break
if child.name in [None, "b", "i", "u", "strong", "em"]:
# text is enclosed, can be italic, bold, underlined, or a mix
tag.attrs = {}
p_tag = tag
if style:
p_tag["style"] = style
insert_at.append(p_tag)
break
elif child.name == "img":
# image is enclosed
img_tag = Tag(name="img")
img_tag.attrs = {
"height": child.attrs.get("data-original-height"),
"width": child.attrs.get("data-original-width"),
"src": child["src"],
}
if style:
img_tag["style"] = style
insert_at.append(img_tag)
elif child.name == "br":
# br tag is enclosed
br_tag = Tag(name="br", can_be_empty_element=True)
if style:
br_tag["style"] = style
insert_at.append(br_tag)
return new_soup
async def fetch_image(url: str) -> bytes | None:
"""Fetch image bytes."""
with start_action(action_type="api_fetch_image", url=url):
async with ClientSession(headers=headers) as session: # Don't cache images.
async with session.get(url) as response:
if not response.ok:
return None
body = await response.read()
return body
async def fetch_tree_images(tree: BeautifulSoup):
"""Return a Generator of bytes containing image data for all images referenced in the tree."""
image_urls = [img["src"] for img in tree.find_all("img")]
images = []
for chunk in batched(image_urls, 3):
for image_data in await asyncio.gather(*[fetch_image(url) for url in chunk]):
images.append(image_data)
return images
+25
View File
@@ -0,0 +1,25 @@
import re
import unicodedata
def slugify(value, allow_unicode=False) -> str:
"""
Taken from https://github.com/django/django/blob/master/django/utils/text.py
Convert to ASCII if 'allow_unicode' is False. Convert spaces or repeated
dashes to single dashes. Remove characters that aren't alphanumerics,
underscores, or hyphens. Convert to lowercase. Also strip leading and
trailing whitespace, dashes, and underscores.
Thanks https://stackoverflow.com/a/295466.
"""
value = str(value)
if allow_unicode:
value = unicodedata.normalize("NFKC", value)
else:
value = (
unicodedata.normalize("NFKD", value)
.encode("ascii", "ignore")
.decode("ascii")
)
value = re.sub(r"[^\w\s-]", "", value.lower())
return re.sub(r"[-\s]+", "-", value).strip("-_")
+28
View File
@@ -0,0 +1,28 @@
from aiohttp_client_cache import FileBackend, RedisBackend
from dotenv import load_dotenv
from .config import CacheTypes, Config
from .logs import logger
headers = {
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"
}
load_dotenv(override=True)
config = Config()
if config.USE_CACHE:
match config.CACHE_TYPE:
case CacheTypes.file:
cache = FileBackend(use_temp=True, expire_after=43200) # 12 hours
case CacheTypes.redis:
cache = RedisBackend(
cache_name="wpd-aiohttp-cache",
address=config.REDIS_CONNECTION_URL,
expire_after=43200, # 12 hours
)
else:
cache = None
logger.info(f"Using {cache=}")
+46 -35
View File
@@ -1,13 +1,15 @@
"""WattpadDownloader API Server."""
from typing import Optional
import asyncio
from pathlib import Path
from enum import Enum
from pathlib import Path
from typing import Optional
from zipfile import ZipFile
from eliot import start_action
from aiohttp import ClientResponseError
from fastapi import FastAPI, Request
from bs4 import BeautifulSoup
from eliot import start_action
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import (
FileResponse,
HTMLResponse,
@@ -15,29 +17,25 @@ from fastapi.responses import (
StreamingResponse,
)
from fastapi.staticfiles import StaticFiles
from create_book import (
EPUBGenerator,
PDFGenerator,
fetch_story,
fetch_story_from_partId,
fetch_story_content_zip,
fetch_image,
fetch_cookies,
WattpadError,
StoryNotFoundError,
generate_clean_part_html,
slugify,
WattpadError,
fetch_cookies,
fetch_image,
fetch_story,
fetch_story_content_zip,
fetch_story_from_partId,
logger,
slugify,
)
from create_book.parser import clean_tree, fetch_tree_images
app = FastAPI()
BUILD_PATH = Path(__file__).parent / "build"
headers = {
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36"
}
class RequestCancelledMiddleware:
# Thanks https://github.com/fastapi/fastapi/discussions/11360#discussion-6427734
@@ -77,7 +75,7 @@ app.add_middleware(RequestCancelledMiddleware)
class DownloadFormat(Enum):
# pdf = "pdf"
pdf = "pdf"
epub = "epub"
@@ -170,31 +168,44 @@ async def handle_download(
cover_data = await fetch_image(
metadata["cover"].replace("-256-", "-512-")
) # Increase resolution
match format:
case DownloadFormat.epub:
book = EPUBGenerator(metadata, cover_data)
media_type = "application/epub+zip"
# case DownloadFormat.pdf:
# book = PDFGenerator(metadata, cover_data)
# media_type = "application/pdf"
logger.info(f"Retrieved story metadata and cover ({story_id=})")
if not cover_data:
raise HTTPException(status_code=422)
story_zip = await fetch_story_content_zip(story_id, cookies)
archive = ZipFile(story_zip, "r")
part_contents = [
generate_clean_part_html(
part, archive.read(str(part["id"])).decode("utf-8")
part_trees: list[BeautifulSoup] = [
clean_tree(
part["title"], part["id"], archive.read(str(part["id"])).decode("utf-8")
)
for part in metadata["parts"]
]
async for title in book.add_chapters(
part_contents, download_images=download_images
):
...
images = (
[await fetch_tree_images(tree) for tree in part_trees]
if download_images
else []
)
match format:
case DownloadFormat.epub:
book = EPUBGenerator(metadata, part_trees, cover_data, images)
media_type = "application/epub+zip"
case DownloadFormat.pdf:
author_image = await fetch_image(
metadata["user"]["avatar"].replace("-256-", "-512-")
)
if not author_image:
raise HTTPException(status_code=422)
book = PDFGenerator(
metadata, part_trees, cover_data, images, author_image
)
media_type = "application/pdf"
logger.info(f"Retrieved story metadata and cover ({story_id=})")
book.compile()
book_buffer = book.dump()
-54
View File
@@ -1,54 +0,0 @@
<!DOCTYPE html>
<html lang="{langcode}">
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{book_title}</title>
<section class="fullpage">
<img src="{cover}" alt="Cover">
</section>
<div id="copyright-container">
<h1 id="copyright-notice">Copyright Notice</h1>
<h2 id="copyright-title">{book_title}</h2>
<p id="copyright-author">By {author}</p>
<div id="copyright-separator"></div>
<p id="copyright-ex-libris">Ex Libris Sapientiae</p>
<div id="copyright-separator"></div>
{copyright_image}
<p id="copyright-copyright">{statement}</p>
<p id="copyright-rights">{freedoms}</p>
<p id="copyright-printing">Printing: {printing}</p>
<p id="copyright-printing">ID: {book_id}. <a href="https://wattpad.com/story/{book_id}" target="_blank" id="copyright-link">View this Book Online</a></p>
</div>
<div id="book">
</div>
<h1>About the Author</h1>
<div id="author-container">
<div id="author-about">
{avatar}
<h2 id="author-name"><a href="https://wattpad.com/user/{username}" id="author-link">{username}</a></h2>
<hr id="author-divider">
<p id="author-bio">
{description}
</p>
</div>
</div>
</html>
-120
View File
@@ -1,120 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<xsl:stylesheet version="2.0"
xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
xmlns:outline="http://wkhtmltopdf.org/outline"
xmlns="http://www.w3.org/1999/xhtml">
<xsl:output doctype-public="-//W3C//DTD XHTML 1.0 Strict//EN"
doctype-system="http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"
indent="yes" />
<xsl:template match="outline:outline">
<html>
<head>
<style>
@font-face {
font-family: 'PT Serif';
src: url('./fonts/PTSerif-Regular.ttf') format('truetype');
font-weight: 400;
font-style: normal;
}
@font-face {
font-family: 'PT Serif';
src: url('./fonts/PTSerif-Bold.ttf') format('truetype');
font-weight: 700;
font-style: normal;
}
@font-face {
font-family: 'PT Serif';
src: url('./fonts/PTSerif-Italic.ttf') format('truetype');
font-weight: 400;
font-style: italic;
}
@font-face {
font-family: 'PT Serif';
src: url('./fonts/PTSerif-BoldItalic.ttf') format('truetype');
font-weight: 700;
font-style: italic;
}
.pt-serif-regular {
font-family: "PT Serif", serif;
font-weight: 400;
font-style: normal;
}
.pt-serif-bold {
font-family: "PT Serif", serif;
font-weight: 700;
font-style: normal;
}
.pt-serif-regular-italic {
font-family: "PT Serif", serif;
font-weight: 400;
font-style: italic;
}
.pt-serif-bold-italic {
font-family: "PT Serif", serif;
font-weight: 700;
font-style: italic;
}
h1 {
text-align: center;
font-family: "PT Serif", serif !important;
font-weight: 700 !important;
font-style: normal !important;
font-size: 36px !important; /* Uniform size */
margin-bottom: 20px; /* Space below the heading */
border-bottom: 4px solid black; /* Black line */
padding-bottom: 10px; /* Space between text and line */
}
div {border-bottom: 1px dashed rgb(100,000,100);
padding-top: 5px;}
span {float: right;}
li {list-style: none;}
ul {
font-size: 22px;
font-family: arial;
}
ul ul {font-size: 80%; }
ul {padding-left: 0em;}
ul ul {padding-left: 1em;}
a {text-decoration:none; color: black;}
</style>
</head>
<body>
<h1>Table of Contents</h1>
<ul><xsl:apply-templates select="outline:item/outline:item"/></ul>
</body>
</html>
</xsl:template>
<xsl:template match="outline:item">
<li>
<xsl:if test="@title!=''">
<div>
<a class="pt-serif-regular">
<xsl:if test="@link">
<xsl:attribute name="href"><xsl:value-of select="@link"/></xsl:attribute>
</xsl:if>
<xsl:if test="@backLink">
<xsl:attribute name="name"><xsl:value-of select="@backLink"/></xsl:attribute>
</xsl:if>
<xsl:value-of select="@title" />
</a>
<span> <xsl:value-of select="@page" /> </span>
</div>
</xsl:if>
<ul>
<xsl:comment>added to prevent self-closing tags in QtXmlPatterns</xsl:comment>
<xsl:apply-templates select="outline:item"/>
</ul>
</li>
</xsl:template>
</xsl:stylesheet>
+425 -658
View File
File diff suppressed because it is too large Load Diff