isPowerfulBlog
[LangChain] S3, MinIO Loader 구현하고 PDFParser 사용하기 본문
s3에서는 버킷 내 파일 고유의 url을 제공한다. 허용된 시간 동안 그 url을 이용해서 파일에 접근할 수 있다.
loader = PyPDFLoader("http://s3_url/어쩌구저쩌구겁나긴긴긴긴url입니다")
그래서 이 url을 이용해서 langchain에서 제공하는 다양한 PDFLoader를 사용할 계획이었으나...
OSError: [Errno 36] File name too long
이런 에러가 났다! 그럴만두 한게 s3에서 제공하는 url이 진짜 길긴 하다.
근데 langchain에서 제공하는 PDFLoader는 web link나 file path만 input으로 넣어줄 수 있다. 그래서 되게 난감했다.
pdf 파싱을 해야하는데, 랭체인에서는 PDFLoader로 파싱 기능을 제공하기 때문이다...
Langchain 코드 확인하기
그래서 랭체인 github 레포지토리를 뒤졌다. 역시나 랭체인에서 구현한 PyPDFLoader(뿐만 아니라 딴것들도)도 기존에 구현해놓은 PyPDFParser를 불러와서 사용하는 구조였다.
# langchain/libs/community/langchain_community/document_loaders/pdf.py
class PyPDFLoader(BasePDFLoader):
"""Load PDF using pypdf into list of documents.
Loader chunks by page and stores page numbers in metadata.
"""
def __init__(
self,
file_path: str,
password: Optional[Union[str, bytes]] = None,
headers: Optional[Dict] = None,
extract_images: bool = False,
) -> None:
"""Initialize with a file path."""
try:
import pypdf # noqa:F401
except ImportError:
raise ImportError(
"pypdf package not found, please install it with " "`pip install pypdf`"
)
super().__init__(file_path, headers=headers)
self.parser = PyPDFParser(password=password, extract_images=extract_images)
def lazy_load(
self,
) -> Iterator[Document]:
"""Lazy load given path as pages."""
if self.web_path:
blob = Blob.from_data(open(self.file_path, "rb").read(), path=self.web_path) # type: ignore[attr-defined]
else:
blob = Blob.from_path(self.file_path) # type: ignore[attr-defined]
yield from self.parser.parse(blob)
주목할 부분은
self.parser = PyPDFParser(password=password, extract_images=extract_images)
객체 초기화 함수에서 PyPDFParser를 불러와서 쓰고 있다는 것과blob = Blob.from_path(self.file_path)
Blob라는 객체 형태로 파일을 읽어오고, 그 객체를 parser의 input으로 넣어주고 있다는 것이다.
from langchain_community.document_loaders.blob_loaders import Blob
Blob 객체 또한 langchain에서 구현히 두었더라
S3, MinIO file Loader 구현하기
나는 s3 또는 MinIO 버킷에서 pdf파일을 읽어와서 파싱을 해야했다.
그래서 버킷에서 파일을 읽어와서 Blob 객체 형태로 반환하는 Loader를 구현하기로 했다.
(둘이 완전 똑같아서 minio로 작성했다.)
class MinioLoader:
def __init__(self, access_credential, bucket_name):
# MinIO 클라이언트 객체 생성
# 버킷 존재 여부 확인
...
def load_file(self, key):
# 버킷에서 파일객체 가져오고
# Blob객체 형태로 반환
작성한 MinioLoader의 구조는 다음과 같다!. (중간엔 개인적으로 더 필요한 함수들을 작성해두었으나 안 궁금해할 것 같아서 뺐다.)
MinIO 클라이언트 객체 생성
MinIO 클라이언트 객체 생성은 다음과 같다.
client = Minio(f'{url}:{port}',
access_key=access_credential["accessKey"],
secret_key=access_credential["secretKey"],
secure=False
)
secure False 옵션은 아래와 같은 SSLError(1, '[SSL: WRONG_VERSION_NUMBER] wrong version number (_ssl.c:997)'))
에러를 해결하기 위해 붙였다. 우리는 내부 서버에서만 MinIO를 동작시킬거라 일단 보안 문제는 나중으로...
raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='3.35.178.237', port=9001): Max retries exceeded with url: /wequiz-files?location= (Caused by SSLError(SSLError(1, '[SSL: WRONG_VERSION_NUMBER] wrong version number (_ssl.c:997)')))
버킷 존재 확인
클라이언트에 타겟하고 있는 버킷이 존재하는지 확인한다.
if self.client.bucket_exists(self.bucket_name):
print(f"Bucket {self.bucket_name} exists")
else:
print(f"Bucket {self.bucket_name} does not exist")
버킷에서 파일 객체 가져오기
file_obj = self.client.get_object(self.bucket_name, key)
get_object
를 사용해서 버킷 속 key(path)에 해당하는 파일을 가져온다.
Blob 객체 형태로 반환
from langchain_community.document_loaders.blob_loaders import Blob
..
Blob.from_data(file_obj.read())
file 객체를 read()
함수를 이용해서 읽는다. 이 함수를 이용해서 읽으면 파일 객체가 바이너리 형태로 읽힌다.
Blob 클래스의 from_data()
함수를 이용해서 바이너리 형태의 컨텐츠를 Blob객체로 말아준다. 이제 준비 완룡!
PyPDFParser 사용하기
위에서도 말했지만, langchain에서는 너무 친절하게 pdfloader를 제공하는데, 나는 이제 loader는 됐고,
blob객체를 파싱해줄 parser가 필요했다. 랭체인 깃허브 뒤져서 PyPDFParser 모듈을 발견했다.
# langchain/libs/community/langchain_community/document_loaders/parsers
/pdf.py
class PyPDFParser(BaseBlobParser):
"""Load `PDF` using `pypdf`"""
def __init__(
self, password: Optional[Union[str, bytes]] = None, extract_images: bool = False
):
self.password = password
self.extract_images = extract_images
def lazy_parse(self, blob: Blob) -> Iterator[Document]: # type: ignore[valid-type]
"""Lazily parse the blob."""
import pypdf
with blob.as_bytes_io() as pdf_file_obj: # type: ignore[attr-defined]
pdf_reader = pypdf.PdfReader(pdf_file_obj, password=self.password)
yield from [
Document(
page_content=page.extract_text()
+ self._extract_images_from_page(page),
metadata={"source": blob.source, "page": page_number}, # type: ignore[attr-defined]
)
for page_number, page in enumerate(pdf_reader.pages)
]
def _extract_images_from_page(self, page: pypdf._page.PageObject) -> str:
"""Extract images from page and get the text with RapidOCR."""
if not self.extract_images or "/XObject" not in page["/Resources"].keys():
return ""
xObject = page["/Resources"]["/XObject"].get_object() # type: ignore
images = []
for obj in xObject:
if xObject[obj]["/Subtype"] == "/Image":
if xObject[obj]["/Filter"][1:] in _PDF_FILTER_WITHOUT_LOSS:
height, width = xObject[obj]["/Height"], xObject[obj]["/Width"]
images.append(
np.frombuffer(xObject[obj].get_data(), dtype=np.uint8).reshape(
height, width, -1
)
)
elif xObject[obj]["/Filter"][1:] in _PDF_FILTER_WITH_LOSS:
images.append(xObject[obj].get_data())
else:
warnings.warn("Unknown PDF Filter!")
return extract_from_images_with_rapidocr(images)
lazy_parse
함수를 발견!!!했다. 얘로 파싱하는구나..~~ 파악했으니 이제 PyPDFParser를 사용만 하면된다.
PyPDFParser 적용하기
# PyPDFParser 객체 생성
parser = PyPDFParser(extract_images=True)
file_obj = loader.load_file(file)
# PDF 파싱
documents = parser.lazy_parse(file_obj)
아까 만든 loader로 파일을 가져오고
아까 확인한 lazy_parse
함수로 Blob 형태의 파일객체를 파싱해준다!
파싱 성공!
기분 좋다~
Reference
'Data Engineering' 카테고리의 다른 글
큐(Queue)를 이용한 비동기(Async) 작업 처리 속도 및 확장성 개선 (0) | 2024.10.25 |
---|---|
[LangChain] multi query invoke, ainvoke 속도 비교하기 (1) | 2024.04.25 |
[Kafka] Docker에서 Kafka Multi Broker Cluster 구성하기 (1) | 2023.03.22 |
[Kafka] Docker로 Producer, Consumer 통신하기 (Python) (0) | 2023.03.22 |
[Kafka] Docker에서 Kafka 단일 Broker Cluster 구성하기 (0) | 2023.03.22 |