본문 바로가기

카테고리 없음

44

import os
import glob
import re
from pathlib import Path
from typing import Iterator, Optional, Union

def filter_lines_memory_efficient(input_file: str, output_file: str, keyword: str, 
                                 case_sensitive: bool = True, encoding: str = 'utf-8') -> int:
    """
    메모리 효율적인 줄 단위 필터링 - 대용량 파일 지원
    
    Args:
        input_file: 입력 파일 경로
        output_file: 출력 파일 경로
        keyword: 검색할 키워드
        case_sensitive: 대소문자 구분 여부
        encoding: 파일 인코딩
    
    Returns:
        필터링된 줄 수
    """
    try:
        line_count = 0
        search_keyword = keyword if case_sensitive else keyword.lower()
        
        # 줄 단위로 스트리밍 처리 - 메모리 효율적
        with open(input_file, 'r', encoding=encoding, buffering=8192) as infile, \
             open(output_file, 'w', encoding=encoding, buffering=8192) as outfile:
            
            for line in infile:  # 파이썬의 내장 이터레이터 사용
                check_line = line if case_sensitive else line.lower()
                if search_keyword in check_line:
                    outfile.write(line)
                    line_count += 1
        
        print(f"✅ {Path(input_file).name}: {line_count:,}줄 필터링 완료")
        return line_count
        
    except FileNotFoundError:
        print(f"❌ 파일을 찾을 수 없습니다: {input_file}")
        return 0
    except PermissionError:
        print(f"❌ 파일 접근 권한이 없습니다: {input_file}")
        return 0
    except UnicodeDecodeError as e:
        print(f"❌ 인코딩 오류 {input_file}: {e}")
        print(f"💡 다른 인코딩 시도: cp949, euc-kr")
        return 0
    except Exception as e:
        print(f"❌ 예상치 못한 오류 {input_file}: {e}")
        return 0

def filter_with_regex_efficient(input_file: str, output_file: str, pattern: str,
                               flags: int = 0, encoding: str = 'utf-8') -> int:
    """
    정규표현식 기반 메모리 효율적 필터링
    
    Args:
        input_file: 입력 파일 경로
        output_file: 출력 파일 경로  
        pattern: 정규표현식 패턴
        flags: re 플래그
        encoding: 파일 인코딩
    """
    try:
        compiled_pattern = re.compile(pattern, flags)
        line_count = 0
        
        with open(input_file, 'r', encoding=encoding, buffering=8192) as infile, \
             open(output_file, 'w', encoding=encoding, buffering=8192) as outfile:
            
            for line in infile:
                if compiled_pattern.search(line):
                    outfile.write(line)
                    line_count += 1
                    
        print(f"✅ 정규표현식 필터링: {line_count:,}줄 처리됨")
        return line_count
        
    except re.error as e:
        print(f"❌ 정규표현식 오류: {e}")
        return 0
    except Exception as e:
        print(f"❌ 필터링 오류: {e}")
        return 0

def detect_encoding(file_path: str) -> str:
    """
    파일 인코딩 자동 감지
    """
    encodings_to_try = ['utf-8', 'cp949', 'euc-kr', 'utf-16', 'latin1']
    
    for encoding in encodings_to_try:
        try:
            with open(file_path, 'r', encoding=encoding) as f:
                f.read(1024)  # 첫 1KB만 테스트
                return encoding
        except (UnicodeDecodeError, UnicodeError):
            continue
    
    return 'utf-8'  # 기본값

def batch_filter_files_improved(directory: str, keyword: str, 
                               pattern: str = "*.txt", case_sensitive: bool = True,
                               backup: bool = True) -> dict:
    """
    개선된 일괄 파일 처리 - 인코딩 자동감지, 백업 옵션
    
    Args:
        directory: 처리할 디렉토리
        keyword: 검색 키워드
        pattern: 파일 패턴
        case_sensitive: 대소문자 구분
        backup: 원본 백업 여부
    
    Returns:
        처리 결과 딕셔너리
    """
    files = list(Path(directory).glob(pattern))
    
    if not files:
        print(f"❌ {directory}에서 {pattern} 패턴 파일을 찾을 수 없습니다.")
        return {"processed": 0, "total": 0, "lines": 0}
    
    print(f"🔍 {len(files)}개 파일에서 '{keyword}' 검색 시작...")
    
    processed_files = 0
    total_lines = 0
    failed_files = []
    
    for file_path in files:
        # 인코딩 자동 감지
        encoding = detect_encoding(str(file_path))
        print(f"📄 {file_path.name} (인코딩: {encoding})")
        
        # 백업 생성
        if backup:
            backup_path = file_path.with_suffix('.bak' + file_path.suffix)
            if not backup_path.exists():
                file_path.replace(backup_path)
                file_path = backup_path
        
        # 출력 파일명
        output_path = file_path.parent / f"{file_path.stem}_filtered{file_path.suffix}"
        
        lines_found = filter_lines_memory_efficient(
            str(file_path), str(output_path), keyword, 
            case_sensitive=case_sensitive, encoding=encoding
        )
        
        if lines_found > 0:
            processed_files += 1
            total_lines += lines_found
        elif lines_found == -1:  # 오류 발생
            failed_files.append(str(file_path))
    
    # 결과 요약
    result = {
        "processed": processed_files,
        "total": len(files),
        "lines": total_lines,
        "failed": failed_files
    }
    
    print(f"\n📊 처리 완료:")
    print(f"   ✅ 성공: {processed_files}/{len(files)}개 파일")
    print(f"   📝 총 필터링된 줄: {total_lines:,}")
    if failed_files:
        print(f"   ❌ 실패한 파일: {len(failed_files)}개")
        for failed in failed_files[:3]:  # 최대 3개만 표시
            print(f"      - {Path(failed).name}")
    
    return result

def get_file_info(file_path: str) -> dict:
    """
    파일 정보 확인 - 크기, 줄 수 등
    """
    try:
        path = Path(file_path)
        size = path.stat().st_size
        
        # 줄 수 계산 (메모리 효율적)
        line_count = 0
        encoding = detect_encoding(file_path)
        with open(file_path, 'r', encoding=encoding) as f:
            for _ in f:
                line_count += 1
        
        return {
            "size_mb": round(size / (1024 * 1024), 2),
            "lines": line_count,
            "encoding": encoding
        }
    except Exception as e:
        return {"error": str(e)}

def interactive_mode():
    """
    대화형 모드 - 사용자 친화적 인터페이스
    """
    print("=" * 50)
    print("🔍 텍스트 파일 필터링 도구 v2.0")
    print("=" * 50)
    
    while True:
        print("\n처리 방식 선택:")
        print("1. 단일 파일 처리")
        print("2. 여러 파일 일괄 처리")
        print("3. 정규표현식 필터링")
        print("4. 파일 정보 확인")
        print("5. 종료")
        
        choice = input("\n선택 (1-5): ").strip()
        
        if choice == "1":
            # 단일 파일 처리
            input_file = input("입력 파일 경로: ").strip()
            if not Path(input_file).exists():
                print("❌ 파일이 존재하지 않습니다.")
                continue
                
            keyword = input("검색할 키워드 (기본: 삐삐): ").strip() or "삐삐"
            case_sensitive = input("대소문자 구분? (y/N): ").strip().lower() == 'y'
            
            # 출력 파일명 자동 생성
            input_path = Path(input_file)
            output_file = str(input_path.parent / f"{input_path.stem}_filtered{input_path.suffix}")
            
            # 파일 정보 표시
            info = get_file_info(input_file)
            if "error" not in info:
                print(f"📊 파일 정보: {info['size_mb']}MB, {info['lines']:,}줄, {info['encoding']}")
            
            filter_lines_memory_efficient(input_file, output_file, keyword, case_sensitive)
            
        elif choice == "2":
            # 여러 파일 일괄 처리
            directory = input("디렉토리 경로 (기본: 현재 폴더): ").strip() or "."
            pattern = input("파일 패턴 (기본: *.txt): ").strip() or "*.txt"
            keyword = input("검색할 키워드 (기본: 삐삐): ").strip() or "삐삐"
            case_sensitive = input("대소문자 구분? (y/N): ").strip().lower() == 'y'
            backup = input("원본 백업? (Y/n): ").strip().lower() != 'n'
            
            batch_filter_files_improved(directory, keyword, pattern, case_sensitive, backup)
            
        elif choice == "3":
            # 정규표현식 필터링
            input_file = input("입력 파일 경로: ").strip()
            if not Path(input_file).exists():
                print("❌ 파일이 존재하지 않습니다.")
                continue
                
            pattern = input("정규표현식 패턴: ").strip()
            ignore_case = input("대소문자 무시? (y/N): ").strip().lower() == 'y'
            flags = re.IGNORECASE if ignore_case else 0
            
            input_path = Path(input_file)
            output_file = str(input_path.parent / f"{input_path.stem}_regex_filtered{input_path.suffix}")
            
            filter_with_regex_efficient(input_file, output_file, pattern, flags)
            
        elif choice == "4":
            # 파일 정보 확인
            file_path = input("파일 경로: ").strip()
            if Path(file_path).exists():
                info = get_file_info(file_path)
                if "error" in info:
                    print(f"❌ 오류: {info['error']}")
                else:
                    print(f"📊 파일 정보:")
                    print(f"   크기: {info['size_mb']}MB")
                    print(f"   줄 수: {info['lines']:,}")
                    print(f"   인코딩: {info['encoding']}")
            else:
                print("❌ 파일이 존재하지 않습니다.")
                
        elif choice == "5":
            print("👋 프로그램을 종료합니다.")
            break
        else:
            print("❌ 잘못된 선택입니다.")

# 성능 테스트 함수
def performance_test(file_path: str, keyword: str):
    """
    성능 테스트 - 처리 시간 및 메모리 사용량 측정
    """
    import time
    import psutil
    import os
    
    print(f"🚀 성능 테스트 시작: {Path(file_path).name}")
    
    # 메모리 사용량 측정
    process = psutil.Process(os.getpid())
    memory_before = process.memory_info().rss / 1024 / 1024  # MB
    
    start_time = time.time()
    
    # 임시 출력 파일
    temp_output = file_path + ".temp_filtered"
    
    lines_found = filter_lines_memory_efficient(file_path, temp_output, keyword)
    
    end_time = time.time()
    memory_after = process.memory_info().rss / 1024 / 1024  # MB
    
    # 임시 파일 삭제
    if Path(temp_output).exists():
        Path(temp_output).unlink()
    
    print(f"📊 성능 결과:")
    print(f"   처리 시간: {end_time - start_time:.2f}초")
    print(f"   메모리 사용량: {memory_before:.1f} → {memory_after:.1f} MB")
    print(f"   메모리 증가: {memory_after - memory_before:.1f} MB")
    print(f"   처리된 줄: {lines_found:,}")

if __name__ == "__main__":
    interactive_mode()