From ad8679e9164e926d3c3d8669c9e76f52e4465cde Mon Sep 17 00:00:00 2001 From: Brian Chen Date: Fri, 24 Jun 2016 11:09:23 +1000 Subject: [PATCH] basic emr implementation quick emr resize implementation add task group not force new add task group check empty slices clean up rename to initial_instance_count add task instance group as resource cluster resize core group clean up add name option log info clean up change log debug format clean up add missing security groups for master and slave add bootstrap actions add options for bootstrap action add tags option clean up fix for tags array support delimiters : = bootstrap actions fix add configurations item load local or remote config rename function support multiple bootstrap actions default value 0 for core group follow aws api able to create a master only tags use terraform tag schema option item for log_uri ec2_attribute as option add emr task group accTests add embedded json config add embedded json config add service_role and instance_profile add partial state support for either the "TERMINATED" or "TERMINATED_WITH_ERRORS" state not allowing to change name or instance_type for task group "core_instance_type" change into "Optional" and "Computed" apply MaxItems for ec2Attributes remove all debug "fmt.Println" clean up debug info and useless variable Expose visible_to_all_users as an option, default will be true remove debug info logging should happen before setId("") add hasChange checking first clean up debug log add some force new double check the core group existed add waiting and polling, until cluster up testcase add EMR cluster id and status checking clean up using common way to read ec2_attributes --- builtin/providers/aws/provider.go | 12 +- builtin/providers/aws/resource_aws_emr.go | 498 ++++++++++++++++++ .../aws/resource_aws_emr_task_group.go | 115 ++++ .../aws/resource_aws_emr_task_group_test.go | 176 +++++++ .../providers/aws/resource_aws_emr_test.go | 184 +++++++ 
.../aws/test-fixtures/emr_configurations.json | 28 + 6 files changed, 1008 insertions(+), 5 deletions(-) create mode 100644 builtin/providers/aws/resource_aws_emr.go create mode 100644 builtin/providers/aws/resource_aws_emr_task_group.go create mode 100644 builtin/providers/aws/resource_aws_emr_task_group_test.go create mode 100644 builtin/providers/aws/resource_aws_emr_test.go create mode 100644 builtin/providers/aws/test-fixtures/emr_configurations.json diff --git a/builtin/providers/aws/provider.go b/builtin/providers/aws/provider.go index 1afafa6754..70e741e0ea 100644 --- a/builtin/providers/aws/provider.go +++ b/builtin/providers/aws/provider.go @@ -232,11 +232,13 @@ func Provider() terraform.ResourceProvider { "aws_elasticsearch_domain": resourceAwsElasticSearchDomain(), "aws_elastictranscoder_pipeline": resourceAwsElasticTranscoderPipeline(), "aws_elastictranscoder_preset": resourceAwsElasticTranscoderPreset(), - "aws_elb": resourceAwsElb(), - "aws_elb_attachment": resourceAwsElbAttachment(), - "aws_flow_log": resourceAwsFlowLog(), - "aws_glacier_vault": resourceAwsGlacierVault(), - "aws_iam_access_key": resourceAwsIamAccessKey(), + "aws_elb": resourceAwsElb(), + "aws_elb_attachment": resourceAwsElbAttachment(), + "aws_emr": resourceAwsEMR(), + "aws_emr_task_group": resourceAwsEMRTaskGroup(), + "aws_flow_log": resourceAwsFlowLog(), + "aws_glacier_vault": resourceAwsGlacierVault(), + "aws_iam_access_key": resourceAwsIamAccessKey(), "aws_iam_account_password_policy": resourceAwsIamAccountPasswordPolicy(), "aws_iam_group_policy": resourceAwsIamGroupPolicy(), "aws_iam_group": resourceAwsIamGroup(), diff --git a/builtin/providers/aws/resource_aws_emr.go b/builtin/providers/aws/resource_aws_emr.go new file mode 100644 index 0000000000..76201cd03d --- /dev/null +++ b/builtin/providers/aws/resource_aws_emr.go @@ -0,0 +1,498 @@ +package aws + +import ( + "log" + + "encoding/json" + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + 
"github.com/aws/aws-sdk-go/service/emr" + "github.com/hashicorp/terraform/helper/resource" + "github.com/hashicorp/terraform/helper/schema" + "io/ioutil" + "net/http" + "strings" + "time" +) + +func resourceAwsEMR() *schema.Resource { + return &schema.Resource{ + Create: resourceAwsEMRCreate, + Read: resourceAwsEMRRead, + Update: resourceAwsEMRUpdate, + Delete: resourceAwsEMRDelete, + Schema: map[string]*schema.Schema{ + "name": &schema.Schema{ + Type: schema.TypeString, + Required: true, + }, + "release_label": &schema.Schema{ + Type: schema.TypeString, + Required: true, + }, + "master_instance_type": &schema.Schema{ + Type: schema.TypeString, + Required: true, + ForceNew: true, + }, + "core_instance_type": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + Computed: true, + }, + "core_instance_count": &schema.Schema{ + Type: schema.TypeInt, + Optional: true, + Default: 0, + }, + "log_uri": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + }, + "applications": &schema.Schema{ + Type: schema.TypeSet, + Optional: true, + ForceNew: true, + Elem: &schema.Schema{Type: schema.TypeString}, + Set: schema.HashString, + }, + "ec2_attributes": &schema.Schema{ + Type: schema.TypeList, + MaxItems: 1, + Optional: true, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "key_name": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + }, + "subnet_id": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + }, + "additional_master_security_groups": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + }, + "additional_slave_security_groups": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + }, + "emr_managed_master_security_group": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + }, + "emr_managed_slave_security_group": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + }, + "instance_profile": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + }, + }, + }, + }, + 
"bootstrap_action": &schema.Schema{ + Type: schema.TypeSet, + Optional: true, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "name": &schema.Schema{ + Type: schema.TypeString, + Required: true, + }, + "path": &schema.Schema{ + Type: schema.TypeString, + Required: true, + }, + "args": &schema.Schema{ + Type: schema.TypeSet, + Optional: true, + Elem: &schema.Schema{Type: schema.TypeString}, + Set: schema.HashString, + }, + }, + }, + }, + "tags": tagsSchema(), + "configurations": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + }, + "service_role": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + }, + "visible_to_all_users": &schema.Schema{ + Type: schema.TypeBool, + Optional: true, + Default: true, + }, + }, + } +} + +func resourceAwsEMRCreate(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).emrconn + + log.Printf("[DEBUG] Creating EMR cluster") + masterInstanceType := d.Get("master_instance_type").(string) + coreInstanceType := masterInstanceType + if v, ok := d.GetOk("core_instance_type"); ok { + coreInstanceType = v.(string) + } + coreInstanceCount := d.Get("core_instance_count").(int) + + applications := d.Get("applications").(*schema.Set).List() + var userKey, subnet, extraMasterSecGrp, extraSlaveSecGrp, emrMasterSecGrp, emrSlaveSecGrp, instanceProfile, serviceRole string + instanceProfile = "EMR_EC2_DefaultRole" + + if a, ok := d.GetOk("ec2_attributes"); ok { + ec2Attributes := a.([]interface{}) + attributes := ec2Attributes[0].(map[string]interface{}) + userKey = attributes["key_name"].(string) + subnet = attributes["subnet_id"].(string) + extraMasterSecGrp = attributes["additional_master_security_groups"].(string) + extraSlaveSecGrp = attributes["additional_slave_security_groups"].(string) + emrMasterSecGrp = attributes["emr_managed_master_security_group"].(string) + emrSlaveSecGrp = attributes["emr_managed_slave_security_group"].(string) + + if 
len(strings.TrimSpace(attributes["instance_profile"].(string))) != 0 { + instanceProfile = strings.TrimSpace(attributes["instance_profile"].(string)) + } + } + + if v, ok := d.GetOk("service_role"); ok { + serviceRole = v.(string) + } else { + serviceRole = "EMR_DefaultRole" + } + + emrApps := expandApplications(applications) + + params := &emr.RunJobFlowInput{ + Instances: &emr.JobFlowInstancesConfig{ + Ec2KeyName: aws.String(userKey), + Ec2SubnetId: aws.String(subnet), + InstanceCount: aws.Int64(int64(coreInstanceCount + 1)), + KeepJobFlowAliveWhenNoSteps: aws.Bool(true), + MasterInstanceType: aws.String(masterInstanceType), + SlaveInstanceType: aws.String(coreInstanceType), + TerminationProtected: aws.Bool(false), + AdditionalMasterSecurityGroups: []*string{ + aws.String(extraMasterSecGrp), + }, + AdditionalSlaveSecurityGroups: []*string{ + aws.String(extraSlaveSecGrp), + }, + EmrManagedMasterSecurityGroup: aws.String(emrMasterSecGrp), + EmrManagedSlaveSecurityGroup: aws.String(emrSlaveSecGrp), + }, + Name: aws.String(d.Get("name").(string)), + Applications: emrApps, + + JobFlowRole: aws.String(instanceProfile), + ReleaseLabel: aws.String(d.Get("release_label").(string)), + ServiceRole: aws.String(serviceRole), + VisibleToAllUsers: aws.Bool(d.Get("visible_to_all_users").(bool)), + } + + if v, ok := d.GetOk("log_uri"); ok { + logUrl := v.(string) + params.LogUri = aws.String(logUrl) + } + if v, ok := d.GetOk("bootstrap_action"); ok { + bootstrapActions := v.(*schema.Set).List() + log.Printf("[DEBUG] %v\n", bootstrapActions) + params.BootstrapActions = expandBootstrapActions(bootstrapActions) + } + if v, ok := d.GetOk("tags"); ok { + tagsIn := v.(map[string]interface{}) + params.Tags = expandTags(tagsIn) + } + if v, ok := d.GetOk("configurations"); ok { + confUrl := v.(string) + params.Configurations = expandConfigures(confUrl) + } + + log.Printf("[DEBUG] EMR Cluster create options: %s", params) + resp, err := conn.RunJobFlow(params) + + if err != nil { + 
log.Printf("[ERROR] %s", err) + return err + } + + log.Printf("[DEBUG] Created EMR Cluster done...") + d.SetId(*resp.JobFlowId) + + log.Println( + "[INFO] Waiting for EMR Cluster to be available") + + stateConf := &resource.StateChangeConf{ + Pending: []string{"STARTING", "BOOTSTRAPPING"}, + Target: []string{"WAITING", "RUNNING"}, + Refresh: resourceAwsEMRClusterStateRefreshFunc(d, meta), + Timeout: 40 * time.Minute, + MinTimeout: 10 * time.Second, + Delay: 30 * time.Second, // Wait 30 secs before starting + } + + _, err = stateConf.WaitForState() + if err != nil { + return fmt.Errorf("[WARN] Error waiting for EMR Cluster state to be \"WAITING\": %s", err) + } + + return resourceAwsEMRRead(d, meta) +} + +func resourceAwsEMRRead(d *schema.ResourceData, meta interface{}) error { + emrconn := meta.(*AWSClient).emrconn + + req := &emr.DescribeClusterInput{ + ClusterId: aws.String(d.Id()), + } + + resp, err := emrconn.DescribeCluster(req) + if err != nil { + return fmt.Errorf("Error reading EMR cluster: %s", err) + } + + if resp.Cluster == nil { + d.SetId("") + log.Printf("[DEBUG] EMR Cluster (%s) not found", d.Id()) + return nil + } + + instance := resp.Cluster + + if instance.Status != nil { + if *resp.Cluster.Status.State == "TERMINATED" { + log.Printf("[DEBUG] EMR Cluster (%s) was TERMINATED already", d.Id()) + d.SetId("") + return nil + } + + if *resp.Cluster.Status.State == "TERMINATED_WITH_ERRORS" { + log.Printf("[DEBUG] EMR Cluster (%s) was TERMINATED_WITH_ERRORS already", d.Id()) + d.SetId("") + return nil + } + } + + instanceGroups, errGrps := loadGroups(d, meta) + if errGrps == nil { + coreGroup := findGroup(instanceGroups, "CORE") + if coreGroup != nil { + d.Set("core_instance_type", coreGroup.InstanceType) + } + } + + return nil +} + +func resourceAwsEMRUpdate(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).emrconn + + if d.HasChange("core_instance_count") { + log.Printf("[DEBUG] Modify EMR cluster") + req := 
&emr.ListInstanceGroupsInput{ + ClusterId: aws.String(d.Id()), + } + + respGrps, errGrps := conn.ListInstanceGroups(req) + if errGrps != nil { + return fmt.Errorf("Error reading EMR cluster: %s", errGrps) + } + instanceGroups := respGrps.InstanceGroups + + coreInstanceCount := d.Get("core_instance_count").(int) + coreGroup := findGroup(instanceGroups, "CORE") + + params := &emr.ModifyInstanceGroupsInput{ + InstanceGroups: []*emr.InstanceGroupModifyConfig{ + { + InstanceGroupId: aws.String(*coreGroup.Id), + InstanceCount: aws.Int64(int64(coreInstanceCount)), + }, + }, + } + _, errModify := conn.ModifyInstanceGroups(params) + if errModify != nil { + log.Printf("[ERROR] %s", errModify) + return errModify + } + + log.Printf("[DEBUG] Modify EMR Cluster done...") + } + + return resourceAwsEMRRead(d, meta) +} + +func resourceAwsEMRDelete(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).emrconn + + req := &emr.TerminateJobFlowsInput{ + JobFlowIds: []*string{ + aws.String(d.Id()), + }, + } + + _, err := conn.TerminateJobFlows(req) + if err != nil { + log.Printf("[ERROR], %s", err) + return err + } + + d.SetId("") + return nil +} + +func expandApplications(apps []interface{}) []*emr.Application { + appOut := make([]*emr.Application, 0, len(apps)) + + for _, appName := range expandStringList(apps) { + app := &emr.Application{ + Name: appName, + } + appOut = append(appOut, app) + } + return appOut +} + +func loadGroups(d *schema.ResourceData, meta interface{}) ([]*emr.InstanceGroup, error) { + emrconn := meta.(*AWSClient).emrconn + reqGrps := &emr.ListInstanceGroupsInput{ + ClusterId: aws.String(d.Id()), + } + + respGrps, errGrps := emrconn.ListInstanceGroups(reqGrps) + if errGrps != nil { + return nil, fmt.Errorf("Error reading EMR cluster: %s", errGrps) + } + return respGrps.InstanceGroups, nil +} + +func findGroup(grps []*emr.InstanceGroup, typ string) *emr.InstanceGroup { + for _, grp := range grps { + if *grp.InstanceGroupType == typ { + 
return grp + } + } + return nil +} + +func expandTags(m map[string]interface{}) []*emr.Tag { + var result []*emr.Tag + for k, v := range m { + result = append(result, &emr.Tag{ + Key: aws.String(k), + Value: aws.String(v.(string)), + }) + } + + return result +} + +func expandBootstrapActions(bootstrapActions []interface{}) []*emr.BootstrapActionConfig { + actionsOut := []*emr.BootstrapActionConfig{} + + for _, raw := range bootstrapActions { + actionAttributes := raw.(map[string]interface{}) + actionName := actionAttributes["name"].(string) + actionPath := actionAttributes["path"].(string) + actionArgs := actionAttributes["args"].(*schema.Set).List() + + action := &emr.BootstrapActionConfig{ + Name: aws.String(actionName), + ScriptBootstrapAction: &emr.ScriptBootstrapActionConfig{ + Path: aws.String(actionPath), + Args: expandStringList(actionArgs), + }, + } + actionsOut = append(actionsOut, action) + } + + return actionsOut +} + +func expandConfigures(input string) []*emr.Configuration { + configsOut := []*emr.Configuration{} + if strings.HasPrefix(input, "http") { + readHttpJson(input, &configsOut) + } else if strings.HasSuffix(input, ".json") { + readLocalJson(input, &configsOut) + } else { + readBodyJson(input, &configsOut) + } + log.Printf("[DEBUG] Configures %v\n", configsOut) + + return configsOut +} + +func readHttpJson(url string, target interface{}) error { + r, err := http.Get(url) + if err != nil { + return err + } + defer r.Body.Close() + + return json.NewDecoder(r.Body).Decode(target) +} + +func readLocalJson(localFile string, target interface{}) error { + file, e := ioutil.ReadFile(localFile) + if e != nil { + log.Printf("[ERROR] %s", e) + return e + } + + return json.Unmarshal(file, target) +} + +func readBodyJson(body string, target interface{}) error { + log.Printf("[DEBUG] Raw Body %s\n", body) + err := json.Unmarshal([]byte(body), target) + if err != nil { + log.Printf("[ERROR] parsing JSON %s", err) + return err + } + return nil +} + +func 
resourceAwsEMRClusterStateRefreshFunc(d *schema.ResourceData, meta interface{}) resource.StateRefreshFunc { + return func() (interface{}, string, error) { + conn := meta.(*AWSClient).emrconn + + log.Printf("[INFO] Reading EMR Cluster Information: %s", d.Id()) + params := &emr.DescribeClusterInput{ + ClusterId: aws.String(d.Id()), + } + + resp, err := conn.DescribeCluster(params) + + if err != nil { + if awsErr, ok := err.(awserr.Error); ok { + if "ClusterNotFound" == awsErr.Code() { + return 42, "destroyed", nil + } + } + log.Printf("[WARN] Error on retrieving EMR Cluster (%s) when waiting: %s", d.Id(), err) + return nil, "", err + } + + emrc := resp.Cluster + + if emrc == nil { + return 42, "destroyed", nil + } + + if resp.Cluster.Status != nil { + log.Printf("[DEBUG] EMR Cluster status (%s): %s", d.Id(), *resp.Cluster.Status) + } + + return emrc, *emrc.Status.State, nil + } +} diff --git a/builtin/providers/aws/resource_aws_emr_task_group.go b/builtin/providers/aws/resource_aws_emr_task_group.go new file mode 100644 index 0000000000..ae29572eea --- /dev/null +++ b/builtin/providers/aws/resource_aws_emr_task_group.go @@ -0,0 +1,115 @@ +package aws + +import ( + "log" + + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/emr" + "github.com/hashicorp/terraform/helper/schema" +) + +func resourceAwsEMRTaskGroup() *schema.Resource { + return &schema.Resource{ + Create: resourceAwsEMRTaskGroupCreate, + Read: resourceAwsEMRTaskGroupRead, + Update: resourceAwsEMRTaskGroupUpdate, + Delete: resourceAwsEMRTaskGroupDelete, + Schema: map[string]*schema.Schema{ + "cluster_id": &schema.Schema{ + Type: schema.TypeString, + Required: true, + ForceNew: true, + }, + "instance_type": &schema.Schema{ + Type: schema.TypeString, + Required: true, + ForceNew: true, + }, + "instance_count": &schema.Schema{ + Type: schema.TypeInt, + Optional: true, + Default: 60, + }, + "name": &schema.Schema{ + Type: schema.TypeString, + Optional: true, + }, + }, + } +} + +func 
resourceAwsEMRTaskGroupCreate(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).emrconn + + clusterId := d.Get("cluster_id").(string) + instanceType := d.Get("instance_type").(string) + instanceCount := d.Get("instance_count").(int) + groupName := d.Get("name").(string) + + log.Printf("[DEBUG] Creating EMR task group") + params := &emr.AddInstanceGroupsInput{ + InstanceGroups: []*emr.InstanceGroupConfig{ + { + InstanceRole: aws.String("TASK"), + InstanceCount: aws.Int64(int64(instanceCount)), + InstanceType: aws.String(instanceType), + Name: aws.String(groupName), + }, + }, + JobFlowId: aws.String(clusterId), + } + resp, err := conn.AddInstanceGroups(params) + if err != nil { + log.Printf("[ERROR] %s", err) + return err + } + + log.Printf("[DEBUG] Created EMR task group finished: %#v", resp) + d.SetId(*resp.InstanceGroupIds[0]) + + return nil +} + +func resourceAwsEMRTaskGroupRead(d *schema.ResourceData, meta interface{}) error { + + return nil +} + +func resourceAwsEMRTaskGroupUpdate(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).emrconn + + log.Printf("[DEBUG] Modify EMR task group") + instanceCount := d.Get("instance_count").(int) + + if d.HasChange("name") { + return fmt.Errorf("[WARN] Error updating task group, change name is not supported by api") + } + + if d.HasChange("instance_type") { + return fmt.Errorf("[WARN] Error updating task group, change instance_type is not supported by api") + } + + params := &emr.ModifyInstanceGroupsInput{ + InstanceGroups: []*emr.InstanceGroupModifyConfig{ + { + InstanceGroupId: aws.String(d.Id()), + InstanceCount: aws.Int64(int64(instanceCount)), + }, + }, + } + resp, err := conn.ModifyInstanceGroups(params) + if err != nil { + log.Printf("[ERROR] %s", err) + return err + } + + log.Printf("[DEBUG] Modify EMR task group finished: %#v", resp) + + return nil +} + +func resourceAwsEMRTaskGroupDelete(d *schema.ResourceData, meta interface{}) error { + + return nil +} diff 
--git a/builtin/providers/aws/resource_aws_emr_task_group_test.go b/builtin/providers/aws/resource_aws_emr_task_group_test.go new file mode 100644 index 0000000000..f541cd23dd --- /dev/null +++ b/builtin/providers/aws/resource_aws_emr_task_group_test.go @@ -0,0 +1,176 @@ +package aws + +import ( + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/service/emr" + "github.com/hashicorp/terraform/helper/acctest" + "github.com/hashicorp/terraform/helper/resource" + "github.com/hashicorp/terraform/terraform" + "log" + "testing" +) + +func TestAccAWSEmrTaskGroup_basic(t *testing.T) { + var jobFlow emr.RunJobFlowOutput + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckAWSEmrTaskGroupDestroy, + Steps: []resource.TestStep{ + resource.TestStep{ + Config: testAccAWSEmrTaskGroupConfig, + Check: testAccCheckAWSEmrTaskGroupExists("aws_emr_task_group.task", &jobFlow), + }, + }, + }) +} + +func testAccCheckAWSEmrTaskGroupDestroy(s *terraform.State) error { + conn := testAccProvider.Meta().(*AWSClient).emrconn + + for _, rs := range s.RootModule().Resources { + if rs.Type != "aws_emr" { + continue + } + + params := &emr.DescribeClusterInput{ + ClusterId: aws.String(rs.Primary.ID), + } + + describe, err := conn.DescribeCluster(params) + + if err == nil { + if describe.Cluster != nil && + *describe.Cluster.Status.State == "WAITING" { + return fmt.Errorf("EMR Cluster still exists") + } + } + + providerErr, ok := err.(awserr.Error) + if !ok { + return err + } + + log.Printf("[ERROR] %v", providerErr) + } + + return nil +} + +func testAccCheckAWSEmrTaskGroupExists(n string, v *emr.RunJobFlowOutput) resource.TestCheckFunc { + return func(s *terraform.State) error { + rs, ok := s.RootModule().Resources[n] + if !ok { + return fmt.Errorf("Not found: %s", n) + } + if rs.Primary.ID == "" { + return fmt.Errorf("No task group id set") + } + conn := 
testAccProvider.Meta().(*AWSClient).emrconn + _, err := conn.DescribeCluster(&emr.DescribeClusterInput{ + ClusterId: aws.String(rs.Primary.Attributes["cluster_id"]), + }) + if err != nil { + return fmt.Errorf("EMR error: %v", err) + } + return nil + } +} + +var testAccAWSEmrTaskGroupConfig = fmt.Sprintf(` +provider "aws" { + region = "ap-southeast-2" +} + +resource "aws_emr" "tf-test-cluster" { + name = "emr-%s" + release_label = "emr-4.6.0" + applications = ["Spark"] + + ec2_attributes { + subnet_id = "${aws_subnet.main.id}" + emr_managed_master_security_group = "${aws_security_group.allow_all.id}" + emr_managed_slave_security_group = "${aws_security_group.allow_all.id}" + } + + master_instance_type = "m3.xlarge" + core_instance_type = "m3.xlarge" + core_instance_count = 1 + + tags { + role = "rolename" + dns_zone = "env_zone" + env = "env" + name = "name-env" + } + + bootstrap_action { + path ="s3://elasticmapreduce/bootstrap-actions/run-if" + name ="runif" + args =["instance.isMaster=true","echo running on master node"] + } + + configurations = "test-fixtures/emr_configurations.json" +} + +resource "aws_emr_task_group" "task" { + cluster_id = "${aws_emr.tf-test-cluster.id}" + instance_count = 1 + instance_type = "m3.xlarge" +} + +resource "aws_security_group" "allow_all" { + name = "allow_all" + description = "Allow all inbound traffic" + vpc_id = "${aws_vpc.main.id}" + + ingress { + from_port = 0 + to_port = 0 + protocol = "-1" + cidr_blocks = ["0.0.0.0/0"] + } + egress { + from_port = 0 + to_port = 0 + protocol = "-1" + cidr_blocks = ["0.0.0.0/0"] + } + + depends_on = ["aws_subnet.main"] + lifecycle { + ignore_changes = ["ingress", "egress"] + } +} + +resource "aws_vpc" "main" { + cidr_block = "168.31.0.0/16" + enable_dns_hostnames = true +} + +resource "aws_subnet" "main" { + vpc_id = "${aws_vpc.main.id}" + cidr_block = "168.31.0.0/20" +# map_public_ip_on_launch = true +} + +resource "aws_internet_gateway" "gw" { + vpc_id = "${aws_vpc.main.id}" +} + +resource 
"aws_route_table" "r" { + vpc_id = "${aws_vpc.main.id}" + + route { + cidr_block = "0.0.0.0/0" + gateway_id = "${aws_internet_gateway.gw.id}" + } +} + +resource "aws_main_route_table_association" "a" { + vpc_id = "${aws_vpc.main.id}" + route_table_id = "${aws_route_table.r.id}" +} +`, acctest.RandString(10)) diff --git a/builtin/providers/aws/resource_aws_emr_test.go b/builtin/providers/aws/resource_aws_emr_test.go new file mode 100644 index 0000000000..2f57bca7fe --- /dev/null +++ b/builtin/providers/aws/resource_aws_emr_test.go @@ -0,0 +1,184 @@ +package aws + +import ( + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/service/emr" + "github.com/hashicorp/terraform/helper/acctest" + "github.com/hashicorp/terraform/helper/resource" + "github.com/hashicorp/terraform/terraform" + "log" + "testing" +) + +func TestAccAWSEmrCluster_basic(t *testing.T) { + var jobFlow emr.RunJobFlowOutput + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckAWSEmrDestroy, + Steps: []resource.TestStep{ + resource.TestStep{ + Config: testAccAWSEmrClusterConfig, + Check: testAccCheckAWSEmrClusterExists("aws_emr.tf-test-cluster", &jobFlow), + }, + }, + }) +} + +func testAccCheckAWSEmrDestroy(s *terraform.State) error { + conn := testAccProvider.Meta().(*AWSClient).emrconn + + for _, rs := range s.RootModule().Resources { + if rs.Type != "aws_emr" { + continue + } + + params := &emr.DescribeClusterInput{ + ClusterId: aws.String(rs.Primary.ID), + } + + describe, err := conn.DescribeCluster(params) + + if err == nil { + if describe.Cluster != nil && + *describe.Cluster.Status.State == "WAITING" { + return fmt.Errorf("EMR Cluster still exists") + } + } + + providerErr, ok := err.(awserr.Error) + if !ok { + return err + } + + log.Printf("[ERROR] %v", providerErr) + } + + return nil +} + +func testAccCheckAWSEmrClusterExists(n string, v 
*emr.RunJobFlowOutput) resource.TestCheckFunc { + return func(s *terraform.State) error { + rs, ok := s.RootModule().Resources[n] + if !ok { + return fmt.Errorf("Not found: %s", n) + } + if rs.Primary.ID == "" { + return fmt.Errorf("No cluster id set") + } + conn := testAccProvider.Meta().(*AWSClient).emrconn + describe, err := conn.DescribeCluster(&emr.DescribeClusterInput{ + ClusterId: aws.String(rs.Primary.ID), + }) + if err != nil { + return fmt.Errorf("EMR error: %v", err) + } + + if describe.Cluster != nil && + *describe.Cluster.Id != rs.Primary.ID { + return fmt.Errorf("EMR cluser not found") + } + + if describe.Cluster != nil && + *describe.Cluster.Status.State != "WAITING" { + return fmt.Errorf("EMR cluser is not up yet") + } + + return nil + } +} + +var testAccAWSEmrClusterConfig = fmt.Sprintf(` +provider "aws" { + region = "ap-southeast-2" +} + +resource "aws_emr" "tf-test-cluster" { + name = "emr-%s" + release_label = "emr-4.6.0" + applications = ["Spark"] + + ec2_attributes { + subnet_id = "${aws_subnet.main.id}" + emr_managed_master_security_group = "${aws_security_group.allow_all.id}" + emr_managed_slave_security_group = "${aws_security_group.allow_all.id}" + } + + master_instance_type = "m3.xlarge" + core_instance_type = "m3.xlarge" + core_instance_count = 1 + + tags { + role = "rolename" + dns_zone = "env_zone" + env = "env" + name = "name-env" + } + + bootstrap_action { + path ="s3://elasticmapreduce/bootstrap-actions/run-if" + name ="runif" + args =["instance.isMaster=true","echo running on master node"] + } + + configurations = "test-fixtures/emr_configurations.json" + + depends_on = ["aws_main_route_table_association.a"] +} + +resource "aws_security_group" "allow_all" { + name = "allow_all" + description = "Allow all inbound traffic" + vpc_id = "${aws_vpc.main.id}" + + ingress { + from_port = 0 + to_port = 0 + protocol = "-1" + cidr_blocks = ["0.0.0.0/0"] + } + egress { + from_port = 0 + to_port = 0 + protocol = "-1" + cidr_blocks = 
["0.0.0.0/0"] + } + + depends_on = ["aws_subnet.main"] + lifecycle { + ignore_changes = ["ingress", "egress"] + } +} + +resource "aws_vpc" "main" { + cidr_block = "168.31.0.0/16" + enable_dns_hostnames = true +} + +resource "aws_subnet" "main" { + vpc_id = "${aws_vpc.main.id}" + cidr_block = "168.31.0.0/20" +# map_public_ip_on_launch = true +} + +resource "aws_internet_gateway" "gw" { + vpc_id = "${aws_vpc.main.id}" +} + +resource "aws_route_table" "r" { + vpc_id = "${aws_vpc.main.id}" + + route { + cidr_block = "0.0.0.0/0" + gateway_id = "${aws_internet_gateway.gw.id}" + } +} + +resource "aws_main_route_table_association" "a" { + vpc_id = "${aws_vpc.main.id}" + route_table_id = "${aws_route_table.r.id}" +} + +`, acctest.RandString(10)) diff --git a/builtin/providers/aws/test-fixtures/emr_configurations.json b/builtin/providers/aws/test-fixtures/emr_configurations.json new file mode 100644 index 0000000000..48b22d9d3b --- /dev/null +++ b/builtin/providers/aws/test-fixtures/emr_configurations.json @@ -0,0 +1,28 @@ +[ + { + "Classification": "hadoop-env", + "Configurations": [ + { + "Classification": "export", + "Configurations": [], + "Properties": { + "JAVA_HOME": "/usr/lib/jvm/java-1.8.0" + } + } + ], + "Properties": {} + }, + { + "Classification": "spark-env", + "Configurations": [ + { + "Classification": "export", + "Configurations": [], + "Properties": { + "JAVA_HOME": "/usr/lib/jvm/java-1.8.0" + } + } + ], + "Properties": {} + } +] \ No newline at end of file