Friday, December 16, 2011

Using a ThreadPoolExecutor to Parallelize Independent Single-Threaded Tasks

The task execution framework, introduced in Java SE 5.0, is a giant leap forward in simplifying the design and development of multithreaded applications. The framework provides facilities to manage tasks, thread life cycles, and execution policies.

In this blog post we'll show the power, flexibility, and simplicity of this framework through a simple use case.

The Basics

The executor framework introduces an interface to manage task execution: Executor. Executor is the interface you use to submit tasks, represented as Runnable instances. This interface also decouples task submission from task execution: executors with different execution policies all expose the same submission interface, so if you change your execution policy, your submission logic isn't affected.

If you want to submit a Runnable instance for execution, it's as simple as:

Executor exec = …;
exec.execute(runnable);
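
As a minimal sketch of this decoupling, the snippet below hands two different executors to the same submission code. The class and method names (ExecutorSwapDemo, submitAll, runDemo) are made up for illustration, and the lambdas assume Java 8 or later; only the java.util.concurrent types come from the framework.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ExecutorSwapDemo {

    // Submission logic: it only knows about the Executor interface.
    static void submitAll(Executor exec, List<String> sink, int tasks) {
        for (int i = 0; i < tasks; i++) {
            final int id = i;
            exec.execute(() -> sink.add("task-" + id));
        }
    }

    // Runs the same submission logic against two execution policies and
    // returns how many tasks completed under each of them.
    static int[] runDemo() {
        // Policy 1: run each task synchronously on the caller's thread.
        Executor direct = Runnable::run;
        List<String> directResults = new CopyOnWriteArrayList<>();
        submitAll(direct, directResults, 5);

        // Policy 2: hand the tasks to a pool of two worker threads.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<String> pooledResults = new CopyOnWriteArrayList<>();
        submitAll(pool, pooledResults, 5);
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }

        return new int[] { directResults.size(), pooledResults.size() };
    }

    public static void main(String[] args) {
        int[] sizes = runDemo();
        System.out.println("direct: " + sizes[0] + ", pooled: " + sizes[1]);
    }
}
```

Note that submitAll is identical in both cases: only the executor passed to it changes.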

Thread Pools

As outlined in the previous section, how the executor is going to execute your runnable isn't specified by the Executor contract: it depends on the specific type of executor you're using. The framework provides some different types of executors, each one with a specific execution policy tailored for different use cases.

The most common executors you'll be dealing with are thread pool executors, which are instances of the ThreadPoolExecutor class (and its subclasses). Thread pool executors manage a thread pool, that is, the pool of worker threads that's going to execute the tasks, and a work queue.

You have surely seen the concept of a pool in other technologies. The primary advantage of using a pool is reducing the overhead of resource creation by reusing structures (in this case, threads) that have been released after use. Another implicit advantage of using a pool is the ability to size your resource usage: you can tune the thread pool size to achieve the load you desire without jeopardizing system resources.

The framework provides a factory class for thread pools called Executors. Using this factory you'll be able to create thread pools with different characteristics. The underlying implementation is often the same (ThreadPoolExecutor), but the factory class helps you quickly configure a thread pool without using its more complex constructor. The factory methods are:
  • newFixedThreadPool: this method returns a thread pool whose maximum size is fixed. It will create new threads as needed up to the maximum configured size. When the number of threads hits the maximum, the thread pool keeps its size constant.
  • newCachedThreadPool: this method returns an unbounded thread pool, that is, a thread pool without a maximum size. However, this kind of thread pool will tear down unused threads when the load decreases.
  • newSingleThreadExecutor: this method returns an executor that guarantees that tasks will be executed in a single thread.
  • newScheduledThreadPool: this method returns a fixed size thread pool that supports delayed and timed task execution.
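
To make the difference between these factories a bit more tangible, here is a small sketch that runs the same batch of tasks on three of them and counts how many distinct worker threads were involved. FactoryMethodsDemo and distinctThreads are hypothetical names introduced for this example, and the numbers for the fixed and cached pools depend on timing, so no exact output is claimed for them.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class FactoryMethodsDemo {

    // Runs `tasks` short tasks on the given executor, shuts it down and
    // returns how many distinct worker threads executed them.
    static int distinctThreads(ExecutorService exec, int tasks) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < tasks; i++) {
            exec.execute(() -> names.add(Thread.currentThread().getName()));
        }
        exec.shutdown();
        try {
            exec.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return names.size();
    }

    public static void main(String[] args) {
        // A single-thread executor always uses exactly one worker thread.
        System.out.println("single:   "
            + distinctThreads(Executors.newSingleThreadExecutor(), 20));
        // A fixed pool never uses more threads than its configured size.
        System.out.println("fixed(3): "
            + distinctThreads(Executors.newFixedThreadPool(3), 20));
        // A cached pool creates threads on demand, so the count varies.
        System.out.println("cached:   "
            + distinctThreads(Executors.newCachedThreadPool(), 20));
    }
}
```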


This is just the beginning. Executors also provide other facilities that are out of scope for this tutorial and that I strongly encourage you to study:
  • Life cycle management methods, declared by the ExecutorService interface (such as shutdown() and awaitTermination()).
  • Completion services to poll for a task status and retrieve its return value, if applicable.

The ExecutorService interface is particularly important since it provides a way to shut down a thread pool, which is something you almost surely want to be able to do cleanly. Fortunately, the ExecutorService interface is pretty simple and self-explanatory, and I recommend you study its JavaDoc thoroughly.

Basically, you send a shutdown() message to an ExecutorService, after which it won't accept newly submitted tasks but will continue processing the jobs already enqueued. You can poll for an executor service's termination status with isTerminated(), or wait until termination using the awaitTermination(…) method. The awaitTermination method won't wait forever, though: you'll have to pass the maximum wait timeout as a parameter.

Warning: a common source of errors and confusion is a JVM process that never exits. If you don't shut down your executor services, their worker threads (which are non-daemon threads by default) stay alive and the JVM won't exit on its own: a JVM exits when its last non-daemon thread exits.

Configuring a ThreadPoolExecutor

If you decide to create a ThreadPoolExecutor manually instead of using the Executors factory class, you will need to create and configure one using one of its constructors. One of the most complete constructors of this class (another overload additionally accepts a ThreadFactory) is:

public ThreadPoolExecutor(
    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    RejectedExecutionHandler handler);

As you can see, you can configure:
  • The core pool size (the size the thread pool will try to stick with).
  • The maximum pool size.
  • The keep-alive time, which is the time after which an idle thread in excess of the core pool size becomes eligible for being torn down.
  • The work queue to hold tasks awaiting execution.
  • The policy to apply when a task submission is rejected.
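
A minimal sketch of such a manual configuration follows. The sizes, the keep-alive time and the queue capacity are arbitrary illustrative values, not recommendations, and ManualPoolConfig is a hypothetical class name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ManualPoolConfig {

    // Builds a pool using the six-argument constructor shown above.
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
            2,                                     // core pool size
            4,                                     // maximum pool size
            30, TimeUnit.SECONDS,                  // keep-alive time and its unit
            new ArrayBlockingQueue<Runnable>(10),  // bounded work queue
            new ThreadPoolExecutor.AbortPolicy()); // rejection policy
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        // The getters expose the configuration the pool was created with.
        System.out.println("core=" + pool.getCorePoolSize()
            + " max=" + pool.getMaximumPoolSize()
            + " keepAlive=" + pool.getKeepAliveTime(TimeUnit.SECONDS) + "s"
            + " queueCapacity=" + pool.getQueue().remainingCapacity());
        pool.shutdown();
    }
}
```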

Limiting the Number of Queued Tasks

Limiting the number of tasks executing concurrently by sizing your thread pool is a huge benefit for your application and its execution environment in terms of predictability and stability: unbounded thread creation will eventually exhaust runtime resources, and your application may suffer serious performance problems or even become unstable.

That solves only one part of the problem: you're capping the number of tasks being executed, but not the number of jobs that can be submitted and enqueued for later execution. The resource shortage will come later, but it will come eventually if the submission rate consistently exceeds the execution rate.

The solution to this problem is:
  • Providing a bounded blocking queue to the executor to hold the waiting tasks. If the queue fills up and the pool is already at its maximum size, the submitted task is "rejected".
  • Configuring a RejectedExecutionHandler, which is invoked when a task submission is rejected (that's why the verb rejected was quoted in the previous item). You can implement your own rejection policy or use one of the built-in policies provided by the framework.

The default rejection policy has the executor throw a RejectedExecutionException. However, other built-in policies let you:
  • Discard a job silently.
  • Discard the oldest queued job and retry submitting the new one.
  • Execute the rejected task on the caller's thread.
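
The sketch below forces a rejection in a controlled way, to compare the default policy (AbortPolicy) with the silent DiscardPolicy. It uses a pool with one thread and a queue of capacity one, and tasks that block on a latch, so the third submission can be neither executed nor queued. RejectionPolicyDemo is a hypothetical name. The caller-runs policy is deliberately left out here, because with tasks that block like these the submitting thread would block as well.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RejectionPolicyDemo {

    // Submits three blocking tasks to a one-thread pool with a queue of
    // capacity one, so the third submission is rejected.
    // Returns {tasks executed, RejectedExecutionExceptions caught}.
    static int[] run(RejectedExecutionHandler handler) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger executed = new AtomicInteger();
        int rejections = 0;

        for (int i = 0; i < 3; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    }
                    executed.incrementAndGet();
                });
            } catch (RejectedExecutionException ex) {
                rejections++; // thrown by AbortPolicy on the submitting thread
            }
        }

        release.countDown(); // let the blocked tasks finish
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return new int[] { executed.get(), rejections };
    }

    public static void main(String[] args) {
        int[] abort = run(new ThreadPoolExecutor.AbortPolicy());
        int[] discard = run(new ThreadPoolExecutor.DiscardPolicy());
        System.out.println("abort:   executed=" + abort[0] + " rejected=" + abort[1]);
        System.out.println("discard: executed=" + discard[0] + " rejected=" + discard[1]);
    }
}
```

In both runs only two of the three tasks are executed; the difference is whether the submitter gets to know about the third one.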

When and why would one use such a thread pool configuration? Let's see an example.

An Example: Parallelizing Independent Single-Threaded Tasks

Recently, I was called in to solve a problem with an old job my client had been running for a long time. Basically, the job is made up of a component that waits for file system events on a set of directory hierarchies. Whenever an event is fired, a file must be processed. The file processing is performed by a proprietary single-threaded process. Truth be told, by its very nature, I don't know if I could parallelize it even if I were allowed to. The arrival rate of events is very high during part of the day, and there's no need to process files in real time: they just need to be processed before the next day.

The existing implementation was a mix and match of technologies, including a UNIX shell script responsible for scanning huge directory hierarchies to detect where changes had been applied. When that implementation was put in place, the execution environment had two cores at most. The rate of events was also much lower: nowadays they're in the order of millions, for a total of between 1 and 2 terabytes of raw data to be processed.

The servers the client runs these processes on nowadays are twelve-core machines: a huge opportunity to parallelize those old single-threaded tasks. We've got basically all of the ingredients for the recipe; we just need to decide how to build and tune it. Some thought before writing any code was necessary to understand the nature of the load, and these are the constraints I identified:

  • A really huge number of files has to be scanned periodically: each directory contains between one and two million files.
  • The scanning algorithm is very quick and can be parallelized.
  • Processing a file will take at least 1 second, with spikes of even 2 or 3 seconds.
  • When processing a file, there is no other bottleneck than CPU.
  • CPU usage must be tunable, in order to use a different load profile depending on the time of the day.

I'll thus need a thread pool whose size is determined by the load profile active when the process is invoked. I'm therefore inclined to create a fixed-size thread pool executor configured according to the load policy. Since a processing thread is purely CPU-bound (it uses 100% of a core and waits on no other resources), the load policy is very easy to calculate: take the number of cores available in the processing environment and scale it down using the load factor that's active at that moment (making sure that at least one core is used even at peak time):

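int cpus = Runtime.getRuntime().availableProcessors();
// scaleFactor is assumed to be a fractional value (a double); the cast truncates toward zero
int maxThreads = (int) (cpus * scaleFactor);
// always use at least one thread
maxThreads = (maxThreads > 0 ? maxThreads : 1);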
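
As a rough sketch of how a time-dependent load profile could feed this calculation, the helper below picks a scale factor from the hour of the day. The hours and the factors (0.25 during the day, 1.0 at night) are purely illustrative assumptions, not the values used by the real job, and LoadProfile, scaleFactorFor and maxThreads are hypothetical names.

```java
class LoadProfile {

    // Hypothetical load profiles: a quarter of the cores between 08:00 and
    // 20:00, all of them otherwise. These values are assumptions.
    static double scaleFactorFor(int hourOfDay) {
        boolean daytime = hourOfDay >= 8 && hourOfDay < 20;
        return daytime ? 0.25 : 1.0;
    }

    // Scales the core count by the factor, never going below one thread.
    static int maxThreads(int cpus, double scaleFactor) {
        return Math.max(1, (int) (cpus * scaleFactor));
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        for (int hour : new int[] { 3, 12 }) {
            System.out.println("hour " + hour + ": "
                + maxThreads(cpus, scaleFactorFor(hour)) + " thread(s)");
        }
    }
}
```

On a twelve-core machine, these assumed profiles would yield 3 threads during the day and 12 at night.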

Then, I need to create a ThreadPoolExecutor using a bounded blocking queue to limit the number of submitted tasks. Why? Well, the directory scanning algorithm is very quick and will generate a huge number of files to process in a very short time. How huge? It's hard to predict, and its variability is pretty high. I'm not going to let the internal queue of my executor fill up indiscriminately with the objects representing my tasks (which include a pretty huge file descriptor). I prefer to let the executor reject the files when the queue fills up.

Also, I'll use ThreadPoolExecutor.CallerRunsPolicy as the rejection policy. Why? Because when the queue is full and the pool threads are busy processing files, the thread that is submitting the task will execute it itself. This way, the scanning pauses to process a file and resumes as soon as it finishes executing the current task.

Here's the code that creates the executor:


ExecutorService executorService =
  new ThreadPoolExecutor(
    maxThreads, // core thread pool size
    maxThreads, // maximum thread pool size
    1, // keep-alive time for idle threads above the core size (no effect here: core == max)
    TimeUnit.MINUTES, 
    new ArrayBlockingQueue<Runnable>(maxThreads, true),
    new ThreadPoolExecutor.CallerRunsPolicy());
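
To see the throttling effect of this configuration in isolation, here is a small self-contained sketch. It uses the same kind of executor (fixed size, queue as large as the pool, caller-runs policy) with a 20 ms sleep standing in for the file processing, and counts how many tasks ended up on the pool threads and how many on the submitting thread. CallerRunsDemo, the pool size of 2 and the sleep time are assumptions made for the example; the exact split between the two counters depends on timing.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CallerRunsDemo {

    // Submits `tasks` slow tasks and returns
    // {tasks run by pool threads, tasks run by the submitting thread}.
    static int[] run(int tasks) {
        int maxThreads = 2; // illustrative pool size
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            maxThreads, maxThreads, 1, TimeUnit.MINUTES,
            new ArrayBlockingQueue<Runnable>(maxThreads, true),
            new ThreadPoolExecutor.CallerRunsPolicy());
        Thread submitter = Thread.currentThread();
        AtomicInteger onPool = new AtomicInteger();
        AtomicInteger onCaller = new AtomicInteger();

        for (int i = 0; i < tasks; i++) {
            // When pool and queue are full, execute() runs the task right
            // here, which pauses this loop until the task completes.
            pool.execute(() -> {
                try {
                    Thread.sleep(20); // stand-in for slow file processing
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
                if (Thread.currentThread() == submitter) {
                    onCaller.incrementAndGet();
                } else {
                    onPool.incrementAndGet();
                }
            });
        }

        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return new int[] { onPool.get(), onCaller.get() };
    }

    public static void main(String[] args) {
        int[] counts = run(20);
        System.out.println("pool threads: " + counts[0]
            + ", submitting thread: " + counts[1]);
    }
}
```

Whatever the split, no task is lost and the queue never holds more than maxThreads pending tasks, which is the property this configuration is after.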


The skeleton of the code is the following (greatly simplified):


// scanning loop: fake scanning
while (!dirsToProcess.isEmpty()) {
  File currentDir = dirsToProcess.pop();

  // listing children (listFiles() returns null on I/O errors)
  File[] children = currentDir.listFiles();
  if (children == null) {
    continue;
  }

  // processing children
  for (final File currentFile : children) {
    // if it's a directory, defer processing
    if (currentFile.isDirectory()) {
      dirsToProcess.add(currentFile);
      continue;
    }

    // if it's a file, submit it for processing
    executorService.submit(new Runnable() {
      @Override
      public void run() {
        try {
          new ConvertTask(currentFile).perform();
        } catch (Exception ex) {
          // error management logic
        }
      }
    });
  }
}


// ...
        
// wait for all of the executor threads to finish
executorService.shutdown();
        
try {
  if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
    // pool didn't terminate after the first try
    executorService.shutdownNow();
  }


  if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
    // pool didn't terminate after the second try
  }
} catch (InterruptedException ex) {
  executorService.shutdownNow();
  Thread.currentThread().interrupt();
}

Conclusion

As you can see, the Java concurrency API is very easy to use, very flexible and extremely powerful. Some years ago, it would have taken me much more effort to write such a simple program. This way, I could solve a scalability problem caused by a legacy single-threaded component in a matter of hours.

4 comments:

Joaquin said...

Just two questions:

When you define maxThreads = 20 (for example) and we submit 30 processes (runnables), do the last 10 processes have to wait for 10 processes to end? Or is there any way to process the new ones if any of the 20 oldest processes are no longer active?

What is a good number for scaleFactor?

Grey said...

Hi Joaquin.

In an executor with a bounded queue like this, when a job is submitted while all of the threads are busy and the queue is full, the job is "rejected". What happens when a job is rejected depends on the policy: in this case, the job isn't discarded at all, it's just executed in the submitting thread (here, the main thread), as dictated by the caller-runs policy.

Why a bounded queue? Well, because the scanning algorithm spits out millions of files in a few minutes while, on the other hand, each file requires more than a second to be processed. I don't want memory to be filled up with data that I don't really need (the references to the jobs to be executed in the future): I prefer to keep threads occupied processing files and to scan the directories only when idle.

However, you can choose a bigger queue size and store, let's say, 100 * maxThreads jobs in memory (that depends on parameters such as your own workload, the size of the objects you're storing, etc.).

Take into account that queued jobs are executed *as soon as a thread is available*: in the example you state in your question, you don't have to wait for 10 processes to finish. As soon as a thread finishes its job and is returned to the pool, the first queued job will start (in your case, job 21) and so on. The thread pool is very efficient.

In this example, the blocking queue that I'm passing to the executor has the same size as the thread pool. It's strange, perhaps, but I decided so only because the scanning algorithm is *really* fast. You should probably try to tune it to a higher value.

The scaleFactor depends on what you can afford. Remember that the load generated by any thread can be distributed to any processor (or core) you have: it doesn't "stick" to one. If you want to generate a maximum total CPU load of 70% on your box, and each thread is CPU bound, then you can use .7 as a scale factor. If each thread spends half of its time waiting, then 1 would be your scale factor (.7 / .5 = 1.4, which exceeds 1, so you'd run one thread on each of your processors). You should analyze your workload, but if your threads are not "waiting" on any kind of resource (locks, sockets, etc.), you can assume they're busy all the time.

Hope this helps,
Grey

pradipta pradhan said...

Hi! Grey,
Hope you are doing fine.
Suppose i write a code something like this.
Executor threadPool = new ThreadPoolExecutor(10,10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(50));

Question 1.
Now, if I submit 50 tasks while all 10 threads in my pool are busy, will the remaining 40 tasks be rejected?

Question 2.
If question 1 is not the case and it queues 50 tasks and keeps consuming them as worker threads become free, what will happen to the 51st task if all my 10 worker threads are busy executing? Will it be rejected?

Enrico Maria Crisostomo said...

Hi.

No, the point of this example was using a specific "rejection policy" to have the rejected task executed in the thread submitting jobs. If a task is submitted when there are no free threads and the queue is full, it's executed by the same thread submitting it, effectively delaying further submissions until the task execution finishes.

In this case, maybe you want a bigger queue to absorb some peaks, which is fine. But if the submission rate stays higher than the "processing rate", your queue will eventually fill up, and you'll always have 10 threads executing tasks. Whether this is fine is up to you, according to the resources you can allocate to this process.

Hope this helps,
-- Enrico