diff --git a/.credo.exs b/.credo.exs index 446acb5bc6..2dfa19776b 100644 --- a/.credo.exs +++ b/.credo.exs @@ -168,18 +168,17 @@ {Credo.Check.Warning.WrongTestFileExtension, []}, # Controversial but included {Credo.Check.Consistency.MultiAliasImportRequireUse, []}, - {Credo.Check.Design.DuplicatedCode, []}, {Credo.Check.Readability.MultiAlias, []}, {Credo.Check.Readability.SeparateAliasRequire, []}, - {Credo.Check.Readability.StrictModuleLayout, []}, - # Checks scheduled for next check update (opt-in for now, will bump exit_status soon) - {Credo.Check.Consistency.UnusedVariableNames, [exit_status: 0]} + {Credo.Check.Readability.StrictModuleLayout, []} ], disabled: [ # # Controversial and experimental checks (opt-in, just move the check to `:enabled` # and be sure to use `mix credo --strict` to see low priority checks) # + {Credo.Check.Consistency.UnusedVariableNames, [exit_status: 0]}, + {Credo.Check.Design.DuplicatedCode, []}, {Credo.Check.Design.SkipTestWithoutComment, []}, {Credo.Check.Readability.AliasAs, []}, {Credo.Check.Readability.BlockPipe, []}, diff --git a/CHANGELOG.md b/CHANGELOG.md index 8ce343d8c4..2ab13a0fb2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,11 @@ and this project adheres to ### Added +- Adds a UI for managing collections + [#2567](https://github.com/OpenFn/lightning/issues/2567) +- Introduces collections, a programatic workflow data sharing resource. + [#2551](https://github.com/OpenFn/lightning/issues/2551) + ### Changed ### Fixed diff --git a/Dockerfile b/Dockerfile index 1d9ee7a1d8..fdfdf80891 100644 --- a/Dockerfile +++ b/Dockerfile @@ -10,7 +10,7 @@ # - https://hub.docker.com/r/hexpm/elixir/tags - for the build image # - https://hub.docker.com/_/debian?tab=tags&page=1&name=bullseye-20210902-slim - for the release image # - https://pkgs.org/ - resource for finding needed packages -# - Ex: hexpm/elixir:1.13.2-erlang-24.2.1-debian-bullseye-20210902-slim +# - Ex: hexpm/elixir:1.16.2-erlang-26.2.5-debian-bookworm-20240513 # ARG ELIXIR_VERSION=1.16.2 ARG OTP_VERSION=26.2.5 diff --git a/assets/package-lock.json b/assets/package-lock.json index a92abc81a8..d4b0a5343d 100644 --- a/assets/package-lock.json +++ b/assets/package-lock.json @@ -34,7 +34,7 @@ "zustand": "^4.3.7" }, "devDependencies": { - "@openfn/ws-worker": "^1.7.0", + "@openfn/ws-worker": "^1.8.0", "@types/marked": "^4.0.8", "@types/react": "^18.0.15", "@types/react-dom": "^18.0.6", @@ -568,9 +568,9 @@ } }, "node_modules/@openfn/compiler": { - "version": "0.3.3", - "resolved": "https://registry.npmjs.org/@openfn/compiler/-/compiler-0.3.3.tgz", - "integrity": "sha512-aL7dgUDhz6jrRpnsuk1YJyud7vx3TGIzfDSoNP4J6T1l542jF7fGQ46pJj72Jejl4kycM5MYRKvhtAKPfeFZTw==", + "version": "0.4.0", + "resolved": "https://registry.npmjs.org/@openfn/compiler/-/compiler-0.4.0.tgz", + "integrity": "sha512-oMBlaippanMkbWLsEZssxoz+OFOS45Ts1++jCf7fSS3KJUXjeuIGK+xSQPuXRcgRwL81lCbkJsF/MtBeR2a2eQ==", "dev": true, "dependencies": { "@openfn/describe-package": "0.1.2", @@ -600,16 +600,16 @@ } }, "node_modules/@openfn/engine-multi": { - "version": "1.3.0", - "resolved": "https://registry.npmjs.org/@openfn/engine-multi/-/engine-multi-1.3.0.tgz", - "integrity": "sha512-ZZI9zc/zIjt1VSRLysTqhvHubb2WQqVjtFiDht/p8umW9QIf2GcTLfCfaZYFgOsDlBxJpzoXWCu/tYcg+L40SQ==", + "version": "1.4.1", + "resolved": "https://registry.npmjs.org/@openfn/engine-multi/-/engine-multi-1.4.1.tgz", + "integrity": "sha512-mzoK7iYaNLGmACE30iCQzK2/qt56wU7+60fMMx1PrQA4sG4hxCMqnnrYnF7qjTFkwuNH1++4RfkcxOhVd8fSPg==", "dev": true, "dependencies": { - "@openfn/compiler": "0.3.3", + "@openfn/compiler": "0.4.0", "@openfn/language-common": "2.0.0-rc3", "@openfn/lexicon": "^1.1.0", "@openfn/logger": "1.0.2", - "@openfn/runtime": "1.4.2", + "@openfn/runtime": "1.5.1", "fast-safe-stringify": "^2.1.1" } }, @@ -641,9 +641,9 @@ } }, "node_modules/@openfn/runtime": { - "version": "1.4.2", - "resolved": "https://registry.npmjs.org/@openfn/runtime/-/runtime-1.4.2.tgz", - "integrity": "sha512-JLrOg3iUZN8zSlTv9IQMjE5RAPqvG0+bKvnrRrgLFM08JpTuclGzQTnzi+019lTA5B2r/zScKen1gqH6KbMSTQ==", + "version": "1.5.1", + "resolved": "https://registry.npmjs.org/@openfn/runtime/-/runtime-1.5.1.tgz", + "integrity": "sha512-Gy91JcjEqcN/u9xRY9Ur4FQJ8c2xseFibE5n+oDqHhb8bti6QWlOlwrywkfg6rs19FcNeveErWflZceimkgSFA==", "dev": true, "dependencies": { "@openfn/logger": "1.0.2", @@ -652,16 +652,16 @@ } }, "node_modules/@openfn/ws-worker": { - "version": "1.7.0", - "resolved": "https://registry.npmjs.org/@openfn/ws-worker/-/ws-worker-1.7.0.tgz", - "integrity": "sha512-0vCu9pNvsVE/EwhfyW5I04cHHk7OUVbTavoZgbKEViIeIQL0F8pNes5PD1a/bBW7NdYuMLEd74ybLZjlmEeIUw==", + "version": "1.8.1", + "resolved": "https://registry.npmjs.org/@openfn/ws-worker/-/ws-worker-1.8.1.tgz", + "integrity": "sha512-4DkO7gn67DlwKQhT85UCLM5A8Ac5bUR9tdwswEUSLgfeWK5/VJpwNAVJzg2EBEjUFx/+MZb+CfXZ63Q8XhLMTQ==", "dev": true, "dependencies": { "@koa/router": "^12.0.0", - "@openfn/engine-multi": "1.3.0", + "@openfn/engine-multi": "1.4.1", "@openfn/lexicon": "^1.1.0", "@openfn/logger": "1.0.2", - "@openfn/runtime": "1.4.2", + "@openfn/runtime": "1.5.1", "@types/koa-logger": "^3.1.2", "@types/ws": "^8.5.6", "fast-safe-stringify": "^2.1.1", diff --git a/assets/package.json b/assets/package.json index b7f0e6f6ff..d7cc4e7bcc 100644 --- a/assets/package.json +++ b/assets/package.json @@ -36,7 +36,7 @@ "zustand": "^4.3.7" }, "devDependencies": { - "@openfn/ws-worker": "^1.7.0", + "@openfn/ws-worker": "^1.8.0", "@types/marked": "^4.0.8", "@types/react": "^18.0.15", "@types/react-dom": "^18.0.6", diff --git a/config/config.exs b/config/config.exs index b7bb8d38cd..30c18fcfff 100644 --- a/config/config.exs +++ b/config/config.exs @@ -144,6 +144,8 @@ config :lightning, :default_retention_period, nil config :lightning, Lightning.Runtime.RuntimeManager, start: false +config :lightning, LightningWeb.CollectionsController, stream_limit: 1_000 + # Import environment specific config. This must remain at the bottom # of this file so it overrides the configuration defined above. import_config "#{config_env()}.exs" diff --git a/config/test.exs b/config/test.exs index 1300902c31..bf91361612 100644 --- a/config/test.exs +++ b/config/test.exs @@ -161,3 +161,5 @@ config :lightning, :github_app, FaFp+DyAe+b4nDwuJaW2LURbr8AEZga7oQj0uYxcYw== -----END RSA PRIVATE KEY----- """ + +config :lightning, LightningWeb.CollectionsController, stream_limit: 50 diff --git a/lib/lightning/accounts.ex b/lib/lightning/accounts.ex index 9f5b2b543f..7caa061057 100644 --- a/lib/lightning/accounts.ex +++ b/lib/lightning/accounts.ex @@ -159,16 +159,22 @@ defmodule Lightning.Accounts do Raises `Ecto.NoResultsError` if the User does not exist. + See `get_user/1`. + """ + def get_user!(id), do: Repo.get!(User, id) + + @doc """ + Gets a single user. + ## Examples - iex> get_user!(123) + iex> get_user(123) %User{} iex> get_user!(456) - ** (Ecto.NoResultsError) - + nil """ - def get_user!(id), do: Repo.get!(User, id) + def get_user(id), do: Repo.get(User, id) @doc """ Gets a single token. @@ -696,8 +702,8 @@ defmodule Lightning.Accounts do Gets the user with the given signed token. """ def get_user_by_session_token(token) do - {:ok, query} = UserToken.verify_token_query(token, "session") - Repo.one(query) + UserToken.verify_token_query(token, "session") + |> Repo.one() end @doc """ @@ -723,8 +729,7 @@ defmodule Lightning.Accounts do Checks if the given sudo token for the user is valid """ def sudo_session_token_valid?(user, token) do - {:ok, token_query} = - UserToken.verify_token_query(token, "sudo_session") + token_query = UserToken.verify_token_query(token, "sudo_session") query = from t in token_query, where: t.user_id == ^user.id Repo.exists?(query) @@ -770,8 +775,8 @@ defmodule Lightning.Accounts do Gets the user with the given signed token. """ def get_user_by_auth_token(token) do - {:ok, query} = UserToken.verify_token_query(token, "auth") - Repo.one(query) + UserToken.verify_token_query(token, "auth") + |> Repo.one() end @doc """ @@ -796,9 +801,19 @@ defmodule Lightning.Accounts do @doc """ Gets the user with the given signed token. """ + def get_user_by_api_token(claims) when is_map(claims) do + case claims do + %{sub: "user:" <> id} -> + Repo.get(User, id) + + _ -> + nil + end + end + def get_user_by_api_token(token) do - {:ok, query} = UserToken.verify_token_query(token, "api") - Repo.one(query) + UserToken.verify_token_query(token, "api") + |> Repo.one() end @doc """ diff --git a/lib/lightning/accounts/user_token.ex b/lib/lightning/accounts/user_token.ex index d079c35108..f4d8a7895e 100644 --- a/lib/lightning/accounts/user_token.ex +++ b/lib/lightning/accounts/user_token.ex @@ -18,7 +18,6 @@ defmodule Lightning.Accounts.UserToken do """ use Lightning.Schema - use Joken.Config import Ecto.Query @@ -47,15 +46,6 @@ defmodule Lightning.Accounts.UserToken do timestamps updated_at: false end - def token_config do - default_claims(skip: [:exp]) - |> add_claim( - "my_key", - fn -> "My custom claim" end, - &(&1 == "My custom claim") - ) - end - @doc """ Generates a token that will be stored in a signed place, such as session or cookie. As they are signed, those @@ -65,9 +55,10 @@ defmodule Lightning.Accounts.UserToken do {binary(), Ecto.Changeset.t(%__MODULE__{})} def build_token(user, "api" = context) do token = - Joken.generate_and_sign!(default_claims(skip: [:exp]), %{ - "user_id" => user.id - }) + Lightning.Tokens.PersonalAccessToken.generate_and_sign!( + %{"sub" => "user:#{user.id}"}, + Lightning.Config.token_signer() + ) {token, changeset(%__MODULE__{}, %{token: token, context: context, user_id: user.id})} @@ -103,48 +94,36 @@ defmodule Lightning.Accounts.UserToken do not expired (after @auth_validity_in_seconds or @session_validity_in_days). """ def verify_token_query(token, "auth" = context) do - query = - from(token in token_and_context_query(token, context), - join: user in assoc(token, :user), - where: token.inserted_at > ago(@auth_validity_in_seconds, "second"), - select: user - ) - - {:ok, query} + from(token in token_and_context_query(token, context), + join: user in assoc(token, :user), + where: token.inserted_at > ago(@auth_validity_in_seconds, "second"), + select: user + ) end def verify_token_query(token, "api" = context) do - query = - from(token in token_and_context_query(token, context), - join: user in assoc(token, :user), - select: user - ) - - {:ok, query} + from(token in token_and_context_query(token, context), + join: user in assoc(token, :user), + select: user + ) end def verify_token_query(token, "session" = context) do - query = - from(token in token_and_context_query(token, context), - join: user in assoc(token, :user), - where: token.inserted_at > ago(@session_validity_in_days, "day"), - select: user - ) - - {:ok, query} + from(token in token_and_context_query(token, context), + join: user in assoc(token, :user), + where: token.inserted_at > ago(@session_validity_in_days, "day"), + select: user + ) end def verify_token_query(token, "sudo_session" = context) do - query = - from(token in token_and_context_query(token, context), - join: user in assoc(token, :user), - where: - token.inserted_at > - ago(@sudo_session_validity_in_seconds, "second"), - select: user - ) - - {:ok, query} + from(token in token_and_context_query(token, context), + join: user in assoc(token, :user), + where: + token.inserted_at > + ago(@sudo_session_validity_in_seconds, "second"), + select: user + ) end @doc """ diff --git a/lib/lightning/application.ex b/lib/lightning/application.ex index 01b970124a..066bc937d3 100644 --- a/lib/lightning/application.ex +++ b/lib/lightning/application.ex @@ -126,7 +126,8 @@ defmodule Lightning.Application do adaptor_service_childspec, {Lightning.TaskWorker, name: :cli_task_worker}, {Lightning.Runtime.RuntimeManager, - worker_secret: Lightning.Config.worker_secret()}, + worker_secret: Lightning.Config.worker_secret(), + endpoint: LightningWeb.Endpoint}, {Lightning.KafkaTriggers.Supervisor, type: :supervisor} # Start a worker by calling: Lightning.Worker.start_link(arg) # {Lightning.Worker, arg} diff --git a/lib/lightning/collections.ex b/lib/lightning/collections.ex new file mode 100644 index 0000000000..e039fa321d --- /dev/null +++ b/lib/lightning/collections.ex @@ -0,0 +1,260 @@ +defmodule Lightning.Collections do + @moduledoc """ + Access to collections of unique key-value pairs shared across multiple workflows. + """ + import Ecto.Query + + alias Lightning.Collections.Collection + alias Lightning.Collections.Item + alias Lightning.Repo + + @doc """ + Returns the list of collections with optional ordering and preloading. + + ## Parameters + + - `opts`: A keyword list of options. + - `:order_by` (optional): The field by which to order the results. Default is `[asc: :name]`. + - `:preload` (optional): A list of associations to preload. Default is `[:project]`. + + ## Examples + + iex> list_collections() + [%Collection{}, ...] + + iex> list_collections(order_by: [asc: :inserted_at], preload: [:project, :user]) + [%Collection{}, ...] + + ## Returns + + - A list of `%Collection{}` structs, preloaded and ordered as specified. + """ + @spec list_collections(keyword()) :: [Collection.t()] + def list_collections(opts \\ []) do + order_by = Keyword.get(opts, :order_by, asc: :name) + preload = Keyword.get(opts, :preload, [:project]) + + Repo.all(from(c in Collection, order_by: ^order_by, preload: ^preload)) + end + + @spec get_collection(String.t()) :: + {:ok, Collection.t()} | {:error, :not_found} + def get_collection(name) do + case Repo.get_by(Collection, name: name) do + nil -> {:error, :not_found} + collection -> {:ok, collection} + end + end + + @doc """ + Creates a new collection with the given attributes. + + ## Parameters + + - `attrs`: A map of attributes to create the collection. + + ## Examples + + iex> create_collection(%{name: "New Collection", description: "Description here"}) + {:ok, %Collection{}} + + iex> create_collection(%{name: nil}) + {:error, %Ecto.Changeset{}} + + ## Returns + + - `{:ok, %Collection{}}` on success. + - `{:error, %Ecto.Changeset{}}` on failure due to validation errors. + """ + @spec create_collection(map()) :: + {:ok, Collection.t()} | {:error, Ecto.Changeset.t()} + def create_collection(attrs) do + %Collection{} + |> Collection.changeset(attrs) + |> Repo.insert() + end + + @doc """ + Updates an existing collection with the given attributes. + + ## Parameters + + - `collection`: The existing `%Collection{}` struct to update. + - `attrs`: A map of attributes to update the collection. + + ## Examples + + iex> update_collection(collection, %{name: "Updated Name"}) + {:ok, %Collection{}} + + iex> update_collection(collection, %{name: nil}) + {:error, %Ecto.Changeset{}} + + ## Returns + + - `{:ok, %Collection{}}` on success. + - `{:error, %Ecto.Changeset{}}` on failure due to validation errors. + """ + @spec update_collection(Collection.t(), map()) :: + {:ok, Collection.t()} | {:error, Ecto.Changeset.t()} + def update_collection(collection, attrs) do + collection + |> Collection.changeset(attrs) + |> Repo.update() + end + + @spec create_collection(Ecto.UUID.t(), String.t()) :: + {:ok, Collection.t()} | {:error, Ecto.Changeset.t()} + def create_collection(project_id, name) do + %Collection{} + |> Collection.changeset(%{project_id: project_id, name: name}) + |> Repo.insert() + end + + @spec delete_collection(Ecto.UUID.t()) :: + {:ok, Collection.t()} + | {:error, Ecto.Changeset.t()} + | {:error, :not_found} + def delete_collection(collection_id) do + case Repo.get(Collection, collection_id) do + nil -> {:error, :not_found} + collection -> Repo.delete(collection) + end + end + + @spec get(Collection.t(), String.t()) :: Item.t() | nil + def get(%{id: collection_id}, key) do + Repo.get_by(Item, collection_id: collection_id, key: key) + end + + @spec stream_all(Collection.t(), Enum.t()) :: Enum.t() + def stream_all(%{id: collection_id}, params \\ %{}) do + params = Map.new(params) + cursor = Map.get(params, :cursor) + limit = Map.fetch!(params, :limit) + + collection_id + |> stream_query(cursor, limit) + |> filter_by_inserted_at(params) + |> Repo.stream() + end + + @spec stream_match(Collection.t(), String.t(), Enum.t()) :: Enum.t() + def stream_match( + %{id: collection_id}, + pattern, + params \\ %{} + ) do + pattern = format_pattern(pattern) + params = Map.new(params) + cursor = Map.get(params, :cursor) + limit = Map.fetch!(params, :limit) + + collection_id + |> stream_query(cursor, limit) + |> filter_by_inserted_at(params) + |> where([i], like(i.key, ^pattern)) + |> Repo.stream() + end + + @spec put(Collection.t(), String.t(), String.t()) :: + :ok | {:error, Ecto.Changeset.t()} + def put(%{id: collection_id}, key, value) do + %Item{} + |> Item.changeset(%{collection_id: collection_id, key: key, value: value}) + |> Repo.insert( + conflict_target: [:collection_id, :key], + on_conflict: [set: [value: value, updated_at: DateTime.utc_now()]] + ) + |> then(fn result -> + with {:ok, _no_return} <- result, do: :ok + end) + end + + @spec put_all(Collection.t(), [{String.t(), String.t()}]) :: + {:ok, non_neg_integer()} | :error + def put_all(%{id: collection_id}, kv_list) do + item_list = + Enum.with_index(kv_list, fn %{"key" => key, "value" => value}, + unique_index -> + now = DateTime.add(DateTime.utc_now(), unique_index, :microsecond) + + %{ + collection_id: collection_id, + key: key, + value: value, + inserted_at: now, + updated_at: now + } + end) + + case Repo.insert_all(Item, item_list, + conflict_target: [:collection_id, :key], + on_conflict: {:replace, [:value, :updated_at]} + ) do + {n, nil} when n > 0 -> {:ok, n} + _error -> :error + end + end + + @spec delete(Collection.t(), String.t()) :: :ok | {:error, :not_found} + def delete(%{id: collection_id}, key) do + query = + from(i in Item, where: i.collection_id == ^collection_id and i.key == ^key) + + case Repo.delete_all(query) do + {0, nil} -> {:error, :not_found} + {1, nil} -> :ok + end + end + + @spec delete_all(Collection.t(), String.t() | nil) :: {:ok, non_neg_integer()} + def delete_all(%{id: collection_id}, key_pattern \\ nil) do + query = + from(i in Item, where: i.collection_id == ^collection_id) + |> then(fn query -> + case key_pattern do + nil -> query + pattern -> where(query, [i], like(i.key, ^format_pattern(pattern))) + end + end) + + with {count, _nil} <- Repo.delete_all(query), do: {:ok, count} + end + + defp stream_query(collection_id, cursor, limit) do + Item + |> where([i], i.collection_id == ^collection_id) + |> order_by([i], asc: i.inserted_at) + |> limit(^limit) + |> then(fn query -> + case cursor do + nil -> query + ts_cursor -> where(query, [i], i.inserted_at > ^ts_cursor) + end + end) + end + + defp filter_by_inserted_at(query, params) do + query + |> filter_by_created_before(params) + |> filter_by_created_after(params) + end + + defp filter_by_created_after(query, %{created_after: created_after}), + do: where(query, [i], i.inserted_at >= ^created_after) + + defp filter_by_created_after(query, _params), do: query + + defp filter_by_created_before(query, %{created_before: created_before}), + do: where(query, [i], i.inserted_at < ^created_before) + + defp filter_by_created_before(query, _params), do: query + + defp format_pattern(pattern) do + pattern + |> String.replace("\\", "\\\\") + |> String.replace("%", "\\%") + |> String.replace("*", "%") + end +end diff --git a/lib/lightning/collections/collection.ex b/lib/lightning/collections/collection.ex new file mode 100644 index 0000000000..01be284e5e --- /dev/null +++ b/lib/lightning/collections/collection.ex @@ -0,0 +1,37 @@ +defmodule Lightning.Collections.Collection do + @moduledoc """ + Collection referenced by name associated to a project. + """ + use Lightning.Schema + + import Ecto.Changeset + + @type t :: %__MODULE__{ + id: Ecto.UUID.t(), + project_id: Ecto.UUID.t(), + name: String.t(), + inserted_at: NaiveDateTime.t(), + updated_at: NaiveDateTime.t() + } + + schema "collections" do + field :name, :string + belongs_to :project, Lightning.Projects.Project + has_many :items, Lightning.Collections.Item + + timestamps() + end + + @doc false + def changeset(entry, attrs) do + entry + |> cast(attrs, [:project_id, :name]) + |> validate_required([:project_id, :name]) + |> validate_format(:name, ~r/^[a-z0-9]+([\-_.][a-z0-9]+)*$/, + message: "Collection name must be URL safe" + ) + |> unique_constraint([:name], + message: "A collection with this name already exists" + ) + end +end diff --git a/lib/lightning/collections/item.ex b/lib/lightning/collections/item.ex new file mode 100644 index 0000000000..1ec5f6c755 --- /dev/null +++ b/lib/lightning/collections/item.ex @@ -0,0 +1,48 @@ +defmodule Lightning.Collections.Item do + @moduledoc """ + A key value entry of a collection bound to a project. + """ + use Lightning.Schema + + import Ecto.Changeset + + @type t :: %__MODULE__{ + collection_id: Ecto.UUID.t(), + key: String.t(), + value: String.t(), + inserted_at: DateTime.t(), + updated_at: DateTime.t() + } + + @primary_key false + schema "collections_items" do + belongs_to :collection, Lightning.Collections.Collection, primary_key: true + field :key, :string, primary_key: true + field :value, :string + + timestamps(type: :utc_datetime_usec) + end + + @doc false + def changeset(entry, attrs) do + entry + |> cast(attrs, [:collection_id, :key, :value]) + |> validate_required([:collection_id, :key, :value]) + |> unique_constraint([:collection_id, :key]) + |> foreign_key_constraint(:collection_id) + end + + defimpl Jason.Encoder, for: __MODULE__ do + def encode(item, opts) do + Jason.Encode.map( + %{ + key: item.key, + value: item.value, + created: item.inserted_at, + updated: item.updated_at + }, + opts + ) + end + end +end diff --git a/lib/lightning/config.ex b/lib/lightning/config.ex index 6c09eacf67..ca675ebaf8 100644 --- a/lib/lightning/config.ex +++ b/lib/lightning/config.ex @@ -7,6 +7,26 @@ defmodule Lightning.Config do @behaviour Lightning.Config alias Lightning.Services.AdapterHelper + @impl true + def token_signer do + :persistent_term.get({__MODULE__, "token_signer"}, nil) + |> case do + nil -> + pem = + Application.get_env(:lightning, :workers, []) + |> Keyword.get(:private_key) + + signer = Joken.Signer.create("RS256", %{"pem" => pem}) + + :persistent_term.put({__MODULE__, "token_signer"}, signer) + + signer + + signer -> + signer + end + end + @impl true def run_token_signer do pem = @@ -213,6 +233,7 @@ defmodule Lightning.Config do @callback run_token_signer() :: Joken.Signer.t() @callback storage() :: term() @callback storage(key :: atom()) :: term() + @callback token_signer() :: Joken.Signer.t() @callback usage_tracking() :: Keyword.t() @callback usage_tracking_cron_opts() :: [Oban.Plugins.Cron.cron_input()] @callback worker_secret() :: binary() | nil @@ -232,6 +253,10 @@ defmodule Lightning.Config do impl().run_token_signer() end + def token_signer do + impl().token_signer() + end + @doc """ Returns the Token signer used to verify worker tokens. """ diff --git a/lib/lightning/helpers.ex b/lib/lightning/helpers.ex index e7fa6e2e7b..4cb511c7d6 100644 --- a/lib/lightning/helpers.ex +++ b/lib/lightning/helpers.ex @@ -89,4 +89,68 @@ defmodule Lightning.Helpers do def json_safe(a) when is_atom(a) and not is_boolean(a), do: Atom.to_string(a) def json_safe(any), do: any + + @doc """ + Copies an error from one key to another in the given changeset. + + ## Parameters + + - `changeset`: The changeset to modify. + - `original_key`: The key where the error currently exists. + - `new_key`: The key where the error should be duplicated. + - `opts`: A keyword list of options. Supports `overwrite`, which is a boolean indicating whether to overwrite the `new_key` error if it already exists. Defaults to `true`. + + ## Example + + iex> changeset = %Ecto.Changeset{errors: [name: {"has already been taken", []}]} + iex> updated_changeset = Lightning.Helpers.copy_error(changeset, :name, :raw_name) + iex> updated_changeset.errors + [name: {"has already been taken", []}, raw_name: {"has already been taken", []}] + + If the `original_key` doesn't exist in the errors, or if the `new_key` already exists and `overwrite` is set to `false`, the changeset is returned unchanged. + """ + def copy_error(changeset, original_key, new_key, opts \\ [overwrite: true]) do + overwrite = Keyword.get(opts, :overwrite, true) + + if Keyword.has_key?(changeset.errors, original_key) do + {error_msg, error_opts} = Keyword.fetch!(changeset.errors, original_key) + + if Keyword.has_key?(changeset.errors, new_key) and not overwrite do + changeset + else + Ecto.Changeset.add_error(changeset, new_key, error_msg, error_opts) + end + else + changeset + end + end + + @doc """ + Converts a string into a URL-safe format by converting it to lowercase, + replacing unwanted characters with hyphens, and trimming leading/trailing hyphens. + + This function allows international characters, which will be automatically + percent-encoded in URLs by browsers. + + ## Parameters + + - `name`: The string to convert. If `nil` is passed, it returns an empty string. + + ## Examples + + iex> url_safe_name("My Project!!") + "my-project" + + iex> url_safe_name(nil) + "" + """ + @spec url_safe_name(String.t() | nil) :: String.t() + def url_safe_name(nil), do: "" + + def url_safe_name(name) when is_binary(name) do + name + |> String.downcase() + |> String.replace(~r/[^\p{L}0-9_\.\-]+/u, "-") + |> String.trim("-") + end end diff --git a/lib/lightning/policies/collections.ex b/lib/lightning/policies/collections.ex new file mode 100644 index 0000000000..80e8d89722 --- /dev/null +++ b/lib/lightning/policies/collections.ex @@ -0,0 +1,31 @@ +defmodule Lightning.Policies.Collections do + @moduledoc """ + The Bodyguard Policy module for Collections. + + Access to collections is controlled by the project the collection belongs to. + + The `access_collection` action is allowed if the user has access to the + project, or if a run belongs to the project (via it's workflow). + """ + @behaviour Bodyguard.Policy + + alias Lightning.Accounts.User + alias Lightning.Collections.Collection + alias Lightning.Run + + @type actions :: :access_collection + @spec authorize(actions(), Lightning.Accounts.User.t(), Collection.t()) :: + :ok | {:error, :unauthorized} + def authorize(:access_collection, %User{} = user, %Collection{} = collection) do + Lightning.Policies.Permissions.can( + Lightning.Policies.ProjectUsers, + :access_project, + user, + collection + ) + end + + def authorize(:access_collection, %Run{} = run, %Collection{} = collection) do + Lightning.Runs.get_project_id_for_run(run) == collection.project_id + end +end diff --git a/lib/lightning/policies/project_users.ex b/lib/lightning/policies/project_users.ex index 3b1e1eea3a..c539aa6e5b 100644 --- a/lib/lightning/policies/project_users.ex +++ b/lib/lightning/policies/project_users.ex @@ -41,7 +41,7 @@ defmodule Lightning.Policies.ProjectUsers do @spec authorize( actions(), Lightning.Accounts.User.t(), - Lightning.Projects.Project.t() | nil + Lightning.Projects.Project.t() | %{project_id: Ecto.UUID.t()} | nil ) :: boolean def authorize(:access_project, %User{}, nil), do: false @@ -87,4 +87,7 @@ defmodule Lightning.Policies.ProjectUsers do :initiate_github_sync ], do: project_user.role in [:owner, :admin, :editor] + + def authorize(action, user, %{project_id: project_id}), + do: authorize(action, user, Projects.get_project(project_id)) end diff --git a/lib/lightning/projects.ex b/lib/lightning/projects.ex index ff9dad42d4..f4157c3fc6 100644 --- a/lib/lightning/projects.ex +++ b/lib/lightning/projects.ex @@ -627,15 +627,6 @@ defmodule Lightning.Projects do |> Repo.one() end - def url_safe_project_name(nil), do: "" - - def url_safe_project_name(name) when is_binary(name) do - name - |> String.downcase() - |> String.replace(~r/[^a-z-_\.\d]+/, "-") - |> String.replace(~r/^\-+|\-+$/, "") - end - def member_of?(%Project{id: project_id}, %User{id: user_id}) do from(p in Project, join: pu in assoc(p, :project_users), diff --git a/lib/lightning/runs.ex b/lib/lightning/runs.ex index 25866f8e4b..a4de7635aa 100644 --- a/lib/lightning/runs.ex +++ b/lib/lightning/runs.ex @@ -327,6 +327,7 @@ defmodule Lightning.Runs do defdelegate subscribe(run), to: Events + @spec get_project_id_for_run(Run.t()) :: Ecto.UUID.t() | nil def get_project_id_for_run(run) do Ecto.assoc(run, [:work_order, :workflow, :project]) |> select([p], p.id) diff --git a/lib/lightning/runs/run_options.ex b/lib/lightning/runs/run_options.ex index a31c769502..ce588b1f1e 100644 --- a/lib/lightning/runs/run_options.ex +++ b/lib/lightning/runs/run_options.ex @@ -24,6 +24,11 @@ defmodule Lightning.Runs.RunOptions do field :run_memory_limit_mb, :integer end + def new(opts \\ %{}) do + %__MODULE__{} + |> cast(opts, [:save_dataclips, :run_timeout_ms]) + end + defimpl Jason.Encoder, for: __MODULE__ do def encode(value, opts) do value diff --git a/lib/lightning/runtime/runtime_manager.ex b/lib/lightning/runtime/runtime_manager.ex index 7f81d88131..52bc44ae0a 100644 --- a/lib/lightning/runtime/runtime_manager.ex +++ b/lib/lightning/runtime/runtime_manager.ex @@ -44,7 +44,9 @@ defmodule Lightning.Runtime.RuntimeManager do port: 2222, repo_dir: nil, worker_secret: nil, - ws_url: "ws://localhost:4000/worker" + endpoint: nil, + ws_url: "ws://localhost:4000/worker", + col_url: "http://localhost:4000/collections" @doc """ Parses the keyword list of start arguments and returns a tuple, @@ -58,6 +60,7 @@ defmodule Lightning.Runtime.RuntimeManager do Application.get_env(:lightning, __MODULE__, []) |> Keyword.merge(args) ) + |> maybe_put_urls() {_, args} = args |> Keyword.split(config |> Map.keys()) @@ -102,10 +105,33 @@ defmodule Lightning.Runtime.RuntimeManager do {:ws_url, v} -> ~w(--lightning #{v}) + {:col_url, v} -> + ~w(--collections-url #{v}) + _ -> [nil] end end + + defp maybe_put_urls(config) do + if config.endpoint do + config + |> Map.merge(%{ + ws_url: + Phoenix.VerifiedRoutes.unverified_url(config.endpoint, "/worker") + |> URI.parse() + |> Map.put(:scheme, "ws") + |> URI.to_string(), + col_url: + Phoenix.VerifiedRoutes.unverified_url( + config.endpoint, + "/collections" + ) + }) + else + config + end + end end defmodule RuntimeClient do diff --git a/lib/lightning/tokens.ex b/lib/lightning/tokens.ex new file mode 100644 index 0000000000..5c54c235fa --- /dev/null +++ b/lib/lightning/tokens.ex @@ -0,0 +1,68 @@ +defmodule Lightning.Tokens do + @moduledoc """ + Token generation, verification and validation. + """ + + defmodule PersonalAccessToken do + @moduledoc false + use Joken.Config + + @impl true + def token_config do + %{} + |> add_claim("jti", &Joken.generate_jti/0) + |> add_claim("iss", fn -> "Lightning" end, &(&1 == "Lightning")) + |> add_claim("sub", nil, fn sub, _claims, _context -> + String.starts_with?(sub, "user:") + end) + |> add_claim( + "iat", + fn -> Lightning.current_time() |> DateTime.to_unix() end, + fn iat, _claims, _context -> + Lightning.current_time() >= iat |> DateTime.from_unix() + end + ) + end + end + + @doc """ + Verify a token and return the claims if successful. + + This serves as a central point to verify and validate different types + of tokens. + """ + @spec verify(String.t()) :: {:ok, map()} | {:error, any()} + def verify(token) do + Joken.peek_claims(token) + |> case do + # TODO: Look up user tokens via the JTI and ensure the JTI is indexed + {:ok, %{"sub" => "user:" <> _}} -> + PersonalAccessToken.verify_and_validate( + token, + Lightning.Config.token_signer() + ) + + {:ok, %{"sub" => "run:" <> _}} -> + Lightning.Workers.verify_run_token(token, %{}) + + {:ok, _} -> + {:error, "Unsupported token type"} + + {:error, err} -> + {:error, err} + end + end + + @doc """ + Get the subject of a token. + Currently support RunTokens and PersonalAccessTokens, + which return `Lightning.Run`s and `Lightning.Accounts.User`s respectively. + """ + def get_subject(%{"sub" => "user:" <> user_id}) do + Lightning.Accounts.get_user(user_id) + end + + def get_subject(%{"sub" => "run:" <> run_id}) do + Lightning.Runs.get(run_id) + end +end diff --git a/lib/lightning/utils/crypto.ex b/lib/lightning/utils/crypto.ex new file mode 100644 index 0000000000..14c3683679 --- /dev/null +++ b/lib/lightning/utils/crypto.ex @@ -0,0 +1,37 @@ +defmodule Lightning.Utils.Crypto do + @moduledoc """ + Utility functions for cryptographic operations. + """ + + @doc """ + Generates a new RSA key pair with 2048 bits and a public exponent of 65537. + + This is preferable to using `create_private_key` and `abstract_public_key` as + it generates a key pair in one step, and also doesn't require shelling out to + `openssl`. + """ + def generate_rsa_key_pair do + {:RSAPrivateKey, _, modulus, public_exponent, _, _, _, _exponent1, _, _, + _other_prime_infos} = + rsa_private_key = :public_key.generate_key({:rsa, 2048, 65_537}) + + rsa_public_key = {:RSAPublicKey, modulus, public_exponent} + + private_key = + [:public_key.pem_entry_encode(:RSAPrivateKey, rsa_private_key)] + |> :public_key.pem_encode() + + public_key = + [:public_key.pem_entry_encode(:RSAPublicKey, rsa_public_key)] + |> :public_key.pem_encode() + + {private_key, public_key} + end + + @doc """ + Generates a new HS256 key. + """ + def generate_hs256_key do + 32 |> :crypto.strong_rand_bytes() |> Base.encode64() + end +end diff --git a/lib/lightning/workers.ex b/lib/lightning/workers.ex index da836803fb..0c0e2ff30c 100644 --- a/lib/lightning/workers.ex +++ b/lib/lightning/workers.ex @@ -5,7 +5,7 @@ defmodule Lightning.Workers do This module deals with the security tokens and the formatting used on the communication with the workers. """ - defmodule Token do + defmodule WorkerToken do @moduledoc """ JWT token configuration to authenticate workers. """ @@ -35,7 +35,14 @@ defmodule Lightning.Workers do %{} |> add_claim("iss", fn -> "Lightning" end, &(&1 == "Lightning")) |> add_claim("id", nil, fn id, _claims, context -> - is_binary(id) and id == Map.get(context, :id) + Map.get(context, :id) + |> case do + nil -> + is_binary(id) + + expected_id -> + is_binary(id) and id == expected_id + end end) |> add_claim( "nbf", @@ -58,12 +65,14 @@ defmodule Lightning.Workers do Lightning.Run.t(), Lightning.Runs.RunOptions.t() ) :: binary() - def generate_run_token(run, run_options) do - run_timeout_ms = run_options[:run_timeout_ms] - + def generate_run_token(run, run_options \\ %Lightning.Runs.RunOptions{}) do {:ok, token, _claims} = RunToken.generate_and_sign( - %{"id" => run.id, "exp" => calculate_token_expiry(run_timeout_ms)}, + %{ + "id" => run.id, + "exp" => calculate_token_expiry(run_options.run_timeout_ms), + "sub" => "run:#{run.id}" + }, Lightning.Config.run_token_signer() ) @@ -119,7 +128,7 @@ defmodule Lightning.Workers do def verify_worker_token(token, context \\ %{}) when is_binary(token) do context = Enum.into(context, %{current_time: Lightning.current_time()}) - Token.verify_and_validate( + WorkerToken.verify_and_validate( token, Lightning.Config.worker_token_signer(), context diff --git a/lib/lightning_web/channels/worker_channel.ex b/lib/lightning_web/channels/worker_channel.ex index fb68a9e122..ce31b9cfb0 100644 --- a/lib/lightning_web/channels/worker_channel.ex +++ b/lib/lightning_web/channels/worker_channel.ex @@ -49,5 +49,8 @@ defmodule LightningWeb.WorkerChannel do |> then(fn %{project_id: project_id} -> UsageLimiter.get_run_options(%Context{project_id: project_id}) end) + |> Enum.into(%{}) + |> Runs.RunOptions.new() + |> Ecto.Changeset.apply_changes() end end diff --git a/lib/lightning_web/components/layouts/settings.html.heex b/lib/lightning_web/components/layouts/settings.html.heex index eb22faf8f0..4b16f8964e 100644 --- a/lib/lightning_web/components/layouts/settings.html.heex +++ b/lib/lightning_web/components/layouts/settings.html.heex @@ -19,35 +19,42 @@
Projects Users Authentication Audit + + + Collections +
- + Back diff --git a/lib/lightning_web/controllers/collections_controller.ex b/lib/lightning_web/controllers/collections_controller.ex new file mode 100644 index 0000000000..dde63fdb73 --- /dev/null +++ b/lib/lightning_web/controllers/collections_controller.ex @@ -0,0 +1,292 @@ +defmodule LightningWeb.CollectionsController do + use LightningWeb, :controller + + alias Lightning.Collections + alias Lightning.Policies.Permissions + alias Lightning.Repo + + action_fallback LightningWeb.FallbackController + + require Logger + + @max_chunk_size 50 + + @default_limit Application.compile_env!(:lightning, __MODULE__)[:stream_limit] + + @valid_params [ + "key", + "cursor", + "limit", + "created_after", + "created_before" + ] + + defp authorize(conn, collection) do + Permissions.can( + Lightning.Policies.Collections, + :access_collection, + conn.assigns.subject, + collection + ) + end + + # + # Controller starts here + # + def put(conn, %{"name" => col_name, "key" => key, "value" => value}) do + with {:ok, collection} <- Collections.get_collection(col_name), + :ok <- authorize(conn, collection) do + case Collections.put(collection, key, value) do + :ok -> + json(conn, %{upserted: 1, error: nil}) + + {:error, _reason} -> + json(conn, %{upserted: 0, error: "Format error"}) + end + end + end + + def put_all(conn, %{"name" => col_name, "items" => items}) do + with {:ok, collection} <- Collections.get_collection(col_name), + :ok <- authorize(conn, collection) do + case Collections.put_all(collection, items) do + {:ok, count} -> + json(conn, %{upserted: count, error: nil}) + + :error -> + conn + |> put_status(:internal_server_error) + |> json(%{upserted: 0, error: "Database Error"}) + end + end + end + + def get(conn, %{"name" => col_name, "key" => key}) do + with {:ok, collection} <- Collections.get_collection(col_name), + :ok <- authorize(conn, collection) do + case Collections.get(collection, key) do + nil -> + conn + |> put_status(:no_content) + |> json(nil) + + item -> + json(conn, item) + end + end + end + + def delete(conn, %{"name" => col_name, "key" => key}) do + with {:ok, collection} <- Collections.get_collection(col_name), + :ok <- authorize(conn, collection) do + case Collections.delete(collection, key) do + :ok -> + json(conn, %{key: key, deleted: 1, error: nil}) + + {:error, :not_found} -> + json(conn, %{key: key, deleted: 0, error: "Item Not Found"}) + end + end + end + + def delete_all(conn, %{"name" => col_name} = params) do + with {:ok, collection} <- Collections.get_collection(col_name), + :ok <- authorize(conn, collection) do + key_param = params["key"] + + {:ok, n} = Collections.delete_all(collection, key_param) + + json(conn, %{key: key_param, deleted: n, error: nil}) + end + end + + def stream(conn, %{"name" => col_name, "key" => key_pattern}) do + with {:ok, collection, filters, response_limit} <- + validate_query(conn, col_name) do + case Repo.transact(fn -> + items_stream = + Collections.stream_match(collection, key_pattern, filters) + + stream_chunked(conn, items_stream, response_limit) + end) do + {:error, conn} -> conn + {:ok, conn} -> conn + end + end + end + + def stream(conn, %{"name" => col_name}) do + with {:ok, collection, filters, response_limit} <- + validate_query(conn, col_name) do + case Repo.transact(fn -> + items_stream = Collections.stream_all(collection, filters) + + stream_chunked(conn, items_stream, response_limit) + end) do + {:error, conn} -> conn + {:ok, conn} -> conn + end + end + end + + defmodule ChunkAcc do + defstruct conn: nil, + count: 0, + limit: 0, + last: nil, + cursor_data: nil + end + + defp stream_chunked(conn, items_stream, response_limit) do + with %{halted: false} = conn <- begin_chunking(conn) do + items_stream + |> Stream.chunk_every(@max_chunk_size) + |> Stream.with_index() + |> Enum.reduce_while( + %ChunkAcc{conn: conn, limit: response_limit}, + &send_chunk/2 + ) + |> finish_chunking() + end + end + + defp validate_query(conn, col_name) do + with {:ok, collection} <- Collections.get_collection(col_name), + :ok <- authorize(conn, collection), + query_params <- + Enum.into(conn.query_params, %{ + "cursor" => nil, + "limit" => "#{@default_limit}" + }), + {:ok, filters} <- validate_query_params(query_params) do + # returns one more from db than the limit to determine if there are more items for the cursor + db_query_filters = Map.update(filters, :limit, @default_limit, &(&1 + 1)) + response_limit = Map.fetch!(filters, :limit) + + {:ok, collection, db_query_filters, response_limit} + end + end + + defp validate_query_params( + %{"cursor" => cursor, "limit" => limit} = query_params + ) do + with invalid_params when map_size(invalid_params) == 0 <- + Map.drop(query_params, @valid_params), + {:ok, cursor} <- validate_cursor(cursor), + {limit, ""} <- Integer.parse(limit), + valid_params <- Map.take(query_params, @valid_params) do + filters = + valid_params + |> Map.new(fn {key, value} -> {String.to_existing_atom(key), value} end) + |> Map.put(:limit, limit) + |> Map.put(:cursor, cursor) + + {:ok, filters} + else + _invalid -> + {:error, :bad_request} + end + end + + defp validate_cursor(nil), do: {:ok, nil} + + defp validate_cursor(cursor) do + with {:ok, decoded} <- Base.decode64(cursor), + {:ok, datetime, _off} <- DateTime.from_iso8601(decoded) do + {:ok, datetime} + end + end + + defp begin_chunking(conn) do + conn + |> put_resp_content_type("application/json") + |> send_chunked(200) + |> Plug.Conn.chunk(~S({"items": [)) + |> case do + {:ok, conn} -> + conn + + {:error, reason} -> + Logger.warning("Error starting chunking: #{inspect(reason)}") + halt(conn) + end + end + + defp finish_chunking(%ChunkAcc{conn: conn, cursor_data: cursor_data}) do + cursor = + if cursor_data do + cursor_data |> DateTime.to_iso8601() |> Base.encode64() + end + + Plug.Conn.chunk(conn, ~S(], "cursor":) <> Jason.encode!(cursor) <> "}") + end + + defp finish_chunking({:error, conn}), do: conn + + defp send_chunk({chunk_items, 0}, acc) do + {taken_items, acc} = take_and_accumulate(chunk_items, acc) + + taken_items + |> Enum.map_join(",", &Jason.encode!/1) + |> send_chunk_and_iterate(acc) + end + + defp send_chunk( + {_chunk_items, _i}, + %ChunkAcc{count: sent_count, last: last, limit: limit} = acc + ) + when sent_count == limit do + {:halt, %ChunkAcc{acc | cursor_data: last.inserted_at}} + end + + defp send_chunk({chunk_items, _i}, acc) do + {taken_items, acc} = take_and_accumulate(chunk_items, acc) + + taken_items + |> Enum.map_join(",", &Jason.encode!/1) + |> then(fn items_chunk -> + "," <> items_chunk + end) + |> send_chunk_and_iterate(acc) + end + + defp take_and_accumulate( + chunk_items, + %ChunkAcc{count: sent_count, limit: limit} = acc + ) do + taken_items = Enum.take(chunk_items, limit - sent_count) + last = List.last(taken_items) + taken_count = length(taken_items) + + cursor_data = + if taken_count > 0 and length(chunk_items) > taken_count do + last.inserted_at + end + + acc = + struct(acc, %{ + count: sent_count + taken_count, + last: last, + cursor_data: cursor_data + }) + + {taken_items, acc} + end + + defp send_chunk_and_iterate( + chunk, + %ChunkAcc{conn: conn, cursor_data: cursor_data} = acc + ) do + case Plug.Conn.chunk(conn, chunk) do + {:ok, conn} -> + if cursor_data do + {:halt, %{acc | conn: conn}} + else + {:cont, %{acc | conn: conn}} + end + + {:error, :closed} -> + {:halt, {:error, conn}} + end + end +end diff --git a/lib/lightning_web/controllers/fallback_controller.ex b/lib/lightning_web/controllers/fallback_controller.ex index 0ea590bec1..467130d4d2 100644 --- a/lib/lightning_web/controllers/fallback_controller.ex +++ b/lib/lightning_web/controllers/fallback_controller.ex @@ -14,11 +14,18 @@ defmodule LightningWeb.FallbackController do |> render(:"404") end + def call(conn, {:error, :bad_request}) do + conn + |> put_status(:bad_request) + |> put_view(LightningWeb.ErrorView) + |> render(:"400") + end + def call(conn, {:error, :unauthorized}) do conn |> put_status(:unauthorized) |> put_view(LightningWeb.ErrorView) - |> render(:"401", error: %{error: :unauthirized}) + |> render(:"401") end def call(conn, {:error, :forbidden}) do diff --git a/lib/lightning_web/controllers/user_auth.ex b/lib/lightning_web/controllers/user_auth.ex index 38986253ef..0793e57a8a 100644 --- a/lib/lightning_web/controllers/user_auth.ex +++ b/lib/lightning_web/controllers/user_auth.ex @@ -188,7 +188,7 @@ defmodule LightningWeb.UserAuth do end end - defp get_bearer(conn) do + def get_bearer(conn) do conn |> get_req_header("authorization") |> case do @@ -256,7 +256,6 @@ defmodule LightningWeb.UserAuth do @doc """ Used for API routes that require the resource to be authenticated. A resource can be a `User` or a `ProjectRepoConnection` - """ def require_authenticated_api_resource(conn, _opts) do if is_nil(conn.assigns[:current_resource]) do diff --git a/lib/lightning_web/live/collection_live/collection_creation_modal.ex b/lib/lightning_web/live/collection_live/collection_creation_modal.ex new file mode 100644 index 0000000000..273413fdc2 --- /dev/null +++ b/lib/lightning_web/live/collection_live/collection_creation_modal.ex @@ -0,0 +1,182 @@ +defmodule LightningWeb.CollectionLive.CollectionCreationModal do + use LightningWeb, :live_component + + alias Lightning.Collections + alias Lightning.Collections.Collection + alias Lightning.Helpers + alias Lightning.Projects + + @impl true + def update(assigns, socket) do + changeset = Collection.changeset(assigns.collection, %{}) + + {:ok, + socket + |> assign(assigns) + |> assign(:changeset, changeset) + |> assign(:name, get_collection_name(changeset)) + |> assign(:projects_options, list_project_options()) + |> assign_new(:mode, fn -> :create end)} + end + + defp list_project_options do + Projects.list_projects() |> Enum.map(&{&1.name, &1.id}) + end + + defp get_collection_name(changeset) do + Ecto.Changeset.fetch_field!(changeset, :name) + end + + @impl true + def handle_event("close_modal", _, socket) do + {:noreply, socket |> push_navigate(to: socket.assigns.return_to)} + end + + def handle_event("validate", %{"collection" => collection_params}, socket) do + changeset = + socket.assigns.collection + |> Collection.changeset( + collection_params + |> coerce_raw_name_to_safe_name + ) + |> Map.put(:action, :validate) + + {:noreply, + socket + |> assign( + :changeset, + Lightning.Helpers.copy_error(changeset, :name, :raw_name) + ) + |> assign(:name, Ecto.Changeset.fetch_field!(changeset, :name))} + end + + def handle_event("save", %{"collection" => collection_params}, socket) do + %{mode: mode, return_to: return_to} = socket.assigns + + result = + case mode do + :create -> + Collections.create_collection(collection_params) + + :update -> + Collections.update_collection( + socket.assigns.collection, + collection_params + ) + end + + case result do + {:ok, _collection} -> + {:noreply, + socket + |> put_flash(:info, "Collection #{mode}d successfully") + |> push_navigate(to: return_to)} + + {:error, changeset} -> + {:noreply, + assign( + socket, + :changeset, + Lightning.Helpers.copy_error(changeset, :name, :raw_name) + )} + end + end + + defp coerce_raw_name_to_safe_name(%{"raw_name" => raw_name} = params) do + new_name = Helpers.url_safe_name(raw_name) + + params |> Map.put("name", new_name) + end + + defp coerce_raw_name_to_safe_name(%{} = params) do + params + end + + @impl true + def render(assigns) do + ~H""" +
+ <.modal id={@id} width="xl:min-w-1/3 min-w-1/2 max-w-full"> + <:title> +
+ + <%= if @mode == :create, + do: "Create Collection", + else: "Edit Collection" %> + + +
+ + <.form + :let={f} + for={@changeset} + id={"collection-form-#{@collection.id || "new"}"} + phx-target={@myself} + phx-change="validate" + phx-submit="save" + > +
+
+ <.input + type="text" + field={f[:raw_name]} + value={@name} + label="Name" + required="true" + /> + <.input type="hidden" field={f[:name]} /> + + <%= if to_string(f[:name].value) != "" do %> + Your collection will be named + <%= @name %>. + <% end %> + +
+
+ <.input + type="select" + field={f[:project_id]} + label="Project" + options={@projects_options} + required="true" + /> +
+
+ <.modal_footer class="mt-6 mx-6"> +
+ + +
+ + + +
+ """ + end +end diff --git a/lib/lightning_web/live/collection_live/components.ex b/lib/lightning_web/live/collection_live/components.ex new file mode 100644 index 0000000000..560d860b03 --- /dev/null +++ b/lib/lightning_web/live/collection_live/components.ex @@ -0,0 +1,152 @@ +defmodule LightningWeb.CollectionLive.Components do + use LightningWeb, :component + + import PetalComponents.Table + + defp confirm_collection_deletion_modal(assigns) do + ~H""" + <.modal id={@id} width="max-w-md"> + <:title> +
+ + Delete collection + + + +
+ +
+

+ Are you sure you want to delete the collection + <%= @collection.name %> + ? + If you wish to proceed with this action, click on the delete button. To cancel click on the cancel button.

+

+
+
+ <.button + id={"#{@id}_confirm_button"} + type="button" + phx-click="delete_collection" + phx-value-collection={@collection.id} + color_class="bg-red-600 hover:bg-red-700 text-white" + phx-disable-with="Deleting..." + > + Delete + + +
+ + """ + end + + defp table_title(assigns) do + ~H""" +

+ Collections + + (<%= @count %>) + +

+ """ + end + + def collections_table(assigns) do + next_sort_icon = %{asc: "hero-chevron-down", desc: "hero-chevron-up"} + + assigns = + assign(assigns, + collections_count: Enum.count(assigns.collections), + empty?: Enum.empty?(assigns.collections), + name_sort_icon: next_sort_icon[assigns.name_direction] + ) + + ~H""" + <%= if @empty? do %> + <%= render_slot(@empty_state) %> + <% else %> +
+ <.table_title count={@collections_count} /> +
+ <%= render_slot(@create_collection_button) %> +
+
+ <.table id="collections-table"> + <.tr> + <.th> +
+ Name + + <.icon name={@name_sort_icon} /> + +
+ + <.th>Project + <.th> + + + <.tr + :for={collection <- @collections} + id={"collections-table-row-#{collection.id}"} + class="hover:bg-gray-100 transition-colors duration-200" + > + <.td class="break-words max-w-[15rem] text-gray-800"> + <%= collection.name %> + + <.td class="break-words max-w-[25rem]"> + <%= collection.project.name %> + + + <.td> +
+ + +
+ <.live_component + id={"update-collection-#{collection.id}-modal"} + module={LightningWeb.CollectionLive.CollectionCreationModal} + collection={collection} + mode={:update} + return_to={~p"/settings/collections"} + /> + <.confirm_collection_deletion_modal + id={"delete-collection-#{collection.id}-modal"} + collection={collection} + /> + + + + <% end %> + """ + end +end diff --git a/lib/lightning_web/live/collection_live/index.ex b/lib/lightning_web/live/collection_live/index.ex new file mode 100644 index 0000000000..abb3f43c93 --- /dev/null +++ b/lib/lightning_web/live/collection_live/index.ex @@ -0,0 +1,129 @@ +defmodule LightningWeb.CollectionLive.Index do + use LightningWeb, :live_view + + import LightningWeb.CollectionLive.Components + + alias Lightning.Collections + alias Lightning.Collections.Collection + alias Lightning.Policies.Permissions + alias Lightning.Policies.Users + + require Logger + + @impl true + def mount(_params, _session, socket) do + can_access_admin_space = + Users + |> Permissions.can?(:access_admin_space, socket.assigns.current_user, {}) + + if can_access_admin_space do + {:ok, + socket + |> assign( + page_title: "Collections", + active_menu_item: :collections, + collections: Collections.list_collections(), + name_sort_direction: :asc + ), layout: {LightningWeb.Layouts, :settings}} + else + {:ok, + socket + |> put_flash(:nav, :no_access) + |> push_redirect(to: "/projects")} + end + end + + @impl true + def handle_event("sort", %{"by" => field}, socket) do + sort_key = String.to_atom("#{field}_sort_direction") + sort_direction = Map.get(socket.assigns, sort_key, :asc) + new_sort_direction = switch_sort_direction(sort_direction) + + order_column = map_sort_field_to_column(field) + + collections = + Collections.list_collections( + order_by: [{new_sort_direction, order_column}] + ) + + {:noreply, + socket + |> assign(:collections, collections) + |> assign(sort_key, new_sort_direction)} + end + + def handle_event("delete_collection", %{"collection" => collection_id}, socket) do + case Collections.delete_collection(collection_id) do + {:ok, _collection} -> + {:noreply, + socket + |> put_flash(:info, "Collection deleted successfully!") + |> push_navigate(to: ~p"/settings/collections")} + + {:error, reason} -> + Logger.error("Error during collection deletion: #{inspect(reason)}") + + {:noreply, + socket + |> put_flash(:error, "Couldn't delete collection!") + |> push_navigate(to: ~p"/settings/collections")} + end + end + + defp switch_sort_direction(:asc), do: :desc + defp switch_sort_direction(:desc), do: :asc + + defp map_sort_field_to_column("name"), do: :name + + @impl true + def render(assigns) do + ~H""" + + <:header> + + <:title><%= @page_title %> + + + +
+ <.collections_table + collections={@collections} + user={@current_user} + name_direction={@name_sort_direction} + > + <:empty_state> + + + <:create_collection_button> + <.button + role="button" + id="open-create-collection-modal-button" + phx-click={show_modal("create-collection-modal")} + class="col-span-1 w-full rounded-md" + > + Create collection + + + + <.live_component + id="create-collection-modal" + module={LightningWeb.CollectionLive.CollectionCreationModal} + collection={%Collection{}} + return_to={~p"/settings/collections"} + /> +
+
+
+ """ + end +end diff --git a/lib/lightning_web/live/dashboard_live/project_creation_modal.ex b/lib/lightning_web/live/dashboard_live/project_creation_modal.ex index f41d1af648..c40684133e 100644 --- a/lib/lightning_web/live/dashboard_live/project_creation_modal.ex +++ b/lib/lightning_web/live/dashboard_live/project_creation_modal.ex @@ -1,6 +1,7 @@ defmodule LightningWeb.DashboardLive.ProjectCreationModal do use LightningWeb, :live_component + alias Lightning.Helpers alias Lightning.Projects alias Lightning.Projects.Project @@ -61,7 +62,7 @@ defmodule LightningWeb.DashboardLive.ProjectCreationModal do end defp coerce_raw_name_to_safe_name(%{"raw_name" => raw_name} = params) do - new_name = Projects.url_safe_project_name(raw_name) + new_name = Helpers.url_safe_name(raw_name) params |> Map.put("name", new_name) end diff --git a/lib/lightning_web/live/project_live/form_component.ex b/lib/lightning_web/live/project_live/form_component.ex index 4b9155c5e0..66f1dda693 100644 --- a/lib/lightning_web/live/project_live/form_component.ex +++ b/lib/lightning_web/live/project_live/form_component.ex @@ -15,6 +15,7 @@ defmodule LightningWeb.ProjectLive.FormComponent do import Ecto.Changeset, only: [fetch_field!: 2] import LightningWeb.Components.Form + alias Lightning.Helpers alias Lightning.Projects alias Lightning.Projects.Project @@ -49,7 +50,7 @@ defmodule LightningWeb.ProjectLive.FormComponent do |> assign(:changeset, changeset) |> assign( :name, - Projects.url_safe_project_name(fetch_field!(changeset, :name)) + Helpers.url_safe_name(fetch_field!(changeset, :name)) )} end @@ -125,7 +126,7 @@ defmodule LightningWeb.ProjectLive.FormComponent do end defp coerce_raw_name_to_safe_name(%{"raw_name" => raw_name} = params) do - new_name = Projects.url_safe_project_name(raw_name) + new_name = Helpers.url_safe_name(raw_name) params |> Map.put("name", new_name) end diff --git a/lib/lightning_web/live/project_live/settings.html.heex b/lib/lightning_web/live/project_live/settings.html.heex index 146499bd93..1d277d4117 100644 --- a/lib/lightning_web/live/project_live/settings.html.heex +++ b/lib/lightning_web/live/project_live/settings.html.heex @@ -629,31 +629,33 @@ project_user={project_user} /> - <.td class="text-right"> - <.button - id={"remove_project_user_#{project_user.id}_button"} - type="button" - phx-click={show_modal("remove_#{project_user.id}_modal")} - color_class="bg-white text-gray-900 hover:bg-gray-50 disabled:bg-gray-100" - class="gap-x-2 rounded-md px-3.5 py-2.5 text-sm shadow-sm ring-1 ring-inset ring-gray-300 disabled:cursor-not-allowed" - tooltip={ - remove_user_tooltip( - project_user, - @current_user, - @can_remove_project_user - ) - } - disabled={ - !user_removable?( - project_user, - @current_user, - @can_remove_project_user - ) - } - > - <.icon name="hero-minus-circle" class="w-5 h-5" /> - Remove Collaborator - + <.td> +
+ <.button + id={"remove_project_user_#{project_user.id}_button"} + type="button" + phx-click={show_modal("remove_#{project_user.id}_modal")} + color_class="bg-white text-gray-900 hover:bg-gray-50 disabled:bg-gray-100" + class="gap-x-2 rounded-md px-3.5 py-2.5 text-sm shadow-sm ring-1 ring-inset ring-gray-300 disabled:cursor-not-allowed" + tooltip={ + remove_user_tooltip( + project_user, + @current_user, + @can_remove_project_user + ) + } + disabled={ + !user_removable?( + project_user, + @current_user, + @can_remove_project_user + ) + } + > + <.icon name="hero-minus-circle" class="w-5 h-5" /> + Remove Collaborator + +
<.confirm_user_removal_modal :if={ user_removable?( diff --git a/lib/lightning_web/plugs/api_auth.ex b/lib/lightning_web/plugs/api_auth.ex new file mode 100644 index 0000000000..b37b2d7eed --- /dev/null +++ b/lib/lightning_web/plugs/api_auth.ex @@ -0,0 +1,48 @@ +defmodule LightningWeb.Plugs.ApiAuth do + @moduledoc """ + Authenticates api calls based on JWT bearer token. + """ + use Phoenix.Controller + import Plug.Conn + + def init(opts) do + opts + end + + def call(conn, _opts) do + with {:ok, bearer_token} <- get_bearer_token(conn), + {:ok, claims} <- Lightning.Tokens.verify(bearer_token) do + conn + |> assign(:claims, claims) + |> put_subject() + else + {:error, _reason} -> + deny_access(conn) + end + end + + defp get_bearer_token(conn) do + conn + |> get_req_header("authorization") + |> case do + ["Bearer " <> bearer] -> {:ok, bearer} + _none_or_many -> {:error, :token_not_found} + end + end + + defp put_subject(conn) do + conn.assigns.claims + |> Lightning.Tokens.get_subject() + |> then(fn subject -> + conn |> assign(:subject, subject) + end) + end + + defp deny_access(conn) do + conn + |> put_status(:unauthorized) + |> put_view(LightningWeb.ErrorView) + |> render(:"401") + |> halt() + end +end diff --git a/lib/lightning_web/route_helpers.ex b/lib/lightning_web/route_helpers.ex index f255c3e901..33df348577 100644 --- a/lib/lightning_web/route_helpers.ex +++ b/lib/lightning_web/route_helpers.ex @@ -4,15 +4,6 @@ defmodule LightningWeb.RouteHelpers do """ alias LightningWeb.Router.Helpers, as: Routes - def show_run_url(project_id, run_id) do - Routes.project_run_show_url( - LightningWeb.Endpoint, - :show, - project_id, - run_id - ) - end - def project_dashboard_url(project_id) do Routes.project_workflow_index_url( LightningWeb.Endpoint, diff --git a/lib/lightning_web/router.ex b/lib/lightning_web/router.ex index 84b25ab718..50587825f9 100644 --- a/lib/lightning_web/router.ex +++ b/lib/lightning_web/router.ex @@ -42,6 +42,11 @@ defmodule LightningWeb.Router do plug :accepts, ["json"] end + pipeline :authenticated_api do + plug :accepts, ["json"] + plug LightningWeb.Plugs.ApiAuth + end + scope "/", LightningWeb do pipe_through [:browser] @@ -84,6 +89,18 @@ defmodule LightningWeb.Router do # resources "/runs", API.RunController, only: [:index, :show] end + ## Collections + scope "/collections", LightningWeb do + pipe_through [:authenticated_api] + + get "/:name", CollectionsController, :stream + get "/:name/:key", CollectionsController, :get + put "/:name/:key", CollectionsController, :put + post "/:name", CollectionsController, :put_all + delete "/:name/:key", CollectionsController, :delete + delete "/:name", CollectionsController, :delete_all + end + ## Authentication routes scope "/", LightningWeb do @@ -154,6 +171,8 @@ defmodule LightningWeb.Router do live "/settings/authentication", AuthProvidersLive.Index, :edit live "/settings/authentication/new", AuthProvidersLive.Index, :new + + live "/settings/collections", CollectionLive.Index, :index end live_session :default, on_mount: LightningWeb.InitAssigns do diff --git a/lib/lightning_web/views/error_view.ex b/lib/lightning_web/views/error_view.ex index 3db6b9f58c..e050068202 100644 --- a/lib/lightning_web/views/error_view.ex +++ b/lib/lightning_web/views/error_view.ex @@ -1,4 +1,6 @@ defmodule LightningWeb.ErrorView do + @moduledoc false + # This module needs to be changed to use Layouts use LightningWeb, :view @@ -31,7 +33,10 @@ defmodule LightningWeb.ErrorView do

Authorization Error

-
+
<%= for {k,v} <- @error do %>
<%= k %>
<%= v %>
@@ -43,10 +48,6 @@ defmodule LightningWeb.ErrorView do """ end - def render("403.json", %{error: error}) do - %{"error" => error} - end - defp logo_bar(assigns) do ~H"""