FreezeJ' Blog

火山引擎TOS对象存储API调用Demo

2026-02-06

参考文档:
单分块上传的签名机制
在URL中包含签名
PutObject上传接口

建议直接使用Python SDK,此方法仅用于无法使用SDK的情况,提供手动签名的参考Demo。本Demo仅实现了上传和下载2个方法。

import os
import hmac
import hashlib
import base64
import requests
from datetime import datetime, timezone
from urllib.parse import quote
from typing import Dict, Any, Union

class TosClient:
    """
    TOS (火山引擎对象存储) 客户端类
    用于处理与 TOS 服务的交互,包括生成预签名 URL 和上传文件。
    """
    def __init__(self, ak: str, sk: str, endpoint: str, region: str, bucket_name: str):
        """
        初始化 TosClient 实例

        :param ak: Access Key ID (访问密钥 ID)
        :param sk: Secret Access Key (秘密访问密钥)
        :param endpoint: TOS 服务端点 (例如: tos-cn-guangzhou.volces.com)
        :param region: 地域信息 (例如: cn-guangzhou)
        :param bucket_name: 存储桶名称
        """
        self.ak = ak
        self.sk = sk
        self.endpoint = endpoint
        self.region = region
        self.bucket_name = bucket_name
        self.host = f'{bucket_name}.{endpoint}'

    def _hmac_sha256(self, key: Union[bytes, str], msg: str) -> bytes:
        """
        计算 HMAC-SHA256 签名

        :param key: 签名密钥 (bytes 或 str)
        :param msg: 待签名的消息字符串
        :return: 签名的二进制数据
        """
        if isinstance(key, str):
            key = key.encode()
        return hmac.new(key, msg.encode(), hashlib.sha256).digest()

    def _get_signing_key(self, date: str) -> bytes:
        """
        生成派生签名密钥 (Derived Signing Key)

        :param date: 日期字符串 (格式: YYYYMMDD)
        :return: 最终的签名密钥
        """
        k_date = self._hmac_sha256(self.sk, date)
        k_region = self._hmac_sha256(k_date, self.region)
        k_service = self._hmac_sha256(k_region, 'tos')
        return self._hmac_sha256(k_service, 'request')

    def _get_canonical_querystring(self, params: Dict[str, str]) -> str:
        """
        构造规范化的查询字符串

        :param params: 查询参数字典
        :return: 编码并排序后的查询字符串
        """
        return '&'.join([f'{k}={quote(v, safe="")}' for k, v in sorted(params.items())])

    def _sign_request(self, method: str, path: str, headers_to_sign: Dict[str, str], expires: int = 3600) -> Dict[str, Any]:
        """
        生成签名
        :param method: HTTP方法 (GET, PUT, etc.)
        :param path: 对象路径 (例如 object-key)
        :param headers_to_sign: 需要参与签名的Header字典
        :param expires: 过期时间
        :return: 包含签名、规范化查询字符串等信息的字典
        """
        now = datetime.now(timezone.utc)
        date = now.strftime('%Y%m%d')
        timestamp = now.strftime('%Y%m%dT%H%M%SZ')
        
        # 1. 构造 CanonicalURI
        if not path.startswith('/'):
            path = '/' + path
        # t2.txt: 除了对象名中的 / 无需编码,其他情况下 / 都需要编码
        canonical_uri = quote(path, safe='/')
        
        # 2. 构造 SignedHeaders 和 CanonicalHeaders
        # t2.txt: Lowercase(HeaderName) + ':' + Trim(value) + '\n'
        tmp_headers = {}
        for k, v in headers_to_sign.items():
            tmp_headers[k.lower()] = str(v).strip()
            
        sorted_keys = sorted(tmp_headers.keys())
        signed_headers_str = ';'.join(sorted_keys)
        
        canonical_headers = ''
        for k in sorted_keys:
            canonical_headers += f"{k}:{tmp_headers[k]}\n"

        # 3. 构造查询参数
        credential_scope = f'{date}/{self.region}/tos/request'
        params = {
            'X-Tos-Algorithm': 'TOS4-HMAC-SHA256',
            'X-Tos-Credential': f'{self.ak}/{credential_scope}',
            'X-Tos-Date': timestamp,
            'X-Tos-Expires': str(expires),
            'X-Tos-SignedHeaders': signed_headers_str
        }
        canonical_querystring = self._get_canonical_querystring(params)
        
        # 4. 构造规范化请求
        # Payload 默认为 UNSIGNED-PAYLOAD (适用于预签名URL场景)
        payload_hash = "UNSIGNED-PAYLOAD"
        canonical_request = f"{method}\n{canonical_uri}\n{canonical_querystring}\n{canonical_headers}\n{signed_headers_str}\n{payload_hash}"
        
        # 5. 计算签名
        string_to_sign = f"TOS4-HMAC-SHA256\n{timestamp}\n{credential_scope}\n{hashlib.sha256(canonical_request.encode()).hexdigest()}"
        
        signing_key = self._get_signing_key(date)
        signature = hmac.new(signing_key, string_to_sign.encode(), hashlib.sha256).hexdigest()
        
        return {
            'signature': signature,
            'canonical_querystring': canonical_querystring,
            'timestamp': timestamp,
            'signed_headers': signed_headers_str
        }

    def generate_presigned_url(self, path: str, expires: int = 3600) -> str:
        """
        生成用于下载对象的预签名 URL

        :param path: 对象在存储桶中的路径 (Key)
        :param expires: URL 有效期 (秒),默认 3600 秒
        :return: 完整的预签名 URL
        """
        headers_to_sign = {'host': self.host}
        sign_result = self._sign_request('GET', path, headers_to_sign, expires)
        
        final_url = f"https://{self.host}/{path}?{sign_result['canonical_querystring']}&X-Tos-Signature={sign_result['signature']}"
        return final_url

    def upload_file(self, file_path: str, path: str, expires: int = 3600, **kwargs) -> Dict[str, Any]:
        """
        上传文件到 TOS 存储桶

        :param file_path: 本地文件路径
        :param path: 上传到存储桶中的对象路径
        :param expires: 签名有效期 (秒),默认 3600 秒
        :param kwargs: 可选的额外头信息 (如 x-tos-acl 等)
        :return: 上传结果字典,包含状态码、ETag 等信息
        """
        # 读取文件
        with open(file_path, 'rb') as file:
            file_content = file.read()
        
        # 计算摘要和长度
        content_md5 = hashlib.md5(file_content).digest()
        content_md5_base64 = base64.b64encode(content_md5).decode()
        content_length = str(len(file_content))
        
        # 构造参与签名的Headers (Key必须小写)
        headers_to_sign = {
            'content-length': content_length,
            'content-md5': content_md5_base64,
            'host': self.host
        }
        
        # 生成签名
        sign_result = self._sign_request('PUT', path, headers_to_sign, expires)
        
        # 构造请求URL
        signed_url = f"https://{self.host}/{path}?{sign_result['canonical_querystring']}&X-Tos-Signature={sign_result['signature']}"
        
        # 构造实际请求Headers
        request_headers = {
            'Content-Type': 'application/octet-stream',
            'Content-Length': content_length,
            'Content-MD5': content_md5_base64,
            'Host': self.host
        }
        
        # 添加可选Headers
        optional_headers_map = {
            'x-tos-acl': 'x-tos-acl',
            'x-tos-storage-class': 'x-tos-storage-class',
            'x-tos-server-side-encryption': 'x-tos-server-side-encryption'
        }
        for k, v in optional_headers_map.items():
            if k in kwargs:
                request_headers[v] = kwargs[k]
        
        # 发送请求
        response = requests.put(signed_url, data=file_content, headers=request_headers)
        
        return {
            'success': response.status_code == 200,
            'status_code': response.status_code,
            'headers': dict(response.headers),
            'etag': response.headers.get('ETag', ''),
            'version_id': response.headers.get('x-tos-version-id', ''),
            'crc64': response.headers.get('x-tos-hash-crc64ecma', '')
        }

if __name__ == '__main__':
    # AK\SK写入环境变量并且修改config内容
    config = {
        'ak': os.getenv('TOS_AK'),
        'sk': os.getenv('TOS_SK'),
        'endpoint': 'tos-cn-guangzhou.volces.com',
        'region': 'cn-guangzhou',
        'bucket_name': 'xxxxxxx'
    }
    client = TosClient(**config)
    object_key = 'object-test.txt'
    local_file = './test.txt'

    # 上传文件
    try:
        result = client.upload_file(
            file_path=local_file,
            path=object_key
        )
        print("上传结果:", result)
    except FileNotFoundError:
        print(f"错误信息: {local_file} 未找到. 请创建该文件或调整 file_path.")
    except Exception as e:
        print(f"上传文件时发生错误: {e}")

    # 生成预签名URL
    url = client.generate_presigned_url(path=object_key, expires=3600)
    print(f"预签名URL: {url}")
使用支付宝打赏
使用微信打赏

若你觉得我的文章对你有帮助,欢迎点击上方按钮对我打赏