1 /**
2 	Generic stream interface used by several stream-like classes.
3 
4 	This module defines the basic stream primitives. For concrete stream types, take a look at the
5 	`vibe.stream` package. The `vibe.stream.operations` module contains additional high-level
6 	operations on streams, such as reading streams by line or as a whole.
7 
8 	Copyright: © 2012-2015 RejectedSoftware e.K.
9 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
10 	Authors: Sönke Ludwig
11 */
12 module vibe.core.stream;
13 
14 import core.time;
15 import std.algorithm;
16 import std.conv;
17 
18 
19 /**************************************************************************************************/
20 /* Public functions                                                                               */
21 /**************************************************************************************************/
22 
23 /** Pipes an InputStream directly into this OutputStream.
24 
25 	The number of bytes written is either the whole input stream when `nbytes == 0`, or exactly
26 	`nbytes` for `nbytes > 0`. If the input stream contains less than `nbytes` of data, an
27 	exception is thrown.
28 */
29 void pipe(IS : InputStream, OS : OutputStream)(IS source, OS sink, ulong nbytes = 0)
30 @safe {
31 	import vibe.internal.allocator : dispose, makeArray, theAllocator;
32 
33 	auto buffer = () @trusted { return theAllocator.makeArray!ubyte(64*1024); } ();
34 	scope (exit) () @trusted { theAllocator.dispose(buffer); } ();
35 
36 	//logTrace("default write %d bytes, empty=%s", nbytes, stream.empty);
37 	if (nbytes == 0 || nbytes == ulong.max) {
38 		while (!source.empty) {
39 			size_t chunk = min(source.leastSize, buffer.length);
40 			assert(chunk > 0, "leastSize returned zero for non-empty stream.");
41 			//logTrace("read pipe chunk %d", chunk);
42 			source.read(buffer[0 .. chunk], IOMode.all);
43 			sink.write(buffer[0 .. chunk], IOMode.all);
44 		}
45 	} else {
46 		while (nbytes > 0) {
47 			size_t chunk = min(nbytes, buffer.length);
48 			//logTrace("read pipe chunk %d", chunk);
49 			source.read(buffer[0 .. chunk], IOMode.all);
50 			sink.write(buffer[0 .. chunk], IOMode.all);
51 			nbytes -= chunk;
52 		}
53 	}
54 }
55 
56 /**
57 	Returns a `NullOutputStream` instance.
58 
59 	The instance will only be created on the first request and gets reused for
60 	all subsequent calls from the same thread.
61 */
62 NullOutputStream nullSink()
63 @safe {
64 	static NullOutputStream ret;
65 	if (!ret) ret = new NullOutputStream;
66 	return ret;
67 }
68 
69 /**************************************************************************************************/
70 /* Public types                                                                                   */
71 /**************************************************************************************************/
72 
73 /** Controls the waiting behavior of read/write operations.
74 
75 	Note that this is currently ignored for all device streams. Use the "vibe-core" package if you
76 	need this functionality.
77 */
78 enum IOMode {
79 	immediate, /// not supported
80 	once,      /// not supported
81 	all        /// Writes/reads the whole buffer
82 }
83 
84 /**
85 	Interface for all classes implementing readable streams.
86 */
87 interface InputStream {
88 @safe:
89 
90 	/** Returns true $(I iff) the end of the input stream has been reached.
91 	*/
92 	@property bool empty();
93 
94 	/**	(Scheduled for deprecation) Returns the maximum number of bytes that are known to remain in this stream until the
95 		end is reached.
96 
97 		After `leastSize()` bytes have been read, the stream will either have
98 		reached EOS and `empty()` returns `true`, or `leastSize()` returns again a number `> 0`.
99 	*/
100 	@property ulong leastSize();
101 
102 	/** (Scheduled for deprecation) Queries if there is data available for immediate, non-blocking read.
103 	*/
104 	@property bool dataAvailableForRead();
105 
106 	/** Returns a temporary reference to the data that is currently buffered.
107 
108 		The returned slice typically has the size `leastSize()` or `0` if
109 		`dataAvailableForRead()` returns false. Streams that don't have an
110 		internal buffer will always return an empty slice.
111 
112 		Note that any method invocation on the same stream potentially
113 		invalidates the contents of the returned buffer.
114 	*/
115 	const(ubyte)[] peek();
116 
117 	/**	Fills the preallocated array 'bytes' with data from the stream.
118 
119 		Throws: An exception if the operation reads past the end of the stream
120 	*/
121 	size_t read(scope ubyte[] dst, IOMode);
122 	/// ditto
123 	final void read(scope ubyte[] dst) { read(dst, IOMode.all); }
124 }
125 
126 /**
127 	Interface for all classes implementing writeable streams.
128 */
129 interface OutputStream {
130 @safe:
131 
132 	/** Writes an array of bytes to the stream.
133 	*/
134 	size_t write(in ubyte[] bytes, IOMode mode);
135 	/// ditto
136 	final void write(in ubyte[] bytes) { write(bytes, IOMode.all); }
137 	/// ditto
138 	final void write(in char[] bytes) { write(cast(const(ubyte)[])bytes); }
139 
140 	/** Flushes the stream and makes sure that all data is being written to the output device.
141 	*/
142 	void flush();
143 
144 	/** Flushes and finalizes the stream.
145 
146 		Finalize has to be called on certain types of streams. No writes are possible after a
147 		call to finalize().
148 	*/
149 	void finalize();
150 
151 	/** Pipes an InputStream directly into this OutputStream.
152 
153 		The number of bytes written is either the whole input stream when nbytes == 0, or exactly
154 		nbytes for nbytes > 0. If the input stream contains less than nbytes of data, an exception
155 		is thrown.
156 	*/
157 	deprecated("Use pipe(source, sink) instead.")
158 	final void write(InputStream stream, ulong nbytes = 0)
159 	{
160 		stream.pipe(this, nbytes);
161 	}
162 }
163 
164 /**
165 	Interface for all classes implementing readable and writable streams.
166 */
167 interface Stream : InputStream, OutputStream {
168 }
169 
170 
171 /**
172 	Interface for streams based on a connection.
173 
174 	Connection streams are based on streaming socket connections, pipes and
175 	similar end-to-end streams.
176 
177 	See_also: vibe.core.net.TCPConnection
178 */
179 interface ConnectionStream : Stream {
180 @safe:
181 
182 	/** Determines The current connection status.
183 
184 		If connected is false, writing to the connection will trigger an
185 		exception. Reading may still succeed as long as there is data left in
186 		the input buffer. Use InputStream.empty to determine when to stop
187 		reading.
188 	*/
189 	@property bool connected() const;
190 
191 	/** Actively closes the connection and frees associated resources.
192 
193 		Note that close must always be called, even if the remote has already
194 		closed the connection. Failure to do so will result in resource and
195 		memory leakage.
196 
197 		Closing a connection implies a call to finalize, so that it doesn't
198 		need to be called explicitly (it will be a no-op in that case).
199 	*/
200 	void close();
201 
202 	/** Blocks until data becomes available for read.
203 
204 		The maximum wait time can be customized with the `timeout` parameter.
205 		If there is already data availabe for read, or if the connection is
206 		closed, the function will return immediately without blocking.
207 
208 		Params:
209 			timeout = Optional timeout, the default value of `Duration.max`
210 				indicates an infinite timeout
211 
212 		Returns:
213 			The function will return `true` if data becomes available before the
214 			timeout is reached. If the connection gets closed, or the timeout
215 			gets reached, `false` is returned instead.
216 	*/
217 	bool waitForData(Duration timeout = Duration.max);
218 }
219 
220 
221 /**
222 	Interface for all streams supporting random access.
223 */
224 interface RandomAccessStream : Stream {
225 @safe:
226 
227 	/// Returns the total size of the file.
228 	@property ulong size() const nothrow;
229 
230 	/// Determines if this stream is readable.
231 	@property bool readable() const nothrow;
232 
233 	/// Determines if this stream is writable.
234 	@property bool writable() const nothrow;
235 
236 	/// Seeks to a specific position in the file if supported by the stream.
237 	void seek(ulong offset);
238 
239 	/// Returns the current offset of the file pointer
240 	ulong tell() nothrow;
241 }
242 
243 
244 /**
245 	Stream implementation acting as a sink with no function.
246 
247 	Any data written to the stream will be ignored and discarded. This stream type is useful if
248 	the output of a particular stream is not needed but the stream needs to be drained.
249 */
250 final class NullOutputStream : OutputStream {
251 	size_t write(in ubyte[] bytes, IOMode) { return bytes.length; }
252 	alias write = OutputStream.write;
253 	void flush() {}
254 	void finalize() {}
255 }
256 
257 
258 alias InputStreamProxy = InputStream;
259 alias OutputStreamProxy = OutputStream;
260 alias StreamProxy = Stream;
261 alias ConnectionStreamProxy = ConnectionStream;
262 alias RandomAccessStreamProxy = RandomAccessStream;
263 
264 enum isInputStream(T) = is(T : InputStream);
265 enum isOutputStream(T) = is(T : OutputStream);
266 enum isStream(T) = is(T : Stream);
267 enum isConnectionStream(T) = is(T : ConnectionStream);
268 enum isRandomAccessStream(T) = is(T : RandomAccessStream);
269 
270 mixin template validateInputStream(T) { static assert(isInputStream!T); }
271 mixin template validateOutputStream(T) { static assert(isOutputStream!T); }
272 mixin template validateStream(T) { static assert(isStream!T); }
273 mixin template validateConnectionStream(T) { static assert(isConnectionStream!T); }
274 mixin template validateRandomAccessStream(T) { static assert(isRandomAccessStream!T); }