Python Concurrency (II)

In the last post, we briefly covered concurrency concepts and the difference between CPU-bound and I/O-bound tasks. In this post, we are going to talk about the threading module in Python.

But first let’s define some core concepts:

  • Process: a running instance of a computer program;
  • Thread: the smallest sequence of instructions that can be managed independently by the operating system;
  • Scheduler: an operating system module that selects the next jobs to be admitted into the system and the next process to run (it uses a scheduling algorithm to decide);
  • Context switch: the process of saving and restoring the state of a thread or process (it is cheaper if the next thread is from the same process; otherwise the switch is more expensive);
  • Thread pool: a pool of worker threads that wait for jobs and are reused many times.
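
To make the thread pool idea concrete, here is a minimal sketch using the standard library's concurrent.futures.ThreadPoolExecutor, which this post does not otherwise cover. The fetch_length function and the sample words are made up for illustration, and the sleep only stands in for an I/O wait.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def fetch_length(word):
    """Hypothetical task: pretend to wait on I/O, then return a result."""
    time.sleep(0.05)  # stand-in for a network or disk wait
    return len(word), threading.current_thread().name


words = ["alpha", "beta", "gamma", "delta", "epsilon", "zeta"]

# Three worker threads are created and reused for all six tasks.
with ThreadPoolExecutor(max_workers=3, thread_name_prefix="worker") as pool:
    results = list(pool.map(fetch_length, words))  # map() keeps input order

lengths = [length for length, _ in results]
workers_used = {name for _, name in results}

print(lengths)                  # [5, 4, 5, 5, 7, 4]
print(len(workers_used) <= 3)   # True
```

Six tasks are handled by at most three threads, which is the reuse the definition above describes.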

Python's threading module, built on top of the low-level _thread module, lets a program run multiple threads concurrently.

The signature of the Thread class's __init__() method is:

def __init__(self, group=None, target=None, name=None,
             args=(), kwargs=None, *, daemon=None):

The target argument is the callable that the thread will execute. Calling start() begins the thread, which in turn invokes the built-in run() method (you can override this); by default, run() calls target with args and kwargs.

import threading


def func():
    print('thread function')


if __name__ == '__main__':
    for i in range(3):
        t = threading.Thread(target=func)
        t.start()

The start() method starts the thread's activity and arranges for the object's run() method to be invoked in a separate thread of control.

We can also pass arguments to the target function. Positional arguments are passed as a tuple through args, which defaults to ().

import threading


def func(id):
    print('thread function %s' % id)


if __name__ == '__main__':
    for i in range(3):
        t = threading.Thread(target=func, args=(i,))
        t.start()

We can also construct a thread by subclassing Thread and overriding its run() method.

import threading


class NewThing(threading.Thread):
    def __init__(self):
        super().__init__()
        self.start()  # start the thread as soon as it is constructed

    def run(self):
        print("Running!")


t = NewThing()
t.join()

Then there is the join() method. Calling t.join() blocks the calling thread (usually the main thread) until the thread whose join() method was called (thread t above) has terminated. In other words, join() makes sure that the main thread waits for your thread to finish.

import threading
import time


def func_a():
    time.sleep(6)


def func_b():
    return


if __name__ == '__main__':
    t = threading.Thread(target=func_a)
    d = threading.Thread(target=func_b)
    t.start()
    d.start()
    t.join()
    print("func_a.join() finished")
    d.join()
    print("func_b.join() finished")

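You will see the output below. Even though thread d finishes almost immediately, the first line is printed only after about 6 seconds, because the main thread is blocked in t.join() until thread t finishes. By then thread d is already done, so d.join() returns right away.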

func_a.join() finished
func_b.join() finished

But working with threads is not hassle-free. A typical example of thread interference is a race condition.

A race condition occurs when two or more threads can access shared data and try to change it at the same time. As a result, the values of variables may be unpredictable and vary depending on the timing of context switches between the threads.

[Figure: example of a race condition]

As you can see above, both threads compete to read and update the same variable (they read from and write to the same shared memory). As a result, the more threads you run, the more complex and slower the program becomes as it copes with thread interference.
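
Race conditions are timing-dependent, so they are hard to reproduce on demand. The sketch below forces the bad interleaving with a threading.Barrier so that the lost update happens every time. The barrier is only a teaching device here, and the names counter, unsafe_increment and run_race are made up for this example.

```python
import threading

counter = 0
barrier = threading.Barrier(2)


def unsafe_increment():
    global counter
    local_copy = counter        # 1. read the shared value
    barrier.wait()              # 2. wait until BOTH threads have read it
    counter = local_copy + 1    # 3. write back a value based on a stale read


def run_race():
    global counter
    counter = 0
    threads = [threading.Thread(target=unsafe_increment) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter


# Two increments ran, but both started from 0, so one update is lost.
print(run_race())  # 1
```

In real programs the same interleaving happens only occasionally, which is exactly what makes race conditions hard to debug.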

To fix this problem, Python provides the Lock class to coordinate multiple threads.

A lock is in one of two states, locked or unlocked. It has two basic methods, acquire() and release(). Once a thread has acquired the lock, any other thread that tries to acquire it blocks until the holder releases it, so the code guarded by the lock runs in only one thread at a time.

lock = threading.Lock()
lock.acquire()
try:
    ...  # do something to the shared resource
finally:
    lock.release()

The following Python program shows how a lock can be used to deal with a race condition:

import threading

x = 0


def increment_global():
    global x
    x += 1


def taskofThread(lock):
    for _ in range(50000):
        lock.acquire()
        increment_global()
        lock.release()


def main():
    global x
    x = 0
    lock = threading.Lock()
    t1 = threading.Thread(target=taskofThread, args=(lock,))
    t2 = threading.Thread(target=taskofThread, args=(lock,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()


if __name__ == "__main__":
    for i in range(5):
        main()
        print("x = {1} after Iteration {0}".format(i, x))

Note that a normal Lock object cannot be acquired more than once, even by the same thread: a second blocking acquire() would wait forever. When the same thread needs to re-acquire a lock it already holds, we can use an RLock (re-entrant lock), which must then be released once per acquire:

import threading

lock = threading.RLock()
lock.acquire()
lock.acquire()  # does not block: the same thread already owns the lock
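
A typical case where re-acquiring comes up is one locked method calling another locked method on the same object. The sketch below illustrates that; the Account class and its methods are hypothetical and exist only for this example.

```python
import threading


class Account:
    """Hypothetical example class; not part of the threading module."""

    def __init__(self, balance):
        self.balance = balance
        self._lock = threading.RLock()

    def deposit(self, amount):
        with self._lock:
            self.balance += amount

    def deposit_twice(self, amount):
        # This method holds the lock and then calls deposit(), which
        # acquires it again. With a plain Lock the inner acquire would
        # block forever; with an RLock the owning thread may re-enter.
        with self._lock:
            self.deposit(amount)
            self.deposit(amount)


account = Account(100)
account.deposit_twice(25)
print(account.balance)  # 150
```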

Lock objects are also context managers, so we can use them in a with statement.

with some_lock:
    ...  # do something

is the same as:

some_lock.acquire()
try:
    ...  # do something
finally:
    some_lock.release()

We can also try to acquire the lock without blocking and branch on the result:

if lock.acquire(False):  # non-blocking attempt; returns True on success
    try:
        ...  # do something with the resource
    finally:
        lock.release()
else:
    ...  # lock could not be acquired, do something else
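
As a small runnable illustration of the non-blocking pattern, the sketch below appends to a list only when the lock happens to be free. The try_update helper and the shared list are made up for this example, and the main thread takes the lock itself to simulate another holder.

```python
import threading

lock = threading.Lock()


def try_update(shared, value):
    """Append value only if the lock is free right now; never wait."""
    if lock.acquire(blocking=False):
        try:
            shared.append(value)
            return True
        finally:
            lock.release()
    return False


shared = []
first = try_update(shared, "a")    # lock is free, so this succeeds

lock.acquire()                     # simulate another holder of the lock
second = try_update(shared, "b")   # lock is taken, so the update is skipped
lock.release()

print(first, second, shared)  # True False ['a']
```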

There are other synchronisation primitives, such as the semaphore. Think of it as an internal counter that is decremented by each acquire() call and incremented by each release() call. The counter can never go below zero; when it reaches zero, acquire() blocks the calling thread until another thread calls release() and the counter becomes nonzero again.

This is useful when you want more than one worker to access a resource while still limiting the overall number of concurrent accesses.

import threading
import time


class ThreadPool(object):
    def __init__(self):
        super(ThreadPool, self).__init__()
        self.active = []
        self.lock = threading.Lock()

    def makeActive(self, name):
        with self.lock:
            self.active.append(name)

    def makeInactive(self, name):
        with self.lock:
            self.active.remove(name)


def f(s, pool):
    with s:  # at most 3 threads are inside this block at any time
        name = threading.current_thread().name
        pool.makeActive(name)
        time.sleep(0.5)
        pool.makeInactive(name)


if __name__ == '__main__':
    pool = ThreadPool()
    s = threading.Semaphore(3)
    for i in range(10):
        t = threading.Thread(target=f, name='thread_' + str(i), args=(s, pool))
        t.start()

Another synchronisation primitive is the Event. It is one of the simplest mechanisms for communication between threads: one thread signals an event and other threads wait for it.

An event object manages an internal flag that can be set to true with the set() method and reset to false with the clear() method. The wait() method blocks until the flag is true.

  • is_set(): return True if and only if the internal flag is true.
  • set(): set the internal flag to true. All threads waiting for it are woken up, and threads that call wait() once the flag is true will not block at all.
  • clear(): reset the internal flag to false. Subsequently, threads calling wait() will block until set() is called to set the internal flag to true again.
  • wait(timeout=None): if the internal flag is already true, return immediately. Otherwise, block until another thread calls set() to set the flag to true, or until the optional timeout elapses.
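
The methods above can be sketched as follows. This is a minimal example in which the names ready, log and waiter are made up for illustration; one worker thread waits on the event while the main thread sets it.

```python
import threading

ready = threading.Event()
log = []


def waiter():
    ready.wait()                      # blocks until the flag becomes true
    log.append("waiter saw the event")


t = threading.Thread(target=waiter)
t.start()

log.append("main is about to set the event")
ready.set()                           # wakes up the waiting thread
t.join()

# wait() returns the flag's value, so a timeout on an unset event gives False.
timed_out_result = threading.Event().wait(timeout=0.01)

print(log)
print(ready.is_set(), timed_out_result)  # True False
```

The waiter can only append after set() has been called, so the main thread's entry always comes first in log.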

Another, perhaps more common, way of implementing inter-thread communication is to use a Queue. A typical pattern is the Producer and Consumer pair: the Producer thread is responsible for putting items into the queue, while the Consumer thread gets items out of it.

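import queue
from threading import Thread


def producer(q):
    for item in range(5):
        q.put(item)


def consumer(q):
    while True:
        item = q.get()
        # do something with the item
        q.task_done()


q = queue.Queue()
producer_thread = Thread(target=producer, args=(q,))
# daemon=True lets the program exit even though the consumer loops forever
consumer_thread = Thread(target=consumer, args=(q,), daemon=True)

producer_thread.start()
consumer_thread.start()
producer_thread.join()  # make sure every item has been put
q.join()                # block until every item has been marked as done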

You will notice 4 basic methods:

  • get: remove and return an item from the queue
  • put: put an item into the queue
  • task_done: mark an item previously retrieved with get as processed
  • join: block until all items in the queue have been processed

Note that both get() and put() are blocking calls by default. We can specify the maximum number of items in the queue with the maxsize argument; put() then blocks the calling thread while the queue is full. Likewise, get() blocks the calling thread while the queue is empty, which in practice happens more often than a blocked put(). Passing block=False or a timeout makes either call raise queue.Full or queue.Empty instead of waiting indefinitely.
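
Here is a minimal single-threaded sketch of those non-blocking variants. The flag names overflowed and underflowed are made up for this example; the queue calls and exceptions are from the standard queue module.

```python
import queue

q = queue.Queue(maxsize=2)   # at most two items may sit in the queue

q.put("a")
q.put("b")
try:
    q.put("c", block=False)  # queue is full: raises instead of blocking
    overflowed = False
except queue.Full:
    overflowed = True

drained = [q.get(), q.get()]
try:
    q.get(timeout=0.01)      # queue is empty: gives up after 10 ms
    underflowed = False
except queue.Empty:
    underflowed = True

print(overflowed, drained, underflowed)  # True ['a', 'b'] True
```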

As you can see, there are multiple ways to use locks to safeguard shared resources in concurrent Python code and make it thread-safe. But this can also cause problems.

Adding a lock to each group of threads means that multiple locks will exist, which can cause two further problems: decreased performance from managing multiple locks, and deadlocks.

A deadlock occurs when two or more threads block each other from making progress. For example, they may have taken a pair of locks in reverse order, so thread 1 won't release lock 1 until it gets lock 2, but thread 2 won't release lock 2 until it gets lock 1.
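
One common way to avoid this particular deadlock is to make every thread acquire the locks in the same global order. The sketch below is one possible approach under that assumption, not the only fix; the transfer function and the use of id() as the ordering key are choices made for this example.

```python
import threading

lock_1 = threading.Lock()
lock_2 = threading.Lock()
completed = []


def transfer(first, second, label):
    # Sort by id() so every thread acquires the two locks in the same
    # order, no matter which order the caller passed them in.
    low, high = sorted((first, second), key=id)
    with low:
        with high:
            completed.append(label)


# The two threads name the locks in opposite orders: the classic deadlock setup.
t1 = threading.Thread(target=transfer, args=(lock_1, lock_2, "t1"))
t2 = threading.Thread(target=transfer, args=(lock_2, lock_1, "t2"))
t1.start()
t2.start()
t1.join(timeout=5)
t2.join(timeout=5)

print(sorted(completed))  # ['t1', 't2']
```

Because both threads always take the lower-ordered lock first, neither can hold one lock while waiting for a thread that holds the other.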

The Python Global Interpreter Lock, or GIL, is in simple terms a mutex (a lock) that allows only one thread at a time to hold control of the CPython interpreter. This means that only one thread can be executing Python bytecode at any point in time. Because it is a single lock, it cannot deadlock against itself, but it does not prevent deadlocks between the locks in your own code, and it effectively makes any CPU-bound Python program single-threaded.

The GIL exists to keep CPython's reference-counting memory management thread-safe. Although interpreters for other languages, such as Ruby, also use a GIL, it is not the only solution to this problem. Some languages avoid the need for a GIL by using approaches other than reference counting, such as tracing garbage collection.

So why the GIL and not something else? A lot of Python extensions were written to wrap existing C libraries. To prevent inconsistent changes, these C extensions required thread-safe memory management, which the GIL provided.

There are of course workarounds, such as GIL-less Python interpreters like Jython and IronPython. You can also use multiprocessing instead, which we will talk about in the next post.

That's it for this post.

Happy Reading!
