Introduction#
- low-level multipart upload
- threads and callbacks
- multipart configuration
Low Level#
Multipart upload allows you to upload a single object as a set of parts. Each part is a contiguous portion of the object's data. You can upload these object parts independently and in any order. If transmission of any part fails, you can retransmit that part without affecting other parts. After all parts of your object are uploaded, Amazon S3 assembles these parts and creates the object. In general, when your object size reaches 100 MB, you should consider using multipart uploads instead of uploading the object in a single operation.
- create an upload session and get its upload ID
- upload each part with its ETag and part number
- on completion, S3 merges the parts into a single object
import boto3BUCKET = ""KEY = "multipart-upload/hello.txt"client = boto3.client("s3")# create multipart uploadmultipart_upload = client.create_multipart_upload(Bucket=BUCKET,Key=KEY,)# get multipart uploadidupload_id = multipart_upload["UploadId"]etags = []parts = []# upload partsfor k in range(10):# upload a partupload_part = client.upload_part(Body="hello"*1024*1024 + f'{k}',Bucket=BUCKET,Key=KEY,PartNumber=k+1,UploadId = upload_id)# save etagsetags.append(upload_part["ETag"])# save partsparts.append({ 'ETag': upload_part["ETag"], 'PartNumber': k+1})# print etag and part numberprint(f'uploading part {k} etag {upload_part["ETag"]}')x = [{'ETag': tag, 'PartNumber': k+1} for k, tag in enumerate(etags)]print(parts)# complete the partsclient.complete_multipart_upload(Bucket=BUCKET,Key=KEY,UploadId=upload_id,MultipartUpload={'Parts': x})
High Level#
# haimtran 03 OCT 2022
# test multipart upload using the high-level SDK
# show multi-thread handling of the upload
# ref: https://aws.amazon.com/premiumsupport/knowledge-center/s3-multipart-upload-cli/
import sys
import threading

import boto3
from boto3.s3.transfer import TransferConfig

s3 = boto3.resource("s3")


class TransferCallBack:
    """Progress callback for the S3 transfer manager.

    The transfer manager periodically calls __call__ throughout the
    upload/download process so that the caller can take action, such as
    displaying progress. Bytes are also tallied per worker thread.
    """

    def __init__(self, target_size) -> None:
        """target_size: total number of bytes expected to be transferred."""
        self._target_size = target_size
        self._total_transferred = 0
        # callbacks arrive concurrently from worker threads — guard counters
        self._lock = threading.Lock()
        # maps thread ident -> bytes transferred by that thread
        self.thread_infor = {}

    def __call__(self, bytes_transferred):
        """Periodically invoked by the transfer manager with a byte delta."""
        thread = threading.current_thread()
        with self._lock:
            self._total_transferred += bytes_transferred
            # accumulate this thread's byte count (0 if first callback)
            self.thread_infor[thread.ident] = (
                self.thread_infor.get(thread.ident, 0) + bytes_transferred
            )
            target = self._target_size
            sys.stdout.write(
                f"\r{self._total_transferred} of {target} transferred"
                f"({(self._total_transferred / target) * 100:.2f}%)."
            )
            sys.stdout.flush()
            print(f"{self._total_transferred} of {target} thread {thread.ident}")


def upload_with_default_configuration(
    local_file_path, bucket_name, object_key, file_size_mb
):
    """Upload using the transfer manager's default multipart configuration.

    Returns the callback's per-thread byte-count dict.
    """
    transfer_callback = TransferCallBack(file_size_mb)
    s3.Bucket(bucket_name).upload_file(
        local_file_path, object_key, Callback=transfer_callback
    )
    return transfer_callback.thread_infor


def upload_with_chunksize_and_meta(
    local_file_path, bucket_name, object_key, file_size_mb, metadata=None
):
    """Upload with an explicit chunk size (1 MiB) and concurrency (20 threads).

    Returns the callback's per-thread byte-count dict.
    """
    transfer_callback = TransferCallBack(file_size_mb)
    config = TransferConfig(multipart_chunksize=1024 * 1024, max_concurrency=20)
    # extra_args = {"Metadata": metadata} if metadata else None
    s3.Bucket(bucket_name).upload_file(
        local_file_path,
        object_key,
        Config=config,
        # ExtraArgs=extra_args,
        Callback=transfer_callback,
    )
    return transfer_callback.thread_infor


def upload_with_high_threshold(local_file_path, bucket_name, object_key, file_size_mb):
    """Upload with a multipart threshold larger than the file size.

    This makes the transfer manager send the file as a single standard
    upload instead of a multipart upload. Returns the callback's
    per-thread byte-count dict.
    """
    transfer_callback = TransferCallBack(file_size_mb)
    config = TransferConfig(multipart_threshold=file_size_mb * 2)
    s3.Bucket(bucket_name).upload_file(
        local_file_path, object_key, Config=config, Callback=transfer_callback
    )
    return transfer_callback.thread_infor


if __name__ == "__main__":
    thread_infor = upload_with_chunksize_and_meta(
        "./setup.mov", "BUCKET_NAME", "multipart-upload/setup.mov", 1434217919
    )
    print(thread_infor)