1 /*
2 * Copyright 2007, 2008 Ange Optimization ApS
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file excep in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 /**
17 * @author Kim Hansen
18 */
19 package eu.simuline.octave.exec;
20
21 import java.io.BufferedReader;
22 import java.io.File;
23 import java.io.IOException;
24 import java.io.InputStreamReader;
25 import java.io.OutputStreamWriter;
26 import java.io.Writer;
27 import java.nio.charset.Charset;
28 import java.util.Random;
29 import java.util.concurrent.CancellationException;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.ThreadFactory;
35
36 import eu.simuline.octave.exception.OctaveException;
37 import eu.simuline.octave.exception.OctaveIOException;
38 import eu.simuline.octave.util.NamedThreadFactory;
39 import eu.simuline.octave.util.NoCloseWriter;
40 import eu.simuline.octave.util.ReaderWriterPipeThread;
41 import eu.simuline.octave.util.TeeWriter;
42 import eu.simuline.octave.OctaveUtils;
43
44 import org.apache.commons.logging.Log;
45 import org.apache.commons.logging.LogFactory;
46
47 /**
48 * The object connecting to the octave process.
49 */
50 public final class OctaveExec {
51
52 public static final String MSG_IOE_NH =
53 "InterruptedException should not happen";
54
55 public static final String MSG_EXE_NH =
56 "ExecutionException should not happen";
57
58 public static final String MSG_RTE_NH =
59 "RuntimeException should not happen";
60
61 private static final Log LOG = LogFactory.getLog(OctaveExec.class);
62
63 /**
64 * The octave process created in the constructor
65 * with given command, arguments, environment and working directory.
66 * This is initialized
67 * in {@link #OctaveExec(int, Writer, Writer, Charset, String[], String[], File)}
68 * and used in {@link #close()} and {@link #destroy()} only.
69 */
70 private final Process process;
71
72 /**
73 * The output writer for {@link #process} derived from {@link Process#getOutputStream()}.
74 */
75 private final Writer processWriter;
76
77 /**
78 * The input reader for {@link #process} derived from {@link Process#getInputStream()}.
79 * This is used by {@link #evalRW(WriteFunctor, ReadFunctor)}
80 * and used to close via {@link #close()}.
81 */
82 private final BufferedReader processReader;
83
84 /**
85 * Used in method {@link #evalRW(WriteFunctor, ReadFunctor)}
86 * to submit essentially the write functor which submits the input
87 * and thereafter the read function which collects the output.
88 * Besides this, the executor is invoked to shutdown.
89 */
90 private final ExecutorService executor;
91
92 /**
93 * The error thread of the error stream of {@link #process}
94 * writing the error stream to a given writer.
95 * This is used to close but also to change the error writer
96 * by {@link #setErrorWriter(Writer)}.
97 */
98 private final ReaderWriterPipeThread errorStreamThread;
99
100 private boolean destroyed = false;
101
102 /**
103 * Will start the octave process.
104 *
105 * @param numThreadsReuse
106 * the number of threads to be reused in a fixed thread pool.
107 * This is either positive or <code>-1</code>,
108 * which means that a cached thread pool is used instead of a fixed one.
109 * @param stdinLog
110 * This writer will capture all
111 * that is written to the octave process via stdin,
112 * if null the data will not be captured.
113 * @param stderrLog
114 * This writer will capture all
115 * that is written from the octave process on stderr,
116 * if null the data will not be captured.
117 * @param charset
118 * the charset used for communication with the octave process.
119 * @param cmdArray
120 * The array consisting of command and arguments:
121 * The 0th entry is either the path to the octave program,
122 * or the command found by looking at the built-in variable "paths"
123 * reconstructing the path.
124 * starting with the 1th entry,
125 * may follow the array of arguments to start the octave program with.
126 * CAUTION: allowed values depend on the octave version.
127 * @param environment
128 * Either the environment for the octave process,
129 * i.e. the set of values of environment variables
130 * with each entry of the form <code>name=value</code>
131 * or null to make {@link #process},
132 * the process created, inherit the environment of the current process.
133 * @param workingDir
134 * Either the working directory for the octave process, or <code>null</code> to make {@link #process},
135 * the process created, inherit the working directory
136 * of the current process.
137 * @throws OctaveIOException
138 * If execution
139 */
140 public OctaveExec(final int numThreadsReuse,
141 final Writer stdinLog,
142 final Writer stderrLog,
143 final Charset charset, // TBD: ensure that various charsets fit.
144 final String[] cmdArray,
145 final String[] environment, // always invoked with null
146 final File workingDir) {
147 ThreadFactory threadFactory = new NamedThreadFactory();
148 this.executor = numThreadsReuse == -1
149 ? Executors.newCachedThreadPool(threadFactory)
150 : Executors.newFixedThreadPool(numThreadsReuse, threadFactory);
151
152 try {
153 // exec may throw
154 // - SecurityException TBC
155 // - UnsupportedOperationException TBC
156 // - IOException (handled by catch)
157 // - NullPointerException if cmdArray is null
158 // or so is one of its components,
159 // - IndexOutOfBoundsException if cmdArray is empty
160 // The latter two are excluded.
161 this.process = Runtime.getRuntime().exec(cmdArray,
162 environment,
163 workingDir);
164 } catch (final IOException e) {
165 throw new OctaveIOException(e);
166 }
167 // Connect stderr
168 this.errorStreamThread = ReaderWriterPipeThread
169 .instantiate(new InputStreamReader(this.process.getErrorStream(), charset),
170 stderrLog);
171
172 // Connect stdout
173 this.processReader = new BufferedReader
174 (new InputStreamReader(this.process.getInputStream(), charset));
175
176 // Connect stdin
177 Writer pw = new OutputStreamWriter(this.process.getOutputStream(), charset);
178 // all written to processWriter will go to pw and,
179 // if not null to stdinLog
180 this.processWriter = (stdinLog == null)
181 ? pw
182 : new TeeWriter(new NoCloseWriter(stdinLog), pw);
183 }
184
185 private final Random random = new Random();
186
187 private String generateSpacer() {
188 return "-=+X+=- Octave.java spacer -=+X+=- " +
189 this.random.nextLong() + " -=+X+=-";
190 }
191
192 /**
193 * Passes <code>input</code> to octave, get back <code>output</code>
194 * and throws according exceptions if reading or writing went wrong.
195 *
196 * @param input
197 * a write functor which represents the script
198 * to be executed in octave.
199 * @param output
200 * the read functor which reads the result of octave execution.
201 * After evaluation of this method,
202 * the <code>output</code> is asked for the result.
203 *///<code></code>
204 // used in OctaveIO#set(Map), OctaveIO#get(String),
205 // OctaveIO#checkIfVarExists(String) and in
206 // OctaveEngine#unsafeEval(String) OctaveEngine#unsafeEval(Reader) and
207 // OctaveEngine#getVersion() only
208 // TBD: document which exceptions can be thrown in detail
209 public void evalRW(final WriteFunctor input, final ReadFunctor output) {
210 final String spacer = generateSpacer();
211 final Future<Void> writerFuture =
212 this.executor.submit(new OctaveWriterCallable(this.processWriter,
213 input,
214 spacer));
215 final Future<Void> readerFuture =
216 this.executor.submit(new OctaveReaderCallable(this.processReader,
217 output,
218 spacer));
219 final RuntimeException writerException = getFromFuture(writerFuture);
220 // if (writerException instanceof CancellationException) {
221 // LOG.error("Did not expect writer to be canceled",
222 // writerException);
223 // }
224 if (writerException != null) {
225 if (writerException instanceof CancellationException) {
226 LOG.error("Did not expect writer to be canceled",
227 writerException);
228 }
229 readerFuture.cancel(true); // may interrupt if running
230 throw writerException;
231 }
232 final RuntimeException readerException = getFromFuture(readerFuture);
233 // if (writerException != null) {
234 // throw writerException;
235 // }
236 if (readerException != null) {
237 // Only gets here when writerException==null,
238 // and in that case we don't expect the reader to be cancelled
239 if (readerException instanceof CancellationException) {
240 LOG.error("Did not expect reader to be canceled",
241 readerException);
242 }
243 throw readerException;
244 }
245 }
246
247 /**
248 * Completes computation on future
249 * and returns an exception thrown or null.
250 */
251 private RuntimeException getFromFuture(Future<Void> future) {
252 try {
253 future.get();
254 } catch (final InterruptedException e) {
255 LOG.error(MSG_IOE_NH, e);
256 return new RuntimeException(MSG_IOE_NH, e);
257 } catch (final ExecutionException e) {
258 if (e.getCause() instanceof OctaveException) {
259 final OctaveException/../../eu/simuline/octave/exception/OctaveException.html#OctaveException">OctaveException oe = (OctaveException) e.getCause();
260 return reInstException(oe);
261 }
262 // Can happen when there is an error in a OctaveWriter
263 LOG.error(MSG_EXE_NH, e);
264 return new RuntimeException(MSG_EXE_NH, e);
265 } catch (final CancellationException e) {
266 return e;
267 } catch (final RuntimeException e) { // NOPMD
268 LOG.error(MSG_RTE_NH, e);
269 return new RuntimeException(MSG_RTE_NH, e);
270 }
271 return null;
272 }
273
274 /**
275 * Used by {@link #getFromFuture(Future)}
276 * to re-instantiate an {@link OctaveException}
277 * if this occurs as the cause of an {@link ExecutionException}.
278 */
279 private OctaveExceptionsimuline/octave/exception/OctaveException.html#OctaveException">OctaveException reInstException(OctaveException exc) {
280 OctaveException res;
281 try {
282 res = exc.getClass()
283 // may throw NoSuchMethodException
284 // isa ReflectiveOperationException,
285 // SecurityException isa RuntimeException
286 .getConstructor(String.class, Throwable.class)
287 // may throw
288 // IllegalArgumentException,
289 // ReflectiveOperationException:
290 // - IllegalAccessException: constructor inaccessible
291 // - InstantiationException: Exception class is abstract
292 // - InvocationTargetException: if the constructor throws an exc
293 // ExceptionInInitializerError
294 .newInstance(exc.getMessage(), exc);
295 } catch (RuntimeException e) { // NOPMD
296 throw new IllegalStateException("Exception should not happen", e);
297 } catch (ReflectiveOperationException e) {
298 throw new IllegalStateException("Exception should not happen", e);
299 } catch (ExceptionInInitializerError e) {
300 throw new IllegalStateException("Error should not happen", e);
301 }
302 if (isDestroyed()) {
303 res.setDestroyed(true);
304 }
305 return res;
306 }
307
308 /**
309 * Sets {@link #destroyed} to the parameter value given.
310 */
311 private synchronized void setDestroyed(final boolean destroyed) {
312 this.destroyed = destroyed;
313 }
314
315 /**
316 * Returns {@link #destroyed}.
317 */
318 private synchronized boolean isDestroyed() {
319 return this.destroyed;
320 }
321
322 /**
323 * Kill the octave process without remorse.
324 */
325 public void destroy() {
326 setDestroyed(true);
327 this.executor.shutdownNow(); // returns list of tasks awaiting exec.
328 this.process.destroy();
329 this.errorStreamThread.close();
330 try {
331 this.processWriter.close();
332 } catch (final IOException e) {
333 LOG.debug("Ignored error from processWriter.close() " +
334 "in OctaveExec.destroy()", e);
335 }
336 }
337
338 /**
339 * Close the octave process in an orderly fashion:
340 * Send command <code>exit</code> and expect a single line in return,
341 * namely an empty one.
342 *
343 * @throws OctaveIOException
344 * if
345 */
346 public void close() {
347 try {
348 // it is not worth it to rewrite this
349 // to use eval() and some specialized Functors
350 // the next three commands all may throw IOException
351 this.processWriter.write("exit\n");
352 this.processWriter.close();
353 final String read1 = this.processReader.readLine();
354
355 // Allow a single blank line, exit in octave 3.2 returns that:
356 if (read1 != null && !"".equals(read1)) {
357 throw new OctaveIOException
358 ("Expected a blank line, read '" + read1 + "'");
359 }
360 final String read2 = this.processReader.readLine();// may throw IOExceptino
361 if (read2 != null) {
362 throw new OctaveIOException
363 ("Expected reader to be at end of stream, read '" +
364 read2 + "'");
365 }
366 // may throw IOException
367 this.processReader.close();
368 // may throw IOException
369 this.errorStreamThread.close();
370 int exitValue;
371 try {
372 exitValue = this.process.waitFor();
373 } catch (final InterruptedException e) {
374 throw new OctaveIOException
375 ("Interrupted when waiting for octave process " +
376 "to terminate", e);
377 }
378 if (exitValue != 0) {
379 throw new OctaveIOException
380 ("octave process terminated abnormaly, exitValue='" +
381 exitValue + "'");
382 }
383 } catch (final IOException e) {
384 // TBD: correct this: may be also a problem with the writer.
385 // Reader: may be processReader or error stream.
386 final OctaveIOException octaveException =
387 new OctaveIOException("reader error", e);
388 if (isDestroyed()) {
389 octaveException.setDestroyed(true);
390 }
391 throw octaveException;
392 } finally {
393 this.executor.shutdown();
394 }
395 }
396
397 /**
398 * @param writer
399 * the new writer to write the error output to
400 */
401 public void setErrorWriter(final Writer writer) {
402 this.errorStreamThread.setWriter(writer);
403 }
404
405 }