Skip to content

Commit

Permalink
Added some benchmark utilities to allow comparison with other impleme…
Browse files Browse the repository at this point in the history
…ntations.

Adds benchmarks for communicating a single 64-bit integer between threads in
the following thread topologies:
- unicast (1->1)
- multicast (1->3)
- pipeline (1->1->1->1)
- diamond (1->2->1)
- sequencer (3->1)
  • Loading branch information
lewissbaker committed Jul 30, 2014
1 parent afc2468 commit 1dafeae
Show file tree
Hide file tree
Showing 8 changed files with 678 additions and 0 deletions.
20 changes: 20 additions & 0 deletions benchmark/build.cake
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from cake.tools import script, env, compiler

compiler.addIncludePath(env.expand("${DISRUPTOR}/include"))

benchmarks = ["unicast",
"multicast",
"sequencer",
"pipeline",
"diamond",
]

programs = []
for benchmark in benchmarks:
programs.append(
compiler.program(
target=env.expand("${DISRUPTOR_BUILD}/" + benchmark),
sources=compiler.objects(
targetDir=env.expand("${DISRUPTOR_BUILD}"),
sources=script.cwd([benchmark + ".cpp"]),
)))
159 changes: 159 additions & 0 deletions benchmark/diamond.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
#include <disruptorplus/single_threaded_claim_strategy.hpp>
#include <disruptorplus/multi_threaded_claim_strategy.hpp>
#include <disruptorplus/blocking_wait_strategy.hpp>
#include <disruptorplus/spin_wait_strategy.hpp>
#include <disruptorplus/sequence_barrier_group.hpp>
#include <disruptorplus/ring_buffer.hpp>

#include <thread>
#include <chrono>
#include <cstdint>
#include <iostream>

namespace
{
template<typename WaitStrategy, template<typename T> class ClaimStrategy>
uint64_t CalculateOpsPerSecond(size_t bufferSize, uint64_t iterationCount, int consumerCount)
{
WaitStrategy waitStrategy;
std::vector<std::unique_ptr<disruptorplus::sequence_barrier<WaitStrategy>>> consumedBarriers(consumerCount);
ClaimStrategy<WaitStrategy> claimStrategy(bufferSize, waitStrategy);
disruptorplus::sequence_barrier_group<WaitStrategy> parallelConsumers(waitStrategy);
disruptorplus::ring_buffer<uint64_t> buffer(bufferSize);

for (int i = 0; i < consumerCount; ++i)
{
consumedBarriers[i].reset(new disruptorplus::sequence_barrier<WaitStrategy>(waitStrategy));
if (i + 1 != consumerCount)
{
parallelConsumers.add(*consumedBarriers[i]);
}
}
claimStrategy.add_claim_barrier(*consumedBarriers.back());

const uint64_t expectedResult = (iterationCount * (iterationCount - 1)) / 2;

std::vector<uint64_t> results(consumerCount);

std::vector<std::thread> consumers;
consumers.reserve(consumerCount);

// Consumers
for (int consumerIndex = 0; consumerIndex < consumerCount; ++consumerIndex)
{
if (consumerIndex + 1 != consumerCount)
{
consumers.emplace_back([&, consumerIndex]()
{
uint64_t sum = 0;
disruptorplus::sequence_t nextToRead = 0;
uint64_t itemsRemaining = iterationCount;
auto& barrier = *consumedBarriers[consumerIndex];
while (itemsRemaining > 0)
{
const auto available = claimStrategy.wait_until_published(nextToRead, nextToRead - 1);
do
{
sum += buffer[nextToRead];
--itemsRemaining;
} while (nextToRead++ != available);
barrier.publish(available);
}

results[consumerIndex] = sum;
});
}
else
{
consumers.emplace_back([&, consumerIndex]()
{
uint64_t sum = 0;
disruptorplus::sequence_t nextToRead = 0;
uint64_t itemsRemaining = iterationCount;
auto& barrier = *consumedBarriers[consumerIndex];
while (itemsRemaining > 0)
{
const auto available = parallelConsumers.wait_until_published(nextToRead);
do
{
sum += buffer[nextToRead];
--itemsRemaining;
} while (nextToRead++ != available);
barrier.publish(available);
}

results[consumerIndex] = sum;
});
}
}

const auto start = std::chrono::high_resolution_clock::now();

// Publisher
for (uint64_t i = 0; i < iterationCount; ++i)
{
const auto seq = claimStrategy.claim_one();
buffer[seq] = i;
claimStrategy.publish(seq);
}

bool resultsOk = true;
for (int i = 0; i < consumerCount; ++i)
{
consumers[i].join();
if (results[i] != expectedResult)
{
resultsOk = false;
}
}

if (!resultsOk)
{
throw std::domain_error("Unexpected test result.");
}

const auto timeTaken = std::chrono::high_resolution_clock::now() - start;
const auto timeTakenUS = std::chrono::duration_cast<std::chrono::microseconds>(timeTaken).count();

return (iterationCount * 1000 * 1000) / timeTakenUS;
}
}

int main()
{
const int consumerCount = 3;
const size_t bufferSize = 64 * 1024;
const uint64_t iterationCount = 10 * 1000 * 1000;
const uint32_t runCount = 5;

std::cout << "Diamond Throughput Benchmark" << std::endl
<< "Consumer count: " << consumerCount << std::endl
<< "Buffer size: " << bufferSize << std::endl
<< "Iteration count: " << iterationCount << std::endl
<< "Run count: " << runCount << std::endl;

try
{
#define BENCHMARK(CS,WS) \
do { \
std::cout << #CS "/" #WS << std::endl; \
for (uint32_t run = 1; run <= runCount; ++run) \
{ \
const auto opsPerSecond = CalculateOpsPerSecond<disruptorplus::WS, disruptorplus::CS>(bufferSize, iterationCount, consumerCount); \
std::cout << "run " << run << " " << opsPerSecond << " ops/sec" << std::endl; \
} \
} while (false)

BENCHMARK(single_threaded_claim_strategy, spin_wait_strategy);
BENCHMARK(single_threaded_claim_strategy, blocking_wait_strategy);
BENCHMARK(multi_threaded_claim_strategy, spin_wait_strategy);
BENCHMARK(multi_threaded_claim_strategy, blocking_wait_strategy);
}
catch (std::exception& e)
{
std::cout << "error: " << e.what() << std::endl;
return 1;
}

return 0;
}
128 changes: 128 additions & 0 deletions benchmark/multicast.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
#include <disruptorplus/single_threaded_claim_strategy.hpp>
#include <disruptorplus/multi_threaded_claim_strategy.hpp>
#include <disruptorplus/blocking_wait_strategy.hpp>
#include <disruptorplus/spin_wait_strategy.hpp>
#include <disruptorplus/ring_buffer.hpp>

#include <thread>
#include <chrono>
#include <cstdint>
#include <iostream>

namespace
{
template<typename WaitStrategy, template<typename T> class ClaimStrategy>
uint64_t CalculateOpsPerSecond(size_t bufferSize, uint64_t iterationCount, int consumerCount)
{
WaitStrategy waitStrategy;
std::vector<std::unique_ptr<disruptorplus::sequence_barrier<WaitStrategy>>> consumedBarriers(consumerCount);
ClaimStrategy<WaitStrategy> claimStrategy(bufferSize, waitStrategy);
disruptorplus::ring_buffer<uint64_t> buffer(bufferSize);

for (int i = 0; i < consumerCount; ++i)
{
consumedBarriers[i].reset(new disruptorplus::sequence_barrier<WaitStrategy>(waitStrategy));
claimStrategy.add_claim_barrier(*consumedBarriers[i]);
}

const uint64_t expectedResult = (iterationCount * (iterationCount - 1)) / 2;

std::vector<uint64_t> results(consumerCount);

std::vector<std::thread> consumers;
consumers.reserve(consumerCount);

// Consumers
for (int consumerIndex = 0; consumerIndex < consumerCount; ++consumerIndex)
{
consumers.emplace_back([&,consumerIndex]()
{
uint64_t sum = 0;
disruptorplus::sequence_t nextToRead = 0;
uint64_t itemsRemaining = iterationCount;
auto& barrier = *consumedBarriers[consumerIndex];
while (itemsRemaining > 0)
{
const auto available = claimStrategy.wait_until_published(nextToRead, nextToRead - 1);
do
{
sum += buffer[nextToRead];
--itemsRemaining;
} while (nextToRead++ != available);
barrier.publish(available);
}

results[consumerIndex] = sum;
});
}

const auto start = std::chrono::high_resolution_clock::now();

// Publisher
for (uint64_t i = 0; i < iterationCount; ++i)
{
const auto seq = claimStrategy.claim_one();
buffer[seq] = i;
claimStrategy.publish(seq);
}

bool resultsOk = true;
for (int i = 0; i < consumerCount; ++i)
{
consumers[i].join();
if (results[i] != expectedResult)
{
resultsOk = false;
}
}

if (!resultsOk)
{
throw std::domain_error("Unexpected test result.");
}

const auto timeTaken = std::chrono::high_resolution_clock::now() - start;
const auto timeTakenUS = std::chrono::duration_cast<std::chrono::microseconds>(timeTaken).count();

return (iterationCount * 1000 * 1000) / timeTakenUS;
}
}

int main()
{
const int consumerCount = 3;
const size_t bufferSize = 64 * 1024;
const uint64_t iterationCount = 10 * 1000 * 1000;
const uint32_t runCount = 5;

std::cout << "Multicast Throughput Benchmark" << std::endl
<< "Consumer count: " << consumerCount << std::endl
<< "Buffer size: " << bufferSize << std::endl
<< "Iteration count: " << iterationCount << std::endl
<< "Run count: " << runCount << std::endl;

try
{
#define BENCHMARK(CS,WS) \
do { \
std::cout << #CS "/" #WS << std::endl; \
for (uint32_t run = 1; run <= runCount; ++run) \
{ \
const auto opsPerSecond = CalculateOpsPerSecond<disruptorplus::WS, disruptorplus::CS>(bufferSize, iterationCount, consumerCount); \
std::cout << "run " << run << " " << opsPerSecond << " ops/sec" << std::endl; \
} \
} while (false)

BENCHMARK(single_threaded_claim_strategy, spin_wait_strategy);
BENCHMARK(single_threaded_claim_strategy, blocking_wait_strategy);
BENCHMARK(multi_threaded_claim_strategy, spin_wait_strategy);
BENCHMARK(multi_threaded_claim_strategy, blocking_wait_strategy);
}
catch (std::exception& e)
{
std::cout << "error: " << e.what() << std::endl;
return 1;
}

return 0;
}
Loading

0 comments on commit 1dafeae

Please sign in to comment.