• 聊一聊JDK 21引入的新型并发编程模型
  • 发布于 2个月前
  • 249 热度
    0 评论
如果你仍然认为之前的JDK 17没有太多改变,那么JDK 21需要引起你的注意。因为JDK 21引入了一种新型的并发编程模型。目前在Java中的多线程并发编程是我们头痛的另一部分。感觉学起来很困难,使用起来也很复杂。但是回头看看使用其他语言的朋友,他们根本没有这种麻烦,比如GoLang,使用起来非常顺畅。

JDK 21在这个领域取得了巨大的改进,使Java并发编程变得更加简单和顺畅。更准确地说,这些改进在JDK 19或JDK 20中已经存在。

其中,虚拟线程、作用域值和结构化并发是多线程并发编程的一些功能。

一. 虚拟线程
虚拟线程是基于协程的线程,类似于其他语言中的协程,但也有一些区别。虚拟线程附加在主线程上。如果主线程被销毁,虚拟线程将不再存在。

相似之处:
•虚拟线程和协程都很轻量级,它们的创建和销毁开销比传统操作系统线程要小。
•虚拟线程和协程都可以通过挂起和恢复来在线程之间切换,从而避免了线程上下文切换的开销。
•虚拟线程和协程都可以以异步和非阻塞的方式处理任务,提高了应用程序的性能和响应能力。

不同之处
•虚拟线程是在JVM级别实现的,而协程是在语言级别实现的。因此,虚拟线程的实现可以与支持JVM的任何语言一起使用,而协程的实现需要特定的编程语言支持。
•虚拟线程是协程的基于线程的实现,因此它们可以使用与线程相关的API,如ThreadLocal、Lock和Semaphore。协程不依赖于线程,通常需要特定的异步编程框架和API。
•虚拟线程的调度由JVM管理,而协程的调度由编程语言或异步编程框架管理。因此,虚拟线程可以更好地与其他线程合作,而协程更适合处理异步任务。

总的来说,虚拟线程是一种新的线程类型,可以提高应用程序的性能和资源利用率,同时还可以使用传统的与线程相关的API。虚拟线程与协程有许多相似之处,但也存在一些不同之处。

虚拟线程确实可以使多线程编程更加简单和高效。与传统的操作系统线程相比,创建和销毁虚拟线程的开销更小,线程上下文切换的开销也更小,因此可以大大减少多线程编程中的资源消耗和性能瓶颈。

使用虚拟线程,开发人员可以像编写传统线程代码一样编写代码,而不必担心线程的数量和调度,因为JVM会自动管理虚拟线程的数量和调度。此外,虚拟线程还支持传统的与线程相关的API,如ThreadLocal、Lock和Semaphore,这使得开发人员更容易将传统线程代码迁移到虚拟线程中。虚拟线程的引入使多线程编程更加高效、简单和安全,允许开发人员更多关注业务逻辑,而不必过多关注底层线程管理。

二. 结构化并发
结构化并发是一种旨在通过提供结构化且易于遵循的方法来简化并发编程的编程范例。使用结构化并发,开发人员可以创建更容易理解和调试、不容易出现竞态条件和其他与并发相关的错误的并发代码。在结构化并发中,所有并发代码都被结构化为称为任务的明确定义的工作单元。任务以结构化的方式创建、执行和完成,任务的执行始终保证在其父任务完成之前完成。

结构化并发可以使多线程编程更加简单和可靠。在传统的多线程编程中,线程的启动、执行和终止都是由开发人员手动管理的,因此容易出现线程泄漏、死锁和不正确的异常处理等问题。

使用结构化并发,开发人员可以更自然地组织并发任务,使任务之间的依赖关系更清晰,代码逻辑更简洁。结构化并发还提供了一些异常处理机制,以更好地管理并发任务中的异常,避免由异常引起的程序崩溃或数据不一致。

此外,结构化并发还可以通过限制并发任务的数量和优先级来防止资源

竞争和饥饿现象。这些特性使得开发人员能够更容易地实现高效且可靠的并发程序,而不必过多关注底层线程管理。

三. 作用域值
作用域值是JDK 20中的一项功能,允许开发人员创建仅限于特定线程或任务的作用域值。作用域值类似于线程本地变量,但设计用于与虚拟线程和结构化并发一起使用。它们允许开发人员以结构化的方式在不同部分的应用程序之间传递上下文信息,例如用户身份验证或请求特定数据。

四. 实践
在继续以下探索之前,您需要至少下载JDK 19或直接下载JDK 20。截止到2023年9月,JDK 20是官方发布的最高版本。如果使用JDK 19,您将无法体验到Scoped Values功能。

或者直接下载JDK 21的早期访问版本。

如果您使用的是IDEA,则您的IDEA版本必须至少为2022.3或更高版本,否则不支持这样的新JDK版本。如果您使用的是JDK 19或JDK 20,您应该在项目设置中将语言级别设置为19或20。否则,在编译时会提示您无法使用预览版本功能。虚拟线程是预览版本的功能。

如果您使用的是JDK 21,请将语言级别设置为X - 实验性功能。此外,由于JDK 21不是官方版本,您需要进入IDEA设置(请注意,这是IDEA设置,而不是项目设置),并手动将项目的目标字节码版本更改为21。当前,最高选项为20,即JDK 20。将其设置为21后,您可以在JDK 21中使用这些功能。

4.1 虚拟线程
现在我们如何启动线程?
首先,声明一个线程类,实现Runnable接口,并实现run方法。
public class SimpleThread implements Runnable {
    // 堆代码 duidaima.com
    @Override
    public void run() {
        System.out.println("name:" + Thread.currentThread().getName());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
然后,您可以使用这个线程类并启动线程。
Thread thread = new Thread(new SimpleThread());
thread.start();
拥有虚拟线程后,如何实现呢?
Thread.ofPlatform().name("thread-test").start(new SimpleThread());
以下是使用虚拟线程的几种方式。
1. 直接启动虚拟线程
Thread thread = Thread.startVirtualThread(new SimpleThread());
2. 使用ofVirtual(),构建器模式启动虚拟线程,您可以设置线程名称、优先级、异常处理和其他配置
Thread.ofVirtual()
      .name("thread-test")
      .start(new SimpleThread());
或者
Thread thread = Thread.ofVirtual()
      .name("thread-test")
      .uncaughtExceptionHandler((t, e) -> {
          System.out.println(t.getName() + e.getMessage());
      })
      .unstarted(new SimpleThread());
thread.start();
3. 使用工厂创建线程
ThreadFactory factory = Thread.ofVirtual().factory();
Thread thread = factory.newThread(new SimpleThread());
thread.setName("thread-test");
thread.start();
4. 使用Executors
ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor();
Future<?> submit = executorService.submit(new SimpleThread());
Object o = submit.get();
4.2 结构化并发
想象以下情景。假设您有三个任务需要同时执行。只要任何一个任务完成并返回结果,就可以直接使用该结果,可以停止其他两个任务。例如,一个天气服务通过三个渠道获取天气情况,只要一个渠道返回即可。

在这种情况下,在Java 8下应该做什么,当然也是可以的。
List<Future<?>> futures = executor.invokeAll(tasks);
String result = executor.invokeAny(tasks);
使用ExecutorService的invokeAll和invokeAny方法实现,但会有一些额外的工作。在获取第一个结果后,您需要手动关闭另一个线程。在JDK 21中,可以使用结构化编程来实现。ShutdownOnSuccess捕获第一个结果并关闭任务范围以中断未完成的线程并唤醒调用线程。一种情况是任何子任务的结果都可以直接使用,而无需等待其他未完成任务的结果。

它定义了获取第一个结果或在所有子任务失败时抛出异常的方法。
public static void main(String[] args) throws IOException {
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess()) {
        Future<String> res1 = scope.fork(() -> runTask(1));
        Future<String> res2 = scope.fork(() -> runTask(2));
        Future<String> res3 = scope.fork(() -> runTask(3));
        scope.join();
        System.out.println("scope:" + scope.result());
    } catch (ExecutionException | InterruptedException e) {
        throw new RuntimeException(e);
    }
}

public static String runTask(int i) throws InterruptedException {
    Thread.sleep(

1000);
    long l = new Random().nextLong();
    String s = String.valueOf(l);
    System.out.println(i + "task:" + s);
    return s;
}
ShutdownOnFailure
执行多个任务,只要有一个失败(发生异常或引发其他活动异常),就停止其他未完成的任务,并使用scope.throwIfFailed来捕获并抛出异常。如果所有任务都正常,可以使用Feture.get()或*Feture.resultNow()来获取结果。
public static void main(String[] args) throws IOException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        Future<String> res1 = scope.fork(() -> runTaskWithException(1));
        Future<String> res2 = scope.fork(() -> runTaskWithException(2));
        Future<String> res3 = scope.fork(() -> runTaskWithException(3));
        scope.join();
        scope.throwIfFailed(Exception::new);

        String s = res1.resultNow();
        System.out.println(s);

        String result = Stream.of(res1, res2, res3)
                             .map(Future::resultNow)
                             .collect(Collectors.joining());
        System.out.println("result:" + result);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

public static String runTaskWithException(int i) throws InterruptedException {
    Thread.sleep(1000);
    long l = new Random().nextLong(3);
    if (l == 0) {
        throw new InterruptedException();
    }
    String s = String.valueOf(l);
    System.out.println(i + "task:" + s);
    return s;
}
4.3 作用域值
我们一定使用过ThreadLocal,它是线程本地变量,只要线程不销毁,就可以随时获取ThreadLocal中的变量值。作用域值也可以在线程内的任何时候获取变量,但它有一个作用域的概念,当超出作用域时将被销毁。
public class ScopedValueExample {
    final static ScopedValue<String> LoginUser = ScopedValue.newInstance();

    public static void main(String[] args) throws InterruptedException {
        ScopedValue.where(LoginUser, "Tom")
                   .run(() -> {
                       new Service().login();
                   });

        Thread.sleep(2000);
    }

    static class Service {
        void login() {
            System.out.println("user:" + LoginUser.get());
        }
    }
}
上面的示例模拟了用户登录过程,使用ScopedValue.newInstance()声明了一个ScopedValue,使用ScopedValue.where为ScopedValue设置了一个值,并使用run方法执行接下来要做的事情,以便在run()内部随时获取ScopedValue。在run方法中模拟了service的登录方法,不需要传递参数LoginUser,直接通过LoginUser.get方法可以直接获取当前登录用户的值。

最后
感谢阅读,希望这篇文章对你有所收获!
用户评论