Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

使用parallelStream和自定义ForkJoinPool线程池,每次都是获取到第一次的值 #391

Closed
ghost opened this issue Jul 6, 2022 · 1 comment
Assignees
Labels
duplicate ❓question Further information is requested

Comments

@ghost
Copy link

ghost commented Jul 6, 2022

public class Main
{
    public static ThreadLocal<String> threadLocal = new TransmittableThreadLocal<>();
    public static ExecutorService executorService = TtlExecutors.getTtlExecutorService(new ForkJoinPool());
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        for (int i = 0; i < 4; i++)
        {
            threadLocal.set(i+"");
            List<String> strings = Arrays.asList("a", "b", "c");
            executorService.submit(() -> strings.parallelStream().forEach(j ->
            {
                System.out.println("线程名称: " + Thread.currentThread().getName() + ", 流中的值: " + j + ", 线程中传递的值: " + threadLocal.get());
            })).get();
            threadLocal.remove();
            System.out.println();
        }
    }
}

每次执行结果都是

线程名称: ForkJoinPool-1-worker-9, 流中的值: b, 线程中传递的值: 0
线程名称: ForkJoinPool-1-worker-2, 流中的值: a, 线程中传递的值: 0
线程名称: ForkJoinPool-1-worker-11, 流中的值: c, 线程中传递的值: 0

线程名称: ForkJoinPool-1-worker-9, 流中的值: b, 线程中传递的值: 1
线程名称: ForkJoinPool-1-worker-4, 流中的值: c, 线程中传递的值: 0
线程名称: ForkJoinPool-1-worker-11, 流中的值: a, 线程中传递的值: 0

线程名称: ForkJoinPool-1-worker-9, 流中的值: b, 线程中传递的值: 2
线程名称: ForkJoinPool-1-worker-11, 流中的值: a, 线程中传递的值: 0
线程名称: ForkJoinPool-1-worker-4, 流中的值: c, 线程中传递的值: 0

线程名称: ForkJoinPool-1-worker-9, 流中的值: b, 线程中传递的值: 3
线程名称: ForkJoinPool-1-worker-4, 流中的值: a, 线程中传递的值: 0
线程名称: ForkJoinPool-1-worker-11, 流中的值: c, 线程中传递的值: 0


我觉的理想的结果  应该是

线程名称: ForkJoinPool-1-worker-9, 流中的值: b, 线程中传递的值: 0
线程名称: ForkJoinPool-1-worker-2, 流中的值: a, 线程中传递的值: 0
线程名称: ForkJoinPool-1-worker-11, 流中的值: c, 线程中传递的值: 0

线程名称: ForkJoinPool-1-worker-9, 流中的值: b, 线程中传递的值: 1
线程名称: ForkJoinPool-1-worker-4, 流中的值: c, 线程中传递的值: 1
线程名称: ForkJoinPool-1-worker-11, 流中的值: a, 线程中传递的值: 1

线程名称: ForkJoinPool-1-worker-9, 流中的值: b, 线程中传递的值: 2
线程名称: ForkJoinPool-1-worker-11, 流中的值: a, 线程中传递的值: 2
线程名称: ForkJoinPool-1-worker-4, 流中的值: c, 线程中传递的值: 2

线程名称: ForkJoinPool-1-worker-9, 流中的值: b, 线程中传递的值: 3
线程名称: ForkJoinPool-1-worker-4, 流中的值: a, 线程中传递的值: 3
线程名称: ForkJoinPool-1-worker-11, 流中的值: c, 线程中传递的值: 3

第一次线程中的值是正确的, 第二次开始就会出现只有第一个值是正确的, 其他的仍然是第一次的0,
不知道是我哪里写错了, 请大佬指教

@oldratlee
Copy link
Member

oldratlee commented Jul 6, 2022

原因

strings.parallelStream()下层 使用的是 缺省的ForkJoinPool实例(不能被定制),
缺少了处理逻辑 以支持TransmittableThreadLocal。 @dongshuke

关于parallel stream,用户文档中有相关的说明:2.3 使用Java Agent来修饰JDK线程池实现类

image

已有的 issue:

相关的 issue:

解法

  • parallelStream增加处理,如使用wrapConsumer。(见下面的示例代码)
  • 或是 使用TTL Agent
    通过TTL Agent修改像ThreadPoolExecutorForkJoinPool执行器相关类的实现,这样就不用在业务代码中关心这些执行器类的处理了。
import com.alibaba.ttl.TransmittableThreadLocal;
import com.alibaba.ttl.TtlWrappers;
import com.alibaba.ttl.threadpool.TtlExecutors;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Consumer;

public class Main {
    public static final ThreadLocal<String> context = new TransmittableThreadLocal<>();
    public static final ExecutorService executorService = TtlExecutors.getTtlExecutorService(new ForkJoinPool());

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 4; i++) {
            final int round = i;
            List<String> strings = Arrays.asList("a", "b", "c");

            context.set(i + "");

            executorService.submit(() -> {
                        show("in submit", round);

                        Consumer<String> consumer = TtlWrappers.wrapConsumer(s -> show("in stream " + round, s));
                        strings.parallelStream().forEach(consumer);
                    }
            ).get();
        }
    }

    private static void show(String title, Object value) {
        System.out.printf("[%s] %-15s - value: %s, thread local value: %s%n",
                Thread.currentThread().getName(), title, value, context.get());
    }
}

/*

An output:

[ForkJoinPool-1-worker-1] in submit       - value: 0, thread local value: 0
[ForkJoinPool-1-worker-3] in stream 0     - value: c, thread local value: 0
[ForkJoinPool-1-worker-2] in stream 0     - value: a, thread local value: 0
[ForkJoinPool-1-worker-1] in stream 0     - value: b, thread local value: 0
[ForkJoinPool-1-worker-1] in submit       - value: 1, thread local value: 1
[ForkJoinPool-1-worker-2] in stream 1     - value: a, thread local value: 1
[ForkJoinPool-1-worker-3] in stream 1     - value: c, thread local value: 1
[ForkJoinPool-1-worker-1] in stream 1     - value: b, thread local value: 1
[ForkJoinPool-1-worker-1] in submit       - value: 2, thread local value: 2
[ForkJoinPool-1-worker-1] in stream 2     - value: b, thread local value: 2
[ForkJoinPool-1-worker-2] in stream 2     - value: c, thread local value: 2
[ForkJoinPool-1-worker-3] in stream 2     - value: a, thread local value: 2
[ForkJoinPool-1-worker-1] in submit       - value: 3, thread local value: 3
[ForkJoinPool-1-worker-1] in stream 3     - value: b, thread local value: 3
[ForkJoinPool-1-worker-4] in stream 3     - value: c, thread local value: 3
[ForkJoinPool-1-worker-3] in stream 3     - value: a, thread local value: 3

 */

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
duplicate ❓question Further information is requested
Projects
None yet
Development

No branches or pull requests

1 participant