Beschreibung#
Der TimeoutExecutor benutzt eine begrenzte Zahl von Threads aus einem bestehenden ExecutorService zum Ausführen von Callables. Wenn das Ergebnis eines Callables nicht innerhalb der angegebenen Zeit vorliegt, wird die Abarbeitung abgebrochen.Ziel#
Die Threads werden effizient genutzt, indem auf eine Ressource maximal die angegebene Zeit gewartet wird. Sobald ein Thread frei wird, entweder durch liefern des Ergebnisses oder durch den Timeout, wird dieser wieder verwendet.package com.intersult.util.concurrent import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; public class TimeoutExecutor<V> implements Future<List<V>> { private class TimeoutFuture extends FutureTask<V> { private long end; TimeoutFuture(Callable<V> callable, long end) { super(callable); this.end = end; } @Override protected void done() { if (!isCancelled()) completionQueue.add(this); } public boolean isTimeout() { return end < System.nanoTime(); } public long getEnd() { return end; } } final BlockingQueue<TimeoutFuture> submissionQueue; final BlockingQueue<TimeoutFuture> completionQueue; private final Executor executor; private boolean cancelled; private final int maxThreads; private final Collection<Callable<V>> callables; public TimeoutExecutor(Executor executor, Collection<Callable<V>> callables, int maxThreads) { this.maxThreads = maxThreads; if (executor == null || callables == null) throw new NullPointerException(); this.executor = executor; this.callables = callables; submissionQueue = new LinkedBlockingQueue<TimeoutFuture>(maxThreads); completionQueue = new LinkedBlockingQueue<TimeoutFuture>(callables.size()); } @Override public boolean cancel(boolean mayInterruptIfRunning) { if (isDone()) return false; cancelled = true; for (Future<V> future : submissionQueue) { future.cancel(mayInterruptIfRunning); } return cancelled; } @Override public List<V> get() throws InterruptedException, ExecutionException { return get(Long.MAX_VALUE, TimeUnit.NANOSECONDS); } @Override public List<V> get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException { long nanoTimeout = unit.toNanos(timeout); List<V> result = new ArrayList<V>(callables.size()); for (Iterator<Callable<V>> iterator = callables.iterator(); iterator.hasNext() || !submissionQueue.isEmpty();) { while (!cancelled && submissionQueue.size() < maxThreads && iterator.hasNext()) { long end = System.nanoTime() + nanoTimeout; TimeoutFuture future = new TimeoutFuture(iterator.next(), end); submissionQueue.add(future); executor.execute(future); } long remain = submissionQueue.peek().getEnd() - System.nanoTime(); TimeoutFuture future = completionQueue.poll(remain, TimeUnit.NANOSECONDS); if (future != null) { submissionQueue.remove(future); result.add(future.get()); } while (!submissionQueue.isEmpty()) { if (submissionQueue.peek().isTimeout()) { submissionQueue.poll().cancel(true); } else { break; } } } return result; } @Override public boolean isCancelled() { return cancelled; } @Override public boolean isDone() { return submissionQueue.isEmpty(); }}