Вопрос по multithreading, .net-2.0, c# – Ускорение цикла с использованием многопоточности в C # (Вопрос)

13

Представьте, что у меня есть функция, которая просматривает миллион / миллиард строк и проверяет что-то в них.

f.ex:

<code>foreach (String item in ListOfStrings)
{
    result.add(CalculateSmth(item));
}
</code>

он отнимает много времени, потому что CalculateSmth является очень трудоемкой функцией.

Я хочу спросить: как интегрировать многопоточность в этот своего рода процесс?

f.ex: я хочу запустить 5 потоков, и каждый из них возвращает некоторые результаты, и это продолжается до тех пор, пока в списке не появятся элементы.

Может быть, кто-нибудь может показать несколько примеров или статей ..

Забыл упомянуть, что мне это нужно в .NET 2.0

Вам нужны результаты обратно в том же порядке? Keith
Не могли бы вы использовать несколько фоновых работников? создать какую-то логику, которая будет считать счетчик списка строк, затем создавать X количества BW и делить каждую Crash893

Ваш Ответ

6   ответов
17

Вы можете попробоватьПараллельные расширения (часть .NET 4.0)

Это позволяет вам написать что-то вроде:

Parallel.Foreach (ListOfStrings, (item) => 
    result.add(CalculateSmth(item));
);

Конечно, result.add должен быть потокобезопасным.

result.add должен быть потокобезопасным, да ..
Хорошо, тогда вы могли бы взглянуть на исходный код Parallel.Foreach в отражателе ... Хотя я полагаю, что под ним находится целый другой слой, поэтому он не будет простой копирующей пастой для получения аналогичной функциональности в .NET 2.0.
Параллельные расширения теперь являются частью .net 4.0, поэтому больше не являются CTP. :)
Забыл упомянуть, что мне это нужно в .NET 2.0 Lukas Šalkauskas
в этом случае, будет ли какое-либо состояние гонки в наборе результатов? после того, как все несколько потоков могут одновременно выполнять result.add ...
2

Первый вопрос, на который вы должны ответить, следует ли использовать потоки

Если ваша функция CalculateSmth () в основном привязана к процессору, т. Е. Интенсивно использует процессор и практически не использует ввод-вывод, то мне трудно понять смысл использования потоков, поскольку потоки будут конкурировать за один и тот же ресурс. В этом случае процессор.

Если ваш CalculateSmth () использует как процессор, так и ввод / вывод, то это может быть точка в использовании многопоточности.

Я полностью согласен с комментарием к моему ответу. Я сделал ошибочное предположение, что мы говорим об одном процессоре с одним ядром, но в наши дни у нас многоядерные процессоры, мой плохой.

Зависит от того, является ли это многоядерной системой. Например, если у вас доступно четыре ядра, то использование четырех потоков должно привести к примерно четырехкратному ускорению обработки (при условии отсутствия взаимозависимостей между потоками).
18

Расширения Parallel - это круто, но это также можно сделать, просто используя пул потоков следующим образом:

using System.Collections.Generic;
using System.Threading;

namespace noocyte.Threading
{
    class CalcState
    {
        public CalcState(ManualResetEvent reset, string input) {
            Reset = reset;
            Input = input;
        }
        public ManualResetEvent Reset { get; private set; }
        public string Input { get; set; }
    }

    class CalculateMT
    {
        List<string> result = new List<string>();
        List<ManualResetEvent> events = new List<ManualResetEvent>();

        private void Calc() {
            List<string> aList = new List<string>();
            aList.Add("test");

            foreach (var item in aList)
            {
                CalcState cs = new CalcState(new ManualResetEvent(false), item);
                events.Add(cs.Reset);
                ThreadPool.QueueUserWorkItem(new WaitCallback(Calculate), cs);
            }
            WaitHandle.WaitAll(events.ToArray());
        }

        private void Calculate(object s)
        {
            CalcState cs = s as CalcState;
            cs.Reset.Set();
            result.Add(cs.Input);
        }
    }
}
Для чего нужен WaitHandle.WaitAll ()? Я получаю исключение NotSupportedException: "Число WaitHandles должно быть меньше или равно 64".
Вы генерируете более 64 потоков ... Возможно, это не очень хорошая идея ... :) Попробуйте с меньшим количеством потоков.
Может иметь ManualResetEvent, который вызывает функция WaitCallback, и основной поток WaitOne включен.
А как узнать, когда это закончено? ттт.
Добавлен код, чтобы показать, как вы можете использовать MRE для этого.
5

Вы должны разделить работу, которую вы хотите делать параллельно. Вот пример того, как вы можете разделить работу на две части:

List<string> work = (some list with lots of strings)

// Split the work in two
List<string> odd = new List<string>();
List<string> even = new List<string>();
for (int i = 0; i < work.Count; i++)
{
    if (i % 2 == 0)
    {
        even.Add(work[i]);
    }
    else
    {
        odd.Add(work[i]);
    }
}

// Set up to worker delegates
List<Foo> oddResult = new List<Foo>();
Action oddWork = delegate { foreach (string item in odd) oddResult.Add(CalculateSmth(item)); };

List<Foo> evenResult = new List<Foo>();
Action evenWork = delegate { foreach (string item in even) evenResult.Add(CalculateSmth(item)); };

// Run two delegates asynchronously
IAsyncResult evenHandle = evenWork.BeginInvoke(null, null);
IAsyncResult oddHandle = oddWork.BeginInvoke(null, null);

// Wait for both to finish
evenWork.EndInvoke(evenHandle);
oddWork.EndInvoke(oddHandle);

// Merg,e the results from the two jobs
List<Foo> allResults = new List<Foo>();
allResults.AddRange(oddResult);
allResults.AddRange(evenResult);

return allResults;
1

Не то чтобы у меня сейчас были какие-то хорошие статьи, но то, что вы хотите сделать, это что-то вроде Producer-Consumer с Threadpool.

Producers выполняет циклы и создает задачи (которые в данном случае могли бы просто поставить в очередь элементы списка или стека). Потребителями являются, скажем, пять потоков, которые считывают один элемент из стека, потребляют его, вычисляя, а затем сохраняют в другом месте.

Таким образом, многопоточность ограничена только этими пятью потоками, и у них у всех будет работа до тех пор, пока стек не опустеет.

Что нужно подумать:

  • Put protection on the input and output list, such as a mutex.
  • If the order is important, make sure that the output order is maintained. One example could be to store them in a SortedList or something like that.
  • Make sure that the CalculateSmth is thread safe, that it doesn't use any global state.
12

Обратите внимание, что параллелизм волшебным образом не дает вам больше ресурсов. Вам необходимо установить, что замедляет CalculateSmth.

Например, если он привязан к процессору (а у вас одно ядро), то одинаковое количество тактов процессора попадет в код независимо от того, выполняете ли вы их последовательно или параллельно. Кроме того, вы получаете некоторые издержки от управления потоками. Тот же аргумент применяется к другим ограничениям (например, ввод / вывод)

При этом вы получите прирост производительности только в том случае, если CalculateSmth оставляет ресурс свободным во время его выполнения, что может быть использовано другим экземпляром. Это не редкость. Например, если задача включает в себя ввод-вывод, за которым следуют некоторые операции с процессором, то процесс 1 может выполнять работу с процессором, а процесс 2 выполняет ввод-вывод. Как указывают маты, цепочка производителей-потребителей может достичь этого, если у вас есть инфраструктура.

Похожие вопросы