-
Notifications
You must be signed in to change notification settings - Fork 52
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Showing
3 changed files
with
312 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
-- configure path so that you can run application | ||
-- from outside the root directory | ||
if package.setsearchroot ~= nil then | ||
package.setsearchroot() | ||
else | ||
-- Workaround for rocks loading in tarantool 1.10 | ||
-- It can be removed in tarantool > 2.2 | ||
-- By default, when you do require('mymodule'), tarantool looks into | ||
-- the current working directory and whatever is specified in | ||
-- package.path and package.cpath. If you run your app while in the | ||
-- root directory of that app, everything goes fine, but if you try to | ||
-- start your app with "tarantool myapp/init.lua", it will fail to load | ||
-- its modules, and modules from myapp/.rocks. | ||
local fio = require('fio') | ||
local app_dir = fio.abspath(fio.dirname(arg[0])) | ||
package.path = app_dir .. '/?.lua;' .. package.path | ||
package.path = app_dir .. '/?/init.lua;' .. package.path | ||
package.path = app_dir .. '/.rocks/share/tarantool/?.lua;' .. package.path | ||
package.path = app_dir .. '/.rocks/share/tarantool/?/init.lua;' .. package.path | ||
package.cpath = app_dir .. '/?.so;' .. package.cpath | ||
package.cpath = app_dir .. '/?.dylib;' .. package.cpath | ||
package.cpath = app_dir .. '/.rocks/lib/tarantool/?.so;' .. package.cpath | ||
package.cpath = app_dir .. '/.rocks/lib/tarantool/?.dylib;' .. package.cpath | ||
end | ||
|
||
local queue = require('queue') | ||
rawset(_G, 'queue', queue) | ||
|
||
-- Do not set listen for now so connector won't be | ||
-- able to send requests until everything is configured. | ||
box.cfg{ | ||
work_dir = os.getenv("TEST_TNT_WORK_DIR"), | ||
memtx_use_mvcc_engine = false, | ||
} | ||
|
||
box.once("init", function() | ||
box.schema.user.create('test', {password = 'test'}) | ||
box.schema.func.create('queue.tube.test_queue:touch') | ||
box.schema.func.create('queue.tube.test_queue:ack') | ||
box.schema.func.create('queue.tube.test_queue:put') | ||
box.schema.func.create('queue.tube.test_queue:drop') | ||
box.schema.func.create('queue.tube.test_queue:peek') | ||
box.schema.func.create('queue.tube.test_queue:kick') | ||
box.schema.func.create('queue.tube.test_queue:take') | ||
box.schema.func.create('queue.tube.test_queue:delete') | ||
box.schema.func.create('queue.tube.test_queue:release') | ||
box.schema.func.create('queue.tube.test_queue:release_all') | ||
box.schema.func.create('queue.tube.test_queue:bury') | ||
box.schema.func.create('queue.identify') | ||
box.schema.func.create('queue.state') | ||
box.schema.func.create('queue.statistics') | ||
box.schema.user.grant('test', 'create,read,write,drop', 'space') | ||
box.schema.user.grant('test', 'read, write', 'space', '_queue_session_ids') | ||
box.schema.user.grant('test', 'create,read,write,execute,drop, usage', 'universe') | ||
box.schema.user.grant('test', 'read,write', 'space', '_queue') | ||
box.schema.user.grant('test', 'read,write', 'space', '_schema') | ||
box.schema.user.grant('test', 'create,read,write', 'space', '_space_sequence') | ||
box.schema.user.grant('test', 'read,write', 'space', '_space') | ||
box.schema.user.grant('test', 'read,write', 'space', '_index') | ||
box.schema.user.grant('test', 'read,write', 'space', '_priv') | ||
if box.space._trigger ~= nil then | ||
box.schema.user.grant('test', 'read', 'space', '_trigger') | ||
end | ||
if box.space._fk_constraint ~= nil then | ||
box.schema.user.grant('test', 'read', 'space', '_fk_constraint') | ||
end | ||
if box.space._ck_constraint ~= nil then | ||
box.schema.user.grant('test', 'read', 'space', '_ck_constraint') | ||
end | ||
if box.space._func_index ~= nil then | ||
box.schema.user.grant('test', 'read', 'space', '_func_index') | ||
end | ||
end) | ||
|
||
-- Set listen only when every other thing is configured. | ||
box.cfg{ | ||
listen = os.getenv("TEST_TNT_LISTEN"), | ||
} | ||
|
||
require('console').start() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
// Setup queue module and start Tarantool instance before execution: | ||
// Terminal 1: | ||
// $ make deps | ||
// $ TEST_TNT_LISTEN=3013 tarantool queue/config.lua | ||
// | ||
// Terminal 2: | ||
// $ cd queue | ||
// $ go test -v example_test.go | ||
package queue_test | ||
|
||
import ( | ||
"fmt" | ||
"time" | ||
|
||
"github.com/tarantool/go-tarantool" | ||
"github.com/tarantool/go-tarantool/queue" | ||
) | ||
|
||
func multiple_put(testData_1 string, q queue.Queue) { | ||
var err error | ||
for i := 0; i < 100; i++ { | ||
if _, err = q.Put(testData_1 + fmt.Sprintf("%d", i)); err != nil { | ||
fmt.Printf("error in put is %v", err) | ||
return | ||
} | ||
} | ||
} | ||
|
||
// Example demonstrates an operations like Put and Take with queue. | ||
func Example_simpleQueue() { | ||
cfg := queue.Cfg{ | ||
Temporary: false, | ||
Kind: queue.FIFO, | ||
} | ||
opts := tarantool.Opts{ | ||
Timeout: 2500 * time.Millisecond, | ||
User: "test", | ||
Pass: "test", | ||
} | ||
|
||
conn, err := tarantool.Connect("127.0.0.1:3013", opts) | ||
if err != nil { | ||
fmt.Printf("error in prepare is %v", err) | ||
return | ||
} | ||
defer conn.Close() | ||
|
||
q := queue.New(conn, "test_queue") | ||
if err := q.Create(cfg); err != nil { | ||
fmt.Printf("error in queue is %v", err) | ||
return | ||
} | ||
|
||
defer q.Drop() | ||
|
||
go multiple_put("test_put", q) | ||
|
||
// Output: data_1: test_data_1 | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,173 @@ | ||
local state = require('queue.abstract.state') | ||
|
||
local num_type = require('queue.compat').num_type | ||
local str_type = require('queue.compat').str_type | ||
local check_version = require('queue.compat').check_version | ||
|
||
local tube = {} | ||
local method = {} | ||
|
||
-- validate space of queue | ||
local function validate_space(space) | ||
-- check indexes | ||
local indexes = {'task_id', 'status'} | ||
for _, index in pairs(indexes) do | ||
if space.index[index] == nil then | ||
error(string.format('space "%s" does not have "%s" index', | ||
space.name, index)) | ||
end | ||
end | ||
end | ||
|
||
-- create space | ||
function tube.create_space(space_name, opts) | ||
local space_opts = {} | ||
local if_not_exists = opts.if_not_exists or false | ||
space_opts.temporary = opts.temporary or false | ||
space_opts.engine = opts.engine or 'memtx' | ||
space_opts.format = { | ||
{name = 'task_id', type = num_type()}, | ||
{name = 'status', type = str_type()}, | ||
{name = 'data', type = '*'} | ||
} | ||
|
||
local space = box.space[space_name] | ||
if if_not_exists and space then | ||
-- Validate the existing space. | ||
validate_space(box.space[space_name]) | ||
return space | ||
end | ||
box.schema.sequence.create('S',{min=1, start=1}) | ||
space = box.schema.create_space(space_name, space_opts) | ||
space:create_index('task_id', {sequence = 'S'}) | ||
space:create_index('status', { | ||
type = 'tree', | ||
parts = {2, str_type(), 1, num_type()} | ||
}) | ||
return space | ||
end | ||
|
||
-- start tube on space | ||
function tube.new(space, on_task_change) | ||
validate_space(space) | ||
|
||
on_task_change = on_task_change or (function() end) | ||
local self = setmetatable({ | ||
space = space, | ||
on_task_change = on_task_change, | ||
}, { __index = method }) | ||
return self | ||
end | ||
|
||
-- normalize task: cleanup all internal fields | ||
function method.normalize_task(self, task) | ||
return task | ||
end | ||
|
||
-- check if mvcc is enabled | ||
--- returns true if mvcc is enabled | ||
-- check if mvcc is enabled | ||
local function check_mvcc_state() | ||
if not check_version({2, 6, 1}) then | ||
return false | ||
end | ||
|
||
if box.cfg.memtx_use_mvcc_engine then | ||
return true | ||
end | ||
|
||
return false | ||
end | ||
|
||
-- put task in space | ||
function method.put(self, data, opts) | ||
local max = box.sequence.S:next() | ||
local task = self.space:insert{max, state.READY, data} | ||
self.on_task_change(task, 'put') | ||
return task | ||
end | ||
|
||
-- take task | ||
function method.take(self) | ||
local task = self.space.index.status:min{state.READY} | ||
if task ~= nil and task[2] == state.READY then | ||
task = self.space:update(task[1], { { '=', 2, state.TAKEN } }) | ||
self.on_task_change(task, 'take') | ||
return task | ||
end | ||
end | ||
|
||
-- touch task | ||
function method.touch(self, id, ttr) | ||
error('fifo queue does not support touch') | ||
end | ||
|
||
-- delete task | ||
function method.delete(self, id) | ||
local task = self.space:get(id) | ||
self.space:delete(id) | ||
if task ~= nil then | ||
task = task:transform(2, 1, state.DONE) | ||
self.on_task_change(task, 'delete') | ||
end | ||
return task | ||
end | ||
|
||
-- release task | ||
function method.release(self, id, opts) | ||
local task = self.space:update(id, {{ '=', 2, state.READY }}) | ||
if task ~= nil then | ||
self.on_task_change(task, 'release') | ||
end | ||
return task | ||
end | ||
|
||
-- bury task | ||
function method.bury(self, id) | ||
local task = self.space:update(id, {{ '=', 2, state.BURIED }}) | ||
self.on_task_change(task, 'bury') | ||
return task | ||
end | ||
|
||
-- unbury several tasks | ||
function method.kick(self, count) | ||
for i = 1, count do | ||
local task = self.space.index.status:min{ state.BURIED } | ||
if task == nil then | ||
return i - 1 | ||
end | ||
if task[2] ~= state.BURIED then | ||
return i - 1 | ||
end | ||
|
||
task = self.space:update(task[1], {{ '=', 2, state.READY }}) | ||
self.on_task_change(task, 'kick') | ||
end | ||
return count | ||
end | ||
|
||
-- peek task | ||
function method.peek(self, id) | ||
return self.space:get{id} | ||
end | ||
|
||
-- get iterator to tasks in a certain state | ||
function method.tasks_by_state(self, task_state) | ||
return self.space.index.status:pairs(task_state) | ||
end | ||
|
||
function method.truncate(self) | ||
self.space:truncate() | ||
end | ||
|
||
-- This driver has no background activity. | ||
-- Implement dummy methods for the API requirement. | ||
function method.start() | ||
return | ||
end | ||
|
||
function method.stop() | ||
return | ||
end | ||
|
||
return tube |