From 9ceae8bfc7fe161d8993e94bb6e795824baecc98 Mon Sep 17 00:00:00 2001 From: Friedrich von Never Date: Sat, 15 Jun 2019 22:10:42 +0700 Subject: [PATCH] Start the new Telegram client (#29) --- Emulsion.Tests/Actors/Telegram.fs | 2 +- Emulsion.Tests/MessageSystem.fs | 12 ++++++------ Emulsion/MessageSender.fs | 11 ++++++++--- Emulsion/MessageSystem.fs | 18 +++++++++--------- Emulsion/Program.fs | 19 +++++++++++++++++-- Emulsion/Telegram/Client.fs | 11 ++++++----- Emulsion/Telegram/Funogram.fs | 6 +++++- 7 files changed, 52 insertions(+), 27 deletions(-) diff --git a/Emulsion.Tests/Actors/Telegram.fs b/Emulsion.Tests/Actors/Telegram.fs index c5ecf331..33cc24d0 100644 --- a/Emulsion.Tests/Actors/Telegram.fs +++ b/Emulsion.Tests/Actors/Telegram.fs @@ -15,7 +15,7 @@ type TelegramTest() = let mutable sentMessage = None let telegram = { new IMessageSystem with - member __.Run _ _ = () + member __.Run _ = () member __.PutMessage message = sentMessage <- Some message } diff --git a/Emulsion.Tests/MessageSystem.fs b/Emulsion.Tests/MessageSystem.fs index 02fc23c2..b929e519 100644 --- a/Emulsion.Tests/MessageSystem.fs +++ b/Emulsion.Tests/MessageSystem.fs @@ -11,9 +11,9 @@ open Emulsion.MessageSystem let private performTest expectedStage runBody = use cts = new CancellationTokenSource() let mutable stage = 0 - let run ct = + let run() = stage <- stage + 1 - runBody cts ct stage + runBody cts stage let context = { cooldown = TimeSpan.Zero logError = ignore @@ -24,7 +24,7 @@ let private performTest expectedStage runBody = [] let ``wrapRun should restart the activity on error``() = - performTest 2 (fun cts _ stage -> + performTest 2 (fun cts stage -> match stage with | 1 -> raise <| Exception() | 2 -> cts.Cancel() @@ -33,14 +33,14 @@ let ``wrapRun should restart the activity on error``() = [] let ``wrapRun should not restart on OperationCanceledException``() = - performTest 1 (fun cts ct _ -> + performTest 1 (fun cts _ -> cts.Cancel() - ct.ThrowIfCancellationRequested() + cts.Token.ThrowIfCancellationRequested() ) [] let ``wrapRun should not restart on token.Cancel()``() = - performTest 4 (fun cts _ stage -> + performTest 4 (fun cts stage -> if stage > 3 then cts.Cancel() ) diff --git a/Emulsion/MessageSender.fs b/Emulsion/MessageSender.fs index 3a3155a1..6fe0970d 100644 --- a/Emulsion/MessageSender.fs +++ b/Emulsion/MessageSender.fs @@ -1,6 +1,7 @@ module Emulsion.MessageSender open System +open System.Threading type MessageSenderContext = { send: OutgoingMessage -> Async @@ -18,13 +19,17 @@ let rec private sendRetryLoop ctx msg = async { return! sendRetryLoop ctx msg } -let activity(ctx: MessageSenderContext): MailboxProcessor = MailboxProcessor.Start(fun inbox -> +type Sender = MailboxProcessor +let private receiver ctx (inbox: Sender) = let rec loop() = async { let! msg = inbox.Receive() do! sendRetryLoop ctx msg return! loop() } loop() -) -let send(activity: MailboxProcessor): OutgoingMessage -> unit = activity.Post +let startActivity(ctx: MessageSenderContext, token: CancellationToken): Sender = + MailboxProcessor.Start(receiver ctx, token) + +let send(activity: Sender): OutgoingMessage -> unit = activity.Post +// TODO[F]: Tests for this module. diff --git a/Emulsion/MessageSystem.fs b/Emulsion/MessageSystem.fs index 2c44ac73..5e9d73a3 100644 --- a/Emulsion/MessageSystem.fs +++ b/Emulsion/MessageSystem.fs @@ -9,7 +9,7 @@ type IncomingMessageReceiver = Message -> unit /// a queue and sends them when possible. Redirects the incoming messages to a function passed when starting the queue. type IMessageSystem = /// Starts the IM connection, manages reconnects. Never terminates unless cancelled. - abstract member Run : IncomingMessageReceiver -> CancellationToken -> unit + abstract member Run : IncomingMessageReceiver -> unit /// Queues the message to be sent to the IM system when possible. abstract member PutMessage : OutgoingMessage -> unit @@ -20,10 +20,10 @@ type RestartContext = { logMessage: string -> unit } -let internal wrapRun (ctx: RestartContext) (token: CancellationToken) (run: CancellationToken -> unit) : unit = +let internal wrapRun (ctx: RestartContext) (token: CancellationToken) (run: unit -> unit) : unit = while not token.IsCancellationRequested do try - run token + run() with | :? OperationCanceledException -> () | ex -> @@ -35,23 +35,23 @@ let putMessage (messageSystem: IMessageSystem) (message: OutgoingMessage) = messageSystem.PutMessage message [] -type MessageSystemBase(restartContext: RestartContext) as this = - let sender = MessageSender.activity { +type MessageSystemBase(restartContext: RestartContext, cancellationToken: CancellationToken) as this = + let sender = MessageSender.startActivity({ send = this.Send logError = restartContext.logError cooldown = restartContext.cooldown - } + }, cancellationToken) /// Starts the IM connection, manages reconnects. On cancellation could either throw OperationCanceledException or /// return a unit. - abstract member Run : IncomingMessageReceiver -> CancellationToken -> unit + abstract member RunOnce : IncomingMessageReceiver -> unit /// Sends a message through the message system. Free-threaded. Could throw exceptions; if throws an exception, then /// will be restarted later. abstract member Send : OutgoingMessage -> Async interface IMessageSystem with - member ms.Run receiver token = - wrapRun restartContext token (this.Run receiver) + member ms.Run receiver = + wrapRun restartContext cancellationToken (fun () -> this.RunOnce receiver) member __.PutMessage message = MessageSender.send sender message diff --git a/Emulsion/Program.fs b/Emulsion/Program.fs index fbc3c8f0..25148f1b 100644 --- a/Emulsion/Program.fs +++ b/Emulsion/Program.fs @@ -6,6 +6,7 @@ open System.IO open Akka.Actor open Microsoft.Extensions.Configuration +open System.Threading open Emulsion.Actors open Emulsion.MessageSystem open Emulsion.Settings @@ -21,6 +22,15 @@ let private getConfiguration directory fileName = let private logError = printfn "ERROR: %A" let private logInfo = printfn "INFO : %s" +let private startMessageSystem (system: IMessageSystem) receiver = + Async.StartChild <| async { + do! Async.SwitchToNewThread() + try + system.Run receiver + with + | ex -> logError ex + } + let private startApp config = async { printfn "Prepare system..." @@ -31,14 +41,19 @@ let private startApp config = logError = logError logMessage = logInfo } + let! cancellationToken = Async.CancellationToken let xmpp = Xmpp.Client.sharpXmpp config.xmpp - let telegram = Telegram.Client(restartContext, config.telegram) + let telegram = Telegram.Client(restartContext, cancellationToken, config.telegram) let factories = { xmppFactory = Xmpp.spawn xmpp telegramFactory = fun factory _ name -> Telegram.spawn telegram factory name } // TODO[F]: Change the architecture here so we don't need to ignore the `core` parameter. printfn "Prepare Core..." - ignore <| Core.spawn factories system "core" + let core = Core.spawn factories system "core" + printfn "Starting message systems..." + let! telegram = startMessageSystem telegram (fun m -> core.Tell(TelegramMessage m)) printfn "Ready. Wait for termination..." do! Async.AwaitTask system.WhenTerminated + printfn "Waiting for terminating of message systems..." + do! telegram } let private runApp app = diff --git a/Emulsion/Telegram/Client.fs b/Emulsion/Telegram/Client.fs index a9a864e6..de48117d 100644 --- a/Emulsion/Telegram/Client.fs +++ b/Emulsion/Telegram/Client.fs @@ -1,14 +1,15 @@ namespace Emulsion.Telegram +open System.Threading + open Emulsion.MessageSystem open Emulsion.Settings -type Client(restartContext: RestartContext, settings: TelegramSettings) = - inherit MessageSystemBase(restartContext) +type Client(restartContext: RestartContext, cancellationToken: CancellationToken, settings: TelegramSettings) = + inherit MessageSystemBase(restartContext, cancellationToken) - override __.Run receiver _ = - // TODO[F]: Update Funogram and don't ignore the cancellation token here. - Funogram.run settings receiver + override __.RunOnce receiver = + Funogram.run settings cancellationToken receiver override __.Send message = Funogram.send settings message diff --git a/Emulsion/Telegram/Funogram.fs b/Emulsion/Telegram/Funogram.fs index 4590c374..30b3ce7e 100644 --- a/Emulsion/Telegram/Funogram.fs +++ b/Emulsion/Telegram/Funogram.fs @@ -5,6 +5,7 @@ open Funogram.Bot open Funogram.Api open Funogram.Types +open System.Threading open Emulsion open Emulsion.Settings @@ -51,6 +52,9 @@ let send (settings : TelegramSettings) (OutgoingMessage content) : Async = return processResult result } -let run (settings : TelegramSettings) (onMessage : Emulsion.Message -> unit) : unit = +let run (settings: TelegramSettings) + (cancellationToken: CancellationToken) + (onMessage: Emulsion.Message -> unit) : unit = + // TODO[F]: Update Funogram and don't ignore the cancellation token here. let config = { defaultConfig with Token = settings.token } Bot.startBot config (updateArrived onMessage) None