Source code for hstreamdb.types

from typing import NamedTuple, Optional, Dict, Union

import HStream.Server.HStreamApi_pb2 as ApiPb


[docs]class TimeStamp(NamedTuple): seconds: int nanos: int
[docs]class Stream(NamedTuple): name: str replication_factor: int
def stream_type_from(stream: ApiPb.Stream) -> Stream: return Stream( name=stream.streamName, replication_factor=stream.replicationFactor )
[docs]class Subscription(NamedTuple): subscription_id: str stream_name: str ack_timeout: int max_unacks: int
def subscription_type_from(sub: ApiPb.Subscription) -> Subscription: return Subscription( subscription_id=sub.subscriptionId, stream_name=sub.streamName, ack_timeout=sub.ackTimeoutSeconds, max_unacks=sub.maxUnackedRecords, )
[docs]class RecordId(NamedTuple): shard_id: int batch_id: int batch_index: int
def record_id_to(record_id: RecordId) -> ApiPb.RecordId: return ApiPb.RecordId( shardId=record_id.shard_id, batchId=record_id.batch_id, batchIndex=record_id.batch_index, ) def record_id_from(record_id: ApiPb.RecordId) -> RecordId: return RecordId( shard_id=record_id.shardId, batch_id=record_id.batchId, batch_index=record_id.batchIndex, )
[docs]class RecordHeader(NamedTuple): publish_time: TimeStamp key: Optional[str] attributes: Dict[str, str]
[docs]class Record(NamedTuple): id: RecordId header: RecordHeader payload: Union[bytes, dict]
class Shard(NamedTuple): id: int stream_name: str start: int end: int epoch: int def shard_type_from(shard: ApiPb.Shard) -> Shard: return Shard( id=shard.shardId, stream_name=shard.streamName, start=int(shard.startHashRangeKey), end=int(shard.endHashRangeKey), epoch=shard.epoch, ) SpecialOffset = ApiPb.SpecialOffset ShardOffset = ApiPb.ShardOffset