Вопрос по multithreading, concurrentdictionary, concurrency, c#, race-condition – получение исключения аргумента в параллельном словаре при сортировке и отображении по мере его обновления

3

Мне трудно воспроизвести ошибку в следующей программе, в которой несколько потоков параллельно обновляют параллельный словарь, а основной поток отображает состояние словаря в отсортированном порядке через фиксированные промежутки времени, пока не завершатся все потоки обновления.

public void Function(IEnumerable<ICharacterReader> characterReaders, IOutputter outputter)
{
    ConcurrentDictionary<string, int> wordFrequencies = new ConcurrentDictionary<string, int>();
    Thread t = new Thread(() => UpdateWordFrequencies(characterReaders, wordFrequencies));
    bool completed = false;
    var q = from pair in wordFrequencies orderby pair.Value descending, pair.Key select new Tuple<string, int>(pair.Key, pair.Value);
    t.Start();
    Thread.Sleep(0);

    while (!completed)
    {
        completed = t.Join(1);
        outputter.WriteBatch(q);
    }            
}

Функция получает список символьных потоков и вывод. Функция поддерживает параллельный словарь частот слов слов, прочитанных из каждого из потоков символов (параллельно). Слова считываются новым потоком, и основной поток выводит текущее состояние словаря (в отсортированном порядке) каждые 1 миллисекунду, пока все входные потоки не будут прочитаны (на практике вывод будет примерно таким же, как каждые 10 секунд, но ошибка появляется только для очень маленьких значений). Функция WriteBatch просто пишет в консоль:

public void WriteBatch(IEnumerable<Tuple<string, int>> batch)
{
    foreach (var tuple in batch)
    {
        Console.WriteLine("{0} - {1}", tuple.Item1, tuple.Item2);
    }
    Console.WriteLine();
}

Большинство выполнений в порядке, но иногда я получаю следующую ошибку в операторе foreach в функции WriteBatch:

& quot; Необработанное исключение: System.ArgumentException: индекс равен или больше чем длина массива, или количество элементов в словаре больше после доступного пространства от индекса до конца целевого массива. & quot;

Ошибка действительно исчезает, если основной поток спит в течение короткого времени после запуска потоков обновления и перед запуском цикла отображения. Также кажется, что оно пропадает, если предложение orderby удалено и словарь не отсортирован в запросе linq. Есть объяснения?

foreach (var tuple in batch) оператор в функции WriteBatch выдает ошибку. Трассировка стека выглядит следующим образом:

Необработанное исключение: System.ArgumentException: индекс равен или больше чем длина массива, или количество элементов в словаре больше после доступного пространства от индекса до конца целевого массива.     в System.Collections.Concurrent.ConcurrentDictionary2.System.Collections.Ge     neric.ICollection & GT; .CopyTo (К     массив eyValuePair2 [], индекс Int32)        в System.Linq.Buffer1..ctor (источник IEnumerable1)        в System.Linq.OrderedEnumerable1.d__0.MoveNext ()        в System.Linq.Enumerable.WhereSelectEnumerableIterator2.MoveNext ()        в MyProject.ConsoleOutputter.WriteBatch (пакет IEnumerable1) в C: \ MyProject \ ConsoleOutputter.cs: строка 10        в MyProject.Function (IEnumerable1 characterReaders, IOutputter outputter)

Кроме того, есть ли трассировка стека? Вы можете опубликовать это? Chris Shain
Какая строка дает исключение? Chris Shain
Я обновил с трассировкой стека. Исключение происходит в операторе foreach в функции WriteBatch. Да, это какая-то гонка, но мне бы очень хотелось узнать, почему это происходит. Идея сделать снимок с помощью ToArray () хороша, но разве это не приводит к снижению производительности и ненужному копированию? А как насчет синхронизации всего словаря при отображении? Sanorita Rm
@MichaelDmitryAzarkevich Я сомневаюсь в этом. Из документов по ConcurrentDictionary: & quot; Все открытые и защищенные члены ConcurrentDictionary & lt; TKey, TValue & gt; являются потокобезопасными и могут использоваться одновременно из нескольких потоков. & quot;msdn.microsoft.com/en-us/library/dd287191.aspx Chris Shain
Я догадываюсь, что, как и почти во всех трудно воспроизводимых проблемах параллелизма, у вас есть какие-то условия гонки. Я думаю, что проблема в том, что, когда вы просматриваете словарь, он изменяется другими потоками. Если вы хотите вывести статус словаря в определенный момент времени, вам нужно получить статический снимок этого слова в тот момент и распечататьthat не ссылка, которую вы даете своей функции. Попробуйте сделать всю работу наq.ToArray() вместо самого словаря. Я думаю, что это может решить это. Если он даст мне знать, и я опубликую его как ответ, я просто не совсем уверен. Michael

Ваш Ответ

2   ответа
1

вы пришли к выводу, что вы должны получить взаимоисключающий доступ к словарю перед его распечаткой, либо сmutex изlock заявление.

Делаем это с замком:

public void WriteBatch(IEnumerable<Tuple<string, int>> batch)
{
    lock (myLock) 
    {
        foreach (var tuple in batch)
        {
            Console.WriteLine("{0} - {1}", tuple.Item1, tuple.Item2);
        }
        Console.WriteLine();
    }
}

при условии, что вы выделилиmyLock объект на уровне класса. Увидетьпример.

Делаем это с мьютексом:

public void WriteBatch(IEnumerable<Tuple<string, int>> batch)
{
    mut.WaitOne();

    foreach (var tuple in batch)
    {
        Console.WriteLine("{0} - {1}", tuple.Item1, tuple.Item2);
    }
    Console.WriteLine();

    mut.ReleaseMutex();
}

Опять же, при условии, что вы выделилиMutex объект на уровне класса. Увидетьпример.

Спасибо, ребята, у Николаса была замечательная идея использовать ToArray () в самой параллельной коллекции, чтобы мы могли - безопасно - сделать снимок, как изначально предлагал Майкл. Sanorita Rm
Спасибо, Майкл, Шейн. Синхронизация, кажется, путь, но это не полное решение, верно? Блокировка также должна выполняться каждым из обновляющих потоков - верно? Но это отчасти побьет цель использования параллельного словаря - то же самое можно сделать с небезопасным словарем, так как мы все равно блокируем все это. В некоторой степени мне нужна полная синхронизация между отображением и обновлением группы, а также точная синхронизация (использование параллельного словаря) между всеми потоками, которые обновляются. Sanorita Rm
6

System.Linq.Buffer<T>который называетсяOrderBy.

Вот фрагмент кода обидчика:

TElement[] array = null;
int num = 0;
if (collection != null)
{
    num = collection.Count;
    if (num > 0)
    {
        array = new TElement[num];
        collection.CopyTo(array, 0);
    }
}

Исключение выдается, когда элемент (ы) добавляются вcollection после звонкаcollection.Count но до звонкаcollection.CopyTo.

Чтобы обойти это, вы можете сделать & quot; снимок & quot; копия словаря, прежде чем его отсортировать.

Вы можете сделать это, позвонивConcurrentDictionary.ToArray.
 Как это реализовано вConcurrentDictionary Сам класс, это безопасно.

Использование этого подхода означает, что вам не нужно защищать коллекцию с помощью блокировки, которая, как вы говорите, в первую очередь противоречит цели использования параллельной коллекции.

while (!completed)
{
    completed = t.Join(1);

    var q =
      from pair in wordFrequencies.ToArray() // <-- add ToArray here
      orderby pair.Value descending, pair.Key
      select new Tuple<string, int>(pair.Key, pair.Value);

    outputter.WriteBatch(q);
}            
Спасибо! это проясняет ситуацию, мне не нравилась идея взять полную блокировку на одновременную коллекцию (интересно, будет ли это когда-нибудь хорошей идеей). Sanorita Rm
+1, спасибо. Кажется дажеToList() имеет то же самое состояние гонки, которое (по иронии судьбы) я пытался использовать для создания снимка в первую очередь.

Похожие вопросы