jsonrpcclient Examples

These examples show how to send JSON-RPC requests using various frameworks and transport protocols.
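
The examples on this page assume a JSON-RPC server is listening on localhost:5000. For trying the HTTP examples locally, a throwaway server can be sketched with the standard library alone. Everything in it is an assumption made for illustration: the PingHandler class, the "pong" result, and the OS-assigned port are not part of jsonrpcclient. The block starts the server in a background thread, posts one ping request to it, and shuts it down.

```python
import http.client
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer


class PingHandler(BaseHTTPRequestHandler):
    """Hypothetical handler that answers the JSON-RPC method "ping" with "pong"."""

    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        request = json.loads(self.rfile.read(length).decode("utf-8"))
        reply = {"jsonrpc": "2.0", "id": request.get("id")}
        if request.get("method") == "ping":
            reply["result"] = "pong"
        else:
            # -32601 is the JSON-RPC 2.0 error code for "Method not found".
            reply["error"] = {"code": -32601, "message": "Method not found"}
        body = json.dumps(reply).encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # keep the demo output quiet


# Port 0 asks the OS for any free port; the examples on this page use 5000.
server = HTTPServer(("127.0.0.1", 0), PingHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()

# Send one raw JSON-RPC request to check that the server answers.
payload = json.dumps({"jsonrpc": "2.0", "method": "ping", "id": 1}).encode("utf-8")
connection = http.client.HTTPConnection("127.0.0.1", server.server_port)
connection.request("POST", "/", body=payload, headers={"Content-Type": "application/json"})
reply = json.loads(connection.getresponse().read().decode("utf-8"))
connection.close()
print(reply)

server.shutdown()
server.server_close()
```

To serve the examples on this page instead of the self-check, bind to port 5000 and call server.serve_forever() in the foreground.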

Synchronous

Requests

$ pip install 'jsonrpcclient[requests]'

import logging

from jsonrpcclient.clients.http_client import HTTPClient

client = HTTPClient("http://localhost:5000")
response = client.request("ping")

if response.data.ok:
    print(response.data.result)
else:
    logging.error(response.data.message)
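
On the wire, the ping call above is a JSON-RPC 2.0 request object, and response.data.ok corresponds to whether the reply carries a result or an error. The sketch below builds and parses those objects with the standard library only. make_request and parse_response are hypothetical helpers written for illustration; they are not jsonrpcclient functions and do not show how the library is implemented.

```python
import json
from itertools import count

_ids = count(1)  # hypothetical id generator for this sketch


def make_request(method, *args, **kwargs):
    """Build a JSON-RPC 2.0 request object as a dict."""
    if args and kwargs:
        # The specification allows params by position or by name, not both.
        raise ValueError("use positional or keyword params, not both")
    request = {"jsonrpc": "2.0", "method": method, "id": next(_ids)}
    if args or kwargs:
        request["params"] = list(args) if args else kwargs
    return request


def parse_response(text):
    """Return (ok, payload): the result on success, the error message on failure."""
    body = json.loads(text)
    if "error" in body:
        return False, body["error"]["message"]
    return True, body["result"]


print(json.dumps(make_request("ping")))
print(parse_response('{"jsonrpc": "2.0", "result": "pong", "id": 1}'))
print(parse_response(
    '{"jsonrpc": "2.0", "error": {"code": -32601, "message": "Method not found"}, "id": 1}'
))
```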

ZeroMQ

$ pip install 'jsonrpcclient[pyzmq]'

import logging

from jsonrpcclient.clients.zeromq_client import ZeroMQClient

response = ZeroMQClient("tcp://localhost:5000").request("ping")

if response.data.ok:
    print(response.data.result)
else:
    logging.error(response.data.message)

See blog post.

Asynchronous

These require Python 3.5+.
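
The asynchronous clients pay off when several requests are in flight at once. The sketch below shows that pattern with asyncio.gather. FakeAsyncClient and the method names echo and time are hypothetical stand-ins so the block runs without a server; one of the real clients from the sections below would take its place.

```python
import asyncio


class FakeAsyncClient:
    """Hypothetical stand-in for an asynchronous JSON-RPC client."""

    def __init__(self, delay=0.05):
        self.delay = delay

    async def request(self, method):
        # Simulate network latency without blocking the event loop.
        await asyncio.sleep(self.delay)
        return "{} -> pong".format(method)


async def main():
    client = FakeAsyncClient()
    # gather() runs the three requests concurrently and
    # returns their results in the order they were passed in.
    return await asyncio.gather(
        client.request("ping"),
        client.request("echo"),
        client.request("time"),
    )


loop = asyncio.new_event_loop()
try:
    results = loop.run_until_complete(main())
finally:
    loop.close()
print(results)
```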

aiohttp

$ pip install 'jsonrpcclient[aiohttp]'

import asyncio
import logging

import aiohttp

from jsonrpcclient.clients.aiohttp_client import AiohttpClient


async def main():
    # The session uses the running event loop; passing loop= is deprecated in aiohttp.
    async with aiohttp.ClientSession() as session:
        client = AiohttpClient(session, "http://localhost:5000")
        response = await client.request("ping")
    if response.data.ok:
        print(response.data.result)
    else:
        logging.error(response.data.message)


asyncio.get_event_loop().run_until_complete(main())

See blog post.

Tornado

$ pip install 'jsonrpcclient[tornado]'

import logging

from tornado.ioloop import IOLoop

from jsonrpcclient.clients.tornado_client import TornadoClient

client = TornadoClient("http://localhost:5000/")


async def main():
    response = await client.request("ping")
    if response.data.ok:
        print(response.data.result)
    else:
        logging.error(response.data.message)


IOLoop.current().run_sync(main)

Websockets

$ pip install 'jsonrpcclient[websockets]'

import asyncio
import logging

import websockets

from jsonrpcclient.clients.websockets_client import WebSocketsClient


async def main():
    async with websockets.connect("ws://localhost:5000") as ws:
        response = await WebSocketsClient(ws).request("ping")
    if response.data.ok:
        print(response.data.result)
    else:
        logging.error(response.data.message)


asyncio.get_event_loop().run_until_complete(main())

See blog post.