基于python串口通信简单实现物联网设备的自动化测试

2020-11-28 10:00:00
Man-Li
转贴:
CSDN
13166
摘要:如何基于python串口通信简单实现物联网设备的自动化测试?

1.环境

python2.7
serial 库

2.AT command

什么是AT command:

https://baike.baidu.com/item/AT命令/3441555?fr=aladdin

3.实现


4.假设环境

有一款物联网设备,主要功能连接MTQQ实现数据的推送,有:
1.devices建立与服务器的新的MQTT和TCP连接
	AT+NEW=<server>,<port>,<command_timeout>
	【server :  0.0.0.0   MQTTservers 地址】
	【port:  5001    端口】
	【command_timeout:  100ms   延时】
	Response
	a.	OK  连接成功
	b.	ERROR  连接错误
	c.	Timeout  连接超时
2.devices发送MQTT断开数据包
	AT + DISCON
	Response
	a.	OK\r\n
	b.	ERROR\r\n
3.devices重启
	AT + Reboot
	Response
	a.	OK\r\n
	
测试案例:连接成功 10s后断开  重复100次。

5.代码实现,因为项目存在研发保密,测试脚本只体现大致思路


#!/usr/bin/env python
# -*- coding=utf-8 -*-
__author__ = 'man' 
'''
 AT Command 自动化测试实验;
'''
import sys
import time
import serial
class AT_Command:
	#port 串口  COM1
	#baudrate 波特率  9000
	def __init__(self,port,baudrate):
        self.serial_port = serial.Serial()
		self.serial_port.baudrate = baudrate
		self.serial_port.port = port
		self.serial_port.timeout = 30 # 30 seconds
		self.serial_port.open()
		if self.serial_port.is_open != True:
			print "Open Serial Port "+self.serial_port.port+" Failed!!!"
			exit()
		elif self.serial_port.is_open == True:
			print "Open Serial Port "+self.serial_port.port+" Passed!!!"
	
	#ATcomd is string  输入的CMD
	#seek is list 匹配的子串列表
	#timeout is int  规定匹配的时间
	def input_cmd(self,ATcomd,seek,timeout=20):
	    print "input command : "+str(ATcomd)
	    #self.serial_port.flushInput()
	    self.serial_port.write(ATcomd)
	    read_buffer = ""
	    timeout_number=0
	    while(timeout_number<timeout):
	        read_buffer = read_buffer + self.serial_port.read()
	        if read_buffer in seek:
	            print "response :   ",read_buffer
	            return read_buffer
	            break;
	        else:
	            time.sleep(0.1)
	            timeout_number+=1
	    print "not response"
	    return "not response"
	  
	def input_hex(self,send_data,seek,timeout=20):
	    #serial_port.flushInput()
	    self.serial_port.write(send_data.decode("hex"))
	    print "Sended data: ", send_data.decode("hex")
	    read_buffer = ""
	    timeout_number=0
	    while(timeout_number<timeout):
	        read_buffer = read_buffer + self.serial_port.read()
	        if read_buffer in seek:
	            print "response :   ",read_buffer
	            return read_buffer
	            break;
	        else:
	            time.sleep(0.1)
	            timeout_number+=1
	    print "not response"
	    return "not response"
def testcase():
	atcommand = AT_Command('COM1','9000')
	conn_response = ['OK','ERROR','Timeout']
	dsconn_response = ['OK','ERROR']
	reboot_response = ['OK']
	passed_number = 0
	case_number = 0
	while(case_number<100):
		conn = atcommand.input_cmd('AT+NEW=1.1.1.1,80,10',conn_response)
		if conn == 'OK':
			time.sleep(10)
			dsconn = atcommand.input_cmd('AT + DISCON',dsconn_response )
			if dsconn == 'OK':
				print "Passed"
				passed_number+=1
			elif dsconn == 'ERROR':
				print "Failed; dscon fail"
			else:
				print "Failed; dscon time out"
		elif conn == 'ERROR':
			print "Failed; con ERROR"
		elif conn == 'Timeout':
			print "Failed; con Timeout"
		else:
			print "Failed; con time out"
		atcommand.input_cmd('AT + Reboot',reboot_response)
		case_number+=1
	fail_number = case_number - passed_number
	return passed_number,fail_number 
if __name__=="__main__":
	testcase()


发表评论
评论通过审核后显示。
联系我们
  • 联系人:阿道
  • 联系方式: 17762006160
  • 地址:青岛市黄岛区长江西路118号青铁广场18楼