Switch lang: Русский \ English

AJRPC - Acapella Java Remote Procedure Call (message orientated IPC Library)

Назначение

AJRPC нужен для организации RPC поверх произвольного транспорта. Если не нужен функционал RPC, то можно использовать как message passing, т.к. можно делать вызовы без возврата ответа.

А мне нужна эта штука ?

Нужен ли вам велосипед написанный одним человеком, пользоваться которым, вероятно, может только сам автор?

Ответом была тишина и инженер ушел просветленным.

Однако, мы используем ajrpc в нескольких проектах чтобы писать логику и одновременно развивать низкий транспортный слой, экспериментировать с транспортом и не переписывать все с нуля каждый раз, когда нам нужно менять архитектуру.

Почему вообще могут быть нужны такие подобные библотеки

Если у вашего ПО больше одного программного потока (Thread) или более одного процесса то, в нужен RPC или message passing, что угодно чтобы программные компоненты могли общаться.

Общение состоит из двух аспектов :

  • сериализация (serialization)
  • передача данных (transport)

Сериализация

Так как текстовый формат - частный случай банарного формата, будем рассматривать и те и другие как бинарые форматы.

binary serialization formats

Сравнение форматов сериализации, wiki таблица

Обзор бинарных форматов

Существуют варианты со схемой и без :

  • варианты со схемой
  • варианты без схемы
    • Bson ()
    • Smile (просто бинарный JSON , )
    • Binn (стандартизованный, бинарный, малопопулярный)

следующая таблица хорошо выглядит на широком экране , обозначения :

Effort

  • CLASSES_KNOWN - классы заранее известны (предварительная генерация схемы сериализации)
  • ZERO_KNOWLEDGE - классы неизвестны на момент сериализации (как правило схемы кешируются)
  • MANUAL_OPT - все вручную. будет похоже на SAX. это, конечно, sucks

Format

  • BINARY - бинарщина, заточен на java
  • BIN_CROSSLANG - бинарщина, НЕ заточен на определенный язык

Structure

  • FLAT_TREE - граф объектов представляет собой дерево (нет циклов). Забавно что произвольные древовидные структуры обычно задаются рекусивно - т.е для них нужен FULL_GRAPH
  • FULL_GRAPH - вообще любой граф объектов

Отсортировано по характеристикам производительности. Учитывается скорость сериализации/десериализации и размер сообщения. Чем выше - тем лучше.

                                   Effort          Format         Structure  Misc
protobuf/protostuff                CLASSES_KNOWN   BIN_CROSSLANG  FLAT_TREE  [] protobuf + generated code                                
protostuff                         CLASSES_KNOWN   BINARY         FLAT_TREE  [] generated code                                           
protobuf/protostuff-runtime        ZERO_KNOWLEDGE  BIN_CROSSLANG  FLAT_TREE  [] protobuf + reflection                                    
wobly                              MANUAL_OPT      BINARY         FLAT_TREE  []                                                          
kryo-opt                           MANUAL_OPT      BINARY         FLAT_TREE  [] manually optimized                                       
protostuff-graph                   CLASSES_KNOWN   BINARY         FULL_GRAPH [] graph + generated code                                   
wobly-compact                      MANUAL_OPT      BINARY         FLAT_TREE  []                                                          
fst-flat-pre                       CLASSES_KNOWN   BINARY         FLAT_TREE  [] fst in unshared mode with preregistered classes          
protostuff-graph-runtime           ZERO_KNOWLEDGE  BINARY         FULL_GRAPH [] graph + reflection                                       
json/dsl-platform                  CLASSES_KNOWN   JSON           FLAT_TREE  [] Serializes all properties.                               
protobuf                           CLASSES_KNOWN   BIN_CROSSLANG  FLAT_TREE  []                                                          
smile/jackson/manual               MANUAL_OPT      BINARY         FLAT_TREE  []                                                          
cbor/jackson/manual                MANUAL_OPT      BIN_CROSSLANG  FLAT_TREE  []                                                          
fst-flat                           ZERO_KNOWLEDGE  BINARY         FLAT_TREE  [] fst default, but unshared mode                           
msgpack/manual                     MANUAL_OPT      BIN_CROSSLANG  FLAT_TREE  [] uses positional (column) layout (instead of Maps std impl uses) to eliminate use of names
json/fastjson/databind             ZERO_KNOWLEDGE  JSON           FLAT_TREE  []                                                          
msgpack/databind                   CLASSES_KNOWN   BIN_CROSSLANG  FLAT_TREE  [] uses positional (column) layout (instead of Maps std impl uses) to eliminate use of names
smile/jackson+afterburner/databind ZERO_KNOWLEDGE  BINARY         FLAT_TREE  [] uses bytecode generation to reduce overhead              
json-col/jackson/databind          ZERO_KNOWLEDGE  JSON           FLAT_TREE  [] uses positional (column) layout to eliminate use of names
thrift                             CLASSES_KNOWN   BIN_CROSSLANG  FLAT_TREE  []                                                          
json/jackson/manual                MANUAL_OPT      JSON           FLAT_TREE  []                                                          
cbor/jackson+afterburner/databind  ZERO_KNOWLEDGE  BINARY         FLAT_TREE  [] uses bytecode generation to reduce overhead              
jboss-marshalling-river-ct-manual  MANUAL_OPT      BINARY         FULL_GRAPH [] full graph preregistered classes, manual optimization    
kryo-serializer                    ZERO_KNOWLEDGE  BINARY         FULL_GRAPH [] default                                                  
fst                                ZERO_KNOWLEDGE  BINARY         FULL_GRAPH [] default: JDK serialization drop-in-replacement mode      
avro-specific                      MANUAL_OPT      BIN_CROSSLANG  UNKNOWN    []                                                          
smile/jackson/databind             ZERO_KNOWLEDGE  BINARY         FLAT_TREE  []                                                          
scala/sbinary                      MISC            MISC           UNKNOWN    [] null                                                     
json/protostuff-manual             MANUAL_OPT      JSON           FLAT_TREE  [] json + manual                                            
json/jackson+afterburner/databind  ZERO_KNOWLEDGE  BINARY         FLAT_TREE  [] uses bytecode generation to reduce overhead              
json/jackson-jr/databind           ZERO_KNOWLEDGE  JSON           FLAT_TREE  []                                                          
jboss-marshalling-river-ct         CLASSES_KNOWN   BINARY         FULL_GRAPH [] full graph with preregistered classes                    
xml/aalto-manual                   MANUAL_OPT      XML            UNKNOWN    []                                                          
json/gson/manual                   MANUAL_OPT      JSON           FLAT_TREE  []                                                          
xml/woodstox-manual                MANUAL_OPT      XML            UNKNOWN    []                                                          
hessian                            ZERO_KNOWLEDGE  BIN_CROSSLANG  FULL_GRAPH []                                                          
xml/jackson/databind               ZERO_KNOWLEDGE  XML            FLAT_TREE  []                                                          
json/json-smart/manual-tree        MANUAL_OPT      JSON           FLAT_TREE  []                                                          
bson/mongodb/manual                MANUAL_OPT      BIN_CROSSLANG  FLAT_TREE  []                                                          
json/gson/manual-tree              MANUAL_OPT      JSON           FLAT_TREE  []                                                          
xml/xstream+c-aalto                MANUAL_OPT      XML            FLAT_TREE  []                                                          
xml/javolution/manual              MANUAL_OPT      XML            FLAT_TREE  []                                                          
xml/xstream+c-fastinfo             MANUAL_OPT      XML            FLAT_TREE  []                                                          
xml/xstream+c-woodstox             MANUAL_OPT      XML            FLAT_TREE  []                                                          
json/org.json/manual-tree          MANUAL_OPT      JSON           FLAT_TREE  []                                                          
json/javax-stream/glassfish        MANUAL_OPT      JSON           FLAT_TREE  []                                                          
xml/xstream+c                      ZERO_KNOWLEDGE  XML            FLAT_TREE  []                                                          
json/javax-tree/glassfish          ZERO_KNOWLEDGE  JSON           FLAT_TREE  []                                                          
jboss-marshalling-river            ZERO_KNOWLEDGE  BINARY         FULL_GRAPH [] full graph zero knowledge                                
stephenerialization                ZERO_KNOWLEDGE  BINARY         FULL_GRAPH [] null                                                     
xml/exi-manual                     ZERO_KNOWLEDGE  XML            UNKNOWN    []                                                          
java-built-in-serializer           ZERO_KNOWLEDGE  BINARY         FULL_GRAPH []                                                          
jboss-serialization                ZERO_KNOWLEDGE  BINARY         FULL_GRAPH []                                                          
java-built-in                      ZERO_KNOWLEDGE  BINARY         FLAT_TREE  []                                                          
json/flexjson/databind             ZERO_KNOWLEDGE  JSON           FULL_GRAPH []                                                          
json/jsonij/manual-jpath           MANUAL_OPT      JSON           FLAT_TREE  []                                                          
json/protobuf                      CLASSES_KNOWN   JSON           FLAT_TREE  []                                                          
json/json-lib/databind             ZERO_KNOWLEDGE  JSON           FLAT_TREE  []                                                         

Транспорт

Транспорт может быть

  • IPC
  • Сетевой (TCP, UDP, SCTP)

или

  • с брокером
  • без брокера (brockerless) или P2P

существуют и другие классификации.

Примеры "транспотрных" решений : ZMQ, JeroMQ, NanoMSG, RabitMQ, ActiveMQ, Redis, Crossroads I/O, ...

Проблема : одни из них быстры на сети, но медленные на IPC или не предназначены для ipc, другие быстры наоборот на IPC, но на сети раобтают хуже; одни ориентированы на пропускную способность, другие имеют быстрый отклик.

Для разных задач требуются разные транспорты, и хорошо бы иметь возможность их переключать.

Наши требования:

  • Подключения к нескольким P2P точкам
  • Возможность будить блокировки в ожидании прихода сообщений ЛИБО быстрый inrpoc (желательно без серилизации)
  • Подтвреждение подключения ЛИБО возможность слать сообщения неподключенному ендпойнту (должны в локальную очередь вставать)

хотелось бы:

  • поддержание соединения
  • управляемый и явный батчинг
  • измерение пропускной способности канала и задержек из коробки
  • более менее оперативные уведомления о разрывах

Видно , что цели все же немного иные, чем у message passing

Нам нужна не "Messaging System" а "robust message orientated IPC Library", это и есть роль ajrpc.

Возможности, Использование (радужная часть)

Начало

Определяем rpc-интерфейс:

    interface TestInterface {
        int doJob(String param);
        Person findOneByAge(int age, Person ... persons);
    }

Клиент

Создаем клиент:

    RpcClient<TestInterface> workersApi = RpcClient.newSingleClient(TestInterface.class);

Асинхронный вызов с сериализацией:

    workersApi
        .async(i -> i.doJob("jobParameter"))
        .sendBytesTo(localActor("worker1"))
        .onResult(System.out::println)
        .invoke();

Без сериализации:

    workersApi
        .async(i -> i.doJob("jobParameter"))
        .sendRequestTo(localActor("worker1"))
        .onResult(System.out::println)
        .invoke();  

Если не нужен результат, но нужно подтверждение вызова:

    workersApi
        .async(i -> i.doJob("jobParameter"))
        .needResult(false)
        ...
        .invoke();  

Если не нужно никакого ответа:

    workersApi
        .async(i -> i.doJob("jobParameter"))
        .needResponse(false)
        ...
        .invoke();  

Если не нужно никакого ответа и транспорт определен заранее:

    workersApi.fireAndForget(i -> i.doJob("jobParameter"));

При аснхронном вызвове обработка ответа инициируется программистом самостоятельно:

    // здесь может быть InputStream, byte[] или RpcMessage
    boolean handled = workersApi.handleResponse(bytes);

Для нескольких клиентов на одном канале транспорта:

    RpcClientGroup clients = new RpcClientGroup();
    RpcClient<TestInterface> workersApi = clients.newSingleClient(TestInterface.class);
    RpcClient<TestInterface2> workersApi2 = clients.newSingleClient(TestInterface2.class);

    clients.handleResponse(bytes)

Поскольку ajrpc строго однопоточный (не потокобезопасен) и не использует никаких блокировок/синхронизаций, то для синхронных вызвов нужен синхронный/блокирующий транспорт.

Настройка транспорта:

    RpcClient<TestInterface> workersApi = RpcClient.newSingleClient(TestInterface.class);

    // блокирующее получение ответа через inStream
    workersApi.setInputStream(inStream);

    // или через блокирющий mq, напр zmq.recv()
    workersApi.receiveBytesFrom(() -> socket.recv());

    // или без сериализации через блокирующую очередь, напр. ArrayBlockingQueue
    workersApi.receiveResponseFrom(() -> queue.poll());

Сам вызов:

    TestInterface ti = workersApi.createBlocking();

    Person result = ti.findOneByAge(14, new Person("bob"), new Person("jack"));

Или без создания экземпляра интерфейса:

    Person result = workersApi.sync(i -> i.findOneByAge(14, new Person("bob"), new Person("jack")));

Сервер

Серверная часть намного проще:

    RpcServer server = RpcServer.fromImplementations(new TestInterface() {
        int doJob(String param) {
            // implementation
        }

        Person findOneByAge(int age, Person ... persons) {
            // implementation
        }
    });

    byte[] response = server.handleRequest(in);

Здесь in может быть:

  • InputStream
  • byte[]
  • RpcMessage (вариант без сериализации)

Можно посылать ответ лямбдой:

    server.handleRequest(in, response -> socket.send(response));

Или в OutputStream:

    OutputStream out = socket.getOut();
    server.handleRequest(in, out);

Сервер и клиент вместе

Если нужно вешать клиент и сервер на один канал транспорта:

    RpcNode node = new RpcNode();

    RpcClient<TestInterface> workersApi = node.newSingleClient(TestInterface.class);

    node.addServerLogic(new TestInterface() {...});

    // обработка сообщения
    node.handleMessage(msg, response -> socket.send(response));

Продвинутые фичи

Сериализация

При взаимодействии java-java, думать о серализации нужно только при написании pojo, упоминающихся в интерфейсе. Единственное ограничение в этом случае - наличие дефолтного конструктора. В противном случае, в pojo можно вручную реализовать логику сериализации наследуясь от инерфейсов Serializable или KryoSerializable.

Если речь идет о взаимодействии с другимим языками, тогда логично использвать protobuf. На java-стороне, в этом случае, кроме наличия сгенерированых стабов от программиста вобще ничего не требуется, т.к. ajrpc детектит protobuf-классы и использует для сериализации сам protobuf.

Важно заметить, что параметры вызвов и возвращаемые занчения могут серилизовать по разному. Вполне возможно использовать protobuf и kryo в пределах одной сигнатуры метода.

Подключение стороннего сериализатора на данный момент возможно только при перекомпиляции самого ajrpc. Для добавления оного нужно наследоваться от AjRpcSerializer и прописать полученный сериализатор в SerializerManager.

Результаты через Future

На данный момент поддерживается CompletableFuture (а точнее, более общий CompletionStage) и ru.satek.ajrpc.Promise. Необходимо определить возвращаемое значение как future:

    interface TestInterface {
        CompletableFuture<String> test(int parameter);
        Promise<String> test2(int parameter);
    }

Вобщем-то все работает как и ожидается, кроме:

  • ajrpc не thread-safe, и CompletableFuture, которые он отдает на клиенте уже не являются thread-safe, что противоречит спеке.
  • при блокирующем получени результата вызвова функции, которая возвращает future, ajrpc будет долбить блокирующими чтениями из транспорта до тех пор пока не придет результат, и только потом вернет управление и возвратит готовый future.

Пробрасывание исключений

Тут все просто - на клиент будут валится RuntimeException с текстом ошибки и стектрейсом оригинального исключения, произошедшего на сервере. К тексту ошибки будет приписываться имя класса оригинального исключения. Короче говоря, честная сериализация исключений не производится.

Этого проброса хватает в 80% случаев.

Таймауты

Таймауты могут быть:

  • константными
  • "экспоненциально" растущими

Константный таймаут:

    workersApi
        .async(i -> i.doJob("jobParameter"))
        .timeout(1000)
        .invoke();  

"Экспоненциальный" таймаут:

    int initialTimeout = 1000; // начальный таймаут
    double exp = 1.6;          // множитель

    workersApi
        .async(i -> i.doJob("jobParameter"))
        .exponentialTimeout(initialTimeout, exp)
        .invoke();  

Применение "экспоненциального" таймаута значительно улучшает пропускную способность транспорта и снижает задержки в "независших соединениях".

Логика обработки таймаута может быть разной:

    workersApi
        .async(i -> i.doJob("jobParameter"))
        .timeout(1000)
        .retryOnTimeout()               // послылаем запрос заново при таймауте
        .maxRetries(3)                  // огранчиваем количество попыток
        .onTimeout(() -> { ... })       // если все поптыки провалились вызывается onTimeout-callback
        .invoke(); 

Т.е. есть повторы передачи , число попыток и обработка в если попытки не удались.

Повторные попытки при исключениях

    workersApi
        .async(i -> i.doJob("jobParameter"))
        .retryOn(NotInitializedException.class) // делаем запрос заново если на сервере вылетает NotInitializedException
        .invoke();  

Протокол

TODO вставить раздел "приложение", где вставить примеры кода с подсветкой строк, а то тут ссылки битые получаются

Формат запроса:

  • ubyte - версия протокола (на данный момент: 3)
  • ubyte - тип сообщения: src/MsgType
  • Секция параметров. Секция повторяется для каждого парамтера вызываемой фукнции:
    • 4 байта - длина массива байтов сериализованого параметра
    • Сериализованый параметр (зависит от типа сериализатора)

Формат ответа:

  • ubyte - версия протокола (на данный момент: 3)
  • ubyte - тип сообщения: src/MsgType
  • Возвращаемое значение ИЛИ параметры исключения (флаг):
    • Возвращаемое значение (только если установлен флаг isResultIncluded):
      • 4 байта - длина массива байтов сериализованого возвращаемого значения
      • сериализованое возвращаемое значение (зависит от типа сериализатора)
    • Исключение: сериализация и десериализация

Минусы/подводные камни (похоронная часть)

  • Нет интеграции с траспортом, поэтому из коробки ничего не заработает
  • Не потокобезопасен. Использвание в пределах одного потока (с другой стороны, так всегда надо стараться делать).
  • Функционал блокирующих вызвов завязан на транспорте (с другой стороны - никакой многопоточной магии, предельно простой data flow и никаких оверхедов)
  • Для задействования тауймаутов нужно заботится о своевременных вызовах updateTime()
  • ajrpc предполагает монопольное владение stream-based транспортом, т.е. ничего кроме пакетов ajrpc по нему ходить не должно. Схожая ситуация и с mq транспортом: тут ограничения чуть слабее - обработка может завершится неудачно, но сообщение при этом не теряется, т.е. его можно обработать по другому.
  • Для работы с другими языками нужно чтобы параметры и возвращаемые значения были классами/стабами protobuf (кроме случая void-результата).

Минусы текущей реализации (в перспективе исправимо):

  • Блокирующие вызовы не используют таймауты (можно делать только на транспортной стороне)
  • Очень жирные сообщения (нет синхронизаций идентификаторов классов/полей). По размеру такие же, как при использовании текстовых сериализаторов.
  • CompletableFuture, которые используются сейчас для методов с Future в качестве возвращаемого занчения, перестают быть thread-safe, хотя по спеке они таковыми являются.

Как включить в проект

В settings.gradle:

        include 'ajrpc'
        project(":ajrpc").projectDir = file("../../../../ajrpc")

путь настраиваем какой надо.

В build.gradle добавить зависимость

        dependencies {
            compile project(':ajrpc')
        }

Как собрать JAR

        ./gradlew jar

Jar будет лежать в /build/libs

PS ПО распространяется по лицензии, скачать не получится

Comments

Comments powered by Disqus
Skip to main content