Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid to close @socket of MQTT::Client unexpectedly #130

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 61 additions & 30 deletions lib/mqtt/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def initialize(*args)
@read_queue = Queue.new
@pubacks = {}
@read_thread = nil
@write_semaphore = Mutex.new
@socket_semaphore = Mutex.new
@pubacks_semaphore = Mutex.new
end

Expand Down Expand Up @@ -227,7 +227,9 @@ def connect(clientid = nil)

raise 'No MQTT server host set when attempting to connect' if @host.nil?

unless connected?
@socket_semaphore.synchronize do
break if socket_alive?

# Create network socket
tcp_socket = TCPSocket.new(@host, @port)

Expand Down Expand Up @@ -261,15 +263,27 @@ def connect(clientid = nil)
)

# Send packet
send_packet(packet)
send_packet(packet, false)

# Receive response
receive_connack

# Start packet reading thread
@read_thread = Thread.new(Thread.current) do |parent|
Thread.current[:parent] = parent
receive_packet while connected?
no_error = true
no_error = receive_packet while no_error && connected?

if no_error
# Should not reach here on normal state since `disconnect` kills
# this thread, but it will occur when `receive_packet` catches no
# error and # `connected?` returns false. An error should be raised
# in this case too to break `get` loop.
@socket_semaphore.synchronize do
close_socket(false)
end
Thread.current[:parent].raise(MQTT::NotConnectedException)
end
end
end

Expand All @@ -290,21 +304,16 @@ def disconnect(send_msg = true)
@read_thread.kill if @read_thread && @read_thread.alive?
@read_thread = nil

return unless connected?

# Close the socket if it is open
if send_msg
packet = MQTT::Packet::Disconnect.new
send_packet(packet)
@socket_semaphore.synchronize do
close_socket(send_msg)
end
@socket.close unless @socket.nil?
handle_close
@socket = nil
end

# Checks whether the client is connected to the server.
def connected?
[email protected]? && [email protected]?
@socket_semaphore.synchronize do
socket_alive?
end
end

# Publish a message on a particular topic to the MQTT server.
Expand Down Expand Up @@ -454,26 +463,32 @@ def unsubscribe(*topics)

private

def socket_alive?
[email protected]? && [email protected]?
end

# Try to read a packet from the server
# Also sends keep-alive ping packets.
def receive_packet
# Poll socket - is there data waiting?
result = IO.select([@socket], [], [], SELECT_TIMEOUT)
handle_timeouts
unless result.nil?
# Yes - read in the packet
packet = MQTT::Packet.read(@socket)
handle_packet packet
@socket_semaphore.synchronize do
unless result.nil?
# Yes - read in the packet
packet = MQTT::Packet.read(@socket)
handle_packet packet
end
keep_alive!
end
keep_alive!
true
# Pass exceptions up to parent thread
rescue Exception => exp
unless @socket.nil?
@socket.close
@socket = nil
handle_close
@socket_semaphore.synchronize do
close_socket(false)
end
Thread.current[:parent].raise(exp)
false
end

def wait_for_puback(id, queue)
Expand Down Expand Up @@ -509,6 +524,19 @@ def handle_close
end
end

def close_socket(send_msg = true)
return unless socket_alive?

# Close the socket if it is open
if send_msg
packet = MQTT::Packet::Disconnect.new
send_packet(packet, false)
end
@socket.close unless @socket.nil?
handle_close
@socket = nil
end

if Process.const_defined? :CLOCK_MONOTONIC
def current_time
Process.clock_gettime(Process::CLOCK_MONOTONIC)
Expand All @@ -521,12 +549,12 @@ def current_time
end

def keep_alive!
return unless @keep_alive > 0 && connected?
return unless @keep_alive > 0 && socket_alive?

response_timeout = (@keep_alive * 1.5).ceil
if current_time >= @last_ping_request + @keep_alive
packet = MQTT::Packet::Pingreq.new
send_packet(packet)
send_packet(packet, false)
@last_ping_request = current_time
elsif current_time > @last_ping_response + response_timeout
raise MQTT::ProtocolException, "No Ping Response received for #{response_timeout} seconds"
Expand Down Expand Up @@ -556,12 +584,15 @@ def receive_connack
end

# Send a packet to server
def send_packet(data)
# Raise exception if we aren't connected
raise MQTT::NotConnectedException unless connected?

def send_packet(data, with_lock = true)
# Only allow one thread to write to socket at a time
@write_semaphore.synchronize do
if with_lock
@socket_semaphore.synchronize do
raise MQTT::NotConnectedException unless socket_alive?
@socket.write(data.to_s)
end
else
raise MQTT::NotConnectedException unless socket_alive?
@socket.write(data.to_s)
end
end
Expand Down
8 changes: 4 additions & 4 deletions spec/mqtt_client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ def now
end

it "should not create a new TCP Socket if connected" do
allow(client).to receive(:connected?).and_return(true)
allow(client).to receive(:socket_alive?).and_return(true)
expect(TCPSocket).to receive(:new).never
client.connect('myclient')
end
Expand Down Expand Up @@ -558,19 +558,19 @@ def now
end

it "should not do anything if the socket is already disconnected" do
allow(client).to receive(:connected?).and_return(false)
allow(client).to receive(:socket_alive?).and_return(false)
client.disconnect(true)
expect(socket.string).to eq("")
end

it "should write a valid DISCONNECT packet to the socket if connected and the send_msg=true an" do
allow(client).to receive(:connected?).and_return(true)
allow(client).to receive(:socket_alive?).and_return(true)
client.disconnect(true)
expect(socket.string).to eq("\xE0\x00")
end

it "should not write anything to the socket if the send_msg=false" do
allow(client).to receive(:connected?).and_return(true)
allow(client).to receive(:socket_alive?).and_return(true)
client.disconnect(false)
expect(socket.string).to be_empty
end
Expand Down