Switch lang: Русский \ English

"Cloud Parallel Virtual machine - CPVM" Project


Структура кластера

Кластер состоит из "узлов кластера", соединенных в некоторую топологию. В пределе это топология "все со всеми", где все узлы соединены попарно друг с другом.

node some topology

Например такую, это не принципиально. Пользователь может построить люую требуемую ему топологию, соединив узлы, хоть ромашкой.

Задачи от узла к узлу будут "перетекать" с учетом этой топологии.

Каждый узел состоит из нескольких компонентов (не все показаны на картинке):

cpvm node structure

  • Aeron MediaDriver управляющий блоками в shared memory
  • CPVM node
  • TVM - Transation Value Manager. "In memory" хранилище с иерархическими подтранзакциями, которое определяет "конфликты доступа". Нужно для выявления независимых потов управления. Находится рядом с CPVM для увеличения locality данных, хотя в целом это независимый слой
  • IO_service - слой stateless сервисов, вспомогательная система для TVM, которая выполняет IO операции.
  • FragSeq (не изображен). Cервис FragmentSequence - обрабатывает отчеты TVM о обнаруженных зависимостях и оптимизирует базу данных ЯПФ/LPF (ярусно параллельной формы). Иначе, управляет информацией о параллельных ярусах
  • A_Storage. основная подсистема IO , основное хранилище данных и метаинформации системы. Выполняемые Функции в кластере :
    • хранилище кода, фрагментов (CB - Code Base).
    • хранилище матаинформации о стратегиях исполнения фрагментов (PTS)
    • хранилище логов выполнения
    • хранилище данных пользователей, если они хотят ACID. (ACID)
    • обеспечение ACID при работе с данными , изоляция отдельных транзакций
    • содержит структуры данных системы FragSeq - ЯПФ/LPF
    • хранение биллинг информации PaaS системы.
    • информация фронтенда , требуемая LK (в личном кабинете)
  • consul node, как элемент directory service и coordination system

CPVM node HTTP API

На текущий момент АПИ очень сырое, т.к. имеет более низкий приоритет по сравнению с остальным

Method Path Parameters Description Result
POST /connectToNode host, port Соединение текущей ноды с нодой по указанному адресу
POST /disconnect nodeId
POST /terminate Принудительное завершение текущей ноды. Равносильно посылу SIGINT процессу ноды
POST /runFragment parameters, broadcastDepth Запуск фрагмента. parameters - json десериализующийся в это long-poll! Результат (json) приходит только после завершения транзакции.
POST /stop trId, dam Принудительная остановка фрагмента, без оповещения о завершении
POST /stopFragmentsWithDamPrefix trId, damPrefix Принудительная остановка фрагментов с dam-префиксом, без оповещения о завершении
POST /restartFragment trId, dam Лучше пока не трогать
GET /getExecutionStatus trId, dam Инфа о статусе выполнения транзакции или подтранзакции. json. isRunning, равный true, не означает, что фрагмент с указанными tr/dam исполняется, возможно он еще не запустился или уже остановлен, но рантайм ждет завершения подфрагментов, запущенных асинхронно
GET /version Инфа о версии рантайма

Balancing

Кластер получает задачу выполнить некоторую задачу (фрагмент). в процессе исполнения фрагмент может одновременно запускать множество других фрагментов. Они требуют много ресурсов и нода отдаёт их другим узлам.

Какие это фрагменты, на каких языах, как и в чем исполняются это детали, их пока опустим.

Текущая реализация (10.2016)

Часть первая. Оперативное рапределение задач.

  1. Ноде дается задание выполнить фрагмент
  2. Если нода имеет свободные ресурсы, то фрагмент запускется локально.
  3. Если текущих ресурсов недостаточно для выполнения фрагмента, то выполняется перенаправление задачи на дргуие ноды. Если задачу перенаправить некуда, она складывается в локальную очередь. Важные моменты:
    • Определние требуемых ресурсов не учитывает предпологаемую нагрузку запускаемых фрагментов. С точки зрения рантайма они все равны.
    • Задачи перенапрваляются на ноды с наименьшей (с точки зрения текущей ноды) нагрузкой.

Часть вторая. Фоновое перераспредление задач по принципу work-stealing.

  1. Каждая нода рассылает своим соседям информацию о своей нагрузке (heartbeat). Интервал отправки heartbeat меняется в зависимости от нагрузки ноды, отправляющей этот heartbeat. Чем больше нагрука тем больше интервал.
  2. Если ноде приходит heartbeat, она сравнивает свою нагрузку с нагрузкой ноды, которая прислала heartbeat.
  3. Если нагрузка удаленной ноды меньше, то часть задач из локальной очереди перенаправляется на нее.
    • Количество перебрасываемых задач определяется по разнице суммы показателей:
      • количетсво запущенных задач
      • количество задач в очереди

Анализ

  • Нагрузка будет распределяться при любых топологиях
  • Нагрузка может рапределятся очень медленно в неполносвязных топологиях (зависит от того, на какую ноду ушла корневая задача).

Небольшие доработки

  • Учет усредненной нагрузки CPU

Крупные доработки

  • Переброс исполняющихся задач.
  • Учет статистических показателей нагрузки каждого фрагмента. Для доверенных фрагментов это время исполнения. Для фрагментов-процессов это время исполнения + память + нагрузка CPU. Для легковесных фрагментов-процессов - также как и для доверенных - только время исполнения. Проблема тут одна - входные данные все решают. FPM в помощь

Algorithm

Алгоритм запусков и перезапусков (08.2016)

Группы вызовов

Группа вызовов - синхронно исполняемая группа вызвов подфрагментов, внутри которой порядок исполнения определяется рантаймом.

Синхронность означает, что фрагмент, который запросил исполнение группы подфрагментов, не может продолжать исполнение, пока runtime не отчитается об успешном исполнении этой группы.

Объединяя вызовы подфрагментов в группы, фрагмент высказывает желание выполнить эту группу в одном ярусе. Реальные ярусы определяет runtime. Объединение фрагментов в группы позволяет устанавливать внутри фрагменов "точки отката". Иначе говоря, появляется возможность не перезапускать весь фрагмент целиком в случае конфликтов между подфрагментами, а перезапускать лишь ограниченный набор фрагментов внутри группы. Группы вызовов уменьшают потенциальный параллелизм, явно разделяя все множество асинхронных вызовов подфрагментов на ярусы.

Программист должен стараться делать так, чтобы группы удовлетворяли следующим условиям:

  • группа вызовов должна иметь большой потенциальный параллелизм
  • группы вызовов должны быть часто или обязательно конфликутющими, чтобы оправдать явное разделения на ярусы.

Фрагмент может вызвать другой фрагмент двумя спобобами:

  • call - синхронный вызвов. Равносилен запуску группы (яруса) из одного фрагмента.
  • call_async - асинхронный вызов. Реальный вызов подфрагмента может не произойти в момент вызова call_async(). На самом деле последовательные вызвовы call_async() формируют группу вызовов.

Группа вызвов считается завершенной и "сбрасывается" (отправляется рантайму на исполнение) если исполнение фрагмента достигает точки "сброса группы". Этой точкой может быть:

  • Явный вызвов исполнения группы: execute_async_group().
  • Любая операция tvm. Группа исполняется ДО выполнения операции.
  • Синхронный вызвов фрагмента: call(). Группа исполняется ДО вызова call.
  • Конец исполнения фрагмента.

Из этих правил следует вывод, что некторые фрагменты не могут исполнится параллельно, даже если они полностью независимы:

    call_async('f1')
    tvm_write('vid', 12345)
    call_async('f2')

Фрагменты f1, f2 никогда не будут исполнятся параллельно. Эту проблему можно решить отключением точек сброса перед любой tvm-операцией, но тогда будет существовать вероятность конфликта с родителем. Вполне вероятно, подобное поведение следует сделать опциональным для каждого фрагмента в отдельности.

ситуация конфликта с родителем детектируется. В этом случае перед перезапуском текущей ветки подтранзакции в метаинформацию первого async_call записывается что он на самом деле должен быть синхронный.

Фрагменты внутри группы необязательно начнут исполнение именно в момент вызова execute_async_group():

  • Runtime может принять решение исполнять некоторые фрагменты группы, до того как сама группа окончательно сформируется (до момента вызова execute_async_group). Это может происходить в случае, если часть фраментов уже содержащихся в незаконченной группе образуют законченный ярус (предположительно). Логика определения законченного яруса по неполной группе не реализована, но теоритически возможна (планы).
  • Runtime может отложить исполнение группы фрагментов.

Жизненный цикл фрагмента

Завершенный фрагмент - это фрагмент, который runtime исключает из всех индексов исполняющихся фрагментов и рапортует инициатору исполнения о статусе завершения. Реальное исполнение фрагмента (исполнение кода) может продолжаться бесконечно (в случае исполнения внутри потоков). Фрагмент считается успешно завершенным только если успешно завершены все его подфрагменты, втч те которые исполняются асинхронно. Фрагмент завершается со статусом "CONFLICT" если произошел конфликт, который фрагмент не может разрулить самостоятельно. Это происходит только в случае конфликта одного из подфрагментов (втч глубоко вложенных) с фрагментом, исполняющимся вне ветки текущего фрагмента. В этом случае общая часть DAM конфликтующих фрагментов не будет совпадать с DAM текущего фрагмента.

TODO ссылку на определение термина DAM

Реальная остановка фрагмента (исполнение кода) может произойти если фрагмент вызвает любое системное API. В противном случае остановка может не случится вообще, если фрагмент завис и исполняется в потоке, а не в отдельном процессе.

При локальном запуске фрагмента (не путать с перезапуском!) нода делает следующее:

  • Создает задачу запуска фрагмента, по завершению которой, нода отчитывается инициатору запуска о результате выполнения фрагмента. Задача ассоциируется с ключом исполнения, который представляет собой пару (transactionId, dam). Ключ исполнения однозначно идентифицирует запущенный экземпляр фрагмента в пределах всего кластера.
  • Создает легковесный поток для исполнения фрагмента, передает ему параметры и привязывает его к только что созданной задаче.
  • Завершение исполнения лекговесного потока фрагмента не озанчает что фрагмент завершился. Завершение фрагмента - это завершение задачи. Завершение исполнения потока означает, что наступает этап обработки результата исполнения экземпляра фрагмента. Если в процессе выполнения не возникло ошибок и конфликтов - задача завершается со статусом ОК.

Перезапуск фрагментов

Перезапуск инициируется только в случае когда один из подфрагментов завершился со статусом CONFLICT и текущий фрагент может разрулить конфликт самостоятельно (см. п. "Жизненный цикл фрагмента").

При конфликте в группе:

  • Фрагмент делает tvm rollback для ветки фрагментов исполняющихся в текущей группе. Далее нода делает запрос на перезапуск группы фрагментов. При перезапуске группа фрагментов сохраняет натуральный порядок исполнения.

Возможная ситауция конфликта с родителем:

  • перезапускается root
  • фрагменты отчитавшиеся о выполнении OK перезапускаются заново

"Схема запуска"

Алгоритм определения фрагмента для перезапуска при возникновении конфликта в одном из подфрагментов:
  • Если общая часть конфликтующих DAM больше DAM текущего фрагмента, то:
    • Если начало одного из конфликтующих DAM полностью совпадает с DAM конфликтующего фрагмента, а второй DAM меньше DAM конфликтующего фрагмента и длиннее DAM
  • Если общая часть конфликтующих DAM меньше DAM текущего фрагмента, то текущий фрагмент завершается со статусом CONFLICT, тем самым прокидывая конфликт наверх.
Алгоритм перезапуска фрагмента
  1. Если ключ запуска не зарегестрирован в индексах текущих локальных задач, нода прокидывает просьбу о перезапуске той ноде, из которой пришел запрос на запуск текущего фрагмента, далее переход к шагу 1. В противном случае идем далее.
  2. Фрагменту посылается сигнал остановки.
  3. Экземпляр запущенного фрагмента отвязывается от текущей задачи.
  4. Запускается новый экземпляр фрагмента с новым DAM и привязывется к задаче.

Формирование оперативных ярусов

Внутри ноды существует некая структура, описывающая оперативные ярусы, ожидающие заполнения и последующего исполнения. Запросы на исполнения фрагментов из других фрагментов поступают в ноду в виде групп. Группа формирует строгий максимально возмножный широкий ярус, определенный только для ЯПФ запрашивающего фрагмента. Этот ярус может разделится на несколько ярусов и "слится" с общими для нескольких фрагментов опертивным ярусами.

Порядок запуска определяется с помощью:

  • правил перезапуска
  • FragSeq LPF - информация о допустимости выполнения в одном ярусе (ЯПФ)

Каждая нода имеет в наличии один call batch - список планируемых вызвов подфрагментов. Для исполнения фрагментов по ярусам, необходимо и достаточно синхронизировать исполнения пар фрагментов, которые нельзя исполнять параллельно.

Правила перезапуска необходимы, т.к. на первых этапах работы системы отсутствует информация о конфликтах и FragSeq будет выдавать неопределенный результат.

Синхронизация выполнения фрагментов

Синхронизируемые запуски - запуск пары фрагментов, необязательно на одной машине, между которыми установлено правило перезапуска (отношение до-после). Синхронизация существенно осложняется если синхронизируемые запуски фрагментов не находятся в локальном call batch. Для синхронизации фрагментов необходимо чтобы они не исоплнялись параллельно (взимоисключающая блокировка).

С первого взгляда кажется, что нужны ... расределенные блокировки, что делать?

Оказывается это не нужно.

Достаточно делать так, чтобы решения о запуске непараллельных фрагментов принимал один узел. И чтобы любой юзел в любом месте топлогии в любой момент мог понять кто это будет. И узел этот - родитель, о котором знают все узлы производящие запуск каких-то подфрагентов, а он есть у всех. Под родителем подразумевается нода, на которой исполняется фрагмент, который породил ветки непараллельными фрагментами.

Типы фрагментов

Терминология

  • Исполнитель (IFragmentExecutor) - некий програмный компонент, способный исполнять определенные типы фрагментов
  • Фрагмент (IFragment) - кусок кода (в широком смысле). Не важно бинарщина или сорцы.
  • Скомпилированый фрагмент (CompiledFragment) - фрагмент, прошедший некую подготовительную стадию обработки и готовый для исполнения. IFragment и CompiledFragment могут быть одним и тем же объектом (это означает, что компиляция для данного типа фрагмента не имеет смысла)

Классификация фрагментов

  • LuaBase - сорцы без привязки к конкретной версии lua, либо сорцы с неизвестной привязкой
  • LuaC - сорцы привязанные к стнадартному LuaC
  • LuaJIT - сорцы привязанные к LuaJIT
  • LuaJ - сорцы привязанные к LuaJ

Классификация скомпилированых фрагментов

По местнахождению ресурса:
  • InJVM - могут хранится в JVM heap и напрямую передаваться исполнинтелю (IFragmentExecutor)
  • NoJVM - могут существовать только в виде внешнего файла (в широком смысле)
По привязке к исполнителю:
  • Привязан к исполнителю. Имеет уникальные для исполнителя и фрагмента ресурсы, которые должны предоставляться эксклюзивно для них.
  • Не привязан к исполнителю.

Классификация исполнителей

По способу исполнения:
  • Blocking - могут выполняться прямо в контексте Jvm, одним блокирующим вызовом, возвращающим результат исполнения
  • Async - выполняются аснихронно:
    • В отдельном потоке
    • В отдельном процессе
Характеристики производительности (требует додумывания):
  • Предпологаемая категория Latecny.

    • Два вараинта:
      • Константа. Представляется в виде некой числовой метрики или алгебраическим типом с отношением порядка. Нет необходимости в точных предсказаниях абсолютных значений latecny, необходимо лишь определить различие между latecny, для оптимального выбора.
      • Функция. Апроксимирует latency, основываясь на неких свойствах фрагмента и внутренем состоянии исполнителя.

    Очевидно второй вариант универсальней, но сложней в реализации.

  • Предпологаемый Throughput. Те же варанты, что и у latency.

Примеры характеристик производительности:

Latency запуска скомпилированого фрагмента по возрастанию (для Lua):

  • интерпретация в контексте JVM (LuaJ, disabled jit)
  • jit-based исполнение в контексте JVM (LuaJ, enabled jit)
  • интерпретация вне контеска JVM (LuaC)
  • jit-based исполнение вне контекста JVM (LuaJit)

Throughput, по убыванию:

  • LuaJit
  • (LuaJ, enabled jit)
  • (LuaJ, disabled jit) / LuaC. Требует замеров (предположительно, разница не существенна)

Fragment executor types

Термины
  • TaskExecutor - Класс, реализующий функциональонсть параллельного исполнения задач.
  • AjRpc CT - Версия AjRpc без сериализации, предназанчена для работы между потоками внутри одного Java-приложения.
Взаимодействие с узлом

Узел предоставляет базовый API для взаимодействия с ним, описанный в интерфейсе IRuntimeAPI (часть его вынесена в http api). При взаимодействии через AjRpc можно использовать этот интерфейс напрямую. Если использование AjRpc невозможно, необходимо писать отдельные модули взаимодейсвия. На стороне узла эти модули должны использовать IRuntimeAPI (через AjRpc CT).

Основные типы исполнителей:
InJVM (bytecode). для доверенных фрагментов
  • Интерпретаторы (с jit и без) которые могут исполняться полностью внутри JVM.
  • Могут исполнять только доверенные фрагменты.
  • Общение с MainLoop через AjRpc CT.
  • Скомпилированые версии кешируются внутри процесса JVM.
  • Для параллельного исполнения используется TaskExecutor.
  • Поддерживаемые интерпретаторы:
    • LuaJ
    • Jython
    • JRuby
InJVM (native).
  • Нативные встраиваемые интерпретаторы, запускаемые внутри процесса JVM.
  • Могут исполнять только доверенные фрагменты.
  • Реализуется в виде нативных динамических библиотек, работа с которыми ведется через JNA.
  • Для общения с MainLoop используется специальная JNA-обертка вокруг IRuntimeAPI:
    IRuntimeAPI runtimeApi;

    interface ExampleFunction extends Callback{ void invoke() { runtimeApi.exampleFunction(); } }
  • Скомпилированые версии кешируются внутри нативной библиотеки, т.к. передача их в java heap означает создание полной копии (ограничение JNA).
  • Для параллельного исполнения используется TaskExecutor.
  • Поддерживаемые интерпретаторы:
    • LuaC
    • LuaJit
    • CPython
Managed process.
  • Нативные встраиваемые интерпретаторы, запускаемые внутри специальных внешних процессов.
  • Могут исполнять недоверенные фрагменты.
  • Главное преимущество по сравнению с Free process: процессы исполнители - долгоживущие (можно строить пулы).
  • Транспорт JVM process <-> Executor process под вопросом:
    • Pipes - минимальные задержки, но не понятно, насколько удобно работать на стороне Java.
    • TCP - удобно: можно использовать jeromq и AjRpc на стороне JVM, а настороне процесса простой AjRpc-совместимый rawCall сериализатор.
    • Aeron client через shared lib встроенной в процесс. Реализовано для python
  • Скомпилированые версии могут кешироваться внутри процесса-исполнителя (втч с выгрузкой на диск).
  • Поддерживаемые интерпретаторы:
    • LuaC
    • LuaJit
    • CPython

Общая принятая модель: код фрагмента передается CPVM из CB(Code Base) процессу соответсвуещего типа. Т.е. процесс исполняет фрагмент за фрагментом, код "путешествует к исполнителю".

Free process.
  • Любой исполняемый файл - интерпретатор. Каждый запуск фрагмента - запуск процесса интерпретатора.
  • Кеш (не)скомпилированых фрагментов управляется на стороне JVM и хранится исключительно на диске.
  • Могут исполнять недоверенные фрагменты.
  • Фантастическая простота управления, очень большие задержки на запуск.
  • Взаимодействие с JVM как в Managed process, но модуль взаимодействия на стороне процесса реализуется в языковом рантайме.
  • Поддерживаемые интерпретаторы - любые
Execution Context

Данные, уникальные для каждого запуска фрагмента. API на стороне исполнителя использует эти данные чтобы лишний раз не дергать медленный IRuntimeAPI.

  • Fragment id (string)
  • Fragment version (int)
  • Transaction (byte[] dams, string trId)
  • TVM parameters (string[])
  • TVM results (string[])

API Фрагментов (голый , без сахаро-врапперов)

API фрагментов

Синхронный запуск фрагмента. Исполнение текущего фрагмента блокируется до завершения исполнения вызываемого подфрагмента.

    call(fragmentId, contextTVMs, resultTVMs)

Асинхронный запуск другого фрагмента. Может вызывать конфликты, в результате чего вся транзакция завершается. Перезапуски пока не работают.

    call_async(fragmentId, contextTVMs, resultTVMs)

TVM чтение/запись работает для int, bool и str, float:

    tvm_get(valueId)
    tvm_set(valueId, value)

Для TVM-таблиц (ключи также могут быть int, bool и str, float):

    tvm_table_get(valueId, key)
    tvm_table_set(valueId, key, value)

Также в Tvm можно сохранить дескриптор замыкания. (todo описать его конструирование)

Получение длины таблицы:

    tvm_length(valueId)

TODO описание таблиц в TVM и видов из длинны (array + hash)

Создание ячейки со значением nil без указания valueId (он будет сгенерирован):

    local valueId = tvm_new()

Ожидание завершения всех запущенных через call_async подфрагментов в текущем фрагменте:

    await_all()

Cброс батча:

    flush_calls()

создание IO-ячейки:

    new_io("<json с параметрами IO-ресурса", url)

возвращает идентификатор ячейки TVM привязанной к IO рекурсу, например ключу A_Storage или Cloud FS ресурсу типа файл.

Таким образом IO становится не сайд эффектом, а входит во внутреннюю (Control Flow correction)и внешнюю (изоляция IO) часть транзакции.

TODO описать логику обработки зависимостей по IO ресурсам и способ обеспечения консистентности при параллельной работе программы.

IO commit:

    commit()

у каждого фрагмента при старте в глобалы будет проброшено 2 списка со строками:

  • _P - параметры запуска
  • _R - результаты (изначально предпологалось что здесь будут идентификаторы TVM ячеек, в которые фрагмент будет записывать свои результаты)

Еще для тестов ассерты, error_message необязателен:

    assert(condition, error_message)

Планы развития

  • создание PaaS на основе этой технологии, автоматизация, безопасность, пользовательский интерфейс
  • мы начали эксперимены использования результатов оптимизации CPVM на "голом железе" проект HMO (Hardware Multicore Optimizer)
  • идут работы по совершенствованию A_Storage, например распределенные неблокирующие контейнеры, IPC в масштабах grid,
  • добрая половина работ по созданию PTS (Programm Transformation System) проёдена. Мы решили пока писать фрагменты вручную, чтобы уточнить требования по трансформации.
  • планируются оптимизации софта в смысле работы с NUMA.
  • мы продолжаем тестировать сетевые и ipc транспорты, совершенствовать свой ajrpc
  • построение для PTS сервисной части, чтобы он мог выполнять оптимизации стратегий исполнения прямо в процессе работы кода.
  • поддержка новых языков для runtime
  • интеграция с проектом наших друзей http://runparallel.net/

Комментарии

Comments powered by Disqus
Перейти к главному содержимому