Fork/Join<译>

November 18, 2022 作者: yijianhao 分类: jdk文档翻译 浏览: 183 评论: 0

The fork/join framework is an implementation of the ExecutorService interface that helps you take advantage of multiple processors. It is designed for work that can be broken into smaller pieces recursively. The goal is to use all the available processing power to enhance the performance of your application.

fork/join 框架是 ExecutorService 接口的实现,可以帮助你利用多个多处理器,它专为可以递归分解为更小部分的工作而设计。目标是使用所有可用的处理能力来增强应用程序的性能。

As with any ExecutorService implementation, the fork/join framework distributes tasks to worker threads in a thread pool. The fork/join framework is distinct because it uses a work-stealing algorithm. Worker threads that run out of things to do can steal tasks from other threads that are still busy.

与其他ExecutorService的实现一样,fork/join框架分发任务到线程池中的工作线程中。不同的是,fork/join框架使用了work-stealing(任务窃取)算法,没有事情要做的工作线程可以从仍在繁忙的其他线程中窃取任务。

The center of the fork/join framework is the ForkJoinPool class, an extension of the AbstractExecutorService class. ForkJoinPool implements the core work-stealing algorithm and can execute ForkJoinTask processes.

fork/join框架的核心是ForkJoinPool类,它是AbstractExecutorService类的扩展,ForkJoinPool实现了核心的工作窃取算法,可以执行ForkJoinTask进程。

Basic Use

The first step for using the fork/join framework is to write code that performs a segment of the work. Your code should look similar to the following pseudocode:

使用fork/join框架的第一步就是编写执行一段工作的代码,你的代码应该看起来类似于下面的伪代码:

if (my portion of the work is small enough)
  do the work directly
else
  split my work into two pieces
  invoke the two pieces and wait for the results

Wrap this code in a ForkJoinTask subclass, typically using one of its more specialized types, either RecursiveTask (which can return a result) or RecursiveAction.

将此代码包装在 ForkJoinTask 子类中,通常使用其更专业的类型之一,即递归任务(可以返回结果)或递归操作。

After your ForkJoinTask subclass is ready, create the object that represents all the work to be done and pass it to the invoke() method of a ForkJoinPool instance.

在 ForkJoinTask 子类准备就绪后,创建表示要完成的所有工作的对象,并将其传递给 ForkJoinPool 实例的 invoke() 方法。

Blurring for Clarity

To help you understand how the fork/join framework works, consider the following example. Suppose that you want to blur an image. The original source image is represented by an array of integers, where each integer contains the color values for a single pixel. The blurred destination image is also represented by an integer array with the same size as the source.

为了帮助你理解 fork/join框架如何工作,思考接下来的例子。假设你想把一张图变模糊,原始源图像由整数数组表示,其中每个整数包含单个像素的颜色值。模糊目标图像也由与源大小相同的整数数组表示。

Performing the blur is accomplished by working through the source array one pixel at a time. Each pixel is averaged with its surrounding pixels (the red, green, and blue components are averaged), and the result is placed in the destination array. Since an image is a large array, this process can take a long time. You can take advantage of concurrent processing on multiprocessor systems by implementing the algorithm using the fork/join framework. Here is one possible implementation:

执行模糊是通过一次处理一个像素源阵列来完成的。每个像素与其周围的像素进行平均(对红色、绿色和蓝色分量进行平均),并将结果放置在目标数组中。由于图像是一个大型数组,因此此过程可能需要很长时间。你可以使用fork/join框架实现上面的模糊算法来利用多处理器系统的并发处理。下面是一个可能的实现:

public class ForkBlur extends RecursiveAction {
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;
  
    // Processing window size; should be odd.
    private int mBlurWidth = 15;
  
    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }

    protected void computeDirectly() {
        int sidePixels = (mBlurWidth - 1) / 2;
        for (int index = mStart; index < mStart + mLength; index++) {
            // Calculate average.
            float rt = 0, gt = 0, bt = 0;
            for (int mi = -sidePixels; mi <= sidePixels; mi++) {
                int mindex = Math.min(Math.max(mi + index, 0),
                                    mSource.length - 1);
                int pixel = mSource[mindex];
                rt += (float)((pixel & 0x00ff0000) >> 16)
                      / mBlurWidth;
                gt += (float)((pixel & 0x0000ff00) >>  8)
                      / mBlurWidth;
                bt += (float)((pixel & 0x000000ff) >>  0)
                      / mBlurWidth;
            }
          
            // Reassemble destination pixel.
            int dpixel = (0xff000000     ) |
                   (((int)rt) << 16) |
                   (((int)gt) <<  8) |
                   (((int)bt) <<  0);
            mDestination[index] = dpixel;
        }
    }
  
  ...

Now you implement the abstract compute() method, which either performs the blur directly or splits it into two smaller tasks. A simple array length threshold helps determine whether the work is performed or split.

现在你实现这个抽象compute()方法,它直接执行模糊处理或将其分为两个较小的任务。

protected static int sThreshold = 100000;

protected void compute() {
    if (mLength < sThreshold) {
        computeDirectly();
        return;
    }
    
    int split = mLength / 2;
    
    invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
              new ForkBlur(mSource, mStart + split, mLength - split,
                           mDestination));
}

If the previous methods are in a subclass of the RecursiveAction class, then setting up the task to run in a ForkJoinPool is straightforward, and involves the following steps:

如果前面的方法位于 RecursiveAction 类的子类中,则设置要在 ForkJoinPool 中运行的任务非常简单,涉及以下步骤:

  1. Create a task that represents all of the work to be done.
    // source image pixels are in src
    // destination image pixels are in dst
    ForkBlur fb = new ForkBlur(src, 0, src.length, dst);
  2. Create the ForkJoinPool that will run the task.
    ForkJoinPool pool = new ForkJoinPool();
  3. Run the task.
    pool.invoke(fb);

==

  1. 创建一个表示要完成的所有工作的任务
    // 源图像像素在src中
    // 目标图像像素在dst中
    ForkBlur fb = new ForkBlur(src, 0, src.length, dst);
  2. 创建ForkJoinPool来运行将要运行的任务
    ForkJoinPool pool = new ForkJoinPool();
  3. 运行任务
    pool.invoke(fb);

For the full source code, including some extra code that creates the destination image file, see the ForkBlur example.

全部的代码,包括额外创建目标图像的图像文件,查看ForkBlur示例

Standard Implementations

Besides using the fork/join framework to implement custom algorithms for tasks to be performed concurrently on a multiprocessor system (such as the ForkBlur.java example in the previous section), there are some generally useful features in Java SE which are already implemented using the fork/join framework. One such implementation, introduced in Java SE 8, is used by the java.util.Arrays class for its parallelSort() methods. These methods are similar to sort(), but leverage concurrency via the fork/join framework. Parallel sorting of large arrays is faster than sequential sorting when run on multiprocessor systems. However, how exactly the fork/join framework is leveraged by these methods is outside the scope of the Java Tutorials. For this information, see the Java API documentation.

除了使用 fork/join框架是为了使用多处理器系统的并发能力去执行自定义算法之外(例如上一个章节的ForkBlur.java例子),Java SE 中有一些通常有用的功能,这些功能已经使用 fork/join 框架实现。Java SE 8 中引入的一个这样的实现被 java.util.Arrays 类用于其 parallelSort() 方法。这些方法类似于 sort(),但通过 fork/join 框架利用并发性。在多处理器系统上运行时,对大型阵列进行排序,parallelSort比顺序排序更快。但是,这些方法究竟如何利用fork/join框架,超出了java教程的范围。有关此信息,请参阅 Java API 文档。

Another implementation of the fork/join framework is used by methods in the java.util.streams package, which is part of Project Lambda scheduled for the Java SE 8 release. For more information, see the Lambda Expressions section.

fork/join 框架的另一个实现被 java.util.streams 包中的方法使用,该包是 Java SE 8 版本中计划的 Lambda 项目的一部分。有关更多信息,请参阅 Lambda 表达式部分。


评论