Home Java javaTutorial Java thread pool supporting production blocking

Java thread pool supporting production blocking

Feb 07, 2017 pm 02:52 PM
java thread pool

Generally speaking, the speed of production tasks is greater than the speed of consumption. A detail is the queue length and how to match the speed of production and consumption.

A typical producer-consumer model is as follows:

Java thread pool supporting production blocking

Using the Queue implementation provided by J.U.C in a concurrent environment can easily ensure production and consumption. Thread safety in the process. What needs to be noted here is that the Queue must set the initial capacity to prevent the producer from producing too quickly, causing the queue length to skyrocket, and eventually triggering OutOfMemory.


For the general situation where production is faster than consumption. When the queue is full, we do not want any tasks to be ignored or not executed. At this time, the producer can wait for a while before submitting the task. A better approach is to block the producer in the method of submitting the task, and wait for the task to be submitted. Continue to submit tasks when the queue is not full, so there is no wasted idle time. Blocking is also very easy. BlockingQueue is built for this. Both ArrayBlockingQueue and LinkedBlockingQueue can provide capacity limits when constructing. LinkedBlockingQueue determines the capacity after each lock is obtained when the queue is actually operated.

Furthermore, when the queue is empty, the consumer cannot get the task, so he can wait for a while and then get it again. A better approach is to use the take method of BlockingQueue to block and wait, and when there is a task, he can immediately To obtain execution, it is recommended to call the overloaded method of take with a timeout parameter. The thread will exit after the timeout. In this way, when the producer has actually stopped producing, the consumer will not be left waiting indefinitely.

So an efficient production and consumption model that supports blocking is implemented.

Wait a minute, since J.U.C has helped us implement the thread pool, why do we still need to use this set of things? Isn't it more convenient to use ExecutorService directly?

Let’s take a look at the basic structure of ThreadPoolExecutor:

Java thread pool supporting production blocking

As you can see, in ThreadPoolExecutor, the BlockingQueue and Consumer parts have been implemented for us, and There are many advantages to directly using the thread pool implementation, such as dynamic adjustment of the number of threads.


But the problem is that even if you manually specify a BlockingQueue as a queue implementation when constructing ThreadPoolExecutor, in fact when the queue is full, the execute method will not block because ThreadPoolExecutor The non-blocking offer method of BlockingQueue is called:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}
Copy after login

At this time, something needs to be done to achieve a result: when the producer submits the task and the queue is full, the producer can be blocked and wait for the task. be consumed.

The key is that in a concurrent environment, the producer cannot determine whether the queue is full, and ThreadPoolExecutor.getQueue().size() cannot be called to determine whether the queue is full.

In the implementation of the thread pool, when the queue is full, the RejectedExecutionHandler passed in during construction will be called to reject the processing of the task. The default implementation is AbortPolicy, which directly throws a RejectedExecutionException.

I won’t go into details here. The one that is closer to our needs is CallerRunsPolicy. This strategy will allow the thread that submitted the task to execute the task when the queue is full, which is equivalent to letting the production The producer temporarily does the work of the consumer, so that although the producer is not blocked, the submitted task will also be suspended.

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a <tt>CallerRunsPolicy</tt>.
     */
    public CallerRunsPolicy() { }

    /**
     * Executes task r in the caller&#39;s thread, unless the executor
     * has been shut down, in which case the task is discarded.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
Copy after login

But this strategy also has hidden dangers. When there are few producers, during the time when the producers consume tasks, the consumers may have finished consuming all the tasks, and the queue will be in an empty state. When the producers finish executing the tasks, Only then can the production task be continued. This process may cause starvation of the consumer thread.

Referring to a similar idea, the simplest way is to define a RejectedExecutionHandler directly, and when the queue is full, call BlockingQueue.put to implement producer blocking:

new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                if (!executor.isShutdown()) {
                        try {
                                executor.getQueue().put(r);
                        } catch (InterruptedException e) {
                                // should not be interrupted
                        }
                }
        }
};
Copy after login

In this way, we You no longer need to care about the logic of Queue and Consumer, just focus on the implementation logic of producer and consumer threads, and just submit tasks to the thread pool.

Compared with the original design, this method can reduce the amount of code a lot, and can avoid many problems in concurrent environments. Of course, you can also use other methods, such as using semaphores to limit entry when submitting, but if you just want the producer to block, it becomes complicated.

For more articles related to Java thread pools that support production blocking, please pay attention to the PHP Chinese website!

Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn

Hot AI Tools

Undresser.AI Undress

Undresser.AI Undress

AI-powered app for creating realistic nude photos

AI Clothes Remover

AI Clothes Remover

Online AI tool for removing clothes from photos.

Undress AI Tool

Undress AI Tool

Undress images for free

Clothoff.io

Clothoff.io

AI clothes remover

Video Face Swap

Video Face Swap

Swap faces in any video effortlessly with our completely free AI face swap tool!

Hot Tools

Notepad++7.3.1

Notepad++7.3.1

Easy-to-use and free code editor

SublimeText3 Chinese version

SublimeText3 Chinese version

Chinese version, very easy to use

Zend Studio 13.0.1

Zend Studio 13.0.1

Powerful PHP integrated development environment

Dreamweaver CS6

Dreamweaver CS6

Visual web development tools

SublimeText3 Mac version

SublimeText3 Mac version

God-level code editing software (SublimeText3)

Is the company's security software causing the application to fail to run? How to troubleshoot and solve it? Is the company's security software causing the application to fail to run? How to troubleshoot and solve it? Apr 19, 2025 pm 04:51 PM

Troubleshooting and solutions to the company's security software that causes some applications to not function properly. Many companies will deploy security software in order to ensure internal network security. ...

How to simplify field mapping issues in system docking using MapStruct? How to simplify field mapping issues in system docking using MapStruct? Apr 19, 2025 pm 06:21 PM

Field mapping processing in system docking often encounters a difficult problem when performing system docking: how to effectively map the interface fields of system A...

How to elegantly obtain entity class variable names to build database query conditions? How to elegantly obtain entity class variable names to build database query conditions? Apr 19, 2025 pm 11:42 PM

When using MyBatis-Plus or other ORM frameworks for database operations, it is often necessary to construct query conditions based on the attribute name of the entity class. If you manually every time...

How do I convert names to numbers to implement sorting and maintain consistency in groups? How do I convert names to numbers to implement sorting and maintain consistency in groups? Apr 19, 2025 pm 11:30 PM

Solutions to convert names to numbers to implement sorting In many application scenarios, users may need to sort in groups, especially in one...

How does IntelliJ IDEA identify the port number of a Spring Boot project without outputting a log? How does IntelliJ IDEA identify the port number of a Spring Boot project without outputting a log? Apr 19, 2025 pm 11:45 PM

Start Spring using IntelliJIDEAUltimate version...

How to safely convert Java objects to arrays? How to safely convert Java objects to arrays? Apr 19, 2025 pm 11:33 PM

Conversion of Java Objects and Arrays: In-depth discussion of the risks and correct methods of cast type conversion Many Java beginners will encounter the conversion of an object into an array...

E-commerce platform SKU and SPU database design: How to take into account both user-defined attributes and attributeless products? E-commerce platform SKU and SPU database design: How to take into account both user-defined attributes and attributeless products? Apr 19, 2025 pm 11:27 PM

Detailed explanation of the design of SKU and SPU tables on e-commerce platforms This article will discuss the database design issues of SKU and SPU in e-commerce platforms, especially how to deal with user-defined sales...

How to elegantly get entity class variable name building query conditions when using TKMyBatis for database query? How to elegantly get entity class variable name building query conditions when using TKMyBatis for database query? Apr 19, 2025 pm 09:51 PM

When using TKMyBatis for database queries, how to gracefully get entity class variable names to build query conditions is a common problem. This article will pin...

See all articles