보드와 호스트 PC의 통신이 되도록 만들어보자.
to do list
- 호스트 PC의 모든 COM 포트 리스트를 만들고 GUI에 표시
- 보드와 연결된 COM 포트 선택 후 Connect 버튼 클릭하여 보드 정보 읽어옴
- 접속 상태를 주기적으로 점검
Phase 1. 호스트 PC의 모든 COM 포트 리스트를 만들고 GUI에 표시
이전 게시물의 updateSerial_select라는 함수에서 담당한다.
def updateSerial_select(self): self.available_port = [] ports = list(serial.tools.list_ports.comports()) for p in ports: self.available_port.append(p[0]) self.ids.Serial_select.values = self.available_port
이 함수는 생성자에서 호출하므로 앱 실행시 자동으로 실행된다.
호스트 PC의 모든 COM 포트를 읽어와서 스피너 위젯에 표시하게 된다.
스피너 위젯 자체를 클릭하면 COM 포트를 새로 읽어서 갱신한다.
Spinner: id : Serial_select text_autoupdate : True sync_height : True values : root.available_port size_hint : None, None size : 60, 30 pos : 5, 520 on_press : root.updateSerial_select()
앱 실행시
스피너 위젯 클릭시
저번 게시물에서는 여기까지 진행했었다.
Phase 2. 보드와 연결된 COM 포트 선택 후 Connect 버튼 클릭하여 보드 정보 읽어옴
COM 포트 선택 후 접속 버튼을 추가한다.
상태 라벨과 보드 정보 라벨도 추가하자.
Button: id : Serial_Con size_hint : None, None size : 90, 30 pos: 65, 520 text: 'Connect' on_press: root.Serial_Con(self) Label: id: Serial_status size_hint : None, None size : 80, 34 pos: 165, 520 font_size : 12 text : 'Not Connected' Label: id : Board_Info size_hint : None, None size : 200, 100 pos: -12, 440 font_size : 12 text : ''
접속 버튼 클릭시 Serial_Con 함수가 동작한다.
def Serial_Con(self, value): print(self.ids.Serial_select.text) if self.ser_con == False: self.ser.port = self.ids.Serial_select.text if self.ser.is_open == False: try: self.ser.open() if self.ser.isOpen(): print("port opened") else: self.ids.Serial_status.text = "port error" print("error opening : " + str(e)) return except OSError as e: self.ids.Serial_status.text = "port error" print("error opening : " + str(e)) return ver_cmd = 'VER\0' try: self.ser.reset_input_buffer() self.ser.reset_output_buffer() self.ser.write(ver_cmd.encode()) print(ver_cmd.encode()) self.ser.flush() except Exception as e: self.ids.Serial_status.text = "port error" print("error sending : " + str(e)) self.ser.close() return ################################################### # echo back ################################################### try: arg1 = self.ser.read().decode('cp1252') if len(arg1) == 0: self.ids.Serial_status.text = "port error" print("No response") self.ser.close() return except serial.SerialException as e: self.ids.Serial_status.text = 'Not Connected' self.ids.Board_Info.text = '' self.ids.Serial_Con.text = 'Connect' print("SerialException : " + str(e)) self.ser.close() return except TypeError as e: print("TypeError : " + str(e)) self.ser.close() return try: while arg1[-1] != '>': arg1 += self.ser.read().decode('cp1252') except serial.SerialException as e: self.ids.Serial_status.text = 'Not Connected' self.ids.Board_Info.text = '' self.ids.Serial_Con.text = 'Connect' print("SerialException : " + str(e)) self.ser.close() return except TypeError as e: print("TypeError : " + str(e)) self.ser.close() return print(arg1) ################################################### # Command response ################################################### try: arg1 = self.ser.read().decode('cp1252') except serial.SerialException as e: self.ids.Serial_status.text = 'Not Connected' self.ids.Board_Info.text = '' self.ids.Serial_Con.text = 'Connect' try: while arg1[-1] != '>': arg1 += self.ser.read().decode('cp1252') except: self.ids.Serial_status.text = 'Not Connected' self.ids.Board_Info.text = '' self.ids.Serial_Con.text = 'Connect' return print(arg1) print('arg1 : '+ arg1) lst1 = arg1.replace('\r\n', '') lst1 = lst1.replace('>', '') str_lst = lst1.split(' ') self.ids.Serial_status.text = 'Connected...' self.ids.Board_Info.text = 'Board Name' + ' : ' + str(str_lst[0]) + '\r\nBoard Ver' + ' : ' + str(str_lst[1]) + '\r\nBoard FW' + ' : ' + str(str_lst[2]) + '\r\nBoard MCU' + ' : ' + str(str_lst[3]) self.ids.Serial_Con.text = 'Disconnect' self.ser_con = True self.ser_con_port = self.ids.Serial_select.text check_connection = threading.Thread(target= self.chk_connection) check_connection.daemon = True check_connection.start() else: self.ser.close() self.ids.Serial_status.text = 'Not Connected' self.ids.Board_Info.text = '' self.ids.Serial_Con.text = 'Connect' self.ser_con = False
이 함수는 이렇게 동작한다.
1. 해당 COM 포트 오픈
2. 호스트 PC -> 보드 : 'VER\0'(VER + null)을 전송
3. 보드 -> 호스트 PC : 'echo VER>' 전송
4. 보드 -> 호스트 PC : '%board_name %board_version %fw_version %board_MCU'
exp)
- %board_name : MQTT-test
- %board_version : 0.0.1
- %fw_version : 0.0.1
- %board_MCU : STM32F103RCT
기존에 쓰던 보드에 연결해봤다.
소스의 분기문과 try...except는 엉뚱한 포트를 선택했을 경우에 대한 예외처리다.
Phase 3. 접속 상태를 주기적으로 점검
쓰레드를 사용해서 현재 접속중인 COM 포트가 존재하는지 5초 단위로 체크한다.
def chk_connection(self): while True: if self.ser_con == True: port_list = [] ports = list(serial.tools.list_ports.comports()) for p in ports: port_list.append(p[0]) if self.ser_con_port in port_list: print("Connected") else: self.ser.close() self.ids.Serial_status.text = 'Not Connected' self.ids.Board_Info.text = '' self.ids.Serial_Con.text = 'Connect' self.ser_con = False print("Disconnected") else: print("Disconnected") time.sleep(5)
이 함수는 무한 루프로 돌아가며, 데몬 쓰레드로 설정해야 한다.
그렇지 않으면 앱 종료 후에도 계속 돌아간다.
#. 파이썬, kv lang 전체 내용 첨부합니다.
MQTTtest_GUIv001.py
################################################### ################################################### # MQTT-test GUI # -> ver 0.0.1 # by JBE # # since 2021-03-21 ################################################### ################################################### import kivy kivy.require('1.11.1') from kivy.app import App from kivy.uix.label import Label from kivy.lang import Builder from kivy.uix.spinner import Spinner from kivy.uix.floatlayout import FloatLayout from kivy.uix.button import Button import serial import serial.tools.list_ports import threading import time from kivy.config import Config Config.set('graphics', 'width', '800') Config.set('graphics', 'height', '600') Config.set('graphics', 'resizable', 0) Config.write() #Builder.load_file('MQTTtest_GUIv001.kv') class MQTTtest_GUIv001(FloatLayout): available_port = [] ser = serial.Serial() ser.baudrate = 115200 ser.timeout = 0.5 ser.write_timeout = 0.5 ser_con = False ser_con_port = "" def __init__(self, **kwargs): super(MQTTtest_GUIv001, self).__init__(**kwargs) self.updateSerial_select() def updateSerial_select(self): self.available_port = [] ports = list(serial.tools.list_ports.comports()) for p in ports: self.available_port.append(p[0]) self.ids.Serial_select.values = self.available_port def Serial_Con(self, value): print(self.ids.Serial_select.text) if self.ser_con == False: self.ser.port = self.ids.Serial_select.text if self.ser.is_open == False: try: self.ser.open() if self.ser.isOpen(): print("port opened") else: self.ids.Serial_status.text = "port error" print("error opening : " + str(e)) return except OSError as e: self.ids.Serial_status.text = "port error" print("error opening : " + str(e)) return ver_cmd = 'VER\0' try: self.ser.reset_input_buffer() self.ser.reset_output_buffer() self.ser.write(ver_cmd.encode()) print(ver_cmd.encode()) self.ser.flush() except Exception as e: self.ids.Serial_status.text = "port error" print("error sending : " + str(e)) self.ser.close() return ################################################### # echo back ################################################### try: arg1 = self.ser.read().decode('cp1252') if len(arg1) == 0: self.ids.Serial_status.text = "port error" print("No response") self.ser.close() return except serial.SerialException as e: self.ids.Serial_status.text = 'Not Connected' self.ids.Board_Info.text = '' self.ids.Serial_Con.text = 'Connect' print("SerialException : " + str(e)) self.ser.close() return except TypeError as e: print("TypeError : " + str(e)) self.ser.close() return try: while arg1[-1] != '>': arg1 += self.ser.read().decode('cp1252') except serial.SerialException as e: self.ids.Serial_status.text = 'Not Connected' self.ids.Board_Info.text = '' self.ids.Serial_Con.text = 'Connect' print("SerialException : " + str(e)) self.ser.close() return except TypeError as e: print("TypeError : " + str(e)) self.ser.close() return print(arg1) ################################################### # Command response ################################################### try: arg1 = self.ser.read().decode('cp1252') except serial.SerialException as e: self.ids.Serial_status.text = 'Not Connected' self.ids.Board_Info.text = '' self.ids.Serial_Con.text = 'Connect' try: while arg1[-1] != '>': arg1 += self.ser.read().decode('cp1252') except: self.ids.Serial_status.text = 'Not Connected' self.ids.Board_Info.text = '' self.ids.Serial_Con.text = 'Connect' return print(arg1) print('arg1 : '+ arg1) lst1 = arg1.replace('\r\n', '') lst1 = lst1.replace('>', '') str_lst = lst1.split(' ') self.ids.Serial_status.text = 'Connected...' self.ids.Board_Info.text = 'Board Name' + ' : ' + str(str_lst[0]) + '\r\nBoard Ver' + ' : ' + str(str_lst[1]) + '\r\nBoard FW' + ' : ' + str(str_lst[2]) + '\r\nBoard MCU' + ' : ' + str(str_lst[3]) self.ids.Serial_Con.text = 'Disconnect' self.ser_con = True self.ser_con_port = self.ids.Serial_select.text check_connection = threading.Thread(target= self.chk_connection) check_connection.daemon = True check_connection.start() else: self.ser.close() self.ids.Serial_status.text = 'Not Connected' self.ids.Board_Info.text = '' self.ids.Serial_Con.text = 'Connect' self.ser_con = False def chk_connection(self): while True: if self.ser_con == True: port_list = [] ports = list(serial.tools.list_ports.comports()) for p in ports: port_list.append(p[0]) if self.ser_con_port in port_list: print("Connected") else: self.ser.close() self.ids.Serial_status.text = 'Not Connected' self.ids.Board_Info.text = '' self.ids.Serial_Con.text = 'Connect' self.ser_con = False print("Disconnected") else: print("Disconnected") time.sleep(5) class MQTTtest_GUIv001App(App): def build(self): return MQTTtest_GUIv001() if __name__ == '__main__': MQTTtest_GUIv001App().run()
MQTTtest_GUIv001.kv
<MQTTtest_GUIv001>: FloatLayout: Spinner: id : Serial_select text_autoupdate : True sync_height : True values : root.available_port size_hint : None, None size : 60, 30 pos : 5, 520 on_press : root.updateSerial_select() Button: id : Serial_Con size_hint : None, None size : 90, 30 pos: 65, 520 text: 'Connect' on_press: root.Serial_Con(self) Label: id: Serial_status size_hint : None, None size : 80, 34 pos: 165, 520 font_size : 12 text : 'Not Connected' Label: id : Board_Info size_hint : None, None size : 200, 100 pos: -12, 440 font_size : 12 text : ''
이제 펌웨어를 만들어봅시다.