Introduction#
This GitHub repo shows how to use the different processors provided by the SageMaker API.
- Base Processor
- Script Processor
- Sklearn Processor
- PySpark Processor
We need to
- Choose a container, either by image URI or by framework version
- Provide our processing code, or even bring our own container
- Provide input data and save/download output data to/from an S3 bucket (a shared setup sketch follows this list)
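The examples below reference a few shared variables (imports, the container base path, the processing image, and S3 paths for data and code) that are not shown in this note. A minimal setup sketch could look like the following; the bucket names and image URI are placeholders you would replace with your own.

import os
from time import strftime

from sagemaker.processing import Processor, ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.spark.processing import PySparkProcessor

# base path inside the processing container
container_base_path = "/opt/ml/processing"

# placeholder container image and S3 locations (replace with your own)
image_url = "<your-ecr-image-uri>"
data_input_path = "s3://your-bucket/processing/data/"
code_input_path = "s3://your-bucket/processing/code/"
data_output_path = "s3://your-bucket/processing/output"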
Base Processor#
The key here is the entrypoint, where you specify the command that runs your processing code.
processor = Processor(
    role=os.environ["ROLE"],
    image_uri=image_url,
    instance_count=1,
    instance_type="ml.m4.xlarge",
    entrypoint=[
        "python",
        f"{container_base_path}/input/process-data.py",
        "--processor=base-processor",
    ],
)

processor.run(
    job_name=f'processor-{strftime("%Y-%m-%d-%H-%M-%S")}',
    inputs=[
        ProcessingInput(
            source=data_input_path,
            destination=f"{container_base_path}/data/",
        ),
        ProcessingInput(
            source=code_input_path,
            destination=f"{container_base_path}/input/",
        ),
    ],
    outputs=[
        ProcessingOutput(
            source=f"{container_base_path}/output/train",
            destination=f"{data_output_path}/train",
            output_name="train",
        ),
        ProcessingOutput(
            source=f"{container_base_path}/output/test",
            destination=f"{data_output_path}/test",
            output_name="test",
        ),
        ProcessingOutput(
            source=f"{container_base_path}/output/validation",
            destination=f"{data_output_path}/validation",
            output_name="validation",
        ),
    ],
)
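The process-data.py script itself is not shown in this note. As a rough sketch, it could read the CSV files mounted under the data path, split them, and write train/test/validation sets where the ProcessingOutput mappings expect them; the split ratios and file names below are assumptions.

import argparse
import glob
import os

import pandas as pd

container_base_path = "/opt/ml/processing"

# the base processor passes --processor, the other processors set the PROCESSOR env variable
parser = argparse.ArgumentParser()
parser.add_argument("--processor", type=str, default=os.environ.get("PROCESSOR", ""))
args, _ = parser.parse_known_args()
print(f"running {args.processor}")

# read all csv files mounted at /opt/ml/processing/data/
df = pd.concat(
    (pd.read_csv(f) for f in glob.glob(f"{container_base_path}/data/*.csv")),
    ignore_index=True,
)

# 70/15/15 split (assumed)
train = df.sample(frac=0.7, random_state=42)
rest = df.drop(train.index)
test = rest.sample(frac=0.5, random_state=42)
validation = rest.drop(test.index)

# write each split where the matching ProcessingOutput picks it up
for name, split in [("train", train), ("test", test), ("validation", validation)]:
    os.makedirs(f"{container_base_path}/output/{name}", exist_ok=True)
    split.to_csv(f"{container_base_path}/output/{name}/{name}.csv", index=False)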
Script Processor#
Different from the base processor: a) the command to run the code is python3, and b) the script to run is specified in the run method.
script_processor = ScriptProcessor(
    role=os.environ["ROLE"],
    image_uri=image_url,
    instance_count=1,
    instance_type="ml.m4.xlarge",
    command=["python3"],
    env={"PROCESSOR": "script-processor"},
)

script_processor.run(
    job_name=f'script-processor-{strftime("%Y-%m-%d-%H-%M-%S")}',
    code="process-data.py",
    inputs=[
        ProcessingInput(
            source=data_input_path,
            # process-data.py needs to know data is located here
            destination=f"{container_base_path}/data/",
        ),
        ProcessingInput(
            source=code_input_path,
            destination=f"{container_base_path}/input/",
        ),
    ],
    outputs=[
        ProcessingOutput(
            source=f"{container_base_path}/output/train",
            destination=f"{data_output_path}/train",
            output_name="train",
        ),
        ProcessingOutput(
            source=f"{container_base_path}/output/test",
            destination=f"{data_output_path}/test",
            output_name="test",
        ),
        ProcessingOutput(
            source=f"{container_base_path}/output/validation",
            destination=f"{data_output_path}/validation",
            output_name="validation",
        ),
    ],
)
Sklearn Processor#
There is no entrypoint or command here; the container is chosen by framework version and the code is specified in the run method.
sklearn_processor = SKLearnProcessor(
    framework_version="0.20.0",
    role=os.environ["ROLE"],
    instance_count=1,
    instance_type="ml.m4.xlarge",
    env={"PROCESSOR": "sklearn-processor"},
)

sklearn_processor.run(
    job_name=f'sklearn-processor-{strftime("%Y-%m-%d-%H-%M-%S")}',
    code="process-data.py",
    inputs=[
        ProcessingInput(
            source=data_input_path,
            # process-data.py needs to know data is located here
            destination=f"{container_base_path}/data/",
        ),
        ProcessingInput(
            source=code_input_path,
            destination=f"{container_base_path}/input/",
        ),
    ],
    outputs=[
        ProcessingOutput(
            source=f"{container_base_path}/output/train",
            destination=f"{data_output_path}/train",
            output_name="train",
        ),
        ProcessingOutput(
            source=f"{container_base_path}/output/test",
            destination=f"{data_output_path}/test",
            output_name="test",
        ),
        ProcessingOutput(
            source=f"{container_base_path}/output/validation",
            destination=f"{data_output_path}/validation",
            output_name="validation",
        ),
    ],
)
PySpark Processor#
For big data processing, we can use a PySparkProcessor.
spark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    framework_version="3.1",
    role=os.environ["ROLE"],
    instance_count=2,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=1200,
)

# run pyspark script
spark_processor.run(
    submit_app="./pyspark_process_data.py",
    arguments=[
        "--source_bucket_name",
        source_bucket_name,
        "--dest_bucket_name",
        dest_bucket_name,
    ],
    logs=False,
)
The pyspark_process_data.py script simply reads data from S3 into a Spark DataFrame, performs some transforms, then writes the result to an S3 destination. Please note that this job takes about 10 minutes.
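A rough sketch of what pyspark_process_data.py might look like is below; the data prefix, the CSV format, and the transform itself are assumptions for illustration.

import argparse

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# parse the bucket names passed via the processor arguments
parser = argparse.ArgumentParser()
parser.add_argument("--source_bucket_name", type=str)
parser.add_argument("--dest_bucket_name", type=str)
args, _ = parser.parse_known_args()

spark = SparkSession.builder.appName("sm-spark-demo").getOrCreate()

# read csv data from the source bucket (prefix and format are assumed)
df = spark.read.csv(
    f"s3://{args.source_bucket_name}/data/", header=True, inferSchema=True
)

# example transform: drop rows with nulls and add a processing timestamp
df = df.dropna().withColumn("processed_at", F.current_timestamp())

# write the result as parquet to the destination bucket
df.write.mode("overwrite").parquet(f"s3://{args.dest_bucket_name}/output/")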
PCA Transform#
Let's create a parallel SageMaker processing job with multiple instances to apply a PCA transform to ECG signals. First, create a processing script that performs PCA on an ECG NumPy array.
import os
import argparse
from time import strftime
import glob
from datetime import datetime
import pandas as pd
from sklearn.decomposition import PCA
import boto3

# container base path
container_base_path = "/opt/ml/processing"

# output
os.makedirs(f"{container_base_path}/output", exist_ok=True)


def read_parameters():
    parser = argparse.ArgumentParser()
    parser.add_argument("--bucket", type=str, default="")
    params, _ = parser.parse_known_args()
    return params


# parse arguments
args = read_parameters()

# bucket name
bucket = args.bucket

# s3 client
client = boto3.client("s3")

# pca
pca = PCA(n_components=4)

# loop through all input data files
for file in glob.glob(f"{container_base_path}/data/*.csv"):
    # file name
    file_name = file.split("/")[-1]
    print(file)
    # read data
    df = pd.read_csv(file, usecols=[1, 2, 3, 4], dtype=float)
    # remove nan
    df.fillna(0.0, inplace=True)
    # to array
    ecg = df.values
    # apply pca
    pecg = pca.fit_transform(ecg)
    # save result
    pd.DataFrame(pecg).to_csv(
        f"{container_base_path}/output/pca_{file_name}", header=None, index=None
    )
Second, create a SageMaker processing job using the built-in scikit-learn image. Retrieve the image URI:
from sagemaker import image_uris

image_uri = image_uris.retrieve(
    region="us-east-1",
    framework="sklearn",
    version="0.23-1",
)
Create a SageMaker processor with an instance count of 8, matching the applied service quota. Please check S3DataDistributionType to see how the data files are distributed between the instances.
processor = Processor(
    role=role,
    image_uri=image_uri,
    # check service quota from console depending on instance type
    instance_count=8,
    instance_type="ml.m4.xlarge",
    entrypoint=[
        "python",
        f"{container_base_path}/script/pca-ecg.py",
        f"--bucket={bucket}",
    ],
)
Finally, run the processing job.
# it takes about 5 minutes
processor.run(
    inputs=[
        ProcessingInput(
            source=data_input_path,
            destination=f"{container_base_path}/data/",
            s3_data_distribution_type="ShardedByS3Key",
        ),
        ProcessingInput(
            source=code_input_path,
            destination=f"{container_base_path}/script/",
        ),
    ],
    outputs=[
        ProcessingOutput(
            source=f"{container_base_path}/output/",
            destination=f"{data_output_path}",
            output_name="data-pca",
        )
    ],
    job_name=f'demo-{strftime("%Y-%m-%d-%H-%M-%S")}',
)
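After the job finishes, one way to confirm its status and output locations is to describe it through the SDK; a small sketch, assuming the processor's latest_job attribute:

# describe the finished processing job to check its status and output locations
description = processor.latest_job.describe()
print(description["ProcessingJobStatus"])
print(description["ProcessingOutputConfig"]["Outputs"])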
Pipe Mode#
A processing job supports two modes for accessing data from S3
- File Mode => data is downloaded from S3 into the container
- Pipe Mode => data is streamed directly from S3 => requires a code update
For example
ProcessingInput(
    source=data_input_path,
    # process-data.py needs to know data is located here
    destination=f"{container_base_path}/data/",
    # Pipe mode reads directly from S3 => update process-data code
    s3_input_mode="Pipe",
),
The updated process-data code:
TODO
Entrypoint and CMD#
According to page 74 of Getting Started with Containerization:
- ENTRYPOINT defines the command ==> defaults to /bin/sh -c
- CMD defines the parameters for the command ==> can be overwritten at run time
For example, to run the following
ping 8.8.8.8 -c 3
We can configure ENTRYPOINT and CMD in a Dockerfile as follows
FROM alpine:latest
ENTRYPOINT ["ping"]
CMD ["8.8.8.8", "-c", "3"]
Build a pinger image
docker image build -t pinger .
Then we can overwrite the three CMD parameters at run time as follows
docker container run --rm -it pinger -w 5 127.0.0.1
It is also possible to overwrite the entrypoint
docker container run --rm -it --entrypoint /bin/sh pinger
Please note that the default entrypoint is /bin/sh -c when we use only CMD
FROM alpine:latest
CMD wget -O - http://www.google.com
The actual running command would be
/bin/sh -c "wget -O - http://www.google.com"