Explaining the 5 Python thread locks

by Admin on 20 Nov 2021


This article focuses on the locks in Python's threading module. The material is essential for everyday development and also comes up frequently in interviews.

Official documentation (https://docs.python.org/zh-cn/3.6/library/threading.html)

Thread safety


Thread safety is a concept in multi-threaded (and multi-process) programming. When multiple threads that share data run concurrently, thread-safe code uses a synchronization mechanism to ensure that every thread executes correctly, without corrupting the shared data or causing other unexpected behavior.

For example, suppose a room (the process) holds 10 candies (the resource) and three little people (1 main thread and 2 sub-threads). Person A eats 3 candies and is then forced to rest by the system, believing that 7 candies are left. While A rests, person B eats 3 more. When A comes back on duty, he still believes there are 7 candies, but in fact only 4 remain.

This mismatch between the data seen by thread A and thread B is a thread-safety problem, and it can lead to serious surprises. Consider the following example.

Here we have a variable num with an initial value of 0, and we start 2 threads.

  • Thread 1 performs num += 1 ten million times

  • Thread 2 performs num -= 1 ten million times

The result may be surprising: num does not end up at 0 as we expected.

import threading

num = 0

def add():
    global num
    for i in range(10_000_000):
        num += 1

def sub():
    global num
    for i in range(10_000_000):
        num -= 1

if __name__ == "__main__":
    subThread01 = threading.Thread(target=add)
    subThread02 = threading.Thread(target=sub)

    subThread01.start()
    subThread02.start()

    subThread01.join()
    subThread02.join()

    print("num result : %s" % num)

# The results are collected three times
# num result : 669214
# num result : -1849179
# num result : -525674

This example shows the problem clearly. To solve it, we use locks to control when each thread is allowed to run the code that touches the shared data.
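One way to see why num += 1 can interleave badly is to look at the bytecode CPython generates for it. The sketch below is only illustrative: the helper name add_one is made up for this example, and the exact instruction names differ between CPython versions. What it checks is that the read of the global and the write back to it are separate instructions, so a thread switch can fall between them.

```python
import dis

num = 0

def add_one():
    # Hypothetical helper used only for this illustration.
    global num
    num += 1

# Collect the names of the bytecode instructions that make up add_one().
op_names = [ins.opname for ins in dis.get_instructions(add_one)]
print(op_names)

# The global is read by one instruction and written back by another,
# with the addition in between, so the statement is not a single step.
has_separate_load_and_store = (
    "LOAD_GLOBAL" in op_names and "STORE_GLOBAL" in op_names
)
print(has_separate_load_and_store)
```

If another thread updates num after the load but before the store, that update is overwritten, which is how the counts above drift away from 0.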

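It is worth noting that in CPython, single operations on the built-in containers list and dict (for example list.append() or a single dict assignment) are effectively atomic because of the global interpreter lock, and a tuple is immutable, so individual operations like these do not corrupt the container. Compound operations that read a container and then modify it (for example, checking a key and then updating it) still need a lock.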
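Operations that read a container and then write back to it are made of several steps, so they are usually guarded with a lock. The following is a minimal sketch under that assumption; the names counts, counts_lock, and record_hits are invented for the illustration.

```python
import threading

counts = {"hits": 0}            # shared dict (hypothetical example data)
counts_lock = threading.Lock()  # guards every read-modify-write on counts

def record_hits(times):
    for _ in range(times):
        # Reading the old value and storing the new one are separate steps,
        # so they are wrapped in the lock to keep them together.
        with counts_lock:
            counts["hits"] = counts["hits"] + 1

workers = [threading.Thread(target=record_hits, args=(10_000,)) for _ in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()

print(counts["hits"])
```

With the lock in place, each of the 4 threads contributes exactly 10,000 increments.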

The role of locks


Locks are the mechanism Python provides for controlling, in our own code, the order in which threads access shared data. Strictly speaking, a lock does not stop the interpreter from switching threads; it makes any other thread that tries to acquire the same lock wait until the lock is released.

Once access is ordered in this way, reads and modifications of shared data become controlled, so locks are the basic tool for ensuring thread safety.

The threading module provides five commonly used types of locks, grouped by function as follows.

  • Synchronous lock: Lock (lets one thread through at a time)
  • Recursive lock: RLock (lets one thread through at a time)
  • Condition lock: Condition (can release any number of waiting threads at a time)
  • Event lock: Event (releases all waiting threads at once)
  • Semaphore lock: Semaphore (lets a fixed number of threads through at a time)

1. Lock() synchronous lock


Basic introduction

A Lock goes by several names, such as:

  • Synchronous lock
  • Mutual exclusion lock

What do these names mean?

  1. Mutual exclusion means that a resource can be accessed by only one visitor at the same time, and is unique and exclusive, but mutual exclusion cannot restrict the order of access to the resource by the visitor, i.e., the access is unordered

  2. Synchronization means that on the basis of mutual exclusion (in most cases), other mechanisms are used to achieve orderly access to resources by visitors

  3. Synchronization is actually a more complex implementation of mutual exclusion, because it implements orderly access on top of mutual exclusion

The following methods are provided by the threading module in connection with synchronous locks.

Method | Description
threading.Lock() | Returns a synchronous lock object
lockObject.acquire(blocking=True, timeout=-1) | Acquires the lock. While one thread holds the lock, any other thread that calls acquire() blocks until the lock is released. The default timeout of -1 means wait indefinitely. Returns True if the lock was acquired and False otherwise
lockObject.release() | Releases the lock so that one of the threads waiting on it can acquire it
lockObject.locked() | Returns a boolean indicating whether the lock is currently held
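The return values of these methods can be observed in a single thread. This is a small sketch, not a usage recommendation; the variable names are chosen only for the illustration.

```python
import threading

lock = threading.Lock()

first = lock.acquire()                 # uncontended: returns True immediately
held = lock.locked()                   # True while the lock is held
second = lock.acquire(blocking=False)  # already held: returns False without waiting
lock.release()                         # matches the one successful acquire()
free_again = not lock.locked()         # True once the lock has been released

print(first, held, second, free_again)
```

Passing blocking=False is a convenient way to try a lock without risking a hang.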

Usage


A synchronous lock lets only one thread through at a time. While a thread holds the lock, any other thread that tries to acquire it is blocked; the holder can still be paused by the scheduler, but the waiting threads cannot enter the locked block until the lock is released.

The problem at the top of the article is solved with a synchronous lock as follows.

import threading

num = 0

def add():
    lock.acquire()
    global num
    for i in range(10_000_000):
        num += 1
    lock.release()

def sub():
    lock.acquire()
    global num
    for i in range(10_000_000):
        num -= 1
    lock.release()

if __name__ == "__main__":
    lock = threading.Lock()

    subThread01 = threading.Thread(target=add)
    subThread02 = threading.Thread(target=sub)

    subThread01.start()
    subThread02.start()

    subThread01.join()
    subThread02.join()

    print("num result : %s" % num)

# The results are collected three times
# num result : 0
# num result : 0
# num result : 0

This makes the code completely serial. For CPU-bound work like this, it is no faster than plain single-threaded execution (and is usually slower because of the locking overhead), so the example only illustrates the mechanics and does not represent a realistic use of locks.
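A more typical pattern is to keep the locked region as short as possible: do the bulk of the work on local data and take the lock only for the final shared update. The sketch below assumes a made-up task (summing ranges of integers); total, total_lock, and sum_chunk are hypothetical names.

```python
import threading

total = 0
total_lock = threading.Lock()

def sum_chunk(start, stop):
    global total
    local_sum = sum(range(start, stop))  # purely local work, no lock needed
    with total_lock:                     # short critical section
        total += local_sum

chunks = [(0, 250_000), (250_000, 500_000),
          (500_000, 750_000), (750_000, 1_000_000)]
threads = [threading.Thread(target=sum_chunk, args=c) for c in chunks]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(total == sum(range(1_000_000)))
```

Each thread holds the lock for a single addition rather than for its whole loop, so the threads spend very little time waiting on each other.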

Deadlock phenomenon


With a synchronous lock, each acquire() must be matched by one release(). A thread must not call acquire() several times in a row and then release() several times: the second acquire() blocks forever, waiting for a lock that the thread itself holds, so the program deadlocks and makes no progress, as follows.

import threading

num = 0

def add():
    lock.acquire() # locking
    lock.acquire() # deadlock
    # Do not execute
    global num
    for i in range(10_000_000):
        num += 1
    lock.release()
    lock.release()

def sub():
    lock.acquire() # locking
    lock.acquire() # deadlock
    # Do not execute
    global num
    for i in range(10_000_000):
        num -= 1
    lock.release()
    lock.release()

if __name__ == "__main__":
    lock = threading.Lock()

    subThread01 = threading.Thread(target=add)
    subThread02 = threading.Thread(target=sub)

    subThread01.start()
    subThread02.start()

    subThread01.join()
    subThread02.join()

    print("num result : %s" % num)
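To observe this self-deadlock without hanging the interpreter, the second acquire() can be given a timeout. This is only a diagnostic sketch; try_double_acquire is a hypothetical helper and the 0.2 second timeout is an arbitrary choice.

```python
import threading

lock = threading.Lock()

def try_double_acquire():
    lock.acquire()                           # first acquire succeeds
    got_second = lock.acquire(timeout=0.2)   # would block forever without a timeout
    if got_second:
        lock.release()                       # not reached for a plain Lock
    lock.release()                           # release the first acquisition
    return got_second

result = try_double_acquire()
print(result)
```

The second acquire() gives up after the timeout and returns False, which is the same situation that blocks indefinitely in the example above.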

The with statement


Because Lock objects implement the __enter__() and __exit__() methods, we can use the with statement to lock and unlock through a context manager, in the following way.

import threading

num = 0

def add():
    with lock:
        # auto-lock
        global num
        for i in range(10_000_000):
            num += 1
        # Auto-unlock

def sub():
    with lock:
        # Auto-lock
        global num
        for i in range(10_000_000):
            num -= 1
        # Auto-unlock

if __name__ == "__main__":
    lock = threading.Lock()

    subThread01 = threading.Thread(target=add)
    subThread02 = threading.Thread(target=sub)

    subThread01.start()
    subThread02.start()

    subThread01.join()
    subThread02.join()

    print("num result : %s" % num)
    
# The results are collected three times
# num result : 0
# num result : 0
# num result : 0
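A further benefit of the with form is that the lock is released even when the locked block raises an exception. The sketch below simulates such a failure; risky_update is a hypothetical function written only for this illustration.

```python
import threading

lock = threading.Lock()

def risky_update():
    with lock:
        # Simulated failure inside the critical section.
        raise ValueError("simulated failure")

try:
    risky_update()
except ValueError:
    pass

# __exit__() ran while the exception propagated, so the lock is free again.
released_after_error = not lock.locked()
print(released_after_error)
```

With explicit acquire() and release() calls, the same guarantee requires a try/finally block around the critical section.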

2. RLock() Recursive lock


Basic introduction

A recursive lock is an upgraded version of the synchronous lock: the thread that holds it may call acquire() repeatedly and then release() repeatedly. The number of acquire() and release() calls must match; otherwise the lock is never fully released, and other threads waiting for it will block forever.

The following methods are provided by the threading module for recursive locks.

Method | Description
threading.RLock() | Returns a recursive lock object
lockObject.acquire(blocking=True, timeout=-1) | Acquires the lock, or increments its recursion level if the calling thread already owns it. Other threads that call acquire() block until the lock is fully released. The default timeout of -1 means wait indefinitely
lockObject.release() | Decrements the recursion level; when it reaches zero, the lock is released and a waiting thread can acquire it
lockObject.locked() | Returns a boolean indicating whether the lock is held. Note: older Python versions, including the 3.6 release covered by the documentation linked above, do not provide this method on RLock

Usage

The following is a simple use of a recursive lock. The same code with a synchronous lock would deadlock; with a recursive lock it does not.

import threading

num = 0

def add():
    lock.acquire()
    lock.acquire()
    global num
    for i in range(10_000_000):
        num += 1
    lock.release()
    lock.release()

def sub():
    lock.acquire()
    lock.acquire()
    global num
    for i in range(10_000_000):
        num -= 1
    lock.release()
    lock.release()

if __name__ == "__main__":
    lock = threading.RLock()

    subThread01 = threading.Thread(target=add)
    subThread02 = threading.Thread(target=sub)

    subThread01.start()
    subThread02.start()

    subThread01.join()
    subThread02.join()

    print("num result : %s" % num)

# The results are collected three times
# num result : 0
# num result : 0
# num result : 0
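In practice, repeated acquisition usually happens indirectly, when one locked method calls another method that takes the same lock. The following sketch assumes a made-up Account class; it is not from the threading module.

```python
import threading

class Account:
    """Hypothetical example class used only to illustrate re-entrancy."""

    def __init__(self, balance):
        self._lock = threading.RLock()
        self.balance = balance

    def deposit(self, amount):
        with self._lock:
            self.balance += amount

    def deposit_with_bonus(self, amount, bonus):
        with self._lock:            # first acquisition by this thread
            self.deposit(amount)    # re-acquires the same RLock without blocking
            self.deposit(bonus)

account = Account(100)
threads = [
    threading.Thread(target=account.deposit_with_bonus, args=(10, 1))
    for _ in range(5)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(account.balance)  # 100 + 5 * (10 + 1) = 155
```

If self._lock were a plain Lock, the nested call to deposit() would block on the lock its own thread already holds.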

The with statement

Because RLock objects implement the __enter__() and __exit__() methods, we can use the with statement to lock and unlock through a context manager, as follows.

import threading

num = 0

def add():
    with lock:
        # auto-lock
        global num
        for i in range(10_000_000):
            num += 1
        # Auto-unlock

def sub():
    with lock:
        # Auto-lock
        global num
        for i in range(10_000_000):
            num -= 1
        # Auto-unlock

if __name__ == "__main__":
    lock = threading.RLock()

    subThread01 = threading.Thread(target=add)
    subThread02 = threading.Thread(target=sub)

    subThread01.start()
    subThread02.start()

    subThread01.join()
    subThread02.join()

    print("num result : %s" % num)

# The results are collected three times
# num result : 0
# num result : 0
# num result : 0

3. Condition() Condition lock


Basic introduction

A condition lock builds on the recursive lock and adds the ability to suspend a running thread. We can use wait() and notify() to control how many threads are allowed to proceed.

Note: a condition lock can release any chosen number of waiting threads at a time.

The following methods are provided by the threading module for condition locks.

Method | Description
threading.Condition(lock=None) | Returns a condition lock object; if no lock is passed in, a new RLock is created internally
lockObject.acquire(blocking=True, timeout=-1) | Acquires the underlying lock. Other threads that call acquire() block until the lock is released. The default timeout of -1 means wait indefinitely
lockObject.release() | Releases the underlying lock so that a waiting thread can acquire it
lockObject.wait(timeout=None) | Releases the lock and puts the current thread into the “waiting” state until it is notified or the timeout expires, then re-acquires the lock before returning. The lock must be held when this is called
lockObject.wait_for(predicate, timeout=None) | Puts the current thread into the “waiting” state until predicate returns a true value or the timeout expires. Note: predicate must be a callable whose result is interpreted as a boolean
lockObject.notify(n=1) | Wakes up at most n threads that are in the “waiting” state (one by default). The lock must be held when this is called
lockObject.notify_all() | Wakes up all threads that are in the “waiting” state
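Of these methods, wait_for() is often the most convenient, because it re-checks the condition every time the thread is woken. The sketch below is a minimal producer/consumer pair built on that idea; items, received, consumer, and producer are names invented for the illustration.

```python
import threading

items = []       # shared queue of work (hypothetical example data)
received = []    # what the consumer picked up
cond = threading.Condition()

def consumer():
    with cond:
        # Returns at once if an item is already present; otherwise waits
        # and re-evaluates the predicate after every notification.
        cond.wait_for(lambda: len(items) > 0)
        received.append(items.pop(0))

def producer():
    with cond:
        items.append("job-1")
        cond.notify()   # wake one waiting thread, if there is one

c = threading.Thread(target=consumer)
p = threading.Thread(target=producer)
c.start()
p.start()
c.join()
p.join()

print(received)
```

Because the predicate is checked before waiting, the result is the same whether the producer or the consumer happens to run first.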

Usage

The following example starts 10 sub-threads and immediately sets the 10 sub-threads to the waiting state.

Then we can send one or more notifications to resume the waiting subthreads.

import threading

currentRunThreadNumber = 0
maxSubThreadNumber = 10

def task():
    global currentRunThreadNumber
    thName = threading.current_thread().name

    condLock.acquire() # lock
    print("start and wait run thread : %s" % thName)

    condLock.wait() # suspend the thread and wait to wake it up
    currentRunThreadNumber += 1
    print("carry on run thread : %s" % thName)

    condLock.release() # unlock

if __name__ == "__main__":
    condLock = threading.Condition()

    for i in range(maxSubThreadNumber):
        subThreadIns = threading.Thread(target=task)
        subThreadIns.start()

    while currentRunThreadNumber < maxSubThreadNumber:
        notifyNumber = int(
            input("Please enter the number of threads that need to be notified to run:"))

        condLock.acquire()
        condLock.notify(notifyNumber) # release
        condLock.release()

    print("main thread run end")
    
# Start 10 subthreads first, then all of them will become waiting
# start and wait run thread : Thread-1
# start and wait run thread : Thread-2
# start and wait run thread : Thread-3
# start and wait run thread : Thread-4
# start and wait run thread : Thread-5
# start and wait run thread : Thread-6
# start and wait run thread : Thread-7
# start and wait run thread : Thread-8
# start and wait run thread : Thread-9
# start and wait run thread : Thread-10

# Batch send notification to release a specific number of sub threads to continue running
# Please enter the number of threads that need to be notified to run: 5 # Release 5
# carry on run thread : Thread-4
# carry on run thread : Thread-3
# carry on run thread : Thread-1
# carry on run thread : Thread-2
# carry on run thread : Thread-5

# Please enter the number of threads that need to be notified to run : 5 # release 5
# carry on run thread : Thread-8
# carry on run thread : Thread-10
# carry on run thread : Thread-6
# carry on run thread : Thread-9
# carry on run thread : Thread-7

# Please enter the number of threads that need to be notified to run: 1
# main thread run end

The with statement

Because Condition objects implement the __enter__() and __exit__() methods, we can use the with statement to lock and unlock through a context manager, as follows.

import threading

currentRunThreadNumber = 0
maxSubThreadNumber = 10

def task():
    global currentRunThreadNumber
    thName = threading.current_thread().name

    with condLock:
        print("start and wait run thread : %s" % thName)
        condLock.wait() # suspend the thread and wait to wake it up
        currentRunThreadNumber += 1
        print("carry on run thread : %s" % thName)

if __name__ == "__main__":
    condLock = threading.Condition()

    for i in range(maxSubThreadNumber):
        subThreadIns = threading.Thread(target=task)
        subThreadIns.start()

    while currentRunThreadNumber < maxSubThreadNumber:
        notifyNumber = int(
            input("Please enter the number of threads that need to be notified to run:"))

        with condLock:
            condLock.notify(notifyNumber) # Release

    print("main thread run end")

4. Event() event lock


Basic introduction

An event lock is based on a condition lock. The difference is that it can only release all waiting threads at once; it cannot release a chosen number of sub-threads.

We can think of an event lock as a traffic light: while the light is red, every sub-thread that calls wait() is suspended in the “waiting” state; when the light turns green, all of them go back to “running”.

The following methods are provided by the threading module in relation to the event lock.

Method | Description
threading.Event() | Returns an event lock object; the light starts out red (the internal flag is False)
lockObject.clear() | Sets the event lock to red; threads that call wait() afterwards will be suspended
lockObject.is_set() | Returns the current state of the event lock: False for red, True for green
lockObject.set() | Sets the event lock to green; all threads that are waiting resume running
lockObject.wait(timeout=None) | Puts the current thread into the “waiting” state until the light turns green or the timeout expires; returns immediately if the light is already green. Returns True if the light is green and False if the timeout expired
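The state changes in the table can be checked in a single thread by using short timeouts. This is a small sketch; the variable names and the 0.05 second timeout are arbitrary choices for the illustration.

```python
import threading

event = threading.Event()

timed_out = event.wait(timeout=0.05)   # light is red: returns False after the timeout
event.set()                            # switch to green
passed = event.wait(timeout=0.05)      # light is green: returns True immediately
state_after_set = event.is_set()       # True
event.clear()                          # back to red
state_after_clear = event.is_set()     # False

print(timed_out, passed, state_after_set, state_after_clear)
```

The return value of wait() is what lets a caller tell a real signal apart from an expired timeout.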

Usage

Event locks do not support the with statement; they can only be used through direct method calls.

Let’s simulate threads at a traffic light, stopping on red and going on green, as follows:

import threading

maxSubThreadNumber = 3

def task():
    thName = threading.current_thread().name
    print("start and wait run thread : %s" % thName)
    eventLock.wait() # pause run and wait for green light
    print("green light, %s carry on run" % thName)
    print("red light, %s stop run" % thName)
    eventLock.wait() # pause run, wait for green light
    print("green light, %s carry on run" % thName)
    print("sub thread %s run end" % thName)

if __name__ == "__main__":

    eventLock = threading.Event()

    for i in range(maxSubThreadNumber):
        subThreadIns = threading.Thread(target=task)
        subThreadIns.start()

    eventLock.set() # set to green
    eventLock.clear() # set to red
    eventLock.set() # set to green

# start and wait run thread : Thread-1
# start and wait run thread : Thread-2
# start and wait run thread : Thread-3

# green light, Thread-1 carry on run
# red light, Thread-1 stop run
# green light, Thread-1 carry on run
# sub thread Thread-1 run end

# green light, Thread-3 carry on run
# red light, Thread-3 stop run
# green light, Thread-3 carry on run
# sub thread Thread-3 run end

# green light, Thread-2 carry on run
# red light, Thread-2 stop run
# green light, Thread-2 carry on run
# sub thread Thread-2 run end

5. Semaphore() semaphore lock


Basic Introduction

A semaphore lock is also based on a condition lock. It differs from the condition lock and the event lock as follows.

Condition lock: can release any number of threads in the “waiting” state at a time.

Event lock: releases all threads in the “waiting” state at once.

Semaphore lock: lets a fixed number of threads hold the lock at the same time; the rest are blocked in acquire() until one of the holders releases it.

The following methods are provided by the threading module in relation to semaphore locks.

Method | Description
threading.Semaphore(value=1) | Returns a semaphore lock object whose internal counter starts at value
lockObject.acquire(blocking=True, timeout=None) | Decrements the internal counter. If the counter is already zero, the calling thread blocks until another thread calls release(). The default timeout of None means wait indefinitely
lockObject.release() | Increments the internal counter and wakes up a thread that is blocked in acquire(), if there is one

Usage

The following is a sample usage. You can think of the semaphore as a stretch of road with limited width, where only a fixed number of threads can pass through at a time.

import threading
import time

maxSubThreadNumber = 6

def task():
    thName = threading.current_thread().name
    semaLock.acquire()
    print("run sub thread %s" % thName)
    time.sleep(3)
    semaLock.release()

if __name__ == "__main__":
    # Only 2 can be released at a time
    semaLock = threading.Semaphore(2)

    for i in range(maxSubThreadNumber):
        subThreadIns = threading.Thread(target=task)
        subThreadIns.start()


# run sub thread Thread-1
# run sub thread Thread-2

# run sub thread Thread-3
# run sub thread Thread-4

# run sub thread Thread-6
# run sub thread Thread-5
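The limit can also be verified programmatically by recording how many threads are inside the locked block at once. This is a hedged sketch: active, peak, state_lock, and task are hypothetical names, and the sleep is shortened to 0.05 seconds so the check runs quickly.

```python
import threading
import time

sema = threading.Semaphore(2)    # at most 2 threads inside at a time
state_lock = threading.Lock()    # protects the two counters below
active = 0                       # threads currently inside the block
peak = 0                         # highest value of active seen so far

def task():
    global active, peak
    with sema:
        with state_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)         # simulated work
        with state_lock:
            active -= 1

threads = [threading.Thread(target=task) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(peak <= 2)
```

However the 6 threads are scheduled, the recorded peak never exceeds the value passed to Semaphore().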

The with statement

Since the __enter__() and __exit__() methods are implemented in the threading.Semaphore() object, we can use the with statement to perform context-managed locking and unlocking operations.

import threading
import time

maxSubThreadNumber = 6

def task():
    thName = threading.current_thread().name
    with semaLock:
        print("run sub thread %s" % thName)
        time.sleep(3)


if __name__ == "__main__":

    semaLock = threading.Semaphore(2)

    for i in range(maxSubThreadNumber):
        subThreadIns = threading.Thread(target=task)
        subThreadIns.start()

Lock Relationships


All five lock types are ultimately built on the synchronous lock, as you can see from the source code.

First, let’s look at the RLock recursive lock. Its implementation is very simple: it wraps a synchronous lock and records the owning thread and an internal counter. Each acquire() by the owner increments the counter and each release() decrements it. While the counter is not 0, the underlying lock stays held, so other threads that call acquire() are blocked; when the counter returns to 0, the underlying lock is released:

def __init__(self):
    self._block = _allocate_lock()
    self._owner = None
    self._count = 0 # counter
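To make the role of the counter concrete, here is a heavily simplified re-entrant lock built on a plain Lock. It is a sketch for illustration only: SketchRLock is a hypothetical class, it ignores timeouts and non-blocking mode, and the real RLock handles more edge cases.

```python
import threading

class SketchRLock:
    """Simplified illustration of a re-entrant lock (not the real RLock)."""

    def __init__(self):
        self._block = threading.Lock()  # the underlying synchronous lock
        self._owner = None              # identifier of the owning thread
        self._count = 0                 # recursion counter

    def acquire(self):
        me = threading.get_ident()
        if self._owner == me:
            self._count += 1            # re-entry: just bump the counter
            return True
        self._block.acquire()           # first entry: take the real lock
        self._owner = me
        self._count = 1
        return True

    def release(self):
        if self._owner != threading.get_ident():
            raise RuntimeError("cannot release un-acquired lock")
        self._count -= 1
        if self._count == 0:            # last release: free the real lock
            self._owner = None
            self._block.release()

rl = SketchRLock()
rl.acquire()
rl.acquire()
depth = rl._count                       # 2 after two acquisitions
rl.release()
still_held = rl._block.locked()         # True: one level remains
rl.release()
fully_released = not rl._block.locked() # True: counter is back to 0
print(depth, still_held, fully_released)
```

The underlying lock is taken once and released once, no matter how many times the owner re-enters.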

The Condition lock actually works with two kinds of locks: the main lock that the caller acquires (a recursive lock by default) and a separate synchronous lock created for each waiting thread.

Calling wait() creates a new waiter lock, stores it in the _waiters queue, temporarily releases the main lock, and then blocks on the waiter lock. When another thread calls notify(), the waiter lock is released; the woken thread then re-acquires the main lock and continues. In other words, the condition lock is implemented by switching between the main lock and these per-waiter synchronous locks.

def __init__(self, lock=None):
    if lock is None:
        lock = RLock() # You can see that conditional locking is internally based on recursive locking, which in turn is based on synchronous locking
    self._lock = lock

    self.acquire = lock.acquire
    self.release = lock.release
    try:
        self._release_save = lock._release_save
    except AttributeError:
        pass
    try:
        self._acquire_restore = lock._acquire_restore
    except AttributeError:
        pass
    try:
        self._is_owned = lock._is_owned
    except AttributeError:
        pass
    self._waiters = _deque()

The Event lock is internally based on a condition lock, as follows.

class Event:

    def __init__(self):
        self._cond = Condition(Lock()) # Instantiates a conditional lock.
        self._flag = False

    def _reset_internal_locks(self):
        # private! called by Thread._reset_internal_locks by _after_fork()
        self._cond.__init__(Lock())

    def is_set(self):
        """Return true if and only if the internal flag is true."""
        return self._flag

    isSet = is_set
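The excerpt above omits set(), clear(), and wait(). The following is a rough sketch of how they could be written on top of the condition lock; SketchEvent is a hypothetical class written for this illustration and is not the source code of threading.Event.

```python
import threading

class SketchEvent:
    """Simplified event built on a Condition (illustration only)."""

    def __init__(self):
        self._cond = threading.Condition(threading.Lock())
        self._flag = False

    def set(self):
        with self._cond:
            self._flag = True
            self._cond.notify_all()     # green light: wake every waiter

    def clear(self):
        with self._cond:
            self._flag = False          # red light: later wait() calls block

    def wait(self, timeout=None):
        with self._cond:
            if not self._flag:
                self._cond.wait(timeout)
            return self._flag

results = []
ev = SketchEvent()

def worker():
    results.append(ev.wait(timeout=2))

t = threading.Thread(target=worker)
t.start()
ev.set()
t.join()
print(results)
```

The notify_all() call in set() is what gives the event its "release everything at once" behavior.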

The Semaphore lock is also internally based on a condition lock, as follows.

class Semaphore:

    def __init__(self, value=1):
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        self._cond = Condition(Lock()) # As you can see, a conditional lock is instantiated here
        self._value = value

Basic Exercises


Application of conditional locks

Requirement: start with an empty list and have two threads take turns appending values to it (one appends even numbers, the other odd numbers), so that the list ends up containing 1 to 100 in order.

import threading

lst = []

def even():
    """Add even numbers"""
    with condLock:
        for i in range(2, 101, 2):
            # An odd length means the last value added was odd,
            # so it is this thread's turn to add an even number
            if len(lst) % 2 != 0:
                lst.append(i) # Add the value first
                condLock.notify() # tell the other thread it can add an odd number; execution is not handed over immediately
                condLock.wait() # hand over execution and wait to be notified
            else:
                # An even length means the odd thread must go first
                condLock.wait() # hand over execution and wait for the odd number to be added
                lst.append(i)
                condLock.notify()
        condLock.notify()

def odd():
    """Add odd numbers"""
    with condLock:
        for i in range(1, 101, 2):
            if len(lst) % 2 == 0:
                lst.append(i)
                condLock.notify()
                condLock.wait()
        condLock.notify()

if __name__ == "__main__":
    condLock = threading.Condition()

    addEvenTask = threading.Thread(target=even)
    addOddTask = threading.Thread(target=odd)

    addEvenTask.start()
    addOddTask.start()

    addEvenTask.join()
    addOddTask.join()

    print(lst)

Application of event locks

Two task threads play Li Bai and Du Fu. How can we make them take turns speaking, one line at a time? The dialogue is as follows.

Du Fu: Old Li, come drink!

Li Bai: Old Du, I can't drink anymore!

Du Fu: Old Li, one more pot?

Du Fu: ... old Li?

Li Bai: Hoo hoo hoo... fell asleep...

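The code is as follows. Note that each set()/clear() pair only wakes a thread that is already waiting, so the exchange depends on the other thread having reached its wait() call first.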

import threading

def libai():
    event.wait()  # wait for Du Fu's first line
    print("Li Bai: Old Du, I can't drink anymore!")
    event.set()
    event.clear()
    event.wait()
    print("Li Bai: Hoo hoo hoo... fell asleep...")

def dufu():
    print("Du Fu: Old Li, come drink!")
    event.set()  # let Li Bai reply
    event.clear()
    event.wait()
    print("Du Fu: Old Li, one more pot?")
    print("Du Fu: ... old Li?")
    event.set()

if __name__ == '__main__':

    event = threading.Event()

    t1 = threading.Thread(target=libai)
    t2 = threading.Thread(target=dufu)

    t1.start()
    t2.start()
    t1.join()
    t2.join()
