Merge pull request #6 from electricimp/develop
Update CI
betzrhodes authored Apr 30, 2019
2 parents 083fec0 + ddaed64 commit 2a90456
Showing 5 changed files with 99 additions and 89 deletions.
12 changes: 12 additions & 0 deletions .editorconfig
@@ -0,0 +1,12 @@
root = true

[*]
indent_style = space
indent_size = 4
end_of_line = lf
charset = utf-8
trim_trailing_whitespace = true
insert_final_newline = true

[*.md]
trim_trailing_whitespace = false
2 changes: 2 additions & 0 deletions .gitignore
@@ -0,0 +1,2 @@
.DS_Store
.builder-cache
1 change: 1 addition & 0 deletions .impt.test
@@ -1,5 +1,6 @@
{
"deviceGroupId": "ca83d15f-fcb8-d3c2-ff69-b0d4acb20e9b",
"deviceGroupName" : "impFarm M",
"timeout": 180,
"stopOnFail": false,
"allowDisconnect": false,
11 changes: 0 additions & 11 deletions .travis.yml

This file was deleted.

162 changes: 84 additions & 78 deletions README.md
@@ -16,6 +16,8 @@ The AWSKinesisStreams library utilizes the [AWSRequestV4](https://github.com/ele
#require "AWSKinesisStreams.agent.lib.nut:1.1.0"
```

![Build Status](https://cse-ci.electricimp.com/app/rest/builds/buildType:(id:AWSKinesisStreams_BuildAndTest)/statusIcon)

## Example ##

A complete, step-by-step recipe can be found in the [Examples](./Examples) folder.
@@ -76,13 +78,13 @@ This class represents an AWS Kinesis Streams record: a combination of data attri

This method creates and returns an AWSKinesisStreams.Record object that can be written into an Amazon Kinesis stream using [AWSKinesisStreams.Producer](#awskinesisstreamsproducer-class) methods.

-| Parameter | Data Type | Required? | Description |
+| Parameter | Data Type | Required | Description |
| --- | --- | --- | --- |
| *data* | Blob or [JSON-compatible type](#json-compatible-type) | Yes | The record data |
| *partitionKey* | String | Yes | Identifies which shard in the stream the data record is assigned to (see the [Kinesis Streams documentation](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_Record.html#Streams-Type-Record-PartitionKey)) |
-| *explicitHashKey* | String | Optional | The hash value used to explicitly determine the shard the data record is assigned to by overriding the partition key hash (see the [Kinesis Streams documentation](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#Streams-PutRecord-request-ExplicitHashKey)) |
-| *prevSequenceNumber* | String | Optional | See the [Kinesis Streams documentation](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#Streams-PutRecord-request-SequenceNumberForOrdering) |
-| *encoder* | Function | Optional | A custom JSON encoder function for encoding the provided data (e.g. [JSONEncoder.encode](https://github.com/electricimp/JSONEncoder)) |
+| *explicitHashKey* | String | No | The hash value used to explicitly determine the shard the data record is assigned to by overriding the partition key hash (see the [Kinesis Streams documentation](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#Streams-PutRecord-request-ExplicitHashKey)) |
+| *prevSequenceNumber* | String | No | See the [Kinesis Streams documentation](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#Streams-PutRecord-request-SequenceNumberForOrdering) |
+| *encoder* | Function | No | A custom JSON encoder function for encoding the provided data (eg. [*JSONEncoder.encode()*](https://github.com/electricimp/JSONEncoder)) |
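As a sketch of the constructor described above (the partition keys and the explicit hash key are placeholder values, not taken from the library's documentation):

```squirrel
#require "AWSKinesisStreams.agent.lib.nut:1.1.0"

// A record with only the required parameters; a table is a JSON-compatible type
record1 <- AWSKinesisStreams.Record({ "temperature" : 21 }, "partitionKey1");

// A record that pins the target shard by overriding the partition key hash
// (the hash key string below is a placeholder)
record2 <- AWSKinesisStreams.Record("raw text", "partitionKey2", "123456789012345678901234567890");
```

These objects are inert until passed to one of the AWSKinesisStreams.Producer write methods.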

## AWS_KINESIS_STREAMS_ENCRYPTION_TYPE Enum ##

@@ -103,7 +105,7 @@ This class allows the agent to write data records to a specific AWS Kinesis stre

Creates and returns an AWSKinesisStreams.Producer object. The constructor’s parameters are as follows:

-| Parameter | Data Type | Required? | Description |
+| Parameter | Data Type | Required | Description |
| --- | --- | --- | --- |
| *region* | String | Yes | The Region code of Amazon EC2 (see the [Amazon EC2 documentation](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html)) |
| *accessKeyId* | String | Yes | The access key ID of an AWS IAM user (see the [Kinesis Streams documentation](http://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-iam.html)) |
@@ -116,10 +118,10 @@ Creates and returns an AWSKinesisStreams.Producer object. The constructor’s pa

This method writes a single data record into the AWS Kinesis stream. For more information, please see the corresponding [Kinesis Streams REST API action](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html).

-| Parameter | Data Type | Required? | Description |
+| Parameter | Data Type | Required | Description |
| --- | --- | --- | --- |
| *record* | [AWSKinesisStreams.Record](#awskinesisstreamsrecord-class) | Yes | The record to be written |
-| *callback* | Function | Optional | Executed once the operation is completed |
+| *callback* | Function | No | Executed once the operation is completed |

The method returns nothing. The result of the operation may be obtained via the *callback* function, which has the following parameters:

@@ -132,10 +134,10 @@ The method returns nothing. The result of the operation may be obtained via the

This method writes multiple data records into the AWS Kinesis stream in a single request. Every record is processed by AWS individually. Some of the records may be written successfully but some may fail. For more information, please see the corresponding [Kinesis Streams REST API action](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html).

-| Parameter | Data Type | Required? | Description |
+| Parameter | Data Type | Required | Description |
| --- | --- | --- | --- |
| *records* | Array of [AWSKinesisStreams.Records](#awskinesisstreamsrecord-class) | Yes | The records to be written |
-| *callback* | Function | Optional | Executed once the operation is completed |
+| *callback* | Function | No | Executed once the operation is completed |

The method returns nothing. The result of the operation may be obtained via the *callback* function, which has the following parameters:

@@ -167,13 +169,13 @@ This class allows your code to read data records from a specific AWS Kinesis Str

This method creates and returns an AWSKinesisStreams.Consumer object. The constructor’s parameters are as follows:

-| Parameter | Data Type | Required? | Description |
+| Parameter | Data Type | Required | Description |
| --- | --- | --- | --- |
| *region* | String | Yes | The Region code of Amazon EC2 (see the [EC2 documentation](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html)) |
| *accessKeyId* | String | Yes | The access key ID of an AWS IAM user (see the [Kinesis Streams documentation](http://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-iam.html)) |
| *secretAccessKey* | String | Yes | The secret access key of an AWS IAM user (see the [Kinesis Streams documentation](http://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-iam.html)) |
| *streamName* | String | Yes | The name of an AWS Kinesis stream |
-| *isBlob* | Boolean | Optional | If `true`, the AWSKinesisStreams.Consumer object will consider every received data record as a Squirrel blob. If `false` or not specified, the AWSKinesisStreams.Consumer object will consider every received data record as a JSON data and parse it into an appropriate [JSON-compatible type](#json-compatible-type) |
+| *isBlob* | Boolean | No | If `true`, the AWSKinesisStreams.Consumer object will consider every received data record as a Squirrel blob. If `false` or not specified, the AWSKinesisStreams.Consumer object will consider every received data record as a JSON data and parse it into an appropriate [JSON-compatible type](#json-compatible-type) |

Before creating an AWSKinesisStreams.Consumer instance your code should know which type of data it is going to receive: binary data (a Squirrel blob) or a [JSON-compatible type](#json-compatible-type). This choice is specified in the AWSKinesisStreams.Consumer constructor and cannot be changed after that. In a complex case, your application can specify the data as a blob and parse it to a specific or custom type by itself.
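To illustrate this choice, the two constructor forms might look as follows (a sketch; the credential constants and stream name are placeholders):

```squirrel
// JSON mode (default): every received record is parsed into a JSON-compatible Squirrel type
jsonConsumer <- AWSKinesisStreams.Consumer(AWS_KINESIS_REGION, AWS_KINESIS_ACCESS_KEY_ID, AWS_KINESIS_SECRET_ACCESS_KEY, "testStream");

// Blob mode (isBlob = true): every received record is delivered as a raw Squirrel blob
// for the application to parse itself
blobConsumer <- AWSKinesisStreams.Consumer(AWS_KINESIS_REGION, AWS_KINESIS_ACCESS_KEY_ID, AWS_KINESIS_SECRET_ACCESS_KEY, "testStream", true);
```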

@@ -183,7 +185,7 @@ Before creating an AWSKinesisStreams.Consumer instance your code should know whi

This method retrieves a list of the IDs of all the shards in the AWS Kinesis stream, including closed shards. Closed shards may still contain records your application may need to read.

-| Parameter | Data Type | Required? | Description |
+| Parameter | Data Type | Required | Description |
| --- | --- | --- | --- |
| *callback* | Function | Yes | Executed once the operation is completed |

@@ -200,7 +202,7 @@ This method allows your code to specify a start position from which the reading

**Note** Every shard iterator returned by *getShardIterator()* or *getRecords()* expires five minutes after it is returned. Your application should call *getRecords()* with the iterator before it expires, otherwise the call will fail and your code will need to obtain a new iterator using *getShardIterator()*.

-| Parameter | Data Type | Required? | Description |
+| Parameter | Data Type | Required | Description |
| --- | --- | --- | --- |
| *shardId* | String | Yes | The shard ID. |
| *type* | [AWS_KINESIS_STREAMS_SHARD_ITERATOR_TYPE](#aws_kinesis_streams_shard_iterator_type-enum) | Yes | The shard iterator type. Determines how the shard iterator is used to start reading data records from the shard. Some of the types require the corresponding *typeOptions* to be specified |
@@ -223,12 +225,12 @@ The method returns nothing. The result of the operation may be obtained via the

This method allows your code to read a portion of data records using the specified shard iterator, and returns the next shard iterator, which can be used to read the next portion of data records by calling *getRecords()* again. Reading always proceeds from older records towards the most recent. For more information, please see the corresponding [Kinesis Streams REST API action](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html).

-| Parameter | Data Type | Required? | Description |
+| Parameter | Data Type | Required | Description |
| --- | --- | --- | --- |
| *options* | Table | Yes | Options for the operation (see below) |
| *callback* | Function | Yes | Executed once the operation is completed |

-| *options* key | Data Type | Required? | Description |
+| *options* key | Data Type | Required | Description |
| --- | --- | --- | --- |
| *shardIterator* | String | Yes | The shard iterator that specifies the position in the shard from which the reading should be started |
| *limit* | Integer | Optional | The maximum number of data records to read. If not specified, the number of returned records is AWS Kinesis Streams specific (see the [Kinesis Streams documentation](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html#Streams-GetRecords-request-Limit)) |
@@ -276,8 +278,8 @@ A type of Squirrel data which can be encoded/decoded into/from JSON, eg. table,
#require "AWSKinesisStreams.agent.lib.nut:1.1.0"
#require "JSONEncoder.class.nut:2.0.0"
-//This class can be used to hold numbers larger than Squirrel can natively support (i.e. anything larger than 32-bit)
-//and then be encoded as a number (rather than a string) when encoded with `JSONEncoder.encode`.
+// This class can be used to hold numbers larger than Squirrel can natively support (ie. anything larger than 32-bit)
+// and then be encoded as a number (rather than a string) when encoded with 'JSONEncoder.encode()'.
class JSONLiteralString {
_string = null;
@@ -305,34 +307,36 @@ producer <- AWSKinesisStreams.Producer(AWS_KINESIS_REGION, AWS_KINESIS_ACCESS_KE
// Writes single data record
producer.putRecord(AWSKinesisStreams.Record("Hello!", "partitionKey"), function (error, putResult) {
-  if (error) {
-    server.error("Data writing failed: " + error.details);
-  } else {
-    // Record written successfully
-  }
+    if (error) {
+        server.error("Data writing failed: " + error.details);
+    } else {
+        // Record written successfully
+    }
});
// Writes multiple records with different data structures
records <- [
-  AWSKinesisStreams.Record("test", "partitionKey1"),
-  AWSKinesisStreams.Record(12345, "partitionKey2"),
-  AWSKinesisStreams.Record({ "temperature" : 21, "humidity" : 60 }, "partitionKey3"),
-  AWSKinesisStreams.Record({ "a" : JSONLiteralString("123456789123456789") }, "partitionKey4", null, null, JSONEncoder.encode.bindenv(JSONEncoder)) //write record using custom encoder
+    AWSKinesisStreams.Record("test", "partitionKey1"),
+    AWSKinesisStreams.Record(12345, "partitionKey2"),
+    AWSKinesisStreams.Record({ "temperature" : 21, "humidity" : 60 }, "partitionKey3"),
+    // Write record using custom encoder
+    AWSKinesisStreams.Record({ "a" : JSONLiteralString("123456789123456789") }, "partitionKey4", null, null, JSONEncoder.encode.bindenv(JSONEncoder))
];
producer.putRecords(records, function (error, failedRecordCount, putResults) {
-  if (error) {
-    server.error("Data writing failed: " + error.details);
-  } else if (failedRecordCount > 0) {
-    server.log("Data writing partially failed:");
-    foreach (res in putResults) {
-      if (res.errorCode) {
-        server.log(format("%s: %s", res.errorCode, res.errorMessage));
-      }
-    }
-  } else {
-    // Records written successfully
+    if (error) {
+        server.error("Data writing failed: " + error.details);
+    } else if (failedRecordCount > 0) {
+        server.log("Data writing partially failed:");
+        foreach (res in putResults) {
+            if (res.errorCode) {
+                server.log(format("%s: %s", res.errorCode, res.errorMessage));
+            }
+        }
+    } else {
+        // Records written successfully
+    }
});
```

@@ -353,60 +357,62 @@ consumer <- AWSKinesisStreams.Consumer(AWS_KINESIS_REGION, AWS_KINESIS_ACCESS_KE
// Obtains the stream shards
consumer.getShards(function (error, shardIds) {
-  if (error) {
-    server.error("getShards failed: " + error.details);
-  } else {
-    foreach (shardId in shardIds) {
-      getShardIterator(shardId);
-    }
+    if (error) {
+        server.error("getShards failed: " + error.details);
+    } else {
+        foreach (shardId in shardIds) {
+            getShardIterator(shardId);
+        }
+    }
});
// Obtains shard iterator for the specified shard and starts reading records
function getShardIterator(shardId) {
-  consumer.getShardIterator(
-    shardId,
-    AWS_KINESIS_STREAMS_SHARD_ITERATOR_TYPE.TRIM_HORIZON,
-    null,
-    function (error, shardIterator) {
-      if (error) {
-        server.error("getShardIterator failed: " + error.details);
-      } else {
-        // shard iterator obtained successfully
-        readRecords({ "shardIterator" : shardIterator, "limit" : 10 });
-      }
-    });
+    consumer.getShardIterator(
+        shardId,
+        AWS_KINESIS_STREAMS_SHARD_ITERATOR_TYPE.TRIM_HORIZON,
+        null,
+        function (error, shardIterator) {
+            if (error) {
+                server.error("getShardIterator failed: " + error.details);
+            } else {
+                // shard iterator obtained successfully
+                readRecords({ "shardIterator" : shardIterator, "limit" : 10 });
+            }
+        }
+    );
}
// Recursively reads records from the specified shard
function readRecords(options) {
-  consumer.getRecords(
-    options,
-    function (error, records, millisBehindLatest, nextOptions) {
-      if (error) {
-        server.error("Data reading failed: " + error.details);
-      } else {
-        if (records.len() == 0) {
-          // No new records
-        } else {
-          foreach (record in records) {
-            // Process records individually
-          }
-        }
-        if (nextOptions) {
-          // Read next portion of records
-          imp.wakeup(10.0, function () {
-            readRecords(nextOptions);
-          });
-        }
-      }
-    });
+    consumer.getRecords(
+        options,
+        function (error, records, millisBehindLatest, nextOptions) {
+            if (error) {
+                server.error("Data reading failed: " + error.details);
+            } else {
+                if (records.len() == 0) {
+                    // No new records
+                } else {
+                    foreach (record in records) {
+                        // Process records individually
+                    }
+                }
+                if (nextOptions) {
+                    // Read next portion of records
+                    imp.wakeup(10.0, function () {
+                        readRecords(nextOptions);
+                    });
+                }
+            }
+        }
+    );
}
```
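The five-minute shard-iterator expiry noted earlier can be handled by requesting a fresh iterator whenever *getRecords()* fails. The following is only a sketch: it restarts from TRIM_HORIZON, which re-reads from the oldest available record, whereas a production application would track and resume from its last processed position; *consumer* and *shardId* are assumed to exist as in the example above.

```squirrel
// Reads records, recovering with a new iterator if the current one has expired
function readWithRecovery(shardId, options) {
    consumer.getRecords(options, function (error, records, millisBehindLatest, nextOptions) {
        if (error) {
            // The iterator may have expired; obtain a fresh one and retry
            consumer.getShardIterator(
                shardId,
                AWS_KINESIS_STREAMS_SHARD_ITERATOR_TYPE.TRIM_HORIZON,
                null,
                function (err, shardIterator) {
                    if (!err) readWithRecovery(shardId, { "shardIterator" : shardIterator });
                }
            );
        } else if (nextOptions) {
            // Continue reading with the next iterator before it can expire
            imp.wakeup(10.0, function () {
                readWithRecovery(shardId, nextOptions);
            });
        }
    });
}
```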

Working examples are also provided in the [Examples](./Examples) directory and described [here](./Examples/README.md).

## License ##

-The AWSKinesisStreams library is licensed under the [MIT License](./LICENSE).
+This library is licensed under the [MIT License](./LICENSE).
