Aws使用EMR对Dynamodb进行备份还原

最近需要对DynamoDB的数据进行迁移,由于现阶段将Dynamo的容量模式由AutoScaling修改成了按需,当使用之前的HIVE脚本时出现了Read throughput should not be less than 1的报错,报错跟DynamoDB的容量有关,继而准备用Ansible在执行脚本前将Dynamo的容量和模式进行调整,又发现了Ansible的Dynamo模块现阶段还没有支持On-Demand billing这个功能,接着又准备使用awscli命令行进行调整,经调研这个应该是可以对Dynamo进行调整的,文章后面有一些资料可以参考,现在由于AWS官方给出了回复,所以就不用搞这个了,直接升级版本就好了,更省事。关于版本问题可以看下面说的版本问题说明

使用Hive进行Table的备份和还原

需要使用EMR版本需要5.22.0以上

使用Hive导出Dynamo

使用的备份Hive脚本(backup_dynamo.q)

1
2
3
4
5
6
7
8
9
CREATE EXTERNAL TABLE ${DynamoDBName} (item map<string,string>)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "${DynamoDBName}");
CREATE EXTERNAL TABLE ${DynamoDBName}s3 (item map<string, string>)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
LOCATION '${OUTPUT}/';
SET dynamodb.throughput.read.percent=${SPEEDX};
INSERT OVERWRITE TABLE ${DynamoDBName}s3 SELECT * FROM ${DynamoDBName};

使用pipeline script,脚本内容如下(里面有两个参数变量一个是SPEED可选0.1-1.5,另一个是DBS是要备份的dynamodb的表名,多个表的话用,分割)
其中s3://backup-205/dynamodb/scripts/backup_dynamo.q是上面备份语句文件所在的位置
其中s3://backup-to-dir/dynamodb/是备份文件输出的位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import groovy.json.JsonOutput
_EMRHiveScript = new String('s3://scripts/backup_dynamo.q')
def date = new Date()
datesubfolder = date.format('yyyyMMddhhmmz')
@NonCPS
def GenerateEMRShellTask(name, cmds) {
def map = [:]
map.Type = 'CUSTOM_JAR'
map.ActionOnFailure = 'CONTINUE'
map.Jar = "command-runner.jar"
// String Properties
map.Name = name
map.Args = cmds.split(' ')
return map
}
@NonCPS
def GenerateDynamoBackupStep(dbName, speed, output) {
if (speed > 1.5){
speed = 1.5
}
cmd = """hive-script --run-hive-script --args -f ${_EMRHiveScript} -d OUTPUT=${output} -d DynamoDBName=${dbName} -d SPEEDX=${speed}"""
return GenerateEMRShellTask(dbName+'Backup', cmd)
}
@NonCPS
def GetCmds() {
def speed = Float.valueOf(SPEED)
def ret = []
if(DBS.size() < 1) {
error "没选择任何数据库"
}
DBS.split(',').each{ it ->
ret << GenerateDynamoBackupStep(it, speed, 's3://backup-to-dir/dynamodb/' + it + '/' + datesubfolder)
}
def steps = JsonOutput.prettyPrint(JsonOutput.toJson(ret))
return """
aws emr create-cluster \
--region ap-southeast-1 \
--release-label emr-5.22.0 \
--auto-terminate \
--applications Name=Hadoop Name=Hive --name 'BackupDynamoDBFullSpeed' \
--tags 'Name=EMRBackup' \
--instance-groups '
[
{
"InstanceCount": 1,
"InstanceGroupType": "MASTER",
"InstanceType": "m3.xlarge",
"Name": "Master Instance Group"
},
{
"InstanceCount": 3,
"InstanceGroupType": "CORE",
"InstanceType": "m3.xlarge",
"Name": "Core Instance Group"
}
]
' \
--ec2-attributes '
{
"KeyName": "server-pem",
"InstanceProfile": "EMR_EC2_DefaultRole",
"SubnetId": "subnet-88a4dbfe",
"EmrManagedSlaveSecurityGroup": "your_set",
"EmrManagedMasterSecurityGroup": "your_set"
}
' \
--service-role EMR_DefaultRole --steps '
${steps}
'
"""
}
def shcmd=GetCmds()
node {
stage "echo cmds for debug"
echo shcmd
stage "run aws emr for backup"
withCredentials([[$class: 'StringBinding', credentialsId: 'EMRRUN_KEY_ID', variable: 'AWS_ACCESS_KEY_ID'], [$class: 'StringBinding', credentialsId: 'EMRRUN_KEY_SEC', variable: 'AWS_SECRET_ACCESS_KEY']]) {
sh shcmd
}
}

使用Hive导入Dynamo

导入的Hive脚本如下(restore_dynamo.q)

1
2
3
4
5
6
7
8
9
10
CREATE EXTERNAL TABLE ${DynamoDBName}s3 (item map<string, string>)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
LOCATION '${INPUT}/';
CREATE EXTERNAL TABLE ${DynamoDBName} (item map<string,string>)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "${DynamoDBName}");
SET dynamodb.throughput.write.percent=${SPEEDX};
INSERT OVERWRITE TABLE ${DynamoDBName} SELECT * FROM ${DynamoDBName}s3;

使用的pipeline script如下,其中需要3个参数配置(SPEED/datesubfolder/DBS)
其中SPEED跟备份一样是选择速率,datesubfolder这个需要选择某个备份输出的目录如201805280419CST,DBS这个是要还原的表名也就是daesubfolder备份目录的上以及目录
注意修改s3://backup-to-dir/dynamodb/这个参数以及脚本里面的集群参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import groovy.json.JsonOutput
_EMRHiveScript = new String('s3://scripts/restore_dynamo.q')
@NonCPS
def GenerateEMRShellTask(name, cmds) {
def map = [:]
map.Type = 'CUSTOM_JAR'
map.ActionOnFailure = 'CONTINUE'
map.Jar = "command-runner.jar"
// String Properties
map.Name = name
map.Args = cmds.split(' ')
return map
}
@NonCPS
def GenerateDynamoBackupStep(dbName, speed, input) {
if (speed > 1.5){
speed = 1.5
}
cmd = """hive-script --run-hive-script --args -f ${_EMRHiveScript} -d INPUT=${input} -d DynamoDBName=${dbName} -d SPEEDX=${speed}"""
return GenerateEMRShellTask(dbName+'Restore', cmd)
}
@NonCPS
def GetCmds() {
def speed = Float.valueOf(SPEED)
def ret = []
if(DBS.size() < 1) {
error "没选择任何数据库"
}
DBS.split(',').each{ it ->
ret << GenerateDynamoBackupStep(it, speed, 's3://backup-to-dir/dynamodb/' + it + '/' + datesubfolder)
}
def steps = JsonOutput.prettyPrint(JsonOutput.toJson(ret))
return """
aws emr create-cluster \
--region ap-southeast-1 \
--release-label emr-5.22.0 \
--auto-terminate \
--applications Name=Hadoop Name=Hive --name 'BackupDynamoDBFullSpeed' \
--tags 'Name=EMRBackup' \
--instance-groups '
[
{
"InstanceCount": 1,
"InstanceGroupType": "MASTER",
"InstanceType": "m3.xlarge",
"Name": "Master Instance Group"
},
{
"InstanceCount": 3,
"InstanceGroupType": "CORE",
"InstanceType": "m3.xlarge",
"Name": "Core Instance Group"
}
]
' \
--ec2-attributes '
{
"KeyName": "server-pem",
"InstanceProfile": "EMR_EC2_DefaultRole",
"SubnetId": "subnet-88a4dbfe",
"EmrManagedSlaveSecurityGroup": "your_set",
"EmrManagedMasterSecurityGroup": "your_set"
}
' \
--service-role EMR_DefaultRole --steps '${steps}'
"""
}
def shcmd=GetCmds()
node {
stage "echo cmds for debug"
echo shcmd
stage "run aws emr for backup"
withCredentials([[$class: 'StringBinding', credentialsId: 'EMRRUN_KEY_ID', variable: 'AWS_ACCESS_KEY_ID'], [$class: 'StringBinding', credentialsId: 'EMRRUN_KEY_SEC', variable: 'AWS_SECRET_ACCESS_KEY']]) {
sh shcmd
}
}

版本问题的说明

注意:使用EMR版本emr-4.7.2时会出现以下报错

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j.properties
WARNING: Configured write throughput of the dynamodb table HMTGift is less than the cluster map capacity. ClusterMapCapacity: 30 WriteThroughput: 0
WARNING: Writes to this table might result in a write outage on the table.
OK
Time taken: 1.786 seconds
OK
Time taken: 0.349 seconds
Query ID = hadoop_xxxx_xxxx-xxxx-xxxx
Total jobs = 3
Launching Job 1 out of 3
Number of reduce tasks is set to 0 since there's no reduce operator
java.lang.RuntimeException: Read throughput should not be less than 1. Read throughput percent: 0.0
at org.apache.hadoop.dynamodb.read.AbstractDynamoDBInputFormat.getSplits(AbstractDynamoDBInputFormat.java:51 )
at org.apache.hadoop.hive.ql.io.HiveInputFormat.addSplitsForGroup(HiveInputFormat.java:298 )
at org.apache.hadoop.hive.ql.io.HiveInputFormat.getSplitsInternal(HiveInputFormat.java:412 )
at org.apache.hadoop.hive.ql.io.HiveInputFormat.getSplits(HiveInputFormat.java:330 )
at org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.getCombineSplits(CombineHiveInputFormat.java:311 )
at org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.getSplitsInternal(CombineHiveInputFormat.java:519 )
at org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.getSplits(CombineHiveInputFormat.java:463 )
at org.apache.hadoop.mapreduce.JobSubmitter.writeOldSplits(JobSubmitter.java:328 )
at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:320 )
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:196 )
at org.apache.hadoop.mapreduce.Job$10.run (Job.java:1290 )
at org.apache.hadoop.mapreduce.Job$10.run (Job.java:1287 )
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415 )
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657 )
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287 )
at org.apache.hadoop.mapred.JobClient$1.run (JobClient.java:575 )
at org.apache.hadoop.mapred.JobClient$1.run (JobClient.java:570 )
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415 )
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657 )
at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:570 )
at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:561 )
at org.apache.hadoop.hive.ql.exec.mr.ExecDriver.execute(ExecDriver.java:429 )
at org.apache.hadoop.hive.ql.exec.mr.MapRedTask.execute(MapRedTask.java:137 )
at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:160 )
at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:85 )
at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1618 )
at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1379 )
at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1192 )
at org.apache.hadoop.hive.ql.Driver.run (Driver.java:1019 )
at org.apache.hadoop.hive.ql.Driver.run (Driver.java:1009 )
at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:201 )
at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:153 )
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:364 )
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:299 )
at org.apache.hadoop.hive.cli.CliDriver.processReader(CliDriver.java:397 )
at org.apache.hadoop.hive.cli.CliDriver.processFile(CliDriver.java:413 )
at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:668 )
at org.apache.hadoop.hive.cli.CliDriver.run (CliDriver.java:631 )
at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:570 )
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57 )
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43 )
at java.lang.reflect.Method.invoke(Method.java:606 )
at org.apache.hadoop.util.RunJar.run (RunJar.java:221 )
at org.apache.hadoop.util.RunJar.main(RunJar.java:136 )
Job Submission failed with exception 'java.lang.RuntimeException(Read throughput should not be less than 1. Read throughput percent: 0.0)'
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
Command exiting with ret '1'

给AWS提了Case,确定是EMR版本问题(这是一个已知issue,在5.22.0版本修复),AWS官方回复如下

从emr-5.22.0开始,可以成功运行Hive语句和on-demand table进行import/export操作
根据调查,这是老EMR版本中使用的EMR DDB connector的known issue,由于在EMR中Apache Hadoop, Hive, Spark会使用该connector去访问Dynamodb,老版本中DDB connector并不支持和on-demand配置类型的Dynamodb table操作,且并没有其他的workaround。该问题已经在EMR release 5.22中被修复。
我建议您使用emr-5.22.0及以上的版本来和on-demand Dynamodb table交互。如果您的使用条件不允许进行EMR升级,建议您将Dynamodb table转为预配置(provision)进行操作。

使用AWSCLI管理DynamoDB

这个AWS官网的文档很清晰,就不赘述了,最下面附有链接,简单例子如下
容量调整

1
aws --region ap-southeast-1 dynamodb update-table --table-name your_table_name --provisioned-throughput ReadCapacityUnits=300,WriteCapacityUnits=120

使用Ansible管理DynamoDB

目前使用Ansible还不能实现DynamoDB计费模式的更改(因为后来加入的按需付费功能),如果使用了按需付费就不能使用Ansible的Dynamo模块进行容量变更也不能更改Dynamodb的付费模式。目前Ansible还没有加入这个功能,感兴趣的话可以通过这个ISSUES进行追踪.

另外可以参考Ansible的Dynamo模块文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#https://docs.ansible.com/ansible/latest/modules/dynamodb_table_module.html
- hosts: localhost
remote_user: ec2-user
become: no
become_user: root
gather_facts: false
vars:
my_env_access_key: xxx
my_env_secret_key: xxx
tasks:
#205 and 209
- dynamodb_table:
name: my-table
region: us-east-1
aws_access_key: "{{ my_env_access_key }}"
aws_secret_key: "{{ my_env_secret_key }}"
read_capacity: 300
write_capacity: 30
with_items:
- "GMInfo"
- "HMTGift"

其他一些文档

关于DynamoDB吞吐量的文档
关于DynamoDB的一些限制
使用AWSCLI管理DynamoDB的文档