Asynchronous Usage#
While the library doesn’t provide its own native asyncio implementation yet, you can use the Sans-I/O interface to build your own datagram protocol that provides async / await capability. You can also integrate the little-a2s protocol with other networking libraries such as Trio, anyio, and gevent.
Below is an exmaple of how to create your own asyncio protocol and use it:
async_protocol.py#
import asyncio
from typing import Callable
from little_a2s import (
# This is the class that handles parsing datagrams into events.
A2SClientProtocol,
# The classes below are some return values of the protocol.
ClientEvent,
ClientEventChallenge,
ClientEventInfo,
ClientEventPlayers,
ClientEventRules,
ClientPacket,
)
host = "127.0.0.1"
port = 27015
async def main():
loop = asyncio.get_running_loop()
transport, protocol = await loop.create_datagram_endpoint(
MyA2SProtocol,
remote_addr=(host, port),
)
async with protocol, asyncio.timeout(1):
info = await protocol.info()
players = await protocol.players()
rules = await protocol.rules()
print(info)
print(players)
print(rules)
class MyA2SProtocol(asyncio.DatagramProtocol):
_send_fut: asyncio.Future[list[ClientEvent]] | None
def __init__(self):
self._proto = A2SClientProtocol()
# For this example, we won't try to support concurrent usage,
# so we're lock sending and receiving to one task at a time.
self._send_lock = asyncio.Lock()
self._send_fut = None
# A future to help us wait for the protocol's closure.
self._close_fut = asyncio.get_running_loop().create_future()
# Datagram protocol methods
def connection_made(self, transport: asyncio.DatagramTransport):
self.transport = transport
def connection_lost(self, exc: Exception | None):
if exc is None:
self._close_fut.set_result(None)
else:
self._close_fut.set_exception(exc)
def datagram_received(self, data: bytes, addr):
# Whenever we receive a datagram, feed it to the Sans-IO protocol.
self._proto.receive_datagram(data)
# The protocol may tell us to send packets in response.
for packet in self._proto.packets_to_send():
self.transport.sendto(bytes(packet))
# After parsing the datagram, we may receive events resulting from it.
# It can take several datagrams in the case of multi-packet responses.
events = self._proto.events_received()
if not events or self._send_fut is None:
return
print(f"Parsed event: {type(events[0]).__name__}")
try:
self._send_fut.set_result(events)
except asyncio.InvalidStateError:
pass # Future was probably cancelled
# Bridge methods
# This is how we convert our low-level, callback-based transport
# to high-level async/await syntax.
async def _send(self, packet: ClientPacket) -> list[ClientEvent]:
# If we send multiple packets at once, we may receive events
# out-of-order. For simplicitly, we'll lock send / read entirely.
async with self._send_lock:
self.transport.sendto(bytes(packet))
print(f"Sent packet: {type(packet).__name__}")
fut = asyncio.get_running_loop().create_future()
self._send_fut = fut
return await fut
# Request methods
async def info(self) -> ClientEventInfo:
event = await self._send_or_challenge(self._proto.info)
assert isinstance(event, ClientEventInfo) # can be false if out-of-order
return event
async def players(self) -> ClientEventPlayers:
event = await self._send_or_challenge(self._proto.players)
assert isinstance(event, ClientEventPlayers) # can be false if out-of-order
return event
async def rules(self) -> ClientEventRules:
event = await self._send_or_challenge(self._proto.rules)
assert isinstance(event, ClientEventRules) # can be false if out-of-order
return event
async def _send_or_challenge(
self,
request: Callable[[], ClientPacket],
) -> ClientEvent:
events = await self._send(request())
# Since we have one sender at a time, we shouldn't receive
# more than one event per request.
assert len(events) == 1
# If we receive a challenge, regenerate the request so the
# A2S protocol can suffix the challenge to the new payload.
#
# Just in case, we'll set a limit so we don't spam traffic.
remaining = 3
while remaining > 0 and isinstance(events[0], ClientEventChallenge):
events = await self._send(request())
assert len(events) == 1
remaining -= 1
return events[0]
# Cleanup methods
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, tb):
await self.close()
async def close(self):
self.transport.close()
return await asyncio.shield(self._close_fut)
if __name__ == "__main__":
asyncio.run(main())