Skip to content

Commit

Permalink
Chunked image encoding
Browse files Browse the repository at this point in the history
  • Loading branch information
talaj committed Aug 30, 2017
1 parent 13005d4 commit c553d8a
Show file tree
Hide file tree
Showing 4 changed files with 282 additions and 0 deletions.
47 changes: 47 additions & 0 deletions src/callback_streambuf.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#ifndef CALLBACK_STREAMBUF_H
#define CALLBACK_STREAMBUF_H

#include <array>
#include <iostream>

template<typename Callback, std::size_t Size, class Char = char>
class callback_streambuf : public std::basic_streambuf<Char>
{
public:
using base_type = std::streambuf;
using char_type = typename base_type::char_type;
using int_type = typename base_type::int_type;

callback_streambuf(Callback callback)
: buffer_{},
callback_(callback)
{
base_type::setp(buffer_.begin(), buffer_.end());
}

protected:
int sync()
{
bool ok = callback_(base_type::pbase(),
base_type::pptr() - base_type::pbase());
base_type::setp(buffer_.begin(), buffer_.end());
return ok ? 0 : -1;
}

int_type overflow(int_type ch)
{
int ret = sync();
if (ch == base_type::traits_type::eof())
{
return ch;
}
base_type::sputc(ch);
return ret == 0 ? 0 : base_type::traits_type::eof();
}

private:
std::array<char_type, Size> buffer_;
Callback callback_;
};

#endif
184 changes: 184 additions & 0 deletions src/mapnik_image.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "mapnik_color.hpp"

#include "utils.hpp"
#include "callback_streambuf.hpp"

#include "agg_rasterizer_scanline_aa.h"
#include "agg_basics.h"
Expand Down Expand Up @@ -82,6 +83,7 @@ void Image::Initialize(v8::Local<v8::Object> target) {
Nan::SetPrototypeMethod(lcons, "setPixel", setPixel);
Nan::SetPrototypeMethod(lcons, "encodeSync", encodeSync);
Nan::SetPrototypeMethod(lcons, "encode", encode);
Nan::SetPrototypeMethod(lcons, "encodeChunked", encodeChunked);
Nan::SetPrototypeMethod(lcons, "view", view);
Nan::SetPrototypeMethod(lcons, "saveSync", saveSync);
Nan::SetPrototypeMethod(lcons, "save", save);
Expand Down Expand Up @@ -3736,6 +3738,188 @@ void Image::EIO_AfterEncode(uv_work_t* req)
delete closure;
}

struct chunked_encode_image_baton_t
{
encode_image_baton_t image_baton;

uv_async_t async;
uv_mutex_t mutex;

using char_type = char;
using buffer_type = std::vector<char_type>;
using buffer_list_type = std::vector<buffer_type>;
buffer_list_type buffers;

chunked_encode_image_baton_t()
{
if (int e = uv_async_init(uv_default_loop(), &async, yield_chunk))
{
throw std::runtime_error("Cannot create async handler");
}

if (int e = uv_mutex_init(&mutex))
{
uv_close(reinterpret_cast<uv_handle_t*>(&async), NULL);
throw std::runtime_error("Cannot create mutex");
}

async.data = this;
}

~chunked_encode_image_baton_t()
{
uv_close(reinterpret_cast<uv_handle_t*>(&async), NULL);
uv_mutex_destroy(&mutex);
}

template<class Char, class Size>
bool operator()(const Char* buffer, Size size)
{
uv_mutex_lock(&mutex);
buffers.emplace_back(buffer, buffer + size);
uv_mutex_unlock(&mutex);

if (int e = uv_async_send(&async))
{
image_baton.error = true;
image_baton.error_name = "Cannot call async callback";
return false;
}

return true;
}

static void yield_chunk(uv_async_t* handle)
{
using closure_type = chunked_encode_image_baton_t;
closure_type & closure = *reinterpret_cast<closure_type*>(handle->data);

buffer_list_type local_buffers;

uv_mutex_lock(&closure.mutex);
closure.buffers.swap(local_buffers);
uv_mutex_unlock(&closure.mutex);

Nan::HandleScope scope;

for (auto const & buffer : local_buffers)
{
v8::Local<v8::Value> argv[2] = {
Nan::Null(), Nan::CopyBuffer(buffer.data(),
buffer.size()).ToLocalChecked() };
Nan::MakeCallback(Nan::GetCurrentContext()->Global(),
Nan::New(closure.image_baton.cb), 2, argv);
}
}
};

void Image::EIO_EncodeChunked(uv_work_t* work)
{
using closure_type = chunked_encode_image_baton_t;
closure_type & closure = *reinterpret_cast<closure_type*>(work->data);
try
{
callback_streambuf<closure_type&, 1024> streambuf(closure);
std::ostream stream(&streambuf);
save_to_stream(*closure.image_baton.im->this_,
stream, closure.image_baton.format);
stream.flush();

uv_mutex_lock(&closure.mutex);
closure.buffers.emplace_back(); // Signalize end of stream
uv_mutex_unlock(&closure.mutex);
}
catch (std::exception const& ex)
{
closure.image_baton.error = true;
closure.image_baton.error_name = ex.what();
}
}

void Image::EIO_AfterEncodeChunked(uv_work_t* work, int status)
{
using closure_type = chunked_encode_image_baton_t;
closure_type & closure = *reinterpret_cast<closure_type*>(work->data);

if (closure.image_baton.error)
{
v8::Local<v8::Value> argv[1] = {
Nan::Error(closure.image_baton.error_name.c_str()) };
Nan::MakeCallback(Nan::GetCurrentContext()->Global(),
Nan::New(closure.image_baton.cb), 1, argv);
}
else
{
closure_type::yield_chunk(&closure.async);
}

closure.image_baton.im->Unref();
closure.image_baton.cb.Reset();
delete &closure;
}

NAN_METHOD(Image::encodeChunked)
{
Image* im = Nan::ObjectWrap::Unwrap<Image>(info.Holder());

std::string format = "png";
palette_ptr palette;

// accept custom format
if (info.Length() >= 1){
if (!info[0]->IsString()) {
Nan::ThrowTypeError("first arg, 'format' must be a string");
return;
}
format = TOSTR(info[0]);
}

// options hash
if (info.Length() >= 2) {
if (!info[1]->IsObject()) {
Nan::ThrowTypeError("optional second arg must be an options object");
return;
}

v8::Local<v8::Object> options = info[1].As<v8::Object>();

if (options->Has(Nan::New("palette").ToLocalChecked()))
{
v8::Local<v8::Value> format_opt = options->Get(Nan::New("palette").ToLocalChecked());
if (!format_opt->IsObject()) {
Nan::ThrowTypeError("'palette' must be an object");
return;
}

v8::Local<v8::Object> obj = format_opt.As<v8::Object>();
if (obj->IsNull() || obj->IsUndefined() || !Nan::New(Palette::constructor)->HasInstance(obj)) {
Nan::ThrowTypeError("mapnik.Palette expected as second arg");
return;
}

palette = Nan::ObjectWrap::Unwrap<Palette>(obj)->palette();
}
}

// ensure callback is a function
v8::Local<v8::Value> callback = info[info.Length() - 1];
if (!callback->IsFunction()) {
Nan::ThrowTypeError("last argument must be a callback function");
return;
}

chunked_encode_image_baton_t *closure = new chunked_encode_image_baton_t();
closure->image_baton.request.data = closure;
closure->image_baton.im = im;
closure->image_baton.format = format;
closure->image_baton.palette = palette;
closure->image_baton.error = false;
closure->image_baton.cb.Reset(callback.As<v8::Function>());

uv_queue_work(uv_default_loop(), &closure->image_baton.request, EIO_EncodeChunked, EIO_AfterEncodeChunked);
im->Ref();
}

/**
* Get a constrained view of this image given x, y, width, height parameters.
* @memberof Image
Expand Down
3 changes: 3 additions & 0 deletions src/mapnik_image.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ class Image: public Nan::ObjectWrap {
static NAN_METHOD(setPixel);
static NAN_METHOD(encodeSync);
static NAN_METHOD(encode);
static NAN_METHOD(encodeChunked);
static void EIO_Encode(uv_work_t* req);
static void EIO_AfterEncode(uv_work_t* req);
static void EIO_EncodeChunked(uv_work_t* req);
static void EIO_AfterEncodeChunked(uv_work_t* req, int status);

static NAN_METHOD(setGrayScaleToAlpha);
static NAN_METHOD(width);
Expand Down
48 changes: 48 additions & 0 deletions test/image.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,54 @@ describe('mapnik.Image ', function() {
assert.equal(im.encodeSync().length, im2.encodeSync().length);
});

it('should be able to encode by chunks', function(done) {
var im = new mapnik.Image(256, 256);
assert.ok(im instanceof mapnik.Image);

assert.equal(im.width(), 256);
assert.equal(im.height(), 256);
assert.throws(function() { im.view(); });

var actual_length = 0;
var chunk_count = 0;

im.encodeChunked('png32', {}, function(err, result) {
if (err) throw err;
if (result.length == 0) {
assert.equal(1, chunk_count);
assert.equal(im.encodeSync('png32').length, actual_length);
done();
} else {
chunk_count++;
actual_length += result.length;
}
});
});

it('should be able to encode by chunks - multiple chunks', function(done) {
var im = new mapnik.Image.openSync('./test/data/images/sat_image.png');
assert.ok(im instanceof mapnik.Image);

assert.equal(im.width(), 75);
assert.equal(im.height(), 75);
assert.throws(function() { im.view(); });

var actual_length = 0;
var chunk_count = 0;

im.encodeChunked('png32', {}, function(err, result) {
if (err) throw err;
if (result.length == 0) {
assert.equal(16, chunk_count);
assert.equal(im.encodeSync('png32').length, actual_length);
done();
} else {
chunk_count++;
actual_length += result.length;
}
});
});

it('should be able to open via byte stream', function(done) {
var im = new mapnik.Image(256, 256);
// png
Expand Down

0 comments on commit c553d8a

Please sign in to comment.