| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
- """
- Fitz 图像预处理插件
- 提供将输入图片(本地路径、HTTP(S) URL 或 PIL.Image)通过 PyMuPDF(fitz)
- 按指定 DPI 渲染为新的高质量图像的能力。
- 导出的主要函数:
- - render_image_at_dpi: 首选、语义清晰的 API。
- - get_image_by_fitz_doc: 与现有代码保持一致的兼容函数名。
- 相关辅助函数也一并提炼,确保模块自洽可用:
- - fitz_doc_to_image: 将 fitz.Page 渲染为 PIL.Image。
- """
- from collections.abc import Generator
- from typing import Any, Union, Optional
- from io import BytesIO
- import os
- import base64
- import requests
- import fitz # PyMuPDF
- from PIL import Image
- from dify_plugin import Tool
- from dify_plugin.entities.tool import ToolInvokeMessage
- def _load_image_to_bytes(image: Union[str, Image.Image]) -> tuple[Image.Image, bytes]:
- """
- 将输入图片统一读取为 PIL.Image 和原始字节数据。
- 支持:
- - PIL.Image.Image 直接传入
- - 本地路径(以普通字符串表示)
- - HTTP(S) URL
- - Base64 数据 URI (data:image/xxx;base64,...)
- 返回:
- - (pil_image, image_bytes)
- """
- if isinstance(image, Image.Image):
- bio = BytesIO()
- image.save(bio, format='PNG')
- return image, bio.getvalue()
- assert isinstance(image, str), f"Unsupported image type: {type(image)}"
- # 处理 base64 数据 URI
- if image.startswith("data:image/"):
- try:
- # 解析 data URI: data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAA...
- header, data = image.split(',', 1)
- if 'base64' in header:
- data_bytes = base64.b64decode(data)
- pil_img = Image.open(BytesIO(data_bytes))
- return pil_img, data_bytes
- else:
- raise ValueError("Only base64 encoded data URIs are supported")
- except Exception as e:
- raise ValueError(f"Failed to parse base64 data URI: {e}")
- # 处理 HTTP(S) URL
- if image.startswith("http://") or image.startswith("https://"):
- try:
- with requests.get(image, stream=True) as response:
- response.raise_for_status()
- data_bytes = response.content
- pil_img = Image.open(BytesIO(data_bytes))
- return pil_img, data_bytes
- except Exception as e:
- raise ValueError(f"Failed to download image from URL: {e}")
- # 默认当作本地路径处理
- try:
- with open(image, 'rb') as f:
- data_bytes = f.read()
- pil_img = Image.open(BytesIO(data_bytes))
- return pil_img, data_bytes
- except Exception as e:
- raise ValueError(f"Failed to load image from file path: {e}")
- def fitz_doc_to_image(doc: fitz.Page, target_dpi: int = 200, origin_dpi: Optional[tuple] = None) -> Image.Image:
- """
- 将 fitz.Page 按指定 DPI 渲染为 PIL.Image。
- 参数:
- - doc: fitz.Page
- - target_dpi: 目标渲染 DPI,默认 200
- - origin_dpi: 原始 DPI 信息(当前未使用,保留参数以兼容原始签名)
- """
- mat = fitz.Matrix(target_dpi / 72, target_dpi / 72)
- pm = doc.get_pixmap(matrix=mat, alpha=False)
- if pm.width > 4500 or pm.height > 4500:
- # 超大图回退到 fitz 默认 DPI,以避免内存和性能问题
- mat = fitz.Matrix(72 / 72, 72 / 72)
- pm = doc.get_pixmap(matrix=mat, alpha=False)
- image = Image.frombytes('RGB', (pm.width, pm.height), pm.samples)
- return image
- def render_image_at_dpi(image: Union[str, Image.Image], target_dpi: int = 200) -> Image.Image:
- """
- 使用 fitz 将任意输入图片按指定 DPI 渲染为新的 PIL.Image。
- 支持输入:
- - PIL.Image.Image
- - 本地路径
- - HTTP(S) URL
- 返回:
- - 渲染后的 PIL.Image
- """
- pil_img, data_bytes = _load_image_to_bytes(image)
- origin_dpi = pil_img.info.get('dpi', None)
- # 先将图片封装为 PDF,再用 fitz 渲染
- pdf_bytes = fitz.open(stream=data_bytes).convert_to_pdf()
- doc = fitz.open('pdf', pdf_bytes)
- page = doc[0]
- image_fitz = fitz_doc_to_image(page, target_dpi=target_dpi, origin_dpi=origin_dpi)
- return image_fitz
- def get_image_by_fitz_doc(image: Union[str, Image.Image], target_dpi: int = 200) -> Image.Image:
- """
- 兼容函数名:行为等同于 render_image_at_dpi。
- """
- return render_image_at_dpi(image, target_dpi=target_dpi)
- class ImageFitzTool(Tool):
- def _invoke(self, tool_parameters: dict[str, Any]) -> Generator[ToolInvokeMessage]:
- """
- 调用图像预处理工具
- """
- try:
- # 获取参数
- image_input = tool_parameters.get("image_input")
- target_dpi = tool_parameters.get("target_dpi", 200)
-
- if not image_input:
- raise ValueError("image_input parameter is required")
-
- # 处理图像
- processed_image = render_image_at_dpi(image_input, target_dpi)
-
- # 将处理后的图像转换为字节数据
- bio = BytesIO()
- processed_image.save(bio, format='PNG')
- image_bytes = bio.getvalue()
-
- # 创建文件元数据
- meta = {
- 'filename': 'processed_image.png',
- 'mime_type': 'image/png',
- 'width': processed_image.size[0],
- 'height': processed_image.size[1],
- 'dpi': target_dpi
- }
-
- # 返回 blob 消息(文件形式)
- yield self.create_blob_message(blob=image_bytes, meta=meta)
-
- except Exception as e:
- yield self.create_text_message(f"Error processing image: {str(e)}")
|