C++ Multi-Thread Queue Processing

Started by
12 comments, last by Bregma 4 years, 3 months ago

Hi,

I want to pop objects off Queue1, process them, and move the results to Queue2 in a multi-threaded fashion, but my code is deadlocking:

// Threaded Function
m_threadedFunct = [this]() {
	while (m_threadsRunning) {

		// Wait
		{
			unique_lock<mutex> lock(m_mutexMain);
					
			while (m_objsToProcess == 0) m_mutexMainCondition.wait(lock);
		}

		// Objs to Process
		shared_ptr<Object> obj;
					
		{
			unique_lock<mutex> lock(m_mutex1);

			if (m_queue1.size() > 0) {
				obj = m_queue1.back();
				m_queue1.pop_back();
			}
		}

		// Process obj
		if (obj) {

			shared_ptr<Data> data(process(obj));

			// Store
			{
				unique_lock<mutex> lock(m_mutex2);

				m_queue2[obj] = data;
			}
	
			// Notify Main Thread
			{
				unique_lock<mutex> lock(m_mutexMain);

				--m_objsToProcess;

				if (m_objsToProcess == 0) m_mutexMainCondition.notify_all();
			}
		}
	}
	
};


// Start Threads
for (unsigned int i(0); i < 4; i++) m_threads.emplace_back(thread(m_threadedFunct));
			

// Main
m_queue1 = fillQueue1();
m_queue2.clear();
{
	unique_lock<mutex> calcLock(m_mutexMain);
	m_objsToProcess = static_cast<unsigned int>(m_queue1.size());
}
m_mutexMainCondition.notify_all();
{	
	unique_lock<mutex> calcLock(m_mutexMain);
	while (m_objsToProcess > 0) m_mutexMainCondition.wait(calcLock);
}
Reject the basic assumption of civilisation, especially the importance of material possessions

Unless I'm totally wrong: you are locking the main mutex and then setting your thread to wait for new data while still holding the lock. This can't work because another thread will never get the lock and so will always be stuck behind the “waiting” thread.

You should hold a lock only for as long as you need it to perform your operation (to get the next object from the queue, for example) and then release it immediately. This keeps thread stall time to a minimum.

@Shaarigan

m_mutexMainCondition.wait(calcLock); releases the mutex in the main thread though, so isn't the lock released while it waits?
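A minimal sketch of the behaviour in question, assuming nothing beyond the standard library: std::condition_variable::wait() unlocks the mutex while the caller is blocked and re-locks it before returning. The function name waitReleasesMutex is made up for illustration. The waiter takes the lock before the second thread even exists, so the second thread can only get the mutex if wait() really lets go of it; if it did not, this function would hang.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Returns true if a second thread managed to lock the mutex while the
// first thread was blocked inside condition_variable::wait().
bool waitReleasesMutex() {
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;          // guarded by m
    bool lockedByOther = false;  // guarded by m

    // Take the lock BEFORE the other thread is created.
    std::unique_lock<std::mutex> lock(m);

    std::thread other([&] {
        // Can only succeed once the waiter has released m inside wait().
        std::lock_guard<std::mutex> guard(m);
        lockedByOther = true;
        ready = true;
        cv.notify_one();
    });

    // wait() unlocks m while blocked and re-locks it before returning.
    cv.wait(lock, [&] { return ready; });
    bool result = lockedByOther;

    lock.unlock();
    other.join();
    return result;
}
```

The predicate overload of wait() is used here so that a spurious wakeup cannot end the wait early.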

If m_objsToProcess is your own variable, simply using an Interlocked/atomic operation to get and set the value, and avoiding the lock altogether, sounds like a good idea.
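A rough sketch of what the atomic suggestion could look like, using std::atomic from the standard library. The helper name countDownAtomically and its parameters are hypothetical. Note what this does and does not give you: the counter itself needs no mutex, but an atomic on its own (before C++20) offers no way to sleep until it reaches zero, so this sketch simply joins the threads instead.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Hypothetical helper: `workers` threads together claim `jobs` items
// from a shared atomic counter. Returns how many items were claimed.
unsigned countDownAtomically(unsigned jobs, unsigned workers) {
    std::atomic<unsigned> remaining(jobs);
    std::atomic<unsigned> claimed(0);

    std::vector<std::thread> pool;
    for (unsigned i = 0; i < workers; ++i) {
        pool.emplace_back([&] {
            unsigned current = remaining.load();
            while (current > 0) {
                // On failure compare_exchange_weak reloads `current`,
                // so the counter can never be decremented below zero.
                if (remaining.compare_exchange_weak(current, current - 1)) {
                    claimed.fetch_add(1);  // stands in for real processing
                    current = remaining.load();
                }
            }
        });
    }
    for (auto& t : pool) t.join();
    return claimed.load();
}
```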

Try designing your data structures as a monitor instead of using ad-hoc synchronization in your processing code. It makes reasoning about the synchronization flow so much easier. Don't think about synchronizing your functions, think about synchronizing access to your data.

The idea is that each queue has its own mutex and condition variable, and they get used in its push_back_locked() and pop_front_locked() member functions (which just wrap the unlocked versions). The latter function blocks while the queue is empty. The producer/consumer pair then just push and pop the queues as required, with no synchronization code of their own.

For a simple producer/consumer setup like this you need one mutex and condition variable pair per queue, for a total of two mutexes and two condition variables. If you have more than that, you're doing it wrong and it's going to deadlock, sooner rather than later.

The root of the deadlock in the original post's code is that any worker thread can be woken by the notify_all(), but m_queue1 is empty, so it goes back to waiting without ever waking anyone up. Boom.
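The monitor idea described in this post could be sketched roughly as follows. The class name MonitorQueue, the member names, and the doubleAll usage function are assumptions made for illustration; only push_back_locked() and pop_front_locked() come from the description. A std::deque is assumed as the underlying container.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical monitor-style queue: the mutex and condition variable live
// inside the container, so callers never touch them directly.
template <typename T>
class MonitorQueue {
public:
    // Add an item and wake one consumer that may be blocked in pop_front_locked().
    void push_back_locked(T item) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_items.push_back(std::move(item));
        }
        m_notEmpty.notify_one();
    }

    // Block while the queue is empty, then remove and return the front item.
    T pop_front_locked() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_notEmpty.wait(lock, [this] { return !m_items.empty(); });
        T item = std::move(m_items.front());
        m_items.pop_front();
        return item;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_notEmpty;
    std::deque<T> m_items;
};

// Usage sketch: one worker doubles each input and pushes it to a second queue.
// Multiplying by 2 stands in for the real process() call.
std::vector<int> doubleAll(const std::vector<int>& inputs) {
    MonitorQueue<int> in, out;
    std::thread worker([&] {
        for (std::size_t i = 0; i < inputs.size(); ++i)
            out.push_back_locked(in.pop_front_locked() * 2);
    });
    for (int v : inputs) in.push_back_locked(v);

    std::vector<int> results;
    for (std::size_t i = 0; i < inputs.size(); ++i)
        results.push_back(out.pop_front_locked());
    worker.join();
    return results;
}
```

Because the main thread blocks in pop_front_locked() on the second queue until each result arrives, this sketch needs no separate "objects remaining" counter. Shutting workers down cleanly (for example with a sentinel item) is left out.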

Stephen M. Webb
Professional Free Software Developer

Why complicate things so much?

std::queue has an empty() function, which is equivalent to checking size() == 0; not empty means size() > 0. So you don't need an external variable holding the number of items in the queue.

While the mutex is locked, wait, and then take all items from queue1 at once with swap(), moving them into a local queue 1. Then process all items and store the results in a local queue 2 without any mutex locked at all. Finally, lock the second mutex and push all items to queue2.
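Something like the following is one way to read the swap() approach; treat it as a sketch under assumptions, not the poster's actual code. The BatchQueues struct, its member names, and processBatch are invented for the example, the queues hold plain ints, and x * x stands in for the real processing step.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>

// Hypothetical bundle of the two shared queues and their locks.
struct BatchQueues {
    std::mutex mutex1, mutex2;
    std::condition_variable notEmpty1;
    std::deque<int> queue1, queue2;
};

// Take everything from queue1 in one go, process it without holding any
// lock, then publish all results to queue2 under the second mutex.
void processBatch(BatchQueues& q) {
    std::deque<int> localIn;
    {
        std::unique_lock<std::mutex> lock(q.mutex1);
        // Wait only while the queue is empty.
        q.notEmpty1.wait(lock, [&] { return !q.queue1.empty(); });
        localIn.swap(q.queue1);  // queue1 is now empty; the lock is held only briefly
    }

    std::deque<int> localOut;
    for (int x : localIn) localOut.push_back(x * x);  // no mutex held here

    {
        std::lock_guard<std::mutex> lock(q.mutex2);
        for (int r : localOut) q.queue2.push_back(r);
    }
}
```

One trade-off worth noting: whichever worker wins the swap takes the whole batch, so items that arrive together are processed by a single thread.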

Any other queue implementation (if not using STL) should have similar functionality.

keep your “ifs” on top of the stack

Most importantly:

  • lock a single mutex at a time - don't call functions that lock another mutex
  • wait only if the queue is empty
  • notify_all() has no effect (does nothing) when no thread is waiting
  • process items/objects/jobs in batches


Shaarigan said:

If m_objsToProcess is your own variable, simply using an Interlocked/atomic operation to get and set the value, and avoiding the lock altogether, sounds like a good idea.

@shaarigan I need to use a lock to stop the main thread running on before the 2nd Queue is filled


CapSel said:

Why complicate things so much?

std::queue has an empty() function, which is equivalent to checking size() == 0; not empty means size() > 0. So you don't need an external variable holding the number of items in the queue.

The external variable is needed because m_queue1 can become empty before m_queue2 is filled & the main thread will continue
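For what it's worth, the "items still in flight" counter can be given its own mutex and its own condition variable, separate from whatever signals that work is available. This is only a sketch: PendingCounter, its member names, and runJobs are hypothetical, and the demo spawns one thread per job purely to keep it short.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical completion counter: tracks items still in flight,
// independent of whether the input queue is already empty.
class PendingCounter {
public:
    void add(unsigned n) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_pending += n;
    }
    // Called by a worker AFTER its result has been stored.
    void done() {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (--m_pending == 0) m_allDone.notify_all();
    }
    // Blocks until every added item has been marked done.
    void waitAll() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_allDone.wait(lock, [this] { return m_pending == 0; });
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_allDone;  // used only for "everything finished"
    unsigned m_pending = 0;
};

// Usage sketch: returns how many results the main thread can see once
// waitAll() has returned.
unsigned runJobs(unsigned jobs) {
    PendingCounter counter;
    counter.add(jobs);

    std::mutex resultMutex;
    unsigned stored = 0;  // stands in for the second queue, guarded by resultMutex

    std::vector<std::thread> pool;
    for (unsigned i = 0; i < jobs; ++i) {
        pool.emplace_back([&] {
            {
                std::lock_guard<std::mutex> lock(resultMutex);
                ++stored;  // store the result first...
            }
            counter.done();  // ...then report completion
        });
    }

    counter.waitAll();
    unsigned seen;
    {
        std::lock_guard<std::mutex> lock(resultMutex);
        seen = stored;
    }
    for (auto& t : pool) t.join();
    return seen;
}
```

Since done() is only called after the result is stored, the main thread cannot run on before the second queue is filled, even if the first queue empties early.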


Bregma said:

Try designing your data structures as a monitor instead of using ad-hoc synchronization in your processing code. It makes reasoning about the synchronization flow so much easier. Don't think about synchronizing your functions, think about synchronizing access to your data.

I will do this. I didn't want to have to make new classes per queue though; I do several rounds of processing and passing to queues, so it's not just 2 queues.

