Skip to content

Commit

Permalink
Merge branch 'bloomberg:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
alexander-e1off authored Aug 20, 2024
2 parents 917e8a1 + b042ca1 commit b1f9170
Show file tree
Hide file tree
Showing 70 changed files with 14,596 additions and 8,780 deletions.
34 changes: 34 additions & 0 deletions docs/docs/community/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
---
layout: default
title: Community
nav_order: 10
has_children: false
permalink: /community
---

# [](#community)Community
{: .no_toc }

* toc
{:toc}

Here is a list of community blogposts about BlazingMQ, tools that integrate
with BlazingMQ, and other external projects using BlazingMQ. We haven’t
looked through all of these, so they may be out-of-date or may not work
anymore. Still, you may find inspiration from some of these resources.

We’re looking to grow this list, so please [get in touch with us][get-in-touch]
if you have written about or used BlazingMQ in your project. Better yet, [edit
this page][edit-this-page] and then [send us a PR][contributing-guidelines]!

## Blogposts

- Andy Pearce’s series of blogposts on the features and architecture of
BlazingMQ: [BlazingMQ: Introduction][andy-pearce-introduction] and
[BlazingMQ: Clustering and Message Flow][andy-pearce-clustering-message-flow].

[andy-pearce-clustering-message-flow]: https://www.andy-pearce.com/blog/posts/2024/Jul/blazingmq-clustering-and-message-flow/
[andy-pearce-introduction]: https://www.andy-pearce.com/blog/posts/2024/Jun/blazingmq-introduction/
[contributing-guidelines]: https://github.com/bloomberg/.github/blob/main/CONTRIBUTING.md
[edit-this-page]: https://github.com/bloomberg/blazingmq/edit/main/docs/docs/community/index.md
[get-in-touch]: https://github.com/bloomberg/blazingmq/discussions
15 changes: 8 additions & 7 deletions docs/docs/features/subscriptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -318,13 +318,14 @@ manipulation, as a tiny subset of the C programming language.
- Negation requires one argument
- Junctions require two boolean arguments
- Junctions are short-circuiting: if the left side of `&&` is `false`, or if
the left side of `||` is `true`, the right side is not evaluated. As a
consequence, identifiers in an expression do not all need to have a
corresponding property of the right type. For example, `type == "i" && shares
> 1000 || shares == "all"` is valid, as long as the value of `type` and the
type of `shares` are properly correlated. `order == "limit" and limit == 0`
can be used to detect the messages that represent a Limit Order with a silly
limit value, and e.g. log an error message.
the left side of `||` is `true`, the right side is not evaluated.
- As a consequence, identifiers in an expression do not all need to have a
corresponding property of the right type.
- For example, `type == "i" && shares 1000 || shares == "all"` is valid as
long as the value of `type` and the type of `shares` are properly correlated.
`order == "limit" and limit == 0` can be used to detect the messages that
represent a Limit Order with a silly limit value, and e.g. log an error
message.

### Operator Precedence
{:.no_toc}
Expand Down
4 changes: 2 additions & 2 deletions src/groups/bmq/bmqeval/bmqeval_simpleevaluator.h
Original file line number Diff line number Diff line change
Expand Up @@ -967,10 +967,10 @@ inline int CompilationContext::getPropertyIndex(const bsl::string& property,
if (iter == d_properties.end()) {
PropertyInfo info = {type, d_properties.size()};
d_properties.insert(iter, bsl::make_pair(property, info));
return info.d_index; // RETURN
return static_cast<int>(info.d_index); // RETURN
}
if (iter->second.d_type == type) {
return iter->second.d_index; // RETURN
return static_cast<int>(iter->second.d_index); // RETURN
}
return -1;
}
Expand Down
122 changes: 47 additions & 75 deletions src/groups/bmq/bmqp/bmqp_eventutil.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,8 @@ generateSubQueueInfos(bmqp::Protocol::SubQueueInfosArray* subQueueInfos,

subQueueInfos->clear();

static unsigned int nextSubId = 1;
for (int i = 0; i < numSubQueueInfos; ++i) {
subQueueInfos->push_back(bmqp::SubQueueInfo(nextSubId++));
subQueueInfos->push_back(bmqp::SubQueueInfo(i + 1));
}
}

Expand Down Expand Up @@ -159,7 +158,7 @@ static void appendDatum(bsl::vector<Data>* data,
BSLS_ASSERT_OPT(payloadLength >= 0);

Data datum(bufferFactory, allocator);
datum.d_qid = generateRandomInteger(1, 200);
datum.d_qid = data->size();
datum.d_flags = 0;
// Use the new SubQueueInfo option
datum.d_isSubQueueInfo = true;
Expand Down Expand Up @@ -420,10 +419,11 @@ static void test2_flattenExplodesEvent()
//
// Plan:
// 1) Create an event composed of one message having a payload of size
// one third the maximum enforced size and four SubQueueIds.
// a little over the quarter of the maximum enforced size and four
// SubQueueIds.
// 2) Flatten the event.
// 3) Verify that the flattening results in two event blobs, each having
// two messages with one SubQueueId each.
// 3) Verify that the flattening results in two event blobs, first having
// 15 messages and the second 1 with one SubQueueId each.
//
// Testing:
// Flattening an event having a message with more than one SubQueueId
Expand All @@ -444,13 +444,21 @@ static void test2_flattenExplodesEvent()
// 1) Event composed of one message having a payload of size one third the
// maximum enforced size and four SubQueueIds.
// Msg1
payloadLength = bmqp::EventHeader::k_MAX_SIZE_SOFT / 3;
payloadLength = bmqp::PushHeader::k_MAX_PAYLOAD_SIZE_SOFT / 2;
numSubQueueIds = 4;
appendDatum(&data,
numSubQueueIds,
payloadLength,
&bufferFactory,
s_allocator_p);

int count = 0;
int total = 0;

while (total < bmqp::EventHeader::k_MAX_SIZE_SOFT / 4) {
appendDatum(&data,
numSubQueueIds,
payloadLength,
&bufferFactory,
s_allocator_p);
total += data[count].d_payload.length();
++count;
}

// Create event
appendMessages(&pushEventBuilder, data);
Expand All @@ -468,12 +476,12 @@ static void test2_flattenExplodesEvent()
// 3) Verify that the flattening results in two event blobs, each having
// two messages with one SubQueueId each.
bmqp::PushMessageIterator msgIterator(&bufferFactory, s_allocator_p);
const Data& D = data[0];
int idx = 0;

// 1st flattened event
bmqp::Event flattenedEvent1(&(eventInfos[0].d_blob), s_allocator_p);
ASSERT_EQ(eventInfos[0].d_ids.size(), 2u);
ASSERT_EQ(eventInfos[0].d_ids.size(),
static_cast<size_t>(count * numSubQueueIds - 1));

flattenedEvent1.loadPushMessageIterator(&msgIterator, true);
BSLS_ASSERT_OPT(msgIterator.isValid());
Expand All @@ -489,7 +497,7 @@ static void test2_flattenExplodesEvent()
rc = msgIterator.loadMessagePayload(&payload);
BSLS_ASSERT_OPT(rc == 0);

ASSERT_EQ(bdlbb::BlobUtil::compare(D.d_payload, payload), 0);
ASSERT_EQ(bdlbb::BlobUtil::compare(data[0].d_payload, payload), 0);
}

// Verify SubQueueInfos
Expand All @@ -506,14 +514,16 @@ static void test2_flattenExplodesEvent()
BSLS_ASSERT_OPT(rc == 0);
BSLS_ASSERT_OPT(subQueueInfos.size() == 1);

ASSERT_EQ(D.d_subQueueInfos[idx], subQueueInfos[0]);
ASSERT_EQ(data[0].d_subQueueInfos[0], subQueueInfos[0]);
}

// Verify that 'eventInfo' contains the queueId pair (id, subId)
// corresponding to this message
const int id = D.d_qid;
const unsigned int subcriptionId = D.d_subQueueInfos[idx].id();
// Verify that 'eventInfo' contains the queueId pair (id, subId)
// corresponding to this message

ASSERT(find(eventInfos[0], id, subcriptionId));
for (size_t i = 0; i < eventInfos[0].d_ids.size(); ++i) {
ASSERT_EQ(eventInfos[0].d_ids[i].d_subscriptionId, i % count + 1);
ASSERT_EQ(size_t(eventInfos[0].d_ids[i].d_header.queueId()),
i / count);
}

++idx;
Expand All @@ -529,7 +539,7 @@ static void test2_flattenExplodesEvent()
rc = msgIterator.loadMessagePayload(&payload);
BSLS_ASSERT_OPT(rc == 0);

ASSERT_EQ(bdlbb::BlobUtil::compare(D.d_payload, payload), 0);
ASSERT_EQ(bdlbb::BlobUtil::compare(data[0].d_payload, payload), 0);
}

// Verify SubQueueInfos
Expand All @@ -546,25 +556,19 @@ static void test2_flattenExplodesEvent()
BSLS_ASSERT_OPT(rc == 0);
BSLS_ASSERT_OPT(subQueueInfos.size() == 1);

ASSERT_EQ(D.d_subQueueInfos[idx], subQueueInfos[0]);

// Verify that 'eventInfo' contains the queueId pair (id, subId)
// corresponding to this message
const int qId = D.d_qid;
const unsigned int subcriptionId = D.d_subQueueInfos[idx].id();
ASSERT(find(eventInfos[0], qId, subcriptionId));
ASSERT_EQ(data[0].d_subQueueInfos[idx], subQueueInfos[0]);
}

++idx;
idx = count - 1; // the last one did not fit the first event

// 2nd flattened event
bmqp::Event flattenedEvent2(&(eventInfos[1].d_blob), s_allocator_p);
ASSERT_EQ(eventInfos[1].d_ids.size(), 2u);
ASSERT_EQ(eventInfos[1].d_ids.size(), 1u);

flattenedEvent2.loadPushMessageIterator(&msgIterator, true);
BSLS_ASSERT_OPT(msgIterator.isValid());

// Third message
// 1st message in tne second event
rc = msgIterator.next();
BSLS_ASSERT_OPT(rc == 1);
BSLS_ASSERT_OPT(msgIterator.hasOptions());
Expand All @@ -575,7 +579,7 @@ static void test2_flattenExplodesEvent()
rc = msgIterator.loadMessagePayload(&payload);
BSLS_ASSERT_OPT(rc == 0);

ASSERT_EQ(bdlbb::BlobUtil::compare(D.d_payload, payload), 0);
ASSERT_EQ(bdlbb::BlobUtil::compare(data[idx].d_payload, payload), 0);
}

// Verify SubQueueInfos
Expand All @@ -592,54 +596,22 @@ static void test2_flattenExplodesEvent()
BSLS_ASSERT_OPT(rc == 0);
BSLS_ASSERT_OPT(subQueueInfos.size() == 1);

ASSERT_EQ(D.d_subQueueInfos[idx].id(), subQueueInfos[0].id());
ASSERT_EQ(data[idx].d_subQueueInfos[idx].id(), subQueueInfos[0].id());

// Verify that 'eventInfo' contains the queueId (queueId, subQueueId)
// pair corresponding to this message
const int qId = D.d_qid;
const unsigned int subcriptionId = D.d_subQueueInfos[idx].id();
const int qId = data[idx].d_qid;
const unsigned int subcriptionId = data[idx].d_subQueueInfos[idx].id();
ASSERT(find(eventInfos[1], qId, subcriptionId));
}

++idx;

// Fourth message
rc = msgIterator.next();
BSLS_ASSERT_OPT(rc == 1);
BSLS_ASSERT_OPT(msgIterator.hasOptions());

// Verify payload
{
bdlbb::Blob payload(&bufferFactory, s_allocator_p);
rc = msgIterator.loadMessagePayload(&payload);
BSLS_ASSERT_OPT(rc == 0);

ASSERT_EQ(bdlbb::BlobUtil::compare(D.d_payload, payload), 0);
}

// Verify SubQueueInfos
{
bmqp::OptionsView optionsView(s_allocator_p);
rc = msgIterator.loadOptionsView(&optionsView);
BSLS_ASSERT_OPT(rc == 0);
BSLS_ASSERT_OPT(optionsView.isValid());
BSLS_ASSERT_OPT(
optionsView.find(bmqp::OptionType::e_SUB_QUEUE_INFOS) !=
optionsView.end());
bmqp::Protocol::SubQueueInfosArray subQueueInfos(s_allocator_p);
rc = optionsView.loadSubQueueInfosOption(&subQueueInfos);
BSLS_ASSERT_OPT(rc == 0);
BSLS_ASSERT_OPT(subQueueInfos.size() == 1);

ASSERT_EQ(D.d_subQueueInfos[idx], subQueueInfos[0]);
// Verify that 'eventInfo' contains the queueId (queueId, subQueueId)
// pair corresponding to this message

// Verify that 'eventInfo' contains the queueId (queueId, subQueueId)
// pair corresponding to this message
const int qId = D.d_qid;
const unsigned int subcriptionId = D.d_subQueueInfos[idx].id();
ASSERT_EQ(eventInfos[1].d_ids[0].d_subscriptionId,
data[count - 1].d_subQueueInfos.back().id());
ASSERT_EQ(eventInfos[1].d_ids[0].d_header.queueId(), count - 1);

ASSERT(find(eventInfos[1], qId, subcriptionId));
}
// No more messages
rc = msgIterator.next();
}

static void test3_flattenWithMessageProperties()
Expand Down
3 changes: 0 additions & 3 deletions src/groups/bmq/bmqp/bmqp_optionutil.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,6 @@ static void test2_basicOptionsBoxCanAdd()
const int maxPayload = k_MAX_SIZE - headerSize;
const LimitT limit = maxCanBeAdded(contentSize, maxPayload);
ASSERT_EQ(Result::e_OPTION_TOO_BIG, limit.second);
const int sizeLeft = k_MAX_SIZE_SOFT - contentSize;
const int expected = sizeLeft / k_MAX_SIZE;
ASSERT_EQ(expected, limit.first);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/groups/bmq/bmqp/bmqp_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ const int MessagePropertyHeader::k_PROP_NAME_LEN_MASK = bdlb::BitMaskUtil::one(

const int PutHeader::k_MAX_OPTIONS_SIZE;
const int PutHeader::k_MAX_PAYLOAD_SIZE_SOFT;
const int PutHeader::k_MAX_SIZE_SOFT;
// Force variable/symbol definition so that it can be used in other files

const int PutHeader::k_FLAGS_MASK = bdlb::BitMaskUtil::one(
Expand Down
11 changes: 7 additions & 4 deletions src/groups/bmq/bmqp/bmqp_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -823,11 +823,13 @@ struct EventHeader {
/// an outgoing storage message can exceed
/// `PutHeader::k_MAX_PAYLOAD_SIZE_SOFT`. So, we assign a value of 65MB
/// to `StorageHeader::k_MAX_PAYLOAD_SIZE_SOFT`, and assign a value of
/// 66MB to `'EventHeader::k_MAX_SIZE_SOFT` such that a PUT message
/// having the maximum allowable value is processed through the entire
/// BlazingMQ pipeline w/o any issues. Also see notes in
/// at least 66MB to `EventHeader::k_MAX_SIZE_SOFT` such that a PUT
/// message having the maximum allowable value is processed through the
/// BlazingMQ pipeline w/o any issues. The value of
/// `EventHeader::k_MAX_SIZE_SOFT` is 512Mb to improve batching at high
/// posting rates. Also see notes for the
/// `StorageHeader::k_MAX_PAYLOAD_SIZE_SOFT` constant.
static const int k_MAX_SIZE_SOFT = (64 + 2) * 1024 * 1024;
static const int k_MAX_SIZE_SOFT = 512 * 1024 * 1024;

/// Highest possible value for the type of an event.
static const int k_MAX_TYPE = (1 << k_TYPE_NUM_BITS) - 1;
Expand Down Expand Up @@ -1492,6 +1494,7 @@ struct PutHeader {
/// be increased but not up to `k_MAX_SIZE`.
static const int k_MAX_PAYLOAD_SIZE_SOFT = 64 * 1024 * 1024;

static const int k_MAX_SIZE_SOFT = (64 + 2) * 1024 * 1024;
/// Maximum size (bytes) of the options area.
static const int k_MAX_OPTIONS_SIZE = ((1 << k_OPTIONS_WORDS_NUM_BITS) -
1) *
Expand Down
17 changes: 15 additions & 2 deletions src/groups/bmq/bmqp/bmqp_pusheventbuilder.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1077,15 +1077,28 @@ static void test8_buildEventTooBig()
bdlbb::Blob validPayload2(&bufferFactory, s_allocator_p);
bdlbb::BlobUtil::append(&validPayload2, s.c_str(), validLen);

rc = peb.packMessage(validPayload2,
int count = 1;
while ((evtSize + sizeof(bmqp::PushHeader) + validLen) <
bmqp::EventHeader::k_MAX_SIZE_SOFT) {
rc = peb.packMessage(validPayload2,
queueId,
guid,
flags,
bmqt::CompressionAlgorithmType::e_NONE);
ASSERT_EQ(rc, bmqt::EventBuilderResult::e_SUCCESS);
evtSize += sizeof(bmqp::PushHeader) + validLen;
++count;
}
evtSize = peb.eventSize(); // not calculating padding
rc = peb.packMessage(validPayload2,
queueId,
guid,
flags,
bmqt::CompressionAlgorithmType::e_NONE);

ASSERT_EQ(rc, bmqt::EventBuilderResult::e_EVENT_TOO_BIG);
ASSERT_EQ(evtSize, peb.eventSize());
ASSERT_EQ(1, peb.messageCount());
ASSERT_EQ(count, peb.messageCount());
}

static void testN1_decodeFromFile()
Expand Down
2 changes: 1 addition & 1 deletion src/groups/bmq/bmqp/bmqp_puteventbuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ PutEventBuilder::packMessageInternal(const bdlbb::Blob& appData, int queueId)
numPaddingBytes;

if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(sizeNoOptions >
EventHeader::k_MAX_SIZE_SOFT)) {
PutHeader::k_MAX_SIZE_SOFT)) {
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;
return Result::e_EVENT_TOO_BIG; // RETURN
}
Expand Down
Loading

0 comments on commit b1f9170

Please sign in to comment.