aboutsummaryrefslogtreecommitdiffstats
path: root/framework/src/ant/apache-ant-1.9.6/src/main/org/apache/tools/ant/taskdefs/Parallel.java
diff options
context:
space:
mode:
Diffstat (limited to 'framework/src/ant/apache-ant-1.9.6/src/main/org/apache/tools/ant/taskdefs/Parallel.java')
-rw-r--r--framework/src/ant/apache-ant-1.9.6/src/main/org/apache/tools/ant/taskdefs/Parallel.java489
1 files changed, 489 insertions, 0 deletions
diff --git a/framework/src/ant/apache-ant-1.9.6/src/main/org/apache/tools/ant/taskdefs/Parallel.java b/framework/src/ant/apache-ant-1.9.6/src/main/org/apache/tools/ant/taskdefs/Parallel.java
new file mode 100644
index 00000000..c4f5c9e9
--- /dev/null
+++ b/framework/src/ant/apache-ant-1.9.6/src/main/org/apache/tools/ant/taskdefs/Parallel.java
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.tools.ant.taskdefs;
+
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Vector;
+
+import org.apache.tools.ant.BuildException;
+import org.apache.tools.ant.ExitStatusException;
+import org.apache.tools.ant.Location;
+import org.apache.tools.ant.Task;
+import org.apache.tools.ant.TaskContainer;
+import org.apache.tools.ant.property.LocalProperties;
+import org.apache.tools.ant.util.StringUtils;
+
+/**
+ * Executes the contained tasks in separate threads, continuing
+ * once all are completed.
+ * <p>
+ * New behavior allows for the ant script to specify a maximum number of
+ * threads that will be executed in parallel. One should be very careful about
+ * using the <code>waitFor</code> task when specifying <code>threadCount</code>
+ * as it can cause deadlocks if the number of threads is too small or if one of
+ * the nested tasks fails to execute completely. The task selection algorithm
+ * will insure that the tasks listed before a task have started before that
+ * task is started, but it will not insure a successful completion of those
+ * tasks or that those tasks will finish first (i.e. it's a classic race
+ * condition).
+ * </p>
+ * @since Ant 1.4
+ *
+ * @ant.task category="control"
+ */
+public class Parallel extends Task
+ implements TaskContainer {
+
+ private static final int NUMBER_TRIES = 100;
+
+ /** Class which holds a list of tasks to execute */
+ public static class TaskList implements TaskContainer {
+ /** Collection holding the nested tasks */
+ private List tasks = new ArrayList();
+
+ /**
+ * Add a nested task to execute parallel (asynchron).
+ * <p>
+ * @param nestedTask Nested task to be executed in parallel.
+ * must not be null.
+ */
+ public void addTask(Task nestedTask) {
+ tasks.add(nestedTask);
+ }
+ }
+
+ /** Collection holding the nested tasks */
+ private Vector nestedTasks = new Vector();
+
+ /** Semaphore to notify of completed threads */
+ private final Object semaphore = new Object();
+
+ /** Total number of threads to run */
+ private int numThreads = 0;
+
+ /** Total number of threads per processor to run. */
+ private int numThreadsPerProcessor = 0;
+
+ /** The timeout period in milliseconds */
+ private long timeout;
+
+ /** Indicates threads are still running and new threads can be issued */
+ private volatile boolean stillRunning;
+
+ /** Indicates that the execution timedout */
+ private boolean timedOut;
+
+ /**
+ * Indicates whether failure of any of the nested tasks should end
+ * execution
+ */
+ private boolean failOnAny;
+
+ /** The dameon task list if any */
+ private TaskList daemonTasks;
+
+ /** Accumulation of exceptions messages from all nested tasks */
+ private StringBuffer exceptionMessage;
+
+ /** Number of exceptions from nested tasks */
+ private int numExceptions = 0;
+
+ /** The first exception encountered */
+ private Throwable firstException;
+
+ /** The location of the first exception */
+ private Location firstLocation;
+
+ /** The status of the first ExitStatusException. */
+ private Integer firstExitStatus;
+
+ /**
+ * Add a group of daemon threads
+ * @param daemonTasks The tasks to be executed as daemon.
+ */
+ public void addDaemons(TaskList daemonTasks) {
+ if (this.daemonTasks != null) {
+ throw new BuildException("Only one daemon group is supported");
+ }
+ this.daemonTasks = daemonTasks;
+ }
+
+ /**
+ * Interval to poll for completed threads when threadCount or
+ * threadsPerProcessor is specified. Integer in milliseconds.; optional
+ *
+ * @param pollInterval New value of property pollInterval.
+ */
+ public void setPollInterval(int pollInterval) {
+ }
+
+ /**
+ * Control whether a failure in a nested task halts execution. Note that
+ * the task will complete but existing threads will continue to run - they
+ * are not stopped
+ *
+ * @param failOnAny if true any nested task failure causes parallel to
+ * complete.
+ */
+ public void setFailOnAny(boolean failOnAny) {
+ this.failOnAny = failOnAny;
+ }
+
+ /**
+ * Add a nested task to execute in parallel.
+ * @param nestedTask Nested task to be executed in parallel
+ */
+ public void addTask(Task nestedTask) {
+ nestedTasks.addElement(nestedTask);
+ }
+
+ /**
+ * Dynamically generates the number of threads to execute based on the
+ * number of available processors (via
+ * <code>java.lang.Runtime.availableProcessors()</code>).
+ * Will overwrite the value set in threadCount; optional
+ * @param numThreadsPerProcessor Number of threads to create per available
+ * processor.
+ *
+ */
+ public void setThreadsPerProcessor(int numThreadsPerProcessor) {
+ this.numThreadsPerProcessor = numThreadsPerProcessor;
+ }
+
+ /**
+ * Statically determine the maximum number of tasks to execute
+ * simultaneously. If there are less tasks than threads then all will be
+ * executed at once, if there are more then only <code>threadCount</code>
+ * tasks will be executed at one time. If <code>threadsPerProcessor</code>
+ * is set then this value is
+ * ignored.; optional
+ *
+ * @param numThreads total number of threads.
+ *
+ */
+ public void setThreadCount(int numThreads) {
+ this.numThreads = numThreads;
+ }
+
+ /**
+ * Sets the timeout on this set of tasks. If the timeout is reached
+ * before the other threads complete, the execution of this
+ * task completes with an exception.
+ *
+ * Note that existing threads continue to run.
+ *
+ * @param timeout timeout in milliseconds.
+ */
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+
+
+
+ /**
+ * Execute the parallel tasks
+ *
+ * @exception BuildException if any of the threads failed.
+ */
+ public void execute() throws BuildException {
+ updateThreadCounts();
+ if (numThreads == 0) {
+ numThreads = nestedTasks.size();
+ }
+ spinThreads();
+ }
+
+ /**
+ * Determine the number of threads based on the number of processors
+ */
+ private void updateThreadCounts() {
+ if (numThreadsPerProcessor != 0) {
+ numThreads = Runtime.getRuntime().availableProcessors() *
+ numThreadsPerProcessor;
+ }
+ }
+
+ private void processExceptions(TaskRunnable[] runnables) {
+ if (runnables == null) {
+ return;
+ }
+ for (int i = 0; i < runnables.length; ++i) {
+ Throwable t = runnables[i].getException();
+ if (t != null) {
+ numExceptions++;
+ if (firstException == null) {
+ firstException = t;
+ }
+ if (t instanceof BuildException
+ && firstLocation == Location.UNKNOWN_LOCATION) {
+ firstLocation = ((BuildException) t).getLocation();
+ }
+ if (t instanceof ExitStatusException
+ && firstExitStatus == null) {
+ ExitStatusException ex = (ExitStatusException) t;
+ firstExitStatus = ex.getStatus();
+ // potentially overwriting existing value but the
+ // location should match the exit status
+ firstLocation = ex.getLocation();
+ }
+ exceptionMessage.append(StringUtils.LINE_SEP);
+ exceptionMessage.append(t.getMessage());
+ }
+ }
+ }
+
+ /**
+ * Spin up required threads with a maximum number active at any given time.
+ *
+ * @exception BuildException if any of the threads failed.
+ */
+ private void spinThreads() throws BuildException {
+ final int numTasks = nestedTasks.size();
+ TaskRunnable[] runnables = new TaskRunnable[numTasks];
+ stillRunning = true;
+ timedOut = false;
+ boolean interrupted = false;
+
+ int threadNumber = 0;
+ for (Enumeration e = nestedTasks.elements(); e.hasMoreElements();
+ threadNumber++) {
+ Task nestedTask = (Task) e.nextElement();
+ runnables[threadNumber]
+ = new TaskRunnable(nestedTask);
+ }
+
+ final int maxRunning = numTasks < numThreads ? numTasks : numThreads;
+ TaskRunnable[] running = new TaskRunnable[maxRunning];
+
+ threadNumber = 0;
+ ThreadGroup group = new ThreadGroup("parallel");
+
+ TaskRunnable[] daemons = null;
+ if (daemonTasks != null && daemonTasks.tasks.size() != 0) {
+ daemons = new TaskRunnable[daemonTasks.tasks.size()];
+ }
+
+ synchronized (semaphore) {
+ // When we leave this block we can be sure all data is really
+ // stored in main memory before the new threads start, the new
+ // threads will for sure load the data from main memory.
+ //
+ // This probably is slightly paranoid.
+ }
+
+ synchronized (semaphore) {
+ // start any daemon threads
+ if (daemons != null) {
+ for (int i = 0; i < daemons.length; ++i) {
+ daemons[i] = new TaskRunnable((Task) daemonTasks.tasks.get(i));
+ Thread daemonThread = new Thread(group, daemons[i]);
+ daemonThread.setDaemon(true);
+ daemonThread.start();
+ }
+ }
+
+ // now run main threads in limited numbers...
+ // start initial batch of threads
+ for (int i = 0; i < maxRunning; ++i) {
+ running[i] = runnables[threadNumber++];
+ Thread thread = new Thread(group, running[i]);
+ thread.start();
+ }
+
+ if (timeout != 0) {
+ // start the timeout thread
+ Thread timeoutThread = new Thread() {
+ public synchronized void run() {
+ try {
+ wait(timeout);
+ synchronized (semaphore) {
+ stillRunning = false;
+ timedOut = true;
+ semaphore.notifyAll();
+ }
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ };
+ timeoutThread.start();
+ }
+
+ try {
+ // now find available running slots for the remaining threads
+ outer: while (threadNumber < numTasks && stillRunning) {
+ for (int i = 0; i < maxRunning; i++) {
+ if (running[i] == null || running[i].isFinished()) {
+ running[i] = runnables[threadNumber++];
+ Thread thread = new Thread(group, running[i]);
+ thread.start();
+ // continue on outer while loop to get another
+ // available slot
+ continue outer;
+ }
+ }
+
+ // if we got here all slots in use, so sleep until
+ // something happens
+ semaphore.wait();
+ }
+
+ // are all threads finished
+ outer2: while (stillRunning) {
+ for (int i = 0; i < maxRunning; ++i) {
+ if (running[i] != null && !running[i].isFinished()) {
+ // System.out.println("Thread " + i + " is still
+ // alive ");
+ // still running - wait for it
+ semaphore.wait();
+ continue outer2;
+ }
+ }
+ stillRunning = false;
+ }
+ } catch (InterruptedException ie) {
+ interrupted = true;
+ }
+
+ if (!timedOut && !failOnAny) {
+ // https://issues.apache.org/bugzilla/show_bug.cgi?id=49527
+ killAll(running);
+ }
+ }
+
+ if (interrupted) {
+ throw new BuildException("Parallel execution interrupted.");
+ }
+ if (timedOut) {
+ throw new BuildException("Parallel execution timed out");
+ }
+
+ // now did any of the threads throw an exception
+ exceptionMessage = new StringBuffer();
+ numExceptions = 0;
+ firstException = null;
+ firstExitStatus = null;
+ firstLocation = Location.UNKNOWN_LOCATION;
+ processExceptions(daemons);
+ processExceptions(runnables);
+
+ if (numExceptions == 1) {
+ if (firstException instanceof BuildException) {
+ throw (BuildException) firstException;
+ } else {
+ throw new BuildException(firstException);
+ }
+ } else if (numExceptions > 1) {
+ if (firstExitStatus == null) {
+ throw new BuildException(exceptionMessage.toString(),
+ firstLocation);
+ } else {
+ throw new ExitStatusException(exceptionMessage.toString(),
+ firstExitStatus, firstLocation);
+ }
+ }
+ }
+
+ /**
+ * Doesn't do anything if all threads where already gone,
+ * else it tries to interrupt the threads 100 times.
+ * @param running The list of tasks that may currently be running.
+ */
+ private void killAll(TaskRunnable[] running) {
+ boolean oneAlive;
+ int tries = 0;
+ do {
+ oneAlive = false;
+ for (int i = 0; i < running.length; i++) {
+ if (running[i] != null && !running[i].isFinished()) {
+ running[i].interrupt();
+ Thread.yield();
+ oneAlive = true;
+ }
+ }
+ if (oneAlive) {
+ tries++;
+ Thread.yield();
+ }
+ } while (oneAlive && tries < NUMBER_TRIES);
+ }
+
+ /**
+ * thread that execs a task
+ */
+ private class TaskRunnable implements Runnable {
+ private Throwable exception;
+ private Task task;
+ private boolean finished;
+ private volatile Thread thread;
+
+ /**
+ * Construct a new TaskRunnable.<p>
+ *
+ * @param task the Task to be executed in a separate thread
+ */
+ TaskRunnable(Task task) {
+ this.task = task;
+ }
+
+ /**
+ * Executes the task within a thread and takes care about
+ * Exceptions raised within the task.
+ */
+ public void run() {
+ try {
+ LocalProperties.get(getProject()).copy();
+ thread = Thread.currentThread();
+ task.perform();
+ } catch (Throwable t) {
+ exception = t;
+ if (failOnAny) {
+ stillRunning = false;
+ }
+ } finally {
+ synchronized (semaphore) {
+ finished = true;
+ semaphore.notifyAll();
+ }
+ }
+ }
+
+ /**
+ * get any exception that got thrown during execution;
+ * @return an exception or null for no exception/not yet finished
+ */
+ public Throwable getException() {
+ return exception;
+ }
+
+ /**
+ * Provides the indicator that the task has been finished.
+ * @return Returns true when the task is finished.
+ */
+ boolean isFinished() {
+ return finished;
+ }
+
+ void interrupt() {
+ thread.interrupt();
+ }
+ }
+
+}