Websockets: not able to retrieve market data

The problem is solved. The API permissions were not correct. You need the Contract Library in addition to Market Data, otherwise all symbols are inaccessible. Thanks to @Alexander and @beebee for your support!

1 Like

Let’s stick with python’s default websockets. This code should work, I removed some irrelevant things so there might be some small errors.

import json
import time
import asyncio
import websockets


def init_md_heartbeat(md_hb_value, running):

    while running:
        if(md_hb_value.value <= 0): #i used a multiprocessing value, to share memory across processes
            md_hb_value.value = 1
            # print("md_heartbeat   -- beating setting value to -- [{0}]".format(md_hb_value.value))
        else:
            # print("md_heartbeat -- SKIP beating VALUE -- [{0}] SLEEPING".format(md_hb_value.value))
            pass
        time.sleep(1.5) #probably don't want to hardcoded values like these


async def websocket_connect(access_token, ws_url):
    warmup   = True
    msg      = None
    init_msg = None
    init_s   = None
    a_seq_n  = 0     #this is a good habit of recording certain sequence numbers to match when you get a response from the websocket api.
    init     = "authorize\n{0}\n\n{1}".format(a_seq_n, access_token)
    mdws     = await websockets.connect(ws_url, ssl=True, compression=None)

    while warmup:
        try:
            msg = await mdws.recv()
        except Exception as e:
            print("websocket_connect -- await mdws.recv() Exception -- [{0}]\n\n".format(str(e)))

        if(msg[0] == "o"):
            try:
                await mdws.send(init)
            except Exception as e:
                print("websocket_connect -- await mdws.send(init) Exception -- [{0}]\n\n".format(str(e)))

        if(msg != "o"):
            try:
                init_s = json.loads(msg[2:-1])
                if(init_s["s"] == 200 and init_s['d']['i'] == a_seq_n): #use this seq_num check pattern 
                    print("websocket_connect -- successfully authorized -- \n[{0}]\n[{1}]\n\n".format(ws_url, msg))
                    warmup = False
                    break
            except Exception as e:
                    print("websocket_connect -- Exception -- [{0}]".format(str(e)))

                    
        if(warmup == False):
            print("websocket_connect -- exiting warmup -- ")
            break

    return mdws



async def marketdata_run(management_pkg):
    payload          = management_pkg['payload'] #payload is the json payload you get back from the authentication request
    """ I keep predefine certain commonly used variables to avoid creating garbage doing my main loop"""
    md_hb_value      = management_pkg['md_hb_value']
    access_token     = payload['accessToken']

    heartbeat_thread = None
    heartbeat_count  = 0
    running          = False
    item             = None
    bid              = 0.0
    ask              = 0.0

    low              = 0.0
    high             = 0.0
    last_bid         = 0.0
    last_ask         = 0.0
    last_trade       = 0.0


    last_trade_price = 0.0
    bid_low_price    = 0.0
    bid_hig_price    = 0.0
    ask_low_price    = 0.0
    ask_hig_price    = 0.0
    open_position    = False
    open_price       = 0.0



    DEMO = True


    if(DEMO):
        WS_URL  = management_pkg['md_demo_wss'] 
        accountSpec      = payload['DEMO_ACCOUNT_SPEC'] #REPLACE THESE
        accountId        = payload['DEMO_USER_ID']

    else:
        accountSpec      = payload['name']
        accountId        = payload['userId']
        WS_URL  = management_pkg['md_live_wss'] 

    ws  = await websocket_connect(access_token, MD_WS_URL)
    if(ws):
        heartbeat_thread = threading.Thread(target=init_md_heartbeat, args=(md_hb_value, True))
        heartbeat_thread.daemon = True
        heartbeat_thread.start()
        running = True


    while running:
        try:
            msg = await ws.recv()
        except Exception as e:
            print("marketdata_run -- msg = await ws.recv() got exception -- [{0}]".format(str(e)))


        if(msg):
            msg_t = msg[0]
            if(msg_t == "a"):
                jmsg = json.loads(msg[2:-1])
                try:
                    event_msg = jmsg['e']
                    response_msg = False
                except Exception as e:
                    response_msg = jmsg['s']
                    event_msg = False

                if(event_msg):
                    try:
                        item             = jmsg['d']['quotes']
                    except Exception as e:
                        item = None

                    if(item == None):
                        continue
                    else
                        for ix, itm in enumerate(item):
                            low        = itm['entries']['LowPrice']['price']
                            high       = itm['entries']['HighPrice']['price']
                            bid        = itm['entries']['Bid']['price']
                            ask        = itm['entries']['Offer']['price']
                            last       = itm['entries']['Trade']['price']
                            #================ USE MARKETDATA TO do SOMETHING USEFUL BEGIN ============================
                            #
                            #
                            #
                            #================ USE MARKETDATA TO do SOMETHING USEFUL  END  ============================
                else:
                    response_status = jmsg['s']
                    if(response_status == 200):
                        response_id     = jmsg['i']
                        response_pld    = jmsg['d']

                    else:
                        print("got response ERROR --\n{0}\n".format(jmsg))
            elif(msg_t == "h"):
                print("\tmarketdata_run -- MD -- heartbeat -- ")
            elif(msg_t == "c"):
                print("marketdata_run -- close message -- [{0}]".format(msg))
            else:
                pass


        if(md_hb_value.value >= 1.0):
            try:
                await ws.send("[]")
                md_hb_value.value -= 1
            except Exception as e:
                print("\nmarketdata_run -- WEBSOCKET -- sent heartbeat EXCEPTION -- [{0}] [{1}]".format(md_hb_value.value, str(e)))




def marketdata_init(management_pkg):
    marketdata_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(marketdata_loop)
    marketdata_loop.run_until_complete(marketdata_run(management_pkg))

Take a look over it, give it a shot. If you have any problems, just continue to post here.

1 Like

Hi, Thanks a lot for sharing that.

I am trying to use your code but its throwing syntax errors. Can you please share the python version you are using.

Also a test sample of how to use this code please

Can you please have a look at following gist:

I am able to run the code you shared, update the gist.

How do i communicate with the script, i did not update any credentials and run it just to test, so it must not be working.

can you please point out where i can update my username and password?

I am using the python multiprocessing library

Since I have 2 or more processes running and I would like to be faster than a queue. I use the multiprocessing shared memory types to pass messages between processes.

It’s similar to setting up a stack before calling a function in a language like c/c++.

This is earlier code and should be replaced.

That code above looks like a really early version;
instead of passing md_hb_value to that function; pass in the payload dictionary
inside the payload add keys mda_hb_init and mda_running

payload['mda_hb_init']
payload['mda_running']

def init_md_heartbeat(payload):
     while(not payload['mda_running']):
          wait until you initialize and connect to the websocket; then set payload['mda_hb_init'] = True
     while(payload['mda_running']):
           if(payload['mda_hb_init']):
                    add a try catch block to make sure the websocket connection is still up, tradovate websockets disconnect quite often
                    
                    if your heartbeat interval passed and there's no exception; send a heartbeat and sleep
                    if there's an exception, set payload['mda_hb_init'] to false, re-initialize the websocket

i renamed management package to payload since it’s shorter, u can simplify it further if it suites your needs.
if you have any further questions post below.

1 Like

Hi,

Thank you! for your time and prompt reply.

I have replaced the code snippet in gist websocket.py · GitHub

I have a demo account and i want to monitor orders created and executions.

Can you please share a usage example and where i have to update the credentials.

Thanks again

update, i did not see that code snippet has pseudo code in it :slight_smile:

I am a beginner so if you can please share an example that would be helpful …

you’re going to have to do some problem solving to get going.

look at the file __tv_api.py, there’s a line at the top with these imports:

from my_secrets import APP_ID, APP_VERSION, ACCOUNT, ACCOUNT_P, CID, CLIENT_SECRET, DEVICE_ID

create the file my_secrets.py and place it next to the __tv_api.py file, inside the __tv_api.py file, define those variables; you get those from the Tradovate backend.

I don’t think you get API access on a demo account. Things might have changed, I am not sure.

1 Like

Hi,

your code and questions have been helpful. Can you please share tradovate_auth after redacting the sensitive information off course.

Thanks.

apologies for the delay, what exactly are you looking for?

@Ingo, @Alexander in order for you to receive chart data did you have to register as Data Distributor with CME? The Tradovate tech support keeps on telling me that I need to register. Called CME they are completely clueless why I would have to register. Just contacted CME’s global account management on this matter.

hi ingo I am having the problem you had, but don’t know what you mean you need the contract library.

Thanks in advance.

Does any one know how to terminate the API websocket. I know socket.send() is the heartbeat reply you send. Is the a code that you can send it to close the websocket?

Thanks in advance.

Typically any websocket API will implement socket.close() or something to that effect to terminate the channel manually.

thanks alot, that helped.

I am having another problem. My websocket is open, every thing is working well. When I send an order, it goes through. I can see it, but the socket stops working properly and I get this error. Am I supposed to do something after an order goes through.

Task exception was never retrieved
future: <Task finished name=‘heart_beat’ coro=<heart_beat() done, defined at C:\pythonproject\Tradovate\mdwebsocket.py:23> exception=ConnectionClosedError(‘code = 1006 (connection closed abnormally [internal]), no reason’)>

Can anyone please help me…I have market data subscription. I was able to setup my webscoket to it, but when i tried the quote end point to get a quote I am getting this error.

a[{“s”:200,“i”:2,“d”:{“errorText”:“Symbol is inaccessible”,“errorCode”:“UnknownSymbol”,“mode”:“None”}}]

this is the code:

payload = {“symbol”: symbol}
await websocket.send(f"md/subscribeQuote\n2\n\n{json.dumps(body)}")

Thanks in advance

payload = {“symbol”: symbol}
await websocket.send(f"md/subscribeQuote\n2\n\n{json.dumps(payload)}")

This seems to be the issue. Maybe contact CME? Sorry for the bad news…

thanks for your help. do you you what others having the same problems are doing?

thanks

Hi there! Do you have, by any changed, the complete code/example on how to retrieve market data with Python and websockets? Some links are broken here, and I wanted to try an example and build up from there… thanks so much!