Introduction#

This note shows how to develop with Spark and the PySparkProcessor in SageMaker.

Glue Session#

To use a Glue interactive session directly in a SageMaker Studio notebook:

  • Update the notebook execution role
  • Choose the Spark Analytics kernel with Glue PySpark and Ray

First, attach the AwsGlueSessionUserRestrictedServiceRole managed policy to the role, then add a policy with iam:PassRole and iam:GetRole as below

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "unique_statement_id",
      "Effect": "Allow",
      "Action": ["iam:GetRole", "iam:PassRole", "sts:GetCallerIdentity"],
      "Resource": "*"
    }
  ]
}

Second, update the trust policy so that both Glue and SageMaker can assume the role

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": ["glue.amazonaws.com", "sagemaker.amazonaws.com"]
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
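
To apply these two updates, here is a minimal boto3 sketch. The role name is the one used in the %iam_role magic later; the managed-policy ARN (its service-role path in particular) should be verified in the IAM console.

import json

import boto3

iam = boto3.client("iam")
role_name = "RoleForDataScientistUserProfile"  # the SageMaker execution role to update

# attach the Glue interactive session managed policy (verify the exact ARN in the IAM console)
iam.attach_role_policy(
    RoleName=role_name,
    PolicyArn="arn:aws:iam::aws:policy/service-role/AwsGlueSessionUserRestrictedServiceRole",
)

# add the permission policy with iam:GetRole, iam:PassRole and sts:GetCallerIdentity
iam.put_role_policy(
    RoleName=role_name,
    PolicyName="GlueSessionPassRole",  # hypothetical inline policy name
    PolicyDocument=json.dumps(
        {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Sid": "unique_statement_id",
                    "Effect": "Allow",
                    "Action": ["iam:GetRole", "iam:PassRole", "sts:GetCallerIdentity"],
                    "Resource": "*",
                }
            ],
        }
    ),
)

# replace the trust policy so both Glue and SageMaker can assume the role
iam.update_assume_role_policy(
    RoleName=role_name,
    PolicyDocument=json.dumps(
        {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {"Service": ["glue.amazonaws.com", "sagemaker.amazonaws.com"]},
                    "Action": "sts:AssumeRole",
                }
            ],
        }
    ),
)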

Third, launch a notebook, select the Spark Analytics image with the Glue PySpark (Ray) kernel, and configure the Glue session with magics

%additional_python_modules matplotlib, numpy, pandas
%idle_timeout 60
%glue_version 3.0
%number_of_workers 5
%iam_role arn:aws:iam::413175686616:role/RoleForDataScientistUserProfile

Fourth, create the Glue and Spark contexts

from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

# get or create the SparkContext and wrap it in a GlueContext
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)

# either take the session managed by the GlueContext or build one directly
# spark = glueContext.spark_session
spark = SparkSession.builder.appName("PySparkApp").getOrCreate()

PySpark Code#

Read parquet from S3

df_parquet = spark.read.format("parquet").load(f"s3://{source_bucket_name}/parquet/")

Read CSV from S3 with an explicit schema. First, define the schema

from pyspark.sql.types import IntegerType, StringType, StructType

schema = (
    StructType()
    .add("marketplace", StringType(), True)
    .add("customer_id", StringType(), True)
    .add("review_id", StringType(), True)
    .add("product_id", StringType(), True)
    .add("product_parent", IntegerType(), True)
    .add("product_title", StringType(), True)
    .add("product_category", StringType(), True)
    .add("star_rating", IntegerType(), True)
    .add("helpful_vote", IntegerType(), True)
    .add("total_vote", IntegerType(), True)
    .add("vine", StringType(), True)
    .add("verified_purchase", StringType(), True)
    .add("review_headline", StringType(), True)
    .add("review_body", StringType(), True)
    .add("myyear", StringType(), True)
)

Then load the tab-delimited CSV data from S3 into a Spark DataFrame

df_csv = (
    spark.read.format("csv")
    .option("header", True)
    .schema(schema)
    .option("delimiter", "\t")
    .option("quote", '"')
    .load(f"s3://{source_bucket_name}/tsv/")
    # .select("marketplace", "customer_id", "review_id", "product_id", "product_title")
)

A very simple transform that selects some columns of interest

df_clean = df_csv.select("marketplace", "customer_id", "product_id", "star_rating")

Finally, write the DataFrame to S3

df_clean.write.format("parquet").save(f"s3://{dest_bucket_name}/amazon-reviews/")

Double check the result in S3

!aws s3 ls --summarize --human-readable --recursive s3://sagemaker-us-east-1-413175686616/amazon-reviews/

PySpark Preprocess#

This is the preprocessing script for the PySparkProcessor

%%writefile ./spark-code/preprocess.py
import argparse

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructType

# create spark session
spark = SparkSession.builder.appName("PySparkApp").getOrCreate()

# schema of the input tsv data (the same schema used in the notebook above)
schema = (
    StructType()
    .add("marketplace", StringType(), True)
    .add("customer_id", StringType(), True)
    .add("review_id", StringType(), True)
    .add("product_id", StringType(), True)
    .add("product_parent", IntegerType(), True)
    .add("product_title", StringType(), True)
    .add("product_category", StringType(), True)
    .add("star_rating", IntegerType(), True)
    .add("helpful_vote", IntegerType(), True)
    .add("total_vote", IntegerType(), True)
    .add("vine", StringType(), True)
    .add("verified_purchase", StringType(), True)
    .add("review_headline", StringType(), True)
    .add("review_body", StringType(), True)
    .add("myyear", StringType(), True)
)


def main():
    """
    Parse arguments, read the tsv data from S3, transform it, and write parquet back to S3.
    """
    # define parser
    parser = argparse.ArgumentParser(description="app inputs and outputs")
    parser.add_argument("--source_bucket_name", type=str, help="s3 input bucket")
    parser.add_argument("--dest_bucket_name", type=str, help="output s3 prefix")
    # parse arguments
    args = parser.parse_args()
    print(f"{args.source_bucket_name} and {args.dest_bucket_name}")
    # read data from s3
    df_csv = (
        spark.read.format("csv")
        .option("header", True)
        .schema(schema)
        .option("delimiter", "\t")
        .option("quote", '"')
        .load(f"s3://{args.source_bucket_name}/tsv/")
    )
    # transform and feature engineer
    df_clean = df_csv.where("marketplace='US'").select(
        "marketplace", "customer_id", "product_id", "star_rating"
    )
    # write data to s3
    df_clean.write.format("parquet").save(
        f"s3://{args.dest_bucket_name}/amazon-reviews/"
    )


if __name__ == "__main__":
    main()

Parse Arguments#

Pass arguments from the run command to the job script by using argparse. First, define the parser in the main function, for example in main.py

parser = argparse.ArgumentParser(description="app inputs and outputs")
parser.add_argument("--source_bucket_name", type=str, help="s3 input bucket")
parser.add_argument("--dest_bucket_name", type=str, help="output s3 prefix")
args = parser.parse_args()

Then pass the arguments on the command line when running the script, similar to passing arguments to a container

python main.py --source_bucket_name source-bucket --dest_bucket_name dest-bucket

PySparkProcessor#

Create a PySparkProcessor processing job

from sagemaker.spark.processing import PySparkProcessor

spark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    framework_version="3.1",
    role=role,
    instance_count=2,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=1200,
)

Then run the job

spark_processor.run(
    submit_app=f"{local_code_path}",
    arguments=[
        "--s3_input_bucket",
        bucket,
        "--s3_input_key_prefix",
        input_prefix_abalone,
        "--s3_output_bucket",
        bucket,
        "--s3_output_key_prefix",
        input_preprocessed_prefix_abalone,
    ],
    spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, prefix),
    logs=False,
)
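
Since the run stores Spark event logs at spark_event_logs_s3_uri, they can be inspected later with the Spark history server. A short sketch, assuming the SageMaker Python SDK's history-server helpers on the Spark processor:

# launch a local Spark history server backed by the event logs written by the job above
spark_processor.start_history_server(
    spark_event_logs_s3_uri="s3://{}/{}/spark_event_logs".format(bucket, prefix)
)
# stop it when done
spark_processor.terminate_history_server()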

PySpark Code with S3A#

Create a SparkSession configured to read and write S3 via the s3a connector

import pyspark
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("PySparkApp")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2")
    .config(
        "fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.ContainerCredentialsProvider",
    )
    .config("fs.s3a.endpoint", "s3.amazonaws.com")
    .getOrCreate()
)

Read data from S3 using PySpark

data_uri = "s3a://amazon-reviews-pds/parquet/product_category=Automotive/part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet"
df_parquet = spark.read.format("parquet").load(data_uri)
df_parquet.show(20)

Write data to S3 using PySpark

bucket = "sagemaker-us-east-1-413175686616"
df_parquet.write.format("csv")\
.option("header", True)\
.option("delimiter", "\t")\
.option("quote", '"')\
.save(f"s3a://{bucket}/data-spark/amazon-reviews-2.csv")
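
To double check, a small sketch that reads the written files back with the same session and the bucket variable above:

# read back the tab-delimited files just written and verify a few records
df_check = (
    spark.read.format("csv")
    .option("header", True)
    .option("delimiter", "\t")
    .load(f"s3a://{bucket}/data-spark/amazon-reviews-2.csv")
)
print(df_check.count())
df_check.show(5)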
