Skip to content

Sparknlp-1158: Adding Parameter Options to the PDF Reader #14562

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions .github/workflows/build_and_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,12 @@ jobs:
with:
distribution: 'temurin'
java-version: '8'
cache: 'sbt'
- name: Install Python 3.7
- name: Install Python 3.8
uses: actions/setup-python@v2
with:
python-version: 3.7.7
python-version: 3.8
architecture: x64
- name: Install Python packages (Python 3.7)
- name: Install Python packages (Python 3.8)
run: |
python -m pip install --upgrade pip
pip install pyspark==3.4.0 numpy pytest
Expand All @@ -70,7 +69,7 @@ jobs:
- name: Test Spark NLP in Python - Apache Spark 3.4.x
run: |
cd python
python3.7 -m pytest -v -m fast
python3.8 -m pytest -v -m fast
spark35:
if: "! contains(toJSON(github.event.commits.*.message), '[skip test]')"
runs-on: macos-13
Expand All @@ -85,7 +84,6 @@ jobs:
with:
distribution: 'adopt'
java-version: '8'
cache: 'sbt'
- name: Install Python 3.10
uses: actions/setup-python@v2
with:
Expand Down Expand Up @@ -121,13 +119,12 @@ jobs:
with:
distribution: 'adopt'
java-version: '8'
cache: 'sbt'
- name: Install Python 3.7
- name: Install Python 3.8
uses: actions/setup-python@v2
with:
python-version: 3.7.7
python-version: 3.8
architecture: x64
- name: Install Python packages (Python 3.7)
- name: Install Python packages (Python 3.8)
run: |
python -m pip install --upgrade pip
pip install pyspark==3.3.1 numpy pytest
Expand All @@ -141,5 +138,5 @@ jobs:
- name: Test Spark NLP in Python - Apache Spark 3.3.x
run: |
cd python
python3.7 -m pytest -v -m fast
python3.8 -m pytest -v -m fast

19 changes: 19 additions & 0 deletions python/sparknlp/reader/enums.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright 2017-2025 John Snow Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from enum import Enum

class TextStripperType(Enum):
    """Selects which PDF text stripper implementation the PDF reader uses.

    Each member's value is the name of the corresponding stripper class on
    the Scala/PDFBox side (see ``PdfToText``'s ``textStripper`` param).
    """
    # Plain text extraction (PDFBox's standard PDFTextStripper).
    PDF_TEXT_STRIPPER = "PDFTextStripper"
    # Layout-aware extraction — per the param description, used for
    # "output layout and formatting".
    PDF_LAYOUT_TEXT_STRIPPER = "PDFLayoutTextStripper"
48 changes: 47 additions & 1 deletion python/sparknlp/reader/pdf_to_text.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from pyspark.ml.util import JavaMLReadable, JavaMLWritable
from pyspark.ml.wrapper import JavaTransformer

from sparknlp.reader.enums import TextStripperType


class PdfToText(JavaTransformer, HasInputCol, HasOutputCol,
JavaMLReadable, JavaMLWritable):
Expand All @@ -25,6 +27,22 @@ class PdfToText(JavaTransformer, HasInputCol, HasOutputCol,
"Force to store splitted pdf.",
typeConverter=TypeConverters.toBoolean)

# When enabled, the transformer emits one output row per PDF page instead of
# one row for the whole document.
splitPage = Param(Params._dummy(), "splitPage",
                  "Param for enable/disable splitting document per page",
                  typeConverter=TypeConverters.toBoolean)

# Name of the text stripper implementation controlling output layout and
# formatting; validated against TextStripperType in setTextStripper.
textStripper = Param(Params._dummy(), "textStripper",
                     "Text stripper type used for output layout and formatting",
                     typeConverter=TypeConverters.toString)

# Enable/disable sorting of extracted lines.
sort = Param(Params._dummy(), "sort",
             "Param for enable/disable sort lines",
             typeConverter=TypeConverters.toBoolean)

# When enabled, only the number of pages is extracted (no text content).
onlyPageNum = Param(Params._dummy(), "onlyPageNum",
                    "Force to extract only number of pages",
                    typeConverter=TypeConverters.toBoolean)

@keyword_only
def __init__(self):
"""
Expand All @@ -33,7 +51,6 @@ def __init__(self):
super(PdfToText, self).__init__()
self._java_obj = self._new_java_obj("com.johnsnowlabs.reader.PdfToText", self.uid)


def setInputCol(self, value):
"""
Sets the value of :py:attr:`inputCol`.
Expand Down Expand Up @@ -63,3 +80,32 @@ def setStoreSplittedPdf(self, value):
Sets the value of :py:attr:`storeSplittedPdf`.
"""
return self._set(storeSplittedPdf=value)

def setSplitPage(self, value):
    """
    Sets the value of :py:attr:`splitPage`.

    Parameters
    ----------
    value : bool
        True to split the document per page (one output row per page),
        False to keep the whole document in a single row.
    """
    return self._set(splitPage=value)

def setOnlyPageNum(self, value):
    """
    Sets the value of :py:attr:`onlyPageNum`.

    Parameters
    ----------
    value : bool
        True to extract only the number of pages, skipping text extraction.
    """
    return self._set(onlyPageNum=value)

def setTextStripper(self, value):
    """
    Sets the value of :py:attr:`textStripper`.

    Parameters
    ----------
    value : TextStripperType or str
        The text stripper to use for output layout and formatting. Either a
        :py:class:`TextStripperType` member or one of its string values
        (e.g. ``"PDFTextStripper"`` or ``"PDFLayoutTextStripper"``).

    Raises
    ------
    ValueError
        If ``value`` is neither a :py:class:`TextStripperType` member nor a
        valid stripper name.
    """
    # Accept enum members by unwrapping them to their string value.
    if isinstance(value, TextStripperType):
        value = value.value
    valid_values = [member.value for member in TextStripperType]
    if value not in valid_values:
        # Report both the accepted values and the rejected input so the
        # caller can see exactly what went wrong (the old message claimed
        # only enum instances were accepted and hid the offending value).
        raise ValueError(
            f"Param textStripper must be a TextStripperType enum or one of "
            f"{valid_values}, but got {value!r}.")
    return self._set(textStripper=str(value))

def setSort(self, value):
    """
    Sets the value of :py:attr:`sort`.

    Parameters
    ----------
    value : bool
        True to enable sorting of extracted lines, False to disable it.
    """
    return self._set(sort=value)
122 changes: 109 additions & 13 deletions src/main/scala/com/johnsnowlabs/reader/PdfToText.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ import org.apache.spark.internal.Logging
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
import org.apache.spark.ml.param.{BooleanParam, IntParam, Param, ParamMap}
import org.apache.spark.ml.util.{DefaultParamsWritable, Identifiable}
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, posexplode_outer, udf}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset}

import java.io.ByteArrayOutputStream
import scala.util.{Failure, Success, Try}

class PdfToText(override val uid: String)
Expand Down Expand Up @@ -62,11 +63,27 @@ class PdfToText(override val uid: String)
}

final val pageNumCol = new Param[String](this, "pageNumCol", "Page number output column name.")
final val splitPage = new BooleanParam(
this,
"splitPage",
"Enable/disable splitting per page to identify page numbers and improve performance.")
final val originCol =
new Param[String](this, "originCol", "Input column name with original path of file.")
final val partitionNum = new IntParam(this, "partitionNum", "Number of partitions.")
final val onlyPageNum = new BooleanParam(this, "onlyPageNum", "Extract only page numbers.")
final val storeSplittedPdf =
new BooleanParam(this, "storeSplittedPdf", "Force to store bytes content of splitted pdf.")
final val textStripper = new Param[String](
this,
"textStripper",
"Text stripper type used for output layout and formatting")
final val sort = new BooleanParam(this, "sort", "Enable/disable sorting content on the page.")

/** @group setParam */
def setPageNumCol(value: String): this.type = set(pageNumCol, value)

/** @group setParam */
def setSplitPage(value: Boolean): this.type = set(splitPage, value)

/** @group getParam */
def setOriginCol(value: String): this.type = set(originCol, value)
Expand All @@ -80,16 +97,29 @@ class PdfToText(override val uid: String)
/** @group getParam */
def setPartitionNum(value: Int): this.type = set(partitionNum, value)

/** @group setParam */
def setOnlyPageNum(value: Boolean): this.type = set(onlyPageNum, value)

/** @group setParam */
def setStoreSplittedPdf(value: Boolean): this.type = set(storeSplittedPdf, value)

/** @group setParam */
def setTextStripper(value: String): this.type = set(textStripper, value)

/** @group setParam */
def setSort(value: Boolean): this.type = set(sort, value)

setDefault(
inputCol -> "content",
outputCol -> "text",
pageNumCol -> "pagenum",
originCol -> "path",
partitionNum -> 0,
storeSplittedPdf -> false)
onlyPageNum -> false,
storeSplittedPdf -> false,
splitPage -> true,
sort -> false,
textStripper -> TextStripperType.PDF_TEXT_STRIPPER)

private def transformUDF: UserDefinedFunction = udf(
(path: String, content: Array[Byte]) => {
Expand All @@ -99,7 +129,14 @@ class PdfToText(override val uid: String)

private def doProcess(
content: Array[Byte]): Seq[(String, Int, Int, Array[Byte], String, Int)] = {
val pagesTry = Try(pdfToText(content, $(storeSplittedPdf)))
val pagesTry = Try(
pdfToText(
content,
$(onlyPageNum),
$(splitPage),
$(storeSplittedPdf),
$(sort),
$(textStripper)))

pagesTry match {
case Failure(_) =>
Expand Down Expand Up @@ -157,23 +194,49 @@ trait PdfToTextTrait extends Logging with PdfUtils {
/*
* extracts a text layer from a PDF.
*/
private def extractText(document: => PDDocument, startPage: Int, endPage: Int): Seq[String] = {
val pdfTextStripper = new PDFTextStripper
/** Extracts the text layer for the given page range.
  *
  * Chooses the stripper implementation from `textStripper`: the layout
  * stripper (with optional line sorting) when requested, otherwise PDFBox's
  * plain `PDFTextStripper`. Page indices are zero-based here and converted
  * to PDFBox's one-based convention.
  */
private def extractText(
    document: => PDDocument,
    startPage: Int,
    endPage: Int,
    sort: Boolean,
    textStripper: String): Seq[String] = {
  val stripper: PDFTextStripper =
    if (textStripper == TextStripperType.PDF_LAYOUT_TEXT_STRIPPER) {
      val layoutStripper = new PDFLayoutTextStripper()
      layoutStripper.setIsSort(sort)
      layoutStripper
    } else {
      new PDFTextStripper
    }
  stripper.setStartPage(startPage + 1)
  stripper.setEndPage(endPage + 1)
  Seq(stripper.getText(document))
}

def pdfToText(
content: Array[Byte],
storeSplittedPdf: Boolean): Seq[(String, Int, Int, Array[Byte], String, Int)] = {
onlyPageNum: Boolean,
splitPage: Boolean,
storeSplittedPdf: Boolean,
sort: Boolean,
textStripper: String): Seq[(String, Int, Int, Array[Byte], String, Int)] = {
val validPdf = checkAndFixPdf(content)
val pdfDoc = PDDocument.load(validPdf)
val numPages = pdfDoc.getNumberOfPages
log.info(s"Number of pages ${numPages}")
require(numPages >= 1, "pdf input stream cannot be empty")

val result = pdfboxMethod(pdfDoc, 0, numPages - 1, content, storeSplittedPdf)
val result = if (!onlyPageNum) {
pdfboxMethod(
pdfDoc,
0,
numPages - 1,
content,
splitPage,
storeSplittedPdf,
sort,
textStripper)
} else {
Range(1, numPages + 1).map(pageNum => ("", 1, 1, null, null, pageNum))
}
pdfDoc.close()
log.info("Close pdf")
result
Expand All @@ -184,10 +247,43 @@ trait PdfToTextTrait extends Logging with PdfUtils {
startPage: Int,
endPage: Int,
content: Array[Byte],
storeSplittedPdf: Boolean): Seq[(String, Int, Int, Array[Byte], String, Int)] = {
val text = extractText(pdfDoc, startPage, endPage).mkString(System.lineSeparator())
val heightDimension = pdfDoc.getPage(startPage).getMediaBox.getHeight.toInt
val widthDimension = pdfDoc.getPage(startPage).getMediaBox.getWidth.toInt
Seq((text, heightDimension, widthDimension, if (storeSplittedPdf) content else null, null, 0))
splitPage: Boolean,
storeSplittedPdf: Boolean,
sort: Boolean,
textStripper: String): Seq[(String, Int, Int, Array[Byte], String, Int)] = {
lazy val out: ByteArrayOutputStream = new ByteArrayOutputStream()
if (splitPage)
Range(startPage, endPage + 1).flatMap(pagenum =>
extractText(pdfDoc, pagenum, pagenum, sort, textStripper)
.map { text =>
out.reset()
val outputDocument = new PDDocument()
val page = pdfDoc.getPage(pagenum)
val splittedPdf = if (storeSplittedPdf) {
outputDocument.importPage(page)
outputDocument.save(out)
outputDocument.close()
out.toByteArray
} else null
(
text,
page.getMediaBox.getHeight.toInt,
page.getMediaBox.getWidth.toInt,
splittedPdf,
null,
pagenum)
})
else {
val text = extractText(pdfDoc, startPage, endPage, sort, textStripper).mkString(
System.lineSeparator())
val heightDimension = pdfDoc.getPage(startPage).getMediaBox.getHeight.toInt
val widthDimension = pdfDoc.getPage(startPage).getMediaBox.getWidth.toInt
Seq(
(text, heightDimension, widthDimension, if (storeSplittedPdf) content else null, null, 0))
}
}
}

/** Companion object enabling persistence: lets a saved PdfToText stage be
  * restored via `PdfToText.load(path)` through [[DefaultParamsReadable]].
  */
object PdfToText extends DefaultParamsReadable[PdfToText] {
  override def load(path: String): PdfToText = super.load(path)
}
Loading