FreezeJ' Blog

多线程FTP客户端

2021-03-10

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2021-3-5 16:54
# @Author  : FreezeJ
# @File    : ftp_client.py
# @Software: PyCharm
# @Comment : FTP客户端脚本

import json
import os
import queue
import sys
import threading
from concurrent.futures import ThreadPoolExecutor
from ftplib import FTP as _FTP


# 重写ftp 修复同网段内 ip返回异常
class FTP(_FTP):
    def connect(self, host='', port=0, timeout=-999, *args):
        _FTP.connect(self, host, port, timeout)
        self.host = host
        return self.welcome

    def makepasv(self):
        host, port = _FTP.makepasv(self)
        return self.host, port

    def dir(self, *args):  # 复写方法,添加返回值
        cmd = 'LIST'
        temp_list = []
        if args[-1:] and type(args[-1]) != type(''):
            args, func = args[:-1], args[-1]
        for arg in args:
            if arg:
                cmd = cmd + (' ' + arg)
        self.retrlines(cmd, temp_list.append)
        return temp_list


class FtpOps(object):
    """ FTP操作
    ftp_info = {
        "ip": 'xxxx',  # 主机ip
        "user": 'xxxx',  # 用户名
        "passwd": 'xxxx',  # 密码
        "base_path": 'xxxx',  # ftp根目录,可选
    }
    my_ftp = FtpOps(**ftp_info)  # 普通操作,带状态
    my_ftp.print_usage()  # 查看用法
    """
    __slots__ = ['_ftp', 'ip', 'port', 'user', 'passwd', 'base_path']
    usage = {
        "mkdir": ("path", "递归创建目录"),
        "chdir": ("path", "切换目录"),
        "rmdir": ("path", "删除空目录"),
        "rename": ("path, new_path", "重命名文件或目录"),
        "dir": ("path", "列出路径文件信息"),
        "nlst": ("path", "列出路径文件"),
        "delete": ("path", "删除文件"),
        "file_size": ("path", "查看文件大小"),
        "upload": ("local_path, remote_path", "上传文件"),
        "download": ("local_path, remote_path", "下载文件"),
    }

    @classmethod
    def print_usage(cls):
        for k, v in cls.usage.items():
            print(f'my_ftp.{k}({v[0]}) # {v[1]}')
        print(f'my_ftp.pwd # 当前路径')

    def __init__(self, ip, user, passwd, port=21, base_path=None):
        """
        Ftp操作类
        :param ip: ip地址
        :param port: 端口
        :param user: 用户
        :param base_path: 操作根目录
        """
        self._ftp = None
        self.ip = ip
        self.port = port
        self.user = user
        self.passwd = passwd
        self.base_path = base_path
        if self.base_path:
            code, result = self.chdir(self.base_path)
            if code != 0:
                raise Exception(f'无法切换到目录{self.base_path}')

    @property
    def ftp(self):
        """
        单例ftp连接
        :return:
        """
        if not self._ftp:
            try:
                ftp = FTP()
                ftp.connect(self.ip, self.port)
                ftp.login(self.user, self.passwd)
                ftp.encoding = 'utf-8'
                self._ftp = ftp
            except Exception as e:
                print(e)
                return None
        return self._ftp

    def mkdir(self, path):
        """
        创建目录
        :param path: 目录路径
        :return: code: 状态码(0:成功 1:失败) result: 执行结果
        """
        result = f"{path}创建失败!"
        try:
            dir_list = path.strip('/').split('/')
            assert path, "目录路径为空"
            for i in range(len(dir_list)):
                parent_dir = dir_list[0: i + 1]  # 父目录
                if not parent_dir:
                    continue
                dir_path = '/'.join(parent_dir)
                if not self.is_exist(dir_path):  # 目录不存在
                    result = self.ftp.mkd(dir_path)
                else:
                    result = dir_path
            code = 0
        except Exception as e:
            result = repr(e)
            code = 1
        return code, result

    def chdir(self, path):
        """
        切换目录
        :param path: 目录路径
        :return: code: 状态码(0:成功 1:失败) result: 执行结果
        """
        try:
            result = self.ftp.cwd(path)
            code = 0
        except Exception as e:
            result = repr(e)
            code = 1
        return code, result

    def rmdir(self, path):
        """
        删除目录(空目录)
        :param path: 目录路径
        :return: code: 状态码(0:成功 1:失败) result: 执行结果
        """
        try:
            print(self.dir())
            result = self.ftp.rmd(path)
            code = 0
        except Exception as e:
            result = repr(e)
            code = 1
        return code, result

    def rename(self, path, new_path):
        """
        重命名文件或目录
        :param path: 旧路径
        :param new_path: 新路径
        :return: code: 状态码(0:成功 1:失败) result: 执行结果
        """
        try:
            result = self.ftp.rename(path, new_path)
            code = 0
        except Exception as e:
            result = repr(e)
            code = 1
        return code, result

    @property
    def pwd(self):
        """
        当前路径
        :return: 当前路径
        """
        return self.ftp.pwd()

    def dir(self, path=None):
        """
        列出路径下的文件,默认列出当前路径
        :param path: 目录路径
        :return: code: 状态码(0:成功 1:失败) result: 执行结果
        """
        try:
            result = self.ftp.dir(path)
            code = 0
        except Exception as e:
            result = repr(e)
            code = 1
        return code, result

    def nlst(self, path=None):
        """
        列出路径下的文件,默认列出当前路径
        :param path: 目录路径
        :return: code: 状态码(0:成功 1:失败) result: 执行结果
        """
        try:
            result = self.ftp.nlst(path)
            code = 0
        except Exception as e:
            result = repr(e)
            code = 1
        return code, result

    def is_exist(self, path):
        """
        判断文件是否存在
        :param path: 路径
        :return:
        """
        path_dir = os.path.dirname(path)
        basename = os.path.basename(path)
        code, result = self.nlst(path_dir)
        return True if basename in result else False

    def delete(self, path):
        """
        删除文件
        :param path: 文件路径
        :return: code: 状态码(0:成功 1:失败) result: 执行结果
        """
        try:
            result = self.ftp.delete(path)
            code = 0
            print(f"{path} 删除成功!")
        except Exception as e:
            result = repr(e)
            code = 1
            print(f"{path} 删除失败!")
        return code, result

    def file_size(self, path):
        """
        获取文件大小
        :param path: 文件路径
        :return: code: 状态码(0:成功 1:失败) result: 执行结果
        """
        try:
            result = self.ftp.size(path)
            code = 0
        except Exception as e:
            result = repr(e)
            code = 1
        return code, result

    def upload(self, local_path, remote_path):
        """
        上传文件
        :param local_path: 本地文件路径
        :param remote_path: 远程文件路径
        :return: code: 状态码(0:成功 1:失败) result: 执行结果
        """
        try:
            if not os.path.isfile(local_path):
                raise FileNotFoundError(local_path)
            local_size = os.path.getsize(local_path)  # 本地文件大小
            code, remote_size = self.file_size(remote_path)  # 远端文件大小
            if code == 0 and local_size == remote_size:
                result = f"{remote_path}文件已存在!"
                code = 0
            else:
                remote_path_dir = os.path.dirname(remote_path)
                self.mkdir(remote_path_dir)
                print('开始上传文件' + local_path)
                fb = open(local_path, 'rb')
                result = self.ftp.storbinary(f"stor {remote_path}", fb)
                code = 0
            # print(f"{local_path} 上传成功!")
        except Exception as e:
            result = repr(e)
            code = 1
            print(f"{local_path} 上传失败!")
        return code, result

    def download(self, local_path, remote_path):
        """
        下载文件
        :param local_path: 本地文件路径
        :param remote_path: 远程文件路径
        :return: code: 状态码(0:成功 1:失败) result: 执行结果
        """
        try:
            code, remote_size = self.file_size(remote_path)  # 远端文件大小
            if code == 0:  # 远端文件存在
                if os.path.isfile(local_path):  # 本地已经存在文件
                    local_size = os.path.getsize(local_path)  # 本地文件大小
                    print(f"local_size:{local_size}")
                    print(f"remote_size:{remote_size}")
                    if local_size == remote_size:  # 文件已经存在,且文件大小匹配,不用重新下载
                        result = f"{remote_path}文件已存在!"
                        print(result)
                    else:  # 文件已经存在,但文件大小不匹配,重新下载
                        result = self.ftp.retrbinary(f"retr {remote_path}",
                                                     open(local_path, 'wb').write)
                    code = 0
                elif os.path.exists(local_path):
                    result = f"{local_path}已经存在但不是一个文件!"
                    code = 1
                else:  # 文件不存在
                    local_path_dir = os.path.dirname(local_path)
                    os.makedirs(local_path_dir, exist_ok=True)  # 创建本地目录
                    result = self.ftp.retrbinary(f"retr {remote_path}",
                                                 open(local_path, 'wb').write)
                    code = 0
            else:
                result = f"{remote_path}文件不存在!"
                code = 1

        except Exception as e:
            result = repr(e)
            code = 1
        if code == 0:
            print(f"{local_path} 下载成功!")
        else:
            print(f"{local_path} 下载失败!")
        return code, result


class FtpThreadOps(object):
    """ 多线程FTP操作
    ftp_info = {
        "ip": 'xxxx',  # 主机ip
        "user": 'xxxx',  # 用户名
        "passwd": 'xxxx',  # 密码
        "base_path": 'xxxx',  # ftp根目录,可选
    }
    my_thread_ftp = FtpOps(**ftp_info)  # 普通操作,带状态
    my_thread_ftp.print_usage()  # 查看用法
    """
    usage = {
        "rmdir": ("path", "删除空目录列表"),
        "rename": ("path, new_path", "重命名文件或目录列表"),
        "delete": ("path", "删除文件列表"),
        "upload": ("local_path, remote_path", "上传文件列表"),
        "download": ("local_path, remote_path", "下载文件列表"),
    }

    @classmethod
    def print_usage(cls):
        for k, v in cls.usage.items():
            args = f'[{v[0]}]' if len(v[0].split(',')) > 1 else f'{v[0]}'
            print(f'my_thread_ftp.thread_ops(ops_name="{k}", args_list=[{args},], max_workers=5) # {v[1]}')

    def __init__(self, ip, user, passwd, port=21, base_path=None):
        self.ip = ip
        self.port = port
        self.user = user
        self.passwd = passwd
        self.base_path = base_path
        self.ftp_pool_num = 0  # 当前连接数量
        self._ftp_pool = queue.Queue(maxsize=100)
        self.lock = threading.Lock()  # 线程锁

    @property
    def ftp_pool(self):
        """
        ftp实例池
        :return:
        """
        if self._ftp_pool.qsize() == 0:  # 动态增加实例池数量,由线程并发数来控制
            try:
                self._ftp_pool.put(FtpOps(
                    ip=self.ip,  # 主机ip
                    user=self.user,  # 用户名
                    passwd=self.passwd,  # 密码
                    base_path=self.base_path,  # ftp根目录
                ))
                self.ftp_pool_num += 1
            except Exception as e:
                print(f"创建FTP实例失败! {e}")
                return None
        return self._ftp_pool

    def ops(self, ops_name, *args):
        """
        从实例池获取实例,根据attr执行FTP实例操作,执行结束返回实例池(不能确保目录状态,每次拿到的实例可能不同)
        :param ops_name: ftp操作名称
        :param args: 参数
        :return:
        """
        self.lock.acquire()  # 加锁
        ftp = self.ftp_pool.get()  # 获取连接实例
        self.lock.release()  # 释放
        ops_func = getattr(ftp, ops_name, None)
        if not ops_func:
            raise Exception(f'不支持的操作{ops_func}')
        code, result = getattr(ftp, ops_name)(*args)
        self.lock.acquire()  # 加锁
        self.ftp_pool.put(ftp)  # 回收连接实例
        self.lock.release()  # 释放
        return code, result

    def thread_ops(self, ops_name, args_list, max_workers=5):
        """
        FTP多线程操作(FTP协议只支持一个连接通道同时做一个操作,所以开启多线程会创建多个FTP实例复用操作)
        :param ops_name: ftp操作名称
        :param args_list: ftp操作参数
        :param max_workers: 线程数(也会影响FTP实例个数)
        :return:
        """
        with ThreadPoolExecutor(max_workers=max_workers) as t:
            thread_list = []
            if ops_name not in self.usage.keys():
                raise Exception(f"不支持的操作:{ops_name}, 只支持{self.usage.keys()}")
            for args in args_list:  # 去重
                task = t.submit(self.ops, ops_name, *args)
                thread_list.append({
                    "task": task,  # 任务对象
                    "args": args,  # 任务参数
                })
            t.shutdown(wait=True)  # 等待所有线程结束
        success_count = 0
        fail_count = 0
        fail_args = []
        fail_args_result = []
        for thread in thread_list:
            code, result = thread["task"].result()
            if code == 0:
                success_count += 1
                # sys.stdout.flush()
                # print(f"{ops_name}成功! args: {thread['args']}")
            else:
                sys.stdout.flush()
                print(f"{ops_name}失败!\nargs: {thread['args']}\nresult: {result}")
                fail_count += 1
                fail_args.append(thread["args"])
                fail_args_result.append(f'args: {thread["args"]}\nresult: {thread["task"].result()}')
        print(f"{ops_name}成功:{success_count} 失败:{fail_count}")
        if fail_count:
            print(f"{ops_name}失败参数:")
            print(json.dumps(fail_args))
        if fail_count > 0:
            code = 1
        else:
            code = 0
        return code, {
            "success_count": success_count,
            "fail_count": fail_count,
            "fail_path": fail_args,
            "fail_path_result": fail_args_result,
        }


if __name__ == '__main__':
    ftp_info = {
        "ip": '',  # 主机ip
        "user": '',  # 用户名
        "passwd": '',  # 密码
        "base_path": '',  # ftp根目录
    }

    # 实例化
    my_ftp = FtpOps(**ftp_info)  # 普通操作,带状态
    my_ftp.print_usage()

    my_thread_ftp = FtpThreadOps(**ftp_info)  # 多线程操作,不带状态
    my_thread_ftp.print_usage()
使用支付宝打赏
使用微信打赏

若你觉得我的文章对你有帮助,欢迎点击上方按钮对我打赏