1 /**
2 	Stream proxy and wrapper facilities.
3 
4 	Copyright: © 2013-2016 Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.stream.wrapper;
9 
10 public import vibe.core.stream;
11 
12 import std.algorithm : min;
13 import std.exception;
14 import core.time;
15 import vibe.internal.interfaceproxy;
16 import vibe.internal.freelistref : FreeListRef;
17 
18 
19 ProxyStream createProxyStream(Stream)(Stream stream)
20 	if (isStream!Stream)
21 {
22 	return new ProxyStream(interfaceProxy!(.Stream)(stream), true);
23 }
24 
25 ProxyStream createProxyStream(InputStream, OutputStream)(InputStream input, OutputStream output)
26 	if (isInputStream!InputStream && isOutputStream!OutputStream)
27 {
28 	return new ProxyStream(interfaceProxy!(.InputStream)(input), interfaceProxy!(.OutputStream)(output), true);
29 }
30 
31 ConnectionProxyStream createConnectionProxyStream(Stream, ConnectionStream)(Stream stream, ConnectionStream connection_stream)
32 	if (isStream!Stream && isConnectionStream!ConnectionStream)
33 {
34 	mixin validateStream!Stream;
35 	mixin validateConnectionStream!ConnectionStream;
36 	return new ConnectionProxyStream(interfaceProxy!(.Stream)(stream), interfaceProxy!(.ConnectionStream)(connection_stream), true);
37 }
38 
39 /// private
40 FreeListRef!ConnectionProxyStream createConnectionProxyStreamFL(Stream, ConnectionStream)(Stream stream, ConnectionStream connection_stream)
41 	if (isStream!Stream && isConnectionStream!ConnectionStream)
42 {
43 	mixin validateStream!Stream;
44 	mixin validateConnectionStream!ConnectionStream;
45 	return FreeListRef!ConnectionProxyStream(interfaceProxy!(.Stream)(stream), interfaceProxy!(.ConnectionStream)(connection_stream), true);
46 }
47 
48 ConnectionProxyStream createConnectionProxyStream(InputStream, OutputStream, ConnectionStream)(InputStream input, OutputStream output, ConnectionStream connection_stream)
49 	if (isInputStream!InputStream && isOutputStream!OutputStream && isConnectionStream!ConnectionStream)
50 {
51 	return new ConnectionProxyStream(interfaceProxy!(.InputStream)(input), interfaceProxy!(.OutputStream)(output), interfaceProxy!(.ConnectionStream)(connection_stream), true);
52 }
53 
54 
55 /**
56 	Provides a way to access varying streams using a constant stream reference.
57 */
58 class ProxyStream : Stream {
59 @safe:
60 	private {
61 		InterfaceProxy!(.InputStream) m_input;
62 		InterfaceProxy!(.OutputStream) m_output;
63 		InterfaceProxy!(.Stream) m_underlying;
64 	}
65 
66 	deprecated("Use createProxyStream instead.")
67 	this(Stream stream = null)
68 	{
69 		m_underlying = interfaceProxy!Stream(stream);
70 		m_input = interfaceProxy!InputStream(stream);
71 		m_output = interfaceProxy!OutputStream(stream);
72 	}
73 
74 	deprecated("Use createProxyStream instead.")
75 	this(InputStream input, OutputStream output)
76 	{
77 		m_input = interfaceProxy!InputStream(input);
78 		m_output = interfaceProxy!OutputStream(output);
79 	}
80 
81 	/// private
82 	this(InterfaceProxy!Stream stream, bool dummy)
83 	{
84 		m_underlying = stream;
85 		m_input = stream;
86 		m_output = stream;
87 	}
88 
89 	/// private
90 	this(InterfaceProxy!InputStream input, InterfaceProxy!OutputStream output, bool dummy)
91 	{
92 		m_input = input;
93 		m_output = output;
94 	}
95 
96 	/// The stream that is wrapped by this one
97 	@property inout(InterfaceProxy!Stream) underlying() inout { return m_underlying; }
98 	/// ditto
99 	@property void underlying(InterfaceProxy!Stream value) { m_underlying = value; m_input = value; m_output = value; }
100 	/// ditto
101 	static if (!is(Stream == InterfaceProxy!Stream))
102 		@property void underlying(Stream value) { this.underlying = interfaceProxy!Stream(value); }
103 
104 	@property bool empty() { return m_input ? m_input.empty : true; }
105 
106 	@property ulong leastSize() { return m_input ? m_input.leastSize : 0; }
107 
108 	@property bool dataAvailableForRead() { return m_input ? m_input.dataAvailableForRead : false; }
109 
110 	const(ubyte)[] peek() { return m_input.peek(); }
111 
112 	size_t read(scope ubyte[] dst, IOMode mode) { return m_input.read(dst, mode); }
113 
114 	alias read = Stream.read;
115 
116 	size_t write(in ubyte[] bytes, IOMode mode) { return m_output.write(bytes, mode); }
117 
118 	alias write = Stream.write;
119 
120 	void flush() { m_output.flush(); }
121 
122 	void finalize() { m_output.finalize(); }
123 }
124 
125 
126 /**
127 	Special kind of proxy stream for streams nested in a ConnectionStream.
128 
129 	This stream will forward all stream operations to the selected stream,
130 	but will forward all connection related operations to the given
131 	ConnectionStream. This allows wrapping embedded streams, such as
132 	SSL streams in a ConnectionStream.
133 */
134 class ConnectionProxyStream : ConnectionStream {
135 @safe:
136 
137 	private {
138 		InterfaceProxy!ConnectionStream m_connection;
139 		InterfaceProxy!Stream m_underlying;
140 		InterfaceProxy!InputStream m_input;
141 		InterfaceProxy!OutputStream m_output;
142 	}
143 
144 	deprecated("Use createConnectionProxyStream instead.")
145 	this(Stream stream, ConnectionStream connection_stream)
146 	{
147 		this(interfaceProxy!Stream(stream), interfaceProxy!ConnectionStream(connection_stream), true);
148 	}
149 
150 	deprecated("Use createConnectionProxyStream instead.")
151 	this(InputStream input, OutputStream output, ConnectionStream connection_stream)
152 	{
153 		this(interfaceProxy!InputStream(input), interfaceProxy!OutputStream(output), interfaceProxy!ConnectionStream(connection_stream), true);
154 	}
155 
156 	/// private
157 	this(InterfaceProxy!Stream stream, InterfaceProxy!ConnectionStream connection_stream, bool dummy)
158 	{
159 		assert(!!stream);
160 		m_underlying = stream;
161 		m_input = stream;
162 		m_output = stream;
163 		m_connection = connection_stream;
164 	}
165 
166 	/// private
167 	this(InterfaceProxy!InputStream input, InterfaceProxy!OutputStream output, InterfaceProxy!ConnectionStream connection_stream, bool dummy)
168 	{
169 		m_input = input;
170 		m_output = output;
171 		m_connection = connection_stream;
172 	}
173 
174 	@property bool connected()
175 	const {
176 		if (!m_connection)
177 			return true;
178 
179 		return m_connection.connected;
180 	}
181 
182 	void close()
183 	{
184 		if (!m_connection)
185 			return;
186 
187 		if (m_connection.connected) finalize();
188 		m_connection.close();
189 	}
190 
191 	bool waitForData(Duration timeout = 0.seconds)
192 	{
193 		if (this.dataAvailableForRead) return true;
194 
195 		if (!m_connection)
196 			return timeout == 0.seconds ? !this.empty : false;
197 
198 		return m_connection.waitForData(timeout);
199 	}
200 
201 	/// The stream that is wrapped by this one
202 	@property inout(InterfaceProxy!Stream) underlying() inout { return m_underlying; }
203 	/// ditto
204 	@property void underlying(InterfaceProxy!Stream value) { m_underlying = value; m_input = value; m_output = value; }
205 	/// ditto
206 	static if (!is(Stream == InterfaceProxy!Stream))
207 		@property void underlying(Stream value) { this.underlying = InterfaceProxy!Stream(value); }
208 
209 	@property bool empty() { return m_input ? m_input.empty : true; }
210 
211 	@property ulong leastSize() { return m_input ? m_input.leastSize : 0; }
212 
213 	@property bool dataAvailableForRead() { return m_input ? m_input.dataAvailableForRead : false; }
214 
215 	const(ubyte)[] peek() { return m_input.peek(); }
216 
217 	size_t read(scope ubyte[] dst, IOMode mode) { return m_input.read(dst, mode); }
218 
219 	alias read = ConnectionStream.read;
220 
221 	size_t write(in ubyte[] bytes, IOMode mode) { return m_output.write(bytes, mode); }
222 
223 	alias write = ConnectionStream.write;
224 
225 	void flush() { m_output.flush(); }
226 
227 	void finalize() { m_output.finalize(); }
228 }
229 
230 
231 /**
232 	Implements an input range interface on top of an InputStream using an
233 	internal buffer.
234 
235 	The buffer is GC allocated and is filled chunk wise. Thus an InputStream
236 	that has been wrapped in a StreamInputRange cannot be used reliably on its
237 	own anymore.
238 
239 	Reading occurs in a fully lazy fashion. The first call to either front,
240 	popFront or empty will potentially trigger waiting for the next chunk of
241 	data to arrive - but especially popFront will not wait if it was called
242 	after a call to front. This property allows the range to be used in
243 	request-response scenarios.
244 */
245 deprecated("Use streamInputRange() instead.")
246 StreamInputRange!OutputStream StreamInputRange()(InputStream stream) { return StreamInputRange!InputStream(stream); }
247 /// ditto
248 struct StreamInputRange(InputStream, size_t buffer_size = 256)
249 	if (isInputStream!InputStream)
250 {
251 @safe:
252 	private {
253 		struct Buffer {
254 			ubyte[buffer_size] data = void;
255 			size_t fill = 0;
256 		}
257 		InputStream m_stream;
258 		Buffer* m_buffer;
259 	}
260 
261 	private this(InputStream stream)
262 	{
263 		m_stream = stream;
264 		m_buffer = new Buffer;
265 	}
266 
267 	@property bool empty() { return !m_buffer.fill && m_stream.empty; }
268 
269 	ubyte front()
270 	{
271 		if (m_buffer.fill < 1) readChunk();
272 		return m_buffer.data[$ - m_buffer.fill];
273 	}
274 	void popFront()
275 	{
276 		assert(!empty);
277 		if (m_buffer.fill < 1) readChunk();
278 		m_buffer.fill--;
279 	}
280 
281 	private void readChunk()
282 	{
283 		auto sz = min(m_stream.leastSize, m_buffer.data.length);
284 		assert(sz > 0);
285 		m_stream.read(m_buffer.data[$-sz .. $]);
286 		m_buffer.fill = sz;
287 	}
288 }
289 /// ditto
290 auto streamInputRange(size_t buffer_size = 256, InputStream)(InputStream stream)
291 	if (isInputStream!InputStream)
292 {
293 	return StreamInputRange!(InputStream, buffer_size)(stream);
294 }
295 
296 
297 /**
298 	Implements a buffered output range interface on top of an OutputStream.
299 */
300 deprecated("Use streamOutputRange() instead.")
301 StreamOutputRange!OutputStream StreamOutputRange()(OutputStream stream) { return StreamOutputRange!OutputStream(stream); }
302 /// ditto
303 struct StreamOutputRange(OutputStream, size_t buffer_size = 256)
304 	if (isOutputStream!OutputStream)
305 {
306 @safe:
307 
308 	private {
309 		OutputStream m_stream;
310 		size_t m_fill = 0;
311 		ubyte[buffer_size] m_data = void;
312 		bool m_flushInDestructor = true;
313 	}
314 
315 	@disable this(this);
316 
317 	/// private
318 	this(OutputStream stream)
319 	{
320 		m_stream = stream;
321 	}
322 
323 	~this()
324 	{
325 		if (m_flushInDestructor) {
326 			scope (failure) () @trusted { destroy(m_stream); }(); // workaround for #2484
327 			flush();
328 		}
329 	}
330 
331 	void flush()
332 	{
333 		if (m_fill == 0) return;
334 		writeToStream(m_data[0 .. m_fill]);
335 		m_fill = 0;
336 	}
337 
338 	void drop()
339 	{
340 		m_fill = 0;
341 	}
342 
343 	void put(ubyte bt)
344 	{
345 		m_data[m_fill++] = bt;
346 		if (m_fill >= m_data.length) flush();
347 	}
348 
349 	void put(const(ubyte)[] bts)
350 	{
351 		// avoid writing more chunks than necessary
352 		if (bts.length + m_fill >= m_data.length * 2) {
353 			flush();
354 			writeToStream(bts);
355 			return;
356 		}
357 
358 		while (bts.length) {
359 			auto len = min(m_data.length - m_fill, bts.length);
360 			m_data[m_fill .. m_fill + len] = bts[0 .. len];
361 			m_fill += len;
362 			bts = bts[len .. $];
363 			if (m_fill >= m_data.length) flush();
364 		}
365 	}
366 
367 	void put(char elem) { put(cast(ubyte)elem); }
368 	void put(const(char)[] elems) { put(cast(const(ubyte)[])elems); }
369 
370 	void put(dchar elem)
371 	{
372 		import std.utf;
373 		char[4] chars;
374 		auto len = encode(chars, elem);
375 		put(chars[0 .. len]);
376 	}
377 
378 	void put(const(dchar)[] elems) { foreach( ch; elems ) put(ch); }
379 
380 	private void writeToStream(in ubyte[] bytes)
381 	{
382 		// if the write fails, do not attempt another write in the destructor
383 		// to avoid throwing an exception twice or nested
384 		m_flushInDestructor = false;
385 		m_stream.write(bytes);
386 		m_flushInDestructor = true;
387 	}
388 }
389 /// ditto
390 auto streamOutputRange(size_t buffer_size = 256, OutputStream)(OutputStream stream)
391 	if (isOutputStream!OutputStream)
392 {
393 	return StreamOutputRange!(OutputStream, buffer_size)(stream);
394 }
395 
396 unittest {
397 	static long writeLength(ARGS...)(ARGS args) {
398 		import vibe.stream.memory;
399 		auto dst = createMemoryOutputStream;
400 		{
401 			auto rng = streamOutputRange(dst);
402 			foreach (a; args) rng.put(a);
403 		}
404 		return dst.data.length;
405 	}
406 	assert(writeLength("hello", ' ', "world") == "hello world".length);
407 	assert(writeLength("h\u00E4llo", ' ', "world") == "h\u00E4llo world".length);
408 	assert(writeLength("hello", '\u00E4', "world") == "hello\u00E4world".length);
409 	assert(writeLength("h\u1000llo", '\u1000', "world") == "h\u1000llo\u1000world".length);
410 	auto test = "häl";
411 	assert(test.length == 4);
412 	assert(writeLength(test[0], test[1], test[2], test[3]) == test.length);
413 }
414 
415 unittest {
416 	static struct ThrowOutputStream {
417 		@safe:
418 		size_t write(in ubyte[] bytes, IOMode mode) @blocking { throw new Exception("Write failed."); }
419 		void write(in ubyte[] bytes) @blocking { auto n = write(bytes, IOMode.all); assert(n == bytes.length); }
420 		void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); }
421 		void flush() @blocking {}
422 		void finalize() @blocking {}
423 	}
424 	mixin validateOutputStream!ThrowOutputStream;
425 
426 	ThrowOutputStream str;
427 
428 	assertThrown!Exception(() {
429 		auto r = streamOutputRange(str);
430 		// too few bytes to auto-flush
431 		assertNotThrown!Exception(r.put("test"));
432 	} ());
433 
434 	try {
435 		auto r = streamOutputRange(str);
436 		// too few bytes to auto-flush
437 		assertNotThrown!Exception(r.put("test"));
438 		assertThrown!Exception(r.flush());
439 		assertThrown!Exception(r.flush());
440 	} catch (Exception e) assert(false, "Descructor has thrown redundant exception");
441 }