POSIX Threads Synchronization
POSIX threads aka pthreads, provide multiple flows of execution within a process. The threads have their own stacks but share the global data and the heap. So the global data is visible to multiple threads of a process. The threads need to synchronize their actions so that a thread does not access shared data while it is being modified by some other thread and a thread does not waste processor time busy-waiting for a key shared variable to reach a certain desired value. Mutual exclusion and synchronization are key problems in concurrent programming and need to be taken care of correctly for proper working of multi-threaded systems.
Table of Contents
1.0 Mutual Exclusion
The global variables are visible to multiple threads. To illustrate the need for mutual exclusion, suppose, in a banking application, there is a variable named account_balance, with the value 7000. There are two threads, viz., check_clearance_thread and atm_withdrawal_thread. A customer tries to withdraw 5000 at an ATM. The atm_withdrawal_thread thread gets activated, the account_balance variable is checked and as it has 7000, the withdrawal processing starts. As luck would have it, a check for 4000 comes up for processing at the same time for the customer withdrawing the money via ATM. The check_clearance_thread thread gets activated, the account_balance is checked and found to have 7000, and the check clearance processing starts. By the time the two threads finish, the atm_withdrawal_thread has deducted 5000 from the account_balance and the check_clearance_thread has deducted another 4000 from the account_balance leaving a net -2000 in account_balance. So, there is a serious flaw in our logic. The catch is that the part of processing starting from reading the value of account_balance, processing and finally writing the new balance is a critical section of the code and only one thread should execute it at a time. If any other thread wants to access the account_balance, it should be blocked till the time the first thread finishes its processing and writing the final update of account_balance. The two threads must mutually exclude each other from the critical section. The problem of mutual exclusion is solved using a binary semaphore in the case of concurrent processes and threads.
1.1 Pthreads mutex object
For Pthreads, we have a special locking mechanism for mutual exclusion known as a mutex object. If there are calls analogous to P (mutex) and V (mutex) at the start and end of the critical section of code, only one thread would execute the critical section at any time.
1.2 Pthread mutex creation
The simplest way to initialize a mutex is to define and initialize it as a global variable.
pthread_mutex_t new_mutex = PTHREAD_MUTEX_INITIALIZER;
This can only be done for global variables. For automatic and dynamically allocated variables, it is necessary to initialize the mutex with the pthread_mutex_init call.
The basic calls for using mutex are the pthread_mutex_lock and pthread_mutex_unlock calls.
1.3 pthread_mutex_lock
#include <pthread.h> int pthread_mutex_lock (pthread_mutex_t *mutex);
pthread_mutex_lock locks the mutex identified by the pointer passed as the argument. If the mutex is already locked, the call blocks till the time mutex becomes available for locking.
1.4 pthread_mutex_unlock
#include <pthread.h> int pthread_mutex_unlock (pthread_mutex_t *mutex);
pthread_mutex_unlock unlocks the mutex identified by the pointer passed as the argument.
2.0 Condition Variables
Mutex's are like binary semaphores that help threads in mutually excluding each other from executing critical sections of code concurrently. There is another class of synchronization problem where a counting semaphore value represents the number of instances of resource available and P (semaphore) represents acquiring an instance of resource and V (semaphore) represents releasing an instance. That way processes or threads can work easily with available number of instances of that resource concurrently. Pthreads provide condition variables that help in solving this problem in an easy way. Actually, we are interested in the condition, resource_is_available
. If an instance of that resource is available, we can go ahead and take it. If no instance of that resource is available, we wait for a resource to become available.
To solve the problem of synchronized usage of a number of instances of a resource by multiple threads, we need a condition variable, a mutex and a predicate. A predicate could be something like, a resource instance is available
. A condition variable is used for signalling the state of predicate. The mutex provides the mutual exclusion protection for data between multiple threads.
A thread releasing an instance of resource has code like this.
//Thread 1: Release a resource instance ... lock (mutex); release resource instance update resource control data cond_signal (cond, mutex); unlock (mutex); ...
A thread acquiring an instance of resource has code similar to this.
// Thread 2: Acquire a resource instance ... lock (mutex); while (!resource_instance_is_available) cond_wait (cond, mutex); acquire resource instance update resource control data unlock (mutex); ...
The code for the first thread is quite straightforward. It first locks the mutex. It releases an instance of resource and unlocks the mutex. The code for the second thread is interesting. It locks the mutex. Then it does a cond_wait on condition variable, cond using mutex. As soon as the conditional wait starts, the mutex is released. This ensures that the first thread can lock the mutex and do a conditional signal on cond. When cond_wait returns, the mutex is locked and is owned by Thread 2. Thread 2 can acquire the resource, update resource database and, then, unlock the mutex. cond_wait is put in a while loop because a signal might interrupt it, causing it to return without a resource instance becoming available.
2.1 Pthread condition variable creation
The simplest way to initialize a condition variable is to define and initialize it as a global variable.
pthread_cond_t new_cond = PTHREAD_COND_INITIALIZER;
This can only be done for global variables. For automatic and dynamically allocated variables, it is necessary to initialize with the pthread_cond_init call.
The basic calls for using condition variables are pthread_cond_wait and pthread_cond_signal.
2.2 pthread_cond_wait
#include <pthread.h> int pthread_cond_wait (pthread_cond_t *cond, pthread_mutex_t *mutex);
pthread_cond_wait blocks on the condition variable pointed by cond. The mutex is released at the start of block. Some other thread can lock the mutex, issue pthread_cond_signal and unlock the mutex. pthread_cond_wait, then, returns with the mutex locked and owned by the calling thread.
2.3 pthread_cond_signal
#include <pthread.h> int pthread_cond_signal (pthread_cond_t *cond);
pthread_cond_signal unblocks a thread waiting on the condition variable pointed by cond.
3.0 Example Program in C
The following example program illustrates the usage of Pthreads mutex and condition variable calls. It solves the producer – consumer problem. There are ten producer threads that produce lines of text. The consumer thread prints these lines on the terminal.
/* * * pthreads-synch.c: Program to demonstrate Pthreads * synchronization using mutex and * condition variables in C under * Linux (Producer - Consumer problem) */ #include <sys/types.h> #include <stdio.h> #include <string.h> #include <stdlib.h> #include <time.h> #include <errno.h> #include <pthread.h> #include <unistd.h> // Buffer data structures #define MAX_BUFFERS 10 char buf [MAX_BUFFERS] [100]; int buffer_index; int buffer_print_index; pthread_mutex_t buf_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t buf_cond = PTHREAD_COND_INITIALIZER; pthread_cond_t spool_cond = PTHREAD_COND_INITIALIZER; int buffers_available = MAX_BUFFERS; int lines_to_print = 0; void *producer (void *arg); void *spooler (void *arg); int main (int argc, char **argv) { pthread_t tid_producer [10], tid_spooler; int i, r; // initialization buffer_index = buffer_print_index = 0; // Create spooler if ((r = pthread_create (&tid_spooler, NULL, spooler, NULL)) != 0) { fprintf (stderr, "Error = %d (%s)\n", r, strerror (r)); exit (1); } // Create 10 producer threads int thread_no [10]; for (i = 0; i < 10; i++) { thread_no [i] = i; if ((r = pthread_create (&tid_producer [i], NULL, producer, (void *) &thread_no [i])) != 0) { fprintf (stderr, "Error = %d (%s)\n", r, strerror (r)); exit (1); } } // Wait for producers to terminate for (i = 0; i < 10; i++) if ((r = pthread_join (tid_producer [i], NULL)) == -1) { fprintf (stderr, "Error = %d (%s)\n", r, strerror (r)); exit (1); } // No more strings to print? while (lines_to_print) sleep (1); // terminate spooler if ((r = pthread_cancel (tid_spooler)) != 0) { fprintf (stderr, "Error = %d (%s)\n", r, strerror (r)); exit (1); } exit (0); } // producer: produce strings for printing // There might be multiple producer threads void *producer (void *arg) { // Create 10 strings and terminate int i, r; int my_id = *((int *) arg); int count = 0; for (i = 0; i < 10; i++) { // Lock mutex if ((r = pthread_mutex_lock (&buf_mutex)) != 0) { fprintf (stderr, "Error = %d (%s)\n", r, strerror (r)); exit (1); } while (!buffers_available) pthread_cond_wait (&buf_cond, &buf_mutex); int j = buffer_index; buffer_index++; if (buffer_index == MAX_BUFFERS) buffer_index = 0; buffers_available--; // Produce a string sprintf (buf [j], "Thread %d: %d\n", my_id, ++count); lines_to_print++; pthread_cond_signal (&spool_cond); // Unlock mutex if ((r = pthread_mutex_unlock (&buf_mutex)) != 0) { fprintf (stderr, "Error = %d (%s)\n", r, strerror (r)); exit (1); } // Take a nap sleep (1); } } // There is only one spooler thread void *spooler (void *arg) { int r; while (1) { // forever // Lock mutex if ((r = pthread_mutex_lock (&buf_mutex)) != 0) { fprintf (stderr, "Error = %d (%s)\n", r, strerror (r)); exit (1); } while (!lines_to_print) pthread_cond_wait (&spool_cond, &buf_mutex); printf ("%s", buf [buffer_print_index]); lines_to_print--; buffer_print_index++; if (buffer_print_index == MAX_BUFFERS) buffer_print_index = 0; buffers_available++; pthread_cond_signal (&buf_cond); // Unlock mutex if ((r = pthread_mutex_unlock (&buf_mutex)) != 0) { fprintf (stderr, "Error = %d (%s)\n", r, strerror (r)); exit (1); } } }
We can compile and run the above program as below.
$ gcc pthreads-synch.c -o pthreads-synch -lpthread $ ./pthreads-synch Thread 0: 1 Thread 3: 1 Thread 2: 1 Thread 7: 1 ...