/** * Invokes the rejected execution handler for the given command. * Package-protected for use by ScheduledThreadPoolExecutor. */ final void reject(Runnable command) { handler.rejectedExecution(command, this); }很好,线程池底层设计中,将抛出拒绝策略方法的访问级别设置为 默认访问权限,并添加了 final 关键字,所以我们没法继承和直接调用reject方法
那怎么办,用代理!!!开始跟着我学习!
public class SupportThreadPoolExecutor extends ThreadPoolExecutor { /** * 堆代码 duidaima.com * 拒绝策略次数统计 */ private final AtomicInteger rejectCount = new AtomicInteger(); public SupportThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); } /** * 设置 {@link SupportThreadPoolExecutor#rejectCount} 自增 */ public void incrementRejectCount() { rejectCount.incrementAndGet(); } /** * 获取拒绝次数 * * @return */ public int getRejectCount() { return rejectCount.get(); } }好了,然后呢,我有点懵,别急,慢慢捋一下,接下来是不是要用到上面的扩展的东西了?就是该使用自己扩展的线程池了!使用之前,是不是得自定义一个拒绝策略?是的,来直接扩展拒绝策略处理器。
public interface SupportRejectedExecutionHandler extends RejectedExecutionHandler { /** * 拒绝策略记录时, 执行某些操作 * * @param executor */ default void beforeReject(ThreadPoolExecutor executor) { if (executor instanceof SupportThreadPoolExecutor) { SupportThreadPoolExecutor supportExecutor = (SupportThreadPoolExecutor) executor; // 发起自增 supportExecutor.incrementRejectCount(); // 触发报警... System.out.println("线程池触发了任务拒绝..."); } } }很简答,在这个接口中我们新写了一个接口的方法的默认实现(jdk8新特性),在这个里面实现了上面我们的两个要求:次数自增、触发警报
public class SupportAbortPolicyRejected extends ThreadPoolExecutor.AbortPolicy implements SupportRejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { beforeReject(e); super.rejectedExecution(r, e); } }打工打工告成!
@SneakyThrows public static void main(String[] args) { SupportThreadPoolExecutor executor = new SupportThreadPoolExecutor( 1, 1, 1024, TimeUnit.SECONDS, new LinkedBlockingQueue(1), // 使用自定义拒绝策略 new SupportAbortPolicyRejected() ); // 测试流程 for (int i = 0; i < 3; i++) { try { // 无限睡眠, 以此触发拒绝策略.(此处有异常, 为了减少无用代码, 省略...) executor.execute(() -> Thread.sleep(Integer.MAX_VALUE)); } catch (Exception ignored) { } } Thread.sleep(50); System.out.println(String.format("线程池拒绝策略次数 :: %d", executor.getRejectCount())); } /** * 日志打印: * * 线程池触发了任务拒绝... * 线程池拒绝策略次数 :: 1 */
根据日至打印得知,我们的扩展需求完整的实现了。当线程池执行任务拒绝行为时,首先会调用 SupportRejectedExecutionHandler#beforeReject,然后才是执行真正的拒绝策略行为。
怎么解决,动态代理缓缓走来。
@AllArgsConstructor public class RejectedExecutionProxyInvocationHandler implements InvocationHandler { private RejectedExecutionHandler target; private SupportThreadPoolExecutor executor; @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 执行拒绝策略前自增拒绝次数 & 发起报警 executor.incrementRejectCount(); System.out.println("线程池触发了任务拒绝..."); return method.invoke(target, args); } }创建一个接口和一个接口实现的类(这个类是用来被扩展的),这个用的是线程池的拒绝策略实例abortPolicy
ThreadPoolExecutor.AbortPolicy abortPolicy = new ThreadPoolExecutor.AbortPolicy();然后创建这两个类的实例,并在代码中测试:
@SneakyThrows public static void main(String[] args) { // 删除 SupportThreadPoolExecutor 构造方法中的拒绝策略 SupportThreadPoolExecutor executor = new SupportThreadPoolExecutor( 1, 1, 1024, TimeUnit.SECONDS, new LinkedBlockingQueue(1) ); ThreadPoolExecutor.AbortPolicy abortPolicy = new ThreadPoolExecutor.AbortPolicy(); // 创建拒绝策略代理类 RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) Proxy.newProxyInstance( abortPolicy.getClass().getClassLoader(), abortPolicy.getClass().getInterfaces(), new RejectedExecutionProxyInvocationHandler(abortPolicy, executor) ); // 线程池 set 拒绝策略代理类 executor.setRejectedExecutionHandler(rejectedExecutionHandler); // 堆代码 duidaima.com // 测试流程 for (int i = 0; i < 3; i++) { try { // 无限睡眠, 以此触发拒绝策略.(此处有异常, 为了减少无用代码, 省略...) executor.execute(() -> Thread.sleep(Integer.MAX_VALUE)); } catch (Exception ex) { // ignore } } Thread.sleep(50); System.out.println(String.format("线程池拒绝策略次数 :: %d", executor.getRejectCount())); } /** * 日志打印: * * 线程池触发了任务拒绝... * 线程池拒绝策略次数 :: 1 */好的,完成,至此,已经成功了一半多了。
public class SupportThreadPoolExecutor extends ThreadPoolExecutor { // 省略代码... public SupportThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) Proxy.newProxyInstance( handler.getClass().getClassLoader(), handler.getClass().getInterfaces(), new RejectedExecutionProxyInvocationHandler(handler, this) ); setRejectedExecutionHandler(rejectedExecutionHandler); } // 省略代码... }完结撒花😜😜😜,总的来说不难,捋清楚了很简单,你搞清楚了吗?