Вопрос по .net, c#, parallel-processing, c#-4.0 – Ограничивает ли Parallel.ForEach количество активных потоков?

98

Учитывая этот код:

var arrayStrings = new string[1000];
Parallel.ForEach<string>(arrayStrings, someString =>
{
    DoSomething(someString);
});

Будут ли все 1000 потоков появляться почти одновременно?

Ваш Ответ

5   ответов
26

На одноядерном компьютере ... Parallel.ForEach разделы (порции) коллекции, над которой он работает, между несколькими потоками, но это число рассчитывается на основе алгоритма, который учитывает и, по-видимому, постоянно отслеживает выполненную работу. потоками, которые он выделяет для ForEach. Такif the body part of the ForEach calls out to long running IO-bound/blocking functions which would leave the thread waiting around, the algorithm will spawn up more threads and repartition the collection between them, Если потоки завершаются быстро и не блокируют, например, потоки ввода-вывода, например, просто вычисляя некоторые числа,the algorithm will ramp up (or indeed down) the number of threads to a point where the algorithm considers optimum for throughput (average completion time of each iteration).

По сути, пул потоков за всеми различными функциями библиотеки Parallel будет определять оптимальное количество потоков для использования. Количество ядер физического процессора составляет только часть уравнения. Между числом ядер и количеством порождаемых потоков НЕ существует простого отношения один к одному.

Я не считаю документацию по отмене и обработке синхронизирующих потоков очень полезной. Надеюсь, MS может предоставить лучшие примеры в MSDN.

Не забывайте, что код тела должен быть написан для выполнения в нескольких потоках, наряду со всеми обычными соображениями безопасности потоков, среда не абстрагирует этот фактор ... пока.

Вы правы, для IO он может выделить +100 потоков, так как я сам отлаживал
& quot; .. если часть тела ForEach вызывает долго функционирующие блокирующие функции, которые заставили бы поток ожидать, алгоритм создаст больше потоков .. & quot; -In degenerate cases this means there might be as many threads created as allowed per ThreadPool.
3

Отличный вопрос В вашем примере уровень распараллеливания довольно низок даже на четырехъядерном процессоре, но при некотором ожидании уровень распараллеливания может стать довольно высоким.

// Max concurrency: 5
[Test]
public void Memory_Operations()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    Parallel.ForEach<string>(arrayStrings, someString =>
    {
        monitor.Add(monitor.Count);
        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

Теперь посмотрим, что происходит, когда добавляется ожидающая операция для имитации HTTP-запроса.

// Max concurrency: 34
[Test]
public void Waiting_Operations()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    Parallel.ForEach<string>(arrayStrings, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(1000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

Я еще не внес никаких изменений, и уровень параллелизма / распараллеливания резко вырос. Параллельность может быть увеличена с помощьюParallelOptions.MaxDegreeOfParallelism.

// Max concurrency: 43
[Test]
public void Test()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayS,trings, options, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(1000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

// Max concurrency: 391
[Test]
public void Test()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(100000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

Я рекомендую установкуParallelOptions.MaxDegreeOfParallelism, Это не обязательно увеличит количество используемых потоков, но обеспечит запуск только нормального количества потоков, что, по-видимому, является вашей заботой.

Наконец, чтобы ответить на ваш вопрос, нет, вы не получите все темы для запуска сразу. Используйте Parallel.Invoke, если вы ищете для параллельного запуска, например, тестирование условий гонки.

// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623368346
// 636462943623368346
// 636462943623373351
// 636462943623393364
// 636462943623393364
[Test]
public void Test()
{
    ConcurrentBag<string> monitor = new ConcurrentBag<string>();
    ConcurrentBag<string> monitorOut = new ConcurrentBag<string>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(DateTime.UtcNow.Ticks.ToString());
        monitor.TryTake(out string result);
        monitorOut.Add(result);
    });

    var startTimes = monitorOut.OrderBy(x => x.ToString()).ToList();
    Console.WriteLine(string.Join(Envir,onment.NewLine, startTimes.Take(10)));
}
139

Нет, он не запустит 1000 потоков - да, он ограничит количество используемых потоков. Parallel Extensions использует соответствующее количество ядер, исходя из того, сколько у вас физическиand сколько уже занято. Он распределяет работу для каждого ядра, а затем использует метод, называемыйwork stealing чтобы каждый поток мог эффективно обрабатывать свою очередь, и ему нужен только дорогой межпотоковый доступ, когда это действительно необходимо.

Посмотрите наБлог команды PFX заloads информации о том, как он распределяет работу и все виды других тем.

Обратите внимание, что в некоторых случаях вы также можете указать желаемую степень параллелизма.

Я использовал Parallel.ForEach (FilePathArray, path = & gt; ... для чтения около 24 000 файлов сегодня вечером, создавая по одному новому файлу для каждого файла, в котором я читал. Очень простой код. Кажется, что даже 6 потоков было достаточно, чтобы перегружать 7200 об / мин диск, с которого я читал со 100% загрузкой. В течение нескольких часов я наблюдал, как библиотека Parallel запускает более 8000 потоков. Я проверил с помощью MaxDegreeOfParallelism и, конечно же, пропал 8000+ потоков. Я проверял это несколько раз с тот же результат.
Этоcould начать 1000 потоков для некоторого вырожденного «DoSomething». (Как и в случае, когда я в настоящее время имею дело с проблемой в производственном коде, которая не смогла установить ограничение и породила 200+ потоков, тем самым выталкивая пул соединений SQL. Я рекомендую установить Max DOP для любой работы, которая не может быть тривиально обоснована о явной привязке к процессору.)
5

УвидетьИспользует ли Parallel.For одну задачу на итерацию? для идеи «ментальной модели»; использовать. Однако автор заявляет, что «в конце концов, важно помнить, что детали реализации могут измениться в любое время».

5

Он вырабатывает оптимальное количество потоков в зависимости от количества процессоров / ядер. Они не все будут появляться одновременно.

Похожие вопросы