Memory not being reclaimed #62

Open
ghost opened this issue Jun 3, 2017 · 14 comments

Comments

@ghost

ghost commented Jun 3, 2017

Hello,
I am running a daemon process which parses requests and

  1. instantly evaluates it
  2. sends to computation layer for further processing

To each of these layers I send pointers, which are captured inside the lambda work function for each layer. I have written a check program that exercises all the functions I use inside the daemon and ran memcheck on it; there are no memory leaks. However, any variables that were used inside the work function still claim memory even after the thread ends. I am checking memory usage through GNOME System Monitor.

For instance, when the function find_trip does some heavy processing and all the variables inside it (an R-tree, a hash map) take 300 MB of space, that memory is not returned even though these variables were not allocated through new. They somehow keep living inside the process, and memory usage never goes down, even if I run a less resource-intensive request afterwards.

I am attaching my daemon code below. Can you please take a look and help me figure out what I am doing wrong?

int main(int argc, char *argv[])
{
    if (argc < 2)
    {
        cout << "Usage " << argv[0] << " input_file" << endl;
        return -1;
    }
    string inputFileName(argv[1]);
    string nodes = inputFileName + ".nodes";         // node is actually Edge
    string lmdb_file = inputFileName + ".nodeIndex"; // LMDBStore keeps node coordinates, poi data and poi geometry
    string processed_edges = inputFileName + ".ch"; // edges from the optimized graph created by contraction hierarchies
    string highwayData = inputFileName + ".highwaydata";
    string highwayRtree = inputFileName + ".highwayRTree";
    string poiRtree = inputFileName + ".poiRTree";

    Graph *graph = new Graph();
    graph->loadNodes(nodes);
    graph->loadEdges(processed_edges);
    graph->loadLMDBStore(lmdb_file);
    graph->loadHighwayData(highwayData);

    RStarIndex *rstar_index = new RStarIndex(graph, highwayRtree, poiRtree);
    tbb::task_scheduler_init init;

    // number of workers to use at each stage
    auto worker_concurrency = std::thread::hardware_concurrency();
    logging::INFO("Number of threads per stage is " + to_string(worker_concurrency));

    // change these to tcp://known.ip.address.with:port if you want to do this
    // across machines
    zmq::context_t context;
    string server_endpoint = "tcp://*:8002";
    std::string result_endpoint = "ipc:///tmp/result_endpoint";
    std::string request_interrupt = "ipc:///tmp/request_interrupt";
    std::string parse_proxy_endpoint = "ipc:///tmp/parse_proxy_endpoint";
    std::string compute_proxy_endpoint = "ipc:///tmp/compute_proxy_endpoint";
    bool default_logging = false; //enable/disable default request/response logging

    // server
    std::thread server_thread = std::thread(std::bind(&http_server_t::serve,
                                                      http_server_t(context,
                                                                    server_endpoint,
                                                                    parse_proxy_endpoint + "_upstream",
                                                                    result_endpoint,
                                                                    request_interrupt,
                                                                    default_logging)));
    // load balancer for parsing
    std::thread parse_proxy(std::bind(&proxy_t::forward, proxy_t(context,
                                                                 parse_proxy_endpoint + "_upstream",
                                                                 parse_proxy_endpoint + "_downstream")));
    parse_proxy.detach();

    //request parsers
    std::list<std::thread> parse_worker_threads;
    for (size_t i = 0; i < worker_concurrency; ++i)
    {
        parse_worker_threads.emplace_back(std::bind(
            &worker_t::work,
            worker_t(
                context,
                parse_proxy_endpoint + "_downstream",
                compute_proxy_endpoint + "_upstream",
                result_endpoint,
                request_interrupt,
                [&graph, &rstar_index](const std::list<zmq::message_t> &job, void *request_info,
                                       worker_t::interrupt_function_t &) {
                    try
                    {
                        http_request_t request = http_request_t::from_string(static_cast<const char *>(job.front().data()), job.front().size());
                        http_request_info_t info = *static_cast<http_request_info_t *>(request_info);
                        int is_get_request = (request.method == method_t::GET);

                        //we accept only GET and POST requests
                        if (request.method != method_t::POST && request.method != method_t::GET)
                        {
                            return jsonify_error(405, runtime_error("Only GET and POST requests are allowed"), is_get_request, info);
                        }

                        //is the request path action in the action set?
                        auto action = PATH_TO_ACTION.find(request.path);
                        if (action == PATH_TO_ACTION.cend())
                        {
                            return jsonify_error(404, runtime_error("Invalid route"), is_get_request, info);
                        }

                        worker_t::result_t result{false};
                        //do request specific processing
                        switch (action->second)
                        {
                        case ADDRESS:
                            result = find_address(request, info, graph);
                            break;
                        case NEAREST:
                            result = find_nearest(request, info, graph, rstar_index);
                            break;
                        case ROUTE:
                            result = parse_shortest_query(request, info);
                            break;
                        case TRIP:
                            result = parse_trip_query(request, info);
                            break;
                        default:
                        //apparently you wanted something that we figured we'd support but haven't written yet
                            return jsonify_error(501, runtime_error("Feature is not implemented yet"), is_get_request, info);
                        }

                        return result;
                    }
                    catch (const std::exception &e)
                    {
                        http_request_t request = http_request_t::from_string(static_cast<const char *>(job.front().data()), job.front().size());
                        int is_get_request = (request.method == method_t::GET);
                        http_request_info_t info = *static_cast<http_request_info_t *>(request_info);
                        return jsonify_error(400, e, is_get_request, info);
                    }
                })));
        parse_worker_threads.back().detach();
    }

    //load balancer for shortest path computation
    std::thread compute_proxy(std::bind(&proxy_t::forward, proxy_t(context,
                                                                   compute_proxy_endpoint + "_upstream",
                                                                   compute_proxy_endpoint + "_downstream")));
    compute_proxy.detach();

    //shortest path computation
    std::list<std::thread> compute_worker_threads;
    vector<ContractionHierarchyQuery> query_objects;
    // reserve up front so the push_back below never reallocates the vector
    // while detached worker threads from earlier iterations are already
    // indexing into it
    query_objects.reserve(worker_concurrency);

    for (size_t i = 0; i < worker_concurrency; ++i)
    {
        //we need separate query object for each thread
        query_objects.push_back(graph->getQueryObject());

        compute_worker_threads.emplace_back(std::bind(
            &worker_t::work,
            worker_t(context,
                     compute_proxy_endpoint + "_downstream",
                     "ipc:///dev/null",
                     result_endpoint,
                     request_interrupt,
                     [&graph, &rstar_index, &query_objects, i](const std::list<zmq::message_t> &job, void *request_info, worker_t::interrupt_function_t &) {

                         string query(static_cast<const char *>(job.front().data()), job.front().size());

                         //parse request
                         json arr = json::parse(query);
                         int is_get_request = arr[0];
                         int accepts_gzip = arr[1];
                         int find_shortest = arr[2];
                         uint32_t start = arr[3];
                         uint32_t destination = arr[4];
                         vector<double> coordinates = arr[5].get<vector<double>>();
                         vector<int> categories;
                         if (!find_shortest)
                         {
                             categories = arr[6].get<vector<int>>();
                         }
                         try
                         {
                             http_request_info_t info = *static_cast<http_request_info_t *>(request_info);
                             worker_t::result_t result{false};
                             if (find_shortest)
                             {
                                 result = find_shortest_path(is_get_request, accepts_gzip, start, destination, coordinates, info, graph, query_objects[i]);
                             }
                             else
                             {
                                 result = find_trip(is_get_request, accepts_gzip, start, destination, coordinates, categories, info, graph, rstar_index, query_objects[i]);
                             }
                             return result;
                         }
                         catch (const std::exception &e)
                         {
                             http_request_info_t info = *static_cast<http_request_info_t *>(request_info);
                             return jsonify_error(400, e, is_get_request, info);
                         }
                     })));
        compute_worker_threads.back().detach();
    }

    //listen for SIGINT and terminate if we hear it
    std::signal(SIGINT, [](int) {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        exit(1);
    });
    server_thread.join();

    delete graph;
    delete rstar_index;

    return 0;
}
@kevinkreiser
Owner

ok so let me reiterate what you've got here so i make sure i understand correctly. you've initialized the graph and rstar_index outside of your workers, and you then only free them when your daemon is interrupted (SIGINT). i don't know the internals of graph and rstar_index in terms of what they cache when you use them to compute paths, but i would reckon that they fill up some internal data structures which you might want to clear after each request.

there is a place in the prime_server api to do cleanup after each request. you'll notice you can attach a cleanup_function_t as another function to be run after each request is serviced by a given worker. the worker could then call the appropriate methods on graph and rstar_index to make them reclaim some memory.
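a minimal sketch of what i mean (hedged: check the worker_t constructor in the prime_server headers for the exact signature of cleanup_function_t; clear_caches() is a hypothetical method on your own classes):

// hedged sketch: the cleanup callable is assumed to take no arguments and
// to run after each serviced request
worker_t(context,
         parse_proxy_endpoint + "_downstream",
         compute_proxy_endpoint + "_upstream",
         result_endpoint,
         request_interrupt,
         [&graph, &rstar_index](const std::list<zmq::message_t> &job,
                                void *request_info,
                                worker_t::interrupt_function_t &) {
             // ... per-request work exactly as in your code above ...
             return worker_t::result_t{false};
         },
         [&graph, &rstar_index]() {
             // hypothetical methods: ask your own classes to drop whatever
             // per-request caches they keep
             graph->clear_caches();
             rstar_index->clear_caches();
         });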

i'm not sure if this will solve your issue though, because i don't know how the internals of graph and rstar_index work. the fact that they are shared non-const amongst threads is worrisome. although maybe it's just query_objects[i] that is doing the allocation, and since that's not shared, maybe your cleanup function should be clearing that? i'm just guessing a bit now since i don't know your contraction hierarchy api, but please let me know if this helps, or if you've found a possibly new use case that the prime_server api could better accommodate, and i'll be happy to consider alterations.

@ghost
Author

ghost commented Jun 6, 2017

It seems that the rstar_index class, which internally uses libspatialindex, has some issue. It allocates some variables on the heap and never frees them (except when the process ends). However, I don't even know how to debug that, since there is no memory leak. I have tried looking at /proc/pid/maps, but that doesn't say which function makes the allocation. Do you have any idea how to solve this kind of issue?

@kevinkreiser
Owner

kevinkreiser commented Jun 7, 2017

I would guess that this library caches that information on purpose (i.e. it's not a leak) so as to accelerate subsequent requests. indeed, valhalla, the routing software that i use with prime_server, also does this. i am not familiar with libspatialindex but i would fish around in the docs for clues about what it keeps in memory.

to me it looks like your test program that doesn't use prime_server should show similar memory usage patterns, that is, use a lot while the program is running and then release it only when the program finishes. it seems to me the memory usage issue is probably not related to prime_server at all. at any rate, if you do find a way to tell the libspatialindex api to deallocate its cache, you can do so in the cleanup function i mentioned earlier.

@ghost
Author

ghost commented Jun 15, 2017

I have investigated the problem more deeply and it is not necessarily connected with the library. The issue is connected with the threads inside prime_server. You implemented a cleanup function for each thread, but since no thread ever dies, the memory of locally allocated variables that have gone out of scope is never released back to the OS. So I really wonder how you don't have issues on your server when you run a lot of requests. That memory is never released back to the OS and the process still occupies it. Is there some way to release the freed memory back to the OS, other than forking a new process inside each thread that does the processing?

@kevinkreiser
Owner

the cleanup method is called after every request. if you properly deallocate the things you are interested in deallocating in that function, your thread will not consume memory without bound. the whole api is a la carte, in typical c++ fashion. anything you don't explicitly take care of is going to sit around until the program exits.

can you try something else just for fun? could you write your worker function as a free function that takes those large memory objects by reference, and then pass those objects to the method via std::ref, as is standard practice when passing things by reference across the thread boundary?
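a minimal sketch of that, where parse_request is a hypothetical free function with the same signature as your work lambda:

#include <functional>

// hypothetical free function with the same signature as the work lambda
worker_t::result_t parse_request(Graph &graph, RStarIndex &rstar_index,
                                 const std::list<zmq::message_t> &job,
                                 void *request_info,
                                 worker_t::interrupt_function_t &interrupt)
{
    worker_t::result_t result{false};
    // ... the same request handling as in the lambda ...
    return result;
}

// bind the big objects by reference so nothing is copied into the functor;
// _1.._3 forward the arguments the worker supplies on each request
using namespace std::placeholders;
auto work_function = std::bind(&parse_request, std::ref(*graph),
                               std::ref(*rstar_index), _1, _2, _3);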

@ghost
Author

ghost commented Jun 16, 2017

The problem is not with the large memory objects. I tried apache benchmark with 10000 requests, commented out all my functions (find_nearest, etc.) and sent back just a hello world. After all those requests, memory usage went up and was never freed. The problem is that our process can't tell the OS that it has freed memory, because the threads are part of the process.

You can easily replicate this issue yourself with the prime_serverd example.

prime_serverd tcp://*:8002
ab -k -n 10000 http://127.0.0.1:8002/is_prime?possible_prime=12345

Just the server running: 3.6 MB, as shown in GNOME System Monitor (physical resident set without mapped files).
After 10k requests: 8.2 MB.

@kevinkreiser
Owner

now that is something i can look into, i'll have a quick go with valgrind!

@kevinkreiser
Owner

interesting, i've confirmed that prime_serverd is increasing in memory over the course of its life. i'll look closer at it this weekend if i get a chance. thanks for pointing this out

@ghost
Author

ghost commented Jun 18, 2017

Okay, I did some digging; this issue is connected with memory fragmentation. For me the problem only appears when I use dynamically allocated containers like vector or string inside my classes. I found an interesting article here: http://natsys-lab.blogspot.sk/2015/09/fast-memory-pool-allocators-boost-nginx.html.

I tried the solution provided by Facebook, namely the folly library, which contains the Arena.h pool implementation. When using the pool, for instance for a vector, memory indeed gets released, but this only works for types with fixed length, i.e. uint, int, double, ... I can also imagine using it for strings (you get the string length and ask the allocator), but the real problem is how I would create a class which uses this pool.

static const size_t requestedBlockSize = 64;
folly::SysArena arena(requestedBlockSize);
folly::fbvector<size_t, folly::StlAllocator<folly::SysArena, size_t>>
    vec{{}, folly::StlAllocator<folly::SysArena, size_t>(&arena)};
for (size_t i = 0; i < 100000; i++)
{
    vec.push_back(i);
}

The problem is that it still persists with 100k points inserted; all of the memory is just not freed back to the OS.

@kevinkreiser
Owner

sorry, i have been absent for a while. i'm going to keep this issue open until i find time to look into it properly. it seems like something inherent to multithreading that i've not come across before or was not aware of.

@ghost
Author

ghost commented Jun 29, 2017

I don't know whether it is a problem with the library itself, but it seems to be expected behavior of the libc library. Someone else on the Internet noticed this too: https://news.ycombinator.com/item?id=14275805. I will try to compile my application with libc++ (https://libcxx.llvm.org/) and see if that makes any difference. I have been thinking about it for a while: if the problem were memory fragmentation only, then we should only be taking up RAM from the OS very slowly, because we should be able to fill up the freed pages. But apparently this is not happening; the application keeps requesting new pages even when it could re-use some of them. I was trying those funny environment variables, and for a while I thought it fixed the issue on longer runs of the server, but now I am not even sure, because it behaved similarly without the env variable.
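If fragmentation inside glibc's allocator really is the cause, one glibc-specific thing I could try from the cleanup function mentioned earlier is malloc_trim(0), which asks the allocator to return free heap pages to the OS; a minimal sketch:

#include <malloc.h> // glibc extension, not portable

// hedged sketch: on glibc, malloc_trim(0) releases free heap pages back
// to the OS; it may shrink the resident set if fragmentation is the cause
auto trim_heap = []() {
    malloc_trim(0); // returns 1 if memory was released, 0 otherwise
};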

@kevinkreiser
Owner

i don't know if this is an option for you, but when we run this in production we don't run it as a single process, for the sake of fault tolerance. each worker is its own process. so we'll run a prime_serverd, a prime_proxyd and then n worker processes which just implement and run the worker interface. none of it is in threads (other than the zmq internals), which also simplifies the code.

@kevinkreiser
Owner

kevinkreiser commented Jun 29, 2017

oh, i should mention that when i say processes i mean each is its own standalone program, no forks or any nonsense like that
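for reference, a rough sketch of what one of those standalone worker programs looks like; the endpoint addresses and the header name here are placeholders/assumptions to adapt to your setup:

// hedged sketch of a standalone worker process: prime_serverd and
// prime_proxyd run as their own programs, and you launch n copies of
// a binary like this one
#include <prime_server/prime_server.hpp> // assumed header name
using namespace prime_server;

int main()
{
    zmq::context_t context;
    worker_t worker(context,
                    "tcp://localhost:8003", // proxy downstream (placeholder)
                    "ipc:///dev/null",      // no further stage
                    "tcp://localhost:8004", // result endpoint (placeholder)
                    "tcp://localhost:8005", // interrupt endpoint (placeholder)
                    [](const std::list<zmq::message_t> &job, void *request_info,
                       worker_t::interrupt_function_t &) {
                        // handle one request; when this process exits or is
                        // restarted, all of its memory goes back to the OS
                        return worker_t::result_t{false};
                    });
    worker.work(); // blocks, servicing requests forever
    return 0;
}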

@kevinkreiser
Owner

kevinkreiser commented Jun 16, 2020

i've just merged a pr by @LaGrunge who found that we were freeing inside of an assert, which means only debug builds were actually freeing... this sounds very much related to this thread, so if anyone is interested in checking their applications again, it'd be great to hear from you!

reference #85
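for anyone curious, the class of bug looks like this minimal illustration (not the actual prime_server code): with NDEBUG defined in release builds, the entire assert expression is compiled out, so a side effect like free() inside it never runs:

#include <cassert>
#include <cstdlib>

int main()
{
    void *buffer = std::malloc(1024);
    // with NDEBUG defined (release builds) this whole expression is
    // compiled out, so the free() never runs and the allocation lives
    // until process exit; debug builds reclaim it as intended
    assert((std::free(buffer), true));
    return 0;
}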
