Skip to content

Commit

Permalink
Release of version 1.4.0
Browse files Browse the repository at this point in the history
  • Loading branch information
liuszeng committed Jun 28, 2018
1 parent 47363e9 commit 832f074
Show file tree
Hide file tree
Showing 22 changed files with 1,136 additions and 135 deletions.
547 changes: 449 additions & 98 deletions AWSIoTPythonSDK/MQTTLib.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion AWSIoTPythonSDK/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
__version__ = "1.3.1"
__version__ = "1.4.0"


9 changes: 7 additions & 2 deletions AWSIoTPythonSDK/core/greengrass/discovery/providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ class DiscoveryInfoProvider(object):

REQUEST_TYPE_PREFIX = "GET "
PAYLOAD_PREFIX = "/greengrass/discover/thing/"
PAYLOAD_SUFFIX = " HTTP/1.1\r\n\r\n" # Space in the front
PAYLOAD_SUFFIX = " HTTP/1.1\r\n" # Space in the front
HOST_PREFIX = "Host: "
HOST_SUFFIX = "\r\n\r\n"
HTTP_PROTOCOL = r"HTTP/1.1 "
CONTENT_LENGTH = r"content-length: "
CONTENT_LENGTH_PATTERN = CONTENT_LENGTH + r"([0-9]+)\r\n"
Expand Down Expand Up @@ -311,7 +313,10 @@ def _send_discovery_request(self, ssl_sock, thing_name):
request = self.REQUEST_TYPE_PREFIX + \
self.PAYLOAD_PREFIX + \
thing_name + \
self.PAYLOAD_SUFFIX
self.PAYLOAD_SUFFIX + \
self.HOST_PREFIX + \
self._host + ":" + str(self._port) + \
self.HOST_SUFFIX
self._logger.debug("Sending discover request: " + request)

start_time = time.time()
Expand Down
Empty file.
151 changes: 151 additions & 0 deletions AWSIoTPythonSDK/core/jobs/thingJobManager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
# /*
# * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# *
# * Licensed under the Apache License, Version 2.0 (the "License").
# * You may not use this file except in compliance with the License.
# * A copy of the License is located at
# *
# * http://aws.amazon.com/apache2.0
# *
# * or in the "license" file accompanying this file. This file is distributed
# * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# * express or implied. See the License for the specific language governing
# * permissions and limitations under the License.
# */

import json

_BASE_THINGS_TOPIC = "$aws/things/"
_NOTIFY_OPERATION = "notify"
_NOTIFY_NEXT_OPERATION = "notify-next"
_GET_OPERATION = "get"
_START_NEXT_OPERATION = "start-next"
_WILDCARD_OPERATION = "+"
_UPDATE_OPERATION = "update"
_ACCEPTED_REPLY = "accepted"
_REJECTED_REPLY = "rejected"
_WILDCARD_REPLY = "#"

#Members of this enum are tuples
_JOB_ID_REQUIRED_INDEX = 1
_JOB_OPERATION_INDEX = 2

_STATUS_KEY = 'status'
_STATUS_DETAILS_KEY = 'statusDetails'
_EXPECTED_VERSION_KEY = 'expectedVersion'
_EXEXCUTION_NUMBER_KEY = 'executionNumber'
_INCLUDE_JOB_EXECUTION_STATE_KEY = 'includeJobExecutionState'
_INCLUDE_JOB_DOCUMENT_KEY = 'includeJobDocument'
_CLIENT_TOKEN_KEY = 'clientToken'

#The type of job topic.
class jobExecutionTopicType(object):
JOB_UNRECOGNIZED_TOPIC = (0, False, '')
JOB_GET_PENDING_TOPIC = (1, False, _GET_OPERATION)
JOB_START_NEXT_TOPIC = (2, False, _START_NEXT_OPERATION)
JOB_DESCRIBE_TOPIC = (3, True, _GET_OPERATION)
JOB_UPDATE_TOPIC = (4, True, _UPDATE_OPERATION)
JOB_NOTIFY_TOPIC = (5, False, _NOTIFY_OPERATION)
JOB_NOTIFY_NEXT_TOPIC = (6, False, _NOTIFY_NEXT_OPERATION)
JOB_WILDCARD_TOPIC = (7, False, _WILDCARD_OPERATION)

#Members of this enum are tuples
_JOB_SUFFIX_INDEX = 1
#The type of reply topic, or #JOB_REQUEST_TYPE for topics that are not replies.
class jobExecutionTopicReplyType(object):
JOB_UNRECOGNIZED_TOPIC_TYPE = (0, '')
JOB_REQUEST_TYPE = (1, '')
JOB_ACCEPTED_REPLY_TYPE = (2, '/' + _ACCEPTED_REPLY)
JOB_REJECTED_REPLY_TYPE = (3, '/' + _REJECTED_REPLY)
JOB_WILDCARD_REPLY_TYPE = (4, '/' + _WILDCARD_REPLY)

_JOB_STATUS_INDEX = 1
class jobExecutionStatus(object):
JOB_EXECUTION_STATUS_NOT_SET = (0, None)
JOB_EXECUTION_QUEUED = (1, 'QUEUED')
JOB_EXECUTION_IN_PROGRESS = (2, 'IN_PROGRESS')
JOB_EXECUTION_FAILED = (3, 'FAILED')
JOB_EXECUTION_SUCCEEDED = (4, 'SUCCEEDED')
JOB_EXECUTION_CANCELED = (5, 'CANCELED')
JOB_EXECUTION_REJECTED = (6, 'REJECTED')
JOB_EXECUTION_UNKNOWN_STATUS = (99, None)

def _getExecutionStatus(jobStatus):
try:
return jobStatus[_JOB_STATUS_INDEX]
except KeyError:
return None

def _isWithoutJobIdTopicType(srcJobExecTopicType):
return (srcJobExecTopicType == jobExecutionTopicType.JOB_GET_PENDING_TOPIC or srcJobExecTopicType == jobExecutionTopicType.JOB_START_NEXT_TOPIC
or srcJobExecTopicType == jobExecutionTopicType.JOB_NOTIFY_TOPIC or srcJobExecTopicType == jobExecutionTopicType.JOB_NOTIFY_NEXT_TOPIC)

class thingJobManager:
def __init__(self, thingName, clientToken = None):
self._thingName = thingName
self._clientToken = clientToken

def getJobTopic(self, srcJobExecTopicType, srcJobExecTopicReplyType=jobExecutionTopicReplyType.JOB_REQUEST_TYPE, jobId=None):
if self._thingName is None:
return None

#Verify topics that only support request type, actually have request type specified for reply
if (srcJobExecTopicType == jobExecutionTopicType.JOB_NOTIFY_TOPIC or srcJobExecTopicType == jobExecutionTopicType.JOB_NOTIFY_NEXT_TOPIC) and srcJobExecTopicReplyType != jobExecutionTopicReplyType.JOB_REQUEST_TYPE:
return None

#Verify topics that explicitly do not want a job ID do not have one specified
if (jobId is not None and _isWithoutJobIdTopicType(srcJobExecTopicType)):
return None

#Verify job ID is present if the topic requires one
if jobId is None and srcJobExecTopicType[_JOB_ID_REQUIRED_INDEX]:
return None

#Ensure the job operation is a non-empty string
if srcJobExecTopicType[_JOB_OPERATION_INDEX] == '':
return None

if srcJobExecTopicType[_JOB_ID_REQUIRED_INDEX]:
return '{0}{1}/jobs/{2}/{3}{4}'.format(_BASE_THINGS_TOPIC, self._thingName, str(jobId), srcJobExecTopicType[_JOB_OPERATION_INDEX], srcJobExecTopicReplyType[_JOB_SUFFIX_INDEX])
elif srcJobExecTopicType == jobExecutionTopicType.JOB_WILDCARD_TOPIC:
return '{0}{1}/jobs/#'.format(_BASE_THINGS_TOPIC, self._thingName)
else:
return '{0}{1}/jobs/{2}{3}'.format(_BASE_THINGS_TOPIC, self._thingName, srcJobExecTopicType[_JOB_OPERATION_INDEX], srcJobExecTopicReplyType[_JOB_SUFFIX_INDEX])

def serializeJobExecutionUpdatePayload(self, status, statusDetails=None, expectedVersion=0, executionNumber=0, includeJobExecutionState=False, includeJobDocument=False):
executionStatus = _getExecutionStatus(status)
if executionStatus is None:
return None
payload = {_STATUS_KEY: executionStatus}
if statusDetails:
payload[_STATUS_DETAILS_KEY] = statusDetails
if expectedVersion > 0:
payload[_EXPECTED_VERSION_KEY] = str(expectedVersion)
if executionNumber > 0:
payload[_EXEXCUTION_NUMBER_KEY] = str(executionNumber)
if includeJobExecutionState:
payload[_INCLUDE_JOB_EXECUTION_STATE_KEY] = True
if includeJobDocument:
payload[_INCLUDE_JOB_DOCUMENT_KEY] = True
if self._clientToken is not None:
payload[_CLIENT_TOKEN_KEY] = self._clientToken
return json.dumps(payload)

def serializeDescribeJobExecutionPayload(self, executionNumber=0, includeJobDocument=True):
payload = {_INCLUDE_JOB_DOCUMENT_KEY: includeJobDocument}
if executionNumber > 0:
payload[_EXEXCUTION_NUMBER_KEY] = executionNumber
if self._clientToken is not None:
payload[_CLIENT_TOKEN_KEY] = self._clientToken
return json.dumps(payload)

def serializeStartNextPendingJobExecutionPayload(self, statusDetails=None):
payload = {}
if self._clientToken is not None:
payload[_CLIENT_TOKEN_KEY] = self._clientToken
if statusDetails is not None:
payload[_STATUS_DETAILS_KEY] = statusDetails
return json.dumps(payload)

def serializeClientTokenPayload(self):
return json.dumps({_CLIENT_TOKEN_KEY: self._clientToken}) if self._clientToken is not None else '{}'
67 changes: 67 additions & 0 deletions AWSIoTPythonSDK/core/protocol/connection/alpn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# /*
# * Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# *
# * Licensed under the Apache License, Version 2.0 (the "License").
# * You may not use this file except in compliance with the License.
# * A copy of the License is located at
# *
# * http://aws.amazon.com/apache2.0
# *
# * or in the "license" file accompanying this file. This file is distributed
# * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# * express or implied. See the License for the specific language governing
# * permissions and limitations under the License.
# */


try:
import ssl
except:
ssl = None


class SSLContextBuilder(object):

def __init__(self):
self.check_supportability()
self._ssl_context = ssl.create_default_context()

def check_supportability(self):
if ssl is None:
raise RuntimeError("This platform has no SSL/TLS.")
if not hasattr(ssl, "SSLContext"):
raise NotImplementedError("This platform does not support SSLContext. Python 2.7.10+/3.5+ is required.")
if not hasattr(ssl.SSLContext, "set_alpn_protocols"):
raise NotImplementedError("This platform does not support ALPN as TLS extensions. Python 2.7.10+/3.5+ is required.")

def with_protocol(self, protocol):
self._ssl_context.protocol = protocol
return self

def with_ca_certs(self, ca_certs):
self._ssl_context.load_verify_locations(ca_certs)
return self

def with_cert_key_pair(self, cert_file, key_file):
self._ssl_context.load_cert_chain(cert_file, key_file)
return self

def with_cert_reqs(self, cert_reqs):
self._ssl_context.verify_mode = cert_reqs
return self

def with_check_hostname(self, check_hostname):
self._ssl_context.check_hostname = check_hostname
return self

def with_ciphers(self, ciphers):
if ciphers is not None:
self._ssl_context.set_ciphers(ciphers) # set_ciphers() does not allow None input. Use default (do nothing) if None
return self

def with_alpn_protocols(self, alpn_protocols):
self._ssl_context.set_alpn_protocols(alpn_protocols)
return self

def build(self):
return self._ssl_context
41 changes: 33 additions & 8 deletions AWSIoTPythonSDK/core/protocol/connection/cores.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
# when to increase it and when to reset it.


import re
import sys
import ssl
import errno
import struct
import socket
import base64
Expand All @@ -30,6 +32,7 @@
from datetime import datetime
import hashlib
import hmac
from AWSIoTPythonSDK.exception.AWSIoTExceptions import ClientError
from AWSIoTPythonSDK.exception.AWSIoTExceptions import wssNoKeyInEnvironmentError
from AWSIoTPythonSDK.exception.AWSIoTExceptions import wssHandShakeError
from AWSIoTPythonSDK.core.protocol.internal.defaults import DEFAULT_CONNECT_DISCONNECT_TIMEOUT_SEC
Expand Down Expand Up @@ -240,12 +243,13 @@ def createWebsocketEndpoint(self, host, port, region, method, awsServiceName, pa
amazonDateSimple = amazonDate[0] # Unicode in 3.x
amazonDateComplex = amazonDate[1] # Unicode in 3.x
allKeys = self._checkIAMCredentials() # Unicode in 3.x
hasCredentialsNecessaryForWebsocket = "aws_access_key_id" in allKeys.keys() and "aws_secret_access_key" in allKeys.keys()
if not hasCredentialsNecessaryForWebsocket:
return ""
if not self._hasCredentialsNecessaryForWebsocket(allKeys):
raise wssNoKeyInEnvironmentError()
else:
# Because of self._hasCredentialsNecessaryForWebsocket(...), keyID and secretKey should not be None from here
keyID = allKeys["aws_access_key_id"]
secretKey = allKeys["aws_secret_access_key"]
# amazonDateSimple and amazonDateComplex are guaranteed not to be None
queryParameters = "X-Amz-Algorithm=AWS4-HMAC-SHA256" + \
"&X-Amz-Credential=" + keyID + "%2F" + amazonDateSimple + "%2F" + region + "%2F" + awsServiceName + "%2Faws4_request" + \
"&X-Amz-Date=" + amazonDateComplex + \
Expand All @@ -264,12 +268,23 @@ def createWebsocketEndpoint(self, host, port, region, method, awsServiceName, pa
# generate url
url = "wss://" + host + ":" + str(port) + path + '?' + queryParameters + "&X-Amz-Signature=" + signature
# See if we have STS token, if we do, add it
if "aws_session_token" in allKeys.keys():
awsSessionTokenCandidate = allKeys.get("aws_session_token")
if awsSessionTokenCandidate is not None and len(awsSessionTokenCandidate) != 0:
aws_session_token = allKeys["aws_session_token"]
url += "&X-Amz-Security-Token=" + quote(aws_session_token.encode("utf-8")) # Unicode in 3.x
self._logger.debug("createWebsocketEndpoint: Websocket URL: " + url)
return url

def _hasCredentialsNecessaryForWebsocket(self, allKeys):
awsAccessKeyIdCandidate = allKeys.get("aws_access_key_id")
awsSecretAccessKeyCandidate = allKeys.get("aws_secret_access_key")
# None value is NOT considered as valid entries
validEntries = awsAccessKeyIdCandidate is not None and awsAccessKeyIdCandidate is not None
if validEntries:
# Empty value is NOT considered as valid entries
validEntries &= (len(awsAccessKeyIdCandidate) != 0 and len(awsSecretAccessKeyCandidate) != 0)
return validEntries


# This is an internal class that buffers the incoming bytes into an
# internal buffer until it gets the full desired length of bytes.
Expand Down Expand Up @@ -305,6 +320,10 @@ def read(self, numberOfBytesToBeBuffered):
while self._remainedLength > 0: # Read in a loop, always try to read in the remained length
# If the data is temporarily not available, socket.error will be raised and catched by paho
dataChunk = self._sslSocket.read(self._remainedLength)
# There is a chance where the server terminates the connection without closing the socket.
# If that happens, let's raise an exception and enter the reconnect flow.
if not dataChunk:
raise socket.error(errno.ECONNABORTED, 0)
self._internalBuffer.extend(dataChunk) # Buffer the data
self._remainedLength -= len(dataChunk) # Update the remained length

Expand Down Expand Up @@ -411,6 +430,8 @@ def __init__(self, socket, hostAddress, portNumber, AWSAccessKeyID="", AWSSecret
raise ValueError("No Access Key/KeyID Error")
except wssHandShakeError:
raise ValueError("Websocket Handshake Error")
except ClientError as e:
raise ValueError(e.message)
# Now we have a socket with secured websocket...
self._bufferedReader = _BufferedReader(self._sslSocket)
self._bufferedWriter = _BufferedWriter(self._sslSocket)
Expand Down Expand Up @@ -461,11 +482,12 @@ def _verifyWSSAcceptKey(self, srcAcceptKey, clientKey):

def _handShake(self, hostAddress, portNumber):
CRLF = "\r\n"
hostAddressChunks = hostAddress.split('.') # <randomString>.iot.<region>.amazonaws.com
region = hostAddressChunks[2] # XXXX.<region>.beta
IOT_ENDPOINT_PATTERN = r"^[0-9a-zA-Z]+\.iot\.(.*)\.amazonaws\..*"
matched = re.compile(IOT_ENDPOINT_PATTERN).match(hostAddress)
if not matched:
raise ClientError("Invalid endpoint pattern for wss: %s" % hostAddress)
region = matched.group(1)
signedURL = self._sigV4Handler.createWebsocketEndpoint(hostAddress, portNumber, region, "GET", "iotdata", "/mqtt")
if signedURL == "":
raise wssNoKeyInEnvironmentError()
# Now we got a signedURL
path = signedURL[signedURL.index("/mqtt"):]
# Assemble HTTP request headers
Expand Down Expand Up @@ -667,6 +689,9 @@ def close(self):
self._sslSocket.close()
self._sslSocket = None

def getpeercert(self):
return self._sslSocket.getpeercert()

def getSSLSocket(self):
if self._connectStatus != self._WebsocketDisconnected:
return self._sslSocket
Expand Down
3 changes: 3 additions & 0 deletions AWSIoTPythonSDK/core/protocol/internal/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ def set_endpoint_provider(self, endpoint_provider):
def configure_last_will(self, topic, payload, qos, retain=False):
self._paho_client.will_set(topic, payload, qos, retain)

def configure_alpn_protocols(self, alpn_protocols):
self._paho_client.config_alpn_protocols(alpn_protocols)

def clear_last_will(self):
self._paho_client.will_clear()

Expand Down
3 changes: 2 additions & 1 deletion AWSIoTPythonSDK/core/protocol/internal/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@
DEFAULT_CONNECT_DISCONNECT_TIMEOUT_SEC = 30
DEFAULT_OPERATION_TIMEOUT_SEC = 5
DEFAULT_DRAINING_INTERNAL_SEC = 0.5
METRICS_PREFIX = "?SDK=Python&Version="
METRICS_PREFIX = "?SDK=Python&Version="
ALPN_PROTCOLS = "x-amzn-mqtt-ca"
Loading

0 comments on commit 832f074

Please sign in to comment.