Вопрос по system.reactive, .net, c# – Как читать файл с чередованием одновременно, используя реактивные расширения

2

Я новичок в реактивных расширениях, и я хотел бы использовать его (в C #) для чтения файла, который содержит несколько потоков, которые чередуются. В основном файл в форматеABCDABCDABCD..., Я бы предпочел читать файл последовательно и разделять потоки (т.е.AAA.., BBB..и т. д.) и обрабатывать каждый поток параллельно, используя отдельные потоки для каждого потока.

Должна быть некоторая форма буферизации, чтобы гарантировать, что каждый поток может оставаться максимально занятым (конечно, в определенных пределах). Не все потоки обязательно начинаются одновременно, и в этом случае для задержанных потоков необходимо пропустить несколько элементов. В этом случае буферизация может сократить разрыв.

Элементы в файле маленькие (4 байта), поэтому они довольно болтливы. Поэтому я также ищу способ эффективно с этим справиться.

Я начал с создания перечислимого файла для чтения. Это может быть сделано для предоставления структуры, которая содержит идентификатор потока, или потоки могут быть разделены на основе порядка (номер элемента по модулю количества потоков). Последнее, вероятно, более эффективно, хотя.

Ваш Ответ

2   ответа
1

основанное на ответе Ямена. Кажется, что он работает правильно, это означает, что последовательный чередующийся ввод разделен на несколько последовательных потоков, которые обрабатываются параллельно (многопоточность).

Тем не менее, я не уверен, что это правильная реализация (с точки зрения стиля программирования, RX-контрактов и т. Д.).

const int MAX_BUFFERED_ELEMENTS = 1024;

// number of streams in the file
var numberOfStreams = 8;

// semaphore to limit buffered elements
var semaphore = new SemaphoreSlim(MAX_BUFFERED_ELEMENTS);
var cts = new CancellationTokenSource(); // should be used to cancel (left out of this sample)

// create subjects that are the base of each output stream
var subjects = Enumerable.Repeat(0, numberOfStreams).Select(_ => new Subject<ElementType>()).ToArray();

// create the source stream (reader is IEnumerable<ElementType>)
var observable = reader.ToObservable(Scheduler.ThreadPool).Publish();

// forward elements from source to the output subjects
int stream = 0;
observable.Subscribe(x => { 
    semaphores.Wait(cts.Token);   // wait if buffer is full
    _subjects[stream].OnNext(x);  // forward to output stream
    if (++stream >= numberOfStreams) stream = 0; }); // stream = stream++ % numberOfStreams

// build output streams
subjects.Select(
    (s,i) => s.ObserveOn(Scheduler.ThreadPool) // process on separate threads
    .Do(_ => semaphore.Release())              // signal that element is consumed
    .Subscribe(x => Console.WriteLine("stream: {0}\t element: {1}", i, x)) // debug 'processing'
    );

// start processing!
observable.Connect();
3

особенно когда вы говорите о производительности и эффективности, но предоставили пример, который несколько надуманный. А именно, ваш файл примера очень прост по сравнению с реальным файлом. Тем не менее, я постараюсь дать несколько советов о том, насколько это полезно.

Здесь способ превращения потока вEnumerable<char>, Поток будет применять буферизацию, это будет отправлять один результат за раз. Это можно сделать более эффективным (для отправки кусков данных назад), но в какой-то момент вам нужно обрабатывать их по одному, и это также может быть здесь. Не оптимизировать преждевременно.

IEnumerable<char> ReadBytes(Stream stream)
{
    using (StreamReader reader = new StreamReader(stream))
    {
        while (!reader.EndOfStream)
            yield return (char)reader.Read();
    }
}

Теперь предположим, что это код обработки для «вывода». наблюдаемые. Сначала я настраиваю выходные данные, а затем подписываюсь на них соответствующим образом. Обратите внимание, что я здесь использую массив, поэтому мой выходной наблюдаемый индекс - это индекс массива. Можно также использовать словарь, если индекс потока нельзя превратить в индекс, начинающийся с нуля.

var outputs = Enumerable.Repeat(0, 3).Select(_ => new Subject<char>()).ToArray();                                                                                                     

outputs[0].Delay(TimeSpan.FromSeconds(2)).Subscribe(x => Console.WriteLine("hi: {0}", x));
outputs[1].Delay(TimeSpan.FromSeconds(1)).Subscribe(x => Console.WriteLine("ho: {0}", x));
outputs[2].Subscribe(x => Console.WriteLine("he: {0}", x));

Обратите внимание на использованиеSubject<char> чтобы отправить мои элементы на. Это зависит от типа вашего элемента, ноchar работает в приведенном примере. Также обратите внимание, что я задерживаю элементы только для того, чтобы доказать, что все работает. Теперь они являются независимыми потоками, и вы можете делать с ними все, что захотите.

Хорошо, учитывая поток файлов:

var file = @"C:\test.txt";
var buffer = 32;
var stream = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read, buffer);

Теперь я могу подписаться и использовать индекс по модулю для отправки в правильный поток вывода:

ReadBytes(stream)
.ToObservable(Scheduler.ThreadPool)
.Select((x,i) => new { Key = (i % 3), Value = x }) // you can change it up here
.Subscribe(x => outputs[x.Key].OnNext(x.Value));

Здесь есть потенциально более эффективные методы, в зависимости от того, как именно вы можете рассчитать целевой поток, но идея остается той же.

Входной файл содержит только одну строку:ABCABCABCABCABCABC

Выход из запуска программы:

he: C
he: C
he: C
he: C
he: C
he: C

Спустя секунду:

ho: B
ho: B
ho: B
ho: B
ho: B
ho: B

А потом еще секунду:

hi: A
hi: A
hi: A
hi: A
hi: A
hi: A
На самом деле не понимаю, чего вы хотите, если один поток отстает. Это потому, что в файле нет данных для этого потока (в таком случае, что должно произойти)? Или это потому, что этот поток занимает больше времени для обработки его элементов?
Справедливо, принял ваш ответ :) Herman
Позже я имел в виду, что поток обрабатывается дольше. Я добавил текущую версию своего кода в качестве ответа. Если у вас есть комментарии к ним, они очень приветствуются. Я бы хотел, чтобы вы ответили за свой ответ, но это не совсем правильный ответ, у вас есть предложение? (Я также новичок в stackoverflow) Herman
Это отличное решение! Однако я намеревался обрабатывать потоки параллельно с несколькими потоками. Я смог сделать это, добавив.ObserveOn(Scheduler.ThreadPool) междуoutputs[i] а также.Subscribe(), Я проверил это с снами внутри.Subscribe() и, кажется, работает как задумано. Потоки работают параллельно, и для каждого потока он блокируется (поэтому поддерживается порядок элементов для каждого потока). Единственная проблема остается в том, что весь файл читается сразу, я хотел бы добавить максимум к тому, что буферизируется. Так что, если один поток слишком сильно отстает, другие должны голодать. Herman
Вообще говоря, это «один вопрос». формат. В вашем вопросе много проблем. Я ответил на один из них преимущественно (разделив чередующийся поток на несколько наблюдаемых). Я не буду смотреть на вторую половину (дросселирование нескольких наблюдаемых), так как считаю, что это должен быть еще один вопрос, касающийся этой проблемы. Тем не менее, я думаю, что есть улучшения, которые могут быть внесены в ваш существующий код, так как он в настоящее время сложен и труден для понимания.

Похожие вопросы