API Documentation¶
For examples about actual usage, see the Examples.
Module Contents¶
Create Client¶
- async hstreamdb.insecure_client(host='127.0.0.1', port=6570, url=None)[source]¶
Creates an insecure client to a cluster.
- Parameters:
host – hostname to connect to HStreamDB, defaults to ‘127.0.0.1’
port – port to connect to HStreanDB, defaults to 6570
url – alternative service url to connect to HStreamDB, it should be in ‘hstream://your-host’ format. Note that if you provide this url then the ‘host’ and ‘port’ args will be ignored.
- Returns:
- async hstreamdb.secure_client(host='127.0.0.1', port=6570, url=None, is_creds_file=False, root_certificates=None, private_key=None, certificate_chain=None)[source]¶
Creates a secure client to a cluster.
- Parameters:
host – hostname to connect to HStreamDB, defaults to ‘127.0.0.1’
port – port to connect to HStreanDB, defaults to 6570
url – alternative service url to connect to HStreamDB, it should be in ‘hstreams://your-host’ format. Note that if you provide this url then the ‘host’ and ‘port’ args will be ignored.
is_creds_file – whether the credentials is a filepath or the contents.
root_certificates – The PEM-encoded root certificates as a byte string, or None to retrieve them from a default location chosen by gRPC runtime. Note: if ‘is_creds_file’ is True this is the filepath instead of the contents.
private_key – The PEM-encoded private key as a byte string, or None if no private key should be used. Note: if ‘is_creds_file’ is True this is the filepath instead of the contents.
certificate_chain – The PEM-encoded certificate chain as a byte string to use or None if no certificate chain should be used. Note: if ‘is_creds_file’ is True this is the filepath instead of the contents.
- Returns:
HStreamDBClient Object¶
- class hstreamdb.HStreamDBClient(host='127.0.0.1', port=6570, credentials=None)[source]¶
- Parameters:
host (str) –
port (int) –
- async create_stream(name, replication_factor=1, backlog=0, shard_count=1)[source]¶
- Parameters:
name – stream name
replication_factor – how stream can be replicated across nodes in the cluster
backlog – how long streams of HStreamDB retain records after being appended, in senconds.
- async append(name, payloads, key=None, compresstype=None, compresslevel=9)[source]¶
Append payloads to a stream.
- Parameters:
name (str) – stream name
payloads (Iterable[Any]) – a list of string, bytes or dict(json).
key (str | None) – Optional stream key.
- Returns:
Appended RecordIds generator
- Return type:
Iterator[RecordId]
- new_producer(append_callback=None, size_trigger=0, time_trigger=0, workers=1, retry_count=0, retry_max_delay=60, compresstype=None, compresslevel=9)[source]¶
- Parameters:
append_callback (Type[AppendCallback] | None) –
- async create_subscription(subscription_id, stream_name, ack_timeout=600, max_unacks=10000, offset=1)[source]¶
- Parameters:
subscription_id (str) –
stream_name (str) –
ack_timeout (int) –
max_unacks (int) –
offset (<google.protobuf.internal.enum_type_wrapper.EnumTypeWrapper object at 0x7fb3db790dc0>) –
- async list_subscriptions()[source]¶
- Return type:
Iterator[Subscription]
- async delete_subscription(subscription_id, force=False)[source]¶
- Parameters:
subscription_id (str) –
- new_consumer(name, subscription_id, processing_func)[source]¶
- Parameters:
name (str) –
subscription_id (str) –
- with_reader(stream_name, reader_id, shard_offset, timeout, shard_id=None, stream_key=None)[source]¶
- Parameters:
stream_name (str) –
reader_id (str) –
shard_offset (ShardOffset) –
timeout (int) –
shard_id (int | None) –
stream_key (str | None) –
- async create_reader(stream_name, reader_id, shard_offset, timeout, shard_id=None, stream_key=None)[source]¶
Create a reader.
If the ‘shard_id’ is None, then use the shard which the optional ‘stream_key’ corresponds.
- Parameters:
stream_name (str) –
reader_id (str) –
shard_offset (ShardOffset) –
timeout (int) –
shard_id (int | None) –
stream_key (str | None) –
BufferedProducer Object¶
- class hstreamdb.BufferedProducer(flush_coro, find_stream_key_id_coro, append_callback=None, size_trigger=0, time_trigger=0, workers=1, retry_count=0, retry_max_delay=60, compresstype=None, compresslevel=9)[source]¶
- Parameters:
flush_coro (Callable[[str, List[AppendPayload], int, str | None, int], Awaitable[Iterator[RecordId]]]) –
find_stream_key_id_coro (Callable[[str, str | None], Awaitable[int]]) –
append_callback (Type[AppendCallback] | None) –
- StreamKeyId¶
alias of
int
- GroupKeyTy¶
alias of
Tuple
[str
,int
]
- class AppendCallback[source]¶
- async append(stream_name, payload, key=None)[source]¶
- Parameters:
stream_name (str) –
payload (bytes | str | Dict[Any, Any]) –
key (str | None) –
Consumer Object¶
Types¶
- class hstreamdb.TimeStamp(seconds, nanos)[source]¶
- Parameters:
seconds (int) –
nanos (int) –
- seconds: int¶
Alias for field number 0
- nanos: int¶
Alias for field number 1
- class hstreamdb.Stream(name, replication_factor)[source]¶
- Parameters:
name (str) –
replication_factor (int) –
- name: str¶
Alias for field number 0
- replication_factor: int¶
Alias for field number 1
- class hstreamdb.Subscription(subscription_id, stream_name, ack_timeout, max_unacks)[source]¶
- Parameters:
subscription_id (str) –
stream_name (str) –
ack_timeout (int) –
max_unacks (int) –
- subscription_id: str¶
Alias for field number 0
- stream_name: str¶
Alias for field number 1
- ack_timeout: int¶
Alias for field number 2
- max_unacks: int¶
Alias for field number 3
- class hstreamdb.RecordId(shard_id, batch_id, batch_index)[source]¶
- Parameters:
shard_id (int) –
batch_id (int) –
batch_index (int) –
- shard_id: int¶
Alias for field number 0
- batch_id: int¶
Alias for field number 1
- batch_index: int¶
Alias for field number 2
- class hstreamdb.RecordHeader(publish_time, key, attributes)[source]¶
- Parameters:
publish_time (TimeStamp) –
key (str | None) –
attributes (Dict[str, str]) –
- key: str | None¶
Alias for field number 1
- attributes: Dict[str, str]¶
Alias for field number 2
- class hstreamdb.Record(id, header, payload)[source]¶
- Parameters:
id (RecordId) –
header (RecordHeader) –
payload (bytes | dict) –
- header: RecordHeader¶
Alias for field number 1
- payload: bytes | dict¶
Alias for field number 2