百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术资源 > 正文

Python文件操作常用库高级应用教程

off999 2025-07-08 22:07 37 浏览 0 评论

本文是在前面《Python文件操作常用库使用教程》的基础上,进一步学习Python文件操作库的高级应用。

一、高级文件系统监控

1.1 watchdog库 - 实时文件系统监控

安装与基本使用

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class FileChangeHandler(FileSystemEventHandler):
    def on_modified(self, event):
        if not event.is_directory:
            print(f"文件被修改: {event.src_path}")

def monitor_directory(path):
    event_handler = FileChangeHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

# 监控当前目录
monitor_directory('.')

功能扩展

class CustomHandler(FileSystemEventHandler):
    def __init__(self, callback):
        self.callback = callback
    
    def on_any_event(self, event):
        event_type = event.event_type  # 'modified', 'created', 'deleted', 'moved'
        self.callback(event_type, event.src_path, 
                     getattr(event, 'dest_path', None))

# 使用示例
def log_change(event_type, src, dest):
    print(f"{event_type}: {src} {f'-> {dest}' if dest else ''}")

monitor_with_callback('.', log_change)

文件监控系统架构




二、高性能文件处理

2.1 内存映射文件(mmap)

大文件处理示例

import mmap
import re

def search_large_file(filename, pattern):
    """在大文件中搜索模式"""
    with open(filename, 'r+b') as f:
        # 内存映射文件
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            # 使用正则搜索
            for match in re.finditer(pattern.encode(), mm):
                yield match.start(), match.group().decode()

# 使用示例
for pos, text in search_large_file('large.log', r'ERROR\s+\d+'):
    print(f"在位置 {pos} 发现错误: {text}")

性能对比测试

import timeit

def test_performance():
    # 传统方式
    def traditional():
        with open('large.file') as f:
            return sum(1 for _ in f)
    
    # mmap方式
    def mmap_way():
        with open('large.file', 'r+b') as f:
            with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
                return sum(1 for _ in mm)
    
    print("传统方式:", timeit.timeit(traditional, number=10))
    print("mmap方式:", timeit.timeit(mmap_way, number=10))

2.2 并行文件处理

多进程文件处理

from multiprocessing import Pool
import os

def process_file_chunk(args):
    """处理文件块"""
    filename, start, end = args
    with open(filename, 'rb') as f:
        f.seek(start)
        chunk = f.read(end - start)
        return len(chunk.splitlines())

def parallel_file_processing(filename, workers=4):
    """并行处理大文件"""
    file_size = os.path.getsize(filename)
    chunk_size = file_size // workers
    ranges = [(filename, i*chunk_size, (i+1)*chunk_size) 
              for i in range(workers)]
    
    with Pool(workers) as pool:
        results = pool.map(process_file_chunk, ranges)
    
    return sum(results)

# 使用示例
total_lines = parallel_file_processing('huge_file.csv')
print(f"文件总行数: {total_lines}")

三、高级压缩与归档处理

3.1 增量压缩与加密

ZIP加密与增量添加

import zipfile
import os
from tqdm import tqdm  # 进度条库

def create_secure_zip(output_zip, files_to_zip, password):
    """创建加密ZIP文件"""
    with zipfile.ZipFile(
        output_zip, 
        'w', 
        compression=zipfile.ZIP_DEFLATED
    ) as zipf:
        zipf.setpassword(password.encode())
        
        for file in tqdm(files_to_zip, desc="压缩进度"):
            if os.path.isfile(file):
                zipf.write(file)
                
def add_to_existing_zip(zip_file, new_files, password=None):
    """向现有ZIP添加文件"""
    temp_zip = f"temp_{zip_file}"
    os.rename(zip_file, temp_zip)
    
    with zipfile.ZipFile(temp_zip, 'r') as zin:
        with zipfile.ZipFile(zip_file, 'w') as zout:
            if password:
                zout.setpassword(password.encode())
            
            # 复制现有条目
            for item in tqdm(zin.infolist(), desc="复制原有文件"):
                zout.writestr(item, zin.read(item))
            
            # 添加新文件
            for file in tqdm(new_files, desc="添加新文件"):
                if os.path.isfile(file):
                    zout.write(file)
    
    os.remove(temp_zip)

3.2 多卷压缩文件

分卷压缩实现

import zipfile
import math

def split_zip(input_files, output_prefix, max_size_mb):
    """创建分卷ZIP文件"""
    max_size = max_size_mb * 1024 * 1024
    part_num = 1
    current_size = 0
    current_zip = None
    
    for file in input_files:
        file_size = os.path.getsize(file)
        
        # 如果当前ZIP不存在或需要新分卷
        if current_zip is None or current_size + file_size > max_size:
            if current_zip is not None:
                current_zip.close()
            
            zip_name = f"{output_prefix}.zip.{part_num:03d}"
            current_zip = zipfile.ZipFile(
                zip_name, 'w', zipfile.ZIP_DEFLATED)
            part_num += 1
            current_size = 0
        
        current_zip.write(file)
        current_size += file_size
    
    if current_zip is not None:
        current_zip.close()
    
    return part_num - 1  # 返回分卷数量

四、云存储集成

4.1 boto3 - AWS S3集成

S3文件操作示例

import boto3
from botocore.exceptions import ClientError

class S3Manager:
    def __init__(self, bucket_name):
        self.s3 = boto3.client('s3')
        self.bucket = bucket_name
    
    def upload_file(self, local_path, s3_key):
        """上传文件到S3"""
        try:
            self.s3.upload_file(
                local_path, 
                self.bucket, 
                s3_key,
                ExtraArgs={
                    'ACL': 'bucket-owner-full-control',
                    'StorageClass': 'INTELLIGENT_TIERING'
                }
            )
            return True
        except ClientError as e:
            print(f"S3上传错误: {e}")
            return False
    
    def download_file(self, s3_key, local_path):
        """从S3下载文件"""
        try:
            self.s3.download_file(self.bucket, s3_key, local_path)
            return True
        except ClientError as e:
            print(f"S3下载错误: {e}")
            return False
    
    def generate_presigned_url(self, s3_key, expires=3600):
        """生成预签名URL"""
        try:
            return self.s3.generate_presigned_url(
                'get_object',
                Params={'Bucket': self.bucket, 'Key': s3_key},
                ExpiresIn=expires
            )
        except ClientError as e:
            print(f"生成URL错误: {e}")
            return None

4.2 阿里云OSS集成

OSS文件操作示例

import oss2
from oss2.exceptions import OssError

class OSSManager:
    def __init__(self, access_key, secret_key, endpoint, bucket_name):
        auth = oss2.Auth(access_key, secret_key)
        self.bucket = oss2.Bucket(auth, endpoint, bucket_name)
    
    def upload_with_progress(self, local_path, oss_key):
        """带进度显示的上传"""
        def progress_callback(consumed_bytes, total_bytes):
            if total_bytes:
                percent = 100 * (consumed_bytes / total_bytes)
                print(f"\r上传进度: {percent:.2f}%", end='')
        
        try:
            result = self.bucket.put_object_from_file(
                oss_key, 
                local_path,
                progress_callback=progress_callback
            )
            print("\n上传完成")
            return result.status == 200
        except OssError as e:
            print(f"\nOSS上传错误: {e}")
            return False
    
    def download_large_file(self, oss_key, local_path, part_size=10*1024*1024):
        """分片下载大文件"""
        try:
            oss2.resumable_download(
                self.bucket, 
                oss_key, 
                local_path,
                part_size=part_size,
                num_threads=4
            )
            return True
        except OssError as e:
            print(f"OSS下载错误: {e}")
            return False

五、文件内容高级处理

5.1 二进制文件解析

结构化二进制文件解析

import struct
from collections import namedtuple

def parse_binary_file(filename):
    """解析自定义二进制格式文件"""
    # 定义文件头结构
    Header = namedtuple('Header', 'magic version num_records')
    header_format = '<4sII'  # 4字节魔数,2个无符号整数
    
    # 定义记录结构
    Record = namedtuple('Record', 'id timestamp value')
    record_format = '<Qdf'  # 无符号长整型,双精度浮点,单精度浮点
    
    with open(filename, 'rb') as f:
        # 读取文件头
        header_data = f.read(struct.calcsize(header_format))
        header = Header._make(struct.unpack(header_format, header_data))
        
        # 验证魔数
        if header.magic != b'BIN1':
            raise ValueError("无效的文件格式")
        
        # 读取记录
        records = []
        for _ in range(header.num_records):
            record_data = f.read(struct.calcsize(record_format))
            records.append(Record._make(struct.unpack(record_format, record_data)))
    
    return header, records

5.2 实时日志文件分析

实时日志分析器

import re
from collections import defaultdict
import time

class LogAnalyzer:
    def __init__(self, log_file, patterns):
        self.log_file = log_file
        self.patterns = patterns
        self.position = 0
        self.stats = defaultdict(int)
    
    def analyze(self):
        """分析日志文件"""
        with open(self.log_file, 'r', encoding='utf-8', errors='ignore') as f:
            # 跳到上次读取位置
            f.seek(self.position)
            
            for line in f:
                for name, pattern in self.patterns.items():
                    if re.search(pattern, line):
                        self.stats[name] += 1
            
            # 更新读取位置
            self.position = f.tell()
        
        return dict(self.stats)
    
    def continuous_monitor(self, interval=5):
        """持续监控日志文件"""
        try:
            while True:
                stats = self.analyze()
                if stats:
                    print(f"\n[{time.ctime()}] 统计结果:")
                    for name, count in stats.items():
                        print(f"{name}: {count}")
                time.sleep(interval)
        except KeyboardInterrupt:
            print("\n监控停止")

# 使用示例
patterns = {
    'ERROR': r'ERROR',
    'WARNING': r'WARNING',
    'HTTP_500': r'HTTP/1.\d"\s+500'
}
analyzer = LogAnalyzer('app.log', patterns)
analyzer.continuous_monitor()

六、安全文件操作

6.1 安全文件写入模式

原子写入模式

import os
import tempfile

def atomic_write(filename, data, mode='w', encoding='utf-8'):
    """原子性写入文件"""
    # 创建临时文件
    temp_dir = os.path.dirname(os.path.abspath(filename))
    with tempfile.NamedTemporaryFile(
        mode=mode,
        dir=temp_dir,
        prefix='.tmp_',
        delete=False,
        encoding=encoding
    ) as tmp:
        tmp.write(data)
        tmp.flush()
        os.fsync(tmp.fileno())
        tempname = tmp.name
    
    # 原子替换
    try:
        os.replace(tempname, filename)
    except:
        os.unlink(tempname)
        raise

6.2 文件权限管理

安全权限设置

import os
import stat

def set_secure_permissions(filename):
    """设置安全文件权限"""
    # 获取当前权限
    current_mode = os.stat(filename).st_mode
    
    # 移除组和其他用户的写权限
    new_mode = current_mode & ~stat.S_IWGRP & ~stat.S_IWOTH
    
    # 如果是敏感文件,移除所有组和其他用户的权限
    if filename.endswith(('.key', '.pem', '.env')):
        new_mode = new_mode & ~stat.S_IRGRP & ~stat.S_IROTH
    
    os.chmod(filename, new_mode)

def create_secure_file(filename, content):
    """创建安全文件"""
    # 使用原子写入
    atomic_write(filename, content)
    
    # 设置安全权限
    set_secure_permissions(filename)
    
    # 验证权限
    mode = os.stat(filename).st_mode
    if mode & (stat.S_IWGRP | stat.S_IWOTH):
        raise RuntimeError("文件权限设置失败")

七、总结

表2:文件操作性能优化技术

场景

优化技术

适用条件

大文件读取

内存映射(mmap)

需要随机访问的大文件

批量小文件

并行处理(multiprocessing)

大量独立小文件

顺序处理

缓冲读写(大缓冲区)

顺序读写场景

网络存储

断点续传/分片上传

不稳定网络环境

实时处理

增量读取(记录位置)

日志监控等场景

压缩文件

流式处理(不解压)

大型压缩文件

建议

  1. 避免频繁的小IO操作,尽量批量处理
  2. 使用with语句确保资源释放
  3. 对大文件考虑内存映射技术
  4. 大量文件处理使用并行技术
  5. 网络存储操作添加重试机制
  6. 关键操作添加原子性保证

通过本教程的高级技巧,我们可以在实际项目中实现更高效、更安全的文件操作,满足企业级应用的需求。


持续更新Python编程学习日志与技巧,敬请关注!



#编程# #python# #在头条记录我的2025#

相关推荐

大文件传不动?WinRAR/7-Zip 入门到高手,这 5 个技巧让你效率翻倍

“这200张照片怎么传给女儿?微信发不了,邮箱附件又超限……”62岁的张阿姨对着电脑犯愁时,儿子只用了3分钟就把照片压缩成一个文件,还教她:“以后用压缩软件,比打包行李还方便!”职场人更懂这...

电脑解压缩软件推荐——7-Zip:免费、高效、简洁的文件管理神器

在日常工作中,我们经常需要处理压缩文件。无论是下载软件包、接收文件,还是存储大量数据,压缩和解压缩文件都成为了我们日常操作的一部分。而说到压缩解压软件,7-Zip绝对是一个不可忽视的名字。今天,我就来...

设置了加密密码zip文件要如何打开?这几个方法可以试试~

Zip是一种常见的压缩格式文件,文件还可以设置密码保护。那设置了密码的Zip文件要如何打开呢?不清楚的小伙伴一起来看看吧。当我们知道密码想要打开带密码的Zip文件,我们需要用到适用于Zip格式的解压缩...

大文件想要传输成功,怎么把ZIP文件分卷压缩

不知道各位小伙伴有没有这样的烦恼,发送很大很大的压缩包会受到限制,为此,想要在压缩过程中将文件拆分为几个压缩包并且同时为所有压缩包设置加密应该如何设置?方法一:使用7-Zip免费且强大的文件管理工具7...

高效处理 RAR 分卷压缩包:合并解压操作全攻略

在文件传输和存储过程中,当遇到大文件时,我们常常会使用分卷压缩的方式将其拆分成多个较小的压缩包,方便存储和传输。RAR作为一种常见的压缩格式,分卷压缩包的使用频率也很高。但很多人在拿到RAR分卷...

2个方法教你如何删除ZIP压缩包密码

zip压缩包设置了加密密码,每次解压文件都需要输入密码才能够顺利解压出文件,当压缩包文件不再需要加密的时候,大家肯定想删除压缩包密码,或是忘记了压缩包密码,想要通过删除操作将压缩包密码删除,就能够顺利...

速转!漏洞预警丨压缩软件Winrar目录穿越漏洞

WinRAR是一款功能强大的压缩包管理器,它是档案工具RAR在Windows环境下的图形界面。该软件可用于备份数据,缩减电子邮件附件的大小,解压缩从Internet上下载的RAR、ZIP及其它类...

文件解压方法和工具分享_文件解压工具下载

压缩文件减少文件大小,降低文件失效的概率,总得来说好处很多。所以很多文件我们下载下来都是压缩软件,很多小伙伴不知道怎么解压,或者不知道什么工具更好,所以今天做了文件解压方法和工具的分享给大家。一、解压...

[python]《Python编程快速上手:让繁琐工作自动化》学习笔记3

1.组织文件笔记(第9章)(代码下载)1.1文件与文件路径通过importshutil调用shutil模块操作目录,shutil模块能够在Python程序中实现文件复制、移动、改名和删除;同时...

Python内置tarfile模块:读写 tar 归档文件详解

一、学习目标1.1学习目标掌握Python内置模块tarfile的核心功能,包括:理解tar归档文件的原理与常见压缩格式(gzip/bz2/lzma)掌握tar文件的读写操作(创建、解压、查看、过滤...

使用python展开tar包_python拓展

类Unix的系统,打包文件经常使用的就是tar包,结合zip工具,可以方便的打包并解压。在python的标准库里面有tarfile库,可以方便实现生成了展开tar包。使用这个库最大的好处,可能就在于不...

银狐钓鱼再升级:白文件脚本化实现GO语言后门持久驻留

近期,火绒威胁情报中心监测到一批相对更为活跃的“银狐”系列变种木马。火绒安全工程师第一时间获取样本并进行分析。分析发现,该样本通过阿里云存储桶下发恶意文件,采用AppDomainManager进行白利...

ZIP文件怎么打开?2个简单方法教你轻松搞定!

在日常工作和生活中,我们经常会遇到各种压缩文件,其中最常见的格式之一就是ZIP。ZIP文件通过压缩数据来减少文件大小,方便我们进行存储和传输。然而,对于初学者来说,如何打开ZIP文件可能会成为一个小小...

Ubuntu—解压多个zip压缩文件.zip .z01 .z02

方法将所有zip文件放在同一目录中:zip_file.z01,zip_file.z02,zip_file.z03,...,zip_file.zip。在Zip3.0版本及以上,使用下列命令:将所有zi...

如何使用7-Zip对文件进行加密压缩

7-Zip是一款开源的文件归档工具,支持多种压缩格式,并提供了对压缩文件进行加密的功能。使用7-Zip可以轻松创建和解压.7z、.zip等格式的压缩文件,并且可以通过设置密码来保护压缩包中的...

取消回复欢迎 发表评论: