Избегайте ненужного исключения ConcurrentModificationException при итерации

У меня есть большое количество вещей, поток, который многократно перебирает их, и отдельный поток, который время от времени удаляет или добавляет отдельные вещи. Вещи находятся в синхронизированном связанном списке:

private List<Thing> things = Collections.synchronizedList(new LinkedList<Thing>());

Мой первый поток перебирает их:

while (true) {
    for (Thing thing : things) {
        thing.costlyOperation();
    }
    // ...
}

Когда мой второй поток добавляет или удаляет что-то, приведенный выше итератор взрывается ConcurrentModificationException. Однако кажется, что удаление и добавление допустимы в моем вышеприведенном случае. Ограничения заключаются в том, что (A) метод costlyOperation не должен вызываться, когда вещь отсутствует в списке, и (B) должен вызываться сразу (на текущей или следующей итерации), когда вещь присутствует, и (C ) добавления и удаления через этот второй поток должны выполняться быстро, а не ждать.

Я мог бы выполнить синхронизацию на things перед входом в итерационный цикл, но это слишком долго блокирует другой поток. В качестве альтернативы я могу изменить Thing, чтобы включить флаг isDead, и выполнить хороший быстрый цикл iterator.remove(), чтобы избавиться от мертвых вещей перед циклом выше, в котором я снова проверю флаг. Но я хочу решить эту проблему рядом с циклом for с минимальным загрязнением. Я также рассматривал возможность создания копии списка (форма «отказоустойчивой итерации»), но это, похоже, нарушает ограничение A.

Потребуется ли мне кодировать мою собственную коллекцию и итератор? С нуля? Не упускаю ли я из виду, что CME на самом деле является полезным исключением даже в этом случае? Есть ли структура данных, стратегия или шаблон, который я мог бы использовать вместо итератора по умолчанию для решения этой проблемы?


person mk.    schedule 24.03.2015    source источник
comment
Как насчет CopyOnWriteArrayList? docs.oracle.com/javase/7/ документы/api/java/util/concurrent/   -  person copeg    schedule 24.03.2015
comment
Будет ли модель производитель/потребитель работать для ваших требований? Тогда вы могли бы вообще избежать итерации, избежать задержек из-за последовательного выполнения вызовов costlyOperation() и удалить элементы из очереди производителя до выполнения, если они становятся мертвыми.   -  person Kevin Condon    schedule 24.03.2015
comment
Ваше ограничение A в основном требует полной синхронизации в списке для правильной работы; в зависимости от того, когда в costlyOperation thing должен быть в things чек, ничто другое не может гарантировать это. У @KevinCondon есть лучшее решение, если вы можете немного расслабиться, ИМО.   -  person Sbodd    schedule 24.03.2015
comment
@KevinCondon Возможно, но потребляемые элементы необходимо повторно вставлять, что может быть беспорядочным, потому что кажется, что вы в конечном итоге жонглируете двумя списками, а второй поток может многократно добавлять или удалять что-то, поэтому трудно сказать.   -  person mk.    schedule 24.03.2015
comment
@Sbodd да, но полная синхронизация внутри цикла, как в ответе yshavit (а не вне всего цикла), кажется, удовлетворяет (A), нет?   -  person mk.    schedule 24.03.2015
comment
Да, если удержание блокировки для одной costlyOperation не нарушает ваше ограничение C, то ответ yshavit выглядит так, как будто он обеспечивает хорошую строгую гарантию A.   -  person Sbodd    schedule 24.03.2015
comment
Я добавил второй ответ, который использует Executor для реализации модели производителя/потребителя для этой проблемы.   -  person Kevin Condon    schedule 25.03.2015


Ответы (4)


Вы можете получить CME, когда базовая структура итератора изменяется во время использования итератора — независимо от параллелизма. На самом деле, довольно легко получить его только в одном потоке:

List<String> strings = new ArrayList<String>();
Iterator<String> stringsIter = strings.iterator();
strings.add("hello"); // modify the collection
stringsIter.next();   // CME thrown!

Точная семантика зависит от коллекции, но обычно CME возникает всякий раз, когда вы изменяете коллекцию после создания итератора. (Это при условии, что итератор специально не разрешает одновременный доступ, конечно, как это делают некоторые коллекции в java.util.concurrent).

Наивным решением является синхронизация по всему циклу while (true), но, конечно, вы не хотите этого делать, потому что тогда вы получите блокировку на кучу дорогостоящих операций.

Вместо этого вам, вероятно, следует скопировать коллекцию (под замком), а затем работать с копией. Чтобы убедиться, что вещь все еще находится в things, вы можете перепроверить ее внутри цикла:

List<Thing> thingsCopy;
synchronized (things) {
    thingsCopy = new ArrayList<Thing>(things);
}
for (Thing thing : thingsCopy) {
    synchronized (things) {
        if (things.contains(thing)) {
            thing.costlyOperation();
        }
    }
}

(Вы также можете использовать одну из вышеупомянутых коллекций java.util.concurrent, которые позволяют вносить изменения во время итерации, например CopyOnWriteArrayList.)

Конечно, теперь у вас все еще есть синхронизированный блок вокруг costlyOperation. Вы можете переместить только проверку contains в синхронизированный блок:

boolean stillInThings;
synchronized (things) {
    stillInThings = things.contains(thing);
}

... но это только снижает расистость, но не устраняет ее. Достаточно ли этого для вас, зависит от семантики вашего приложения. Как правило, если вы хотите, чтобы дорогостоящая операция выполнялась только тогда, когда вещь находится в вещах, вам нужно поставить вокруг нее какой-то замок.

Если это так, вы можете рассмотреть использование ReadWriteLock вместо synchronized блоков. Это немного более опасно (в том смысле, что язык позволяет вам делать больше ошибок, если вы не всегда снимаете блокировку в блоке finally), но оно того стоит. Основной шаблон будет заключаться в том, чтобы заставить thingsCopy и перепроверить работать под блокировкой чтения, в то время как изменения в списке происходят под блокировкой записи.

person yshavit    schedule 24.03.2015
comment
Это примерно та безотказная итерация, о которой я упоминал. Проблема заключалась в том, что (помимо того, что копирование кажется излишне медленным) я мог в конечном итоге вызвать costlyOperation для Вещи, которая находится в thingsCopy, но больше не находится в things, что нарушает ограничение A. , Могу ли я как-то обойти это? - person mk.; 24.03.2015

Вы должны синхронизироваться при повторении

Из JavaDoc< /а>:

Крайне важно, чтобы пользователь вручную синхронизировал возвращенный список при повторении по нему:

List list = Collections.synchronizedList(new ArrayList());
...
synchronized (list) {
Iterator i = list.iterator(); // Должен находиться в синхронизированном блоке
while (i.hasNext())
foo(i.next());
}

Несоблюдение этого совета может привести к детерминированное поведение.

Твоих условий недостаточно

Вы используете связанный список. Таким образом, у вас есть структура, подобная этой:

class ListEntry{
    Thing data;
    ListEntry next;
}

Когда вы перебираете things, вы выбираете одну запись из списка, а затем что-то делаете с данными. Если эта запись списка будет удалена во время итерации, итерация завершится преждевременно, если для параметра next установлено значение null, или она может сделать что-то совершенно другое, например выполнить итерацию по другому списку, если запись будет переработана.

Вам нужно разработать надлежащую синхронизацию, ваша идея разрешить одновременную модификацию и использование данных без синхронизации — это рецепт катастрофы.

Предложение решения

Добавьте два списка pendingremoval и pendingaddition, для которых ваш фоновый поток может добавлять запросы. Когда ваш поток обработки завершит один цикл обработки контейнера, замените списки pending* пустыми новыми и синхронно обработайте удаления в своем потоке обработки.

Вот действительно небрежный пример, чтобы показать вам идею:

public class Processor {

    private List<Thing> pendingRemoval;
    private List<Thing> pendingAddition;
    private List<Thing> things;

    public void add(Thing aThing) {
        pendingAddition.add(aThing);
    }

    public void remove(Thing aThing) {
        pendingRemoval.add(aThing);
    }

    public void run() {
        while (true) {
            for (Thing thing : things) {
                if (!pendingRemoval.contains(thing)) {
                    thing.costlyOperation();
                }
            }

            synchronized (pendingRemoval) {
                things.removeAll(pendingRemoval);
                pendingRemoval.clear();
            }

            synchronized (pendingAddition) {
                things.addAll(pendingAddition);
                pendingAddition.clear();
            }
        }
    }
}

Редактировать: Забыли условие не обработки удаленных вещей

Edit2: В ответ на комментарии:

public class Processor {
    private Map<Thing, Integer> operations = new HashMap<Thing, Integer>();
    private List<Thing> things;

    public void add(Thing aThing) {
        synchronized (operations) {
            Integer multiplicity = operations.get(aThing);
            if (null == multiplicity) {
                multiplicity = 0;
            }
            operations.put(aThing, multiplicity + 1);
        }
    }

    public void remove(Thing aThing) {
        synchronized (operations) {
            Integer multiplicity = operations.get(aThing);
            if (null == multiplicity) {
                multiplicity = 0;
            }
            operations.put(aThing, multiplicity - 1);
        }
    }

    public void run() {
        while (true) {
            for (Thing thing : things) {
                Integer multiplicity;
                synchronized (operations) { 
                    multiplicity = operations.get(thing);
                }
                if (null == multiplicity || multiplicity > 0) {
                    thing.costlyOperation();
                }
            }

            synchronized (operations) {
                for (Map.Entry<Thing, Integer> operation : operations.entrySet()) {
                    int multiplicity = operation.getValue();
                    while(multiplicity<0){
                        things.remove(operation.getKey());
                    }
                    while(multiplicity>0){
                        things.add(operation.getKey());
                    }
                }
                operations.clear();
            }
        }
    }
}
person Emily L.    schedule 24.03.2015
comment
Предложение решения близко к тому, что я бы предложил (можно было бы рассмотреть Set вместо List, чтобы иметь O(1) для вызова contains, но это детали и зависят от конкретного случая применения). Я думаю, что это пока единственное решение, которое не требует копирования и применимо в целом (поскольку оно не требует модификаций класса Thing) - person Marco13; 24.03.2015
comment
Если вещь добавляется, а затем сразу же удаляется, things будет содержать вещь, которой там быть не должно. Как я могу гарантировать, что этого (или подобного сценария) не произойдет? - person mk.; 24.03.2015
comment
@mk см. Редактирование действительно примитивного подхода, который должен работать. Вы также можете реализовать класс команд с флагом добавления/удаления и тем, что нужно добавить/удалить. Затем создайте очередь из этих команд и обработайте их соответствующим образом внутри цикла обработки. - person Emily L.; 25.03.2015

Вы можете использовать что-то вроде ConcurrentLinkedQueue. Однако в конечном итоге можно будет выполнить thing.costlyOperation() для элемента, который был удален из списка в другом потоке из-за слабо согласованного итератора. Если вы можете добавить потокобезопасное свойство dead в класс Thing, которое сокращает действие в costlyOperation(), то вы сможете избежать дорогостоящей операции даже со слабо согласованным итератором. Просто установите свойство элемента при удалении его из списка.

Удалить из списка:

Thing thing = ...;
thing.setDead(true);
things.remove(thing);

Отметить вещь:

private volatile boolean dead = false;

public void costlyOperation() {
    if (dead) { return; }
    // do costly stuff...
}
person Kevin Condon    schedule 24.03.2015

Воспользуйтесь преимуществами Executor реализации и использовать модель производитель/потребитель. Вместо того, чтобы добавлять в список, который вы должны повторить, отправьте costlyOperation() исполнителю в любом потоке, который в данный момент добавляется в список.

private final Executor exec = ...;

private void methodThatAddsThings(final Thing t) {
    exec.execute(new Runnable() {
        public void run() {
          t.costlyOperation();
        }
    });
}

Кажется, это удовлетворяет A, B и C без необходимости делиться списком или синхронизировать что-либо. Теперь у вас также есть варианты масштабирования путем ограничения рабочей очереди или ограничения количества потоков обработки.

Если вам нужна возможность остановить ожидающую дорогостоящую операцию, поток, удаляющий Things, может использовать ExecutorService вместо Executor. Вызов submit() вернет Future, который вы можете сохранить на карте. При удалении Thing найдите его Future на карте и вызовите cancel() Future, чтобы предотвратить его запуск.

person Kevin Condon    schedule 24.03.2015