본문 바로가기
AyoProject/Ayotera-Trade

[AT] 10. 증권사 API연동 소스 Refectoring

by 청양호박이 2020. 2. 27.

이번에는 기존에 작성했던 소스에 대해서 실제로 사용이 가능하고, 향후 다른코드에서 재활용이 가능하도록 Refectoring을 진행해 보겠습니다. 가장 큰 문제는 아래와 같은 소스상 문제로 한번 python프로그램이 실행되면 강제로 console에서 종료하기 전까지 실행이 되게 됩니다.

if __name__ == "__main__":
    app = QApplication(sys.argv)
    test = KiwoomAPI()
    test.login()
    app.exec_()

그 이유는 QApplication을 실행 후 별도로 종료하는 로직이 없기 때문입니다. 사실 이렇게 한 이유는 키움증권 Open API+ 특성상 서버로 dynamicCall을 보내고 그에따른 event를 기다려야 하기 때문에 프로그램이 종료되지 않고 계속 동작하는 상태여야 했기 때문입니다. 이를 보완하기 위해서 아래의 2가지 Refectoring을 진행하겠습니다.

 

  • 이벤트 리스너를 자체 dynamicCall안에 구성
  • 기능의 모듈화

 

 

1. 이벤트 리스너 내재화


내재화의 가장 큰 핵심은 기존에 QApplication에서의 이벤트 리스너 처리를 메서드 단으로 분산해서 작성하는 것입니다. 예를 들어, 이벤트를 발생시키는 CommConnect 메서드를 사용할때 호출하는 지점에서 이벤트 리스너를 생성해주고, 이벤트를 받아서 처리하는 메서드에서는 이벤트 리스너를 종료해주는 방식입니다.

 

[생성자 유지]

    def __init__(self):
        # QAxWidget Instance
        self.OCXConn = QAxWidget("KHOPENAPI.KHOpenAPICtrl.1")
        # Event Listener
        self.OCXConn.OnEventConnect.connect(self.connEvent)
        self.OCXConn.OnReceiveTrData.connect(self.trEvent)
        # time.sleep
        self.sleepDuration = 0.2

[로그인처리]

    def CommConnect(self):
        self.OCXConn.dynamicCall("CommConnect()")
        self.ConnEventLoop = QEventLoop()
        self.ConnEventLoop.exec_()

    def connEvent(self, nErrCode):
        if nErrCode == 0:
            print('로그인 성공')
        else:
            print('로그인 실패')
        self.ConnEventLoop.exit()

다음과 같이, 이벤트를 발생시키는 dynamicCall을 실행할때 QEventLoop( )로 이벤트 객체를 생성하고 exec_( )로 리스너를 동작합니다. 그렇게되면 이벤트 객체가 종료될때까지 시스템은 기다리게 되고... 실제 이벤트가 발생해서 위의 생성자에서 class에 대한 인스턴스 생성 시, 등록된 connect에 따라서 connEvent( ) 메서드를 호출하게 됩니다.

 

주의할점!!! QEventLoop( )는 동기식(sync)으로 동작하기 때문에 응답이 오고 종료가 될때까지 프로그램은 다른 동작을 수행하지 않습니다.

 

중요한건 해당 connEvent( )에서는 자체 처리를 하고 마지막에 이벤트 객체를 종료시키는 exit( )를 실행합니다. 이렇게 하게되면 로그인을 하면 기존에는 프로그램이 종료되지 않지만, Refectoring 코드에서는 바로 종료되게 됩니다.

if __name__ == "__main__":
    app = QApplication(sys.argv)
    kd = KiwoomDaily()
    kd.CommConnect()

 

 

2. 기능의 모듈화


그렇다면 기존에 엉겨붙어 있던 기능들을 가능한 모두 모듈화를 진행하겠습니다. 하지만 크게 고민할것은 없습니다. 왜냐하면, 개발자 가이드에 제공되는 메서드들을 그냥 함수화 시키며 되기 때문입니다. class내에서는 그렇게 모듈화하고 실제 __main__에서 호출해서 사용하면 되니까요.

 

[주요메서드 모듈화]

    def SetInputValue(self, sID, sValue):
        self.OCXconn.dynamicCall("SetInputValue(QString, QString)", sID, sValue)

    def CommRqData(self, sRQName, sTrCode, nPrevNext, BSTR):
        self.OCXConn.dynamicCall("CommRqData(QString, QString, QString, QString)", sRQName, sTrCode, nPrevNext, BSTR)
        # 이벤트 발생시키는 메서드에는 항상 EventLoop Exec 구현
        self.TrEventLoop = QEventLoop()
        self.TrEventLoop.exec_()
        
    def GetCommData(self, strTrCode, strRecordName, nIndex, strItemName):
        return self.OCXconn.dynamicCall("GetCommData(QString, QString, int, QString)", strTrCode, strRecordName, nIndex, strItemName).strip()

    def GetRepeatCnt(self, sTrCode, sRQName):
        return self.OCXconn.dynamicCall("GetRepeatCnt(QString, QString)", sTrCode, sRQName);

TR을 요청하고, 결과의 Data를 받아오는데 필요한 주요메서드는 위와같이 모듈화를 진행합니다. 향후에는 __main__에서 해당 메서드를 호출하면 됩니다.

 

[opt10001 TR요청]

TR을 요청하기 위해서는 SetInputValue와 CommRqData를 호출하여 적당한 파라미터를 넣고 전송합니다. 그리고 이벤트를 통해서 오면, GetCommData를 통해서 값을 가져오는데... 한가지 문제가 있습니다. 내가 호출한 메서드는 이벤트를 요청할 뿐이고, 실제로 값이 도출되는 메서드는 다른메서드 입니다. 따라서 __main__에서 그 값을 바로 return받아서 사용할 수가 없습니다.

 

결국 이벤트를 통해서 값이 전달되는 모든 방식은 수행전에 값을 저장할 일정한 공간을 생성한 후, TR요청을 진행하고 이벤트가 왔을때 처리하는 메서드에서는 해당 공간에 값을 넣어주는 방식으로 로직을 구현해야 합니다. 그럼 실제 코드로 보겠습니다.

    # tr전체 이벤트 처리
    def trEvent(self, sScrNo, sRQName, sTrCode, sRecordName, sPreNext, nDataLength, sErrorCode, sMessage, sSplmMsg):
        # sScrNo(화면번호), sRQName(사용자구분), sTrCode(Tran명), sRecordName(레코드명), sPreNext(연속조회 유무)
        if sRQName == 'AT_opt10001':
            self.AT_opt10001(sTrCode, sRQName)
        # 이벤트 처리에는 항상 EventLoop Exit( )구현
        self.TrEventLoop.exit()

    def AT_opt10001(self, sTrCode, sRQName):
        self.opt10001_res["종목명"] = self.GetCommData(sTrCode, sRQName, 0, "종목명")


if __name__ == "__main__":
    app = QApplication(sys.argv)
    kd = KiwoomDaily()
    kd.CommConnect()

    # opt10001 TR요청
    kd.opt10001_res = {}
    kd.SetInputValue("종목코드", "005930")
    kd.CommRqData("AT_opt10001", "opt10001", "0", "0101")

    print(kd.opt10001_res["종목명"])

opt10001을 요청할때 클래스내 dict 변수를 하나 생성해줍니다. 해당 변수는 클래스 내 메서드에서도 사용이 가능해집니다. OnReceiveTrData 이벤트가 발생하면, trEvent( )메서드로 connect가 되고 내부 분기로 인해서 AT_opt10001( )메서드까지 오게 됩니다. 해당 메서드에서는 GetCommData( )를 호출해서 값을 가져오고, 이를 dict 변수에 저장합니다.

 

그리고 실제 해당 dict변수에 저장된 값을 확인해보면...

다음과 같이 정상적으로 구현이 된점을 확인가능합니다. 다른 TR도 해당 방법으로 순차적으로 구현하면 되겠습니다.

 

[opt10081 TR요청]

주식일봉차트조회 TR은 현재부터 해당 종목이 상장하는 시점까지의 모든 데이터를 600개씩 가져오기 때문에 오래된 종목은 여러번 가져와야 합니다. 지난번에 구현한 부분으로 이번에는 응답이 오는 부분을 모두 모듈화를 했기 때문에 반복해서 호출하는 부분은 __main__에서 사용자가 구현해야 합니다.

 

게다가 opt10081에는 많은 인자를 포함해서 보내기 때문에 class내에서 알아서 데이터가 있는부분까지 반복해서 호출한 다음 결과만 리턴하기에는 구현이 모듈화가 불가능한 부분이 생깁니다. 

    # tr전체 이벤트 처리
    def trEvent(self, sScrNo, sRQName, sTrCode, sRecordName, sPreNext, nDataLength, sErrorCode, sMessage, sSplmMsg):
        # sScrNo(화면번호), sRQName(사용자구분), sTrCode(Tran명), sRecordName(레코드명), sPreNext(연속조회 유무)
        if sPreNext == '2': self.haveNext = True
        else: self.haveNext = False

        if sRQName == 'AT_opt10001':
            self.AT_opt10001(sTrCode, sRQName)
        elif sRQName == 'AT_opt10081':
            self.AT_opt10081(sTrCode, sRQName)
        # 이벤트 처리에는 항상 EventLoop Exit( )구현
        self.TrEventLoop.exit()

다음과 같이 sPreNext가 '0' 인지 '2'인지 구분해서 다음데이터가 있는지 없는지 판단하는 변수를 생성하여 boolean값을 할당해줍니다. 그 다음 __main__에서는 해당 구분자가 True일 경우 계속 반복해서 데이터를 요청하는 CommRqData를 실행하면 됩니다.

    # opt10081 TR요청
    kd.opt10081_res = {'date': [], 'open': [], 'high': [], 'low': [], 'close': [], 'volume': []}
    kd.SetInputValue("종목코드", "000020")
    kd.SetInputValue("기준일자", "20200221")
    kd.SetInputValue("수정주가구분", "1")
    kd.CommRqData("AT_opt10081", "opt10081", "0", "0101")

    while kd.haveNext:
        time.sleep(kd.sleepDuration)
        kd.SetInputValue("종목코드", "000020")
        kd.SetInputValue("기준일자", "20200221")
        kd.SetInputValue("수정주가구분", "1")
        kd.CommRqData("AT_opt10081", "opt10081", "2", "0101")

실질적으로 데이터를 받는 부분은 아래와 같이 하고 구체화 하면 되겠습니다. 

    def AT_opt10081(self, sTrCode, sRQName):
        dataCnt = self.GetRepeatCnt(sTrCode, sRQName);
        print('가져온 갯수 : ', dataCnt)

        for i in range(dataCnt):
            date = self.GetCommData(sTrCode, sRQName, i, "일자").strip()
            finish = self.GetCommData(sTrCode, sRQName, i, "현재가").strip()
            mount = self.GetCommData(sTrCode, sRQName, i, "거래량").strip()
            start = self.GetCommData(sTrCode, sRQName, i, "시가").strip()
            high = self.GetCommData(sTrCode, sRQName, i, "고가").strip()
            low = self.GetCommData(sTrCode, sRQName, i, "저가").strip()
            # print(date, finish, mount, start, high, low)

opt10081에 대해서는 실제 DB에 저장하는 로직을 포함해서 구현해 보겠습니다.

 

-Ayotera Lab-

댓글