1 /**
2 	Stream proxy and wrapper facilities.
3 
4 	Copyright: © 2013-2016 Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.stream.wrapper;
9 
10 public import vibe.core.stream;
11 
12 import std.algorithm : min;
13 import std.exception;
14 import core.time;
15 import vibe.internal.interfaceproxy;
16 import vibe.internal.freelistref : FreeListRef;
17 
18 
19 ProxyStream createProxyStream(Stream)(Stream stream)
20 	if (isStream!Stream)
21 {
22 	return new ProxyStream(interfaceProxy!(.Stream)(stream), true);
23 }
24 
25 ProxyStream createProxyStream(InputStream, OutputStream)(InputStream input, OutputStream output)
26 	if (isInputStream!InputStream && isOutputStream!OutputStream)
27 {
28 	return new ProxyStream(interfaceProxy!(.InputStream)(input), interfaceProxy!(.OutputStream)(output), true);
29 }
30 
31 ConnectionProxyStream createConnectionProxyStream(Stream, ConnectionStream)(Stream stream, ConnectionStream connection_stream)
32 	if (isStream!Stream && isConnectionStream!ConnectionStream)
33 {
34 	mixin validateStream!Stream;
35 	mixin validateConnectionStream!ConnectionStream;
36 	return new ConnectionProxyStream(interfaceProxy!(.Stream)(stream), interfaceProxy!(.ConnectionStream)(connection_stream), true);
37 }
38 
39 /// private
40 FreeListRef!ConnectionProxyStream createConnectionProxyStreamFL(Stream, ConnectionStream)(Stream stream, ConnectionStream connection_stream)
41 	if (isStream!Stream && isConnectionStream!ConnectionStream)
42 {
43 	mixin validateStream!Stream;
44 	mixin validateConnectionStream!ConnectionStream;
45 	return FreeListRef!ConnectionProxyStream(interfaceProxy!(.Stream)(stream), interfaceProxy!(.ConnectionStream)(connection_stream), true);
46 }
47 
48 ConnectionProxyStream createConnectionProxyStream(InputStream, OutputStream, ConnectionStream)(InputStream input, OutputStream output, ConnectionStream connection_stream)
49 	if (isInputStream!InputStream && isOutputStream!OutputStream && isConnectionStream!ConnectionStream)
50 {
51 	return new ConnectionProxyStream(interfaceProxy!(.InputStream)(input), interfaceProxy!(.OutputStream)(output), interfaceProxy!(.ConnectionStream)(connection_stream), true);
52 }
53 
54 
55 /**
56 	Provides a way to access varying streams using a constant stream reference.
57 */
58 class ProxyStream : Stream {
59 @safe:
60 	private {
61 		InterfaceProxy!(.InputStream) m_input;
62 		InterfaceProxy!(.OutputStream) m_output;
63 		InterfaceProxy!(.Stream) m_underlying;
64 	}
65 
66 	/// private
67 	this(InterfaceProxy!Stream stream, bool dummy)
68 	{
69 		m_underlying = stream;
70 		m_input = stream;
71 		m_output = stream;
72 	}
73 
74 	/// private
75 	this(InterfaceProxy!InputStream input, InterfaceProxy!OutputStream output, bool dummy)
76 	{
77 		m_input = input;
78 		m_output = output;
79 	}
80 
81 	/// The stream that is wrapped by this one
82 	@property inout(InterfaceProxy!Stream) underlying() inout { return m_underlying; }
83 	/// ditto
84 	@property void underlying(InterfaceProxy!Stream value) { m_underlying = value; m_input = value; m_output = value; }
85 	/// ditto
86 	static if (!is(Stream == InterfaceProxy!Stream))
87 		@property void underlying(Stream value) { this.underlying = interfaceProxy!Stream(value); }
88 
89 	@property bool empty() { return m_input ? m_input.empty : true; }
90 
91 	@property ulong leastSize() { return m_input ? m_input.leastSize : 0; }
92 
93 	@property bool dataAvailableForRead() { return m_input ? m_input.dataAvailableForRead : false; }
94 
95 	const(ubyte)[] peek() { return m_input.peek(); }
96 
97 	size_t read(scope ubyte[] dst, IOMode mode) { return m_input.read(dst, mode); }
98 
99 	alias read = Stream.read;
100 
101 	static if (is(typeof(.OutputStream.outputStreamVersion)) && .OutputStream.outputStreamVersion > 1) {
102 		override size_t write(scope const(ubyte)[] bytes, IOMode mode) { return m_output.write(bytes, mode); }
103 	} else {
104 		override size_t write(in ubyte[] bytes, IOMode mode) { return m_output.write(bytes, mode); }
105 	}
106 
107 	alias write = Stream.write;
108 
109 	void flush() { m_output.flush(); }
110 
111 	void finalize() { m_output.finalize(); }
112 }
113 
114 
115 /**
116 	Special kind of proxy stream for streams nested in a ConnectionStream.
117 
118 	This stream will forward all stream operations to the selected stream,
119 	but will forward all connection related operations to the given
120 	ConnectionStream. This allows wrapping embedded streams, such as
121 	SSL streams in a ConnectionStream.
122 */
123 class ConnectionProxyStream : ConnectionStream {
124 @safe:
125 
126 	private {
127 		InterfaceProxy!ConnectionStream m_connection;
128 		InterfaceProxy!Stream m_underlying;
129 		InterfaceProxy!InputStream m_input;
130 		InterfaceProxy!OutputStream m_output;
131 	}
132 
133 	/// private
134 	this(InterfaceProxy!Stream stream, InterfaceProxy!ConnectionStream connection_stream, bool dummy)
135 	{
136 		assert(!!stream);
137 		m_underlying = stream;
138 		m_input = stream;
139 		m_output = stream;
140 		m_connection = connection_stream;
141 	}
142 
143 	/// private
144 	this(InterfaceProxy!InputStream input, InterfaceProxy!OutputStream output, InterfaceProxy!ConnectionStream connection_stream, bool dummy)
145 	{
146 		m_input = input;
147 		m_output = output;
148 		m_connection = connection_stream;
149 	}
150 
151 	@property bool connected()
152 	const {
153 		if (!m_connection)
154 			return true;
155 
156 		return m_connection.connected;
157 	}
158 
159 	void close()
160 	{
161 		if (!m_connection)
162 			return;
163 
164 		if (m_connection.connected) finalize();
165 		m_connection.close();
166 	}
167 
168 	bool waitForData(Duration timeout = 0.seconds)
169 	{
170 		if (this.dataAvailableForRead) return true;
171 
172 		if (!m_connection)
173 			return timeout == 0.seconds ? !this.empty : false;
174 
175 		return m_connection.waitForData(timeout);
176 	}
177 
178 	/// The stream that is wrapped by this one
179 	@property inout(InterfaceProxy!Stream) underlying() inout { return m_underlying; }
180 	/// ditto
181 	@property void underlying(InterfaceProxy!Stream value) { m_underlying = value; m_input = value; m_output = value; }
182 	/// ditto
183 	static if (!is(Stream == InterfaceProxy!Stream))
184 		@property void underlying(Stream value) { this.underlying = InterfaceProxy!Stream(value); }
185 
186 	@property bool empty() { return m_input ? m_input.empty : true; }
187 
188 	@property ulong leastSize() { return m_input ? m_input.leastSize : 0; }
189 
190 	@property bool dataAvailableForRead() { return m_input ? m_input.dataAvailableForRead : false; }
191 
192 	const(ubyte)[] peek() { return m_input.peek(); }
193 
194 	size_t read(scope ubyte[] dst, IOMode mode) { return m_input.read(dst, mode); }
195 
196 	alias read = ConnectionStream.read;
197 
198 	static if (is(typeof(.OutputStream.outputStreamVersion)) && .OutputStream.outputStreamVersion > 1) {
199 		size_t write(scope const(ubyte)[] bytes, IOMode mode) { return m_output.write(bytes, mode); }
200 	} else {
201 		size_t write(in ubyte[] bytes, IOMode mode) { return m_output.write(bytes, mode); }
202 	}
203 
204 	alias write = ConnectionStream.write;
205 
206 	void flush() { m_output.flush(); }
207 
208 	void finalize() { m_output.finalize(); }
209 }
210 
211 
212 /**
213 	Implements an input range interface on top of an InputStream using an
214 	internal buffer.
215 
216 	The buffer is GC allocated and is filled chunk wise. Thus an InputStream
217 	that has been wrapped in a StreamInputRange cannot be used reliably on its
218 	own anymore.
219 
220 	Reading occurs in a fully lazy fashion. The first call to either front,
221 	popFront or empty will potentially trigger waiting for the next chunk of
222 	data to arrive - but especially popFront will not wait if it was called
223 	after a call to front. This property allows the range to be used in
224 	request-response scenarios.
225 */
226 struct StreamInputRange(InputStream, size_t buffer_size = 256)
227 	if (isInputStream!InputStream)
228 {
229 @safe:
230 	private {
231 		struct Buffer {
232 			ubyte[buffer_size] data = void;
233 			size_t fill = 0;
234 		}
235 		InputStream m_stream;
236 		Buffer* m_buffer;
237 	}
238 
239 	private this(InputStream stream)
240 	{
241 		m_stream = stream;
242 		m_buffer = new Buffer;
243 	}
244 
245 	@property bool empty() { return !m_buffer.fill && m_stream.empty; }
246 
247 	ubyte front()
248 	{
249 		if (m_buffer.fill < 1) readChunk();
250 		return m_buffer.data[$ - m_buffer.fill];
251 	}
252 	void popFront()
253 	{
254 		assert(!empty);
255 		if (m_buffer.fill < 1) readChunk();
256 		m_buffer.fill--;
257 	}
258 
259 	private void readChunk()
260 	{
261 		auto sz = min(m_stream.leastSize, m_buffer.data.length);
262 		assert(sz > 0);
263 		m_stream.read(m_buffer.data[$-sz .. $]);
264 		m_buffer.fill = sz;
265 	}
266 }
267 /// ditto
268 auto streamInputRange(size_t buffer_size = 256, InputStream)(InputStream stream)
269 	if (isInputStream!InputStream)
270 {
271 	return StreamInputRange!(InputStream, buffer_size)(stream);
272 }
273 
274 
275 /**
276 	Implements a buffered output range interface on top of an OutputStream.
277 */
278 struct StreamOutputRange(OutputStream, size_t buffer_size = 256)
279 	if (isOutputStream!OutputStream)
280 {
281 @safe:
282 
283 	private {
284 		OutputStream m_stream;
285 		size_t m_fill = 0;
286 		ubyte[buffer_size] m_data = void;
287 		bool m_flushInDestructor = true;
288 	}
289 
290 	@disable this(this);
291 
292 	/// private
293 	this(OutputStream stream)
294 	{
295 		m_stream = stream;
296 	}
297 
298 	~this()
299 	{
300 		if (m_flushInDestructor) {
301 			scope (failure) () @trusted { destroy(m_stream); }(); // workaround for #2484
302 			flush();
303 		}
304 	}
305 
306 	void flush()
307 	{
308 		if (m_fill == 0) return;
309 		writeToStream(m_data[0 .. m_fill]);
310 		m_fill = 0;
311 	}
312 
313 	void drop()
314 	{
315 		m_fill = 0;
316 	}
317 
318 	void put(ubyte bt)
319 	{
320 		m_data[m_fill++] = bt;
321 		if (m_fill >= m_data.length) flush();
322 	}
323 
324 	void put(const(ubyte)[] bts)
325 	{
326 		// avoid writing more chunks than necessary
327 		if (bts.length + m_fill >= m_data.length * 2) {
328 			flush();
329 			writeToStream(bts);
330 			return;
331 		}
332 
333 		while (bts.length) {
334 			auto len = min(m_data.length - m_fill, bts.length);
335 			m_data[m_fill .. m_fill + len] = bts[0 .. len];
336 			m_fill += len;
337 			bts = bts[len .. $];
338 			if (m_fill >= m_data.length) flush();
339 		}
340 	}
341 
342 	void put(char elem) { put(cast(ubyte)elem); }
343 	void put(const(char)[] elems) { put(cast(const(ubyte)[])elems); }
344 
345 	void put(dchar elem)
346 	{
347 		import std.utf;
348 		char[4] chars;
349 		auto len = encode(chars, elem);
350 		put(chars[0 .. len]);
351 	}
352 
353 	void put(const(dchar)[] elems) { foreach( ch; elems ) put(ch); }
354 
355 	private void writeToStream(scope const(ubyte)[] bytes)
356 	{
357 		// if the write fails, do not attempt another write in the destructor
358 		// to avoid throwing an exception twice or nested
359 		m_flushInDestructor = false;
360 		m_stream.write(bytes);
361 		m_flushInDestructor = true;
362 	}
363 }
364 /// ditto
365 auto streamOutputRange(size_t buffer_size = 256, OutputStream)(OutputStream stream)
366 	if (isOutputStream!OutputStream)
367 {
368 	return StreamOutputRange!(OutputStream, buffer_size)(stream);
369 }
370 
371 unittest {
372 	static long writeLength(ARGS...)(ARGS args) {
373 		import vibe.stream.memory;
374 		auto dst = createMemoryOutputStream;
375 		{
376 			auto rng = streamOutputRange(dst);
377 			foreach (a; args) rng.put(a);
378 		}
379 		return dst.data.length;
380 	}
381 	assert(writeLength("hello", ' ', "world") == "hello world".length);
382 	assert(writeLength("h\u00E4llo", ' ', "world") == "h\u00E4llo world".length);
383 	assert(writeLength("hello", '\u00E4', "world") == "hello\u00E4world".length);
384 	assert(writeLength("h\u1000llo", '\u1000', "world") == "h\u1000llo\u1000world".length);
385 	auto test = "häl";
386 	assert(test.length == 4);
387 	assert(writeLength(test[0], test[1], test[2], test[3]) == test.length);
388 }
389 
390 unittest {
391 	static struct ThrowOutputStream {
392 		@safe:
393 		size_t write(in ubyte[] bytes, IOMode mode) @blocking { throw new Exception("Write failed."); }
394 		void write(in ubyte[] bytes) @blocking { auto n = write(bytes, IOMode.all); assert(n == bytes.length); }
395 		void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); }
396 		void flush() @blocking {}
397 		void finalize() @blocking {}
398 	}
399 	mixin validateOutputStream!ThrowOutputStream;
400 
401 	ThrowOutputStream str;
402 
403 	assertThrown!Exception(() {
404 		auto r = streamOutputRange(str);
405 		// too few bytes to auto-flush
406 		assertNotThrown!Exception(r.put("test"));
407 	} ());
408 
409 	try {
410 		auto r = streamOutputRange(str);
411 		// too few bytes to auto-flush
412 		assertNotThrown!Exception(r.put("test"));
413 		assertThrown!Exception(r.flush());
414 		assertThrown!Exception(r.flush());
415 	} catch (Exception e) assert(false, "Descructor has thrown redundant exception");
416 }