1 /**
2 	Wrapper streams which count the number of bytes or limit the stream based on the number of
3 	transferred bytes.
4 
5 	Copyright: © 2012 Sönke Ludwig
6 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
7 	Authors: Sönke Ludwig
8 */
9 module vibe.stream.counting;
10 
11 public import vibe.core.stream;
12 
13 import std.exception;
14 import vibe.internal.interfaceproxy;
15 import vibe.internal.freelistref : FreeListRef;
16 
17 
18 /** Constructs a limited stream from an existing input stream.
19 
20 	Params:
21 		stream = the input stream to be wrapped
22 		byte_limit = the maximum number of bytes readable from the constructed stream
23 		silent_limit = if set, the stream will behave exactly like the original stream, but
24 			will throw an exception as soon as the limit is reached.
25 */
26 LimitedInputStream createLimitedInputStream(InputStream)(InputStream stream, ulong byte_limit, bool silent_limit = false)
27 	if (isInputStream!InputStream)
28 {
29 	return new LimitedInputStream(interfaceProxy!(.InputStream)(stream), byte_limit, silent_limit, true);
30 }
31 
32 /// private
33 FreeListRef!LimitedInputStream createLimitedInputStreamFL(InputStream)(InputStream stream, ulong byte_limit, bool silent_limit = false)
34 	if (isInputStream!InputStream)
35 {
36 	return FreeListRef!LimitedInputStream(interfaceProxy!(.InputStream)(stream), byte_limit, silent_limit, true);
37 }
38 
39 /** Creates a proxy stream that counts the number of bytes written.
40 
41 	Params:
42 		output = The stream to forward the written data to
43 		byte_limit = Optional total write size limit after which an exception is thrown
44 */
45 CountingOutputStream createCountingOutputStream(OutputStream)(OutputStream output, ulong byte_limit = ulong.max)
46 	if (isOutputStream!OutputStream)
47 {
48 	return new CountingOutputStream(interfaceProxy!(.OutputStream)(output), byte_limit, true);
49 }
50 
51 /// private
52 FreeListRef!CountingOutputStream createCountingOutputStreamFL(OutputStream)(OutputStream output, ulong byte_limit = ulong.max)
53 	if (isOutputStream!OutputStream)
54 {
55 	return FreeListRef!CountingOutputStream(interfaceProxy!(.OutputStream)(output), byte_limit, true);
56 }
57 
58 
59 /** Creates a stream that fires a callback once the end of the underlying input stream is reached.
60 
61 	Params:
62 		input = Source stream to read from
63 		callback = The callback that is invoked one the source stream has been drained
64 */
65 EndCallbackInputStream createEndCallbackInputStream(InputStream)(InputStream input, void delegate() @safe callback)
66 	if (isInputStream!InputStream)
67 {
68 	return new EndCallbackInputStream(interfaceProxy!(.InputStream)(input), callback, true);
69 }
70 
71 /// private
72 FreeListRef!EndCallbackInputStream createEndCallbackInputStreamFL(InputStream)(InputStream input, void delegate() @safe callback)
73 	if (isInputStream!InputStream)
74 {
75 	return FreeListRef!EndCallbackInputStream(interfaceProxy!(.InputStream)(input), callback, true);
76 }
77 
78 
79 /**
80 	Wraps an existing stream, limiting the amount of data that can be read.
81 */
82 class LimitedInputStream : InputStream {
83 @safe:
84 
85 	private {
86 		InterfaceProxy!InputStream m_input;
87 		ulong m_sizeLimit;
88 		bool m_silentLimit;
89 	}
90 
91 	deprecated("Use createLimitedInputStream instead.")
92 	this(InputStream stream, ulong byte_limit, bool silent_limit = false)
93 	{
94 		this(interfaceProxy!InputStream(stream), byte_limit, silent_limit, true);
95 	}
96 
97 	/// private
98 	this(InterfaceProxy!InputStream stream, ulong byte_limit, bool silent_limit, bool dummy)
99 	{
100 		assert(!!stream);
101 		m_input = stream;
102 		m_sizeLimit = byte_limit;
103 		m_silentLimit = silent_limit;
104 	}
105 
106 	/// The stream that is wrapped by this one
107 	@property inout(InterfaceProxy!InputStream) sourceStream() inout { return m_input; }
108 
109 	@property bool empty() { return m_silentLimit ? m_input.empty : (m_sizeLimit == 0); }
110 
111 	@property ulong leastSize() { if( m_silentLimit ) return m_input.leastSize; return m_sizeLimit; }
112 
113 	@property bool dataAvailableForRead() { return m_input.dataAvailableForRead; }
114 
115 	void increment(ulong bytes)
116 	{
117 		if( bytes > m_sizeLimit ) onSizeLimitReached();
118 		m_sizeLimit -= bytes;
119 	}
120 
121 	const(ubyte)[] peek() { return m_input.peek(); }
122 
123 	size_t read(scope ubyte[] dst, IOMode mode)
124 	{
125 		import std.algorithm: min;
126 
127 		if ((mode == IOMode.all || m_sizeLimit == 0) && dst.length > m_sizeLimit) onSizeLimitReached();
128 
129 		const validReadSize = min(dst.length, m_sizeLimit);
130 		auto ret = m_input.read(dst[0 .. validReadSize], mode);
131 		m_sizeLimit -= ret;
132 		return ret;
133 	}
134 
135 	alias read = InputStream.read;
136 
137 	protected void onSizeLimitReached() @safe {
138 		throw new LimitException("Size limit reached", m_sizeLimit);
139 	}
140 }
141 
142 unittest { // issue 2575
143 	import vibe.stream.memory : createMemoryStream;
144 	import std.exception : assertThrown;
145 
146 	auto buf = new ubyte[](1024);
147 	foreach (i, ref b; buf) b = cast(ubyte) i;
148 	auto input = createMemoryStream(buf, false);
149 
150 	// test IOMode.once and IOMode.immediate
151 	static foreach (bufferSize; [100, 128, 200])
152 	static foreach (ioMode; [IOMode.once, IOMode.immediate])
153 	{{
154 		input.seek(0);
155 		auto limitedStream = createLimitedInputStream(input, 128);
156 
157 		ubyte[] result;
158 
159 		ubyte[bufferSize] buffer;
160 		while (!limitedStream.empty) {
161 			const chunk = limitedStream.read(buffer[], ioMode);
162 			result ~= buffer[0 .. chunk];
163 		}
164 
165 		assert(result[] == buf[0 .. 128]);
166 		assertThrown(limitedStream.read(buffer[], ioMode));
167 	}}
168 
169 	// test IOMode.all normal operation
170 	{
171 		input.seek(0);
172 		auto limitedStream = createLimitedInputStream(input, 128);
173 
174 		ubyte[] result;
175 		ubyte[64] buffer;
176 		result ~= buffer[0 .. limitedStream.read(buffer[], IOMode.all)];
177 		result ~= buffer[0 .. limitedStream.read(buffer[], IOMode.all)];
178 		assert(limitedStream.empty);
179 		assert(result[] == buf[0 .. 128]);
180 		assertThrown(limitedStream.read(buffer[], IOMode.all));
181 	}
182 
183 	// test IOMode.all reading over size limit
184 	{
185 		input.seek(0);
186 		auto limitedStream = createLimitedInputStream(input, 128);
187 
188 		ubyte[256] buffer;
189 		assertThrown(limitedStream.read(buffer[], IOMode.all));
190 	}
191 }
192 
193 /**
194 	Wraps an existing output stream, counting the bytes that are written.
195 */
196 class CountingOutputStream : OutputStream {
197 @safe:
198 
199 	private {
200 		ulong m_bytesWritten;
201 		ulong m_writeLimit;
202 		InterfaceProxy!OutputStream m_out;
203 	}
204 
205 	deprecated("Use createCountingOutputStream instead.")
206 	this(OutputStream stream, ulong write_limit = ulong.max)
207 	{
208 		this(interfaceProxy!OutputStream(stream), write_limit, true);
209 	}
210 
211 	/// private
212 	this(InterfaceProxy!OutputStream stream, ulong write_limit, bool dummy)
213 	{
214 		assert(!!stream);
215 		m_writeLimit = write_limit;
216 		m_out = stream;
217 	}
218 
219 	/// Returns the total number of bytes written.
220 	@property ulong bytesWritten() const { return m_bytesWritten; }
221 
222 	/// The maximum number of bytes to write
223 	@property ulong writeLimit() const { return m_writeLimit; }
224 	/// ditto
225 	@property void writeLimit(ulong value) { m_writeLimit = value; }
226 
227 	/** Manually increments the write counter without actually writing data.
228 	*/
229 	void increment(ulong bytes)
230 	{
231 		enforce(m_bytesWritten + bytes <= m_writeLimit, "Incrementing past end of output stream.");
232 		m_bytesWritten += bytes;
233 	}
234 
235 	size_t write(in ubyte[] bytes, IOMode mode)
236 	{
237 		enforce(m_bytesWritten + bytes.length <= m_writeLimit, "Writing past end of output stream.");
238 
239 		auto ret = m_out.write(bytes, mode);
240 		m_bytesWritten += ret;
241 		return ret;
242 	}
243 
244 	alias write = OutputStream.write;
245 
246 	void flush() { m_out.flush(); }
247 	void finalize() { m_out.flush(); }
248 }
249 
250 
251 /**
252 	Wraps an existing input stream, counting the bytes that are written.
253 */
254 class CountingInputStream : InputStream {
255 @safe:
256 
257 	private {
258 		ulong m_bytesRead;
259 		InterfaceProxy!InputStream m_in;
260 	}
261 
262 	deprecated("Use createCountingOutputStream instead.")
263 	this(InputStream stream)
264 	{
265 		this(interfaceProxy!InputStream(stream), true);
266 	}
267 
268 	/// private
269 	this(InterfaceProxy!InputStream stream, bool dummy)
270 	{
271 		assert(!!stream);
272 		m_in = stream;
273 	}
274 
275 	@property ulong bytesRead() const { return m_bytesRead; }
276 
277 	@property bool empty() { return m_in.empty(); }
278 	@property ulong leastSize() { return m_in.leastSize();  }
279 	@property bool dataAvailableForRead() { return m_in.dataAvailableForRead; }
280 
281 	void increment(ulong bytes)
282 	{
283 		m_bytesRead += bytes;
284 	}
285 
286 	const(ubyte)[] peek() { return m_in.peek(); }
287 
288 	size_t read(scope ubyte[] dst, IOMode mode)
289 	{
290 		auto ret = m_in.read(dst, mode);
291 		m_bytesRead += ret;
292 		return ret;
293 	}
294 
295 	alias read = InputStream.read;
296 }
297 
298 /**
299 	Wraps an input stream and calls the given delegate once the stream is empty.
300 
301 	Note that this function will potentially block after each read operation to
302 	see if the end has already been reached - this may take as long until either
303 	new data has arrived or until the connection was closed.
304 
305 	The stream will also guarantee that the inner stream is not used after it
306 	has been determined to be empty. It can thus be safely deleted once the
307 	callback is invoked.
308 */
309 class EndCallbackInputStream : InputStream {
310 @safe:
311 
312 	private {
313 		InterfaceProxy!InputStream m_in;
314 		bool m_eof = false;
315 		void delegate() @safe m_callback;
316 	}
317 
318 	deprecated("use createEndCallbackInputStream instead.")
319 	this(InputStream input, void delegate() @safe callback)
320 	{
321 		this(interfaceProxy!InputStream(input), callback, true);
322 	}
323 
324 	/// private
325 	this(InterfaceProxy!InputStream input, void delegate() @safe callback, bool dummy)
326 	{
327 		m_in = input;
328 		m_callback = callback;
329 		checkEOF();
330 	}
331 
332 	@property bool empty()
333 	{
334 		checkEOF();
335 		return !m_in;
336 	}
337 
338 	@property ulong leastSize()
339 	{
340 		checkEOF();
341 		if( m_in ) return m_in.leastSize();
342 		return 0;
343 	}
344 
345 	@property bool dataAvailableForRead()
346 	{
347 		if( !m_in ) return false;
348 		return m_in.dataAvailableForRead;
349 	}
350 
351 	const(ubyte)[] peek()
352 	{
353 		if( !m_in ) return null;
354 		return m_in.peek();
355 	}
356 
357 	size_t read(scope ubyte[] dst, IOMode mode)
358 	{
359 		enforce(!!m_in, "Reading past end of stream.");
360 		auto ret = m_in.read(dst, mode);
361 		checkEOF();
362 		return ret;
363 	}
364 
365 	alias read = InputStream.read;
366 
367 	private void checkEOF()
368 	@safe {
369 		if( !m_in ) return;
370 		if( m_in.empty ){
371 			m_in = InterfaceProxy!InputStream.init;
372 			m_callback();
373 		}
374 	}
375 }
376 
377 class LimitException : Exception {
378 @safe:
379 
380 	private ulong m_limit;
381 
382 	this(string message, ulong limit, Throwable next = null, string file = __FILE__, int line = __LINE__)
383 	{
384 		super(message, next, file, line);
385 	}
386 
387 	/// The byte limit of the stream that emitted the exception
388 	@property ulong limit() const { return m_limit; }
389 }