Skip to content

Commit

Permalink
Experimental Kafka Trigger Implementation (#2270)
Browse files Browse the repository at this point in the history
  • Loading branch information
rorymckinley authored Jul 17, 2024
1 parent 68958bb commit 1dd5450
Show file tree
Hide file tree
Showing 66 changed files with 6,827 additions and 36 deletions.
5 changes: 4 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@
# USAGE_TRACKING_RESUBMISSION_BATCH_SIZE=10

# OpenFn.org hosts a public sandbox that gets reset every night. If you'd like to
# make your instance "resettable" (a highly descrutive action—this destroys all
# make your instance "resettable" (a highly destructive action—this destroys all
# data in your instance) you can set the following environment variable to "yes"
# IS_RESETTABLE_DEMO=no

# Set to 'yes' to enable the *experimental* Kakfa Trigger functionality.
# KAFKA_TRIGGERS_ENABLED=no
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ and this project adheres to
## [Unreleased]

### Added
- Add experimental support for triggers that consume message from a Kafka
cluster [#1801](https://github.com/OpenFn/lightning/issues/1801)

### Changed

Expand Down
10 changes: 10 additions & 0 deletions DEPLOYMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ The following environment variables are required:
- `OPENAI_API_KEY` - your OpenAI API key.
- `APOLLO_ENDPOINT` - the endpoint for the OpenFn Apollo AI service.

Lightning workflows can be configured with a trigger that will consume messages
from a Kafka Cluster. By default this is disabled and you will not see the
option to create a Kafka trigger in the UI, nor will the Kafka consumer groups
be running.

To enable this feature set the `KAFKA_TRIGGERS_ENABLED` to `yes` and restart
Lightning. Please note that, if you enable this feature and then create some
Kafka triggers and then disable the feature, you will not be able to edit any
triggers created before the feature was disabled.

### Other config

- `ADAPTORS_PATH` - where you store your locally installed adaptors
Expand Down
26 changes: 26 additions & 0 deletions assets/js/workflow-diagram/components/trigger-icons.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,29 @@ export const lockClosedIcon = (
</svg>

);

export const kafkaIcon = (
<svg viewBox="1 1 20 21" fill="none" xmlns="http://www.w3.org/2000/svg">
<path fillRule="evenodd" clipRule="evenodd" d="m15.54 12.97c-.68 0-1.3.25-1.78.67l-1.29-.75c.09-.29.13-.6.13-.92s-.04-.63-.1-.92l1.23-.73c.48.44 1.12.68 1.81.68 1.49 0 2.7-1.19 2.7-2.68s-1.21-2.69-2.7-2.69-2.7 1.21-2.7 2.7c0 .17.02.37.05.55l-1.25.72c-.43-.45-1-.78-1.64-.95v-1.39c1.09-.35 1.88-1.37 1.88-2.57 0-1.49-1.21-2.69-2.7-2.69s-2.68 1.2-2.68 2.69c0 1.2.76 2.21 1.84 2.57v1.4c-1.48.38-2.58 1.71-2.58 3.31s1.1 2.94 2.58 3.31v1.45c-1.08.36-1.84 1.37-1.84 2.57 0 1.49 1.19 2.7 2.68 2.7s2.7-1.21 2.7-2.7c0-1.2-.79-2.22-1.88-2.57v-1.44c.64-.16 1.2-.49 1.64-.94l1.26.73c-.04.19-.06.38-.06.58 0 1.49 1.21 2.7 2.7 2.7s2.7-1.21 2.7-2.7-1.21-2.69-2.7-2.69m0-5.97c.74 0 1.33.59 1.33 1.32s-.59 1.34-1.33 1.34-1.33-.6-1.33-1.34.59-1.32 1.33-1.32m-7.69-2.31c0-.74.59-1.34 1.33-1.34s1.32.6 1.32 1.34-.58 1.34-1.32 1.34-1.33-.6-1.33-1.34m2.65 14.61c0 .74-.58 1.34-1.32 1.34s-1.33-.6-1.33-1.34.59-1.34 1.33-1.34 1.32.6 1.32 1.34m-1.32-5.41c-1.06 0-1.92-.86-1.92-1.92s.86-1.92 1.92-1.92 1.92.86 1.92 1.92-.86 1.92-1.92 1.92m6.36 3.11c-.74 0-1.33-.6-1.33-1.34s.59-1.33 1.33-1.33 1.33.6 1.33 1.33-.59 1.34-1.33 1.34z" fill="#71757E" />
<defs>
<filter id="filter0_dd_365_7433" x="0" y="0" width="2000" height="2000" filterUnits="userSpaceOnUse" colorInterpolationFilters="sRGB">
<feFlood floodOpacity="0" result="BackgroundImageFix" />
<feColorMatrix in="SourceAlpha" type="matrix" values="0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 127 0" result="hardAlpha" />
<feMorphology radius="1" operator="erode" in="SourceAlpha" result="effect1_dropShadow_365_7433" />
<feOffset dy="4" />
<feGaussianBlur stdDeviation="3" />
<feComposite in2="hardAlpha" operator="out" />
<feColorMatrix type="matrix" values="0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0.1 0" />
<feBlend mode="normal" in2="BackgroundImageFix" result="effect1_dropShadow_365_7433" />
<feColorMatrix in="SourceAlpha" type="matrix" values="0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 127 0" result="hardAlpha" />
<feMorphology radius="1" operator="erode" in="SourceAlpha" result="effect2_dropShadow_365_7433" />
<feOffset dy="2" />
<feGaussianBlur stdDeviation="2" />
<feComposite in2="hardAlpha" operator="out" />
<feColorMatrix type="matrix" values="0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0.06 0" />
<feBlend mode="normal" in2="effect1_dropShadow_365_7433" result="effect2_dropShadow_365_7433" />
<feBlend mode="normal" in="SourceGraphic" in2="effect2_dropShadow_365_7433" result="shape" />
</filter>
</defs>
</svg>
);
9 changes: 8 additions & 1 deletion assets/js/workflow-diagram/nodes/Trigger.tsx
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import React, { memo } from 'react';
import { Position } from 'reactflow';
import { ClockIcon, GlobeAltIcon } from '@heroicons/react/24/outline';
import { lockClosedIcon } from '../components/trigger-icons';
import { kafkaIcon, lockClosedIcon } from '../components/trigger-icons';
import cronstrue from 'cronstrue';

import PlusButton from '../components/PlusButton';
Expand Down Expand Up @@ -56,6 +56,13 @@ function getTriggerMeta(trigger: Lightning.TriggerNode): TriggerMeta {
primaryIcon: <GlobeAltIcon />,
secondaryIcon: trigger.has_auth_method ? lockClosedIcon : null,
};
case 'kafka':
return {
label: 'Kafka trigger',
sublabel: `On each message consumed from the cluster`,
primaryIcon: kafkaIcon,
secondaryIcon: trigger.has_auth_method ? lockClosedIcon : null,
};
case 'cron':
try {
return {
Expand Down
8 changes: 7 additions & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ base_cron = [
args: %{"type" => "weekly_project_digest"}},
{"0 10 1 * *", Lightning.DigestEmailWorker,
args: %{"type" => "monthly_project_digest"}},
{"1 2 * * *", Lightning.Projects, args: %{"type" => "data_retention"}}
{"1 2 * * *", Lightning.Projects, args: %{"type" => "data_retention"}},
{"*/10 * * * *", Lightning.KafkaTriggers.DuplicateTrackingCleanupWorker}
]

cleanup_cron =
Expand Down Expand Up @@ -509,4 +510,9 @@ config :lightning, :usage_tracking,
env!("USAGE_TRACKING_RESUBMISSION_BATCH_SIZE", :integer, 10),
daily_batch_size: env!("USAGE_TRACKING_DAILY_BATCH_SIZE", :integer, 10)

config :lightning, :kafka_triggers,
enabled: env!("KAFKA_TRIGGERS_ENABLED", &Utils.ensure_boolean/1, false),
duplicate_tracking_retention_seconds:
env!("KAFKA_DUPLICATE_TRACKING_RETENTION_SECONDS", :integer, 3600)

# ==============================================================================
Loading

0 comments on commit 1dd5450

Please sign in to comment.