From 4ba12e9fbec20f953316d130cb1bb94e4d776a05 Mon Sep 17 00:00:00 2001 From: Midigo Frank <39288959+midigofrank@users.noreply.github.com> Date: Thu, 5 Sep 2024 13:28:52 +0200 Subject: [PATCH] Persist AI Assistant conversations and enable it for all users (#2453) * placeholder changelog * Persist AI messages (#2427) * persist messages * use role instead of sender in chat_messages. Simplifies things * allow all users to access the ai assistant * add test for viewing form * make dialyzer happy * make dialyzer happy * Resume Chat Sessions (#2439) * WIP * users can resume sessions * redirect to the created session for new chat sessions * make credo happy * save session title * fix bug when following run * Polish Chat Assistant UI (#2452) * polish UI * show user avatars in the chat session * disable submit when user is not allowed to edit workflow * Handle errors when saving message and querying apollo (#2456) this also gives way for handling limiter errors * Tests For Ai Assistant (#2458) * add tests for assistant * add test for failures * add ability to clear error message * increase sleep duration for async task in test * update chagelog * fix bug where closing edit modal did not update the onboarding ui * Get rid of Process.sleep in test instead use PubSub * Limit AI queries (#2457) * Limit AI queries * Remove forced error and add test case * Increment ai chat messages * Changelog and formatting * Handles assistant role as string sent by async process_message * Fix banner * Create index for counting * Use extension to increment ai queries * Simplify test and check expected limiter extension calls * Fix rebase * Formatting * Increment on reply * Changelog * Icon and Center of banner The Common.banner needs a fix * move limiter banner to the component * fix failing tests --------- Co-authored-by: Frank Midigo * always show the limit banner --------- Co-authored-by: Rogerio Pontual <44991200+jyeshe@users.noreply.github.com> --- CHANGELOG.md | 5 + lib/lightning/ai_assistant/ai_assistant.ex | 223 ++++--- lib/lightning/ai_assistant/chat_message.ex | 58 ++ lib/lightning/ai_assistant/chat_session.ex | 43 ++ lib/lightning/ai_assistant/limiter.ex | 20 + lib/lightning/extensions/usage_limiter.ex | 3 + lib/lightning/extensions/usage_limiting.ex | 9 +- lib/lightning/services/usage_limiter.ex | 5 + .../workflow_live/ai_assistant_component.ex | 595 +++++++++++++++--- lib/lightning_web/live/workflow_live/edit.ex | 113 +++- ...0823072414_create_chat_sessions_tables.exs | 31 + .../ai_assistant/ai_assistant_test.exs | 218 ++++--- test/lightning/ai_assistant/limiter_test.exs | 49 ++ .../extensions/usage_limiter_test.exs | 9 + .../live/workflow_live/edit_test.exs | 498 +++++++++++++++ test/support/factories.ex | 17 + test/support/stub_usage_limiter.ex | 3 + 17 files changed, 1593 insertions(+), 306 deletions(-) create mode 100644 lib/lightning/ai_assistant/chat_message.ex create mode 100644 lib/lightning/ai_assistant/chat_session.ex create mode 100644 lib/lightning/ai_assistant/limiter.ex create mode 100644 priv/repo/migrations/20240823072414_create_chat_sessions_tables.exs create mode 100644 test/lightning/ai_assistant/limiter_test.exs diff --git a/CHANGELOG.md b/CHANGELOG.md index a06ed91adf..f9327107a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,11 @@ and this project adheres to ### Added +- Limit AI queries and hook the increment of AI queries to allow usage limiting. + [#2438](https://github.com/OpenFn/lightning/pull/2438) +- Persist AI Assistant conversations and enable it for all users + [#2296](https://github.com/OpenFn/lightning/issues/2296) + ### Changed ### Fixed diff --git a/lib/lightning/ai_assistant/ai_assistant.ex b/lib/lightning/ai_assistant/ai_assistant.ex index 9eab78e158..f0fc809bd9 100644 --- a/lib/lightning/ai_assistant/ai_assistant.ex +++ b/lib/lightning/ai_assistant/ai_assistant.ex @@ -3,103 +3,89 @@ defmodule Lightning.AiAssistant do The AI assistant module. """ + import Ecto.Query + + alias Ecto.Multi alias Lightning.Accounts.User + alias Lightning.AiAssistant.ChatSession alias Lightning.ApolloClient + alias Lightning.Repo + alias Lightning.Services.UsageLimiter alias Lightning.Workflows.Job - defmodule Session do - @moduledoc """ - Represents a session with the AI assistant. - """ - - defstruct [ - :id, - :expression, - :adaptor, - :history - ] - - @type t() :: %__MODULE__{ - id: Ecto.UUID.t(), - expression: String.t(), - adaptor: String.t(), - history: history() - } - - @type history() :: [ - %{role: :user | :assistant, content: String.t()} - ] - - @spec new(Job.t()) :: t() - def new(job) do - %Session{ - id: job.id, - expression: job.body, - adaptor: Lightning.AdaptorRegistry.resolve_adaptor(job.adaptor), - history: [] - } - end - - @spec put_history(t(), history() | [%{String.t() => any()}]) :: t() - def put_history(session, history) do - history = - Enum.map(history, fn h -> - %{role: h["role"] || h[:role], content: h["content"] || h[:content]} - end) - - %{session | history: history} - end - - @spec push_history(t(), %{String.t() => any()}) :: t() - def push_history(session, message) do - history = - session.history ++ - [ - %{ - role: message["role"] || message[:role], - content: message["content"] || message[:content] - } - ] - - %{session | history: history} - end + @spec put_expression_and_adaptor(ChatSession.t(), String.t(), String.t()) :: + ChatSession.t() + def put_expression_and_adaptor(session, expression, adaptor) do + %{ + session + | expression: expression, + adaptor: Lightning.AdaptorRegistry.resolve_adaptor(adaptor) + } + end - @doc """ - Puts the given expression into the session. - """ - @spec put_expression(t(), String.t()) :: t() - def put_expression(session, expression) do - %{session | expression: expression} - end + @spec list_sessions_for_job(Job.t()) :: [ChatSession.t(), ...] | [] + def list_sessions_for_job(job) do + Repo.all( + from s in ChatSession, + where: s.job_id == ^job.id, + order_by: [desc: :updated_at], + preload: [:user] + ) end - @doc """ - Creates a new session with the given job. + @spec get_session!(Ecto.UUID.t()) :: ChatSession.t() + def get_session!(id) do + ChatSession |> Repo.get!(id) |> Repo.preload(messages: :user) + end - **Example** + @spec create_session(Job.t(), User.t(), String.t()) :: + {:ok, ChatSession.t()} | {:error, Ecto.Changeset.t()} + def create_session(job, user, content) do + %ChatSession{ + id: Ecto.UUID.generate(), + job_id: job.id, + user_id: user.id, + title: String.slice(content, 0, 40), + messages: [] + } + |> put_expression_and_adaptor(job.body, job.adaptor) + |> save_message(%{role: :user, content: content, user: user}) + end - iex> AiAssistant.new_session(%Lightning.Workflows.Job{ - ...> body: "fn()", - ...> adaptor: "@openfn/language-common@latest" - ...> }) - %Lightning.AiAssistant.Session{ - expression: "fn()", - adaptor: "@openfn/language-common@1.6.2", - history: [] - } - - > ℹ️ The `adaptor` field is resolved to the latest version when `@latest` - > is provided as Apollo expects a specific version. - """ + @spec save_message(ChatSession.t(), %{any() => any()}) :: + {:ok, ChatSession.t()} | {:error, Ecto.Changeset.t()} + def save_message(session, message) do + # we can call the limiter at this point + # note: we should only increment the counter when role is `:assistant` + messages = Enum.map(session.messages, &Map.take(&1, [:id])) + + Multi.new() + |> Multi.put(:message, message) + |> Multi.insert_or_update( + :upsert, + session + |> ChatSession.changeset(%{messages: messages ++ [message]}) + ) + |> Multi.merge(&maybe_increment_msgs_counter/1) + |> Repo.transaction() + |> case do + {:ok, %{upsert: session}} -> + {:ok, session} - @spec new_session(Job.t()) :: Session.t() - def new_session(job) do - Session.new(job) + {:error, _operation, changeset, _changes} -> + {:error, changeset} + end end - @spec push_history(Session.t(), %{String.t() => any()}) :: Session.t() - def push_history(session, message) do - Session.push_history(session, message) + @spec project_has_any_session?(Ecto.UUID.t()) :: boolean() + def project_has_any_session?(project_id) do + query = + from s in ChatSession, + join: j in assoc(s, :job), + join: w in assoc(j, :workflow), + where: w.project_id == ^project_id + + Repo.exists?(query) end @doc """ @@ -112,28 +98,48 @@ defmodule Lightning.AiAssistant do iex> AiAssistant.query(session, "fn()") {:ok, session} """ - @spec query(Session.t(), String.t()) :: {:ok, Session.t()} | :error + @spec query(ChatSession.t(), String.t()) :: + {:ok, ChatSession.t()} + | {:error, Ecto.Changeset.t() | :apollo_unavailable} def query(session, content) do - ApolloClient.query( - content, - %{expression: session.expression, adaptor: session.adaptor}, - session.history - ) - |> case do - {:ok, %Tesla.Env{status: status} = response} when status in 200..299 -> - {:ok, session |> Session.put_history(response.body["history"])} + apollo_resp = + ApolloClient.query( + content, + %{expression: session.expression, adaptor: session.adaptor}, + build_history(session) + ) + + case apollo_resp do + {:ok, %Tesla.Env{status: status, body: body}} when status in 200..299 -> + message = body["history"] |> Enum.reverse() |> hd() + save_message(session, message) _ -> - :error + {:error, :apollo_unavailable} + end + end + + defp build_history(session) do + case Enum.reverse(session.messages) do + [%{role: :user} | other] -> + other + |> Enum.reverse() + |> Enum.map(&Map.take(&1, [:role, :content])) + + messages -> + Enum.map(messages, &Map.take(&1, [:role, :content])) end end @doc """ - Checks if the user is authorized to access the AI assistant. + Checks if the AI assistant is enabled. """ - @spec authorized?(User.t()) :: boolean() - def authorized?(user) do - user.role == :superuser + @spec enabled?() :: boolean() + def enabled? do + endpoint = Lightning.Config.apollo(:endpoint) + api_key = Lightning.Config.apollo(:openai_api_key) + + is_binary(endpoint) && is_binary(api_key) end @doc """ @@ -143,4 +149,23 @@ defmodule Lightning.AiAssistant do def endpoint_available? do ApolloClient.test() == :ok end + + # assistant role sent via async as string + defp maybe_increment_msgs_counter(%{ + upsert: session, + message: %{"role" => "assistant"} + }), + do: + maybe_increment_msgs_counter(%{ + upsert: session, + message: %{role: :assistant} + }) + + defp maybe_increment_msgs_counter(%{ + upsert: session, + message: %{role: :assistant} + }), + do: UsageLimiter.increment_ai_queries(session) + + defp maybe_increment_msgs_counter(_user_role), do: Multi.new() end diff --git a/lib/lightning/ai_assistant/chat_message.ex b/lib/lightning/ai_assistant/chat_message.ex new file mode 100644 index 0000000000..a9e371d6e6 --- /dev/null +++ b/lib/lightning/ai_assistant/chat_message.ex @@ -0,0 +1,58 @@ +defmodule Lightning.AiAssistant.ChatMessage do + @moduledoc false + + use Lightning.Schema + import Ecto.Changeset + import Lightning.Validators, only: [validate_required_assoc: 2] + + @type role() :: :user | :assistant + @type t() :: %__MODULE__{ + id: Ecto.UUID.t(), + content: String.t() | nil, + role: role(), + is_deleted: boolean(), + is_public: boolean() + } + + schema "ai_chat_messages" do + field :content, :string + field :role, Ecto.Enum, values: [:user, :assistant] + field :is_deleted, :boolean, default: false + field :is_public, :boolean, default: true + + belongs_to :chat_session, Lightning.AiAssistant.ChatSession + belongs_to :user, Lightning.Accounts.User + + timestamps() + end + + def changeset(chat_message, attrs) do + chat_message + |> cast(attrs, [ + :content, + :role, + :is_deleted, + :is_public, + :chat_session_id + ]) + |> validate_required([:content, :role]) + |> maybe_put_user_assoc(attrs[:user] || attrs["user"]) + |> maybe_require_user() + end + + defp maybe_put_user_assoc(changeset, user) do + if user do + put_assoc(changeset, :user, user) + else + changeset + end + end + + defp maybe_require_user(changeset) do + if get_field(changeset, :role) == :user do + validate_required_assoc(changeset, :user) + else + changeset + end + end +end diff --git a/lib/lightning/ai_assistant/chat_session.ex b/lib/lightning/ai_assistant/chat_session.ex new file mode 100644 index 0000000000..9ab028d78c --- /dev/null +++ b/lib/lightning/ai_assistant/chat_session.ex @@ -0,0 +1,43 @@ +defmodule Lightning.AiAssistant.ChatSession do + @moduledoc false + + use Lightning.Schema + import Ecto.Changeset + + alias Lightning.Accounts.User + alias Lightning.AiAssistant.ChatMessage + alias Lightning.Workflows.Job + + @type t() :: %__MODULE__{ + id: Ecto.UUID.t(), + job_id: Ecto.UUID.t(), + user_id: Ecto.UUID.t(), + title: String.t(), + expression: String.t() | nil, + adaptor: String.t() | nil, + is_public: boolean(), + is_deleted: boolean(), + messages: [ChatMessage.t(), ...] | [] + } + + schema "ai_chat_sessions" do + field :expression, :string, virtual: true + field :adaptor, :string, virtual: true + field :title, :string + field :is_public, :boolean, default: false + field :is_deleted, :boolean, default: false + belongs_to :job, Job + belongs_to :user, User + + has_many :messages, ChatMessage, preload_order: [asc: :inserted_at] + + timestamps() + end + + def changeset(chat_session, attrs) do + chat_session + |> cast(attrs, [:title, :is_public, :is_deleted, :job_id, :user_id]) + |> validate_required([:title, :job_id, :user_id]) + |> cast_assoc(:messages) + end +end diff --git a/lib/lightning/ai_assistant/limiter.ex b/lib/lightning/ai_assistant/limiter.ex new file mode 100644 index 0000000000..64402aa03b --- /dev/null +++ b/lib/lightning/ai_assistant/limiter.ex @@ -0,0 +1,20 @@ +defmodule Lightning.AiAssistant.Limiter do + @moduledoc """ + The AI assistant limiter to check for AI query quota. + """ + + alias Lightning.Extensions.UsageLimiting + alias Lightning.Extensions.UsageLimiting.Action + alias Lightning.Extensions.UsageLimiting.Context + alias Lightning.Services.UsageLimiter + + @doc """ + Checks if has not reached the limit of the project ai queries quota. + """ + @spec validate_quota(Ecto.UUID.t()) :: :ok | UsageLimiting.error() + def validate_quota(project_id) do + UsageLimiter.limit_action(%Action{type: :ai_query}, %Context{ + project_id: project_id + }) + end +end diff --git a/lib/lightning/extensions/usage_limiter.ex b/lib/lightning/extensions/usage_limiter.ex index dfaac9d193..4bec4824e2 100644 --- a/lib/lightning/extensions/usage_limiter.ex +++ b/lib/lightning/extensions/usage_limiter.ex @@ -10,6 +10,9 @@ defmodule Lightning.Extensions.UsageLimiter do @impl true def limit_action(_action, _context), do: :ok + @impl true + def increment_ai_queries(_session), do: Ecto.Multi.new() + @impl true def get_run_options(context) do [ diff --git a/lib/lightning/extensions/usage_limiting.ex b/lib/lightning/extensions/usage_limiting.ex index f0d9e0da52..84e17d8af6 100644 --- a/lib/lightning/extensions/usage_limiting.ex +++ b/lib/lightning/extensions/usage_limiting.ex @@ -4,6 +4,7 @@ defmodule Lightning.Extensions.UsageLimiting do """ @type error_reason :: :too_many_runs + | :too_many_queries | :runs_hard_limit | :unknown_project @type message :: Lightning.Extensions.Message.t() @@ -15,9 +16,10 @@ defmodule Lightning.Extensions.UsageLimiting do type: :new_run | :activate_workflow - | :new_user + | :ai_query | :alert_failure - | :github_sync, + | :github_sync + | :new_user, amount: pos_integer() } @@ -41,6 +43,9 @@ defmodule Lightning.Extensions.UsageLimiting do @callback limit_action(action :: Action.t(), context :: Context.t()) :: :ok | error() + @callback increment_ai_queries(Lightning.AiAssistant.ChatSession.t()) :: + Ecto.Multi.t() + @callback get_run_options(context :: Context.t()) :: Lightning.Runs.RunOptions.keyword_list() end diff --git a/lib/lightning/services/usage_limiter.ex b/lib/lightning/services/usage_limiter.ex index 6fc3b3e247..0848780155 100644 --- a/lib/lightning/services/usage_limiter.ex +++ b/lib/lightning/services/usage_limiter.ex @@ -16,6 +16,11 @@ defmodule Lightning.Services.UsageLimiter do adapter().limit_action(action, context) end + @impl true + def increment_ai_queries(session) do + adapter().increment_ai_queries(session) + end + @impl true def get_run_options(context) do adapter().get_run_options(context) diff --git a/lib/lightning_web/live/workflow_live/ai_assistant_component.ex b/lib/lightning_web/live/workflow_live/ai_assistant_component.ex index 262777858e..df7abc70b9 100644 --- a/lib/lightning_web/live/workflow_live/ai_assistant_component.ex +++ b/lib/lightning_web/live/workflow_live/ai_assistant_component.ex @@ -1,36 +1,305 @@ defmodule LightningWeb.WorkflowLive.AiAssistantComponent do use LightningWeb, :live_component + alias Lightning.AiAssistant + alias Lightning.AiAssistant.Limiter alias Phoenix.LiveView.AsyncResult + alias Phoenix.LiveView.JS def mount(socket) do {:ok, socket |> assign(%{ + ai_limit_result: nil, pending_message: AsyncResult.ok(nil), + process_message_on_show: false, + all_sessions: AsyncResult.ok([]), session: nil, - form: to_form(%{"content" => nil}) + form: to_form(%{"content" => nil}), + error_message: nil }) |> assign_async(:endpoint_available?, fn -> {:ok, %{endpoint_available?: AiAssistant.endpoint_available?()}} end)} end - def update(%{selected_job: job}, socket) do + def update(%{action: action} = assigns, socket) do {:ok, socket - |> update(:session, fn session -> - if session && job.id == session.id do - session |> AiAssistant.Session.put_expression(job.body) - else - AiAssistant.new_session(job) - end - end)} + |> assign(assigns) + |> maybe_check_limit() + |> apply_action(action, assigns)} + end + + defp apply_action(socket, :new, %{selected_job: job} = assigns) do + socket + |> assign_async(:all_sessions, fn -> + {:ok, %{all_sessions: AiAssistant.list_sessions_for_job(job)}} + end) + |> assign(has_read_disclaimer: assigns.project_has_chat_sessions) + end + + defp apply_action(socket, :show, %{ + selected_job: job, + chat_session_id: chat_session_id + }) do + if socket.assigns.process_message_on_show do + message = hd(socket.assigns.session.messages) + + socket + |> assign(:process_message_on_show, false) + |> process_message(message.content) + else + session = + chat_session_id + |> AiAssistant.get_session!() + |> AiAssistant.put_expression_and_adaptor(job.body, job.adaptor) + + socket + |> assign(:session, session) + |> assign(:process_message_on_show, false) + end end def render(assigns) do + ~H""" +
+ <%= if @action == :new and !@has_read_disclaimer do %> + <.render_onboarding myself={@myself} can_edit_workflow={@can_edit_workflow} /> + <% else %> + <.render_session {assigns} /> + <% end %> +
+ """ + end + + def handle_event("send_message", %{"content" => content}, socket) do + if socket.assigns.can_edit_workflow do + %{action: action} = socket.assigns + # clear error + socket + |> assign(error_message: nil) + |> check_limit() + |> then(fn socket -> + if socket.assigns.ai_limit_result == :ok do + {:noreply, save_message(socket, action, content)} + else + {:noreply, socket} + end + end) + else + {:noreply, + socket + |> assign( + form: to_form(%{"content" => nil}), + error_message: "You are not authorized to use the Ai Assistant" + )} + end + end + + def handle_event("mark_disclaimer_read", _params, socket) do + {:noreply, assign(socket, has_read_disclaimer: true)} + end + + defp save_message(%{assigns: assigns} = socket, :new, content) do + case AiAssistant.create_session( + assigns.selected_job, + assigns.current_user, + content + ) do + {:ok, session} -> + query_params = Map.put(assigns.query_params, "chat", session.id) + + socket + |> assign(:session, session) + |> assign(:process_message_on_show, true) + |> push_patch(to: redirect_url(assigns.base_url, query_params)) + + error -> + assign(socket, error_message: error_message(error)) + end + end + + defp save_message(%{assigns: assigns} = socket, :show, content) do + case AiAssistant.save_message(assigns.session, %{ + "role" => "user", + "content" => content, + "user" => assigns.current_user + }) do + {:ok, session} -> + socket + |> assign(:session, session) + |> process_message(content) + + error -> + assign(socket, error_message: error_message(error)) + end + end + + defp error_message({:error, %Ecto.Changeset{}}) do + "Oops! Could not save message. Please try again." + end + + defp error_message({:error, :apollo_unavailable}) do + "Oops! Could not reach the Ai Server. Please try again later." + end + + defp error_message({:error, _reason, %{text: text_message}}) do + text_message + end + + defp error_message(_error) do + "Oops! Something went wrong. Please try again." + end + + defp process_message(socket, message) do + session = socket.assigns.session + + socket + |> assign(:pending_message, AsyncResult.loading()) + |> start_async( + :process_message, + fn -> AiAssistant.query(session, message) end + ) + end + + defp redirect_url(base_url, query_params) do + query_string = + query_params + |> Enum.reject(fn {_k, v} -> is_nil(v) end) + |> URI.encode_query() + + "#{base_url}?#{query_string}" + end + + def handle_async(:process_message, {:ok, {:ok, session}}, socket) do + {:noreply, + socket + |> assign(:session, session) + |> assign(:pending_message, AsyncResult.ok(nil))} + end + + def handle_async(:process_message, {:ok, error}, socket) do + {:noreply, + socket + |> update(:pending_message, fn async_result -> + AsyncResult.failed(async_result, error) + end)} + end + + def handle_async(:process_message, {:exit, error}, socket) do + {:noreply, + socket + |> update(:pending_message, fn async_result -> + AsyncResult.failed(async_result, {:exit, error}) + end)} + end + + defp render_onboarding(assigns) do + ~H""" +
+
+

+ All models are wrong.
- Joe Clark! +

+

+ + Learn more about AI Assistant + +

+ <.button + id="get-started-with-ai-btn" + phx-click="mark_disclaimer_read" + phx-target={@myself} + disabled={!@can_edit_workflow} + > + Get started with the AI Assistant + +
+ <.render_disclaimer /> +
+ """ + end + + attr :id, :string, default: "ai-assistant-disclaimer" + + defp render_disclaimer(assigns) do + ~H""" + + """ + end + + defp render_session(assigns) do ~H"""
+ <%= case @action do %> + <% :new -> %> + <.render_all_sessions + all_sessions={@all_sessions} + query_params={@query_params} + base_url={@base_url} + /> + <% :show -> %> + <.render_individual_session + session={@session} + pending_message={@pending_message} + query_params={@query_params} + base_url={@base_url} + /> + <% end %> + <.async_result :let={endpoint_available?} assign={@endpoint_available?}> <:loading>
@@ -39,64 +308,30 @@ defmodule LightningWeb.WorkflowLive.AiAssistantComponent do
-
- <%= for message <- @session.history do %> -
- <%= message.content %> -
-
-
-
- <.icon name="hero-cpu-chip" class="" /> -
-
- -
- <%= message.content |> Earmark.as_html!() |> raw() %> -
-
- <% end %> - <.async_result assign={@pending_message}> - <:loading> -
-
-
- <.icon name="hero-sparkles" class="" /> -
-
-
-
- - <:failed> -
-
-
- <.icon name="hero-sparkles" class="" /> -
-
-
- <.icon name="exclamation-triangle" class="text-red" /> - An error occured! Please try again later. -
-
- - -
<.form for={@form} phx-submit="send_message" class="row-span-1 p-2 pt-0" phx-target={@myself} + id="ai-assistant-form" > + <.chat_input form={@form} - disabled={!endpoint_available? or !is_nil(@pending_message.loading)} + disabled={ + !@can_edit_workflow or @ai_limit_result != :ok or !endpoint_available? or + !is_nil(@pending_message.loading) + } + tooltip={disabled_tooltip_message(@can_edit_workflow, @ai_limit_result)} /> @@ -104,55 +339,28 @@ defmodule LightningWeb.WorkflowLive.AiAssistantComponent do """ end - def handle_event("send_message", %{"content" => content}, socket) do - {:noreply, - socket - |> assign(:pending_message, AsyncResult.loading()) - |> assign( - :session, - AiAssistant.push_history(socket.assigns.session, %{ - "role" => "user", - "content" => content - }) - ) - |> start_async( - :process_message, - fn -> - AiAssistant.query(socket.assigns.session, content) - end - )} - end + defp disabled_tooltip_message(can_edit_workflow, ai_limit_result) do + case {can_edit_workflow, ai_limit_result} do + {false, _} -> + "You are not authorized to use the Ai Assistant" - def handle_async(:process_message, {:ok, {:ok, session}}, socket) do - {:noreply, - socket - |> assign(:session, session) - |> assign(:pending_message, AsyncResult.ok(nil))} - end + {_, {:error, _reason, _msg} = error} -> + error_message(error) - def handle_async(:process_message, {:ok, :error}, socket) do - {:noreply, - socket - |> update(:pending_message, fn async_result -> - AsyncResult.failed(async_result, :error) - end)} + _ -> + nil + end end attr :disabled, :boolean + attr :tooltip, :string attr :form, :map, required: true defp chat_input(assigns) do - assigns = - assigns - |> assign( - :errors, - Enum.map( - assigns.form[:content].errors, - &LightningWeb.CoreComponents.translate_error(&1) - ) - ) - ~H""" +
+ Do not paste PPI or sensitive business data +
- <:panel - :if={Lightning.AiAssistant.authorized?(@current_user)} - hash="aichat" - class="h-full" - > -
- <.live_component - module={LightningWeb.WorkflowLive.AiAssistantComponent} - selected_job={@selected_job} - id={"aichat-#{@selected_job.id}"} - /> -
+ <:panel hash="aichat" class="h-full"> + <%= if @ai_assistant_enabled do %> +
+ <.live_component + module={LightningWeb.WorkflowLive.AiAssistantComponent} + can_edit_workflow={@can_edit_workflow} + project_id={@project.id} + project_has_chat_sessions={@project_has_chat_sessions} + current_user={@current_user} + selected_job={@selected_job} + chat_session_id={@chat_session_id} + query_params={@query_params} + base_url={@base_url} + action={if(@chat_session_id, do: :show, else: :new)} + id={"aichat-#{@selected_job.id}"} + /> +
+ <% else %> +
+
+

+ AI Assistant has not been configured for your instance. +
Contact your admin to configure AI Assistant or try it on +
OpenFn cloud for free. +
Try the AI Assistant on + + https://app.openfn.org + +

+
+
+ <% end %> @@ -965,6 +987,9 @@ defmodule LightningWeb.WorkflowLive.Edit do view_only_users_ids: view_only_users_ids, active_menu_item: :overview, expanded_job: nil, + ai_assistant_enabled: AiAssistant.enabled?(), + project_has_chat_sessions: nil, + chat_session_id: nil, follow_run: nil, step: nil, manual_run_form: nil, @@ -974,7 +999,13 @@ defmodule LightningWeb.WorkflowLive.Edit do selected_run: nil, selected_trigger: nil, selection_mode: nil, - query_params: %{"s" => nil, "m" => nil, "a" => nil}, + query_params: %{ + "s" => nil, + "m" => nil, + "a" => nil, + "v" => nil, + "chat" => nil + }, workflow: nil, snapshot: nil, changeset: nil, @@ -1050,6 +1081,10 @@ defmodule LightningWeb.WorkflowLive.Edit do |> push_redirect(to: ~p"/projects/#{socket.assigns.project}/w") end end + |> assign( + project_has_chat_sessions: + AiAssistant.project_has_any_session?(socket.assigns.project.id) + ) end defp track_user_presence(socket) do @@ -1662,7 +1697,9 @@ defmodule LightningWeb.WorkflowLive.Edit do |> maybe_disable_canvas()} end - def handle_info(%{}, socket), do: {:noreply, socket} + def handle_info(%{}, socket) do + {:noreply, socket} + end defp maybe_disable_canvas(socket) do %{ @@ -1954,8 +1991,14 @@ defmodule LightningWeb.WorkflowLive.Edit do |> assign( query_params: params - |> Map.take(["s", "m", "a"]) - |> Enum.into(%{"s" => nil, "m" => nil, "a" => nil}) + |> Map.take(["s", "m", "a", "v", "chat"]) + |> Enum.into(%{ + "s" => nil, + "m" => nil, + "a" => nil, + "v" => nil, + "chat" => nil + }) ) |> apply_query_params() end @@ -1983,6 +2026,18 @@ defmodule LightningWeb.WorkflowLive.Edit do end end |> assign_follow_run(socket.assigns.query_params) + |> assign_chat_session_id(socket.assigns.query_params) + end + + defp assign_chat_session_id(socket, %{"chat" => session_id}) + when is_binary(session_id) do + socket + |> assign(chat_session_id: session_id) + end + + defp assign_chat_session_id(socket, _params) do + socket + |> assign(chat_session_id: nil) end defp switch_changeset(socket) do @@ -2059,7 +2114,8 @@ defmodule LightningWeb.WorkflowLive.Edit do selected_edge: nil, selected_job: nil, selected_trigger: nil, - selection_mode: nil + selection_mode: nil, + chat_session_id: nil ) end @@ -2086,11 +2142,21 @@ defmodule LightningWeb.WorkflowLive.Edit do :triggers -> socket - |> assign(selected_job: nil, selected_trigger: value, selected_edge: nil) + |> assign( + selected_job: nil, + selected_trigger: value, + selected_edge: nil, + chat_session_id: nil + ) :edges -> socket - |> assign(selected_job: nil, selected_trigger: nil, selected_edge: value) + |> assign( + selected_job: nil, + selected_trigger: nil, + selected_edge: value, + chat_session_id: nil + ) end end @@ -2105,6 +2171,7 @@ defmodule LightningWeb.WorkflowLive.Edit do query_params |> Map.put("a", run.id) |> Map.put("v", version) + |> Enum.reject(fn {_k, v} -> is_nil(v) end) socket |> push_patch(to: ~p"/projects/#{project}/w/#{workflow}?#{params}") diff --git a/priv/repo/migrations/20240823072414_create_chat_sessions_tables.exs b/priv/repo/migrations/20240823072414_create_chat_sessions_tables.exs new file mode 100644 index 0000000000..4e3cb2d237 --- /dev/null +++ b/priv/repo/migrations/20240823072414_create_chat_sessions_tables.exs @@ -0,0 +1,31 @@ +defmodule Lightning.Repo.Migrations.CreateChatSessionsTables do + use Ecto.Migration + + def change do + create table(:ai_chat_sessions, primary_key: false) do + add :id, :uuid, primary_key: true + add :title, :string + add :is_public, :boolean + add :is_deleted, :boolean + add :job_id, references(:jobs, type: :binary_id, on_delete: :nilify_all) + add :user_id, references(:users, type: :binary_id, on_delete: :nilify_all) + + timestamps() + end + + create table(:ai_chat_messages, primary_key: false) do + add :id, :uuid, primary_key: true + + add :chat_session_id, + references(:ai_chat_sessions, type: :binary_id, on_delete: :delete_all) + + add :user_id, references(:users, type: :binary_id, on_delete: :nilify_all) + add :content, :text + add :role, :string + add :is_deleted, :boolean + add :is_public, :boolean + + timestamps() + end + end +end diff --git a/test/lightning/ai_assistant/ai_assistant_test.exs b/test/lightning/ai_assistant/ai_assistant_test.exs index 26554cef39..d6d392e459 100644 --- a/test/lightning/ai_assistant/ai_assistant_test.exs +++ b/test/lightning/ai_assistant/ai_assistant_test.exs @@ -1,43 +1,16 @@ defmodule Lightning.AiAssistantTest do - use ExUnit.Case, async: true + use Lightning.DataCase, async: true import Mox alias Lightning.AiAssistant - alias Lightning.AiAssistant.Session - - import Lightning.Factories setup :verify_on_exit! - describe "Session" do - test "creation" do - job = build(:job, body: "fn()", adaptor: "@openfn/language-common@latest") - session = Session.new(job) - - assert session.adaptor == "@openfn/language-common@1.6.2" - assert session.expression == "fn()" - assert session.history == [] - end - - test "put_history" do - job = build(:job, body: "fn()", adaptor: "@openfn/language-common@latest") - - session = - Session.new(job) - |> Session.put_history([%{role: :user, content: "fn()"}]) - - assert session.history == [%{role: :user, content: "fn()"}] - end - - test "put_expression" do - job = build(:job, body: "fn()", adaptor: "@openfn/language-common@latest") - - session = - Session.new(job) - |> Session.put_expression("fn(state => state)") - - assert session.expression == "fn(state => state)" - end + setup do + user = insert(:user) + project = insert(:project, project_users: [%{user: user, role: :owner}]) + workflow = insert(:simple_workflow, project: project) + [user: user, project: project, workflow: workflow] end describe "endpoint_available?" do @@ -75,63 +48,150 @@ defmodule Lightning.AiAssistantTest do end end - test "query" do - session = - AiAssistant.new_session(%Lightning.Workflows.Job{ - body: "fn()", - adaptor: "@openfn/language-common@latest" - }) + describe "query/2" do + test "queries and saves the response", %{ + user: user, + workflow: %{jobs: [job_1 | _]} = _workflow + } do + session = + insert(:chat_session, + user: user, + job: job_1, + messages: [%{role: :user, content: "what?", user: user}] + ) + + Mox.stub(Lightning.MockConfig, :apollo, fn key -> + case key do + :endpoint -> "http://localhost:3000" + :openai_api_key -> "api_key" + end + end) - Mox.stub(Lightning.MockConfig, :apollo, fn key -> - case key do - :endpoint -> "http://localhost:3000" - :openai_api_key -> "api_key" - end - end) + reply = + """ + { + "response": "Based on the provided guide and the API documentation for the OpenFn @openfn/language-common@1.14.0 adaptor, you can create jobs using the functions provided by the API to interact with different data sources and perform various operations.\\n\\nTo create a job using the HTTP adaptor, you can use functions like `get`, `post`, `put`, `patch`, `head`, and `options` to make HTTP requests. Here's an example job code using the HTTP adaptor:\\n\\n```javascript\\nconst { get, post, each, dataValue } = require('@openfn/language-common');\\n\\nexecute(\\n get('/patients'),\\n each('$.data.patients[*]', (item, index) => {\\n item.id = `item-${index}`;\\n }),\\n post('/patients', dataValue('patients'))\\n);\\n```\\n\\nIn this example, the job first fetches patient data using a GET request, then iterates over each patient to modify their ID, and finally posts the modified patient data back.\\n\\nYou can similarly create jobs using the Salesforce adaptor or the ODK adaptor by utilizing functions like `upsert`, `create`, `fields`, `field`, etc., as shown in the provided examples.\\n\\nFeel free to ask if you have any specific questions or need help with", + "history": [ + { "role": "user", "content": "what?" }, + { + "role": "assistant", + "content": "Based on the provided guide and the API documentation for the OpenFn @openfn/language-common@1.14.0 adaptor, you can create jobs using the functions provided by the API to interact with different data sources and perform various operations.\\n\\nTo create a job using the HTTP adaptor, you can use functions like `get`, `post`, `put`, `patch`, `head`, and `options` to make HTTP requests. Here's an example job code using the HTTP adaptor:\\n\\n```javascript\\nconst { get, post, each, dataValue } = require('@openfn/language-common');\\n\\nexecute(\\n get('/patients'),\\n each('$.data.patients[*]', (item, index) => {\\n item.id = `item-${index}`;\\n }),\\n post('/patients', dataValue('patients'))\\n);\\n```\\n\\nIn this example, the job first fetches patient data using a GET request, then iterates over each patient to modify their ID, and finally posts the modified patient data back.\\n\\nYou can similarly create jobs using the Salesforce adaptor or the ODK adaptor by utilizing functions like `upsert`, `create`, `fields`, `field`, etc., as shown in the provided examples.\\n\\nFeel free to ask if you have any specific questions or need help with" + } + ] + } + """ + |> Jason.decode!() + + expect(Lightning.Tesla.Mock, :call, fn %{method: :post, url: url}, _opts -> + assert url =~ "/services/job_chat" + + {:ok, %Tesla.Env{status: 200, body: reply}} + end) - reply = - """ - { - "response": "Based on the provided guide and the API documentation for the OpenFn @openfn/language-common@1.14.0 adaptor, you can create jobs using the functions provided by the API to interact with different data sources and perform various operations.\\n\\nTo create a job using the HTTP adaptor, you can use functions like `get`, `post`, `put`, `patch`, `head`, and `options` to make HTTP requests. Here's an example job code using the HTTP adaptor:\\n\\n```javascript\\nconst { get, post, each, dataValue } = require('@openfn/language-common');\\n\\nexecute(\\n get('/patients'),\\n each('$.data.patients[*]', (item, index) => {\\n item.id = `item-${index}`;\\n }),\\n post('/patients', dataValue('patients'))\\n);\\n```\\n\\nIn this example, the job first fetches patient data using a GET request, then iterates over each patient to modify their ID, and finally posts the modified patient data back.\\n\\nYou can similarly create jobs using the Salesforce adaptor or the ODK adaptor by utilizing functions like `upsert`, `create`, `fields`, `field`, etc., as shown in the provided examples.\\n\\nFeel free to ask if you have any specific questions or need help with", - "history": [ - { "role": "user", "content": "what?" }, - { - "role": "assistant", - "content": "Based on the provided guide and the API documentation for the OpenFn @openfn/language-common@1.14.0 adaptor, you can create jobs using the functions provided by the API to interact with different data sources and perform various operations.\\n\\nTo create a job using the HTTP adaptor, you can use functions like `get`, `post`, `put`, `patch`, `head`, and `options` to make HTTP requests. Here's an example job code using the HTTP adaptor:\\n\\n```javascript\\nconst { get, post, each, dataValue } = require('@openfn/language-common');\\n\\nexecute(\\n get('/patients'),\\n each('$.data.patients[*]', (item, index) => {\\n item.id = `item-${index}`;\\n }),\\n post('/patients', dataValue('patients'))\\n);\\n```\\n\\nIn this example, the job first fetches patient data using a GET request, then iterates over each patient to modify their ID, and finally posts the modified patient data back.\\n\\nYou can similarly create jobs using the Salesforce adaptor or the ODK adaptor by utilizing functions like `upsert`, `create`, `fields`, `field`, etc., as shown in the provided examples.\\n\\nFeel free to ask if you have any specific questions or need help with" - } - ] - } - """ - |> Jason.decode!() + {:ok, updated_session} = AiAssistant.query(session, "foo") - expect(Lightning.Tesla.Mock, :call, fn %{method: :post, url: url}, _opts -> - assert url =~ "/services/job_chat" + assert Enum.count(updated_session.messages) == Enum.count(reply["history"]) - {:ok, %Tesla.Env{status: 200, body: reply}} - end) + reply_message = List.last(reply["history"]) + saved_message = List.last(updated_session.messages) - {:ok, session} = AiAssistant.query(session, "foo") + assert reply_message["content"] == saved_message.content + assert reply_message["role"] == to_string(saved_message.role) - assert session.history == - reply["history"] - |> Enum.map(fn h -> %{role: h["role"], content: h["content"]} end) + assert Lightning.Repo.reload!(saved_message) + end + end + + describe "list_sessions_for_job/1" do + test "lists the sessions in descending order of time updated", %{ + user: user, + workflow: %{jobs: [job_1 | _]} = _workflow + } do + session_1 = + insert(:chat_session, + user: user, + job: job_1, + updated_at: DateTime.utc_now() |> DateTime.add(-5) + ) + + session_2 = insert(:chat_session, user: user, job: job_1) + + assert [list_session_1, list_session_2] = + AiAssistant.list_sessions_for_job(job_1) + + assert list_session_1.id == session_2.id + assert list_session_2.id == session_1.id + + assert is_struct(list_session_1.user), + "user who created the session is preloaded" + end end - test "authorized?" do - assert AiAssistant.authorized?(%Lightning.Accounts.User{role: :superuser}) - refute AiAssistant.authorized?(%Lightning.Accounts.User{role: :user}) + describe "create_session/3" do + test "creates a new session", %{ + user: user, + workflow: %{jobs: [job_1 | _]} = _workflow + } do + assert {:ok, session} = AiAssistant.create_session(job_1, user, "foo") + + assert session.job_id == job_1.id + assert session.user_id == user.id + assert session.expression == job_1.body + + assert session.adaptor == + Lightning.AdaptorRegistry.resolve_adaptor(job_1.adaptor) + + assert match?( + [%{role: :user, content: "foo", user: ^user}], + session.messages + ) + end end - test "new_session" do - job = %Lightning.Workflows.Job{ - body: "fn()", - adaptor: "@openfn/language-common@latest" - } + describe "save_message/2" do + test "calls limiter to increment ai queries when role is assistant" do + user = insert(:user) - session = AiAssistant.new_session(job) + %{id: job_id} = job = insert(:job, workflow: build(:workflow)) - assert session.adaptor == "@openfn/language-common@1.6.2" - assert session.expression == "fn()" - assert session.history == [] + session = insert(:chat_session, job: job, user: user) + + Mox.expect( + Lightning.Extensions.MockUsageLimiter, + :increment_ai_queries, + 1, + fn %{job_id: ^job_id} -> Ecto.Multi.new() end + ) + + content1 = """ + I am an assistant and I am here to help you with your questions. + """ + + AiAssistant.save_message(session, %{ + role: :assistant, + content: content1, + user: user + }) + end + + test "does not call limiter to increment ai queries when role is user" do + user = insert(:user) + + %{id: job_id} = job = insert(:job, workflow: build(:workflow)) + session = insert(:chat_session, job: job, user: user) + + Mox.expect( + Lightning.Extensions.MockUsageLimiter, + :increment_ai_queries, + 0, + fn %{job_id: ^job_id} -> Ecto.Multi.new() end + ) + + AiAssistant.save_message(session, %{ + role: :user, + content: "What if I want to deduplicate the headers?", + user: user + }) + end end end diff --git a/test/lightning/ai_assistant/limiter_test.exs b/test/lightning/ai_assistant/limiter_test.exs new file mode 100644 index 0000000000..f7fb2415c5 --- /dev/null +++ b/test/lightning/ai_assistant/limiter_test.exs @@ -0,0 +1,49 @@ +defmodule Lightning.AiAssistant.LimiterTest do + use ExUnit.Case, async: true + + import Mox + + alias Lightning.AiAssistant.Limiter + + setup :verify_on_exit! + + describe "validate_quota" do + test "return ok when limit is not reached" do + project_id = Ecto.UUID.generate() + + stub(Lightning.Extensions.MockUsageLimiter, :limit_action, fn %{ + type: + :ai_query + }, + %{ + project_id: + ^project_id + } -> + :ok + end) + + assert :ok == Limiter.validate_quota(project_id) + end + + test "return limiter error when limit is reached" do + limiter_error = + {:error, :too_many_queries, + %Lightning.Extensions.Message{text: "Too many queries"}} + + project_id = Ecto.UUID.generate() + + stub(Lightning.Extensions.MockUsageLimiter, :limit_action, fn %{ + type: + :ai_query + }, + %{ + project_id: + ^project_id + } -> + limiter_error + end) + + assert limiter_error == Limiter.validate_quota(project_id) + end + end +end diff --git a/test/lightning/extensions/usage_limiter_test.exs b/test/lightning/extensions/usage_limiter_test.exs index b47d3426e8..7ebf7a891f 100644 --- a/test/lightning/extensions/usage_limiter_test.exs +++ b/test/lightning/extensions/usage_limiter_test.exs @@ -1,6 +1,8 @@ defmodule Lightning.Extensions.UsageLimiterTest do use ExUnit.Case, async: true + import Lightning.Factories + alias Lightning.Extensions.UsageLimiting.Action alias Lightning.Extensions.UsageLimiting.Context alias Lightning.Extensions.UsageLimiter @@ -30,4 +32,11 @@ defmodule Lightning.Extensions.UsageLimiterTest do end) end end + + describe "increment_ai_queries/2" do + test "returns no change" do + assert Ecto.Multi.new() == + UsageLimiter.increment_ai_queries(build(:chat_session)) + end + end end diff --git a/test/lightning_web/live/workflow_live/edit_test.exs b/test/lightning_web/live/workflow_live/edit_test.exs index b8159461fa..89256dfe8d 100644 --- a/test/lightning_web/live/workflow_live/edit_test.exs +++ b/test/lightning_web/live/workflow_live/edit_test.exs @@ -1371,4 +1371,502 @@ defmodule LightningWeb.WorkflowLive.EditTest do assert html =~ error_msg end end + + describe "AI Assistant:" do + setup :create_workflow + + test "correct information is displayed when the assistant is not configured", + %{ + conn: conn, + project: project, + workflow: %{jobs: [job_1 | _]} = workflow + } do + # when not configured properly + Mox.stub(Lightning.MockConfig, :apollo, fn + :endpoint -> nil + :openai_api_key -> "openai_api_key" + end) + + {:ok, view, _html} = + live( + conn, + ~p"/projects/#{project.id}/w/#{workflow.id}?#{[v: workflow.lock_version, s: job_1.id, m: "expand"]}" + ) + + render_async(view) + refute has_element?(view, "#aichat-#{job_1.id}") + + assert render(view) =~ + "AI Assistant has not been configured for your instance" + + # when configured properly + Mox.stub(Lightning.MockConfig, :apollo, fn + :endpoint -> "http://localhost:4001" + :openai_api_key -> "openai_api_key" + end) + + {:ok, view, _html} = + live( + conn, + ~p"/projects/#{project.id}/w/#{workflow.id}?#{[v: workflow.lock_version, s: job_1.id, m: "expand"]}" + ) + + render_async(view) + assert has_element?(view, "#aichat-#{job_1.id}") + + refute render(view) =~ + "AI Assistant has not been configured for your instance" + end + + test "onboarding ui is displayed when no session exists for the project", %{ + conn: conn, + project: project, + user: user, + workflow: %{jobs: [job_1, job_2 | _]} = workflow + } do + Mox.stub(Lightning.MockConfig, :apollo, fn + :endpoint -> "http://localhost:4001" + :openai_api_key -> "openai_api_key" + end) + + # when no session exists + {:ok, view, _html} = + live( + conn, + ~p"/projects/#{project.id}/w/#{workflow.id}?#{[v: workflow.lock_version, s: job_1.id, m: "expand"]}" + ) + + render_async(view) + + html = view |> element("#aichat-#{job_1.id}") |> render() + assert html =~ "Get started with the AI Assistant" + assert html =~ "Learn more about AI Assistant" + refute has_element?(view, "#ai-assistant-form") + + # let's try clicking the get started button + view |> element("#get-started-with-ai-btn") |> render_click() + html = view |> element("#aichat-#{job_1.id}") |> render() + refute html =~ "Get started with the AI Assistant" + refute html =~ "Learn more about AI Assistant" + assert has_element?(view, "#ai-assistant-form") + + # when a session exists + # notice I'm using another job for the session. + # This is because the onboarding is shown once per project and not per job + insert(:chat_session, user: user, job: job_2) + + {:ok, view, _html} = + live( + conn, + ~p"/projects/#{project.id}/w/#{workflow.id}?#{[v: workflow.lock_version, s: job_1.id, m: "expand"]}" + ) + + render_async(view) + + html = view |> element("#aichat-#{job_1.id}") |> render() + refute html =~ "Get started with the AI Assistant" + refute html =~ "Learn more about AI Assistant" + + assert has_element?(view, "#ai-assistant-form") + end + + test "authorized users can send a message", %{ + conn: conn, + project: project, + user: user, + workflow: %{jobs: [job_1 | _]} = workflow + } do + apollo_endpoint = "http://localhost:4001" + + Mox.stub(Lightning.MockConfig, :apollo, fn + :endpoint -> apollo_endpoint + :openai_api_key -> "openai_api_key" + end) + + Mox.stub( + Lightning.Tesla.Mock, + :call, + fn + %{method: :get, url: ^apollo_endpoint <> "/"}, _opts -> + {:ok, %Tesla.Env{status: 200}} + + %{method: :post}, _opts -> + {:ok, + %Tesla.Env{ + status: 200, + body: %{ + "history" => [%{"role" => "assistant", "content" => "Hello!"}] + } + }} + end + ) + + # insert session so that the onboarding flow is not displayed + insert(:chat_session, user: user, job: job_1) + + for {conn, _user} <- + setup_project_users(conn, project, [:owner, :admin, :editor]) do + {:ok, view, _html} = + live( + conn, + ~p"/projects/#{project.id}/w/#{workflow.id}?#{[v: workflow.lock_version, s: job_1.id, m: "expand"]}" + ) + + render_async(view) + + assert view + |> form("#ai-assistant-form") + |> has_element?() + + input_element = element(view, "#ai-assistant-form textarea") + submit_btn = element(view, "#ai-assistant-form-submit-btn") + + assert has_element?(input_element) + refute render(input_element) =~ "disabled=\"disabled\"" + assert has_element?(submit_btn) + refute render(submit_btn) =~ "disabled=\"disabled\"" + + # try submitting a message + html = + view + |> form("#ai-assistant-form") + |> render_submit(%{content: "Hello"}) + + refute html =~ "You are not authorized to use the Ai Assistant" + + assert_patch(view) + end + + for {conn, _user} <- setup_project_users(conn, project, [:viewer]) do + {:ok, view, _html} = + live( + conn, + ~p"/projects/#{project.id}/w/#{workflow.id}?#{[v: workflow.lock_version, s: job_1.id, m: "expand"]}" + ) + + render_async(view) + + assert view + |> form("#ai-assistant-form") + |> has_element?() + + input_element = element(view, "#ai-assistant-form textarea") + submit_btn = element(view, "#ai-assistant-form-submit-btn") + + assert has_element?(input_element) + assert render(input_element) =~ "disabled=\"disabled\"" + assert has_element?(submit_btn) + assert render(submit_btn) =~ "disabled=\"disabled\"" + + # try submitting a message + html = + view + |> form("#ai-assistant-form") + |> render_submit(%{content: "Hello"}) + + assert html =~ "You are not authorized to use the Ai Assistant" + end + end + + test "users can start a new session", %{ + conn: conn, + project: project, + workflow: %{jobs: [job_1 | _]} = workflow, + test: test + } do + apollo_endpoint = "http://localhost:4001" + + Mox.stub(Lightning.MockConfig, :apollo, fn + :endpoint -> apollo_endpoint + :openai_api_key -> "openai_api_key" + end) + + Mox.stub( + Lightning.Tesla.Mock, + :call, + fn + %{method: :get, url: ^apollo_endpoint <> "/"}, _opts -> + {:ok, %Tesla.Env{status: 200}} + + %{method: :post}, _opts -> + # delay the response to simulate a long running request + # I'm doing this to test the pending assistant resp message + test |> to_string() |> Lightning.subscribe() + + receive do + :return_resp -> + {:ok, + %Tesla.Env{ + status: 200, + body: %{ + "history" => [ + %{"role" => "user", "content" => "Ping"}, + %{"role" => "assistant", "content" => "Pong"} + ] + } + }} + end + end + ) + + {:ok, view, _html} = + live( + conn, + ~p"/projects/#{project.id}/w/#{workflow.id}?#{[v: workflow.lock_version, s: job_1.id, m: "expand"]}" + ) + + render_async(view) + + # click the get started button + view |> element("#get-started-with-ai-btn") |> render_click() + + # submit a message + view + |> form("#ai-assistant-form") + |> render_submit(%{content: "Ping"}) + + assert_patch(view) + + # pending message is shown + assert has_element?(view, "#assistant-pending-message") + refute render(view) =~ "Pong" + + # return the response + test |> to_string() |> Lightning.broadcast(:return_resp) + html = render_async(view) + + # pending message is not shown + refute has_element?(view, "#assistant-pending-message") + assert html =~ "Pong" + end + + test "users can resume a session", %{ + conn: conn, + project: project, + user: user, + workflow: %{jobs: [job_1 | _]} = workflow + } do + apollo_endpoint = "http://localhost:4001" + + Mox.stub(Lightning.MockConfig, :apollo, fn + :endpoint -> apollo_endpoint + :openai_api_key -> "openai_api_key" + end) + + expected_question = "Can you help me with this?" + expected_answer = "No, I am a robot" + + Mox.stub( + Lightning.Tesla.Mock, + :call, + fn + %{method: :get, url: ^apollo_endpoint <> "/"}, _opts -> + {:ok, %Tesla.Env{status: 200}} + + %{method: :post}, _opts -> + {:ok, + %Tesla.Env{ + status: 200, + body: %{ + "history" => [ + %{"role" => "user", "content" => "Ping"}, + %{"role" => "assistant", "content" => "Pong"}, + %{"role" => "user", "content" => expected_question}, + %{"role" => "assistant", "content" => expected_answer} + ] + } + }} + end + ) + + session = + insert(:chat_session, + user: user, + job: job_1, + messages: [ + %{role: :user, content: "Ping", user: user}, + %{role: :assistant, content: "Pong"} + ] + ) + + {:ok, view, _html} = + live( + conn, + ~p"/projects/#{project.id}/w/#{workflow.id}?#{[v: workflow.lock_version, s: job_1.id, m: "expand"]}" + ) + + html = render_async(view) + + assert html =~ session.title + + # click the link to open the session + view |> element("#session-#{session.id}") |> render_click() + + assert_patch(view) + + # submit a message + html = + view + |> form("#ai-assistant-form") + |> render_submit(%{content: expected_question}) + + # answer is not yet shown + refute html =~ expected_answer + + html = render_async(view) + + # answer is now displayed + assert html =~ expected_answer + end + + test "an error is displayed incase the assistant does not return 200", %{ + conn: conn, + project: project, + workflow: %{jobs: [job_1 | _]} = workflow + } do + apollo_endpoint = "http://localhost:4001" + + Mox.stub(Lightning.MockConfig, :apollo, fn + :endpoint -> apollo_endpoint + :openai_api_key -> "openai_api_key" + end) + + Mox.stub( + Lightning.Tesla.Mock, + :call, + fn + %{method: :get, url: ^apollo_endpoint <> "/"}, _opts -> + {:ok, %Tesla.Env{status: 200}} + + %{method: :post}, _opts -> + {:ok, %Tesla.Env{status: 400}} + end + ) + + {:ok, view, _html} = + live( + conn, + ~p"/projects/#{project.id}/w/#{workflow.id}?#{[v: workflow.lock_version, s: job_1.id, m: "expand"]}" + ) + + render_async(view) + + # click the get started button + view |> element("#get-started-with-ai-btn") |> render_click() + + # submit a message + view + |> form("#ai-assistant-form") + |> render_submit(%{content: "Ping"}) + + assert_patch(view) + + render_async(view) + + # pending message is not shown + assert has_element?(view, "#assistant-failed-message") + + assert view |> element("#assistant-failed-message") |> render() =~ + "Oops! Could not reach the Ai Server. Please try again later." + end + + test "an error is displayed incase the assistant query process crashes", %{ + conn: conn, + project: project, + workflow: %{jobs: [job_1 | _]} = workflow + } do + apollo_endpoint = "http://localhost:4001" + + Mox.stub(Lightning.MockConfig, :apollo, fn + :endpoint -> apollo_endpoint + :openai_api_key -> "openai_api_key" + end) + + Mox.stub( + Lightning.Tesla.Mock, + :call, + fn + %{method: :get, url: ^apollo_endpoint <> "/"}, _opts -> + {:ok, %Tesla.Env{status: 200}} + + %{method: :post}, _opts -> + raise "oops" + end + ) + + {:ok, view, _html} = + live( + conn, + ~p"/projects/#{project.id}/w/#{workflow.id}?#{[v: workflow.lock_version, s: job_1.id, m: "expand"]}" + ) + + render_async(view) + + # click the get started button + view |> element("#get-started-with-ai-btn") |> render_click() + + # submit a message + view + |> form("#ai-assistant-form") + |> render_submit(%{content: "Ping"}) + + assert_patch(view) + + render_async(view) + + # pending message is not shown + assert has_element?(view, "#assistant-failed-message") + + assert view |> element("#assistant-failed-message") |> render() =~ + "Oops! Something went wrong. Please try again." + end + + test "shows a flash error when limit has reached", %{ + conn: conn, + project: %{id: project_id} = project, + workflow: %{jobs: [job_1 | _]} = workflow + } do + [{conn, _user}] = setup_project_users(conn, project, [:owner]) + + Mox.stub(Lightning.MockConfig, :apollo, fn + :endpoint -> "http://localhost:4001/health_check" + :openai_api_key -> "openai_api_key" + end) + + error_message = "You have reached your quota of AI queries" + + Mox.stub(Lightning.Extensions.MockUsageLimiter, :limit_action, fn %{ + type: + :ai_query + }, + %{ + project_id: + ^project_id + } -> + {:error, :too_many_queries, + %Lightning.Extensions.Message{text: error_message}} + end) + + {:ok, view, _html} = + live( + conn, + ~p"/projects/#{project.id}/w/#{workflow.id}?#{[v: workflow.lock_version, s: job_1.id, m: "expand"]}" + ) + + render_async(view) + + # click the get started button + view |> element("#get-started-with-ai-btn") |> render_click() + + assert has_element?(view, "#ai-assistant-error", error_message) + assert render(view) =~ "aria-label=\"#{error_message}\"" + + # submiting a message shows the flash + view + |> form("#ai-assistant-form") + |> render_submit(%{content: "Ping"}) + + assert has_element?(view, "#ai-assistant-error", error_message) + end + end end diff --git a/test/support/factories.ex b/test/support/factories.ex index 7bedd6594d..2fa524a33f 100644 --- a/test/support/factories.ex +++ b/test/support/factories.ex @@ -301,6 +301,23 @@ defmodule Lightning.Factories do } end + def chat_session_factory do + %Lightning.AiAssistant.ChatSession{ + id: fn -> Ecto.UUID.generate() end, + expression: "fn()", + adaptor: "@openfn/language-common@latest", + title: "Some session title", + messages: [] + } + end + + def chat_message_factory do + %Lightning.AiAssistant.ChatMessage{ + content: "Hello, world!", + role: :user + } + end + # ---------------------------------------------------------------------------- # Helpers # ---------------------------------------------------------------------------- diff --git a/test/support/stub_usage_limiter.ex b/test/support/stub_usage_limiter.ex index 484b465db2..9d62d36fb9 100644 --- a/test/support/stub_usage_limiter.ex +++ b/test/support/stub_usage_limiter.ex @@ -32,6 +32,9 @@ defmodule Lightning.Extensions.StubUsageLimiter do {:error, :too_many_runs, %Message{text: "Runs limit exceeded"}} end + @impl true + def increment_ai_queries(multi), do: multi + @impl true def get_run_options(_context), do: [run_timeout_ms: Config.default_max_run_duration()]