Описание архитектуры AcapellaDB v3

Здесь описано как должно быть. Пока идёт разработка многие части будут упрощены.

Description of Previous Version2

Далее - описание V3, которая доступна и используется в нескольких проектах. 2018.06

Scheme Manager

БД состоит из кейспейсов, которые состоят из таблиц, в которых находятся записи, записи состоят из колонок. Каждая запись должна иметь основной ключ, который может состоять из одной или нескольких колонок. По этому ключу записи распределяются по нодам в отсортированом порядке. БД автоматически распределяет данные по нодам, не нужно указывать как именно шардировать данные.

Схема базы тоже хранится в таблицах в системном кейспейсе, поэтому перед каждым запросом к данным происходит запрос к схеме, обычно из кэша. Для обновления кэша используется счётчик изменений: все ноды периодически запрашивают его значение, и если оно увеличилось, то выкачивают новую схему. Из этого следует, что схема может быть немного устаревшей на некоторых нодах, но только до момента следующей синхронизации.

Для моментального удаления и создания кейспейсов, таблиц и др., они адресуются не по имени, а по UUID, который генерируется заново при каждом создании. Старые данные не удаляются, а только становяться недоступными. Удаление происходит при сборке мусора.

Ordered Key-Value Storage

Весь кластер представлен на этом уровне как непрерывный упорядоченный key-value storage с поддержкой транзакций. Ключи состоят из отдельных частей, сортировка происходит в натуральном порядке сначала по первой части, потом по второй, и т. д. Части ключей и значения - бинарные строки. Внутри key-value разбит на диапазоны размером по 64 МБ, ограниченные разделительными ключами, каждый диапазон идентифицируется UUID'ом.

В начале работы кластера существует всего один диапазон который хранит все ключи от начала и до бесконечности. Когда размер этого диапазона превышает 128 МБ, он делится на два диапазона по 64 МБ каждый. Когда размер двух соседних диапазонов уменьшается обратно до 64 МБ в сумме, то они объединяются в один.

Информация о диапазонах распространяется по кластеру с помощью gossip-протокола. Все ноды сообщяют друг другу, сколько диапазонов на них находится, далее планировщик на каждой из них пытается отдать свои диапазоны другим, чтобы выравнять количество на каждой ноде. Пока происходит это перераспределение, записи проксируются в старое и новое расположение диапазона. Поскольку информация о диапазонах распространяется по кластеру не мгновенно, какая-то нода может обратится к старому расположению диапазона, тогда запрос перенаправится на новое расположение, поскольку нода ранее содержащая диапазон всегда знает куда его перенаправила.

Планировщик старается располагать соседние (по ключам) диапазоны на одной ноде, чтобы ускорять запросы на несколько диапазонов и чтобы иметь возможность мержить мелкие диапазоны. По сути, диапазоны - это "виртуальное"" разделение данных для удобства их перераспределения, физически данные на ноде лежат вместе в одном пространстве. Перенос диапазона - перенос всех его ключей.

Для обеспечения надёжности хранения (durability) каждый ключ лежит сразу на нескольких нодах - в репликах. Чтобы обеспечить это, в кластере присутствует сразу несколько колец из диапазонов, каждый из которых хранит все ключи. Запись или чтение считаются успешными только при успешном ответе от кворума реплик.

Между собой кольца независимы, то есть одно из них может содержать диапазоны A-F-L-Z, а другое A-F-Z. Такое может происходить в моменты деления/объединения диапазонов, или когда какой-то диапазон был недоступен, получил не все команды, и его размер не превысил норму. После продолжительной работы БД, кольца могут сильно рассинхронизироваться, но это не влечёт за собой никаких проблем. Едиственная взаимосвязь колец - их планировщики стараются не располагать реплики одинаковых ключей на одних и тех же нодах, поскольку это никак не поможет надёжному хранению данных. Но всё-таки это может произойти, когда два разных планировщика независимо перенесли данные, не зная про решения друг друга, вероятность этого довольно мала и сразу же произойдёт другое перераспределение.

Здесь не применяется никакого алгоритма консенсуса, поскольку уже есть транзакции. При любом потенциальном конфликте всегда есть реплика, которая будет общей между двумя конфликтующими транзакциями, поскольку успешным считается только операция, подтверждённая кворумом. Если такой конфликт произошёл, то транзакция будет откачена и никакой "мусор", оставшийся на репликах никогда не будет применёт, так что это не вызовет повреждения данных. От пользователя требуется, чтобы он не пытался восстановить состояние конфликтующей транзакции и не пытался применить транзакцию, завершённую таймаутом или любой другой ошибкой. В подобных ситуациях правильным решением будет перезапустить транзакцию.

Чтение данных (get range)

Приходит запрос от верхнего уровня (query processor), содержащий префикс (prefix), первый ключ (first_key), последний ключ (last_key), порядок выборки (order), количество ключей (limit), которые нужно вернуть, и ID транзакции. Из текущей информации о диапазонах для каждого кольца формируется список диапазонов, данные с которых нужно выбрать. В каждый диапазон направляется запрос для выборки данных. Если указан лимит, то обращения к диапазонам в пределах кольца происходят последовательно с учётом порядка выборки, пока не будет получено нужное количество данных, иначе - все запросы отправляются одновременно. Выборка считается успешной, когда по всем ключам получен кворум ответов. Если в пределах таймаута не получен кворум ответов хотя бы по одному ключу, то запрос завершается ошибкой таймаута.

В ответ на запросы к диапазонам приходят списки ключей, которые также содержат значения и ID транзакций, записавших эти значения. Из значений по одному ключу (с разных реплик) выбирается то, чей ID записавшей транзакции максимальный. Если от реплик пришли разные значения, то выполняется read-repair: всем репликам рассылается самое новое значение. В ответе содержатся даже удалённые значения, чтобы при устаревших данных на других репликах, можно было понять, что они удалены. При конфликте текущей и записавшей транзакции запрос к диапазону может завершиться ошибкой, в таком случае текущий запрос завершается с такой же ошибкой и необходимо будет перезапустить транзакцию.

Если в запосе указан limit, то при его достижении, в ответе возвращается сессия - последний прочитанный ключ (включая удалённые), чтобы с него продолжить чтение в следующем запросе.

Запись данных (put batch)

Приходит запрос с верхнего уровня, который содержит список пар (ключ, значение) и ID транзакции. Из текущей информации о диапазонах для каждого кольца формируется список диапазонов, в которые нужно записать данные. В каждый диапазон посылается набор ключей, который ему соответствует и ID транзакции. Запись считается успешной, когда по всем ключам получен кворум ответов. Если в пределах таймаута не получен кворум ответов хотя бы по одному ключу, то запрос завершается ошибкой таймаута.

Query Processor

С этим слоем напрямую работают клиенты. Сюда приходят запросы на выборку, добавление и изменение данных в таблицах.

Если в запросе не указан ID транзакции, то - создаётся новая транзакция, которая будет завершена сразу же после выполнения этого запроса (auto commit). Транзакция пробрасывается в нижний слой (ordered key-value).

Выборка данных (select)

В базу приходит запрос выборки данных из таблицы, который может содержать ограничения на значения любых колонок (where), ограничения на количество выбираемых данных (limit) и список колонок, которые нужно вернуть пользователю. Ограничения на значения колонок собраны вместе по оператору AND, а эти наборы ограничений собраны по оператору OR.

Ограничения из запроса преобразуются в набор условий для слоя ordered key-value store: префикс (prefix), первый ключ (first_key), последний ключ (last_key), порядок выборки (order) и коичество ключей (limit). Префикс состоит из имени кейспейса, имени таблицы или индекса и значений полей ключа.

Первым делом выбирается индекс, по которому будет выполнен поиск, им может быть основной ключ таблицы или один из собственно индексов таблицы. Выбор зависит от локальности результирующей выборки: отношения всех выбранных записей к количеству полезных записей. Локальность определяется эвристикой по количеству колонок из условия, которые не вошли в условие для нижнего слоя: чем меньше оставшихся колонок, тем выше локальность. В префикс, кроме описанных выше данных, входят колонки, с которых начинается выбранный индекс и которые выбираются по строгому равенству. Начальный и конечный ключи ограничены индексируемыми колонками из условия, которые были выбраны по диапазону (больше, меньше). В зависимости от строгости неравенства устанавливаются флаги включения/исключения граничных ключей. Все остальные условия проверяются при пост-обработке на уже выбранных данных.

Пример формирования условия для key-value.

Пусть у нас есть таблица:

@Index(name = "person_by_country_city_age", key = ["country", "city", "age", "id"])
class Person(
    @Key val id: UUID,
    @Column val name: String,
    @Column val country: String,
    @Column val city: String,
    @Column val age: Int,
    @Column val birthDate: Date
)

И мы хотим сделать такой запрос:

select()
  .where { (Person::country eq <country>) and (Person:: city eq <city>) and (Person::age ge <age>) and (Person::birthDate lt <birthDate>) }
  .ascending()
  .limit(10)

Тогда у нас есть два варианта, какой взять индекс: основной ключ id или индекс person_by_country_city_age. Чтобы максимизировать локальность запроса, выберем второй вариант, тогда получится следующий запрос к key-value:

prefix = [<keyspace_name>, person_by_country_city_age, <country>, <city>]
first_key = [<keyspace_name>, person_by_country_city_age, <country>, <city>, <age>]
first_inclusive = true
last_key = infinity
last_inclusive = false
order = ascending
limit = 10

После выполнения выборки останется пост-обработкой отфильтровать условие Person::birthDate lt <birthDate>.

Значения полученные из key-value содержат только те колонки, которые не вошли в ключ (индекс), так что для возвращения данных пользователю, записи наполняются всеми необходимыми колонками из ключа и из значения.

При ограничении limit'ом пользователю так же возвращается "сессия" запроса - последний прочитанный ключ, чтобы потом продолжить с него. Если в последующих запросах указана сессия, то начальный ключ смещается к указанному. Данный слой берёт информацию о последнем прочитанном ключе из ответа от key-value.

Если нужно пересортировать данные не в том порядке, как они выбирались, или если есть несколько OR условий, то лимит не перенаправляется в ordered key-value, вместо этого выбираются все данные и сортируются в памяти.

Добавление данных (insert)

В базу приходит запрос на вставку записи в таблицу, он содержит все колонки, которые нужно записать. Колонки должны содержать все части основного ключа, но не обязаны содержать индексируемые колонки. В случае если одна из индексируемых колонок отсутствует, то запись не будет добавлена в этот индекс.

Перед записью происходит чтение возможно уже существующей записи из таблицы по основному ключу (правила формирования ключа как в select запросе). Если выбран режим SKIP_IF_EXISTS, то при присутствии записи пользователю сразу же возвращается ответ, что вставлено 0 записей. Если выбран режим OVERRIDE_IF_EXISTS и запись уже существует, то для каждого индекса формируется ключ в key-value storage и ему устанавливается пустое значение. Таким образом происходит удаление предыдущей записи из индексов.

Далее, для новой записи так же формируются ключи для key-value storage по основному ключу таблицы и для каждого индекса. Значениями для этих ключей становится список пар (колонка, значение) за исключением соответствующих колонок, которые уже вошли в ключ.

Весь сформированный набор (ключ, значение) отправляется в метод put batch из слоя ordered key-value.

Обновление данных (update)

Запрос содержит ограничения на значения колонок (where), как и запрос select, и набор колонок с новыми значениями. По этим ограничениям так же формируется запрос на выборку данных из ordered key-value. После получения и пост-фильтрации данных, формируется список старых ключей в индексах с null-значенями. Далле, к строкам применяются новые значения, формируется список новых ключей в индексах, и выполняется запрос на запись put batch.

Удаление данных (delete)

Запрос содержит ограничения на значения колонок (where), как и запрос select. По этим ограничениям так же формируется запрос на выборку данных из ordered key-value. После получения и пост-фильтрации формируется список старых ключей в индексах и строк в основной таблице с null-значенями и выполняется запрос на запись put batch.

ORM

Для v3 ORM существует для языка Kotlin

Уникальный таймштамп

Это таймштамп для уникального описания событий во времени и пространстве кластера. Он состоит из локального таймштампа в миллисекундах, монотонно возрастающего счётчика и ID ноды, которая его сгенерировала. Эти данные не сохраняются на диске, при перезапуске ноды она получит их от других нод. Все ноды периодически обмениваются своими локальными таймштампами, чтобы синхронизировать время, при этом выбирается максимальное значение. Счётчик инкрементируется при генерации очередного уникального таймштампа, когда локальный остался неизменным (это происходит если генерация произошла в одну миллисекунду или если локальное время отстаёт от общего). Такой формат обеспечивает уникальность в пределах кластера и синхронизацию времени между нодами, достаточную, чтобы детектировать конфликты. Он используется для генерации ID транзакции, пометки версий значений в Paxos и MVCC алгоритмах.

Transaction Manager

Записи о транзакции должны храниться так же надёжно, как и сами данные, поэтому их нужно реплицировать. Чтобы атомарно обновлять все реплики используется алгоритм консенсуса Paxos. Распределение реплик по кластеру производится путём хеширования ID транзакции и маппинга полученого значения ня кольцо нод кластера. ID транзакции генерируется как уникальный таймштамп описанный выше. При попытке чтения или записи, если транзакция в незавершённом состоянии и с момента начала прошло больше 5 секунд, то транзакция считается откаченой.

Запуск новой транзакции (begin)

Нода, на которую пришёл запрос, генерирует ID новой транзакции, при этом не записывается никаких данных (пустое значение = транзакция незавершена). Запросы к данным в транзакции могут направляться напрямую к содержащим их нодам, поэтому они будут обрабатываться максимально быстро, если роутить их туда вручную.

Применение транзакции (commit)

По ID транзакции формируется список нод, которые содержат реплики, далее используется Paxos для согласованного обновления реплик (этот алгоритм можно повторить несколько раз):

  1. Генерируется уникальный таймштамп для новой версии значения.
  2. Всем репликам отправляются запросы на чтение с указанием новой версии.
  3. Каждая реплика должна вернуть своё значение и запомнить новую версию, если локальная версия меньше, иначе - ответить ошибой.
  4. Когда от кворума реплик получены ответы, выбирается самая новая версия значения (по таймштампу). Если одна из реплик ответила ошибкой или в значении сказано, что транзакция уже завершена (commit/rollback), то клиенту вернётся ошибка конфликта транзакций, иначе значение изменяется на commit.
  5. Новому значению присваивается версия, значение рассылается всем репликам.
  6. Реплика должна записать новое значение, если новая версия больше или равна локальной версии реплики, иначе - ответить ошибкой.
  7. Если от кворума реплик получен ответ об успешной записи, то запрос успешно завершается. Если хотя бы одна реплика ответила ошибкой, то клиенту возвращаетя ошибка.

Откат транзакции (rollback)

Вызывается только пользователем. Алгоритм такой же как в случае с примерением транзакции (commit), только новым значением выбирается rollback.

Чтение состояния транзакции (get)

По ID транзакции формируется список нод, которые содержат реплики, далее используется Paxos для согласованного чтения реплик (этот алгоритм можно повторить несколько раз):

  1. Всем репликам отправляются запросы на чтение.
  2. Когда от кворума реплик получены ответы, выбирается самая новая версия значения (по таймштампу). Если все полученные версии равны, то сразу завершаем алгоритм.
  3. Всем репликам рассылается значение с самой новой версией (read repair).
  4. Реплика должна записать новое значение, если новая версия больше или равна локальной версии реплики, иначе - ответить ошибкой.
  5. Если хотя бы одна реплика ответила ошибкой, то клиенту возвращаетя ошибка.

MVCC-алгоритм

TODO: попробовать расписать со всеми нюансами.

Multiversion Concurrency Control. Применяется для разрешения транзакционных конфликтов в ordered key-value storage. Используется на самом деле не multi-, а two-version. Это всё ещё позволяет читателям не блокировать писателей и наоборот, но только на одну новую версию. С другой стороны, это сильно сокращает сложность выборок данных с диска и ускоряет запросы. Данные лежат на диске в виде пар (ключ -> [текущее значение, новое значение, пометка читающей транзакции]). Каждое значение содержит ID записавшей его транзакции и таймштам записи этого значения. Далее идёт довольно упрощённый алгоритм, чтобы понять все нюансы всё равно нужно смотреть код.

Чтение данных

  1. Из запроса от ordered key-value выбирается оптимальная стратегия чтения данных (вперёд, назад, начальный и последний ключ).
  2. Читаем все ключи, пока не прочитаем всё или limit ключей.
  3. По всем неприменённым значениям формируем список транзакций и запрашиваем их состояния.
  4. Применяем, откатываем или оставляем записи неизменёнными в зависимости от состояния транзакций.
  5. Если хотя бы одна запись записана более новой транзакцией, то отвечаем ошибкой конфликта.
  6. Помечаем все значения, как прочитанные этой транзакцией (если в пометке предыдущей читающей транзакции ID старше).
  7. Возвращаем все прочитанные значения.

Запись данных

  1. Читаем все ключи из запроса ordered key-value.
  2. По всем неприменённым значениям формируем список транзакций и запрашиваем их состояния.
  3. Применяем, откатываем или оставляем записи неизменёнными в зависимости от состояния транзакций.
  4. Если хотя бы одна запись записана или прочитана более новой транзакцией, то отвечаем ошибкой конфликта.
  5. Записываем все значения с пометкой новой транзакции.

Gossip-протокол

Используется для хранения информации о кластере: список нод кластера и диапазонов key-value. Полностью продублирован на каждой ноде, работает в режиме eventually consistent. Поддерживает только простые get/put операции, никаких транзакций или atomic-операций тут нет. Данные храняться персистентно на диске и закэшированы в памяти. Удалённые ключи навсегда остаются в памяти.

Запись (put batch)

Все новые ключи со значениями записываются в память и на диск с таймштампом, который генерируется по тем же правилам, как и ID транзакции. После этого обновляется словарь синхронизации, описанный ниже.

Чтение (get range)

Читаются локальные данные, здесь всё просто. Параметры такие же как и у key-value storage: prefix, first_key, last_key, limit.

Синхронизация

Общее правило: значения с более новыми таймштампами перекрывают устаревшие значения. Каждая нода хранит словарь (нода -> таймштамп). В нём хранится таймашамп, до которого записи другой ноды синхронны с записями этой ноды. На каждом периоде синхронизации, ключи с таймштампом записи выше этого значения отправляются на другую ноду, и после успешного завершения запроса записывается новый таймштамп. Если текущая нода получила новые для неё данные от другой ноды, таймштампы которых ниже, чем значение из таблицы, то в таблицу записывается минимальное значение. Это нужно, чтобы на следующем периоде синхронизации разослать данные, которых потенциально нет на других нодах. Такой алгоритм довольно избыточен по загрузке сети, но здесь это не приципиально. Так же этот протокол позволяет проверять статус других нод: живы они или нет.

Сборка мусора и самовосстановление кластера

Периодически кластер выполняет сборку мусора. Под понятие мусора попадают:

  • старые записи о транзакциях, на которые уже никто не ссылается;
  • null-значения ключей ordered key-value, которые уже присутствуют на всех репликах (иначе старые значения могут "восстать из мёртвых");
  • данные старых таблиц, кейспейсов, которые были пересозданы, и теперь на них нет ссылок.

Сборка записей о транзакциях

Каждая нода периодически обходит все свои данные и если они не применены, то проверяет статус транзакции и пытается их применить. После этого вычисляется минимальная ещё активная транзакция и записывается в данные gossip-протокола. Теперь каждая нода может вычислить ID транзакции, с которой можно удалить все более старые транзакции. Вторым проходом собственно удаляются все эти записи.

Сборка null-значений

TODO: слишком непонятно

Сборка данных пересозданных таблиц

Каждая нода проходится периодически по своим данным и ищет ключи, которые ссылаются на таблицы/кейспейсы/индексы, которых уже нет, такие данные можно смело удалять.