Вопрос по serial-port, arduino, python, pyserial – pyserial - как прочитать последнюю строку, отправленную с последовательного устройства

19

У меня есть Arduino, подключенный к моему компьютеру, который выполняет цикл, отправляя значение через последовательный порт обратно на компьютер каждые 100 мсек.

Я хочу сделать скрипт Python, который будет читать с последовательного порта только каждые несколько секунд, поэтому я хочу, чтобы он просто видел последнюю вещь, отправленную с Arduino.

Как вы делаете это в Pyserial?

Вот код, который я пробовал, который не работает. Он читает строки последовательно.

import serial
import time

ser = serial.Serial('com4',9600,timeout=1)
while 1:
    time.sleep(10)
    print ser.readline() #How do I get the most recent line sent from the device?

Ваш Ответ

10   ответов
3

Этот метод позволяет вам отдельно контролировать время ожидания для сбора всех данных для каждой строки и другое время ожидания для ожидания на дополнительных строках.

# get the last line from serial port
lines = serial_com()
lines[-1]              

def serial_com():
    '''Serial communications: get a response'''

    # open serial port
    try:
        serial_port = serial.Serial(com_port, baudrate=115200, timeout=1)
    except serial.SerialException as e:
        print("could not open serial port '{}': {}".format(com_port, e))

    # read response from serial port
    lines = []
    while True:
        line = serial_port.readline()
        lines.append(line.decode('utf-8').rstrip())

        # wait for new data after each line
        timeout = time.time() + 0.1
        while not serial_port.inWaiting() and timeout > time.time():
            pass
        if not serial_port.inWaiting():
            break 

    #close the serial port
    serial_port.close()   
    return lines
Спасибо за это. Я не смог найти другого ответа, который бы на самом деле возвращал все строки из последовательного ответа.
2

Небольшая модификация для mtasic & amp; Код Vinay Sajip:

Хотя я нашел этот код довольно полезным для меня для аналогичного приложения, мне нужно былоall линии, возвращающиеся из последовательного устройства, которые будут периодически отправлять информацию.

Я решил вытолкнуть первый элемент сверху, записать его, а затем снова присоединиться к оставшимся элементам в качестве нового буфера и продолжить оттуда.

Я понимаю, что этоnot то, что просил Грег, но я подумал, что стоит поделиться в качестве дополнительной заметки.

def receiving(ser):
    global last_received

    buffer = ''
    while True:
        buffer = buffer + ser.read(ser.inWaiting())
        if '\n' in buffer:
            lines = buffer.split('\n')
            last_received = lines.pop(0)

            buffer = '\n'.join(lines)
2

Вам понадобится цикл для чтения всего отправленного, с последним вызовом readline (), блокирующим до истечения времени ожидания. Так:

def readLastLine(ser):
    last_data=''
    while True:
        data=ser.readline()
        if data!='':
            last_data=data
        else:
            return last_data
2

С помощью.inWaiting() Внутри бесконечного цикла может возникнуть проблема. Это может поглотить весьЦПУ в зависимости от реализации. Вместо этого я бы рекомендовал использовать данные определенного размера для чтения. Так что в этом случае, например, следует сделать следующее:

ser.read(1024)
7

Эти решения будут загружать процессор в ожидании символов.

Вы должны сделать хотя бы один блокирующий вызов для чтения (1)

while True:
    if '\n' in buffer: 
        pass # skip if a line already in buffer
    else:
        buffer += ser.read(1)  # this will block until one more char or timeout
    buffer += ser.read(ser.inWaiting()) # get remaining buffered chars

... и делай вещи, как раньше.

2

Too much complications

В чем причина разбивать объект bytes на новую строку или другие манипуляции с массивами? Я пишу самый простой способ, который решит вашу проблему:

import serial
s = serial.Serial(31)
s.write(bytes("ATI\r\n", "utf-8"));
while True:
    last = ''
    for byte in s.read(s.inWaiting()): last += chr(byte)
    if len(last) > 0:
        # Do whatever you want with last
        print (bytes(last, "utf-8"))
        last = ''
5

Ты можешь использоватьser.flushInput() очистить все последовательные данные, которые в настоящее время находятся в буфере.

После очистки старых данных вы можете использовать ser.readline () для получения самых последних данных с последовательного устройства.

Я думаю, что это немного проще, чем другие предлагаемые здесь решения. Работал на меня, надеюсь, он подходит вам.

11
from serial import *
from threading import Thread

last_received = ''

def receiving(ser):
    global last_received
    buffer = ''

    while True:
        # last_received = ser.readline()
        buffer += ser.read(ser.inWaiting())
        if '\n' in buffer:
            last_received, buffer = buffer.split('\n')[-2:]

if __name__ ==  '__main__':
    ser = Serial(
        port=None,
        baudrate=9600,
        bytesize=EIGHTBITS,
        parity=PARITY_NONE,
        stopbits=STOPBITS_ONE,
        timeout=0.1,
        xonxoff=0,
        rtscts=0,
        interCharTimeout=None
    )

    Thread(target=receiving, args=(ser,)).start()
Смотрите мой обновленный ответ,mtasicКод выглядит хорошо, за исключением того, что я считаю одним небольшим затруднением.
Так у last_received всегда будет то, что мне нужно? Есть ли способ сделать это с readline? Greg
это верно, в соответствии с вашим вопросом
Хорошо, что читается как общая сумма того, что находится в буфере приема. У меня сложилось впечатление, что спрашивающий разграничивает то, что arduino отправляет с помощью новых строк, поэтому он, вероятно, не будет соответствовать размеру буфера приема.
Ваше обновление почти правильно. Он устанавливает пустую строку, если буфер заканчивается новой строкой. Смотрите мое дальнейшее обновление ответа.
28

Возможно, я неправильно понимаю ваш вопрос, но, поскольку он представляет собой последовательную линию, вам придется читать все, что отправлено с Arduino последовательно, - он будет буферизироваться в Arduino до тех пор, пока вы его не прочитаете.

Если вы хотите, чтобы отображался статус, показывающий последнюю отправленную информацию - используйте поток, который включает в себя код в вашем вопросе (без спящего режима), и оставьте последнюю полную строку прочитанной как последнюю строку из Arduino.

Update: mtasicПример кода довольно хорош, но если Arduino отправил частичную строку, когдаinWaiting() называется, вы получите усеченную строку. Вместо этого, то, что вы хотите сделать, это поставить последнийcomplete линия вlast_receivedи сохранить частичную линию вbuffer так что это может быть добавлено в следующий раз вокруг цикла. Что-то вроде этого:

def receiving(ser):
    global last_received

    buffer_string = ''
    while True:
        buffer_string = buffer_string + ser.read(ser.inWaiting())
        if '\n' in buffer_string:
            lines = buffer_string.split('\n') # Guaranteed to have at least 2 entries
            last_received = lines[-2]
            #If the Arduino sends lots of empty lines, you'll lose the
            #last filled line, so you could make the above statement conditional
            #like so: if lines[-2]: last_received = lines[-2]
            buffer_string = lines[-1]

Что касается использованияreadline()Вот что должна сказать документация Pyserial (слегка отредактированная для ясности и с упоминанием readlines ()):

Be careful when using "readline". Do specify a timeout when opening the serial port, otherwise it could block forever if no newline character is received. Also note that "readlines()" only works with a timeout. It depends on having a timeout and interprets that as EOF (end of file).

что мне кажется вполне разумным!

0

Вот пример использования оболочки, которая позволяет вам читать самую последнюю строку без 100% CPU

class ReadLine:
    """
    pyserial object wrapper for reading line
    source: https://github.com/pyserial/pyserial/issues/216
    """
    def __init__(self, s):
        self.buf = bytearray()
        self.s = s

    def readline(self):
        i = self.buf.find(b"\n")
        if i >= 0:
            r = self.buf[:i + 1]
            self.buf = self.buf[i + 1:]
            return r
        while True:
            i = max(1, min(2048, self.s.in_waiting))
            data = self.s.read(i)
            i = data.find(b"\n")
            if i >= 0:
                r = self.buf + data[:i + 1]
                self.buf[0:] = data[i + 1:]
                return r
            else:
                self.buf.extend(data)

s = serial.Serial('/dev/ttyS0')
device = ReadLine(s)
while True:
    print(device.readline())

Похожие вопросы