abstract class AbstractWorkerPool extends WorkerPool with Closeable
This class handles a FixedThreadPool from Java executors, excepted that the used threads can be busy about executing other tasks contained in the pool instead of stupidly waiting in an object monitor
Problem
Let's say that we have a pool of 3 threads that handle deserialization and packet injections.
(see fr.linkit.api.gnom.packet.traffic.PacketInjectable for further details about injection)
Then suddenly, for a reason that can often appear, every threads of the pool are waiting to receipt another packet.
The waited packet will effectively be downloaded by the PacketReaderThread,
but it could not be deserialized and injected because all the thread are currently waiting for an injection.
This way, a kind of deadlock will occur because each threads are waiting for their packet to be injected,
and there is no free thread in the pool that would process the packet injection
Pseudo code :
Here is an illustration of a normal execution.
Where the first thread is waiting for a packet to be received,
and where a second thread is injecting the packet in the channel.
<u>Thread 1 : waiting for a packet to be received
val channel = connection.getInjectable(x, ChannelScope.y, SyncPacketChannel) val nextPacket = channel.nextPacket() println("A new packet has been received !") // Process nextPacket....
<u>Thread 2 : injecting the next packet that concerns the channel, handled by the thread 1.</u>
val injectable = //Retrieves the needed injectable, stored with 'x' identifier val packetInjection = //Get the concerned injection object injectable.injectPacket(packetInjection) //The injection will notify the first thread. //println("Another packet injection has been performed !")
Solution
In a normal execution, where a second thread is free to notify the first thread, the two prints would be done successfully
Therefore, if the second thread were not able to handle the injection, because it would be busy to
execute another task submitted to the thread pool, the first thread could not be
notified, and will wait until a thread is free to process the injection.
But we have to rely on the fact that the first thread is doing noting.
But, are we saying that we have a thread that is stupidly waiting for a packet to be provided, and will do
absolutely nothing until he does not received its wants,
where he can take the time he is sleeping for executing other
tasks in the pool ? and thus consequently injecting his own packet to unlock itself ? What a lazy thread !
The Busy thread system will save this lost time in order to fluidify task execution, and make one thread able to handle multiple tasks even if a task needs to wait a Linkit resource.
- See also
WorkerBlockingQueue for busy waitings example.
- Alphabetic
- By Inheritance
- AbstractWorkerPool
- Closeable
- AutoCloseable
- WorkerPool
- ExecutionContext
- ProcrastinatorControl
- Procrastinator
- AnyRef
- Any
- Hide All
- Show All
- Public
- Protected
Instance Constructors
- new AbstractWorkerPool(name: String)
Abstract Value Members
Concrete Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##: Int
- Definition Classes
- AnyRef → Any
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- def addWorker(worker: Worker): Unit
- Attributes
- protected
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @native() @IntrinsicCandidate()
- def close(): Unit
- Definition Classes
- AbstractWorkerPool → Closeable → AutoCloseable
- var closed: Boolean
- Attributes
- protected
- def currentTaskExecutionDepth: Int
The Task Execution Depth is an int value that determines the number of tasks a thread is consequently executing.
The Task Execution Depth is an int value that determines the number of tasks a thread is consequently executing.
- returns
the task execution depth of the current thread
- Annotations
- @workerExecution()
- Exceptions thrown
IllegalThreadException
if the current thread is not a Worker
- def ensureCurrentThreadNotOwned(msg: String): Unit
- def ensureCurrentThreadNotOwned(): Unit
- def ensureCurrentThreadOwned(msg: String): Unit
- Definition Classes
- AbstractWorkerPool → WorkerPool
- def ensureCurrentThreadOwned(): Unit
- Definition Classes
- AbstractWorkerPool → WorkerPool
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- def execute(runnable: Runnable): Unit
- Definition Classes
- AbstractWorkerPool → ExecutionContext
- def executeRemainingTasks(): Unit
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @native() @IntrinsicCandidate()
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @native() @IntrinsicCandidate()
- final def haveMoreTasks: Boolean
- def isCurrentThreadOwned: Boolean
- Definition Classes
- AbstractWorkerPool → WorkerPool
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- val name: String
- Definition Classes
- AbstractWorkerPool → WorkerPool
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def newBusyQueue[A]: BlockingQueue[A]
Creates a blocking queue that keep busy its thread instead of make it waiting the provided queue will use the busy threading system for concurrent operations such as BlockingQueue#take()
Creates a blocking queue that keep busy its thread instead of make it waiting the provided queue will use the busy threading system for concurrent operations such as BlockingQueue#take()
- A
the type of element the queue will contains
- returns
- Definition Classes
- AbstractWorkerPool → WorkerPool
- def nextTaskCount: Int
- Attributes
- protected
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native() @IntrinsicCandidate()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @native() @IntrinsicCandidate()
- def pauseCurrentTask(lock: Lock): Unit
pauses the current task and look for executing other pending tasks while this task is paused
pauses the current task and look for executing other pending tasks while this task is paused
- lock
a lock to acquire until the current task is marked as paused.
- Definition Classes
- AbstractWorkerPool → WorkerPool
- Annotations
- @workerExecution()
- Exceptions thrown
IllegalThreadException
if the current thread is not a Worker
- def pauseCurrentTask(): Unit
- Definition Classes
- AbstractWorkerPool → WorkerPool
- def pauseCurrentTaskForAtLeast(timeoutMillis: Long): Unit
Keep the current thread busy with task execution for at least x milliseconds.
Keep the current thread busy with task execution for at least x milliseconds.
- timeoutMillis
the number of milliseconds the thread must be busy.
- Definition Classes
- AbstractWorkerPool → WorkerPool
- Exceptions thrown
IllegalThreadException
if the current thread is not a Worker
- def reportFailure(cause: Throwable): Unit
- Definition Classes
- AbstractWorkerPool → ExecutionContext
- def runLater(task: => Unit): Unit
- Definition Classes
- AbstractWorkerPool → Procrastinator
- def runLaterControl[A](task: => A): WorkerTask[A]
Submits a task to the executor thread pool.
Submits a task to the executor thread pool. The task will then be handled directly or right after a thread is looking for other task to schedule.
- task
the task to execute in the thread pool
- Definition Classes
- AbstractWorkerPool → ProcrastinatorControl
- Exceptions thrown
IllegalStateException
if the pool is closed
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- def threadCount: Int
- Attributes
- protected
- def toString(): String
- Definition Classes
- AnyRef → Any
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- val workers: ListBuffer[Worker]
- Attributes
- protected
Deprecated Value Members
- def finalize(): Unit
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.Throwable]) @Deprecated
- Deprecated
- def prepare(): ExecutionContext
- Definition Classes
- ExecutionContext
- Annotations
- @deprecated
- Deprecated
(Since version 2.12.0) preparation of ExecutionContexts will be removed