Skip to content

Commit 253e9a3

Browse files
authored
Merge pull request #425 from qiniu/features/retry-when-upload-expire
support retry expired parts with upload
2 parents: eeac190 + 29ee67a · commit 253e9a3

File tree

2 files changed

+135
-49
lines changed

2 files changed

+135
-49
lines changed

qiniu/services/storage/upload_progress_recorder.py

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -66,4 +66,7 @@ def delete_upload_record(self, file_name, key):
6666
record_file_name = hashlib.md5(record_key.encode('utf-8')).hexdigest()
6767

6868
upload_record_file_path = os.path.join(self.record_folder, record_file_name)
69-
os.remove(upload_record_file_path)
69+
try:
70+
os.remove(upload_record_file_path)
71+
except OSError:
72+
pass

qiniu/services/storage/uploader.py

Lines changed: 131 additions & 48 deletions
Original file line number | Diff line number | Diff line change
@@ -182,6 +182,7 @@ def __init__(self, up_token, key, input_stream, file_name, data_size, hostscache
182182
self.file_name = file_name
183183
self.size = data_size
184184
self.hostscache_dir = hostscache_dir
185+
self.blockStatus = []
185186
self.params = params
186187
self.mime_type = mime_type
187188
self.progress_handler = progress_handler
@@ -199,7 +200,12 @@ def record_upload_progress(self, offset):
199200
'offset': offset,
200201
}
201202
if self.version == 'v1':
202-
record_data['contexts'] = [block['ctx'] for block in self.blockStatus]
203+
record_data['contexts'] = [
204+
{
205+
'ctx': block['ctx'],
206+
'expired_at': block['expired_at'] if 'expired_at' in block else 0
207+
} for block in self.blockStatus
208+
]
203209
elif self.version == 'v2':
204210
record_data['etags'] = self.blockStatus
205211
record_data['expired_at'] = self.expiredAt
@@ -230,7 +236,11 @@ def recovery_from_record(self):
230236
if self.version == 'v1':
231237
if not record.__contains__('contexts') or len(record['contexts']) == 0:
232238
return 0
233-
self.blockStatus = [{'ctx': ctx} for ctx in record['contexts']]
239+
self.blockStatus = [
240+
# 兼容旧版本的 ctx 持久化
241+
ctx if type(ctx) is dict else {'ctx': ctx, 'expired_at': 0}
242+
for ctx in record['contexts']
243+
]
234244
return record['offset']
235245
elif self.version == 'v2':
236246
if not record.__contains__('etags') or len(record['etags']) == 0 or \
@@ -242,67 +252,126 @@ def recovery_from_record(self):
242252

243253
def upload(self):
244254
"""上传操作"""
255+
if self.version == 'v1':
256+
return self._upload_v1()
257+
elif self.version == 'v2':
258+
return self._upload_v2()
259+
else:
260+
raise ValueError("version must choose v1 or v2 !")
261+
262+
def _upload_v1(self):
245263
self.blockStatus = []
246264
self.recovery_index = 1
247265
self.expiredAt = None
248266
self.uploadId = None
249267
self.get_bucket()
268+
self.part_size = config._BLOCK_SIZE
269+
250270
host = self.get_up_host()
251-
if self.version == 'v1':
252-
offset = self.recovery_from_record()
253-
self.part_size = config._BLOCK_SIZE
254-
elif self.version == 'v2':
255-
offset, self.uploadId, self.expiredAt = self.recovery_from_record()
256-
if offset > 0 and self.blockStatus != [] and self.uploadId is not None \
257-
and self.expiredAt is not None:
258-
self.recovery_index = self.blockStatus[-1]['partNumber'] + 1
271+
offset = self.recovery_from_record()
272+
is_resumed = offset > 0
273+
274+
# 检查原来的分片是否过期,如有则重传该分片
275+
for index, block_status in enumerate(self.blockStatus):
276+
if block_status.get('expired_at', 0) > time.time():
277+
self.input_stream.seek(self.part_size, os.SEEK_CUR)
259278
else:
260-
self.recovery_index = 1
261-
init_url = self.block_url_v2(host, self.bucket_name)
262-
self.uploadId, self.expiredAt = self.init_upload_task(init_url)
279+
block = self.input_stream.read(self.part_size)
280+
response, ok = self._make_block_with_retry(block, host)
281+
ret, info = response
282+
if not ok:
283+
return ret, info
284+
self.blockStatus[index] = ret
285+
self.record_upload_progress(offset)
286+
287+
# 从断点位置上传
288+
for block in _file_iter(self.input_stream, self.part_size, offset):
289+
length = len(block)
290+
response, ok = self._make_block_with_retry(block, host)
291+
ret, info = response
292+
if not ok:
293+
return ret, info
294+
295+
self.blockStatus.append(ret)
296+
offset += length
297+
self.record_upload_progress(offset)
298+
if callable(self.progress_handler):
299+
self.progress_handler(((len(self.blockStatus) - 1) * self.part_size) + len(block), self.size)
300+
301+
ret, info = self.make_file(host)
302+
if info.status_code == 200 or info.status_code == 701:
303+
self.upload_progress_recorder.delete_upload_record(self.file_name, self.key)
304+
if info.status_code == 701 and is_resumed:
305+
return self.upload()
306+
return ret, info
307+
308+
def _upload_v2(self):
309+
self.blockStatus = []
310+
self.recovery_index = 1
311+
self.expiredAt = None
312+
self.uploadId = None
313+
self.get_bucket()
314+
host = self.get_up_host()
315+
316+
offset, self.uploadId, self.expiredAt = self.recovery_from_record()
317+
is_resumed = False
318+
if offset > 0 and self.blockStatus != [] and self.uploadId is not None \
319+
and self.expiredAt is not None:
320+
self.recovery_index = self.blockStatus[-1]['partNumber'] + 1
321+
is_resumed = True
263322
else:
264-
raise ValueError("version must choose v1 or v2 !")
323+
self.recovery_index = 1
324+
init_url = self.block_url_v2(host, self.bucket_name)
325+
self.uploadId, self.expiredAt = self.init_upload_task(init_url)
326+
265327
for index, block in enumerate(_file_iter(self.input_stream, self.part_size, offset)):
266328
length = len(block)
267-
if self.version == 'v1':
268-
crc = crc32(block)
269-
ret, info = self.make_block(block, length, host)
270-
elif self.version == 'v2':
271-
index_ = index + self.recovery_index
272-
url = self.block_url_v2(host, self.bucket_name) + '/%s/%d' % (self.uploadId, index_)
273-
ret, info = self.make_block_v2(block, url)
329+
index_ = index + self.recovery_index
330+
url = self.block_url_v2(host, self.bucket_name) + '/%s/%d' % (self.uploadId, index_)
331+
ret, info = self.make_block_v2(block, url)
332+
if info.status_code == 612:
333+
self.upload_progress_recorder.delete_upload_record(self.file_name, self.key)
334+
if info.status_code == 612 and is_resumed:
335+
return self.upload()
274336
if ret is None and not info.need_retry():
275337
return ret, info
276338
if info.connect_failed():
277339
if config.get_default('default_zone').up_host_backup:
278340
host = config.get_default('default_zone').up_host_backup
279341
else:
280-
host = config.get_default('default_zone').get_up_host_backup_by_token(self.up_token,
281-
self.hostscache_dir)
282-
if self.version == 'v1':
283-
if info.need_retry() or crc != ret['crc32']:
284-
ret, info = self.make_block(block, length, host)
285-
if ret is None or crc != ret['crc32']:
286-
return ret, info
287-
elif self.version == 'v2':
288-
if info.need_retry():
289-
url = self.block_url_v2(host, self.bucket_name) + '/%s/%d' % (self.uploadId, index + 1)
290-
ret, info = self.make_block_v2(block, url)
291-
if ret is None:
292-
return ret, info
293-
del ret['md5']
294-
ret['partNumber'] = index_
342+
host = config.get_default('default_zone')\
343+
.get_up_host_backup_by_token(self.up_token, self.hostscache_dir)
344+
345+
if info.need_retry():
346+
url = self.block_url_v2(host, self.bucket_name) + '/%s/%d' % (self.uploadId, index + 1)
347+
ret, info = self.make_block_v2(block, url)
348+
if info.status_code == 612:
349+
self.upload_progress_recorder.delete_upload_record(self.file_name, self.key)
350+
if info.status_code == 612 and is_resumed:
351+
return self.upload()
352+
if ret is None:
353+
return ret, info
354+
del ret['md5']
355+
ret['partNumber'] = index_
295356
self.blockStatus.append(ret)
296357
offset += length
297358
self.record_upload_progress(offset)
298-
if (callable(self.progress_handler)):
359+
if callable(self.progress_handler):
299360
self.progress_handler(((len(self.blockStatus) - 1) * self.part_size) + len(block), self.size)
300-
if self.version == 'v1':
301-
return self.make_file(host)
302-
elif self.version == 'v2':
303-
make_file_url = self.block_url_v2(host, self.bucket_name) + '/%s' % self.uploadId
304-
return self.make_file_v2(self.blockStatus, make_file_url, self.file_name,
305-
self.mime_type, self.params, self.metadata)
361+
362+
make_file_url = self.block_url_v2(host, self.bucket_name) + '/%s' % self.uploadId
363+
ret, info = self.make_file_v2(
364+
self.blockStatus,
365+
make_file_url,
366+
self.file_name,
367+
self.mime_type,
368+
self.params,
369+
self.metadata)
370+
if info.status_code == 200 or info.status_code == 612:
371+
self.upload_progress_recorder.delete_upload_record(self.file_name, self.key)
372+
if info.status_code == 612 and is_resumed:
373+
return self.upload()
374+
return ret, info
306375

307376
def make_file_v2(self, block_status, url, file_name=None, mime_type=None, customVars=None, metadata=None):
308377
"""completeMultipartUpload"""
@@ -317,10 +386,7 @@ def make_file_v2(self, block_status, url, file_name=None, mime_type=None, custom
317386
'customVars': customVars,
318387
'metadata': metadata
319388
}
320-
ret, info = self.post_with_headers(url, json.dumps(data), headers=headers)
321-
if info.status_code == 200:
322-
self.upload_progress_recorder.delete_upload_record(self.file_name, self.key)
323-
return ret, info
389+
return self.post_with_headers(url, json.dumps(data), headers=headers)
324390

325391
def get_up_host(self):
326392
if config.get_default('default_zone').up_host:
@@ -329,6 +395,24 @@ def get_up_host(self):
329395
host = config.get_default('default_zone').get_up_host_by_token(self.up_token, self.hostscache_dir)
330396
return host
331397

398+
def _make_block_with_retry(self, block_data, up_host):
399+
length = len(block_data)
400+
crc = crc32(block_data)
401+
ret, info = self.make_block(block_data, length, up_host)
402+
if ret is None and not info.need_retry():
403+
return (ret, info), False
404+
if info.connect_failed():
405+
if config.get_default('default_zone').up_host_backup:
406+
up_host = config.get_default('default_zone').up_host_backup
407+
else:
408+
up_host = config.get_default('default_zone') \
409+
.get_up_host_backup_by_token(self.up_token, self.hostscache_dir)
410+
if info.need_retry() or crc != ret['crc32']:
411+
ret, info = self.make_block(block_data, length, up_host)
412+
if ret is None or crc != ret['crc32']:
413+
return (ret, info), False
414+
return (ret, info), True
415+
332416
def make_block(self, block, block_size, host):
333417
"""创建块"""
334418
url = self.block_url(host, block_size)
@@ -380,7 +464,6 @@ def make_file(self, host):
380464
"""创建文件"""
381465
url = self.file_url(host)
382466
body = ','.join([status['ctx'] for status in self.blockStatus])
383-
self.upload_progress_recorder.delete_upload_record(self.file_name, self.key)
384467
return self.post(url, body)
385468

386469
def init_upload_task(self, url):

0 commit comments

Comments
 (0)