Skip to content

Commit

Permalink
Add Workflow Collections Feature (#2569)
Browse files Browse the repository at this point in the history
- Introduce named and shared key-value storage for workflows.
- Optimize operations and ensure stability when collection names are fixed.
- Implement `stream_all` for comprehensive HGETALL/HSCAN-like queries.
- Enable wildcard prefix querying using `stream_match`, with pg_trigram and GIN indexing on keys for performance.
- Support ascending upsert sorting, employing a unique constraint for efficiency.
- Enhance the collections cursor and benchmark performance improvements.
- Provide CRUD UI for collections, including error handling, sorting, and comprehensive test coverage.
- Develop a streaming API for collections, with refined authentication and authorization mechanisms:
  - Abstract RSA key generation and standardize token signing.
  - Integrate user tokens and personal access tokens into a unified authentication approach.
- Streamlined response structures, improved item serialization, and optimized database interactions.
- Support POST, PUT, GET, and DELETE operations, ensuring robust and transactional behavior.
- Add timestamps to items and refine collection filters, with additional test cases to cover edge scenarios.
- Code cleanup, refactoring, and documentation updates.

---------

Co-authored-by: Stuart Corbishley <[email protected]>
Co-authored-by: Rogerio Pontual <[email protected]>
Co-authored-by: Elias W. BA <[email protected]>
Co-authored-by: Joe Clark <[email protected]>
  • Loading branch information
4 people authored Nov 7, 2024
1 parent d67c136 commit 020b523
Show file tree
Hide file tree
Showing 61 changed files with 3,704 additions and 233 deletions.
7 changes: 3 additions & 4 deletions .credo.exs
Original file line number Diff line number Diff line change
Expand Up @@ -168,18 +168,17 @@
{Credo.Check.Warning.WrongTestFileExtension, []},
# Controversial but included
{Credo.Check.Consistency.MultiAliasImportRequireUse, []},
{Credo.Check.Design.DuplicatedCode, []},
{Credo.Check.Readability.MultiAlias, []},
{Credo.Check.Readability.SeparateAliasRequire, []},
{Credo.Check.Readability.StrictModuleLayout, []},
# Checks scheduled for next check update (opt-in for now, will bump exit_status soon)
{Credo.Check.Consistency.UnusedVariableNames, [exit_status: 0]}
{Credo.Check.Readability.StrictModuleLayout, []}
],
disabled: [
#
# Controversial and experimental checks (opt-in, just move the check to `:enabled`
# and be sure to use `mix credo --strict` to see low priority checks)
#
{Credo.Check.Consistency.UnusedVariableNames, [exit_status: 0]},
{Credo.Check.Design.DuplicatedCode, []},
{Credo.Check.Design.SkipTestWithoutComment, []},
{Credo.Check.Readability.AliasAs, []},
{Credo.Check.Readability.BlockPipe, []},
Expand Down
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ and this project adheres to

### Added

- Adds a UI for managing collections
[#2567](https://github.com/OpenFn/lightning/issues/2567)
- Introduces collections, a programatic workflow data sharing resource.
[#2551](https://github.com/OpenFn/lightning/issues/2551)

### Changed

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# - https://hub.docker.com/r/hexpm/elixir/tags - for the build image
# - https://hub.docker.com/_/debian?tab=tags&page=1&name=bullseye-20210902-slim - for the release image
# - https://pkgs.org/ - resource for finding needed packages
# - Ex: hexpm/elixir:1.13.2-erlang-24.2.1-debian-bullseye-20210902-slim
# - Ex: hexpm/elixir:1.16.2-erlang-26.2.5-debian-bookworm-20240513
#
ARG ELIXIR_VERSION=1.16.2
ARG OTP_VERSION=26.2.5
Expand Down
34 changes: 17 additions & 17 deletions assets/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion assets/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"zustand": "^4.3.7"
},
"devDependencies": {
"@openfn/ws-worker": "^1.7.0",
"@openfn/ws-worker": "^1.8.0",
"@types/marked": "^4.0.8",
"@types/react": "^18.0.15",
"@types/react-dom": "^18.0.6",
Expand Down
2 changes: 2 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ config :lightning, :default_retention_period, nil

config :lightning, Lightning.Runtime.RuntimeManager, start: false

config :lightning, LightningWeb.CollectionsController, stream_limit: 1_000

# Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above.
import_config "#{config_env()}.exs"
2 changes: 2 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,5 @@ config :lightning, :github_app,
FaFp+DyAe+b4nDwuJaW2LURbr8AEZga7oQj0uYxcYw==
-----END RSA PRIVATE KEY-----
"""

config :lightning, LightningWeb.CollectionsController, stream_limit: 50
39 changes: 27 additions & 12 deletions lib/lightning/accounts.ex
Original file line number Diff line number Diff line change
Expand Up @@ -159,16 +159,22 @@ defmodule Lightning.Accounts do
Raises `Ecto.NoResultsError` if the User does not exist.
See `get_user/1`.
"""
def get_user!(id), do: Repo.get!(User, id)

@doc """
Gets a single user.
## Examples
iex> get_user!(123)
iex> get_user(123)
%User{}
iex> get_user!(456)
** (Ecto.NoResultsError)
nil
"""
def get_user!(id), do: Repo.get!(User, id)
def get_user(id), do: Repo.get(User, id)

@doc """
Gets a single token.
Expand Down Expand Up @@ -696,8 +702,8 @@ defmodule Lightning.Accounts do
Gets the user with the given signed token.
"""
def get_user_by_session_token(token) do
{:ok, query} = UserToken.verify_token_query(token, "session")
Repo.one(query)
UserToken.verify_token_query(token, "session")
|> Repo.one()
end

@doc """
Expand All @@ -723,8 +729,7 @@ defmodule Lightning.Accounts do
Checks if the given sudo token for the user is valid
"""
def sudo_session_token_valid?(user, token) do
{:ok, token_query} =
UserToken.verify_token_query(token, "sudo_session")
token_query = UserToken.verify_token_query(token, "sudo_session")

query = from t in token_query, where: t.user_id == ^user.id
Repo.exists?(query)
Expand Down Expand Up @@ -770,8 +775,8 @@ defmodule Lightning.Accounts do
Gets the user with the given signed token.
"""
def get_user_by_auth_token(token) do
{:ok, query} = UserToken.verify_token_query(token, "auth")
Repo.one(query)
UserToken.verify_token_query(token, "auth")
|> Repo.one()
end

@doc """
Expand All @@ -796,9 +801,19 @@ defmodule Lightning.Accounts do
@doc """
Gets the user with the given signed token.
"""
def get_user_by_api_token(claims) when is_map(claims) do
case claims do
%{sub: "user:" <> id} ->
Repo.get(User, id)

_ ->
nil
end
end

def get_user_by_api_token(token) do
{:ok, query} = UserToken.verify_token_query(token, "api")
Repo.one(query)
UserToken.verify_token_query(token, "api")
|> Repo.one()
end

@doc """
Expand Down
71 changes: 25 additions & 46 deletions lib/lightning/accounts/user_token.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ defmodule Lightning.Accounts.UserToken do
"""

use Lightning.Schema
use Joken.Config

import Ecto.Query

Expand Down Expand Up @@ -47,15 +46,6 @@ defmodule Lightning.Accounts.UserToken do
timestamps updated_at: false
end

def token_config do
default_claims(skip: [:exp])
|> add_claim(
"my_key",
fn -> "My custom claim" end,
&(&1 == "My custom claim")
)
end

@doc """
Generates a token that will be stored in a signed place,
such as session or cookie. As they are signed, those
Expand All @@ -65,9 +55,10 @@ defmodule Lightning.Accounts.UserToken do
{binary(), Ecto.Changeset.t(%__MODULE__{})}
def build_token(user, "api" = context) do
token =
Joken.generate_and_sign!(default_claims(skip: [:exp]), %{
"user_id" => user.id
})
Lightning.Tokens.PersonalAccessToken.generate_and_sign!(
%{"sub" => "user:#{user.id}"},
Lightning.Config.token_signer()
)

{token,
changeset(%__MODULE__{}, %{token: token, context: context, user_id: user.id})}
Expand Down Expand Up @@ -103,48 +94,36 @@ defmodule Lightning.Accounts.UserToken do
not expired (after @auth_validity_in_seconds or @session_validity_in_days).
"""
def verify_token_query(token, "auth" = context) do
query =
from(token in token_and_context_query(token, context),
join: user in assoc(token, :user),
where: token.inserted_at > ago(@auth_validity_in_seconds, "second"),
select: user
)

{:ok, query}
from(token in token_and_context_query(token, context),
join: user in assoc(token, :user),
where: token.inserted_at > ago(@auth_validity_in_seconds, "second"),
select: user
)
end

def verify_token_query(token, "api" = context) do
query =
from(token in token_and_context_query(token, context),
join: user in assoc(token, :user),
select: user
)

{:ok, query}
from(token in token_and_context_query(token, context),
join: user in assoc(token, :user),
select: user
)
end

def verify_token_query(token, "session" = context) do
query =
from(token in token_and_context_query(token, context),
join: user in assoc(token, :user),
where: token.inserted_at > ago(@session_validity_in_days, "day"),
select: user
)

{:ok, query}
from(token in token_and_context_query(token, context),
join: user in assoc(token, :user),
where: token.inserted_at > ago(@session_validity_in_days, "day"),
select: user
)
end

def verify_token_query(token, "sudo_session" = context) do
query =
from(token in token_and_context_query(token, context),
join: user in assoc(token, :user),
where:
token.inserted_at >
ago(@sudo_session_validity_in_seconds, "second"),
select: user
)

{:ok, query}
from(token in token_and_context_query(token, context),
join: user in assoc(token, :user),
where:
token.inserted_at >
ago(@sudo_session_validity_in_seconds, "second"),
select: user
)
end

@doc """
Expand Down
3 changes: 2 additions & 1 deletion lib/lightning/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ defmodule Lightning.Application do
adaptor_service_childspec,
{Lightning.TaskWorker, name: :cli_task_worker},
{Lightning.Runtime.RuntimeManager,
worker_secret: Lightning.Config.worker_secret()},
worker_secret: Lightning.Config.worker_secret(),
endpoint: LightningWeb.Endpoint},
{Lightning.KafkaTriggers.Supervisor, type: :supervisor}
# Start a worker by calling: Lightning.Worker.start_link(arg)
# {Lightning.Worker, arg}
Expand Down
Loading

0 comments on commit 020b523

Please sign in to comment.