API Documentation

For usage examples, see the Examples.

Module Contents

Create Client

async hstreamdb.insecure_client(host='127.0.0.1', port=6570, url=None)

Creates an insecure client to a cluster.

Parameters:
  • host – hostname of the HStreamDB server to connect to; defaults to ‘127.0.0.1’

  • port – port of the HStreamDB server to connect to; defaults to 6570

  • url – alternative service URL for connecting to HStreamDB, in ‘hstream://your-host’ format. If you provide this URL, the ‘host’ and ‘port’ arguments are ignored.

Returns:

An HStreamDBClient
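The rule that ‘url’ overrides ‘host’ and ‘port’ can be sketched as below. This is a minimal illustration, not part of the library: connection_kwargs and connect are hypothetical helpers, and the hstreamdb import is deferred so the first helper can be tried without the package installed or a server running.

```python
def connection_kwargs(host="127.0.0.1", port=6570, url=None):
    """Hypothetical helper: keep only the arguments that take effect."""
    if url is not None:
        # Per the parameter notes, 'host' and 'port' are ignored when 'url' is set.
        return {"url": url}
    return {"host": host, "port": port}


async def connect(**kwargs):
    """Hypothetical wrapper around hstreamdb.insecure_client."""
    # Deferred import: requires the hstreamdb package and a reachable server.
    import hstreamdb

    return await hstreamdb.insecure_client(**connection_kwargs(**kwargs))


# Inside a coroutine (not run here):
#     client = await connect(url="hstream://your-host")
```

Because insecure_client is a coroutine, it has to be awaited from inside a running event loop.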

async hstreamdb.secure_client(host='127.0.0.1', port=6570, url=None, is_creds_file=False, root_certificates=None, private_key=None, certificate_chain=None)

Creates a secure client to a cluster.

Parameters:
  • host – hostname of the HStreamDB server to connect to; defaults to ‘127.0.0.1’

  • port – port of the HStreamDB server to connect to; defaults to 6570

  • url – alternative service URL for connecting to HStreamDB, in ‘hstreams://your-host’ format. If you provide this URL, the ‘host’ and ‘port’ arguments are ignored.

  • is_creds_file – whether the credential arguments below are file paths rather than the credential contents.

  • root_certificates – The PEM-encoded root certificates as a byte string, or None to retrieve them from a default location chosen by gRPC runtime. Note: if ‘is_creds_file’ is True this is the filepath instead of the contents.

  • private_key – The PEM-encoded private key as a byte string, or None if no private key should be used. Note: if ‘is_creds_file’ is True this is the filepath instead of the contents.

  • certificate_chain – The PEM-encoded certificate chain as a byte string to use or None if no certificate chain should be used. Note: if ‘is_creds_file’ is True this is the filepath instead of the contents.

Returns:

An HStreamDBClient
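The two ways of supplying credentials (file paths with ‘is_creds_file’ set to True, or PEM contents as byte strings) might be wrapped as in the sketch below. tls_kwargs and connect_secure are hypothetical helpers, and the file names in the usage comment are placeholders.

```python
from pathlib import Path


def tls_kwargs(root_ca, private_key=None, certificate_chain=None, as_files=True):
    """Hypothetical helper: build the credential arguments for secure_client."""
    sources = {
        "root_certificates": root_ca,
        "private_key": private_key,
        "certificate_chain": certificate_chain,
    }
    if as_files:
        # Hand the paths over unchanged and let the client read the files.
        creds = {k: (str(p) if p is not None else None) for k, p in sources.items()}
    else:
        # Read the PEM files here and pass their contents as byte strings.
        creds = {
            k: (Path(p).read_bytes() if p is not None else None)
            for k, p in sources.items()
        }
    return {"is_creds_file": as_files, **creds}


async def connect_secure(url, root_ca, **options):
    """Hypothetical wrapper around hstreamdb.secure_client."""
    # Deferred import: requires the hstreamdb package and a reachable server.
    import hstreamdb

    return await hstreamdb.secure_client(url=url, **tls_kwargs(root_ca, **options))


# Inside a coroutine (not run here; "ca.pem" is a placeholder path):
#     client = await connect_secure("hstreams://your-host", "ca.pem")
```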

HStreamDBClient Object

class hstreamdb.HStreamDBClient(host='127.0.0.1', port=6570, credentials=None)
Parameters:
  • host (str) –

  • port (int) –

async init_cluster_info()
async create_stream(name, replication_factor=1, backlog=0, shard_count=1)
Parameters:
  • name – stream name

  • replication_factor – how many times the stream is replicated across nodes in the cluster

  • backlog – how long the stream retains records after they are appended, in seconds.
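As an illustration of create_stream together with list_streams, the sketch below creates a stream only when no stream of that name is listed. ensure_stream is a hypothetical helper, the stream name and backlog value in the usage comment are arbitrary, and the check-then-create sequence is not atomic.

```python
import asyncio


async def ensure_stream(client, name, replication_factor=1, backlog=0, shard_count=1):
    """Hypothetical helper: create the stream unless it is already listed.

    Returns True if the stream was created, False if it already existed.
    """
    existing = {stream.name for stream in await client.list_streams()}
    if name in existing:
        return False
    await client.create_stream(
        name,
        replication_factor=replication_factor,
        backlog=backlog,
        shard_count=shard_count,
    )
    return True


async def main():
    # Deferred import: requires the hstreamdb package and a reachable server.
    import hstreamdb

    client = await hstreamdb.insecure_client()
    created = await ensure_stream(client, "demo_stream", backlog=3600)
    print("created" if created else "already exists")


# asyncio.run(main())  # not run here: needs a running HStreamDB server
```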

async delete_stream(name, ignore_non_exist=False, force=False)
async list_streams()

List all streams

Return type:

Iterator[Stream]

async append(name, payloads, key=None, compresstype=None, compresslevel=9)

Append payloads to a stream.

Parameters:
  • name (str) – stream name

  • payloads (Iterable[Any]) – an iterable of strings, bytes, or dicts (JSON).

  • key (str | None) – Optional stream key.

Returns:

A generator of the appended RecordIds

Return type:

Iterator[RecordId]
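A short sketch of append with the three payload kinds listed above. append_events is a hypothetical helper; it materializes the returned iterator into a list so the RecordIds can be inspected more than once. The stream name and key in the usage comment are placeholders.

```python
import asyncio


async def append_events(client, stream_name, events, key=None):
    """Hypothetical helper: append a batch and return the RecordIds as a list."""
    record_ids = await client.append(stream_name, events, key=key)
    return list(record_ids)


# Strings, bytes, and dicts can be mixed in one call.
EXAMPLE_EVENTS = ["plain text", b"raw bytes", {"temperature": 21.5}]

# Inside a coroutine (not run here; needs a connected client):
#     ids = await append_events(client, "demo_stream", EXAMPLE_EVENTS, key="sensor-1")
#     for rid in ids:
#         print(rid.shard_id, rid.batch_id, rid.batch_index)
```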

new_producer(append_callback=None, size_trigger=0, time_trigger=0, workers=1, retry_count=0, retry_max_delay=60, compresstype=None, compresslevel=9)
Parameters:

append_callback (Type[AppendCallback] | None) –

async list_shards(stream_name)
Return type:

List[Shard]

async create_subscription(subscription_id, stream_name, ack_timeout=600, max_unacks=10000, offset=1)
Parameters:
  • subscription_id (str) –

  • stream_name (str) –

  • ack_timeout (int) –

  • max_unacks (int) –

  • offset (protobuf enum value) –

async list_subscriptions()
Return type:

Iterator[Subscription]

async does_subscription_exist(subscription_id)
Parameters:

subscription_id (str) –
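does_subscription_exist and create_subscription can be combined so that a subscription is created only once, as in this sketch. ensure_subscription is a hypothetical helper, and it assumes does_subscription_exist returns a truthy value when the subscription is present (the return type is not listed above). The ‘offset’ argument is left at its default.

```python
import asyncio


async def ensure_subscription(client, subscription_id, stream_name,
                              ack_timeout=600, max_unacks=10000):
    """Hypothetical helper: create the subscription unless it already exists.

    Returns True if the subscription was created, False otherwise.
    """
    # Assumption: the call returns something truthy when the id is known.
    if await client.does_subscription_exist(subscription_id):
        return False
    await client.create_subscription(
        subscription_id,
        stream_name,
        ack_timeout=ack_timeout,
        max_unacks=max_unacks,
    )
    return True


# Inside a coroutine (not run here; names are placeholders):
#     await ensure_subscription(client, "demo_subscription", "demo_stream")
```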

async delete_subscription(subscription_id, force=False)
Parameters:

subscription_id (str) –

new_consumer(name, subscription_id, processing_func)
Parameters:
  • name (str) –

  • subscription_id (str) –

with_reader(stream_name, reader_id, shard_offset, timeout, shard_id=None, stream_key=None)
Parameters:
  • stream_name (str) –

  • reader_id (str) –

  • shard_offset (ShardOffset) –

  • timeout (int) –

  • shard_id (int | None) –

  • stream_key (str | None) –

async create_reader(stream_name, reader_id, shard_offset, timeout, shard_id=None, stream_key=None)

Create a reader.

If ‘shard_id’ is None, the shard that the optional ‘stream_key’ maps to is used.

Parameters:
  • stream_name (str) –

  • reader_id (str) –

  • shard_offset (ShardOffset) –

  • timeout (int) –

  • shard_id (int | None) –

  • stream_key (str | None) –

async read_reader(reader_id, max_records)
Parameters:
  • reader_id (str) –

  • max_records (str) –

Return type:

Iterator[Record]

async delete_reader(reader_id)
Parameters:

reader_id (str) –

Return type:

None
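The three reader calls form a create, read, delete lifecycle. The sketch below wraps them so the reader is deleted even when the read fails. read_once is a hypothetical helper; the ShardOffset value and the timeout are passed through unchanged because their construction and units are not described here.

```python
import asyncio


async def read_once(client, stream_name, reader_id, shard_offset, timeout,
                    max_records, shard_id=None, stream_key=None):
    """Hypothetical helper: create a reader, read one batch, always clean up."""
    await client.create_reader(
        stream_name,
        reader_id,
        shard_offset,
        timeout,
        shard_id=shard_id,
        stream_key=stream_key,
    )
    try:
        # Materialize the iterator before the reader is deleted.
        return list(await client.read_reader(reader_id, max_records))
    finally:
        await client.delete_reader(reader_id)


# Inside a coroutine (not run here; 'offset' stands for a ShardOffset value
# and 'timeout' for a value appropriate to your deployment):
#     records = await read_once(client, "demo_stream", "demo_reader",
#                               offset, timeout, max_records=10)
```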

BufferedProducer Object

class hstreamdb.BufferedProducer(flush_coro, find_stream_key_id_coro, append_callback=None, size_trigger=0, time_trigger=0, workers=1, retry_count=0, retry_max_delay=60, compresstype=None, compresslevel=9)
Parameters:
  • flush_coro (Callable[[str, List[AppendPayload], int, str | None, int], Awaitable[Iterator[RecordId]]]) –

  • find_stream_key_id_coro (Callable[[str, str | None], Awaitable[int]]) –

  • append_callback (Type[AppendCallback] | None) –

StreamKeyId

alias of int

GroupKeyTy

alias of Tuple[str, int]

class AppendCallback
abstract on_success(stream_name, payloads, stream_keyid)
Parameters:
  • stream_name (str) –

  • payloads (List[AppendPayload]) –

  • stream_keyid (int) –

abstract on_fail(stream_name, payloads, stream_keyid, e)
Parameters:
  • stream_name (str) –

  • payloads (List[AppendPayload]) –

  • stream_keyid (int) –

  • e (Exception) –

async append(stream_name, payload, key=None)
Parameters:
  • stream_name (str) –

  • payload (bytes | str | Dict[Any, Any]) –

  • key (str | None) –

async flush(stream_name, shard_id)
Parameters:
  • stream_name (str) –

  • shard_id (int) –

async flush_by_key(stream_name, key=None)
Parameters:
  • stream_name (str) –

  • key (str | None) –

async flushall()
async close()
async wait()
async wait_and_close()
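A buffered producer is normally obtained from HStreamDBClient.new_producer rather than constructed directly. The sketch below shows one plausible flow under several assumptions: LoggingCallback is a stand-in that only mirrors the two AppendCallback hooks (with the package installed it would subclass the library's AppendCallback, assumed here to be reachable as hstreamdb.BufferedProducer.AppendCallback); the trigger and retry values are arbitrary; and wait_and_close is taken, from its name, to wait for pending appends before closing.

```python
import asyncio


class LoggingCallback:
    """Stand-in exposing the two hooks of AppendCallback (see assumptions above)."""

    def on_success(self, stream_name, payloads, stream_keyid):
        print(f"appended {len(payloads)} payloads to {stream_name}")

    def on_fail(self, stream_name, payloads, stream_keyid, e):
        print(f"failed to append {len(payloads)} payloads to {stream_name}: {e}")


async def produce(client, stream_name, payloads, key=None):
    """Hypothetical helper: push payloads through a buffered producer."""
    # new_producer is not a coroutine; the callback is passed as a class.
    producer = client.new_producer(
        append_callback=LoggingCallback,
        size_trigger=10240,  # arbitrary illustrative value
        time_trigger=0.5,    # arbitrary illustrative value
        retry_count=2,       # arbitrary illustrative value
    )
    for payload in payloads:
        await producer.append(stream_name, payload, key=key)
    # Assumed from the method name: wait for pending work, then close.
    await producer.wait_and_close()


# Inside a coroutine (not run here; needs a connected client):
#     await produce(client, "demo_stream", ["a", "b", "c"])
```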

Consumer Object

class hstreamdb.Consumer(name, subscription, find_stub_coro, processing_func)
Parameters:
  • name (str) –

  • subscription (str) –

ResponseTy = typing.Any
async start()
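A consumer is usually created with HStreamDBClient.new_consumer and then started. The signature of ‘processing_func’ is not described on this page; the sketch below assumes it is an async callable receiving an acknowledge function, a stop function, and an iterator of records, and that start() returns once the stop function has been called. Treat both as assumptions to check against the Examples. make_processor and consume are hypothetical helpers.

```python
import asyncio


def make_processor(max_count, sink):
    """Hypothetical factory for a processing function (signature assumed)."""
    seen = 0

    async def processing(ack_fun, stop_fun, rs_iter):
        nonlocal seen
        records = list(rs_iter)
        sink.extend(records)
        # Acknowledge every record of the batch by its id.
        await ack_fun(r.id for r in records)
        seen += len(records)
        if seen >= max_count:
            # Ask the consumer to stop once enough records have arrived.
            await stop_fun()

    return processing


async def consume(client, subscription_id, max_count):
    """Hypothetical helper: collect about max_count records, then stop."""
    received = []
    consumer = client.new_consumer(
        "example_consumer", subscription_id, make_processor(max_count, received)
    )
    await consumer.start()
    return received


# Inside a coroutine (not run here; needs a connected client):
#     records = await consume(client, "demo_subscription", max_count=10)
```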

Types

class hstreamdb.TimeStamp(seconds, nanos)
Parameters:
  • seconds (int) –

  • nanos (int) –

seconds: int

Alias for field number 0

nanos: int

Alias for field number 1

class hstreamdb.Stream(name, replication_factor)
Parameters:
  • name (str) –

  • replication_factor (int) –

name: str

Alias for field number 0

replication_factor: int

Alias for field number 1

class hstreamdb.Subscription(subscription_id, stream_name, ack_timeout, max_unacks)
Parameters:
  • subscription_id (str) –

  • stream_name (str) –

  • ack_timeout (int) –

  • max_unacks (int) –

subscription_id: str

Alias for field number 0

stream_name: str

Alias for field number 1

ack_timeout: int

Alias for field number 2

max_unacks: int

Alias for field number 3

class hstreamdb.RecordId(shard_id, batch_id, batch_index)
Parameters:
  • shard_id (int) –

  • batch_id (int) –

  • batch_index (int) –

shard_id: int

Alias for field number 0

batch_id: int

Alias for field number 1

batch_index: int

Alias for field number 2

class hstreamdb.RecordHeader(publish_time, key, attributes)
Parameters:
  • publish_time (TimeStamp) –

  • key (str | None) –

  • attributes (Dict[str, str]) –

publish_time: TimeStamp

Alias for field number 0

key: str | None

Alias for field number 1

attributes: Dict[str, str]

Alias for field number 2

class hstreamdb.Record(id, header, payload)
Parameters:
  • id (RecordId) –

  • header (RecordHeader) –

  • payload (bytes | dict) –

id: RecordId

Alias for field number 0

header: RecordHeader

Alias for field number 1

payload: bytes | dict

Alias for field number 2
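Because Record.payload may be either bytes or a dict, code that prints or logs records has to handle both cases. A minimal sketch follows; payload_to_text is a hypothetical helper, and decoding bytes as UTF-8 is an assumption about the data being stored, not something the library guarantees.

```python
import json


def payload_to_text(payload):
    """Hypothetical helper: render a Record payload as a string."""
    if isinstance(payload, dict):
        # Sorted keys give a stable rendering for logs and comparisons.
        return json.dumps(payload, sort_keys=True)
    if isinstance(payload, (bytes, bytearray)):
        # Assumption: raw payloads are UTF-8 text; undecodable bytes are replaced.
        return bytes(payload).decode("utf-8", errors="replace")
    raise TypeError(f"unsupported payload type: {type(payload).__name__}")


# For a Record 'r' returned by a reader or consumer:
#     print(r.id, r.header.key, payload_to_text(r.payload))
```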