Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Got error when connection lost using SSL #206

Open
leruetkins opened this issue Jun 14, 2023 · 1 comment
Open

Got error when connection lost using SSL #206

leruetkins opened this issue Jun 14, 2023 · 1 comment

Comments

@leruetkins
Copy link

I run the program and everything works fine, but after the connection to the server is lost, the program crashes and may have time to catch such an error:

thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: Poi
sonError { .. }', C:\Users\user\.cargo\registry\src\github.com-1ecc6299db9ec823\
paho-mqtt-0.12.1\src\token.rs:369:41

Used code:

fn mqtt_connect() -> mqtt::Result<()> {
    let period = CONFIG_JSON["settings"]["mqtt"]["period"].as_u64().unwrap() * 1000;
    let period_duration = Duration::from_millis(period);
    let mut zabbix_last_msg = Instant::now() - period_duration - Duration::from_millis(1000);
    let host = CONFIG_JSON["settings"]["mqtt"]["url"]
        .as_str()
        .unwrap()
        .to_string();
    println!("Connecting to host: '{}'", host);

    let zabbix_topic = CONFIG_JSON["settings"]["mqtt"]["topic"].as_str().unwrap();

    // let random_name_result = format!("zbx-np_{}", RANDOM_NAME.to_string());
    // println!("Client ID: {}", random_name_result);
    let cli = mqtt::CreateOptionsBuilder::new()
        .server_uri(&host)
        .client_id("random_name_result")
        .max_buffered_messages(100)
        .create_client()?;

    let ssl_opts = mqtt::SslOptionsBuilder::new()
        .enable_server_cert_auth(false)
        //  .trust_store(trust_store)?
        //  .key_store(key_store)?
        .finalize();

    let login = CONFIG_JSON["settings"]["mqtt"]["login"]
        .as_str()
        .unwrap()
        .to_string();

    let password = CONFIG_JSON["settings"]["mqtt"]["password"]
        .as_str()
        .unwrap()
        .to_string();
    let conn_opts = mqtt::ConnectOptionsBuilder::new()
        .ssl_options(ssl_opts)
        .user_name(login)
        .password(password)
        .keep_alive_interval(Duration::from_secs(20))
        .clean_session(false)
        // .will_message(lwt)
        .finalize();

    cli.connect_with_callbacks(conn_opts, on_connect_success, on_connect_failure);

    cli.set_connected_callback(|_cli: &mqtt::AsyncClient| {
        println!("Connected.");
    });
    cli.set_connection_lost_callback(|cli: &mqtt::AsyncClient| {
        println!("Connection lost. Attempting reconnect.");
        thread::sleep(Duration::from_millis(2500));
        cli.reconnect_with_callbacks(on_connect_success, on_connect_failure);
    });

    cli.set_message_callback(move |_cli, msg| {
        if let Some(msg) = msg {
            let topic = msg.topic();
            let payload_str = msg.payload_str();
            if topic == zabbix_topic {
                let now = Instant::now();
                if (now - zabbix_last_msg) > Duration::from_millis((period).try_into().unwrap()) {
                    print_time_date();
                    let data: Result<Data, _> = serde_json::from_str(&payload_str);
                    // let json_obj: Result<Value, serde_json::Error> = serde_json::from_str(&payload_str);
                    match data {
                        Ok(ref obj) => {
                            let json_string = serde_json::to_string(&obj);
                            match json_string {
                                Ok(string) => {
                                    // println!("{}", string);
                                    println!("Received data from MQTT:\n{} - {}", topic, string);
                                }
                                Err(ref e) => {
                                    println!("Failed to convert JSON to string: {}", e);
                                }
                            }
                        }
                        Err(ref e) => {
                            println!("Failed to parse JSON: {}", e);
                        }
                    }
                    if let Ok(data) = data {
                        let response_json = json!({
                            "zabbix_server": data.zabbix_server,
                            "item_host_name": data.item_host_name,
                            "item": data.item,
                        });
                        let show_result = send_to_zabbix(&response_json.to_string());
                        let decoded_show_result = match show_result {
                            Ok(show_result) => decode_unicode_escape_sequences(&show_result),
                            Err(err) => {
                                eprintln!("Error sending data to Zabbix server: {}", err);
                                // Create an error response JSON
                                return;
                            }
                        };
                        let mut response_data = json!({
                            "data": response_json,
                            "result": decoded_show_result
                        });
                        // Convert the "show_result" field to a JSON value
                        if let Some(show_result_value) = response_data.get_mut("result") {
                            if let Some(show_result_str) = show_result_value.as_str() {
                                if let Ok(show_result_json) = serde_json::from_str(show_result_str)
                                {
                                    *show_result_value = Value::Object(show_result_json);
                                }
                            }
                        }
                    } else if let Err(err) = data {
                        eprintln!("Failed to parse payload as JSON object: {}", err);
                        // Handle the parsing error
                    }
                    zabbix_last_msg = Instant::now();
                }
            }
        }
    });

    loop {
        thread::sleep(Duration::from_millis(1000));
    }
}
@matmoscicki
Copy link

I have a similar problem. When the connection is lost, my client crashes. Sometimes it works properly and the client reconnect, but very often not.

I've tested this by killing mosquitto server.

Thread 7 "MQTTAsync_rcv" received signal SIGPIPE, Broken pipe.
[Switching to Thread 0x7ffff68b2700 (LWP 1327907)]
__libc_write (nbytes=24, buf=0x7fffd800ed03, fd=9) at ../sysdeps/unix/sysv/linux/write.c:26
26      ../sysdeps/unix/sysv/linux/write.c: No such file or directory.
(gdb) bt
#0  __libc_write (nbytes=24, buf=0x7fffd800ed03, fd=9) at ../sysdeps/unix/sysv/linux/write.c:26
#1  __libc_write (fd=9, buf=0x7fffd800ed03, nbytes=24) at ../sysdeps/unix/sysv/linux/write.c:24
#2  0x00007ffff7cf9489 in ?? () from /lib/x86_64-linux-gnu/libcrypto.so.1.1
#3  0x00007ffff7cf466e in ?? () from /lib/x86_64-linux-gnu/libcrypto.so.1.1
#4  0x00007ffff7cf3684 in ?? () from /lib/x86_64-linux-gnu/libcrypto.so.1.1
#5  0x00007ffff7cf3b47 in BIO_write () from /lib/x86_64-linux-gnu/libcrypto.so.1.1
#6  0x00007ffff7f46dde in ?? () from /lib/x86_64-linux-gnu/libssl.so.1.1
#7  0x00007ffff7f47cd9 in ?? () from /lib/x86_64-linux-gnu/libssl.so.1.1
#8  0x00007ffff7f5188e in ?? () from /lib/x86_64-linux-gnu/libssl.so.1.1
#9  0x00007ffff7f4fa65 in ?? () from /lib/x86_64-linux-gnu/libssl.so.1.1
#10 0x00007ffff7f5aec3 in SSL_shutdown () from /lib/x86_64-linux-gnu/libssl.so.1.1
#11 0x0000555555ad6d36 in SSLSocket_close (net=0x555555eac928) at /home/mmoscicki/.cargo/registry/src/index.crates.io-6f17d22bba15001f/paho-mqtt-sys-0.9.0/paho.mqtt.c/src/SSLSocket.c:957
#12 0x0000555555ad345e in MQTTAsync_closeOnly (client=0x555555eac8f0, reasonCode=MQTTREASONCODE_SUCCESS, props=0x0) at /home/mmoscicki/.cargo/registry/src/index.crates.io-6f17d22bba15001f/paho-mqtt-sys-0.9.0/paho.mqtt.c/src/MQTTAsyncUtils.c:2409
#13 0x0000555555ad34e3 in MQTTAsync_closeSession (client=0x555555eac8f0, reasonCode=MQTTREASONCODE_SUCCESS, props=0x0) at /home/mmoscicki/.cargo/registry/src/index.crates.io-6f17d22bba15001f/paho-mqtt-sys-0.9.0/paho.mqtt.c/src/MQTTAsyncUtils.c:2427
#14 0x0000555555ad0dd9 in nextOrClose (m=0x555555eac6b0, rc=-1, message=0x555555cbbcea "socket error") at /home/mmoscicki/.cargo/registry/src/index.crates.io-6f17d22bba15001f/paho-mqtt-sys-0.9.0/paho.mqtt.c/src/MQTTAsyncUtils.c:1644
#15 0x0000555555ad20ea in MQTTAsync_receiveThread (n=0x555555eac6b0) at /home/mmoscicki/.cargo/registry/src/index.crates.io-6f17d22bba15001f/paho-mqtt-sys-0.9.0/paho.mqtt.c/src/MQTTAsyncUtils.c:2053
#16 0x00007ffff7c0b609 in start_thread (arg=<optimized out>) at pthread_create.c:477
#17 0x00007ffff79d9353 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants