import os
import glob
import re
from pathlib import Path
from typing import Iterator, Optional, Union
def filter_lines_memory_efficient(input_file: str, output_file: str, keyword: str,
case_sensitive: bool = True, encoding: str = 'utf-8') -> int:
"""
메모리 효율적인 줄 단위 필터링 - 대용량 파일 지원
Args:
input_file: 입력 파일 경로
output_file: 출력 파일 경로
keyword: 검색할 키워드
case_sensitive: 대소문자 구분 여부
encoding: 파일 인코딩
Returns:
필터링된 줄 수
"""
try:
line_count = 0
search_keyword = keyword if case_sensitive else keyword.lower()
# 줄 단위로 스트리밍 처리 - 메모리 효율적
with open(input_file, 'r', encoding=encoding, buffering=8192) as infile, \
open(output_file, 'w', encoding=encoding, buffering=8192) as outfile:
for line in infile: # 파이썬의 내장 이터레이터 사용
check_line = line if case_sensitive else line.lower()
if search_keyword in check_line:
outfile.write(line)
line_count += 1
print(f"✅ {Path(input_file).name}: {line_count:,}줄 필터링 완료")
return line_count
except FileNotFoundError:
print(f"❌ 파일을 찾을 수 없습니다: {input_file}")
return 0
except PermissionError:
print(f"❌ 파일 접근 권한이 없습니다: {input_file}")
return 0
except UnicodeDecodeError as e:
print(f"❌ 인코딩 오류 {input_file}: {e}")
print(f"💡 다른 인코딩 시도: cp949, euc-kr")
return 0
except Exception as e:
print(f"❌ 예상치 못한 오류 {input_file}: {e}")
return 0
def filter_with_regex_efficient(input_file: str, output_file: str, pattern: str,
flags: int = 0, encoding: str = 'utf-8') -> int:
"""
정규표현식 기반 메모리 효율적 필터링
Args:
input_file: 입력 파일 경로
output_file: 출력 파일 경로
pattern: 정규표현식 패턴
flags: re 플래그
encoding: 파일 인코딩
"""
try:
compiled_pattern = re.compile(pattern, flags)
line_count = 0
with open(input_file, 'r', encoding=encoding, buffering=8192) as infile, \
open(output_file, 'w', encoding=encoding, buffering=8192) as outfile:
for line in infile:
if compiled_pattern.search(line):
outfile.write(line)
line_count += 1
print(f"✅ 정규표현식 필터링: {line_count:,}줄 처리됨")
return line_count
except re.error as e:
print(f"❌ 정규표현식 오류: {e}")
return 0
except Exception as e:
print(f"❌ 필터링 오류: {e}")
return 0
def detect_encoding(file_path: str) -> str:
"""
파일 인코딩 자동 감지
"""
encodings_to_try = ['utf-8', 'cp949', 'euc-kr', 'utf-16', 'latin1']
for encoding in encodings_to_try:
try:
with open(file_path, 'r', encoding=encoding) as f:
f.read(1024) # 첫 1KB만 테스트
return encoding
except (UnicodeDecodeError, UnicodeError):
continue
return 'utf-8' # 기본값
def batch_filter_files_improved(directory: str, keyword: str,
pattern: str = "*.txt", case_sensitive: bool = True,
backup: bool = True) -> dict:
"""
개선된 일괄 파일 처리 - 인코딩 자동감지, 백업 옵션
Args:
directory: 처리할 디렉토리
keyword: 검색 키워드
pattern: 파일 패턴
case_sensitive: 대소문자 구분
backup: 원본 백업 여부
Returns:
처리 결과 딕셔너리
"""
files = list(Path(directory).glob(pattern))
if not files:
print(f"❌ {directory}에서 {pattern} 패턴 파일을 찾을 수 없습니다.")
return {"processed": 0, "total": 0, "lines": 0}
print(f"🔍 {len(files)}개 파일에서 '{keyword}' 검색 시작...")
processed_files = 0
total_lines = 0
failed_files = []
for file_path in files:
# 인코딩 자동 감지
encoding = detect_encoding(str(file_path))
print(f"📄 {file_path.name} (인코딩: {encoding})")
# 백업 생성
if backup:
backup_path = file_path.with_suffix('.bak' + file_path.suffix)
if not backup_path.exists():
file_path.replace(backup_path)
file_path = backup_path
# 출력 파일명
output_path = file_path.parent / f"{file_path.stem}_filtered{file_path.suffix}"
lines_found = filter_lines_memory_efficient(
str(file_path), str(output_path), keyword,
case_sensitive=case_sensitive, encoding=encoding
)
if lines_found > 0:
processed_files += 1
total_lines += lines_found
elif lines_found == -1: # 오류 발생
failed_files.append(str(file_path))
# 결과 요약
result = {
"processed": processed_files,
"total": len(files),
"lines": total_lines,
"failed": failed_files
}
print(f"\n📊 처리 완료:")
print(f" ✅ 성공: {processed_files}/{len(files)}개 파일")
print(f" 📝 총 필터링된 줄: {total_lines:,}")
if failed_files:
print(f" ❌ 실패한 파일: {len(failed_files)}개")
for failed in failed_files[:3]: # 최대 3개만 표시
print(f" - {Path(failed).name}")
return result
def get_file_info(file_path: str) -> dict:
"""
파일 정보 확인 - 크기, 줄 수 등
"""
try:
path = Path(file_path)
size = path.stat().st_size
# 줄 수 계산 (메모리 효율적)
line_count = 0
encoding = detect_encoding(file_path)
with open(file_path, 'r', encoding=encoding) as f:
for _ in f:
line_count += 1
return {
"size_mb": round(size / (1024 * 1024), 2),
"lines": line_count,
"encoding": encoding
}
except Exception as e:
return {"error": str(e)}
def interactive_mode():
"""
대화형 모드 - 사용자 친화적 인터페이스
"""
print("=" * 50)
print("🔍 텍스트 파일 필터링 도구 v2.0")
print("=" * 50)
while True:
print("\n처리 방식 선택:")
print("1. 단일 파일 처리")
print("2. 여러 파일 일괄 처리")
print("3. 정규표현식 필터링")
print("4. 파일 정보 확인")
print("5. 종료")
choice = input("\n선택 (1-5): ").strip()
if choice == "1":
# 단일 파일 처리
input_file = input("입력 파일 경로: ").strip()
if not Path(input_file).exists():
print("❌ 파일이 존재하지 않습니다.")
continue
keyword = input("검색할 키워드 (기본: 삐삐): ").strip() or "삐삐"
case_sensitive = input("대소문자 구분? (y/N): ").strip().lower() == 'y'
# 출력 파일명 자동 생성
input_path = Path(input_file)
output_file = str(input_path.parent / f"{input_path.stem}_filtered{input_path.suffix}")
# 파일 정보 표시
info = get_file_info(input_file)
if "error" not in info:
print(f"📊 파일 정보: {info['size_mb']}MB, {info['lines']:,}줄, {info['encoding']}")
filter_lines_memory_efficient(input_file, output_file, keyword, case_sensitive)
elif choice == "2":
# 여러 파일 일괄 처리
directory = input("디렉토리 경로 (기본: 현재 폴더): ").strip() or "."
pattern = input("파일 패턴 (기본: *.txt): ").strip() or "*.txt"
keyword = input("검색할 키워드 (기본: 삐삐): ").strip() or "삐삐"
case_sensitive = input("대소문자 구분? (y/N): ").strip().lower() == 'y'
backup = input("원본 백업? (Y/n): ").strip().lower() != 'n'
batch_filter_files_improved(directory, keyword, pattern, case_sensitive, backup)
elif choice == "3":
# 정규표현식 필터링
input_file = input("입력 파일 경로: ").strip()
if not Path(input_file).exists():
print("❌ 파일이 존재하지 않습니다.")
continue
pattern = input("정규표현식 패턴: ").strip()
ignore_case = input("대소문자 무시? (y/N): ").strip().lower() == 'y'
flags = re.IGNORECASE if ignore_case else 0
input_path = Path(input_file)
output_file = str(input_path.parent / f"{input_path.stem}_regex_filtered{input_path.suffix}")
filter_with_regex_efficient(input_file, output_file, pattern, flags)
elif choice == "4":
# 파일 정보 확인
file_path = input("파일 경로: ").strip()
if Path(file_path).exists():
info = get_file_info(file_path)
if "error" in info:
print(f"❌ 오류: {info['error']}")
else:
print(f"📊 파일 정보:")
print(f" 크기: {info['size_mb']}MB")
print(f" 줄 수: {info['lines']:,}")
print(f" 인코딩: {info['encoding']}")
else:
print("❌ 파일이 존재하지 않습니다.")
elif choice == "5":
print("👋 프로그램을 종료합니다.")
break
else:
print("❌ 잘못된 선택입니다.")
# 성능 테스트 함수
def performance_test(file_path: str, keyword: str):
"""
성능 테스트 - 처리 시간 및 메모리 사용량 측정
"""
import time
import psutil
import os
print(f"🚀 성능 테스트 시작: {Path(file_path).name}")
# 메모리 사용량 측정
process = psutil.Process(os.getpid())
memory_before = process.memory_info().rss / 1024 / 1024 # MB
start_time = time.time()
# 임시 출력 파일
temp_output = file_path + ".temp_filtered"
lines_found = filter_lines_memory_efficient(file_path, temp_output, keyword)
end_time = time.time()
memory_after = process.memory_info().rss / 1024 / 1024 # MB
# 임시 파일 삭제
if Path(temp_output).exists():
Path(temp_output).unlink()
print(f"📊 성능 결과:")
print(f" 처리 시간: {end_time - start_time:.2f}초")
print(f" 메모리 사용량: {memory_before:.1f} → {memory_after:.1f} MB")
print(f" 메모리 증가: {memory_after - memory_before:.1f} MB")
print(f" 처리된 줄: {lines_found:,}")
if __name__ == "__main__":
interactive_mode()카테고리 없음