jsonrpcclient Examples
These examples show how to send JSON-RPC requests using various frameworks and transport protocols.
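For orientation, here is a minimal sketch of the JSON-RPC 2.0 messages that such a request involves, written with the standard library only. The helpers `build_request` and `parse_response` are hypothetical and not part of jsonrpcclient, and the `"pong"` result is an assumed server reply used purely for illustration.

```python
import json
from itertools import count

# Hypothetical helpers for illustration; not part of jsonrpcclient.
_ids = count(1)


def build_request(method, params=None):
    """Build a JSON-RPC 2.0 request object with an auto-incrementing id."""
    request = {"jsonrpc": "2.0", "method": method, "id": next(_ids)}
    if params is not None:
        request["params"] = params
    return request


def parse_response(text):
    """Return (ok, payload): the result on success, the error message otherwise."""
    response = json.loads(text)
    if "error" in response:
        return False, response["error"]["message"]
    return True, response["result"]


ping = build_request("ping")
wire = json.dumps(ping)  # the text that would travel over the transport

# Assumed replies: a successful one and a standard "Method not found" error.
success = parse_response('{"jsonrpc": "2.0", "result": "pong", "id": 1}')
failure = parse_response(
    '{"jsonrpc": "2.0", "error": {"code": -32601, "message": "Method not found"}, "id": 1}'
)
```

The `(ok, payload)` pair mirrors the `response.data.ok` check used throughout the examples on this page: a reply carries either a `result` or an `error`, never both.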
Synchronous
Requests
$ pip install "jsonrpcclient[requests]"
import logging

from jsonrpcclient.clients.http_client import HTTPClient

client = HTTPClient("http://localhost:5000")
response = client.request("ping")

if response.data.ok:
    print(response.data.result)
else:
    logging.error(response.data.message)
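JSON-RPC over HTTP is conventionally a POST request with a JSON body. As a rough illustration of what travels over the wire, the sketch below builds such a request with the standard library's `urllib.request`. It is an approximation under that assumption, not a description of how `HTTPClient` is implemented. The request is only constructed here; `send` is a hypothetical helper that would need a server listening at the assumed URL.

```python
import json
import urllib.request

# Assumed endpoint, matching the example above.
URL = "http://localhost:5000"

payload = {"jsonrpc": "2.0", "method": "ping", "id": 1}

# Build (but do not send) an HTTP POST carrying the JSON-RPC request.
http_request = urllib.request.Request(
    URL,
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)


def send(req):
    """Hypothetical helper: send the request and decode the JSON reply."""
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

Calling `send(http_request)` against a running JSON-RPC server would return the decoded response object.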
ZeroMQ
$ pip install 'jsonrpcclient[pyzmq]'
import logging

from jsonrpcclient.clients.zeromq_client import ZeroMQClient

response = ZeroMQClient("tcp://localhost:5000").request("ping")

if response.data.ok:
    print(response.data.result)
else:
    logging.error(response.data.message)
See blog post.
Asynchronous
These examples use async/await syntax and therefore require Python 3.5+.
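One reason to reach for an asynchronous client is to keep several requests in flight at once. The sketch below shows that pattern with `asyncio.gather`. The coroutine `fake_request` is a hypothetical stand-in for `await client.request(...)` so the snippet runs without a server, and the returned payloads are invented for illustration. Note that `asyncio.run` needs Python 3.7+; on older versions an explicit event loop would be used instead.

```python
import asyncio


async def fake_request(method, delay=0.01):
    """Hypothetical stand-in for `await client.request(method)`."""
    await asyncio.sleep(delay)  # simulates network latency
    return {"jsonrpc": "2.0", "result": method + " ok", "id": 1}


async def main():
    # gather() runs the coroutines concurrently and returns results
    # in the same order as its arguments.
    return await asyncio.gather(fake_request("ping"), fake_request("status"))


results = asyncio.run(main())  # asyncio.run requires Python 3.7+
```

With a real client, each `fake_request(...)` call would be replaced by the corresponding `client.request(...)` coroutine.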
aiohttp
$ pip install 'jsonrpcclient[aiohttp]'
import asyncio
import logging

import aiohttp
from jsonrpcclient.clients.aiohttp_client import AiohttpClient

async def main(loop):
    async with aiohttp.ClientSession(loop=loop) as session:
        client = AiohttpClient(session, "http://localhost:5000")
        response = await client.request("ping")

    if response.data.ok:
        print(response.data.result)
    else:
        logging.error(response.data.message)

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
See blog post.
Tornado
$ pip install 'jsonrpcclient[tornado]'
import logging

from tornado.ioloop import IOLoop
from jsonrpcclient.clients.tornado_client import TornadoClient

client = TornadoClient("http://localhost:5000/")

async def main():
    response = await client.request("ping")
    if response.data.ok:
        print(response.data.result)
    else:
        logging.error(response.data.message)

IOLoop.current().run_sync(main)
Websockets
$ pip install 'jsonrpcclient[websockets]'
import asyncio
import logging

import websockets
from jsonrpcclient.clients.websockets_client import WebSocketsClient

async def main():
    async with websockets.connect("ws://localhost:5000") as ws:
        response = await WebSocketsClient(ws).request("ping")

    if response.data.ok:
        print(response.data.result)
    else:
        logging.error(response.data.message)

asyncio.get_event_loop().run_until_complete(main())
See blog post.