649 lines
23 KiB
Python
649 lines
23 KiB
Python
# Copyright (c) 2006, Mathieu Fenniak
|
|
# All rights reserved.
|
|
#
|
|
# Redistribution and use in source and binary forms, with or without
|
|
# modification, are permitted provided that the following conditions are
|
|
# met:
|
|
#
|
|
# * Redistributions of source code must retain the above copyright notice,
|
|
# this list of conditions and the following disclaimer.
|
|
# * Redistributions in binary form must reproduce the above copyright notice,
|
|
# this list of conditions and the following disclaimer in the documentation
|
|
# and/or other materials provided with the distribution.
|
|
# * The name of the author may not be used to endorse or promote products
|
|
# derived from this software without specific prior written permission.
|
|
#
|
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
|
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
# POSSIBILITY OF SUCH DAMAGE.
|
|
|
|
import codecs
|
|
import decimal
|
|
import hashlib
|
|
import re
|
|
from binascii import unhexlify
|
|
from typing import Any, Callable, List, Optional, Tuple, Union, cast
|
|
|
|
from .._codecs import _pdfdoc_encoding_rev
|
|
from .._protocols import PdfObjectProtocol, PdfWriterProtocol
|
|
from .._utils import (
|
|
StreamType,
|
|
b_,
|
|
deprecation_with_replacement,
|
|
hex_str,
|
|
hexencode,
|
|
logger_warning,
|
|
read_non_whitespace,
|
|
read_until_regex,
|
|
str_,
|
|
)
|
|
from ..errors import STREAM_TRUNCATED_PREMATURELY, PdfReadError, PdfStreamError
|
|
|
|
# Module authorship metadata (name and contact e-mail of the original author).
__author__ = "Mathieu Fenniak"
|
|
__author_email__ = "biziqe@mathieu.fenniak.net"
|
|
|
|
|
|
class PdfObject(PdfObjectProtocol):
    """Common base class of the PDF object types defined in this module."""

    # function for calculating a hash value
    hash_func: Callable[..., "hashlib._Hash"] = hashlib.sha1
    # Only annotated here (no default value): the attribute exists on an
    # instance only once something assigned it, which is why
    # _reference_clone probes it with hasattr().
    indirect_reference: Optional["IndirectObject"]

    def hash_value_data(self) -> bytes:
        """Return the bytes that :meth:`hash_value` feeds to ``hash_func``."""
        return ("%s" % self).encode()

    def hash_value(self) -> bytes:
        """
        Return ``b"<ClassName>:<hex digest>"`` for this object.

        The digest is ``hash_func`` applied to :meth:`hash_value_data`.
        """
        digest = self.hash_func(self.hash_value_data()).hexdigest()
        return f"{self.__class__.__name__}:{digest}".encode()

    def clone(
        self,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool = False,
        ignore_fields: Union[Tuple[str, ...], List[str], None] = (),
    ) -> "PdfObject":
        """
        Clone the object into ``pdf_dest``.

        The base implementation always raises; the concrete classes in this
        module override it and normally finish by calling
        :meth:`_reference_clone`.

        Args:
            pdf_dest: destination (PdfWriterProtocol, the interface of
                PdfWriter).
            force_duplicate: by default, an object that has already been
                cloned and referenced is returned as is; when True a new
                copy is always made.
            ignore_fields: names of dictionary fields that are skipped
                during cloning (also applied when cloning children).

        Raises:
            NotImplementedError: always (it is a subclass of ``Exception``,
                so callers catching ``Exception`` keep working).
        """
        # Same exception type as the other abstract stub, write_to_stream.
        raise NotImplementedError("clone PdfObject")

    def _reference_clone(
        self, clone: Any, pdf_dest: PdfWriterProtocol
    ) -> PdfObjectProtocol:
        """
        Register ``clone`` in ``pdf_dest._objects`` when ``self`` is referenced.

        Nothing is registered unless ``self`` has an ``indirect_reference``
        attribute. If ``clone`` already points at ``pdf_dest`` it is returned
        untouched. If the source object number was already translated for
        ``pdf_dest``, the previously registered object is returned instead
        of ``clone``.

        Args:
            clone: the freshly built copy of ``self``.
            pdf_dest: the writer receiving the copy.

        Returns:
            ``clone``, or the object already registered for the same source
            object number.
        """
        try:
            if clone.indirect_reference.pdf == pdf_dest:
                return clone
        except Exception:
            # Best effort on purpose: the clone may have no
            # indirect_reference attribute, or it may be None.
            pass
        if hasattr(self, "indirect_reference"):
            ind = self.indirect_reference
            # Object number the clone receives once appended below.
            i = len(pdf_dest._objects) + 1
            if ind is not None:
                # Translation tables are kept per source document, keyed by
                # the identity of that document.
                if id(ind.pdf) not in pdf_dest._id_translated:
                    pdf_dest._id_translated[id(ind.pdf)] = {}
                if ind.idnum in pdf_dest._id_translated[id(ind.pdf)]:
                    obj = pdf_dest.get_object(
                        pdf_dest._id_translated[id(ind.pdf)][ind.idnum]
                    )
                    assert obj is not None
                    return obj
                pdf_dest._id_translated[id(ind.pdf)][ind.idnum] = i
            pdf_dest._objects.append(clone)
            clone.indirect_reference = IndirectObject(i, 0, pdf_dest)
        return clone

    def get_object(self) -> Optional["PdfObject"]:
        """Resolve indirect references."""
        return self

    def getObject(self) -> Optional["PdfObject"]:  # pragma: no cover
        """Deprecated alias of :meth:`get_object`."""
        deprecation_with_replacement("getObject", "get_object", "3.0.0")
        return self.get_object()

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes]
    ) -> None:
        """
        Write the object to ``stream``; must be overridden by subclasses.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError
|
|
|
|
|
|
class NullObject(PdfObject):
    """PDF object written to and read from a stream as the keyword ``null``."""

    def clone(
        self,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool = False,
        ignore_fields: Union[Tuple[str, ...], List[str], None] = (),
    ) -> "NullObject":
        """Clone the object into pdf_dest."""
        duplicate = self._reference_clone(NullObject(), pdf_dest)
        return cast("NullObject", duplicate)

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes]
    ) -> None:
        """Write ``null`` to the stream; ``encryption_key`` is not used."""
        stream.write(b"null")

    @staticmethod
    def read_from_stream(stream: StreamType) -> "NullObject":
        """
        Consume four bytes from the stream and return a NullObject.

        Raises:
            PdfReadError: if those four bytes are not ``null``.
        """
        if stream.read(4) == b"null":
            return NullObject()
        raise PdfReadError("Could not read Null object")

    def writeToStream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes]
    ) -> None:  # pragma: no cover
        """Deprecated alias of :meth:`write_to_stream`."""
        deprecation_with_replacement("writeToStream", "write_to_stream", "3.0.0")
        self.write_to_stream(stream, encryption_key)

    def __repr__(self) -> str:
        return "NullObject"

    @staticmethod
    def readFromStream(stream: StreamType) -> "NullObject":  # pragma: no cover
        """Deprecated alias of :meth:`read_from_stream`."""
        deprecation_with_replacement("readFromStream", "read_from_stream", "3.0.0")
        return NullObject.read_from_stream(stream)
|
|
|
|
|
|
class BooleanObject(PdfObject):
    """PDF boolean; wraps a value whose truthiness selects ``true``/``false``."""

    def __init__(self, value: Any) -> None:
        self.value = value

    def clone(
        self,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool = False,
        ignore_fields: Union[Tuple[str, ...], List[str], None] = (),
    ) -> "BooleanObject":
        """Clone the object into pdf_dest."""
        duplicate = self._reference_clone(BooleanObject(self.value), pdf_dest)
        return cast("BooleanObject", duplicate)

    def __eq__(self, __o: object) -> bool:
        # Comparable with another BooleanObject or with a plain bool;
        # anything else is never equal.
        if isinstance(__o, BooleanObject):
            return self.value == __o.value
        if isinstance(__o, bool):
            return self.value == __o
        return False

    def __repr__(self) -> str:
        if self.value:
            return "True"
        return "False"

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes]
    ) -> None:
        """Write ``true`` or ``false``; ``encryption_key`` is not used."""
        stream.write(b"true" if self.value else b"false")

    def writeToStream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes]
    ) -> None:  # pragma: no cover
        """Deprecated alias of :meth:`write_to_stream`."""
        deprecation_with_replacement("writeToStream", "write_to_stream", "3.0.0")
        self.write_to_stream(stream, encryption_key)

    @staticmethod
    def read_from_stream(stream: StreamType) -> "BooleanObject":
        """
        Read ``true`` or ``false`` from the stream.

        Raises:
            PdfReadError: if the first four bytes are neither ``true`` nor
                ``fals``.
        """
        token = stream.read(4)
        if token == b"true":
            return BooleanObject(True)
        if token != b"fals":
            raise PdfReadError("Could not read Boolean object")
        # Consume the fifth byte of "false"; its value is not checked.
        stream.read(1)
        return BooleanObject(False)

    @staticmethod
    def readFromStream(stream: StreamType) -> "BooleanObject":  # pragma: no cover
        """Deprecated alias of :meth:`read_from_stream`."""
        deprecation_with_replacement("readFromStream", "read_from_stream", "3.0.0")
        return BooleanObject.read_from_stream(stream)
|
|
|
|
|
|
class IndirectObject(PdfObject):
    """
    Reference to an object of a document, written as ``<idnum> <generation> R``.

    Two references are equal when object number and generation match and
    they point at the very same ``pdf`` instance.
    """

    def __init__(self, idnum: int, generation: int, pdf: Any) -> None:  # PdfReader
        self.idnum = idnum
        self.generation = generation
        self.pdf = pdf

    def clone(
        self,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool = False,
        ignore_fields: Union[Tuple[str, ...], List[str], None] = (),
    ) -> "IndirectObject":
        """
        Clone the referenced object into pdf_dest and return its reference.

        Args:
            pdf_dest: the writer receiving the copy.
            force_duplicate: when True, the target is cloned again even if
                it was already translated into ``pdf_dest``.
            ignore_fields: forwarded unchanged to the target's ``clone``.

        Returns:
            ``self`` when it already belongs to ``pdf_dest`` (and no
            duplicate is forced), otherwise the ``indirect_reference`` of
            the object found or created in ``pdf_dest``.
        """
        if self.pdf == pdf_dest and not force_duplicate:
            # Already duplicated and no extra duplication required
            return self
        if id(self.pdf) not in pdf_dest._id_translated:
            pdf_dest._id_translated[id(self.pdf)] = {}

        if not force_duplicate and self.idnum in pdf_dest._id_translated[id(self.pdf)]:
            # Reuse the object already registered for this object number.
            dup = pdf_dest.get_object(pdf_dest._id_translated[id(self.pdf)][self.idnum])
        else:
            obj = self.get_object()
            assert obj is not None
            dup = obj.clone(pdf_dest, force_duplicate, ignore_fields)
        assert dup is not None
        assert dup.indirect_reference is not None
        return dup.indirect_reference

    @property
    def indirect_reference(self) -> "IndirectObject":  # type: ignore[override]
        """An indirect object is its own reference."""
        return self

    def get_object(self) -> Optional["PdfObject"]:
        """Resolve the reference through ``self.pdf``; None if it yields None."""
        obj = self.pdf.get_object(self)
        if obj is None:
            return None
        return obj.get_object()

    def __repr__(self) -> str:
        return f"IndirectObject({self.idnum!r}, {self.generation!r}, {id(self.pdf)})"

    def __eq__(self, other: Any) -> bool:
        return (
            isinstance(other, IndirectObject)
            and self.idnum == other.idnum
            and self.generation == other.generation
            and self.pdf is other.pdf
        )

    def __ne__(self, other: Any) -> bool:
        return not self.__eq__(other)

    def __hash__(self) -> int:
        # Defining __eq__ alone sets __hash__ to None and makes instances
        # unusable in sets and as dict keys. Hash exactly the fields
        # compared by __eq__; ``pdf`` is compared by identity there, hence
        # id().
        return hash((self.idnum, self.generation, id(self.pdf)))

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes]
    ) -> None:
        """Write ``<idnum> <generation> R``; ``encryption_key`` is not used."""
        stream.write(b_(f"{self.idnum} {self.generation} R"))

    def writeToStream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes]
    ) -> None:  # pragma: no cover
        """Deprecated alias of :meth:`write_to_stream`."""
        deprecation_with_replacement("writeToStream", "write_to_stream", "3.0.0")
        self.write_to_stream(stream, encryption_key)

    @staticmethod
    def read_from_stream(stream: StreamType, pdf: Any) -> "IndirectObject":  # PdfReader
        """
        Parse ``<idnum> <generation> R`` from the stream.

        Raises:
            PdfStreamError: if the stream ends while reading either number.
            PdfReadError: if the token following the numbers is not ``R``.
        """
        # Object number: every byte up to the first whitespace byte.
        idnum = b""
        while True:
            tok = stream.read(1)
            if not tok:
                raise PdfStreamError(STREAM_TRUNCATED_PREMATURELY)
            if tok.isspace():
                break
            idnum += tok
        # Generation: skip any further leading whitespace, then read up to
        # the next whitespace byte.
        generation = b""
        while True:
            tok = stream.read(1)
            if not tok:
                raise PdfStreamError(STREAM_TRUNCATED_PREMATURELY)
            if tok.isspace():
                if not generation:
                    continue
                break
            generation += tok
        r = read_non_whitespace(stream)
        if r != b"R":
            raise PdfReadError(
                f"Error reading indirect object reference at byte {hex_str(stream.tell())}"
            )
        return IndirectObject(int(idnum), int(generation), pdf)

    @staticmethod
    def readFromStream(
        stream: StreamType, pdf: Any  # PdfReader
    ) -> "IndirectObject":  # pragma: no cover
        """Deprecated alias of :meth:`read_from_stream`."""
        deprecation_with_replacement("readFromStream", "read_from_stream", "3.0.0")
        return IndirectObject.read_from_stream(stream, pdf)
|
|
|
|
|
|
class FloatObject(decimal.Decimal, PdfObject):
    """PDF real number, stored as a ``decimal.Decimal``."""

    def __new__(
        cls, value: Union[str, Any] = "0", context: Optional[Any] = None
    ) -> "FloatObject":
        """
        Build the number from ``value``.

        A value that cannot be converted is replaced by 0.0 after a warning
        has been logged.
        """
        try:
            return decimal.Decimal.__new__(cls, str_(value), context)
        except Exception:
            # If this isn't a valid decimal (happens in malformed PDFs)
            # fallback to 0
            logger_warning(f"FloatObject ({value}) invalid; use 0.0 instead", __name__)
            return decimal.Decimal.__new__(cls, "0.0")

    def clone(
        self,
        pdf_dest: Any,
        force_duplicate: bool = False,
        ignore_fields: Union[Tuple[str, ...], List[str], None] = (),
    ) -> "FloatObject":
        """Clone the object into pdf_dest."""
        return cast("FloatObject", self._reference_clone(FloatObject(self), pdf_dest))

    def __repr__(self) -> str:
        """
        Return the number in plain (non-exponent) notation.

        Integer values are written without a decimal point; other values
        have their trailing zeros removed.
        """
        integral = self.to_integral()
        if self == integral:
            # Fixed-point formatting of the integral value gives the same
            # text as quantize(Decimal(1)) did, but quantize raises
            # InvalidOperation once the value needs more digits than the
            # context precision (or is infinite).
            return f"{integral:f}"
        # Otherwise, format it with a decimal place, taking care to
        # remove any extraneous trailing zeros.
        return f"{self:f}".rstrip("0")

    def as_numeric(self) -> float:
        """Return the value as a Python ``float`` (parsed from ``repr``)."""
        return float(repr(self).encode("utf8"))

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes]
    ) -> None:
        """Write ``repr(self)`` as UTF-8; ``encryption_key`` is not used."""
        stream.write(repr(self).encode("utf8"))

    def writeToStream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes]
    ) -> None:  # pragma: no cover
        """Deprecated alias of :meth:`write_to_stream`."""
        deprecation_with_replacement("writeToStream", "write_to_stream", "3.0.0")
        self.write_to_stream(stream, encryption_key)
|
|
|
|
|
|
class NumberObject(int, PdfObject):
    """PDF integer number."""

    # Matches the first byte that does not belong to a number token.
    # NOTE(review): inside the character class, "+-." is a range
    # (0x2B..0x2E), so "," is accepted as well -- confirm this is intended.
    NumberPattern = re.compile(b"[^+-.0-9]")

    def __new__(cls, value: Any) -> "NumberObject":
        """Build the integer; a ValueError from ``int(value)`` yields 0."""
        try:
            parsed = int(value)
        except ValueError:
            logger_warning(f"NumberObject({value}) invalid; use 0 instead", __name__)
            parsed = 0
        return int.__new__(cls, parsed)

    def clone(
        self,
        pdf_dest: Any,
        force_duplicate: bool = False,
        ignore_fields: Union[Tuple[str, ...], List[str], None] = (),
    ) -> "NumberObject":
        """Clone the object into pdf_dest."""
        duplicate = self._reference_clone(NumberObject(self), pdf_dest)
        return cast("NumberObject", duplicate)

    def as_numeric(self) -> int:
        """Return the value as a plain ``int`` (parsed from ``repr``)."""
        digits = repr(self).encode("utf8")
        return int(digits)

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes]
    ) -> None:
        """Write ``repr(self)`` as UTF-8; ``encryption_key`` is not used."""
        digits = repr(self).encode("utf8")
        stream.write(digits)

    def writeToStream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes]
    ) -> None:  # pragma: no cover
        """Deprecated alias of :meth:`write_to_stream`."""
        deprecation_with_replacement("writeToStream", "write_to_stream", "3.0.0")
        self.write_to_stream(stream, encryption_key)

    @staticmethod
    def read_from_stream(stream: StreamType) -> Union["NumberObject", "FloatObject"]:
        """
        Read a number token from the stream.

        Returns a FloatObject when the token contains a ``.``, otherwise a
        NumberObject.
        """
        token = read_until_regex(stream, NumberObject.NumberPattern)
        if b"." in token:
            return FloatObject(token)
        return NumberObject(token)

    @staticmethod
    def readFromStream(
        stream: StreamType,
    ) -> Union["NumberObject", "FloatObject"]:  # pragma: no cover
        """Deprecated alias of :meth:`read_from_stream`."""
        deprecation_with_replacement("readFromStream", "read_from_stream", "3.0.0")
        return NumberObject.read_from_stream(stream)
|
|
|
|
|
|
class ByteStringObject(bytes, PdfObject):
    """
    Represents a string object where the text encoding could not be determined.

    This occurs quite often, as the PDF spec doesn't provide an alternate way
    to represent strings -- for example, the encryption data stored in files
    (like /O) is clearly not text, but is still stored in a "String" object.
    """

    def clone(
        self,
        pdf_dest: Any,
        force_duplicate: bool = False,
        ignore_fields: Union[Tuple[str, ...], List[str], None] = (),
    ) -> "ByteStringObject":
        """Clone the object into pdf_dest."""
        duplicate = self._reference_clone(ByteStringObject(bytes(self)), pdf_dest)
        return cast("ByteStringObject", duplicate)

    @property
    def original_bytes(self) -> bytes:
        """For compatibility with TextStringObject.original_bytes."""
        return self

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes]
    ) -> None:
        """
        Write the bytes as a hexadecimal string (``<...>``).

        When ``encryption_key`` is truthy the bytes are passed through
        ``RC4_encrypt`` first.
        """
        payload = self
        if encryption_key:
            from .._security import RC4_encrypt

            payload = RC4_encrypt(encryption_key, payload)  # type: ignore
        stream.write(b"<")
        stream.write(hexencode(payload))
        stream.write(b">")

    def writeToStream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes]
    ) -> None:  # pragma: no cover
        """Deprecated alias of :meth:`write_to_stream`."""
        deprecation_with_replacement("writeToStream", "write_to_stream", "3.0.0")
        self.write_to_stream(stream, encryption_key)
|
|
|
|
|
|
class TextStringObject(str, PdfObject):
    """
    Represents a string object that has been decoded into a real unicode string.

    If read from a PDF document, this string appeared to match the
    PDFDocEncoding, or contained a UTF-16BE BOM mark to cause UTF-16 decoding
    to occur.
    """

    # Flags telling get_original_bytes which encoding to use; both default
    # to False and are copied over by clone().
    autodetect_pdfdocencoding = False
    autodetect_utf16 = False

    def clone(
        self,
        pdf_dest: Any,
        force_duplicate: bool = False,
        ignore_fields: Union[Tuple[str, ...], List[str], None] = (),
    ) -> "TextStringObject":
        """Clone the object (including the autodetect flags) into pdf_dest."""
        obj = TextStringObject(self)
        obj.autodetect_pdfdocencoding = self.autodetect_pdfdocencoding
        obj.autodetect_utf16 = self.autodetect_utf16
        return cast("TextStringObject", self._reference_clone(obj, pdf_dest))

    @property
    def original_bytes(self) -> bytes:
        """
        It is occasionally possible that a text string object gets created
        where a byte string object was expected due to the autodetection
        mechanism -- if that occurs, this "original_bytes" property can be
        used to back-calculate what the original encoded bytes were.
        """
        return self.get_original_bytes()

    def get_original_bytes(self) -> bytes:
        """
        Re-encode the text according to the autodetect flags.

        Raises:
            Exception: if neither autodetect flag is set.
        """
        # We're a text string object, but the library is trying to get our raw
        # bytes. This can happen if we auto-detected this string as text, but
        # we were wrong. It's pretty common. Return the original bytes that
        # would have been used to create this object, based upon the autodetect
        # method.
        if self.autodetect_utf16:
            return codecs.BOM_UTF16_BE + self.encode("utf-16be")
        elif self.autodetect_pdfdocencoding:
            return encode_pdfdocencoding(self)
        else:
            raise Exception("no information about original bytes")

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes]
    ) -> None:
        """
        Write the string to the stream.

        With a truthy ``encryption_key`` the encoded bytes are passed through
        ``RC4_encrypt`` and written as a ByteStringObject. Otherwise they are
        written as a literal string ``(...)``: alphanumeric characters and
        spaces as they are, everything else as a three-digit octal escape.
        """
        # Try to write the string out as a PDFDocEncoding encoded string. It's
        # nicer to look at in the PDF file. Sadly, we take a performance hit
        # here for trying...
        try:
            bytearr = encode_pdfdocencoding(self)
        except UnicodeEncodeError:
            bytearr = codecs.BOM_UTF16_BE + self.encode("utf-16be")
        if encryption_key:
            from .._security import RC4_encrypt

            bytearr = RC4_encrypt(encryption_key, bytearr)
            obj = ByteStringObject(bytearr)
            obj.write_to_stream(stream, None)
        else:
            space = ord(" ")
            stream.write(b"(")
            for c in bytearr:
                # Iterating bytes yields ints, so the space test has to
                # compare against the integer code; comparing with b" " was
                # always unequal and forced every space to be escaped.
                if chr(c).isalnum() or c == space:
                    stream.write(b_(chr(c)))
                else:
                    # This:
                    #   stream.write(b_(rf"\{c:0>3o}"))
                    # gives
                    #   https://github.com/davidhalter/parso/issues/207
                    stream.write(b_("\\%03o" % c))
            stream.write(b")")

    def writeToStream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes]
    ) -> None:  # pragma: no cover
        """Deprecated alias of :meth:`write_to_stream`."""
        deprecation_with_replacement("writeToStream", "write_to_stream", "3.0.0")
        self.write_to_stream(stream, encryption_key)
|
|
|
|
|
|
class NameObject(str, PdfObject):
    """PDF name object; the stored text includes the leading ``/``."""

    # Bytes that end a name token when reading from a stream.
    delimiter_pattern = re.compile(rb"\s+|[\(\)<>\[\]{}/%]")
    # Byte every name has to start with.
    surfix = b"/"
    # Characters written as "#XX" escapes by renumber(): the four listed
    # explicitly plus chr(0) .. chr(32).
    renumber_table = {
        "#": b"#23",
        "(": b"#28",
        ")": b"#29",
        "/": b"#2F",
        **{chr(i): f"#{i:02X}".encode() for i in range(33)},
    }

    def clone(
        self,
        pdf_dest: Any,
        force_duplicate: bool = False,
        ignore_fields: Union[Tuple[str, ...], List[str], None] = (),
    ) -> "NameObject":
        """Clone the object into pdf_dest."""
        return cast("NameObject", self._reference_clone(NameObject(self), pdf_dest))

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes]
    ) -> None:
        """Write the escaped name; ``encryption_key`` is not used."""
        stream.write(self.renumber())  # b_(renumber(self)))

    def writeToStream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes]
    ) -> None:  # pragma: no cover
        """Deprecated alias of :meth:`write_to_stream`."""
        deprecation_with_replacement("writeToStream", "write_to_stream", "3.0.0")
        self.write_to_stream(stream, encryption_key)

    def renumber(self) -> bytes:
        """
        Return the name as bytes with ``#XX`` escapes applied.

        The first character is emitted as is (a warning is logged when it is
        not ``/``). Characters above ``~`` are written as the escaped bytes
        of their UTF-8 encoding, and characters found in ``renumber_table``
        as their table entry; all others are UTF-8 encoded unchanged.
        """
        out = self[0].encode("utf-8")
        if out != b"/":
            logger_warning(f"Incorrect first char in NameObject:({self})", __name__)
        for c in self[1:]:
            if c > "~":
                for x in c.encode("utf-8"):
                    out += f"#{x:02X}".encode()
            else:
                try:
                    out += self.renumber_table[c]
                except KeyError:
                    out += c.encode("utf-8")
        return out

    @staticmethod
    def unnumber(sin: bytes) -> bytes:
        """
        Replace every ``#XX`` escape (two hex digits) by the byte it encodes.

        A ``#`` that is not followed by two hexadecimal digits is left
        untouched, together with the bytes that follow it.
        """
        i = sin.find(b"#", 0)
        while i >= 0:
            hex_pair = sin[i + 1 : i + 3]
            # Fewer than two bytes left: incomplete escape, keep it as is
            # (unhexlify(b"") would succeed and silently drop the "#").
            if len(hex_pair) == 2:
                try:
                    sin = sin[:i] + unhexlify(hex_pair) + sin[i + 3 :]
                except ValueError:
                    # if the 2 characters after # can not be converted to
                    # hexa we change nothing and carry on
                    pass
            # Always resume at the next real "#". Merely stepping one byte
            # forward made later iterations decode bytes that were not
            # preceded by "#". Position i itself is skipped, so a decoded
            # "#" (from "#23") is not decoded a second time.
            i = sin.find(b"#", i + 1)
        return sin

    @staticmethod
    def read_from_stream(stream: StreamType, pdf: Any) -> "NameObject":  # PdfReader
        """
        Read a name from the stream.

        The bytes are un-escaped with :meth:`unnumber`, then decoded as
        UTF-8 or, failing that, as GBK. If neither works and ``pdf.strict``
        is false, a warning is logged and the bytes are decoded with the
        ``charmap`` codec.

        Raises:
            PdfReadError: if the first byte is not ``/``, or if the name can
                not be decoded and ``pdf.strict`` is true.
        """
        name = stream.read(1)
        if name != NameObject.surfix:
            raise PdfReadError("name read error")
        name += read_until_regex(stream, NameObject.delimiter_pattern, ignore_eof=True)
        try:
            # Name objects should represent irregular characters
            # with a '#' followed by the symbol's hex number
            name = NameObject.unnumber(name)
            for enc in ("utf-8", "gbk"):
                try:
                    ret = name.decode(enc)
                    return NameObject(ret)
                except Exception:
                    pass
            raise UnicodeDecodeError("", name, 0, 0, "Code Not Found")
        except (UnicodeEncodeError, UnicodeDecodeError) as e:
            if not pdf.strict:
                logger_warning(
                    f"Illegal character in Name Object ({repr(name)})", __name__
                )
                return NameObject(name.decode("charmap"))
            else:
                raise PdfReadError(
                    f"Illegal character in Name Object ({repr(name)})"
                ) from e

    @staticmethod
    def readFromStream(
        stream: StreamType, pdf: Any  # PdfReader
    ) -> "NameObject":  # pragma: no cover
        """Deprecated alias of :meth:`read_from_stream`."""
        deprecation_with_replacement("readFromStream", "read_from_stream", "3.0.0")
        return NameObject.read_from_stream(stream, pdf)
|
|
|
|
|
|
def encode_pdfdocencoding(unicode_string: str) -> bytes:
    """
    Encode ``unicode_string`` through the ``_pdfdoc_encoding_rev`` table.

    Raises:
        UnicodeEncodeError: if a character has no entry in the table.
    """
    encoded_chunks = []
    for char in unicode_string:
        try:
            encoded_chunks.append(b_(chr(_pdfdoc_encoding_rev[char])))
        except KeyError:
            raise UnicodeEncodeError(
                "pdfdocencoding", char, -1, -1, "does not exist in translation table"
            )
    return b"".join(encoded_chunks)
|