Skip to content

Commit

Permalink
FakePayment - override GSB bindings and message channel
Browse files Browse the repository at this point in the history
  • Loading branch information
nieznanysprawiciel committed Aug 9, 2024
1 parent af87391 commit 83106c2
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 4 deletions.
9 changes: 8 additions & 1 deletion core/payment/tests/test_payment_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::str::FromStr;
use test_context::test_context;

use ya_client_model::payment::Acceptance;
use ya_core_model::payment::public::SendInvoice;
use ya_core_model::payment::public::{AcceptInvoice, Ack, SendInvoice};
use ya_framework_basic::async_drop::DroppableTestContext;
use ya_framework_basic::log::enable_logs;
use ya_framework_basic::{resource, temp_dir};
Expand Down Expand Up @@ -72,6 +72,10 @@ async fn test_payment_sync(ctx: &mut DroppableTestContext) -> anyhow::Result<()>
.send_as(invoice.issuer_id, SendInvoice(invoice.clone()))
.await??;

let mut channel = node2
.get_fake_payment()?
.message_channel::<AcceptInvoice>(Ok(Ack {}));

log::info!("Accepting Invoice ({})...", invoice.invoice_id);
requestor.get_invoice(&invoice.invoice_id).await.unwrap();
requestor
Expand All @@ -85,5 +89,8 @@ async fn test_payment_sync(ctx: &mut DroppableTestContext) -> anyhow::Result<()>
.await
.unwrap();

let accept = channel.recv().await.unwrap();
assert_eq!(accept.invoice_id, invoice.invoice_id);
assert_eq!(accept.acceptance.total_amount_accepted, invoice.amount);
Ok(())
}
2 changes: 1 addition & 1 deletion test-utils/test-framework/framework-mocks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ regex = "1.5"
serde = "1.0"
serde_json = "1.0"
test-context = "0.1.4"
tokio = "1"
tokio = { version = "1", features = ["sync"] }
uuid = { version = "0.8", features = ["v4", "serde"] }
url = "2.5"

2 changes: 1 addition & 1 deletion test-utils/test-framework/framework-mocks/src/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl RealIdentity {

fn register_identity_in_net(&self, id: NodeId) {
if let Some(gsb) = &self.gsb {
self.net.register_node(&id, &gsb.public_addr());
self.net.register_node(&id, gsb.public_addr());
} else {
// This line is temporary, until we will be able to rebind all modules to non-fixed prefix.
// Currently, all modules must be bound under `/local/{module}` and `/public/{module}`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
use anyhow::anyhow;
use bigdecimal::BigDecimal;
use chrono::{Duration, Utc};
use std::fmt::Display;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::mpsc;
use uuid::Uuid;
use ya_client_model::market::Agreement;

use ya_agreement_utils::AgreementView;
use ya_client_model::market::Agreement;
use ya_client_model::payment::allocation::PaymentPlatformEnum;
use ya_client_model::payment::{DocumentStatus, Invoice, NewAllocation};
use ya_core_model as model;
Expand All @@ -24,6 +26,7 @@ use ya_payment::migrations;
use ya_payment::processor::PaymentProcessor;
use ya_persistence::executor::DbExecutor;
use ya_service_bus::typed::{Endpoint, ServiceBinder};
use ya_service_bus::RpcMessage;

#[derive(Clone)]
pub struct FakePayment {
Expand Down Expand Up @@ -81,6 +84,45 @@ impl FakePayment {
Ok(())
}

/// Function binds new GSB handler to the given message.
/// It returns Receiver that can be used to inspect the messages and make assertions.
/// GSB will always return `result` passed in parameter back to the caller.
/// Function overrides previous handler, so only one Receiver at the same time can be used.
pub fn message_channel<T>(
&self,
result: Result<T::Item, T::Error>,
) -> mpsc::UnboundedReceiver<T>
where
T: RpcMessage,
T::Item: Clone,
T::Error: Clone + Display,
{
let (sender, receiver) = mpsc::unbounded_channel();
self.override_gsb_public()
.bind(move |_db: DbExecutor, _sender_id: String, msg: T| {
let result = result.clone();
let sender = sender.clone();
async move {
let _ = sender.send(msg).map_err(|_e| {
log::error!(
"[FakePayment] Unable to send message '{}' to channel.",
T::ID
);
});
result
}
});
receiver
}

pub fn override_gsb_public(&self) -> ServiceBinder<DbExecutor, ()> {
ServiceBinder::new(self.gsb.public_addr(), &self.db, ())
}

pub fn override_gsb_local(&self) -> ServiceBinder<DbExecutor, ()> {
ServiceBinder::new(self.gsb.local_addr(), &self.db, ())
}

pub fn gsb_local_endpoint(&self) -> Endpoint {
self.gsb.local()
}
Expand Down

0 comments on commit 83106c2

Please sign in to comment.