前言

并发编程是目的其实就是为了提高我们程序的性能,但是在日常的工作过程中,大部分开发人员可能没有机会去接触并发编程,而在并发编程中忽略的一些细节可能就会引发问题的产生,以下仅介绍我个人在并发编程中的经验及理解,希望能对你带来一些帮助。

JUC

大神Doug Lea在JDK1.5中提供了Java并发包(JUC),在java.util.concurrent这个包下提供了并发相关的工具类,例如锁、线程同步队列、线程池等等,但是如果你对其的原理不了解或者没有并发编程的相关经验,就很容易犯错。

线程池

使用线程池时,我们应该明确线程池中的基本参数,了解其作用,这样才能更准确、有效的使用线程池,在《阿里巴巴Java开发手册》中对于线程池的创建说明如下:

线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

说明:Executors 返回的线程池对象的弊端如下:

1)FixedThreadPool 和 SingleThreadPool:

​ 允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。

2)CachedThreadPool 和 ScheduledThreadPool:

​ 允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

所以在创建线程池时,需要明确该线程池的使用目的,有以下要点需要注意

  • 使用无界队列和设置队列大小时需要谨慎,避免OOM;
  • 线程数量和超时时间也需要明确分配,避免造成资源的浪费;
  • 对于线程池的使用,最好不要轻易的共用线程池,对于CPU密集型任务和IO密集型任务需要进行区分;
  • 最好重写线程工厂创建线程的newThread()方法定义线程名称,方便问题的排查;
  • 最好在饱和策略中进行日志输出,了解在什么时候进行了饱和策略及线程池当前的状态;
  • 捕获线程执行时的run()方法非受检异常,或者设置UncaughtExceptionHandler回调处理;

创建线程池时,务必重写ThreadFactory的newThread()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public Thread newThread(Runnable r) {
//设置线程名称
Thread thread = new Thread(r,prefix + "-" + thisPoolCount + "-"
+ name + "-" + count.incrementAndGet());
//是否为守护线程
thread.setDaemon(isDaemon);
//设置优先级
thread.setPriority(priority);
//设置异常处理器
thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
return thread;
}

设置线程的名称及是否为守护线程、优先级等属性,以及设置线程的异常处理器;如果线程中没有自己捕获非受检异常,当这个异常发生时,如果设置了异常处理器,那么我们可以通过UncaughtExceptionHandle的回调方法来处理该异常。

尤其是设置线程名称这一点尤为重要,如果项目中使用了多个线程池,在线程抛出异常或使用jstack命令查看堆栈信息时,看到的线程池中的线程名称都是pool-x-thread-x,这样不利于快速排查出现问题的所在线程,所以在创建线程池时,应该将线程的名称设置为与业务相关。

重写RejectedExecutionHandler的rejectedExecution()方法,在饱和策略中打印相关信息,以便排查问题:

1
2
3
4
5
6
7
8
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//饱和策略打印相关信息
log.warn("线程" + r + "执行饱和策略,当前线程池大小:" + executor.getPoolSize() +
",当前活跃线程数:"+executor.getActiveCount() + ",当前队列任务数:" +
executor.getQueue().size() + ",任务数:"+executor.getTaskCount());
throw new RejectedExecutionException("线程池已满,执行饱和策略");
}

重写ThreadPoolExecutor的beforeExecute()和afterExecute()方法,记录任务执行时间:

1
2
3
4
5
6
7
8
9
10
@Override
protected void beforeExecute(Thread t, Runnable r) {
startTime.set(System.currentTimeMillis());
log.info("开始执行任务");
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
log.info("任务执行完成,执行耗时:" + System.currentTimeMillis() - startTime.get());
}

并发Map

在并发编程中,我们经常使用的并发Map应该就是ConcurrentHashmap了,但是很多开发人员对于并发Map的使用可能也是有问题的,以下是一个错误的案例:

1
2
3
4
5
6
7
8
9
10
private Map<String,String> concurrentHashmap = new ConcurrentHashMap<>();

public String getValue(String key){
String value = concurrentHashmap.get(key);
if(null == value){
value = createValue(key);
concurrentHashmap.put(key,value);
}
return value;
}

在这个案例中,value = createValue(key)这行代码可能会有并发问题,在这行代码中,存在多个线程同时执行的情况,那么在这种情况下,map中存放的值就会成为薛定谔的值,如果两个线程同时执行,就会丢失掉一次创建的value值,因为最终保存到该key中的value只有一个。

可以使用双重校验锁来规避以上这种情况:

1
2
3
4
5
6
7
8
9
10
11
12
public String getValue1(String key){
String value = concurrentHashmap.get(key);
if(null == value){
synchronized (concurrentHashmap) {
if (null == concurrentHashmap.get(key)) {
value = createValue(key);
concurrentHashmap.put(key, value);
}
}
}
return value;
}

也可以使用ConcurrentHashmap中自带的putIfAbsent()方法:

1
2
3
4
5
6
7
8
9
10
11
public String getValue2(String key){
String value = concurrentHashmap.get(key);
if(null == value){
value = createValue(key);
String old = concurrentHashmap.putIfAbsent(key, value);
if(null != old){
value = old;
}
}
return value;
}

具体采取哪种方法可能需要根据自身业务的场景来决定,但在双重校验锁中不会重复创建value,而putIfAbsent()方法是需要先创建value才决定是否put进去,这也是需要进行权衡的一点。

SimpleDateFormat

SimpleDateFormat类相信大家都不陌生,我们经常采用它来进行时间的格式化操作,但是在使用这个类时也需要考虑其线程安全的问题,如果直接将该类作为全局变量来使用,那么就会引起错误:

1
2
3
4
5
6
7
public class Test {
private SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy");

public String formatDate(Date date) {
return sdf.format(date);
}
}

查看SimpleDateFormat类的format()方法,发现其使用了全局共享变量calendar,因为该变量并不是线程安全的,所以在并发执行过程中就会发生错误:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected Calendar calendar;

public StringBuffer format(Date date, StringBuffer toAppendTo,
FieldPosition pos)
{
pos.beginIndex = pos.endIndex = 0;
return format(date, toAppendTo, pos.getFieldDelegate());
}

private StringBuffer format(Date date, StringBuffer toAppendTo,
FieldDelegate delegate) {
// Convert input date to time field list
calendar.setTime(date);
...
}

可以采用局部变量的方式使用SimpleDateFormat来解决该问题:

1
2
3
4
public String formatDate(Date date) {
SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy");
return sdf.format(date);
}

也可以采用ThreadLocal让每个线程持有一份SimpleDateFormat对象:

1
2
3
4
5
ThreadLocal<SimpleDateFormat> local = new ThreadLocal<>();
public String formatDate(Date date) {
SimpleDateFormat sdf = local.get();
return sdf.format(date);
}

当然,也可以采用加锁的方式来避免问题的产生,在JDK8中,也可以采用DateTimeFormatter类来替代时间格式化的功能。

ThreadLocal

ThreadLocal 可能我们平常很少用到,但是在我们所使用的工具或框架中,经常能看见它的身影;ThreadLocal实际上就是用来维护本地线程中的变量,在Thread类的源码中,变量如下:

1
ThreadLocal.ThreadLocalMap threadLocals = null;

ThreadLocal 的 get() 及 set() 方法源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}

public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}

ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}

ThreadLocal 的使用也很简单,使用案例如下:

1
2
3
4
5
6
ThreadLocal<String> localName = new ThreadLocal<>();

public void xxx(String name){
localName.set(name);
//以下省略80行代码
}

当两个线程同时执行xxx()方法时,线程A的name值为zhangsan,线程B的name值为lisi,则示意图如下:

所以,当两个线程执行如下代码时,线程A获取的值为zhangsan,而线程B获取的值为lisi:

1
String name = localName.get();

每个ThreadLocal变量的值都保存到了各自线程中Thread的threadLocals变量里,也就是ThreadLocalMap中,如果创建多个ThreadLocal变量,则示意图如下:

谨防内存泄漏

需要注意的一点是,在ThreadLocalMap中,它的Key是一个弱引用;也就是说它的Key在没有被外部对象强引用时,将会在下次GC时被回收,而它的Value却没有被回收,而这个访问不到的Value对象一直不会被回收,就发生了内存泄漏。

ThreadLocalMap的结构如下,通过Entry来保存K/V结构的数据,而key只能为ThreadLocal类型:

1
2
3
4
5
6
7
8
9
10
11
static class ThreadLocalMap {
static class Entry extends WeakReference<ThreadLocal<?>> {
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
private Entry[] table;
...
}

正确的使用ThreadLocal对象,应该在每次set()及get()处理完逻辑后,显式的调用remove()方法来清除数据来避免内存泄漏;在使用线程池的场景下,如果没有及时的清理ThreadLocal,则可能导致更严重的逻辑错误。