1 /*
2 * Copyright 2008, 2009 Ange Optimization ApS
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package eu.simuline.octave.exec;
17
18 import java.io.BufferedReader;
19 import java.io.IOException;
20 import java.io.Reader;
21 import java.util.concurrent.Callable;
22
23 import eu.simuline.octave.exception.OctaveIOException;
24 import eu.simuline.octave.util.StringUtil;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28
29 /**
30 * {@link Callable} that reads from the octave process.
31 * Used in {@link OctaveExec#evalRW(WriteFunctor, ReadFunctor)} only.
32 * This is complementary to {@link OctaveWriterCallable}.
33 */
34 final class OctaveReaderCallable implements Callable<Void> {
35
36 private static final Log LOG = LogFactory.getLog(OctaveReaderCallable.class);
37
38 private static final String MSG_IOE_READ =
39 "IOException from ReadFunctor";
40 private static final String MSG_IOE_CLS =
41 "IOException during close";
42
43 /**
44 * The reader for feedback on scripts received from octave.
45 * This is nothing but {@link OctaveExec#processReader}.
46 */
47 private final BufferedReader processReader;
48
49 /**
50 * The functor the reading task is delegated to.
51 */
52 private final ReadFunctor readFunctor;
53
54 /**
55 * A string essentially consisting of a unique hashvalue.
56 * It has been printed by the according {@link OctaveWriterCallable}
57 * after having applied the write functor
58 * and thus indicates the end of the read sequence of the octave process.
59 */
60 private final String spacer;
61
62 // TBC: strictly speaking, this goes wrong with a small but positive probability.
63 /**
64 * @param processReader
65 * the reader used for reading.
66 * @param readFunctor
67 * the functor the reading process is delegated to.
68 * @param spacer
69 * a string essentially consisting of a unique hashvalue
70 * printed at the end of the according write process
71 * in {@link OctaveExec#evalRW(WriteFunctor, ReadFunctor)}
72 * and thus signifying the end of the sequence to be read.
73 */
74 OctaveReaderCallable(final BufferedReader processReader,
75 final ReadFunctor readFunctor,
76 final String spacer) {
77 this.processReader = processReader;
78 this.readFunctor = readFunctor;
79 this.spacer = spacer;
80 }
81
82 /**
83 * Reader that passes the reading on to the output from the octave process,
84 * i.e. from {@link OctaveReaderCallable#processReader}
85 * until the spacer {@link OctaveReaderCallable#spacer} reached, then it returns EOF.
86 * When this reader is closed
87 * the underlying reader is slurped up to the spacer.
88 * <p>
89 * This is used in {@link OctaveReaderCallable#call()} only.
90 */
91 // TBC: maybe thus this shall be an inner class.
92 final class OctaveExecuteReader extends Reader {
93
94 /**
95 * The current line read from {@link OctaveReaderCallable#processReader}
96 * but not yet passed to a char-array by {@link #read(char[], int, int)}.
97 * If this buffer were empty, it is <code>null</code> instead,
98 * which is also the initial value.
99 * If this is not the first line, the line read from {@link OctaveReaderCallable#processReader}
100 * is preceded by newline before being passed to {@link #buffer}.
101 */
102 private StringBuffer buffer = null;
103
104 /**
105 * Whether reading the first line.
106 * Initially, this is true.
107 * It is set to false, by {@link #read(char[], int, int)}
108 * if {@link #buffer} is filled for the first time.
109 */
110 private boolean firstLine = true;
111
112 /**
113 * Whether end of reader found.
114 * Initially, this is false.
115 * It is set to false, by {@link #read(char[], int, int)}
116 * if {@link #buffer} equals {@link #spacer},
117 * not really end of {@link OctaveReaderCallable#processReader}.
118 */
119 private boolean eof = false;
120
121 /**
122 * Reads characters into a portion of an array.
123 * This method will block until some input is available,
124 * an I/O error occurs, or the end of the stream is reached.
125 *
126 * @param cbuf
127 * Destination buffer
128 * @param off
129 * Offset at which to start storing characters.
130 * @param len
131 * Maximum number of characters to read.
132 * @return
133 * The number of characters read,
134 * or -1 if the end of the stream has been reached.
135 * The latter is the case if {@link #eof} is set
136 * which means that line {@link #spacer} has been found.
137 * @throws IOException
138 * If an I/O error occurs.
139 * This is true in particular,
140 * if null-line has been read from {@link OctaveReaderCallable#processReader}.
141 */
142 @Override
143 public int read(final char[] cbuf, final int off, final int len)
144 throws IOException {
145 if (this.eof) {
146 return -1;
147 }
148 if (this.buffer == null) {
149 // may throw IOException
150 final String line = OctaveReaderCallable.this.processReader.readLine();
151 if (OctaveReaderCallable.LOG.isTraceEnabled()) {
152 OctaveReaderCallable.LOG.trace("octaveReader.readLine() = " +
153 StringUtil.jQuote(line));
154 }
155 if (line == null) {
156 throw new IOException("Pipe to octave-process broken");
157 }
158 if (OctaveReaderCallable.this.spacer.equals(line)) {
159 this.eof = true;
160 return -1;
161 }
162
163 // line possibly preceded by \n
164 this.buffer = new StringBuffer(line.length() + 1);
165 if (this.firstLine) {
166 this.firstLine = false;
167 } else {
168 this.buffer.append('\n');
169 }
170 this.buffer.append(line);
171 }
172 assert this.buffer != null;
173
174 final int charsRead = Math.min(this.buffer.length(), len);
175 this.buffer.getChars(0, charsRead, cbuf, off);
176 if (charsRead == buffer.length()) {
177 this.buffer = null;
178 } else {
179 this.buffer.delete(0, charsRead);
180 }
181 return charsRead;
182 }
183
184 @Override
185 @SuppressWarnings({"checkstyle:magicnumber", "checkstyle:emptyblock"})
186 // length of buffer is immaterial for function
187 public void close() throws IOException {
188 final char[] buffer1 = new char[4096];
189 // Slurp the rest of the wrapped input
190 // may throw IOException
191 while (read(buffer1) != -1) { // NOPMD
192 // Do nothing
193 }
194 // may throw IOException
195 if (OctaveReaderCallable.this.processReader.ready()) {
196 throw new IOException("octaveReader is ready()");
197 }
198 OctaveReaderCallable.LOG.debug("Reader closed()");
199 }
200
201 } // class OctaveExecuteReader
202
203 // TBC: what about spacer?
204 // where do deviations from OctaveWriterCallable.call() come from?
205 /**
206 * Calling essentially reads from {@link #processReader} representing the octave process:
207 * Reads until {@link #spacer} is detected which is not really read but is interpreted
208 * as an eof symbol.
209 * To that end, {@link #processReader} is wrapped into an {@link OctaveExecuteReader}
210 * with parameter {@link #spacer} which yields eof if the spacer is detected.
211 * Closing the {@link OctaveExecuteReader} reads the spacer without passing it further.
212 * Exceptions are logged on {@link #LOG}.
213 *
214 * @throws OctaveIOException
215 * if underlying {@link #readFunctor} or the reader {@link #processReader} or
216 * the wrapping {@link OctaveExecuteReader}
217 * throws an {@link IOException}.
218 */
219 @Override
220 public Void call() {
221 final Reader reader = new OctaveExecuteReader();
222 try {
223 this.readFunctor.doReads(reader);
224 } catch (final IOException e) {
225 LOG.debug(MSG_IOE_READ, e);
226 throw new OctaveIOException(MSG_IOE_READ, e);
227 } finally { // NOPMD
228 try {
229 // this slurps the spacer
230 reader.close();
231 } catch (final IOException e) {
232 LOG.debug(MSG_IOE_CLS, e);
233 throw new OctaveIOException(MSG_IOE_CLS, e);
234 }
235 }
236 return null;
237 }
238
239 }