import copy
import locale
import logging
import os
import sys
import warnings
import numpy as np
import pandas as pd
from ..backends.image_conversion import BACKENDS
from ..core import Table
from ..image_processing import adaptive_threshold
from ..image_processing import find_contours
from ..image_processing import find_joints
from ..image_processing import find_lines
from ..utils import compute_accuracy
from ..utils import compute_whitespace
from ..utils import get_table_index
from ..utils import merge_close_lines
from ..utils import scale_image
from ..utils import scale_pdf
from ..utils import segments_in_bbox
from ..utils import text_in_bbox
from .base import BaseParser
logger = logging.getLogger("camelot")
# (A stray "[docs]" Sphinx viewcode link marker was here; it is not Python code.)
class Lattice(BaseParser):
    """Lattice method of parsing looks for lines between text
    to parse the table.

    Parameters
    ----------
    table_regions : list, optional (default: None)
        List of page regions that may contain tables of the form x1,y1,x2,y2
        where (x1, y1) -> left-top and (x2, y2) -> right-bottom
        in PDF coordinate space.
    table_areas : list, optional (default: None)
        List of table area strings of the form x1,y1,x2,y2
        where (x1, y1) -> left-top and (x2, y2) -> right-bottom
        in PDF coordinate space.
    process_background : bool, optional (default: False)
        Process background lines.
    line_scale : int, optional (default: 15)
        Line size scaling factor. The larger the value the smaller
        the detected lines. Making it very large will lead to text
        being detected as lines.
    copy_text : list, optional (default: None)
        {'h', 'v'}
        Direction in which text in a spanning cell will be copied
        over.
    shift_text : list, optional (default: None, which means ['l', 't'])
        {'l', 'r', 't', 'b'}
        Direction in which text in a spanning cell will flow.
    split_text : bool, optional (default: False)
        Split text that spans across multiple cells.
    flag_size : bool, optional (default: False)
        Flag text based on font size. Useful to detect
        super/subscripts. Adds <s></s> around flagged text.
    strip_text : str, optional (default: '')
        Characters that should be stripped from a string before
        assigning it to a cell.
    line_tol : int, optional (default: 2)
        Tolerance parameter used to merge close vertical and horizontal
        lines.
    joint_tol : int, optional (default: 2)
        Tolerance parameter used to decide whether the detected lines
        and points lie close to each other.
    threshold_blocksize : int, optional (default: 15)
        Size of a pixel neighborhood that is used to calculate a
        threshold value for the pixel: 3, 5, 7, and so on.

        For more information, refer `OpenCV's adaptiveThreshold <https://docs.opencv.org/2.4/modules/imgproc/doc/miscellaneous_transformations.html#adaptivethreshold>`_.
    threshold_constant : int, optional (default: -2)
        Constant subtracted from the mean or weighted mean.
        Normally, it is positive but may be zero or negative as well.

        For more information, refer `OpenCV's adaptiveThreshold <https://docs.opencv.org/2.4/modules/imgproc/doc/miscellaneous_transformations.html#adaptivethreshold>`_.
    iterations : int, optional (default: 0)
        Number of times for erosion/dilation is applied.

        For more information, refer `OpenCV's dilate <https://docs.opencv.org/2.4/modules/imgproc/doc/filtering.html#dilate>`_.
    resolution : int, optional (default: 300)
        Resolution used for PDF to PNG conversion.
    backend : str or object, optional (default: 'ghostscript')
        Either the name of a registered image conversion backend (a key
        of ``BACKENDS``) or an object that provides a callable
        ``convert`` attribute.

    """

    def __init__(
        self,
        table_regions=None,
        table_areas=None,
        process_background=False,
        line_scale=15,
        copy_text=None,
        shift_text=None,
        split_text=False,
        flag_size=False,
        strip_text="",
        line_tol=2,
        joint_tol=2,
        threshold_blocksize=15,
        threshold_constant=-2,
        iterations=0,
        resolution=300,
        backend="ghostscript",
        **kwargs,
    ):
        self.table_regions = table_regions
        self.table_areas = table_areas
        self.process_background = process_background
        self.line_scale = line_scale
        self.copy_text = copy_text
        # A fresh list per instance: a list literal in the signature would be
        # shared (and mutable) across every Lattice created with the default.
        self.shift_text = ["l", "t"] if shift_text is None else shift_text
        self.split_text = split_text
        self.flag_size = flag_size
        self.strip_text = strip_text
        self.line_tol = line_tol
        self.joint_tol = joint_tol
        self.threshold_blocksize = threshold_blocksize
        self.threshold_constant = threshold_constant
        self.iterations = iterations
        self.resolution = resolution
        self.backend = Lattice._get_backend(backend)

    @staticmethod
    def _get_backend(backend):
        """Resolve ``backend`` into an object exposing ``convert``.

        Parameters
        ----------
        backend : str or object
            A key of ``BACKENDS``, or an object with a callable
            ``convert`` attribute.

        Returns
        -------
        backend : object
            A new instance of the registered backend class when a name
            was given, otherwise the object that was passed in.

        Raises
        ------
        NotImplementedError
            If the name is not registered in ``BACKENDS``, or the object
            does not provide a callable ``convert``.

        """
        if isinstance(backend, str):
            if backend not in BACKENDS:
                raise NotImplementedError(
                    f"Unknown backend '{backend}' specified. Please use either 'poppler' or 'ghostscript'."
                )

            if backend == "ghostscript":
                warnings.warn(
                    "'ghostscript' will be replaced by 'poppler' as the default image conversion"
                    " backend in v0.12.0. You can try out 'poppler' with backend='poppler'.",
                    DeprecationWarning,
                )

            return BACKENDS[backend]()

        # Custom backend object: it only needs to be able to convert.
        if not callable(getattr(backend, "convert", None)):
            raise NotImplementedError(
                f"'{backend}' must implement a 'convert' method"
            )

        return backend

    @staticmethod
    def _reduce_index(t, idx, shift_text):
        """Reduces index of a text object if it lies within a spanning
        cell.

        Parameters
        ----------
        t : camelot.core.Table
        idx : list
            List of tuples of the form (r_idx, c_idx, text).
        shift_text : list
            {'l', 'r', 't', 'b'}
            Select one or more strings from above and pass them as a
            list to specify where the text in a spanning cell should
            flow.

        Returns
        -------
        indices : list
            List of tuples of the form (r_idx, c_idx, text) where
            r_idx and c_idx are new row and column indices for text.

        """
        indices = []
        for r_idx, c_idx, text in idx:
            # Directions are applied in the order given; each one walks from
            # the current cell until it reaches a cell that has the matching
            # edge set.
            for d in shift_text:
                if d == "l":
                    if t.cells[r_idx][c_idx].hspan:
                        while not t.cells[r_idx][c_idx].left:
                            c_idx -= 1
                elif d == "r":
                    if t.cells[r_idx][c_idx].hspan:
                        while not t.cells[r_idx][c_idx].right:
                            c_idx += 1
                elif d == "t":
                    if t.cells[r_idx][c_idx].vspan:
                        while not t.cells[r_idx][c_idx].top:
                            r_idx -= 1
                elif d == "b":
                    if t.cells[r_idx][c_idx].vspan:
                        while not t.cells[r_idx][c_idx].bottom:
                            r_idx += 1
            indices.append((r_idx, c_idx, text))
        return indices

    @staticmethod
    def _copy_spanning_text(t, copy_text=None):
        """Copies over text in empty spanning cells.

        Parameters
        ----------
        t : camelot.core.Table
        copy_text : list, optional (default: None)
            {'h', 'v'}
            Select one or more strings from above and pass them as a list
            to specify the direction in which text should be copied over
            when a cell spans multiple rows or columns. ``None`` or an
            empty list leaves the table untouched.

        Returns
        -------
        t : camelot.core.Table
            The same table object, modified in place.

        """
        if not copy_text:
            # Nothing requested (this is also the declared default).
            return t

        for f in copy_text:
            if f == "h":
                for row in t.cells:
                    for j, cell in enumerate(row):
                        if cell.text.strip() == "":
                            if cell.hspan and not cell.left:
                                # Cells are visited left to right, so the
                                # neighbour may itself have just been filled.
                                cell.text = row[j - 1].text
            elif f == "v":
                for i, row in enumerate(t.cells):
                    for j, cell in enumerate(row):
                        if cell.text.strip() == "":
                            if cell.vspan and not cell.top:
                                # Same propagation, top to bottom.
                                cell.text = t.cells[i - 1][j].text
        return t

    def _generate_table_bbox(self):
        """Detect table bounding boxes and line segments on the page image.

        Reads ``self.imagename``, ``self.pdf_width`` and ``self.pdf_height``
        (not set in this class; presumably provided by ``BaseParser`` --
        confirm against the base class). Sets ``self.image``,
        ``self.threshold``, ``self.table_bbox_unscaled``, ``self.table_bbox``,
        ``self.vertical_segments`` and ``self.horizontal_segments``.
        """
        self.image, self.threshold = adaptive_threshold(
            self.imagename,
            process_background=self.process_background,
            blocksize=self.threshold_blocksize,
            c=self.threshold_constant,
        )

        image_width = self.image.shape[1]
        image_height = self.image.shape[0]
        image_width_scaler = image_width / float(self.pdf_width)
        image_height_scaler = image_height / float(self.pdf_height)
        pdf_width_scaler = self.pdf_width / float(image_width)
        pdf_height_scaler = self.pdf_height / float(image_height)
        image_scalers = (image_width_scaler, image_height_scaler, self.pdf_height)
        pdf_scalers = (pdf_width_scaler, pdf_height_scaler, image_height)

        def scale_areas(areas):
            """Convert 'x1,y1,x2,y2' strings to scaled (x, y, w, h) tuples."""
            scaled_areas = []
            for area in areas:
                x1, y1, x2, y2 = (float(value) for value in area.split(","))
                x1, y1, x2, y2 = scale_pdf((x1, y1, x2, y2), image_scalers)
                scaled_areas.append((x1, y1, abs(x2 - x1), abs(y2 - y1)))
            return scaled_areas

        if self.table_areas is None:
            # No explicit areas: optionally restrict line detection to the
            # given regions and let contour detection find the tables.
            regions = None
            if self.table_regions is not None:
                regions = scale_areas(self.table_regions)

            vertical_mask, vertical_segments = find_lines(
                self.threshold,
                regions=regions,
                direction="vertical",
                line_scale=self.line_scale,
                iterations=self.iterations,
            )
            horizontal_mask, horizontal_segments = find_lines(
                self.threshold,
                regions=regions,
                direction="horizontal",
                line_scale=self.line_scale,
                iterations=self.iterations,
            )

            contours = find_contours(vertical_mask, horizontal_mask)
            table_bbox = find_joints(contours, vertical_mask, horizontal_mask)
        else:
            # Explicit areas: detect lines on the whole page and use the
            # user-provided areas in place of detected contours.
            vertical_mask, vertical_segments = find_lines(
                self.threshold,
                direction="vertical",
                line_scale=self.line_scale,
                iterations=self.iterations,
            )
            horizontal_mask, horizontal_segments = find_lines(
                self.threshold,
                direction="horizontal",
                line_scale=self.line_scale,
                iterations=self.iterations,
            )

            areas = scale_areas(self.table_areas)
            table_bbox = find_joints(areas, vertical_mask, horizontal_mask)

        # Keep an untouched copy before scale_image is applied; it is later
        # attached to each table as part of ``table._image``.
        self.table_bbox_unscaled = copy.deepcopy(table_bbox)

        self.table_bbox, self.vertical_segments, self.horizontal_segments = scale_image(
            table_bbox, vertical_segments, horizontal_segments, pdf_scalers
        )

    def _generate_columns_and_rows(self, table_idx, tk):
        """Build the column and row intervals for one table.

        Parameters
        ----------
        table_idx : int
            Position of the table on the page (unused here).
        tk : tuple
            Key of the table in ``self.table_bbox``; ``tk[0]``/``tk[2]`` are
            added to the column coordinates and ``tk[1]``/``tk[3]`` to the
            row coordinates.

        Returns
        -------
        cols : list
            Consecutive (start, end) pairs of merged column coordinates,
            ascending.
        rows : list
            Consecutive (start, end) pairs of merged row coordinates,
            descending.
        v_s, h_s : segments inside ``tk`` as returned by ``segments_in_bbox``.

        """
        # select elements which lie within table_bbox
        t_bbox = {}
        v_s, h_s = segments_in_bbox(
            tk, self.vertical_segments, self.horizontal_segments
        )
        t_bbox["horizontal"] = text_in_bbox(tk, self.horizontal_text)
        t_bbox["vertical"] = text_in_bbox(tk, self.vertical_text)

        t_bbox["horizontal"].sort(key=lambda x: (-x.y0, x.x0))
        t_bbox["vertical"].sort(key=lambda x: (x.x0, -x.y0))

        # Stored on the instance because _generate_table reads it.
        self.t_bbox = t_bbox

        cols, rows = zip(*self.table_bbox[tk])
        cols, rows = list(cols), list(rows)
        cols.extend([tk[0], tk[2]])
        rows.extend([tk[1], tk[3]])
        # sort horizontal and vertical segments
        cols = merge_close_lines(sorted(cols), line_tol=self.line_tol)
        rows = merge_close_lines(sorted(rows, reverse=True), line_tol=self.line_tol)
        # make grid using x and y coord of shortlisted rows and cols
        cols = list(zip(cols, cols[1:]))
        rows = list(zip(rows, rows[1:]))

        return cols, rows, v_s, h_s

    def _generate_table(self, table_idx, cols, rows, **kwargs):
        """Build a ``Table`` from the grid and place the page text in it.

        Parameters
        ----------
        table_idx : int
            Zero-based position of the table; ``table.order`` is set to
            ``table_idx + 1``.
        cols, rows : list
            Column and row intervals used to construct the ``Table``.
        **kwargs
            Must contain ``v_s`` and ``h_s`` (vertical and horizontal
            segments).

        Returns
        -------
        table : camelot.core.Table

        Raises
        ------
        ValueError
            If ``v_s`` or ``h_s`` is missing or ``None``.

        """
        v_s = kwargs.get("v_s")
        h_s = kwargs.get("h_s")
        if v_s is None or h_s is None:
            raise ValueError(f"No segments found on {self.rootname}")

        table = Table(cols, rows)
        # set table edges to True using ver+hor lines
        table = table.set_edges(v_s, h_s, joint_tol=self.joint_tol)
        # set table border edges to True
        table = table.set_border()
        # set spanning cells to True
        table = table.set_span()

        pos_errors = []
        # TODO: have a single list in place of two directional ones?
        # sorted on x-coordinate based on reading order i.e. LTR or RTL
        for direction in ["vertical", "horizontal"]:
            for t in self.t_bbox[direction]:
                indices, error = get_table_index(
                    table,
                    t,
                    direction,
                    split_text=self.split_text,
                    flag_size=self.flag_size,
                    strip_text=self.strip_text,
                )
                # A leading (-1, -1) index is skipped: the text is not
                # assigned to any cell and its error is not counted.
                if indices[0][:2] != (-1, -1):
                    pos_errors.append(error)
                    indices = Lattice._reduce_index(
                        table, indices, shift_text=self.shift_text
                    )
                    for r_idx, c_idx, text in indices:
                        table.cells[r_idx][c_idx].text = text
        accuracy = compute_accuracy([[100, pos_errors]])

        if self.copy_text is not None:
            table = Lattice._copy_spanning_text(table, copy_text=self.copy_text)

        data = table.data
        table.df = pd.DataFrame(data)
        table.shape = table.df.shape

        whitespace = compute_whitespace(data)
        table.flavor = "lattice"
        table.accuracy = accuracy
        table.whitespace = whitespace
        table.order = table_idx + 1
        # rootname's basename is expected to look like "page-<number>".
        table.page = int(os.path.basename(self.rootname).replace("page-", ""))

        # for plotting
        _text = []
        _text.extend([(t.x0, t.y0, t.x1, t.y1) for t in self.horizontal_text])
        _text.extend([(t.x0, t.y0, t.x1, t.y1) for t in self.vertical_text])
        table._text = _text
        table._image = (self.image, self.table_bbox_unscaled)
        table._segments = (self.vertical_segments, self.horizontal_segments)
        table._textedges = None

        return table

    def extract_tables(self, filename, suppress_stdout=False, layout_kwargs=None):
        """Extract all tables found on a single-page PDF.

        Parameters
        ----------
        filename : str
            Passed to ``self._generate_layout`` (not defined in this class;
            presumably inherited from ``BaseParser``).
        suppress_stdout : bool, optional (default: False)
            When True, the "Processing ..." log message is not emitted.
        layout_kwargs : dict, optional (default: None, which means {})
            Passed through to ``self._generate_layout``.

        Returns
        -------
        tables : list
            Tables ordered by descending ``tk[1]`` of their bounding box
            key; empty when the page has no horizontal text.

        """
        # Fresh dict per call instead of a shared mutable default.
        if layout_kwargs is None:
            layout_kwargs = {}

        self._generate_layout(filename, layout_kwargs)
        page_name = os.path.basename(self.rootname)
        if not suppress_stdout:
            logger.info(f"Processing {page_name}")

        if not self.horizontal_text:
            if self.images:
                warnings.warn(
                    f"{page_name} is image-based, camelot only works on"
                    " text-based pages."
                )
            else:
                warnings.warn(f"No tables found on {page_name}")
            return []

        # Render the page to an image, then detect the lattice on it.
        self.backend.convert(self.filename, self.imagename)

        self._generate_table_bbox()

        _tables = []
        # sort tables based on y-coord
        for table_idx, tk in enumerate(
            sorted(self.table_bbox.keys(), key=lambda x: x[1], reverse=True)
        ):
            cols, rows, v_s, h_s = self._generate_columns_and_rows(table_idx, tk)
            table = self._generate_table(table_idx, cols, rows, v_s=v_s, h_s=h_s)
            table._bbox = tk
            _tables.append(table)

        return _tables