This article focuses on the threading module. For everyday developers this material is essential, and it is also a frequent interview topic.
Official documentation (https://docs.python.org/zh-cn/3.6/library/threading.html)
Thread safety
Thread safety is a concept in multi-threaded or multi-process programming. In a program where multiple threads with shared data are executed in parallel, thread-safe code will ensure that each thread is executed properly and correctly through a synchronization mechanism, without data contamination or other unexpected situations.
For example, suppose there are 10 candies (resources) in a room (the process) and 3 little people (1 main thread and 2 sub-threads). Little person A eats 3 candies and is then forced by the system to rest, believing that 7 candies are left. While A rests, little person B starts working and eats 3 candies. When A comes back on duty, A still believes there are 7 candies, but in fact only 4 remain.
In this example the data seen by thread A and thread B is out of sync. This is a thread-safety problem, and it can lead to serious, hard-to-predict bugs. Let’s look at a concrete case.
Here we have a value num with an initial value of 0. We start 2 threads.
- Thread 1 performs num += 1 10 million times
- Thread 2 performs num -= 1 10 million times
The result may be surprising: num does not necessarily end up being 0 as we expected.
import threading
num = 0
def add():
    global num
    for i in range(10_000_000):
        num += 1
def sub():
    global num
    for i in range(10_000_000):
        num -= 1
if __name__ == "__main__":
    subThread01 = threading.Thread(target=add)
    subThread02 = threading.Thread(target=sub)
    subThread01.start()
    subThread02.start()
    subThread01.join()
    subThread02.join()
    print("num result : %s" % num)
# The results are collected three times
# num result : 669214
# num result : -1849179
# num result : -525674
This is a typical case. To solve the problem we have to control, through locks, the points at which threads may interleave.
It is worth noting that in CPython individual operations on built-in containers such as list and dict (for example, a single append() or a single item assignment) are atomic, and tuple is immutable, so such single operations need no extra locking. Compound operations, such as reading a value and then writing back a modified one, still need a lock.
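As a hedged illustration of the note on built-in containers, the sketch below protects a read-modify-write sequence on a shared dict with a lock. The names `counts`, `counts_lock`, `record`, and `run_demo` are hypothetical and chosen only for this example.

```python
import threading

counts = {"hits": 0}            # shared dict (hypothetical example data)
counts_lock = threading.Lock()  # guards the read-modify-write below


def record(times):
    """Increment counts["hits"] `times` times, one locked step at a time."""
    for _ in range(times):
        with counts_lock:
            # Read, add, and write back as one protected step.
            counts["hits"] = counts["hits"] + 1


def run_demo(thread_count=4, times=10_000):
    """Run several threads that all update the same dict entry."""
    counts["hits"] = 0
    threads = [threading.Thread(target=record, args=(times,))
               for _ in range(thread_count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counts["hits"]


if __name__ == "__main__":
    print(run_demo())  # 4 threads * 10_000 increments
```

Because every increment happens while the lock is held, the final count equals the total number of increments regardless of how the threads are scheduled.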
The role of locks
Locks are the tool Python gives us to coordinate threads ourselves. A lock does not stop the interpreter from switching threads; it makes any other thread that tries to acquire the same lock wait, so that access to shared data happens in a controlled order.
Once access is ordered in this way, reading and modifying data across threads becomes predictable, so locks are the basic means of ensuring thread safety.
The threading module provides five commonly used synchronization primitives, referred to in this article as locks. By function they are:
- Synchronous lock: Lock (lets one thread through at a time)
- Recursive lock: RLock (lets one thread through at a time, and the owning thread may re-acquire it)
- Condition lock: Condition (can wake any chosen number of waiting threads at a time)
- Event lock: Event (wakes all waiting threads at once)
- Semaphore lock: Semaphore (lets up to a fixed number of threads through at a time)
1. Lock() synchronous lock
Basic introduction
Lock goes by several names, such as:
- Synchronous lock
- Mutual exclusion lock (mutex)
What do they mean? As follows.
- Mutual exclusion means that a resource can be accessed by only one visitor at a time; access is exclusive. Mutual exclusion alone does not restrict the order in which visitors access the resource, i.e., access is unordered.
- Synchronization means that, on top of mutual exclusion (in most cases), other mechanisms are used so that visitors access the resource in an orderly way.
- Synchronization is therefore a more elaborate form of mutual exclusion, because it adds ordered access on top of it.
The threading module provides the following methods for synchronous locks.
Method | Description |
---|---|
threading.Lock() | Returns a synchronous lock object |
lockObject.acquire(blocking=True, timeout=-1) | Acquires the lock. While one thread holds it, any other thread that calls acquire() blocks until the lock is released. With the default timeout=-1 the call waits indefinitely; with a positive timeout it gives up after that many seconds and returns False |
lockObject.release() | Releases the lock so that a thread blocked in acquire() can proceed |
lockObject.locked() | Returns True if the lock is currently held, otherwise False |
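The methods in the table can be exercised in a single thread. The following is a minimal sketch; the helper names `try_enter` and `demo` are hypothetical. It shows a non-blocking acquire(), locked(), and an acquire() with a timeout that gives up instead of waiting forever.

```python
import threading

lock = threading.Lock()


def try_enter():
    """Try to take the lock without waiting and report whether it worked."""
    got_it = lock.acquire(blocking=False)
    if got_it:
        lock.release()
    return got_it


def demo():
    results = {}
    results["free"] = try_enter()  # nobody holds the lock yet
    lock.acquire()
    try:
        results["locked_flag"] = lock.locked()
        # A plain Lock is not re-entrant, so this second acquire cannot
        # succeed; the timeout makes it return False instead of hanging.
        results["second"] = lock.acquire(timeout=0.1)
    finally:
        lock.release()
    results["after"] = lock.locked()
    return results


if __name__ == "__main__":
    print(demo())
```

Passing a timeout is a simple way to detect a lock that never becomes free without freezing the program.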
Usage
A synchronous lock lets only one thread through at a time. While a thread holds the lock, the system may still switch to other threads, but any thread that needs the same lock stays blocked until the lock is released.
The problem at the top of the article can be solved with a synchronous lock as follows.
import threading
num = 0
def add():
    global num
    lock.acquire()
    for i in range(10_000_000):
        num += 1
    lock.release()
def sub():
    global num
    lock.acquire()
    for i in range(10_000_000):
        num -= 1
    lock.release()
if __name__ == "__main__":
    lock = threading.Lock()
    subThread01 = threading.Thread(target=add)
    subThread02 = threading.Thread(target=sub)
    subThread01.start()
    subThread02.start()
    subThread01.join()
    subThread02.join()
    print("num result : %s" % num)
# The results are collected three times
# num result : 0
# num result : 0
# num result : 0
This makes the code completely serial, and for a CPU-bound task like this one it is actually slower than plain single-threaded code because of the locking and thread overhead. The example only illustrates the mechanics and does not represent how locks are typically used in real code.
Deadlock phenomenon
For a synchronous lock, each acquire() must be matched by one release(). Calling acquire() a second time before releasing blocks forever, because the lock is already held and nothing can release it. The program deadlocks and makes no progress, as follows.
import threading
num = 0
def add():
    global num
    lock.acquire()  # locking
    lock.acquire()  # deadlock: blocks forever
    # never reached
    for i in range(10_000_000):
        num += 1
    lock.release()
    lock.release()
def sub():
    global num
    lock.acquire()  # locking
    lock.acquire()  # deadlock: blocks forever
    # never reached
    for i in range(10_000_000):
        num -= 1
    lock.release()
    lock.release()
if __name__ == "__main__":
    lock = threading.Lock()
    subThread01 = threading.Thread(target=add)
    subThread02 = threading.Thread(target=sub)
    subThread01.start()
    subThread02.start()
    subThread01.join()
    subThread02.join()
    print("num result : %s" % num)
The with statement
Since threading.Lock objects implement the __enter__() and __exit__() methods, we can use the with statement to lock and unlock through a context manager, as follows.
import threading
num = 0
def add():
    global num
    with lock:
        # Auto-lock
        for i in range(10_000_000):
            num += 1
        # Auto-unlock
def sub():
    global num
    with lock:
        # Auto-lock
        for i in range(10_000_000):
            num -= 1
        # Auto-unlock
if __name__ == "__main__":
    lock = threading.Lock()
    subThread01 = threading.Thread(target=add)
    subThread02 = threading.Thread(target=sub)
    subThread01.start()
    subThread02.start()
    subThread01.join()
    subThread02.join()
    print("num result : %s" % num)
# The results are collected three times
# num result : 0
# num result : 0
# num result : 0
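One practical benefit of the with statement is that the lock is released even when the locked block raises an exception. The following is a small sketch of that behavior; `risky_update` and `demo` are hypothetical names.

```python
import threading

lock = threading.Lock()


def risky_update():
    """Raise inside the locked block to simulate a failure."""
    with lock:
        raise ValueError("simulated failure inside the critical section")


def demo():
    try:
        risky_update()
    except ValueError:
        pass
    # __exit__ already released the lock, so it is free again here.
    return lock.locked()


if __name__ == "__main__":
    print(demo())
```

With manual acquire() and release() calls, the same guarantee requires a try/finally block around the critical section.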
2. RLock() Recursive lock
Basic introduction
A recursive lock is an upgraded version of the synchronous lock: the thread that holds it may call acquire() repeatedly and then call release() the same number of times. Note that the number of acquire() and release() calls must match; otherwise the lock stays held, and other threads waiting for it will block forever.
The threading module provides the following methods for recursive locks.
Method | Description |
---|---|
threading.RLock() | Returns a recursive lock object |
lockObject.acquire(blocking=True, timeout=-1) | Acquires the lock. If the calling thread already owns it, the call returns immediately and the recursion level increases by one; other threads block until the lock is fully released. With the default timeout=-1 the call waits indefinitely |
lockObject.release() | Decreases the recursion level by one; when it reaches zero the lock is released and a thread blocked in acquire() can proceed |
lockObject.locked() | Returns True if the lock is currently held, otherwise False. Note: RLock gained this method only in recent Python releases, and it is not available in Python 3.6 |
Usage
The following is a simple use of a recursive lock. The same code deadlocks with a synchronous lock but runs fine with a recursive lock.
import threading
num = 0
def add():
    global num
    lock.acquire()
    lock.acquire()
    for i in range(10_000_000):
        num += 1
    lock.release()
    lock.release()
def sub():
    global num
    lock.acquire()
    lock.acquire()
    for i in range(10_000_000):
        num -= 1
    lock.release()
    lock.release()
if __name__ == "__main__":
    lock = threading.RLock()
    subThread01 = threading.Thread(target=add)
    subThread02 = threading.Thread(target=sub)
    subThread01.start()
    subThread02.start()
    subThread01.join()
    subThread02.join()
    print("num result : %s" % num)
# The results are collected three times
# num result : 0
# num result : 0
# num result : 0
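Acquiring the same lock twice in a row rarely happens on purpose. A more realistic situation, sketched below, is one locked method calling another locked method of the same object. The `Counter` class and `run_demo` helper are hypothetical and serve only as an illustration.

```python
import threading


class Counter:
    """Hypothetical example class whose methods all take the same RLock."""

    def __init__(self):
        self._lock = threading.RLock()
        self.value = 0

    def increment(self):
        with self._lock:
            self.value += 1

    def increment_twice(self):
        with self._lock:      # first acquisition by this thread
            self.increment()  # re-acquires the same RLock without blocking
            self.increment()


def run_demo(thread_count=4, calls=1_000):
    counter = Counter()

    def work():
        for _ in range(calls):
            counter.increment_twice()

    threads = [threading.Thread(target=work) for _ in range(thread_count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter.value


if __name__ == "__main__":
    print(run_demo())  # thread_count * calls * 2
```

With a plain Lock in place of the RLock, the nested call inside increment_twice() would block forever.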
The with statement
Since threading.RLock objects implement the __enter__() and __exit__() methods, we can use the with statement to lock and unlock through a context manager, as follows.
import threading
num = 0
def add():
    global num
    with lock:
        # Auto-lock
        for i in range(10_000_000):
            num += 1
        # Auto-unlock
def sub():
    global num
    with lock:
        # Auto-lock
        for i in range(10_000_000):
            num -= 1
        # Auto-unlock
if __name__ == "__main__":
    lock = threading.RLock()
    subThread01 = threading.Thread(target=add)
    subThread02 = threading.Thread(target=sub)
    subThread01.start()
    subThread02.start()
    subThread01.join()
    subThread02.join()
    print("num result : %s" % num)
# The results are collected three times
# num result : 0
# num result : 0
# num result : 0
3. Condition() Condition lock
Basic introduction
A condition lock builds on the recursive lock and adds the ability to suspend a running thread. We can use wait() and notify() to control how many threads are running.
Note: a condition lock can wake any chosen number of waiting threads at a time.
The threading module provides the following methods for condition locks.
Method | Description |
---|---|
threading.Condition() | Returns a condition lock object |
lockObject.acquire(blocking=True, timeout=-1) | Acquires the underlying lock (an RLock by default). Other threads that call acquire() block until it is released. With the default timeout=-1 the call waits indefinitely |
lockObject.release() | Releases the underlying lock so that a thread blocked in acquire() can proceed |
lockObject.wait(timeout=None) | Releases the underlying lock and puts the current thread into the “waiting” state until it is notified or the timeout expires; the lock is re-acquired before the call returns. While the thread waits, other threads are free to run |
lockObject.wait_for(predicate, timeout=None) | Puts the current thread into the “waiting” state until predicate returns a true value or the timeout expires. While the thread waits, other threads are free to run. Note: predicate must be a callable object whose result is interpreted as a bool |
lockObject.notify(n=1) | Wakes up one thread that is in the “waiting” state, or up to n threads if the n parameter is given |
lockObject.notify_all() | Wakes up all threads that are in the “waiting” state |
Usage
The following example starts 10 sub-threads and immediately puts all of them into the waiting state.
We can then send one or more notifications to resume the waiting sub-threads.
import threading
currentRunThreadNumber = 0
maxSubThreadNumber = 10
def task():
    global currentRunThreadNumber
    thName = threading.current_thread().name
    condLock.acquire()  # lock
    print("start and wait run thread : %s" % thName)
    condLock.wait()  # suspend the thread and wait to be woken up
    currentRunThreadNumber += 1
    print("carry on run thread : %s" % thName)
    condLock.release()  # unlock
if __name__ == "__main__":
    condLock = threading.Condition()
    for i in range(maxSubThreadNumber):
        subThreadIns = threading.Thread(target=task)
        subThreadIns.start()
    while currentRunThreadNumber < maxSubThreadNumber:
        notifyNumber = int(
            input("Please enter the number of threads that need to be notified to run:"))
        condLock.acquire()
        condLock.notify(notifyNumber)  # wake up notifyNumber waiting threads
        condLock.release()
    print("main thread run end")
# Start 10 subthreads first, then all of them will become waiting
# start and wait run thread : Thread-1
# start and wait run thread : Thread-2
# start and wait run thread : Thread-3
# start and wait run thread : Thread-4
# start and wait run thread : Thread-5
# start and wait run thread : Thread-6
# start and wait run thread : Thread-7
# start and wait run thread : Thread-8
# start and wait run thread : Thread-9
# start and wait run thread : Thread-10
# Batch send notification to release a specific number of sub threads to continue running
# Please enter the number of threads that need to be notified to run: 5 # Release 5
# carry on run thread : Thread-4
# carry on run thread : Thread-3
# carry on run thread : Thread-1
# carry on run thread : Thread-2
# carry on run thread : Thread-5
# Please enter the number of threads that need to be notified to run : 5 # release 5
# carry on run thread : Thread-8
# carry on run thread : Thread-10
# carry on run thread : Thread-6
# carry on run thread : Thread-9
# carry on run thread : Thread-7
# Please enter the number of threads that need to be notified to run: 1
# main thread run end
The with statement
Since threading.Condition objects implement the __enter__() and __exit__() methods, we can use the with statement to lock and unlock through a context manager, as follows.
import threading
currentRunThreadNumber = 0
maxSubThreadNumber = 10
def task():
    global currentRunThreadNumber
    thName = threading.current_thread().name
    with condLock:
        print("start and wait run thread : %s" % thName)
        condLock.wait()  # suspend the thread and wait to be woken up
        currentRunThreadNumber += 1
        print("carry on run thread : %s" % thName)
if __name__ == "__main__":
    condLock = threading.Condition()
    for i in range(maxSubThreadNumber):
        subThreadIns = threading.Thread(target=task)
        subThreadIns.start()
    while currentRunThreadNumber < maxSubThreadNumber:
        notifyNumber = int(
            input("Please enter the number of threads that need to be notified to run:"))
        with condLock:
            condLock.notify(notifyNumber)  # wake up notifyNumber waiting threads
    print("main thread run end")
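The wait_for() method from the table is not used in the examples above. A minimal producer/consumer sketch, under the assumption of a single consumer, could look like the following. The names `queue`, `cond`, `received`, `consumer`, `producer`, and `run_demo` are hypothetical.

```python
import threading
from collections import deque

queue = deque()               # shared buffer (hypothetical example data)
cond = threading.Condition()
received = []


def consumer(expected):
    """Take `expected` items from the queue, waiting whenever it is empty."""
    for _ in range(expected):
        with cond:
            # wait_for() checks the predicate first and again after every
            # wake-up, so a notify() sent before waiting is not lost.
            cond.wait_for(lambda: len(queue) > 0)
            received.append(queue.popleft())


def producer(items):
    for item in items:
        with cond:
            queue.append(item)
            cond.notify()     # wake the consumer if it is waiting


def run_demo(items):
    queue.clear()
    received.clear()
    c = threading.Thread(target=consumer, args=(len(items),))
    p = threading.Thread(target=producer, args=(list(items),))
    c.start()
    p.start()
    p.join()
    c.join()
    return list(received)


if __name__ == "__main__":
    print(run_demo([1, 2, 3]))
```

Checking a predicate rather than relying on a bare wait() keeps the consumer correct even if the producer runs first.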
4. Event() event lock
Basic introduction
An event lock is built on a condition lock. The difference is that it can only release all waiting threads at once; it cannot release a chosen number of sub-threads.
We can think of an event lock as a traffic light: while the light is red, every sub-thread that reaches wait() is suspended in the “waiting” state, and when the light turns green all of them resume running.
The threading module provides the following methods for event locks.
Method | Description |
---|---|
threading.Event() | Returns an event lock object; the light starts out red |
lockObject.clear() | Sets the event lock to red, i.e. threads that call wait() from now on will be suspended |
lockObject.is_set() | Returns the current state of the event lock: False for red, True for green |
lockObject.set() | Sets the event lock to green, i.e. all waiting threads resume running |
lockObject.wait(timeout=None) | Puts the current thread into the “waiting” state until the light is green or the timeout expires; returns immediately if the light is already green. While the thread waits, other threads are free to run |
Usage
An event lock cannot be used with the with statement; it can only be used through ordinary method calls.
Let’s simulate threads at a traffic light that stop on red and go on green, as follows.
import threading
maxSubThreadNumber = 3
def task():
    thName = threading.current_thread().name
    print("start and wait run thread : %s" % thName)
    eventLock.wait()  # pause and wait for the green light
    print("green light, %s carry on run" % thName)
    print("red light, %s stop run" % thName)
    eventLock.wait()  # pause and wait for the green light
    print("green light, %s carry on run" % thName)
    print("sub thread %s run end" % thName)
if __name__ == "__main__":
    eventLock = threading.Event()
    for i in range(maxSubThreadNumber):
        subThreadIns = threading.Thread(target=task)
        subThreadIns.start()
    eventLock.set()  # set to green
    eventLock.clear()  # set to red
    eventLock.set()  # set to green
# start and wait run thread : Thread-1
# start and wait run thread : Thread-2
# start and wait run thread : Thread-3
# green light, Thread-1 carry on run
# red light, Thread-1 stop run
# green light, Thread-1 carry on run
# sub thread Thread-1 run end
# green light, Thread-3 carry on run
# red light, Thread-3 stop run
# green light, Thread-3 carry on run
# sub thread Thread-3 run end
# green light, Thread-2 carry on run
# red light, Thread-2 stop run
# green light, Thread-2 carry on run
# sub thread Thread-2 run end
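The timeout parameter of wait() and the is_set() method can be seen in a smaller sketch. The names `ready`, `log`, `worker`, and `demo` are hypothetical.

```python
import threading

ready = threading.Event()
log = []


def worker():
    ready.wait()              # blocks until another thread calls set()
    log.append("worker ran")


def demo():
    ready.clear()
    log.clear()
    t = threading.Thread(target=worker)
    t.start()
    # The light is still red, so a timed wait gives up and returns False.
    timed_out = not ready.wait(timeout=0.05)
    ready.set()               # green light: the worker may continue
    t.join()
    return timed_out, ready.is_set(), list(log)


if __name__ == "__main__":
    print(demo())
```

The return value of wait() tells the caller whether the event was set or the timeout expired, which is useful for periodic checks.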
5. Semaphore() semaphore lock
Basic introduction
A semaphore lock is also built on a condition lock. It differs from a condition lock and an event lock as follows.
- Condition lock: can wake any chosen number of threads in the “waiting” state at a time.
- Event lock: wakes all threads in the “waiting” state at once.
- Semaphore lock: lets a fixed number of threads hold the lock at the same time; the rest stay blocked in acquire() until a slot is released.
The threading module provides the following methods for semaphore locks.
Method | Description |
---|---|
threading.Semaphore(value=1) | Returns a semaphore lock object whose internal counter starts at value |
lockObject.acquire(blocking=True, timeout=None) | Decrements the counter and returns at once if the counter is greater than zero; otherwise blocks until another thread calls release(). With the default timeout=None the call waits indefinitely |
lockObject.release() | Increments the counter and wakes up a thread blocked in acquire(), if there is one |
Usage
The following is a usage example. You can think of it as a stretch of road with limited width, where only a fixed number of threads can pass at the same time.
import threading
import time
maxSubThreadNumber = 6
def task():
    thName = threading.current_thread().name
    semaLock.acquire()
    print("run sub thread %s" % thName)
    time.sleep(3)
    semaLock.release()
if __name__ == "__main__":
    # At most 2 threads may hold the semaphore at a time
    semaLock = threading.Semaphore(2)
    for i in range(maxSubThreadNumber):
        subThreadIns = threading.Thread(target=task)
        subThreadIns.start()
# run sub thread Thread-1
# run sub thread Thread-2
# run sub thread Thread-3
# run sub thread Thread-4
# run sub thread Thread-6
# run sub thread Thread-5
The with statement
Since threading.Semaphore objects implement the __enter__() and __exit__() methods, we can use the with statement to lock and unlock through a context manager.
import threading
import time
maxSubThreadNumber = 6
def task():
    thName = threading.current_thread().name
    with semaLock:
        print("run sub thread %s" % thName)
        time.sleep(3)
if __name__ == "__main__":
    semaLock = threading.Semaphore(2)
    for i in range(maxSubThreadNumber):
        subThreadIns = threading.Thread(target=task)
        subThreadIns.start()
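To check that the semaphore really caps concurrency, one option is to count how many threads are inside the protected block at the same moment. The sketch below does this with a short sleep; the names `sema`, `stats`, `stats_lock`, `task`, and `run_demo` are hypothetical.

```python
import threading
import time

sema = threading.Semaphore(2)        # at most 2 threads inside at once
stats = {"running": 0, "peak": 0}    # bookkeeping for the demo only
stats_lock = threading.Lock()


def task():
    with sema:
        with stats_lock:
            stats["running"] += 1
            stats["peak"] = max(stats["peak"], stats["running"])
        time.sleep(0.05)             # simulated work
        with stats_lock:
            stats["running"] -= 1


def run_demo(thread_count=6):
    stats["running"] = 0
    stats["peak"] = 0
    threads = [threading.Thread(target=task) for _ in range(thread_count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return stats["peak"]


if __name__ == "__main__":
    print("peak concurrency:", run_demo())
```

The recorded peak can never exceed the semaphore's initial value, no matter how many threads are started.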
Lock Relationships
All five kinds of locks above are ultimately built on the synchronous lock, as the source code shows.
First, let’s look at the RLock recursive lock. Its implementation is simple: it records the owning thread and maintains an internal counter. Each acquire() by the owner increments the counter and each release() decrements it; the underlying lock is released, allowing other threads to acquire it, only when the counter returns to 0:
def __init__(self):
    self._block = _allocate_lock()
    self._owner = None
    self._count = 0  # counter
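To make the counter idea concrete, here is a simplified teaching sketch of a re-entrant lock built on a plain Lock. It is not the real threading.RLock implementation; the class name `SimpleRLock` is hypothetical, and features such as timeouts are left out.

```python
import threading


class SimpleRLock:
    """Teaching sketch of a re-entrant lock; not the real threading.RLock."""

    def __init__(self):
        self._block = threading.Lock()  # the underlying synchronous lock
        self._owner = None              # ident of the owning thread
        self._count = 0                 # how many times the owner acquired

    def acquire(self):
        me = threading.get_ident()
        if self._owner == me:
            # Already owned by this thread: only bump the counter.
            self._count += 1
            return True
        self._block.acquire()           # blocks while another thread owns it
        self._owner = me
        self._count = 1
        return True

    def release(self):
        if self._owner != threading.get_ident():
            raise RuntimeError("cannot release a lock that is not owned")
        self._count -= 1
        if self._count == 0:
            # Last matching release: free the underlying lock.
            self._owner = None
            self._block.release()


if __name__ == "__main__":
    rlock = SimpleRLock()
    rlock.acquire()
    rlock.acquire()   # same thread, so this does not block
    rlock.release()
    rlock.release()
```

Only the final release() touches the underlying lock, which is why the number of acquire() and release() calls must match.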
A Condition actually works with two kinds of lock: the underlying lock (an RLock by default), which is what acquire() and release() operate on, and a separate plain Lock created for each waiting thread.
Calling wait() creates a waiter lock, temporarily releases the underlying lock, and blocks on the waiter lock. Only when another thread calls notify() is the waiter lock released; the waiting thread then re-acquires the underlying lock and continues. In other words, the condition lock is implemented by switching between these two locks.
def __init__(self, lock=None):
    if lock is None:
        lock = RLock()  # the condition lock defaults to a recursive lock, which in turn is built on the synchronous lock
    self._lock = lock
    self.acquire = lock.acquire
    self.release = lock.release
    try:
        self._release_save = lock._release_save
    except AttributeError:
        pass
    try:
        self._acquire_restore = lock._acquire_restore
    except AttributeError:
        pass
    try:
        self._is_owned = lock._is_owned
    except AttributeError:
        pass
    self._waiters = _deque()
Event locks are built on condition locks internally, as follows.
class Event:
    def __init__(self):
        self._cond = Condition(Lock())  # instantiates a condition lock
        self._flag = False
    def _reset_internal_locks(self):
        # private! called by Thread._reset_internal_locks by _after_fork()
        self._cond.__init__(Lock())
    def is_set(self):
        """Return true if and only if the internal flag is true."""
        return self._flag
    isSet = is_set
Semaphore locks are also built on condition locks internally, as follows.
class Semaphore:
    def __init__(self, value=1):
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        self._cond = Condition(Lock())  # as you can see, a condition lock is instantiated here
        self._value = value
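As a rough sketch of how a counter plus a condition lock is enough to build a semaphore, consider the simplified class below. It is an illustration under the assumptions stated in the comments, not the standard library's code; the name `SimpleSemaphore` is hypothetical.

```python
import threading


class SimpleSemaphore:
    """Teaching sketch of a semaphore built on a Condition."""

    def __init__(self, value=1):
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        self._cond = threading.Condition(threading.Lock())
        self._value = value

    def acquire(self, timeout=None):
        with self._cond:
            # Wait until a slot is free or the timeout expires.
            ok = self._cond.wait_for(lambda: self._value > 0, timeout)
            if ok:
                self._value -= 1
            return ok

    def release(self):
        with self._cond:
            self._value += 1
            self._cond.notify()  # wake one thread waiting for a slot


if __name__ == "__main__":
    sem = SimpleSemaphore(2)
    print(sem.acquire(), sem.acquire())  # both slots taken
    print(sem.acquire(timeout=0.05))     # no slot left, so this times out
    sem.release()
    print(sem.acquire(timeout=0.05))     # a slot is free again
```

The counter holds the number of free slots, and the condition lock provides both mutual exclusion on the counter and a way to wait for it to change.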
Basic Exercises
Application of conditional locks
Requirement: starting from an empty list, two threads take turns appending values (one appends even numbers, the other odd numbers) so that the list ends up containing 1 to 100 in order.
import threading
lst = []
def even():
    """Add even numbers."""
    with condLock:
        for i in range(2, 101, 2):
            # An odd length means the last value added was odd,
            # so it is this thread's turn to add an even number.
            if len(lst) % 2 != 0:
                lst.append(i)  # add the value first
                condLock.notify()  # tell the other thread it may add an odd number; the lock is not handed over yet
                condLock.wait()  # hand over the lock and wait to be notified before adding the next even number
            else:
                # An even length means an odd number is due first.
                condLock.wait()  # hand over the lock and wait for the other thread to add its odd number
                lst.append(i)
                condLock.notify()
        condLock.notify()
def odd():
    """Add odd numbers."""
    with condLock:
        for i in range(1, 101, 2):
            if len(lst) % 2 == 0:
                lst.append(i)
                condLock.notify()
                condLock.wait()
        condLock.notify()
if __name__ == "__main__":
    condLock = threading.Condition()
    addEvenTask = threading.Thread(target=even)
    addOddTask = threading.Thread(target=odd)
    addEvenTask.start()
    addOddTask.start()
    addEvenTask.join()
    addOddTask.join()
    print(lst)
Application of event locks
Two task threads play Li Bai and Du Fu. How can we make them answer each other one line at a time? The dialogue is as follows.
Du Fu: Old Li, come drink!
Li Bai: Old Du, I can't drink anymore!
Du Fu: Old Li, one more pot?
Du Fu: ... old Li?
Li Bai: Hoo hoo hoo... fell asleep...
The code is as follows. Note that calling set() and then clear() right away only wakes threads that are already waiting, so this pattern depends on timing and is meant as an exercise only.
import threading
def libai():
    event.wait()
    print("Li Bai: Old Du, I can't drink anymore!")
    event.set()
    event.clear()
    event.wait()
    print("Li Bai: Hoo hoo hoo... fell asleep...")
def dufu():
    print("Du Fu: Old Li, come drink!")
    event.set()
    event.clear()
    event.wait()
    print("Du Fu: Old Li, one more pot?")
    print("Du Fu: ... old Li?")
    event.set()
if __name__ == '__main__':
    event = threading.Event()
    t1 = threading.Thread(target=libai)
    t2 = threading.Thread(target=dufu)
    t1.start()
    t2.start()
    t1.join()
    t2.join()