Вопрос по memorystream, stream, c# – Есть ли в памяти поток, который блокирует как файловый поток

10

Я использую библиотеку, которая требует, чтобы я предоставил объект, который реализует этот интерфейс:

public interface IConsole {
    TextWriter StandardInput { get; }
    TextReader StandardOutput { get; }
    TextReader StandardError { get; }
}

Затем читатели объекта используются библиотекой с:

IConsole console = new MyConsole();
int readBytes = console.StandardOutput.Read(buffer, 0, buffer.Length);

Обычно класс, реализующий IConsole, имеет поток StandardOutput как поступающий от внешнего процесса. В этом случае вызовы console.StandardOutput.Read работают путем блокирования, пока в поток StandardOutput не будут записаны некоторые данные.

Я пытаюсь создать тестовую реализацию IConsole, которая использует MemoryStreams и отображает все, что появляется в StandardInput, обратно в StandardInput. Я пытался:

MemoryStream echoOutStream = new MemoryStream();
StandardOutput = new StreamReader(echoOutStream);

Но проблема в том, что console.StandardOutput.Read будет возвращать 0, а не блокировать, пока не появятся данные. Могу ли я в любом случае заставить MemoryStream заблокировать, если нет доступных данных или есть другой поток памяти, который я мог бы использовать?

Я не могу поверить, что .net не имеет этого встроенного. Я беру SQL VDI, который записывает необработанные данные резервных копий в поток .NET, а затем мне нужно прочитать из этого потока и записать его в удаленную PSSession PowerShell выход. К сожалению, я не могу прочитать поток, потому что он всегда возвращает 0. Brain2000
@ Martinv.Löwis Это никогда не о том, что вы не должны делать. Речь идет о том, что вы можете сделать с помощью предоставленных инструментов. Быть способным читать и писать из одного потока должно быть легко, но это не так. Brain2000
Вы действительно не должны читать из выходного потока. Martin v. Löwis

Ваш Ответ

3   ответа
0

ние двух других версий, а также некоторые предложения из комментариев.

ОБНОВИТЬ - Я тестировал этот EchoStream с более чем 50 терабайтами данных, проходящими через него в течение нескольких дней подряд. Тест проводился между сетевым потоком и потоком сжатия ZStandard. Асинхронность также была проверена, что привело к появлению редких условий на поверхности. Похоже, что встроенный System.IO.Stream не ожидает, что он будет одновременно вызывать ReadAsync и WriteAsync в одном и том же потоке, что может привести к зависанию, если нет доступных данных, поскольку оба вызова используют один и тот же внутренний переменные. Поэтому мне пришлось переопределить те функции, которые решили проблему зависания.

В этой версии следующие функции.

1) Это было написано с нуля с использованием базового класса System.IO.Stream вместо MemoryStream.

2) Конструктор может установить максимальную глубину очереди, и если этот уровень достигнут, тогда запись потока будет блокироваться до тех пор, пока не будет выполнено чтение, что приведет к падению глубины очереди ниже максимального уровня (без ограничения = 0, по умолчанию = 10).

3) При чтении / записи данных смещение буфера и число теперь учитываются. Кроме того, вы можете вызывать Read с меньшим буфером, чем Write, не выбрасывая исключение и не теряя данные. BlockCopy используется в цикле для заполнения байтов до тех пор, пока счетчик не будет удовлетворен.

4) Существует открытое свойство AlwaysCopyBuffer, которое создает копию буфера в функции Write. Установка этого в true позволит безопасно использовать байтовый буфер после вызова Write.

5) Существует открытое свойство ReadTimeout / WriteTimeout, которое контролирует, как долго функция чтения / записи будет блокироваться, прежде чем она вернет 0 (по умолчанию = бесконечность, -1).

6) Используется класс BlockingQueue, который объединяет классы ConcurrentQueue и AutoResetEvent. В классе ConcurrentQueue существует редкое условие, когда вы обнаружите, что после того, как данные были помещены в очередь, они не сразу доступны, когда AutoResetEvent пропускает поток в Read (). Это происходит примерно один раз каждые 500 ГБ данных, которые проходят через него. Лекарство заключается в том, чтобы выспаться и снова проверить данные. Иногда Sleep (0) работает, но в крайних случаях, когда загрузка процессора была высокой, он может достигать Sleep (1000) до того, как данные появятся. ConcurrentQueue обрабатывает все это без каких-либо проблем.

7) Это было проверено, чтобы быть потокобезопасным для одновременного чтения и записи асинхронного.

using System;
using System.IO;
using System.Threading.Tasks;
using System.Threading;
using System.Collections.Concurrent;

public class EchoStream : Stream
{
    public override bool CanTimeout { get; } = true;
    public override int ReadTimeout { get; set; } = Timeout.Infinite;
    public override int WriteTimeout { get; set; } = Timeout.Infinite;
    public override bool CanRead { get; } = true;
    public override bool CanSeek { get; } = false;
    public override bool CanWrite { get; } = true;

    public bool CopyBufferOnWrite { get; set; } = false;

    private readonly object _lock = new object();

    // Default underlying mechanism for BlockingCollection is ConcurrentQueue<T>, which is what we want
    private readonly BlockingCollection<byte[]> _Buffers;
    private int _maxQueueDepth = 10;

    private byte[] m_buffer = null;
    private int m_offset = 0;
    private int m_count = 0;

    private bool m_Closed = false;
    public override void Close()
    {
        m_Closed = true;

        // release any waiting writes
        _Buffers.CompleteAdding();
    }

    public bool DataAvailable
    {
        get
        {
            return _Buffers.Count > 0;
        }
    }

    private long _Length = 0L;
    public override long Length
    {
        get
        {
            return _Length;
        }
    }

    private long _Position = 0L;
    public override long Position
    {
        get
        {
            return _Position;
        }
        set
        {
            throw new NotImplementedException();
        }
    }

    public EchoStream() : this(10)
    {
    }

    public EchoStream(int maxQueueDepth)
    {
        _maxQueueDepth = maxQueueDepth;
        _Buffers = new BlockingCollection<byte[]>(_maxQueueDepth);
    }

    // we override the xxxxAsync functions because the default base class shares state between ReadAsync and WriteAsync, which causes a hang if both are called at once
    public new Task WriteAsync(byte[] buffer, int offset, int count)
    {
        return Task.Run(() => Write(buffer, offset, count));
    }

    // we override the xxxxAsync functions because the default base class shares state between ReadAsync and WriteAsync, which causes a hang if both are called at once
    public new Task<int> ReadAsync(byte[] buffer, int offset, int count)
    {
        return Task.Run(() =>
        {
            return Read(buffer, offset, count);
        });
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        if (m_Closed || buffer.Length - offset < count || count <= 0)
            return;

        byte[] newBuffer;
        if (!CopyBufferOnWrite && offset == 0 && count == buffer.Length)
            newBuffer = buffer;
        else
        {
            newBuffer = new byte[count];
            System.Buffer.BlockCopy(buffer, offset, newBuffer, 0, count);
        }
        if (!_Buffers.TryAdd(newBuffer, WriteTimeout))
            throw new TimeoutException("EchoStream Write() Timeout");

        _Length += count;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (count == 0)
            return 0;
        lock (_lock)
        {
            if (m_count == 0 && _Buffers.Count == 0)
            {
                if (m_Closed)
                    return -1;

                if (_Buffers.TryTake(out m_buffer, ReadTimeout))
                {
                    m_offset = 0;
                    m_count = m_buffer.Length;
                }
                else
                    return m_Closed ? -1 : 0;
            }

            int returnBytes = 0;
            while (count > 0)
            {
                if (m_count == 0)
                {
                    if (_Buffers.TryTake(out m_buffer, 0))
                    {
                        m_offset = 0;
                        m_count = m_buffer.Length;
                    }
                    else
                        break;
                }

                var bytesToCopy = (count < m_count) ? count : m_count;
                System.Buffer.BlockCopy(m_buffer, m_offset, buffer, offset, bytesToCopy);
                m_offset += bytesToCopy;
                m_count -= bytesToCopy;
                offset += bytesToCopy;
                count -= bytesToCopy;

                returnBytes += bytesToCopy;
            }

            _Position += returnBytes;

            return returnBytes;
        }
    }

    public override void Flush()
    {
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotImplementedException();
    }

    public override void SetLength(long value)
    {
        throw new NotImplementedException();
    }
}
9

унаследовав от MemoryStream и переняв методы Read и Write.

public class EchoStream : MemoryStream {

    private ManualResetEvent m_dataReady = new ManualResetEvent(false);
    private byte[] m_buffer;
    private int m_offset;
    private int m_count;

    public override void Write(byte[] buffer, int offset, int count) {
        m_buffer = buffer;
        m_offset = offset;
        m_count = count;
        m_dataReady.Set();
    }

    public override int Read(byte[] buffer, int offset, int count) {
        if (m_buffer == null) {
            // Block until the stream has some more data.
            m_dataReady.Reset();
            m_dataReady.WaitOne();    
        }

        Buffer.BlockCopy(m_buffer, m_offset, buffer, offset, (count < m_count) ? count : m_count);
        m_buffer = null;
        return (count < m_count) ? count : m_count;
    }
}
@sipwiz это хороший ответ. Однако использование Array.copy не позволяет чтению функционировать должным образом. Он не будет поддерживать офсетные копии. Вам нужно изменить наBuffer.BlockCopy (m_buffer, 0, буфер, смещение, m_count); это также быстрее в большинстве систем. mark gamache
Эта версия не будет работать очень хорошо, если буфер чтения меньше, чем запись. Хотя оно не будет выдавать исключение, как функция выше, это просто начнет блокировать данные. Brain2000
Справедливо. Я согласен, что стоит поставить чек на. Само собой разумеется, что у меня был вышеупомянутый код в производстве на общедоступной службе SSH в течение 5 лет, и у меня никогда не было зависания службы, поэтому я подозреваю, что это условие с очень низкой вероятностью. sipwiz
У вас есть состояние гонки вRead(), ЕслиWrite() вызывается другим потоком между нулевой проверкой буфера иm_dataReady.Reset()Возможно, вам придется ждать вечно, если сервер не будет отправлять данные снова. В большинстве протоколов запрос / ответ это создает мертвую блокировку. Я предлагаю вам использовать автоматическое событие вместо. Jörgen Sigvardsson
11

вот моя многопоточная, многопоточная версия:

public class EchoStream : MemoryStream
{
    private readonly ManualResetEvent _DataReady = new ManualResetEvent(false);
    private readonly ConcurrentQueue<byte[]> _Buffers = new ConcurrentQueue<byte[]>();

    public bool DataAvailable{get { return !_Buffers.IsEmpty; }}

    public override void Write(byte[] buffer, int offset, int count)
    {
        _Buffers.Enqueue(buffer);
        _DataReady.Set();
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        _DataReady.WaitOne();

        byte[] lBuffer;

        if (!_Buffers.TryDequeue(out lBuffer))
        {
            _DataReady.Reset();
            return -1;
        }

        if (!DataAvailable)
            _DataReady.Reset();

        Array.Copy(lBuffer, buffer, lBuffer.Length);
        return lBuffer.Length;
    }
}

С вашей версией вы должны прочитать поток при записи, без какой-либо возможности последовательной записи. Моя версия буферизует любой записанный буфер в ConcurrentQueue (довольно просто изменить его на простую очередь и заблокировать его)

Это может не сработать, если вы попытаетесь выполнить чтение с размером буфера, меньшим, чем запись. Array.Copy сгенерирует исключение, если недостаточно места ... Brain2000
это здорово, однако есть ошибка в методе записи,_Buffers.Enqueue(buffer); следует заменить на_Buffers.Enqueue(buffer.Take(count).ToArray()); и тогда он действительно работает, блокируя и обмениваясь данными между потоками! Спасибо! Marcin Tarsier
Далее должно быть_Buffers.Enqueue(buffer.Skip(offset).Take(count).ToArray()); И обратите внимание, что Read в настоящее время игнорирует смещение и счет (для реализации замените ConcurrentQueue <byte []> на ConcurrentQueue <byte> и TryDequeue в цикле, пока у вас не будет достаточно или не останется ни одного) kskid19

Похожие вопросы