Source code for aiorpc.connection

# -*- coding: utf-8 -*-

import asyncio
from aiorpc.log import rootLogger
from aiorpc.constants import SOCKET_RECV_SIZE

__all__ = ['Connection']
_logger = rootLogger.getChild(__name__)


[docs]class Connection: def __init__(self, reader, writer, unpacker): self.reader = reader self.writer = writer self.unpacker = unpacker self._is_closed = False self.peer = self.writer.get_extra_info('peername')
[docs] async def sendall(self, raw_req, timeout): _logger.debug('sending raw_req {} to {}'.format( str(raw_req), self.peer)) self.writer.write(raw_req) await asyncio.wait_for(self.writer.drain(), timeout) _logger.debug('sending {} completed'.format(str(raw_req)))
[docs] async def recvall(self, timeout): _logger.debug('entered recvall from {}'.format(self.peer)) # buffer, line = bytearray(), b'' # while not line.endswith(b'\r\n'): # _logger.debug('receiving data, timeout: {}'.format(timeout)) # line = await asyncio.wait_for(self.reader.readline(), timeout) # if not line: # break # _logger.debug('received data {}'.format(line)) # buffer.extend(line) # _logger.debug('buffer: {}'.format(buffer)) req = None while True: data = await asyncio.wait_for(self.reader.read(SOCKET_RECV_SIZE), timeout) _logger.debug('receiving data {} from {}'.format(data, self.peer)) if not data: raise IOError('Connection to {} closed'.format(self.peer)) self.unpacker.feed(data) try: req = next(self.unpacker) break except StopIteration: continue _logger.debug('received req from {} : {}'.format(self.peer, req)) _logger.debug('exiting recvall from {}'.format(self.peer)) return req
[docs] def close(self): self.reader.feed_eof() self.writer.close() self._is_closed = True
[docs] def is_closed(self): return self._is_closed