建议直接使用Python SDK,此方法仅用于无法使用SDK的情况,提供手动签名的参考Demo。本Demo仅实现了上传和下载2个方法。
import os
import hmac
import hashlib
import base64
import requests
from datetime import datetime, timezone
from urllib.parse import quote
from typing import Dict, Any, Union
class TosClient:
"""
TOS (火山引擎对象存储) 客户端类
用于处理与 TOS 服务的交互,包括生成预签名 URL 和上传文件。
"""
def __init__(self, ak: str, sk: str, endpoint: str, region: str, bucket_name: str):
"""
初始化 TosClient 实例
:param ak: Access Key ID (访问密钥 ID)
:param sk: Secret Access Key (秘密访问密钥)
:param endpoint: TOS 服务端点 (例如: tos-cn-guangzhou.volces.com)
:param region: 地域信息 (例如: cn-guangzhou)
:param bucket_name: 存储桶名称
"""
self.ak = ak
self.sk = sk
self.endpoint = endpoint
self.region = region
self.bucket_name = bucket_name
self.host = f'{bucket_name}.{endpoint}'
def _hmac_sha256(self, key: Union[bytes, str], msg: str) -> bytes:
"""
计算 HMAC-SHA256 签名
:param key: 签名密钥 (bytes 或 str)
:param msg: 待签名的消息字符串
:return: 签名的二进制数据
"""
if isinstance(key, str):
key = key.encode()
return hmac.new(key, msg.encode(), hashlib.sha256).digest()
def _get_signing_key(self, date: str) -> bytes:
"""
生成派生签名密钥 (Derived Signing Key)
:param date: 日期字符串 (格式: YYYYMMDD)
:return: 最终的签名密钥
"""
k_date = self._hmac_sha256(self.sk, date)
k_region = self._hmac_sha256(k_date, self.region)
k_service = self._hmac_sha256(k_region, 'tos')
return self._hmac_sha256(k_service, 'request')
def _get_canonical_querystring(self, params: Dict[str, str]) -> str:
"""
构造规范化的查询字符串
:param params: 查询参数字典
:return: 编码并排序后的查询字符串
"""
return '&'.join([f'{k}={quote(v, safe="")}' for k, v in sorted(params.items())])
def _sign_request(self, method: str, path: str, headers_to_sign: Dict[str, str], expires: int = 3600) -> Dict[str, Any]:
"""
生成签名
:param method: HTTP方法 (GET, PUT, etc.)
:param path: 对象路径 (例如 object-key)
:param headers_to_sign: 需要参与签名的Header字典
:param expires: 过期时间
:return: 包含签名、规范化查询字符串等信息的字典
"""
now = datetime.now(timezone.utc)
date = now.strftime('%Y%m%d')
timestamp = now.strftime('%Y%m%dT%H%M%SZ')
# 1. 构造 CanonicalURI
if not path.startswith('/'):
path = '/' + path
# t2.txt: 除了对象名中的 / 无需编码,其他情况下 / 都需要编码
canonical_uri = quote(path, safe='/')
# 2. 构造 SignedHeaders 和 CanonicalHeaders
# t2.txt: Lowercase(HeaderName) + ':' + Trim(value) + '\n'
tmp_headers = {}
for k, v in headers_to_sign.items():
tmp_headers[k.lower()] = str(v).strip()
sorted_keys = sorted(tmp_headers.keys())
signed_headers_str = ';'.join(sorted_keys)
canonical_headers = ''
for k in sorted_keys:
canonical_headers += f"{k}:{tmp_headers[k]}\n"
# 3. 构造查询参数
credential_scope = f'{date}/{self.region}/tos/request'
params = {
'X-Tos-Algorithm': 'TOS4-HMAC-SHA256',
'X-Tos-Credential': f'{self.ak}/{credential_scope}',
'X-Tos-Date': timestamp,
'X-Tos-Expires': str(expires),
'X-Tos-SignedHeaders': signed_headers_str
}
canonical_querystring = self._get_canonical_querystring(params)
# 4. 构造规范化请求
# Payload 默认为 UNSIGNED-PAYLOAD (适用于预签名URL场景)
payload_hash = "UNSIGNED-PAYLOAD"
canonical_request = f"{method}\n{canonical_uri}\n{canonical_querystring}\n{canonical_headers}\n{signed_headers_str}\n{payload_hash}"
# 5. 计算签名
string_to_sign = f"TOS4-HMAC-SHA256\n{timestamp}\n{credential_scope}\n{hashlib.sha256(canonical_request.encode()).hexdigest()}"
signing_key = self._get_signing_key(date)
signature = hmac.new(signing_key, string_to_sign.encode(), hashlib.sha256).hexdigest()
return {
'signature': signature,
'canonical_querystring': canonical_querystring,
'timestamp': timestamp,
'signed_headers': signed_headers_str
}
def generate_presigned_url(self, path: str, expires: int = 3600) -> str:
"""
生成用于下载对象的预签名 URL
:param path: 对象在存储桶中的路径 (Key)
:param expires: URL 有效期 (秒),默认 3600 秒
:return: 完整的预签名 URL
"""
headers_to_sign = {'host': self.host}
sign_result = self._sign_request('GET', path, headers_to_sign, expires)
final_url = f"https://{self.host}/{path}?{sign_result['canonical_querystring']}&X-Tos-Signature={sign_result['signature']}"
return final_url
def upload_file(self, file_path: str, path: str, expires: int = 3600, **kwargs) -> Dict[str, Any]:
"""
上传文件到 TOS 存储桶
:param file_path: 本地文件路径
:param path: 上传到存储桶中的对象路径
:param expires: 签名有效期 (秒),默认 3600 秒
:param kwargs: 可选的额外头信息 (如 x-tos-acl 等)
:return: 上传结果字典,包含状态码、ETag 等信息
"""
# 读取文件
with open(file_path, 'rb') as file:
file_content = file.read()
# 计算摘要和长度
content_md5 = hashlib.md5(file_content).digest()
content_md5_base64 = base64.b64encode(content_md5).decode()
content_length = str(len(file_content))
# 构造参与签名的Headers (Key必须小写)
headers_to_sign = {
'content-length': content_length,
'content-md5': content_md5_base64,
'host': self.host
}
# 生成签名
sign_result = self._sign_request('PUT', path, headers_to_sign, expires)
# 构造请求URL
signed_url = f"https://{self.host}/{path}?{sign_result['canonical_querystring']}&X-Tos-Signature={sign_result['signature']}"
# 构造实际请求Headers
request_headers = {
'Content-Type': 'application/octet-stream',
'Content-Length': content_length,
'Content-MD5': content_md5_base64,
'Host': self.host
}
# 添加可选Headers
optional_headers_map = {
'x-tos-acl': 'x-tos-acl',
'x-tos-storage-class': 'x-tos-storage-class',
'x-tos-server-side-encryption': 'x-tos-server-side-encryption'
}
for k, v in optional_headers_map.items():
if k in kwargs:
request_headers[v] = kwargs[k]
# 发送请求
response = requests.put(signed_url, data=file_content, headers=request_headers)
return {
'success': response.status_code == 200,
'status_code': response.status_code,
'headers': dict(response.headers),
'etag': response.headers.get('ETag', ''),
'version_id': response.headers.get('x-tos-version-id', ''),
'crc64': response.headers.get('x-tos-hash-crc64ecma', '')
}
if __name__ == '__main__':
# AK\SK写入环境变量并且修改config内容
config = {
'ak': os.getenv('TOS_AK'),
'sk': os.getenv('TOS_SK'),
'endpoint': 'tos-cn-guangzhou.volces.com',
'region': 'cn-guangzhou',
'bucket_name': 'xxxxxxx'
}
client = TosClient(**config)
object_key = 'object-test.txt'
local_file = './test.txt'
# 上传文件
try:
result = client.upload_file(
file_path=local_file,
path=object_key
)
print("上传结果:", result)
except FileNotFoundError:
print(f"错误信息: {local_file} 未找到. 请创建该文件或调整 file_path.")
except Exception as e:
print(f"上传文件时发生错误: {e}")
# 生成预签名URL
url = client.generate_presigned_url(path=object_key, expires=3600)
print(f"预签名URL: {url}")
赏
使用支付宝打赏
使用微信打赏
若你觉得我的文章对你有帮助,欢迎点击上方按钮对我打赏