使用python操作串口
一、
為了使用python操作串口,首先需要下載相關(guān)模塊:
1. pyserial (http://pyserial.wiki./pySerial)
2. pywin32 (http:///projects/pywin32/)
二、
google “python 串口 操作”關(guān)鍵字,找到相關(guān)python代碼,
我是從http://currentlife.blog.sohu.com/53741351.html頁(yè)面上拷貝的代碼。
咱得參考人家的代碼修改。
三、
發(fā)送數(shù)據(jù)可用chr和pack組裝處理,如:
snd = ''
snd += chr(97)
data = 0x12345678
snd += pack.('i', data)
snd += chr(0x64)
self.l_serial.write(snd);
#發(fā)送的數(shù)據(jù)是(16進(jìn)制):61 78 56 34 12 64
接收的數(shù)據(jù)用ord函數(shù),將字節(jié)內(nèi)容變?yōu)檎麛?shù),進(jìn)行判斷處理。
如:if ord(recv[2])== 0x01:
判斷recv[2]是否是0x01.
注意:不能這樣比較
if recv[2] == 'a':
pass
也不能這樣比較
if recv[2] == 0x97:
pass
因?yàn)?/span>python的字符串存儲(chǔ)機(jī)制我不清楚,所以不知道為什么這樣比較不可以。
帖點(diǎn)代碼,依據(jù)前面的參考代碼修改的:
#coding=gb18030
import sys,threading,time; import serial; import binascii,encodings; import re; import socket; from struct import *;
class ComThread: def __init__(self, Port=0): self.l_serial = None; self.alive = False; self.waitEnd = None; self.port = Port;
def waiting(self): if not self.waitEnd is None: self.waitEnd.wait();
def SetStopEvent(self): if not self.waitEnd is None: self.waitEnd.set(); self.alive = False; self.stop();
def start(self): self.l_serial = serial.Serial(); self.l_serial.port = self.port; self.l_serial.baudrate = 9600; self.l_serial.timeout = 2; self.l_serial.open();
if self.l_serial.isOpen(): self.waitEnd = threading.Event(); self.alive = True; self.thread_read = None; self.thread_read = threading.Thread(target=self.FirstReader); self.thread_read.setDaemon(1); self.thread_read.start(); return True; else: return False;
def FirstReader(self): while self.alive: # 接收間隔 time.sleep(0.1); try: data = ''; n = self.l_serial.inWaiting(); if n: data = data + self.l_serial.read(n); for l in xrange(len(data)): print '%02X' % ord(data[l]),
# 發(fā)送數(shù)據(jù) snddata = ''; snddata += chr(97) tt = 0x12345678; snddata += pack('i', tt) snddata += chr(0x64) self.l_serial.write(snddata);
# 判斷結(jié)束 if len(data) > 0 and ord(data[len(data)-1])==0x45: #pass; break; except Exception, ex: print str(ex);
self.waitEnd.set(); self.alive = False;
def stop(self): self.alive = False; self.thread_read.join(); if self.l_serial.isOpen(): self.l_serial.close();
#測(cè)試用部分 if __name__ == '__main__': rt = ComThread(); try: if rt.start(): rt.waiting(); rt.stop(); else: pass; except Exception,se: print str(se);
if rt.alive: rt.stop();
print ''; print 'End OK .'; del rt;
|