Skip to main content

Python With JSON Communication and Custom WebSocket Class

How to run:

  1. pip install websockets
  2. put credentials.json right beside the script
  3. put OAModel.custom.json and payloadTypes.custom.json right beside the script. (check here to see how to create them)
  4. put WebSocket definition in a file named WebSocket.py, and place it beside the script.

Also be sure to check this note.

import asyncio as aio
import json
from types import SimpleNamespace
from WebSocket import WebSocket # our custom class

# gen util
parsejson = lambda s: json.loads(s, object_hook=lambda d: SimpleNamespace(**d))
def readfile(path):
with open(path) as f:
return parsejson(f.read())


creds = readfile('./credentials.json')
oa = readfile('./OAModel.custom.json')
pt = readfile('./payloadTypes.custom.json')

# app util
def construct_msg(payloadType, fields={}, clientMsgId=''):
msg = {
'payloadType': payloadType,
'payload': {
'ctidTraderAccountId': creds.accountId,
'accessToken': creds.accessToken,
**fields
}
}
if clientMsgId:
msg['clientMsgId'] = clientMsgId
return json.dumps(msg)



# application code

# application's starting point
# called after the program is connected to server and fully authenticated
async def on_ready(ws):
print('on ready')

# get symbols list
msg = construct_msg(pt.req.SymbolsList, {'symbolId':41})
await ws.send(msg)

# subscribe to live price
msg = construct_msg(pt.req.SubscribeSpots, {'symbolId':41})
await ws.send(msg)

# place limit order on XAUUSD symbol with volume of 0.01 lot
msg = construct_msg(pt.req.NewOrder, {
'symbolId': 41,
'orderType': oa.OrderType.LIMIT,
'tradeSide': oa.TradeSide.BUY,
'volume': 100,
'limitPrice': 3800.00,
'stopLoss': 3600.00,
'takeProfit': 4200.00
})
await ws.send(msg)


# application's main message callback
# called upon incomming server messages (except heartbeat msgs)
async def on_resp(ws, msg_str):
msg = json.loads(msg_str) # parse json as dict
# msg = parsejson(msg_str) # parse json as obj
payloadType = msg['payloadType']
payload = msg['payload']

if payloadType == pt.res.SymbolsList:
with open('symbols.json', 'w') as f:
syms = payload['symbol']
json.dump(syms, f, indent=2)

if payloadType == pt.event.Execution:
print('order executed')
print(json.dumps(payload, indent=2))




# code related to setting up connection and authentication
async def main():
ws = WebSocket('wss://live.ctraderapi.com:5036')

async def on_open():
print('connected')
# we don't use the `construct_msg()` function for the first message,
# because its payload is different from the rest of the messages
msg = {
'payloadType': pt.req.ApplicationAuth,
'payload': {
'clientId': creds.clientId,
'clientSecret': creds.clientSecret
}
}
await ws.send(json.dumps(msg))

async def on_message(msg_str):
# we're defining a hardcoded behaviour for special incomming messages
# and redirecting the rest of the messages to the `on_resp()` function
msg = parsejson(msg_str)
payloadType = msg.payloadType
if payloadType == pt.res.ApplicationAuth:
print('auth app')
msg = construct_msg(pt.req.AccountAuth)
await ws.send(msg)
return
if payloadType == pt.res.AccountAuth:
print('auth account')
await on_ready(ws)
return
if payloadType == pt.common.HeartbeatEvent:
msg = {'payloadType': pt.common.HeartbeatEvent}
await ws.send(json.dumps(msg))
print('heartbeat exchange')
return
await on_resp(ws, msg_str)

async def on_close():
print('closed')

async def on_error(err):
print('error:', err)

ws.onopen = on_open
ws.onmessage = on_message
ws.onclose = on_close
ws.onerror = on_error

# keep running the loop until WebSocket closes
while ws.readyState != WebSocket.CLOSED:
await aio.sleep(0.1)


aio.run(main())

The WebSocket Class

This is just a wrapper class that tries to mimic Browser's WebSocket API.

import asyncio
import websockets


class WebSocket:
CONNECTING = 0
OPEN = 1
CLOSING = 2
CLOSED = 3

def __init__(self, url):
self.url = url
self.readyState = WebSocket.CONNECTING
self.onopen = None
self.onmessage = None
self.onclose = None
self.onerror = None
self._ws = None
self._loop = asyncio.get_event_loop()
self._loop.create_task(self._connect())

async def _connect(self):
try:
async with websockets.connect(self.url) as ws:
self._ws = ws
self.readyState = WebSocket.OPEN
if callable(self.onopen):
await self._maybe_await(self.onopen())

async for message in ws:
if callable(self.onmessage):
await self._maybe_await(self.onmessage(message))

except Exception as e:
if callable(self.onerror):
await self._maybe_await(self.onerror(e))
finally:
self.readyState = WebSocket.CLOSED
if callable(self.onclose):
await self._maybe_await(self.onclose())

async def send(self, data):
if self.readyState == WebSocket.OPEN and self._ws:
await self._ws.send(data)
else:
raise ConnectionError('WebSocket is not open')

async def close(self):
if self._ws and self.readyState == WebSocket.OPEN:
self.readyState = WebSocket.CLOSING
await self._ws.close()
self.readyState = WebSocket.CLOSED

async def _maybe_await(self, result):
'''Allow both sync and async callbacks.'''
if asyncio.iscoroutine(result):
await result