1. 多线程同步—— join, CountDownLatch和CyclicBarrier
- join: 主线程等待子线程结束
- CountDownLatch: 一个线程等待多个线程的场景,保证1个线程和多个线程之间的同步。
- CyclicBarrier: 线程之间同步并通知
1.1 join
public class JoinExample {
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
System.out.println("I am in thread t1");
});
t1.start();
try {
t1.join();
} catch (Exception InterruptedException) {
Thread.currentThread().interrupt();
}
System.out.println("in main thread");
}
}
输出:
I am in thread t1
in main thread
加了join后,主线程会等待t1结束,所以”in main thread” 永远在最后打印。
1.2 CountDownlantch
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class CountDownLatchExample {
public static void main(String[] args) {
Executor executor = Executors.newFixedThreadPool(2);
for (int i = 0; i < 2; i++) {
CountDownLatch latch = new CountDownLatch(2);
executor.execute(() -> {
System.out.println("Operation 1");
latch.countDown();
});
executor.execute(() -> {
System.out.println("Operation 2");
latch.countDown();
});
try {
latch.await();
System.out.println("operation 1 and operation 2 finished.");
} catch (InterruptedException ie) {
}
}
}
}
Operation 1
Operation 2
operation 1 and operation 2 finished.
Operation 1
Operation 2
operation 1 and operation 2 finished.
通过循环初始化CountDownLatch, 使得主线程可以重复等待线程t1和t2的的操作执行完成。
1.3 CyclicBarrier
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class CyclicBarrierExample {
public static void main(String[] args) {
Executor executor = Executors.newFixedThreadPool(2);
CyclicBarrier barrier = new CyclicBarrier(2, () -> {
System.out.println("t1 and t2 execute finished");
// 打开注释,有10s睡眠的时候,第二次循环的执行,等待了10s,说明。只有回调执行完成后,才会重置计数器。
// try {
// Thread.sleep(10000);
// } catch (InterruptedException ie) {
//
// }
});
Thread t1 = new Thread(() -> {
for (int i = 0; i < 2; i++) {
System.out.println("thread t1");
try {
barrier.await();
} catch (BrokenBarrierException be) {
} catch (InterruptedException ie) {
}
}
});
t1.start();
Thread t2 = new Thread(() -> {
for (int i = 0; i < 2; i++) {
System.out.println("thread t2");
try {
barrier.await();
} catch (BrokenBarrierException be) {
} catch (InterruptedException ie) {
}
}
});
t2.start();
try {
t1.join();
t2.join();
} catch (Exception ex) {
}
}
}
CyclicBarrier的计数器是可以循环利用的,并且具备自动重置的功能,一旦计数器减到0,会自动重置到你设置的初始值。
2. 线程池
创建一个线程,需要调用操作系统内核的API,操作系统要为线程分配一系列的资源,陈本很高,线程是一个重量级的对象,应该避免频繁创建和销毁。可以通过线程池避免。
2.1 如何实现一个线程池
线程池的设计和实现,目前普遍采用的是生产者-消费者模式。线程池的使用方是生产者,线程池本身是消费者。
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
class ThreadPoolSimple {
private int coreSize;
private BlockingQueue<Runnable> blockingQueue;
private List<Thread> threadList = new LinkedList<>();
public ThreadPoolSimple(int coreSize, BlockingQueue<Runnable> blockingQueue) {
this.coreSize = coreSize;
this.blockingQueue = blockingQueue;
for (int i = 0; i < coreSize; i++) {
Thread thread = new Thread(() -> {
while (true) {
try {
Runnable task = blockingQueue.take();
task.run();
} catch (InterruptedException ie) {
}
}
});
thread.start();
threadList.add(thread);
}
}
public void execute(Runnable runnable) {
blockingQueue.offer(runnable);
}
}
public class ThreadPoolExample {
static int i;
public static void main(String[] args) {
ThreadPoolSimple threadPoolSimple = new ThreadPoolSimple(2, new LinkedBlockingQueue<>(5));
for (i = 0; i < 3; i++) {
threadPoolSimple.execute(()-> {
System.out.println("thread: " + Thread.currentThread().getName());
});
}
}
}
线程池是一个生产者和消费者模型,创建线程池的时候,创建一个阻塞队列和初始化线程,线程里面执行从阻塞队列读取任务并执行。线程池的使用者,调用execute接口的时候,线程池只是将待执行的任务加入阻塞队列。
2.2 Java 线程池
ThreaddPoolExecutor(
int corePoolSize,
int maximumSize,
long keepAliveTime,
TimeUnit unit,
Blocking<Runnable> workQueue,
ThreadFactory threadFactory,
RejectExecutionHandler handler)
- corePoolSize: 表示线程池保有的最小线程数。
- maxiumuPoolSize: 线程池创建的最大线程数。
- keepAliveTime & unit: 定义线程的空闲多久被回收
- workQueue: 阻塞队列
- threadFactory: 定义如何创建线程
- handler: 通过这个参数定义了工作队列满了的任务拒绝策略。
ThreadPoolHander定义了4种handler:
- CallerRunsPolicy: 提交任务的线程自己去执行任务。
- AbortPolicy: 默认的拒绝策略,抛出throws RejectedExecutionException。
- DiscardPolicy: 直接丢弃任务,没有任何异常抛出。
- DiscardOldestPolicy: 丢弃最老的任务,然后把最新的任务加入到工作队列。
- 强烈建议使用有界队列,高负责情境下,无界队列很容易导致OOM。
- 慎重使用拒绝策略,RejectedExectionException是一个运行时异常,很容易被忽略,所以最好定义自己的拒绝策略。
- 如果execute提交的任务执行过程种异常,会导致执行任务的线程中止。捕获执行过程中的异常,按需处理。防止任务异常了,还没有收到任何通知。
1
2
3
4
5
6
try {
} catch(RuntimeExecption re) {
} catch(Throwable t) {
}