C++ Multi-Thread Queue Processing

12 comments, last by Bregma 4 years, 4 months ago

I put in some debug output.

For some reason m_objsToProcess gets decremented too many times.

I think the main code is running on before m_queue2 is filled.

Reject the basic assumptions of civilisation, especially the importance of material possessions

Since I can't express what I mean clearly in words, here is my interpretation of a solution to your problem.

struct object_to_process {
  using pointer_type = std::shared_ptr<object_to_process>;
};

struct object_process_result {
  using pointer_type = std::shared_ptr<object_process_result>;
};

using to_process_queue_t = std::queue<object_to_process::pointer_type>;
using processed_queue_t  = std::queue<object_process_result::pointer_type>;

// one thread ~ one mutex ~ one condition_variable

std::mutex         to_process_mutex;
to_process_queue_t to_process; // queue1

std::condition_variable to_process_cond;

std::mutex        processed_mutex;
processed_queue_t processed;

std::condition_variable processed_cond;

std::atomic<bool> keep_running{true};

auto threaded_processor = [&]() {
  static constexpr const size_t max_count = 4;

  std::vector<object_to_process::pointer_type>     input;
  std::vector<object_process_result::pointer_type> output;
  while (true) {
    if (!keep_running)
      return;

    {
      // only single lock and unlock
      std::unique_lock<std::mutex> lock(to_process_mutex);
      to_process_cond.wait(lock, [&]() -> bool {
        if (!keep_running)
          return true;
        if (!to_process.empty())
          return true;
        return false;
      });
      if (!keep_running)
        return;

      // memory usage estimation
      input.reserve(to_process.size());
      output.reserve(to_process.size());
      
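      // take at most max_count items per batch
      for (size_t count = 0; count < max_count && !to_process.empty(); ++count) {
        input.emplace_back(std::move(to_process.front()));
        to_process.pop(); // pop from the shared queue; the local vector has no pop()
      }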
    }
    
    // processing loop
    for (const auto& item: input) {
      auto result = process(item);
      if (!result)
        continue;
      output.emplace_back(std::move(result));
    }
    
    {
      // only single lock and unlock
      std::unique_lock<std::mutex> lock(processed_mutex);
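      for (auto& item: output)
        processed.push(std::move(item));
    }
    // wake the consumer blocked on processed_cond; nothing else notifies it
    processed_cond.notify_one();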
    
    input.clear();
    output.clear();
  }
};

{
  std::unique_lock<std::mutex> lock(to_process_mutex);
  to_process = prepareToProcess();
}
to_process_cond.notify_all();

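// note: the wait below returns as soon as the first batch arrives, so it may
// collect only part of the results; wait for an expected count if all are needed
std::vector<object_process_result::pointer_type> processed_objects;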
{
  std::unique_lock<std::mutex> lock(processed_mutex);
  processed_cond.wait(lock, [&]() -> bool {
    return !processed.empty();
  });
  while (!processed.empty()) {
    processed_objects.emplace_back(std::move(processed.front()));
    processed.pop();
  }
}
// processed_objects ready to be ... checked

// BOM?:
struct worker {
  // input queue mutex
  // input queue
  // input condition variable
  // keep_running
  
  // output queue reference mutex
  // output queue reference
  // output condition variable
};
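
The outline above could be fleshed out roughly as follows. This is only a sketch under assumptions that are not in the original post: items are plain ints, "processing" just doubles them, and the names submit, run and run_worker_demo are made up for illustration. keep_running is a plain bool guarded by the input mutex here instead of an atomic.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical worker: owns its input side, borrows the shared output side.
struct worker {
  std::mutex              input_mutex;
  std::queue<int>         input;
  std::condition_variable input_cond;
  bool                    keep_running = true; // guarded by input_mutex

  std::mutex&              output_mutex;
  std::queue<int>&         output;
  std::condition_variable& output_cond;

  std::thread thread; // declared last so it starts after the other members exist

  worker(std::mutex& om, std::queue<int>& oq, std::condition_variable& oc)
      : output_mutex(om), output(oq), output_cond(oc),
        thread([this] { run(); }) {}

  ~worker() {
    {
      std::lock_guard<std::mutex> lock(input_mutex);
      keep_running = false;
    }
    input_cond.notify_all();
    thread.join();
  }

  void submit(int value) {
    {
      std::lock_guard<std::mutex> lock(input_mutex);
      input.push(value);
    }
    input_cond.notify_one();
  }

private:
  void run() {
    for (;;) {
      int item;
      {
        std::unique_lock<std::mutex> lock(input_mutex);
        input_cond.wait(lock, [this] { return !keep_running || !input.empty(); });
        if (input.empty())
          return; // stop requested and nothing left to process
        item = input.front();
        input.pop();
      }
      {
        std::lock_guard<std::mutex> lock(output_mutex);
        output.push(item * 2); // stand-in for the real processing step
      }
      output_cond.notify_one();
    }
  }
};

// Usage sketch: wait for a known number of results, not just "non-empty".
inline int run_worker_demo() {
  std::mutex out_mutex;
  std::queue<int> out;
  std::condition_variable out_cond;
  worker w(out_mutex, out, out_cond);
  for (int i = 1; i <= 3; ++i)
    w.submit(i);
  std::unique_lock<std::mutex> lock(out_mutex);
  out_cond.wait(lock, [&] { return out.size() == 3; });
  int sum = 0;
  while (!out.empty()) {
    sum += out.front();
    out.pop();
  }
  return sum;
}
```

The consumer waits until the expected number of results has arrived, which avoids the main code running on before the output queue is filled.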

keep your “ifs” on top of the stack

Cacks said:
I didn't want to have to make new classes per queue

I recommend using object-oriented design here. You need one new class: a locked queue, which is a queue. You can scatter the queue-locking logic all over your code and get problems like the ones you are experiencing, because reasoning about the logic becomes difficult, or you can tie the logic to the data it operates on with a class and keep it simple.

If your several locked queues hold data of different types, that's what templates are for. std::queue is a template. It's not a stretch to make a locked_queue<typename T> template that wraps a std::queue<T> and provides a drop-in (duck typed) replacement interface.
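
A minimal sketch of what such a template might look like, assuming C++17. The member names, the close() operation and the sum_of_squares helper are my own illustration, not anything from the thread. Unlike std::queue, pop() here blocks and returns a std::optional, so it is not a strict drop-in replacement; that is a deliberate choice so that the emptiness check and the removal happen under a single lock.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical locked_queue<T>: a std::queue<T> plus the mutex and
// condition variable that guard it, so the locking lives in one place.
template <typename T>
class locked_queue {
public:
  void push(T value) {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_queue.push(std::move(value));
    }
    m_cond.notify_one();
  }

  // Blocks until an item is available or close() has been called.
  // Returns std::nullopt only once the queue is closed and drained.
  std::optional<T> pop() {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_cond.wait(lock, [this] { return m_closed || !m_queue.empty(); });
    if (m_queue.empty())
      return std::nullopt;
    T value = std::move(m_queue.front());
    m_queue.pop();
    return value;
  }

  // Signals that no more items will be pushed and wakes all waiters.
  void close() {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_closed = true;
    }
    m_cond.notify_all();
  }

private:
  std::mutex              m_mutex;
  std::condition_variable m_cond;
  std::queue<T>           m_queue;
  bool                    m_closed = false;
};

// Usage sketch: two workers square numbers from one queue into another.
inline int sum_of_squares(int n) {
  locked_queue<int> input, output;
  std::vector<std::thread> workers;
  for (int i = 0; i < 2; ++i)
    workers.emplace_back([&] {
      while (auto item = input.pop())
        output.push(*item * *item);
    });
  for (int i = 1; i <= n; ++i)
    input.push(i);
  input.close();
  for (auto& w : workers)
    w.join();   // after the joins every result is in output
  output.close();
  int sum = 0;
  while (auto item = output.pop())
    sum += *item;
  return sum;
}
```

The same template works unchanged for a queue of std::shared_ptr elements or any other movable type, which is the point of wrapping std::queue<T> and not writing one class per queue.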

Stephen M. Webb
Professional Free Software Developer
