Python With JSON Communication and Custom WebSocket Class
How to run:
- pip install websockets
- put credentials.json right beside the script
- put OAModel.custom.json and payloadTypes.custom.json right beside the script (check here to see how to create them)
- put the WebSocket definition in a file named WebSocket.py, and place it beside the script
Also be sure to check this note.
import asyncio as aio
import json
from types import SimpleNamespace
from WebSocket import WebSocket # our custom class
# gen util
def parsejson(s):
    """Parse JSON text into objects that support attribute access.

    Every JSON object is turned into a ``SimpleNamespace`` (so ``obj.a.b``
    works instead of ``obj['a']['b']``); arrays and scalars keep the types
    ``json.loads`` normally produces.

    Args:
        s: JSON document as ``str``, ``bytes`` or ``bytearray``.

    Returns:
        The decoded value, with every object replaced by a ``SimpleNamespace``.

    Raises:
        json.JSONDecodeError: if ``s`` is not valid JSON.
    """
    return json.loads(s, object_hook=lambda d: SimpleNamespace(**d))
def readfile(path):
    """Read a JSON file and return it parsed by ``parsejson``.

    The file is decoded as UTF-8 explicitly instead of relying on the
    platform's locale encoding, so the same file loads identically on every
    operating system.

    Args:
        path: location of the JSON file.

    Returns:
        Whatever ``parsejson`` returns for the file's text.

    Raises:
        OSError: if the file cannot be opened.
    """
    with open(path, encoding='utf-8') as f:
        return parsejson(f.read())
# Module-level configuration, loaded once at import time (paths are relative
# to the current working directory):
#   creds - clientId / clientSecret / accountId / accessToken
#   oa    - enum-like lookup tables (e.g. oa.OrderType.LIMIT, oa.TradeSide.BUY)
#   pt    - payload type ids grouped as pt.req / pt.res / pt.event / pt.common
creds, oa, pt = map(
    readfile,
    (
        './credentials.json',
        './OAModel.custom.json',
        './payloadTypes.custom.json',
    ),
)
# app util
def construct_msg(payloadType, fields=None, clientMsgId=''):
    """Serialize an account-scoped request to JSON text.

    The payload always starts with the account id and access token taken
    from ``creds``; ``fields`` is merged on top, so a key supplied by the
    caller overrides the built-in one.

    Args:
        payloadType: numeric payload type id placed at the top level.
        fields: optional mapping of extra payload entries. Defaults to
            ``None`` (no extras) rather than a shared mutable ``{}``.
        clientMsgId: optional correlation id; only emitted when non-empty.

    Returns:
        The message as a JSON string.
    """
    payload = {
        'ctidTraderAccountId': creds.accountId,
        'accessToken': creds.accessToken,
    }
    if fields:
        payload.update(fields)

    msg = {'payloadType': payloadType, 'payload': payload}
    if clientMsgId:
        msg['clientMsgId'] = clientMsgId
    return json.dumps(msg)
# application code
# application's starting point
# called after the program is connected to server and fully authenticated
async def on_ready(ws):
    """Send the application's initial requests.

    Invoked once the account-auth response has arrived. Three requests are
    sent one after another, each built with ``construct_msg``: the symbols
    list, a live-price subscription, and a limit buy order.

    Args:
        ws: connected socket object exposing an awaitable ``send(str)``.
    """
    print('on ready')

    symbol_id = 41  # the same symbol is used by all three requests

    # get symbols list
    await ws.send(construct_msg(pt.req.SymbolsList, {'symbolId': symbol_id}))

    # subscribe to live price
    await ws.send(construct_msg(pt.req.SubscribeSpots, {'symbolId': symbol_id}))

    # place limit order on XAUUSD symbol with volume of 0.01 lot
    order_request = construct_msg(pt.req.NewOrder, {
        'symbolId': symbol_id,
        'orderType': oa.OrderType.LIMIT,
        'tradeSide': oa.TradeSide.BUY,
        'volume': 100,
        'limitPrice': 3800.00,
        'stopLoss': 3600.00,
        'takeProfit': 4200.00,
    })
    await ws.send(order_request)
# application's main message callback
# called upon incoming server messages (except heartbeat msgs)
async def on_resp(ws, msg_str):
    """Handle one server message that is not part of the auth/heartbeat flow.

    * Symbols-list response: the ``symbol`` array is written to
      ``symbols.json`` in the current working directory.
    * Execution event: the payload is pretty-printed to stdout.
    * Anything else is ignored.

    Args:
        ws: the socket the message arrived on (unused here, available for
            handlers that need to reply).
        msg_str: raw JSON text of the message.

    Raises:
        KeyError: if the message has no ``payloadType``, or a symbols-list
            response carries no ``symbol`` field.
    """
    msg = json.loads(msg_str)  # plain dicts; use parsejson() for attribute access
    payloadType = msg['payloadType']
    # Tolerate messages that carry no payload at all instead of raising
    # KeyError for types this handler does not even look at.
    payload = msg.get('payload', {})

    if payloadType == pt.res.SymbolsList:
        # Extract first, open second: a response without 'symbol' must not
        # truncate an existing symbols.json.
        syms = payload['symbol']
        with open('symbols.json', 'w') as f:
            json.dump(syms, f, indent=2)

    if payloadType == pt.event.Execution:
        print('order executed')
        print(json.dumps(payload, indent=2))
# code related to setting up connection and authentication
async def main():
    """Open the connection, run the auth handshake, and idle until it closes.

    The socket callbacks are defined as closures over ``ws``. Authentication
    and heartbeat messages are handled here; every other message is handed
    to ``on_resp``.
    """
    ws = WebSocket('wss://live.ctraderapi.com:5036')

    async def on_open():
        print('connected')
        # The very first request is assembled by hand: its payload carries the
        # application's client id/secret rather than the account fields that
        # `construct_msg()` always adds.
        app_auth = {
            'payloadType': pt.req.ApplicationAuth,
            'payload': {
                'clientId': creds.clientId,
                'clientSecret': creds.clientSecret,
            },
        }
        await ws.send(json.dumps(app_auth))

    async def on_message(msg_str):
        # Hardcoded handling for the handshake and heartbeat messages;
        # everything else is forwarded to `on_resp()`.
        kind = parsejson(msg_str).payloadType

        if kind == pt.res.ApplicationAuth:
            print('auth app')
            await ws.send(construct_msg(pt.req.AccountAuth))
        elif kind == pt.res.AccountAuth:
            print('auth account')
            await on_ready(ws)
        elif kind == pt.common.HeartbeatEvent:
            # answer a heartbeat with a heartbeat
            heartbeat = {'payloadType': pt.common.HeartbeatEvent}
            await ws.send(json.dumps(heartbeat))
            print('heartbeat exchange')
        else:
            await on_resp(ws, msg_str)

    async def on_close():
        print('closed')

    async def on_error(err):
        print('error:', err)

    ws.onopen = on_open
    ws.onmessage = on_message
    ws.onclose = on_close
    ws.onerror = on_error

    # Poll the socket state; returning from main() ends the program, so stay
    # here until the WebSocket reports that it is closed.
    while True:
        if ws.readyState == WebSocket.CLOSED:
            break
        await aio.sleep(0.1)
# Start the client only when this file is executed as a script, so that
# importing it from another module does not kick off a connection.
if __name__ == '__main__':
    aio.run(main())
The WebSocket Class
This is just a wrapper class that tries to mimic the browser's WebSocket API.
import asyncio
import websockets
class WebSocket:
    """Callback-style wrapper around ``websockets`` shaped like the browser API.

    Constructing an instance schedules the connection on the current event
    loop. Assign ``onopen``, ``onmessage``, ``onclose`` and ``onerror`` right
    after construction; each may be a plain function or a coroutine function.

    Callback signatures:
        onopen()            - after the connection is established
        onmessage(message)  - for every received message
        onerror(exception)  - when connecting, receiving or a callback raises
        onclose()           - always, once the connection task finishes
    """

    # Values of `readyState`, numbered like the browser WebSocket constants.
    CONNECTING = 0
    OPEN = 1
    CLOSING = 2
    CLOSED = 3

    def __init__(self, url):
        """Remember ``url`` and schedule the connection task.

        Args:
            url: address passed to ``websockets.connect``.
        """
        self.url = url
        self.readyState = WebSocket.CONNECTING
        self.onopen = None
        self.onmessage = None
        self.onclose = None
        self.onerror = None
        self._ws = None
        self._loop = asyncio.get_event_loop()
        # Hold on to the task: the event loop keeps only weak references to
        # tasks, so an unreferenced one may be garbage-collected before it
        # finishes.
        self._task = self._loop.create_task(self._connect())

    async def _connect(self):
        """Connect, dispatch incoming messages to callbacks, then clean up."""
        try:
            async with websockets.connect(self.url) as ws:
                self._ws = ws
                self.readyState = WebSocket.OPEN
                if callable(self.onopen):
                    await self._maybe_await(self.onopen())
                async for message in ws:
                    if callable(self.onmessage):
                        await self._maybe_await(self.onmessage(message))
        except Exception as e:
            # Connection failures and exceptions raised by callbacks both end
            # up here and are reported through `onerror`.
            if callable(self.onerror):
                await self._maybe_await(self.onerror(e))
        finally:
            self.readyState = WebSocket.CLOSED
            if callable(self.onclose):
                await self._maybe_await(self.onclose())

    async def send(self, data):
        """Send ``data`` over the open connection.

        Raises:
            ConnectionError: if the socket is not in the OPEN state.
        """
        if self.readyState == WebSocket.OPEN and self._ws:
            await self._ws.send(data)
        else:
            raise ConnectionError('WebSocket is not open')

    async def close(self):
        """Close the connection if it is open; otherwise do nothing."""
        if self._ws and self.readyState == WebSocket.OPEN:
            self.readyState = WebSocket.CLOSING
            await self._ws.close()
            self.readyState = WebSocket.CLOSED

    async def _maybe_await(self, result):
        """Allow both sync and async callbacks.

        ``result`` is whatever a callback returned. It is awaited when it is
        a coroutine or any other awaitable (future, task, object defining
        ``__await__``); other return values are ignored.
        """
        import inspect  # local import: only this method needs it

        if asyncio.iscoroutine(result) or inspect.isawaitable(result):
            await result