-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmodule_pdf_ocr.py
100 lines (81 loc) · 3.33 KB
/
module_pdf_ocr.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import fitz
import io
from PIL import Image
from tesserocr import PyTessBaseAPI, PSM
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
import tessdata
from pathlib import Path
import sys
import os
def setup_environment():
    """
    Prepare the OCR runtime settings.

    Sets the TESSDATA_PREFIX environment variable to the 'share/tessdata'
    directory located next to this script and chooses a worker-thread count.

    Returns:
        tuple: (threads_to_use, tessdata_path) where threads_to_use is an int
        (never less than 4) and tessdata_path is a pathlib.Path.
    """
    # os.cpu_count() returns None when the core count cannot be determined;
    # fall back to 1 so the subtraction below cannot raise TypeError.
    total_cores = os.cpu_count() or 1
    # Leave 8 cores free on large machines, but always use at least 4 threads.
    threads_to_use = max(4, total_cores - 8)

    tessdata_path = Path(__file__).resolve().parent / 'share' / 'tessdata'
    os.environ['TESSDATA_PREFIX'] = str(tessdata_path)
    return threads_to_use, tessdata_path
def convert_page_to_image(page, zoom=2):
    """
    Render a page to a PIL image.

    The page is rasterized through a fitz.Matrix scaled by `zoom` on both
    axes, encoded as PNG bytes, and reopened with PIL from an in-memory buffer.
    """
    scale = fitz.Matrix(zoom, zoom)
    png_bytes = page.get_pixmap(matrix=scale).tobytes("png")
    buffer = io.BytesIO(png_bytes)
    return Image.open(buffer)
def remove_line_break_hyphens(text):
    """
    Join words that were split by a hyphen at the end of a line.

    A line ending in '-' is merged with the first word of the following line
    when that following line starts with a lowercase character. Whatever is
    left of the following line stays on its own line. Trailing whitespace is
    stripped from every emitted line.
    """
    pending = text.split('\n')
    last_index = len(pending) - 1
    merged = []
    pos = 0
    while pos <= last_index:
        head = pending[pos].rstrip()
        pos += 1  # pos now points at the line following `head`

        # Nothing to join: no trailing hyphen, or this was the final line.
        if not head.endswith('-') or pos > last_index:
            merged.append(head)
            continue

        follower = pending[pos].lstrip()
        # Only join when the continuation begins with a lowercase character.
        if not follower or not follower[0].islower():
            merged.append(head)
            continue

        first_word, _, rest = follower.partition(' ')
        merged.append(head[:-1] + first_word)
        if rest:
            # Keep the remainder so it is processed as the next line.
            pending[pos] = rest
        else:
            # The following line was consumed entirely; skip it.
            pos += 1
    return '\n'.join(merged)
def process_page(page_num, page, tessdata_path):
    """
    OCR a single page.

    Renders the page to an image, runs Tesseract on it with automatic page
    segmentation, and removes hyphenated line breaks from the recognized text.

    Returns:
        tuple: (page_num, processed_text)
    """
    rendered = convert_page_to_image(page)
    with PyTessBaseAPI(path=str(tessdata_path), psm=PSM.AUTO) as ocr:
        ocr.SetImage(rendered)
        raw_text = ocr.GetUTF8Text()
    return page_num, remove_line_break_hyphens(raw_text)
def process_pdf(pdf_path):
    """
    OCR every page of a PDF and return the combined text.

    Pages are submitted to a thread pool sized by setup_environment() and
    each one is handled by process_page(). The per-page texts are joined
    with newlines in page order.

    Args:
        pdf_path: Path (or string) of the PDF to process.

    Returns:
        str: The OCR text of the whole document.

    Raises:
        Any exception raised while opening the document or while processing
        a page is propagated; the document is closed in either case.
    """
    threads_to_use, tessdata_path = setup_environment()
    pdf_document = fitz.open(str(pdf_path))
    try:
        with ThreadPoolExecutor(max_workers=threads_to_use) as executor:
            # NOTE(review): page objects are rendered inside worker threads;
            # PyMuPDF's documentation warns about multithreaded use, so this
            # may need serializing -- confirm against the PyMuPDF docs.
            futures = [
                executor.submit(process_page, page_num, pdf_document[page_num], tessdata_path)
                for page_num in range(len(pdf_document))
            ]
            # Collecting in submission order yields the texts already sorted
            # by page number, so no separate sort step is required.
            page_texts = [future.result()[1] for future in futures]
    finally:
        # Close the document even when a page task raised.
        pdf_document.close()
    return '\n'.join(page_texts)
def open_file(file_path):
    """
    Open a file with the operating system's default application.

    Uses os.startfile where it exists (Windows); otherwise falls back to the
    'open' command on macOS and 'xdg-open' elsewhere. Failures are reported
    with a printed message rather than raised.

    Args:
        file_path: Path of the file to open.
    """
    # Local import: subprocess is only needed for the non-Windows fallback.
    import subprocess

    try:
        if hasattr(os, "startfile"):
            # os.startfile is only available on Windows.
            os.startfile(file_path)
        elif sys.platform == "darwin":
            subprocess.run(["open", str(file_path)], check=True)
        else:
            subprocess.run(["xdg-open", str(file_path)], check=True)
    except (OSError, subprocess.CalledProcessError):
        # OSError also covers a missing launcher executable (FileNotFoundError).
        print("Error: No default viewer detected or failed to open the file.")
if __name__ == "__main__":
    # Standalone usage: replace the placeholder below with the PDF to process.
    pdf_path = Path(r"[PATH TO PDF TO PROCESS]")
    # The result is written to the current working directory as "<stem>_ocr.txt".
    output_file = f"{pdf_path.stem}_ocr.txt"

    ocr_text = process_pdf(pdf_path)
    Path(output_file).write_text(ocr_text, encoding='utf-8')

    print(f"Results have been saved to {output_file}")
    open_file(output_file)