1 /**
2 	Wrapper streams which count the number of bytes or limit the stream based on the number of
3 	transferred bytes.
4 
5 	Copyright: © 2012 Sönke Ludwig
6 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
7 	Authors: Sönke Ludwig
8 */
9 module vibe.stream.counting;
10 
11 public import vibe.core.stream;
12 
13 import std.exception;
14 import vibe.internal.interfaceproxy;
15 import vibe.internal.freelistref : FreeListRef;
16 
17 
/** Wraps an existing input stream in a `LimitedInputStream`.

	Params:
		stream = the input stream to be wrapped
		byte_limit = the maximum number of bytes readable from the constructed stream
		silent_limit = if set, the stream will behave exactly like the original stream, but
			will throw an exception as soon as the limit is reached.

	Returns:
		A newly allocated `LimitedInputStream` reading from `stream`.
*/
LimitedInputStream createLimitedInputStream(InputStream)(InputStream stream, ulong byte_limit, bool silent_limit = false)
	if (isInputStream!InputStream)
{
	// Erase the concrete stream type behind the generic InputStream proxy first.
	auto proxy = interfaceProxy!(.InputStream)(stream);
	return new LimitedInputStream(proxy, byte_limit, silent_limit, true);
}
31 
/// private - same as `createLimitedInputStream`, but wraps the result in a `FreeListRef`
FreeListRef!LimitedInputStream createLimitedInputStreamFL(InputStream)(InputStream stream, ulong byte_limit, bool silent_limit = false)
	if (isInputStream!InputStream)
{
	auto proxy = interfaceProxy!(.InputStream)(stream);
	return FreeListRef!LimitedInputStream(proxy, byte_limit, silent_limit, true);
}
38 
/** Creates a proxy stream that counts the number of bytes written.

	Params:
		output = The stream to forward the written data to
		byte_limit = Optional total write size limit after which an exception is thrown

	Returns:
		A newly allocated `CountingOutputStream` forwarding to `output`.
*/
CountingOutputStream createCountingOutputStream(OutputStream)(OutputStream output, ulong byte_limit = ulong.max)
	if (isOutputStream!OutputStream)
{
	// Erase the concrete stream type behind the generic OutputStream proxy first.
	auto proxy = interfaceProxy!(.OutputStream)(output);
	return new CountingOutputStream(proxy, byte_limit, true);
}
50 
/// private - same as `createCountingOutputStream`, but wraps the result in a `FreeListRef`
FreeListRef!CountingOutputStream createCountingOutputStreamFL(OutputStream)(OutputStream output, ulong byte_limit = ulong.max)
	if (isOutputStream!OutputStream)
{
	auto proxy = interfaceProxy!(.OutputStream)(output);
	return FreeListRef!CountingOutputStream(proxy, byte_limit, true);
}
57 
58 
/** Creates a stream that fires a callback once the end of the underlying input stream is reached.

	Params:
		input = Source stream to read from
		callback = The callback that is invoked once the source stream has been drained

	Returns:
		A newly allocated `EndCallbackInputStream` reading from `input`.
*/
EndCallbackInputStream createEndCallbackInputStream(InputStream)(InputStream input, void delegate() @safe callback)
	if (isInputStream!InputStream)
{
	// Erase the concrete stream type behind the generic InputStream proxy first.
	auto proxy = interfaceProxy!(.InputStream)(input);
	return new EndCallbackInputStream(proxy, callback, true);
}
70 
/// private - same as `createEndCallbackInputStream`, but wraps the result in a `FreeListRef`
FreeListRef!EndCallbackInputStream createEndCallbackInputStreamFL(InputStream)(InputStream input, void delegate() @safe callback)
	if (isInputStream!InputStream)
{
	auto proxy = interfaceProxy!(.InputStream)(input);
	return FreeListRef!EndCallbackInputStream(proxy, callback, true);
}
77 
78 
/**
	Wraps an existing stream, limiting the amount of data that can be read.

	In the default (non-silent) mode the stream presents itself as being exactly
	as long as the remaining limit: `empty`, `leastSize` and `peek` are all
	derived from (or clamped to) that limit. In silent mode these are forwarded
	unchanged to the wrapped stream and only `read`/`increment` enforce the limit.
*/
class LimitedInputStream : InputStream {
@safe:

	private {
		InterfaceProxy!InputStream m_input;
		// Number of bytes that may still be consumed; decremented on every read.
		ulong m_sizeLimit;
		bool m_silentLimit;
	}

	/// private
	this(InterfaceProxy!InputStream stream, ulong byte_limit, bool silent_limit, bool dummy)
	{
		assert(!!stream);
		m_input = stream;
		m_sizeLimit = byte_limit;
		m_silentLimit = silent_limit;
	}

	/// The stream that is wrapped by this one
	@property inout(InterfaceProxy!InputStream) sourceStream() inout { return m_input; }

	/// In silent mode forwards to the wrapped stream, otherwise true once the limit is used up.
	@property bool empty() { return m_silentLimit ? m_input.empty : (m_sizeLimit == 0); }

	/// In silent mode forwards to the wrapped stream, otherwise returns the remaining limit.
	@property ulong leastSize() { if( m_silentLimit ) return m_input.leastSize; return m_sizeLimit; }

	/// Forwards to the wrapped stream.
	@property bool dataAvailableForRead() { return m_input.dataAvailableForRead; }

	/** Manually consumes `bytes` of the remaining limit without reading any data.

		Calls `onSizeLimitReached` (which throws by default) if `bytes` exceeds
		the remaining limit.
	*/
	void increment(ulong bytes)
	{
		if( bytes > m_sizeLimit ) {
			onSizeLimitReached();
			// Only reached if an overridden handler returns instead of throwing:
			// saturate at zero rather than letting the unsigned counter wrap around.
			m_sizeLimit = 0;
			return;
		}
		m_sizeLimit -= bytes;
	}

	/** Returns the wrapped stream's peek buffer.

		In non-silent mode the buffer is truncated to the remaining limit, so
		that no bytes beyond the limit are exposed and the result stays
		consistent with `empty` and `leastSize`. In silent mode the buffer is
		returned unchanged.
	*/
	const(ubyte)[] peek()
	{
		auto buffered = m_input.peek();
		if( !m_silentLimit && buffered.length > m_sizeLimit ) {
			// The cast cannot truncate: m_sizeLimit is smaller than a size_t value here.
			buffered = buffered[0 .. cast(size_t) m_sizeLimit];
		}
		return buffered;
	}

	/** Reads from the wrapped stream, never requesting more than the remaining limit.

		`onSizeLimitReached` is invoked when `dst` is larger than the remaining
		limit and either `mode` is `IOMode.all` or the limit is already used up.

		Returns: The number of bytes actually read, as reported by the wrapped stream.
	*/
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		import std.algorithm: min;

		if ((mode == IOMode.all || m_sizeLimit == 0) && dst.length > m_sizeLimit) onSizeLimitReached();

		// Partial modes are served with whatever part of dst still fits the limit.
		const validReadSize = min(dst.length, m_sizeLimit);
		auto ret = m_input.read(dst[0 .. validReadSize], mode);
		m_sizeLimit -= ret;
		return ret;
	}

	alias read = InputStream.read;

	/// Invoked on a limit violation; throws a `LimitException` carrying the remaining limit.
	protected void onSizeLimitReached() @safe {
		throw new LimitException("Size limit reached", m_sizeLimit);
	}
}
135 
unittest { // issue 2575
	import std.exception : assertThrown;
	import vibe.stream.memory : createMemoryStream;

	// 1024 bytes of source data holding the truncated index of each position
	auto source = new ubyte[](1024);
	foreach (idx, ref value; source) value = cast(ubyte) idx;
	auto input = createMemoryStream(source, false);

	// Partial read modes: buffers smaller than, equal to and larger than the
	// limit must all yield exactly the first 128 bytes and then throw.
	static foreach (bufferSize; [100, 128, 200])
	static foreach (ioMode; [IOMode.once, IOMode.immediate])
	{{
		input.seek(0);
		auto limited = createLimitedInputStream(input, 128);

		ubyte[bufferSize] scratch;
		ubyte[] collected;
		while (!limited.empty) {
			const count = limited.read(scratch[], ioMode);
			collected ~= scratch[0 .. count];
		}

		assert(collected[] == source[0 .. 128]);
		assertThrown(limited.read(scratch[], ioMode));
	}}

	// IOMode.all: two full reads of 64 bytes use up the limit exactly
	{
		input.seek(0);
		auto limited = createLimitedInputStream(input, 128);

		ubyte[64] scratch;
		ubyte[] collected;
		foreach (_; 0 .. 2) {
			const count = limited.read(scratch[], IOMode.all);
			collected ~= scratch[0 .. count];
		}
		assert(limited.empty);
		assert(collected[] == source[0 .. 128]);
		assertThrown(limited.read(scratch[], IOMode.all));
	}

	// IOMode.all: requesting more than the limit in one go must throw
	{
		input.seek(0);
		auto limited = createLimitedInputStream(input, 128);

		ubyte[256] oversized;
		assertThrown(limited.read(oversized[], IOMode.all));
	}
}
186 
/**
	Wraps an existing output stream, counting the bytes that are written.

	An optional write limit is enforced: any write or manual increment that
	would push the counter beyond the limit fails with an exception before any
	data is forwarded.
*/
class CountingOutputStream : OutputStream {
@safe:

	private {
		ulong m_bytesWritten;
		ulong m_writeLimit;
		InterfaceProxy!OutputStream m_out;
	}

	/// private
	this(InterfaceProxy!OutputStream stream, ulong write_limit, bool dummy)
	{
		assert(!!stream);
		m_writeLimit = write_limit;
		m_out = stream;
	}

	/// Returns the total number of bytes written.
	@property ulong bytesWritten() const { return m_bytesWritten; }

	/// The maximum number of bytes to write
	@property ulong writeLimit() const { return m_writeLimit; }
	/// ditto
	@property void writeLimit(ulong value) { m_writeLimit = value; }

	/** Manually increments the write counter without actually writing data.

		Throws: An exception if the counter would exceed the write limit.
	*/
	void increment(ulong bytes)
	{
		enforce(fitsWithinLimit(bytes), "Incrementing past end of output stream.");
		m_bytesWritten += bytes;
	}

	// The signature of OutputStream.write depends on the interface version.
	static if (is(typeof(.OutputStream.outputStreamVersion)) && .OutputStream.outputStreamVersion > 1) {
		override size_t write(scope const(ubyte)[] bytes_, IOMode mode) { return doWrite(bytes_, mode); }
	} else {
		override size_t write(in ubyte[] bytes_, IOMode mode) { return doWrite(bytes_, mode); }
	}

	alias write = OutputStream.write;

	/** Forwards `bytes` to the wrapped stream and adds the number of bytes
		that were actually written to the counter.

		Throws: An exception if `bytes.length` would exceed the write limit.
	*/
	private size_t doWrite(scope const(ubyte)[] bytes, IOMode mode)
	{
		enforce(fitsWithinLimit(bytes.length), "Writing past end of output stream.");

		auto ret = m_out.write(bytes, mode);
		m_bytesWritten += ret;
		return ret;
	}

	/** Checks whether `extra` more bytes still fit into the write limit.

		Expressed as a subtraction so that the check cannot wrap around for
		very large values of `extra`. The first operand covers a limit that
		was lowered below the current counter via the `writeLimit` setter.
	*/
	private bool fitsWithinLimit(ulong extra) const
	{
		return m_bytesWritten <= m_writeLimit && extra <= m_writeLimit - m_bytesWritten;
	}

	/// Flushes the wrapped stream.
	void flush() { m_out.flush(); }

	// NOTE(review): only flushes the wrapped stream instead of calling its
	// finalize() - presumably because this wrapper does not own the wrapped
	// stream; confirm that this is intended.
	void finalize() { m_out.flush(); }
}
243 
244 
/**
	Wraps an existing input stream, counting the bytes that are read.
*/
class CountingInputStream : InputStream {
@safe:

	private {
		InterfaceProxy!InputStream m_in;
		ulong m_bytesRead;
	}

	/// private
	this(InterfaceProxy!InputStream stream, bool dummy)
	{
		assert(!!stream);
		m_in = stream;
	}

	/// Returns the number of bytes counted so far.
	@property ulong bytesRead() const
	{
		return m_bytesRead;
	}

	/// Forwards to the wrapped stream.
	@property bool empty()
	{
		return m_in.empty();
	}

	/// ditto
	@property ulong leastSize()
	{
		return m_in.leastSize();
	}

	/// ditto
	@property bool dataAvailableForRead()
	{
		return m_in.dataAvailableForRead;
	}

	/// Manually adds `bytes` to the read counter without reading any data.
	void increment(ulong bytes)
	{
		m_bytesRead += bytes;
	}

	/// Forwards to the wrapped stream; peeking does not affect the counter.
	const(ubyte)[] peek()
	{
		return m_in.peek();
	}

	/// Reads from the wrapped stream and adds the number of bytes read to the counter.
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		const count = m_in.read(dst, mode);
		m_bytesRead += count;
		return count;
	}

	alias read = InputStream.read;
}
285 
/**
	Wraps an input stream and calls the given delegate once the stream is empty.

	Note that this stream will potentially block after each read operation to
	see if the end has already been reached - this may take as long until either
	new data has arrived or until the connection was closed.

	The stream will also guarantee that the inner stream is not used after it
	has been determined to be empty. It can thus be safely deleted once the
	callback is invoked.
*/
class EndCallbackInputStream : InputStream {
@safe:

	private {
		// Reset to its init value as soon as the end of the stream was detected
		InterfaceProxy!InputStream m_in;
		bool m_eof = false;
		void delegate() @safe m_callback;
	}

	/// private
	this(InterfaceProxy!InputStream input, void delegate() @safe callback, bool dummy)
	{
		m_callback = callback;
		m_in = input;
		// An already drained stream triggers the callback right away.
		checkEOF();
	}

	/// Checks for the end of the stream (possibly firing the callback) and reports the result.
	@property bool empty()
	{
		checkEOF();
		return !m_in;
	}

	/// Checks for the end of the stream first; yields zero once the end was reached.
	@property ulong leastSize()
	{
		checkEOF();
		if( !m_in ) return 0;
		return m_in.leastSize();
	}

	/// Forwards to the inner stream; always false once the end was reached.
	@property bool dataAvailableForRead()
	{
		if( m_in ) return m_in.dataAvailableForRead;
		return false;
	}

	/// Forwards to the inner stream; always null once the end was reached.
	const(ubyte)[] peek()
	{
		if( m_in ) return m_in.peek();
		return null;
	}

	/** Reads from the inner stream and checks for the end of the stream afterwards.

		Throws: An exception if the end of the stream has already been reached.
	*/
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		enforce(!!m_in, "Reading past end of stream.");
		const count = m_in.read(dst, mode);
		checkEOF();
		return count;
	}

	alias read = InputStream.read;

	/// Drops the inner stream and fires the callback once the inner stream reports empty.
	private void checkEOF()
	@safe {
		if( !m_in || !m_in.empty ) return;

		// Drop the inner stream before invoking the callback, so that it is
		// never touched again from this point on.
		m_in = InterfaceProxy!InputStream.init;
		m_callback();
	}
}
358 
/**
	Exception that carries the byte limit of the stream that raised it.
*/
class LimitException : Exception {
@safe:

	private ulong m_limit;

	/** Constructs the exception.

		Params:
			message = the exception message
			limit = the byte limit to report through the `limit` property
			next = optional chained exception
			file = source file reported as the origin of the exception
			line = source line reported as the origin of the exception
	*/
	this(string message, ulong limit, Throwable next = null, string file = __FILE__, int line = __LINE__)
	{
		super(message, next, file, line);
		// Store the limit - without this the `limit` property always yields 0.
		m_limit = limit;
	}

	/// The byte limit of the stream that emitted the exception
	@property ulong limit() const { return m_limit; }
}