Multithreading in Java
Доступ к разделяемому ресурсу
- Блокировка (mutex, mutual exclusion)
- Неблокирующий доступ:
- Lock-free: гарантируется общий прогресс
- Wait-free: гарантируется прогресс в потоке исполнения
Блокирующий доступ
- Для блокировки используется монитор или лок
- Один и тот же ресурс должен блокироваться одним и тем же монитором
- Блокировать надо как чтение, так и запись
Synchronized
synchronized(obj) {...}
: монитор — объект objsynchronized void method()
: монитор — объект thissynchronized static void method()
: монитор — объект .class
Пример:
public class Container {
private final List<String> list = new ArrayList<>();
synchronized void addEntry(String s) {
list.add(s);
}
int size() {
return list.size();
}
public static void main(String[] args) throws InterruptedException {
Container container = new Container();
Runnable foo = () -> {
for(int i=0; i<100000; i++) {
container.addEntry("foo");
}
};
List<Threads> threads = new ArrayList<>();
for(long count=10; count>0; count--) {
Thread thread = new Thread(foo);
thread.start();
threads.add(thread);
}
for (Thread thread : threads) {
thread.join(); // дождаться завершения thread
}
System.out.println(list.size());
}
}
Атомарность (atomicity)
Операция атомарна, если невозможно наблюдать частичный результат ее выполнения.
Любой наблюдатель видит либо состояние системы до атомарной операции, либо после.
- В Java:
- Запись в поле типа boolean, byte, short, char, int, float, Object всегда атомарна
- Запись в поле long/double: атомарна запись старших и младших 32 бит
- Запись в поле типа long/double, объявленное volatile, атомарна
!! arithmetic operations are not atomic !!
- Арифметические операции не атомарны!
x++
- не атомарнаx *= 2
- не атомарна
/* НЕ работает!!! */
class Counter {
volatile int x = 1;
}
public class Main {
public static void main(String[] args) {
Counter c = new Counter();
Runnable r = () -> {
for (int i = 0; i < 1000000; i++) c.x++;
};
List<Thread> threads = Stream.generate(() -> new Thread(r))
.limit(10).peek(Thread::start)
.collect(Collectors.toList());
for(Thread thread: threads)
thread.join();
System.out.println(c.x);
}
}
Когда завершилась операция?
- A = 1
Когда приступили к выполнению следующей операции?Когда значение 1 оказалось в основной памяти в ячейке соответствующей переменной А?Когда результат изменился в точке чтения переменной А?
Видимость (visibility)
- Результат операции write A, выполненной в потоке 1, виден в операции read A, выполненной в потоке 2
- Видимость определена только для конкретных потоков 1 и 2, нет “глобальной видимости”
Порядок (ordering)
- A happens before B (A hb B), если все записи, выполненные до точки A (включительно),
видны в любой операции чтения после точки B (включительно) - A hb B, B hb C -> A hb C
Простые правила happens before
- Для двух операций A и B в одном потоке A hb B, если A раньше B в тексте программы (program order)
- Завершение конструктора объекта X hb начало finalize X
- Вызов thread.start() hb первое действие в потоке thread
- Последнее действие в потоке thread hb thread.join()
- Инициализация объекта по умолчанию hb любое другое действие
synchronized
- Между синхронизациями по одному объекту установлен полный порядок (total order)
- Завершение синхронизации (monitor exit) hb начало последующей синхронизации
по тому же объекту (monitor enter)
Volatile
- Запись и чтение в поле, объявленное volatile, называется volatile write, volatile read
- Речь идет непосредственно о записи, а не о записи членов/элементов массива
volatile int[] x;
x = new int[10]; // volatile write
x[0] = 1; // volatile read, plain write
- volatile write hb volatile read, который прочитал это значение
Singleton (Double checked locking fixed)
public class Container {
private static volatile Container INSTANCE;
int x = 1;
private Container() {
if (INSTANCE == null) {
synchronized (Container.class) {
if (INSTANCE == null) {
INSTANCE = new Container();
}
}
}
return INSTANCE;
}
}
final
- Если поток увидел ссылку на объект и она не утекала из конструктора,
то он гарантированно увидит все final - поля, записанные в конструкторе - Неизменяемые объекты — друзья многопоточности!
Singleton (initialization on demand holder idiom)
public class Container {
int x = 1;
static Container getInstance() {
return Holder.INSTANCE;
}
private static class Holder {
static final Container INSTANCE = new Container();
}
}
Dead Lock
- Взаимное исключение (неразделяемые ресурсы)
- Минимум два ресурса (один держим, один просим)
- Ресурс освобождается только добровольно тем, кто его держит
- Направленный граф ожидания имеет цикл
Пример dead lock:
class Main {
static void transfer(Queue<String> in, Queue<String> out) {
synchronized (in) {
synchronized (out) {
String res = in.poll();
if (res != null) {
out.add(res);
}
}
}
}
public static void main(String[] args) {
Queue<String> in = new ArrayDeque<>(Arrays.asList("foo", "bar", "baz"));
Queue<String> out = new ArrayDeque<>(Arrays.asList("foo", "bar", "baz"));
Thread t1 = new Thread(() -> {
for (int i = 0; i < 100000; i++) {
System.out.println("Thread1: " + i);
transfer(in, out);
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 100000; i++) {
System.out.println("Thread2: " + i);
transfer(out, in);
}
});
System.out.println("Started");
t1.start(); t2.start();
t1.join(); t2.join();
System.out.println("Finished");
}
}
Live Lock
- Потоки постоянно меняют состояние, но прогресса нет
Ожидание условия
public class Main {
private boolean content = false;
public synchronized void waitForContent(){
while (!content) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Content has been arrived");
}
public synchronized void deliverContent(){
content = true;
notifyAll();
}
}
Стандартная библиотека java.util.concurrent
- Атомарные переменные
- Неблокирующие коллекции
- Блокирующие коллекции
- Примитивы синхронизации
- Пулы потоков
- CompletableFuture
Атомарные переменные (j.u.c.atomic)
- AtomicBoolean/Integer/Long/Reference
AtomicIntegerReference AtomicBooleanReference ...
- AtomicInteger/Long/ReferenceArray
AtomicIntegerReferenceArray AtomicLongReferenceArray
- AtomicInteger/Long/ReferenceFieldUpdater
AtomicIntegerReferenceFieldUpdater ...
- Long/Double/Accumulator
LongAccumulator DoubleAccumulator
- Long/Double/Adder
LongAdder DoubleAdder
Compare and Set
- (expect, update) -> result
- Базовый примитив для всех lock-free алгоритмов
public class Main {
private final AtomicBoolean flag = new AtomicBoolean(false);
void doOnce(Runnable action) {
if (flag.compareAndSet(false, true)) {
action.run();
}
}
}
Compare and set через updater
public class Doer {
private volatile int flag = 0;
private static final AtomicIntegerFieldUpdater<Doer> FLAG_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(Doer.class, "flag");
void doOnce(Runnable action){
if (FLAG_UPDATER.compareAndSet(this, 0, 1)){
action.run();
}
}
}
Compare and Set в действии
Реализация атомарного инкремента
import java.util.concurrent.atomic.AtomicInteger;
public class Main {
private final AtomicInteger count = new AtomicInteger(0);
// getAndIncrement есть в стандартной библиотеки
public int getAndIncrement(AtomicInteger count) {
int cur;
do {
cur = count.get();
} while (!count.compareAndSet(cur, cur + 1));
return cur;
}
// реализация атомарного метода
public int getAndDouble(AtomicInteger count) {
int cur;
do {
cur = count.get();
} while (!count.compareAndSet(cur, cur * 2));
return cur;
}
// getAndDouble с использованием библиотечного метода getAndUpdate
public int getAndDoubleStandard(AtomicInteger count) {
return count.getAndUpdate(val -> val * 2);
}
}
Атомарное обновление нескольких взаимосвязанных переменных
Пример! Изменение координат точки на плоскости
Для изменения логически связанных переменных, создаем объект
и меняем сам объект атомарно
import java.util.concurrent.atomic.AtomicReference;
public class AtomicPoint {
private static class Point {
final int x, y;
private Point(int x, int y) {
this.x = x;
this.y = y;
}
Point rotateClockwise() {
return new Point(y, -x);
}
}
private final AtomicReference<Point> pt = new AtomicReference<>(new Point(0, 1));
public void rotateClockwise() {
pt.updateAndGet(Point::rotateClockwise);
}
}
Применение стандартной библиотеки j.u.concurrent
Методы стандартной библиотеки
- CountDownLatch - дождаться обнуления
- Semaphore - взять ресурс
- Exchanger - обменяться значениями
- CyclicBarrier - периодическая работа с синхронизацией
- Phaser - работа разделенная на фазы
- Executors - делегирование создания потоков
- CompletableFuture
- ReentrantLock
- ReentrantReadWriteLock
Тестирование метода doOnce (единоразовое увеличение на 1) см. выше
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class Main {
public static void main(String[] args) {
// количество потоков в зависимости от кол-ва ядер процессора
final int THREADS = Runtime.getRuntime().availableProcessors();
Doer doer = new Doer();
CountDownLatch latch = new CountDownLatch(1);
AtomicInteger count = new AtomicInteger();
Runnable r = () -> {
try {
latch.await();
} catch (InterruptedException e) {
}
doer.doOnce(count::incrementAndGet);
};
List<Thread> threads = Stream.generate(() -> new Thread(r))
.limit(THREADS).peek(Thread::start)
.collect(Collectors.toList());
latch.countDown();
for (Thread thread: threads) {
thread.join();
}
if (count.get() != 1){
System.out.println("oops!");
}
}
}
Примеры CompletableFuture:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
class Main {
public static void main(String[] args) {
final int THREADS = Runtime.getRuntime().availableProcessors();
AtomicReference<Doer> doer = new AtomicReference<>();
AtomicInteger count = new AtomicInteger();
Runnable r = () -> doer.get().doOnce(count::incrementAndGet);
for (int i = 0; i < 10000; i++) {
count.set(0);
doer.set(new Doer());
CompletableFuture<?> future = CompletableFuture.allOf(
Stream.generate(() -> CompletableFuture.runAsync(r))
.limit(THREADS).toArray(CompletableFuture[]::new));
future.join();
if (count.get() != 1){
System.out.println("oops!");
}
}
}
}
import java.math.BigInteger;
import java.util.concurrent.CompletableFuture;
public class Main {
public static BigInteger factorial(int i) {
BigInteger res = BigInteger.ONE;
while (i > 1) {
res = res.multiply(BigInteger.valueOf(i));
i--;
}
return res;
}
public static BigInteger combinations(int n, int k) {
CompletableFuture<BigInteger> factN = CompletableFuture.supplyAsync(() -> factorial(n));
CompletableFuture<BigInteger> factK = CompletableFuture.supplyAsync(() -> factorial(k));
CompletableFuture<BigInteger> factNminusK = CompletableFuture.supplyAsync(() -> factorial(n - k));
return factN.thenCombine(factk, BigInteger::divide)
.thenCombine(factNminusK, BigInteger::divide).join();
}
}
ReentrantLock
Пример:
import java.util.concurrent.locks.ReentrantLock;
class Main {
private final List<String> list = new ArrayList<>();
private final ReentrantLock lock = new ReentrantLock(true); // честный, если true
String get(int i) {
lock.lock();
try {
return list.get(i);
} finally {
lock.unlock();
}
}
void add(String str) {
lock.lock();
try {
list.add(str);
} finally {
lock.unlock();
}
}
}
ReentrantReadWriteLock
Пример:
import java.util.concurrent.locks.ReentrantReadWriteLock;
class Main {
private final List<String> list = new ArrayList<>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
String get(int i) {
ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
readLock.lock();
try {
return list.get(i);
} finally {
readLock.unlock();
}
}
void add(String str) {
ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
writeLock.lock();
try {
list.add(str);
} finally {
writeLock.unlock();
}
}
}
Примитивы синхронизации, конкурентные коллекции, аннотации, рефлекшн
Не блокирующие коллекции
- ConcurrentLinkedQueue (LinkedList)
- ConcurrentLinkedDequeue (LinkedList/ArrayDequeue)
- CopyOnWriteArrayList (ArrayList)
- CopyOnWriteArraySet
- ConcurrentHashMap (+ newKeySet()) (HashMap/HashSet)
- ConcurrentSkipListMap (TreeMap)
- ConcurrentSkipListSet (TreeSet)
Принципы не блокирующих коллекций
- Простые операции атомарны
- Пакетные операции (addAll, removeAll) могут быть не атомарны
- Как правило, длина не хранится (isEmpty()!)
- Не кидает ConcurrentModificationException
- Обычно weakly-consistent (изменения после создания итератора могут быть видны или не видны)
CopyOnWriteArrayList/Set
- Содержимое хранится в массиве
- Модифицирующие операции синхронизированы
- Операции на чтение и iterator не синхронизированы
- Любое изменение копирует массив
- Итератор, forEach обходит снимок коллекции н начало итерации (итератор не может модифицировать)
- Сортировать можно Java 8+
- CopyOnWriteArraySet - делегат к CopyOnWriteArrayList
- Методы с множественными модификациями не желательны
ConcurrentHashMap
- До Java 8: совокупность сегментов, каждый из которых HashMap
- В Java 8 переписан, больше похож на обычный HashMap
- reduce/search/forEach* - создают параллельные задачи
- putIfAbsent, computeIfAbsent, merge и т.д. - атомарны, но могут вызывать функцию под синхронизацией
ConcurrentSkipListMap
- Конкурентная замена TreeMap
- Структура данных: skip-list, рандомизированна
- Есть ConcurrentSkipListSet
- computeIfAbsent может вызвать функцию и выкинуть результат
- merge может вызвать функцию несколько раз
Блокирующие коллекции
- SynchronousQueue (без памяти, одна вставка = одно удаление)
- ArrayBlockingQueue (фиксированный размер)
- LinkedBlockingQueue (размер может быть не фиксирован)
- PriorityBlockingQueue (PriorityQueue x ArrayBlockingQueue)