Customers: Tinder, Canon, Careem

What use cases/When to use?

  • DynamoDB
  • Single-digit millisecond latency improved to microseconds
  • Less operational overhead and complexity (easy code change)
  • Handle read-heavy and bursty workloads
  • Increase performance while reducing costs

Concepts/Parameters:

Caching strategies:

Architecture#

aws_devops-Expriment drawio(3)

dax_performance

DAX Cluster Stack#

subnet group

// Look up the already-existing VPC using the id and name supplied in props.
const vpc = aws_ec2.Vpc.fromLookup(this, 'Vpc', {
  vpcId: props.vpcId,
  vpcName: props.vpcName
})
// Collect the ids of every private subnet of that VPC.
const privateSubnetIds = vpc.privateSubnets.map(({ subnetId }) => subnetId)
// DAX subnet group spanning those private subnets.
const subnetGroup = new aws_dax.CfnSubnetGroup(this, 'SubnetGroupForDaxDemo', {
  subnetGroupName: 'SubnetGroupForDaxDemo',
  description: 'subnet group for dax demo',
  subnetIds: privateSubnetIds
})

parameter group

// Cache TTL settings passed to the DAX parameter group (values are strings,
// expressed in milliseconds):
//   query-ttl-millis  300000 ms = 5 minutes (noted as the default)
//   record-ttl-millis 180000 ms = 3 minutes
const daxTtlSettings = {
  'query-ttl-millis': '300000',
  'record-ttl-millis': '180000'
}
// Parameter group that the DAX cluster refers to by name.
const parameterGroup = new aws_dax.CfnParameterGroup(this, 'ParameterGroupDaxDemo', {
  parameterGroupName: 'ParameterGroupDaxDemo',
  description: 'parameter gropu for dax cluster demo',
  parameterNameValues: daxTtlSettings
})

role for DAX

// IAM role assumed by the DAX service (dax.amazonaws.com).
const role = new aws_iam.Role(this, 'RoleForDaxClusterDmoe', {
  assumedBy: new aws_iam.ServicePrincipal('dax.amazonaws.com'),
  roleName: 'RoleForDaxClusterDemo'
})
// Statement allowing every DynamoDB action on every resource.
// NOTE(review): this is very broad; consider scoping `actions` and
// `resources` to the tables the cluster actually fronts.
const dynamoDbAccess = new aws_iam.PolicyStatement({
  effect: aws_iam.Effect.ALLOW,
  actions: ['dynamodb:*'],
  resources: ['*']
})
// Wrap the statement in an inline policy and attach it to the role.
const daxInlinePolicy = new aws_iam.Policy(this, 'PolicyForDaxClusterDmoe', {
  policyName: 'PolicyForDaxClusterDmoe',
  statements: [dynamoDbAccess]
})
role.attachInlinePolicy(daxInlinePolicy)

security group for DAX cluster

// Security group attached to the DAX cluster nodes.
const securityGroup = new aws_ec2.SecurityGroup(
  this,
  'SecurityGroupForDaxCluster',
  {
    securityGroupName: 'SecurityGroupForDaxCluster',
    vpc: vpc
  }
)
securityGroup.addIngressRule(
  // Only accept traffic originating inside the VPC instead of from any IPv4
  // address. For production, narrow this further to the security group of
  // the client application (security-group peer).
  aws_ec2.Peer.ipv4(vpc.vpcCidrBlock),
  // 8111 is the unencrypted DAX endpoint port
  aws_ec2.Port.tcp(8111)
)

create a DAX cluster

// Create the DAX cluster.
const daxCluster = new aws_dax.CfnCluster(this, 'DaxClusterDemo', {
  clusterName: 'DaxClusterDemo',
  // role the cluster uses to access DynamoDB
  iamRoleArn: role.roleArn,
  // memory-optimized node type
  nodeType: 'dax.r4.large',
  // 3 nodes: 1 primary and 2 read replicas
  replicationFactor: 3,
  // availability zones are left unset (assigned automatically)
  // availabilityZones: [''],
  // endpoint encryption: TLS, or NONE (the default)
  clusterEndpointEncryptionType: 'NONE',
  // notificationTopicArn: "",
  parameterGroupName: parameterGroup.parameterGroupName,
  // time range in which DAX software maintenance is performed
  // preferredMaintenanceWindow: "",
  securityGroupIds: [securityGroup.securityGroupId],
  subnetGroupName: subnetGroup.subnetGroupName
})
// The parameter group and subnet group are referenced above through plain
// name strings, which does not make CloudFormation order the resources.
// Declare the dependencies explicitly so both groups exist before the
// cluster is created.
daxCluster.node.addDependency(parameterGroup)
daxCluster.node.addDependency(subnetGroup)

DAX Python Client#

# Build a DAX resource pointed at the cluster endpoint (DAX_ENDPOINT).
dax = amazondax.AmazonDaxClient.resource(endpoint_url=DAX_ENDPOINT)
# Table handle; the snippets below call get_item / put_item on it.
table = dax.Table(table_name)

get item by primary key

# Read a single item, addressed by its UserId key.
table.get_item(Key={"UserId": user_id})

write an item to table

# Assemble a randomly generated score record first, then write it.
new_item = {
    'UserId': str(uuid.uuid1()),
    'UserName': names.get_full_name(),
    'GameTitle': game_title,
    'Score': random.randint(1000, 6000),
    'Wins': random.randint(0, 100),
    'Losses': random.randint(5, 50),
    # creation time as epoch milliseconds
    'CreatedTime': int(datetime.datetime.now().timestamp() * 1000),
}
table.put_item(Item=new_item)

DAX Client Performance Check#

get table amazondax python client

def get_table(table_name: str, mode='ddb'):
    """
    Return a Table resource for ``table_name``.

    When ``mode`` is ``'dax'`` the table is obtained through the DAX client
    at ``DAX_ENDPOINT``; for any other value it is obtained from the plain
    boto3 DynamoDB resource.
    """
    # anything other than 'dax' goes straight to DynamoDB
    if mode != 'dax':
        return boto3.resource('dynamodb').Table(table_name)
    # route requests through the DAX cluster
    dax_resource = amazondax.AmazonDaxClient.resource(endpoint_url=DAX_ENDPOINT)
    return dax_resource.Table(table_name)

get item by primary key

def get_items_by_primary_key(table_name: str, mode='dax', no_user=100, warmup=2):
    """
    Fetch items one by one with get_item and measure each call's latency.

    User ids come from ``scan_user_ids(table_name, no_user, mode=mode)``;
    every id is then read through the table returned by
    ``get_table(table_name, mode)``.

    Args:
        table_name: name of the table to query.
        mode: passed to ``get_table`` / ``scan_user_ids`` ('dax' or 'ddb').
        no_user: number of user ids requested from ``scan_user_ids``.
        warmup: number of leading measurements dropped from the result
            (default 2, the value that was previously hard-coded).

    Returns:
        dict with ``"latencies"`` (milliseconds per get_item call) and
        ``"items"`` (the fetched items, each tagged with a ``'latency'``
        entry), both without the first ``warmup`` entries.
    """
    # NOTE(review): create_table in this file defines a composite key
    # (UserId + CreatedTime); a get_item with only UserId would be rejected
    # by such a table -- confirm which table schema this targets.
    items, latencies = [], []
    table = get_table(table_name, mode)
    user_ids = scan_user_ids(table_name, no_user, mode=mode)
    for user_id in user_ids:
        start = time.perf_counter()
        res = table.get_item(
            Key={"UserId": user_id}
        )
        end = time.perf_counter()
        # elapsed time in milliseconds
        duration = (end - start) * 1000
        item = res.get('Item')
        if item is None:
            # the response carries no 'Item' when the key does not exist;
            # skip it so items and latencies stay aligned
            print(f'{mode} get-item {user_id} not found, skipped')
            continue
        print(f'{mode} get-item {item["UserId"]} latency: {duration:.4f}ms ')
        # tag the item with the latency of the call that fetched it
        item['latency'] = duration
        items.append(item)
        latencies.append(duration)
    # drop the first `warmup` measurements
    return {"latencies": latencies[warmup:], "items": items[warmup:]}

DynamoDB Table and Prepare Data#

def create_table(table_name: str) -> None:
    """
    Create an on-demand DynamoDB table named ``table_name`` and print the
    create_table response.

    The primary key is composite: ``UserId`` (string, HASH) plus
    ``CreatedTime`` (number, RANGE).
    """
    # every attribute used in the key schema must be declared here
    attribute_definitions = [
        {'AttributeName': 'UserId', 'AttributeType': 'S'},
        {'AttributeName': 'CreatedTime', 'AttributeType': 'N'},
    ]
    key_schema = [
        {'AttributeName': 'UserId', 'KeyType': 'HASH'},
        {'AttributeName': 'CreatedTime', 'KeyType': 'RANGE'},
    ]
    # low-level client; a region could optionally be specified here
    response = boto3.client('dynamodb').create_table(
        TableName=table_name,
        AttributeDefinitions=attribute_definitions,
        KeySchema=key_schema,
        # PAY_PER_REQUEST suits unpredictable load,
        # PROVISIONED suits predictable load
        BillingMode="PAY_PER_REQUEST",
    )
    # show the table metadata returned by the service
    print(response)

write data to a table

def write_table(table_name: str, mode='dax') -> None:
    """
    Fill ``table_name`` with randomly generated score records.

    For every title in ``GAME_TITLES``, ``NUM_USER`` items are written
    through the table returned by ``get_table(table_name, mode)``; the
    per-title counter is printed after each write.
    """
    target = get_table(table_name, mode)
    for game_title in GAME_TITLES:
        for index in range(NUM_USER):
            # one random player record for this game title
            record = {
                'UserId': str(uuid.uuid1()),
                'UserName': names.get_full_name(),
                'GameTitle': game_title,
                'Score': random.randint(1000, 6000),
                'Wins': random.randint(0, 100),
                'Losses': random.randint(5, 50),
                # creation time as epoch milliseconds
                'CreatedTime': int(datetime.datetime.now().timestamp() * 1000),
            }
            target.put_item(Item=record)
            print(index)

write multi-thread

def write_table_thread(table_name: str, mode='dax') -> None:
    """
    Run ``write_table`` concurrently on ``NUM_THREAD`` worker threads.

    Args:
        table_name: table every worker writes to.
        mode: forwarded to ``write_table`` ('dax' or 'ddb').

    Raises:
        Any exception raised inside a worker is re-raised here once the
        pool has finished, instead of being silently discarded.
    """
    with ThreadPoolExecutor(max_workers=NUM_THREAD) as executor:
        # one task per worker, writing to the table/mode the caller asked
        # for (previously the global TABLE_NAME was used, mode was ignored,
        # and only NUM_THREAD - 1 tasks were submitted)
        futures = [
            executor.submit(write_table, table_name, mode)
            for _ in range(NUM_THREAD)
        ]
        # result() re-raises whatever the worker raised
        for future in futures:
            future.result()