Выполнение распределенного перетасовывания без системы MapReduce
Распределенное перетасовывание – это операция с большим объемом данных, для выполнения которой обычно требуется система, созданная специально для этой цели. В этом посте мы покажем, как распределенное перетасовывание может быть реализовано всего несколькими строками в Python с использованием Ray, фреймворка общего назначения, основной API которого не содержит операций перетасовывания.
Перетасовать небольшой набор данных достаточно просто, эту операцию можно легко выполнить в оперативной памяти. Однако более крупный набор данных порождает новые проблемы, например:
- Несколько узлов для масштабирования объема памяти за пределы одной машины, чтобы обеспечить параллельную обработку на карте, и чтобы сократить фазы.
- Распространение на внешнее хранилище, когда общий набор данных превышает общий объем памяти кластера.
- Обеспечение ограничений памяти, чтобы избежать запуска в ОС дорогостоящей обработки нехватки памяти, такой как переключение на диск или запуск убийцы нехватки памяти (OOM).
В предыдущем посте мы показали, как можно использовать Ray в качестве общей основы для приложений обработки данных. Здесь мы подробно рассмотрим, как Ray выполняет распределенное перетасовывание вне ядра, выраженное в виде задач в Python.
Что такое перетасовывание?
Из-за своего размера распределенный набор данных обычно хранится в нескольких разделах, каждый из которых содержит группу строк. Это также улучшает параллелизм для таких операций, как карта или фильтр. Перетасовывание – это любая операция над набором данных, которая требует перераспределения данных по его разделам. Среди примеров – сортировка и группировка по ключу.
Распространенный метод перетасовки большого набора данных – разделить выполнение на карту и этап сокращения. Затем данные перетасовываются между картой и задачами сокращения. Например, предположим, что мы хотим отсортировать набор данных с 4 разделами. Цель состоит в том, чтобы создать еще один набор данных с 4 разделами, но на этот раз отсортированный по ключу.
Перетасовывание распределенного набора данных с 4 разделами, где каждый раздел представляет собой группу из 4 блоков. Например, в операции сортировки каждый квадрат представляет собой отсортированный подраздел с ключами в отдельном диапазоне. В каждой задаче сокращения затем объединятся-сортируются подразделения одного оттенка.
Этот процесс показан на приведенной выше диаграмме. Первоначально несортированный набор данных группируется по цвету (синий, фиолетовый, зеленый, оранжевый). Цель перетасовывания – перегруппировать блоки по оттенку (от светлого к темному). Эта перегруппировка требует всеобщего взаимодействия: каждая задача карты (цветной кружок) производит один промежуточный результат (квадрат) для каждого оттенка, и эти промежуточные результаты перетасовываются в соответствующую задачу уменьшения (серый кружок).
Такие программы могут быть написаны с помощью Ray и с использованием функций Python. Задача создается путем вызова функции. Каждая задача карты возвращает несколько значений, по одному для каждой задачи сокращения, и каждая задача уменьшения принимает эти выходные данные в качестве своих аргументов. Тогда сортировка с использованием API Ray будет выглядеть примерно так:
Перетасовывание с одним узлом, когда память не является ограничением
Распределенное перетасовывание является – это сложная задача из-за полной зависимости между картой и этапом сокращения. При N разделах это приводит к N² промежуточным выходным данным, которые необходимо перетасовывать между отображением и сокращением задач.
Но когда все данные умещаются в памяти, сделать это достаточно просто. Давайте посмотрим, как это работает на Ray, с уменьшенной версией предыдущего примера.
Мы хотим построить этот график для набора данных 4 ГБ и 2 разделов. Тогда каждый промежуточный вывод (квадраты) будет 1 ГБ:
Давайте попробуем сделать это на машине с 2 процессорами и 8 ГБ ОЗУ:
Ray распределяет задачи карты (синий и фиолетовый) на два рабочих слота. Они выполняются параллельно и создают свои объекты (синие и фиолетовые квадраты) в локальном хранилище объектов Ray.
Затем Ray планирует задачи уменьшения (серые кружки). Ray решает зависимости задач сокращения, планируя задачу сокращения только после того, как все ее аргументы (синие и фиолетовые квадраты, созданные на этапе карты) станут локальными. Каждый рабочий получает копию выходных данных карты из локального хранилища объектов. Поскольку хранилище объектов реализовано с общей памятью, это можно сделать с нулевыми копиями для объектов numpy. Как только задачи сокращения будут завершены, Ray автоматически соберет их аргументы.
Ray управляет параллелизмом и зависимостями задач для этого небольшого перетасовывания, но, конечно, сделать все это довольно просто, потому что все данные умещаются в памяти одной машины. В следующем примере мы займемся полноразмерным перетасовыванием на нескольких узлах.
Перетасовывание нескольких узлов с переносом на внешнее хранилище
Давайте рассмотрим более крупный пример, скажем, 16 ГБ с 4 разделами с промежуточными выходами (квадратами) по 1 ГБ:
И теперь мы хотим выполнить его в распределенном режиме вне ядра, скажем, с кластером из 2 машин, каждая с 1 процессором и 4 ГБ памяти. Мы также добавим внешнюю систему хранения, поскольку мы не можем вместить весь набор данных в памяти:
Поскольку мы можем выполнять только две задачи одновременно, нам придется запланировать каждую фазу на две волны. Первая волна задач с синей и фиолетовой картой выглядит почти так же, как и в случае с одним узлом, поскольку у нас достаточно памяти для хранения их выходных данных:
Но что происходит, когда выполняются две другие задачи карты (зеленая и оранжевая)? У нас недостаточно памяти для хранения этих выходных данных!
В этот момент каждый Ray-узел луча начинает перенаправлять свои текущие объекты (синий и фиолетовый) на внешнее хранилище, чтобы освободить место для новых объектов (зеленого и оранжевого). Нам нужно пролить объекты вместо того, чтобы удалять их, поскольку задачи сокращения все еще зависят от них.
Синие и фиолетовые объекты автоматически переносятся в удаленное хранилище, чтобы освободить место для новых выходных данных задач карты (зеленый и оранжевый).
Конечно, на этапе сокращения мы также должны восстанавливать рассыпанные объекты, чтобы рабочие сокращения могли извлечь их из памяти. Каждый Ray-узел будет определять, какие объекты нужно восстановить, на основе зависимостей запланированных задач, выделенных ниже красным. Чтобы освободить место для аргументов задач уменьшения, каждый узел Ray снова разливает любые локальные объекты.
Восстановление разлитых объектов, которые теперь необходимы для задач уменьшения. Квадраты, обведенные красным, - это объекты, необходимые для двух запланированных задач сокращения (серые кружки). Чтобы освободить место для аргументов задачи, Ray снова разливает, на этот раз разливая зеленые и оранжевые выходные данные карты.
В целом, эта рабочая нагрузка потребовала одного раунда разлива и восстановления промежуточных выходных данных N², созданных задачами карты, не включая память, необходимую для чтения исходного набора данных и записи окончательного результата. Например, с набором данных в 16 ГБ каждый промежуточный вывод будет 1 ГБ, и мы будем записывать и читать 16 ГБ данных во внешнее хранилище и из него во время этапа перетасовывания.
Когда секции меньше, мы можем быть ограничены произвольной емкостью ввода-вывода внешней системы хранения. Слишком большое количество небольших запросов на чтение и запись может привести к тому, что внешнее хранилище станет узким местом. Чтобы избежать этого, Ray объединяет несколько промежуточных выходов в один запрос на запись (например, файл). Чтения сложнее объединить, но мы планируем реализовать оптимизацию в будущем.
Ограничение памяти при предотвращении тупика
В предыдущих двух примерах у нас всегда было достаточно памяти для выполнения стольких задач, сколько есть ядер в распоряжении. В примере с 2 узлами у нас был 1 ЦП и 4 ГБ памяти на каждом узле. Поскольку это была именно та комбинация, которая нам нужна для выполнения одной задачи, планирование было простой задачей.
Однако ресурсы не всегда будут так хорошо сбалансированы. Одной из проблем в рабочих нагрузках, интенсивно использующих память, является достижение параллелизма без нехватки памяти. Мы хотим выполнять задачи параллельно, вплоть до количества ядер на машине, но мы также не хотим выполнять несколько задач параллельно, и так чтобы они превышали объем памяти узла. Это может вызвать в ОС обработку нехватки памяти, например, OOM.
Ray избегает этого, устанавливая ограничение на общую память, используемую хранилищем объектов каждого узла, которое пользователь может легко настроить. Однако это только часть решения. Если мы попытаемся составить расписание, основанное только на доступности ЦП, и проигнорируем требования к памяти, мы легко можем зайти в тупик.
Например, предположим, что мы хотим выполнить предыдущий пример на одной машине с 2 процессорами и 6 ГБ памяти. Если мы планируем задачи на основе доступности ЦП, мы можем вызвать тупиковую ситуацию:
Пример возможной взаимоблокировки с наивным планированием, учитывающим только доступность ЦП. Планировщик пытается запланировать обе задачи уменьшения (серые кружки) одновременно, что приводит к тупиковой ситуации, поскольку для выполнения обеих задач требуется еще один блок памяти.
Задача может быть запущена только тогда, когда все ее зависимости находятся в локальном хранилище объектов. Это означает, что для параллельного выполнения двух задач сокращения нам потребуется 8 ГБ общей памяти для хранения их объединенных аргументов. Но в данном случае у нас всего 6 ГБ, так что мы заходим в тупик. Ни одна из задач не может освободить свои аргументы до завершения, но для каждой из них требуется дополнительный 1 ГБ памяти для начала выполнения.
Чтобы предотвратить такую проблему, планировщик Ray также учитывает требования к памяти для каждой задачи, вычисленные на основе размера ее входных аргументов. Затем система планирует задачи в зависимости от общей доступности ЦП и памяти узла:
Ray предотвращает взаимоблокировку, планируя задачи в зависимости от доступности ЦП и памяти. Ray видит, что памяти достаточно только для выполнения одной задачи за раз, поэтому сначала выбирает аргументы для светло-серой задачи. Как только эта задача завершается, она повторяется для темно-серой задачи и так далее.
Итак, в этом примере Ray увидит, что каждой задаче требуется 4 ГБ общей памяти для хранения своих аргументов. Поскольку у нас только 6 ГБ памяти, Ray будет получать аргументы только для одной из задач сокращения за раз, даже если у нас достаточно процессоров для выполнения двух за раз. После завершения одной задачи сокращения мы можем освободить ее аргументы и повторить для следующей задачи сокращения в очереди.
Заключение
В этом посте мы показали, как Ray обеспечивает управление памятью для приложений с интенсивным использованием данных. Для этого он:
- Обеспечивает высокопроизводительное хранилище объектов с общей памятью.
- Выливает объекты во внешнее хранилище, когда память заполняется.
- Применяет контроль допуска во время планирования задач для принудительного ограничения памяти и предотвращения взаимоблокировок.
В нашем следующем посте об обработке данных мы обсудим некоторые проблемы, связанные с масштабированием этой схемы для сортировки 100 ТБ. А пока, если вы хотите узнать больше, вы можете ознакомиться с Ray whitepaper, присоединиться к нам на Ray Discourse или посмотреть наши выступления на Ray Summit.