1 /**
2 	Stream proxy and wrapper facilities.
3 
4 	Copyright: © 2013-2016 RejectedSoftware e.K.
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.stream.wrapper;
9 
10 public import vibe.core.stream;
11 
12 import std.algorithm : min;
13 import std.exception;
14 import core.time;
15 import vibe.internal.interfaceproxy;
16 import vibe.internal.freelistref : FreeListRef;
17 
18 
19 ProxyStream createProxyStream(Stream)(Stream stream)
20 	if (isStream!Stream)
21 {
22 	return new ProxyStream(interfaceProxy!(.Stream)(stream), true);
23 }
24 
25 ProxyStream createProxyStream(InputStream, OutputStream)(InputStream input, OutputStream output)
26 	if (isInputStream!InputStream && isOutputStream!OutputStream)
27 {
28 	return new ProxyStream(interfaceProxy!(.InputStream)(input), interfaceProxy!(.OutputStream)(output), true);
29 }
30 
31 ConnectionProxyStream createConnectionProxyStream(Stream, ConnectionStream)(Stream stream, ConnectionStream connection_stream)
32 	if (isStream!Stream && isConnectionStream!ConnectionStream)
33 {
34 	mixin validateStream!Stream;
35 	mixin validateConnectionStream!ConnectionStream;
36 	return new ConnectionProxyStream(interfaceProxy!(.Stream)(stream), interfaceProxy!(.ConnectionStream)(connection_stream), true);
37 }
38 
39 /// private
40 FreeListRef!ConnectionProxyStream createConnectionProxyStreamFL(Stream, ConnectionStream)(Stream stream, ConnectionStream connection_stream)
41 	if (isStream!Stream && isConnectionStream!ConnectionStream)
42 {
43 	mixin validateStream!Stream;
44 	mixin validateConnectionStream!ConnectionStream;
45 	return FreeListRef!ConnectionProxyStream(interfaceProxy!(.Stream)(stream), interfaceProxy!(.ConnectionStream)(connection_stream), true);
46 }
47 
48 ConnectionProxyStream createConnectionProxyStream(InputStream, OutputStream, ConnectionStream)(InputStream input, OutputStream output, ConnectionStream connection_stream)
49 	if (isInputStream!InputStream && isOutputStream!OutputStream && isConnectionStream!ConnectionStream)
50 {
51 	return new ConnectionProxyStream(interfaceProxy!(.InputStream)(input), interfaceProxy!(.OutputStream)(output), interfaceProxy!(.ConnectionStream)(connection_stream), true);
52 }
53 
54 
55 /**
56 	Provides a way to access varying streams using a constant stream reference.
57 */
58 class ProxyStream : Stream {
59 @safe:
60 	private {
61 		InterfaceProxy!(.InputStream) m_input;
62 		InterfaceProxy!(.OutputStream) m_output;
63 		InterfaceProxy!(.Stream) m_underlying;
64 	}
65 
66 	deprecated("Use createProxyStream instead.")
67 	this(Stream stream = null)
68 	{
69 		m_underlying = interfaceProxy!Stream(stream);
70 		m_input = interfaceProxy!InputStream(stream);
71 		m_output = interfaceProxy!OutputStream(stream);
72 	}
73 
74 	deprecated("Use createProxyStream instead.")
75 	this(InputStream input, OutputStream output)
76 	{
77 		m_input = interfaceProxy!InputStream(input);
78 		m_output = interfaceProxy!OutputStream(output);
79 	}
80 
81 	/// private
82 	this(InterfaceProxy!Stream stream, bool dummy)
83 	{
84 		m_underlying = stream;
85 		m_input = stream;
86 		m_output = stream;
87 	}
88 
89 	/// private
90 	this(InterfaceProxy!InputStream input, InterfaceProxy!OutputStream output, bool dummy)
91 	{
92 		m_input = input;
93 		m_output = output;
94 	}
95 
96 	/// The stream that is wrapped by this one
97 	@property inout(InterfaceProxy!Stream) underlying() inout { return m_underlying; }
98 	/// ditto
99 	@property void underlying(InterfaceProxy!Stream value) { m_underlying = value; m_input = value; m_output = value; }
100 	/// ditto
101 	static if (!is(Stream == InterfaceProxy!Stream))
102 		@property void underlying(Stream value) { this.underlying = interfaceProxy!Stream(value); }
103 
104 	@property bool empty() { return m_input ? m_input.empty : true; }
105 
106 	@property ulong leastSize() { return m_input ? m_input.leastSize : 0; }
107 
108 	@property bool dataAvailableForRead() { return m_input ? m_input.dataAvailableForRead : false; }
109 
110 	const(ubyte)[] peek() { return m_input.peek(); }
111 
112 	size_t read(scope ubyte[] dst, IOMode mode) { return m_input.read(dst, mode); }
113 
114 	alias read = Stream.read;
115 
116 	size_t write(in ubyte[] bytes, IOMode mode) { return m_output.write(bytes, mode); }
117 
118 	alias write = Stream.write;
119 
120 	void flush() { m_output.flush(); }
121 
122 	void finalize() { m_output.finalize(); }
123 }
124 
125 
126 /**
127 	Special kind of proxy stream for streams nested in a ConnectionStream.
128 
129 	This stream will forward all stream operations to the selected stream,
130 	but will forward all connection related operations to the given
131 	ConnectionStream. This allows wrapping embedded streams, such as
132 	SSL streams in a ConnectionStream.
133 */
134 class ConnectionProxyStream : ConnectionStream {
135 @safe:
136 
137 	private {
138 		InterfaceProxy!ConnectionStream m_connection;
139 		InterfaceProxy!Stream m_underlying;
140 		InterfaceProxy!InputStream m_input;
141 		InterfaceProxy!OutputStream m_output;
142 	}
143 
144 	deprecated("Use createConnectionProxyStream instead.")
145 	this(Stream stream, ConnectionStream connection_stream)
146 	{
147 		this(interfaceProxy!Stream(stream), interfaceProxy!ConnectionStream(connection_stream), true);
148 	}
149 
150 	deprecated("Use createConnectionProxyStream instead.")
151 	this(InputStream input, OutputStream output, ConnectionStream connection_stream)
152 	{
153 		this(interfaceProxy!InputStream(input), interfaceProxy!OutputStream(output), interfaceProxy!ConnectionStream(connection_stream), true);
154 	}
155 
156 	/// private
157 	this(InterfaceProxy!Stream stream, InterfaceProxy!ConnectionStream connection_stream, bool dummy)
158 	{
159 		assert(!!stream);
160 		m_underlying = stream;
161 		m_input = stream;
162 		m_output = stream;
163 		m_connection = connection_stream;
164 	}
165 
166 	/// private
167 	this(InterfaceProxy!InputStream input, InterfaceProxy!OutputStream output, InterfaceProxy!ConnectionStream connection_stream, bool dummy)
168 	{
169 		m_input = input;
170 		m_output = output;
171 		m_connection = connection_stream;
172 	}
173 
174 	@property bool connected()
175 	const {
176 		if (!m_connection)
177 			return true;
178 
179 		return m_connection.connected;
180 	}
181 
182 	void close()
183 	{
184 		if (!m_connection)
185 			return;
186 
187 		if (m_connection.connected) finalize();
188 		m_connection.close();
189 	}
190 
191 	bool waitForData(Duration timeout = 0.seconds)
192 	{
193 		if (this.dataAvailableForRead) return true;
194 
195 		if (!m_connection)
196 			return timeout == 0.seconds ? !this.empty : false;
197 
198 		return m_connection.waitForData(timeout);
199 	}
200 
201 	/// The stream that is wrapped by this one
202 	@property inout(InterfaceProxy!Stream) underlying() inout { return m_underlying; }
203 	/// ditto
204 	@property void underlying(InterfaceProxy!Stream value) { m_underlying = value; m_input = value; m_output = value; }
205 	/// ditto
206 	static if (!is(Stream == InterfaceProxy!Stream))
207 		@property void underlying(Stream value) { this.underlying = InterfaceProxy!Stream(value); }
208 
209 	@property bool empty() { return m_input ? m_input.empty : true; }
210 
211 	@property ulong leastSize() { return m_input ? m_input.leastSize : 0; }
212 
213 	@property bool dataAvailableForRead() { return m_input ? m_input.dataAvailableForRead : false; }
214 
215 	const(ubyte)[] peek() { return m_input.peek(); }
216 
217 	size_t read(scope ubyte[] dst, IOMode mode) { return m_input.read(dst, mode); }
218 
219 	alias read = ConnectionStream.read;
220 
221 	size_t write(in ubyte[] bytes, IOMode mode) { return m_output.write(bytes, mode); }
222 
223 	alias write = ConnectionStream.write;
224 
225 	void flush() { m_output.flush(); }
226 
227 	void finalize() { m_output.finalize(); }
228 }
229 
230 
231 /**
232 	Implements an input range interface on top of an InputStream using an
233 	internal buffer.
234 
235 	The buffer is GC allocated and is filled chunk wise. Thus an InputStream
236 	that has been wrapped in a StreamInputRange cannot be used reliably on its
237 	own anymore.
238 
239 	Reading occurs in a fully lazy fashion. The first call to either front,
240 	popFront or empty will potentially trigger waiting for the next chunk of
241 	data to arrive - but especially popFront will not wait if it was called
242 	after a call to front. This property allows the range to be used in
243 	request-response scenarios.
244 */
245 struct StreamInputRange {
246 @safe:
247 
248 	private {
249 		struct Buffer {
250 			ubyte[256] data = void;
251 			size_t fill = 0;
252 		}
253 		InputStream m_stream;
254 		Buffer* m_buffer;
255 	}
256 
257 	this (InputStream stream)
258 	{
259 		m_stream = stream;
260 		m_buffer = new Buffer;
261 	}
262 
263 	@property bool empty() { return !m_buffer.fill && m_stream.empty; }
264 
265 	ubyte front()
266 	{
267 		if (m_buffer.fill < 1) readChunk();
268 		return m_buffer.data[$ - m_buffer.fill];
269 	}
270 	void popFront()
271 	{
272 		assert(!empty);
273 		if (m_buffer.fill < 1) readChunk();
274 		m_buffer.fill--;
275 	}
276 
277 	private void readChunk()
278 	{
279 		auto sz = min(m_stream.leastSize, m_buffer.data.length);
280 		assert(sz > 0);
281 		m_stream.read(m_buffer.data[$-sz .. $]);
282 		m_buffer.fill = sz;
283 	}
284 }
285 
286 
287 /**
288 	Implements a buffered output range interface on top of an OutputStream.
289 */
290 StreamOutputRange!OutputStream StreamOutputRange()(OutputStream stream) { return StreamOutputRange!OutputStream(stream); }
291 /// ditto
292 struct StreamOutputRange(OutputStream, size_t buffer_size = 256)
293 	if (isOutputStream!OutputStream)
294 {
295 @safe:
296 
297 	private {
298 		OutputStream m_stream;
299 		size_t m_fill = 0;
300 		ubyte[buffer_size] m_data = void;
301 	}
302 
303 	@disable this(this);
304 
305 	this(OutputStream stream)
306 	{
307 		m_stream = stream;
308 	}
309 
310 	~this()
311 	{
312 		flush();
313 	}
314 
315 	void flush()
316 	{
317 		if (m_fill == 0) return;
318 		m_stream.write(m_data[0 .. m_fill]);
319 		m_fill = 0;
320 	}
321 
322 	void drop()
323 	{
324 		m_fill = 0;
325 	}
326 
327 	void put(ubyte bt)
328 	{
329 		m_data[m_fill++] = bt;
330 		if (m_fill >= m_data.length) flush();
331 	}
332 
333 	void put(const(ubyte)[] bts)
334 	{
335 		// avoid writing more chunks than necessary
336 		if (bts.length + m_fill >= m_data.length * 2) {
337 			flush();
338 			m_stream.write(bts);
339 			return;
340 		}
341 
342 		while (bts.length) {
343 			auto len = min(m_data.length - m_fill, bts.length);
344 			m_data[m_fill .. m_fill + len] = bts[0 .. len];
345 			m_fill += len;
346 			bts = bts[len .. $];
347 			if (m_fill >= m_data.length) flush();
348 		}
349 	}
350 
351 	void put(char elem) { put(cast(ubyte)elem); }
352 	void put(const(char)[] elems) { put(cast(const(ubyte)[])elems); }
353 
354 	void put(dchar elem)
355 	{
356 		import std.utf;
357 		char[4] chars;
358 		auto len = encode(chars, elem);
359 		put(chars[0 .. len]);
360 	}
361 
362 	void put(const(dchar)[] elems) { foreach( ch; elems ) put(ch); }
363 }
364 /// ditto
365 auto streamOutputRange(size_t buffer_size = 256, OutputStream)(OutputStream stream)
366 	if (isOutputStream!OutputStream)
367 {
368 	return StreamOutputRange!(OutputStream, buffer_size)(stream);
369 }
370 
371 unittest {
372 	static long writeLength(ARGS...)(ARGS args) {
373 		import vibe.stream.memory;
374 		auto dst = createMemoryOutputStream;
375 		{
376 			auto rng = StreamOutputRange(dst);
377 			foreach (a; args) rng.put(a);
378 		}
379 		return dst.data.length;
380 	}
381 	assert(writeLength("hello", ' ', "world") == "hello world".length);
382 	assert(writeLength("h\u00E4llo", ' ', "world") == "h\u00E4llo world".length);
383 	assert(writeLength("hello", '\u00E4', "world") == "hello\u00E4world".length);
384 	assert(writeLength("h\u1000llo", '\u1000', "world") == "h\u1000llo\u1000world".length);
385 	auto test = "häl";
386 	assert(test.length == 4);
387 	assert(writeLength(test[0], test[1], test[2], test[3]) == test.length);
388 }