Step-by-step Tutorial

Source for this page

  1. LINK TO GITHUB
  2. LINK TO EXAMPLES ON GITHUB

Non-transactional usage

  1. registration, token
  2. hello world : write and read some key
  3. plan a scheme : Person, Apartment, Registration
  4. plan the indexes: .... and create them
  5. insert data items

Simple "Apartment and Persons" example (non transactional, Key-value with auto indexes)

Use the Python client from the AcapellaSoft AcapellaDBClient-py repository:

import asyncio
from datetime import datetime
from typing import Optional, List, Tuple
from uuid import UUID, uuid4

from acapella.kv import Session
from acapella.kv.IndexField import IndexField, IndexFieldType, IndexFieldOrder
from acapella.kv.PartitionIndex import QueryCondition

Define the data classes:

class Person:
    """A person record: a UUID identifier plus a first and a last name."""

    def __init__(self, id: UUID, first_name: str, last_name: str):
        self.id, self.first_name, self.last_name = id, first_name, last_name

    def __repr__(self):
        # Laid out like a constructor call; values are rendered unquoted.
        shown = (
            f'id={self.id}',
            f'first_name={self.first_name}',
            f'last_name={self.last_name}',
        )
        return 'Person(' + ', '.join(shown) + ')'


class Apartment:
    """An apartment record: a UUID identifier plus a postal address."""

    def __init__(self, id: UUID, address: str):
        self.id, self.address = id, address

    def __repr__(self):
        # Laid out like a constructor call; values are rendered unquoted.
        return 'Apartment(id={}, address={})'.format(self.id, self.address)


class Registration:
    """Links one person to one apartment, together with the creation time of the link."""

    def __init__(self, person_id: UUID, apartment_id: UUID, created_at: datetime):
        self.person_id, self.apartment_id = person_id, apartment_id
        self.created_at = created_at

    def __repr__(self):
        # Laid out like a constructor call; values are rendered unquoted.
        shown = (
            f'person_id={self.person_id}',
            f'apartment_id={self.apartment_id}',
            f'created_at={self.created_at}',
        )
        return 'Registration(' + ', '.join(shown) + ')'

Here we write complete but easy-to-follow classes for working with the models:

class Persons:
    """Collection of Person records kept in one partition, with indexes on both names."""

    # Partition layout: the first part is the user-id, the second part is the keyspace.
    PARTITION = ['examples', 'persons']

    def __init__(self, session: Session):
        self._session = session

    async def drop_table(self):
        """Write ``None`` over every entry of the partition through one manual batch."""
        existing = await self._session.range(self.PARTITION)
        pending = self._session.batch_manual()
        for stored in existing:
            # NOTE(review): unlike in save(), set() is not awaited here -- confirm that
            # set() with batch= only queues the write and needs no await.
            stored.set(None, reindex=True, batch=pending)
        await pending.send()

    async def create_table(self):
        """Register one ascending string index per name field.

        Indexes are assigned to the keyspace, which is computed out of the partition
        ([user-id, keyspace, some, custom, parts, ...]); all keys in this keyspace
        are indexed using these indexes.
        """
        partition_index = self._session.partition_index(self.PARTITION)
        for number, field_name in ((1, 'first_name'), (2, 'last_name')):
            field = IndexField(field_name, IndexFieldType.string, IndexFieldOrder.ascending)
            await partition_index.set_index(number, [field])

    async def save(self, person: Person):
        """Store ``person`` under its id, asking for a reindex."""
        clustering, payload = self._serialize(person)
        target = self._session.entry(self.PARTITION, clustering)
        await target.set(payload, reindex=True)

    async def get(self, id: UUID) -> Optional[Person]:
        """Return the person stored under ``id``, or ``None`` when the entry has no value."""
        clustering = self._serialize_key(id)
        found = await self._session.get_entry(self.PARTITION, clustering)
        if not found.value:
            return None
        return self._deserialize(clustering, found.value)

    async def get_by_first_name(self, first_name: str) -> List[Person]:
        """Return every person whose ``first_name`` equals the argument (index query)."""
        partition_index = self._session.partition_index(self.PARTITION)
        matches = await partition_index.query({'first_name': QueryCondition(eq=first_name)})
        people = []
        for match in matches:
            people.append(self._deserialize(match.clustering, match.value))
        return people

    async def get_by_last_name(self, last_name: str) -> List[Person]:
        """Return every person whose ``last_name`` equals the argument (index query)."""
        partition_index = self._session.partition_index(self.PARTITION)
        matches = await partition_index.query({'last_name': QueryCondition(eq=last_name)})
        people = []
        for match in matches:
            people.append(self._deserialize(match.clustering, match.value))
        return people

    @classmethod
    def _serialize_key(cls, id: UUID) -> List[str]:
        """Build the one-element clustering key: the id as a string."""
        return [f'{id!s}']

    @classmethod
    def _serialize(cls, person: Person) -> Tuple[List[str], dict]:
        """Split ``person`` into its clustering key and its stored value."""
        payload = dict(first_name=person.first_name, last_name=person.last_name)
        return cls._serialize_key(person.id), payload

    @staticmethod
    def _deserialize(key: List[str], value: dict) -> Person:
        """Rebuild a Person from a clustering key and a stored value."""
        person_id = UUID(key[0])
        return Person(person_id, value['first_name'], value['last_name'])

Here you can see how indexes are created and how they are queried.

The other list models:

class Apartments:
    """Collection of Apartment records kept in one partition; no indexes are defined."""

    PARTITION = ['examples', 'apartments']

    def __init__(self, session: Session):
        self._session = session

    async def drop_table(self):
        """Write ``None`` over every entry of the partition through one manual batch."""
        existing = await self._session.range(self.PARTITION)
        pending = self._session.batch_manual()
        for stored in existing:
            # NOTE(review): unlike in save(), set() is not awaited here -- confirm that
            # set() with batch= only queues the write and needs no await.
            stored.set(None, reindex=True, batch=pending)
        await pending.send()

    async def create_table(self):
        """Nothing to set up: this collection is only read by primary key."""
        return None

    async def save(self, apartment: Apartment):
        """Store ``apartment`` under its id, asking for a reindex."""
        clustering, payload = self._serialize(apartment)
        target = self._session.entry(self.PARTITION, clustering)
        await target.set(payload, reindex=True)

    async def get(self, id: UUID) -> Optional[Apartment]:
        """Return the apartment stored under ``id``, or ``None`` when the entry has no value."""
        clustering = self._serialize_key(id)
        found = await self._session.get_entry(self.PARTITION, clustering)
        if not found.value:
            return None
        return self._deserialize(clustering, found.value)

    @classmethod
    def _serialize_key(cls, id: UUID) -> List[str]:
        """Build the one-element clustering key: the id as a string."""
        return [f'{id!s}']

    @classmethod
    def _serialize(cls, apartment: Apartment) -> Tuple[List[str], dict]:
        """Split ``apartment`` into its clustering key and its stored value."""
        payload = dict(address=apartment.address)
        return cls._serialize_key(apartment.id), payload

    @staticmethod
    def _deserialize(key: List[str], value: dict) -> Apartment:
        """Rebuild an Apartment from a clustering key and a stored value."""
        apartment_id = UUID(key[0])
        return Apartment(apartment_id, value['address'])


class Registrations:
    """Collection of Registration records, indexed by person, apartment and creation time."""

    PARTITION = ['examples', 'registrations']

    def __init__(self, session: Session):
        self._session = session

    async def drop_table(self):
        """Write ``None`` over every entry of the partition through one manual batch."""
        existing = await self._session.range(self.PARTITION)
        pending = self._session.batch_manual()
        for stored in existing:
            # NOTE(review): unlike in save(), set() is not awaited here -- confirm that
            # set() with batch= only queues the write and needs no await.
            stored.set(None, reindex=True, batch=pending)
        await pending.send()

    async def create_table(self):
        """Register ascending indexes: two string ones for the ids, a numeric one for the time."""
        layout = (
            (1, 'person_id', IndexFieldType.string),
            (2, 'apartment_id', IndexFieldType.string),
            (3, 'created_at', IndexFieldType.number),
        )
        partition_index = self._session.partition_index(self.PARTITION)
        for number, field_name, field_type in layout:
            field = IndexField(field_name, field_type, IndexFieldOrder.ascending)
            await partition_index.set_index(number, [field])

    async def save(self, registration: Registration):
        """Store ``registration`` under its (person, apartment) key, asking for a reindex."""
        clustering, payload = self._serialize(registration)
        target = self._session.entry(self.PARTITION, clustering)
        await target.set(payload, reindex=True)

    async def get(self, person_id: UUID, apartment_id: UUID) -> Optional[Registration]:
        """Return the registration for the given pair, or ``None`` when the entry has no value."""
        clustering = self._serialize_key(person_id, apartment_id)
        found = await self._session.get_entry(self.PARTITION, clustering)
        if not found.value:
            return None
        return self._deserialize(clustering, found.value)

    async def get_by_person(self, person_id: UUID) -> List[Registration]:
        """Return every registration whose ``person_id`` equals the argument (index query)."""
        partition_index = self._session.partition_index(self.PARTITION)
        matches = await partition_index.query({'person_id': QueryCondition(eq=str(person_id))})
        found = []
        for match in matches:
            found.append(self._deserialize(match.clustering, match.value))
        return found

    async def get_by_apartment(self, apartment_id: UUID) -> List[Registration]:
        """Return every registration whose ``apartment_id`` equals the argument (index query)."""
        partition_index = self._session.partition_index(self.PARTITION)
        matches = await partition_index.query({'apartment_id': QueryCondition(eq=str(apartment_id))})
        found = []
        for match in matches:
            found.append(self._deserialize(match.clustering, match.value))
        return found

    async def get_since(self, since: datetime) -> List[Registration]:
        """Return registrations selected by a ``from_`` condition on ``created_at``.

        The bound is passed as ``since.timestamp()``, matching the stored representation.
        """
        partition_index = self._session.partition_index(self.PARTITION)
        matches = await partition_index.query({'created_at': QueryCondition(from_=since.timestamp())})
        found = []
        for match in matches:
            found.append(self._deserialize(match.clustering, match.value))
        return found

    @classmethod
    def _serialize_key(cls, person_id: UUID, apartment_id: UUID) -> List[str]:
        """Build the two-element clustering key: person id, then apartment id, as strings."""
        return [str(part) for part in (person_id, apartment_id)]

    @classmethod
    def _serialize(cls, registration: Registration) -> Tuple[List[str], dict]:
        """Split ``registration`` into its clustering key and its stored value.

        The ids are repeated inside the value (as strings) next to the creation
        time, which is stored as a timestamp number.
        """
        clustering = cls._serialize_key(registration.person_id, registration.apartment_id)
        payload = dict(
            person_id=str(registration.person_id),
            apartment_id=str(registration.apartment_id),
            created_at=registration.created_at.timestamp(),
        )
        return clustering, payload

    @staticmethod
    def _deserialize(key: List[str], value: dict) -> Registration:
        """Rebuild a Registration: the ids come from the key, the time from the value."""
        person_id = UUID(key[0])
        apartment_id = UUID(key[1])
        created_at = datetime.fromtimestamp(value['created_at'])
        return Registration(person_id, apartment_id, created_at)

So now we have data classes and list models for working with collections of them.

Now write the main function of our application:

async def run():
    """Fill the three example tables and print the results of several index queries."""
    # connect to the database and log in
    session = Session(port=5678)
    await session.login('examples', 'examples')

    # start every table from scratch: persons, then apartments, then registrations
    persons = Persons(session)
    apartments = Apartments(session)
    registrations = Registrations(session)
    for table in (persons, apartments, registrations):
        await table.drop_table()
        await table.create_table()

    # four persons: two share a last name, two share a first name
    john = Person(id=uuid4(), first_name='John', last_name='Smith')
    mary = Person(id=uuid4(), first_name='Mary', last_name='Smith')
    david_brown = Person(id=uuid4(), first_name='David', last_name='Brown')
    david_thompson = Person(id=uuid4(), first_name='David', last_name='Thompson')
    for person in (john, mary, david_brown, david_thompson):
        await persons.save(person)

    # four apartments
    melbourne = Apartment(id=uuid4(), address='123 6th St. Melbourne, FL 32904')
    honolulu = Apartment(id=uuid4(), address='4 Goldfield Rd. Honolulu, HI 96815')
    south_windsor = Apartment(id=uuid4(), address='70 Bowman St. South Windsor, CT 06074')
    orlando = Apartment(id=uuid4(), address='514 S. Magnolia St. Orlando, FL 32806')
    for apartment in (melbourne, honolulu, south_windsor, orlando):
        await apartments.save(apartment)

    # five registrations, described as (person, apartment, registration date)
    planned = [
        (john, melbourne, datetime(year=2015, month=4, day=21)),
        (mary, melbourne, datetime(year=2015, month=4, day=22)),
        (david_brown, honolulu, datetime(year=2016, month=8, day=14)),
        (david_thompson, south_windsor, datetime(year=2017, month=3, day=1)),
        (david_thompson, orlando, datetime(year=2017, month=11, day=25)),
    ]
    new_registrations = [
        Registration(person_id=who.id, apartment_id=where.id, created_at=when)
        for who, where, when in planned
    ]
    for registration in new_registrations:
        await registrations.save(registration)

    # persons with last name Smith: John Smith and Mary Smith
    smiths = await persons.get_by_last_name('Smith')
    print(smiths)

    # persons with first name David: David Brown and David Thompson
    davids = await persons.get_by_first_name('David')
    print(davids)

    # persons registered in the Melbourne apartment: John Smith and Mary Smith
    residents = []
    for registration in await registrations.get_by_apartment(melbourne.id):
        residents.append(await persons.get(registration.person_id))
    print(residents)

    # apartments David Thompson is registered in:
    # 70 Bowman St. South Windsor, CT 06074
    # 514 S. Magnolia St. Orlando, FL 32806
    homes = []
    for registration in await registrations.get_by_person(david_thompson.id):
        homes.append(await apartments.get(registration.apartment_id))
    print(homes)

    # registrations since year 2016 (inclusive): the last three of the five
    recent = await registrations.get_since(datetime(year=2016, month=1, day=1))
    print(recent)


if __name__ == '__main__':
    # Create the loop explicitly: asyncio.get_event_loop() is deprecated when no loop
    # is running and raises RuntimeError on recent Python versions. Closing the loop
    # afterwards releases its resources even when run() fails.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(run())
    finally:
        loop.close()

When you run this, you get:

[Person(id=197da754-6238-4fef-b56a-f2b9ad9e454f, first_name=John, last_name=Smith), Person(id=d5673e5e-9fde-4dc9-a95d-e7d83bcba191, first_name=Mary, last_name=Smith)]
[Person(id=9da92b38-c9ca-43ce-88b0-7fbd7a1afa76, first_name=David, last_name=Brown), Person(id=a955ff69-9534-425f-8102-e02d26c3da8d, first_name=David, last_name=Thompson)]
[Person(id=197da754-6238-4fef-b56a-f2b9ad9e454f, first_name=John, last_name=Smith), Person(id=d5673e5e-9fde-4dc9-a95d-e7d83bcba191, first_name=Mary, last_name=Smith)]
[Apartment(id=07b1b9a7-4a68-49a3-95d3-4e9d740875b4, address=70 Bowman St. South Windsor, CT 06074), Apartment(id=c54732d2-f9ed-418c-b63d-95911f272cad, address=514 S. Magnolia St. Orlando, FL 32806)]
[Registration(person_id=9da92b38-c9ca-43ce-88b0-7fbd7a1afa76, apartment_id=54bb2d2d-a629-4a92-89b5-75f68b05aad4, created_at=2016-08-14 00:00:00), Registration(person_id=a955ff69-9534-425f-8102-e02d26c3da8d, apartment_id=07b1b9a7-4a68-49a3-95d3-4e9d740875b4, created_at=2017-03-01 00:00:00), Registration(person_id=a955ff69-9534-425f-8102-e02d26c3da8d, apartment_id=c54732d2-f9ed-418c-b63d-95911f272cad, created_at=2017-11-25 00:00:00)]

Summary:

Summary

  • the non-transactional way of using AcapellaDB is fast and simple
  • you can write "json" values to the AcapellaDB SortedMap
  • data is re-indexed in several indexes when it changes (asynchronously)
  • range can search "documents" by pk or by reverse indexes
  • one-to-many relations are possible
  • many-to-many relations must be handled in user code / server code
  • end-user code is simple and clean
  • this example uses the http proxy api (~ 20_000 rps). If you need more, you must use the aeron api (~10x faster)

By default, the example uses replication factor N=3.

Transactional usage

Tip

Here be dragons...

This is the bank example. Each bank account holds an amount of money. One account can transfer some of its amount to a recipient account.

AcapellaDB handles this operation transactionally and writes an item to transfers for history.

  1. repeat the first 4 previous steps
  2. transactional swap two items
  3. transactional insert
  4. check that sequence exists and write to it
  5. NOT transactional search

detailed source code - bank: "Money transfer between accounts"

Data classes

import asyncio
from decimal import Decimal
from typing import Optional, List, Tuple
from uuid import UUID, uuid1, uuid4

from acapella.kv import Session, Transaction


class Account:
    """A bank account: a UUID identifier plus the current balance."""

    def __init__(self, id: UUID, balance: Decimal):
        self.id, self.balance = id, balance

    def __repr__(self):
        # Laid out like a constructor call; values are rendered unquoted.
        return 'Account(id={}, balance={})'.format(self.id, self.balance)


class Transfer:
    """One money movement between two accounts, identified by a UUID ``timestamp``."""

    def __init__(self, timestamp: UUID, id_from: UUID, id_to: UUID, amount: Decimal):
        self.timestamp, self.amount = timestamp, amount
        self.id_from, self.id_to = id_from, id_to

    def __repr__(self):
        # Laid out like a constructor call; values are rendered unquoted.
        shown = (
            f'timestamp={self.timestamp}',
            f'id_from={self.id_from}',
            f'id_to={self.id_to}',
            f'amount={self.amount}',
        )
        return 'Transfer(' + ', '.join(shown) + ')'

List model classes:

class Accounts:
    """Collection of Account records; reads and writes go through a transaction."""

    PARTITION = ['examples', 'accounts']

    def __init__(self, session: Session):
        self._session = session

    async def drop_table(self):
        """Write ``None`` over every entry of the partition through one manual batch."""
        existing = await self._session.range(self.PARTITION)
        pending = self._session.batch_manual()
        for stored in existing:
            # NOTE(review): unlike in save(), set() is not awaited here -- confirm that
            # set() with batch= only queues the write and needs no await.
            stored.set(None, reindex=True, batch=pending)
        await pending.send()

    async def create_table(self):
        """Nothing to set up: this collection is only read by primary key."""
        return None

    async def save(self, tx: Transaction, account: Account):
        """Store ``account`` under its id through the transaction ``tx``."""
        clustering, payload = self._serialize(account)
        target = tx.entry(self.PARTITION, clustering)
        await target.set(payload, reindex=True)

    async def get(self, tx: Transaction, id: UUID) -> Optional[Account]:
        """Read the account with ``id`` through ``tx`` (with ``watch=True``).

        Returns ``None`` when the entry has no value.
        """
        clustering = self._serialize_key(id)
        found = await tx.get_entry(self.PARTITION, clustering, watch=True)
        if not found.value:
            return None
        return self._deserialize(clustering, found.value)

    @classmethod
    def _serialize_key(cls, id: UUID) -> List[str]:
        """Build the one-element clustering key: the id as a string."""
        return [f'{id!s}']

    @classmethod
    def _serialize(cls, account: Account) -> Tuple[List[str], dict]:
        """Split ``account`` into its clustering key and its stored value.

        The balance is stored as a string.
        """
        payload = dict(balance=str(account.balance))
        return cls._serialize_key(account.id), payload

    @staticmethod
    def _deserialize(key: List[str], value: dict) -> Account:
        """Rebuild an Account from a clustering key and a stored value."""
        account_id = UUID(key[0])
        return Account(account_id, Decimal(value['balance']))


class Transfers:
    """Collection of Transfer records: written inside transactions, listed without one."""

    PARTITION = ['examples', 'transfers']

    def __init__(self, session: Session):
        self._session = session

    async def drop_table(self):
        """Write ``None`` over every entry of the partition through one manual batch."""
        existing = await self._session.range(self.PARTITION)
        pending = self._session.batch_manual()
        for stored in existing:
            # NOTE(review): unlike in save(), set() is not awaited here -- confirm that
            # set() with batch= only queues the write and needs no await.
            stored.set(None, reindex=True, batch=pending)
        await pending.send()

    async def create_table(self):
        """Nothing to set up: no indexes are defined for this collection."""
        return None

    async def save(self, tx: Transaction, transfer: Transfer):
        """Store ``transfer`` under its timestamp through the transaction ``tx``."""
        clustering, payload = self._serialize(transfer)
        target = tx.entry(self.PARTITION, clustering)
        await target.set(payload, reindex=True)

    async def get(self, tx: Transaction, timestamp: UUID) -> Optional[Transfer]:
        """Read the transfer with ``timestamp`` through ``tx`` (with ``watch=True``).

        Returns ``None`` when the entry has no value.
        """
        clustering = self._serialize_key(timestamp)
        found = await tx.get_entry(self.PARTITION, clustering, watch=True)
        if not found.value:
            return None
        return self._deserialize(clustering, found.value)

    async def get_all(self) -> List[Transfer]:
        """List every transfer of the partition via the session, skipping ``None`` values."""
        transfers = []
        for row in await self._session.range(self.PARTITION):
            if row.value is None:
                continue
            transfers.append(self._deserialize(row.clustering, row.value))
        return transfers

    @classmethod
    def _serialize_key(cls, timestamp: UUID) -> List[str]:
        """Build the one-element clustering key: the timestamp UUID as a string."""
        return [f'{timestamp!s}']

    @classmethod
    def _serialize(cls, transfer: Transfer) -> Tuple[List[str], dict]:
        """Split ``transfer`` into its clustering key and its stored value.

        Both account ids and the amount are stored as strings.
        """
        payload = dict(
            id_from=str(transfer.id_from),
            id_to=str(transfer.id_to),
            amount=str(transfer.amount),
        )
        return cls._serialize_key(transfer.timestamp), payload

    @staticmethod
    def _deserialize(key: List[str], value: dict) -> Transfer:
        """Rebuild a Transfer: the timestamp comes from the key, the rest from the value."""
        timestamp = UUID(key[0])
        id_from = UUID(value['id_from'])
        id_to = UUID(value['id_to'])
        return Transfer(timestamp, id_from, id_to, Decimal(value['amount']))

Container class like "Database Context" :

class Model:
    """Bundles the session with the Accounts and Transfers collections built on it."""

    def __init__(self, session: Session):
        self.session = session
        self.accounts, self.transfers = Accounts(session), Transfers(session)

    async def drop_all(self):
        """Drop the accounts table, then the transfers table."""
        for table in (self.accounts, self.transfers):
            await table.drop_table()

    async def create_all(self):
        """Create the accounts table, then the transfers table."""
        for table in (self.accounts, self.transfers):
            await table.create_table()

"transaction handler":

class Transactor:
    """Performs money transfers between two accounts inside one transaction."""

    def __init__(self, model: Model):
        self._model = model

    async def transaction(self, id_from: UUID, id_to: UUID, amount: Decimal):
        """Move ``amount`` from account ``id_from`` to ``id_to`` and record the transfer.

        Both balance updates and the Transfer record are written through the same
        transaction object.

        Raises:
            LookupError: if the source or the destination account is not found.
            AssertionError: if the source balance is smaller than ``amount``.
        """
        async with self._model.session.transaction() as tx:
            timestamp = uuid1()

            from_account = await self._model.accounts.get(tx, id_from)
            if from_account is None:
                raise LookupError(f'Source account {id_from} does not exist')
            # Raised explicitly rather than with `assert`: assert statements are removed
            # under `python -O`, which would let an overdraft through. The exception
            # type and message are unchanged for callers that catch AssertionError.
            if from_account.balance < amount:
                raise AssertionError("Source account must have required amount to transfer")
            from_account.balance -= amount
            await self._model.accounts.save(tx, from_account)

            # The destination is read only after the source has been saved, as before.
            to_account = await self._model.accounts.get(tx, id_to)
            if to_account is None:
                raise LookupError(f'Destination account {id_to} does not exist')
            to_account.balance += amount
            await self._model.accounts.save(tx, to_account)

            transfer = Transfer(timestamp, id_from, id_to, amount)
            await self._model.transfers.save(tx, transfer)

main code for this example:

async def run():
    """Create two accounts, attempt one bad and two good transfers, and print the outcome."""
    # connect to the database and log in
    session = Session(port=5678)
    await session.login('examples', 'examples')

    # build the model and start from empty tables
    model = Model(session)
    transactor = Transactor(model)
    await model.drop_all()
    await model.create_all()

    # two accounts, both saved within one transaction
    account1 = Account(uuid4(), Decimal(100.0))
    account2 = Account(uuid4(), Decimal(10.0))
    async with session.transaction() as tx:
        for account in (account1, account2):
            await model.accounts.save(tx, account)

    # bad transfer: account2 holds less than the requested amount
    try:
        await transactor.transaction(account2.id, account1.id, Decimal(50.0))
    except AssertionError as refusal:
        print(refusal)

    # good transfers: 60 from account1 to account2, then 40 back
    good_transfers = (
        (account1, account2, Decimal(60.0)),
        (account2, account1, Decimal(40.0)),
    )
    for source, target, amount in good_transfers:
        await transactor.transaction(source.id, target.id, amount)

    # expected state now:
    # account1 has balance = 80.0
    # account2 has balance = 30.0
    # transfers holds two events:
    # 1 -> 60.0 -> 2
    # 2 -> 40.0 -> 1

    async with session.transaction() as tx:
        first = await model.accounts.get(tx, account1.id)
        print('account 1', first)
        second = await model.accounts.get(tx, account2.id)
        print('account 2', second)
        history = await model.transfers.get_all()
        print('transfers', history)


if __name__ == '__main__':
    # Create the loop explicitly: asyncio.get_event_loop() is deprecated when no loop
    # is running and raises RuntimeError on recent Python versions. Closing the loop
    # afterwards releases its resources even when run() fails.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(run())
    finally:
        loop.close()

when run, you get:

Source account must have required amount to transfer
account 1 Account(id=040cc0d7-0003-4a9b-ac45-31e9d9b91274, balance=80)
account 2 Account(id=bcd79bdb-8fe1-4eab-89e3-3350374124fb, balance=30)
transfers [Transfer(timestamp=edb410cc-e0ca-11e7-aee6-40e230b5623b, id_from=040cc0d7-0003-4a9b-ac45-31e9d9b91274, id_to=bcd79bdb-8fe1-4eab-89e3-3350374124fb, amount=60), Transfer(timestamp=edc1d810-e0ca-11e7-aee6-40e230b5623b, id_from=bcd79bdb-8fe1-4eab-89e3-3350374124fb, id_to=040cc0d7-0003-4a9b-ac45-31e9d9b91274, amount=40)]