Skip to content

Commit

Permalink
Prevent simultaneous edits to a workflow (#2241)
Browse files Browse the repository at this point in the history
* Detect presence of other users in the workflow edit page for a specific workflow, and compute their priorities depending on the time their joined

* Display banners and toggle edit/read mode

* Refactor code and handle UI glitches

* Test the whole feature

* More tests

* More tests

* More tests

* Update CHANGELOG.md

* Disable Canvas

* Refactor code for disabling canvas when in snapshot mode and / or in low priority mode

* Remove few console.log

* Fix type check error

* Fix failing tests

* Viewers are not promoted to high priority

* Switch to latest version when promoting user to high priority, but handle the case of a real snapshot

* The banner is pushing down the canvas which then pushes the zoom panel under the viewport

* Test events in low priority mode, and fix regression / bug on deleting an edge

* Fix infinite loop on reloading workflow

* Restore button disabled

* Handle code review change requests

* Change flash messages language, less technical

* Fix disabled regression after rebase

* Repo.exists?/1 doesn't need limit

* Constrain Workflow lock_version check to given workflow

* fix up language

* Slight improvement on test consistancy

---------

Co-authored-by: Stuart Corbishley <[email protected]>
Co-authored-by: Taylor Downs <[email protected]>
  • Loading branch information
3 people authored Jul 10, 2024
1 parent 86f3337 commit 7db778c
Show file tree
Hide file tree
Showing 19 changed files with 1,065 additions and 93 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ and this project adheres to

### Added

- Handle Simultaneous Editors Working In Same Workflow
[#1949](https://github.com/OpenFn/lightning/issues/1949)

### Changed

### Fixed
Expand Down
1 change: 1 addition & 0 deletions assets/js/workflow-diagram/WorkflowDiagram.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ export default React.forwardRef<HTMLElement, WorkflowDiagramProps>(
jobs: state.jobs,
triggers: state.triggers,
edges: state.edges,
disabled: state.disabled,
}),
shallow
);
Expand Down
1 change: 0 additions & 1 deletion assets/js/workflow-diagram/components/ErrorMessage.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import React from 'react';
import { ExclamationCircleIcon } from '@heroicons/react/24/outline';

const ErrorMessage: React.FC<React.ComponentProps<any>> = ({ children }) => {
console.log(children);
return (
<p className="line-clamp-2 align-left text-xs text-red-500 flex items-center">
<ExclamationCircleIcon className="mr-1 w-5" />
Expand Down
9 changes: 4 additions & 5 deletions assets/js/workflow-diagram/nodes/Job.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ const JobNode = ({
...props
}: NodeProps<NodeData>) => {
const toolbar = () => [
props.data?.allowPlaceholder &&
!props.data?.disabled && [
<PlusButton key="+step" />,
<PathButton key="+path" />,
],
props.data?.allowPlaceholder && [
<PlusButton key="+step" />,
<PathButton key="+path" />,
],
];

const adaptorIconsData = useAdaptorIcons();
Expand Down
3 changes: 1 addition & 2 deletions assets/js/workflow-diagram/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ export namespace Lightning {
id: string;
name: string;
workflow_id: string;
disabled: boolean;

// Not technically from Lightning, but we'll infer this and scribble it
placeholder?: boolean;
Expand Down Expand Up @@ -49,7 +48,6 @@ export namespace Lightning {
error_path?: boolean;
errors: any;
condition_label?: string;
disabled: boolean;
}

export type Workflow = {
Expand All @@ -58,6 +56,7 @@ export namespace Lightning {
triggers: TriggerNode[];
jobs: JobNode[];
edges: Edge[];
disabled: boolean;
};
}

Expand Down
5 changes: 3 additions & 2 deletions assets/js/workflow-diagram/util/from-workflow.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ const fromWorkflow = (
placeholders: Flow.Model = { nodes: [], edges: [] },
selectedId: string | null
): Flow.Model => {
const allowPlaceholder = placeholders.nodes.length === 0;
const allowPlaceholder =
placeholders.nodes.length === 0 && !workflow.disabled;

const process = (
items: Array<Lightning.Node | Lightning.Edge>,
Expand Down Expand Up @@ -135,7 +136,7 @@ const fromWorkflow = (

const sortedEdges = edges.sort(sortOrderForSvg);

return { nodes, edges: sortedEdges };
return { nodes, edges: sortedEdges, disabled: workflow.disabled };
};

export default fromWorkflow;
4 changes: 4 additions & 0 deletions assets/js/workflow-editor/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ export default {
this.workflowStore.getState().applyPatches(response.patches);
});

this.handleEvent('set-disabled', (response: { disabled: boolean }) => {
this.workflowStore.getState().setDisabled(response.disabled);
});

this.handleEvent<{ href: string; patch: boolean }>('navigate', e => {
const id = new URL(window.location.href).searchParams.get('s');

Expand Down
9 changes: 9 additions & 0 deletions assets/js/workflow-editor/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export type WorkflowProps = {
triggers: Lightning.TriggerNode[];
jobs: Lightning.JobNode[];
edges: Lightning.Edge[];
disabled: boolean;
};

export interface WorkflowState extends WorkflowProps {
Expand All @@ -36,6 +37,7 @@ export interface WorkflowState extends WorkflowProps {
) => T | undefined;
onChange: (pendingAction: PendingAction) => void;
applyPatches: (patches: Patch[]) => void;
setDisabled: (value: boolean) => void;
}

// Immer's Patch type has an array of strings for the path, but RFC 6902
Expand Down Expand Up @@ -74,6 +76,7 @@ export const createWorkflowStore = (
triggers: [],
jobs: [],
edges: [],
disabled: false,
};

// Calculate the next state using Immer, and then call the onChange callback
Expand Down Expand Up @@ -216,6 +219,12 @@ export const createWorkflowStore = (
set(state => applyPatches(state, immerPatches));
},
onChange,
setDisabled: (value: boolean) => {
set(state => ({
...state,
disabled: value,
}));
},
}));
};

Expand Down
1 change: 1 addition & 0 deletions lib/lightning/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ defmodule Lightning.Application do
auth_providers_cache_childspec,
# Start the Endpoint (http/https)
LightningWeb.Endpoint,
Lightning.Workflows.Presence,
adaptor_registry_childspec,
adaptor_service_childspec,
{Lightning.TaskWorker, name: :cli_task_worker},
Expand Down
5 changes: 5 additions & 0 deletions lib/lightning/workflows.ex
Original file line number Diff line number Diff line change
Expand Up @@ -405,4 +405,9 @@ defmodule Lightning.Workflows do
def jobs_ordered_subquery do
from(j in Job, order_by: [asc: j.inserted_at])
end

def has_newer_version?(%Workflow{lock_version: version, id: id}) do
from(w in Workflow, where: w.lock_version > ^version and w.id == ^id)
|> Repo.exists?()
end
end
194 changes: 194 additions & 0 deletions lib/lightning/workflows/presence.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
defmodule Lightning.Workflows.Presence do
@moduledoc """
Handles user presence tracking within the Workflow canvas page.
This module leverages Phoenix.Presence to track user sessions, manage user priorities,
and list active presences on specified topics.
"""
use Phoenix.Presence,
otp_app: :lightning,
pubsub_server: Lightning.PubSub

alias LightningWeb.Endpoint

defstruct user: nil, joined_at: nil, active_sessions: 0

@doc """
Creates a new `UserPresence` struct.
## Parameters
- `user`: The user data to be included in the presence.
- `joined_at`: The timestamp when the user joined, in microseconds.
- `active_sessions`: The number of active sessions for the user (default is 0).
## Examples
iex> Lightning.Workflows.Presence.new_user_presence(%User{id: 1}, 1625597762000000)
%Lightning.Workflows.Presence{
user: %User{id: 1},
joined_at: 1625597762000000,
active_sessions: 0
}
"""
def new_user_presence(user, joined_at, active_sessions \\ 0) do
%__MODULE__{
user: user,
joined_at: joined_at,
active_sessions: active_sessions
}
end

@doc """
Tracks the presence of a user on a given topic.
## Parameters
- `user`: The user to be tracked.
- `topic`: The topic to track the user on.
- `pid`: The process identifier for the user's session.
## Examples
iex> Lightning.Workflows.Presence.track_user_presence(%User{id: 1}, "room:lobby", self())
:ok
"""
def track_user_presence(user, topic, pid) do
joined_at = System.system_time(:microsecond)

track(pid, topic, user.id, %{
user: user,
joined_at: joined_at
})

Endpoint.subscribe(topic)
end

@doc """
Lists all presences for a given topic.
## Parameters
- `topic`: The topic to list the presences for.
## Examples
iex> Lightning.Workflows.Presence.list_presences("workflow:canvas")
[%Lightning.Workflows.Presence{user: %User{id: 1}, ...}, ...]
"""
def list_presences(topic) do
topic
|> list_presences_by_topic()
|> group_presences_by_user()
|> extract_presences()
end

@doc """
Builds a summary of presences with details about the current user's presence, promotable presences,
and edit priority.
## Parameters
- `presences` (list): A list of presence records, each containing user information and a joined_at timestamp.
- `params` (map): A map containing the following keys:
- `:current_user_presence` - The presence record for the current user.
- `:current_user` - The current user record.
- `:view_only_users_ids` - A list of user IDs who have view-only permissions.
## Returns
- `map`: A map containing the following keys:
- `:presences` - The sorted list of all presences.
- `:prior_user_presence` - The presence record with edit priority.
- `:current_user_presence` - The presence record for the current user.
- `:has_presence_edit_priority` - A boolean indicating if the current user has edit priority.
## Examples
iex> presences = [
...> %{user: %{id: 1}, joined_at: ~N[2024-07-03 12:00:00], active_sessions: 1},
...> %{user: %{id: 2}, joined_at: ~N[2024-07-03 12:05:00], active_sessions: 1},
...> %{user: %{id: 3}, joined_at: ~N[2024-07-03 12:10:00], active_sessions: 1}
...> ]
iex> params = %{
...> current_user_presence: %{user: %{id: 1}, joined_at: ~N[2024-07-03 12:00:00], active_sessions: 1},
...> current_user: %{id: 1},
...> view_only_users_ids: [2]
...> }
iex> build_presences_summary(presences, params)
%{
presences: [
%{user: %{id: 1}, joined_at: ~N[2024-07-03 12:00:00], active_sessions: 1},
%{user: %{id: 2}, joined_at: ~N[2024-07-03 12:05:00], active_sessions: 1},
%{user: %{id: 3}, joined_at: ~N[2024-07-03 12:10:00], active_sessions: 1}
],
prior_user_presence: %{user: %{id: 3}, joined_at: ~N[2024-07-03 12:10:00], active_sessions: 1},
current_user_presence: %{user: %{id: 1}, joined_at: ~N[2024-07-03 12:00:00], active_sessions: 1},
has_presence_edit_priority: true
}
"""
def build_presences_summary(presences, params) do
%{
current_user_presence: current_user_presence,
current_user: current_user,
view_only_users_ids: view_only_users_ids
} = params

presences = Enum.sort_by(presences, & &1.joined_at)

current_user_presence =
Enum.find(presences, current_user_presence, fn presence ->
presence.user.id == current_user.id
end)

presences_promotable =
Enum.reject(presences, fn presence ->
presence.user.id in view_only_users_ids
end)

prior_user_presence =
if length(presences_promotable) > 0 do
List.first(presences_promotable)
else
current_user_presence
end

has_presence_edit_priority =
current_user_presence.user.id == prior_user_presence.user.id &&
current_user_presence.active_sessions <= 1

%{
presences: presences,
prior_user_presence: prior_user_presence,
current_user_presence: current_user_presence,
has_presence_edit_priority: has_presence_edit_priority
}
end

defp list_presences_by_topic(topic) do
list(topic)
|> Enum.flat_map(fn {_user_id, %{metas: metas}} -> metas end)
end

defp group_presences_by_user(presences) do
Enum.group_by(presences, & &1.user.id)
end

defp extract_presences(grouped_presences) do
grouped_presences
|> Enum.map(fn {_id, group} ->
active_sessions = length(group)
presence = List.first(group)

new_user_presence(
presence.user,
presence.joined_at,
active_sessions
)
end)
end
end
Loading

0 comments on commit 7db778c

Please sign in to comment.