@Service public class ShoppingCartService { private ThreadLocal<ShoppingCart> cartHolder = new ThreadLocal<>(); public ShoppingCart getCurrentCart() { ShoppingCart cart = cartHolder.get(); if (cart == null) { cart = new ShoppingCart(); cartHolder.set(cart); } return cart; } public void checkout() { // 获取当前购物车 ShoppingCart cart = getCurrentCart(); // 执行结账操作 // 清除当前线程中购物车的信息,防止内存泄露 cartHolder.remove(); } } // 堆代码 duidaima.com // 购物车类 class ShoppingCart { private List<Product> products = new ArrayList<>(); public void addProduct(Product product) { products.add(product); } public List<Product> getProducts() { return products; } }这段代码中,ShoppingCartService 是一个 Spring Bean,用来管理购物车信息。在这个 Bean 里,使用了 ThreadLocal<ShoppingCart> 来保存每个线程的购物车信息。getCurrentCart 方法首先会从 ThreadLocal 中获取购物车信息,如果当前线程没有对应的购物车信息,那么就创建一个新的购物车,并保存到 ThreadLocal 中。checkout 方法用来执行结账操作,结账完成后,需要通过cartHolder.remove();清除当前线程中的购物车信息,以防止内存泄露。这样,即使在多线程环境下,每个线程都有自己独立的购物车信息,互不影响。这就是 ThreadLocal 在解决 Spring Bean 线程安全问题上的一个应用场景。
@Aspect @Component public class UserConsistencyAspect { // 每个UserVo启用线程隔离,在进入切面后开始创建,在业务逻辑中用完就被GC回收 private static final ThreadLocal<UserVo> userHolder = new ThreadLocal<>(); @Pointcut("@annotation(org.nozomi.common.annotation.GetUser)") public void userAuthPoint() {} @Around("userAuthPoint()") public Object injectUserFromRequest(ProceedingJoinPoint joinPoint) throws Throwable { Authentication authentication = SecurityContextHolder.getContext().getAuthentication(); UserVo operator = (UserVo) authentication.getPrincipal(); if (operator == null) { return Response.fail("用户不存在"); } userHolder.set(operator); return joinPoint.proceed(); } /** * 取出当前线程中的UserVo对象,这些UserVo是跟随http创建的线程而隔离的 * * @return 当前线程的UserVo */ public static UserVo getUser() { return userHolder.get(); } }在业务中使用这个切面以及UserConsistencyAspect .getUser()方法就可以获取到这个http session中的User对象了。
@Service public class ProductService { private final ThreadLocal<Session> sessionThreadLocal = new ThreadLocal<>(); public Product getProductById(String id) { Session session = getSession(); return session.get(Product.class, id); } public void updateProduct(Product product) { Session session = getSession(); session.update(product); } private Session getSession() { Session session = sessionThreadLocal.get(); if (session == null) { session = sessionFactory.openSession(); sessionThreadLocal.set(session); } return session; } public void closeSession() { Session session = sessionThreadLocal.get(); if (session != null) { session.close(); sessionThreadLocal.remove(); } } }
在很多情况下,开发者会使用Spring来管理数据库的会话或者事务,但是这样的Bean通常是线程不安全的,比如 Hibernate的SessionFactory或者MyBatis的SqlSessionFactory。这些工厂产生的Session是线程不安全的。在电商项目中,一个常见的场景是,可能会在一个请求处理的过程中需要多次和数据库进行交互。
这个时候,为了保证在一个请求中使用同一个数据库会话(Session),通常会把这个 Session 放在一个ThreadLocal中。这样,即使在一个线程中的不同方法里,也可以获取到同一个Session。在这个例子中,每个线程都有自己的Session实例,存储在ThreadLocal中。不同的线程调用getSession()方法时,都会从ThreadLocal中获取到属于自己的Session。但是事实上这些session的处理已经在mybatis或hibernate中都已经通过ThreadLocal处理好了不需要开发者再在业务中对session进行隔离。这里的例子主要是为了解释 ThreadLocal 是如何工作的,并不是实际开发中推荐的做法。
结构化并发编程式(Structured Concurrent)和虚拟线程(Virtual Threads)息息相关。要了解ScopedValue就必须先了解这两个概念,自JDK5以来一直保持着这样一种理念:我们不应该直接与线程交互。正确的模式是将任务作为Runnable或Callable提交给ExecutorService或Executor,然后对返回的Future进行操作。Loom项目中一直保留了这种模型,并添加了一些不错的功能。
这里要介绍的第一个对象是Scope对象, 确切的类型是StructuredTaskScope。我们可以把这个对象看做一个虚拟线程启动器,我们以Callable的形式向它提交任务,我们将得到一个future返回,并且这个callable将在由作用域Scope为我们创建的虚线程种执行。这很像Executor。但二者之间也有很大的区别。
public static Weather readWeather() throws Exception { // try-with-resource try(var scope = new StructuredTaskScope<Weather>()) { Future<Weather> future = scope.fork(Weather::readWeatherFrom); scope.join(); return future.resultNow(); } }StructuredTaskScope实例是AutoCloseable(自动关闭)的,我们可以使用try-with-resource模式。通过fork()方法fork一个Callable类型的任务,fork()方法返回一个Future对象,我们调用join()方法阻塞调用,它将阻塞当前线程,直到所有提交(frok)给StructuredTaskScope的任务都完成。最后调用Future的resultNow()获取结果并返回。resultNow()将抛出异常,如果我们在Future完成前调用它,所以我们要在join()方法中调用并将其返回。
3.ThreadLocal变量可以被继承。如果一个子线程从父线程中继承ThreadLocal变量,那么该子线程需要独立存储父线程中的全部ThreadLocal变量,这会产生比较大的内存开销。
module dioxide.cn.module { requires jdk.incubator.concurrent; }同时需要再启动参数VM Option中启用预览功能--enable-preview。下一步是指定ScopedValue对象的值和作用域,通过静态方法where来完成。where方法有 3 个参数:
3.Runnable或Callable对象,表示ScopedValue对象的作用域
public class Main { // 声明了一个静态的、最终的 ScopedValue<String> 实例 // ScopedValue 是一个支持在特定范围内(如任务或线程)中传递值的类 // 它的使用类似于 ThreadLocal,但更适合于结构化并发 private static final ScopedValue<String> VALUE = ScopedValue.newInstance(); public static void main(String[] args) throws Exception { System.out.println(Arrays.toString(stringScope())); } public static Object[] stringScope() throws Exception { return ScopedValue.where(VALUE, "value", () -> { // 使用 try-with-resource 来绑定结构化并发的作用域 // 用于自动管理资源的生命周期,这是一个结构化任务范围 // 在这个范围内创建的所有子任务都将被视为范围的一部分 // 如果范围中的任何任务失败,所有其他任务都将被取消 try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { // 使用了 scope.fork 来创建两个并行的任务 // 每个任务都在执行上下文中获取 VALUE 的值,并对其进行操作 Future<String> user = scope.fork(VALUE::get); Future<Integer> order = scope.fork(() -> VALUE.get().length()); // join() 方法等待所有范围内的任务完成 // throwIfFailed() 方法会检查所有任务的结果,如果任何任务失败,则会抛出异常 scope.join().throwIfFailed(); // 在所有任务完成后,使用 resultNow() 方法获取每个任务的结果,并将结果放入一个对象数组中 return new Object[]{user.resultNow(), order.resultNow()}; } }); } }这段代码展示了如何使用ScopedValue和结构化并发来创建并执行多个并行任务,并安全地传递和操作任务上下文中的值。
在开始ScopedValue的源码分析之前,先看一下Java doc的介绍:ScopedValue 是一个对象,它被设置一次后,在执行期间由一个线程有限期地读取。ScopedValue 允许在有限的执行期间内在不将数据作为方法参数传递的情况下安全、有效地共享数据。ScopedValue 定义了 where(ScopedValue, Object, Runnable) 方法,这个方法在一个线程执行 runnable 的 run 方法的有限执行期间内设置 ScopedValue 的值。
由 run 执行的方法展开执行定义了一个动态作用域。在动态作用域中执行时,作用域值是绑定的,当 run 方法完成时(正常或异常),它恢复到未绑定状态。在动态作用域中执行的代码使用 ScopedValue 的 get 方法来读取其值。与线程局部变量类似,作用域值有多个化身,每个线程一个。使用哪个化身取决于哪个线程调用其方法。ScopedValue 的一个典型用法是在 final 和 static 字段中声明。字段的可访问性将决定哪些组件可以绑定或读取其值。ScopedValue中有3个内部类,分别是Snapshot、Carrier、Cache,他们在ScopedValue中起着至关重要的角色。
graph TB A("ScopedValue.where(key, value, op)") A --> B("ScopedValue.Carrier.of(key, value)") B --> C("ScopedValue.Carrier.where(key, value, prev)") C --> D("返回ScopedValue.Carrier对象")作用域值旨在以结构化方式使用。如果op已经创建了一个StructuredTaskScope但没有关闭它,那么退出op会导致在动态范围内创建的每个StructuredTaskScope被关闭。这可能需要阻塞,直到所有子线程都完成了它们的子任务。关闭是按照创建它们的相反顺序完成的。
public static <T, R> R where(ScopedValue<T> key, T value, Callable<? extends R> op) throws Exception { return where(key, value).call(op); }这个方法会将前两个参数委派给Carrier.of(key, value);方法
/* * 返回由单个绑定组成的新集合 */ static <T> Carrier of(ScopedValue<T> key, T value) { return where(key, value, null); } /** * 向该map添加绑定,返回一个新的 Carrier 实例 */ private static final <T> Carrier where(ScopedValue<T> key, T value, Carrier prev) { return new Carrier(key, value, prev); }在Carrier类中where方法会返回一个新的Carrier对象,这是一种责任链的设计模式
graph TB D("ScopedValue.Carrier") D --> E("ScopedValue.Carrier.call(op)") E -->|分支1| F("ScopedValue.Cache.invalidate()") E -->|分支2| G("ScopedValue.Carrier.runWith(newSnapshot, op)") G --> H("ScopedValueContainer.call(op)") H --> I("ScopedValueContainer.callWithoutScope(op)") I --> J("Callable.call()")call方法调用链的方法中有很多细节是关于处理Snapshot和Cache的,这些内容可能在将来的Java版本中发生变化这里就不再赘述。