java线程池(八):ForkJoinPool源码分析之四(ForkJoinWorkerThread源码)

it2025-12-27  6

文章目录

1.类结构及其成员变量1.1 类结构和注释1.2 常量 2.构造函数2.1 ForkJoinWorkerThread(ForkJoinPool pool)2.2 ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup, AccessControlContext acc) 3.重要的方法3.1 run3.2 定义的可扩展方法 4.内部类InnocuousForkJoinWorkerThread5. ForkJoinPool中创建工作线程的过程5.1 makeCommonPool创建过程5.2 createWorker创建过程5.3 InnocuousForkJoinWorkerThreadFactory5.4 DefaultForkJoinWorkerThreadFactory 6.总结

1.类结构及其成员变量

1.1 类结构和注释

类结构代码如下:

public class ForkJoinWorkerThread extends Thread { }

ForkJoinWorkerThread继承了Thread类,其注释大意如下:

ForkJoinWorkerThread是由ForkJoinPool管理的线程,该线程执行ForkJoinTask。此类仅可做为扩展功能的需要而被集成,因为没有提供可以调度或者可重新的方法。但是,你可以覆盖主任务处理循环周围初始化和终止方法。如果确实创建了这个类的子类,还需要在ForkJoinPool中提供自定义的ForkJoinWorkerThreadFactory来使用。

1.2 常量

主要有两个:

final ForkJoinPool pool; // the pool this thread works in final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics

注释大意: ForkJoinWorkerThreads由ForkJoinPools管理,并执行ForkJoinTasks。请参见ForkJoinPool的内部文档。此类仅仅维护了指向pool和WorkQueue的链接。pool字段在构造的时候直接设置。但是直到对registerWorker调用完成之后,才设置workQueue字段。这将导致可见性竞争,可以通过要求workQueue字段仅由其所属线程访问来规避这个问题。对于InnocuousForkJoinWorkerThread子类的支持,要求我们在此处和子类中破坏很多封装,通过Unsafe以访问和设置Thread字段。 这是两个final修饰的常量,智能初始化一次。

2.构造函数

2.1 ForkJoinWorkerThread(ForkJoinPool pool)

/** * Creates a ForkJoinWorkerThread operating in the given pool. * * @param pool the pool this thread works in * @throws NullPointerException if pool is null */ protected ForkJoinWorkerThread(ForkJoinPool pool) { // Use a placeholder until a useful name can be set in registerWorker super("aForkJoinWorkerThread"); this.pool = pool; this.workQueue = pool.registerWorker(this); }

其构造函数主要是创建一个在给定pool中的ForkJoinWorkerThread。 构造函数中最主要的方法就是registerWorker。

2.2 ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup, AccessControlContext acc)

ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup, AccessControlContext acc) { super(threadGroup, null, "aForkJoinWorkerThread"); U.putOrderedObject(this, INHERITEDACCESSCONTROLCONTEXT, acc); eraseThreadLocals(); // clear before registering this.pool = pool; this.workQueue = pool.registerWorker(this); }

这个方法需要注意的是,采用unSafe方法,在这个类的INHERITEDACCESSCONTROLCONTEXT位置处,设置传入的AccessControlContext对象。 之后调用方法eraseThreadLocals将threadLocals清除。 之后与同用的构造函数一致。 擦除ThreadLocal也是采用UnSafe来完成。通过putObject将ThreadLocals的位置设置为null。

/** * Erases ThreadLocals by nulling out Thread maps. */ final void eraseThreadLocals() { U.putObject(this, THREADLOCALS, null); U.putObject(this, INHERITABLETHREADLOCALS, null); }

实际上这个方法将会提供给InnocuousForkJoinWorkerThread继承的时候使用。

3.重要的方法

3.1 run

做为Thread,最重要的就是run方法,我们来看看ForkJoinWorkerThread的实现:

public void run() { //如果workQueue为空,则抛出异常 if (workQueue.array == null) { // only run once Throwable exception = null; try { //调用onStart方法 onStart(); //调用pool的runWorker方法,运行workQueue pool.runWorker(workQueue); } catch (Throwable ex) { exception = ex; } finally { //操作完之后处理 try { //调用onTermination方法 onTermination(exception); } catch (Throwable ex) { //如果出现异常,则进行异常处理 if (exception == null) exception = ex; } finally { //最终需要指向deregister方法 pool.deregisterWorker(this, exception); } } } }

runWorker方法实际上是对workQueue结合随机魔数,选择一个workQueue进行遍历,调用scan方法,如果不为空则执行,反之则wait。 其中registerWorker与deregisterWorker方法 ,我们可以参考前面的ForkJoinPool源码解读。

3.2 定义的可扩展方法

由于ForkJoinWorkerThread还支持继承扩展,因此在此定义了两个扩展的方法:

/** * Initializes internal state after construction but before * processing any tasks. If you override this method, you must * invoke {@code super.onStart()} at the beginning of the method. * Initialization requires care: Most fields must have legal * default values, to ensure that attempted accesses from other * threads work correctly even before this thread starts * processing tasks. */ protected void onStart() { } /** * Performs cleanup associated with termination of this worker * thread. If you override this method, you must invoke * {@code super.onTermination} at the end of the overridden method. * * @param exception the exception causing this thread to abort due * to an unrecoverable error, or {@code null} if completed normally */ protected void onTermination(Throwable exception) { }

这两个方法用于执行之前和之后,onStart用于run实际执行之前,执行一些初始化操作。onTermination用于run实际执行之后,执行一些清理操作。

4.内部类InnocuousForkJoinWorkerThread

这个类就是继承了ForkJoinWorkerThread的一个实现类。此类定义了一个没有任何权限、也非用户定义的任何线程组的线程。这个线程在运行完每个top的task之后,会擦除所有的ThreadLocals。

源码如下:

static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread { /** The ThreadGroup for all InnocuousForkJoinWorkerThreads */ private static final ThreadGroup innocuousThreadGroup = createThreadGroup(); /** An AccessControlContext supporting no privileges */ private static final AccessControlContext INNOCUOUS_ACC = new AccessControlContext( new ProtectionDomain[] { new ProtectionDomain(null, null) }); InnocuousForkJoinWorkerThread(ForkJoinPool pool) { super(pool, innocuousThreadGroup, INNOCUOUS_ACC); } @Override // to erase ThreadLocals void afterTopLevelExec() { eraseThreadLocals(); } @Override // to always report system loader public ClassLoader getContextClassLoader() { return ClassLoader.getSystemClassLoader(); } @Override // to silently fail public void setUncaughtExceptionHandler(UncaughtExceptionHandler x) { } @Override // paranoically public void setContextClassLoader(ClassLoader cl) { throw new SecurityException("setContextClassLoader"); } /** * Returns a new group with the system ThreadGroup (the * topmost, parent-less group) as parent. Uses Unsafe to * traverse Thread.group and ThreadGroup.parent fields. */ private static ThreadGroup createThreadGroup() { try { sun.misc.Unsafe u = sun.misc.Unsafe.getUnsafe(); Class<?> tk = Thread.class; Class<?> gk = ThreadGroup.class; long tg = u.objectFieldOffset(tk.getDeclaredField("group")); long gp = u.objectFieldOffset(gk.getDeclaredField("parent")); ThreadGroup group = (ThreadGroup) u.getObject(Thread.currentThread(), tg); while (group != null) { ThreadGroup parent = (ThreadGroup)u.getObject(group, gp); if (parent == null) return new ThreadGroup(group, "InnocuousForkJoinWorkerThreadGroup"); group = parent; } } catch (Exception e) { throw new Error(e); } // fall through if null as cannot-happen safeguard throw new Error("Cannot create ThreadGroup"); } }

AccessControlContext INNOCUOUS_ACC 定义了一个不支持任何特权访问的AccessControlContext。这个类会创建一个单独的threadGroup,以确保其不属于任何一个用户创建的ThreadGroup。

5. ForkJoinPool中创建工作线程的过程

此时再来结合ForkJoinPool中的ForkJoinWorkerThreadFactory,就能明白ForkJoinThread的创建意义了。ForkJoinPool根据访问权限的需要,定义了采用默认的创建方法,还是创建InnocuousForkJoinWorkerThread。

5.1 makeCommonPool创建过程

再ForkJoinPool重的makeCommPool,有如下代码:

if (factory == null) { if (System.getSecurityManager() == null) factory = defaultForkJoinWorkerThreadFactory; else // use security-managed default factory = new InnocuousForkJoinWorkerThreadFactory(); }

这里也就是说,如果System.getSecurityManager()为null,则返回默认的ThreadFactory,而不为null,则说, 使用了默认的安全管理级别,因此将创建InnocuousForkJoinWorkerThreadFactory。

5.2 createWorker创建过程

ForkJoinWorkerThreadFactory fac = factory; try { if (fac != null && (wt = fac.newThread(this)) != null) { wt.start(); return true; } } catch (Throwable rex) { ex = rex; }

也就是说,createWorker根据ForkJoinWorkerThreadFactory的实现类来创建。

5.3 InnocuousForkJoinWorkerThreadFactory

static final class InnocuousForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { /** * An ACC to restrict permissions for the factory itself. * The constructed workers have no permissions set. */ private static final AccessControlContext innocuousAcc; static { Permissions innocuousPerms = new Permissions(); innocuousPerms.add(modifyThreadPermission); innocuousPerms.add(new RuntimePermission( "enableContextClassLoaderOverride")); innocuousPerms.add(new RuntimePermission( "modifyThreadGroup")); innocuousAcc = new AccessControlContext(new ProtectionDomain[] { new ProtectionDomain(null, innocuousPerms) }); } public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { return (ForkJoinWorkerThread.InnocuousForkJoinWorkerThread) java.security.AccessController.doPrivileged( new java.security.PrivilegedAction<ForkJoinWorkerThread>() { public ForkJoinWorkerThread run() { return new ForkJoinWorkerThread. InnocuousForkJoinWorkerThread(pool); }}, innocuousAcc); } }

5.4 DefaultForkJoinWorkerThreadFactory

static final class DefaultForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { return new ForkJoinWorkerThread(pool); } }

6.总结

ForkJoinWorkerThread实际上非常简单,就是结合ForkJoinPool,然后根据其需要,创建合适的线程的过程。这里面值得我们借鉴的是,如果需要创建无其他访问权限的线程,实际上这两种线程大部分内容都是相同的,因此可以通过继承来复用大部分代码。之后定义两个factory,让最终的用户根据需要选择factory。

最新回复(0)