-
Notifications
You must be signed in to change notification settings - Fork 43
/
rbzmq.c
1864 lines (1696 loc) · 64.9 KB
/
rbzmq.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <string.h>
#include <ruby.h>
#ifdef HAVE_RUBY_IO_H
#include <ruby/io.h>
#else
#include <rubyio.h>
#endif
#include <zmq.h>
#if defined _MSC_VER
#ifndef int8_t
typedef __int8 int8_t;
#endif
#ifndef int16_t
typedef __int16 int16_t;
#endif
#ifndef int32_t
typedef __int32 int32_t;
#endif
#ifndef int64_t
typedef __int64 int64_t;
#endif
#ifndef uint8_t
typedef unsigned __int8 uint8_t;
#endif
#ifndef uint16_t
typedef unsigned __int16 uint16_t;
#endif
#ifndef uint32_t
typedef unsigned __int32 uint32_t;
#endif
#ifndef uint64_t
typedef unsigned __int64 uint64_t;
#endif
#else
#include <stdint.h>
#endif
/*
 * Reference-counted wrapper around a native 0MQ context handle.
 * Created by context_alloc, released by context_free once refs drops to zero.
 */
struct zmq_context {
/* Handle returned by zmq_init(); NULL until initialized and again after context_close. */
void *context;
/* Owner count: starts at 1 in context_alloc and is incremented for every socket made by context_socket. */
unsigned refs;
};
struct zmq_socket {
void *socket;
struct zmq_context *context;
};
/*
 * Raises IOError ("closed socket") when the wrapped 0MQ handle is NULL.
 *
 * sock_ is any expression evaluating to a struct zmq_socket pointer. It is
 * parenthesized before being dereferenced so that arguments such as `&sock`
 * or `base + n` are treated as a whole instead of binding to `->socket`.
 */
#define Check_Socket(sock_) \
    do {\
        if (((sock_)->socket) == NULL)\
            rb_raise (rb_eIOError, "closed socket");\
    } while(0)
/*
 * Ruby class used to wrap 0MQ sockets (context_socket) and to recognise them
 * via CLASS_OF() when polling. Its assignment is not visible in this part of
 * the file.
 */
VALUE socket_type;
/*
 * Ruby exception class raised with the zmq_strerror() message when a 0MQ
 * call fails. Its assignment is not visible in this part of the file.
 */
VALUE exception_type;
/*
* Document-class: ZMQ
*
* Ruby interface to the zeromq messaging library.
*/
/*
* call-seq:
* ZMQ.version() -> [major, minor, patch]
*
* Returns the version of the zeromq library.
*/
static VALUE module_version (VALUE self_)
{
    int ver_major = 0;
    int ver_minor = 0;
    int ver_patch = 0;
    VALUE rb_major, rb_minor, rb_patch;

    /* Ask the linked 0MQ library for its version triple. */
    zmq_version (&ver_major, &ver_minor, &ver_patch);

    /* Convert each component to a Ruby Integer and pack them in order. */
    rb_major = INT2NUM (ver_major);
    rb_minor = INT2NUM (ver_minor);
    rb_patch = INT2NUM (ver_patch);

    return rb_ary_new3 (3, rb_major, rb_minor, rb_patch);
}
/*
* Document-class: ZMQ::Context
*
* ZeroMQ library context.
*/
/*
 * Drops one reference on a context wrapper.
 *
 * When the last reference is released, the native context is terminated
 * with zmq_term() (unless it was already closed) and the wrapper memory is
 * freed. Used as the free function of the Ruby context object and called
 * from socket_free for the reference each socket holds.
 */
static void context_free (void *ptr)
{
    struct zmq_context *wrapper = (struct zmq_context *)ptr;

    assert (wrapper->refs != 0);
    wrapper->refs -= 1;

    /* Someone else still holds the context: nothing more to do. */
    if (wrapper->refs != 0)
        return;

    if (wrapper->context != NULL) {
        int rc = zmq_term (wrapper->context);
        assert (rc == 0);
    }
    xfree (wrapper);
}
/*
 * Builds an uninitialized context wrapper (no native context yet, one
 * reference owned by the Ruby object) and wraps it in a Ruby data object of
 * class_ whose free function is context_free.
 */
static VALUE context_alloc (VALUE class_)
{
    struct zmq_context *wrapper = ALLOC (struct zmq_context);

    wrapper->refs = 1;
    wrapper->context = NULL;

    return rb_data_object_alloc (class_, wrapper, 0, context_free);
}
/*
* Document-method: new
*
* call-seq:
* new(io_threads=1)
*
* Initializes a new 0MQ context. The io_threads argument specifies the size
* of the 0MQ thread pool to handle I/O operations. If your application is
* using only the _inproc_ transport for messaging you may set this to zero; otherwise,
* set it to at least one.
*/
static VALUE context_initialize (int argc_, VALUE* argv_, VALUE self_)
{
    VALUE io_threads;
    struct zmq_context *wrapper = NULL;
    void *handle;
    int thread_count = 1;  /* Default when io_threads is omitted or nil. */

    rb_scan_args (argc_, argv_, "01", &io_threads);

    Data_Get_Struct (self_, void, wrapper);
    /* The wrapper must not already own a native context. */
    assert (wrapper->context == NULL);

    if (!NIL_P (io_threads))
        thread_count = NUM2INT (io_threads);

    handle = zmq_init (thread_count);
    if (handle == NULL) {
        /* rb_raise does not return. */
        rb_raise (exception_type, "%s", zmq_strerror (zmq_errno ()));
    }

    wrapper->context = handle;
    return self_;
}
/*
* call-seq:
* zmq.close() -> nil
*
* Terminates the 0MQ context.
*
* Context termination is performed in the following steps:
*
* 1. Any blocking operations currently in progress on sockets open
* within context shall return immediately with an error code of
* ETERM. With the exception of ZMQ::Socket#close(), any further operations on
* sockets open within context shall fail with an error code of ETERM.
*
* 2. After interrupting all blocking calls, zmq_term() shall block until
* the following conditions are satisfied:
* * All sockets open within context have been closed with ZMQ::Socket#close().
* * For each socket within context, all messages sent by the
* application with ZMQ::Socket#send() have either been physically
* transferred to a network peer, or the socket’s linger period
* set with the ZMQ::LINGER socket option has expired.
*
* For further details regarding socket linger behaviour refer to the
* ZMQ::LINGER option in ZMQ::Socket#setsockopt().
*/
static VALUE context_close (VALUE self_)
{
    struct zmq_context *wrapper = NULL;
    int rc;

    Data_Get_Struct (self_, void, wrapper);

    /* No native context (never initialized or already closed): no-op. */
    if (wrapper->context == NULL)
        return Qnil;

    rc = zmq_term (wrapper->context);
    assert (rc == 0);

    /* Mark as closed so context_free does not terminate it a second time. */
    wrapper->context = NULL;
    return Qnil;
}
/*
 * Accumulator handed to poll_add_item while the zmq_poll() item list is built.
 */
struct poll_state {
/* Event flag (ZMQ_POLLIN, ZMQ_POLLOUT or ZMQ_POLLERR) for the set currently being added. */
int event;
/* Number of entries of items[] filled in so far. */
int nitems;
/* Caller-provided poll item array; poll_add_item writes to it without a bounds check. */
zmq_pollitem_t *items;
/* Ruby array of the objects behind the poll items; element i belongs to items[i]. */
VALUE io_objects;
};
/* Pointer to a function returning VALUE with unspecified arguments (ANYARGS); used to cast C callbacks. */
typedef VALUE(*iterfunc)(ANYARGS);
/*
 * Registers io_ for the event currently stored in the poll_state passed
 * through ps_. Called once per element of the read, write and error sets
 * while ZMQ.select builds its poll list.
 *
 * If io_ already has a poll item, the event is OR-ed into that item's mask.
 * Otherwise io_ is appended to io_objects and a new zmq_pollitem_t is filled
 * in at items[nitems], keeping both sequences index-aligned.
 *
 * io_ is either an instance of socket_type or an object accepted by
 * GetOpenFile(). Always returns Qnil.
 *
 * NOTE: items[] is written without a bounds check; the caller must have
 * allocated at least one entry per call.
 */
static VALUE poll_add_item(VALUE io_, void *ps_) {
struct poll_state *state = (struct poll_state *)ps_;
long i;
/* Linear search for an entry that refers to the very same Ruby object. */
for (i = 0; i < RARRAY_LEN (state->io_objects); i++) {
if (RARRAY_PTR (state->io_objects)[i] == io_) {
#ifdef HAVE_RUBY_IO_H
/* Already registered: just widen the event mask. */
state->items[i].events |= state->event;
return Qnil;
#else
if (CLASS_OF (io_) == socket_type) {
state->items[i].events |= state->event;
return Qnil;
}
/*
 * Legacy rubyio.h path: if write interest is requested and the object's
 * write stream uses a different descriptor than the existing item, leave
 * the loop so a separate poll item is created below.
 */
OpenFile *fptr;
GetOpenFile (io_, fptr);
if (state->event == ZMQ_POLLOUT &&
GetWriteFile (fptr) != NULL &&
fileno (GetWriteFile (fptr)) != state->items[i].fd) {
break;
}
else {
state->items[i].events |= state->event;
return Qnil;
}
#endif
}
}
/* No reusable entry (not found, or the legacy path broke out above). Add a new poll item. */
rb_ary_push (state->io_objects, io_);
zmq_pollitem_t *item = &state->items[state->nitems];
state->nitems++;
item->events = state->event;
if (CLASS_OF (io_) == socket_type) {
/* 0MQ socket: poll on the native handle and set fd to -1. */
/* NOTE(review): Check_Socket is not applied here, so a NULL handle (if s->socket can be NULL) is passed on as-is. */
struct zmq_socket *s;
Data_Get_Struct (io_, struct zmq_socket, s);
item->socket = s->socket;
item->fd = -1;
}
else {
/* Non-socket object: poll on its file descriptor with a NULL socket. */
item->socket = NULL;
#ifdef HAVE_RUBY_IO_H
rb_io_t *fptr;
GetOpenFile (io_, fptr);
item->fd = fileno (rb_io_stdio_file (fptr));
#else
/* Legacy path: pick the read or write stream's descriptor depending on the event. */
/* NOTE(review): item->fd is left unset when the stream required by the event is NULL. */
OpenFile *fptr;
GetOpenFile (io_, fptr);
if (state->event == ZMQ_POLLIN && GetReadFile (fptr) != NULL) {
item->fd = fileno (GetReadFile (fptr));
}
else if (state->event == ZMQ_POLLOUT && GetWriteFile (fptr) != NULL) {
item->fd = fileno (GetWriteFile (fptr));
}
else if (state->event == ZMQ_POLLERR) {
/* For error interest prefer the read stream, falling back to the write stream. */
if (GetReadFile(fptr) != NULL)
item->fd = fileno (GetReadFile (fptr));
else
item->fd = fileno (GetWriteFile (fptr));
}
#endif
}
return Qnil;
}
#ifdef HAVE_RUBY_INTERN_H
/*
 * Argument/result bundle for zmq_poll_blocking.
 */
struct zmq_poll_args {
/* Poll items forwarded to zmq_poll(). */
zmq_pollitem_t *items;
/* Number of entries in items. */
int nitems;
/* Timeout forwarded to zmq_poll() unchanged. */
long timeout_usec;
/* Out: value returned by zmq_poll(). */
int rc;
};
static VALUE zmq_poll_blocking (void* args_)
{
struct zmq_poll_args *poll_args = (struct zmq_poll_args *)args_;
poll_args->rc = zmq_poll (poll_args->items, poll_args->nitems, poll_args->timeout_usec);
return Qnil;
}
#endif
/*
 * Arguments carried from module_select_internal to internal_select through
 * rb_ensure(), which only forwards a single VALUE.
 */
struct select_arg {
/* Objects to watch for readability (Ruby array or nil). */
VALUE readset;
/* Objects to watch for writability (Ruby array or nil). */
VALUE writeset;
/* Objects to watch for error conditions (Ruby array or nil). */
VALUE errset;
/* Timeout handed to zmq_poll(); module_select passes -1 when no timeout was given. */
long timeout_usec;
/* Poll item buffer allocated by module_select_internal, sized for the combined length of the three sets. */
zmq_pollitem_t *items;
};
/*
 * Core of ZMQ.select; invoked through rb_ensure() by module_select_internal,
 * which releases the poll item buffer afterwards.
 *
 * argval carries a struct select_arg * cast to VALUE. Every element of the
 * read, write and error sets is registered through poll_add_item, the
 * resulting items are handed to zmq_poll(), and the objects that reported
 * events are sorted into three result arrays.
 *
 * Returns nil when zmq_poll() reports no ready items, otherwise the array
 * [readable, writable, errored]. Raises exception_type with the
 * zmq_strerror() message when zmq_poll() fails.
 */
static VALUE internal_select(VALUE argval)
{
    static const int events[3] = { ZMQ_POLLIN, ZMQ_POLLOUT, ZMQ_POLLERR };
    struct select_arg *arg = (struct select_arg *)argval;
    struct poll_state ps;
    VALUE sets[3];
    VALUE read_active, write_active, err_active;
    zmq_pollitem_t *item;
    int rc, nitems, i, s;
    long k;

    sets[0] = arg->readset;
    sets[1] = arg->writeset;
    sets[2] = arg->errset;

    ps.nitems = 0;
    ps.items = arg->items;
    ps.io_objects = rb_ary_new ();

    /*
     * Walk each set by index rather than dispatching to its #each method.
     * The items buffer was sized from RARRAY_LEN of the sets, and
     * poll_add_item appends at most one item per call, so iterating exactly
     * RARRAY_LEN elements keeps the writes inside the buffer even when an
     * Array subclass overrides #each to yield additional elements.
     */
    for (s = 0; s < 3; s++) {
        if (NIL_P (sets[s]))
            continue;
        ps.event = events[s];
        for (k = 0; k < RARRAY_LEN (sets[s]); k++)
            poll_add_item (rb_ary_entry (sets[s], k), &ps);
    }

    /* Number of zmq_pollitem_t records actually filled in (duplicates are merged). */
    nitems = ps.nitems;

#ifdef HAVE_RUBY_INTERN_H
    if (arg->timeout_usec != 0) {
        /* The poll may block: run it through rb_thread_blocking_region(). */
        struct zmq_poll_args poll_args;
        poll_args.items = ps.items;
        poll_args.nitems = ps.nitems;
        poll_args.timeout_usec = arg->timeout_usec;

        rb_thread_blocking_region (zmq_poll_blocking, (void*)&poll_args, NULL, NULL);
        rc = poll_args.rc;
    }
    else
#endif
        rc = zmq_poll (ps.items, ps.nitems, arg->timeout_usec);

    if (rc == -1) {
        /* rb_raise does not return. */
        rb_raise(exception_type, "%s", zmq_strerror (zmq_errno ()));
    }
    if (rc == 0)
        return Qnil;

    read_active = rb_ary_new ();
    write_active = rb_ary_new ();
    err_active = rb_ary_new ();

    /* items[i] and io_objects[i] describe the same Ruby object. */
    for (i = 0, item = &ps.items[0]; i < nitems; i++, item++) {
        VALUE io;

        if (item->revents == 0)
            continue;

        io = rb_ary_entry (ps.io_objects, i);

        if (item->revents & ZMQ_POLLIN)
            rb_ary_push (read_active, io);
        if (item->revents & ZMQ_POLLOUT)
            rb_ary_push (write_active, io);
        if (item->revents & ZMQ_POLLERR)
            rb_ary_push (err_active, io);
    }

    return rb_ary_new3 (3, read_active, write_active, err_active);
}
/*
 * Allocates the zmq_pollitem_t buffer for a select call and runs
 * internal_select under rb_ensure() so the buffer is released with xfree()
 * whether internal_select returns normally or raises.
 *
 * readset, writeset and errset are Ruby arrays or nil; timeout_usec is
 * passed through unchanged. Returns whatever internal_select returns.
 */
static VALUE module_select_internal(VALUE readset, VALUE writeset, VALUE errset, long timeout_usec)
{
    struct select_arg arg;
    size_t capacity = 0;

    /* Upper bound on the item count: one slot per element of each non-nil set. */
    if (!NIL_P (readset))
        capacity += RARRAY_LEN (readset);
    if (!NIL_P (writeset))
        capacity += RARRAY_LEN (writeset);
    if (!NIL_P (errset))
        capacity += RARRAY_LEN (errset);

    arg.readset = readset;
    arg.writeset = writeset;
    arg.errset = errset;
    arg.timeout_usec = timeout_usec;
    arg.items = ALLOC_N(zmq_pollitem_t, capacity);

    return rb_ensure(internal_select, (VALUE)&arg, (VALUE (*)())xfree, (VALUE)arg.items);
}
/*
* call-seq:
* ZMQ.select(in, out=[], err=[], timeout=nil) -> [in, out, err] | nil
*
* Like IO.select, but also works with 0MQ sockets.
*/
static VALUE module_select (int argc_, VALUE* argv_, VALUE self_)
{
    VALUE readset, writeset, errset, timeout;
    long timeout_usec = -1;  /* Used when the timeout argument is nil or omitted. */

    /* One required argument followed by up to three optional ones. */
    rb_scan_args (argc_, argv_, "13", &readset, &writeset, &errset, &timeout);

    /* Each set may be nil; anything else must be an Array. */
    if (!NIL_P (readset)) {
        Check_Type (readset, T_ARRAY);
    }
    if (!NIL_P (writeset)) {
        Check_Type (writeset, T_ARRAY);
    }
    if (!NIL_P (errset)) {
        Check_Type (errset, T_ARRAY);
    }

    /* The timeout is given in (possibly fractional) seconds; convert to microseconds. */
    if (!NIL_P (timeout)) {
        timeout_usec = (long)(NUM2DBL (timeout) * 1000000);
    }

    return module_select_internal(readset, writeset, errset, timeout_usec);
}
/*
 * Free function for wrapped sockets: closes the native socket if it is
 * still open, releases the reference held on the owning context, then frees
 * the wrapper itself.
 */
static void socket_free (void *ptr)
{
    struct zmq_socket *sock = (struct zmq_socket *)ptr;
    void *handle = sock->socket;
    struct zmq_context *owner = sock->context;

    if (handle != NULL) {
        int rc = zmq_close (handle);
        assert (rc == 0);
    }

    /* Drops this socket's reference; the context is freed only if it was the last one. */
    if (owner != NULL)
        context_free (owner);

    xfree (sock);
}
/*
* Document-method: socket
*
* call-seq:
* zmq.socket(socket_type)
*
* Creates a new 0MQ socket. The socket_type argument specifies the socket
* type, which determines the semantics of communication over the socket.
*
* The newly created socket is initially unbound, and not associated with any
* endpoints. In order to establish a message flow a socket must first be
* connected to at least one endpoint with connect(), or at least one
* endpoint must be created for accepting incoming connections with
* bind().
*
* For a description of the various socket types, see ZMQ::Socket.
*/
static VALUE context_socket (VALUE self_, VALUE type_)
{
    struct zmq_context *owner = NULL;
    struct zmq_socket *wrapper;
    void *handle;

    Data_Get_Struct (self_, void, owner);

    handle = zmq_socket (owner->context, NUM2INT (type_));
    if (handle == NULL) {
        /* rb_raise does not return. */
        rb_raise (exception_type, "%s", zmq_strerror (zmq_errno ()));
    }

    wrapper = ALLOC (struct zmq_socket);

    /*
     * Take a reference on the context so it cannot be released by the
     * garbage collector while this socket still exists; socket_free gives
     * the reference back.
     */
    owner->refs++;
    wrapper->context = owner;
    wrapper->socket = handle;

    return Data_Wrap_Struct (socket_type, 0, socket_free, wrapper);
}
/*
* Document-class: ZMQ::Socket
*
* ZeroMQ message socket.
*
* = Description
* == Key differences to conventional sockets
* Generally speaking, conventional sockets present a _synchronous_ interface
* to either connection-oriented reliable byte streams (SOCK_STREAM), or
* connection-less unreliable datagrams (SOCK_DGRAM). In comparison, 0MQ
* sockets present an abstraction of an asynchronous <em>message queue</em>, with the
* exact queueing semantics depending on the socket type in use. Where
* conventional sockets transfer streams of bytes or discrete datagrams, 0MQ
* sockets transfer discrete _messages_.
*
* 0MQ sockets being _asynchronous_ means that the timings of the physical
* connection setup and teardown, reconnect and effective delivery are
* transparent to the user and organized by 0MQ itself. Further, messages
* may be _queued_ in the event that a peer is unavailable to receive them.
*
* Conventional sockets allow only strict one-to-one (two peers),
* many-to-one (many clients, one server), or in some cases one-to-many
* (multicast) relationships. With the exception of ZMQ::PAIR, 0MQ sockets
* may be connected <b>to multiple endpoints</b> using connect(), while
* simultaneously accepting incoming connections <b>from multiple endpoints</b>
* bound to the socket using bind(), thus allowing many-to-many relationships.
*
* == Socket Types
*
* The following sections present the socket types defined by 0MQ, grouped by
* the general <em>messaging pattern</em> which is built from related
* socket types.
*
* = Request-reply pattern
* The request-reply pattern is used for sending requests from a _client_ to one
* or more instances of a _service_, and receiving subsequent replies to each
* request sent.
*
* == ZMQ::REQ
* A socket of type ZMQ::REQ is used by a _client_ to send requests to and receive
* replies from a _service_. This socket type allows only an alternating sequence
* of send(request) and subsequent recv(reply) calls. Each request sent
* is load-balanced among all _services_, and each reply received is matched with
* the last issued request.
*
* When a ZMQ::REQ socket enters an exceptional state due to having reached the
* high water mark for all _services_, or if there are no _services_ at all, then
* any send() operations on the socket shall block until the exceptional
* state ends or at least one _service_ becomes available for sending; messages
* are not discarded.
*
* === Summary of ZMQ::REQ characteristics
* [Compatible peer sockets] ZMQ::REP
* [Direction] Bidirectional
* [Send/receive pattern] Send, Receive, Send, Receive, ...
* [Outgoing routing strategy] Load-balanced
* [Incoming routing strategy] Last peer
* [ZMQ::HWM option action] Block
*
* == ZMQ::REP
* A socket of type ZMQ::REP is used by a _service_ to receive requests from and
* send replies to a _client_. This socket type allows only an alternating
* sequence of recv(request) and subsequent send(reply) calls. Each
* request received is fair-queued from among all _clients_, and each reply sent
* is routed to the _client_ that issued the last request.
*
* When a ZMQ::REP socket enters an exceptional state due to having reached the
* high water mark for a _client_, then any replies sent to the _client_ in
* question shall be dropped until the exceptional state ends.
*
* === Summary of ZMQ::REP characteristics
* [Compatible peer sockets] ZMQ::REQ
* [Direction] Bidirectional
* [Send/receive pattern] Receive, Send, Receive, Send, ...
* [Incoming routing strategy] Fair-queued
* [Outgoing routing strategy] Last peer
* [ZMQ::HWM option action] Drop
*
*
* = Publish-subscribe pattern
* The publish-subscribe pattern is used for one-to-many distribution of data
* from a single _publisher_ to multiple _subscribers_ in a fanout fashion.
*
* == ZMQ::PUB
* A socket of type ZMQ::PUB is used by a publisher to distribute data. Messages
* sent are distributed in a fanout fashion to all connected peers. The
* recv() function is not implemented for this socket type.
*
* When a ZMQ::PUB socket enters an exceptional state due to having reached the
* high water mark for a _subscriber_, then any messages that would be sent to the
* subscriber in question shall instead be dropped until the exceptional state
* ends.
*
* === Summary of ZMQ::PUB characteristics
* [Compatible peer sockets] ZMQ::SUB
* [Direction] Unidirectional
* [Send/receive pattern] Send only
* [Incoming routing strategy] N/A
* [Outgoing routing strategy] Fanout
* [ZMQ::HWM option action] Drop
*
* == ZMQ::SUB
*
* A socket of type ZMQ::SUB is used by a _subscriber_ to subscribe to data
* distributed by a _publisher_. Initially a ZMQ::SUB socket is not subscribed to
* any messages, use the ZMQ::SUBSCRIBE option of setsockopt() to specify which
* messages to subscribe to. The send() function is not implemented for this
* socket type.
*
* === Summary of ZMQ::SUB characteristics
* [Compatible peer sockets] ZMQ::PUB
* [Direction] Unidirectional
* [Send/receive pattern] Receive only
* [Incoming routing strategy] Fair-queued
* [Outgoing routing strategy] N/A
* [ZMQ::HWM option action] N/A
*
* = Pipeline pattern
* The pipeline pattern is used for distributing data to _nodes_ arranged in a
* pipeline. Data always flows down the pipeline, and each stage of the pipeline
* is connected to at least one _node_. When a pipeline stage is connected to
* multiple _nodes_ data is load-balanced among all connected _nodes_.
*
* == ZMQ::PUSH
*
* A socket of type ZMQ::PUSH is used by a pipeline node to send messages
* to downstream pipeline nodes. Messages are load-balanced to all connected
* downstream nodes. The recv() function is not implemented for this socket
* type.
*
* When a ZMQ::PUSH socket enters an exceptional state due to having
* reached the high water mark for all downstream _nodes_, or if there are no
* downstream _nodes_ at all, then any send() operations on the socket shall
* block until the exceptional state ends or at least one downstream _node_
* becomes available for sending; messages are not discarded.
*
* === Summary of ZMQ::PUSH characteristics
* [Compatible peer sockets] ZMQ::PULL
* [Direction] Unidirectional
* [Send/receive pattern] Send only
* [Incoming routing strategy] N/A
* [Outgoing routing strategy] Load-balanced
* [ZMQ::HWM option action] Block
*
* == ZMQ::PULL
*
* A socket of type ZMQ::PULL is used by a pipeline _node_ to receive messages
* from upstream pipeline _nodes_. Messages are fair-queued from among all
* connected upstream nodes. The send() function is not implemented for
* this socket type.
*
* === Summary of ZMQ::PULL characteristics
* [Compatible peer sockets] ZMQ::PUSH
* [Direction] Unidirectional
* [Send/receive pattern] Receive only
* [Incoming routing strategy] Fair-queued
* [Outgoing routing strategy] N/A
* [ZMQ::HWM option action] N/A
*
* = Exclusive pair pattern
*
* The exclusive pair is an advanced pattern used for communicating exclusively
* between two peers.
*
* == ZMQ::PAIR
*
* A socket of type ZMQ::PAIR can only be connected to a single peer at any one
* time. No message routing or filtering is performed on messages sent over a
* ZMQ::PAIR socket.
*
* When a ZMQ::PAIR socket enters an exceptional state due to having reached the
* high water mark for the connected peer, or if no peer is connected, then any
* send() operations on the socket shall block until the peer becomes
* available for sending; messages are not discarded.
*
* *NOTE* ZMQ::PAIR sockets are experimental, and are currently missing several
* features such as auto-reconnection.
*
* === Summary of ZMQ::PAIR characteristics
* [Compatible peer sockets] ZMQ::PAIR
* [Direction] Bidirectional
* [Send/receive pattern] Unrestricted
* [Incoming routing strategy] N/A
* [Outgoing routing strategy] N/A
* [ZMQ::HWM option action] Block
*/
/*
* call-seq:
* socket.getsockopt(option)
*
* Retrieves the value of the specified 0MQ socket option.
*
* The following options can be retrieved with the getsockopt() function:
*
* == ZMQ::RCVMORE: More message parts to follow
* The ZMQ::RCVMORE option shall return a boolean value indicating if the
* multi-part message currently being read from the specified socket has more
* message parts to follow. If there are no message parts to follow or if the
* message currently being read is not a multi-part message a value of false
* shall be returned. Otherwise, a value of true shall be returned.
*
* Refer to send() and recv() for a detailed description of sending/receiving
* multi-part messages.
*
* [Option value type] Boolean
* [Option value unit] N/A
* [Default value] N/A
* [Applicable socket types] all
*
* == ZMQ::HWM: Retrieve high water mark
* The ZMQ::HWM option shall retrieve the high water mark for the specified
* _socket_. The high water mark is a hard limit on the maximum number of
* outstanding messages 0MQ shall queue in memory for any single peer that the
* specified _socket_ is communicating with.
*
* If this limit has been reached the socket shall enter an exceptional state
* and depending on the socket type, 0MQ shall take appropriate action such as
* blocking or dropping sent messages. Refer to the individual socket
* descriptions in ZMQ::Socket for details on the exact action taken for each
* socket type.
*
* The default ZMQ::HWM value of zero means "no limit".
*
* [Option value type] Integer
* [Option value unit] messages
* [Default value] 0
* [Applicable socket types] all
*
* == ZMQ::SWAP: Retrieve disk offload size
* The ZMQ::SWAP option shall retrieve the disk offload (swap) size for the
* specified _socket_. A socket which has ZMQ::SWAP set to a non-zero value may
* exceed its high water mark; in this case outstanding messages shall be
* offloaded to storage on disk rather than held in memory.
*
* The value of ZMQ::SWAP defines the maximum size of the swap space in bytes.
*
* [Option value type] Integer
* [Option value unit] bytes
* [Default value] 0
* [Applicable socket types] all
*
* == ZMQ::AFFINITY: Retrieve I/O thread affinity
* The ZMQ::AFFINITY option shall retrieve the I/O thread affinity for newly
* created connections on the specified _socket_.
*
* Affinity determines which threads from the 0MQ I/O thread pool associated
* with the socket’s _context_ shall handle newly created connections. A value of
* zero specifies no affinity, meaning that work shall be distributed fairly
* among all 0MQ I/O threads in the thread pool. For non-zero values, the lowest
* bit corresponds to thread 1, second lowest bit to thread 2 and so on. For
* example, a value of 3 specifies that subsequent connections on _socket_ shall
* be handled exclusively by I/O threads 1 and 2.
*
* See also ZMQ::Context#new for details on allocating the number of
* I/O threads for a specific _context_.
*
* [Option value type] Integer
* [Option value unit] N/A (bitmap)
* [Default value] 0
* [Applicable socket types] all
*
* == ZMQ::IDENTITY: Retrieve socket identity
* The ZMQ::IDENTITY option shall retrieve the identity of the specified _socket_.
* Socket identity determines if existing 0MQ infrastructure (<em>message queues</em>,
* <em>forwarding devices</em>) shall be identified with a specific application and
* persist across multiple runs of the application.
*
* If the socket has no identity, each run of an application is completely
* separate from other runs. However, with identity set the socket shall re-use
* any existing 0MQ infrastructure configured by the previous run(s). Thus the
* application may receive messages that were sent in the meantime, <em>message
* queue</em> limits shall be shared with previous run(s) and so on.
*
* Identity can be at least one byte and at most 255 bytes long. Identities
* starting with binary zero are reserved for use by 0MQ infrastructure.
*
* [Option value type] String
* [Option value unit] N/A
* [Default value] nil
* [Applicable socket types] all
*
* == ZMQ::RATE: Retrieve multicast data rate
*
* The ZMQ::RATE option shall retrieve the maximum send or receive data
* rate for multicast transports using the specified _socket_.
*
* [Option value type] Integer
* [Option value unit] kilobits per second
* [Default value] 100
* [Applicable socket types] all, when using multicast transports
*
* == ZMQ::RECOVERY_IVL: Get multicast recovery interval
*
* The ZMQ::RECOVERY_IVL option shall retrieve the recovery interval for
* multicast transports using the specified _socket_. The recovery interval
* determines the maximum time in seconds that a receiver can be absent from a
* multicast group before unrecoverable data loss will occur.
*
* [Option value type] Integer
* [Option value unit] seconds
* [Default value] 10
* [Applicable socket types] all, when using multicast transports
*
* == ZMQ::RECOVERY_IVL_MSEC: Get multicast recovery interval in milliseconds
* The ZMQ::RECOVERY_IVL_MSEC option shall retrieve the recovery interval, in
* milliseconds (ms) for multicast transports using the specified socket. The
* recovery interval determines the maximum time in milliseconds that a receiver
* can be absent from a multicast group before unrecoverable data loss will occur.
*
* For backward compatibility, the default value of ZMQ::RECOVERY_IVL_MSEC is -1
* indicating that the recovery interval should be obtained from the
* ZMQ::RECOVERY_IVL option. However, if the ZMQ::RECOVERY_IVL_MSEC value is not
* zero, then it will take precedence, and be used.
*
* [Option value type] Integer
* [Option value unit] milliseconds
* [Default value] -1
* [Applicable socket types] all, when using multicast transports
*
* == ZMQ::MCAST_LOOP: Control multicast loopback
* The ZMQ::MCAST_LOOP option controls whether data sent via multicast transports
* can also be received by the sending host via loopback. A value of zero
* indicates that the loopback functionality is disabled, while the default
* value of 1 indicates that the loopback functionality is enabled. Leaving
* multicast loopback enabled when it is not required can have a negative impact
* on performance. Where possible, disable ZMQ::MCAST_LOOP in production
* environments.
*
* [Option value type] Boolean
* [Option value unit] N/A
* [Default value] true
* [Applicable socket types] all, when using multicast transports
*
* == ZMQ::SNDBUF: Retrieve kernel transmit buffer size
* The ZMQ::SNDBUF option shall retrieve the underlying kernel transmit buffer
* size for the specified _socket_. A value of zero means that the OS default is
* in effect. For details refer to your operating system documentation for the
* SO_SNDBUF socket option.
*
* [Option value type] Integer
* [Option value unit] bytes
* [Default value] 0
* [Applicable socket types] all
*
* == ZMQ::RCVBUF: Retrieve kernel receive buffer size
* The ZMQ::RCVBUF option shall retrieve the underlying kernel receive buffer
* size for the specified _socket_. A value of zero means that the OS default is
* in effect. For details refer to your operating system documentation for the
* SO_RCVBUF socket option.
*
* [Option value type] Integer
* [Option value unit] bytes
* [Default value] 0
* [Applicable socket types] all
*
* == ZMQ::LINGER: Retrieve linger period for socket shutdown
* The ZMQ::LINGER option shall retrieve the linger period for the specified
* socket. The linger period determines how long pending messages which have
* yet to be sent to a peer shall linger in memory after a socket is closed
* with ZMQ::Socket#close(), and further affects the termination of the
* socket’s context with ZMQ::Context#close(). The following outlines the different
* behaviours:
*
* * The default value of −1 specifies an infinite linger period.
* Pending messages shall not be discarded after a call to ZMQ::Socket#close();
* attempting to terminate the socket’s context with ZMQ::Context#close() shall block
* until all pending messages have been sent to a peer.
*
* * The value of 0 specifies no linger period. Pending messages shall be
* discarded immediately when the socket is closed with ZMQ::Socket#close.
*
* * Positive values specify an upper bound for the linger period in
* milliseconds. Pending messages shall not be discarded after a call to
* ZMQ::Socket#close(); attempting to terminate the socket’s context with
* ZMQ::Context#close() shall block until either all pending messages have been sent
* to a peer, or the linger period expires, after which any pending messages
* shall be discarded.
*
* [Option value type] Integer
* [Option value unit] milliseconds
* [Default value] -1 (infinite)
* [Applicable socket types] all
*
* == ZMQ::RECONNECT_IVL: Retrieve reconnection interval
* The ZMQ::RECONNECT_IVL option shall retrieve the reconnection interval for
* the specified socket. The reconnection interval is the maximum period 0MQ
* shall wait between attempts to reconnect disconnected peers when using
* connection−oriented transports.
*
* [Option value type] Integer
* [Option value unit] milliseconds
* [Default value] 100
* [Applicable socket types] all, only for connection-oriented transports
*
* == ZMQ::RECONNECT_IVL_MAX: Retrieve maximum reconnection interval
* The ZMQ::RECONNECT_IVL_MAX option shall retrieve the maximum reconnection interval
* for the specified socket. This is the maximum period 0MQ shall wait between
* attempts to reconnect. On each reconnect attempt, the previous interval
* shall be doubled until ZMQ::RECONNECT_IVL_MAX is reached. This allows for
* exponential backoff strategy. Default value means no exponential backoff
* is performed and reconnect interval calculations are only based on
* ZMQ::RECONNECT_IVL.
*
* Values less than ZMQ::RECONNECT_IVL will be ignored.
*
* [Option value type] Integer
* [Option value unit] milliseconds
* [Default value] 0 (only use RECONNECT_IVL)
* [Applicable socket types] all, only for connection-oriented transports
*
* == ZMQ::BACKLOG: Retrieve maximum length of the queue of outstanding connections
* The ZMQ::BACKLOG option shall retrieve the maximum length of the queue of
* outstanding peer connections for the specified socket; this only applies to
* connection-oriented transports. For details refer to your operating system
* documentation for the listen function.
*
* [Option value type] Integer
* [Option value unit] connections
* [Default value] 100
* [Applicable socket types] all, only for connection-oriented transports
*
* == ZMQ::FD: Retrieve file descriptor associated with the socket
* The ZMQ::FD option shall retrieve the file descriptor associated with the
* specified socket. The returned file descriptor can be used to integrate the
* socket into an existing event loop; the 0MQ library shall signal any pending
* events on the socket in an edge-triggered fashion by making the file
* descriptor become ready for reading.
*
* === Note
* The ability to read from the returned file descriptor does not necessarily
* indicate that messages are available to be read from, or can be written to,
* the underlying socket; applications must retrieve the actual event state
* with a subsequent retrieval of the ZMQ::EVENTS option.
*
* === Caution
* The returned file descriptor is intended for use with a poll or similar
* system call only. Applications must never attempt to read or write data
* to it directly.
*
* [Option value type] int on POSIX systems, SOCKET on Windows
* [Option value unit] N/A
* [Default value] N/A
* [Applicable socket types] all
*
* == ZMQ::EVENTS: Retrieve socket event state
* The ZMQ::EVENTS option shall retrieve the event state for the specified
* socket. The returned value is a bit mask constructed by OR’ing a combination
* of the following event flags:
*
* === ZMQ::POLLIN
* Indicates that at least one message may be received from the specified
* socket without blocking.
*
* === ZMQ::POLLOUT
* Indicates that at least one message may be sent to the specified socket
* without blocking.
*
* The combination of a file descriptor returned by the ZMQ::FD option being
* ready for reading but no actual events returned by a subsequent retrieval of
* the ZMQ::EVENTS option is valid; applications should simply ignore this case
* and restart their polling operation/event loop.
*
* [Option value type] uint32_t
* [Option value unit] N/A (flags)
* [Default value] N/A
* [Applicable socket types] all
*
* == ZMQ::SNDTIMEO
* Retrieves the timeout for send operations on the socket.
*
* If the value is 0, a send operation on the socket will return immediately, with an
* EAGAIN error if the message cannot be sent. If the value is -1, it will block until the
* message is sent. For all other values, it will try to send the message for that amount
* of time before returning with an EAGAIN error.
*
* [Option value type] Integer
* [Option value unit] milliseconds
* [Default value] -1 (infinite)
* [Applicable socket types] all
*
* == ZMQ::RCVTIMEO
* Retrieves the timeout for receive operations on the socket.
*
* If the value is 0, a receive operation on the socket will return immediately, with an
* EAGAIN error if there is no message to receive. If the value is -1, it will block until
* a message is available. For all other values, it will wait for a message for that
* amount of time before returning with an EAGAIN error.
*
* [Option value type] Integer
* [Option value unit] milliseconds
* [Default value] -1 (infinite)
* [Applicable socket types] all
*
*/
static VALUE socket_getsockopt (VALUE self_, VALUE option_)
{
int rc = 0;
VALUE retval;
struct zmq_socket * s;
Data_Get_Struct (self_, struct zmq_socket, s);
Check_Socket (s);
switch (NUM2INT (option_)) {
#if ZMQ_VERSION >= 20100