To jest przykład jak przetwarzać wiadomości API używając wielowątkowość. App.run() jest uruchamiany jako osobny wątek i nasłuchuje odpowiedzi TWS API. Główny program wysyła następnie 5 wniosków o szczegóły dotyczące zamówienia, a następnie program główny czeka 10 sekund na odpowiedź. Wiadomości TWS API są przechowywane w instancji aplikacji i prostych sygnałach semaforów, gdy odpowiedź jest gotowa do przetworzenia.
To jest mój pierwszy program wielowątkowy, wszelkie komentarze są mile widziane.
from ibapi import wrapper
from ibapi.client import EClient
from ibapi.utils import iswrapper #just for decorator
from ibapi.common import *
from ibapi.contract import *
from ibapi.ticktype import *
#from OrderSamples import OrderSamples
import threading
import time
class myThread (threading.Thread):
def __init__(self, app, threadID, name):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.app = app
def run(self):
print ("Starting application in separate thread:", self.name, "threadID:", self.threadID )
self.app.run()
print ("Exiting " + self.name)
class TestApp(wrapper.EWrapper, EClient):
def __init__(self):
wrapper.EWrapper.__init__(self)
EClient.__init__(self, wrapper=self)
self.started = False
self.nextValidOrderId = 0
self.reqData = {} # store data returned by requests
self.reqStatus = {} # semaphore of requests - status End will indicate request is finished
@iswrapper
def nextValidId(self, orderId:int):
print("setting nextValidOrderId: %d", orderId)
self.nextValidOrderId = orderId
@iswrapper
def error(self, reqId:TickerId, errorCode:int, errorString:str):
print("Error. Id: " , reqId, " Code: " , errorCode , " Msg: " , errorString)
@iswrapper
# ! [contractdetails]
def contractDetails(self, reqId: int, contractDetails: ContractDetails):
super().contractDetails(reqId, contractDetails)
# store response in reqData dict, for each request several objects are appended into list
if not reqId in self.reqData:
self.reqData[reqId] = []
self.reqData[reqId].append(contractDetails) # put returned data into data storage dict
# ! [contractdetails]
@iswrapper
# ! [contractdetailsend]
def contractDetailsEnd(self, reqId: int):
super().contractDetailsEnd(reqId)
print("ContractDetailsEnd. ", reqId, "\n") # just info
self.reqStatus[reqId] = 'End' # indicates the response is ready for further processing
# ! [contractdetailsend]
def main():
app = TestApp()
app.connect("127.0.0.1", 4001, clientId=123)
print("serverVersion:%s connectionTime:%s" % (app.serverVersion(),
app.twsConnectionTime()))
thread1App = myThread(app, 1, "Thread-1") # define thread for sunning app
thread1App.start() # start app.run(] as infitnite loop in separate thread
print('Requesting MSFT contract details:')
contract = Contract()
contract.symbol = "MSFT"
contract.secType = "STK"
contract.currency = "USD"
contract.exchange = ""
app.reqStatus[210] = 'Sent' # set request status to "sent to TWS"
app.reqContractDetails(210, contract)
print('Requesting IBM contract details:')
contract.symbol = "IBM"
app.reqStatus[211] = 'Sent'
app.reqContractDetails(211, contract)
print('Requesting IBM contract details:')
contract.symbol = "GE"
app.reqStatus[212] = 'Sent'
app.reqContractDetails(212, contract)
print('Requesting IBM contract details:')
contract.symbol = "GM"
app.reqStatus[213] = 'Sent'
app.reqContractDetails(213, contract)
print('Requesting IBM contract details:')
contract.symbol = "BAC"
app.reqStatus[214] = 'Sent'
app.reqContractDetails(214, contract)
i = 0
while i < 100: # exit loop after 10 sec (100 x time.sleep(0.1)
i = i+1
for reqId in app.reqStatus:
if app.reqStatus[reqId] == 'End':
for contractDetails in app.reqData[reqId]:
print("ContractDetails. ReqId:", reqId, contractDetails.summary.symbol,
contractDetails.summary.secType, "ConId:", contractDetails.summary.conId,
"@", contractDetails.summary.exchange)
app.reqStatus[reqId] = 'Processed'
time.sleep(0.1)
app.done = True # this stops app.run() loop
if __name__ == "__main__":
main()
dziękuję bardzo za wejście. Chętnie go wypróbuję, ale z jakiegoś powodu otrzymuję komunikat "błąd serwera" i nie mogę zalogować się do TWS w tym momencie. Wrócę, aby oznaczyć to jako odpowiedź, gdy tylko się połączę i będę mógł wypróbować twój kod. Dzięki. –
Możesz testować, logując się za pomocą edemo/demouser. To jest to co zrobiłem. Zauważyłem, że używasz 7496, co oznacza konto rzeczywiste lub konto demo. Sugeruję testowanie z demo lub kontem papierowym, jeśli je masz (papier ma zwykle port 7497). – brian
z kodem: self.done = True, funkcja Run breaks. i podniesiony błąd: wyjątek w wątku Thread-1 :, wiersz 113, w _recvAllMsg, buf = self.socket.recv (4096) AttributeError: Obiekt "NoneType" nie ma atrybutu "recv". Pomyślałem, że to spowodowane przez .disconnection. Czy widzisz ten sam problem? Masz pomysł, jak bezpiecznie odłączyć się i wyjść z programu? Dzięki. –