Examples

Basic Appends

"""
Simple tool for writing to hstreamdb, demonstrating usage of the append API.
"""
import asyncio
import json
from hstreamdb import insecure_client


async def create_stream_if_not_exist(client, name):
    """Create the stream ``name`` unless the server already lists it.

    The names reported by ``client.list_streams()`` are collected first;
    ``client.create_stream(name)`` is only awaited when ``name`` is not
    among them.
    """
    existing_names = set()
    for stream in await client.list_streams():
        existing_names.add(stream.name)

    if name in existing_names:
        # Nothing to do: the stream is already present.
        return
    await client.create_stream(name)


async def appends_simple(client, stream_name):
    """Run three demonstration appends against ``stream_name``.

    In order: two plain string payloads, one dict payload (announced as an
    "hrecord" message), and two plain string payloads sent with
    ``compresstype="gzip"``. Progress is printed before and after each
    append.
    """
    # (banner, payloads, extra keyword arguments for client.append)
    demos = (
        ("-> Append raw msg...", ["binmsg", "binmsg"], {}),
        ("-> Append hrecord msg...", [{"msg": "hello"}], {}),
        (
            "-> Append msg with gzip compression...",
            ["binmsg", "binmsg"],
            {"compresstype": "gzip"},
        ),
    )
    for banner, payloads, extra in demos:
        print(banner)
        await client.append(stream_name, payloads, **extra)
        print("=> Done")


async def appends_repl(client, stream_name):
    """Read messages from standard input and append each to ``stream_name``.

    Every input line becomes one append of a single payload:

    * A line starting with ``":gzip "`` is appended with
      ``compresstype="gzip"``; the message is whatever follows the marker
      (an empty string if nothing follows it).
    * If the message parses as JSON the parsed value is appended, otherwise
      the text itself is appended.

    The loop ends quietly when standard input reaches end-of-file.
    """
    print(
        "You can input a string message or a json message.\n"
        "-----------------------------\n"
        "For example:\n"
        "input> raw_msg\n"
        "input> :gzip raw_msg\n"
        'input> {"msg": "hello, world"}\n'
        "-----------------------------"
    )
    while True:
        try:
            r = input("input> ")
        except EOFError:
            # Ctrl-D or an exhausted pipe: finish instead of crashing with
            # a traceback.
            break
        compresstype = None
        if r.startswith(":gzip "):
            # A marker followed only by whitespace splits into a single
            # element; treat that as an empty message rather than indexing
            # past the end of the list.
            parts = r.split(maxsplit=1)
            r = parts[1] if len(parts) > 1 else ""
            compresstype = "gzip"
        try:
            payload = json.loads(r)
        except json.decoder.JSONDecodeError:
            # Not JSON: send the text as-is.
            payload = r
        await client.append(stream_name, [payload], compresstype=compresstype)


async def main(host, port, stream_name, simple=False):
    """Connect to the server, ensure the stream exists, then run appends.

    With ``simple`` true the fixed demonstration appends are run; otherwise
    the interactive prompt is started.
    """
    async with await insecure_client(host=host, port=port) as client:
        await create_stream_if_not_exist(client, stream_name)
        # Pick the append routine once, then run it.
        run_appends = appends_simple if simple else appends_repl
        await run_appends(client, stream_name)


if __name__ == "__main__":
    import argparse

    # Command-line options for the append example.
    cli = argparse.ArgumentParser(description="Append Example")
    cli.add_argument(
        "--host",
        default="127.0.0.1",
        type=str,
        help="server host",
    )
    cli.add_argument(
        "--port",
        default=6570,
        type=int,
        help="server port",
    )
    cli.add_argument(
        "--stream-name",
        default="test_stream",
        type=str,
        help="name of the stream, default is 'test_stream'",
    )
    cli.add_argument(
        "--simple",
        action="store_true",
        default=False,
        help="run simple appends",
    )

    options = cli.parse_args()
    entry_point = main(
        options.host,
        options.port,
        options.stream_name,
        simple=options.simple,
    )
    asyncio.run(entry_point)

Buffered Appends

import asyncio
from hstreamdb import insecure_client, BufferedProducer


async def create_stream_if_not_exist(client, name):
    """Create the stream ``name`` if ``client.list_streams()`` lacks it.

    The stream is created with ``client.create_stream(name, 1)``.
    NOTE(review): the meaning of the second argument is not visible here
    (presumably a replication factor) -- confirm against the client API.
    """
    streams = await client.list_streams()
    known_names = frozenset(stream.name for stream in streams)
    already_present = name in known_names
    if not already_present:
        await client.create_stream(name, 1)


class AppendCallback(BufferedProducer.AppendCallback):
    """Append callback that reports each outcome on standard output."""

    def on_success(self, stream_name, payloads, stream_keyid: int):
        # Report the size of the payload collection that was appended.
        report = f"Append success with {len(payloads)} batches."
        print(report)

    def on_fail(self, stream_name, payloads, stream_keyid, e):
        # Failure notice on one line, the error itself on the next.
        print("Append failed!", e, sep="\n")


async def buffered_appends(client, stream_name):
    """Append two bursts of 50 ``"x"`` payloads through a buffered producer.

    The producer is created with ``size_trigger=10240``,
    ``time_trigger=0.5`` and ``retry_count=2``; the bursts are separated by
    a one-second sleep, and the producer is closed with
    ``wait_and_close()`` at the end.
    NOTE(review): trigger units (presumably bytes / seconds) are not
    visible here -- confirm against the client API.
    """
    producer = client.new_producer(
        append_callback=AppendCallback(),
        size_trigger=10240,
        time_trigger=0.5,
        retry_count=2,
    )

    for burst in (1, 2):
        for _ in range(50):
            await producer.append(stream_name, "x")
        if burst == 1:
            # Pause between the two bursts only.
            await asyncio.sleep(1)

    await producer.wait_and_close()


async def buffered_appends_with_compress(client, stream_name):
    """Same two-burst append demo, with gzip compression on the producer.

    The producer gets the same trigger and retry settings as the
    uncompressed demo plus ``compresstype="gzip"`` and ``compresslevel=9``.
    """
    producer_options = {
        "append_callback": AppendCallback(),
        "size_trigger": 10240,
        "time_trigger": 0.5,
        "retry_count": 2,
        "compresstype": "gzip",
        "compresslevel": 9,
    }
    producer = client.new_producer(**producer_options)

    # First burst of 50 records.
    sent = 0
    while sent < 50:
        await producer.append(stream_name, "x")
        sent += 1

    await asyncio.sleep(1)

    # Second burst of 50 records.
    sent = 0
    while sent < 50:
        await producer.append(stream_name, "x")
        sent += 1

    await producer.wait_and_close()


async def main(host, port, stream_name):
    """Connect, ensure the stream exists, then run both producer demos.

    The plain buffered-producer demo runs first, followed by the one with
    compression; a banner is printed before each.
    """
    async with await insecure_client(host, port) as client:
        await create_stream_if_not_exist(client, stream_name)

        demos = (
            ("-> BufferedProducer", buffered_appends),
            (
                "-> BufferedProducer with compression",
                buffered_appends_with_compress,
            ),
        )
        for banner, run_demo in demos:
            print(banner)
            await run_demo(client, stream_name)


if __name__ == "__main__":
    import argparse

    # Command-line options for the buffered producer example.
    cli = argparse.ArgumentParser(description="BufferedProducer Example")
    cli.add_argument(
        "--host",
        default="127.0.0.1",
        type=str,
        help="server host",
    )
    cli.add_argument(
        "--port",
        default=6570,
        type=int,
        help="server port",
    )
    cli.add_argument(
        "--stream-name",
        default="test_stream",
        type=str,
        help="name of the stream, default is 'test_stream'",
    )

    options = cli.parse_args()
    entry_point = main(options.host, options.port, options.stream_name)
    asyncio.run(entry_point)

Consumer

"""
Simple tool for reading from hstreamdb, demonstrating usage of the subscription API.
"""
import asyncio
from hstreamdb import insecure_client


async def create_subscription_if_not_exist(client, subscription, stream_name):
    """Create ``subscription`` on ``stream_name`` unless it already exists.

    Existence is decided by comparing ``subscription`` with the
    ``subscription_id`` of every entry from ``client.list_subscriptions()``.
    New subscriptions are created with ``ack_timeout=600`` and
    ``max_unacks=10000``.
    """
    known_ids = [
        entry.subscription_id for entry in await client.list_subscriptions()
    ]
    if subscription in known_ids:
        return

    settings = {"ack_timeout": 600, "max_unacks": 10000}
    await client.create_subscription(subscription, stream_name, **settings)


class Processing:
    """Consumer callback that prints records and stops after ``max_count``.

    An instance is awaited once per batch with an acknowledge function, a
    stop function and an iterable of records. Each handled record is
    printed with a running index. Once ``max_count`` records have been
    handled (only when ``max_count`` is positive) the stop function is
    awaited and the rest of the batch is left untouched.
    """

    # Running total of records handled so far; reset per instance below.
    count: int = 0
    # Stop after this many records; zero or a negative value means no limit.
    max_count: int

    def __init__(self, max_count):
        self.max_count = max_count
        # Keep the counter as explicit per-instance state instead of
        # relying on the class-level default.
        self.count = 0

    async def __call__(self, ack_fun, stop_fun, rs_iter):
        """Print the records of one batch and acknowledge the handled ones.

        Args:
            ack_fun: awaitable callable given an iterable of record ids.
            stop_fun: awaitable callable taking no arguments; awaited when
                the record limit is reached.
            rs_iter: iterable of records; each must expose an ``id``.
        """
        rs = list(rs_iter)
        handled = []
        for r in rs:
            self.count += 1
            print(f"[{self.count}] Receive: {r}")
            handled.append(r)
            if self.max_count > 0 and self.count >= self.max_count:
                await stop_fun()
                break

        # Acknowledge only what was actually handled. Records after the
        # limit were never shown, so acknowledging them would silently
        # drop them.
        await ack_fun(r.id for r in handled)


async def main(host, port, subid, stream_name, count):
    """Connect, ensure the subscription exists, then start a consumer.

    The consumer is registered under the name ``"test_consumer"`` on
    subscription ``subid`` and hands its records to a ``Processing``
    callback limited to ``count`` records.
    """
    async with await insecure_client(host, port) as client:
        await create_subscription_if_not_exist(client, subid, stream_name)
        handler = Processing(count)
        consumer = client.new_consumer("test_consumer", subid, handler)
        await consumer.start()


if __name__ == "__main__":
    import argparse

    # Command-line options for the consumer example.
    cli = argparse.ArgumentParser(description="Consumer Example")
    cli.add_argument(
        "--host",
        default="127.0.0.1",
        type=str,
        help="server host",
    )
    cli.add_argument(
        "--port",
        default=6570,
        type=int,
        help="server port",
    )
    cli.add_argument(
        "--stream-name",
        default="test_stream",
        type=str,
        help="name of the stream, default is 'test_stream'",
    )
    cli.add_argument(
        "--subscription",
        default="test_subscription",
        type=str,
        help="id of the subscription, default is 'test_subscription'",
    )
    cli.add_argument(
        "--count",
        default=-1,
        type=int,
        help="total messages to read, negative means infinite, default is -1",
    )

    options = cli.parse_args()
    entry_point = main(
        options.host,
        options.port,
        options.subscription,
        options.stream_name,
        options.count,
    )
    asyncio.run(entry_point)
"""
Simple tool for reading from hstreamdb, demonstrating usage of the read API.
"""
import asyncio
from hstreamdb import insecure_client, ShardOffset, SpecialOffset


async def main(host, port, stream_name, reader_id, max_records):
    """Read one batch of records from ``stream_name`` and print payloads.

    A reader named ``reader_id`` is opened at the earliest offset, a single
    ``reader.read(max_records)`` call is made, and each returned record's
    payload is printed with its position in the batch.
    NOTE(review): the meaning of the literal 1000 passed to ``with_reader``
    is not visible here (presumably a timeout) -- confirm against the
    client API.
    """
    async with await insecure_client(host, port) as client:
        start = ShardOffset()
        start.specialOffset = SpecialOffset.EARLIEST
        reader_ctx = client.with_reader(stream_name, reader_id, start, 1000)
        async with reader_ctx as reader:
            batch = await reader.read(max_records)
            for position, record in enumerate(batch):
                print(f"[{position}] payload: {record.payload}")


if __name__ == "__main__":
    import argparse

    # Command-line options for the reader example. The description names
    # this script (it previously carried the consumer example's title).
    parser = argparse.ArgumentParser(description="Reader Example")
    parser.add_argument(
        "--host", type=str, help="server host", default="127.0.0.1"
    )
    parser.add_argument("--port", type=int, help="server port", default=6570)
    parser.add_argument(
        "--stream-name",
        type=str,
        help="name of the stream, default is 'test_stream'",
        default="test_stream",
    )
    parser.add_argument(
        "--reader-id",
        type=str,
        help="id of the reader, default is 'test_reader'",
        default="test_reader",
    )
    parser.add_argument(
        "--max-records",
        type=int,
        help="max records to read each time, default is 10",
        default=10,
    )

    args = parser.parse_args()
    # Arguments are forwarded positionally in the order main() declares
    # them.
    asyncio.run(
        main(
            args.host,
            args.port,
            args.stream_name,
            args.reader_id,
            args.max_records,
        )
    )