Changeset - d74c0da6d370
[Not reviewed]
tip default
0 3 0
Dennis Fink - 2 years ago 2022-08-30 10:13:44
dennis.fink@c3l.lu
Simplify book handling
3 files changed with 50 insertions and 23 deletions:
0 comments (0 inline, 0 general)
stockcli/books.py
Show inline comments
 
import logging
 
from typing import Dict, Optional, Tuple
 

	
 
import isbnlib
 
from rich.panel import Panel
 
from rich.table import Table
 

	
 
from . import utils
 
from .console import DEFAULT_PADDING, console, error_console, int_prompt, prompt
 
from .console import (DEFAULT_PADDING, console, error_console, int_prompt,
 
                      prompt)
 
from .style import GreenBoldText
 

	
 

	
 
def book_already_in_database(
 
    isbn: str, userentity_id: int
 
) -> Tuple[bool, Optional[int]]:
 
    all_books = utils.get_request(
 
        f"objects/userobjects?query[]=userentity_id={userentity_id}"
 
    )
 

	
 
    for book in all_books:
 
        book_metadata = utils.get_request(
 
            f"userfields/userentity-books/{book['id']}", cached=True
 
        )
 

	
 
        if book_metadata["isbn"] is None:
 
            continue
 

	
 
        if isbnlib.canonical(book_metadata["isbn"]) == isbn:
 
            return True, book["id"]
 

	
 
    return False, None
 

	
 

	
 
def add_book_by_barcode(barcode: str) -> None:
 

	
 
    canonical_isbn = isbnlib.canonical(barcode)
 
    if not (isbnlib.is_isbn10(canonical_isbn) or isbnlib.is_isbn13(canonical_isbn)):
 
        logging.error(f"{barcode} is not a valid ISBN!")
 
        error_console.print(f"{barcode} is not a valid ISBN!")
 
        return
 

	
 
    userentity = utils.get_request("objects/userentities?query[]=name=books")
 
    userentity_id = userentity[0]["id"]
 

	
 
    all_books = utils.get_request(
 
        f"objects/userobjects?query[]=userentity_id={userentity_id}"
 
    )
 

	
 
    for book in all_books:
 
        book_metadata = utils.get_request(
 
            f"userfields/userentity-books/{book['id']}", cached=True
 
    already_in_database, book_id = book_already_in_database(
 
        canonical_isbn, userentity_id
 
        )
 

	
 
        if book_metadata["isbn"] is None:
 
            continue
 

	
 
        if isbnlib.canonical(book_metadata["isbn"]) == canonical_isbn:
 

	
 
    if already_in_database:
 
            book_metadata = utils.get_request(
 
                f"userfields/userentity-books/{book['id']}", cached=False
 
            f"userfields/userentity-books/{book_id}", cached=False
 
            )
 

	
 
            grid = Table.grid(padding=DEFAULT_PADDING)
 
            grid.add_column(justify="right", no_wrap=True)
 
            grid.add_column(justify="left", style="cyan", no_wrap=True)
 
            grid.add_row(GreenBoldText("Title:"), book_metadata["title"])
 

	
 
        if book_metadata["authors"] is not None:
 
            grid.add_row(
 
                GreenBoldText("Authors:"),
 
                ", ".join(book_metadata["authors"].split("\n")),
 
            )
 
            grid.add_row(GreenBoldText("Amount:"), book_metadata["amount"])
 
            console.print(
 
                Panel(grid, title="[green bold]Book already found[/green bold]")
 

	
 
        if book_metadata["categories"] is not None:
 
            grid.add_row(
 
                GreenBoldText("Categories:"),
 
                book_metadata["categories"].replace(",", ", "),
 
            )
 

	
 
        console.print(Panel(grid, title="[green bold]Book already found[/green bold]"))
 

	
 
            add_to_amount = bool(
 
                int_prompt.ask("Add? (Enter 0 to abort)", choices=["0", "1"], default=0)
 
            )
 
            if not add_to_amount:
 
                logging.debug("User aborted task!")
 
                return
 
            else:
 
                book_metadata["amount"] = str(int(book_metadata["amount"]) + 1)
 
                response = utils.put_request(
 
                    f"userfields/userentity-books/{book['id']}",
 
                    book_metadata,
 
                    cached=True,
 
                )
 
                console.print("Successfully updated!")
 
                return
 

	
 
    try:
 
    metadata = isbnlib.meta(canonical_isbn, "worldcat")
 
        if metadata["Title"] is None and metadata["Authors"] is None:
 
            metadata = isbnlib.meta(canonical_isbn, "default")
 
    except:
 
        metadata = isbnlib.meta(canonical_isbn, "default")
 

	
 
    grid = Table.grid(padding=DEFAULT_PADDING)
 
    grid.add_column(justify="right", no_wrap=True)
 
    grid.add_column(justify="left", style="cyan", no_wrap=True)
 
    grid.add_column(justify="left", style="cyan", no_wrap=False)
 
    grid.add_row(GreenBoldText("Title:"), metadata.get("Title", None))
 
    grid.add_row(GreenBoldText("Author(s)"), ", ".join(metadata.get("Authors", None)))
 
    console.print(Panel(grid, title="[green bold]Book Info[/green bold]"))
 

	
 
    ok = bool(
 
        int_prompt.ask(
 
            "Is the metadata correct? (Enter 0 to abort)", choices=["0", "1"], default=0
 
            "Is the metadata correct? (Enter 0 to abort)", choices=["0", "1"], default=1
 
        )
 
    )
 
    if not ok:
 
        logging.debug("User aborted task!")
 
        return
 
    new_book_id = utils.post_request(
 
        "objects/userobjects", {"userentity_id": userentity_id}
 
    )["created_object_id"]
 

	
 
    response = utils.put_request(
 
        f"userfields/userentity-books/{new_book_id}",
 
        {
 
            "title": metadata["Title"],
 
            "isbn": isbnlib.mask(canonical_isbn),
 
            "authors": "\n".join(metadata["Authors"]),
 
            "amount": "1",
 
            "categories": None,
 
        },
 
    )
 
    console.print("Successfully added!")
 
    return
 

	
 

	
 
def update_isbn(barcode: str) -> None:
 
    userentity = utils.get_request("objects/userentities?query[]=name=books")
 
    userentity_id = userentity[0]["id"]
 

	
 
    all_books = utils.get_request(
 
        f"objects/userobjects?query[]=userentity_id={userentity_id}"
 
    )
 

	
 
    for book in all_books:
 
        book_metadata = utils.get_request(
 
            f"userfields/userentity-books/{book['id']}", cached=False
 
        )
 
        console.print("Handling")
 
        console.print(book_metadata)
 
        try:
 
            book_metadata["isbn"] = isbnlib.mask(
 
                isbnlib.canonical(book_metadata["isbn"])
 
            )
 
            response = utils.put_request(
 
                f"userfields/userentity-books/{book['id']}",
 
                book_metadata,
 
            )
 
        except:
 
            continue
stockcli/cli.py
Show inline comments
 
import json
 
import logging
 
import logging.config
 
from datetime import timedelta
 
from operator import itemgetter
 

	
 
import click
 
import requests
 
import requests_cache
 
from rich.panel import Panel
 
from rich.table import Table
 

	
 
from .books import add_book_by_barcode, update_isbn
 
from .console import DEFAULT_PADDING, console, int_prompt, prompt
 
from .stock import (
 
    add_by_barcode,
 
    get_info_by_barcode,
 
    transfer_by_barcode,
 
    update_by_barcode,
 
)
 
from .utils import prepare_barcode
 

	
 
TASK_MAP = {
 
    "1": ("Transfer stock", transfer_by_barcode),
 
    "2": ("Add stock", add_by_barcode),
 
    "3": ("Update stock", update_by_barcode),
 
    "4": ("Get product info", get_info_by_barcode),
 
    "5": ("Add book", add_book_by_barcode),
 
    "6": ("Update ISBN", update_isbn),
 
}
 

	
 

	
 
@click.command(context_settings={"help_option_names": ("-h", "--help", "-?")})
 
@click.option(
 
    "-c",
 
    "--config",
 
    "configfile",
 
    default="/etc/stockcli.json",
 
    type=click.File("r"),
 
    help="Config file to load",
 
)
 
@click.pass_context
 
def stockcli(ctx: click.Context, configfile: click.File) -> None:
 

	
 
    config = json.load(configfile)  # type: ignore
 
    logging.config.dictConfig(config["logging"])
 

	
 
    ctx.ensure_object(dict)
 
    ctx.obj["request_session"] = requests.Session()
 
    ctx.obj["request_session"].headers.update(
 
        {
 
            "accept": "application/json",
 
            "GROCY-API-KEY": config["grocy"]["apikey"],
 
        }
 
    )
 
    ctx.obj["cached_session"] = requests_cache.CachedSession(
 
        "stockcli",
 
        ignored_parameters=["GROCY-API-KEY"],
 
        expire_after=timedelta(days=1),
 
        cache_control=True,
 
        allowable_methods=["GET", "POST"],
 
        allowable_codes=[200, 400],
 
        match_headers=True,
 
        stale_if_error=True,
 
    )
 
    ctx.obj["cached_session"].headers.update(
 
        {
 
            "accept": "application/json",
 
            "GROCY-API-KEY": config["grocy"]["apikey"],
 
        }
 
    )
 
    ctx.obj["base_url"] = f"{config['grocy']['url']}/api/"
 

	
 
    menu = Table.grid(padding=DEFAULT_PADDING)
 
    menu.add_column(justify="left", style="green", no_wrap=True)
 
    menu.add_column(justify="left", style="cyan", no_wrap=True)
 

	
stockcli/utils.py
Show inline comments
 
import logging
 
import string
 
from operator import attrgetter
 
from typing import Any, Dict, Optional
 

	
 
import click
 
import requests
 

	
 
from .console import error_console
 

	
 
UNALLOWED_CHARACTERS = str.maketrans(dict((c, None) for c in string.whitespace))
 

	
 

	
 
def make_request(
 
    method: str, url_path: str, data: Optional[Any] = None, *, cached: bool = False
 
) -> Any:
 
    obj = click.get_current_context().obj
 

	
 
    if cached:
 
        session = obj["cached_session"]
 
    else:
 
        session = obj["request_session"]
 
    session = obj["cached_session"] if cached else obj["request_session"]
 
    base_url = obj["base_url"]
 
    requested_url = base_url + url_path
 

	
 
    method_function = attrgetter(method)
 

	
 
    try:
 
        if data is not None:
 
            logging.debug(
 
                f"Making {method.upper()} request: {requested_url} with {data}"
 
            )
 
            response = method_function(session)(requested_url, json=data)
 
        else:
 
            logging.debug(f"Making {method.upper()} request: {requested_url}")
 
            response = method_function(session)(requested_url)
 
        response.raise_for_status()
 
    except requests.Timeout:
 
        logging.error(f"The connection to {requested_url} timed out!")
 
        error_console.print("Connection timed out!")
 
        raise
 
    except requests.ConnectionError:
 
        logging.error(f"Couldn't establish a connection to {requested_url}!")
 
        error_console.print("Couldn't establish a connection!")
 
        raise
 
    except requests.HTTPError:
 
        logging.error(f"{requested_url} sent back an HTTPError")
 
        error_console.print("Got the following error:")
 
        error_message = response.json()["error_message"]
 
        logging.error(error_message)
 
        error_console.print(error_message)
 
        raise
 
    except requests.TooManyRedirects:
 
        logging.error(f"{requested_url} had too many redirects!")
 
        error_console.print("Too many redirects!")
 
        raise
 
    else:
 
        try:
 
            return response.json()
 
        except requests.JSONDecodeError:
 
            return response
 

	
 

	
 
def get_request(url_path: str, *, cached: bool = False) -> Any:
 
    return make_request("get", url_path, cached=cached)
 

	
 

	
 
def post_request(url_path: str, data: Dict[str, Any], *, cached: bool = False) -> Any:
 
    return make_request("post", url_path, data, cached=cached)
 

	
0 comments (0 inline, 0 general)