Source code for camelot.parsers.hybrid

"""Implementation of hybrid table parser."""

from ..utils import bboxes_overlap
from ..utils import boundaries_to_split_lines
from ..utils import text_in_bbox
from .base import BaseParser
from .lattice import Lattice
from .network import Network

#: Minimum fraction of a lattice grid's (cols x rows) crossing points that
#: must carry an actual joint for the grid to count as "complete" — a real
#: lattice of ruled lines rather than a couple of stray rules. Below this,
#: hybrid keeps augmenting lattice's boundaries with network's text splits.
_LATTICE_GRID_COVERAGE = 0.5

#: A complete grid's row count must stay *commensurate* with the
#: column-aligned text rows inside it: at least ``_LATTICE_ROW_MATCH`` of them
#: (else lattice is a partially-ruled fragment dropping unruled rows — the
#: us-008 / us-033 failure mode) and at most ``_LATTICE_ROW_CEIL`` times them
#: (else lattice's interior rules don't separate real multi-column rows —
#: spurious rules or a complex multi-level header that network handles
#: better, e.g. the vertical_header fixture). Outside the band, hybrid keeps
#: the network-augmented path. See :meth:`Hybrid._count_column_aligned_rows`.
_LATTICE_ROW_MATCH = 0.55
_LATTICE_ROW_CEIL = 1.5

#: The row-match band is only trusted when a grid has at least this many
#: column-aligned rows — below it the count is too small to be meaningful
#: (e.g. a list-like ruled table with one multi-column row), so the band is
#: skipped and the grid kept on its joint-coverage merit alone.
_MIN_ALIGNED_ROWS = 3


[docs] class Hybrid(BaseParser): """Defines a hybrid parser, leveraging both network and lattice parsers. Parameters ---------- table_regions : list, optional (default: None) List of page regions that may contain tables of the form x1,y1,x2,y2 where (x1, y1) -> left-top and (x2, y2) -> right-bottom in PDF coordinate space. table_areas : list, optional (default: None) List of table area strings of the form x1,y1,x2,y2 where (x1, y1) -> left-top and (x2, y2) -> right-bottom in PDF coordinate space. columns : list, optional (default: None) List of column x-coordinates strings where the coordinates are comma-separated. split_text : bool, optional (default: False) Split text that spans across multiple cells. flag_size : bool, optional (default: False) Flag text based on font size. Useful to detect super/subscripts. Adds <s></s> around flagged text. strip_text : str or sequence of str, optional (default: '') Characters or substrings to strip from each cell. A ``str`` strips per-character; a list/tuple of ``str`` strips whole substrings (#484). edge_tol : int, optional (default: 50) Tolerance parameter for extending textedges vertically. row_tol : int, optional (default: 2) Tolerance parameter used to combine text vertically, to generate rows. column_tol : int, optional (default: 0) Tolerance parameter used to combine text horizontally, to generate columns. engine : str, optional (default: 'combined') Line-detection engine for hybrid's **lattice half** (the network half is text-based and unaffected): - ``'combined'`` (default): OpenCV on the rendered page **plus** the PDF's native vector ruled lines unioned in — recovers faintly-rendered rules. Matches the ``flavor='lattice'`` default. - ``'raster'``: detect ruled lines with OpenCV only (pre-#763). - ``'vector'``: detect ruled lines **straight from the PDF's vector graphics, skipping rasterisation and OpenCV entirely** — the render-free hybrid (network text-edge alignment merged with vector ruled lines) for partial-ruled / borderless tables at roughly an order of magnitude less time than the raster path. (#39) """ def __init__( self, table_regions=None, table_areas=None, columns=None, flag_size=False, split_text=False, strip_text="", replace_text=None, edge_tol=None, row_tol=2, column_tol=0, debug=False, engine="combined", **kwargs, ): super().__init__( "hybrid", table_regions=table_regions, table_areas=table_areas, flag_size=flag_size, split_text=split_text, strip_text=strip_text, replace_text=replace_text, debug=debug, ) self.columns = columns # Columns settings impacts the hybrid table self.network_parser = Network( table_regions=table_regions, table_areas=table_areas, columns=columns, flag_size=flag_size, split_text=split_text, strip_text=strip_text, replace_text=replace_text, edge_tol=edge_tol, row_tol=row_tol, column_tol=column_tol, debug=debug, ) self.lattice_parser = Lattice( table_regions=table_regions, table_areas=table_areas, flag_size=flag_size, split_text=split_text, strip_text=strip_text, replace_text=replace_text, edge_tol=edge_tol, row_tol=row_tol, column_tol=column_tol, debug=debug, # Forward the line-detection engine so flavor='hybrid' can drive # its lattice half with 'raster', 'combined' (raster + PDF vector # lines, #763) or the render-free 'vector' engine (#39). engine=engine, )
[docs] def prepare_page_parse( self, filename, layout, dimensions, page_idx, images, horizontal_text, vertical_text, rotation, layout_kwargs, ): """Call this method to prepare the page parsing . Parameters ---------- filename : [type] [description] layout : [type] [description] dimensions : [type] [description] page_idx : [type] [description] layout_kwargs : [type] [description] """ super().prepare_page_parse( filename, layout, dimensions, page_idx, images, horizontal_text, vertical_text, rotation, layout_kwargs, ) self.network_parser.prepare_page_parse( filename, layout, dimensions, page_idx, images, horizontal_text, vertical_text, rotation, layout_kwargs, ) self.lattice_parser.prepare_page_parse( filename, layout, dimensions, page_idx, images, horizontal_text, vertical_text, rotation, layout_kwargs, )
def _generate_columns_and_rows(self, bbox, table_idx): parser = self.table_bbox_parses[bbox] return parser._generate_columns_and_rows(bbox, table_idx) def _generate_table(self, table_idx, bbox, cols, rows, **kwargs): parser = self.table_bbox_parses[bbox] table = parser._generate_table(table_idx, bbox, cols, rows, **kwargs) # Because hybrid can inject extraneous splits from both lattice and # network, remove lines / cols that are completely empty. # drop empty rows table.df = table.df.loc[~(table.df == "").all(axis=1)] # drop empty columns table.df = table.df.loc[:, ~(table.df == "").all(axis=0)] table.shape = table.df.shape return table def _reject_table(self, table) -> bool: """Drop tables left empty after the empty-row/col purge. The render-free ``engine='vector'`` half reads ruled lines straight from the PDF's vector graphics, which include decorative page borders and form rules. Those can raise a "grid" with no text inside; once :meth:`_generate_table` strips its all-empty rows and columns nothing is left, and an empty table would otherwise leak out as a spurious detection. (The rendered raster/combined halves don't hit this — the OpenCV pipeline doesn't pick those rules up — so their output is unchanged.) """ return table.df.empty or table.shape[0] == 0 or table.shape[1] == 0 @staticmethod def _augment_boundaries_with_splits(boundaries, splits, tolerance=0): """Augment existing boundaries using provided hard splits. Boundaries: |---| |-| |---------| #noqa RST305 Splits: | | | | #noqa RST305 Augmented: |-------|-----|-------|--| #noqa RST305 """ idx_boundaries = len(boundaries) - 1 idx_splits = len(splits) - 1 previous_boundary = None while True: if idx_splits < 0: # No more splits to incorporate, we're done break split = splits[idx_splits] if idx_boundaries < 0: # Need to insert remaining splits new_boundary = [split, boundaries[0][0]] boundaries.insert(0, new_boundary) idx_splits = idx_splits - 1 else: boundary = boundaries[idx_boundaries] if boundary[1] < split + tolerance: # The lattice column is further to the right of our # col boundary. We move our left boundary to match. boundary[1] = split # And if there was another segment after, we make its # right boundary match as well so that there's no gap if previous_boundary is not None: previous_boundary[0] = split idx_splits = idx_splits - 1 elif boundary[0] > split - tolerance: # Our boundary is fully after the split, move on idx_boundaries = idx_boundaries - 1 previous_boundary = boundary if idx_boundaries < 0: # If this is the last boundary to the left, set its # edge at the split boundary[0] = split idx_splits = idx_splits - 1 else: # The split is inside our boundary: split it new_boundary = [split, boundary[1]] boundaries.insert(idx_boundaries + 1, new_boundary) boundary[1] = split previous_boundary = new_boundary idx_splits = idx_splits - 1 return boundaries def _count_column_aligned_rows(self, lattice_bbox, col_anchors): """Count text rows inside ``lattice_bbox`` that populate >=2 columns. Used to tell a complete ruled grid from a *partially*-ruled fragment. A horizontal rule lattice missed leaves text in **several** columns at the same y; a multi-line cell only adds extra text in **one** column. So clustering the bbox's textlines by y and counting the clusters that span at least two of lattice's columns approximates the table's true row count — robust to multi-line cells (which inflate a naive y-count). """ tls = [ t for t in text_in_bbox( lattice_bbox, self.horizontal_text + self.vertical_text ) if t.get_text().strip() ] if not tls: return 0 def column_of(textline): xc = (textline.x0 + textline.x1) / 2.0 for i in range(len(col_anchors) - 1): if col_anchors[i] <= xc <= col_anchors[i + 1]: return i return None rows = self.network_parser._group_rows(tls, row_tol=self.network_parser.row_tol) aligned = 0 for row in rows: cols = {column_of(t) for t in row} cols.discard(None) if len(cols) >= 2: aligned += 1 return aligned def _lattice_grid_is_complete(self, lattice_bbox): """Whether lattice already resolved a full ruled grid for this bbox. The combine ``_augment_boundaries_with_splits`` *unions* network's text-derived column splits onto lattice's. On a partial / borderless table that recovers columns lattice couldn't see — the niche hybrid is for. But on an **already-complete** ruled grid the union only adds spurious splits and, because the merged bbox is then parsed by the *network* parser (text-grouped rows), it also throws away lattice's exact ruled rows — the over-split that sinks fully-ruled docs (#38). So gate the augmentation. A grid counts as complete only when: 1. lattice found genuine ruled lines in **both** directions (interior column *and* row anchors, not just the two bbox edges); 2. its joints actually cover that grid (a real lattice of crossings, not a couple of stray rules); and 3. lattice's row lines account for the table's text rows — i.e. it is not a *partially*-ruled fragment whose unruled rows lattice would silently drop (the us-008 / us-033 failure mode). Network handles those better, so they stay on the augmented path. Complete grids are routed to the lattice parser as-is; incomplete ones keep the network-augmented path. """ parse = self.lattice_parser.table_bbox_parses[lattice_bbox] col_anchors = parse["col_anchors"] row_anchors = parse["row_anchors"] # Need at least one interior line in *each* direction (more than the # two bbox edges) — otherwise lattice has no grid of its own to keep. if len(col_anchors) <= 2 or len(row_anchors) <= 2: return False joints_normalized = parse.get("joints_normalized", []) unique_joints = {(round(j[0], 1), round(j[1], 1)) for j in joints_normalized} grid_points = len(col_anchors) * len(row_anchors) coverage = len(unique_joints) / grid_points if grid_points else 0.0 if coverage < _LATTICE_GRID_COVERAGE: return False # Lattice's row count must stay commensurate with the column-aligned # text rows: too few => partially-ruled fragment (us-008); too many # => interior rules that don't separate real rows (vertical headers). lattice_rows = len(row_anchors) - 1 aligned_rows = self._count_column_aligned_rows(lattice_bbox, col_anchors) if aligned_rows >= _MIN_ALIGNED_ROWS and not ( _LATTICE_ROW_MATCH * aligned_rows <= lattice_rows <= _LATTICE_ROW_CEIL * aligned_rows ): return False return True def _merge_bbox_analysis(self, lattice_bbox, network_bbox): """Identify splits that were only detected by lattice or by network.""" lattice_parse = self.lattice_parser.table_bbox_parses[lattice_bbox] lattice_cols = lattice_parse["col_anchors"] network_bbox_data = self.network_parser.table_bbox_parses[network_bbox] network_cols_boundaries = network_bbox_data["cols_boundaries"] # Favor network, but complete or adjust its columns based on the # splits identified by lattice. if network_cols_boundaries is None: self.table_bbox_parses[lattice_bbox] = self.lattice_parser elif self._lattice_grid_is_complete(lattice_bbox): # Lattice already has a full ruled grid here — keep it intact # instead of unioning network's splits on top (#38 over-split). self.table_bbox_parses[lattice_bbox] = self.lattice_parser else: network_cols_boundaries = self._augment_boundaries_with_splits( network_cols_boundaries, lattice_cols, self.lattice_parser.joint_tol ) augmented_bbox = ( network_cols_boundaries[0][0], min(lattice_bbox[1], network_bbox[1]), network_cols_boundaries[-1][1], max(lattice_bbox[3], network_bbox[3]), ) network_bbox_data["cols_anchors"] = boundaries_to_split_lines( network_cols_boundaries ) del self.network_parser.table_bbox_parses[network_bbox] self.network_parser.table_bbox_parses[augmented_bbox] = network_bbox_data self.table_bbox_parses[augmented_bbox] = self.network_parser def _generate_table_bbox(self): self.table_bbox_parses = {} # Collect bboxes from both parsers self.lattice_parser._generate_table_bbox() _lattice_bboxes = sorted( self.lattice_parser.table_bbox_parses, key=lambda bbox: (bbox[0], -bbox[1]) ) self.network_parser._generate_table_bbox() _network_bboxes = sorted( self.network_parser.table_bbox_parses, key=lambda bbox: (bbox[0], -bbox[1]) ) # Merge the data from both processes for lattice_bbox in _lattice_bboxes: merged = False for idx in range(len(_network_bboxes) - 1, -1, -1): network_bbox = _network_bboxes[idx] if not bboxes_overlap(lattice_bbox, network_bbox): continue self._merge_bbox_analysis(lattice_bbox, network_bbox) # network_bbox_data["cols_boundaries"] del _network_bboxes[idx] merged = True if not merged: self.table_bbox_parses[lattice_bbox] = self.lattice_parser # Add the bboxes from network that haven't been merged for network_bbox in _network_bboxes: self.table_bbox_parses[network_bbox] = self.network_parser