"""Implementation of the Lattice table parser."""
from __future__ import annotations
import os
from typing import Any
import cv2
from ..backends import ImageConversionBackend
from ..image_processing import _line_crossing
from ..image_processing import adaptive_threshold
from ..image_processing import find_contours
from ..image_processing import find_joints
from ..image_processing import find_joints_from_lines
from ..image_processing import find_lines
from ..image_processing import find_lines_from_layout
from ..utils import bbox_from_str
from ..utils import merge_close_lines
from ..utils import scale_image
from ..utils import scale_pdf
from ..utils import segments_in_bbox
from ..utils import text_in_bbox_per_axis
from .base import BaseParser
#: Whitespace percentage at or above which a detected ruled grid is rejected.
#: Real data tables are mostly filled; a near-empty grid (whitespace in the
#: 90s) is almost always ruled *noise* — page borders, form rules, header
#: separators — that the contour/joint pipeline mistook for a table. Chosen
#: above the observed real-table whitespace range and below the spurious one
#: (the false positives measured on the ICDAR set sit at 91-95 %).
#: ``Lattice._reject_table`` compares ``table.whitespace`` against this value
#: with ``>=``, so a grid at exactly 90.0 is rejected.
_GRID_WHITESPACE_REJECT = 90.0
def _line_in_any_bbox(line, bboxes):
"""True if a ruled line's extent overlaps any of the given bboxes.
Used by the vector engine to keep only lines relevant to the
user-supplied ``table_regions``. ``line`` is ``(x0, y0, x1, y1)`` and
each bbox is ``(x0, y0, x1, y1)`` = (left, bottom, right, top), both in
PDF coords.
"""
lx0, ly0, lx1, ly1 = line
lxmin, lxmax = (lx0, lx1) if lx0 <= lx1 else (lx1, lx0)
lymin, lymax = (ly0, ly1) if ly0 <= ly1 else (ly1, ly0)
for x0, y0, x1, y1 in bboxes:
if lxmax >= x0 and lxmin <= x1 and lymax >= y0 and lymin <= y1:
return True
return False
def _joints_in_bbox(h_lines, v_lines, bbox):
"""Line crossings (joints) that fall inside ``bbox`` (PDF coords).
The vector engine's equivalent of the raster ``find_joints`` over a
user-supplied ``table_areas`` box: every h/v line intersection within
the box becomes a joint.
"""
x0, y0, x1, y1 = bbox
joints = []
for h in h_lines:
for v in v_lines:
pt = _line_crossing(h, v)
if pt is not None and x0 <= pt[0] <= x1 and y0 <= pt[1] <= y1:
joints.append(pt)
return joints
[docs]
class Lattice(BaseParser):
    """Lattice method looks for lines between text to parse the table.

    Parameters
    ----------
    table_regions : list, optional (default: None)
        List of page regions that may contain tables of the form x1,y1,x2,y2
        where (x1, y1) -> left-top and (x2, y2) -> right-bottom
        in PDF coordinate space.
    table_areas : list, optional (default: None)
        List of table area strings of the form x1,y1,x2,y2
        where (x1, y1) -> left-top and (x2, y2) -> right-bottom
        in PDF coordinate space.
    process_background : bool, optional (default: False)
        Process background lines.
    line_scale : int, optional (default: 15)
        Line size scaling factor. The larger the value the smaller
        the detected lines. Making it very large will lead to text
        being detected as lines.
    copy_text : list, optional (default: None)
        {'h', 'v'}
        Direction in which text in a spanning cell will be copied
        over.
    shift_text : list, optional (default: ['l', 't'])
        {'l', 'r', 't', 'b'}
        Direction in which text in a spanning cell will flow.
    split_text : bool, optional (default: False)
        Split text that spans across multiple cells.
    flag_size : bool, optional (default: False)
        Flag text based on font size. Useful to detect
        super/subscripts. Adds <s></s> around flagged text.
    strip_text : str, optional (default: '')
        Characters that should be stripped from a string before
        assigning it to a cell.
    replace_text : optional (default: None)
        Forwarded unchanged to ``BaseParser.__init__``; see the base
        parser for its meaning.
    line_tol : int, optional (default: 2)
        Tolerance parameter used to merge close vertical and horizontal
        lines.
    joint_tol : int, optional (default: 2)
        Tolerance parameter used to decide whether the detected lines
        and points lie close to each other.
    threshold_blocksize : int, optional (default: 15)
        Size of a pixel neighborhood that is used to calculate a
        threshold value for the pixel: 3, 5, 7, and so on.
        For more information, refer `OpenCV's adaptiveThreshold
        <https://docs.opencv.org/2.4/modules/imgproc/doc/miscellaneous_transformations.html#adaptivethreshold>`_.
    threshold_constant : int, optional (default: -2)
        Constant subtracted from the mean or weighted mean.
        Normally, it is positive but may be zero or negative as well.
        For more information, refer `OpenCV's adaptiveThreshold
        <https://docs.opencv.org/2.4/modules/imgproc/doc/miscellaneous_transformations.html#adaptivethreshold>`_.
    iterations : int, optional (default: 0)
        Number of dilation passes applied to close small gaps in the
        line mask (useful when a table's ruled lines don't quite meet
        at corners).
        For more information, refer `OpenCV's dilate
        <https://docs.opencv.org/2.4/modules/imgproc/doc/filtering.html#dilate>`_.
    erode_iterations : int, optional (default: 0)
        Number of erosion passes applied **after** dilation. Set equal
        to ``iterations`` for a morphological closing (bridges gaps
        without thickening the mask, which avoids spurious extra
        rows above/below the detected table). See #363.
    resolution : int, optional (default: 300)
        Resolution used for PDF to PNG conversion.
    use_fallback : bool, optional (default: True)
        Fall back to another backend if the requested one is unavailable.
    backend : str, optional (default: 'pdfium')
        The backend to use for converting the PDF to an image so it can
        be processed by OpenCV.
    engine : str, optional (default: 'combined')
        Line-detection engine (lattice only):

        - ``'combined'`` (default): OpenCV on the rendered page **plus**
          the PDF's native vector ruled lines unioned into the line masks
          before contour/joint detection — recovers tables whose rules
          render faintly. Safe by construction (raster always runs first,
          vector lines can only add; vector lines are clipped to
          ``table_regions`` so it never expands a table past the region).
        - ``'raster'``: OpenCV on the rendered page only (the pre-#763
          behaviour).
        - ``'vector'``: detect tables purely from the PDF's vector ruled
          lines, skipping rasterisation entirely — fastest, for PDFs whose
          tables are drawn with real vector strokes (#763).
    **kwargs
        Accepted for call compatibility; the constructor does not read
        them.
    """
def __init__(
self,
table_regions=None,
table_areas=None,
process_background=False,
line_scale=15,
copy_text=None,
shift_text=None,
split_text=False,
flag_size=False,
strip_text="",
replace_text=None,
line_tol=2,
joint_tol=2,
threshold_blocksize=15,
threshold_constant=-2,
iterations=0,
erode_iterations=0,
resolution=300,
use_fallback=True,
backend="pdfium",
engine="combined",
**kwargs,
):
if engine not in ("raster", "vector", "combined"):
raise ValueError(
f"engine must be 'raster', 'vector' or 'combined', got {engine!r}"
)
self.engine = engine
#: Vector ruled lines drawn onto the raster line masks, in image
#: coords, accumulated by the 'combined' engine for diagnostics /
#: plotting. Populated per page in :meth:`_detect_line_masks`.
self._vector_segments: list[tuple[int, int, int, int]] = []
#: Optional {page_no: png_path} of pages already rendered elsewhere
#: (e.g. the flavor='auto' probe), reused to skip re-rasterising.
self._render_cache: dict[int, str] = {}
super().__init__("lattice", replace_text=replace_text)
self.table_regions = table_regions
self.table_areas = table_areas
self.process_background = process_background
self.line_scale = line_scale
self.copy_text = copy_text
self.shift_text = shift_text or ["l", "t"]
self.split_text = split_text
self.flag_size = flag_size
self.strip_text = strip_text
self.line_tol = line_tol
self.joint_tol = joint_tol
self.threshold_blocksize = threshold_blocksize
self.threshold_constant = threshold_constant
self.iterations = iterations
self.erode_iterations = erode_iterations
self.resolution = resolution
self.use_fallback = use_fallback
self.icb = ImageConversionBackend(use_fallback=use_fallback, backend=backend)
self.image_path = None
self.pdf_image = None
@staticmethod
def _shift_index(
table: Any, r_idx: int, c_idx: int, direction: str
) -> tuple[int, int]:
"""
Shift the index based on the specified direction.
Parameters
----------
table : camelot.core.Table
The table structure containing rows and columns.
r_idx : int
Row index of the cell.
c_idx : int
Column index of the cell.
direction : str
Direction in which to shift the index ('l', 'r', 't', 'b').
Returns
-------
tuple
New row and column indices after the shift.
"""
if direction == "l":
while c_idx > 0 and not table.cells[r_idx][c_idx].left:
c_idx -= 1
elif direction == "r":
while (
c_idx < len(table.cells[r_idx]) - 1
and not table.cells[r_idx][c_idx].right
):
c_idx += 1
elif direction == "t":
while r_idx > 0 and not table.cells[r_idx][c_idx].top:
r_idx -= 1
elif direction == "b":
while r_idx < len(table.cells) - 1 and not table.cells[r_idx][c_idx].bottom:
r_idx += 1
return r_idx, c_idx
@staticmethod
def _reduce_index(
table: Any, idx: list[tuple[int, int, str]], shift_text: list[str]
) -> list[tuple[int, int, str]]:
"""
Reduces the index of a text object if it lies within a spanning cell.
Parameters
----------
table : camelot.core.Table
The table structure containing rows and columns.
idx : list of tuples
List of tuples of the form (r_idx, c_idx, text) where r_idx
is the row index, c_idx is the column index, and text is the
associated text for that index.
shift_text : list of str
A list containing one or more of the following strings:
{'l', 'r', 't', 'b'} to specify the direction in which the
text in a spanning cell should flow. 'l' for left, 'r' for right,
't' for top, 'b' for bottom.
Returns
-------
list of tuples
List of tuples of the form (r_idx, c_idx, text) where r_idx
and c_idx are the new row and column indices for the text after
adjustment.
"""
indices = []
for r_idx, c_idx, text in idx:
# Adjust the index based on specified shift directions
for direction in shift_text:
r_idx, c_idx = Lattice._shift_index(table, r_idx, c_idx, direction)
indices.append((r_idx, c_idx, text))
return indices
def _reject_table(self, table) -> bool:
    """Decide whether a detected grid is too empty to be a real table.

    A genuine ruled table is mostly filled. A grid whose cells are almost
    all empty (whitespace in the 90s) is usually page borders, form rules
    or header separators that the contour-joint pipeline mistook for a
    table, so dropping it acts as a precision gate against false
    positives on pages with no real table (#36).

    Returns the result of ``table.whitespace >= _GRID_WHITESPACE_REJECT``.
    """
    empty_share = table.whitespace
    return empty_share >= _GRID_WHITESPACE_REJECT
def _resolve_engine(self):
"""Return the effective line-detection engine for this page.
* ``'combined'`` (default) → raster OpenCV pipeline **plus** the
PDF's vector ruled lines unioned into the line masks (#763). Safe
by construction: raster always runs first, so vector lines can
only *add* to the detected masks, never remove — a page with no
usable vector lines yields the same result as ``'raster'``. The
strongest detector, hence the default.
* ``'raster'`` → the OpenCV pipeline only (the pre-#763 behaviour).
* ``'vector'`` → render-free: detect tables from the PDF's vector
ruled lines without rasterising the page (fastest) — see
:meth:`_generate_table_bbox_vector`.
"""
return self.engine
def _augment_masks_with_vector_lines(
self,
vertical_mask,
horizontal_mask,
image_scalers,
):
"""Union the PDF's vector ruled lines into the raster line masks.
The ``'combined'`` engine (#763) draws the layout's vector ruled
lines — converted from PDF to image coords with the same
:func:`scale_pdf` the rest of this method uses — onto the OpenCV
line masks *before* :func:`find_contours` / :func:`find_joints`.
A table whose rules are PDF vector strokes (and therefore render
faintly / anti-aliased in the raster) is then found just like a
crisply-printed one. The masks are mutated in place; the
image-coord vector segments are returned so the caller can append
them to the segment lists fed to :func:`scale_image`.
A ``None`` layout (no vector graphics available) is a no-op.
When ``table_regions`` are supplied, the vector lines are clipped to
them — mirroring the raster ``find_lines(regions=...)`` mask — so
combined never expands a table beyond the user's region (the raster
path already respects it; the vector path must too).
"""
layout = getattr(self, "layout", None)
if layout is None:
return [], []
v_lines = find_lines_from_layout(layout, direction="vertical")
h_lines = find_lines_from_layout(layout, direction="horizontal")
if self.table_regions is not None:
region_bboxes = [bbox_from_str(r) for r in self.table_regions]
v_lines = [ln for ln in v_lines if _line_in_any_bbox(ln, region_bboxes)]
h_lines = [ln for ln in h_lines if _line_in_any_bbox(ln, region_bboxes)]
v_segments = self._stamp_vector_lines(v_lines, vertical_mask, image_scalers)
h_segments = self._stamp_vector_lines(h_lines, horizontal_mask, image_scalers)
return v_segments, h_segments
@staticmethod
def _stamp_vector_lines(pdf_lines, mask, image_scalers):
"""Draw PDF-coord lines onto ``mask`` (image coords); return segments."""
segments = []
for line in pdf_lines:
ix0, iy0, ix1, iy1 = scale_pdf(line, image_scalers)
# 2px so a crossing v/h pair shares pixels and np.multiply in
# find_joints registers the intersection.
cv2.line(mask, (ix0, iy0), (ix1, iy1), 255, 2)
segments.append((ix0, iy0, ix1, iy1))
return segments
def _detect_line_masks(self, regions, image_scalers, engine):
    """Build the vertical/horizontal line masks, optionally adding vectors.

    Runs :func:`find_lines` on ``self.threshold`` once per direction
    (vertical first) with the instance's ``line_scale``, ``iterations``
    and ``erode_iterations``. When ``engine`` is ``'combined'`` the PDF's
    vector ruled lines are then stamped onto those masks, the stamped
    segments are stored in ``self._vector_segments``, and they are
    appended to the returned segment lists.

    Shared by the table-areas and auto-detect branches of
    :meth:`_generate_table_bbox`.

    Returns
    -------
    tuple
        ``(vertical_mask, horizontal_mask, vertical_segments,
        horizontal_segments)`` in image coords.
    """
    shared_options = {
        "regions": regions,
        "line_scale": self.line_scale,
        "iterations": self.iterations,
        "erode_iterations": self.erode_iterations,
    }
    vertical_mask, vertical_segments = find_lines(
        self.threshold, direction="vertical", **shared_options
    )
    horizontal_mask, horizontal_segments = find_lines(
        self.threshold, direction="horizontal", **shared_options
    )

    if engine != "combined":
        return (
            vertical_mask,
            horizontal_mask,
            vertical_segments,
            horizontal_segments,
        )

    # Combined engine: union the vector rules into the raster masks.
    v_vec, h_vec = self._augment_masks_with_vector_lines(
        vertical_mask, horizontal_mask, image_scalers
    )
    self._vector_segments = v_vec + h_vec
    return (
        vertical_mask,
        horizontal_mask,
        vertical_segments + v_vec,
        horizontal_segments + h_vec,
    )
def _generate_table_bbox(self):
engine = self._resolve_engine()
if engine == "vector":
self._generate_table_bbox_vector()
return
def scale_areas(areas):
scaled_areas = []
for area in areas:
# Validate (clear error on a malformed / zero-area box instead
# of a later ZeroDivision, #63) but keep the caller's raw
# corner order: scale_pdf + the (x, y, w, h) form below expect
# y1 to be the *top* edge, so bbox_from_str's min/max
# normalisation must NOT be applied here.
bbox_from_str(area)
x1, y1, x2, y2 = (float(v) for v in area.split(","))
x1, y1, x2, y2 = scale_pdf((x1, y1, x2, y2), image_scalers)
scaled_areas.append((x1, y1, abs(x2 - x1), abs(y2 - y1)))
return scaled_areas
cached_image = self._render_cache.get(self.page)
if cached_image and os.path.exists(cached_image):
# Reuse a page already rendered upstream (flavor='auto' probe) —
# skip the second, redundant rasterisation. (#797)
self.image_path = cached_image
image_input = cached_image
else:
# Render straight to an in-memory BGR array — no PNG encode/decode
# round-trip (#40). Plotting renders its own image separately, so
# nothing downstream needs this on disk.
self.image_path = None
image_input = self.icb.to_array(self.filename, self.page)
self.pdf_image, self.threshold = adaptive_threshold(
image_input,
process_background=self.process_background,
blocksize=self.threshold_blocksize,
c=self.threshold_constant,
rotation=self.rotation,
)
image_width = self.pdf_image.shape[1]
image_height = self.pdf_image.shape[0]
image_width_scaler = image_width / float(self.pdf_width)
image_height_scaler = image_height / float(self.pdf_height)
pdf_width_scaler = self.pdf_width / float(image_width)
pdf_height_scaler = self.pdf_height / float(image_height)
image_scalers = (image_width_scaler, image_height_scaler, self.pdf_height)
pdf_scalers = (pdf_width_scaler, pdf_height_scaler, image_height)
if self.table_areas is None:
regions = None
if self.table_regions is not None:
regions = scale_areas(self.table_regions)
vertical_mask, horizontal_mask, vertical_segments, horizontal_segments = (
self._detect_line_masks(regions, image_scalers, engine)
)
contours = find_contours(vertical_mask, horizontal_mask)
table_bbox = find_joints(contours, vertical_mask, horizontal_mask)
else:
vertical_mask, horizontal_mask, vertical_segments, horizontal_segments = (
self._detect_line_masks(None, image_scalers, engine)
)
areas = scale_areas(self.table_areas)
table_bbox = find_joints(areas, vertical_mask, horizontal_mask)
[self.table_bbox_parses, self.vertical_segments, self.horizontal_segments] = (
scale_image(table_bbox, vertical_segments, horizontal_segments, pdf_scalers)
)
self._compute_table_anchors()
def _generate_table_bbox_vector(self):
"""Detect tables from the PDF's vector ruled lines — no rasterisation.
The render-free ``engine='vector'`` path (#763): read ruled lines
from the page layout (already in PDF coordinate space), cluster
them into table bounding boxes + joints with the vector-native
:func:`find_joints_from_lines`, and build the same
``table_bbox_parses`` structure the raster path produces — but
skipping the page render, adaptive threshold, and OpenCV
morphology entirely. ``pdf_image`` / ``threshold`` stay ``None``
(extraction never needs them; only the debug plots do).
"""
layout = getattr(self, "layout", None)
h_lines = find_lines_from_layout(layout, "horizontal") if layout else []
v_lines = find_lines_from_layout(layout, "vertical") if layout else []
if self.table_regions is not None:
regions = [bbox_from_str(r) for r in self.table_regions]
h_lines = [ln for ln in h_lines if _line_in_any_bbox(ln, regions)]
v_lines = [ln for ln in v_lines if _line_in_any_bbox(ln, regions)]
self.pdf_image = None
self.threshold = None
self.vertical_segments = v_lines
self.horizontal_segments = h_lines
if self.table_areas is not None:
areas = [bbox_from_str(a) for a in self.table_areas]
joints_by_bbox = {
area: _joints_in_bbox(h_lines, v_lines, area) for area in areas
}
else:
joints_by_bbox = find_joints_from_lines(h_lines, v_lines)
self.table_bbox_parses = {
bbox: {"joints": joints} for bbox, joints in joints_by_bbox.items()
}
self._compute_table_anchors()
def _compute_table_anchors(self):
"""Derive col/row anchors from each table's joints (raster + vector).
Operates in place on ``self.table_bbox_parses`` — shared by both
the raster/combined and the vector paths, which differ only in how
the ``{bbox: {"joints": [...]}}`` mapping is built.
"""
line_tol = self.line_tol
for bbox, parse in self.table_bbox_parses.items():
joints = parse["joints"]
# Merge x coordinates that are close together. Sort the joints,
# make them a list of lists (instead of sets).
joints_normalized = list(
map(lambda x: list(x), sorted(joints, key=lambda j: -j[0]))
)
for idx in range(1, len(joints_normalized)):
x_left, x_right = (
joints_normalized[idx - 1][0],
joints_normalized[idx][0],
)
if x_left - line_tol <= x_right <= x_left + line_tol:
joints_normalized[idx][0] = x_left
# Merge y coordinates that are close together
joints_normalized = sorted(joints_normalized, key=lambda j: -j[1])
for idx in range(1, len(joints_normalized)):
y_bottom, y_top = (
joints_normalized[idx - 1][1],
joints_normalized[idx][1],
)
if y_bottom - line_tol <= y_top <= y_bottom + line_tol:
joints_normalized[idx][1] = y_bottom
parse["joints_normalized"] = joints_normalized
cols = list(map(lambda coords: coords[0], joints))
cols.extend([bbox[0], bbox[2]])
rows = list(map(lambda coords: coords[1], joints))
rows.extend([bbox[1], bbox[3]])
# sort horizontal and vertical segments
cols = merge_close_lines(sorted(cols), line_tol=self.line_tol)
rows = merge_close_lines(sorted(rows, reverse=True), line_tol=self.line_tol)
parse["col_anchors"] = cols
parse["row_anchors"] = rows
def _generate_columns_and_rows(self, bbox, user_cols):
    """Build the column and row intervals for one detected table.

    Selects the ruled segments lying within ``bbox``, stores the text
    within it in ``self.t_bbox``, and pairs up consecutive anchors from
    ``self.table_bbox_parses[bbox]`` into ``(start, end)`` intervals.
    ``user_cols`` is accepted but not used here.

    Returns
    -------
    tuple
        ``(cols, rows, v_s, h_s)``: column intervals, row intervals, and
        the vertical / horizontal segments inside ``bbox``.
    """
    # Keep only the elements that lie within the table bbox.
    v_s, h_s = segments_in_bbox(
        bbox, self.vertical_segments, self.horizontal_segments
    )
    self.t_bbox = text_in_bbox_per_axis(
        bbox, self.horizontal_text, self.vertical_text
    )

    parse = self.table_bbox_parses[bbox]
    col_anchors = parse["col_anchors"]
    row_anchors = parse["row_anchors"]
    # Adjacent anchors delimit one column / row each.
    cols = list(zip(col_anchors, col_anchors[1:]))
    rows = list(zip(row_anchors, row_anchors[1:]))
    return cols, rows, v_s, h_s
def _generate_table(self, table_idx, bbox, cols, rows, **kwargs):
v_s = kwargs.get("v_s")
h_s = kwargs.get("h_s")
if v_s is None or h_s is None:
raise ValueError(f"No segments found on {self.rootname}")
table = self._initialize_new_table(table_idx, bbox, cols, rows)
# set table edges to True using ver+hor lines
table = table.set_edges(v_s, h_s, joint_tol=self.joint_tol)
# set table border edges to True
table = table.set_border()
self.record_parse_metadata(table)
return table