C++ 11: Concurrency Support

Discussion in 'C++' started by BiplabKamal, Mar 23, 2016.

  1. Concurrency in applications is the ability of the application to have more than one independent execution path and still work for the common goal. This implies that the independent execution paths need to communicate with each other while getting executed. Here for the simplicity of the discussion we will use term thread for an execution path though different name could be used in different platforms. We will use the term multi-threading to mean concurrency.

    Multi-threaded programming has always been a high potential source of bugs in software development. If you don’t have very clear picture of threading techniques available in the technologies you use for development and your threads are using shared resources and multiple threads need to co-operate with each other then definitely you will create mess somewhere. I am assuming you are well versed with the concept of threading concept in general and not going in detail of what is thread and how the operating system manages of running threads. We know threads are independent execution paths. In windows operating system each process has one or more threads. As the scheduling of threads is controlled by the operating system, programmers do not have much control over when a thread will get the time slice and for what duration. This will not create any problem when threads are completely independent. But that never happens, if threads are fully independent then they could be implemented in separate applications. Your application might create more than one thread to fulfill a common goal i.e. the requirement of the users of the application. Unless the threads in an application are not collaborating they cannot fulfill the common target. I hope you are not asking the question that why we need multiple threads and why cannot be done with single thread. It is true that many applications can be single threaded and we should not introduce multi-threading unless it is really required because multi-threading programming incur more development cost and run-time overhead than single-threading. While you should be very sure that you need multiple threads in your application.

    What is the implication of having more threads in a single application? By introducing more threads in a single application you are helping both of your application as well as the overall system performance. How? Having more number of active threads in the system helps reducing the amount of time CPU keeps idle when we have multi-core processors. Most of the operating systems tries to keep the CPUs busy at it’s maximum level by applying efficient thread scheduling algorithm. It schedules the threads which are not blocked by io operation or some other purpose. CPU is in idle state when there is no active thread to run by CPU. If you look at your desktop CPU performance you will find most of the time CPU is idle. This is because CPU much faster than I/O devices. So when you break the bigger task into smaller tasks and run in different threads you increase the chances of CPU not being idle. At the same time if your application has more numbers of threads it gets more sharing of CPU when scheduling is based on number of threads and not on number of processes. Think about a real example where you need to wait in a queue for buying tickets. The rule is that one person cannot get more than 5 tickets at a time but same person can go in the queue as many number of times as he wants. Also each ticket is issued in a fixed time. Now if the requirement is for 100 tickets and there are 2 counters available you can take two approach:
    1. You can send one person who will go in one of the queues, buy 5 tickets and then again go in one of the queues and so on until he gets total 100 tickets.
    2. You can send multiple(say 5) people who will wait in the queues simultaneously and every person will follow the first approach until their total tickets reach to 100.
    In both the approach there are benefits and problems. In the first approach it will take lot of time if there is a rush. Even if there is no rush and both counters are free you can not utilize both the counter. In the 2nd approach it will be much faster to get total number of tickets but you have to deploy more resources. So there is a question of cost verses speed. In case of a software application you can think the processors as counters, threads as ticket buyer and the work load as the total number of tickets.

    There is no hard and fast rules when we should use how many threads but there are scenarios where using multiple threads may have benefits over cost:
    1. Utilization of multiple CPUs: If your system has more than one CPU cores using multiple threads can improve the performance. At leaset one thread per CPU core is required to utilize the power of the machine fully. Normally operating system try to keep the CPUs busy with effective thread scheduling.
    2. Improving CPU utilization: Even if there is a single CPU it might be idle most of the time because most of the applications spends time on blocking IO operations which don’t need CPU cycles. So creating separate threads for io expensive tasks will improve the utilization CPU and hence the throughput.
    3. Improve performance or throughput: If an application has tasks which takes long time, it could be broken into smaller tasks and run in separate threads which will reduce the overall completion time. This is because by creating more threads you are demanding more share of the CPU time-slice from the operating system.
    4. User responsiveness: When an application needs to response to the user in a timely manner but it needs to do background work which is time consuming creating separate threads for background tasks will enable the user thread to response in time.
    5. Supporting multiple users: If an application (example: Web Application) supports multiple users simultaneously separate threads might be required to handle interaction with every user.
    Threading is basically an operating system feature and not a language feature. Like in windows you call system APIs to create and manage threads. Every process in Windows must have at least one thread called the main thread because thread is the unit of execution in the OS. When there are additional threads other than main thread all the threads need to communicate and synchronize accessing common resource among them as they will be fulfilling some common goal and share resources. This aspects of multi-threaded design requires lot of patience, knowledge, expertise and experience. Before introducing additional thread in your application you need to know about the operating system support and technique of thread management and thread synchronization. The learning curve can be reduced when there is helper library in the language used and supporting multiple platforms is a requirement. C++11 took the pain of platform independent support for multi-threading and thread synchronization. Prior to C++11, in Windows Platforms you needed to learn OS APIs or some framework like MFC or .Net or some other framework to create, manage and synchronize threads. But if you know C++ standard library support for concurrency features of version C++11 you can do most of the things without knowing the APIs or frameworks.

    C++11 threading: C++11 added threading support in it’s standard library and not as a compiler feature. That means if you want to use this support you need to include some standard library headers. For example the thread class is used to create thread object which is defined in header <thread> and part of the namespace ‘std’. Let us first take a very simple example where the main thread creates another thread and run a function within that thread:

    Code:
    #include<thread>
    #include<iostream>
    #include<chrono>
    int main()
    {
        
        // Lamda expression to be execeuted by the thread. It can be a function also
        auto threadlamda = []() {
            for (int i = 1; i <= 10;i++)
            {
                    std::cout << "Child Thread"<< " Iteration: " <<i << std::endl;
            }
            std::cout << "Exiting the child thread" << std::endl;
        };
        std::thread mythread(threadlamda);
        for (int i = 1; i <= 10;i++)
        {
            std::cout << "Main Thread" << " Iteration: " << i << std::endl;
        }
        std::cout << "Exiting the main thread"<<std::endl;
        return 0;
    }
    
    In the above code there is at least two major problem, one is the output to the console can be a mess because while one thread is writing to console another thread may be get the CPU cycles. So the two output statement from two threads can interfere with each other. Here is one instance of the output but it could be different for each run and output is uncertain because thread scheduling is controlled by the os.
    Code:
    Child Thread Iteration: Main Thread Iteration: 11
    
    Main Thread Iteration: 2
    Child Thread Iteration: 2
    Main Thread Iteration: 3
    Child Thread Iteration: 3Main Thread Iteration: 4
    
    Main Thread Iteration: 5
    Child Thread Iteration: 4
    Main Thread Iteration: 6Child Thread Iteration: 5
    
    Main Thread Iteration: 7Child Thread Iteration: 6
    
    Main Thread Iteration: 8Child Thread Iteration: 7
    
    Main Thread Iteration: 9Child Thread Iteration: 8
    
    Main Thread Iteration: 10Child Thread Iteration: 9
    
    Exiting the main thread
    Child Thread Iteration: 10
    Exiting the child thread
    
    This is because both the thread is using cout object but only the << operator is thread safe and we are using << operator multiple times to write a complete sentence to the console. So you need to synchronize the threads so that when one thread is writing one message other thread is blocked.

    There is another problem- When the thread object is destroyed at the return of main function, an exception is thrown. To avoid this you need to call join() method of the thread object from the main thread. This also ensure that child thread is exited before the main thread exit. Let us try to improve this code as shown below:
    Code:
    #include<thread>
    #include<iostream>
    #include<mutex>
    int main()
    {
        std::mutex m;
        // Lamda expression to be execeuted by the thread. It can be a function also
        auto threadlamda = [&m]() {
            std::cout << "Starting Child thread \n";
            for (int i = 1; i <= 10;i++)
            {
                std::unique_lock<std::mutex> mylock(m); // Lock here and unlock at the end of the block
                std::cout << "Child Thread"<< " Iteration: " <<i << std::endl;
            }
            std::cout << "Exiting the child thread" << std::endl;
        };
        std::thread mythread(threadlamda);
        for (int i = 1; i <= 10;i++)
        {
            std::unique_lock<std::mutex> mylock(m);// Lock here and unlock at the end of the block
            std::cout << "Main Thread" << " Iteration: " << i << std::endl;
        }
        mythread.join();
        std::cout << "Exiting the main thread\n";
        
        return 0;
    }
    [CODE]
    
    In the above code we added mutex and lock it before the cout statements. Unlock is called when std::unique_lock<std::mutex> object get out of scope. Also we added the call the join() method of the child thread object at the end of the main thread. Note the use of std::unique_lock<std::mutex> object for synchronization.It locks the mutex in the constructor and unlocks in the destructor. std::mutex and std::unique_lock template class are implemented in the header <mutex>. But it added one new problem that child thread is not getting chance to lock the mutex before the main thread loop is done. You can add a sleep (std::this_thread::sleep_for()) in each thread’s loop which will allow both threads lock the mutex in between. But it will be still very irregular in nature. Suppose we want to display the output uniformly by each thread after the other thread. So you need to add handshaking between the threads. For this purpose we will now use a condition variables available in C++11. Here is the modified code:
    
    [CODE=Cpp]
    #include<thread>
    #include<iostream>
    #include<mutex>
    #include<condition_variable>
    int main()
    {
        std::mutex m;
        std::condition_variable SharedCond;
        // Lamda expression to be execeuted by the thread. It can be a function also
        auto threadlamda = [&]() {
            std::cout << "Starting Child thread \n";
            for (int i = 1; i <= 5;i++)
            {
                {
                    std::unique_lock<std::mutex> mylock(m); // Lock here and unlock at the end of the block
                    SharedCond.wait(mylock);
                }
                std::cout << "Child Thread"<< " Iteration: " <<i << std::endl;
                SharedCond.notify_one();
            }
            std::cout << "Exiting the child thread" << std::endl;
        };
        std::thread mythread(threadlamda);
        for (int i = 1; i <= 5;i++)
        {
            std::cout << "Main Thread" << " Iteration: " << i << std::endl;
            {
                SharedCond.notify_one();
                std::unique_lock<std::mutex> mylock(m);// Lock here and unlock at the end of the block
                SharedCond.wait(mylock);
            }
        }
        mythread.join();
        std::cout << "Exiting the main thread\n";
        return 0;
    }
    
    following is the output as intended:
    Code:
    Starting Child thread
    Main Thread Iteration: 1
    Child Thread Iteration: 1
    Main Thread Iteration: 2
    Child Thread Iteration: 2
    Main Thread Iteration: 3
    Child Thread Iteration: 3
    Main Thread Iteration: 4
    Child Thread Iteration: 4
    Main Thread Iteration: 5
    Child Thread Iteration: 5
    Exiting the child thread
    Exiting the main thread
    
    condition_variable is a synchronization object on which a thread execution can be blocked. To block a thread the thread should call it’s wait() method passing a mutex object which is locked. In the wait() method an object of template class std::unique_lock<std::mutex> is passed. The wait() method also unlock() the underlying mutex while blocking the thread. Some other thread can call the notify_one() or notify_all() methods of the same condition variable to wake up the blocking threads. wait() method calls the lock() method of the underlying mutex after waking the thread. Normally calls to notify_one() or notify_all() methods from the other threads can wake up the waiting threads but some implementations may generate spurious wake-up call without notify calls. So the users of these calls need to make sure that actual condition is met after the thread receives wake-up calls. There is another conditional version of wait() method checks for the actual condition before releasing the thread execution. This wait() method takes a callable object or function as it’s 2nd argument called predicate that takes no argument and returns a value convertible to bool value. wait() does not return but keeps on waiting if the return value of the predicate turns to be false. So in the above code we will introduce one flag for each thread they will set before notify to other thread. Here is the modified code:

    Code:
    #include<thread>
    #include<iostream>
    #include<mutex>
    #include<condition_variable>
    int main()
    {
        std::mutex m;
        std::condition_variable SharedCond;
        bool bMainthreadDone = false;
        bool bChildThreadDone = false;
        // Lamda expression to be execeuted by the thread. It can be a function also
        auto threadlamda = [&]() {
            std::cout << "Starting Child thread \n";
            for (int i = 1; i <= 5;i++)
            {
                {
                    std::unique_lock<std::mutex> mylock(m); // Lock here and unlock at the end of the block
                    SharedCond.wait(mylock, [&]() {return bMainthreadDone;});
                }
                std::cout << "Child Thread"<< " Iteration: " <<i << std::endl;
                bChildThreadDone = true;
                bMainthreadDone = false;
                SharedCond.notify_one();
            }
            std::cout << "Exiting the child thread" << std::endl;
        };
        std::thread mythread(threadlamda);
        for (int i = 1; i <= 5;i++)
        {
            std::cout << "Main Thread" << " Iteration: " << i << std::endl;
            {
                bMainthreadDone = true;
                bChildThreadDone = false;
                SharedCond.notify_one();
                std::unique_lock<std::mutex> mylock(m);// Lock here and unlock at the end of the block
                SharedCond.wait(mylock, [&]() {return bChildThreadDone;});
            }
        }
        mythread.join();
        std::cout << "Exiting the main thread\n";
        return 0;
    }
    
    Output will be same as above but it is safer.

    Whatever we discussed above is basically called thread based approach of concurrency. C++11 introduced a new concept of concurrency which is task based. A task can be executed asynchronously. For example in the above code snippet if you consider the threadlamda function as a task you can execute that task asynchronously by passing the lamda to the std::async() call. So replace following line
    Code:
     std::thread mythread(threadlamda);
        with
    std::async(threadlamda);
    
    and run the code. Wait this code will not work. This is because std::async() returns a std::future<T> object where T is deduced from the return value of the function passed to async() call. The rule is that the returned future object destructor will be blocked until the thread execution is finished. In this case the return object is not stored so the destructor of the returned object will get called immediately and block the main thread. On the other hand the child thread is waiting for the notification from the main thread. This is a deadlock situation! This code is deadlock prone as you always run the risk of deadlock and race condition in concurrent execution. Following code shows how task is executed asynchronously:
    Code:
    //#include<thread>
    #include<iostream>
    #include<mutex>
    #include<condition_variable>
    #include<future>
    int main()
    {
        std::mutex m;
        std::condition_variable SharedCond;
        bool bMainthreadDone = false;
        bool bChildThreadDone = false;
        // Lamda expression to be execeuted by the thread. It can be a function also
        auto threadlamda = [&]() {
            for (int i = 1; i <= 5;i++)
            {
                {
                    std::unique_lock<std::mutex> mylock(m); // Lock here and unlock at the end of the block
                    SharedCond.wait(mylock, [&]() {return bMainthreadDone;});
                }
                std::cout << "Child Thread"<< " Iteration: " <<i << std::endl;
                bChildThreadDone = true;
                bMainthreadDone = false;
                SharedCond.notify_one();
            }
            std::cout << "Exiting the child thread" << std::endl;
        };
        //std::thread mythread(threadlamda);
        std::future<void> fut= std::async(threadlamda);
        for (int i = 1; i <= 5;i++)
        {
            std::cout << "Main Thread" << " Iteration: " << i << std::endl;
            {
                bMainthreadDone = true;
                bChildThreadDone = false;
                SharedCond.notify_one();
                std::unique_lock<std::mutex> mylock(m);// Lock here and unlock at the end of the block
                SharedCond.wait(mylock, [&]() {return bChildThreadDone;});
            }
        }
        //mythread.join();
        fut.wait();
        std::cout << "Exiting the main thread\n";
        return 0;
    }
    
    Advantages of using std::async():
    • Thread management is delegated to standard library function.
    • You can catch the return value of the thread function
    • You can catch the exception thrown by the thread function
    In thread based programming you cannot catch the return value of the thread function in the calling thread. In task based programming using std::async() you can get the return value of the function by calling get() method of the std::future object returned by std::async() method. Similarly if the function throws an exception same exception will be thrown by the get() function. wait() method or get() blocks until the task is completed. So call future::get() or future.wait() method in place of join() method of thread object. Note that there are other versions of wait() like wait_for() or wait_until(). There are also other methods of std::future object and you need to read the reference manual whenever you need to use them. Following program demonstrates the task based programming using std::async() and capturing return values as well as exception:
    Code:
    #include<iostream>
    #include<future>
    using namespace std;
    int main()
    {
    
        int SharedMem = 0;
        condition_variable SharedCond;
        mutex condmutext;
        // Lamda expression to be execeuted by thread function
        auto threadlamda = [&](int newval, bool IsFirst) {
            if (newval < 0)
                throw newval;
            for (int i = 0; i < 5;i++)
            {
                if (!IsFirst)
                {
                    unique_lock<mutex> lm(condmutext);
                    SharedCond.wait(lm);
                }
                {
                    
                    if (IsFirst)
                        cout << "1st thread: " << endl;
                    else
                        cout << "2nd thread: " << endl;
                    cout << "Thread id: " << this_thread::get_id() << " and value before modification: " << SharedMem << endl;
                    SharedMem = newval + i;
                    cout << "Thread id: " << this_thread::get_id() << " and value after modification: " << SharedMem << endl << endl;
                
                }
    
                if (IsFirst)
                {
                    SharedCond.notify_one();
                    unique_lock<mutex> lm(condmutext);
                    SharedCond.wait(lm);
                }
                else
                {
                    SharedCond.notify_one();
                }
    
            }
            cout << "Exiting thread ID: " << this_thread::get_id() << endl;
            return newval;
        };
        auto thread1lamda = [threadlamda]()->int {return threadlamda(100, true);};
        auto thread2lamda = [threadlamda]()->int {return threadlamda(200, false);};
        auto thread3lamda = [threadlamda]()->int {return threadlamda(-200, false);};
        auto fut1 = std::async(thread1lamda);
        auto fut2 = std::async(thread2lamda);
        auto fut3 = std::async(thread3lamda);
        try {
            cout << "Firt task return value :" << fut1.get() << endl;
            cout << "2nd task return value :" << fut2.get() << endl;
            cout << "third task return value :" << fut3.get() << endl;
        }
        catch (int i)
        {
            cout << "Exception thrown: " << i<<endl;
        }
        cout << "Exiting the main function" << endl;
        return 0;
    }
    
    One thing to note that std::async() by default does not guarantee to run the task in different thread. It depends on resource availability in the system and the thread management policy of the OS. If you want to force it to run asynchronously std:async() has the option to specify. You can specify the std::launch::async or std::launch::deferred as the first argument of std:async() and then callable object like function or lamda as the 2nd argument and more arguments for the parameters of the function object passed as the 2nd argument. std::launch::async option will ensure that the task will be run in a new thread and std::launch::deferred will defer the execution and will be executed ynchronously when the calling thread tries to get the return value or call wait() function. If you don’t specify the launch option then the default option will be std::launch::async|std::launch::deferred. Passing arguments of the thread function also applicable to std::thread constructor. Following two lines of code are equivalent:
    Code:
    auto fut1 = std::async(thread1lamda);
    auto fut1 = std::async(std::launch::async|std::launch::deferred,thread1lamda);
    
    Both the line above will make it uncertain whether the task will be launched asynchronously or synchronously. In case of synchronous execution the task will be deferred until the get() or wait() function of the std::future object returned by the std::async() is called. In the above call if any of the 1st two sync() call is deferred then it will cause deadlock.

    C++11 also added new feature called atomicity to make a data access thread safe by making it atomic. For the purpose <atomic> header is included the std::atomic<t> template class is used. You can use it for primitive data types like int, bool etc. For example std::atomic<int> is used to declare an atomic int variable. Operators(like ++) applicable to int data types are implemented in std::atomic<int> specialized class in a thread safe manner.

    Using C++11 threads and asynchronous operation is high level approach of concurrency. There could be scenarios where you want more control at the lower level. In that case you will need to use native threading model. Applications concurrency strategy should be a design decision and not coding outcome mode. If multi-threading is introduced casually you are bound to introduce critical bugs difficult to identify. It is difficult to avoid multi-threading and also difficult to synchronize threads.
     

Share This Page

  1. This site uses cookies to help personalise content, tailor your experience and to keep you logged in if you register.
    By continuing to use this site, you are consenting to our use of cookies.
    Dismiss Notice