
Zero-copy messages cannot sustain the same throughput as copied messages, and "ZMQbg/IO/0" received signal SIGABRT #614

Open
fengmao31 opened this issue Sep 4, 2023 · 6 comments

Comments

@fengmao31

fengmao31 commented Sep 4, 2023

I found that an error occurs when I use the zero-copy approach to create a large message and send it.
If the transmission is large, the zero-copy approach has problems, while the copying approach works well.
With copied messages I can send 2 MB messages at 25 Hz, but with zero-copy I can only send 100 KB messages at the same frequency, or 2 MB messages at a lower frequency.

These two work well: creating the message by copying, or allocating a buffer with malloc and handing it to the message.

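```cpp
// Works: the (data, size) constructor copies the bytes into the message
zmq::message_t msg(const_cast<char*>(m2.data()), m2.size());

// Works: copy into a malloc'd buffer and pass that buffer to the message
// (my_free3 is the no-op deallocator shown below, so bbb is never freed)
char* bbb = (char*)malloc(m.size());
memcpy(bbb, m.data(), m.size());
zmq::message_t msg(bbb, m.size(), my_free3, nullptr);
```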

This one does not work: it creates the message zero-copy, directly on top of the string's own buffer.

```cpp
auto my_free3 = [](void* data, void* hint) {};
zmq::message_t msg(const_cast<char*>(m.data()), m.size(), my_free3, nullptr);
```
```
fengmao@fengmao-Precision-3660:~/rima/cpp-python-programmer/rmw/zmq/build$ ./zmqpub_multi_frame_zerocopy 
Sending seq 2 info size 27
Sending seq 2 m size 20000017
Bad address (/home/fengmao/rima/libzmq/src/tcp.cpp:254)
Aborted
fengmao@fengmao-Precision-3660:~/rima/cpp-python-programmer/rmw/zmq/build$ gdb zmqpub_multi_frame_zerocopy 
GNU gdb (Ubuntu 9.2-0ubuntu1~20.04.1) 9.2
Copyright (C) 2020 Free Software Foundation, Inc.
License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>
This is free software: you are free to change and redistribute it.
There is NO WARRANTY, to the extent permitted by law.
Type "show copying" and "show warranty" for details.
This GDB was configured as "x86_64-linux-gnu".
Type "show configuration" for configuration details.
For bug reporting instructions, please see:
<http://www.gnu.org/software/gdb/bugs/>.
Find the GDB manual and other documentation resources online at:
    <http://www.gnu.org/software/gdb/documentation/>.

For help, type "help".
Type "apropos word" to search for commands related to "word"...
Reading symbols from zmqpub_multi_frame_zerocopy...
(gdb) r
Starting program: /home/fengmao/rima/cpp-python-programmer/rmw/zmq/build/zmqpub_multi_frame_zerocopy 
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".
[New Thread 0x7ffff713e700 (LWP 779197)]
[New Thread 0x7ffff693d700 (LWP 779198)]
[New Thread 0x7ffff613c700 (LWP 779199)]
Sending seq 2 info size 27
Sending seq 2 m size 20000017
Sending seq 3 info size 27
Sending seq 3 m size 20000017
Bad address (/home/fengmao/rima/libzmq/src/tcp.cpp:254)
Sending seq 4 info size 27
Sending seq 4 m size 20000017

Thread 4 "ZMQbg/IO/0" received signal SIGABRT, Aborted.
[Switching to Thread 0x7ffff613c700 (LWP 779199)]
__GI_raise (sig=sig@entry=6)
    at ../sysdeps/unix/sysv/linux/raise.c:50
50      ../sysdeps/unix/sysv/linux/raise.c: No such file or directory.
(gdb) 
```
@gummif
Member

gummif commented Sep 6, 2023

Can you provide a full working example?

@fengmao31
Author

I will put together a minimal example and send it to you next week, probably after Tuesday.

@fengmao31
Author

zmqpub_multi_frame_zerocopy_github

```cpp
// based on https://www.codenong.com/45740168/
// based on https://blog.csdn.net/qq_41453285/article/details/106845900
// author samuel

#include "zmq.hpp"
#include <string>
#include <iostream>
#include "protobuf/examples.pb.h"
#include <unistd.h>
#include <thread>
#include <chrono>
#include <functional>
#include <memory>
#include "protobuf/msg_bridge_frame.pb.h"

using apollo::cyber::examples::proto::Chatter;
using apollo::msg_bridge::proto::Frame;

std::unique_ptr<zmq::context_t> context;
std::unique_ptr<zmq::socket_t> publisher;
class Apple {
  private:
    using ThreadPtr = std::unique_ptr<std::thread>;
    ThreadPtr thread_;

  public:
    ~Apple() { thread_->join(); }
    void my_free(void* data, void* hint) {}
    void Start() { thread_.reset(new std::thread(&Apple::ThreadFunc, this)); }
    bool ThreadFunc()
    {
        std::unique_ptr<zmq::context_t> context;
        std::unique_ptr<zmq::socket_t> publisher;
        context = std::make_unique<zmq::context_t>(1);
        publisher = std::make_unique<zmq::socket_t>(*context, ZMQ_PUB);
        publisher->bind("tcp://127.0.0.1:8888");
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
        uint64_t seq = 1;
        while (true) {
            // sleep(1);
            seq++;

            const std::string topic = "stream1";
            zmq::message_t env(topic.data(), topic.size());

            std::string info(100,'+');
            std::string m(20000000,'+');

            auto my_free3 = [](void* data, void* hint) {};
            // char* aaa = (char*)malloc(info.size());
            // memcpy(aaa, info.data(), info.size());
            // std::cout<< aaa<<std::endl;
            // std::string aaa_str(aaa,info.size());
            // std::cout<< aaa_str<<std::endl;

            // char* bbb = (char*)malloc(m.size());
            // memcpy(bbb, m.data() , m.size());

            zmq::message_t header(const_cast<char*>(info.data()), info.size(), my_free3, nullptr);
            zmq::message_t msg(const_cast<char*>(m.data()), m.size(), my_free3, nullptr);
            // std::cout << "info: " << std::endl << info << std::endl;
            // std::cout << "--------------" << std::endl;
            // std::cout << "m: " << std::endl << m << std::endl;

            publisher->send(env, ZMQ_SNDMORE);
            std::cout << " info size " << info.size() << std::endl;
            publisher->send(header, ZMQ_SNDMORE);
            std::cout << " m size " << m.size() << std::endl;
            publisher->send(msg, ZMQ_NULL | ZMQ_NOBLOCK);
            usleep(500);
            // if(aaa)free(aaa);
            // if(bbb)free(bbb);
        }
        return 0;
    }
};

int main()
{
    Apple apple;
    apple.Start();
}
```

zmqsub_multi_frame_zerocopy_github

```cpp
//based on https://www.codenong.com/45740168/
//based on https://blog.csdn.net/qq_41453285/article/details/106845900
//author samuel

#include "zmq.hpp"
#include <string>
#include <iostream>
#include <memory>
#include "protobuf/examples.pb.h"
#include <thread>
#include "protobuf/msg_bridge_frame.pb.h"

using apollo::cyber::examples::proto::Chatter;
using apollo::msg_bridge::proto::Frame;

std::unique_ptr<zmq::context_t> context;
std::unique_ptr<zmq::socket_t> subscriber;
bool ThreadFunc();
int main()
{
    std::thread thread = std::thread(ThreadFunc);
    subscriber = nullptr;
    thread.join();
}
bool ThreadFunc()
{
    context = std::make_unique<zmq::context_t>(1);
    subscriber = std::make_unique<zmq::socket_t>(*context, ZMQ_SUB);
    subscriber->connect("tcp://127.0.0.1:8888");
    const std::string topic="stream1";
    subscriber->set(zmq::sockopt::subscribe,topic.data());

    while (true) {

        zmq::message_t env;
        subscriber->recv(&env);
        std::string env_str = std::string(static_cast<char*>(env.data()), env.size());

        zmq::message_t header;
        subscriber->recv(&header);

        std::string header_str((char *)header.data(),header.size());
        std::cout << "Received " << header_str << " on topic A" << std::endl;

        zmq::message_t m;
        subscriber->recv(&m);

        std::string m_str((char*)m.data(), m.size());

        std::shared_ptr<Chatter> m_proto_ptr = std::make_shared<Chatter>();
        // std::cout << "Received  content " << m_str << " on topic A" << std::endl;
    }
    return 0;
}
```

When the receiver gets a message from the publisher, the publisher program fails with "Bad address" or a segmentation fault. Using gdb to find where it breaks, I see that thread "ZMQbg/IO/0" received signal SIGABRT.

@gummif
Member

gummif commented Sep 14, 2023

This looks more complicated than it needs to be. For example, the my_free function does nothing, which will cause reads from deallocated memory. Try constructing the message the simple way.

@fengmao31
Author

I want to use this to transport large messages with lower communication latency. In my actual program, it crashes when messages larger than 100 KB are sent at 25 Hz. Alternatively, if I skip sending the first message, the remaining messages do get sent, but they cannot be deserialized: they are not the messages I meant to send.
In this example, larger messages can be sent at a higher rate, but it eventually fails with "Bad address" or a segmentation fault.

@fengmao31
Author

I will fix the my_free function in my code and try again. Thank you.
