题外话
一款进度条库
pip install tqdm
多线程与多进程
什么是进程和线程
简单概述进程和线程的关系
- 一个工厂,至少有一个车间,一个车间至少有一个工人,最终是工人在工作。
- 一个程序,至少有一个进程,一个车间至少有一个线程,最终是线程在工作。
进程
是计算机资源分配的最小单元,为线程提供资源线程
是计算机中可以被CPU执行的最小单元,是真正在工作的单位多线程和多进程
多线程
多线程常见方法
- start(): 当前线程准备就绪,等待CPU调度,具体执行时间由CPU决定。主线程不会等待子线程执行完毕,直接往下走。
- join(): 当前线程等待子线程执行完毕,再往下走。
- setDaemon(boolean): 设置当前线程是否为守护线程。(主线程执行完毕是否等待子线程执行完毕,true为不等待,false为等待)
- setName(String): 设置当前线程的名称。
自定义多线程
自定义多线程实列import time import threading class MyThread(threading.Thread): # 实例化执行的方法 def run(self): print("执行此线程",self._args) t= MyThread(args=("我是参数1","我是参数2")) t.start()
```python自定义线程实列
import requests # 导入requests
import threading
url_list = [
{‘name’:’1.mp4’,’url’:’https://api.007666.xyz/bilibili/video/url?id=BV1Rm4y1R7rT'},
{‘name’:’2.mp4’,’url’:’https://api.007666.xyz/bilibili/video/url?id=BV1Rm4y1R7rT'},
{‘name’:’3.mp4’,’url’:’https://api.007666.xyz/bilibili/video/url?id=BV1Rm4y1R7rT'},
]
class DowBili(threading.Thread):
def run(self):
name,url = self._args
r = requests.get(url)
with open(name,’wb’) as f:
f.write(r.content)
for i in url_list:
dow = DowBili(target=requests.get,args=(i[‘name’],i[‘url’]))
dow.start()
#### 线程安全
##### 设置线程锁
* 在多线程环境下,线程安全是一个重要的问题。如果两个线程同时修改一个变量,可能会出现不可预料的结果。在Python中,通过使用锁机制来解决线程安全问题。
```python
# 手动上锁解锁
import threading
lock_object = threading.RLock()
num = 0
def add(count):
lock_object.acquire() # 申请锁(没有申请到就等待)
global num
for i in range(count):
num += 1
lock_object.release() # 释放锁
def sub(count):
lock_object.acquire() # 申请锁(没有申请到就等待)
global num
for i in range(count):
num -= 1
lock_object.release() # 释放锁
t1 = threading.Thread(target=add, args=(1000000,))
t2 = threading.Thread(target=sub, args=(1000000,))
t1.start()
t2.start()
t1.join()
t2.join()
print(num)
```python
# 上下文自动进行上锁解锁
import threading
lock_object = threading.RLock()
sum = 0
def add(count):
with lock_object:
global sum
for i in range(count):
sum += 1
print(sum)
for i in range(10):
t = threading.Thread(target=add, args=(1000000,))
t.start()
线程安全类型
是指有一些类型内部已经实现了线程安全的功能,例如:
# 带线程锁的类型实列
import threading
data_list = []
lock = threading.RLock()
def append_data(count):
for i in range(count):
data_list.append(i)
print(len(data_list))
for i in range(2):
t = threading.Thread(target=append_data, args=(100,))
t.start()
Lock和RLock的区别
- Lock: 不可套娃锁,已经被锁的代码块里面不能再被锁。强行加锁会造成死锁(程序卡死)。但是Lock反复上锁解锁效率高。
RLock: 可重入锁,已经被锁的代码块里面能再被锁。但是RLock反复上锁解锁效率低。
注:一般碰到调用别人上了锁的代码块或方法,就用RLock。
线程池
python3才提供线程池。线程不是开的越多越好,多了效率会下降,所以要有一个线程池来管理线程。
不等待线程池的任务完成,就直接返回。
# 线程池
import time
from concurrent.futures import ThreadPoolExecutor
def task(name):
print('{} is running...'.format(name))
time.sleep(1)
# 创建线程池,最大线程数为10,如果线程池中的线程数超过了最大线程数,那么新加入的任务将会等待线程池中的线程执行完毕后再执行
pool = ThreadPoolExecutor(10)
for i in range(20):
pool.submit(task, i)
print('all task is done!')
等待线程池的任务完成,才能返回。
# 线程池
import time
from concurrent.futures import ThreadPoolExecutor
def task(name):
print('{} is running...'.format(name))
time.sleep(1)
# 创建线程池,最大线程数为10,如果线程池中的线程数超过了最大线程数,那么新加入的任务将会等待线程池中的线程执行完毕后再执行
pool = ThreadPoolExecutor(10)
for i in range(20):
pool.submit(task, i)
print('wait for all task done...')
pool.shutdown(wait=True)
print('all task is done!')
线程池完成的回调
# 线程池执行完毕的回调
import time
from concurrent.futures import ThreadPoolExecutor
def task(name):
print('{} is running...'.format(name))
time.sleep(1)
return name
pool = ThreadPoolExecutor(10)
for i in range(20):
# 线程池提交会返回一个futures对象
future = pool.submit(task, i)
# 使用add_done_callback方法添加回调函数
future.add_done_callback(lambda x: print('{} is done'.format(x.result())))
多进程
多进程常见方法
- start: 当前进程准备就绪,等待CPU调度,具体执行时间由CPU决定。最终工作的是进程中的线程。
- join: 等待当前进程的所有线程执行完毕再执行下一步。
- daemon: 设置当前进程为守护进程。
自定义进程类
# 自定义进程
import multiprocessing
class MyProcess(multiprocessing.Process):
def run(self):
print('执行了进程:', self.name)
if __name__ == '__main__':
p = MyProcess(name='test')
p.start()
CPU个数
# CPU个数
import multiprocessing
cpu_count = multiprocessing.cpu_count()
print(cpu_count)
进程间数据的共享
键值对共享数据
from multiprocessing import Process,Value,Array
def fun(v1,v2,v3,a1,a2):
v1.value = 666
v2.value = 'a'.encode('utf-8')
v3.value = "嗨"
a1[0] = 'z'.encode('utf-8')
a2[0] = 123
if __name__ == '__main__':
'''可选的数据类型
"c" -- 字符型 "u: -- 无符号字符型
"b" -- 布尔 "B" -- 无符号布尔
"h" -- 短整型 "H" -- 无符号短整型
"i" -- 整型 "I" -- 无符号整型
"l" -- 长整型 "L" -- 无符号长整型
"f" -- 浮点型 "F" -- 无符号浮点型
'''
v1 = Value('i',0) # Value(类型,初始值)
v2 = Value('c')
v3 = Value('u')
a1 = Array('c',3) # Array(类型,长度:可以是整数或者数组,长度不可更改)
a2 = Array('i',[1,2,3])
p = Process(target=fun,args=(v1,v2,v3,a1,a2))
p.start()
p.join()
print(v1.value)
print(v2.value)
print(v3.value)
print(a1[:])
print(a2[:])
Manager使用dict和list共享数据
from multiprocessing import Process,Manager
def fun(dict,list):
dict[1] = '1'
dict['2'] = 2
dict[0.25] = None
list.append(6666)
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict({1:'a',"2":'b',0.25:'c'})
l = manager.list()
p = Process(target=fun,args=(d,l))
p.start()
p.join()
print(d)
print(l)
基于队列Queue的进程间通信
怎么选择多进程还是多线程
- 计算密集型,用多进程,类如:大量的数据计算【累加计算实例】
- IO密集型,用多线程,类如:大量的数据IO,文件读写,网络数据传输【下载视频等,因为它们大多占用网卡,硬盘等资源,不占用CPU】
创建一个最简单的多线程和多进程
多线程
```python
import time
import requests # 导入requests
import threading
url_list = [
{‘name’:’1.mp4’,’url’:’https://api.007666.xyz/bilibili/video/url?id=BV1Rm4y1R7rT'},
{‘name’:’2.mp4’,’url’:’https://api.007666.xyz/bilibili/video/url?id=BV1Rm4y1R7rT'},
{‘name’:’3.mp4’,’url’:’https://api.007666.xyz/bilibili/video/url?id=BV1Rm4y1R7rT'},
]
start = time.time()
end = 0
def download(name,url):
res = requests.get(url)
with open(name,’wb’) as f:
end = time.time()f.write(res.content)
print(“耗时:”,end-start)
for i in url_list:
t = threading.Thread(target=download,args=(i[‘name’],i[‘url’]))
t.start()
### 多进程
```python
import time
import requests # 导入requests
import multiprocessing
url_list = [
{'name':'1.mp4','url':'https://api.007666.xyz/bilibili/video/url?id=BV1Rm4y1R7rT'},
{'name':'2.mp4','url':'https://api.007666.xyz/bilibili/video/url?id=BV1Rm4y1R7rT'},
{'name':'3.mp4','url':'https://api.007666.xyz/bilibili/video/url?id=BV1Rm4y1R7rT'},
]
start = time.time()
end = 0
def download(name,url):
res = requests.get(url)
with open(name,'wb') as f:
f.write(res.content)
end = time.time()
print("耗时:",end-start)
if __name__ == '__main__': # 多进程必须要有这个判断,如果不需要就加上:multiprocessing.set_start_method('fork')
for i in url_list:
t = multiprocessing.Process(target=download,args=(i['name'],i['url']))
t.start()
多进程和多进程的混合
# 多进程与多进程混合
import time
import multiprocessing
import threading
# 多线程执行函数
def thread_run(name):
print('%s is running' % name)
# 多进程执行函数
def process_run(name):
# 多线程
t1 = threading.Thread(target=thread_run, args=(name,))
t1.start()
t2 = threading.Thread(target=thread_run, args=(name,))
t2.start()
t3 = threading.Thread(target=thread_run, args=(name,))
t3.start()
if __name__ == '__main__':
# 多进程
p1 = multiprocessing.Process(target=process_run, args=('process1',))
p1.start()
p2 = multiprocessing.Process(target=process_run, args=('process2',))
p2.start()
GIL锁
是什么是GIL锁
GIL(全局解释器锁),是CPthon特有的一个玩意,让一个进程一个时间单位只能有一个被CPU调用。
由于GIL锁的存在,所以在选择多线程和多进程的时候
如果想利用CPU的多核,让CPU同时处理一些任务,那么就必须要使用多进程,而不能使用多线程,即使资源开销大。
如果不利于CPU的多核,那么就可以使用多线程,而不能使用多进程。