Source code for camelot.parsers.hybrid

"""Implementation of hybrid table parser."""

import numpy as np

from ..utils import bboxes_overlap
from ..utils import boundaries_to_split_lines
from .base import BaseParser
from .lattice import Lattice
from .network import Network


[docs] class Hybrid(BaseParser): """Defines a hybrid parser, leveraging both network and lattice parsers. Parameters ---------- table_regions : list, optional (default: None) List of page regions that may contain tables of the form x1,y1,x2,y2 where (x1, y1) -> left-top and (x2, y2) -> right-bottom in PDF coordinate space. table_areas : list, optional (default: None) List of table area strings of the form x1,y1,x2,y2 where (x1, y1) -> left-top and (x2, y2) -> right-bottom in PDF coordinate space. columns : list, optional (default: None) List of column x-coordinates strings where the coordinates are comma-separated. split_text : bool, optional (default: False) Split text that spans across multiple cells. flag_size : bool, optional (default: False) Flag text based on font size. Useful to detect super/subscripts. Adds <s></s> around flagged text. strip_text : str, optional (default: '') Characters that should be stripped from a string before assigning it to a cell. edge_tol : int, optional (default: 50) Tolerance parameter for extending textedges vertically. row_tol : int, optional (default: 2) Tolerance parameter used to combine text vertically, to generate rows. column_tol : int, optional (default: 0) Tolerance parameter used to combine text horizontally, to generate columns. """ def __init__( self, table_regions=None, table_areas=None, columns=None, flag_size=False, split_text=False, strip_text="", edge_tol=None, row_tol=2, column_tol=0, debug=False, **kwargs, ): super().__init__( "hybrid", table_regions=table_regions, table_areas=table_areas, flag_size=flag_size, split_text=split_text, strip_text=strip_text, debug=debug, ) self.columns = columns # Columns settings impacts the hybrid table self.network_parser = Network( table_regions=table_regions, table_areas=table_areas, columns=columns, flag_size=flag_size, split_text=split_text, strip_text=strip_text, edge_tol=edge_tol, row_tol=row_tol, column_tol=column_tol, debug=debug, ) self.lattice_parser = Lattice( table_regions=table_regions, table_areas=table_areas, flag_size=flag_size, split_text=split_text, strip_text=strip_text, edge_tol=edge_tol, row_tol=row_tol, column_tol=column_tol, debug=debug, )
[docs] def prepare_page_parse( self, filename, layout, dimensions, page_idx, images, horizontal_text, vertical_text, layout_kwargs, ): """Call this method to prepare the page parsing . Parameters ---------- filename : [type] [description] layout : [type] [description] dimensions : [type] [description] page_idx : [type] [description] layout_kwargs : [type] [description] """ super().prepare_page_parse( filename, layout, dimensions, page_idx, images, horizontal_text, vertical_text, layout_kwargs, ) self.network_parser.prepare_page_parse( filename, layout, dimensions, page_idx, images, horizontal_text, vertical_text, layout_kwargs, ) self.lattice_parser.prepare_page_parse( filename, layout, dimensions, page_idx, images, horizontal_text, vertical_text, layout_kwargs, )
def _generate_columns_and_rows(self, bbox, table_idx): parser = self.table_bbox_parses[bbox] return parser._generate_columns_and_rows(bbox, table_idx) def _generate_table(self, table_idx, bbox, cols, rows, **kwargs): parser = self.table_bbox_parses[bbox] table = parser._generate_table(table_idx, bbox, cols, rows, **kwargs) # Because hybrid can inject extraneous splits from both lattice and # network, remove lines / cols that are completely empty. table.df = table.df.replace("", np.nan) table.df = table.df.dropna(axis=0, how="all") table.df = table.df.dropna(axis=1, how="all") table.df = table.df.replace(np.nan, "") table.shape = table.df.shape return table @staticmethod def _augment_boundaries_with_splits(boundaries, splits, tolerance=0): """Augment existing boundaries using provided hard splits. Boundaries: |---| |-| |---------| #noqa RST305 Splits: | | | | #noqa RST305 Augmented: |-------|-----|-------|--| #noqa RST305 """ idx_boundaries = len(boundaries) - 1 idx_splits = len(splits) - 1 previous_boundary = None while True: if idx_splits < 0: # No more splits to incorporate, we're done break split = splits[idx_splits] if idx_boundaries < 0: # Need to insert remaining splits new_boundary = [split, boundaries[0][0]] boundaries.insert(0, new_boundary) idx_splits = idx_splits - 1 else: boundary = boundaries[idx_boundaries] if boundary[1] < split + tolerance: # The lattice column is further to the right of our # col boundary. We move our left boundary to match. boundary[1] = split # And if there was another segment after, we make its # right boundary match as well so that there's no gap if previous_boundary is not None: previous_boundary[0] = split idx_splits = idx_splits - 1 elif boundary[0] > split - tolerance: # Our boundary is fully after the split, move on idx_boundaries = idx_boundaries - 1 previous_boundary = boundary if idx_boundaries < 0: # If this is the last boundary to the left, set its # edge at the split boundary[0] = split idx_splits = idx_splits - 1 else: # The split is inside our boundary: split it new_boundary = [split, boundary[1]] boundaries.insert(idx_boundaries + 1, new_boundary) boundary[1] = split previous_boundary = new_boundary idx_splits = idx_splits - 1 return boundaries def _merge_bbox_analysis(self, lattice_bbox, network_bbox): """Identify splits that were only detected by lattice or by network.""" lattice_parse = self.lattice_parser.table_bbox_parses[lattice_bbox] lattice_cols = lattice_parse["col_anchors"] network_bbox_data = self.network_parser.table_bbox_parses[network_bbox] network_cols_boundaries = network_bbox_data["cols_boundaries"] # Favor network, but complete or adjust its columns based on the # splits identified by lattice. if network_cols_boundaries is None: self.table_bbox_parses[lattice_bbox] = self.lattice_parser else: network_cols_boundaries = self._augment_boundaries_with_splits( network_cols_boundaries, lattice_cols, self.lattice_parser.joint_tol ) augmented_bbox = ( network_cols_boundaries[0][0], min(lattice_bbox[1], network_bbox[1]), network_cols_boundaries[-1][1], max(lattice_bbox[3], network_bbox[3]), ) network_bbox_data["cols_anchors"] = boundaries_to_split_lines( network_cols_boundaries ) del self.network_parser.table_bbox_parses[network_bbox] self.network_parser.table_bbox_parses[augmented_bbox] = network_bbox_data self.table_bbox_parses[augmented_bbox] = self.network_parser def _generate_table_bbox(self): # Collect bboxes from both parsers self.lattice_parser._generate_table_bbox() _lattice_bboxes = sorted( self.lattice_parser.table_bbox_parses, key=lambda bbox: (bbox[0], -bbox[1]) ) self.network_parser._generate_table_bbox() _network_bboxes = sorted( self.network_parser.table_bbox_parses, key=lambda bbox: (bbox[0], -bbox[1]) ) # Merge the data from both processes for lattice_bbox in _lattice_bboxes: merged = False for idx in range(len(_network_bboxes) - 1, -1, -1): network_bbox = _network_bboxes[idx] if not bboxes_overlap(lattice_bbox, network_bbox): continue self._merge_bbox_analysis(lattice_bbox, network_bbox) # network_bbox_data["cols_boundaries"] del _network_bboxes[idx] merged = True if not merged: self.table_bbox_parses[lattice_bbox] = self.lattice_parser # Add the bboxes from network that haven't been merged for network_bbox in _network_bboxes: self.table_bbox_parses[network_bbox] = self.network_parser