Простой Python — страница 39 из 66

1. Запишите текущие дату и время как строку в текстовый файл today.txt.

2. Прочтите текстовый файл today.txt и разместите данные в строке today_string.

3. Разберите дату из строки today_string.

4. Выведите на экран список файлов текущего каталога.

5. Выведите на экран список файлов родительского каталога.

6. Используйте модуль multiprocessing, чтобы создать три отдельных процесса. Заставьте каждый из них ждать случайное количество секунд (от одной до пяти), вывести текущее время и завершить работу.

7. Создайте объект date, содержащий дату вашего рождения.

8. В какой день недели вы родились?

9. Когда вам будет (или уже было) 10 000 дней от роду?

Глава 11. Конкуренция и сети

Время — такая штука, при помощи которой природа не позволяет всем событиям произойти сразу. Пространства — такая штука, благодаря которой все это не происходит со мной.

Альберт Эйнштейн

До этого момента большинство программ, которые вы писали, запускались в одном месте (на одном компьютере) по одной строке за раз (последовательные). Но мы можем делать больше одного дела одновременно (конкуренция) и в нескольких местах сразу (распределенные вычисления, или работа с сетями). Существует несколько хороших причин бросить вызов пространству и времени.

• Производительность. Ваша цель заключается в том, чтобы более быстрые компоненты были постоянно заняты, а не ждали более медленных.

• Прочность. Один в поле не воин, поэтому вы хотите продублировать задачи, чтобы обойти недостатки аппаратной и программной частей.

• Простота. Хорошим тоном является разбиение сложных задач на много простых, которые проще создать, понять и исправить.

• Коммуникация. Отправлять независимые байты куда-нибудь далеко, чтобы они пришли с друзьями, очень весело.

Мы начнем с рассмотрения конкуренции, основываясь поначалу на несетевых приемах, описанных в главе 10, — процессах и потоках. Далее рассмотрим остальные подходы вроде функций обратного вызова, зеленых потоков и сопрограмм. Наконец, поговорим о работе с сетями, изначально в рамках вопроса о конкуренции, а затем и целиком.


Некоторые пакеты Python, рассмотренные в этой главе, еще не были портированы в Python 3 на момент написания книги. В большинстве случаев я буду показывать вам код, который нужно запускать с помощью интерактивного интерпретатора Python 2, который мы называем python2.

Конкуренция

Официальный сайт Python рассматривает тему конкуренции в общих чертах и с точки зрения стандартной библиотеки (http://bit.ly/concur-lib). Эти страницы содержат множество ссылок на различные пакеты и приемы, мы покажем наиболее полезные из них в этой главе.

Когда речь идет о компьютерах, вам приходится ждать чего-то по одной из двух причин:

• ограничения ввода-вывода. Эта причина распространена шире других. Процессоры компьютеров безумно быстры — в сотни раз быстрее, чем компьютерная память, и в тысячи — чем диски или сети.

• ограничения процессора. Это случается, если выполняется большое количество объемных задач наподобие научных или графических расчетов.

С конкуренцией связаны еще два термина:

• синхронность — одна вещь следует за другой, как на похоронной процессии;

• асинхронность — задачи независимы, как кошки, которые гуляют сами по себе.

По мере продвижения от простых систем и задач к проблемам реальной жизни в какой-то момент вам придется решить проблему конкуренции. Например, рассмотрим сайт. Вы, как правило, можете предоставить статическую и динамическую страницы довольно быстро. Если ожидание длится долю секунды, приложение считается интерактивным, но если время до отображения или взаимодействия более продолжительное, люди становятся нетерпеливыми. Тесты, проведенные компаниями Google и Amazon, показали, что трафик быстро падает, если страница загружается хоть немного медленнее обычного.

Но что, если вы не можете повлиять на то, что долго выполняется, например загрузка файла на сервер, изменение размеров изображения или запрос к базе данных? Вы больше не можете делать это с помощью синхронного кода, поскольку кто-то уже ждет.

Если вы хотите выполнить несколько задач как можно быстрее на одном компьютере, вы можете сделать их независимыми. Медленные задачи не будут блокировать остальные.

В разделе «Программы и процессы» главы 10 показано, как многопроцессорная обработка может быть использована для того, чтобы распараллелить работу на одной машине. Если вам нужно изменить размер изображения, ваш веб-сервер может создать отдельный процесс, посвященный именно этой задаче, и запустить его асинхронно. Можно масштабировать приложение горизонтально, вызвав несколько процессов изменения размера.

Идея заключается в том, чтобы заставить их работать друг с другом. Наличие любого общего элемента управления или состояния означает, что будут возникать узкие места. Обрабатывать ошибки еще сложнее, поскольку конкурентные вычисления труднее, чем обычные. Многое может пойти не так, и ваши шансы на успех меньше обычных.

Какие же методы могут помочь вам справиться с этими сложностями? Начнем с хорошего способа, который помогает справиться с несколькими задачами, — очереди.

Очереди

Очередь похожа на список: элементы добавляются с одного ее конца и выходят с другого. Часто такой принцип называют FIFO (first in, first out — «первым пришел — первым ушел»).

Представьте, что вы моете посуду. Если вы делаете работу целиком, вам нужно вымыть каждую тарелку, высушить ее и отложить в сторону. Вы можете сделать это несколькими способами. Можете вымыть первую тарелку, высушить ее и отложить в сторону, а затем повторить для второй и последующих тарелок. Или же можете сгруппировать операции и сначала помыть всю посуду, затем высушить ее целиком, а затем отложить ее в сторону, при этом подразумевается, что в раковине и сушилке достаточно места, чтобы разместить там всю посуду на каждом шаге. Все эти подходы являются синхронными — один работник выполняет одно действие в любой момент времени.

В качестве альтернативы вы могли бы найти одного-двух помощников. Если вы мойщик, то можете вручать каждую вымытую тарелку сушильщику, который будет вручать каждую высохшую тарелку тому, кто отложит ее в сторону. Если все работают в одном темпе, вы должны закончить работу гораздо быстрее, чем если бы делали ее целиком самостоятельно.

Но что, если вы моете посуду быстрее, чем сушильщик успевает с ней справляться? Либо влажная посуда падает на пол, либо вы будете складывать ее между собой и сушильщиком, либо вы просто что-нибудь насвистываете до тех пор, пока сушильщик не будет готов. А если последний человек медленнее сушильщика, сухая посуда будет либо падать на пол, либо накапливаться или насвистывать начнет уже сушильщик. У вас есть несколько работников, но общая задача все еще синхронна и может выполняться только со скоростью самого медленного работника.

«Берись дружно, не будет грузно» — гласит старая пословица (я всегда думал, что это пословица амишей, поскольку она заставляет меня думать о строительстве сарая). Добавление работников может помочь построить сарай или вымыть посуду быстрее. При этом будут задействованы очереди.

В общем случае очереди переносят сообщения, которые могут содержать любую информацию. В нашем случае мы заинтересованы в создании очереди для распределенного управления задачами, также известной как очередь заданий. Каждая тарелка в раковине выдается доступному мойщику, который моет ее и отдает сушильщику, который сушит ее и отдает человеку, убирающему ее в сторону. Этот процесс может быть синхронным (работники ждут, когда им дадут тарелку, а затем ждут, когда освободится следующий в очереди работник) или асинхронным (посуда поступает от работников с разной скоростью). Если у вас есть достаточно работников и они трудятся в одном темпе, задача будет выполнена гораздо быстрее.

Процессы

Очереди вы можете реализовать множеством способов. Для одного компьютера модуль стандартной библиотеки multiprocessing (с которым вы можете познакомиться в разделе «Программы и процессы» главы 10) содержит функцию Queue. Симулируем процессы одного мойщика посуды и одного сушильщика (кто-то может отложить посуду в сторону позже), а также промежуточную очередь dish_queue. Назовите эту программу dishes.py:

import multiprocessing as mp

def washer(dishes, output):

····for dish in dishes:

········print('Washing', dish, 'dish')

········output.put(dish)

def dryer(input):

····while True:

········dish = input.get()

········print('Drying', dish, 'dish')

········input.task_done()

dish_queue = mp.JoinableQueue()

dryer_proc = mp.Process(target=dryer, args=(dish_queue,))

dryer_proc.daemon = True

dryer_proc.start()

dishes = ['salad', 'bread', 'entree', 'dessert']

washer(dishes, dish_queue)

dish_queue.join()

Запустите новую программу:

$ python dishes.py

Washing salad dish

Washing bread dish

Washing entree dish

Washing dessert dish

Drying salad dish

Drying bread dish

Drying entree dish

Drying dessert dish

Эта очередь похожа на простой итератор, который создает набор тарелок. В действительности здесь создаются отдельные процессы, общающиеся между собой. Я использовал JoinableQueue и последний метод join(), чтобы дать знать мойщику, что вся посуда была высушена. В модуле multiprocessing существуют очереди и других типов, вы можете обратиться к документации, чтобы получить больше примеров.

Потоки

Поток работает внутри процесса, имея доступ ко всему, что находится в процессе, — это похоже на раздвоение личности. Модуль multiprocessing имеет кузена по имени threading, который использует потоки вместо процессов (на самом деле модуль multiprocessing был разработан позже своего собрата, основанного на процессах). Переделаем наш пример с процессами для использования потоков:

import threading

def do_this(what):

····whoami(what)

def whoami(what):

····print("Thread %s says: %s" % (threading.current_thread(), what))

if __name__ == "__main__":

····whoami("I'm the main program")

····for n in range(4):

········p = threading.Thread(target=do_this,

··········args=("I'm function %s" % n,))

········p.start()

Вот что я вижу на своем экране:

Thread <_MainThread(MainThread, started 140735207346960)> says: I'm the main

program

Thread  says: I'm function 0

Thread  says: I'm function 1

Thread  says: I'm function 2

Thread  says: I'm function 3

Мы можем воссоздать пример о посуде, основанный на процессах, с помощью потоков:

import threading, queue

import time

def washer(dishes, dish_queue):

····for dish in dishes:

····print ("Washing", dish)

········time.sleep(5)

········dish_queue.put(dish)

def dryer(dish_queue):

····while True:

········dish = dish_queue.get()

········print ("Drying", dish)

········time.sleep(10)

········dish_queue.task_done()

dish_queue = queue.Queue()

for n in range(2):

····dryer_thread = threading.Thread(target=dryer, args=(dish_queue,))

········dryer_thread.start()

dishes = ['salad', 'bread', 'entree', 'desert']

washer(dishes, dish_queue)

dish_queue.join()

Различие между модулями multiprocessing и threading заключается в том, что модуль threading не имеет функции terminate(). Не существует простого способа завершить запущенный поток, поскольку это может вызвать разнообразные проблемы в коде и, возможно, даже в пространственно-временном континууме.

Потоки могут быть опасны. Как и управление памятью вручную в языках вроде С и С++, они могут вызвать появление ошибок, которые ужасно трудно найти и исправить. Для того чтобы использовать потоки, весь код программы — и код внешних библиотек, которые он использует, — должен быть потокобезопасным. В предыдущем примере кода потоки не работали с глобальными переменными, поэтому они могли работать независимо, ничего не разрушая.

Представьте, что вы исследуете паранормальную активность в доме с привидениями. Привидения скитаются по коридорам, но ни одно из них не знает о другом, и в любой момент любое из них может просмотреть, добавить, удалить или переместить домашнюю обстановку.

Вы настороженно идете по дому, снимая показатели со своих впечатляющих инструментов. И внезапно замечаете, что подсвечник, мимо которого прошли несколько секунд назад, пропал.

Содержимое дома похоже на переменные программы. Привидения — это потоки процесса (дома). Если бы привидения только просматривали содержимое дома, проблемы бы не было — поток просто считает значение константы или переменной, не пытаясь его изменить.

Однако некая невидимая сущность может схватить ваш фонарик, дунуть холодной струей воздуха на вашу шею, рассыпать шарики на ступеньках или заставить вспыхнуть огонь в камине. Особо утонченные привидения изменили бы что-нибудь в другой комнате, чего вы бы даже не заметили.

Несмотря на ваши шикарные инструменты, вам будет очень трудно разобраться в том, кто, как и когда это сделал.

Если бы вы использовали несколько процессов вместо потоков, это было бы похоже на несколько домов, в которых обитает только одно (живое) существо. Если бы вы поставили бутылку бренди перед камином, она все еще была бы на своем месте час спустя. Возможно, немного жидкости испарилось бы, но сама бутылка осталась бы на том же месте.

Потоки могут быть полезны и безопасны, когда речь не идет о глобальных данных. В частности, потоки полезно использовать для экономии времени при ожидании завершения некой операции ввода/вывода. В этих случаях потокам не придется сражаться за данные, поскольку у каждого из них имеется свой набор переменных.

Но потоки иногда могут менять глобальные данные по хорошей причине. Фактически самая распространенная причина использования нескольких потоков — это возможность разделить между ними работу над некоторыми данными, поэтому можно ожидать, что некоторые данные будут изменены.

Классический способ разделить данные безопасно — разместить программную блокировку перед изменением переменной в потоке. Это позволит оградить ее значение от других потоков и внести свои изменения. В примере с домом вы бы просто оставили бригаду охотников за привидениями в той комнате, которая должна остаться свободной от привидений. Вам лишь нужно не забывать разблокировать ее. Блокировки также могут быть вложенными — что, если другая бригада охотников за привидениями также будет наблюдать за этой же комнатой или за всем домом? Использование блокировок является традицией, но печально известно тем, что его трудно организовать правильно.


В Python потоки не ускоряют задачи, связанные с ограничениями процессора, из-за одной детали реализации стандартной системы Python, которая называется Global Interpreter Lock (GIL). Она предназначена для того, чтобы избежать потоковых проблем в интерпретаторе Python, и действительно может замедлить многопоточную программу по сравнению с однопоточной или даже многопроцессорной версией.


Рассмотрим рекомендации для работы с Python.

• Используйте потоки для задач, связанных с ограничениями ввода-вывода.

• Используйте процессы, сетевые вычисления или события (которые мы рассмотрим в следующем разделе) для задач, связанных с ограничениями процессора.

Зеленые потоки и gevent

Как вы уже видели, разработчики стремятся избежать медленных мест в программах, запуская их в отдельных потоках или процессах. Примером такого дизайна является веб-сервер Apache.

Альтернативой этому подходу является программирование, основанное на событиях. Программа, основанная на событиях, запускает центральный цикл обработки событий, раздает задачи и повторяет цикл. Так устроен веб-сервер nginx, он работает быстрее Apache.

Библиотека gevent основана на событиях и позволяет достичь следующего: вы пишете обычный императивный код, и он волшебным образом превращается в сопрограммы. Они похожи на генераторы, которые могут взаимодействовать друг с другом и отслеживать свое текущее состояние. Библиотека gevent модифицирует многие стандартные объекты Python вроде socket для того, чтобы использовать его механизм вместо блокирования. Это не сработает для кода надстроек Python, который написан на С, например для некоторых драйверов баз данных.


На момент написания этой книги библиотека gevent не была полностью портирована на Python 3, поэтому примеры кода используют инструменты Python 2 pip2 и python2.


Вы можете установить библиотеку gevent с помощью версии pip для Python 2:

$ pip2 install gevent

Так выглядит пример кода на сайте библиотеки gevent (http://www.gevent.org/). Вы увидите функцию gethostbyname() класса socket в следующем разделе DNS. Эта функция работает синхронно, поэтому вам придется подождать (возможно, много секунд), пока она не получит имена серверов со всего мира, чтобы найти нужный адрес. Но вы можете использовать версию gevent, чтобы искать несколько сайтов независимо друг от друга. Сохраните этот файл как gevent_test.py:

import gevent

from gevent import socket

hosts = ['www.crappytaxidermy.com', 'www.walterpottertaxidermy.com',

····'www.antique-taxidermy.com']

jobs = [gevent.spawn(gevent.socket.gethostbyname, host) for host in hosts]

gevent.joinall(jobs, timeout=5)

for job in jobs:

····print(job.value)

В этом примере вы можете увидеть однострочный цикл for. Каждое имя хоста по очереди передается в вызов gethostbyname(), но они могут быть запущены асинхронно благодаря версии функции gethostbyname() библиотеки gevent.

Запустите файл gevent_test.py с помощью Python 2, введя следующее:

$ python2 gevent_test.py

66.6.44.4

74.125.142.121

78.136.12.50

Функция gevent.spawn() создает зеленый поток (его также иногда называют микропотоком) для выполнения каждого вызова gevent.socket.gethostbyname(url).

Разница между ним и обычным потоком заключается в том, что зеленый поток не блокируется. Если произошло какое-то событие, которое заблокировало бы обычный поток, gevent переключит управление на другой зеленый поток.

Метод gevent.joinall() ожидает завершения всех созданных задач. Наконец, мы выводим на экран IP-адреса, полученные для заданных имен хостов.

Вместо класса socket модуля gevent вы можете использовать его функции для monkey-patching (обезьяний патч). Они модифицируют стандартные модули вроде socket так, чтобы они использовали зеленые потоки вместо того, чтобы каждый раз вызывать версию модуля gevent. Это полезно, если вы хотите использовать gevent везде, даже в коде, к которому вы можете не иметь доступа.

Добавьте в начало программы следующий вызов:

from gevent import monkey

monkey.patch_socket()

Это заменит все обычные сокеты на сокеты gevent даже в стандартной библиотеке. Но это работает только для кода Python, но не для библиотек, написанных на С.

Еще одна функция выполняет такой патчинг для еще большего количества модулей стандартной библиотеки:

from gevent import monkey

monkey.patch_all()

Разместите этот код в начале программы, чтобы максимально воспользоваться ускорением, обеспечиваемым gevent.

Сохраните программу под именем gevent_monkey.py:

import gevent

from gevent import monkey; monkey.patch_all()

import socket

hosts = ['www.crappytaxidermy.com', 'www.walterpottertaxidermy.com',

····'www.antique-taxidermy.com']

jobs = [gevent.spawn(socket.gethostbyname, host) for host in hosts]

gevent.joinall(jobs, timeout=5)

for job in jobs:

····print(job.value)

Запустите программу с помощью Python 2:

$ python2 gevent_monkey.py

66.6.44.4

74.125.192.121

78.136.12.50

Использование gevent может нести потенциальную опасность. Как и в случае с любой другой системой, основанной на событиях, каждый исполняемый вами фрагмент кода должен быть относительно быстрым. Несмотря на то что код, который выполняет много работы, не блокируется, он будет работать медленно.

Сама идея monkey-patching заставляет нервничать некоторых людей. Несмотря на это, многие крупные сайты вроде Pinterest используют gevent для значительного ускорения своей работы. Используйте gevent строго по назначению, как таблетки по рецепту.


Существуют два других популярных фреймворка, основанных на событиях, — tornado (http://www.tornadoweb.org/) и gunicorn (http://gunicorn.org/). Они помогают обрабатывать события на низком уровне, а также предоставляют быстрый веб-сервер. Их стоит рассмотреть, если вы хотите создать быстрый сайт, не применяя традиционные веб-серверы вроде Apache.

twisted

twisted (http://twistedmatrix.com/trac/) — это асинхронный фреймворк для работы с сетями, управляемый событиями. Вы подключаете функции к событиям вроде получения данных или закрытия соединения, и эти функции вызываются, когда событие случается. Эти функции называются функциями обратного вызова, и если вы уже писали код на языке JavaScript, он может показаться вам знакомым. Если же он для вас в новинку, то может показаться вывернутым наизнанку. Некоторым разработчикам может оказаться труднее поддерживать код, основанный на функциях обратного вызова, по мере роста приложения.

Как и gevent, twisted еще не был портирован на Python 3. В этом разделе мы будем использовать установщик и интерактивный интерпретатор Python 2. Чтобы установить фреймворк, введите следующую команду:

$ pip2 install twisted

twisted — это крупный пакет, который поддерживает множество интернет-протоколов на базе TCP и UDP. Для краткости мы рассмотрим небольшой сервер и клиент, созданные на базе примеров для twisted (http://bit.ly/twisted-ex). Сначала рассмотрим сервер, knock_server.py (обратите внимание на синтаксис Python 2 для функции print()):

from twisted.internet import protocol, reactor

class Knock(protocol.Protocol):

····def dataReceived(self, data):

········print 'Client:', data

········if data.startswith("Knock knock"):

············response = "Who's there?"

········else:

············response = data + " who?"

········print 'Server:', response

········self.transport.write(response)

class KnockFactory(protocol.Factory):

····def buildProtocol(self, addr):

········return Knock()

reactor.listenTCP(8000, KnockFactory())

reactor.run()

Теперь взглянем на его верного компаньона, knock_client.py:

from twisted.internet import reactor, protocol

class KnockClient(protocol.Protocol):

····def connectionMade(self):

········self.transport.write("Knock knock")

····def dataReceived(self, data):

········if data.startswith("Who's there?"):

············response = "Disappearing client"

············self.transport.write(response)

········else:

············self.transport.loseConnection()

············reactor.stop()

class KnockFactory(protocol.ClientFactory):

····protocol = KnockClient

def main():

····f = KnockFactory()

····reactor.connectTCP("localhost", 8000, f)

····reactor.run()

if __name__ == '__main__':

····main()

Сначала запустим сервер:

$ python2 knock_server.py

Потом — клиент:

$ python2 knock_client.py

Сервер и клиент обмениваются сообщениями, и сервер выводит весь диалог:

Client: Knock knock

Server: Who's there?

Client: Disappearing client

Server: Disappearing client who?

Наш клиент-шутник завершает работу, оставляя сервер ждать ударной реплики.

Если вы хотите забраться в дебри, попробуйте запустить другие примеры из его документации.

asyncio

Недавно Гвидо ван Россум (помните его?) начал заниматься проблемой конкуренции в Python. У многих пакетов был собственный цикл событий, и каждый из них хотел быть единственным. Как могут примириться механизмы вроде функций обратного вызова, зеленых потоков и др.? После продолжительных дискуссий он предложил решение: модуль asyncio (от Asynchronous IO Support Rebooted — асинхронная поддержка ввода-вывода, http://bit.ly/pep-3156) под кодовым именем Tulip. Он появился в Python 3.4 под именем asyncio. Теперь же он предлагает цикл событий, который совместим с twisted, gevent и другими асинхронными методами. Цель его создания заключается в том, чтобы предоставить стандартный, чистый, отлаженный асинхронный API. Вы сможете наблюдать за тем, как он расширяется, в будущих релизах Python.

Redis

Приведенные раньше примеры кода о мытье посуды, где использовались процессы или потоки, запускались на одной машине. Рассмотрим еще один подход к реализации очередей, которые могут запускаться на одной машине или во всей сети. Даже несмотря на наличие нескольких процессов и/или потоков, иногда одной машины недостаточно. Вы можете считать этот раздел мостиком между конкуренцией на одной машине и конкуренцией на нескольких машинах.

Чтобы запустить примеры из этого раздела, вам нужен сервер Redis и его модуль для Python. Чтобы узнать, как их скачать, обратитесь к разделу «Redis» главы 8. В этой главе Redis используется как база данных. Здесь же мы рассмотрим ту его грань, которая работает с конкуренцией.

Создать очередь можно с помощью списка Redis. Сервер Redis работает на одной машине, на которой могут быть запущены и клиенты. Возможно также, что никакие клиенты на ней не запускаются, а остальные машины получают доступ к серверу по сети. В любом случае клиент общается с сервером с помощью протокола TCP. Один или несколько клиентов-провайдеров помещают сообщения в конец списка. Один или несколько клиентов-работников наблюдают за списком и используют операцию «блокирующее выталкивание». Если список пуст, то все они просто проводят время впустую. Как только появляется сообщение, его получает первый желающий работник.

Как и в предыдущем примере, основанном на процессах и потоках, код файла redis_washer.py генерирует последовательность посуды:

import redis

conn = redis.Redis()

print('Washer is starting')

dishes = ['salad', 'bread', 'entree', 'dessert']

for dish in dishes:

····msg = dish.encode('utf-8')

····conn.rpush('dishes', msg)

····print('Washed', num)

conn.rpush('dishes', 'quit')

print('Washer is done')

Цикл генерирует четыре сообщения, содержащие названия тарелок, за которыми следуют финальные сообщения, которые содержат слово quit. Каждое сообщение добавляется в список тарелок на сервере Redis по принципу, сходному с принципами Python.

Как только первая тарелка готова, в работу вступает код файла redis_dryer.py:

import redis

conn = redis.Redis()

print('Dryer is starting')

while True:

····msg = conn.blpop('dishes')

····if not msg:

········break

····val = msg[1].decode('utf-8')

····if val == 'quit':

········break

····print('Dried', val)

print('Dishes are dried')

Этот код ожидает прихода сообщений, чьим первым токеном является слово dishes, и выводит сообщение каждой высушенной тарелки. Он подчиняется сообщению quit, завершая цикл.

Запустите сначала сушильщика, а затем мойщика. С помощью символа & в конце команды мы запускаем первую программу в фоновом режиме, она продолжает работать, но больше не принимает команды с клавиатуры. Это работает для операционных систем Linux, OS X и Windows, однако вы можете получить разные результаты в следующей строке. В нашем случае (OS X) этим результатом является некоторая информация о фоновом процессе сушильщика. Далее мы запускаем процесс мойщика как обычно (на переднем плане). Вы увидите смешанную выходную информацию двух процессов:

$ python redis_dryer.py &

[2] 81691

Dryer is starting

$ python redis_washer.py

Washer is starting

Washed salad

Dried salad

Washed bread

Dried bread

Washed entree

Dried entree

Washed dessert

Washer is done

Dried dessert

Dishes are dried

[2]+··Done····················python redis_dryer.py

Как только идентификаторы посуды начинают приходить от мойщика, наш трудолюбивый процесс сушильщика начинает их обрабатывать. Каждый идентификатор посуды, за исключением финального контрольного значения, является числом из строки quit. Когда процесс сушильщика считает этот идентификатор quit, он завершает работу, после чего в терминал выводится еще немного информации о фоновом процессе (также зависит от системы). Вы можете использовать контрольное значение (некорректное значение), чтобы указать на что-то особенное в потоке данных — в нашем случае мы говорим, что закончили работу. В противном случае нам придется добавлять больше программной логики наподобие следующей.

• Заранее оговорить некоторое максимальное количество посуды, что также будет похоже на контрольное значение.

• Выполнять некоторую специфическую коммуникацию вне потока данных между процессами.

• Завершать работу по прошествии какого-то времени, если данных не поступало.

Внесем еще несколько изменений.

• Создадим несколько процессов-сушильщиков.

• Заставим их завершаться по прошествии некоторого времени, если данных не приходило.

Новый файл redis_dryer2.py:

def dryer():

····import redis

····import os

····import time

····conn = redis.Redis()

····pid = os.getpid()

····timeout = 20

····print('Dryer process %s is starting' % pid)

····while True:

········msg = conn.blpop('dishes', timeout)

········if not msg:

············break

········val = msg[1].decode('utf-8')

········if val == 'quit':

············break

········print('%s: dried %s' % (pid, val))

········time.sleep(0.1)

····print('Dryer process %s is done' % pid)

import multiprocessing

DRYERS=3

for num in range(DRYERS):

····p = multiprocessing.Process(target=dryer)

····p.start()

Запустим процессы сушильщиков в фоновом режиме и процесс мойщика на переднем плане:

$ python redis_dryer2.py &

Dryer process 44447 is starting

Dryer process 44448 is starting

Dryer process 44446 is starting

$ python redis_washer.py

Washer is starting

Washed salad

44447: dried salad

Washed bread

44448: dried bread

Washed entree

44446: dried entree

Washed dessert

Washer is done

44447: dried dessert

Один процесс сушильщика считывает идентификатор quit и завершает работу:

Dryer process 44448 is done

После 20 секунд другие процессы-сушильщики получают значение None от вызова blpop, что указывает на то, что они завершились по таймеру. Они выводят свои последние сообщения и завершаются:

Dryer process 44447 is done

Dryer process 44446 is done

После того как последний подпроцесс-сушильщик завершается, заканчивается и основная программа-сушильщик:

[1]+··Done····················python redis_dryer2.py

Помимо очередей

С увеличением числа работающих элементов повышается вероятность того, что что-то может помешать работе нашего конвейера. Если нам нужно помыть посуду после банкета, хватит ли нам работников? Что, если сушильщики напьются до чертиков? Что, если забьется раковина? Ох уж эти проблемы!

Как же справиться с такими проблемами? К счастью, вам доступны некоторые приемы.

• Запустить и забыть. Просто передавайте обработанные объекты дальше и не заботьтесь о последствиях, даже если рядом никого нет. Этот подход похож на сбрасывание посуды на пол.

• Запрос — ответ. Мойщик получает подтверждение от сушильщика, а сушильщик — от того, кто откладывает посуду в сторону. Все это выполняется для каждой тарелки.

• Регулирование нагрузки. Этот прием указывает самому быстрому работнику притормозить, если один из работников, стоящих после него, не поспевает за ним.

В реальных системах вам нужно внимательно следить за тем, чтобы все работники успевали за предложением, в противном случае вы услышите звук бьющейся посуды. Вы можете добавить новые задачи в список ожидания, а какой-нибудь процесс будет доставать из этого списка последнее сообщение и помещать в список обработки. Когда сообщение будет обработано, оно будет удалено из списка обработки и добавлено в список завершенных задач. Это позволит вам узнать, какие задачи были провалены или занимают слишком много времени. Вы можете сделать это самостоятельно с помощью Redis или использовать систему, которую кто-то написал и протестировал до вас. Некоторые основанные на Python пакеты для работы с очередями (часть из них используют Redis) позволяют удобно управлять процессом.

• Celery. На этот пакет стоит обратить внимание. Он может выполнять распределенные задачи как синхронно, так и асинхронно, используя рассмотренные нами методы: multiprocessing, gevent и др.

• thoonk. Этот пакет создан на базе Redis, он позволяет использовать очереди задач и механизм публикации-подписки (этот механизм будет рассмотрен в следующем разделе).

• rq. Это библиотека Python для очередей задач, она также основана на Redis.

• Queues. Этот сайт предлагает поучаствовать в дискуссии о программном обеспечении для создания очередей, как написанном на Python, так и ином.

Сети