Вопрос по memorystream, stream, c# – Есть ли в памяти поток, который блокирует как файловый поток

10

используя библиотеку, которая требует, чтобы я предоставил объект, который реализует этот интерфейс:

public interface IConsole {
    TextWriter StandardInput { get; }
    TextReader StandardOutput { get; }
    TextReader StandardError { get; }
}

Предмет'Затем читатели привыкнут к библиотеке с:

IConsole console = new MyConsole();
int readBytes = console.StandardOutput.Read(buffer, 0, buffer.Length);

Обычно класс, реализующий IConsole, имеет поток StandardOutput как поступающий от внешнего процесса. В этом случае вызовы console.StandardOutput.Read работают путем блокирования, пока в поток StandardOutput не будут записаны некоторые данные.

Что я'я пытаюсь сделать, это создать тестовую реализацию IConsole, которая использует MemoryStreams и echo 'Что бы ни появлялось в StandardInput обратно в StandardInput. Я старался:

MemoryStream echoOutStream = new MemoryStream();
StandardOutput = new StreamReader(echoOutStream);

Но проблема в том, что console.StandardOutput.Read будет возвращать 0, а не блокировать, пока не появятся данные. Могу ли я в любом случае заставить MemoryStream заблокировать, если нет доступных данных или есть другой поток памяти, который я мог бы использовать?

@ Martinv.Löэто былоникогда не о том, что вы не должныне делать. Это'о том, что вы можете сделать с помощью предоставленных инструментов. Умение читать и писать из одного потока должно быть легко, но этонет. Brain2000
Я могу'Я не верю. Net неэто не встроено. ЯЯ беру SQL VDI, который записывает необработанные данные резервного копирования в поток .NET, а затем мне нужно прочитать из этого потока и записать его в вывод PSSession удаленного PowerShell. К сожалению, я могуКажется, что поток читается, потому что он всегда возвращает 0. Brain2000
Вы действительно не должнычтение из выходного потока. Martin v. Löwis

Ваш Ответ

3   ответа
0

Я собираюсь добавить еще одну улучшенную версию EchoStream. Это сочетание двух других версий, а также некоторые предложения из комментариев.

ОБНОВИТЬ - Я тестировал этот EchoStream с более чем 50 терабайтами данных, проходящими через него в течение нескольких дней подряд. Тест проводился между сетевым потоком и потоком сжатия ZStandard. Асинхронность также была проверена, что привело к появлению редких условий на поверхности. Похоже, что встроенный System.IO.Stream не ожидает, что он будет одновременно вызывать ReadAsync и WriteAsync в одном и том же потоке, что может привести к зависанию, еслилюбые доступные данные, потому что оба вызова используют одни и те же внутренние переменные. Поэтому мне пришлось переопределить те функции, которые решили проблему зависания.

В этой версии следующие функции.

1) Это было написано с нуля с использованием базового класса System.IO.Stream вместо MemoryStream.

2) Конструктор может установить максимальную глубину очереди, и если этот уровень будет достигнут, тогда запись потока будет блокироваться до тех пор, пока не будет выполнено чтение, которое опустит глубину очереди ниже максимального уровня (без ограничения = 0, по умолчанию = 10).

3) При чтении / записи данных смещение буфера и число теперь учитываются. Кроме того, вы можете вызвать Read с меньшим буфером, чем Write, не выбрасывая исключение и не теряя данные. BlockCopy используется в цикле для заполнения байтов до тех пор, пока счетчик не будет удовлетворен.

4) Существует открытое свойство AlwaysCopyBuffer, которое создает копию буфера в функции Write. Установка этого в true позволит безопасно использовать байтовый буфер после вызова Write.

5) Существует открытое свойство ReadTimeout / WriteTimeout, которое контролирует, как долго функция чтения / записи будет блокироваться, прежде чем она вернет 0 (по умолчанию = бесконечность, -1).

6) Используется класс BlockingQueue, который объединяет классы ConcurrentQueue и AutoResetEvent. В классе ConcurrentQueue существует редкое условие, при котором вы обнаружите, что после того, как данные были помещены в очередь, они не доступны сразу, когда AutoResetEvent пропускает поток в Read (). Это происходит примерно один раз каждые 500 ГБ данных, которые проходят через него. Лекарство заключается в том, чтобы выспаться и снова проверить данные. Иногда Sleep (0) работает, но в крайних случаях, когда загрузка процессора была высокой, он может достигать Sleep (1000) до того, как данные появятся. ConcurrentQueue обрабатывает все это без каких-либо проблем.

7) Это было проверено, чтобы быть потокобезопасным для одновременного чтения и записи асинхронного.

using System;
using System.IO;
using System.Threading.Tasks;
using System.Threading;
using System.Collections.Concurrent;

public class EchoStream : Stream
{
    public override bool CanTimeout { get; } = true;
    public override int ReadTimeout { get; set; } = Timeout.Infinite;
    public override int WriteTimeout { get; set; } = Timeout.Infinite;
    public override bool CanRead { get; } = true;
    public override bool CanSeek { get; } = false;
    public override bool CanWrite { get; } = true;

    public bool CopyBufferOnWrite { get; set; } = false;

    private readonly object _lock = new object();

    // Default underlying mechanism for BlockingCollection is ConcurrentQueue, which is what we want
    private readonly BlockingCollection _Buffers;
    private int _maxQueueDepth = 10;

    private byte[] m_buffer = null;
    private int m_offset = 0;
    private int m_count = 0;

    private bool m_Closed = false;
    public override void Close()
    {
        m_Closed = true;

        // release any waiting writes
        _Buffers.CompleteAdding();
    }

    public bool DataAvailable
    {
        get
        {
            return _Buffers.Count > 0;
        }
    }

    private long _Length = 0L;
    public override long Length
    {
        get
        {
            return _Length;
        }
    }

    private long _Position = 0L;
    public override long Position
    {
        get
        {
            return _Position;
        }
        set
        {
            throw new NotImplementedException();
        }
    }

    public EchoStream() : this(10)
    {
    }

    public EchoStream(int maxQueueDepth)
    {
        _maxQueueDepth = maxQueueDepth;
        _Buffers = new BlockingCollection(_maxQueueDepth);
    }

    // we override the xxxxAsync functions because the default base class shares state between ReadAsync and WriteAsync, which causes a hang if both are called at once
    public new Task WriteAsync(byte[] buffer, int offset, int count)
    {
        return Task.Run(() => Write(buffer, offset, count));
    }

    ,// we override the xxxxAsync functions because the default base class shares state between ReadAsync and WriteAsync, which causes a hang if both are called at once
    public new Task ReadAsync(byte[] buffer, int offset, int count)
    {
        return Task.Run(() =>
        {
            return Read(buffer, offset, count);
        });
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        if (m_Closed || buffer.Length - offset < count || count <= 0)
            return;

        byte[] newBuffer;
        if (!CopyBufferOnWrite && offset == 0 && count == buffer.Length)
            newBuffer = buffer;
        else
        {
            newBuffer = new byte[count];
            System.Buffer.BlockCopy(buffer, offset, newBuffer, 0, count);
        }
        if (!_Buffers.TryAdd(newBuffer, WriteTimeout))
            throw new TimeoutException("EchoStream Write() Timeout");

        _Length += count;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (count == 0)
            return 0;
        lock (_lock)
        {
            if (m_count == 0 && _Buffers.Count == 0)
            {
                if (m_Closed)
                    return -1;

                if (_Buffers.TryTake(out m_buffer, ReadTimeout))
                {
                    m_offset = 0;
                    m_count = m_buffer.Length;
                }
                else
                    return m_Closed ? -1 : 0;
            }

            int returnBytes = 0;
            while (count > 0)
            {
                if (m_count == 0)
                {
                    if (_Buffers.TryTake(out m_buffer, 0))
                    {
                        m_offset = 0;
                        m_count = m_buffer.Length;
                    }
                    else
                        break;
                }

                var bytesToCopy = (count < m_count) ? count : m_count;
                System.Buffer.BlockCopy(m_buffer, m_offset, buffer, offset, bytesToCopy);
                m_offset += bytesToCopy;
                m_count -= bytesToCopy;
                offset += bytesToCopy;
                count -= bytesToCopy;

                returnBytes += bytesToCopy;
            }

            _Position += returnBytes;

            return returnBytes;
        }
    }

    public override void Flush()
    {
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotImplementedException();
    }

    public override void SetLength(long value)
    {
        throw new NotImplementedException();
    }
}
@MaxR. Изменено== 0 " к "<= 0 ", для обработки случаев, когда Read возвращает -1 Brain2000
Конечно! Я забыл readbyte. Добавил, спасибо! Brain2000
Я добавил функцию ReadByte:public override int ReadByte() { byte[] returnValue = new byte[1]; return (Read(returnValue,0,1) == 0 ? -1 : returnValue[0]); } Max R.
11

Вдохновленный вашим ответом, здесьМоя многопоточная, многопоточная версия:

public class EchoStream : MemoryStream
{
    private readonly ManualResetEvent _DataReady = new ManualResetEvent(false);
    private readonly ConcurrentQueue _Buffers = new ConcurrentQueue();

    public bool DataAvailable{get { return !_Buffers.IsEmpty; }}

    public override void Write(byte[] buffer, int offset, int count)
    {
        _Buffers.Enqueue(buffer);
        _DataReady.Set();
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        _DataReady.WaitOne();

        byte[] lBuffer;

        if (!_Buffers.TryDequeue(out lBuffer))
        {
            _DataReady.Reset();
            return -1;
        }

        if (!DataAvailable)
            _DataReady.Reset();

        Array.Copy(lBuffer, buffer, lBuffer.Length);
        return lBuffer.Length;
    }
}

С вашей версией вы должны прочитать поток при записи, без какой-либо возможности последовательной записи. Моя версия буферизует любой записанный буфер в ConcurrentQueue (это 'довольно просто изменить его на простую очередь и заблокировать)

это круто, однакоошибка в методе записи,_Buffers.Enqueue(buffer); следует заменить на_Buffers.Enqueue(buffer.Take(count).ToArray()); а потомЭто действительно работает, блокируя и обмениваясь данными между потоками! Спасибо! Marcin Tarsier
Далее должно быть_Buffers.Enqueue(buffer.Skip(offset).Take(count).ToArray());  И обратите внимание, что Read в данный момент игнорирует смещение и счет (для реализации замените ConcurrentQueue <байт []> с ConcurrentQueue <байт> и TryDequeue в цикле, пока вам не хватит или не останется ничего) kskid19
Это может не сработать, если вы попытаетесь выполнить чтение с размером буфера, меньшим, чем запись. Array.Copy сгенерирует исключение, еслине хватает места ... Brain2000
9

В конце концов я нашел простой способ сделать это, унаследовав от MemoryStream и переняв методы Read и Write.

public class EchoStream : MemoryStream {

    private ManualResetEvent m_dataReady = new ManualResetEvent(false);
    private byte[] m_buffer;
    private int m_offset;
    private int m_count;

    public override void Write(byte[] buffer, int offset, int count) {
        m_buffer = buffer;
        m_offset = offset;
        m_count = count;
        m_dataReady.Set();
    }

    public override int Read(byte[] buffer, int offset, int count) {
        if (m_buffer == null) {
            // Block until the stream has some more data.
            m_dataReady.Reset();
            m_dataReady.WaitOne();    
        }

        Buffer.BlockCopy(m_buffer, m_offset, buffer, offset, (count < m_count) ? count : m_count);
        m_buffer = null;
        return (count < m_count) ? count : m_count;
    }
}
Эта версия небудет работать очень хорошо, если буфер чтения меньше, чем запись. Пока он выигралне генерировать исключение, как функция выше, это просто начнет обрезать данные. Brain2000
Справедливо. Я'согласенСтоит поставить чек на. Излишне говорить, что яВ течение 5 лет мы работали с вышеуказанным кодом в общедоступной службе SSH и никогда не зависали, поэтому я подозреваю, чтоЭто условие с очень низкой вероятностью. sipwiz
У вас есть состояние гонки вRead(), ЕслиWrite() вызывается другим потоком между нулевой проверкой буфера иm_dataReady.Reset()Возможно, вам придется ждать вечно, если сервер не будет отправлять данные снова. В большинстве протоколов запрос / ответ это создает мертвую блокировку. Я предлагаю вам использовать автоматическое событие вместо. Jörgen Sigvardsson
@sipwiz это хороший ответ. Однако использование Array.copy не позволяет чтению функционировать должным образом. Это победилоПоддержка офсетных копий. Вам нужно изменить наBuffer.BlockCopy (m_buffer, 0, буфер, смещение, m_count);  это также быстрее в большинстве систем. mark gamache

Похожие вопросы