Ausgabe
Wir haben eine große Textdatei, in der jede Zeile intensive process
. Das Design soll einen haben class
, der die Datei liest und die Verarbeitung jeder Zeile an einen delegiert thread
, via thread pool
. Die Dateileseklasse sollte daran gehindert werden, die nächste Zeile zu lesen, sobald es keinen freien Thread im Pool gibt, um die Verarbeitung durchzuführen. Also brauche ich eineblocking thread pool
In der aktuellen Implementierung ThreadPoolExecutor.submit()
und den ThreadPoolExecutor.execute()
Methoden wird eine RejectedExecutionException
Ausnahme ausgelöst, nachdem die konfigurierte Anzahl von Threads ausgelastet ist, wie ich im folgenden Code-Snippet gezeigt habe.
public class BlockingTp {
public static void main(String[] args) {
BlockingQueue blockingQueue = new ArrayBlockingQueue(3);
ThreadPoolExecutor executorService=
new ThreadPoolExecutor(1, 3, 30, TimeUnit.SECONDS, blockingQueue);
int Jobs = 10;
System.out.println("Starting application with " + Jobs + " jobs");
for (int i = 1; i <= Jobs; i++)
try {
executorService.submit(new WorkerThread(i));
System.out.println("job added " + (i));
} catch (RejectedExecutionException e) {
System.err.println("RejectedExecutionException");
}
}
}
class WorkerThread implements Runnable {
int job;
public WorkerThread(int job) {
this.job = job;
}
public void run() {
try {
Thread.sleep(1000);
} catch (Exception excep) {
}
}
}
Ausgabe des obigen Programms ist
Starting application to add 10 jobs
Added job #1
Added job #2
Added job #3
Added job #4
Added job #5
Added job #6
RejectedExecutionException
RejectedExecutionException
RejectedExecutionException
RejectedExecutionException
Kann jemand etwas Licht werfen, dh wie ich das Blockieren des Thread-Pools implementieren kann .
Lösung
Kann jemand etwas Licht werfen, dh wie ich das Blockieren des Thread-Pools implementieren kann.
Sie müssen einen Rejection Execution Handler für Ihren Executor-Service festlegen. Wenn der Thread den Job in den Executor stellt, blockiert er, bis in der Blockierungswarteschlange Platz ist.
BlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(3);
ThreadPoolExecutor executorService =
new ThreadPoolExecutor(1, 3, 30, TimeUnit.SECONDS, arrayBlockingQueue);
// when the blocking queue is full, this tries to put into the queue which blocks
executorService.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// block until there's room
executor.getQueue().put(r);
// check afterwards and throw if pool shutdown
if (executor.isShutdown()) {
throw new RejectedExecutionException(
"Task " + r + " rejected from " + executor);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RejectedExecutionException("Producer interrupted", e);
}
}
});
Anstatt dass das TRE also ein wirft RejectedExecutionException
, ruft es den Rejection-Handler auf, der wiederum versucht, den Job wieder in die Warteschlange zu stellen. Dadurch wird der Anrufer blockiert.
Beantwortet von – Grau
Antwort geprüft von – Timothy Miller (FixError Admin)