Group: com.netflix.conductor
| Published Artifact | Description |
|---|---|
| conductor-task | Community contributed tasks |
Note: If you are using conductor-contribs as a dependency, the task module is already included; you do not need to include it separately.
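If you only need the task module on its own, a minimal sketch of pulling it in directly, assuming a Gradle Kotlin DSL build and a placeholder version:

```kotlin
// build.gradle.kts - hypothetical snippet; substitute a published Conductor version for <version>.
dependencies {
    implementation("com.netflix.conductor:conductor-task:<version>")
}
```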
JSON_JQ_TRANSFORM is a System task that allows processing of JSON data supplied to the task, using the query expression language of the popular JQ processing tool.

```json
"type" : "JSON_JQ_TRANSFORM"
```
Check the [JQ Manual](https://stedolan.github.io/jq/manual/) and the [JQ Playground](https://jqplay.org/) for more information on JQ.
JSON is a popular format of choice for data interchange. It is widely used in web and server applications, document storage, API I/O, etc. It is also used within Conductor to define workflow and task definitions and to pass data and state between tasks and workflows. This makes a tool like JQ a natural fit for processing task-related data. Common usages within Conductor include working with HTTP tasks, JOIN tasks, or standalone tasks that transform data from the output of one task into the input of another.
Here is an example of a `JSON_JQ_TRANSFORM` task. The `inputParameters` attribute is expected to have a value object containing the following:

- A list of key/value pair objects, denoted `key1`/`value1` and `key2`/`value2` in the example below. Note that `key1`/`value1` are arbitrary names used in this example.
- A key named `queryExpression`, whose value is a JQ expression. The expression operates on the value of the `inputParameters` attribute. In the example below, `inputParameters` has two inner objects named by the attributes `key1` and `key2`, each of which holds an object named `value1` and `value2` respectively, with an associated array of strings as its value: `"a", "b"` and `"c", "d"`. The expression `key3: (.key1.value1 + .key2.value2)` concatenates the two string arrays into a single array under an attribute named `key3`.
```json
{
  "name": "jq_example_task",
  "taskReferenceName": "my_jq_example_task",
  "type": "JSON_JQ_TRANSFORM",
  "inputParameters": {
    "key1": {
      "value1": [
        "a",
        "b"
      ]
    },
    "key2": {
      "value2": [
        "c",
        "d"
      ]
    },
    "queryExpression": "{ key3: (.key1.value1 + .key2.value2) }"
  }
}
```
Executing the example task above provides the following output. The `resultList` attribute stores the full list of the `queryExpression` results. The `result` attribute stores the first element of `resultList`. An optional `error` attribute with a string message is returned if there was an error processing the query expression.
```json
{
  "result": {
    "key3": [
      "a",
      "b",
      "c",
      "d"
    ]
  },
  "resultList": [
    {
      "key3": [
        "a",
        "b",
        "c",
        "d"
      ]
    }
  ]
}
```
| Attribute | Description |
|---|---|
| name | Task Name. A unique name that is descriptive of the task function |
| taskReferenceName | Task Reference Name. A unique reference to this task. There can be multiple references to a task within the same workflow definition |
| type | Task Type. In this case, `JSON_JQ_TRANSFORM` |
| inputParameters | The input parameters that will be supplied to this task. The parameters will be a JSON object of at least two attributes, one of which must be named `queryExpression`. The others are user-named attributes. These attributes are accessible to the JQ query processor |
| inputParameters/user-defined-key(s) | User-defined key(s) along with values |
| inputParameters/queryExpression | A JQ query expression |
| Attribute | Description |
|---|---|
| result | The first result returned by the JQ expression |
| resultList | A list of results returned by the JQ expression |
| error | An optional error message, indicating that the JQ query failed processing |
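Downstream tasks can consume this output through Conductor's `${...}` expression syntax. A minimal sketch, where `next_task` is a hypothetical follow-on task reading the concatenated array from the example above:

```json
{
  "name": "next_task",
  "taskReferenceName": "my_next_task",
  "type": "SIMPLE",
  "inputParameters": {
    "mergedList": "${my_jq_example_task.output.result.key3}"
  }
}
```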
A Kafka Publish task is used to push messages to another microservice via Kafka.

```json
"type" : "KAFKA_PUBLISH"
```
Sample Task
```json
{
  "name": "call_kafka",
  "taskReferenceName": "call_kafka",
  "inputParameters": {
    "kafka_request": {
      "topic": "userTopic",
      "value": "Message to publish",
      "bootStrapServers": "localhost:9092",
      "headers": {
        "x-Auth": "Auth-key"
      },
      "key": "123",
      "keySerializer": "org.apache.kafka.common.serialization.IntegerSerializer"
    }
  },
  "type": "KAFKA_PUBLISH"
}
```
The task expects an input parameter named `kafka_request` as part of the task's input with the following details:

- `bootStrapServers` - Bootstrap servers for connecting to the given Kafka cluster.
- `key` - Key to be published.
- `keySerializer` - Serializer used for serializing the key published to Kafka. One of the following can be set: `org.apache.kafka.common.serialization.IntegerSerializer`, `org.apache.kafka.common.serialization.LongSerializer`, or `org.apache.kafka.common.serialization.StringSerializer`. The default is the String serializer.
- `value` - Value published to Kafka (see the dynamic example after this list).
- `requestTimeoutMs` - Request timeout while publishing to Kafka. If this value is not given, it is read from the property `kafka.publish.request.timeout.ms`. If the property is not set, the value defaults to 100 ms.
- `maxBlockMs` - Max block time while publishing to Kafka. If this value is not given, it is read from the property `kafka.publish.max.block.ms`. If the property is not set, the value defaults to 500 ms.
- `headers` - A map of additional Kafka headers to be sent along with the request.
- `topic` - Topic to publish to.
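In practice, the message payload usually comes from an earlier task rather than a literal string. A minimal sketch, where `previous_task` and its `output.message` field are hypothetical names used for illustration:

```json
{
  "name": "call_kafka_dynamic",
  "taskReferenceName": "call_kafka_dynamic",
  "inputParameters": {
    "kafka_request": {
      "topic": "userTopic",
      "value": "${previous_task.output.message}",
      "bootStrapServers": "localhost:9092",
      "key": "123",
      "keySerializer": "org.apache.kafka.common.serialization.StringSerializer"
    }
  },
  "type": "KAFKA_PUBLISH"
}
```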
The producer created in the Kafka task is cached. By default, the cache size is 10 and the expiry time is 120000 ms. To change the defaults, modify the properties `kafka.publish.producer.cache.size` and `kafka.publish.producer.cache.time.ms` respectively.
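A minimal sketch of overriding these defaults, assuming a standard Conductor server properties file and hypothetical values:

```properties
# Hypothetical overrides: cache up to 25 producers, expire entries after 60 seconds.
kafka.publish.producer.cache.size=25
kafka.publish.producer.cache.time.ms=60000
```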
The task status transitions to `COMPLETED` on a successful publish. The task is marked as `FAILED` if the message could not be published to the Kafka queue.