Source code for camelot.handlers

"""Functions to handle all operations on the PDF's."""

from __future__ import annotations

import logging
import multiprocessing as mp
import os
import tempfile
from functools import partial
from itertools import chain
from pathlib import Path
from typing import IO
from typing import Any

import playa
from playa.exceptions import PDFPasswordIncorrect
from playa.exceptions import PDFTextExtractionNotAllowed
from playa.miner import LTChar
from playa.miner import LTImage
from playa.miner import LTTextLineHorizontal
from playa.miner import LTTextLineVertical

from .core import TableList
from .parsers import Hybrid
from .parsers import Lattice
from .parsers import MachineLearning
from .parsers import Network
from .parsers import Stream
from .utils import download_url
from .utils import get_image_char_and_text_objects
from .utils import get_page_layout
from .utils import get_rotation
from .utils import is_url

PARSERS = {
    "lattice": Lattice,
    "stream": Stream,
    "network": Network,
    "hybrid": Hybrid,
    "ml": MachineLearning,
}

logger = logging.getLogger("camelot")


FilepathOrBuffer = str | Path | bytes | bytearray | memoryview | IO[bytes]


def _spill_bytes_to_tempfile(data: bytes) -> str:
    """Write `data` to a NamedTemporaryFile and return its path.

    Used by ``PDFHandler`` when the caller passes a ``bytes``-like object
    or a file-like stream rather than a filesystem path — the Lattice
    flavor's OpenCV image-conversion backend needs a real on-disk file,
    so the simplest contract is "always spill once, treat as a path from
    here on, clean up on close()". The file is created with ``delete=False``
    and reaped in :meth:`PDFHandler.close`.
    """
    with tempfile.NamedTemporaryFile(
        prefix="camelot-", suffix=".pdf", delete=False
    ) as f:
        f.write(data)
        return f.name


def _consume_filelike_to_bytes(filepath: IO[bytes]) -> bytes:
    """Read a binary stream from the start, preserve the caller's cursor.

    Always reads from position 0 — a PDF mid-stream isn't useful — and
    seeks back to whatever position the caller had before the call so
    they can keep using the same handle. Non-seekable streams (sockets,
    pipes) silently fall through: ``read()`` returns whatever's left,
    and the seek-back is best-effort.

    Raises ``TypeError`` when ``read()`` returns text rather than bytes,
    which catches accidental text-mode opens before the bad data reaches
    pdfium and produces a clearer error.
    """
    tell = getattr(filepath, "tell", None)
    seek = getattr(filepath, "seek", None)
    pos = tell() if callable(tell) else None
    if callable(seek):
        try:
            seek(0)
        except (OSError, ValueError):
            pass
    data = filepath.read()
    if pos is not None and callable(seek):
        try:
            seek(pos)
        except (OSError, ValueError):
            pass
    if not isinstance(data, (bytes, bytearray, memoryview)):
        raise TypeError(
            "file-like 'filepath' must return bytes from .read(),"
            f" got {type(data).__name__}"
        )
    return bytes(data)


def _resolve_filepath(filepath: FilepathOrBuffer) -> tuple[str | Path, bool]:
    """Normalise read_pdf's polymorphic filepath argument to a real path.

    Returns ``(path, is_temp)``. ``is_temp`` is True when the returned
    path points to a tempfile we created (and therefore own — the
    caller's :meth:`PDFHandler.close` should reap it).

    Three branches:

    * ``bytes`` / ``bytearray`` / ``memoryview``: spill to a tempfile.
    * binary file-like (``.read()`` returning bytes): read once,
      preserve the caller's cursor, spill to a tempfile.
    * everything else (path / URL): URL → download to tempfile; path
      → pass through untouched.
    """
    if isinstance(filepath, (bytes, bytearray, memoryview)):
        return _spill_bytes_to_tempfile(bytes(filepath)), True
    if hasattr(filepath, "read") and callable(filepath.read):
        data = _consume_filelike_to_bytes(filepath)  # type: ignore[arg-type]
        return _spill_bytes_to_tempfile(data), True
    if is_url(filepath):
        return download_url(str(filepath)), True
    # mypy: filepath is the str | Path | os.PathLike subset of
    # FilepathOrBuffer (we've ruled out bytes / file-like / URL).
    return filepath, False  # type: ignore[return-value]


[docs] class PDFHandler: """Handles all operations on the PDF's. Handles all operations like temp directory creation, splitting file into single page PDFs, parsing each PDF and then removing the temp directory. Parameters ---------- filepath : str, Path, bytes, or binary file-like Source PDF. Accepts a filesystem path / URL, or — since #270 — a ``bytes``-like object or any binary stream with a ``.read()`` method (``io.BytesIO``, an open ``"rb"`` file, ``requests`` response ``.raw``, etc). In the in-memory cases the bytes are spilled to a temporary file once and cleaned up when the handler is closed; this keeps the rest of the pipeline (in particular the Lattice OpenCV image-conversion backend) unchanged. pages : str, optional (default: '1') Comma-separated page numbers. Example: '1,3,4' or '1,4-end' or 'all'. password : str, optional (default: None) Password for decryption. debug : bool, optional (default: False) Whether the parser should store debug information during parsing. """ def __init__( self, filepath: FilepathOrBuffer, pages="1", password=None, debug=False, ): self.debug = debug resolved, self.is_temp_file = _resolve_filepath(filepath) self.filepath: str | Path = resolved if password is None: self.password = "" # noqa: S105 else: self.password = password # Defer page resolution until parse() opens the PDF, so that we don't # open the document twice per read_pdf call. The literal default # "1" doesn't need the PDF at all and is resolved eagerly. self._pages_spec = pages self._pages_cache: list[int] | None = [1] if pages == "1" else None @property def pages(self) -> list[int]: """Resolved 1-based page numbers, sorted and de-duplicated. Lazy: only opens the PDF if the spec is something other than the default ``"1"``. Cached after first access. """ if self._pages_cache is None: self._pages_cache = self._resolve_pages(self._pages_spec) return self._pages_cache def _resolve_pages(self, pages: str, pdf: Any | None = None) -> list[int]: """Convert the ``pages`` spec to a sorted, de-duplicated list of ints. Pass an already-open ``pdf`` to avoid the playa.open() round-trip that would otherwise be needed to read ``len(pdf.pages)``. """ if pages == "1": return [1] if pdf is None: with playa.open(self.filepath, space="page", password=self.password) as pdf: return self._resolve_pages(pages, pdf) return self._expand_pages_spec(pages, len(pdf.pages)) @staticmethod def _expand_pages_spec(pages: str, page_count: int) -> list[int]: """Expand a pages spec string (``"all"``, ``"1,3,4"``, ``"1,4-end"``).""" page_numbers: list[dict[str, int]] = [] if pages == "all": page_numbers.append({"start": 1, "end": page_count}) else: for r in pages.split(","): if "-" in r: a, b = r.split("-") b_int = page_count if b == "end" else int(b) page_numbers.append({"start": int(a), "end": b_int}) else: page_numbers.append({"start": int(r), "end": int(r)}) result: list[int] = [] for p in page_numbers: result.extend(range(p["start"], p["end"] + 1)) return sorted(set(result)) def __enter__(self) -> PDFHandler: """Allow ``PDFHandler`` to be used as a context manager. On ``__exit__`` any temp file created by :func:`download_url` (when the caller passed a URL) is removed — see :meth:`close`. """ return self def __exit__(self, exc_type, exc_val, exc_tb) -> None: """Clean up the URL-downloaded temp file, if any.""" self.close()
[docs] def close(self) -> None: """Delete the URL-downloaded temp file, if any. Idempotent; safe to call from both ``__exit__`` and an explicit ``handler.close()`` call. No-op when ``filepath`` was a user-owned path (we never delete a file the caller passed in). """ if not self.is_temp_file: return path = self.filepath if isinstance(path, (str, Path)) and os.path.exists(path): try: os.remove(path) except OSError: # On Windows (issue #678) pdfium / playa can still hold # an open handle to the temp file when close() runs, # giving WinError 32 ("being used by another process"). # Leave the file behind and let the OS reap it later # (NamedTemporaryFile's default tempdir is wiped at # reboot) — losing a few KB beats raising mid-cleanup. pass # Mark cleaned so a second close() doesn't re-stat-and-remove. self.is_temp_file = False
# Kept for backwards-compat with anything that called the old name. # New code should access self.pages or call _resolve_pages directly. def _get_pages(self, pages): """Convert pages string to list of integers. .. deprecated:: Use the :attr:`pages` property; this method exists only for backwards-compat with callers that imported it directly. Parameters ---------- filepath : str Filepath or URL of the PDF file. pages : str, optional (default: '1') Comma-separated page numbers. Example: '1,3,4' or '1,4-end' or 'all'. Returns ------- P : list List of int page numbers. """ page_numbers = [] if pages == "1": page_numbers.append({"start": 1, "end": 1}) else: with playa.open(self.filepath, space="page", password=self.password) as pdf: page_count = len(pdf.pages) if pages == "all": page_numbers.append({"start": 1, "end": page_count}) else: for r in pages.split(","): if "-" in r: a, b = r.split("-") if b == "end": b = page_count page_numbers.append({"start": int(a), "end": int(b)}) else: page_numbers.append({"start": int(r), "end": int(r)}) result = [] for p in page_numbers: result.extend(range(p["start"], p["end"] + 1)) return sorted(set(result)) def _get_layout( self, page: playa.Page, **layout_kwargs ) -> tuple[ Any, tuple[float, float], list[LTImage], list[LTChar], list[LTTextLineHorizontal], list[LTTextLineVertical], str, ]: """Get layout from a page. Parameters ---------- page : playa.Page Page in the document. Returns ------- layout : object dimensions : tuple The dimensions of the pdf page filepath : str The path of the single page PDF - either the original, or a normalized version. """ layout, dimensions = get_page_layout(page, **layout_kwargs) # fix rotated PDF images, chars, horizontal_text, vertical_text = get_image_char_and_text_objects( layout ) rotation = get_rotation(chars, horizontal_text, vertical_text) if rotation: # de-rotate the page if rotation == "clockwise": # rotate -90 degrees page.set_initial_ctm(page.space, page.rotate - 90) elif rotation == "anticlockwise": # rotate 90 degrees page.set_initial_ctm(page.space, page.rotate + 90) else: raise AssertionError( f"rotation should be clockwise or anticlockwise, is {rotation}" ) # now re-run layout analysis layout, dimensions = get_page_layout(page, **layout_kwargs) images, chars, horizontal_text, vertical_text = ( get_image_char_and_text_objects(layout) ) return ( layout, dimensions, images, chars, horizontal_text, vertical_text, rotation, )
[docs] def parse( self, flavor: str = "lattice", suppress_stdout: bool = False, parallel: bool = False, cpu_count: int | None = None, layout_kwargs: dict[str, Any] | None = None, per_page: dict[int, dict[str, Any]] | None = None, pages: list[int] | None = None, render_cache: dict[int, str] | None = None, **kwargs, ): """Extract tables by calling parser.get_tables on all single page PDFs. Parameters ---------- flavor : str (default: 'lattice') The parsing method to use. Lattice is used by default. suppress_stdout : bool (default: False) Suppress logs and warnings. parallel : bool (default: False) Process pages in parallel using all available cpu cores. cpu_count : int, optional (default: None) Maximum number of worker processes to use when ``parallel`` is True. ``None`` (default) uses all available cores. Values are clamped to ``[1, multiprocessing.cpu_count()]``. Ignored when ``parallel`` is False. layout_kwargs : dict, optional (default: {}) A dict of `pdfminer.layout.LAParams <https://pdfminersix.readthedocs.io/en/latest/reference/composable.html#laparams>`_ kwargs. kwargs : dict See camelot.read_pdf kwargs. Returns ------- tables : camelot.core.TableList List of tables found in PDF. """ if layout_kwargs is None: layout_kwargs = {} per_page = per_page or {} # Default parser used by any page without a per_page override. parser_obj = PARSERS[flavor] parser = parser_obj(debug=self.debug, **kwargs) if render_cache and hasattr(parser, "_render_cache"): # Pre-rendered page images (page_no -> png path) from the # flavor='auto' probe, so the parser skips re-rasterising them. parser._render_cache = render_cache # Compute worker count up-front so we can pass it to playa.open(). The # old code also gated the parallel branch on len(self.pages) > 1, but # touching self.pages here would force a separate playa.open() to # read the page count *before* this one — exactly the redundant open # this change exists to remove. playa.pages[…].map(…) honours # max_workers regardless of page count, and a single-page doc just # uses one worker effectively. max_cpus = mp.cpu_count() if parallel and max_cpus > 1: cpu_count = ( max_cpus if cpu_count is None else max(1, min(cpu_count, max_cpus)) ) else: cpu_count = 1 try: with playa.open( self.filepath, password=self.password, space="page", max_workers=cpu_count, ) as pdf: if not pdf.is_extractable: raise PDFTextExtractionNotAllowed( f"Text extraction is not allowed: {self.filepath}" ) # Resolve pages using the already-open document instead of # opening it a second time via the .pages property. A caller # may pass an explicit ``pages`` subset (used by flavor='auto' # to parse each per-page-detected flavor group separately). if pages is None: if self._pages_cache is None: self._pages_cache = self._resolve_pages(self._pages_spec, pdf) pages = self._pages_cache pages = [x - 1 for x in pages] tables = pdf.pages[pages].map( partial( self._parse_page, parser=parser, layout_kwargs=layout_kwargs, flavor=flavor, base_kwargs=kwargs, per_page=per_page, suppress_stdout=suppress_stdout, ) ) except PDFPasswordIncorrect as e: raise RuntimeError("File has not been decrypted") from e except PDFTextExtractionNotAllowed: raise return TableList(sorted(chain.from_iterable(tables)))
def _parse_page( self, page: playa.Page, parser, layout_kwargs, flavor: str = "lattice", base_kwargs: dict[str, Any] | None = None, per_page: dict[int, dict[str, Any]] | None = None, suppress_stdout: bool = False, ): """Extract tables by calling parser.get_tables on a single page PDF. Parameters ---------- page : playa.Page Page to parse parser : Lattice, Stream, Network or Hybrid The default parser to use when no per-page override applies. layout_kwargs : dict, optional (default: {}) A dict of `pdfminer.layout.LAParams <https://pdfminersix.readthedocs.io/en/latest/reference/composable.html#laparams>`_ kwargs. flavor : str, optional The global flavor; used as the fallback when a per-page override doesn't itself supply ``flavor=``. base_kwargs : dict, optional The global (already-cleaned) parser kwargs. Merged with any per-page override to construct a fresh parser for that page. per_page : dict, optional Page-number-keyed kwargs overrides (already validated upstream). suppress_stdout : bool, optional (default: False) Suppress per-page progress logs. Returns ------- tables : camelot.core.TableList List of tables found in PDF. """ # playa's page_idx is 0-indexed; user-facing per_page uses 1-indexed. page_no = page.page_idx + 1 if not suppress_stdout: logger.info("Processing page %d", page_no) overrides = (per_page or {}).get(page_no) if overrides: page_flavor = overrides.get("flavor", flavor) merged = dict(base_kwargs or {}) for k, v in overrides.items(): if k != "flavor": merged[k] = v page_parser = PARSERS[page_flavor](debug=self.debug, **merged) else: page_parser = parser layout, dimensions, images, chars, horizontal_text, vertical_text, rotation = ( self._get_layout(page, **layout_kwargs) ) page_parser.prepare_page_parse( self.filepath, layout, dimensions, page.page_idx + 1, images, horizontal_text, vertical_text, rotation, layout_kwargs=layout_kwargs, ) tables = page_parser.extract_tables() return tables