# Transactions
`GetAll`, `SetAll` and `ExecAll` are the foundation of transactions at the key-value level in immudb. They allow the execution of a group of commands in a single step, with two important guarantees:
- All the commands in a transaction are serialized and executed sequentially. No request issued by another client can ever interrupt the execution of a transaction. This guarantees that the commands are executed as a single isolated operation.
- Either all of the commands are processed, or none are, so the transaction is also atomic.
package main
import (
"context"
"log"
immudb "github.com/codenotary/immudb/pkg/client"
)
// main opens a session on "defaultdb" of an immudb server listening on
// localhost:3322, writes two keys with separate Set calls and then reads three
// keys back with a single GetAll call.
func main() {
	opts := immudb.DefaultOptions().
		WithAddress("localhost").
		WithPort(3322)

	client := immudb.NewClient().WithOptions(opts)

	err := client.OpenSession(
		context.TODO(),
		[]byte(`immudb`),
		[]byte(`immudb`),
		"defaultdb",
	)
	if err != nil {
		log.Fatal(err)
	}

	defer client.CloseSession(context.TODO())

	_, err = client.Set(context.TODO(), []byte(`key1`), []byte(`val1`))
	if err != nil {
		log.Fatal(err)
	}

	_, err = client.Set(context.TODO(), []byte(`key2`), []byte(`val2`))
	if err != nil {
		log.Fatal(err)
	}

	// Ask for all keys at once.
	itList, err := client.GetAll(context.TODO(), [][]byte{
		[]byte("key1"),
		[]byte("key2"),
		[]byte("key3"), // does not exist, no value returned
	})
	if err != nil {
		log.Fatal(err)
	}

	// The printed value is the GetAll result, so the label says GetAll
	// (the previous label "Set: tx:" described a different operation).
	log.Printf("GetAll: entries: %+v", itList)
}
from immudb import ImmudbClient
URL = "localhost:3322"  # immudb running on your machine
LOGIN = "immudb"        # Default username
PASSWORD = "immudb"     # Default password
DB = b"defaultdb"       # Default database name (must be in bytes)


def main():
    """Store three keys with individual set() calls, then read them back with one getAll()."""
    db = ImmudbClient(URL)
    db.login(LOGIN, PASSWORD, database=DB)

    # One set() call per pair, issued in the order written here.
    samples = {
        b'key1': b'value1',
        b'key2': b'value2',
        b'key3': b'value3',
    }
    for key, value in samples.items():
        db.set(key, value)

    fetched = db.getAll(list(samples))
    print(fetched)  # Result of getAll for the three keys, retrieved in one step


if __name__ == "__main__":
    main()
package io.codenotary.immudb.helloworld;
import java.util.Arrays;
import java.util.List;
import io.codenotary.immudb4j.Entry;
import io.codenotary.immudb4j.FileImmuStateHolder;
import io.codenotary.immudb4j.ImmuClient;
public class App {

    /**
     * Stores two byte-array values under "key1" and "key2", reads both back
     * with a single getAll call and prints every returned entry.
     */
    public static void main(String[] args) {
        ImmuClient immuClient = null;

        try {
            // State holder backed by the local "./immudb_states" folder.
            FileImmuStateHolder states = FileImmuStateHolder.newBuilder()
                    .withStatesFolder("./immudb_states")
                    .build();

            immuClient = ImmuClient.newBuilder()
                    .withServerUrl("127.0.0.1")
                    .withServerPort(3322)
                    .withStateHolder(states)
                    .build();

            immuClient.openSession("defaultdb", "immudb", "immudb");

            immuClient.set("key1", new byte[] { 0, 1, 2, 3 });
            immuClient.set("key2", new byte[] { 4, 5, 6, 7 });

            // Fetch both keys in one call.
            List<Entry> found = immuClient.getAll(Arrays.asList("key1", "key2"));

            for (Entry item : found) {
                String keyText = new String(item.getKey());
                String valueText = Arrays.toString(item.getValue());
                System.out.printf("('%s', '%s')\n", keyText, valueText);
            }

            immuClient.closeSession();
        } catch (Exception failure) {
            failure.printStackTrace();
        } finally {
            // Shut the client down whenever it was successfully built.
            if (immuClient != null) {
                try {
                    immuClient.shutdown();
                } catch (InterruptedException interrupted) {
                    interrupted.printStackTrace();
                }
            }
        }
    }
}
import ImmudbClient from 'immudb-node'
import Parameters from 'immudb-node/types/parameters'
// Connection settings and credentials used by this example.
const connection = { host: '127.0.0.1', port: '3322' }
const credentials = { user: 'immudb', password: 'immudb' }

const cl = new ImmudbClient(connection);

// Logs in and requests three keys with a single getAll call.
async function main(): Promise<void> {
  await cl.login(credentials)

  const request: Parameters.GetAll = {
    keysList: ['key1', 'key2', 'key3'],
    sincetx: 0
  }

  const response = await cl.getAll(request)
  console.log('success: getAll', response)
}

main()
If you're using another development language, please refer to the immugw option.
# SetAll
`SetAll` is a more versatile atomic multi-set operation: it writes several key-value pairs in a single transaction.
package main
import (
"context"
"log"
"github.com/codenotary/immudb/pkg/api/schema"
immudb "github.com/codenotary/immudb/pkg/client"
)
// main opens a session on "defaultdb" of an immudb server listening on
// localhost:3322 and writes three key-value pairs with one SetAll call,
// then logs the id of the resulting transaction.
func main() {
	ctx := context.TODO()

	client := immudb.NewClient().WithOptions(
		immudb.DefaultOptions().WithAddress("localhost").WithPort(3322),
	)

	if err := client.OpenSession(ctx, []byte(`immudb`), []byte(`immudb`), "defaultdb"); err != nil {
		log.Fatal(err)
	}
	defer client.CloseSession(ctx)

	// All pairs below are submitted together in a single request.
	pairs := []*schema.KeyValue{
		{Key: []byte(`1`), Value: []byte(`key1`)},
		{Key: []byte(`2`), Value: []byte(`key2`)},
		{Key: []byte(`3`), Value: []byte(`key3`)},
	}

	hdr, err := client.SetAll(ctx, &schema.SetRequest{KVs: pairs})
	if err != nil {
		log.Fatal(err)
	}

	log.Printf("SetAll: tx: %d", hdr.Id)
}
from immudb import ImmudbClient
URL = "localhost:3322"  # immudb running on your machine
LOGIN = "immudb"        # Default username
PASSWORD = "immudb"     # Default password
DB = b"defaultdb"       # Default database name (must be in bytes)


def main():
    """Write three key-value pairs with one setAll() call, then read them back with getAll()."""
    db = ImmudbClient(URL)
    db.login(LOGIN, PASSWORD, database=DB)

    batch = {
        b'key1': b'value1',
        b'key2': b'value2',
        b'key3': b'value3',
    }

    written = db.setAll(batch)
    print(written.id)  # All in one transaction

    fetched = db.getAll([b'key1', b'key2', b'key3'])
    print(fetched)  # The same as batch, retrieved in one step


if __name__ == "__main__":
    main()
package io.codenotary.immudb.helloworld;
import io.codenotary.immudb4j.FileImmuStateHolder;
import io.codenotary.immudb4j.ImmuClient;
import io.codenotary.immudb4j.KVListBuilder;
public class App {

    /**
     * Writes two byte-array values under "key1" and "key2" with a single
     * setAll call.
     */
    public static void main(String[] args) {
        ImmuClient immuClient = null;

        try {
            // State holder backed by the local "./immudb_states" folder.
            FileImmuStateHolder states = FileImmuStateHolder.newBuilder()
                    .withStatesFolder("./immudb_states")
                    .build();

            immuClient = ImmuClient.newBuilder()
                    .withServerUrl("127.0.0.1")
                    .withServerPort(3322)
                    .withStateHolder(states)
                    .build();

            immuClient.openSession("defaultdb", "immudb", "immudb");

            byte[] firstValue = { 0, 1, 2, 3 };
            byte[] secondValue = { 4, 5, 6, 7 };

            // Collect both pairs, then submit them together.
            KVListBuilder batch = KVListBuilder.newBuilder()
                    .add("key1", firstValue)
                    .add("key2", secondValue);

            immuClient.setAll(batch.entries());

            immuClient.closeSession();
        } catch (Exception failure) {
            failure.printStackTrace();
        } finally {
            // Shut the client down whenever it was successfully built.
            if (immuClient != null) {
                try {
                    immuClient.shutdown();
                } catch (InterruptedException interrupted) {
                    interrupted.printStackTrace();
                }
            }
        }
    }
}
import ImmudbClient from 'immudb-node'
import Parameters from 'immudb-node/types/parameters'
// Connection settings and credentials used by this example.
const connection = { host: '127.0.0.1', port: '3322' }
const credentials = { user: 'immudb', password: 'immudb' }

const cl = new ImmudbClient(connection);

// Logs in and writes two key-value pairs with a single setAll call.
async function main(): Promise<void> {
  await cl.login(credentials)

  const request: Parameters.SetAll = {
    kvsList: [
      { key: '1,2,3', value: '3,2,1' },
      { key: '4,5,6', value: '6,5,4' },
    ]
  }

  const response = await cl.setAll(request)
  console.log('success: setAll', response)
}

main()
If you're using another development language, please refer to the immugw option.
# ExecAll
`ExecAll` allows multiple insertions at once. The difference is that it is possible to specify a list that mixes key/value sets, references and `zAdd` insertions.
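The argument of an `ExecAll` is an array of operations of the following types:
- `Op_Kv`: a regular key-value entry
- `Op_ZAdd`: a sorted-set insertion (`ZAdd`) with a score
- `Op_Ref`: a reference to a key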
It's possible to persist and reference items that are already persisted on disk. In that case it is mandatory to provide the index of the referenced item. This has to be done for:
- `Op_ZAdd`
- `Op_Ref`

If the item targeted by a `zAdd` or `reference` is not yet persisted on disk, it's possible to add it as a regular key value and reference it within the same `ExecAll` call. In that case, if `BoundRef` is true, the reference is bound to the current transaction values.
package main
import (
"context"
"encoding/json"
"log"
"github.com/codenotary/immudb/pkg/api/schema"
immudb "github.com/codenotary/immudb/pkg/client"
)
// main opens a session on "defaultdb" of an immudb server listening on
// localhost:3322 and demonstrates ExecAll: it first persists one key with Set,
// then submits a single ExecAll request that writes a new key and adds both
// the new key and the previously persisted key to the sorted set "mySet".
// Finally it scans the set and logs the result as indented JSON.
func main() {
	opts := immudb.DefaultOptions().
		WithAddress("localhost").
		WithPort(3322)

	client := immudb.NewClient().WithOptions(opts)

	err := client.OpenSession(
		context.TODO(),
		[]byte(`immudb`),
		[]byte(`immudb`),
		"defaultdb",
	)
	if err != nil {
		log.Fatal(err)
	}

	defer client.CloseSession(context.TODO())

	// Persist a key up front; its transaction id is used below as AtTx.
	idx, err := client.Set(
		context.TODO(),
		[]byte(`persistedKey`),
		[]byte(`persistedVal`),
	)
	if err != nil {
		log.Fatal(err)
	}

	aOps := &schema.ExecAllRequest{
		Operations: []*schema.Op{
			{
				// Key written as part of this same request.
				Operation: &schema.Op_Kv{
					Kv: &schema.KeyValue{
						Key:   []byte(`notPersistedKey`),
						Value: []byte(`notPersistedVal`),
					},
				},
			},
			{
				// Sorted-set entry for the key written just above.
				Operation: &schema.Op_ZAdd{
					ZAdd: &schema.ZAddRequest{
						Set:   []byte(`mySet`),
						Score: 0.4,
						Key:   []byte(`notPersistedKey`),
					},
				},
			},
			{
				// Sorted-set entry for the key persisted by the earlier Set,
				// bound to that transaction through AtTx.
				Operation: &schema.Op_ZAdd{
					ZAdd: &schema.ZAddRequest{
						Set:      []byte(`mySet`),
						Score:    0.6,
						Key:      []byte(`persistedKey`),
						AtTx:     idx.Id,
						BoundRef: true,
					},
				},
			},
		},
	}

	idx, err = client.ExecAll(context.TODO(), aOps)
	if err != nil {
		log.Fatal(err)
	}

	list, err := client.ZScan(context.TODO(), &schema.ZScanRequest{
		Set:     []byte(`mySet`),
		SinceTx: idx.Id,
		NoWait:  true,
	})
	if err != nil {
		log.Fatal(err)
	}

	// Report a marshalling failure instead of silently logging an empty string.
	s, err := json.MarshalIndent(list, "", "\t")
	if err != nil {
		log.Fatal(err)
	}
	log.Print(string(s))
}
from immudb import ImmudbClient
from immudb.datatypes import KeyValue, ZAddRequest, ReferenceRequest
URL = "localhost:3322"  # immudb running on your machine
LOGIN = "immudb"        # Default username
PASSWORD = "immudb"     # Default password
DB = b"defaultdb"       # Default database name (must be in bytes)


def main():
    """Submit key-value writes, sorted-set additions and a reference with one execAll() call."""
    db = ImmudbClient(URL)
    db.login(LOGIN, PASSWORD, database=DB)

    # Operations are listed in the order they are handed to execAll().
    operations = []
    operations.append(KeyValue(b'key', b'value'))
    operations.append(ZAddRequest(b'testscore', 100, b'key'))
    operations.append(KeyValue(b'key2', b'value2'))
    operations.append(ZAddRequest(b'testscore', 150, b'key2'))
    operations.append(ReferenceRequest(b'reference1', b'key'))

    result = db.execAll(operations)
    print(result.id)  # All in one transaction

    scanned = db.zScan(b'testscore', b'', 0, 0, True, 10, True, 0, 200)
    print(scanned)  # Shows these entries

    print(db.get(b'reference1'))


if __name__ == "__main__":
    main()
This feature is not yet supported or not documented. Do you want to make a feature request or help out? Open an issue on the Java SDK GitHub project.
import ImmudbClient from 'immudb-node'
import Parameters from 'immudb-node/types/parameters'
// Connection settings and credentials used by this example.
const connection = { host: '127.0.0.1', port: '3322' }
const credentials = { user: 'immudb', password: 'immudb' }

const cl = new ImmudbClient(connection);

// Builds a zadd operation on 'mySet' with score 0.6 for the given key and tx id.
const zAddTo = (key: string, attx: number) => ({
  zadd: {
    set: 'mySet',
    score: 0.6,
    key,
    attx,
    boundref: true
  }
})

// Persists one key with set, then sends a single execAll request that writes
// a new key and adds both keys to the sorted set.
async function main(): Promise<void> {
  await cl.login(credentials)

  const persisted = await cl.set({ key: 'persistedKey', value: 'persistedVal' })

  const request: Parameters.ExecAll = {
    operationsList: [
      { kv: { key: 'notPersistedKey', value: 'notPersistedVal' } },
      zAddTo('notPersistedKey', 0),
      zAddTo('persistedKey', persisted.id),
    ]
  }

  const response = await cl.execAll(request)
  console.log('success: execAll', response)
}

main()
If you're using another development language, please refer to the immugw option.
# TxScan
`TxScan` permits iterating over transactions.
The argument of a `TxScan` is a request with the following fields:
- `InitialTx`: initial transaction id
- `Limit`: number of transactions returned
- `Desc`: order of returned transactions
package main
import (
"context"
"log"
"github.com/codenotary/immudb/pkg/api/schema"
immudb "github.com/codenotary/immudb/pkg/client"
)
// main opens a session on "defaultdb" of an immudb server listening on
// localhost:3322, writes three keys with separate Set calls, scans
// transactions starting from the first write and logs every entry it can
// read back from the scanned transactions.
func main() {
	ctx := context.TODO()

	client := immudb.NewClient().WithOptions(
		immudb.DefaultOptions().WithAddress("localhost").WithPort(3322),
	)

	if err := client.OpenSession(ctx, []byte(`immudb`), []byte(`immudb`), "defaultdb"); err != nil {
		log.Fatal(err)
	}
	defer client.CloseSession(ctx)

	// Only the header of the first write is kept: its id seeds the scan.
	firstHdr, err := client.Set(ctx, []byte("key1"), []byte("val1"))
	if err != nil {
		log.Fatal(err)
	}

	for _, kv := range [][2]string{{"key2", "val2"}, {"key3", "val3"}} {
		if _, err := client.Set(ctx, []byte(kv[0]), []byte(kv[1])); err != nil {
			log.Fatal(err)
		}
	}

	scan, err := client.TxScan(ctx, &schema.TxScanRequest{
		InitialTx: firstHdr.Id,
		Limit:     3,
		Desc:      true,
	})
	if err != nil {
		log.Fatal(err)
	}

	// Then it's possible to retrieve entries of every transactions:
	for _, scanned := range scan.GetTxs() {
		txID := scanned.Header.Id

		for _, e := range scanned.Entries {
			// First attempt: look the entry up with the leading byte of
			// its key removed.
			item, err := client.GetAt(ctx, e.Key[1:], txID)
			if err != nil {
				// Second attempt: use the key exactly as stored in the entry.
				if item, err = client.GetAt(ctx, e.Key, txID); err != nil {
					log.Fatal(err)
				}
			}

			log.Printf("retrieved key %s and val %s\n", item.Key, item.Value)
		}
	}
}
Remember to strip the first byte of the key (the key prefix). Also remember that a transaction can contain sorted-set keys, which should not be skipped: the example above retries with the full key when the lookup with the stripped key fails.
This feature is not yet supported or not documented. Do you want to make a feature request or help out? Open an issue on the Python SDK GitHub project.
package io.codenotary.immudb.helloworld;
import java.util.List;
import io.codenotary.immudb4j.FileImmuStateHolder;
import io.codenotary.immudb4j.ImmuClient;
import io.codenotary.immudb4j.Tx;
import io.codenotary.immudb4j.TxHeader;
public class App {

    /**
     * Writes two keys, then scans transactions starting from the id of the
     * first write and prints the id of every transaction returned.
     */
    public static void main(String[] args) {
        ImmuClient immuClient = null;

        try {
            // State holder backed by the local "./immudb_states" folder.
            FileImmuStateHolder states = FileImmuStateHolder.newBuilder()
                    .withStatesFolder("./immudb_states")
                    .build();

            immuClient = ImmuClient.newBuilder()
                    .withServerUrl("127.0.0.1")
                    .withServerPort(3322)
                    .withStateHolder(states)
                    .build();

            immuClient.openSession("defaultdb", "immudb", "immudb");

            // Keep the header of the first write: its id seeds the scan.
            TxHeader firstHeader = immuClient.set("key1", new byte[] { 0, 1, 2, 3 });
            immuClient.set("key2", new byte[] { 4, 5, 6, 7 });

            List<Tx> scanned = immuClient.txScanAll(firstHeader.getId());

            for (Tx item : scanned) {
                System.out.printf("tx '%d'\n", item.getHeader().getId());
            }

            immuClient.closeSession();
        } catch (Exception failure) {
            failure.printStackTrace();
        } finally {
            // Shut the client down whenever it was successfully built.
            if (immuClient != null) {
                try {
                    immuClient.shutdown();
                } catch (InterruptedException interrupted) {
                    interrupted.printStackTrace();
                }
            }
        }
    }
}
import ImmudbClient from 'immudb-node'
import Parameters from 'immudb-node/types/parameters'
// Connection settings and credentials used by this example.
const connection = { host: '127.0.0.1', port: '3322' }
const credentials = { user: 'immudb', password: 'immudb' }

const cl = new ImmudbClient(connection);

// Logs in, writes key0..key2 one after another, then scans transactions.
async function main(): Promise<void> {
  await cl.login(credentials)

  // Sequential writes: each set is awaited before the next one starts.
  for (const n of [0, 1, 2]) {
    await cl.set({ key: `key${n}`, value: `val${n}` })
  }

  const request: Parameters.TxScan = {
    initialtx: 2,
    limit: 3,
    desc: false
  }

  const response = await cl.txScan(request)
  console.log('success: txScan', response)
}

main()
If you're using another development language, please refer to the immugw option.
# Filter Transactions
The transaction entries are generated by writing key-value pairs, referencing keys, associating scores to key-value pairs (with the `ZAdd` operation), and by mapping the SQL data model into the key-value model.

With the `TxScan` or `TxByIDWithSpec` operations it's possible to retrieve entries of certain types, either retrieving the digest of the value assigned to the key (`EntryTypeAction_ONLY_DIGEST`), the raw value (`EntryTypeAction_RAW_VALUE`) or the structured value (`EntryTypeAction_RESOLVE`).
package main
import (
"context"
"log"
"github.com/codenotary/immudb/pkg/api/schema"
immudb "github.com/codenotary/immudb/pkg/client"
)
// main opens a session on "defaultdb" of an immudb server listening on
// localhost:3322, creates one transaction holding a key-value entry, a
// reference and a sorted-set entry, then reads that transaction back with
// TxByIDWithSpec and logs the returned entries by kind.
func main() {
	ctx := context.TODO()

	client := immudb.NewClient().WithOptions(
		immudb.DefaultOptions().WithAddress("localhost").WithPort(3322),
	)

	if err := client.OpenSession(ctx, []byte(`immudb`), []byte(`immudb`), "defaultdb"); err != nil {
		log.Fatal(err)
	}
	defer client.CloseSession(ctx)

	// One operation of each kind, all submitted in the same ExecAll request.
	kvOp := &schema.Op{
		Operation: &schema.Op_Kv{
			Kv: &schema.KeyValue{
				Key:   []byte("key1"),
				Value: []byte("value1"),
			},
		},
	}
	refOp := &schema.Op{
		Operation: &schema.Op_Ref{
			Ref: &schema.ReferenceRequest{
				Key:           []byte("ref1"),
				ReferencedKey: []byte("key1"),
			},
		},
	}
	zAddOp := &schema.Op{
		Operation: &schema.Op_ZAdd{
			ZAdd: &schema.ZAddRequest{
				Set:   []byte("set1"),
				Score: 10,
				Key:   []byte("key1"),
			},
		},
	}

	hdr, err := client.ExecAll(ctx, &schema.ExecAllRequest{
		Operations: []*schema.Op{kvOp, refOp, zAddOp},
	})
	if err != nil {
		log.Fatal(err)
	}

	// fetch kv and sorted-set entries as structured values
	// while skipping sql-related entries
	spec := &schema.EntriesSpec{
		KvEntriesSpec: &schema.EntryTypeSpec{
			Action: schema.EntryTypeAction_RESOLVE,
		},
		ZEntriesSpec: &schema.EntryTypeSpec{
			Action: schema.EntryTypeAction_RESOLVE,
		},
		// explicit exclusion is optional
		SqlEntriesSpec: &schema.EntryTypeSpec{
			// resolution of sql entries is not supported
			Action: schema.EntryTypeAction_EXCLUDE,
		},
	}

	tx, err := client.TxByIDWithSpec(ctx, &schema.TxRequest{
		Tx:          hdr.Id,
		EntriesSpec: spec,
	})
	if err != nil {
		log.Fatal(err)
	}

	for _, kv := range tx.KvEntries {
		log.Printf("retrieved key %s and val %s", kv.Key, kv.Value)
	}

	for _, z := range tx.ZEntries {
		log.Printf("retrieved set %s key %s and score %v", z.Set, z.Key, z.Score)
	}

	// scan over unresolved entries
	// either EntryTypeAction_ONLY_DIGEST or
	// EntryTypeAction_RAW_VALUE options
	for _, raw := range tx.Entries {
		log.Printf("retrieved key %s and digest %v", raw.Key, raw.HValue)
	}
}
This feature is not yet supported or not documented. Do you want to make a feature request or help out? Open an issue on the Python SDK GitHub project.
This feature is not yet supported or not documented. Do you want to make a feature request or help out? Open an issue on the Java SDK GitHub project.
This feature is not yet supported or not documented. Do you want to make a feature request or help out? Open an issue on the Node.js SDK GitHub project.
If you're using another development language, please refer to the immugw option.