Step-by-Step Tutorial
Source for this page
Non-Transactional Usage
- registration, token
- hello world: write and read a key
- plan a schema: Person, Apartment, Registration
- plan the indexes and create them
- insert data items
Simple "Apartments and Persons" example (non-transactional, key-value with automatic indexes)
Use the Python client from the AcapellaSoft AcapellaDBClient-py repository:
import asyncio from datetime import datetime from typing import Optional, List, Tuple from uuid import UUID, uuid4 from acapella.kv import Session from acapella.kv.IndexField import IndexField, IndexFieldType, IndexFieldOrder from acapella.kv.PartitionIndex import QueryCondition
Define the data classes:
class Person:
    """A person that can be registered at one or more apartments."""

    def __init__(self, id: UUID, first_name: str, last_name: str):
        self.id, self.first_name, self.last_name = id, first_name, last_name

    def __repr__(self):
        return f'Person(id={self.id}, first_name={self.first_name}, last_name={self.last_name})'


class Apartment:
    """An apartment identified by a UUID, with a free-form postal address."""

    def __init__(self, id: UUID, address: str):
        self.id, self.address = id, address

    def __repr__(self):
        return f'Apartment(id={self.id}, address={self.address})'


class Registration:
    """Links a person to an apartment, recording when the link was created."""

    def __init__(self, person_id: UUID, apartment_id: UUID, created_at: datetime):
        self.person_id, self.apartment_id, self.created_at = person_id, apartment_id, created_at

    def __repr__(self):
        return (f'Registration(person_id={self.person_id}, '
                f'apartment_id={self.apartment_id}, '
                f'created_at={self.created_at})')
Here we write complete but readable classes for working with the models:
class Persons:
    """Repository for Person records in the ``['examples', 'persons']`` partition.

    Each person is stored under the clustering key ``[str(person.id)]`` with the
    value ``{'first_name': ..., 'last_name': ...}``. ``create_table`` declares
    index 1 on ``first_name`` and index 2 on ``last_name``.
    """

    # first key part is user-id
    # second key part is keyspace
    PARTITION = ['examples', 'persons']

    def __init__(self, session: Session):
        self._session = session

    async def drop_table(self):
        """Write ``None`` over every entry in the partition using one manual batch."""
        entries = await self._session.range(self.PARTITION)
        batch = self._session.batch_manual()
        for e in entries:
            # NOTE(review): this set() is not awaited, whereas save() awaits set();
            # confirm with the client that batch mode only queues the write here
            e.set(None, reindex=True, batch=batch)
        await batch.send()

    async def create_table(self):
        """Declare the secondary indexes for this partition's keyspace."""
        # indexes are assigned to a keyspace, which is computed from the partition
        # like [user-id, keyspace, some, custom, parts, ...]; every key in this
        # keyspace is indexed with these indexes
        index = self._session.partition_index(self.PARTITION)
        ascending = IndexFieldOrder.ascending
        await index.set_index(1, [IndexField('first_name', IndexFieldType.string, ascending)])
        await index.set_index(2, [IndexField('last_name', IndexFieldType.string, ascending)])

    async def save(self, person: Person):
        """Store ``person``, asking the server to reindex it."""
        key, value = self._serialize(person)
        await self._session.entry(self.PARTITION, key).set(value, reindex=True)

    async def get(self, id: UUID) -> Optional[Person]:
        """Return the person with primary key ``id``, or ``None`` if no value is stored."""
        key = self._serialize_key(id)
        entry = await self._session.get_entry(self.PARTITION, key)
        if not entry.value:
            return None
        return self._deserialize(key, entry.value)

    async def get_by_first_name(self, first_name: str) -> List[Person]:
        """Return every person whose ``first_name`` equals the argument (via index 1)."""
        return await self._find_by('first_name', first_name)

    async def get_by_last_name(self, last_name: str) -> List[Person]:
        """Return every person whose ``last_name`` equals the argument (via index 2)."""
        return await self._find_by('last_name', last_name)

    async def _find_by(self, field: str, wanted: str) -> List[Person]:
        # shared equality query against this partition's index on ``field``
        index = self._session.partition_index(self.PARTITION)
        hits = await index.query({field: QueryCondition(eq=wanted)})
        return [self._deserialize(hit.clustering, hit.value) for hit in hits]

    @classmethod
    def _serialize_key(cls, id: UUID) -> List[str]:
        return [str(id)]

    @classmethod
    def _serialize(cls, person: Person) -> Tuple[List[str], dict]:
        key = cls._serialize_key(person.id)
        return key, {'first_name': person.first_name, 'last_name': person.last_name}

    @staticmethod
    def _deserialize(key: List[str], value: dict) -> Person:
        person_id = UUID(key[0])
        return Person(id=person_id, first_name=value['first_name'], last_name=value['last_name'])
Here you can see how indexes are created and queried.
The other lists:
class Apartments:
    """Repository for Apartment records in the ``['examples', 'apartments']`` partition.

    Each apartment is stored under the clustering key ``[str(apartment.id)]``
    with the value ``{'address': ...}``. No secondary indexes are declared.
    """

    PARTITION = ['examples', 'apartments']

    def __init__(self, session: Session):
        self._session = session

    async def drop_table(self):
        """Write ``None`` over every entry in the partition using one manual batch."""
        entries = await self._session.range(self.PARTITION)
        batch = self._session.batch_manual()
        for e in entries:
            # NOTE(review): not awaited in batch mode (save() does await set());
            # confirm with the client that this only queues the write
            e.set(None, reindex=True, batch=batch)
        await batch.send()

    async def create_table(self):
        """No-op: apartments are only looked up by primary key."""

    async def save(self, apartment: Apartment):
        """Store ``apartment``, asking the server to reindex it."""
        key, value = self._serialize(apartment)
        await self._session.entry(self.PARTITION, key).set(value, reindex=True)

    async def get(self, id: UUID) -> Optional[Apartment]:
        """Return the apartment with primary key ``id``, or ``None`` if no value is stored."""
        key = self._serialize_key(id)
        entry = await self._session.get_entry(self.PARTITION, key)
        if not entry.value:
            return None
        return self._deserialize(key, entry.value)

    @classmethod
    def _serialize_key(cls, id: UUID) -> List[str]:
        return [str(id)]

    @classmethod
    def _serialize(cls, apartment: Apartment) -> Tuple[List[str], dict]:
        key = cls._serialize_key(apartment.id)
        return key, {'address': apartment.address}

    @staticmethod
    def _deserialize(key: List[str], value: dict) -> Apartment:
        apartment_id = UUID(key[0])
        return Apartment(id=apartment_id, address=value['address'])


class Registrations:
    """Repository for Registration records in the ``['examples', 'registrations']`` partition.

    A registration is stored under the two-part clustering key
    ``[str(person_id), str(apartment_id)]``. Its value holds both ids as strings
    and ``created_at`` as a ``datetime.timestamp()`` number. ``create_table``
    declares indexes 1-3 on ``person_id``, ``apartment_id`` and ``created_at``.
    """

    PARTITION = ['examples', 'registrations']

    def __init__(self, session: Session):
        self._session = session

    async def drop_table(self):
        """Write ``None`` over every entry in the partition using one manual batch."""
        entries = await self._session.range(self.PARTITION)
        batch = self._session.batch_manual()
        for e in entries:
            # NOTE(review): not awaited in batch mode (save() does await set());
            # confirm with the client that this only queues the write
            e.set(None, reindex=True, batch=batch)
        await batch.send()

    async def create_table(self):
        """Declare the three secondary indexes used by the ``get_by_*`` / ``get_since`` queries."""
        index = self._session.partition_index(self.PARTITION)
        specs = (
            (1, 'person_id', IndexFieldType.string),
            (2, 'apartment_id', IndexFieldType.string),
            (3, 'created_at', IndexFieldType.number),
        )
        for number, field, field_type in specs:
            await index.set_index(number, [IndexField(field, field_type, IndexFieldOrder.ascending)])

    async def save(self, registration: Registration):
        """Store ``registration``, asking the server to reindex it."""
        key, value = self._serialize(registration)
        await self._session.entry(self.PARTITION, key).set(value, reindex=True)

    async def get(self, person_id: UUID, apartment_id: UUID) -> Optional[Registration]:
        """Return the registration linking the two ids, or ``None`` if no value is stored."""
        key = self._serialize_key(person_id, apartment_id)
        entry = await self._session.get_entry(self.PARTITION, key)
        if not entry.value:
            return None
        return self._deserialize(key, entry.value)

    async def get_by_person(self, person_id: UUID) -> List[Registration]:
        """Return every registration of ``person_id`` (via index 1)."""
        return await self._find_by('person_id', QueryCondition(eq=str(person_id)))

    async def get_by_apartment(self, apartment_id: UUID) -> List[Registration]:
        """Return every registration at ``apartment_id`` (via index 2)."""
        return await self._find_by('apartment_id', QueryCondition(eq=str(apartment_id)))

    async def get_since(self, since: datetime) -> List[Registration]:
        """Return registrations whose ``created_at`` is at or after ``since`` (via index 3)."""
        return await self._find_by('created_at', QueryCondition(from_=since.timestamp()))

    async def _find_by(self, field: str, condition: QueryCondition) -> List[Registration]:
        # run a single-field index query and turn every hit back into a Registration
        hits = await self._session.partition_index(self.PARTITION).query({field: condition})
        return [self._deserialize(hit.clustering, hit.value) for hit in hits]

    @classmethod
    def _serialize_key(cls, person_id: UUID, apartment_id: UUID) -> List[str]:
        return [str(person_id), str(apartment_id)]

    @classmethod
    def _serialize(cls, registration: Registration) -> Tuple[List[str], dict]:
        key = cls._serialize_key(registration.person_id, registration.apartment_id)
        value = dict(
            person_id=str(registration.person_id),
            apartment_id=str(registration.apartment_id),
            created_at=registration.created_at.timestamp(),
        )
        return key, value

    @staticmethod
    def _deserialize(key: List[str], value: dict) -> Registration:
        person_part, apartment_part = key[0], key[1]
        return Registration(
            person_id=UUID(person_part),
            apartment_id=UUID(apartment_part),
            created_at=datetime.fromtimestamp(value['created_at']),
        )
So now we have data classes and list models for working with the data lists.
Write the main method of our application
async def run():
    """Run the "Apartments and Persons" example end to end.

    Connects to AcapellaDB on port 5678, logs in as ``examples``/``examples``,
    recreates the three tables, inserts sample data and prints the results of
    several index queries.
    """
    # init database
    session = Session(port=5678)
    await session.login('examples', 'examples')

    # init tables: drop data left over from a previous run, then declare indexes
    persons = Persons(session)
    await persons.drop_table()
    await persons.create_table()

    apartments = Apartments(session)
    await apartments.drop_table()
    await apartments.create_table()

    registrations = Registrations(session)
    await registrations.drop_table()
    await registrations.create_table()

    # inserting persons
    person1 = Person(id=uuid4(), first_name='John', last_name='Smith')
    person2 = Person(id=uuid4(), first_name='Mary', last_name='Smith')
    person3 = Person(id=uuid4(), first_name='David', last_name='Brown')
    person4 = Person(id=uuid4(), first_name='David', last_name='Thompson')
    for person in (person1, person2, person3, person4):
        await persons.save(person)

    # inserting apartments
    apartment1 = Apartment(id=uuid4(), address='123 6th St. Melbourne, FL 32904')
    apartment2 = Apartment(id=uuid4(), address='4 Goldfield Rd. Honolulu, HI 96815')
    apartment3 = Apartment(id=uuid4(), address='70 Bowman St. South Windsor, CT 06074')
    apartment4 = Apartment(id=uuid4(), address='514 S. Magnolia St. Orlando, FL 32806')
    for apartment in (apartment1, apartment2, apartment3, apartment4):
        await apartments.save(apartment)

    # inserting registrations
    registration1 = Registration(
        person_id=person1.id,
        apartment_id=apartment1.id,
        created_at=datetime(year=2015, month=4, day=21),
    )
    registration2 = Registration(
        person_id=person2.id,
        apartment_id=apartment1.id,
        created_at=datetime(year=2015, month=4, day=22),
    )
    registration3 = Registration(
        person_id=person3.id,
        apartment_id=apartment2.id,
        created_at=datetime(year=2016, month=8, day=14),
    )
    registration4 = Registration(
        person_id=person4.id,
        apartment_id=apartment3.id,
        created_at=datetime(year=2017, month=3, day=1),
    )
    registration5 = Registration(
        person_id=person4.id,
        apartment_id=apartment4.id,
        created_at=datetime(year=2017, month=11, day=25),
    )
    for registration in (registration1, registration2, registration3, registration4, registration5):
        await registrations.save(registration)

    # query by last name = Smith
    result = await persons.get_by_last_name('Smith')
    # prints John Smith, Mary Smith
    print(result)

    # query by first name = David
    result = await persons.get_by_first_name('David')
    # prints David Brown and David Thompson (order among equal first names may vary)
    print(result)

    # query registrations for apartment1
    result = await registrations.get_by_apartment(apartment1.id)
    result = [await persons.get(entry.person_id) for entry in result]
    # prints John Smith, Mary Smith
    print(result)

    # query registrations for person4
    result = await registrations.get_by_person(person4.id)
    result = [await apartments.get(entry.apartment_id) for entry in result]
    # prints
    # 70 Bowman St. South Windsor, CT 06074
    # 514 S. Magnolia St. Orlando, FL 32806
    print(result)

    # query registrations since year 2016 (inclusive)
    result = await registrations.get_since(datetime(year=2016, month=1, day=1))
    # prints registrations 3, 4 and 5
    print(result)


if __name__ == '__main__':
    # asyncio.run creates, runs and closes a fresh event loop; calling
    # get_event_loop() with no running loop is deprecated (and an error on 3.14+)
    asyncio.run(run())
When you run this, you get:
[Person(id=197da754-6238-4fef-b56a-f2b9ad9e454f, first_name=John, last_name=Smith), Person(id=d5673e5e-9fde-4dc9-a95d-e7d83bcba191, first_name=Mary, last_name=Smith)] [Person(id=9da92b38-c9ca-43ce-88b0-7fbd7a1afa76, first_name=David, last_name=Brown), Person(id=a955ff69-9534-425f-8102-e02d26c3da8d, first_name=David, last_name=Thompson)] [Person(id=197da754-6238-4fef-b56a-f2b9ad9e454f, first_name=John, last_name=Smith), Person(id=d5673e5e-9fde-4dc9-a95d-e7d83bcba191, first_name=Mary, last_name=Smith)] [Apartment(id=07b1b9a7-4a68-49a3-95d3-4e9d740875b4, address=70 Bowman St. South Windsor, CT 06074), Apartment(id=c54732d2-f9ed-418c-b63d-95911f272cad, address=514 S. Magnolia St. Orlando, FL 32806)] [Registration(person_id=9da92b38-c9ca-43ce-88b0-7fbd7a1afa76, apartment_id=54bb2d2d-a629-4a92-89b5-75f68b05aad4, created_at=2016-08-14 00:00:00), Registration(person_id=a955ff69-9534-425f-8102-e02d26c3da8d, apartment_id=07b1b9a7-4a68-49a3-95d3-4e9d740875b4, created_at=2017-03-01 00:00:00), Registration(person_id=a955ff69-9534-425f-8102-e02d26c3da8d, apartment_id=c54732d2-f9ed-418c-b63d-95911f272cad, created_at=2017-11-25 00:00:00)]
Summary:
- non-transactional AcapellaDB usage is fast and simple
- you can write "json" values to AcapellaDB SortedMap
- data is re-indexed in all of its indexes (asynchronously) when it changes
- you can search "documents" by primary key (`range`) or by reverse indexes
- one-to-many relations are possible
- many-to-many relations must be handled in user code / server code
- end-user code is simple and clean
- this example uses the HTTP proxy API (~20_000 rps); if you need more, use the Aeron API (~10x faster)
- by default, the example uses replication factor N=3
Transactional Usage
Tip
Here be dragons...
This is a bank example. Each bank account holds a balance, and one account can transfer part of it to a recipient account.
AcapellaDB handles this operation transactionally and writes a transfer item for history.
- repeat the first 4 previous steps
- transactional swap two items
- transactional insert
- check that sequence exists and write to it
- NOT transactional search
Detailed source code - bank: "Money transfer between accounts"
Data classes
import asyncio
from decimal import Decimal
from typing import Optional, List, Tuple
from uuid import UUID, uuid1, uuid4

from acapella.kv import Session, Transaction


class Account:
    """A bank account: its UUID and current balance as a ``Decimal``."""

    def __init__(self, id: UUID, balance: Decimal):
        self.id = id
        self.balance = balance

    def __repr__(self):
        return f'Account(id={self.id}, balance={self.balance})'


class Transfer:
    """A money transfer of ``amount`` from account ``id_from`` to account ``id_to``.

    ``timestamp`` is a UUID that also serves as the transfer's storage key
    (the example's transactor creates it with ``uuid1()``).
    """

    def __init__(self, timestamp: UUID, id_from: UUID, id_to: UUID, amount: Decimal):
        self.timestamp = timestamp
        self.id_from = id_from
        self.id_to = id_to
        self.amount = amount

    def __repr__(self):
        return f'Transfer(timestamp={self.timestamp}, id_from={self.id_from}, id_to={self.id_to}, amount={self.amount})'
List model classes:
class Accounts:
    """Transactional repository for Account records in ``['examples', 'accounts']``.

    Each account is stored under the clustering key ``[str(account.id)]`` with
    the value ``{'balance': str(balance)}``, so the Decimal survives as text.
    """

    PARTITION = ['examples', 'accounts']

    def __init__(self, session: Session):
        self._session = session

    async def drop_table(self):
        """Write ``None`` over every entry in the partition using one manual batch."""
        entries = await self._session.range(self.PARTITION)
        batch = self._session.batch_manual()
        for e in entries:
            # NOTE(review): not awaited in batch mode (save() does await set());
            # confirm with the client that this only queues the write
            e.set(None, reindex=True, batch=batch)
        await batch.send()

    async def create_table(self):
        """No-op: accounts are only looked up by primary key."""

    async def save(self, tx: Transaction, account: Account):
        """Write ``account`` inside transaction ``tx``."""
        key, value = self._serialize(account)
        await tx.entry(self.PARTITION, key).set(value, reindex=True)

    async def get(self, tx: Transaction, id: UUID) -> Optional[Account]:
        """Read account ``id`` inside ``tx``; return ``None`` if no value is stored."""
        key = self._serialize_key(id)
        # watch=True presumably makes the read part of the transaction's conflict
        # check — confirm against the client documentation
        entry = await tx.get_entry(self.PARTITION, key, watch=True)
        if not entry.value:
            return None
        return self._deserialize(key, entry.value)

    @classmethod
    def _serialize_key(cls, id: UUID) -> List[str]:
        return [str(id)]

    @classmethod
    def _serialize(cls, account: Account) -> Tuple[List[str], dict]:
        key = cls._serialize_key(account.id)
        return key, {'balance': str(account.balance)}

    @staticmethod
    def _deserialize(key: List[str], value: dict) -> Account:
        account_id = UUID(key[0])
        return Account(id=account_id, balance=Decimal(value['balance']))


class Transfers:
    """Transfer history stored in ``['examples', 'transfers']``, keyed by ``[str(timestamp)]``."""

    PARTITION = ['examples', 'transfers']

    def __init__(self, session: Session):
        self._session = session

    async def drop_table(self):
        """Write ``None`` over every entry in the partition using one manual batch."""
        entries = await self._session.range(self.PARTITION)
        batch = self._session.batch_manual()
        for e in entries:
            # NOTE(review): not awaited in batch mode (save() does await set());
            # confirm with the client that this only queues the write
            e.set(None, reindex=True, batch=batch)
        await batch.send()

    async def create_table(self):
        """No-op: transfers are read by key or by a full range scan."""

    async def save(self, tx: Transaction, transfer: Transfer):
        """Write ``transfer`` inside transaction ``tx``."""
        key, value = self._serialize(transfer)
        await tx.entry(self.PARTITION, key).set(value, reindex=True)

    async def get(self, tx: Transaction, timestamp: UUID) -> Optional[Transfer]:
        """Read the transfer keyed by ``timestamp`` inside ``tx``, or ``None`` if absent."""
        key = self._serialize_key(timestamp)
        entry = await tx.get_entry(self.PARTITION, key, watch=True)
        if not entry.value:
            return None
        return self._deserialize(key, entry.value)

    async def get_all(self) -> List[Transfer]:
        """Return every stored transfer, outside any transaction, skipping ``None`` values."""
        entries = await self._session.range(self.PARTITION)
        transfers = []
        for e in entries:
            # entries overwritten with None (see drop_table) are skipped
            if e.value is None:
                continue
            transfers.append(self._deserialize(e.clustering, e.value))
        return transfers

    @classmethod
    def _serialize_key(cls, timestamp: UUID) -> List[str]:
        return [str(timestamp)]

    @classmethod
    def _serialize(cls, transfer: Transfer) -> Tuple[List[str], dict]:
        key = cls._serialize_key(transfer.timestamp)
        value = dict(
            id_from=str(transfer.id_from),
            id_to=str(transfer.id_to),
            amount=str(transfer.amount),
        )
        return key, value

    @staticmethod
    def _deserialize(key: List[str], value: dict) -> Transfer:
        stamp = UUID(key[0])
        return Transfer(
            timestamp=stamp,
            id_from=UUID(value['id_from']),
            id_to=UUID(value['id_to']),
            amount=Decimal(value['amount']),
        )
A container class, like a "database context":
class Model:
    """Bundles the bank example's repositories around one shared session."""

    def __init__(self, session: Session):
        self.session = session
        self.accounts = Accounts(session)
        self.transfers = Transfers(session)

    async def drop_all(self):
        """Drop the accounts table, then the transfers table."""
        for table in (self.accounts, self.transfers):
            await table.drop_table()

    async def create_all(self):
        """Create the accounts table, then the transfers table."""
        for table in (self.accounts, self.transfers):
            await table.create_table()
The "transaction handler":
class Transactor:
    """Moves money between two accounts inside a single AcapellaDB transaction."""

    def __init__(self, model: Model):
        self._model = model

    async def transaction(self, id_from: UUID, id_to: UUID, amount: Decimal):
        """Transfer ``amount`` from account ``id_from`` to account ``id_to``.

        Both balance updates and the Transfer history record are written through
        the same transaction opened with ``session.transaction()``.

        Raises:
            AssertionError: if ``amount`` is not positive, the two ids are equal,
                either account does not exist, or the source balance is smaller
                than ``amount``. ``AssertionError`` is kept because callers catch
                it. It is raised explicitly rather than with ``assert`` so the
                checks are not stripped under ``python -O``.
        """
        # A non-positive amount would pass the balance check and move money
        # backwards, from id_to to id_from.
        if amount <= 0:
            raise AssertionError("Transfer amount must be positive")
        # With id_from == id_to, the second read may or may not see the pending
        # debit (depends on the transaction's read-your-writes behaviour), which
        # could create money. Reject it up front.
        if id_from == id_to:
            raise AssertionError("Source and destination accounts must differ")

        async with self._model.session.transaction() as tx:
            timestamp = uuid1()

            from_account = await self._model.accounts.get(tx, id_from)
            if from_account is None:
                raise AssertionError("Source account does not exist")
            if from_account.balance < amount:
                raise AssertionError("Source account must have required amount to transfer")
            from_account.balance -= amount
            await self._model.accounts.save(tx, from_account)

            to_account = await self._model.accounts.get(tx, id_to)
            if to_account is None:
                raise AssertionError("Destination account does not exist")
            to_account.balance += amount
            await self._model.accounts.save(tx, to_account)

            transfer = Transfer(timestamp, id_from, id_to, amount)
            await self._model.transfers.save(tx, transfer)
Main code for this example:
async def run():
    """Run the bank money-transfer example against AcapellaDB on port 5678.

    Logs in as ``examples``/``examples``, recreates the tables, creates two
    accounts, attempts one invalid and two valid transfers, and prints the
    resulting balances and transfer history.
    """
    # init session
    session = Session(port=5678)
    await session.login('examples', 'examples')

    # init model
    model = Model(session)
    transactor = Transactor(model)
    await model.drop_all()
    await model.create_all()

    # inserting accounts; Decimals are built from strings so money values never
    # pass through binary floating point
    account1 = Account(uuid4(), Decimal('100'))
    account2 = Account(uuid4(), Decimal('10'))
    async with session.transaction() as tx:
        await model.accounts.save(tx, account1)
        await model.accounts.save(tx, account2)

    # trying to perform a bad transfer (account2 only holds 10)
    try:
        await transactor.transaction(account2.id, account1.id, Decimal('50'))
    except AssertionError as e:
        print(e)

    # performing good transfers
    await transactor.transaction(account1.id, account2.id, Decimal('60'))
    await transactor.transaction(account2.id, account1.id, Decimal('40'))

    # now
    # account1 has balance = 80
    # account2 has balance = 30
    # transfers has two events:
    # 1 -> 60 -> 2
    # 2 -> 40 -> 1
    async with session.transaction() as tx:
        print('account 1', await model.accounts.get(tx, account1.id))
        print('account 2', await model.accounts.get(tx, account2.id))
        print('transfers', await model.transfers.get_all())


if __name__ == '__main__':
    # asyncio.run creates, runs and closes a fresh event loop; calling
    # get_event_loop() with no running loop is deprecated (and an error on 3.14+)
    asyncio.run(run())
When you run it, you get:
Source account must have required amount to transfer account 1 Account(id=040cc0d7-0003-4a9b-ac45-31e9d9b91274, balance=80) account 2 Account(id=bcd79bdb-8fe1-4eab-89e3-3350374124fb, balance=30) transfers [Transfer(timestamp=edb410cc-e0ca-11e7-aee6-40e230b5623b, id_from=040cc0d7-0003-4a9b-ac45-31e9d9b91274, id_to=bcd79bdb-8fe1-4eab-89e3-3350374124fb, amount=60), Transfer(timestamp=edc1d810-e0ca-11e7-aee6-40e230b5623b, id_from=bcd79bdb-8fe1-4eab-89e3-3350374124fb, id_to=040cc0d7-0003-4a9b-ac45-31e9d9b91274, amount=40)]