2021/04/01

STM32 펌웨어 - 02.1 COM 포트 연결 - with Python Kivy

보드와 호스트 PC의 통신이 되도록 만들어보자.

to do list

  1. 호스트 PC의 모든 COM 포트 리스트를 만들고 GUI에 표시
  2. 보드와 연결된 COM 포트 선택 후 Connect 버튼 클릭하여 보드 정보 읽어옴
  3. 접속 상태를 주기적으로 점검

 

Phase 1. 호스트 PC의 모든 COM 포트 리스트를 만들고 GUI에 표시

이전 게시물의 updateSerial_select라는 함수에서 담당한다.

def updateSerial_select(self):
        self.available_port = []
        ports = list(serial.tools.list_ports.comports())
        for p in ports:
            self.available_port.append(p[0])

        self.ids.Serial_select.values = self.available_port

이 함수는 생성자에서 호출하므로 앱 실행시 자동으로 실행된다.

호스트 PC의 모든 COM 포트를 읽어와서 스피너 위젯에 표시하게 된다.


스피너 위젯 자체를 클릭하면 COM 포트를 새로 읽어서 갱신한다.

Spinner:
            id : Serial_select
            text_autoupdate : True
            sync_height : True
            values : root.available_port
            size_hint : None, None
            size : 60, 30
            pos : 5, 520
            on_press : root.updateSerial_select()

 

앱 실행시










스피너 위젯 클릭시









 

저번 게시물에서는 여기까지 진행했었다.


Phase 2. 보드와 연결된 COM 포트 선택 후 Connect 버튼 클릭하여 보드 정보 읽어옴

COM 포트 선택 후 접속 버튼을 추가한다.

상태 라벨과 보드 정보 라벨도 추가하자.

        Button:
            id : Serial_Con
            size_hint : None, None
            size : 90, 30
            pos: 65, 520
            text: 'Connect'
            on_press: root.Serial_Con(self)
        Label:
            id: Serial_status
            size_hint : None, None
            size : 80, 34
            pos: 165, 520
            font_size : 12
            text : 'Not Connected'
        Label:
            id : Board_Info
            size_hint : None, None
            size : 200, 100
            pos: -12, 440
            font_size : 12
            text : ''


접속 버튼 클릭시 Serial_Con 함수가 동작한다. 

    def Serial_Con(self, value):
        print(self.ids.Serial_select.text)
        if self.ser_con == False:
            self.ser.port = self.ids.Serial_select.text

            if self.ser.is_open == False:
                try:
                    self.ser.open()
                    if self.ser.isOpen():
                        print("port opened")
                    else:
                        self.ids.Serial_status.text = "port error"
                        print("error opening : " + str(e))
                        return
                except OSError as e:
                    self.ids.Serial_status.text = "port error"
                    print("error opening : " + str(e))
                    return

            ver_cmd = 'VER\0'
            try:
                self.ser.reset_input_buffer()
                self.ser.reset_output_buffer()
                self.ser.write(ver_cmd.encode())
                print(ver_cmd.encode())
                self.ser.flush()
            except Exception as e:
                self.ids.Serial_status.text = "port error"
                print("error sending : " + str(e))
                self.ser.close()
                return

            ###################################################
            # echo back
            ###################################################
            try:
                arg1 = self.ser.read().decode('cp1252')
                if len(arg1) == 0:
                    self.ids.Serial_status.text = "port error"
                    print("No response")
                    self.ser.close()
                    return
            except serial.SerialException as e:
                self.ids.Serial_status.text = 'Not Connected'
                self.ids.Board_Info.text = ''
                self.ids.Serial_Con.text = 'Connect'
                print("SerialException : " + str(e))
                self.ser.close()
                return
            except TypeError as e:
                print("TypeError : " + str(e))
                self.ser.close()
                return

            try:
                while arg1[-1] != '>':
                    arg1 += self.ser.read().decode('cp1252')
            except serial.SerialException as e:
                self.ids.Serial_status.text = 'Not Connected'
                self.ids.Board_Info.text = ''
                self.ids.Serial_Con.text = 'Connect'
                print("SerialException : " + str(e))
                self.ser.close()
                return
            except TypeError as e:
                print("TypeError : " + str(e))
                self.ser.close()
                return
            print(arg1)

            ###################################################
            # Command response
            ###################################################
            try:
                arg1 = self.ser.read().decode('cp1252')
            except serial.SerialException as e:

                self.ids.Serial_status.text = 'Not Connected'
                self.ids.Board_Info.text = ''
                self.ids.Serial_Con.text = 'Connect'

            try:
                while arg1[-1] != '>':
                    arg1 += self.ser.read().decode('cp1252')
            except:
                self.ids.Serial_status.text = 'Not Connected'
                self.ids.Board_Info.text = ''
                self.ids.Serial_Con.text = 'Connect'
                return
            print(arg1)

            print('arg1 : '+ arg1)
            lst1 = arg1.replace('\r\n', '')
            lst1 = lst1.replace('>', '')
            str_lst = lst1.split(' ')

            self.ids.Serial_status.text = 'Connected...'
            self.ids.Board_Info.text = 'Board Name' + ' : ' + str(str_lst[0]) + '\r\nBoard Ver' + ' : ' + str(str_lst[1]) + '\r\nBoard FW' + ' : ' + str(str_lst[2]) + '\r\nBoard MCU' + ' : ' + str(str_lst[3])
            self.ids.Serial_Con.text = 'Disconnect'
            self.ser_con = True
            self.ser_con_port = self.ids.Serial_select.text

            check_connection = threading.Thread(target= self.chk_connection)
            check_connection.daemon = True
            check_connection.start()

        else:
                self.ser.close()

                self.ids.Serial_status.text = 'Not Connected'
                self.ids.Board_Info.text = ''
                self.ids.Serial_Con.text = 'Connect'
                self.ser_con = False

이 함수는 이렇게 동작한다.

1. 해당 COM 포트 오픈

2. 호스트 PC -> 보드 : 'VER\0'(VER + null)을 전송

3. 보드 -> 호스트 PC : 'echo VER>' 전송

4. 보드 -> 호스트 PC : '%board_name %board_version %fw_version %board_MCU'

    exp)

  • %board_name : MQTT-test
  • %board_version : 0.0.1
  • %fw_version : 0.0.1
  • %board_MCU : STM32F103RCT

기존에 쓰던 보드에 연결해봤다.












소스의 분기문과 try...except는 엉뚱한 포트를 선택했을 경우에 대한 예외처리다.


Phase 3. 접속 상태를 주기적으로 점검

쓰레드를 사용해서 현재 접속중인 COM 포트가 존재하는지 5초 단위로 체크한다.

    def chk_connection(self):
        while True:
            if self.ser_con == True:
                port_list = []
                ports = list(serial.tools.list_ports.comports())
                for p in ports:
                    port_list.append(p[0])

                if self.ser_con_port in port_list:
                    print("Connected")
                else:
                    self.ser.close()
                    self.ids.Serial_status.text = 'Not Connected'
                    self.ids.Board_Info.text = ''
                    self.ids.Serial_Con.text = 'Connect'
                    self.ser_con = False
                    print("Disconnected")

            else:
                print("Disconnected")

            time.sleep(5)

이 함수는 무한 루프로 돌아가며, 데몬 쓰레드로 설정해야 한다. 

그렇지 않으면 앱 종료 후에도 계속 돌아간다.


#. 파이썬, kv lang 전체 내용 첨부합니다.

MQTTtest_GUIv001.py

###################################################
###################################################
#    MQTT-test GUI
#    -> ver 0.0.1
#    by JBE
#
#    since 2021-03-21
###################################################
###################################################
import kivy
kivy.require('1.11.1')

from kivy.app import App
from kivy.uix.label import Label
from kivy.lang import Builder
from kivy.uix.spinner import Spinner
from kivy.uix.floatlayout import FloatLayout
from kivy.uix.button import Button

import serial
import serial.tools.list_ports

import threading
import time

from kivy.config import Config
Config.set('graphics', 'width', '800')
Config.set('graphics', 'height', '600')
Config.set('graphics', 'resizable', 0)
Config.write()

#Builder.load_file('MQTTtest_GUIv001.kv')

class MQTTtest_GUIv001(FloatLayout):
    available_port = []

    ser = serial.Serial()
    ser.baudrate = 115200
    ser.timeout = 0.5
    ser.write_timeout = 0.5
    ser_con = False
    ser_con_port = ""

    def __init__(self, **kwargs):
        super(MQTTtest_GUIv001, self).__init__(**kwargs)
        self.updateSerial_select()

    def updateSerial_select(self):
        self.available_port = []
        ports = list(serial.tools.list_ports.comports())
        for p in ports:
            self.available_port.append(p[0])

        self.ids.Serial_select.values = self.available_port

    def Serial_Con(self, value):
        print(self.ids.Serial_select.text)
        if self.ser_con == False:
            self.ser.port = self.ids.Serial_select.text

            if self.ser.is_open == False:
                try:
                    self.ser.open()
                    if self.ser.isOpen():
                        print("port opened")
                    else:
                        self.ids.Serial_status.text = "port error"
                        print("error opening : " + str(e))
                        return
                except OSError as e:
                    self.ids.Serial_status.text = "port error"
                    print("error opening : " + str(e))
                    return

            ver_cmd = 'VER\0'
            try:
                self.ser.reset_input_buffer()
                self.ser.reset_output_buffer()
                self.ser.write(ver_cmd.encode())
                print(ver_cmd.encode())
                self.ser.flush()
            except Exception as e:
                self.ids.Serial_status.text = "port error"
                print("error sending : " + str(e))
                self.ser.close()
                return

            ###################################################
            # echo back
            ###################################################
            try:
                arg1 = self.ser.read().decode('cp1252')
                if len(arg1) == 0:
                    self.ids.Serial_status.text = "port error"
                    print("No response")
                    self.ser.close()
                    return
            except serial.SerialException as e:
                self.ids.Serial_status.text = 'Not Connected'
                self.ids.Board_Info.text = ''
                self.ids.Serial_Con.text = 'Connect'
                print("SerialException : " + str(e))
                self.ser.close()
                return
            except TypeError as e:
                print("TypeError : " + str(e))
                self.ser.close()
                return

            try:
                while arg1[-1] != '>':
                    arg1 += self.ser.read().decode('cp1252')
            except serial.SerialException as e:
                self.ids.Serial_status.text = 'Not Connected'
                self.ids.Board_Info.text = ''
                self.ids.Serial_Con.text = 'Connect'
                print("SerialException : " + str(e))
                self.ser.close()
                return
            except TypeError as e:
                print("TypeError : " + str(e))
                self.ser.close()
                return
            print(arg1)

            ###################################################
            # Command response
            ###################################################
            try:
                arg1 = self.ser.read().decode('cp1252')
            except serial.SerialException as e:

                self.ids.Serial_status.text = 'Not Connected'
                self.ids.Board_Info.text = ''
                self.ids.Serial_Con.text = 'Connect'

            try:
                while arg1[-1] != '>':
                    arg1 += self.ser.read().decode('cp1252')
            except:
                self.ids.Serial_status.text = 'Not Connected'
                self.ids.Board_Info.text = ''
                self.ids.Serial_Con.text = 'Connect'
                return
            print(arg1)

            print('arg1 : '+ arg1)
            lst1 = arg1.replace('\r\n', '')
            lst1 = lst1.replace('>', '')
            str_lst = lst1.split(' ')

            self.ids.Serial_status.text = 'Connected...'
            self.ids.Board_Info.text = 'Board Name' + ' : ' + str(str_lst[0]) + '\r\nBoard Ver' + ' : ' + str(str_lst[1]) + '\r\nBoard FW' + ' : ' + str(str_lst[2]) + '\r\nBoard MCU' + ' : ' + str(str_lst[3])
            self.ids.Serial_Con.text = 'Disconnect'
            self.ser_con = True
            self.ser_con_port = self.ids.Serial_select.text

            check_connection = threading.Thread(target= self.chk_connection)
            check_connection.daemon = True
            check_connection.start()

        else:
                self.ser.close()

                self.ids.Serial_status.text = 'Not Connected'
                self.ids.Board_Info.text = ''
                self.ids.Serial_Con.text = 'Connect'
                self.ser_con = False

    def chk_connection(self):
        while True:
            if self.ser_con == True:
                port_list = []
                ports = list(serial.tools.list_ports.comports())
                for p in ports:
                    port_list.append(p[0])

                if self.ser_con_port in port_list:
                    print("Connected")
                else:
                    self.ser.close()
                    self.ids.Serial_status.text = 'Not Connected'
                    self.ids.Board_Info.text = ''
                    self.ids.Serial_Con.text = 'Connect'
                    self.ser_con = False
                    print("Disconnected")

            else:
                print("Disconnected")

            time.sleep(5)


class MQTTtest_GUIv001App(App):
    def build(self):
        return MQTTtest_GUIv001()

if __name__ == '__main__':
    MQTTtest_GUIv001App().run()


MQTTtest_GUIv001.kv

<MQTTtest_GUIv001>:
    FloatLayout:
        Spinner:
            id : Serial_select
            text_autoupdate : True
            sync_height : True
            values : root.available_port
            size_hint : None, None
            size : 60, 30
            pos : 5, 520
            on_press : root.updateSerial_select()
        Button:
            id : Serial_Con
            size_hint : None, None
            size : 90, 30
            pos: 65, 520
            text: 'Connect'
            on_press: root.Serial_Con(self)
        Label:
            id: Serial_status
            size_hint : None, None
            size : 80, 34
            pos: 165, 520
            font_size : 12
            text : 'Not Connected'
        Label:
            id : Board_Info
            size_hint : None, None
            size : 200, 100
            pos: -12, 440
            font_size : 12
            text : ''


이제 펌웨어를 만들어봅시다.