1 /**
2 	Stream proxy and wrapper facilities.
3 
4 	Copyright: © 2013-2016 Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.stream.wrapper;
9 
10 public import vibe.core.stream;
11 
12 import std.algorithm : min;
13 import std.exception;
14 import core.time;
15 import vibe.internal.interfaceproxy;
16 import vibe.internal.freelistref : FreeListRef;
17 
18 
/**
	Creates a `ProxyStream` that forwards to a single combined stream.

	Params:
		stream = Stream used for reading as well as for writing

	Returns:
		A newly allocated `ProxyStream` whose input, output and underlying
		stream all refer to `stream`.
*/
ProxyStream createProxyStream(Stream)(Stream stream)
	if (isStream!Stream)
{
	// `.Stream` names the module level interface, as opposed to the
	// template parameter of the same name
	return new ProxyStream(
		interfaceProxy!(.Stream)(stream),
		true // value of the constructor's unused `dummy` parameter
	);
}
24 
/**
	Creates a `ProxyStream` that reads from one stream and writes to another.

	Params:
		input = Stream that serves all read operations
		output = Stream that receives all write operations

	Returns:
		A newly allocated `ProxyStream`. Its `underlying` stream is left
		unset, because there is no single combined stream.
*/
ProxyStream createProxyStream(InputStream, OutputStream)(InputStream input, OutputStream output)
	if (isInputStream!InputStream && isOutputStream!OutputStream)
{
	// the leading dots select the module level interfaces instead of the
	// equally named template parameters
	return new ProxyStream(
		interfaceProxy!(.InputStream)(input),
		interfaceProxy!(.OutputStream)(output),
		true // value of the constructor's unused `dummy` parameter
	);
}
30 
/**
	Creates a `ConnectionProxyStream` from a combined stream and the
	connection it is nested in.

	Params:
		stream = Stream that serves all read and write operations
		connection_stream = Connection that serves all connection related
			operations

	Returns:
		A newly allocated `ConnectionProxyStream`.
*/
ConnectionProxyStream createConnectionProxyStream(Stream, ConnectionStream)(Stream stream, ConnectionStream connection_stream)
	if (isStream!Stream && isConnectionStream!ConnectionStream)
{
	mixin validateStream!Stream;
	mixin validateConnectionStream!ConnectionStream;

	// the leading dots select the module level interfaces instead of the
	// equally named template parameters
	return new ConnectionProxyStream(
		interfaceProxy!(.Stream)(stream),
		interfaceProxy!(.ConnectionStream)(connection_stream),
		true // value of the constructor's unused `dummy` parameter
	);
}
38 
/// private
// Same as the two argument createConnectionProxyStream, but the instance is
// wrapped in a FreeListRef instead of being allocated with `new`.
FreeListRef!ConnectionProxyStream createConnectionProxyStreamFL(Stream, ConnectionStream)(Stream stream, ConnectionStream connection_stream)
	if (isStream!Stream && isConnectionStream!ConnectionStream)
{
	mixin validateStream!Stream;
	mixin validateConnectionStream!ConnectionStream;

	// the leading dots select the module level interfaces instead of the
	// equally named template parameters
	return FreeListRef!ConnectionProxyStream(
		interfaceProxy!(.Stream)(stream),
		interfaceProxy!(.ConnectionStream)(connection_stream),
		true // value of the constructor's unused `dummy` parameter
	);
}
47 
/**
	Creates a `ConnectionProxyStream` with separate streams for reading and
	writing.

	Params:
		input = Stream that serves all read operations
		output = Stream that receives all write operations
		connection_stream = Connection that serves all connection related
			operations

	Returns:
		A newly allocated `ConnectionProxyStream`. Its `underlying` stream is
		left unset, because there is no single combined stream.
*/
ConnectionProxyStream createConnectionProxyStream(InputStream, OutputStream, ConnectionStream)(InputStream input, OutputStream output, ConnectionStream connection_stream)
	if (isInputStream!InputStream && isOutputStream!OutputStream && isConnectionStream!ConnectionStream)
{
	// the leading dots select the module level interfaces instead of the
	// equally named template parameters
	return new ConnectionProxyStream(
		interfaceProxy!(.InputStream)(input),
		interfaceProxy!(.OutputStream)(output),
		interfaceProxy!(.ConnectionStream)(connection_stream),
		true // value of the constructor's unused `dummy` parameter
	);
}
53 
54 
/**
	Provides a way to access varying streams using a constant stream reference.

	Read operations are forwarded to the current input stream and write
	operations to the current output stream. Both can be exchanged at once by
	assigning the `underlying` property.
*/
class ProxyStream : Stream {
@safe:
	private {
		InterfaceProxy!(.InputStream) m_input;
		InterfaceProxy!(.OutputStream) m_output;
		InterfaceProxy!(.Stream) m_underlying;
	}

	/// private
	this(InterfaceProxy!Stream stream, bool dummy)
	{
		// a combined stream serves as input, output and underlying stream
		m_underlying = stream;
		m_input = stream;
		m_output = stream;
	}

	/// private
	this(InterfaceProxy!InputStream input, InterfaceProxy!OutputStream output, bool dummy)
	{
		// m_underlying is left unset when separate streams are used
		m_input = input;
		m_output = output;
	}

	/// The stream that is wrapped by this one
	@property inout(InterfaceProxy!Stream) underlying()
	inout {
		return m_underlying;
	}
	/// ditto
	@property void underlying(InterfaceProxy!Stream value)
	{
		// replaces the input and the output side together
		m_underlying = value;
		m_input = value;
		m_output = value;
	}
	/// ditto
	static if (!is(Stream == InterfaceProxy!Stream)) {
		@property void underlying(Stream value)
		{
			this.underlying = interfaceProxy!Stream(value);
		}
	}

	/// Forwards to the input stream; yields `true` if no input stream is set.
	@property bool empty()
	{
		if (!m_input) return true;
		return m_input.empty;
	}

	/// Forwards to the input stream; yields `0` if no input stream is set.
	@property ulong leastSize()
	{
		if (!m_input) return 0;
		return m_input.leastSize;
	}

	/// Forwards to the input stream; yields `false` if no input stream is set.
	@property bool dataAvailableForRead()
	{
		if (!m_input) return false;
		return m_input.dataAvailableForRead;
	}

	/// Forwards to the input stream without checking whether one is set.
	const(ubyte)[] peek()
	{
		return m_input.peek();
	}

	/// ditto
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		return m_input.read(dst, mode);
	}

	alias read = Stream.read;

	/// Forwards to the output stream without checking whether one is set.
	size_t write(in ubyte[] bytes, IOMode mode)
	{
		return m_output.write(bytes, mode);
	}

	alias write = Stream.write;

	/// ditto
	void flush()
	{
		m_output.flush();
	}

	/// ditto
	void finalize()
	{
		m_output.finalize();
	}
}
109 
110 
/**
	Special kind of proxy stream for streams nested in a ConnectionStream.

	This stream will forward all stream operations to the selected stream,
	but will forward all connection related operations to the given
	ConnectionStream. This allows wrapping embedded streams, such as
	SSL streams in a ConnectionStream.
*/
class ConnectionProxyStream : ConnectionStream {
@safe:

	private {
		InterfaceProxy!ConnectionStream m_connection;
		InterfaceProxy!Stream m_underlying;
		InterfaceProxy!InputStream m_input;
		InterfaceProxy!OutputStream m_output;
	}

	/// private
	this(InterfaceProxy!Stream stream, InterfaceProxy!ConnectionStream connection_stream, bool dummy)
	{
		assert(!!stream);

		// a combined stream serves as input, output and underlying stream
		m_underlying = stream;
		m_input = stream;
		m_output = stream;
		m_connection = connection_stream;
	}

	/// private
	this(InterfaceProxy!InputStream input, InterfaceProxy!OutputStream output, InterfaceProxy!ConnectionStream connection_stream, bool dummy)
	{
		// m_underlying is left unset when separate streams are used
		m_input = input;
		m_output = output;
		m_connection = connection_stream;
	}

	/// Connection state of the connection stream; `true` if none is set.
	@property bool connected()
	const {
		return m_connection ? m_connection.connected : true;
	}

	/**
		Finalizes the output stream if the connection is still connected and
		then closes the connection. Does nothing if no connection is set.
	*/
	void close()
	{
		if (m_connection) {
			if (m_connection.connected) finalize();
			m_connection.close();
		}
	}

	/**
		Waits for data to become available for reading.

		Returns `true` immediately if the input stream already has data
		available. Otherwise the wait is delegated to the connection stream.
		Without a connection stream, a zero timeout yields `!empty` and any
		other timeout yields `false`.
	*/
	bool waitForData(Duration timeout = 0.seconds)
	{
		if (this.dataAvailableForRead) return true;

		if (m_connection) return m_connection.waitForData(timeout);

		// nothing to wait on without a connection stream
		if (timeout != 0.seconds) return false;
		return !this.empty;
	}

	/// The stream that is wrapped by this one
	@property inout(InterfaceProxy!Stream) underlying()
	inout {
		return m_underlying;
	}
	/// ditto
	@property void underlying(InterfaceProxy!Stream value)
	{
		// replaces the input and the output side together
		m_underlying = value;
		m_input = value;
		m_output = value;
	}
	/// ditto
	static if (!is(Stream == InterfaceProxy!Stream)) {
		@property void underlying(Stream value)
		{
			this.underlying = InterfaceProxy!Stream(value);
		}
	}

	/// Forwards to the input stream; yields `true` if no input stream is set.
	@property bool empty()
	{
		if (!m_input) return true;
		return m_input.empty;
	}

	/// Forwards to the input stream; yields `0` if no input stream is set.
	@property ulong leastSize()
	{
		if (!m_input) return 0;
		return m_input.leastSize;
	}

	/// Forwards to the input stream; yields `false` if no input stream is set.
	@property bool dataAvailableForRead()
	{
		if (!m_input) return false;
		return m_input.dataAvailableForRead;
	}

	/// Forwards to the input stream without checking whether one is set.
	const(ubyte)[] peek()
	{
		return m_input.peek();
	}

	/// ditto
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		return m_input.read(dst, mode);
	}

	alias read = ConnectionStream.read;

	/// Forwards to the output stream without checking whether one is set.
	size_t write(in ubyte[] bytes, IOMode mode)
	{
		return m_output.write(bytes, mode);
	}

	alias write = ConnectionStream.write;

	/// ditto
	void flush()
	{
		m_output.flush();
	}

	/// ditto
	void finalize()
	{
		m_output.finalize();
	}
}
202 
203 
/**
	Implements an input range interface on top of an InputStream using an
	internal buffer.

	The buffer is GC allocated and is filled chunk wise. Thus an InputStream
	that has been wrapped in a StreamInputRange cannot be used reliably on its
	own anymore.

	Reading occurs in a fully lazy fashion. The first call to either front,
	popFront or empty will potentially trigger waiting for the next chunk of
	data to arrive - but especially popFront will not wait if it was called
	after a call to front. This property allows the range to be used in
	request-response scenarios.
*/
struct StreamInputRange(InputStream, size_t buffer_size = 256)
	if (isInputStream!InputStream)
{
@safe:
	private {
		// Unread bytes always occupy the last `fill` bytes of `data`.
		struct Buffer {
			ubyte[buffer_size] data = void;
			size_t fill = 0;
		}
		InputStream m_stream;
		Buffer* m_buffer;
	}

	private this(InputStream stream)
	{
		m_stream = stream;
		m_buffer = new Buffer;
	}

	/// True if the buffer holds no unread bytes and the stream is empty.
	@property bool empty()
	{
		// the stream is only queried once the buffer has been drained
		if (m_buffer.fill > 0) return false;
		return m_stream.empty;
	}

	/// Returns the next unread byte, reading a new chunk first if necessary.
	ubyte front()
	{
		if (m_buffer.fill == 0) readChunk();
		const offset = m_buffer.data.length - m_buffer.fill;
		return m_buffer.data[offset];
	}

	/// Discards the next unread byte, reading a new chunk first if necessary.
	void popFront()
	{
		assert(!empty);
		if (m_buffer.fill == 0) readChunk();
		--m_buffer.fill;
	}

	// Reads as many bytes as the stream guarantees to have, limited to the
	// buffer size, and places them at the end of the buffer.
	private void readChunk()
	{
		auto chunk_len = min(m_stream.leastSize, m_buffer.data.length);
		assert(chunk_len > 0);
		m_stream.read(m_buffer.data[$ - chunk_len .. $]);
		m_buffer.fill = chunk_len;
	}
}
/**
	Wraps an input stream in a `StreamInputRange`.

	Params:
		buffer_size = Size of the range's internal buffer in bytes
		stream = The stream to read from

	Returns:
		A `StreamInputRange!(InputStream, buffer_size)` reading from `stream`.
*/
auto streamInputRange(size_t buffer_size = 256, InputStream)(InputStream stream)
	if (isInputStream!InputStream)
{
	alias Range = StreamInputRange!(InputStream, buffer_size);
	return Range(stream);
}
265 
266 
/**
	Implements a buffered output range interface on top of an OutputStream.

	Data passed to `put` is collected in an internal buffer of `buffer_size`
	bytes. The buffer is written to the stream once it is full, when `flush`
	is called, or when the range is destroyed. Use `drop` to discard buffered
	data instead of writing it.

	The range cannot be copied.
*/
struct StreamOutputRange(OutputStream, size_t buffer_size = 256)
	if (isOutputStream!OutputStream)
{
@safe:

	private {
		OutputStream m_stream;
		size_t m_fill = 0; // number of valid bytes at the start of m_data
		ubyte[buffer_size] m_data = void;
		bool m_flushInDestructor = true; // false while/after a failed write
	}

	@disable this(this);

	/// private
	this(OutputStream stream)
	{
		m_stream = stream;
	}

	/**
		Writes any buffered data to the stream, unless the most recent write
		to the stream has failed.
	*/
	~this()
	{
		if (m_flushInDestructor) {
			scope (failure) () @trusted { destroy(m_stream); }(); // workaround for #2484
			flush();
		}
	}

	/**
		Writes all buffered data to the stream.

		If the write throws, the buffered data is kept, so that a later call
		attempts to write it again.
	*/
	void flush()
	{
		if (m_fill == 0) return;
		writeToStream(m_data[0 .. m_fill]);
		m_fill = 0;
	}

	/// Discards all buffered data without writing it to the stream.
	void drop()
	{
		m_fill = 0;
	}

	/// Appends a single byte, flushing whenever the buffer is full.
	void put(ubyte bt)
	{
		// A flush that threw leaves the buffer completely filled. Retry the
		// flush first, so that the store below never indexes past the end
		// of the buffer (the array overload retries in the same way).
		if (m_fill >= m_data.length) flush();

		m_data[m_fill++] = bt;
		if (m_fill >= m_data.length) flush();
	}

	/// Appends a sequence of bytes, flushing whenever the buffer is full.
	void put(const(ubyte)[] bts)
	{
		// avoid writing more chunks than necessary
		if (bts.length + m_fill >= m_data.length * 2) {
			flush();
			writeToStream(bts);
			return;
		}

		// copy piecewise into the remaining buffer space
		while (bts.length) {
			auto len = min(m_data.length - m_fill, bts.length);
			m_data[m_fill .. m_fill + len] = bts[0 .. len];
			m_fill += len;
			bts = bts[len .. $];
			if (m_fill >= m_data.length) flush();
		}
	}

	/// Appends the raw code unit(s) without any conversion.
	void put(char elem) { put(cast(ubyte)elem); }
	/// ditto
	void put(const(char)[] elems) { put(cast(const(ubyte)[])elems); }

	/// Appends the UTF-8 encoding of the given code point.
	void put(dchar elem)
	{
		import std.utf;
		char[4] chars;
		auto len = encode(chars, elem);
		put(chars[0 .. len]);
	}

	/// Appends the UTF-8 encoding of each of the given code points.
	void put(const(dchar)[] elems) { foreach( ch; elems ) put(ch); }

	private void writeToStream(in ubyte[] bytes)
	{
		// if the write fails, do not attempt another write in the destructor
		// to avoid throwing an exception twice or nested
		m_flushInDestructor = false;
		m_stream.write(bytes);
		m_flushInDestructor = true;
	}
}
/**
	Wraps an output stream in a `StreamOutputRange`.

	Params:
		buffer_size = Size of the range's internal buffer in bytes
		stream = The stream to write to

	Returns:
		A `StreamOutputRange!(OutputStream, buffer_size)` writing to `stream`.
*/
auto streamOutputRange(size_t buffer_size = 256, OutputStream)(OutputStream stream)
	if (isOutputStream!OutputStream)
{
	alias Range = StreamOutputRange!(OutputStream, buffer_size);
	return Range(stream);
}
362 
// Checks that the number of bytes arriving at the stream matches the UTF-8
// length of what was put into the range, for strings and single characters,
// including non-ASCII ones.
unittest {
	// Puts all arguments into an output range and returns the number of
	// bytes that reached the destination stream.
	static long writeLength(ARGS...)(ARGS args) {
		import vibe.stream.memory;
		auto sink = createMemoryOutputStream;
		{
			// the range goes out of scope, and thereby flushes, before the
			// sink is inspected
			auto range = streamOutputRange(sink);
			foreach (arg; args) range.put(arg);
		}
		return sink.data.length;
	}

	assert(writeLength("hello", ' ', "world") == "hello world".length);
	assert(writeLength("h\u00E4llo", ' ', "world") == "h\u00E4llo world".length);
	assert(writeLength("hello", '\u00E4', "world") == "hello\u00E4world".length);
	assert(writeLength("h\u1000llo", '\u1000', "world") == "h\u1000llo\u1000world".length);

	// individual code units of a multi-byte sequence are passed through
	auto test = "häl";
	assert(test.length == 4);
	assert(writeLength(test[0], test[1], test[2], test[3]) == test.length);
}
381 
// Checks how StreamOutputRange behaves when the underlying stream throws on
// every write.
unittest {
	// Output stream whose write always fails.
	static struct ThrowOutputStream {
		@safe:
		size_t write(in ubyte[] bytes, IOMode mode) @blocking { throw new Exception("Write failed."); }
		void write(in ubyte[] bytes) @blocking { auto n = write(bytes, IOMode.all); assert(n == bytes.length); }
		void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); }
		void flush() @blocking {}
		void finalize() @blocking {}
	}
	mixin validateOutputStream!ThrowOutputStream;

	ThrowOutputStream str;

	// the buffered data is flushed by the destructor, which lets the write
	// exception escape from the delegate
	assertThrown!Exception(() {
		auto r = streamOutputRange(str);
		// too few bytes to auto-flush
		assertNotThrown!Exception(r.put("test"));
	} ());

	// after an explicit flush has failed, the destructor must not attempt
	// another write and thus must not throw
	try {
		auto r = streamOutputRange(str);
		// too few bytes to auto-flush
		assertNotThrown!Exception(r.put("test"));
		assertThrown!Exception(r.flush());
		assertThrown!Exception(r.flush());
	} catch (Exception e) assert(false, "Destructor has thrown redundant exception");
}