Skip to content

Commit 9642a0f

Browse files
committed
Modify upload_file api to support resumable upload
1 parent 8cfad1e commit 9642a0f

File tree

3 files changed

+120
-25
lines changed

3 files changed

+120
-25
lines changed

qcloud_cos/cos_client.py

+113-24
Original file line numberDiff line numberDiff line change
@@ -826,7 +826,6 @@ def restore_object(self, Bucket, Key, RestoreRequest={}, **kwargs):
826826
auth=CosS3Auth(self._conf._secret_id, self._conf._secret_key, Key),
827827
headers=headers,
828828
params=params)
829-
print rt.headers
830829
return None
831830

832831
# s3 bucket interface begin
@@ -1727,7 +1726,7 @@ def list_buckets(self, **kwargs):
17271726
return data
17281727

17291728
# Advanced interface
1730-
def _upload_part(self, bucket, key, local_path, offset, size, part_num, uploadid, md5_lst):
1729+
def _upload_part(self, bucket, key, local_path, offset, size, part_num, uploadid, md5_lst, resumable_flag, already_exist_parts):
17311730
"""从本地文件中读取分块, 上传单个分块,将结果记录在md5_lst中
17321731
17331732
:param bucket(string): 存储桶名称.
@@ -1738,22 +1737,107 @@ def _upload_part(self, bucket, key, local_path, offset, size, part_num, uploadid
17381737
:param part_num(int): 上传分块的序号.
17391738
:param uploadid(string): 分块上传的uploadid.
17401739
:param md5_lst(list): 保存上传成功分块的MD5和序号.
1740+
:param resumable_flag(bool): 是否为断点续传.
1741+
:param already_exist_parts(dict): 断点续传情况下,保存已经上传的块的序号和Etag.
17411742
:return: None.
17421743
"""
1744+
# 如果是断点续传且该分块已经上传了则不用实际上传
1745+
if resumable_flag and part_num in already_exist_parts:
1746+
md5_lst.append({'PartNumber': part_num, 'ETag': already_exist_parts[part_num]})
1747+
else:
1748+
with open(local_path, 'rb') as fp:
1749+
fp.seek(offset, 0)
1750+
data = fp.read(size)
1751+
rt = self.upload_part(bucket, key, data, part_num, uploadid)
1752+
md5_lst.append({'PartNumber': part_num, 'ETag': rt['ETag']})
1753+
return None
1754+
1755+
def _get_resumable_uploadid(self, bucket, key):
1756+
"""从服务端获取未完成的分块上传任务,获取断点续传的uploadid
1757+
1758+
:param bucket(string): 存储桶名称.
1759+
:param key(string): 分块上传路径名.
1760+
:return(string): 断点续传的uploadid,如果不存在则返回None.
1761+
"""
1762+
multipart_response = self.list_multipart_uploads(
1763+
Bucket=bucket,
1764+
Prefix=key
1765+
)
1766+
if 'Upload' in multipart_response.keys():
1767+
if multipart_response['Upload'][0]['Key'] == key:
1768+
return multipart_response['Upload'][0]['UploadId']
1769+
1770+
return None
1771+
1772+
def _check_single_upload_part(self, local_path, offset, local_part_size, remote_part_size, remote_etag):
1773+
"""从本地文件中读取分块, 校验本地分块和服务端的分块信息
1774+
1775+
:param local_path(string): 本地文件路径名.
1776+
:param offset(int): 读取本地文件的分块偏移量.
1777+
:param local_part_size(int): 读取本地文件的分块大小.
1778+
:param remote_part_size(int): 服务端的文件的分块大小.
1779+
:param remote_etag(string): 服务端的文件Etag.
1780+
:return(bool): 本地单个分块的信息是否和服务端的分块信息一致
1781+
"""
1782+
if local_part_size != remote_part_size:
1783+
return False
17431784
with open(local_path, 'rb') as fp:
17441785
fp.seek(offset, 0)
1745-
data = fp.read(size)
1746-
rt = self.upload_part(bucket, key, data, part_num, uploadid)
1747-
md5_lst.append({'PartNumber': part_num, 'ETag': rt['ETag']})
1748-
return None
1786+
local_etag = get_raw_md5(fp.read(local_part_size))
1787+
if local_etag == remote_etag:
1788+
return True
1789+
return False
17491790

1750-
def upload_file(self, Bucket, Key, LocalFilePath, PartSize=10, MAXThread=5, **kwargs):
1751-
"""小于等于100MB的文件简单上传,大于等于100MB的文件使用分块上传
1791+
def _check_all_upload_parts(self, bucket, key, uploadid, local_path, parts_num, part_size, last_size, already_exist_parts):
1792+
"""获取所有已经上传的分块的信息,和本地的文件进行对比
1793+
1794+
:param bucket(string): 存储桶名称.
1795+
:param key(string): 分块上传路径名.
1796+
:param uploadid(string): 分块上传的uploadid
1797+
:param local_path(string): 本地文件路径名.
1798+
:param parts_num(int): 本地文件的分块数
1799+
:param part_size(int): 本地文件的分块大小
1800+
:param last_size(int): 本地文件的最后一块分块大小
1801+
:param already_exist_parts(dict): 保存已经上传的分块的part_num和Etag
1802+
:return(bool): 本地文件是否通过校验,True为可以进行断点续传,False为不能进行断点续传
1803+
"""
1804+
parts_info = []
1805+
part_number_marker = 0
1806+
list_over_status = False
1807+
while list_over_status is False:
1808+
response = self.list_parts(
1809+
Bucket=bucket,
1810+
Key=key,
1811+
UploadId=uploadid,
1812+
PartNumberMarker=part_number_marker
1813+
)
1814+
parts_info.extend(response['Part'])
1815+
if response['IsTruncated'] == 'false':
1816+
list_over_status = True
1817+
else:
1818+
part_number_marker = int(response['NextMarker'])
1819+
for part in parts_info:
1820+
part_num = int(part['PartNumber'])
1821+
# 如果分块数量大于本地计算出的最大数量,校验失败
1822+
if part_num > parts_num:
1823+
return False
1824+
offset = (part_num - 1) * part_size
1825+
local_part_size = part_size
1826+
if part_num == parts_num:
1827+
local_part_size = last_size
1828+
# 有任何一块没有通过校验,则校验失败
1829+
if not self._check_single_upload_part(local_path, offset, local_part_size, int(part['Size']), part['ETag']):
1830+
return False
1831+
already_exist_parts[part_num] = part['ETag']
1832+
return True
1833+
1834+
def upload_file(self, Bucket, Key, LocalFilePath, PartSize=1, MAXThread=5, **kwargs):
1835+
"""小于等于20MB的文件简单上传,大于20MB的文件使用分块上传
17521836
17531837
:param Bucket(string): 存储桶名称.
17541838
:param key(string): 分块上传路径名.
17551839
:param LocalFilePath(string): 本地文件路径名.
1756-
:param PartSize(int): 分块的大小设置.
1840+
:param PartSize(int): 分块的大小设置,单位为MB.
17571841
:param MAXThread(int): 并发上传的最大线程数.
17581842
:param kwargs(dict): 设置请求headers.
17591843
:return(dict): 成功上传文件的元信息.
@@ -1768,18 +1852,19 @@ def upload_file(self, Bucket, Key, LocalFilePath, PartSize=10, MAXThread=5, **kw
17681852
Bucket='bucket',
17691853
Key=file_name,
17701854
LocalFilePath=file_name,
1855+
PartSize=10,
17711856
MAXThread=10,
17721857
CacheControl='no-cache',
17731858
ContentDisposition='download.txt'
17741859
)
17751860
"""
17761861
file_size = os.path.getsize(LocalFilePath)
1777-
if file_size <= 1024*1024*100:
1862+
if file_size <= 1024*1024*20:
17781863
with open(LocalFilePath, 'rb') as fp:
17791864
rt = self.put_object(Bucket=Bucket, Key=Key, Body=fp, **kwargs)
17801865
return rt
17811866
else:
1782-
part_size = 1024*1024*PartSize # 默认按照10MB分块,最大支持100G的文件,超过100G的分块数固定为10000
1867+
part_size = 1024*1024*PartSize # 默认按照1MB分块,最大支持10G的文件,超过10G的分块数固定为10000
17831868
last_size = 0 # 最后一块可以小于1MB
17841869
parts_num = file_size / part_size
17851870
last_size = file_size % part_size
@@ -1793,8 +1878,17 @@ def upload_file(self, Bucket, Key, LocalFilePath, PartSize=10, MAXThread=5, **kw
17931878
last_size += part_size
17941879

17951880
# 创建分块上传
1796-
rt = self.create_multipart_upload(Bucket=Bucket, Key=Key, **kwargs)
1797-
uploadid = rt['UploadId']
1881+
# 判断是否可以断点续传
1882+
resumable_flag = False
1883+
already_exist_parts = {}
1884+
uploadid = self._get_resumable_uploadid(Bucket, Key)
1885+
if uploadid is not None:
1886+
# 校验服务端返回的每个块的信息是否和本地的每个块的信息相同,只有校验通过的情况下才可以进行断点续传
1887+
resumable_flag = self._check_all_upload_parts(Bucket, Key, uploadid, LocalFilePath, parts_num, part_size, last_size, already_exist_parts)
1888+
# 如果不能断点续传,则创建一个新的分块上传
1889+
if not resumable_flag:
1890+
rt = self.create_multipart_upload(Bucket=Bucket, Key=Key, **kwargs)
1891+
uploadid = rt['UploadId']
17981892

17991893
# 上传分块
18001894
offset = 0 # 记录文件偏移量
@@ -1803,23 +1897,19 @@ def upload_file(self, Bucket, Key, LocalFilePath, PartSize=10, MAXThread=5, **kw
18031897

18041898
for i in range(1, parts_num+1):
18051899
if i == parts_num: # 最后一块
1806-
pool.add_task(self._upload_part, Bucket, Key, LocalFilePath, offset, file_size-offset, i, uploadid, lst)
1900+
pool.add_task(self._upload_part, Bucket, Key, LocalFilePath, offset, file_size-offset, i, uploadid, lst, resumable_flag, already_exist_parts)
18071901
else:
1808-
pool.add_task(self._upload_part, Bucket, Key, LocalFilePath, offset, part_size, i, uploadid, lst)
1902+
pool.add_task(self._upload_part, Bucket, Key, LocalFilePath, offset, part_size, i, uploadid, lst, resumable_flag, already_exist_parts)
18091903
offset += part_size
18101904

18111905
pool.wait_completion()
18121906
result = pool.get_result()
1813-
if not result['success_all']:
1907+
if not result['success_all'] or len(lst) != parts_num:
18141908
raise CosClientError('some upload_part fail after max_retry')
18151909
lst = sorted(lst, key=lambda x: x['PartNumber']) # 按PartNumber升序排列
18161910

1817-
# 完成分片上传
1818-
try:
1819-
rt = self.complete_multipart_upload(Bucket=Bucket, Key=Key, UploadId=uploadid, MultipartUpload={'Part': lst})
1820-
except Exception as e:
1821-
abort_response = self.abort_multipart_upload(Bucket=Bucket, Key=Key, UploadId=uploadid)
1822-
raise e
1911+
# 完成分块上传
1912+
rt = self.complete_multipart_upload(Bucket=Bucket, Key=Key, UploadId=uploadid, MultipartUpload={'Part': lst})
18231913
return rt
18241914

18251915
def _inner_head_object(self, CopySource):
@@ -2098,8 +2188,7 @@ def bucket_exists(self, Bucket):
20982188
"""判断一个存储桶是否存在
20992189
21002190
:param Bucket(string): 存储桶名称.
2101-
:return(bool): 存储桶
2102-
是否存在,返回True为存在,返回False为不存在.
2191+
:return(bool): 存储桶是否存在,返回True为存在,返回False为不存在.
21032192
21042193
.. code-block:: python
21052194

qcloud_cos/cos_comm.py

+6
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,12 @@ def to_unicode(s):
6363
return s.decode('utf-8')
6464

6565

66+
def get_raw_md5(data):
67+
m2 = hashlib.md5(data)
68+
etag = '"' + str(m2.hexdigest()) + '"'
69+
return etag
70+
71+
6672
def get_md5(data):
6773
m2 = hashlib.md5(data)
6874
MD5 = base64.standard_b64encode(m2.digest())

ut/test.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def get_raw_md5(data):
3030

3131
def gen_file(path, size):
3232
_file = open(path, 'w')
33-
_file.seek(1024*1024*size)
33+
_file.seek(1024*1024*size-3)
3434
_file.write('cos')
3535
_file.close()
3636

0 commit comments

Comments
 (0)