ThreadLocal

ThreadLocal 有什么用?

通常情况下,我们创建的变量可以被任何一个线程访问和修改。这在多线程环境中可能导致数据竞争和线程安全问题。那么,如果想让每个线程都有自己的专属本地变量,该如何实现呢?

JDK 中提供的 ThreadLocal 类正是为了解决这个问题。ThreadLocal 类允许每个线程绑定自己的值,可以将其形象地比喻为一个“存放数据的盒子”。每个线程都有自己独立的盒子,用于存储私有数据,确保不同线程之间的数据互不干扰。

当你创建一个 ThreadLocal 变量时,每个访问该变量的线程都会拥有一个独立的副本。这也是 ThreadLocal 名称的由来。线程可以通过 get() 方法获取自己线程的本地副本,或通过 set() 方法修改该副本的值,从而避免了线程安全问题。

举个简单的例子:假设有两个人去宝屋收集宝物。如果他们共用一个袋子,必然会产生争执;但如果每个人都有一个独立的袋子,就不会有这个问题。如果将这两个人比作线程,那么 ThreadLocal 就是用来避免这两个线程竞争同一个资源的方法。

public class ThreadLocalExample {
    private static ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0);

    public static void main(String[] args) {
        Runnable task = () -> {
            int value = threadLocal.get();
            value += 1;
            threadLocal.set(value);
            System.out.println(Thread.currentThread().getName() + " Value: " + threadLocal.get());
        };

        Thread thread1 = new Thread(task, "Thread-1");
        Thread thread2 = new Thread(task, "Thread-2");

        thread1.start(); // 输出: Thread-1 Value: 1
        thread2.start(); // 输出: Thread-2 Value: 1
    }
}

⭐️ThreadLocal 原理了解吗?

从 Thread类源代码入手。

public class Thread implements Runnable {
    //......
    //与此线程有关的ThreadLocal值。由ThreadLocal类维护
    ThreadLocal.ThreadLocalMap threadLocals = null;

    //与此线程有关的InheritableThreadLocal值。由InheritableThreadLocal类维护
    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
    //......
}

从上面Thread类 源代码可以看出Thread 类中有一个 threadLocals 和 一个 inheritableThreadLocals 变量,它们都是 ThreadLocalMap 类型的变量,我们可以把 ThreadLocalMap 理解为ThreadLocal 类实现的定制化的 HashMap。默认情况下这两个变量都是 null,只有当前线程调用 ThreadLocal 类的 setget方法时才创建它们,实际上调用这两个方法的时候,我们调用的是ThreadLocalMap类对应的 get()set()方法。

ThreadLocal类的set()方法

public void set(T value) {
    //获取当前请求的线程
    Thread t = Thread.currentThread();
    //取出 Thread 类内部的 threadLocals 变量(哈希表结构)
    ThreadLocalMap map = getMap(t);
    if (map != null)
        // 将需要存储的值放入到这个哈希表中
        map.set(this, value);
    else
        createMap(t, value);
}
ThreadLocalMap getMap(Thread t) {
    return t.threadLocals;
}

通过上面这些内容,我们足以通过猜测得出结论:最终的变量是放在了当前线程的 ThreadLocalMap 中,并不是存在 ThreadLocal 上,ThreadLocal 可以理解为只是ThreadLocalMap的封装,传递了变量值。 ThrealLocal 类中可以通过Thread.currentThread()获取到当前线程对象后,直接通过getMap(Thread t)可以访问到该线程的ThreadLocalMap对象。

每个Thread中都具备一个ThreadLocalMap,而ThreadLocalMap可以存储以ThreadLocal为 key ,Object 对象为 value 的键值对。

比如我们在同一个线程中声明了两个 ThreadLocal 对象的话, Thread内部都是使用仅有的那个ThreadLocalMap 存放数据的,ThreadLocalMap的 key 就是 ThreadLocal对象,value 就是 ThreadLocal 对象调用set方法设置的值。

ThreadLocalMapThreadLocal的静态内部类。

⭐️ThreadLocal 内存泄露问题是怎么导致的?

ThreadLocal 内存泄漏的根本原因在于其内部实现机制。

通过上面的内容我们已经知道:每个线程维护一个名为 ThreadLocalMap 的 map。 当你使用 ThreadLocal 存储值时,实际上是将值存储在当前线程的 ThreadLocalMap 中,其中 ThreadLocal 实例本身作为 key,而你要存储的值作为 value。

ThreadLocalMapset()createMap() 方法中,并没有直接存储 ThreadLocal 对象本身,而是使用 ThreadLocal 的哈希值计算数组索引,最终存储于类型为static class Entry extends WeakReference<ThreadLocal<?>>的数组中。

ThreadLocalMap 的 Entry 定义如下:

static class Entry extends WeakReference<ThreadLocal<?>> {
    Object value;

    Entry(ThreadLocal<?> k, Object v) {
        super(k);
        value = v;
    }
}

ThreadLocalMap 的 key 和 value 引用机制:

key 是弱引用ThreadLocalMap 中的 key 是 ThreadLocal 的弱引用 (WeakReference<ThreadLocal<?>>)。 这意味着,如果 ThreadLocal 实例不再被任何强引用指向,垃圾回收器会在下次 GC 时回收该实例,导致 ThreadLocalMap 中对应的 key 变为 nullvalue 是强引用:即使 key 被 GC 回收,value 仍然被 ThreadLocalMap.Entry 强引用存在,无法被 GC 回收。

ThreadLocal 实例失去强引用后,其对应的 value 仍然存在于 ThreadLocalMap 中,因为 Entry 对象强引用了它。如果线程持续存活(例如线程池中的线程),ThreadLocalMap 也会一直存在,导致 key 为 null 的 entry 无法被垃圾回收,即会造成内存泄漏。

也就是说,内存泄漏的发生需要同时满足两个条件:

  1. ThreadLocal 实例不再被强引用;
  2. 线程持续存活,导致 ThreadLocalMap 长期存在。

虽然 ThreadLocalMapget(), set()remove() 操作时会尝试清理 key 为 null 的 entry,但这种清理机制是被动的,并不完全可靠。

如何避免内存泄漏的发生?

  1. 在使用完 ThreadLocal 后,务必调用 remove() 方法。 这是最安全和最推荐的做法。 remove() 方法会从 ThreadLocalMap 中显式地移除对应的 entry,彻底解决内存泄漏的风险。 即使将 ThreadLocal 定义为 static final,也强烈建议在每次使用后调用 remove()
  2. 在线程池等线程复用的场景下,使用 try-finally 块可以确保即使发生异常,remove() 方法也一定会被执行。

⭐️如何跨线程传递 ThreadLocal 的值?

由于 ThreadLocal 的变量值存放在 Thread 里,而父子线程属于不同的 Thread 的。因此在异步场景下,父子线程的 ThreadLocal 值无法进行传递。

如果想要在异步场景下传递 ThreadLocal 值,有两种解决方案:

  • InheritableThreadLocalInheritableThreadLocal 是 JDK1.2 提供的工具,继承自 ThreadLocal 。使用 InheritableThreadLocal 时,会在创建子线程时,令子线程继承父线程中的 ThreadLocal 值,但是无法支持线程池场景下的 ThreadLocal 值传递。
  • TransmittableThreadLocalTransmittableThreadLocal (简称 TTL) 是阿里巴巴开源的工具类,继承并加强了InheritableThreadLocal类,可以在线程池的场景下支持 ThreadLocal 值传递。项目地址:https://github.com/alibaba/transmittable-thread-local
  public static void main(String[] args) {

        ThreadLocal<Object> objectThreadLocal = new ThreadLocal<>();
        ThreadLocal<String> inheritableThreadLocal = new InheritableThreadLocal<>();

        objectThreadLocal.set("objectThreadLocal");
        inheritableThreadLocal.set("inheritableThreadLocal");

        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("子线程获取父类ThreadLocal数据:" + objectThreadLocal.get());
                System.out.println("子线程获取父类inheritableThreadLocal数据:" + inheritableThreadLocal.get());
            }
        }).start();
        
    }

输出:

子线程获取父类ThreadLocal数据:null
子线程获取父类inheritableThreadLocal数据:inheritableThreadLocal

线程池

什么是线程池?

顾名思义,线程池就是管理一系列线程的资源池。当有任务要处理时,直接从线程池中获取线程来处理,处理完之后线程并不会立即被销毁,而是等待下一个任务。

⭐️为什么要用线程池?

池化技术想必大家已经屡见不鲜了,线程池、数据库连接池、HTTP 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。

线程池提供了一种限制和管理资源(包括执行一个任务)的方式。 每个线程池还维护一些基本统计信息,例如已完成任务的数量。

这里借用《Java 并发编程的艺术》提到的来说一下使用线程池的好处

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

如何创建线程池?

方式一:通过ThreadPoolExecutor构造函数来创建(推荐)。

方式二:通过 Executor 框架的工具类 Executors 来创建。

Executors工具类提供的创建线程池的方法如下图所示:

可以看出,通过Executors工具类可以创建多种类型的线程池,包括:

  • FixedThreadPool:固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
  • SingleThreadExecutor: 只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
  • CachedThreadPool: 可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。
  • ScheduledThreadPool:给定的延迟后运行任务或者定期执行任务的线程池。

创建一个线程池

        String threadNamePrefix = "jwq_test";
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNamePrefix(threadNamePrefix + "-%d")
                .setDaemon(true).build();

        ExecutorService threadPool = new ThreadPoolExecutor(
                5, //核心线程数
                10, //最大线程数
                1, //线程空闲时间
                TimeUnit.MINUTES, //线程空闲时间
                new ArrayBlockingQueue<>(5), //任务队列
                threadFactory, //线程工厂
               // new ThreadPoolExecutor.CallerRunsPolicy()//拒绝策略
                new ThreadPoolExecutor.AbortPolicy()//拒绝策略
        );

⭐️为什么不推荐使用内置线程池?

在《阿里巴巴 Java 开发手册》“并发处理”这一章节,明确指出线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。

为什么呢?

使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源开销,解决资源不足的问题。如果不使用线程池,有可能会造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。

另外,《阿里巴巴 Java 开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 构造函数的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险

Executors 返回线程池对象的弊端如下(后文会详细介绍到):

  • FixedThreadPoolSingleThreadExecutor:使用的是阻塞队列 LinkedBlockingQueue,任务队列最大长度为 Integer.MAX_VALUE,可以看作是无界的,可能堆积大量的请求,从而导致 OOM。
  • CachedThreadPool:使用的是同步队列 SynchronousQueue, 允许创建的线程数量为 Integer.MAX_VALUE ,如果任务数量过多且执行速度较慢,可能会创建大量的线程,从而导致 OOM。
  • ScheduledThreadPoolSingleThreadScheduledExecutor:使用的无界的延迟阻塞队列DelayedWorkQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。
public static ExecutorService newFixedThreadPool(int nThreads) {
    // LinkedBlockingQueue 的默认长度为 Integer.MAX_VALUE,可以看作是无界的
    return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());

}

public static ExecutorService newSingleThreadExecutor() {
    // LinkedBlockingQueue 的默认长度为 Integer.MAX_VALUE,可以看作是无界的
    return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));

}

// 同步队列 SynchronousQueue,没有容量,最大线程数是 Integer.MAX_VALUE`
public static ExecutorService newCachedThreadPool() {

    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());

}

// DelayedWorkQueue(延迟阻塞队列)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

⭐️线程池常见参数有哪些?如何解释?

    /**
     * 用给定的初始参数创建一个新的ThreadPoolExecutor。
     */
    public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
                              int maximumPoolSize,//线程池的最大线程数
                              long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
                              TimeUnit unit,//时间单位
                              BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
                              ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
                              RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
                               ) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

ThreadPoolExecutor 3 个最重要的参数:

  • corePoolSize : 任务队列未达到队列容量时,最大可以同时运行的线程数量。
  • maximumPoolSize : 任务队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
  • workQueue: 新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

ThreadPoolExecutor其他常见参数 :

  • keepAliveTime:当线程池中的线程数量大于 corePoolSize ,即有非核心线程(线程池中核心线程以外的线程)时,这些非核心线程空闲后不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁。
  • unit : keepAliveTime 参数的时间单位。
  • threadFactory :executor 创建新线程的时候会用到。
  • handler :拒绝策略(后面会单独详细介绍一下)。

下面这张图可以加深你对线程池中各个参数的相互关系的理解(图片来源:《Java 性能调优实战》):

线程池的核心线程会被回收吗?

ThreadPoolExecutor 默认不会回收核心线程,即使它们已经空闲了。这是为了减少创建线程的开销,因为核心线程通常是要长期保持活跃的。但是,如果线程池是被用于周期性使用的场景,且频率不高(周期之间有明显的空闲时间),可以考虑将 allowCoreThreadTimeOut(boolean value) 方法的参数设置为 true,这样就会回收空闲(时间间隔由 keepAliveTime 指定)的核心线程了。

public void allowCoreThreadTimeOut(boolean value) {
    // 核心线程的 keepAliveTime 必须大于 0 才能启用超时机制
    if (value && keepAliveTime <= 0) {
        throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
    }
    // 设置 allowCoreThreadTimeOut 的值
    if (value != allowCoreThreadTimeOut) {
        allowCoreThreadTimeOut = value;
        // 如果启用了超时机制,清理所有空闲的线程,包括核心线程
        if (value) {
            interruptIdleWorkers();
        }
    }
}

核心线程空闲时处于什么状态?

核心线程空闲时,其状态分为以下两种情况:

  • 设置了核心线程的存活时间 :核心线程在空闲时,会处于 WAITING 状态,等待获取任务。如果阻塞等待的时间超过了核心线程存活时间,则该线程会退出工作,将该线程从线程池的工作线程集合中移除,线程状态变为 TERMINATED 状态。
  • 没有设置核心线程的存活时间 :核心线程在空闲时,会一直处于 WAITING 状态,等待获取任务,核心线程会一直存活在线程池中。

当队列中有可用任务时,会唤醒被阻塞的线程,线程的状态会由 WAITING 状态变为 RUNNABLE 状态,之后去执行对应任务。

⭐️线程池的拒绝策略有哪些?

如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolExecutor 定义一些策略:

  • ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException来拒绝新任务的处理。
  • ThreadPoolExecutor.CallerRunsPolicy:调用执行者自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果你的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。
  • ThreadPoolExecutor.DiscardPolicy:不处理新任务,直接丢弃掉。
  • ThreadPoolExecutor.DiscardOldestPolicy:此策略将丢弃最早的未处理的任务请求。

举个例子:Spring 通过 ThreadPoolTaskExecutor 或者我们直接通过 ThreadPoolExecutor 的构造函数创建线程池的时候,当我们不指定 RejectedExecutionHandler 拒绝策略来配置线程池的时候,默认使用的是 AbortPolicy。在这种拒绝策略下,如果队列满了,ThreadPoolExecutor 将抛出 RejectedExecutionException 异常来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。如果不想丢弃任务的话,可以使用CallerRunsPolicyCallerRunsPolicy 和其他的几个策略不同,它既不会抛弃任务,也不会抛出异常,而是将任务回退给调用者,使用调用者的线程来执行任务。

public static class CallerRunsPolicy implements RejectedExecutionHandler {

        public CallerRunsPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                // 直接主线程执行,而不是线程池中的线程执行
                r.run();
            }
        }
    }

如果不允许丢弃任务,应该选择哪个拒绝策略?

根据上面对线程池拒绝策略的介绍,相信大家很容易能够得出答案是:CallerRunsPolicy

这里我们再来结合CallerRunsPolicy 的源码来看看:

public static class CallerRunsPolicy implements RejectedExecutionHandler {

        public CallerRunsPolicy() { }


        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            //只要当前程序没有关闭,就用执行execute方法的线程执行该任务
            if (!e.isShutdown()) {

                r.run();
            }
        }
    }

CallerRunsPolicy 拒绝策略有什么风险?如何解决?

我们上面也提到了:如果想要保证任何一个任务请求都要被执行的话,那选择 CallerRunsPolicy 拒绝策略更合适一些。

不过,如果走到CallerRunsPolicy的任务是个非常耗时的任务,且处理提交任务的线程是主线程,可能会导致主线程阻塞,影响程序的正常运行。

这里简单举一个例子,该线程池限定了最大线程数为 2,阻塞队列大小为 1(这意味着第 4 个任务就会走到拒绝策略),ThreadUtil为 Hutool 提供的工具类:

public class ThreadPoolTest {

    private static final Logger log = LoggerFactory.getLogger(ThreadPoolTest.class);

    public static void main(String[] args) {
        // 创建一个线程池,核心线程数为1,最大线程数为2
        // 当线程数大于核心线程数时,多余的空闲线程存活的最长时间为60秒,
        // 任务队列为容量为1的ArrayBlockingQueue,饱和策略为CallerRunsPolicy。
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,
                2,
                60,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());

        // 提交第一个任务,由核心线程执行
        threadPoolExecutor.execute(() -> {
            log.info("核心线程执行第一个任务");
            ThreadUtil.sleep(1, TimeUnit.MINUTES);
        });

        // 提交第二个任务,由于核心线程被占用,任务将进入队列等待
        threadPoolExecutor.execute(() -> {
            log.info("非核心线程处理入队的第二个任务");
            ThreadUtil.sleep(1, TimeUnit.MINUTES);
        });

        // 提交第三个任务,由于核心线程被占用且队列已满,创建非核心线程处理
        threadPoolExecutor.execute(() -> {
            log.info("非核心线程处理第三个任务");
            ThreadUtil.sleep(1, TimeUnit.MINUTES);
        });

        // 提交第四个任务,由于核心线程和非核心线程都被占用,队列也满了,根据CallerRunsPolicy策略,任务将由提交任务的线程(即主线程)来执行
        threadPoolExecutor.execute(() -> {
            log.info("主线程处理第四个任务");
            ThreadUtil.sleep(2, TimeUnit.MINUTES);
        });

        // 提交第五个任务,主线程被第四个任务卡住,该任务必须等到主线程执行完才能提交
        threadPoolExecutor.execute(() -> {
            log.info("核心线程执行第五个任务");
        });

        // 关闭线程池
        threadPoolExecutor.shutdown();
    }
}
18:19:48.203 INFO  [pool-1-thread-1] c.j.concurrent.ThreadPoolTest - 核心线程执行第一个任务
18:19:48.203 INFO  [pool-1-thread-2] c.j.concurrent.ThreadPoolTest - 非核心线程处理第三个任务
18:19:48.203 INFO  [main] c.j.concurrent.ThreadPoolTest - 主线程处理第四个任务
18:20:48.212 INFO  [pool-1-thread-2] c.j.concurrent.ThreadPoolTest - 非核心线程处理入队的第二个任务
18:21:48.219 INFO  [pool-1-thread-2] c.j.concurrent.ThreadPoolTest - 核心线程执行第五个任务

从输出结果可以看出,因为CallerRunsPolicy这个拒绝策略,导致耗时的任务用了主线程执行,导致线程池阻塞,进而导致后续任务无法及时执行,严重的情况下很可能导致 OOM。

我们从问题的本质入手,调用者采用CallerRunsPolicy是希望所有的任务都能够被执行,暂时无法处理的任务又被保存在阻塞队列BlockingQueue中。这样的话,在内存允许的情况下,我们可以增加阻塞队列BlockingQueue的大小并调整堆内存以容纳更多的任务,确保任务能够被准确执行。

为了充分利用 CPU,我们还可以调整线程池的maximumPoolSize (最大线程数)参数,这样可以提高任务处理速度,避免累计在 BlockingQueue的任务过多导致内存用完。

整个实现逻辑还是比较简单的,核心在于自定义拒绝策略和阻塞队列。如此一来,一旦我们的线程池中线程达到满载时,我们就可以通过拒绝策略将最新任务持久化到 MySQL 数据库中,等到线程池有了有余力处理所有任务时,让其优先处理数据库中的任务以避免”饥饿”问题。

当然,对于这个问题,我们也可以参考其他主流框架的做法,以 Netty 为例,它的拒绝策略则是直接创建一个线程池以外的线程处理这些任务,为了保证任务的实时处理,这种做法可能需要良好的硬件设备且临时创建的线程无法做到准确的监控:

private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
    NewThreadRunsPolicy() {
        super();
    }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            //创建一个临时线程处理任务
            final Thread t = new Thread(r, "Temporary task executor");
            t.start();
        } catch (Throwable e) {
            throw new RejectedExecutionException(
                    "Failed to start a new thread", e);
        }
    }
}

线程池常用的阻塞队列有哪些?

新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

不同的线程池会选用不同的阻塞队列,我们可以结合内置线程池来分析。

  • 容量为 Integer.MAX_VALUELinkedBlockingQueue(有界阻塞队列):FixedThreadPoolSingleThreadExecutorFixedThreadPool最多只能创建核心线程数的线程(核心线程数和最大线程数相等),SingleThreadExecutor只能创建一个线程(核心线程数和最大线程数都是 1),二者的任务队列永远不会被放满。
  • SynchronousQueue(同步队列):CachedThreadPoolSynchronousQueue 没有容量,不存储元素,目的是保证对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务。也就是说,CachedThreadPool 的最大线程数是 Integer.MAX_VALUE ,可以理解为线程数是可以无限扩展的,可能会创建大量线程,从而导致 OOM。
  • DelayedWorkQueue(延迟队列):ScheduledThreadPoolSingleThreadScheduledExecutorDelayedWorkQueue 的内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构,可以保证每次出队的任务都是当前队列中执行时间最靠前的。DelayedWorkQueue 添加元素满了之后会自动扩容,增加原来容量的 50%,即永远不会阻塞,最大扩容可达 Integer.MAX_VALUE,所以最多只能创建核心线程数的线程。
    ArrayBlockingQueue(有界阻塞队列):底层由数组实现,容量一旦创建,就不能修改。

⭐️线程池处理任务的流程了解吗?

如果当前运行的线程数小于核心线程数,那么就会新建一个线程来执行任务。如果当前运行的线程数等于或大于核心线程数,但是小于最大线程数,那么就把该任务放入到任务队列里等待执行。如果向任务队列投放任务失败(任务队列已经满了),但是当前运行的线程数是小于最大线程数的,就新建一个线程来执行任务。如果当前运行的线程数已经等同于最大线程数了,新建线程将会使当前运行的线程超出最大线程数,那么当前任务会被拒绝,拒绝策略会调用RejectedExecutionHandler.rejectedExecution()方法。

⭐️线程池中线程异常后,销毁还是复用?

直接说结论,需要分两种情况:

  • 使用execute()提交任务:当任务通过execute()提交到线程池并在执行过程中抛出异常时,如果这个异常没有在任务内被捕获,那么该异常会导致当前线程终止,并且异常会被打印到控制台或日志文件中。线程池会检测到这种线程终止,并创建一个新线程来替换它,从而保持配置的线程数不变。
  • 使用submit()提交任务:对于通过submit()提交的任务,如果在任务执行中发生异常,这个异常不会直接打印出来。相反,异常会被封装在由submit()返回的Future对象中。当调用Future.get()方法时,可以捕获到一个ExecutionException。在这种情况下,线程不会因为异常而终止,它会继续存在于线程池中,准备执行后续的任务。

简单来说:使用execute()时,未捕获异常导致线程终止,线程池创建新线程替代;使用submit()时,异常被封装在Future中,线程继续复用。

这种设计允许submit()提供更灵活的错误处理机制,因为它允许调用者决定如何处理异常,而execute()则适用于那些不需要关注执行结果的场景。

⭐️如何给线程池命名?

初始化线程池的时候需要显示命名(设置线程池名称前缀),有利于定位问题。

默认情况下创建的线程名字类似 pool-1-thread-n 这样的,没有业务含义,不利于我们定位问题。

给线程池里的线程命名通常有下面两种方式:

1、利用 guava 的 ThreadFactoryBuilder

ThreadFactory threadFactory = new ThreadFactoryBuilder()
                        .setNameFormat(threadNamePrefix + "-%d")
                        .setDaemon(true).build();
ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);

2、自己实现 ThreadFactory

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 线程工厂,它设置线程名称,有利于我们定位问题。
 */
public final class NamingThreadFactory implements ThreadFactory {

    private final AtomicInteger threadNum = new AtomicInteger();
    private final String name;

    /**
     * 创建一个带名字的线程池生产工厂
     */
    public NamingThreadFactory(String name) {
        this.name = name;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setName(name + " [#" + threadNum.incrementAndGet() + "]");
        return t;
    }
}

⭐️如何动态修改线程池的参数?

美团技术团队在《Java 线程池实现原理及其在美团业务中的实践》这篇文章中介绍到对线程池参数实现可自定义配置的思路和方法。

美团技术团队的思路是主要对线程池的核心参数实现自定义可配置。这三个核心参数是:

  • corePoolSize : 核心线程数线程数定义了最小可以同时运行的线程数量。
  • maximumPoolSize : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
  • workQueue: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

为什么是这三个参数?

我在Java 线程池详解 这篇文章中就说过这三个参数是 ThreadPoolExecutor 最重要的参数,它们基本决定了线程池对于任务的处理策略。

如何支持参数动态配置? 且看 ThreadPoolExecutor 提供的下面这些方法。

JDK允许线程池使用方通过ThreadPoolExecutor的实例来动态设置线程池的核心策略,以setCorePoolSize为方法例,在运行期线程池使用方调用此方法设置corePoolSize之后,线程池会直接覆盖原来的corePoolSize值,并且基于当前值和原始值的比较结果采取不同的处理策略。对于当前值小于当前工作线程数的情况,说明有多余的worker线程,此时会向当前idle的worker线程发起中断请求以实现回收,多余的worker在下次idel的时候也会被回收;对于当前值大于原始值且当前队列中有待执行任务,则线程池会创建新的worker线程来执行队列任务,setCorePoolSize具体流程如下:

我们只需要维护ThreadPoolExecutor的实例,并且在需要修改的时候拿到实例修改其参数即可。基于以上的思路,我们实现了线程池参数的动态化、线程池参数在管理平台可配置可修改,其效果图如下图所示:

Future

Future 类有什么用?

Future 类是异步思想的典型运用,主要用在一些需要执行耗时任务的场景,避免程序一直原地等待耗时任务执行完成,执行效率太低。具体来说是这样的:当我们执行某一耗时的任务时,可以将这个耗时任务交给一个子线程去异步执行,同时我们可以干点其他事情,不用傻傻等待耗时任务执行完成。等我们的事情干完后,我们再通过 Future 类获取到耗时任务的执行结果。这样一来,程序的执行效率就明显提高了。

这其实就是多线程中经典的 Future 模式,你可以将其看作是一种设计模式,核心思想是异步调用,主要用在多线程领域,并非 Java 语言独有。

在 Java 中,Future 类只是一个泛型接口,位于 java.util.concurrent 包下,其中定义了 5 个方法,主要包括下面这 4 个功能:

  • 取消任务;
  • 判断任务是否被取消;
  • 判断任务是否已经执行完成;
  • 获取任务执行结果
// V 代表了Future执行的任务返回值的类型
public interface Future<V> {
    // 取消任务执行
    // 成功取消返回 true,否则返回 false
    boolean cancel(boolean mayInterruptIfRunning);
    // 判断任务是否被取消
    boolean isCancelled();
    // 判断任务是否已经执行完成
    boolean isDone();
    // 获取任务执行结果
    V get() throws InterruptedException, ExecutionException;
    // 指定时间内没有返回计算结果就抛出 TimeOutException 异常
    V get(long timeout, TimeUnit unit)

        throws InterruptedException, ExecutionException, TimeoutExceptio

}

简单理解就是:我有一个任务,提交给了 Future 来处理。任务执行期间我自己可以去做任何想做的事情。并且,在这期间我还可以取消任务以及获取任务的执行状态。一段时间之后,我就可以 Future 那里直接取出任务执行结果。

Callable 和 Future 有什么关系?

我们可以通过 FutureTask 来理解 CallableFuture 之间的关系。

FutureTask 提供了 Future 接口的基本实现,常用来封装 CallableRunnable,具有取消任务、查看任务是否执行完成以及获取任务执行结果的方法。ExecutorService.submit() 方法返回的其实就是 Future 的实现类 FutureTask

<T> Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);

FutureTask 不光实现了 Future接口,还实现了Runnable 接口,因此可以作为任务直接被线程执行。

FutureTask 有两个构造函数,可传入 Callable 或者 Runnable 对象。实际上,传入 Runnable 对象也会在方法内部转换为Callable 对象。

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;
}
public FutureTask(Runnable runnable, V result) {
    // 通过适配器RunnableAdapter来将Runnable对象runnable转换成Callable对象
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;
}

FutureTask相当于对Callable 进行了封装,管理着任务执行的情况,存储了 Callable 的 call 方法的任务执行结果。

CompletableFuture 类有什么用?

Future 在实际使用过程中存在一些局限性,比如不支持异步任务的编排组合、获取计算结果的 get() 方法为阻塞调用。

Java 8 才被引入CompletableFuture 类可以解决Future 的这些缺陷。CompletableFuture 除了提供了更为好用和强大的 Future 特性之外,还提供了函数式编程、异步任务编排组合(可以将多个异步任务串联起来,组成一个完整的链式调用)等能力。

下面我们来简单看看 CompletableFuture 类的定义。

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
}

可以看到,CompletableFuture 同时实现了 Future 和 CompletionStage 接口。

CompletionStage 接口描述了一个异步计算的阶段。很多计算可以分成多个阶段或步骤,此时可以通过它将所有步骤组合起来,形成异步计算的流水线。

CompletionStage 接口中的方法比较多,CompletableFuture 的函数式能力就是这个接口赋予的。从这个接口的方法参数你就可以发现其大量使用了 Java8 引入的函数式编程。


0 条评论

发表回复

Avatar placeholder

您的邮箱地址不会被公开。 必填项已用 * 标注