Adding firehose to elastic search support (#7839)

Add firehose elasticsearch configuration documentation

Adding CRUD for elastic search as firehose destination

Updated the firehose stream documentation to add elastic search as destination example.

Adding testing for es as firehose destination

Update the test case for es
This commit is contained in:
Andy Chan 2016-08-07 17:21:18 -07:00 committed by Paul Stack
parent 43ac64e2b7
commit 5ac8ae1338
3 changed files with 370 additions and 28 deletions

View File

@ -27,6 +27,14 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
Type: schema.TypeString,
Required: true,
ForceNew: true,
ValidateFunc: func(v interface{}, k string) (ws []string, errors []error) {
value := v.(string)
if len(value) > 64 {
errors = append(errors, fmt.Errorf(
"%q cannot be longer than 64 characters", k))
}
return
},
},
"destination": &schema.Schema{
@ -37,6 +45,14 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
value := v.(string)
return strings.ToLower(value)
},
ValidateFunc: func(v interface{}, k string) (ws []string, errors []error) {
value := v.(string)
if value != "s3" && value != "redshift" && value != "elasticsearch" {
errors = append(errors, fmt.Errorf(
"%q must be one of 's3', 'redshift', 'elasticsearch'", k))
}
return
},
},
// elements removed in v0.7.0
@ -167,6 +183,113 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
},
},
"elasticsearch_configuration": &schema.Schema{
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"buffering_interval": &schema.Schema{
Type: schema.TypeInt,
Optional: true,
Default: 300,
ValidateFunc: func(v interface{}, k string) (ws []string, errors []error) {
value := v.(int)
if value < 60 || value > 900 {
errors = append(errors, fmt.Errorf(
"%q must be in the range from 60 to 900 seconds.", k))
}
return
},
},
"buffering_size": &schema.Schema{
Type: schema.TypeInt,
Optional: true,
Default: 5,
ValidateFunc: func(v interface{}, k string) (ws []string, errors []error) {
value := v.(int)
if value < 1 || value > 100 {
errors = append(errors, fmt.Errorf(
"%q must be in the range from 1 to 100 MB.", k))
}
return
},
},
"domain_arn": &schema.Schema{
Type: schema.TypeString,
Required: true,
},
"index_name": &schema.Schema{
Type: schema.TypeString,
Required: true,
},
"index_rotation_period": &schema.Schema{
Type: schema.TypeString,
Optional: true,
Default: "OneDay",
ValidateFunc: func(v interface{}, k string) (ws []string, errors []error) {
value := v.(string)
if value != "NoRotation" && value != "OneHour" && value != "OneDay" && value != "OneWeek" && value != "OneMonth" {
errors = append(errors, fmt.Errorf(
"%q must be one of 'NoRotation', 'OneHour', 'OneDay', 'OneWeek', 'OneMonth'", k))
}
return
},
},
"retry_duration": &schema.Schema{
Type: schema.TypeInt,
Optional: true,
Default: 300,
ValidateFunc: func(v interface{}, k string) (ws []string, errors []error) {
value := v.(int)
if value < 0 || value > 7200 {
errors = append(errors, fmt.Errorf(
"%q must be in the range from 0 to 7200 seconds.", k))
}
return
},
},
"role_arn": &schema.Schema{
Type: schema.TypeString,
Required: true,
},
"s3_backup_mode": &schema.Schema{
Type: schema.TypeString,
Optional: true,
Default: "FailedDocumentsOnly",
ValidateFunc: func(v interface{}, k string) (ws []string, errors []error) {
value := v.(string)
if value != "FailedDocumentsOnly" && value != "AllDocuments" {
errors = append(errors, fmt.Errorf(
"%q must be one of 'FailedDocumentsOnly', 'AllDocuments'", k))
}
return
},
},
"type_name": &schema.Schema{
Type: schema.TypeString,
Optional: true,
ValidateFunc: func(v interface{}, k string) (ws []string, errors []error) {
value := v.(string)
if len(value) > 100 {
errors = append(errors, fmt.Errorf(
"%q cannot be longer than 100 characters", k))
}
return
},
},
},
},
},
"arn": &schema.Schema{
Type: schema.TypeString,
Optional: true,
@ -188,15 +311,6 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
}
}
// validateConfiguration performs a cross-field sanity check before any API
// call is made. It verifies that the configured destination is one of the
// delivery targets this resource knows how to build a configuration for.
//
// NOTE: the set of accepted values here must stay in sync with the
// "destination" schema ValidateFunc; the schema already accepts
// "elasticsearch", so rejecting it here would fail every elasticsearch
// delivery stream at create/update time.
func validateConfiguration(d *schema.ResourceData) error {
	destination := d.Get("destination").(string)
	if destination != "s3" && destination != "redshift" && destination != "elasticsearch" {
		return fmt.Errorf("[ERROR] Destination must be s3, redshift, or elasticsearch")
	}
	return nil
}
func createS3Config(d *schema.ResourceData) *firehose.S3DestinationConfiguration {
s3 := d.Get("s3_configuration").([]interface{})[0].(map[string]interface{})
@ -289,6 +403,85 @@ func updateRedshiftConfig(d *schema.ResourceData, s3Update *firehose.S3Destinati
}, nil
}
// createElasticsearchConfig builds the Elasticsearch destination
// configuration for a new Firehose delivery stream from the resource's
// "elasticsearch_configuration" block. The supplied S3 configuration is
// attached as the backup destination. An error is returned when the block
// is absent from the resource data.
func createElasticsearchConfig(d *schema.ResourceData, s3Config *firehose.S3DestinationConfiguration) (*firehose.ElasticsearchDestinationConfiguration, error) {
	raw, ok := d.GetOk("elasticsearch_configuration")
	if !ok {
		return nil, fmt.Errorf("[ERR] Error loading Elasticsearch Configuration for Kinesis Firehose: elasticsearch_configuration not found")
	}

	// MaxItems is 1 on the schema, so the first (and only) element holds
	// the settings map.
	settings := raw.([]interface{})[0].(map[string]interface{})

	config := &firehose.ElasticsearchDestinationConfiguration{
		BufferingHints:  extractBufferingHints(settings),
		DomainARN:       aws.String(settings["domain_arn"].(string)),
		IndexName:       aws.String(settings["index_name"].(string)),
		RetryOptions:    extractRetryOptions(settings),
		RoleARN:         aws.String(settings["role_arn"].(string)),
		TypeName:        aws.String(settings["type_name"].(string)),
		S3Configuration: s3Config,
	}

	// Both keys carry schema defaults, so they are normally present; the
	// ok-guard only protects against a missing map entry.
	if v, ok := settings["index_rotation_period"]; ok {
		config.IndexRotationPeriod = aws.String(v.(string))
	}
	if v, ok := settings["s3_backup_mode"]; ok {
		config.S3BackupMode = aws.String(v.(string))
	}

	return config, nil
}
// updateElasticsearchConfig builds the Elasticsearch destination update
// payload from the resource's "elasticsearch_configuration" block, attaching
// the supplied S3 update as the backup destination. An error is returned
// when the block is absent from the resource data.
func updateElasticsearchConfig(d *schema.ResourceData, s3Update *firehose.S3DestinationUpdate) (*firehose.ElasticsearchDestinationUpdate, error) {
	raw, ok := d.GetOk("elasticsearch_configuration")
	if !ok {
		return nil, fmt.Errorf("[ERR] Error loading Elasticsearch Configuration for Kinesis Firehose: elasticsearch_configuration not found")
	}

	// MaxItems is 1 on the schema, so the first (and only) element holds
	// the settings map.
	settings := raw.([]interface{})[0].(map[string]interface{})

	update := &firehose.ElasticsearchDestinationUpdate{
		BufferingHints: extractBufferingHints(settings),
		DomainARN:      aws.String(settings["domain_arn"].(string)),
		IndexName:      aws.String(settings["index_name"].(string)),
		RetryOptions:   extractRetryOptions(settings),
		RoleARN:        aws.String(settings["role_arn"].(string)),
		TypeName:       aws.String(settings["type_name"].(string)),
		S3Update:       s3Update,
	}

	if v, ok := settings["index_rotation_period"]; ok {
		update.IndexRotationPeriod = aws.String(v.(string))
	}

	return update, nil
}
// extractBufferingHints maps the "buffering_interval" and "buffering_size"
// settings from the elasticsearch_configuration block onto the Firehose
// ElasticsearchBufferingHints API structure. Fields whose map entries are
// missing (or not ints) are left unset.
func extractBufferingHints(es map[string]interface{}) *firehose.ElasticsearchBufferingHints {
	bufferingHints := &firehose.ElasticsearchBufferingHints{}

	// The schema key is "buffering_interval"; the previous lookup of
	// "buffering_hints" could never match, so IntervalInSeconds was never
	// populated despite the schema default of 300.
	if bufferingInterval, ok := es["buffering_interval"].(int); ok {
		bufferingHints.IntervalInSeconds = aws.Int64(int64(bufferingInterval))
	}
	if bufferingSize, ok := es["buffering_size"].(int); ok {
		bufferingHints.SizeInMBs = aws.Int64(int64(bufferingSize))
	}

	return bufferingHints
}
// extractRetryOptions maps the "retry_duration" setting from the
// elasticsearch_configuration block onto the Firehose
// ElasticsearchRetryOptions API structure. The field is left unset when the
// map entry is missing or not an int.
func extractRetryOptions(es map[string]interface{}) *firehose.ElasticsearchRetryOptions {
	opts := &firehose.ElasticsearchRetryOptions{}
	if duration, ok := es["retry_duration"].(int); ok {
		opts.DurationInSeconds = aws.Int64(int64(duration))
	}
	return opts
}
func extractCopyCommandConfiguration(redshift map[string]interface{}) *firehose.CopyCommand {
cmd := &firehose.CopyCommand{
DataTableName: aws.String(redshift["data_table_name"].(string)),
@ -306,10 +499,6 @@ func extractCopyCommandConfiguration(redshift map[string]interface{}) *firehose.
func resourceAwsKinesisFirehoseDeliveryStreamCreate(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).firehoseconn
if err := validateConfiguration(d); err != nil {
return err
}
sn := d.Get("name").(string)
s3Config := createS3Config(d)
@ -319,6 +508,12 @@ func resourceAwsKinesisFirehoseDeliveryStreamCreate(d *schema.ResourceData, meta
if d.Get("destination").(string) == "s3" {
createInput.S3DestinationConfiguration = s3Config
} else if d.Get("destination").(string) == "elasticsearch" {
esConfig, err := createElasticsearchConfig(d, s3Config)
if err != nil {
return err
}
createInput.ElasticsearchDestinationConfiguration = esConfig
} else {
rc, err := createRedshiftConfig(d, s3Config)
if err != nil {
@ -359,7 +554,7 @@ func resourceAwsKinesisFirehoseDeliveryStreamCreate(d *schema.ResourceData, meta
Pending: []string{"CREATING"},
Target: []string{"ACTIVE"},
Refresh: firehoseStreamStateRefreshFunc(conn, sn),
Timeout: 5 * time.Minute,
Timeout: 20 * time.Minute,
Delay: 10 * time.Second,
MinTimeout: 3 * time.Second,
}
@ -381,10 +576,6 @@ func resourceAwsKinesisFirehoseDeliveryStreamCreate(d *schema.ResourceData, meta
func resourceAwsKinesisFirehoseDeliveryStreamUpdate(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).firehoseconn
if err := validateConfiguration(d); err != nil {
return err
}
sn := d.Get("name").(string)
s3Config := updateS3Config(d)
@ -396,6 +587,12 @@ func resourceAwsKinesisFirehoseDeliveryStreamUpdate(d *schema.ResourceData, meta
if d.Get("destination").(string) == "s3" {
updateInput.S3DestinationUpdate = s3Config
} else if d.Get("destination").(string) == "elasticsearch" {
esUpdate, err := updateElasticsearchConfig(d, s3Config)
if err != nil {
return err
}
updateInput.ElasticsearchDestinationUpdate = esUpdate
} else {
rc, err := updateRedshiftConfig(d, s3Config)
if err != nil {
@ -459,7 +656,7 @@ func resourceAwsKinesisFirehoseDeliveryStreamDelete(d *schema.ResourceData, meta
Pending: []string{"DELETING"},
Target: []string{"DESTROYED"},
Refresh: firehoseStreamStateRefreshFunc(conn, sn),
Timeout: 5 * time.Minute,
Timeout: 20 * time.Minute,
Delay: 10 * time.Second,
MinTimeout: 3 * time.Second,
}

View File

@ -29,7 +29,7 @@ func TestAccAWSKinesisFirehoseDeliveryStream_s3basic(t *testing.T) {
Config: config,
Check: resource.ComposeTestCheckFunc(
testAccCheckKinesisFirehoseDeliveryStreamExists("aws_kinesis_firehose_delivery_stream.test_stream", &stream),
testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, nil),
testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, nil, nil),
),
},
},
@ -61,7 +61,7 @@ func TestAccAWSKinesisFirehoseDeliveryStream_s3ConfigUpdates(t *testing.T) {
Config: preConfig,
Check: resource.ComposeTestCheckFunc(
testAccCheckKinesisFirehoseDeliveryStreamExists("aws_kinesis_firehose_delivery_stream.test_stream", &stream),
testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, nil),
testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, nil, nil),
),
},
@ -69,7 +69,7 @@ func TestAccAWSKinesisFirehoseDeliveryStream_s3ConfigUpdates(t *testing.T) {
Config: postConfig,
Check: resource.ComposeTestCheckFunc(
testAccCheckKinesisFirehoseDeliveryStreamExists("aws_kinesis_firehose_delivery_stream.test_stream", &stream),
testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, updatedS3DestinationConfig, nil),
testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, updatedS3DestinationConfig, nil, nil),
),
},
},
@ -100,7 +100,7 @@ func TestAccAWSKinesisFirehoseDeliveryStream_RedshiftConfigUpdates(t *testing.T)
Config: preConfig,
Check: resource.ComposeTestCheckFunc(
testAccCheckKinesisFirehoseDeliveryStreamExists("aws_kinesis_firehose_delivery_stream.test_stream", &stream),
testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, nil),
testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, nil, nil),
),
},
@ -108,7 +108,46 @@ func TestAccAWSKinesisFirehoseDeliveryStream_RedshiftConfigUpdates(t *testing.T)
Config: postConfig,
Check: resource.ComposeTestCheckFunc(
testAccCheckKinesisFirehoseDeliveryStreamExists("aws_kinesis_firehose_delivery_stream.test_stream", &stream),
testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, updatedRedshiftConfig),
testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, updatedRedshiftConfig, nil),
),
},
},
})
}
func TestAccAWSKinesisFirehoseDeliveryStream_ElasticsearchConfigUpdates(t *testing.T) {
var stream firehose.DeliveryStreamDescription
ri := acctest.RandInt()
awsAccountId := os.Getenv("AWS_ACCOUNT_ID")
preConfig := fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_ElasticsearchBasic,
ri, awsAccountId, ri, ri, ri, awsAccountId, awsAccountId, ri, ri)
postConfig := fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_ElasticsearchUpdate,
ri, awsAccountId, ri, ri, ri, awsAccountId, awsAccountId, ri, ri)
updatedElasticSearchConfig := &firehose.ElasticsearchDestinationDescription{
BufferingHints: &firehose.ElasticsearchBufferingHints{
IntervalInSeconds: aws.Int64(500),
},
}
resource.Test(t, resource.TestCase{
PreCheck: testAccKinesisFirehosePreCheck(t),
Providers: testAccProviders,
CheckDestroy: testAccCheckKinesisFirehoseDeliveryStreamDestroy,
Steps: []resource.TestStep{
resource.TestStep{
Config: preConfig,
Check: resource.ComposeTestCheckFunc(
testAccCheckKinesisFirehoseDeliveryStreamExists("aws_kinesis_firehose_delivery_stream.test_stream_es", &stream),
testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, nil, nil),
),
},
resource.TestStep{
Config: postConfig,
Check: resource.ComposeTestCheckFunc(
testAccCheckKinesisFirehoseDeliveryStreamExists("aws_kinesis_firehose_delivery_stream.test_stream_es", &stream),
testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, nil, updatedElasticSearchConfig),
),
},
},
@ -142,9 +181,7 @@ func testAccCheckKinesisFirehoseDeliveryStreamExists(n string, stream *firehose.
}
}
func testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(stream *firehose.DeliveryStreamDescription, s3config interface{}, redshiftConfig interface{}) resource.TestCheckFunc {
// *firehose.RedshiftDestinationDescription
// *firehose.S3DestinationDescription
func testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(stream *firehose.DeliveryStreamDescription, s3config interface{}, redshiftConfig interface{}, elasticsearchConfig interface{}) resource.TestCheckFunc {
return func(s *terraform.State) error {
if !strings.HasPrefix(*stream.DeliveryStreamName, "terraform-kinesis-firehose") {
return fmt.Errorf("Bad Stream name: %s", *stream.DeliveryStreamName)
@ -193,6 +230,19 @@ func testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(stream *firehose.Del
}
}
if elasticsearchConfig != nil {
es := elasticsearchConfig.(*firehose.ElasticsearchDestinationDescription)
// Range over the Stream Destinations, looking for the matching Elasticsearch destination
var match bool
for _, d := range stream.Destinations {
if d.ElasticsearchDestinationDescription != nil {
match = true
}
}
if !match {
return fmt.Errorf("Mismatch Elasticsearch Buffering Interval, expected: %s, got: %s", es, stream.Destinations)
}
}
}
return nil
}
@ -365,3 +415,59 @@ resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
data_table_columns = "test-col"
}
}`
var testAccKinesisFirehoseDeliveryStreamBaseElasticsearchConfig = testAccKinesisFirehoseDeliveryStreamBaseConfig + `
resource "aws_elasticsearch_domain" "test_cluster" {
domain_name = "es-test-%d"
access_policies = <<CONFIG
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::%s:root"
},
"Action": "es:*",
"Resource": "arn:aws:es:us-east-1:%s:domain/es-test-%d/*"
}
]
}
CONFIG
}`
var testAccKinesisFirehoseDeliveryStreamConfig_ElasticsearchBasic = testAccKinesisFirehoseDeliveryStreamBaseElasticsearchConfig + `
resource "aws_kinesis_firehose_delivery_stream" "test_stream_es" {
depends_on = ["aws_iam_role_policy.firehose", "aws_elasticsearch_domain.test_cluster"]
name = "terraform-kinesis-firehose-es-%d"
destination = "elasticsearch"
s3_configuration {
role_arn = "${aws_iam_role.firehose.arn}"
bucket_arn = "${aws_s3_bucket.bucket.arn}"
}
elasticsearch_configuration {
domain_arn = "${aws_elasticsearch_domain.test_cluster.arn}"
role_arn = "${aws_iam_role.firehose.arn}"
index_name = "test"
type_name = "test"
}
}`
var testAccKinesisFirehoseDeliveryStreamConfig_ElasticsearchUpdate = testAccKinesisFirehoseDeliveryStreamBaseElasticsearchConfig + `
resource "aws_kinesis_firehose_delivery_stream" "test_stream_es" {
depends_on = ["aws_iam_role_policy.firehose", "aws_elasticsearch_domain.test_cluster"]
name = "terraform-kinesis-firehose-es-%d"
destination = "elasticsearch"
s3_configuration {
role_arn = "${aws_iam_role.firehose.arn}"
bucket_arn = "${aws_s3_bucket.bucket.arn}"
}
elasticsearch_configuration {
domain_arn = "${aws_elasticsearch_domain.test_cluster.arn}"
role_arn = "${aws_iam_role.firehose.arn}"
index_name = "test"
type_name = "test"
buffering_interval = 500
}
}`

View File

@ -84,6 +84,33 @@ resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
}
```
### Elasticsearch Destination
```
resource "aws_elasticsearch_domain" "test_cluster" {
domain_name = "firehose-es-test"
}
resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
name = "terraform-kinesis-firehose-test-stream"
destination = "elasticsearch"
s3_configuration {
role_arn = "${aws_iam_role.firehose_role.arn}"
bucket_arn = "${aws_s3_bucket.bucket.arn}"
buffer_size = 10
buffer_interval = 400
compression_format = "GZIP"
}
elasticsearch_configuration {
domain_arn = "${aws_elasticsearch_domain.test_cluster.arn}"
role_arn = "${aws_iam_role.firehose_role.arn}"
index_name = "test"
type_name = "test"
}
}
```
~> **NOTE:** Kinesis Firehose is currently only supported in us-east-1, us-west-2 and eu-west-1.
## Argument Reference
@ -92,7 +119,7 @@ The following arguments are supported:
* `name` - (Required) A name to identify the stream. This is unique to the
AWS account and region the Stream is created in.
* `destination` (Required) This is the destination to where the data is delivered. The only options are `s3` & `redshift`.
* `destination` (Required) This is the destination to where the data is delivered. The only options are `s3`, `redshift`, and `elasticsearch`.
* `s3_configuration` - (Required) Configuration options for the s3 destination (or the intermediate bucket if the destination
is redshift). More details are given below.
* `redshift_configuration` - (Optional) Configuration options if redshift is the destination.
@ -121,6 +148,18 @@ The `redshift_configuration` object supports the following:
* `copy_options` - (Optional) Copy options for copying the data from the s3 intermediate bucket into redshift.
* `data_table_columns` - (Optional) The data table columns that will be targeted by the copy command.
The `elasticsearch_configuration` object supports the following:
* `buffering_interval` - (Optional) Buffer incoming data for the specified period of time, in seconds between 60 to 900, before delivering it to the destination. The default value is 300s.
* `buffering_size` - (Optional) Buffer incoming data to the specified size, in MBs between 1 to 100, before delivering it to the destination. The default value is 5MB.
* `domain_arn` - (Required) The ARN of the Amazon ES domain. The IAM role must have permission for `DescribeElasticsearchDomain`, `DescribeElasticsearchDomains`, and `DescribeElasticsearchDomainConfig` after assuming `RoleARN`. The pattern needs to be `arn:.*`.
* `index_name` - (Required) The Elasticsearch index name.
* `index_rotation_period` - (Optional) The Elasticsearch index rotation period. Index rotation appends a timestamp to the IndexName to facilitate expiration of old data. Valid values are `NoRotation`, `OneHour`, `OneDay`, `OneWeek`, and `OneMonth`. The default value is `OneDay`.
* `retry_duration` - (Optional) After an initial failure to deliver to Amazon Elasticsearch, the total amount of time, in seconds between 0 to 7200, during which Firehose re-attempts delivery (including the first attempt). After this time has elapsed, the failed documents are written to Amazon S3. The default value is 300s. There will be no retry if the value is 0.
* `role_arn` - (Required) The ARN of the IAM role to be assumed by Firehose for calling the Amazon ES Configuration API and for indexing documents. The pattern needs to be `arn:.*`.
* `s3_backup_mode` - (Optional) Defines how documents should be delivered to Amazon S3. Valid values are `FailedDocumentsOnly` and `AllDocuments`. Default value is `FailedDocumentsOnly`.
* `type_name` - (Optional) The Elasticsearch type name with maximum length of 100 characters.
## Attributes Reference
* `arn` - The Amazon Resource Name (ARN) specifying the Stream