抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

题外话

一款进度条库

1
pip install tqdm

多线程与多进程

什么是进程和线程

简单概述进程和线程的关系

  • 一个工厂,至少有一个车间,一个车间至少有一个工人,最终是工人在工作。
  • 一个程序,至少有一个进程,一个车间至少有一个线程,最终是线程在工作。

进程

是计算机资源分配的最小单元,为线程提供资源

线程

是计算机中可以被CPU执行的最小单元,是真正在工作的单位

多线程和多进程

多线程

多线程常见方法

  • start(): 当前线程准备就绪,等待CPU调度,具体执行时间由CPU决定。主线程不会等待子线程执行完毕,直接往下走。
  • join(): 当前线程等待子线程执行完毕,再往下走。
  • setDaemon(boolean): 设置当前线程是否为守护线程。(主线程执行完毕是否等待子线程执行完毕,true为不等待,false为等待)
  • setName(String): 设置当前线程的名称。

自定义多线程

1
2
3
4
5
6
7
8
import time
import threading
class MyThread(threading.Thread):
# 实例化执行的方法
def run(self):
print("执行此线程",self._args)
t= MyThread(args=("我是参数1","我是参数2"))
t.start()

自定义多线程实列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 自定义线程实列
import requests # 导入requests
import threading
url_list = [
{'name':'1.mp4','url':'https://api.007666.xyz/bilibili/video/url?id=BV1Rm4y1R7rT'},
{'name':'2.mp4','url':'https://api.007666.xyz/bilibili/video/url?id=BV1Rm4y1R7rT'},
{'name':'3.mp4','url':'https://api.007666.xyz/bilibili/video/url?id=BV1Rm4y1R7rT'},
]

class DowBili(threading.Thread):
def run(self):
name,url = self._args
r = requests.get(url)
with open(name,'wb') as f:
f.write(r.content)

for i in url_list:
dow = DowBili(target=requests.get,args=(i['name'],i['url']))
dow.start()

线程安全

设置线程锁
  • 在多线程环境下,线程安全是一个重要的问题。如果两个线程同时修改一个变量,可能会出现不可预料的结果。在Python中,通过使用锁机制来解决线程安全问题。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 手动上锁解锁
import threading
lock_object = threading.RLock()
num = 0
def add(count):
lock_object.acquire() # 申请锁(没有申请到就等待)
global num
for i in range(count):
num += 1
lock_object.release() # 释放锁
def sub(count):
lock_object.acquire() # 申请锁(没有申请到就等待)
global num
for i in range(count):
num -= 1
lock_object.release() # 释放锁
t1 = threading.Thread(target=add, args=(1000000,))
t2 = threading.Thread(target=sub, args=(1000000,))
t1.start()
t2.start()
t1.join()
t2.join()
print(num)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
```python
# 上下文自动进行上锁解锁
import threading
lock_object = threading.RLock()
sum = 0
def add(count):
with lock_object:
global sum
for i in range(count):
sum += 1
print(sum)

for i in range(10):
t = threading.Thread(target=add, args=(1000000,))
t.start()
线程安全类型

是指有一些类型内部已经实现了线程安全的功能,例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 带线程锁的类型实列
import threading

data_list = []

lock = threading.RLock()

def append_data(count):
for i in range(count):
data_list.append(i)
print(len(data_list))

for i in range(2):
t = threading.Thread(target=append_data, args=(100,))
t.start()
Lock和RLock的区别
  • Lock: 不可套娃锁,已经被锁的代码块里面不能再被锁。强行加锁会造成死锁(程序卡死)。但是Lock反复上锁解锁效率高。

  • RLock: 可重入锁,已经被锁的代码块里面能再被锁。但是RLock反复上锁解锁效率低。

  • 注:一般碰到调用别人上了锁的代码块或方法,就用RLock。

线程池

python3才提供线程池。线程不是开的越多越好,多了效率会下降,所以要有一个线程池来管理线程。

不等待线程池的任务完成,就直接返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 线程池
import time
from concurrent.futures import ThreadPoolExecutor

def task(name):
print('{} is running...'.format(name))
time.sleep(1)

# 创建线程池,最大线程数为10,如果线程池中的线程数超过了最大线程数,那么新加入的任务将会等待线程池中的线程执行完毕后再执行
pool = ThreadPoolExecutor(10)

for i in range(20):
pool.submit(task, i)

print('all task is done!')

等待线程池的任务完成,才能返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 线程池
import time
from concurrent.futures import ThreadPoolExecutor

def task(name):
print('{} is running...'.format(name))
time.sleep(1)

# 创建线程池,最大线程数为10,如果线程池中的线程数超过了最大线程数,那么新加入的任务将会等待线程池中的线程执行完毕后再执行
pool = ThreadPoolExecutor(10)

for i in range(20):
pool.submit(task, i)
print('wait for all task done...')
pool.shutdown(wait=True)
print('all task is done!')
线程池完成的回调
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 线程池执行完毕的回调
import time
from concurrent.futures import ThreadPoolExecutor
def task(name):
print('{} is running...'.format(name))
time.sleep(1)
return name
pool = ThreadPoolExecutor(10)
for i in range(20):

# 线程池提交会返回一个futures对象
future = pool.submit(task, i)
# 使用add_done_callback方法添加回调函数
future.add_done_callback(lambda x: print('{} is done'.format(x.result())))

多进程

多进程常见方法
  • start: 当前进程准备就绪,等待CPU调度,具体执行时间由CPU决定。最终工作的是进程中的线程。
  • join: 等待当前进程的所有线程执行完毕再执行下一步。
  • daemon: 设置当前进程为守护进程。

自定义进程类

1
2
3
4
5
6
7
8
9
10
# 自定义进程
import multiprocessing

class MyProcess(multiprocessing.Process):
def run(self):
print('执行了进程:', self.name)

if __name__ == '__main__':
p = MyProcess(name='test')
p.start()

CPU个数

1
2
3
4
5
# CPU个数
import multiprocessing

cpu_count = multiprocessing.cpu_count()
print(cpu_count)

进程间数据的共享

键值对共享数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from multiprocessing import Process,Value,Array

def fun(v1,v2,v3,a1,a2):
v1.value = 666
v2.value = 'a'.encode('utf-8')
v3.value = "嗨"
a1[0] = 'z'.encode('utf-8')
a2[0] = 123

if __name__ == '__main__':
'''可选的数据类型
"c" -- 字符型 "u: -- 无符号字符型
"b" -- 布尔 "B" -- 无符号布尔
"h" -- 短整型 "H" -- 无符号短整型
"i" -- 整型 "I" -- 无符号整型
"l" -- 长整型 "L" -- 无符号长整型
"f" -- 浮点型 "F" -- 无符号浮点型
'''
v1 = Value('i',0) # Value(类型,初始值)
v2 = Value('c')
v3 = Value('u')
a1 = Array('c',3) # Array(类型,长度:可以是整数或者数组,长度不可更改)
a2 = Array('i',[1,2,3])

p = Process(target=fun,args=(v1,v2,v3,a1,a2))
p.start()
p.join()

print(v1.value)
print(v2.value)
print(v3.value)
print(a1[:])
print(a2[:])
Manager使用dict和list共享数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from multiprocessing import Process,Manager

def fun(dict,list):
dict[1] = '1'
dict['2'] = 2
dict[0.25] = None
list.append(6666)

if __name__ == '__main__':
with Manager() as manager:
d = manager.dict({1:'a',"2":'b',0.25:'c'})
l = manager.list()

p = Process(target=fun,args=(d,l))
p.start()
p.join()

print(d)
print(l)
基于队列Queue的进程间通信

怎么选择多进程还是多线程

  • 计算密集型,用多进程,类如:大量的数据计算【累加计算实例】
  • IO密集型,用多线程,类如:大量的数据IO,文件读写,网络数据传输【下载视频等,因为它们大多占用网卡,硬盘等资源,不占用CPU】

创建一个最简单的多线程和多进程

多线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import time
import requests # 导入requests
import threading
url_list = [
{'name':'1.mp4','url':'https://api.007666.xyz/bilibili/video/url?id=BV1Rm4y1R7rT'},
{'name':'2.mp4','url':'https://api.007666.xyz/bilibili/video/url?id=BV1Rm4y1R7rT'},
{'name':'3.mp4','url':'https://api.007666.xyz/bilibili/video/url?id=BV1Rm4y1R7rT'},
]
start = time.time()
end = 0
def download(name,url):
res = requests.get(url)
with open(name,'wb') as f:
f.write(res.content)
end = time.time()
print("耗时:",end-start)

for i in url_list:
t = threading.Thread(target=download,args=(i['name'],i['url']))
t.start()

多进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import time
import requests # 导入requests
import multiprocessing
url_list = [
{'name':'1.mp4','url':'https://api.007666.xyz/bilibili/video/url?id=BV1Rm4y1R7rT'},
{'name':'2.mp4','url':'https://api.007666.xyz/bilibili/video/url?id=BV1Rm4y1R7rT'},
{'name':'3.mp4','url':'https://api.007666.xyz/bilibili/video/url?id=BV1Rm4y1R7rT'},
]
start = time.time()
end = 0
def download(name,url):
res = requests.get(url)
with open(name,'wb') as f:
f.write(res.content)
end = time.time()
print("耗时:",end-start)
if __name__ == '__main__': # 多进程必须要有这个判断,如果不需要就加上:multiprocessing.set_start_method('fork')
for i in url_list:
t = multiprocessing.Process(target=download,args=(i['name'],i['url']))
t.start()

多进程和多进程的混合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 多进程与多进程混合
import time
import multiprocessing
import threading

# 多线程执行函数
def thread_run(name):
print('%s is running' % name)

# 多进程执行函数
def process_run(name):
# 多线程
t1 = threading.Thread(target=thread_run, args=(name,))
t1.start()

t2 = threading.Thread(target=thread_run, args=(name,))
t2.start()

t3 = threading.Thread(target=thread_run, args=(name,))
t3.start()

if __name__ == '__main__':
# 多进程
p1 = multiprocessing.Process(target=process_run, args=('process1',))
p1.start()

p2 = multiprocessing.Process(target=process_run, args=('process2',))
p2.start()

GIL锁

是什么是GIL锁

GIL(全局解释器锁),是CPthon特有的一个玩意,让一个进程一个时间单位只能有一个被CPU调用。
由于GIL锁的存在,所以在选择多线程和多进程的时候

如果想利用CPU的多核,让CPU同时处理一些任务,那么就必须要使用多进程,而不能使用多线程,即使资源开销大。
如果不利于CPU的多核,那么就可以使用多线程,而不能使用多进程。

评论




站点访问量 Loading… 站点访客数 Loading…