Source code for hstreamdb.types
from typing import NamedTuple, Optional, Dict, Union
import HStream.Server.HStreamApi_pb2 as ApiPb
[docs]class TimeStamp(NamedTuple):
seconds: int
nanos: int
[docs]class Stream(NamedTuple):
name: str
replication_factor: int
def stream_type_from(stream: ApiPb.Stream) -> Stream:
return Stream(
name=stream.streamName, replication_factor=stream.replicationFactor
)
[docs]class Subscription(NamedTuple):
subscription_id: str
stream_name: str
ack_timeout: int
max_unacks: int
def subscription_type_from(sub: ApiPb.Subscription) -> Subscription:
return Subscription(
subscription_id=sub.subscriptionId,
stream_name=sub.streamName,
ack_timeout=sub.ackTimeoutSeconds,
max_unacks=sub.maxUnackedRecords,
)
[docs]class RecordId(NamedTuple):
shard_id: int
batch_id: int
batch_index: int
def record_id_to(record_id: RecordId) -> ApiPb.RecordId:
return ApiPb.RecordId(
shardId=record_id.shard_id,
batchId=record_id.batch_id,
batchIndex=record_id.batch_index,
)
def record_id_from(record_id: ApiPb.RecordId) -> RecordId:
return RecordId(
shard_id=record_id.shardId,
batch_id=record_id.batchId,
batch_index=record_id.batchIndex,
)
[docs]class Record(NamedTuple):
id: RecordId
header: RecordHeader
payload: Union[bytes, dict]
class Shard(NamedTuple):
id: int
stream_name: str
start: int
end: int
epoch: int
def shard_type_from(shard: ApiPb.Shard) -> Shard:
return Shard(
id=shard.shardId,
stream_name=shard.streamName,
start=int(shard.startHashRangeKey),
end=int(shard.endHashRangeKey),
epoch=shard.epoch,
)
SpecialOffset = ApiPb.SpecialOffset
ShardOffset = ApiPb.ShardOffset