Thứ ba, 23/06/2020 | 00:00 GMT+7

Cách sử dụng ThreadPoolExecutor trong Python 3

Các chuỗi Python là một dạng song song cho phép chương trình của bạn chạy nhiều thủ tục cùng một lúc. Tính song song trong Python cũng có thể đạt được bằng cách sử dụng nhiều quy trình, nhưng các stream đặc biệt phù hợp để tăng tốc các ứng dụng liên quan đến lượng I / O (đầu vào / kết quả ) đáng kể.

Ví dụ về hoạt động liên kết I / O bao gồm thực hiện các yêu cầu web và đọc dữ liệu từ các file . Ngược lại với các hoạt động ràng buộc I / O, các hoạt động ràng buộc CPU (như thực hiện phép toán với thư viện chuẩn Python) sẽ không được hưởng lợi nhiều từ các stream Python.

Python 3 bao gồm tiện ích ThreadPoolExecutor để thực thi mã trong một stream .

Trong hướng dẫn này, ta sẽ sử dụng ThreadPoolExecutor để thực hiện các yêu cầu mạng một cách nhanh chóng. Ta sẽ xác định một hàm rất phù hợp để gọi trong các stream , sử dụng ThreadPoolExecutor để thực thi chức năng đó và xử lý kết quả từ các lần thực thi đó.

Đối với hướng dẫn này, ta sẽ thực hiện các yêu cầu mạng để kiểm tra sự tồn tại của các trang Wikipedia .

Lưu ý: Thực tế là các hoạt động liên kết I / O được hưởng lợi nhiều hơn từ các stream so với các hoạt động liên kết CPU là do một đặc điểm riêng trong Python được gọi là khóa thông dịch toàn cục . Nếu muốn, bạn có thể tìm hiểu thêm về khóa thông dịch global của Python trong tài liệu Python chính thức .

Yêu cầu

Để tận dụng tối đa hướng dẫn này, bạn nên làm quen với lập trình bằng Python và môi trường lập trình Python local với requests được cài đặt.

Bạn có thể xem lại các hướng dẫn này để biết thông tin cơ bản cần thiết:

  • pip install --user requests==2.23.0

Bước 1 - Xác định một hàm để thực thi trong chuỗi

Hãy bắt đầu bằng cách xác định một hàm mà ta muốn thực thi với sự trợ giúp của các stream .

Sử dụng nano hoặc môi trường phát triển / soạn thảo văn bản bạn muốn , bạn có thể mở file này:

  • nano wiki_page_function.py

Đối với hướng dẫn này, ta sẽ viết một hàm xác định xem trang Wikipedia có tồn tại hay không:

wiki_page_ Chức năng.py
import requests  def get_wiki_page_existence(wiki_page_url, timeout=10):     response = requests.get(url=wiki_page_url, timeout=timeout)      page_status = "unknown"     if response.status_code == 200:         page_status = "exists"     elif response.status_code == 404:         page_status = "does not exist"      return wiki_page_url + " - " + page_status 

Các get_wiki_page_existence chức năng chấp nhận hai đối số: một URL đến một trang Wikipedia ( wiki_page_url ), và một timeout số giây để chờ đợi một phản ứng từ URL đó.

get_wiki_page_existence sử dụng gói requests để thực hiện yêu cầu web tới URL đó. Tùy thuộc vào mã trạng thái của response HTTP, một chuỗi được trả về mô tả trang có tồn tại hay không. Các mã trạng thái khác nhau thể hiện các kết quả khác nhau của một yêu cầu HTTP. Quy trình này giả định mã trạng thái 200 "thành công" nghĩa là trang Wikipedia tồn tại và mã trạng thái 404 "không tìm thấy" nghĩa là trang Wikipedia không tồn tại.

Như được mô tả trong phần Yêu cầu , bạn cần cài đặt gói requests để chạy chức năng này.

Hãy thử chạy hàm bằng cách thêm url và lệnh gọi hàm sau hàm get_wiki_page_existence :

wiki_page_ Chức năng.py
. . . url = "https://en.wikipedia.org/wiki/Ocean" print(get_wiki_page_existence(wiki_page_url=url)) 

Khi bạn đã thêm mã, hãy lưu file .

Nếu ta chạy mã này:

  • python wiki_page_function.py

Ta sẽ thấy kết quả như sau:

Output
https://en.wikipedia.org/wiki/Ocean - exists

Việc gọi hàm get_wiki_page_existence với một trang Wikipedia hợp lệ sẽ trả về một chuỗi xác nhận trên thực tế, trang đó tồn tại.

Cảnh báo: Nói chung, không an toàn khi chia sẻ các đối tượng hoặc trạng thái Python giữa các stream mà không cần chú ý đặc biệt để tránh các lỗi đồng thời. Khi xác định một hàm để thực thi trong một stream , cách tốt nhất là xác định một hàm thực hiện một công việc duy nhất và không chia sẻ hoặc xuất bản trạng thái cho các stream khác. get_wiki_page_existence là một ví dụ về một hàm như vậy.

Bước 2 - Sử dụng ThreadPoolExecutor để thực thi một chức năng trong chủ đề

Bây giờ ta có một hàm rất phù hợp để gọi với các stream , ta có thể sử dụng ThreadPoolExecutor để thực hiện nhanh nhiều lệnh gọi của hàm đó.

Hãy thêm đoạn mã được đánh dấu sau vào chương trình của bạn trong wiki_page_function.py :

wiki_page_ Chức năng.py
import requests import concurrent.futures  def get_wiki_page_existence(wiki_page_url, timeout=10):     response = requests.get(url=wiki_page_url, timeout=timeout)      page_status = "unknown"     if response.status_code == 200:         page_status = "exists"     elif response.status_code == 404:         page_status = "does not exist"      return wiki_page_url + " - " + page_status  wiki_page_urls = [     "https://en.wikipedia.org/wiki/Ocean",     "https://en.wikipedia.org/wiki/Island",     "https://en.wikipedia.org/wiki/this_page_does_not_exist",     "https://en.wikipedia.org/wiki/Shark", ] with concurrent.futures.ThreadPoolExecutor() as executor:     futures = []     for url in wiki_page_urls:         futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))     for future in concurrent.futures.as_completed(futures):         print(future.result()) 

Ta hãy xem cách mã này hoạt động:

  • concurrent.futures được nhập để cung cấp cho ta quyền truy cập vào ThreadPoolExecutor .
  • Câu lệnh with được sử dụng để tạo một trình executor cá thể ThreadPoolExecutor sẽ nhanh chóng dọn dẹp các stream sau khi hoàn thành.
  • Bốn công việc được submitted cho người executor : một công việc cho mỗi URL trong danh sách wiki_page_urls .
  • Mỗi cuộc gọi để submit trả về một version Future được lưu trữ trong danh sách futures .
  • Hàm as_completed đợi mỗi get_wiki_page_existence gọi get_wiki_page_existence Future hoàn tất để ta có thể in kết quả của nó.

Nếu ta chạy lại chương trình này, với lệnh sau:

  • python wiki_page_function.py

Ta sẽ thấy kết quả như sau:

Output
https://en.wikipedia.org/wiki/Island - exists https://en.wikipedia.org/wiki/Ocean - exists https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist https://en.wikipedia.org/wiki/Shark - exists

Kết quả này có ý nghĩa: 3 trong số các URL là trang Wikipedia hợp lệ và một trong số chúng this_page_does_not_exist thì không. Lưu ý kết quả của bạn có thể được đặt hàng khác với kết quả này. Hàm concurrent.futures.as_completed trong ví dụ này trả về kết quả ngay khi chúng có sẵn, dù công việc được gửi theo thứ tự nào.

Bước 3 - Xử lý ngoại lệ từ các hàm chạy trong chuỗi

Trong bước trước, get_wiki_page_existence đã trả về thành công một giá trị cho tất cả các lệnh gọi của ta . Trong bước này, ta sẽ thấy rằng ThreadPoolExecutor cũng có thể nêu ra các ngoại lệ được tạo trong các lệnh gọi hàm stream .

Hãy xem xét khối mã ví dụ sau:

wiki_page_ Chức năng.py
import requests import concurrent.futures   def get_wiki_page_existence(wiki_page_url, timeout=10):     response = requests.get(url=wiki_page_url, timeout=timeout)      page_status = "unknown"     if response.status_code == 200:         page_status = "exists"     elif response.status_code == 404:         page_status = "does not exist"      return wiki_page_url + " - " + page_status   wiki_page_urls = [     "https://en.wikipedia.org/wiki/Ocean",     "https://en.wikipedia.org/wiki/Island",     "https://en.wikipedia.org/wiki/this_page_does_not_exist",     "https://en.wikipedia.org/wiki/Shark", ] with concurrent.futures.ThreadPoolExecutor() as executor:     futures = []     for url in wiki_page_urls:         futures.append(             executor.submit(                 get_wiki_page_existence, wiki_page_url=url, timeout=0.00001             )         )     for future in concurrent.futures.as_completed(futures):         try:             print(future.result())         except requests.ConnectTimeout:             print("ConnectTimeout.") 

Khối mã này gần giống với khối mà ta đã sử dụng ở Bước 2, nhưng nó có hai điểm khác biệt chính:

  • Bây giờ ta vượt qua timeout=0.00001 để get_wiki_page_existence . Vì gói requests sẽ không thể hoàn thành yêu cầu web của nó tới Wikipedia trong 0.00001 giây, nên nó sẽ đưa ra một ngoại lệ ConnectTimeout .
  • Ta bắt các ngoại lệ ConnectTimeout được nêu ra bởi future.result() và in ra một chuỗi mỗi lần ta làm như vậy.

Nếu ta chạy lại chương trình, ta sẽ thấy kết quả sau:

Output
ConnectTimeout. ConnectTimeout. ConnectTimeout. ConnectTimeout.

Bốn thông báo ConnectTimeout được in — một thông báo cho mỗi wiki_page_urls của ta , vì không ai trong số chúng có thể hoàn thành trong 0.00001 giây và mỗi lệnh trong bốn get_wiki_page_existence gọi get_wiki_page_existence đã nêu ra ngoại lệ ConnectTimeout .

Đến đây bạn đã thấy rằng nếu một lệnh gọi hàm được gửi đến ThreadPoolExecutor tạo ra một ngoại lệ, thì ngoại lệ đó có thể được nâng lên bình thường bằng cách gọi Future.result . Gọi Future.result trên tất cả các lệnh gọi đã gửi của bạn đảm bảo chương trình của bạn sẽ không bỏ lỡ bất kỳ trường hợp ngoại lệ nào được nêu ra từ hàm stream của bạn.

Bước 4 - So sánh thời gian thực thi có và không có stream

Bây giờ hãy xác minh việc sử dụng ThreadPoolExecutor thực sự làm cho chương trình của bạn nhanh hơn.

Đầu tiên, hãy dành thời gian get_wiki_page_existence nếu ta chạy nó mà không có chuỗi:

wiki_page_ Chức năng.py
import time import requests import concurrent.futures   def get_wiki_page_existence(wiki_page_url, timeout=10):     response = requests.get(url=wiki_page_url, timeout=timeout)      page_status = "unknown"     if response.status_code == 200:         page_status = "exists"     elif response.status_code == 404:         page_status = "does not exist"      return wiki_page_url + " - " + page_status  wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]  print("Running without threads:") without_threads_start = time.time() for url in wiki_page_urls:     print(get_wiki_page_existence(wiki_page_url=url)) print("Without threads time:", time.time() - without_threads_start) 

Trong ví dụ mã, ta gọi hàm get_wiki_page_existence với năm mươi URL trang Wikipedia khác nhau từng cái một. Ta sử dụng hàm time.time() để in ra số giây cần thiết để chạy chương trình của ta .

Nếu ta chạy lại mã này như trước, ta sẽ thấy kết quả như sau:

Output
Running without threads: https://en.wikipedia.org/wiki/0 - exists https://en.wikipedia.org/wiki/1 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Without threads time: 5.803015232086182

Mục 2–47 trong kết quả này đã được bỏ qua cho ngắn gọn.

Số giây được in sau Without threads time sẽ khác khi bạn chạy nó trên máy của bạn — không sao cả, bạn chỉ nhận được một số cơ sở để so sánh với giải pháp sử dụng ThreadPoolExecutor . Trong trường hợp này, nó là ~5.803 giây.

Hãy chạy cùng năm mươi URL Wikipedia thông qua get_wiki_page_existence , nhưng lần này sử dụng ThreadPoolExecutor :

wiki_page_ Chức năng.py
import time import requests import concurrent.futures   def get_wiki_page_existence(wiki_page_url, timeout=10):     response = requests.get(url=wiki_page_url, timeout=timeout)      page_status = "unknown"     if response.status_code == 200:         page_status = "exists"     elif response.status_code == 404:         page_status = "does not exist"      return wiki_page_url + " - " + page_status wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]  print("Running threaded:") threaded_start = time.time() with concurrent.futures.ThreadPoolExecutor() as executor:     futures = []     for url in wiki_page_urls:         futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))     for future in concurrent.futures.as_completed(futures):         print(future.result()) print("Threaded time:", time.time() - threaded_start) 

Mã này giống như mã ta đã tạo ở Bước 2, chỉ với việc bổ sung một số câu lệnh in cho ta biết số giây cần thiết để thực thi mã của ta .

Nếu ta chạy lại chương trình, ta sẽ thấy như sau:

Output
Running threaded: https://en.wikipedia.org/wiki/1 - exists https://en.wikipedia.org/wiki/0 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Threaded time: 1.2201685905456543

, số giây được in sau Threaded time sẽ khác trên máy tính của bạn (cũng như thứ tự kết quả của bạn).

Đến đây bạn có thể so sánh thời gian thực hiện để tìm nạp 50 URL trang Wikipedia có và không có chuỗi.

Trên máy được sử dụng trong hướng dẫn này, không có stream mất ~5.803 giây và có stream mất ~1.220 giây. Chương trình của ta chạy nhanh hơn đáng kể với các chủ đề.

Kết luận

Trong hướng dẫn này, bạn đã học cách sử dụng trình ThreadPoolExecutor trong Python 3 để chạy mã bị ràng buộc I / O một cách hiệu quả. Bạn đã tạo một hàm rất phù hợp để gọi trong các stream , học cách truy xuất cả kết quả và ngoại lệ từ các lần thực thi theo stream của hàm đó và quan sát hiệu suất tăng lên khi sử dụng các stream .

Từ đây, bạn có thể tìm hiểu thêm về các hàm đồng thời khác được cung cấp bởi mô-đun concurrent.futures .


Tags:

Các tin liên quan

Cách sử dụng module sqlite3 trong Python 3
2020-06-02
Cách thiết lập notebook Jupyter với Python 3 trên Ubuntu 20.04 và Kết nối qua Đường hầm SSH
2020-05-19
Cách cài đặt Phân phối Python Anaconda trên Ubuntu 20.04 [Khởi động nhanh]
2020-05-19
Cách cài đặt Phân phối Python Anaconda trên Ubuntu 20.04
2020-05-06
Cách cài đặt Python 3 và thiết lập môi trường lập trình trên server Ubuntu 20.04
2020-04-24
Cách cài đặt Python 3 và thiết lập môi trường lập trình trên server Ubuntu 18.04
2020-04-24
Cách cài đặt Python 3 và thiết lập môi trường lập trình trên Ubuntu 20.04 [Quickstart]
2020-04-24
Cách cài đặt Python 3 và thiết lập môi trường lập trình trên Ubuntu 18.04 [Quickstart]
2020-04-24
Cách cài đặt Python 3 và thiết lập môi trường lập trình trên CentOS 8
2020-04-10
Cách bắt đầu với Python trong Visual Studio Code
2020-04-09