""" Fitz 图像预处理插件 提供将输入图片(本地路径、HTTP(S) URL 或 PIL.Image)通过 PyMuPDF(fitz) 按指定 DPI 渲染为新的高质量图像的能力。 导出的主要函数: - render_image_at_dpi: 首选、语义清晰的 API。 - get_image_by_fitz_doc: 与现有代码保持一致的兼容函数名。 相关辅助函数也一并提炼,确保模块自洽可用: - fitz_doc_to_image: 将 fitz.Page 渲染为 PIL.Image。 """ from collections.abc import Generator from typing import Any, Union, Optional from io import BytesIO import os import base64 import requests import fitz # PyMuPDF from PIL import Image from dify_plugin import Tool from dify_plugin.entities.tool import ToolInvokeMessage def _load_image_to_bytes(image: Union[str, Image.Image]) -> tuple[Image.Image, bytes]: """ 将输入图片统一读取为 PIL.Image 和原始字节数据。 支持: - PIL.Image.Image 直接传入 - 本地路径(以普通字符串表示) - HTTP(S) URL - Base64 数据 URI (data:image/xxx;base64,...) 返回: - (pil_image, image_bytes) """ if isinstance(image, Image.Image): bio = BytesIO() image.save(bio, format='PNG') return image, bio.getvalue() assert isinstance(image, str), f"Unsupported image type: {type(image)}" # 处理 base64 数据 URI if image.startswith("data:image/"): try: # 解析 data URI: data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAA... header, data = image.split(',', 1) if 'base64' in header: data_bytes = base64.b64decode(data) pil_img = Image.open(BytesIO(data_bytes)) return pil_img, data_bytes else: raise ValueError("Only base64 encoded data URIs are supported") except Exception as e: raise ValueError(f"Failed to parse base64 data URI: {e}") # 处理 HTTP(S) URL if image.startswith("http://") or image.startswith("https://"): try: with requests.get(image, stream=True) as response: response.raise_for_status() data_bytes = response.content pil_img = Image.open(BytesIO(data_bytes)) return pil_img, data_bytes except Exception as e: raise ValueError(f"Failed to download image from URL: {e}") # 默认当作本地路径处理 try: with open(image, 'rb') as f: data_bytes = f.read() pil_img = Image.open(BytesIO(data_bytes)) return pil_img, data_bytes except Exception as e: raise ValueError(f"Failed to load image from file path: {e}") def fitz_doc_to_image(doc: fitz.Page, target_dpi: int = 200, origin_dpi: Optional[tuple] = None) -> Image.Image: """ 将 fitz.Page 按指定 DPI 渲染为 PIL.Image。 参数: - doc: fitz.Page - target_dpi: 目标渲染 DPI,默认 200 - origin_dpi: 原始 DPI 信息(当前未使用,保留参数以兼容原始签名) """ mat = fitz.Matrix(target_dpi / 72, target_dpi / 72) pm = doc.get_pixmap(matrix=mat, alpha=False) if pm.width > 4500 or pm.height > 4500: # 超大图回退到 fitz 默认 DPI,以避免内存和性能问题 mat = fitz.Matrix(72 / 72, 72 / 72) pm = doc.get_pixmap(matrix=mat, alpha=False) image = Image.frombytes('RGB', (pm.width, pm.height), pm.samples) return image def render_image_at_dpi(image: Union[str, Image.Image], target_dpi: int = 200) -> Image.Image: """ 使用 fitz 将任意输入图片按指定 DPI 渲染为新的 PIL.Image。 支持输入: - PIL.Image.Image - 本地路径 - HTTP(S) URL 返回: - 渲染后的 PIL.Image """ pil_img, data_bytes = _load_image_to_bytes(image) origin_dpi = pil_img.info.get('dpi', None) # 先将图片封装为 PDF,再用 fitz 渲染 pdf_bytes = fitz.open(stream=data_bytes).convert_to_pdf() doc = fitz.open('pdf', pdf_bytes) page = doc[0] image_fitz = fitz_doc_to_image(page, target_dpi=target_dpi, origin_dpi=origin_dpi) return image_fitz def get_image_by_fitz_doc(image: Union[str, Image.Image], target_dpi: int = 200) -> Image.Image: """ 兼容函数名:行为等同于 render_image_at_dpi。 """ return render_image_at_dpi(image, target_dpi=target_dpi) class ImageFitzTool(Tool): def _invoke(self, tool_parameters: dict[str, Any]) -> Generator[ToolInvokeMessage]: """ 调用图像预处理工具 """ try: # 获取参数 image_input = tool_parameters.get("image_input") target_dpi = tool_parameters.get("target_dpi", 200) if not image_input: raise ValueError("image_input parameter is required") # 处理图像 processed_image = render_image_at_dpi(image_input, target_dpi) # 将处理后的图像转换为字节数据 bio = BytesIO() processed_image.save(bio, format='PNG') image_bytes = bio.getvalue() # 创建文件元数据 meta = { 'filename': 'processed_image.png', 'mime_type': 'image/png', 'width': processed_image.size[0], 'height': processed_image.size[1], 'dpi': target_dpi } # 返回 blob 消息(文件形式) yield self.create_blob_message(blob=image_bytes, meta=meta) except Exception as e: yield self.create_text_message(f"Error processing image: {str(e)}")