Source code for camelot.parsers.lattice

"""Implementation of the Lattice table parser."""

from __future__ import annotations

import os
from typing import Any

import cv2

from ..backends import ImageConversionBackend
from ..image_processing import _line_crossing
from ..image_processing import adaptive_threshold
from ..image_processing import find_contours
from ..image_processing import find_joints
from ..image_processing import find_joints_from_lines
from ..image_processing import find_lines
from ..image_processing import find_lines_from_layout
from ..utils import bbox_from_str
from ..utils import merge_close_lines
from ..utils import scale_image
from ..utils import scale_pdf
from ..utils import segments_in_bbox
from ..utils import text_in_bbox_per_axis
from .base import BaseParser

#: Whitespace percentage at or above which a detected ruled grid is rejected
#: (``Lattice._reject_table`` tests ``table.whitespace >=`` this value, so the
#: bound is inclusive).
#: Real data tables are mostly filled; a near-empty grid (whitespace in the
#: 90s) is almost always ruled *noise* — page borders, form rules, header
#: separators — that the contour/joint pipeline mistook for a table. Chosen
#: above the observed real-table whitespace range and below the spurious one
#: (the false positives measured on the ICDAR set sit at 91-95 %).
_GRID_WHITESPACE_REJECT = 90.0


def _line_in_any_bbox(line, bboxes):
    """True if a ruled line's extent overlaps any of the given bboxes.

    Used by the vector engine to keep only lines relevant to the
    user-supplied ``table_regions``. ``line`` is ``(x0, y0, x1, y1)`` and
    each bbox is ``(x0, y0, x1, y1)`` = (left, bottom, right, top), both in
    PDF coords.
    """
    lx0, ly0, lx1, ly1 = line
    lxmin, lxmax = (lx0, lx1) if lx0 <= lx1 else (lx1, lx0)
    lymin, lymax = (ly0, ly1) if ly0 <= ly1 else (ly1, ly0)
    for x0, y0, x1, y1 in bboxes:
        if lxmax >= x0 and lxmin <= x1 and lymax >= y0 and lymin <= y1:
            return True
    return False


def _joints_in_bbox(h_lines, v_lines, bbox):
    """Line crossings (joints) that fall inside ``bbox`` (PDF coords).

    The vector engine's equivalent of the raster ``find_joints`` over a
    user-supplied ``table_areas`` box: every h/v line intersection within
    the box becomes a joint.
    """
    x0, y0, x1, y1 = bbox
    joints = []
    for h in h_lines:
        for v in v_lines:
            pt = _line_crossing(h, v)
            if pt is not None and x0 <= pt[0] <= x1 and y0 <= pt[1] <= y1:
                joints.append(pt)
    return joints


class Lattice(BaseParser):
    """Lattice method looks for lines between text to parse the table.

    Parameters
    ----------
    table_regions : list, optional (default: None)
        List of page regions that may contain tables of the form x1,y1,x2,y2
        where (x1, y1) -> left-top and (x2, y2) -> right-bottom
        in PDF coordinate space.
    table_areas : list, optional (default: None)
        List of table area strings of the form x1,y1,x2,y2
        where (x1, y1) -> left-top and (x2, y2) -> right-bottom
        in PDF coordinate space.
    process_background : bool, optional (default: False)
        Process background lines.
    line_scale : int, optional (default: 15)
        Line size scaling factor. The larger the value the smaller
        the detected lines. Making it very large will lead to text
        being detected as lines.
    copy_text : list, optional (default: None)
        {'h', 'v'}
        Direction in which text in a spanning cell will be copied
        over.
    shift_text : list, optional (default: ['l', 't'])
        {'l', 'r', 't', 'b'}
        Direction in which text in a spanning cell will flow.
    split_text : bool, optional (default: False)
        Split text that spans across multiple cells.
    flag_size : bool, optional (default: False)
        Flag text based on font size. Useful to detect
        super/subscripts. Adds <s></s> around flagged text.
    strip_text : str, optional (default: '')
        Characters that should be stripped from a string before
        assigning it to a cell.
    replace_text : optional (default: None)
        Forwarded unchanged to ``BaseParser.__init__``.
    line_tol : int, optional (default: 2)
        Tolerance parameter used to merge close vertical and horizontal
        lines.
    joint_tol : int, optional (default: 2)
        Tolerance parameter used to decide whether the detected lines
        and points lie close to each other.
    threshold_blocksize : int, optional (default: 15)
        Size of a pixel neighborhood that is used to calculate a
        threshold value for the pixel: 3, 5, 7, and so on.

        For more information, refer `OpenCV's adaptiveThreshold
        <https://docs.opencv.org/2.4/modules/imgproc/doc/miscellaneous_transformations.html#adaptivethreshold>`_.
    threshold_constant : int, optional (default: -2)
        Constant subtracted from the mean or weighted mean.
        Normally, it is positive but may be zero or negative as well.

        For more information, refer `OpenCV's adaptiveThreshold
        <https://docs.opencv.org/2.4/modules/imgproc/doc/miscellaneous_transformations.html#adaptivethreshold>`_.
    iterations : int, optional (default: 0)
        Number of dilation passes applied to close small gaps in the
        line mask (useful when a table's ruled lines don't quite meet
        at corners).

        For more information, refer `OpenCV's dilate
        <https://docs.opencv.org/2.4/modules/imgproc/doc/filtering.html#dilate>`_.
    erode_iterations : int, optional (default: 0)
        Number of erosion passes applied **after** dilation. Set equal
        to ``iterations`` for a morphological closing (bridges gaps
        without thickening the mask, which avoids spurious extra rows
        above/below the detected table). See #363.
    backend : str, optional (default: "pdfium")
        The backend to use for converting the PDF to an image so it can
        be processed by OpenCV.
    use_fallback : bool, optional (default: True)
        Fallback to another backend if unavailable.
    resolution : int, optional (default: 300)
        Resolution used for PDF to PNG conversion.
    engine : str, optional (default: 'combined')
        Line-detection engine (lattice only):

        - ``'combined'`` (default): OpenCV on the rendered page **plus**
          the PDF's native vector ruled lines unioned into the line masks
          before contour/joint detection — recovers tables whose rules
          render faintly. Safe by construction (raster always runs first,
          vector lines can only add; vector lines are clipped to
          ``table_regions`` so it never expands a table past the region).
        - ``'raster'``: OpenCV on the rendered page only (the pre-#763
          behaviour).
        - ``'vector'``: detect tables purely from the PDF's vector ruled
          lines, skipping rasterisation entirely — fastest, for PDFs whose
          tables are drawn with real vector strokes (#763).
    **kwargs
        Accepted for call compatibility and ignored by this constructor.

    Raises
    ------
    ValueError
        If ``engine`` is not one of ``'raster'``, ``'vector'`` or
        ``'combined'``.
    """

    def __init__(
        self,
        table_regions=None,
        table_areas=None,
        process_background=False,
        line_scale=15,
        copy_text=None,
        shift_text=None,
        split_text=False,
        flag_size=False,
        strip_text="",
        replace_text=None,
        line_tol=2,
        joint_tol=2,
        threshold_blocksize=15,
        threshold_constant=-2,
        iterations=0,
        erode_iterations=0,
        resolution=300,
        use_fallback=True,
        backend="pdfium",
        engine="combined",
        **kwargs,
    ):
        # Validate first so a bad engine never leaves a half-built parser.
        if engine not in ("raster", "vector", "combined"):
            raise ValueError(
                f"engine must be 'raster', 'vector' or 'combined', got {engine!r}"
            )
        self.engine = engine
        #: Vector ruled lines drawn onto the raster line masks, in image
        #: coords, accumulated by the 'combined' engine for diagnostics /
        #: plotting. Populated per page in :meth:`_detect_line_masks`.
        self._vector_segments: list[tuple[int, int, int, int]] = []
        #: Optional {page_no: png_path} of pages already rendered elsewhere
        #: (e.g. the flavor='auto' probe), reused to skip re-rasterising.
        self._render_cache: dict[int, str] = {}
        super().__init__("lattice", replace_text=replace_text)
        self.table_regions = table_regions
        self.table_areas = table_areas
        self.process_background = process_background
        self.line_scale = line_scale
        self.copy_text = copy_text
        # A falsy shift_text (None or empty) falls back to left/top flow.
        self.shift_text = shift_text or ["l", "t"]
        self.split_text = split_text
        self.flag_size = flag_size
        self.strip_text = strip_text
        self.line_tol = line_tol
        self.joint_tol = joint_tol
        self.threshold_blocksize = threshold_blocksize
        self.threshold_constant = threshold_constant
        self.iterations = iterations
        self.erode_iterations = erode_iterations
        self.resolution = resolution
        self.use_fallback = use_fallback
        self.icb = ImageConversionBackend(use_fallback=use_fallback, backend=backend)
        self.image_path = None
        self.pdf_image = None

    @staticmethod
    def _shift_index(
        table: Any, r_idx: int, c_idx: int, direction: str
    ) -> tuple[int, int]:
        """
        Shift the index based on the specified direction.

        The index moves one cell at a time until it reaches a cell that has
        an edge on the side being moved towards, or the table boundary. An
        unrecognised ``direction`` leaves the index unchanged.

        Parameters
        ----------
        table : camelot.core.Table
            The table structure containing rows and columns.
        r_idx : int
            Row index of the cell.
        c_idx : int
            Column index of the cell.
        direction : str
            Direction in which to shift the index ('l', 'r', 't', 'b').

        Returns
        -------
        tuple
            New row and column indices after the shift.
        """
        if direction == "l":
            while c_idx > 0 and not table.cells[r_idx][c_idx].left:
                c_idx -= 1
        elif direction == "r":
            while (
                c_idx < len(table.cells[r_idx]) - 1
                and not table.cells[r_idx][c_idx].right
            ):
                c_idx += 1
        elif direction == "t":
            while r_idx > 0 and not table.cells[r_idx][c_idx].top:
                r_idx -= 1
        elif direction == "b":
            while r_idx < len(table.cells) - 1 and not table.cells[r_idx][c_idx].bottom:
                r_idx += 1
        return r_idx, c_idx

    @staticmethod
    def _reduce_index(
        table: Any, idx: list[tuple[int, int, str]], shift_text: list[str]
    ) -> list[tuple[int, int, str]]:
        """
        Reduces the index of a text object if it lies within a spanning cell.

        Parameters
        ----------
        table : camelot.core.Table
            The table structure containing rows and columns.
        idx : list of tuples
            List of tuples of the form (r_idx, c_idx, text) where r_idx is
            the row index, c_idx is the column index, and text is the
            associated text for that index.
        shift_text : list of str
            A list containing one or more of the following strings:
            {'l', 'r', 't', 'b'} to specify the direction in which the text
            in a spanning cell should flow. 'l' for left, 'r' for right,
            't' for top, 'b' for bottom.

        Returns
        -------
        list of tuples
            List of tuples of the form (r_idx, c_idx, text) where r_idx and
            c_idx are the new row and column indices for the text after
            adjustment.
        """
        indices = []
        for r_idx, c_idx, text in idx:
            # Apply the shift directions in order; each one starts from the
            # position the previous one ended on.
            for direction in shift_text:
                r_idx, c_idx = Lattice._shift_index(table, r_idx, c_idx, direction)
            indices.append((r_idx, c_idx, text))
        return indices

    def record_parse_metadata(self, table):
        """Record data about the origin of the table."""
        super().record_parse_metadata(table)
        # for plotting
        table._segments = (self.vertical_segments, self.horizontal_segments)

    def _reject_table(self, table) -> bool:
        """Drop near-empty ruled grids — detection noise, not real tables.

        A genuine ruled table is mostly filled; a grid whose cells are ~all
        empty (whitespace in the 90s) is page borders / form rules / header
        separators the contour-joint pipeline mistook for a table. Rejecting
        these is a precision gate that cuts false positives on pages with no
        real table (#36).

        Returns ``True`` when ``table.whitespace`` is at or above
        ``_GRID_WHITESPACE_REJECT``.
        """
        return table.whitespace >= _GRID_WHITESPACE_REJECT

    def _resolve_engine(self):
        """Return the effective line-detection engine for this page.

        * ``'combined'`` (default) → raster OpenCV pipeline **plus** the
          PDF's vector ruled lines unioned into the line masks (#763). Safe
          by construction: raster always runs first, so vector lines can
          only *add* to the detected masks, never remove — a page with no
          usable vector lines yields the same result as ``'raster'``. The
          strongest detector, hence the default.
        * ``'raster'`` → the OpenCV pipeline only (the pre-#763 behaviour).
        * ``'vector'`` → render-free: detect tables from the PDF's vector
          ruled lines without rasterising the page (fastest) — see
          :meth:`_generate_table_bbox_vector`.
        """
        return self.engine

    def _augment_masks_with_vector_lines(
        self,
        vertical_mask,
        horizontal_mask,
        image_scalers,
    ):
        """Union the PDF's vector ruled lines into the raster line masks.

        The ``'combined'`` engine (#763) draws the layout's vector ruled
        lines — converted from PDF to image coords with the same
        :func:`scale_pdf` the rest of this method uses — onto the OpenCV
        line masks *before* :func:`find_contours` / :func:`find_joints`. A
        table whose rules are PDF vector strokes (and therefore render
        faintly / anti-aliased in the raster) is then found just like a
        crisply-printed one.

        The masks are mutated in place; the image-coord vector segments are
        returned so the caller can append them to the segment lists fed to
        :func:`scale_image`. A ``None`` layout (no vector graphics
        available) is a no-op.

        When ``table_regions`` are supplied, the vector lines are clipped to
        them — mirroring the raster ``find_lines(regions=...)`` mask — so
        combined never expands a table beyond the user's region (the raster
        path already respects it; the vector path must too).

        Returns
        -------
        tuple of list
            ``(vertical_segments, horizontal_segments)`` in image coords;
            two empty lists when there is no layout.
        """
        layout = getattr(self, "layout", None)
        if layout is None:
            return [], []
        v_lines = find_lines_from_layout(layout, direction="vertical")
        h_lines = find_lines_from_layout(layout, direction="horizontal")
        if self.table_regions is not None:
            region_bboxes = [bbox_from_str(r) for r in self.table_regions]
            v_lines = [ln for ln in v_lines if _line_in_any_bbox(ln, region_bboxes)]
            h_lines = [ln for ln in h_lines if _line_in_any_bbox(ln, region_bboxes)]
        v_segments = self._stamp_vector_lines(v_lines, vertical_mask, image_scalers)
        h_segments = self._stamp_vector_lines(h_lines, horizontal_mask, image_scalers)
        return v_segments, h_segments

    @staticmethod
    def _stamp_vector_lines(pdf_lines, mask, image_scalers):
        """Draw PDF-coord lines onto ``mask`` (image coords); return segments."""
        segments = []
        for line in pdf_lines:
            ix0, iy0, ix1, iy1 = scale_pdf(line, image_scalers)
            # 2px so a crossing v/h pair shares pixels and np.multiply in
            # find_joints registers the intersection.
            cv2.line(mask, (ix0, iy0), (ix1, iy1), 255, 2)
            segments.append((ix0, iy0, ix1, iy1))
        return segments

    def _detect_line_masks(self, regions, image_scalers, engine):
        """find_lines for both directions, plus optional vector union.

        Extracted from :meth:`_generate_table_bbox` so the raster +
        ``'combined'`` mask-building is shared between the table-areas and
        auto-detect branches (and to keep that method under the complexity
        limit). Returns ``(vertical_mask, horizontal_mask,
        vertical_segments, horizontal_segments)`` in image coords.
        """
        vertical_mask, vertical_segments = find_lines(
            self.threshold,
            regions=regions,
            direction="vertical",
            line_scale=self.line_scale,
            iterations=self.iterations,
            erode_iterations=self.erode_iterations,
        )
        horizontal_mask, horizontal_segments = find_lines(
            self.threshold,
            regions=regions,
            direction="horizontal",
            line_scale=self.line_scale,
            iterations=self.iterations,
            erode_iterations=self.erode_iterations,
        )
        if engine == "combined":
            v_vec, h_vec = self._augment_masks_with_vector_lines(
                vertical_mask, horizontal_mask, image_scalers
            )
            self._vector_segments = v_vec + h_vec
            # Build new lists rather than extending in place, so whatever
            # find_lines returned is left untouched.
            vertical_segments = vertical_segments + v_vec
            horizontal_segments = horizontal_segments + h_vec
        return vertical_mask, horizontal_mask, vertical_segments, horizontal_segments

    def _generate_table_bbox(self):
        """Detect table bounding boxes and joints for the current page.

        With ``engine='vector'`` this delegates to
        :meth:`_generate_table_bbox_vector` and returns. Otherwise the page
        image (a cached render when available, else a fresh in-memory
        render) is thresholded, line masks are built with
        :meth:`_detect_line_masks`, and joints are found either per detected
        contour (no ``table_areas``) or per user-supplied area. The results
        are converted back with :func:`scale_image` and stored on
        ``table_bbox_parses`` / ``vertical_segments`` /
        ``horizontal_segments`` before :meth:`_compute_table_anchors` runs.

        Reads ``self.page``, ``self.filename``, ``self.rotation``,
        ``self.pdf_width`` and ``self.pdf_height``, which are not assigned in
        this class (presumably set by ``BaseParser`` — confirm).
        """
        engine = self._resolve_engine()
        if engine == "vector":
            self._generate_table_bbox_vector()
            return

        def scale_areas(areas):
            # NOTE: ``image_scalers`` is resolved when this helper is
            # *called*; it is assigned further down, before any call.
            scaled_areas = []
            for area in areas:
                # Validate (clear error on a malformed / zero-area box instead
                # of a later ZeroDivision, #63) but keep the caller's raw
                # corner order: scale_pdf + the (x, y, w, h) form below expect
                # y1 to be the *top* edge, so bbox_from_str's min/max
                # normalisation must NOT be applied here.
                bbox_from_str(area)
                x1, y1, x2, y2 = (float(v) for v in area.split(","))
                x1, y1, x2, y2 = scale_pdf((x1, y1, x2, y2), image_scalers)
                scaled_areas.append((x1, y1, abs(x2 - x1), abs(y2 - y1)))
            return scaled_areas

        cached_image = self._render_cache.get(self.page)
        if cached_image and os.path.exists(cached_image):
            # Reuse a page already rendered upstream (flavor='auto' probe) —
            # skip the second, redundant rasterisation. (#797)
            self.image_path = cached_image
            image_input = cached_image
        else:
            # Render straight to an in-memory BGR array — no PNG encode/decode
            # round-trip (#40). Plotting renders its own image separately, so
            # nothing downstream needs this on disk.
            self.image_path = None
            image_input = self.icb.to_array(self.filename, self.page)

        self.pdf_image, self.threshold = adaptive_threshold(
            image_input,
            process_background=self.process_background,
            blocksize=self.threshold_blocksize,
            c=self.threshold_constant,
            rotation=self.rotation,
        )

        image_width = self.pdf_image.shape[1]
        image_height = self.pdf_image.shape[0]
        image_width_scaler = image_width / float(self.pdf_width)
        image_height_scaler = image_height / float(self.pdf_height)
        pdf_width_scaler = self.pdf_width / float(image_width)
        pdf_height_scaler = self.pdf_height / float(image_height)
        image_scalers = (image_width_scaler, image_height_scaler, self.pdf_height)
        pdf_scalers = (pdf_width_scaler, pdf_height_scaler, image_height)

        if self.table_areas is None:
            # Auto-detect: optionally restrict line detection to the
            # user-supplied regions, then derive tables from contours.
            regions = None
            if self.table_regions is not None:
                regions = scale_areas(self.table_regions)
            vertical_mask, horizontal_mask, vertical_segments, horizontal_segments = (
                self._detect_line_masks(regions, image_scalers, engine)
            )
            contours = find_contours(vertical_mask, horizontal_mask)
            table_bbox = find_joints(contours, vertical_mask, horizontal_mask)
        else:
            # Explicit areas: detect lines over the whole page and look for
            # joints inside each user-supplied area.
            vertical_mask, horizontal_mask, vertical_segments, horizontal_segments = (
                self._detect_line_masks(None, image_scalers, engine)
            )
            areas = scale_areas(self.table_areas)
            table_bbox = find_joints(areas, vertical_mask, horizontal_mask)

        (
            self.table_bbox_parses,
            self.vertical_segments,
            self.horizontal_segments,
        ) = scale_image(table_bbox, vertical_segments, horizontal_segments, pdf_scalers)
        self._compute_table_anchors()

    def _generate_table_bbox_vector(self):
        """Detect tables from the PDF's vector ruled lines — no rasterisation.

        The render-free ``engine='vector'`` path (#763): read ruled lines
        from the page layout (already in PDF coordinate space), cluster them
        into table bounding boxes + joints with the vector-native
        :func:`find_joints_from_lines`, and build the same
        ``table_bbox_parses`` structure the raster path produces — but
        skipping the page render, adaptive threshold, and OpenCV morphology
        entirely. ``pdf_image`` / ``threshold`` stay ``None`` (extraction
        never needs them; only the debug plots do).
        """
        layout = getattr(self, "layout", None)
        # A missing/falsy layout simply yields no lines.
        h_lines = find_lines_from_layout(layout, "horizontal") if layout else []
        v_lines = find_lines_from_layout(layout, "vertical") if layout else []
        if self.table_regions is not None:
            regions = [bbox_from_str(r) for r in self.table_regions]
            h_lines = [ln for ln in h_lines if _line_in_any_bbox(ln, regions)]
            v_lines = [ln for ln in v_lines if _line_in_any_bbox(ln, regions)]

        self.pdf_image = None
        self.threshold = None
        self.vertical_segments = v_lines
        self.horizontal_segments = h_lines

        if self.table_areas is not None:
            areas = [bbox_from_str(a) for a in self.table_areas]
            joints_by_bbox = {
                area: _joints_in_bbox(h_lines, v_lines, area) for area in areas
            }
        else:
            joints_by_bbox = find_joints_from_lines(h_lines, v_lines)

        self.table_bbox_parses = {
            bbox: {"joints": joints} for bbox, joints in joints_by_bbox.items()
        }
        self._compute_table_anchors()

    def _compute_table_anchors(self):
        """Derive col/row anchors from each table's joints (raster + vector).

        Operates in place on ``self.table_bbox_parses`` — shared by both the
        raster/combined and the vector paths, which differ only in how the
        ``{bbox: {"joints": [...]}}`` mapping is built. Each parse dict gains
        ``"joints_normalized"``, ``"col_anchors"`` and ``"row_anchors"``.
        """
        line_tol = self.line_tol
        for bbox, parse in self.table_bbox_parses.items():
            joints = parse["joints"]

            # Merge x coordinates that are close together. Work on mutable
            # list copies, sorted by descending x, so a coordinate can be
            # overwritten in place.
            joints_normalized = [list(j) for j in sorted(joints, key=lambda j: -j[0])]
            # Each joint is compared with its (possibly already snapped)
            # predecessor, so runs of close values collapse onto the first.
            for prev, cur in zip(joints_normalized, joints_normalized[1:]):
                if prev[0] - line_tol <= cur[0] <= prev[0] + line_tol:
                    cur[0] = prev[0]

            # Merge y coordinates that are close together (descending y).
            joints_normalized = sorted(joints_normalized, key=lambda j: -j[1])
            for prev, cur in zip(joints_normalized, joints_normalized[1:]):
                if prev[1] - line_tol <= cur[1] <= prev[1] + line_tol:
                    cur[1] = prev[1]
            parse["joints_normalized"] = joints_normalized

            # Anchors come from the raw joints plus the bbox edges.
            cols = [joint[0] for joint in joints]
            cols.extend([bbox[0], bbox[2]])
            rows = [joint[1] for joint in joints]
            rows.extend([bbox[1], bbox[3]])

            # sort horizontal and vertical segments
            cols = merge_close_lines(sorted(cols), line_tol=self.line_tol)
            rows = merge_close_lines(sorted(rows, reverse=True), line_tol=self.line_tol)
            parse["col_anchors"] = cols
            parse["row_anchors"] = rows

    def _generate_columns_and_rows(self, bbox, user_cols):
        """Build column/row intervals and pick the segments inside ``bbox``.

        Consecutive entries of the ``col_anchors`` / ``row_anchors`` computed
        by :meth:`_compute_table_anchors` are paired into ``(start, end)``
        intervals. As a side effect ``self.t_bbox`` is set to the text found
        in ``bbox``.

        Parameters
        ----------
        bbox : tuple
            Key into ``self.table_bbox_parses``.
        user_cols
            Unused by this implementation (kept in the signature, presumably
            for compatibility with ``BaseParser`` — confirm).

        Returns
        -------
        tuple
            ``(cols, rows, v_s, h_s)``.
        """
        # select elements which lie within table_bbox
        v_s, h_s = segments_in_bbox(
            bbox, self.vertical_segments, self.horizontal_segments
        )
        self.t_bbox = text_in_bbox_per_axis(
            bbox, self.horizontal_text, self.vertical_text
        )
        parse = self.table_bbox_parses[bbox]

        cols = [
            (parse["col_anchors"][i], parse["col_anchors"][i + 1])
            for i in range(0, len(parse["col_anchors"]) - 1)
        ]
        rows = [
            (parse["row_anchors"][i], parse["row_anchors"][i + 1])
            for i in range(0, len(parse["row_anchors"]) - 1)
        ]
        return cols, rows, v_s, h_s

    def _generate_table(self, table_idx, bbox, cols, rows, **kwargs):
        """Create a table for ``bbox`` and mark its edges from the segments.

        Parameters
        ----------
        table_idx, bbox, cols, rows
            Passed through to ``self._initialize_new_table``.
        **kwargs
            Must provide ``v_s`` and ``h_s`` (vertical / horizontal
            segments); other keys are ignored.

        Returns
        -------
        The table after ``set_edges``, ``set_border`` and
        :meth:`record_parse_metadata` have been applied.

        Raises
        ------
        ValueError
            If ``v_s`` or ``h_s`` is missing or ``None``.
        """
        v_s = kwargs.get("v_s")
        h_s = kwargs.get("h_s")
        if v_s is None or h_s is None:
            raise ValueError(f"No segments found on {self.rootname}")

        table = self._initialize_new_table(table_idx, bbox, cols, rows)
        # set table edges to True using ver+hor lines
        table = table.set_edges(v_s, h_s, joint_tol=self.joint_tol)
        # set table border edges to True
        table = table.set_border()

        self.record_parse_metadata(table)
        return table