Source code for aiecs.tools.docs.document_writer_tool

# /*---------------------------------------------------------------------------------------------
#  *  Copyright (c) IRETBL Corporation. All rights reserved.
#  *  Licensed under the Apache-2.0. See License.txt in the project root for license information.
#  *--------------------------------------------------------------------------------------------*/
import os
import json
import uuid
import hashlib
import logging
import asyncio
import shutil
from typing import Dict, Any, List, Optional, Union, Tuple, cast
from enum import Enum
from datetime import datetime
from pathlib import Path
import tempfile

from pydantic import BaseModel, Field
from pydantic_settings import BaseSettings, SettingsConfigDict

from aiecs.tools.base_tool import BaseTool
from aiecs.tools import register_tool


class DocumentFormat(str, Enum):
    """Supported document formats for writing.

    The enum also derives from ``str``, so each member compares equal to its
    string value.
    """

    TXT = "txt"
    PLAIN_TEXT = "txt"  # Alias for TXT (same value, so Enum treats it as an alias)
    JSON = "json"
    CSV = "csv"
    XML = "xml"
    MARKDOWN = "markdown"
    HTML = "html"
    YAML = "yaml"
    PDF = "pdf"
    DOCX = "docx"
    XLSX = "xlsx"
    PPTX = "pptx"
    PPT = "ppt"
    BINARY = "binary"
class WriteMode(str, Enum):
    """Document writing modes"""

    CREATE = "create"  # Create a new file; fail if it already exists
    OVERWRITE = "overwrite"  # Overwrite the existing file
    APPEND = "append"  # Append to the existing file
    UPDATE = "update"  # Update the existing file (smart merge)
    BACKUP_WRITE = "backup_write"  # Back up, then write
    VERSION_WRITE = "version_write"  # Versioned write
    INSERT = "insert"  # Insert content at a specified position
    REPLACE = "replace"  # Replace specified content
    DELETE = "delete"  # Delete specified content
class EditOperation(str, Enum):
    """Advanced edit operations"""

    BOLD = "bold"  # Bold text
    ITALIC = "italic"  # Italic text
    UNDERLINE = "underline"  # Underlined text
    STRIKETHROUGH = "strikethrough"  # Strikethrough text
    HIGHLIGHT = "highlight"  # Highlighted text
    INSERT_TEXT = "insert_text"  # Insert text
    DELETE_TEXT = "delete_text"  # Delete text
    REPLACE_TEXT = "replace_text"  # Replace text
    COPY_TEXT = "copy_text"  # Copy text
    CUT_TEXT = "cut_text"  # Cut text
    PASTE_TEXT = "paste_text"  # Paste text
    FIND_REPLACE = "find_replace"  # Find and replace
    INSERT_LINE = "insert_line"  # Insert a line
    DELETE_LINE = "delete_line"  # Delete a line
    MOVE_LINE = "move_line"  # Move a line
class EncodingType(str, Enum):
    """Text encoding types.

    Member values are the codec names as strings, plus the ``"auto"`` value.
    """

    UTF8 = "utf-8"
    UTF16 = "utf-16"
    ASCII = "ascii"
    GBK = "gbk"
    AUTO = "auto"  # NOTE(review): presumably means "detect the encoding" - confirm against the code that consumes it
class ValidationLevel(str, Enum):
    """Content validation levels"""

    NONE = "none"  # No validation
    BASIC = "basic"  # Basic validation (format, size)
    STRICT = "strict"  # Strict validation (content, structure)
    ENTERPRISE = "enterprise"  # Enterprise-grade validation (security, compliance)
class DocumentWriterError(Exception):
    """Base exception for document writer errors.

    The other exception classes defined in this module derive from it, so
    catching this type also catches them.
    """
class WriteError(DocumentWriterError):
    """Raised when write operations fail"""
class ValidationError(DocumentWriterError):
    """Raised when validation fails"""
class SecurityError(DocumentWriterError):
    """Raised when security validation fails"""
class WritePermissionError(DocumentWriterError):
    """Raised when write permission is denied"""
class ContentValidationError(DocumentWriterError):
    """Raised when content validation fails"""
class StorageError(DocumentWriterError):
    """Raised when storage operations fail"""
@register_tool("document_writer")
class DocumentWriterTool(BaseTool):
    """
    Modern high-performance document writing component that can:
    1. Handle multiple document formats and encodings
    2. Provide production-grade write operations with validation
    3. Support various write modes (create, overwrite, append, update)
    4. Implement backup and versioning strategies
    5. Ensure atomic operations and data integrity
    6. Support both local and cloud storage

    Production Features:
    - Atomic writes (no partial writes)
    - Content validation and security scanning
    - Automatic backup and versioning
    - Write permission and quota checks
    - Transaction-like operations
    - Audit logging
    """

    # Configuration schema
class Config(BaseSettings):
    """Configuration for the document writer tool.

    Automatically reads from environment variables with DOC_WRITER_ prefix.
    Example: DOC_WRITER_GCS_PROJECT_ID -> gcs_project_id
    """

    # Environment variables are matched using the DOC_WRITER_ prefix.
    model_config = SettingsConfigDict(env_prefix="DOC_WRITER_")

    # --- Directories -------------------------------------------------------
    temp_dir: str = Field(
        default=os.path.join(tempfile.gettempdir(), "document_writer"),
        description="Temporary directory for document processing",
    )
    backup_dir: str = Field(
        default=os.path.join(tempfile.gettempdir(), "document_backups"),
        description="Directory for document backups",
    )
    output_dir: Optional[str] = Field(default=None, description="Default output directory for documents")

    # --- Limits and defaults -----------------------------------------------
    max_file_size: int = Field(default=100 * 1024 * 1024, description="Maximum file size in bytes")  # 100 MiB
    max_backup_versions: int = Field(default=10, description="Maximum number of backup versions to keep")
    default_encoding: str = Field(default="utf-8", description="Default text encoding for documents")

    # --- Feature switches --------------------------------------------------
    enable_backup: bool = Field(
        default=True,
        description="Whether to enable automatic backup functionality",
    )
    enable_versioning: bool = Field(default=True, description="Whether to enable document versioning")
    enable_content_validation: bool = Field(default=True, description="Whether to enable content validation")
    enable_security_scan: bool = Field(default=True, description="Whether to enable security scanning")
    atomic_write: bool = Field(default=True, description="Whether to use atomic write operations")
    validation_level: str = Field(default="basic", description="Content validation level")
    timeout_seconds: int = Field(default=60, description="Operation timeout in seconds")

    # NOTE(review): the four flags below (auto_backup, atomic_writes,
    # version_control, security_scan) look like they overlap with
    # enable_backup / atomic_write / enable_versioning / enable_security_scan
    # above - confirm which names callers actually read before removing any.
    auto_backup: bool = Field(
        default=True,
        description="Whether to automatically backup before write operations",
    )
    atomic_writes: bool = Field(default=True, description="Whether to use atomic write operations")
    default_format: str = Field(default="markdown", description="Default document format")
    version_control: bool = Field(default=True, description="Whether to enable version control")
    security_scan: bool = Field(default=True, description="Whether to enable security scanning")

    # --- Cloud storage -----------------------------------------------------
    enable_cloud_storage: bool = Field(
        default=True,
        description="Whether to enable cloud storage integration",
    )
    gcs_bucket_name: Optional[str] = Field(
        default=None,
        description="Google Cloud Storage bucket name (must be provided via config or environment variable)",
    )
    gcs_project_id: Optional[str] = Field(default=None, description="Google Cloud Storage project ID")
def __init__(self, config: Optional[Dict] = None, **kwargs):
    """Set up the writer: configuration, logger, working directories, helpers.

    BaseTool resolves the configuration from, in decreasing priority:
        1. Explicit config dict
        2. YAML config files (config/tools/document_writer_tool.yaml)
        3. Environment variables (via dotenv from .env files)
        4. Tool defaults

    Args:
        config: Optional configuration overrides
        **kwargs: Additional arguments passed to BaseTool (e.g., tool_name)
    """
    super().__init__(config, **kwargs)

    # BaseTool stores the resolved settings on self._config_obj; build a
    # default Config only when nothing was resolved.
    self.config: "DocumentWriterTool.Config" = self._config_obj or self.Config()  # type: ignore[assignment]
    self.logger = logging.getLogger(__name__)

    # Working directories must exist before any write is attempted.
    for working_dir in (self.config.temp_dir, self.config.backup_dir):
        os.makedirs(working_dir, exist_ok=True)

    # Optional integrations and validators, in the original order:
    # cloud storage, office tool (PPTX/DOCX writing), content validators.
    self._init_cloud_storage()
    self._init_office_tool()
    self._init_validators()
def _init_cloud_storage(self): """Initialize cloud storage for document writing""" self.file_storage = None if self.config.enable_cloud_storage: try: from aiecs.infrastructure.persistence.file_storage import ( FileStorage, ) # Validate that gcs_bucket_name is provided if cloud storage is enabled if not self.config.gcs_bucket_name: self.logger.warning( "Cloud storage is enabled but gcs_bucket_name is not provided. " "Please set DOC_WRITER_GCS_BUCKET_NAME environment variable or provide it in config. " "Falling back to local storage only." ) storage_config = { "gcs_bucket_name": self.config.gcs_bucket_name, "gcs_project_id": self.config.gcs_project_id, "enable_local_fallback": True, "local_storage_path": self.config.temp_dir, } self.file_storage = FileStorage(storage_config) # Initialize storage asynchronously if in async context, otherwise defer try: asyncio.get_running_loop() # We're in an async context, create task asyncio.create_task(self._init_storage_async()) except RuntimeError: # Not in async context, initialization will happen on first async operation # or can be called explicitly via write_document_async pass except ImportError: self.logger.warning("FileStorage not available, cloud storage disabled") except Exception as e: self.logger.warning(f"Failed to initialize cloud storage: {e}") async def _init_storage_async(self): """Async initialization of file storage""" try: if self.file_storage: await self.file_storage.initialize() self.logger.info("Cloud storage initialized successfully") except Exception as e: self.logger.warning(f"Cloud storage initialization failed: {e}") self.file_storage = None def _init_office_tool(self): """Initialize office tool for PPTX/DOCX writing""" self.office_tool: Optional[Any] = None try: from aiecs.tools.task_tools.office_tool import OfficeTool self.office_tool = OfficeTool() self.logger.info("OfficeTool initialized successfully for PPTX/DOCX support") except ImportError: self.logger.warning("OfficeTool not available, PPTX/DOCX 
writing will be limited") self.office_tool = None def _init_validators(self): """Initialize content validators""" self.validators = { DocumentFormat.JSON: self._validate_json_content, DocumentFormat.XML: self._validate_xml_content, DocumentFormat.CSV: self._validate_csv_content, DocumentFormat.YAML: self._validate_yaml_content, DocumentFormat.HTML: self._validate_html_content, } def _run_async_safely(self, coro): """Safely run async coroutine from sync context This method handles both cases: 1. If already in an async context (event loop running), creates a new event loop in a thread 2. If not in async context, uses asyncio.run() to create new event loop Args: coro: Coroutine to run Returns: Result of the coroutine """ try: # Try to get the running event loop asyncio.get_running_loop() # If we get here, we're in an async context # We need to run the coroutine in a separate thread with its own event loop import threading result = None exception = None def run_in_thread(): nonlocal result, exception try: new_loop = asyncio.new_event_loop() asyncio.set_event_loop(new_loop) result = new_loop.run_until_complete(coro) new_loop.close() except Exception as e: exception = e thread = threading.Thread(target=run_in_thread) thread.start() thread.join() if exception: raise exception return result except RuntimeError: # No running event loop, safe to use asyncio.run() return asyncio.run(coro) # Schema definitions
class Write_documentSchema(BaseModel):
    """Schema for write_document operation.

    Field names, types and defaults mirror the parameters of
    ``write_document``.
    """

    target_path: str = Field(description="Target file path (local or cloud)")
    content: Union[str, bytes, Dict, List] = Field(description="Content to write")
    format: DocumentFormat = Field(description="Document format")
    mode: WriteMode = Field(default=WriteMode.CREATE, description="Write mode")
    encoding: EncodingType = Field(default=EncodingType.UTF8, description="Text encoding")
    validation_level: ValidationLevel = Field(default=ValidationLevel.BASIC, description="Validation level")
    metadata: Optional[Dict[str, Any]] = Field(default=None, description="Additional metadata")
    backup_comment: Optional[str] = Field(default=None, description="Backup comment")
class Batch_write_documentsSchema(BaseModel):
    """Schema for batch_write_documents operation.

    Each entry of ``write_operations`` is passed to ``write_document`` as
    keyword arguments.
    """

    write_operations: List[Dict[str, Any]] = Field(description="List of write operations")
    transaction_mode: bool = Field(default=True, description="Use transaction mode")
    rollback_on_error: bool = Field(default=True, description="Rollback on any error")
class Edit_documentSchema(BaseModel):
    """Schema for edit_document operation"""

    target_path: str = Field(description="Target file path")
    operation: EditOperation = Field(description="Edit operation to perform")
    content: Optional[str] = Field(default=None, description="Content for the operation")
    position: Optional[Dict[str, Any]] = Field(default=None, description="Position info (line, column, offset)")
    selection: Optional[Dict[str, Any]] = Field(default=None, description="Text selection range")
    format_options: Optional[Dict[str, Any]] = Field(default=None, description="Formatting options")
class Format_textSchema(BaseModel):
    """Schema for format_text operation"""

    target_path: str = Field(description="Target file path")
    text_to_format: str = Field(description="Text to apply formatting to")
    format_type: EditOperation = Field(description="Type of formatting")
    format_options: Optional[Dict[str, Any]] = Field(default=None, description="Additional format options")
class Find_replaceSchema(BaseModel):
    """Schema for find_replace operation with precise control"""

    target_path: str = Field(description="Target file path")
    find_text: str = Field(description="Text to find")
    replace_text: str = Field(description="Text to replace with")
    replace_all: bool = Field(default=False, description="Replace all occurrences (ignored if occurrence is set)")
    # occurrence, start_line and end_line are all constrained to be >= 1 (ge=1)
    occurrence: Optional[int] = Field(
        default=None,
        description="Replace only the nth occurrence (1-based index). If None, uses replace_all. Example: occurrence=3 replaces only the 3rd match",
        ge=1
    )
    start_line: Optional[int] = Field(default=None, description="Start line number (1-based, inclusive) to limit search range. Example: start_line=10 begins search at line 10", ge=1)
    end_line: Optional[int] = Field(default=None, description="End line number (1-based, inclusive) to limit search range. Example: end_line=50 ends search at line 50", ge=1)
    case_sensitive: bool = Field(default=True, description="Case sensitive search")
    regex_mode: bool = Field(default=False, description="Use regex for find/replace")
class Search_replace_blocksSchema(BaseModel):
    """Schema for search_replace_blocks operation (Cline/Claude Code format)"""

    target_path: str = Field(description="Target file path")
    # NOTE(review): the description text below is reproduced exactly as it
    # appears in this rendering of the file; the block markers were presumably
    # on separate lines in the original source - confirm before relying on it.
    blocks: str = Field(
        description="""String containing one or more SEARCH/REPLACE blocks in the format: <<<<<<< SEARCH old text to find ======= new text to replace with >>>>>>> REPLACE Multiple blocks can be provided sequentially. Each block will be executed in order."""
    )
    case_sensitive: bool = Field(default=True, description="Case sensitive search for all blocks")
def write_document(
    self,
    target_path: str,
    content: Union[str, bytes, Dict, List],
    format: DocumentFormat,
    mode: WriteMode = WriteMode.CREATE,
    encoding: EncodingType = EncodingType.UTF8,
    validation_level: ValidationLevel = ValidationLevel.BASIC,
    metadata: Optional[Dict[str, Any]] = None,
    backup_comment: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Write document with production-grade features

    The operation runs as a fixed pipeline: validate inputs, prepare the
    content, plan the write, optionally back up the target, execute the
    write, handle versioning, then record an audit entry.

    Args:
        target_path: Target file path (local or cloud)
        content: Content to write
        format: Document format
        mode: Write mode (create, overwrite, append, update, etc.)
        encoding: Text encoding
        validation_level: Content validation level
        metadata: Additional metadata
        backup_comment: Comment for backup

    Returns:
        Dict containing write results and metadata

    Raises:
        DocumentWriterError: If any step fails. The original exception is
            attached as ``__cause__``. When a backup was taken before the
            failure, a rollback from that backup is attempted first.
    """
    # Bound before the try block so the error handler can test it directly
    # instead of probing locals().
    backup_info = None

    try:
        start_time = datetime.now()
        operation_id = str(uuid.uuid4())

        self.logger.info(f"Starting write operation {operation_id}: {target_path}")

        # Step 1: Validate inputs
        self._validate_write_inputs(target_path, content, format, mode)

        # Step 2: Prepare content
        processed_content, content_metadata = self._prepare_content(content, format, encoding, validation_level)

        # Step 3: Handle write mode logic
        write_plan = self._plan_write_operation(target_path, mode, metadata)

        # Step 4: Create backup if needed
        # NOTE(review): WriteMode.BACKUP_WRITE does not trigger a backup here -
        # confirm whether that mode is handled elsewhere.
        if self.config.enable_backup and mode in [
            WriteMode.OVERWRITE,
            WriteMode.UPDATE,
        ]:
            backup_info = self._create_backup(target_path, backup_comment)

        # Step 5: Execute atomic write
        write_result = self._run_async_safely(
            self._execute_atomic_write(target_path, processed_content, format, encoding, write_plan)
        )

        # Step 6: Update metadata and versioning
        version_info = self._handle_versioning(target_path, content_metadata, metadata)

        # Step 7: Audit logging
        audit_info = self._log_write_operation(operation_id, target_path, mode, write_result, backup_info)

        # Read the clock once so end_time and duration agree with each other
        end_time = datetime.now()

        result = {
            "operation_id": operation_id,
            "target_path": target_path,
            "write_mode": mode,
            "format": format,
            "encoding": encoding,
            "content_metadata": content_metadata,
            "write_result": write_result,
            "backup_info": backup_info,
            "version_info": version_info,
            "audit_info": audit_info,
            "processing_metadata": {
                "start_time": start_time.isoformat(),
                "end_time": end_time.isoformat(),
                "duration": (end_time - start_time).total_seconds(),
            },
        }

        self.logger.info(f"Write operation {operation_id} completed successfully")
        return result

    except Exception as e:
        self.logger.error(f"Write operation failed for {target_path}: {str(e)}")

        # Rollback if needed. A failing rollback is logged rather than
        # raised, so the caller still receives the DocumentWriterError that
        # describes the original failure.
        if backup_info:
            try:
                self._rollback_from_backup(target_path, backup_info)
            except Exception as rollback_error:
                self.logger.error(f"Rollback failed for {target_path}: {str(rollback_error)}")

        raise DocumentWriterError(f"Document write failed: {str(e)}") from e
async def write_document_async(
    self,
    target_path: str,
    content: Union[str, bytes, Dict, List],
    format: DocumentFormat,
    mode: WriteMode = WriteMode.CREATE,
    encoding: EncodingType = EncodingType.UTF8,
    validation_level: ValidationLevel = ValidationLevel.BASIC,
    metadata: Optional[Dict[str, Any]] = None,
    backup_comment: Optional[str] = None,
) -> Dict[str, Any]:
    """Awaitable wrapper around ``write_document``.

    Forwards every argument unchanged to ``write_document`` via
    ``asyncio.to_thread`` and returns its result.
    """
    forwarded = {
        "target_path": target_path,
        "content": content,
        "format": format,
        "mode": mode,
        "encoding": encoding,
        "validation_level": validation_level,
        "metadata": metadata,
        "backup_comment": backup_comment,
    }
    outcome = await asyncio.to_thread(self.write_document, **forwarded)
    return outcome
def batch_write_documents(
    self,
    write_operations: List[Dict[str, Any]],
    transaction_mode: bool = True,
    rollback_on_error: bool = True,
) -> Dict[str, Any]:
    """
    Batch write multiple documents with transaction support

    Operations are executed in order through ``write_document``; the first
    failure stops the batch.

    Args:
        write_operations: List of write operation dictionaries (each is
            passed to ``write_document`` as keyword arguments)
        transaction_mode: Use transaction mode for atomicity
        rollback_on_error: Rollback all operations on any error

    Returns:
        Dict containing batch write results

    Raises:
        DocumentWriterError: If any operation fails. The original exception
            is attached as ``__cause__`` and a summary of the partially
            completed batch is attached as the ``batch_result`` attribute.
    """
    start_time = datetime.now()
    batch_id = str(uuid.uuid4())
    completed_operations = []
    backup_operations = []
    total_operations = 0

    try:
        total_operations = len(write_operations)
        self.logger.info(f"Starting batch write operation {batch_id}: {total_operations} operations")

        for i, operation in enumerate(write_operations):
            self.logger.info(f"Processing operation {i+1}/{total_operations}")

            # Execute individual write operation
            result = self.write_document(**operation)
            completed_operations.append(
                {
                    "index": i,
                    "operation": operation,
                    "result": result,
                    "status": "success",
                }
            )

            # Track backup info for potential rollback
            if result.get("backup_info"):
                backup_operations.append(result["backup_info"])

        # Read the clock once so end_time and duration agree with each other
        end_time = datetime.now()
        batch_result = {
            "batch_id": batch_id,
            "total_operations": total_operations,
            "successful_operations": len(completed_operations),
            "failed_operations": 0,
            "operations": completed_operations,
            "transaction_mode": transaction_mode,
            "batch_metadata": {
                "start_time": start_time.isoformat(),
                "end_time": end_time.isoformat(),
                "duration": (end_time - start_time).total_seconds(),
            },
        }

        self.logger.info(f"Batch write operation {batch_id} completed successfully")
        return batch_result

    except Exception as e:
        self.logger.error(f"Batch write operation {batch_id} failed: {str(e)}")

        rollback_performed = False
        if rollback_on_error and transaction_mode:
            self.logger.info(f"Rolling back batch operation {batch_id}")
            # A failing rollback is logged rather than raised, so the caller
            # still receives a DocumentWriterError describing the original
            # failure.
            try:
                self._rollback_batch_operations(completed_operations, backup_operations)
                rollback_performed = True
            except Exception as rollback_error:
                self.logger.error(f"Rollback of batch operation {batch_id} failed: {str(rollback_error)}")

        # Single wrap: the prefix appears once in the message, and the
        # failure summary travels with the exception instead of being lost.
        failure = DocumentWriterError(f"Batch write operation failed: {str(e)}")
        failure.batch_result = {
            "batch_id": batch_id,
            "total_operations": total_operations,
            "successful_operations": len(completed_operations),
            "failed_operations": max(total_operations - len(completed_operations), 0),
            "operations": completed_operations,
            "error": str(e),
            "transaction_mode": transaction_mode,
            "rollback_performed": rollback_performed,
        }
        raise failure from e
def _validate_write_inputs( self, target_path: str, content: Any, format: DocumentFormat, mode: WriteMode, ): """Validate write operation inputs""" # Path validation if not target_path or not isinstance(target_path, str): raise ValueError("Invalid target path") # Content validation if content is None: raise ValueError("Content cannot be None") # Size validation content_size = self._calculate_content_size(content) if content_size > self.config.max_file_size: raise ValueError(f"Content size {content_size} exceeds maximum {self.config.max_file_size}") # Permission validation if not self._check_write_permission(target_path, mode): raise WritePermissionError(f"No write permission for {target_path}") def _prepare_content( self, content: Any, format: DocumentFormat, encoding: EncodingType, validation_level: ValidationLevel, ) -> Tuple[Union[str, bytes], Dict]: """Prepare and validate content for writing""" # Content conversion based on format processed_content: Union[str, bytes] if format == DocumentFormat.JSON: if isinstance(content, (dict, list)): processed_content = json.dumps(content, ensure_ascii=False, indent=2) else: processed_content = str(content) elif format == DocumentFormat.CSV: processed_content = self._convert_to_csv(content) elif format == DocumentFormat.XML: processed_content = self._convert_to_xml(content) elif format == DocumentFormat.YAML: processed_content = self._convert_to_yaml(content) elif format == DocumentFormat.HTML: processed_content = self._convert_to_html(content) elif format == DocumentFormat.MARKDOWN: processed_content = self._convert_to_markdown(content) elif format == DocumentFormat.BINARY: if isinstance(content, bytes): processed_content = content else: processed_content = str(content).encode(encoding.value) else: processed_content = str(content) # Content validation if self.config.enable_content_validation: self._validate_content(processed_content, format, validation_level) # Calculate metadata content_metadata = { "original_type": 
type(content).__name__, "processed_size": (len(processed_content) if isinstance(processed_content, (str, bytes)) else 0), "format": format, "encoding": encoding, "checksum": self._calculate_checksum(processed_content), "validation_level": validation_level, "timestamp": datetime.now().isoformat(), } return processed_content, content_metadata def _plan_write_operation(self, target_path: str, mode: WriteMode, metadata: Optional[Dict]) -> Dict: """Plan the write operation based on mode and target""" plan = { "target_path": target_path, "mode": mode, "file_exists": self._file_exists(target_path), "is_cloud_path": self._is_cloud_storage_path(target_path), "requires_backup": False, "requires_versioning": False, "atomic_operation": self.config.atomic_write, } if mode == WriteMode.CREATE and plan["file_exists"]: raise DocumentWriterError(f"File already exists: {target_path}") if mode in [WriteMode.OVERWRITE, WriteMode.UPDATE] and plan["file_exists"]: plan["requires_backup"] = self.config.enable_backup plan["requires_versioning"] = self.config.enable_versioning if mode == WriteMode.APPEND and not plan["file_exists"]: # Convert to CREATE mode plan["mode"] = WriteMode.CREATE return plan def _create_backup(self, target_path: str, comment: Optional[str] = None) -> Dict: """Create backup of existing file""" if not self._file_exists(target_path): return {} try: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") file_stem = Path(target_path).stem file_suffix = Path(target_path).suffix backup_filename = f"{file_stem}_backup_{timestamp}{file_suffix}" backup_path = os.path.join(self.config.backup_dir, backup_filename) # Copy file to backup location if self._is_cloud_storage_path(target_path): backup_path = self._backup_cloud_file(target_path, backup_path) else: shutil.copy2(target_path, backup_path) backup_info = { "original_path": target_path, "backup_path": backup_path, "timestamp": timestamp, "comment": comment, "checksum": self._calculate_file_checksum(target_path), } 
self.logger.info(f"Created backup: {backup_path}") return backup_info except Exception as e: self.logger.error(f"Failed to create backup for {target_path}: {e}") raise StorageError(f"Backup creation failed: {e}") async def _execute_atomic_write( self, target_path: str, content: Union[str, bytes], format: DocumentFormat, encoding: EncodingType, plan: Dict, ) -> Dict: """Execute atomic write operation""" if plan["is_cloud_path"]: return await self._write_to_cloud_storage(target_path, content, format, encoding, plan) else: return self._write_to_local_file(target_path, content, format, encoding, plan) def _write_to_local_file( self, target_path: str, content: Union[str, bytes], format: DocumentFormat, encoding: EncodingType, plan: Dict, ) -> Dict: """Write to local file system with atomic operation""" try: # Handle PPTX format using office_tool if format in [DocumentFormat.PPTX, DocumentFormat.PPT]: return self._write_pptx_file(target_path, content, plan) # Handle DOCX format using office_tool if format == DocumentFormat.DOCX: return self._write_docx_file(target_path, content, plan) # Create parent directories os.makedirs(os.path.dirname(target_path), exist_ok=True) if plan["atomic_operation"]: # Atomic write using temporary file temp_path = f"{target_path}.tmp.{uuid.uuid4().hex}" try: if plan["mode"] == WriteMode.APPEND and plan["file_exists"]: # Read existing content first with open(target_path, "rb") as f: existing_content = f.read() if isinstance(content, str): content = existing_content.decode(encoding.value) + content else: content = existing_content + content # Write to temporary file if isinstance(content, bytes): with open(temp_path, "wb") as f: f.write(content) else: # Handle both EncodingType enum and string enc_value = encoding.value if hasattr(encoding, "value") else str(encoding) with open(temp_path, "w", encoding=enc_value) as f: f.write(content) # Atomic move shutil.move(temp_path, target_path) finally: # Cleanup temp file if it still exists if 
os.path.exists(temp_path): os.unlink(temp_path) else: # Direct write mode_map = { WriteMode.CREATE: "w", WriteMode.OVERWRITE: "w", WriteMode.APPEND: "a", WriteMode.UPDATE: "w", } file_mode = mode_map.get(plan["mode"], "w") if isinstance(content, bytes): file_mode += "b" # Handle both EncodingType enum and string file_enc_value: Optional[str] = None if isinstance(content, bytes) else (encoding.value if hasattr(encoding, "value") else str(encoding)) with open(target_path, file_mode, encoding=file_enc_value) as f: f.write(content) # Get file stats stat = os.stat(target_path) return { "path": target_path, "size": stat.st_size, "checksum": self._calculate_file_checksum(target_path), "modified_time": datetime.fromtimestamp(stat.st_mtime).isoformat(), "atomic_write": plan["atomic_operation"], } except Exception as e: raise StorageError(f"Local file write failed: {e}") def _write_pptx_file(self, target_path: str, content: Union[str, bytes], plan: Dict) -> Dict: """Write content to PPTX file using office_tool""" if not self.office_tool: raise StorageError("OfficeTool not available. 
Cannot write PPTX files.") try: # Convert bytes to string if needed if isinstance(content, bytes): content_str = content.decode("utf-8") else: content_str = str(content) # Parse content to extract slides slides = self._parse_content_to_slides(content_str) # Handle append mode if plan["mode"] == WriteMode.APPEND and plan["file_exists"]: # Read existing slides existing_slides = self.office_tool.read_pptx(target_path) slides = existing_slides + slides # Use office_tool to write PPTX result = self.office_tool.write_pptx( slides=slides, output_path=target_path, image_path=None, ) if not result.get("success"): raise StorageError(f"Failed to write PPTX file: {result}") # Get file stats stat = os.stat(target_path) return { "path": target_path, "size": stat.st_size, "checksum": self._calculate_file_checksum(target_path), "modified_time": datetime.fromtimestamp(stat.st_mtime).isoformat(), "atomic_write": False, # Office tool handles its own atomicity } except Exception as e: raise StorageError(f"PPTX file write failed: {e}") def _write_docx_file(self, target_path: str, content: Union[str, bytes], plan: Dict) -> Dict: """Write content to DOCX file using office_tool""" if not self.office_tool: raise StorageError("OfficeTool not available. 
Cannot write DOCX files.") try: # Convert bytes to string if needed if isinstance(content, bytes): content_str = content.decode("utf-8") else: content_str = str(content) # Handle append mode if plan["mode"] == WriteMode.APPEND and plan["file_exists"]: # Read existing content existing_doc = self.office_tool.read_docx(target_path) existing_text = "\n".join(existing_doc.get("paragraphs", [])) content_str = existing_text + "\n" + content_str # Use office_tool to write DOCX result = self.office_tool.write_docx( text=content_str, output_path=target_path, table_data=None, ) if not result.get("success"): raise StorageError(f"Failed to write DOCX file: {result}") # Get file stats stat = os.stat(target_path) return { "path": target_path, "size": stat.st_size, "checksum": self._calculate_file_checksum(target_path), "modified_time": datetime.fromtimestamp(stat.st_mtime).isoformat(), "atomic_write": False, # Office tool handles its own atomicity } except Exception as e: raise StorageError(f"DOCX file write failed: {e}") def _parse_content_to_slides(self, content: str) -> List[str]: """Parse content string into list of slide contents Supports multiple slide separation formats: - "---" separator (markdown style) - "## Slide X:" headers - Empty lines between slides """ slides = [] # Split by "---" separator (common in markdown presentations) if "---" in content: parts = content.split("---") for part in parts: part = part.strip() if part: # Remove slide headers like "## Slide X: Title" lines = part.split("\n") cleaned_lines = [] for line in lines: # Skip slide headers if line.strip().startswith("## Slide") and ":" in line: continue cleaned_lines.append(line) slide_content = "\n".join(cleaned_lines).strip() if slide_content: slides.append(slide_content) else: # Try to split by "## Slide" headers if "## Slide" in content: parts = content.split("## Slide") for i, part in enumerate(parts): if i == 0: # First part might be title slide part = part.strip() if part: slides.append(part) 
else: # Extract content after "Slide X: Title" lines = part.split("\n", 1) if len(lines) > 1: slide_content = lines[1].strip() if slide_content: slides.append(slide_content) else: # Fallback: split by double newlines (paragraph breaks) parts = content.split("\n\n") current_slide: List[str] = [] for part in parts: part = part.strip() if part: # If it's a header, start a new slide if part.startswith("#"): if current_slide: slides.append("\n".join(current_slide)) current_slide = [] current_slide.append(part) if current_slide: slides.append("\n".join(current_slide)) # If no slides found, create a single slide with all content if not slides: slides = [content.strip()] if content.strip() else [""] return slides async def _write_to_cloud_storage( self, target_path: str, content: Union[str, bytes], format: DocumentFormat, encoding: EncodingType, plan: Dict, ) -> Dict: """Write to cloud storage""" if not self.file_storage: raise StorageError("Cloud storage not available") try: storage_path = self._parse_cloud_storage_path(target_path) # Handle append mode for cloud storage if plan["mode"] == WriteMode.APPEND and plan["file_exists"]: existing_content = await self.file_storage.retrieve(storage_path) if isinstance(content, str) and isinstance(existing_content, str): content = existing_content + content elif isinstance(content, bytes) and isinstance(existing_content, bytes): content = existing_content + content # Store in cloud storage await self.file_storage.store(storage_path, content) return { "path": target_path, "storage_path": storage_path, "size": (len(content) if isinstance(content, (str, bytes)) else 0), "checksum": self._calculate_checksum(content), "cloud_storage": True, } except Exception as e: raise StorageError(f"Cloud storage write failed: {e}") def _handle_versioning( self, target_path: str, content_metadata: Dict, metadata: Optional[Dict], ) -> Optional[Dict]: """Handle document versioning""" if not self.config.enable_versioning: return None try: version_info = 
{ "path": target_path, "version": self._get_next_version(target_path), "timestamp": datetime.now().isoformat(), "content_metadata": content_metadata, "user_metadata": metadata or {}, } # Store version info version_file = f"{target_path}.versions.json" versions = self._load_version_history(version_file) versions.append(version_info) # Keep only recent versions if len(versions) > self.config.max_backup_versions: versions = versions[-self.config.max_backup_versions :] self._save_version_history(version_file, versions) return version_info except Exception as e: self.logger.warning(f"Versioning failed for {target_path}: {e}") return None def _validate_content( self, content: Union[str, bytes], format: DocumentFormat, validation_level: ValidationLevel, ): """Validate content based on format and validation level""" if validation_level == ValidationLevel.NONE: return try: # Format-specific validation if format in self.validators: self.validators[format](content, validation_level) # Security validation for enterprise level if validation_level == ValidationLevel.ENTERPRISE: self._security_scan_content(content) except Exception as e: raise ContentValidationError(f"Content validation failed: {e}") def _validate_json_content(self, content: Union[str, bytes], validation_level: ValidationLevel): """Validate JSON content""" try: if isinstance(content, bytes): content = content.decode("utf-8") json.loads(content) except json.JSONDecodeError as e: raise ContentValidationError(f"Invalid JSON: {e}") def _validate_xml_content(self, content: Union[str, bytes], validation_level: ValidationLevel): """Validate XML content""" try: import xml.etree.ElementTree as ET if isinstance(content, bytes): content = content.decode("utf-8") ET.fromstring(content) except ET.ParseError as e: raise ContentValidationError(f"Invalid XML: {e}") def _validate_csv_content(self, content: Union[str, bytes], validation_level: ValidationLevel): """Validate CSV content""" try: import csv import io if 
isinstance(content, bytes): content = content.decode("utf-8") csv.reader(io.StringIO(content)) except Exception as e: raise ContentValidationError(f"Invalid CSV: {e}") def _validate_yaml_content(self, content: Union[str, bytes], validation_level: ValidationLevel): """Validate YAML content""" try: import yaml if isinstance(content, bytes): content = content.decode("utf-8") yaml.safe_load(content) except yaml.YAMLError as e: raise ContentValidationError(f"Invalid YAML: {e}") def _validate_html_content(self, content: Union[str, bytes], validation_level: ValidationLevel): """Validate HTML content""" try: from bs4 import BeautifulSoup if isinstance(content, bytes): content = content.decode("utf-8") BeautifulSoup(content, "html.parser") except Exception as e: raise ContentValidationError(f"Invalid HTML: {e}") def _security_scan_content(self, content: Union[str, bytes]): """Perform security scan on content""" if isinstance(content, bytes): content = content.decode("utf-8", errors="ignore") # Check for suspicious patterns suspicious_patterns = [ r"<script[^>]*>", # JavaScript r"javascript:", # JavaScript URLs r"vbscript:", # VBScript URLs r"data:.*base64", # Base64 data URLs r"eval\s*\(", # eval() calls r"exec\s*\(", # exec() calls ] import re for pattern in suspicious_patterns: if re.search(pattern, content, re.IGNORECASE): raise ContentValidationError("Security scan failed: suspicious pattern detected") # Helper methods def _calculate_content_size(self, content: Any) -> int: """Calculate content size in bytes""" if isinstance(content, bytes): return len(content) elif isinstance(content, str): return len(content.encode("utf-8")) else: return len(str(content).encode("utf-8")) def _calculate_checksum(self, content: Union[str, bytes]) -> str: """Calculate content checksum""" if isinstance(content, str): content = content.encode("utf-8") return hashlib.sha256(content).hexdigest() def _calculate_file_checksum(self, file_path: str) -> str: """Calculate file checksum""" 
hash_sha256 = hashlib.sha256() with open(file_path, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): hash_sha256.update(chunk) return hash_sha256.hexdigest() def _check_write_permission(self, target_path: str, mode: WriteMode) -> bool: """Check write permission for target path""" try: if self._is_cloud_storage_path(target_path): return self.file_storage is not None parent_dir = os.path.dirname(target_path) if not os.path.exists(parent_dir): # Check if we can create the directory return os.access(os.path.dirname(parent_dir), os.W_OK) if os.path.exists(target_path): return os.access(target_path, os.W_OK) else: return os.access(parent_dir, os.W_OK) except Exception: return False def _file_exists(self, file_path: str) -> bool: """Check if file exists (local or cloud)""" if self._is_cloud_storage_path(file_path): # For cloud storage, we'd need to implement exists check return False # Simplified for now else: return os.path.exists(file_path) def _is_cloud_storage_path(self, source: str) -> bool: """Check if source is a cloud storage path""" cloud_schemes = ["gs", "s3", "azure", "cloud"] try: from urllib.parse import urlparse parsed = urlparse(source) return parsed.scheme in cloud_schemes except Exception: return False def _parse_cloud_storage_path(self, source: str) -> str: """Parse cloud storage path to get storage key""" try: from urllib.parse import urlparse parsed = urlparse(source) return parsed.path.lstrip("/") except Exception: return source # Content conversion methods def _convert_to_csv(self, content: Any) -> str: """Convert content to CSV format""" import csv import io output = io.StringIO() writer = csv.writer(output) if isinstance(content, list): for row in content: if isinstance(row, (list, tuple)): writer.writerow(row) else: writer.writerow([row]) elif isinstance(content, dict): # Convert dict to CSV with headers if content: headers = list(content.keys()) writer.writerow(headers) writer.writerow([content[h] for h in headers]) else: 
writer.writerow([str(content)]) return output.getvalue() def _convert_to_xml(self, content: Any) -> str: """Convert content to XML format""" import xml.etree.ElementTree as ET if isinstance(content, dict): root = ET.Element("document") for key, value in content.items(): elem = ET.SubElement(root, str(key)) elem.text = str(value) return ET.tostring(root, encoding="unicode") else: root = ET.Element("document") root.text = str(content) return ET.tostring(root, encoding="unicode") def _convert_to_yaml(self, content: Any) -> str: """Convert content to YAML format""" try: import yaml return yaml.dump(content, default_flow_style=False, allow_unicode=True) except ImportError: # Fallback to simple string representation return str(content) def _convert_to_html(self, content: Any) -> str: """Convert content to HTML format""" if isinstance(content, dict): html = "<html><body>\n" for key, value in content.items(): html += f"<h3>{key}</h3>\n<p>{value}</p>\n" html += "</body></html>" return html else: return f"<html><body><pre>{str(content)}</pre></body></html>" def _convert_to_markdown(self, content: Any) -> str: """Convert content to Markdown format""" if isinstance(content, dict): md = "" for key, value in content.items(): md += f"## {key}\n\n{value}\n\n" return md else: return str(content) # Versioning methods def _get_next_version(self, file_path: str) -> int: """Get next version number for file""" version_file = f"{file_path}.versions.json" versions = self._load_version_history(version_file) return len(versions) + 1 def _load_version_history(self, version_file: str) -> List[Dict]: """Load version history from file""" try: if os.path.exists(version_file): with open(version_file, "r") as f: return cast(List[Dict[Any, Any]], json.load(f)) except Exception: pass return [] def _save_version_history(self, version_file: str, versions: List[Dict]): """Save version history to file""" try: with open(version_file, "w") as f: json.dump(versions, f, indent=2) except Exception as e: 
self.logger.warning(f"Failed to save version history: {e}") # Backup and rollback methods def _backup_cloud_file(self, source_path: str, backup_path: str) -> str: """Backup cloud file""" # Simplified implementation return backup_path def _rollback_from_backup(self, target_path: str, backup_info: Dict): """Rollback file from backup""" try: if backup_info and os.path.exists(backup_info["backup_path"]): shutil.copy2(backup_info["backup_path"], target_path) self.logger.info(f"Rolled back {target_path} from backup") except Exception as e: self.logger.error(f"Rollback failed: {e}") def _rollback_batch_operations(self, completed_operations: List[Dict], backup_operations: List[Dict]): """Rollback batch operations""" for op in reversed(completed_operations): try: result = op.get("result", {}) backup_info = result.get("backup_info") if backup_info: self._rollback_from_backup(result["write_result"]["path"], backup_info) except Exception as e: self.logger.error(f"Batch rollback failed for operation: {e}") def _log_write_operation( self, operation_id: str, target_path: str, mode: WriteMode, write_result: Dict, backup_info: Optional[Dict], ) -> Dict: """Log write operation for audit""" audit_info = { "operation_id": operation_id, "timestamp": datetime.now().isoformat(), "target_path": target_path, "mode": mode, "success": True, "file_size": write_result.get("size", 0), "checksum": write_result.get("checksum"), "backup_created": backup_info is not None, } # Log to audit file try: audit_file = os.path.join(self.config.temp_dir, "write_audit.log") with open(audit_file, "a") as f: f.write(json.dumps(audit_info) + "\n") except Exception as e: self.logger.warning(f"Audit logging failed: {e}") return audit_info
def edit_document(
    self,
    target_path: str,
    operation: EditOperation,
    content: Optional[str] = None,
    position: Optional[Dict[str, Any]] = None,
    selection: Optional[Dict[str, Any]] = None,
    format_options: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """
    Apply a single advanced edit operation to a document

    The file is read, the requested operation is applied in memory, and the
    result is written back in BACKUP_WRITE mode. COPY_TEXT is the exception:
    it returns the copy result directly and writes nothing.

    Args:
        target_path: Target file path
        operation: Edit operation to perform
        content: Content for the operation (required for INSERT_TEXT,
            REPLACE_TEXT and INSERT_LINE)
        position: Position info (line, column, offset)
        selection: Text selection range
        format_options: Additional format options

    Returns:
        Dict containing edit results

    Raises:
        DocumentWriterError: If reading, editing or writing fails
    """
    try:
        start_time = datetime.now()
        operation_id = str(uuid.uuid4())

        self.logger.info(f"Starting edit operation {operation_id}: {operation} on {target_path}")

        # Read current document content
        current_content = self._read_document_content(target_path)

        # Operations that carry a payload refuse to run without one
        for payload_op, complaint in (
            (EditOperation.INSERT_TEXT, "content is required for INSERT_TEXT operation"),
            (EditOperation.REPLACE_TEXT, "content is required for REPLACE_TEXT operation"),
            (EditOperation.INSERT_LINE, "content is required for INSERT_LINE operation"),
        ):
            if operation == payload_op and content is None:
                raise ValueError(complaint)

        # Copying leaves the document untouched, so there is nothing to write back
        if operation == EditOperation.COPY_TEXT:
            return self._copy_text(current_content, selection)

        # The inline-markup operations all share one call shape
        markup_handler = None
        for markup_op, handler_name in (
            (EditOperation.BOLD, "_format_text_bold"),
            (EditOperation.ITALIC, "_format_text_italic"),
            (EditOperation.UNDERLINE, "_format_text_underline"),
            (EditOperation.STRIKETHROUGH, "_format_text_strikethrough"),
            (EditOperation.HIGHLIGHT, "_format_text_highlight"),
        ):
            if operation == markup_op:
                markup_handler = getattr(self, handler_name)
                break

        if markup_handler is not None:
            edited_content = markup_handler(current_content, selection, format_options)
        elif operation == EditOperation.INSERT_TEXT:
            edited_content = self._insert_text(current_content, content, position)
        elif operation == EditOperation.DELETE_TEXT:
            edited_content = self._delete_text(current_content, selection)
        elif operation == EditOperation.REPLACE_TEXT:
            edited_content = self._replace_text(current_content, selection, content)
        elif operation == EditOperation.INSERT_LINE:
            edited_content = self._insert_line(current_content, position, content)
        elif operation == EditOperation.DELETE_LINE:
            edited_content = self._delete_line(current_content, position)
        elif operation == EditOperation.MOVE_LINE:
            edited_content = self._move_line(current_content, position, format_options)
        elif operation == EditOperation.CUT_TEXT:
            edited_content, cut_content = self._cut_text(current_content, selection)
            # Keep the removed text available for a later paste
            self._store_clipboard_content(cut_content)
        elif operation == EditOperation.PASTE_TEXT:
            clipboard_content = self._get_clipboard_content()
            edited_content = self._paste_text(current_content, position, clipboard_content)
        else:
            raise ValueError(f"Unsupported edit operation: {operation}")

        # Extensions that do not map to a known format are written as plain text
        file_format_str = self._detect_file_format(target_path)
        try:
            file_format = DocumentFormat(file_format_str)
        except ValueError:
            file_format = DocumentFormat.TXT

        # Always backup before editing
        write_result = self.write_document(
            target_path=target_path,
            content=edited_content,
            format=file_format,
            mode=WriteMode.BACKUP_WRITE,
            backup_comment=f"Edit operation: {operation}",
        )

        edited_size = len(edited_content) if isinstance(edited_content, str) else 0
        edit_metadata = {
            "original_size": len(current_content),
            "edited_size": edited_size,
            "position": position,
            "selection": selection,
        }
        processing_metadata = {
            "start_time": start_time.isoformat(),
            "end_time": datetime.now().isoformat(),
            "duration": (datetime.now() - start_time).total_seconds(),
        }

        result = {
            "operation_id": operation_id,
            "target_path": target_path,
            "operation": operation,
            "edit_metadata": edit_metadata,
            "write_result": write_result,
            "processing_metadata": processing_metadata,
        }

        self.logger.info(f"Edit operation {operation_id} completed successfully")
        return result

    except Exception as e:
        raise DocumentWriterError(f"Edit operation failed: {str(e)}")
def format_text(
    self,
    target_path: str,
    text_to_format: str,
    format_type: EditOperation,
    format_options: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """
    Format a given piece of text wherever the formatter finds it in a document

    The document is read, passed through ``_apply_text_formatting`` and
    written back in BACKUP_WRITE mode.

    Args:
        target_path: Target file path
        text_to_format: Text to apply formatting to
        format_type: Type of formatting (bold, italic, etc.)
        format_options: Additional format options

    Returns:
        Dict containing formatting results

    Raises:
        DocumentWriterError: If reading, formatting or writing fails
    """
    try:
        source_text = self._read_document_content(target_path)

        formatted_content = self._apply_text_formatting(
            source_text,
            text_to_format,
            format_type,
            format_options,
        )

        # Extensions that do not map to a known format are written as plain text
        detected = self._detect_file_format(target_path)
        try:
            file_format = DocumentFormat(detected)
        except ValueError:
            file_format = DocumentFormat.TXT

        outcome = {
            "target_path": target_path,
            "text_formatted": text_to_format,
            "format_type": format_type,
        }
        outcome["write_result"] = self.write_document(
            target_path=target_path,
            content=formatted_content,
            format=file_format,
            mode=WriteMode.BACKUP_WRITE,
        )
        return outcome

    except Exception as e:
        raise DocumentWriterError(f"Text formatting failed: {str(e)}")
def find_replace(
    self,
    target_path: str,
    find_text: str,
    replace_text: str,
    replace_all: bool = False,
    occurrence: Optional[int] = None,
    start_line: Optional[int] = None,
    end_line: Optional[int] = None,
    case_sensitive: bool = True,
    regex_mode: bool = False,
) -> Dict[str, Any]:
    """
    Find and replace text in a document with precise control

    The file is only rewritten (in BACKUP_WRITE mode) when at least one
    replacement was made.

    Args:
        target_path: Target file path
        find_text: Text to find
        replace_text: Text to replace with
        replace_all: Replace all occurrences (ignored if occurrence is set)
        occurrence: Replace only the nth occurrence (1-based index).
            If None, uses replace_all
        start_line: Start line number (1-based, inclusive) to limit search range
        end_line: End line number (1-based, inclusive) to limit search range
        case_sensitive: Case sensitive search
        regex_mode: Use regex for find/replace

    Returns:
        Dict containing find/replace results including:
        - target_path: Path to the file
        - find_text: Text that was searched for
        - replace_text: Replacement text
        - replacements_made: Number of replacements made
        - occurrence_replaced: Which occurrence was replaced (if occurrence was specified)
        - line_range: Line range used (if specified)
        - write_result: Result of the write operation (only when something was replaced)
        - message: "No matches found" (only when nothing was replaced)

    Raises:
        DocumentWriterError: If reading, replacing or writing fails

    Examples:
        # Replace first occurrence
        find_replace(path, "old", "new", replace_all=False)

        # Replace all occurrences
        find_replace(path, "old", "new", replace_all=True)

        # Replace 3rd occurrence only
        find_replace(path, "old", "new", occurrence=3)

        # Replace all occurrences in lines 10-50
        find_replace(path, "old", "new", replace_all=True, start_line=10, end_line=50)

        # Replace 2nd occurrence in lines 10-50
        find_replace(path, "old", "new", occurrence=2, start_line=10, end_line=50)
    """
    try:
        document_text = self._read_document_content(target_path)

        # Perform find and replace
        new_content, replacements, occurrence_info = self._perform_find_replace(
            document_text,
            find_text,
            replace_text,
            replace_all,
            occurrence,
            start_line,
            end_line,
            case_sensitive,
            regex_mode,
        )

        # Fields common to both outcomes, in the order callers already see them
        result = {
            "target_path": target_path,
            "find_text": find_text,
            "replace_text": replace_text,
        }

        if not replacements > 0:
            # Nothing matched: report it and leave the file untouched
            result["replacements_made"] = 0
            result["message"] = "No matches found"
            if start_line or end_line:
                result["line_range"] = {"start": start_line, "end": end_line}
            return result

        # Extensions that do not map to a known format are written as plain text
        detected = self._detect_file_format(target_path)
        try:
            file_format = DocumentFormat(detected)
        except ValueError:
            file_format = DocumentFormat.TXT

        # Build backup comment
        backup_comment = f"Find/Replace: '{find_text}' -> '{replace_text}'"
        if occurrence:
            backup_comment += ", " + f"occurrence={occurrence}"
        if start_line or end_line:
            backup_comment += ", " + f"lines {start_line or 1}-{end_line or 'end'}"

        write_result = self.write_document(
            target_path=target_path,
            content=new_content,
            format=file_format,
            mode=WriteMode.BACKUP_WRITE,
            backup_comment=backup_comment,
        )

        result["replacements_made"] = replacements
        result["write_result"] = write_result

        # Add occurrence info if available
        if occurrence_info:
            result.update(occurrence_info)

        # Add line range info if specified
        if start_line or end_line:
            result["line_range"] = {"start": start_line, "end": end_line}

        return result

    except Exception as e:
        raise DocumentWriterError(f"Find/replace operation failed: {str(e)}")
def search_replace_blocks(
    self,
    target_path: str,
    blocks: str,
    case_sensitive: bool = True,
) -> Dict[str, Any]:
    """
    Parse and execute SEARCH/REPLACE blocks (Cline/Claude Code format)

    ``blocks`` may hold one or more SEARCH/REPLACE blocks. They are applied
    one after another, each through ``find_replace`` with a literal
    (non-regex) match that replaces the first occurrence only. A block that
    fails or finds no match is recorded in ``errors`` and the remaining
    blocks are still processed.

    Args:
        target_path: Target file path
        blocks: String containing SEARCH/REPLACE blocks
        case_sensitive: Case sensitive search (default: True)

    Returns:
        Dict containing results of all replacements

    Raises:
        DocumentWriterError: If the blocks cannot be parsed or processed

    Format:
        <<<<<<< SEARCH
        old text to find
        =======
        new text to replace with
        >>>>>>> REPLACE

        Multiple blocks can be provided in sequence.

    Example:
        blocks = '''
        <<<<<<< SEARCH
        def old_function():
            pass
        =======
        def new_function():
            return True
        >>>>>>> REPLACE

        <<<<<<< SEARCH
        OLD_CONSTANT = 1
        =======
        NEW_CONSTANT = 2
        >>>>>>> REPLACE
        '''
        result = tool.search_replace_blocks("file.py", blocks)
    """

    def _preview(text):
        # Long snippets are shortened so the report stays readable
        if len(text) > 100:
            return text[:100] + "..."
        return text

    try:
        parsed_blocks = self._parse_search_replace_blocks(blocks)

        if not parsed_blocks:
            return {
                "target_path": target_path,
                "blocks_processed": 0,
                "blocks_successful": 0,
                "total_replacements": 0,
                "message": "No valid SEARCH/REPLACE blocks found",
                "errors": ["No blocks could be parsed from input"],
            }

        results = []
        errors = []
        total_replacements = 0

        block_number = 0
        for block in parsed_blocks:
            block_number += 1
            search_text = block["search"]
            replace_text = block["replace"]

            try:
                # Replace first occurrence only
                outcome = self.find_replace(
                    target_path=target_path,
                    find_text=search_text,
                    replace_text=replace_text,
                    replace_all=False,
                    case_sensitive=case_sensitive,
                    regex_mode=False,
                )

                results.append(
                    {
                        "block_number": block_number,
                        "search": _preview(search_text),
                        "replace": _preview(replace_text),
                        "replacements": outcome.get("replacements_made", 0),
                        "success": outcome.get("replacements_made", 0) > 0,
                    }
                )

                total_replacements += outcome.get("replacements_made", 0)

                if outcome.get("replacements_made", 0) == 0:
                    errors.append(f"Block {block_number}: No match found for search text")

            except Exception as e:
                errors.append(f"Block {block_number}: {str(e)}")
                results.append({"block_number": block_number, "error": str(e), "success": False})

        successful = len([entry for entry in results if entry.get("success", False)])

        return {
            "target_path": target_path,
            "blocks_processed": len(parsed_blocks),
            "blocks_successful": successful,
            "total_replacements": total_replacements,
            "results": results,
            "errors": errors if errors else None,
        }

    except Exception as e:
        raise DocumentWriterError(f"SEARCH/REPLACE blocks operation failed: {str(e)}")
def _parse_search_replace_blocks(self, blocks_text: str) -> List[Dict[str, str]]: """ Parse SEARCH/REPLACE blocks from text Args: blocks_text: Text containing SEARCH/REPLACE blocks Returns: List of dicts with 'search' and 'replace' keys """ import re # Pattern to match SEARCH/REPLACE blocks # Supports both <<<<<<< and <<<<<< (6 or 7 angle brackets) pattern = r"<{6,7}\s*SEARCH\s*\n(.*?)\n={7}\s*\n(.*?)\n>{6,7}\s*REPLACE" matches = re.findall(pattern, blocks_text, re.DOTALL) parsed_blocks = [] for search_text, replace_text in matches: parsed_blocks.append({"search": search_text, "replace": replace_text}) return parsed_blocks # Helper methods for editing operations def _read_document_content(self, file_path: str) -> str: """Read document content for editing""" try: with open(file_path, "r", encoding="utf-8") as f: return f.read() except UnicodeDecodeError: # Try with different encodings for encoding in ["gbk", "latin1", "cp1252"]: try: with open(file_path, "r", encoding=encoding) as f: return f.read() except Exception: continue raise DocumentWriterError(f"Cannot decode file: {file_path}") except Exception as e: raise DocumentWriterError(f"Cannot read file {file_path}: {str(e)}") def _detect_file_format(self, file_path: str) -> str: """Detect file format from extension""" ext = os.path.splitext(file_path)[1].lower() format_map = { ".txt": "txt", ".json": "json", ".csv": "csv", ".xml": "xml", ".html": "html", ".htm": "html", ".md": "markdown", ".markdown": "markdown", ".yaml": "yaml", ".yml": "yaml", } return format_map.get(ext, "txt") def _insert_text(self, content: str, text: str, position: Optional[Dict[str, Any]]) -> str: """Insert text at specified position""" if not position: return content + text if "offset" in position: offset = position["offset"] return content[:offset] + text + content[offset:] elif "line" in position: lines = content.split("\n") line_num = position.get("line", 0) column = position.get("column", 0) if line_num < len(lines): line = 
lines[line_num] lines[line_num] = line[:column] + text + line[column:] else: lines.append(text) return "\n".join(lines) else: return content + text def _delete_text(self, content: str, selection: Optional[Dict[str, Any]]) -> str: """Delete text in specified selection""" if not selection: return content if "start_offset" in selection and "end_offset" in selection: start = selection["start_offset"] end = selection["end_offset"] return content[:start] + content[end:] elif "start_line" in selection and "end_line" in selection: lines = content.split("\n") start_line = selection["start_line"] end_line = selection["end_line"] start_col = selection.get("start_column", 0) end_col = selection.get( "end_column", len(lines[end_line]) if end_line < len(lines) else 0, ) if start_line == end_line: # Same line deletion line = lines[start_line] lines[start_line] = line[:start_col] + line[end_col:] else: # Multi-line deletion lines[start_line] = lines[start_line][:start_col] if end_line < len(lines): lines[start_line] += lines[end_line][end_col:] del lines[start_line + 1 : end_line + 1] return "\n".join(lines) return content def _replace_text( self, content: str, selection: Optional[Dict[str, Any]], replacement: str, ) -> str: """Replace text in specified selection""" if not selection: return content # First delete the selected text, then insert replacement content_after_delete = self._delete_text(content, selection) # Calculate new insertion position after deletion if "start_offset" in selection: insert_pos = {"offset": selection["start_offset"]} elif "start_line" in selection: insert_pos = { "line": selection["start_line"], "column": selection.get("start_column", 0), } else: insert_pos = None return self._insert_text(content_after_delete, replacement, insert_pos) def _format_text_bold( self, content: str, selection: Optional[Dict[str, Any]], options: Optional[Dict[str, Any]], ) -> str: """Apply bold formatting to selected text""" if not selection: return content format_type = 
options.get("format_type", "markdown") if options else "markdown" if format_type == "markdown": return self._apply_markdown_formatting(content, selection, "**", "**") elif format_type == "html": return self._apply_html_formatting(content, selection, "<strong>", "</strong>") else: return content def _format_text_italic( self, content: str, selection: Optional[Dict[str, Any]], options: Optional[Dict[str, Any]], ) -> str: """Apply italic formatting to selected text""" if not selection: return content format_type = options.get("format_type", "markdown") if options else "markdown" if format_type == "markdown": return self._apply_markdown_formatting(content, selection, "*", "*") elif format_type == "html": return self._apply_html_formatting(content, selection, "<em>", "</em>") else: return content def _format_text_underline( self, content: str, selection: Optional[Dict[str, Any]], options: Optional[Dict[str, Any]], ) -> str: """Apply underline formatting to selected text""" if not selection: return content format_type = options.get("format_type", "html") if options else "html" if format_type == "html": return self._apply_html_formatting(content, selection, "<u>", "</u>") else: return content def _format_text_strikethrough( self, content: str, selection: Optional[Dict[str, Any]], options: Optional[Dict[str, Any]], ) -> str: """Apply strikethrough formatting to selected text""" if not selection: return content format_type = options.get("format_type", "markdown") if options else "markdown" if format_type == "markdown": return self._apply_markdown_formatting(content, selection, "~~", "~~") elif format_type == "html": return self._apply_html_formatting(content, selection, "<del>", "</del>") else: return content def _format_text_highlight( self, content: str, selection: Optional[Dict[str, Any]], options: Optional[Dict[str, Any]], ) -> str: """Apply highlight formatting to selected text""" if not selection: return content format_type = options.get("format_type", "html") if 
options else "html" color = options.get("color", "yellow") if options else "yellow" if format_type == "html": return self._apply_html_formatting( content, selection, f'<mark style="background-color: {color}">', "</mark>", ) elif format_type == "markdown": return self._apply_markdown_formatting(content, selection, "==", "==") else: return content def _apply_markdown_formatting( self, content: str, selection: Dict[str, Any], start_marker: str, end_marker: str, ) -> str: """Apply markdown formatting to selected text""" selected_text = self._extract_selected_text(content, selection) formatted_text = start_marker + selected_text + end_marker return self._replace_text(content, selection, formatted_text) def _apply_html_formatting( self, content: str, selection: Dict[str, Any], start_tag: str, end_tag: str, ) -> str: """Apply HTML formatting to selected text""" selected_text = self._extract_selected_text(content, selection) formatted_text = start_tag + selected_text + end_tag return self._replace_text(content, selection, formatted_text) def _extract_selected_text(self, content: str, selection: Dict[str, Any]) -> str: """Extract text from selection""" if "start_offset" in selection and "end_offset" in selection: return content[selection["start_offset"] : selection["end_offset"]] elif "start_line" in selection and "end_line" in selection: lines = content.split("\n") start_line = selection["start_line"] end_line = selection["end_line"] start_col = selection.get("start_column", 0) end_col = selection.get( "end_column", len(lines[end_line]) if end_line < len(lines) else 0, ) if start_line == end_line: return cast(str, lines[start_line][start_col:end_col]) else: result = [lines[start_line][start_col:]] result.extend(lines[start_line + 1 : end_line]) if end_line < len(lines): result.append(lines[end_line][:end_col]) return "\n".join(result) return "" def _insert_line( self, content: str, position: Optional[Dict[str, Any]], line_content: str, ) -> str: """Insert a new line at 
specified position""" lines = content.split("\n") line_num = position.get("line", len(lines)) if position else len(lines) lines.insert(line_num, line_content) return "\n".join(lines) def _delete_line(self, content: str, position: Optional[Dict[str, Any]]) -> str: """Delete line at specified position""" lines = content.split("\n") line_num = position.get("line", 0) if position else 0 if 0 <= line_num < len(lines): del lines[line_num] return "\n".join(lines) def _move_line( self, content: str, position: Optional[Dict[str, Any]], options: Optional[Dict[str, Any]], ) -> str: """Move line to different position""" lines = content.split("\n") from_line = position.get("line", 0) if position else 0 to_line = options.get("to_line", 0) if options else 0 if 0 <= from_line < len(lines) and 0 <= to_line < len(lines): line_content = lines.pop(from_line) lines.insert(to_line, line_content) return "\n".join(lines) def _copy_text(self, content: str, selection: Optional[Dict[str, Any]]) -> Dict[str, Any]: """Copy selected text to clipboard""" selected_text = self._extract_selected_text(content, selection) if selection else content self._store_clipboard_content(selected_text) return { "operation": "copy", "copied_text": selected_text, "copied_length": len(selected_text), } def _cut_text(self, content: str, selection: Optional[Dict[str, Any]]) -> Tuple[str, str]: """Cut selected text (copy and delete)""" selected_text = self._extract_selected_text(content, selection) if selection else content new_content = self._delete_text(content, selection) if selection else "" return new_content, selected_text def _paste_text( self, content: str, position: Optional[Dict[str, Any]], clipboard_content: str, ) -> str: """Paste text from clipboard""" return self._insert_text(content, clipboard_content, position) def _store_clipboard_content(self, content: str): """Store content in clipboard (simplified implementation)""" clipboard_file = os.path.join(self.config.temp_dir, "clipboard.txt") try: with 
open(clipboard_file, "w", encoding="utf-8") as f: f.write(content) except Exception as e: self.logger.warning(f"Failed to store clipboard content: {e}") def _get_clipboard_content(self) -> str: """Get content from clipboard""" clipboard_file = os.path.join(self.config.temp_dir, "clipboard.txt") try: with open(clipboard_file, "r", encoding="utf-8") as f: return f.read() except Exception: return "" def _apply_text_formatting( self, content: str, text_to_format: str, format_type: EditOperation, options: Optional[Dict[str, Any]], ) -> str: """Apply formatting to all occurrences of specific text""" if format_type == EditOperation.BOLD: replacement = f"**{text_to_format}**" elif format_type == EditOperation.ITALIC: replacement = f"*{text_to_format}*" elif format_type == EditOperation.UNDERLINE: replacement = f"<u>{text_to_format}</u>" elif format_type == EditOperation.STRIKETHROUGH: replacement = f"~~{text_to_format}~~" elif format_type == EditOperation.HIGHLIGHT: color = options.get("color", "yellow") if options else "yellow" replacement = f'<mark style="background-color: {color}">{text_to_format}</mark>' else: replacement = text_to_format return content.replace(text_to_format, replacement) def _perform_find_replace( self, content: str, find_text: str, replace_text: str, replace_all: bool, occurrence: Optional[int] = None, start_line: Optional[int] = None, end_line: Optional[int] = None, case_sensitive: bool = True, regex_mode: bool = False, ) -> Tuple[str, int, Dict[str, Any]]: """ Perform find and replace operation with precise control Args: content: Content to search in find_text: Text to find replace_text: Text to replace with replace_all: Replace all occurrences (ignored if occurrence is set) occurrence: Replace only the nth occurrence (1-based) start_line: Start line number (1-based, inclusive) end_line: End line number (1-based, inclusive) case_sensitive: Case sensitive search regex_mode: Use regex for find/replace Returns: Tuple of (new_content, 
replacements_count, occurrence_info) """ import re # noqa: F401 replacements = 0 occurrence_info = {} # If line range is specified, extract that portion if start_line is not None or end_line is not None: lines = content.split("\n") total_lines = len(lines) # Convert to 0-based indices start_idx = (start_line - 1) if start_line else 0 end_idx = (end_line) if end_line else total_lines # Validate line range if start_idx < 0 or start_idx >= total_lines: return content, 0, {"error": f"start_line {start_line} out of range (1-{total_lines})"} if end_idx > total_lines: end_idx = total_lines # Extract the target range before_lines = lines[:start_idx] target_lines = lines[start_idx:end_idx] after_lines = lines[end_idx:] target_content = "\n".join(target_lines) # Perform replacement on target content new_target_content, replacements, occ_info = self._perform_find_replace_core( target_content, find_text, replace_text, replace_all, occurrence, case_sensitive, regex_mode, ) # Reconstruct full content new_content = "\n".join(before_lines + [new_target_content] + after_lines) occurrence_info = occ_info else: # No line range, process entire content new_content, replacements, occurrence_info = self._perform_find_replace_core( content, find_text, replace_text, replace_all, occurrence, case_sensitive, regex_mode, ) return new_content, replacements, occurrence_info def _perform_find_replace_core( self, content: str, find_text: str, replace_text: str, replace_all: bool, occurrence: Optional[int] = None, case_sensitive: bool = True, regex_mode: bool = False, ) -> Tuple[str, int, Dict[str, Any]]: """ Core find and replace logic without line range handling Returns: Tuple of (new_content, replacements_count, occurrence_info) """ import re replacements = 0 occurrence_info = {} # If occurrence is specified, it takes precedence if occurrence is not None: if occurrence < 1: return content, 0, {"error": "occurrence must be >= 1"} # Find all matches first matches: List[Any] = [] if regex_mode: 
flags = 0 if case_sensitive else re.IGNORECASE matches = list(re.finditer(find_text, content, flags=flags)) else: if case_sensitive: # Find all occurrences manually matches = [] start = 0 while True: pos = content.find(find_text, start) if pos == -1: break matches.append((pos, pos + len(find_text))) start = pos + 1 else: # Case insensitive - use regex pattern = re.escape(find_text) matches = list(re.finditer(pattern, content, flags=re.IGNORECASE)) # Check if the requested occurrence exists if occurrence > len(matches): return content, 0, {"error": f"occurrence {occurrence} not found (only {len(matches)} matches)", "total_matches": len(matches)} # Replace only the specified occurrence (1-based to 0-based) target_match = matches[occurrence - 1] if regex_mode or not case_sensitive: # Match object start_pos = target_match.start() end_pos = target_match.end() else: # Tuple start_pos, end_pos = target_match new_content = content[:start_pos] + replace_text + content[end_pos:] replacements = 1 occurrence_info = {"occurrence_replaced": occurrence, "total_matches": len(matches), "position": start_pos} else: # Standard replace_all or replace_first logic if regex_mode: flags = 0 if case_sensitive else re.IGNORECASE if replace_all: new_content, replacements = re.subn(find_text, replace_text, content, flags=flags) else: new_content = re.sub(find_text, replace_text, content, count=1, flags=flags) replacements = 1 if new_content != content else 0 else: if case_sensitive: if replace_all: replacements = content.count(find_text) new_content = content.replace(find_text, replace_text) else: new_content = content.replace(find_text, replace_text, 1) replacements = 1 if new_content != content else 0 else: # Case insensitive replacement pattern = re.escape(find_text) if replace_all: new_content, replacements = re.subn(pattern, replace_text, content, flags=re.IGNORECASE) else: new_content = re.sub( pattern, replace_text, content, count=1, flags=re.IGNORECASE, ) replacements = 1 if 
new_content != content else 0 return new_content, replacements, occurrence_info
# Module-level alias: exposes DocumentWriterTool.Config under the name DocumentWriterSettings.
DocumentWriterSettings = DocumentWriterTool.Config