Skip to content

Commit

Permalink
Async and Message Naming Rubric (#1160)
Browse files Browse the repository at this point in the history
1. Make setState (which sends an UpdatEngine to the serialization
   thread) wait for the unstream to complete to correctly
   meet the plugin api requirement of synchronous set state.
   Closes #1157

2. Add a rubric for how message names should be set, and start
   a few, but not many. Addresses #1141. Did a partial merge since
   I had started but the new name had only been applied to set engine
   state and didn't want conflict pain in the future.
  • Loading branch information
baconpaul authored Aug 20, 2024
1 parent 3065432 commit 2273203
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
#include "app/SCXTEditor.h"
#include "sst/voicemanager/midi1_to_voicemanager.h"

#include "scxt-plugin/scxt-plugin.h"

using namespace juce;

struct SCXTApplicationWindow : juce::DocumentWindow, juce::Button::Listener
Expand Down Expand Up @@ -101,9 +103,8 @@ struct SCXTApplicationWindow : juce::DocumentWindow, juce::Button::Listener
auto streamedState = properties->getValue("engineState");
if (!streamedState.isEmpty())
{
scxt::messaging::client::clientSendToSerialization(
scxt::messaging::client::UnstreamIntoEngine{streamedState.toStdString()},
*engine->getMessageController());
scxt::clap_first::scxt_plugin::SCXTPlugin::synchronousEngineUnstream(
engine, streamedState.toStdString());
}

setupAudio();
Expand Down
31 changes: 28 additions & 3 deletions clients/clap-first/scxt-plugin/scxt-plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
*/

#include <cassert>
#include <chrono>

#include "scxt-plugin.h"
#include "version.h"
Expand Down Expand Up @@ -157,9 +158,8 @@ bool SCXTPlugin::stateLoad(const clap_istream *istream) noexcept

auto xml = std::string(buffer.data());

SCLOG("About to load state with size " << xml.size());
scxt::messaging::client::clientSendToSerialization(
scxt::messaging::client::UnstreamIntoEngine{xml}, *engine->getMessageController());
synchronousEngineUnstream(engine, xml);

scxt::messaging::client::clientSendToSerialization(
scxt::messaging::client::RequestHostCallback{(uint64_t)RESCAN_PARAM_IVT},
*engine->getMessageController());
Expand Down Expand Up @@ -559,4 +559,29 @@ void SCXTPlugin::onMainThread() noexcept
}
}

bool SCXTPlugin::synchronousEngineUnstream(const std::unique_ptr<scxt::engine::Engine> &engine,
const std::string &payload)
{
auto &cont = engine->getMessageController();
std::unique_lock<std::mutex> guard(cont->streamNotificationMutex);
auto originalStreamCount{cont->streamNotificationCount};

SCLOG("About to load state with size " << payload.size());
scxt::messaging::client::clientSendToSerialization(
scxt::messaging::client::UnstreamEngineState{payload}, *engine->getMessageController());

auto secondsBeforeTimeout{5.0};
auto waitDuration = std::chrono::milliseconds(100);
auto maxIterations = secondsBeforeTimeout / 100;

int iterations = 0;
while (cont->streamNotificationCount == originalStreamCount && iterations < maxIterations)
{
cont->streamNotificationConditionVariable.wait_for(guard, waitDuration);
iterations++;
}

return iterations < maxIterations;
}

} // namespace scxt::clap_first::scxt_plugin
4 changes: 4 additions & 0 deletions clients/clap-first/scxt-plugin/scxt-plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ struct SCXTPlugin : public plugHelper_t, sst::clap_juce_shim::EditorProvider
}
return true;
}

// a few top level non-clap factored functions
static bool synchronousEngineUnstream(const std::unique_ptr<scxt::engine::Engine> &e,
const std::string &payload);
};

} // namespace scxt::clap_first::scxt_plugin
Expand Down
40 changes: 31 additions & 9 deletions src/messaging/client/client_serial.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,43 @@ namespace scxt::messaging::client
* These IDs are used inside a session only and are not streamed,
* so add whatever you want as long as (1) you keep them contig
* (so don't assign values) and (2) the num_ enum is the last one
*
* These ids follow a naming convention which matches with the objects
* created in the CLIENT_TO_SERIAL.. and SERIAL_TO_CLIENT... macros
* used throughout messaging/client. That naming convention is
*
* S2C:
* - Prefer the verb "Send", since S2C is sending authoritative state
* - Use the structure s2c_send_class_thing, for instance
* s2c_send_selection_state
* - Name the object a camelcase version of the enum without s2c,
* so for instance SendSelectionState
* - Make the SCXT Editor callback names onObjectName
* - name the payloads as objectNamePayload_t so sendSelectionStatePayload_t
* if the payload type is a custom type.
*
* C2S:
* - Prefer the verb corresponding to the action like set, swap, create
* delete, etc. Do not use send in a C2S. Prefer set over update.
* - Class name follows per S2C above as does payload name
* - If an inline function is used to handle a message use the name
* 'doObjectName' so 'doSetZoneOrGroupModstorage`
*/
enum ClientToSerializationMessagesIds
{
c2s_on_register,
// Registration and Reset Messages
c2s_register_client,
c2s_reset_engine,

c2s_unstream_state,
// Stream and IO Messages
c2s_unstream_engine_state,
c2s_save_multi,
c2s_save_selected_part,

c2s_load_multi,
c2s_load_part_into,

c2s_single_select_address,
// Messages we haven't dealt with yet
c2s_do_select_action,
c2s_do_multi_select_action,
c2s_select_part,
Expand Down Expand Up @@ -112,12 +140,6 @@ enum ClientToSerializationMessagesIds

c2s_silence_engine,

c2s_save_multi,
c2s_save_part,

c2s_load_multi,
c2s_load_part,

c2s_set_macro_full_state,
c2s_set_macro_value,

Expand Down
12 changes: 7 additions & 5 deletions src/messaging/client/enginestatus_messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ namespace scxt::messaging::client
SERIAL_TO_CLIENT(EngineStatusUpdate, s2c_engine_status, engine::Engine::EngineStatusMessage,
onEngineStatus);

using streamState_t = std::string;
inline void onUnstream(const streamState_t &payload, engine::Engine &engine,
MessageController &cont)
using unstreamEngineStatePayload_t = std::string;
inline void doUnstreamEngineState(const unstreamEngineStatePayload_t &payload,
engine::Engine &engine, MessageController &cont)
{
if (cont.isAudioRunning)
{
Expand All @@ -53,6 +53,7 @@ inline void onUnstream(const streamState_t &payload, engine::Engine &engine,
scxt::json::unstreamEngineState(nonconste, payload);
auto &cont = *e.getMessageController();
cont.restartAudioThreadFromSerial();
cont.sendStreamCompleteNotification();
}
catch (std::exception &err)
{
Expand All @@ -66,15 +67,16 @@ inline void onUnstream(const streamState_t &payload, engine::Engine &engine,
{
engine.stopAllSounds();
scxt::json::unstreamEngineState(engine, payload);
cont.sendStreamCompleteNotification();
}
catch (std::exception &err)
{
SCLOG("Unable to unstream [" << err.what() << "]");
}
}
}
CLIENT_TO_SERIAL(UnstreamIntoEngine, c2s_unstream_state, streamState_t,
onUnstream(payload, engine, cont));
CLIENT_TO_SERIAL(UnstreamEngineState, c2s_unstream_engine_state, unstreamEngineStatePayload_t,
doUnstreamEngineState(payload, engine, cont));

using stopSounds_t = bool;
inline void stopSoundsMessage(const stopSounds_t &payload, messaging::MessageController &cont)
Expand Down
4 changes: 2 additions & 2 deletions src/messaging/client/structure_messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ SERIAL_TO_CLIENT(SendAllProcessorDescriptions, s2c_send_all_processor_descriptio
* A message the client auto-sends when it registers just so we can respond
*/

inline void onRegister(engine::Engine &engine, MessageController &cont)
inline void doRegisterClient(engine::Engine &engine, MessageController &cont)
{
assert(cont.threadingChecker.isSerialThread());
engine.sendMetadataToClient();
Expand All @@ -72,7 +72,7 @@ inline void onRegister(engine::Engine &engine, MessageController &cont)
}
engine.getSelectionManager()->sendSelectedPartMacrosToClient();
}
CLIENT_TO_SERIAL(OnRegister, c2s_on_register, bool, onRegister(engine, cont));
CLIENT_TO_SERIAL(RegisterClient, c2s_register_client, bool, doRegisterClient(engine, cont));

/*
* A message the client auto-sends when it registers just so we can respond
Expand Down
2 changes: 1 addition & 1 deletion src/messaging/messaging.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ void MessageController::registerClient(const std::string &nm, clientCallback_t &
}

threadingChecker.registerAsClientThread();
client::clientSendToSerialization(client::OnRegister(true), *this);
client::clientSendToSerialization(client::RegisterClient(true), *this);

for (const auto &pcc : preClientConnectionCache)
{
Expand Down
18 changes: 18 additions & 0 deletions src/messaging/messaging.h
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,24 @@ struct MessageController : MoveableOnly<MessageController>
*/
std::function<void(uint64_t)> requestHostCallback{nullptr};

/*
* A client which requests streaming can use this mutex and condition
* variable to communicate on number of unstreams. See the scxt-plugin
* setState for an example
*/
std::mutex streamNotificationMutex;
std::condition_variable streamNotificationConditionVariable;
uint64_t streamNotificationCount{1743}; // just dont start at zero to help debug
void sendStreamCompleteNotification()
{
{
// Notify any waiting unstreamers
std::lock_guard<std::mutex> nguard{streamNotificationMutex};
streamNotificationCount++;
}
streamNotificationConditionVariable.notify_all();
}

private:
uint64_t inboundClientMessageCount{0};
void runSerialization();
Expand Down

0 comments on commit 2273203

Please sign in to comment.