线程池中的共享

我们在多线程中,很少会直接 new 一个线程,更多的可能是利用线程池处理任务,那么利用 InheritableThreadLocal 可以将生成任务线程的上下文传递给执行任务的线程吗?废话不多说,直接上代码测试一下:

public class InheritableThreadLocalContext {

    private static InheritableThreadLocal<Context> context = new InheritableThreadLocal<>();

    static class Context {

        String name;

        int value;
    }

    public static void main(String[] args) {
        // 固定线程池
        ExecutorService executorService = Executors.newFixedThreadPool(4);

        for (int i = 1; i <= 10; i++) {
            int finalI = i;
            new Thread(
                    () -> {
                        // 生成任务的线程对context进行赋值
                        Context contextMain = new Context();
                        contextMain.name = String.format("Thread%s name", finalI);
                        contextMain.value = finalI * 20;
                        InheritableThreadLocalContext.context.set(contextMain);
                        // 提交任务
                        for (int j = 1; j <= 10; j++) {
                            System.out.println("Thread" + finalI + " produce task " + (finalI * 20 + j));
                            executorService.execute(() -> {
                                // 执行任务的子线程
                                Context contextChild = InheritableThreadLocalContext.context.get();
                                System.out.println(Thread.currentThread().getName() + " execute task, name : " + contextChild.name + " value : " + contextChild.value);
                            });
                        }

                    }
            ).start();
        }
    }
} 

我们希望的结果是,子线程输出的内容能够和父线程对应上。然而,实际的结果却出乎所料,我将结果整理一下:

Thread1 produce task 21
// 省略8行
Thread1 produce task 30

Thread2 produce task 41
// 省略8行
Thread2 produce task 50
pool-1-thread-1 execute task, name : Thread2 name value : 40
// 省略47行
pool-1-thread-1 execute task, name : Thread2 name value : 40

Thread3 produce task 61
// 省略8行
Thread3 produce task 70

Thread4 produce task 81
// 省略8行
Thread4 produce task 90

Thread5 produce task 101
// 省略8行
Thread5 produce task 110

Thread6 produce task 121
// 省略8行
Thread6 produce task 130

Thread7 produce task 141
// 省略8行
Thread7 produce task 150
pool-1-thread-2 execute task, name : Thread7 name value : 140
// 省略6行
pool-1-thread-2 execute task, name : Thread7 name value : 140

Thread8 produce task 161
// 省略8行
Thread8 produce task 170

Thread9 produce task 181
// 省略8行
Thread9 produce task 190
pool-1-thread-4 execute task, name : Thread9 name value : 180
pool-1-thread-4 execute task, name : Thread9 name value : 180

Thread10 produce task 201
// 省略8行
Thread10 produce task 210
pool-1-thread-3 execute task, name : Thread10 name value : 200
// 省略39行
pool-1-thread-3 execute task, name : Thread10 name value : 200 

虽然生产总数和消费总数都是100,但是明显有的消费多了,有的消费少了。合理推测一下,应该是在主线程放进任务后,子线程才生成。为了验证这个猜想,将线程池用 ThreadPoolExecutor 生成,并在用子线程生成任务之前,先赋值 context 并开启所有线程:

    public static void main(String[] args) {
        // 固定线程池
        ThreadPoolExecutor executorService = new ThreadPoolExecutor(
                4,
                4,
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>() );
        // 在main线程中赋值
        Context context = new Context();
        context.name = "Thread0 name";
        context.value = 0;
        InheritableThreadLocalContext.context.set(context);
        // 开启所有线程
        executorService.prestartAllCoreThreads();

        for (int i = 1; i <= 10; i++) {
            int finalI = i;
            new Thread(
                    () -> {
                        // 生成任务的线程对context进行赋值
                        Context contextMain = new Context();
                        contextMain.name = String.format("Thread%s name", finalI);
                        contextMain.value = finalI * 20;
                        InheritableThreadLocalContext.context.set(contextMain);
                        // 提交任务
                        for (int j = 1; j <= 10; j++) {
                            System.out.println("Thread" + finalI + " produce task " + (finalI * 20 + j));
                            executorService.execute(() -> {
                                // 执行任务的子线程
                                Context contextChild = InheritableThreadLocalContext.context.get();
                                System.out.println(Thread.currentThread().getName() + " execute task, name : " + contextChild.name + " value : " + contextChild.value);
                            });
                        }

                    }
            ).start();
        }
    } 

结果不出所料,执行任务的线程输出的,都是最外面主线程设置的值。

那么我们该如何才能达到最初想要的效果呢?就是利用线程池执行任务时,如何能够让执行者线程能够获取调用者线程的 context 呢?

使用 TransmittableThreadLocal 解决

上面的问题主要是因为执行任务的线程是被线程池管理,可以被复用(可以称为池化复用)。那复用了之后,如果还是依赖于父线程的 context,自然是有问题的,因为我们想要的效果是执行线程获取调用线程的 context,这时候就是TransmittableThreadLocal出场了。

TransmittableThreadLocal 是阿里提供的工具类,其主要解决的就是上面遇到的问题。那么该如何使用呢?

首先,你需要引入相应的依赖:

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>transmittable-thread-local</artifactId>
    <version>2.11.0</version>
</dependency>复制代码

具体代码,就拿上文提到的情况,我们用 TransmittableThreadLocal 做一个改造:

public class TransmittableThreadLocalTest {
    private static TransmittableThreadLocal<Context> context = new TransmittableThreadLocal<>();

    static class Context {

        String name;

        int value;
    }

    public static void main(String[] args) {
        // 固定线程池
        ExecutorService executorService = Executors.newFixedThreadPool(4);

        for (int i = 1; i <= 10; i++) {
            int finalI = i;
            new Thread(
                    () -> {
                        // 生成任务的线程对context进行赋值
                        Context contextMain = new Context();
                        contextMain.name = String.format("Thread%s name", finalI);
                        contextMain.value = finalI * 20;
                        TransmittableThreadLocalTest.context.set(contextMain);
                        // 提交任务
                        for (int j = 1; j <= 10; j++) {
                            System.out.println("Thread" + finalI + " produce task " + (finalI * 20 + j));
                            Runnable task = () -> {
                                // 执行任务的子线程
                                Context contextChild = TransmittableThreadLocalTest.context.get();
                                System.out.println(Thread.currentThread().getName() + " execute task, name : " + contextChild.name + " value : " + contextChild.value);
                            };
                            // 额外的处理,生成修饰了的对象ttlRunnable
                            Runnable ttlRunnable = TtlRunnable.get(task);
                            executorService.execute(ttlRunnable);
                        }

                    }
            ).start();
        }
    }
} 

此时再次运行,就会发现执行线程运行时的输出内容是完全可以和调用线程对应上的了。当然了,我这种方式是修改了 Runnable 的写法,阿里也提供了线程池的写法,简单如下:

    public static void main(String[] args) {
        // 固定线程池
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        // 额外的处理,生成修饰了的对象executorService
        executorService = TtlExecutors.getTtlExecutorService(executorService);
        ExecutorService finalExecutorService = executorService;

        for (int i = 1; i <= 10; i++) {
            int finalI = i;
            new Thread(
                    () -> {
                        // 生成任务的线程对context进行赋值
                        Context contextMain = new Context();
                        contextMain.name = String.format("Thread%s name", finalI);
                        contextMain.value = finalI * 20;
                        TransmittableThreadLocalTest.context.set(contextMain);
                        // 提交任务
                        for (int j = 1; j <= 10; j++) {
                            System.out.println("Thread" + finalI + " produce task " + (finalI * 20 + j));
                            Runnable task = () -> {
                                // 执行任务的子线程
                                Context contextChild = TransmittableThreadLocalTest.context.get();
                                System.out.println(Thread.currentThread().getName() + " execute task, name : " + contextChild.name + " value : " + contextChild.value);
                            };
                            finalExecutorService.execute(task);
                        }

                    }
            ).start();
        }