Examples
Basic Appends
"""
Simple tool for writing to hstreamdb, demonstrating usage of the append API.
"""
import asyncio
import json
from hstreamdb import insecure_client
async def create_stream_if_not_exist(client, name):
    """Create the stream ``name`` unless the server already lists it.

    Args:
        client: Object exposing awaitable ``list_streams()`` and
            ``create_stream(name)``.
        name: Stream name to look up and, when absent, create.
    """
    listed = await client.list_streams()
    known_names = set()
    for stream in listed:
        known_names.add(stream.name)
    if name in known_names:
        # Already present on the server; nothing to create.
        return
    await client.create_stream(name)
async def appends_simple(client, stream_name):
    """Run three fixed appends against ``stream_name``, printing progress.

    The appends are, in order: two plain strings, one dict, and two plain
    strings sent with ``compresstype="gzip"``.

    Args:
        client: Object exposing an awaitable ``append(stream, payloads, ...)``.
        stream_name: Name of the stream to append to.
    """
    # (banner, payloads, extra keyword arguments) for each demo step.
    steps = (
        ("-> Append raw msg...", ["binmsg", "binmsg"], {}),
        ("-> Append hrecord msg...", [{"msg": "hello"}], {}),
        (
            "-> Append msg with gzip compression...",
            ["binmsg", "binmsg"],
            {"compresstype": "gzip"},
        ),
    )
    for banner, payloads, extra in steps:
        print(banner)
        await client.append(stream_name, payloads, **extra)
        print("=> Done")
async def appends_repl(client, stream_name):
    """Read lines from stdin and append each one to ``stream_name``.

    Each line is first tried as JSON; if it parses, the parsed value is
    appended, otherwise the raw text is appended. A line starting with
    ``:gzip `` has that prefix removed and is appended with
    ``compresstype="gzip"``. The loop ends when stdin reaches end-of-file.

    Args:
        client: Object exposing an awaitable
            ``append(stream, payloads, compresstype=...)``.
        stream_name: Name of the stream to append to.
    """
    gzip_prefix = ":gzip "
    print(
        "You can input a string message or a json message.\n"
        "-----------------------------\n"
        "For example:\n"
        "input> raw_msg\n"
        "input> :gzip raw_msg\n"
        'input> {"msg": "hello, world"}\n'
        "-----------------------------"
    )
    while True:
        # NOTE(review): input() blocks the event loop while waiting for the
        # user; acceptable for a demo, confirm before reusing elsewhere.
        try:
            line = input("input> ")
        except EOFError:
            # stdin was closed (Ctrl-D or end of piped input): leave the
            # REPL cleanly instead of surfacing a traceback.
            break
        compresstype = None
        if line.startswith(gzip_prefix):
            # Strip the prefix by slicing. A bare ":gzip " has no second
            # field, so indexing the result of split(maxsplit=1) would raise
            # IndexError; lstrip() keeps the old handling of extra spaces.
            line = line[len(gzip_prefix):].lstrip()
            compresstype = "gzip"
        try:
            payload = json.loads(line)
        except json.JSONDecodeError:
            # Not valid JSON: send the text itself.
            payload = line
        await client.append(stream_name, [payload], compresstype=compresstype)
async def main(host, port, stream_name, simple=False):
    """Connect, make sure the stream exists, then run one append demo.

    Args:
        host: Server host passed to ``insecure_client``.
        port: Server port passed to ``insecure_client``.
        stream_name: Stream to create if missing and then append to.
        simple: When true run ``appends_simple``; otherwise run the
            interactive ``appends_repl``.
    """
    async with await insecure_client(host=host, port=port) as client:
        await create_stream_if_not_exist(client, stream_name)
        demo = appends_simple if simple else appends_repl
        await demo(client, stream_name)
if __name__ == "__main__":
    # Script entry point: read connection and stream options from the command
    # line, then run main() under asyncio.run().
    import argparse

    cli = argparse.ArgumentParser(description="Append Example")
    cli.add_argument("--host", default="127.0.0.1", type=str, help="server host")
    cli.add_argument("--port", default=6570, type=int, help="server port")
    cli.add_argument(
        "--stream-name",
        default="test_stream",
        type=str,
        help="name of the stream, default is 'test_stream'",
    )
    # Boolean flag: present selects the fixed demo, absent selects the REPL.
    cli.add_argument(
        "--simple",
        action="store_true",
        default=False,
        help="run simple appends",
    )
    options = cli.parse_args()

    asyncio.run(
        main(
            options.host,
            options.port,
            options.stream_name,
            simple=options.simple,
        )
    )
Buffered Appends
import asyncio
from hstreamdb import insecure_client, BufferedProducer
async def create_stream_if_not_exist(client, name):
    """Create the stream ``name`` when the server does not list it yet.

    Args:
        client: Object exposing awaitable ``list_streams()`` and
            ``create_stream(name, n)``.
        name: Stream name to look up and, when absent, create.
    """
    streams = await client.list_streams()
    already_there = name in {stream.name for stream in streams}
    if not already_there:
        # NOTE(review): the positional 1 is presumably the replication
        # factor -- confirm against the client API.
        await client.create_stream(name, 1)
class AppendCallback(BufferedProducer.AppendCallback):
    """Producer callback that reports each append outcome on stdout."""

    def on_success(self, stream_name, payloads, stream_keyid: int):
        """Print a success line containing ``len(payloads)``."""
        total = len(payloads)
        print(f"Append success with {total} batches.")

    def on_fail(self, stream_name, payloads, stream_keyid, e):
        """Print a failure notice, then the error ``e`` on its own line."""
        print("Append failed!", e, sep="\n")
async def buffered_appends(client, stream_name):
    """Send two bursts of 50 ``"x"`` records through a buffered producer.

    The bursts are separated by a one-second sleep, and the producer is
    closed with ``wait_and_close()`` at the end.

    Args:
        client: Object exposing ``new_producer(...)``.
        stream_name: Name of the stream to append to.
    """
    # NOTE(review): size_trigger / time_trigger are presumably the flush
    # thresholds (bytes / seconds) -- confirm against the producer docs.
    producer = client.new_producer(
        append_callback=AppendCallback(),
        size_trigger=10240,
        time_trigger=0.5,
        retry_count=2,
    )

    async def send_burst():
        # One burst: 50 single-character appends, awaited one by one.
        for _ in range(50):
            await producer.append(stream_name, "x")

    await send_burst()
    await asyncio.sleep(1)
    await send_burst()
    await producer.wait_and_close()
async def buffered_appends_with_compress(client, stream_name):
    """Send two bursts of 50 ``"x"`` records through a gzip-enabled producer.

    Same flow as the uncompressed demo (burst, one-second sleep, burst,
    ``wait_and_close()``), but the producer is created with
    ``compresstype="gzip"`` and ``compresslevel=9``.

    Args:
        client: Object exposing ``new_producer(...)``.
        stream_name: Name of the stream to append to.
    """
    producer_options = {
        "append_callback": AppendCallback(),
        "size_trigger": 10240,
        "time_trigger": 0.5,
        "retry_count": 2,
        "compresstype": "gzip",
        "compresslevel": 9,
    }
    producer = client.new_producer(**producer_options)

    for burst in (1, 2):
        for _ in range(50):
            await producer.append(stream_name, "x")
        if burst == 1:
            # Pause only between the two bursts, not after the last one.
            await asyncio.sleep(1)

    await producer.wait_and_close()
async def main(host, port, stream_name):
    """Connect, ensure the stream exists, then run both producer demos.

    Args:
        host: Server host passed to ``insecure_client``.
        port: Server port passed to ``insecure_client``.
        stream_name: Stream to create if missing and then append to.
    """
    demos = (
        ("-> BufferedProducer", buffered_appends),
        ("-> BufferedProducer with compression", buffered_appends_with_compress),
    )
    async with await insecure_client(host, port) as client:
        await create_stream_if_not_exist(client, stream_name)
        for banner, demo in demos:
            print(banner)
            await demo(client, stream_name)
if __name__ == "__main__":
    # Script entry point: parse host/port/stream options and run main().
    import argparse

    cli = argparse.ArgumentParser(description="BufferedProducer Example")
    cli.add_argument("--host", default="127.0.0.1", type=str, help="server host")
    cli.add_argument("--port", default=6570, type=int, help="server port")
    cli.add_argument(
        "--stream-name",
        default="test_stream",
        type=str,
        help="name of the stream, default is 'test_stream'",
    )
    options = cli.parse_args()

    entry = main(options.host, options.port, options.stream_name)
    asyncio.run(entry)
Consumer
"""
Simple tool for reading from hstreamdb, demonstrating usage of the subscription API.
"""
import asyncio
from hstreamdb import insecure_client
async def create_subscription_if_not_exist(client, subscription, stream_name):
    """Create ``subscription`` on ``stream_name`` unless it is already listed.

    Args:
        client: Object exposing awaitable ``list_subscriptions()`` and
            ``create_subscription(...)``.
        subscription: Subscription id to look up and, when absent, create.
        stream_name: Stream the new subscription is attached to.
    """
    listed = await client.list_subscriptions()
    existing_ids = {entry.subscription_id for entry in listed}
    if subscription in existing_ids:
        return
    # NOTE(review): ack_timeout is presumably in seconds -- confirm.
    await client.create_subscription(
        subscription, stream_name, ack_timeout=600, max_unacks=10000
    )
class Processing:
    """Consumer callback that prints records and stops after ``max_count``.

    An instance is awaited with ``(ack_fun, stop_fun, rs_iter)``. It prints
    every record it handles, calls ``stop_fun`` once the running total
    reaches ``max_count`` (only when ``max_count`` is positive), and then
    acknowledges the records it actually handled.
    """

    # Running total of handled records. Class-level default; the first
    # ``self.count += 1`` creates the per-instance value.
    count = 0
    # Stop threshold; zero or negative disables the limit.
    max_count: int

    def __init__(self, max_count):
        """Store the stop threshold ``max_count``."""
        self.max_count = max_count

    async def __call__(self, ack_fun, stop_fun, rs_iter):
        """Handle one batch of records.

        Args:
            ack_fun: Awaitable callable receiving an iterable of record ids.
            stop_fun: Awaitable callable taking no arguments; invoked when
                the limit is reached.
            rs_iter: Iterable of records, each exposing an ``id`` attribute.
        """
        handled = []
        for r in rs_iter:
            self.count += 1
            print(f"[{self.count}] Receive: {r}")
            handled.append(r)
            if self.max_count > 0 and self.count >= self.max_count:
                await stop_fun()
                break
        # Acknowledge only what was handled. Records left in the batch after
        # an early stop were never processed, so they must stay unacked.
        await ack_fun(r.id for r in handled)
async def main(host, port, subid, stream_name, count):
    """Connect, ensure the subscription exists, then start a consumer.

    Args:
        host: Server host passed to ``insecure_client``.
        port: Server port passed to ``insecure_client``.
        subid: Subscription id to create if missing and consume from.
        stream_name: Stream the subscription is attached to.
        count: Limit handed to ``Processing``.
    """
    async with await insecure_client(host, port) as client:
        await create_subscription_if_not_exist(client, subid, stream_name)
        handler = Processing(count)
        consumer = client.new_consumer("test_consumer", subid, handler)
        await consumer.start()
if __name__ == "__main__":
    # Script entry point: parse connection, stream, subscription and limit
    # options, then run main().
    import argparse

    parser = argparse.ArgumentParser(description="Consumer Example")
    parser.add_argument(
        "--host", type=str, help="server host", default="127.0.0.1"
    )
    parser.add_argument("--port", type=int, help="server port", default=6570)
    parser.add_argument(
        "--stream-name",
        type=str,
        help="name of the stream, default is 'test_stream'",
        default="test_stream",
    )
    parser.add_argument(
        "--subscription",
        type=str,
        help="id of the subscription, default is 'test_subscription'",
        default="test_subscription",
    )
    # The consumer callback only enforces the limit when it is > 0, so zero
    # is unlimited as well as negative values; the help text says so.
    parser.add_argument(
        "--count",
        type=int,
        help=(
            "total messages to read, zero or negative means infinite, "
            "default is -1"
        ),
        default=-1,
    )
    args = parser.parse_args()
    asyncio.run(
        main(
            args.host,
            args.port,
            args.subscription,
            args.stream_name,
            args.count,
        )
    )
"""
Simple tool for reading from hstreamdb, demonstrating usage of the read API.
"""
import asyncio
from hstreamdb import insecure_client, ShardOffset, SpecialOffset
async def main(host, port, stream_name, reader_id, max_records):
    """Read one batch from the earliest offset and print each payload.

    Args:
        host: Server host passed to ``insecure_client``.
        port: Server port passed to ``insecure_client``.
        stream_name: Stream to read from.
        reader_id: Identifier passed to ``client.with_reader``.
        max_records: Argument passed to ``reader.read``.
    """
    async with await insecure_client(host, port) as client:
        start_at = ShardOffset()
        start_at.specialOffset = SpecialOffset.EARLIEST
        # NOTE(review): the trailing 1000 is presumably a timeout -- confirm
        # against the with_reader signature.
        reader_ctx = client.with_reader(stream_name, reader_id, start_at, 1000)
        async with reader_ctx as reader:
            batch = await reader.read(max_records)
            for index, record in enumerate(batch):
                print(f"[{index}] payload: {record.payload}")
if __name__ == "__main__":
    # Script entry point: parse connection, stream and reader options, then
    # run main().
    import argparse

    # This script demonstrates the read API, so the description names the
    # reader rather than the consumer.
    parser = argparse.ArgumentParser(description="Reader Example")
    parser.add_argument(
        "--host", type=str, help="server host", default="127.0.0.1"
    )
    parser.add_argument("--port", type=int, help="server port", default=6570)
    parser.add_argument(
        "--stream-name",
        type=str,
        help="name of the stream, default is 'test_stream'",
        default="test_stream",
    )
    parser.add_argument(
        "--reader-id",
        type=str,
        help="id of the reader, default is 'test_reader'",
        default="test_reader",
    )
    parser.add_argument(
        "--max-records",
        type=int,
        help="max records to read each time, default is 10",
        default=10,
    )
    args = parser.parse_args()
    asyncio.run(
        main(
            args.host,
            args.port,
            args.stream_name,
            args.reader_id,
            args.max_records,
        )
    )