Skip to content

Commit

Permalink
Add suspend/resume workflow to netcore
Browse files Browse the repository at this point in the history
  • Loading branch information
aelassas committed Feb 11, 2024
1 parent f1c58cb commit 11b983b
Show file tree
Hide file tree
Showing 100 changed files with 363 additions and 47 deletions.
5 changes: 5 additions & 0 deletions src/netcore/Wexflow.Core/Task.cs
Original file line number Diff line number Diff line change
Expand Up @@ -447,5 +447,10 @@ public void ErrorFormat(string msg, Exception e, params object[] args)
Logs.Add($"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture)} ERROR - {message}\r\n{e}");
}
}

/// <summary>
/// This method is necessary for suspend/resume the current task.
/// </summary>
public void WaitOne() => Workflow.WaitOne();
}
}
23 changes: 17 additions & 6 deletions src/netcore/Wexflow.Core/Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1783,6 +1783,13 @@ public bool Stop(string stoppedBy)
return false;
}

private readonly ManualResetEvent _event = new(true);

/// <summary>
/// If "unset" the thread will wait otherwise it will continue.
/// </summary>
internal void WaitOne() => _event.WaitOne();

/// <summary>
/// Suspends this workflow.
/// </summary>
Expand All @@ -1792,9 +1799,11 @@ public bool Suspend()
{
try
{
#pragma warning disable CS0618 // Le type ou le membre est obsolète
_thread.Suspend();
#pragma warning restore CS0618 // Le type ou le membre est obsolète
//#pragma warning disable CS0618 // Le type ou le membre est obsolète
// _thread.Suspend();
//#pragma warning restore CS0618 // Le type ou le membre est obsolète
// unset the reset event which will cause the workflow to pause
_event.Reset();
IsPaused = true;
Database.IncrementPendingCount();
Database.DecrementRunningCount();
Expand Down Expand Up @@ -1827,9 +1836,11 @@ public void Resume()
{
try
{
#pragma warning disable CS0618 // Le type ou le membre est obsolète
_thread.Resume();
#pragma warning restore CS0618 // Le type ou le membre est obsolète
//#pragma warning disable CS0618 // Le type ou le membre est obsolète
// _thread.Resume();
//#pragma warning restore CS0618 // Le type ou le membre est obsolète
// // set the reset event which will cause the workflow to continue
_event.Set();
Database.IncrementRunningCount();
Database.DecrementPendingCount();
var entry = Database.GetEntry(Id, InstanceId);
Expand Down
10 changes: 6 additions & 4 deletions src/netcore/Wexflow.Server/WexflowService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,8 @@ private void SuspendWorkflow()
{
_ = _endpoints.MapPost(GetPattern("suspend"), async context =>
{
var res = false;
var auth = GetAuth(context.Request);
var username = auth.Username;
var password = auth.Password;
Expand All @@ -633,19 +635,19 @@ private void SuspendWorkflow()
{
if (user.UserProfile == Core.Db.UserProfile.SuperAdministrator)
{
WexflowServer.WexflowEngine.SuspendWorkflow(workflowId, instanceId);
await context.Response.WriteAsync(string.Empty);
res = WexflowServer.WexflowEngine.SuspendWorkflow(workflowId, instanceId);
}
else if (user.UserProfile == Core.Db.UserProfile.Administrator)
{
var workflowDbId = WexflowServer.WexflowEngine.Workflows.First(w => w.Id == workflowId).DbId;
var check = WexflowServer.WexflowEngine.CheckUserWorkflow(user.GetDbId(), workflowDbId);
if (check)
{
WexflowServer.WexflowEngine.SuspendWorkflow(workflowId, instanceId);
await context.Response.WriteAsync(string.Empty);
res = WexflowServer.WexflowEngine.SuspendWorkflow(workflowId, instanceId);
}
}
await context.Response.WriteAsync(JsonConvert.SerializeObject(res));
}
else
{
Expand Down
1 change: 1 addition & 0 deletions src/netcore/Wexflow.Tasks.Approval/Approval.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public override TaskStatus Run()
while (!File.Exists(trigger) && !Workflow.IsRejected && !IsStopped)
{
Thread.Sleep(1000);
WaitOne();
}

IsWaitingForApproval = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ public override TaskStatus Run()
ErrorFormat("An error occured while importing the record {0}.", e, file.Path);
success = false;
}
finally
{
WaitOne();
}
}

var smKey = "ApprovalRecordsCreator.RecordIds";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ public override TaskStatus Run()
e, recordId);
success = false;
}
finally
{
WaitOne();
}
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/netcore/Wexflow.Tasks.ApproveRecord/ApproveRecord.cs
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,10 @@ record = Workflow.Database.GetRecord(RecordId);
Error("An error occured during approval process.", e);
status = Core.Status.Error;
}
finally
{
WaitOne();
}

Info("Approval process finished.");
return new TaskStatus(status);
Expand Down
4 changes: 4 additions & 0 deletions src/netcore/Wexflow.Tasks.CsvToJson/CsvToJson.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ private bool ConvertFiles(ref bool atLeastOneSuccess)
ErrorFormat("An error occured while converting the CSV file {0}: {1}", csvFile.Path, e.Message);
success = false;
}
finally
{
WaitOne();
}
}

return success;
Expand Down
4 changes: 4 additions & 0 deletions src/netcore/Wexflow.Tasks.CsvToSql/CsvToSql.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ private bool ConvertCsvToSql(string csvPath, string sqlPath, string tableName, s
ErrorFormat("An error occured while converting the CSV {0} to SQL: {1}", csvPath, e.Message);
return false;
}
finally
{
WaitOne();
}
}
}
}
4 changes: 4 additions & 0 deletions src/netcore/Wexflow.Tasks.CsvToXml/CsvToXml.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ public override TaskStatus Run()
ErrorFormat("An error occured while creating the XML from {0} Please check this XML file according to the documentation of the task. Error: {1}", file.Path, e.Message);
success = false;
}
finally
{
WaitOne();
}
}

var status = Status.Success;
Expand Down
4 changes: 4 additions & 0 deletions src/netcore/Wexflow.Tasks.CsvToYaml/CsvToYaml.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ private bool ConvertFiles(ref bool atLeastOneSuccess)
ErrorFormat("An error occured while converting the CSV file {0}: {1}", csvFile.Path, e.Message);
success = false;
}
finally
{
WaitOne();
}
}

return success;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ public override TaskStatus Run()
{
ErrorFormat("An error occured while retrieving environment variable. Error: {0}", e.Message);
}
finally
{
WaitOne();
}

Info("Task finished.");
return new TaskStatus(succeeded ? Status.Success : Status.Error, value);
Expand Down
4 changes: 4 additions & 0 deletions src/netcore/Wexflow.Tasks.ExecPython/ExecPython.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ public override Core.TaskStatus Run()
ErrorFormat("An error occured while executing the script {0}: {1}", pythonFile.Path, e.Message);
success = false;
}
finally
{
WaitOne();
}
}

if (!success && atLeastOneSuccess)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public override TaskStatus Run()
{
InfoFormat("No content matching the pattern {0} was found in the file {1}.", Pattern, file);
}

WaitOne();
}

// Checking folders
Expand All @@ -63,6 +65,8 @@ public override TaskStatus Run()
{
InfoFormat("No content matching the pattern {0} was found in the file {1}.", Pattern, file);
}

WaitOne();
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/netcore/Wexflow.Tasks.FileExists/FileExists.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ public override TaskStatus Run()
ErrorFormat("An error occured while checking file {0}. Error: {1}", File, e.Message);
return new TaskStatus(Status.Error, false);
}
finally
{
WaitOne();
}

Info("Task finished");

Expand Down
3 changes: 3 additions & 0 deletions src/netcore/Wexflow.Tasks.FileMatch/FileMatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public override TaskStatus Run()
success = true;
break;
}

WaitOne();
}

if (success)
Expand All @@ -61,6 +63,7 @@ public override TaskStatus Run()
return new TaskStatus(Status.Error, false);
}


Info("Task finished");

return new TaskStatus(Status.Success, success);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public override TaskStatus Run()
finally
{
Info("FileSystemWatcher.OnFound finished.");
WaitOne();
}
try
{
Expand Down
4 changes: 4 additions & 0 deletions src/netcore/Wexflow.Tasks.FilesConcat/FilesConcat.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ public override TaskStatus Run()
ErrorFormat("An error occured while concatenating the file {0}", e, file.Path);
success = false;
}
finally
{
WaitOne();
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/netcore/Wexflow.Tasks.FilesCopier/FilesCopier.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ public override TaskStatus Run()
ErrorFormat("An error occured while copying the file {0} to {1}.", e, file.Path, destPath);
success = false;
}
finally
{
WaitOne();
}
}

var status = Status.Success;
Expand Down
2 changes: 2 additions & 0 deletions src/netcore/Wexflow.Tasks.FilesDecryptor/FilesDecryptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public override TaskStatus Run()
{
atLeastOneSuccess = true;
}

WaitOne();
}

if (!succeeded && atLeastOneSuccess)
Expand Down
1 change: 1 addition & 0 deletions src/netcore/Wexflow.Tasks.FilesDiff/FilesDiff.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public override TaskStatus Run()
try
{
CheckFiles();
WaitOne();
ts = new TaskStatus(Status.Success);
}
catch (ThreadInterruptedException)
Expand Down
1 change: 1 addition & 0 deletions src/netcore/Wexflow.Tasks.FilesEncryptor/FilesEncryptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public override TaskStatus Run()
{
atLeastOneSuccess = true;
}
WaitOne();
}

if (!succeeded && atLeastOneSuccess)
Expand Down
1 change: 1 addition & 0 deletions src/netcore/Wexflow.Tasks.FilesEqual/FilesEqual.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public override TaskStatus Run()

xdoc.Save(xmlPath);
Files.Add(new FileInf(xmlPath, Id));
WaitOne();
InfoFormat("The result has been written in: {0}", xmlPath);
}
catch (ThreadInterruptedException)
Expand Down
2 changes: 2 additions & 0 deletions src/netcore/Wexflow.Tasks.FilesExist/FilesExist.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public override TaskStatus Run()
new XAttribute("path", file),
new XAttribute("name", Path.GetFileName(file)),
new XAttribute("exists", File.Exists(file))));
WaitOne();
}

foreach (var folder in Folders)
Expand All @@ -46,6 +47,7 @@ public override TaskStatus Run()
new XAttribute("path", folder),
new XAttribute("name", Path.GetFileName(folder)),
new XAttribute("exists", Directory.Exists(folder))));
WaitOne();
}

if (xdoc.Root != null)
Expand Down
4 changes: 4 additions & 0 deletions src/netcore/Wexflow.Tasks.FilesInfo/FilesInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ public override TaskStatus Run()
ErrorFormat("An error occured while generating file information of the file {0}", e, file.Path);
success = false;
}
finally
{
WaitOne();
}
}
xdoc.Save(filesInfoPath);
Files.Add(new FileInf(filesInfoPath, Id));
Expand Down
4 changes: 4 additions & 0 deletions src/netcore/Wexflow.Tasks.FilesJoiner/FilesJoiner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ private bool JoinFiles(string fileName, FileInf[] files)
ErrorFormat("An error occured while concatenating the file {0}", e, file.Path);
success = false;
}
finally
{
WaitOne();
}
}

if (success)
Expand Down
2 changes: 2 additions & 0 deletions src/netcore/Wexflow.Tasks.FilesLoader/FilesLoader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public override TaskStatus Run()
Files.Add(fi);
InfoFormat("File loaded: {0}", file);
}
WaitOne();
}
}
}
Expand All @@ -59,6 +60,7 @@ public override TaskStatus Run()
Files.Add(fi);
InfoFormat("File loaded: {0}", file);
}
WaitOne();
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/netcore/Wexflow.Tasks.FilesLoaderEx/FilesLoaderEx.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public override TaskStatus Run()
FileInf fi = new(file, Id);
folderFiles.Add(fi);
}
WaitOne();
}
}
}
Expand All @@ -80,6 +81,7 @@ public override TaskStatus Run()
FileInf fi = new(file, Id);
folderFiles.Add(fi);
}
WaitOne();
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/netcore/Wexflow.Tasks.FilesMover/FilesMover.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ public override TaskStatus Run()
ErrorFormat("An error occured while moving the file {0} to {1}", e, file.Path, destFilePath);
success = false;
}
finally
{
WaitOne();
}
}

var status = Status.Success;
Expand Down
4 changes: 4 additions & 0 deletions src/netcore/Wexflow.Tasks.FilesRemover/FilesRemover.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ public override TaskStatus Run()
ErrorFormat("An error occured while deleting the file {0}", e, file.Path);
success = false;
}
finally
{
WaitOne();
}
}

var status = Status.Success;
Expand Down
Loading

0 comments on commit 11b983b

Please sign in to comment.