Introduction#
GitHub: this note shows how to develop with Spark and the PySparkProcessor in SageMaker:
- Develop with Glue notebook
- Parse arguments
- PySparkProcessor
- sg-spark-glue-amazon-reviews notebook
- sg-create-spark-job
Glue Session#
To use a Glue interactive session directly in a SageMaker Studio notebook:
- Update the execution role for the notebook
- Choose the Spark Analytics kernel with Glue PySpark and Ray HERE
First, attach the AwsGlueSessionUserRestrictedServiceRole managed policy to the role, then add a policy with iam:PassRole and iam:GetRole as below
{"Version": "2012-10-17","Statement": [{"Sid": "unique_statement_id","Effect": "Allow","Action": ["iam:GetRole", "iam:PassRole", "sts:GetCallerIdentity"],"Resource": "*"}]}
Second, update the trust policy so that both Glue and SageMaker can assume the role
{"Version": "2012-10-17","Statement": [{"Effect": "Allow","Principal": {"Service": ["glue.amazonaws.com", "sagemaker.amazonaws.com"]},"Action": "sts:AssumeRole"}]}
Third, launch a notebook, select the Spark Analytics image and the PySpark (Ray) kernel, and configure the Glue session with magics
%additional_python_modules matplotlib, numpy, pandas
%idle_timeout 60
%glue_version 3.0
%number_of_workers 5
%iam_role arn:aws:iam::413175686616:role/RoleForDataScientistUserProfile
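A few session-management magics are also handy in the Glue PySpark kernel; the names below are from the Glue interactive sessions documentation as best I recall, so run %help in the kernel to confirm them.

# list all magics supported by the kernel
%help
# show the status and configuration of the current Glue session
%status
# stop the session when finished to avoid paying for idle workers
%stop_session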
Fourth, create the Glue and Spark contexts
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
# spark = glueContext.spark_session
spark = SparkSession.builder.appName("PySparkApp").getOrCreate()
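With a GlueContext available, tables registered in the Glue Data Catalog can be read as well. A small sketch, assuming a hypothetical catalog database my_database and table my_table:

# read a catalog table into a Glue DynamicFrame (database and table_name are placeholders)
dyf = glueContext.create_dynamic_frame.from_catalog(
    database="my_database",
    table_name="my_table",
)
# convert to a plain Spark DataFrame for standard PySpark transforms
df = dyf.toDF()
df.printSchema()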
PySpark Code#
Read parquet from S3
df_parquet = spark.read.format("parquet").load(f"s3://{source_bucket_name}/parquet/")
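Before transforming anything, it is worth a quick look at what was loaded:

# inspect the schema and a small sample of rows
df_parquet.printSchema()
df_parquet.show(5)
# counting rows triggers a full scan, so it can be slow on large datasets
print(df_parquet.count())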
Read csv from S3 with schema
from pyspark.sql.types import IntegerType, StringType, StructType

schema = (
    StructType()
    .add("marketplace", StringType(), True)
    .add("customer_id", StringType(), True)
    .add("review_id", StringType(), True)
    .add("product_id", StringType(), True)
    .add("product_parent", IntegerType(), True)
    .add("product_title", StringType(), True)
    .add("product_category", StringType(), True)
    .add("star_rating", IntegerType(), True)
    .add("helpful_vote", IntegerType(), True)
    .add("total_vote", IntegerType(), True)
    .add("vine", StringType(), True)
    .add("verified_purchase", StringType(), True)
    .add("review_headline", StringType(), True)
    .add("review_body", StringType(), True)
    .add("myyear", StringType(), True)
)
Then load csv from S3 into a Spark dataframe
df_csv = (
    spark.read.format("csv")
    .option("header", True)
    .schema(schema)
    .option("delimiter", "\t")
    .option("quote", '"')
    .load(f"s3://{source_bucket_name}/tsv/")
    # .select("marketplace", "customer_id", "review_id", "product_id", "product_title")
)
A very simple transform which selects some columns of interest
df_clean = df_csv.select("marketplace", "customer_id", "product_id", "star_rating")
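If more than column selection is needed, a typical follow-up is a small aggregation. For example, average star rating and review count per product, assuming the df_clean columns above:

from pyspark.sql import functions as F

# average rating and number of reviews per product, most-reviewed first
df_stats = (
    df_clean.groupBy("product_id")
    .agg(
        F.avg("star_rating").alias("avg_star_rating"),
        F.count("*").alias("num_reviews"),
    )
    .orderBy(F.desc("num_reviews"))
)
df_stats.show(10)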
Finally, write the dataframe to S3
df_clean.write.format("parquet").save(f"s3://{dest_bucket_name}/amazon-reviews/")
Double check the result in S3
!aws s3 ls --summarize --human-readable --recursive s3://sagemaker-us-east-1-413175686616/amazon-reviews/
PySpark Preprocess#
This is the preprocessing script that will be submitted to the PySparkProcessor
%%writefile ./spark-code/preprocess.py
import argparse

from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructType

# create spark session
spark = SparkSession.builder.appName("PySparkApp").getOrCreate()

# create schema
schema = (
    StructType()
    .add("marketplace", StringType(), True)
    .add("customer_id", StringType(), True)
    .add("review_id", StringType(), True)
    .add("product_id", StringType(), True)
    .add("product_parent", IntegerType(), True)
    .add("product_title", StringType(), True)
    .add("product_category", StringType(), True)
    .add("star_rating", IntegerType(), True)
    .add("helpful_vote", IntegerType(), True)
    .add("total_vote", IntegerType(), True)
    .add("vine", StringType(), True)
    .add("verified_purchase", StringType(), True)
    .add("review_headline", StringType(), True)
    .add("review_body", StringType(), True)
    .add("myyear", StringType(), True)
)


def main():
    """parse arguments, then read, transform and write data"""
    # define parser
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    parser.add_argument("--source_bucket_name", type=str, help="s3 input bucket")
    parser.add_argument("--dest_bucket_name", type=str, help="output s3 prefix")
    # parse arguments
    args = parser.parse_args()
    print(f"{args.source_bucket_name} and {args.dest_bucket_name}")
    # read data from s3
    df_csv = (
        spark.read.format("csv")
        .option("header", True)
        .schema(schema)
        .option("delimiter", "\t")
        .option("quote", '"')
        .load(f"s3://{args.source_bucket_name}/tsv/")
    )
    # transform and feature engineer
    df_clean = df_csv.where("marketplace='US'").select(
        "marketplace", "customer_id", "product_id", "star_rating"
    )
    # write data to s3
    df_clean.write.format("parquet").save(f"s3://{args.dest_bucket_name}/amazon-reviews/")


if __name__ == "__main__":
    main()
Parse Arguments#
Pass arguments from the run command to the job script using argparse. First, define a parser in the main function of the script, for example main.py
parser = argparse.ArgumentParser(description="app inputs and outputs")
parser.add_argument("--source_bucket_name", type=str, help="s3 input bucket")
parser.add_argument("--dest_bucket_name", type=str, help="output s3 prefix")
args = parser.parse_args()
Then pass the arguments on the command line when running the script, similar to passing arguments to a container
python main.py --source_bucket_name your-bucket-name --dest_bucket_name your-bucket-name
PySparkProcessor#
Create a PySparkProcessor to run the processing job
from sagemaker.spark.processing import PySparkProcessor

spark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    framework_version="3.1",
    role=role,
    instance_count=2,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=1200,
)
Then run the job
spark_processor.run(
    submit_app=f"{local_code_path}",
    arguments=[
        "--s3_input_bucket",
        bucket,
        "--s3_input_key_prefix",
        input_prefix_abalone,
        "--s3_output_bucket",
        bucket,
        "--s3_output_key_prefix",
        input_preprocessed_prefix_abalone,
    ],
    spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, prefix),
    logs=False,
)
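The arguments above come from an abalone preprocessing example. To submit the preprocess.py written earlier instead, a run call would look roughly like this, with source_bucket_name and dest_bucket_name as placeholders for your buckets:

spark_processor.run(
    submit_app="./spark-code/preprocess.py",
    arguments=[
        "--source_bucket_name",
        source_bucket_name,  # placeholder: bucket holding the tsv/ input data
        "--dest_bucket_name",
        dest_bucket_name,  # placeholder: bucket for the parquet output
    ],
    logs=False,
)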
PySpark Code#
Create a SparkSession
import pyspark
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("PySparkApp")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2")
    .config("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.ContainerCredentialsProvider")
    .config("fs.s3a.endpoint", "s3.amazonaws.com")
    .getOrCreate()
)
Read data from S3 using PySpark
data_uri = "s3a://amazon-reviews-pds/parquet/product_category=Automotive/part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet"
df_parquet = spark.read.format("parquet").load(data_uri)
df_parquet.show(20)
Write data to S3 using PySpark
bucket = "sagemaker-us-east-1-413175686616"df_parquet.write.format("csv")\.option("header", True)\.option("delimiter", "\t")\.option("quote", '"')\.save(f"s3a://{bucket}/data-spark/amazon-reviews-2.csv")