Source code for hstreamdb.aio.consumer
import asyncio
import logging
from typing import Any, Iterable
import HStream.Server.HStreamApi_pb2 as ApiPb
import HStream.Server.HStreamApi_pb2_grpc as ApiGrpc
from hstreamdb.types import RecordId, record_id_to
from hstreamdb.utils import decode_records
logger = logging.getLogger(__name__)
[docs]class Consumer:
ResponseTy = Any
_stub: ApiGrpc.HStreamApiStub
_requests: asyncio.Queue
# _call:
def __init__(
self,
name: str,
subscription: str,
find_stub_coro,
processing_func,
):
self._name = name
self._subscription = subscription
self._requests = asyncio.Queue()
self._find_stub_coro = find_stub_coro
self._processing_func = processing_func
[docs] async def start(self):
self._stub = await self._find_stub_coro()
await self._requests.put(self._fetch_request)
self._call = self._stub.StreamingFetch(self._request_gen())
try:
async for r in self._call:
await self._processing_func(
self._ack,
self._stop,
decode_records(r.receivedRecords),
)
except asyncio.exceptions.CancelledError:
logger.info("Consumer is Cancelled")
async def _stop(self):
if self._call:
self._call.cancel()
else:
logger.error("Make sure you have started the consumer!")
async def _ack(self, record_ids: Iterable[RecordId]):
await self._requests.put(
ApiPb.StreamingFetchRequest(
subscriptionId=self._subscription,
consumerName=self._name,
ackIds=[record_id_to(r) for r in record_ids],
)
)
async def _request_gen(self):
while True:
r = await self._requests.get()
self._requests.task_done()
if not r:
break
else:
yield r
@property
def _fetch_request(self):
return ApiPb.StreamingFetchRequest(
subscriptionId=self._subscription,
consumerName=self._name,
ackIds=[],
)