# Manual lock acquire/release: two threads mutate one shared counter under
# the same re-entrant lock, so the final value is deterministic (0).
import threading

lock_object = threading.RLock()
num = 0


def add(count):
    """Increment the shared counter `count` times while holding the lock."""
    lock_object.acquire()  # acquire the lock (block until it is available)
    global num
    for _ in range(count):
        num += 1
    lock_object.release()  # release the lock


def sub(count):
    """Decrement the shared counter `count` times while holding the lock."""
    lock_object.acquire()  # acquire the lock (block until it is available)
    global num
    for _ in range(count):
        num -= 1
    lock_object.release()  # release the lock


t1 = threading.Thread(target=add, args=(1000000,))
t2 = threading.Thread(target=sub, args=(1000000,))
t1.start()
t2.start()
t1.join()
t2.join()
print(num)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
```python # 上下文自动进行上锁解锁 import threading lock_object = threading.RLock() sum = 0 defadd(count): with lock_object: globalsum for i inrange(count): sum += 1 print(sum)
for i inrange(10): t = threading.Thread(target=add, args=(1000000,)) t.start()
线程安全类型
是指有一些类型内部已经实现了线程安全的功能,例如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
# Thread-safe built-in types: list.append is atomic under the GIL, so two
# threads can append to the same list concurrently without corrupting it.
import threading

data_list = []
# NOTE(review): this lock is declared but never used in the original snippet --
# the point of the example is that list.append needs no explicit locking.
lock = threading.RLock()


def append_data(count):
    """Append `count` integers to the shared list and report its length."""
    for i in range(count):
        data_list.append(i)
    print(len(data_list))


threads = []
for _ in range(2):
    t = threading.Thread(target=append_data, args=(100,))
    t.start()
    threads.append(t)

# Fix: join the workers so the script finishes only after all 200 appends;
# the original never joined, leaving completion timing nondeterministic.
for t in threads:
    t.join()
# Thread pool basics: submit 20 one-second tasks to a pool capped at 10
# workers; tasks beyond the cap wait for a free worker.  submit() returns
# immediately, so the final print runs BEFORE the tasks have finished --
# the next example shows how to wait properly.
import time
from concurrent.futures import ThreadPoolExecutor


def task(name):
    """Announce the task, then pretend to work for one second."""
    print('{} is running...'.format(name))
    time.sleep(1)


pool = ThreadPoolExecutor(10)

for job_id in range(20):
    pool.submit(task, job_id)

print('all task is done!')
等待线程池的任务完成,才能返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
# Thread pool with an explicit wait: shutdown(wait=True) blocks until every
# submitted task has completed, so the final print really happens last.
import time
from concurrent.futures import ThreadPoolExecutor


def task(name):
    """Announce the task, then pretend to work for one second."""
    print('{} is running...'.format(name))
    time.sleep(1)


pool = ThreadPoolExecutor(10)

for job_id in range(20):
    pool.submit(task, job_id)

print('wait for all task done...')
pool.shutdown(wait=True)  # block until the queue drains and workers exit
print('all task is done!')
线程池完成的回调
1 2 3 4 5 6 7 8 9 10 11 12 13 14
# Completion callback: attach a function to each submitted Future; the pool
# invokes it when the task finishes, passing the Future so the callback can
# read the task's return value via .result().
import time
from concurrent.futures import ThreadPoolExecutor


def task(name):
    """Announce the task, pretend to work, and return the task name."""
    print('{} is running...'.format(name))
    time.sleep(1)
    return name


def done(future):
    # Runs when the task completes; future.result() is the value task() returned.
    print('task result:', future.result())


pool = ThreadPoolExecutor(10)

for i in range(20):
    # NOTE(review): the original snippet was truncated after this loop header;
    # completed here with the standard submit + add_done_callback pattern.
    pool.submit(task, i).add_done_callback(done)
# Share state between processes via a Manager: `d` and `l` are proxy objects,
# so mutations made by the child process are visible to the parent.
# NOTE(review): `fun`, `Manager`, and `Process` are defined/imported outside
# this snippet -- presumably `from multiprocessing import Process, Manager`
# and a `fun(d, l)` worker function; confirm against the full listing.
if __name__ == '__main__':
    with Manager() as manager:
        # Shared dict seeded with mixed-type keys, and an empty shared list.
        d = manager.dict({1:'a',"2":'b',0.25:'c'})
        l = manager.list()
        p = Process(target=fun,args=(d,l))
        p.start()
        p.join()  # wait for the child before the Manager shuts down
# Download several files concurrently, one child process per URL.
import time
import requests  # third-party HTTP client (already imported by this file)
import multiprocessing

url_list = [
    {'name': '1.mp4', 'url': 'https://api.007666.xyz/bilibili/video/url?id=BV1Rm4y1R7rT'},
    {'name': '2.mp4', 'url': 'https://api.007666.xyz/bilibili/video/url?id=BV1Rm4y1R7rT'},
    {'name': '3.mp4', 'url': 'https://api.007666.xyz/bilibili/video/url?id=BV1Rm4y1R7rT'},
]

start = time.time()
# Fix: dropped the original's module-level `end = 0` -- `end` inside
# download() is a local, so the global was never read or written.


def download(name, url):
    """Fetch `url`, write the body to file `name`, and print the elapsed time."""
    res = requests.get(url)
    with open(name, 'wb') as f:
        f.write(res.content)
    end = time.time()
    print("耗时:", end - start)


if __name__ == '__main__':
    # The __main__ guard is required for multiprocessing on spawn-based
    # platforms (Windows/macOS); alternatively, call
    # multiprocessing.set_start_method('fork') where fork is available.
    processes = []
    for item in url_list:
        p = multiprocessing.Process(target=download, args=(item['name'], item['url']))
        p.start()
        processes.append(p)
    # Fix: join the children so the parent does not return before the
    # downloads finish (the original never joined them).
    for p in processes:
        p.join()