1 /**
2 	Wrapper streams which count the number of bytes or limit the stream based on the number of
3 	transferred bytes.
4 
5 	Copyright: © 2012 Sönke Ludwig
6 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
7 	Authors: Sönke Ludwig
8 */
9 module vibe.stream.counting;
10 
11 public import vibe.core.stream;
12 
13 import std.exception;
14 import vibe.internal.interfaceproxy;
15 import vibe.internal.freelistref : FreeListRef;
16 
17 
/** Wraps an input stream in a `LimitedInputStream` with the given byte budget.

	Params:
		stream = the input stream to be wrapped
		byte_limit = the maximum number of bytes readable from the constructed stream
		silent_limit = if set, `empty` and `leastSize` of the returned stream are
			forwarded to the wrapped stream instead of reflecting the remaining
			limit; an exception is still thrown once the limit is exceeded

	Returns:
		A newly allocated `LimitedInputStream` that reads from `stream`.
*/
LimitedInputStream createLimitedInputStream(InputStream)(InputStream stream, ulong byte_limit, bool silent_limit = false)
	if (isInputStream!InputStream)
{
	// `.InputStream` names the module-scope symbol, not the template parameter of the same name
	auto proxy = interfaceProxy!(.InputStream)(stream);
	return new LimitedInputStream(proxy, byte_limit, silent_limit, true);
}
31 
/// private
// Same as createLimitedInputStream, but the stream object is wrapped in a FreeListRef
// instead of being allocated with `new`.
FreeListRef!LimitedInputStream createLimitedInputStreamFL(InputStream)(InputStream stream, ulong byte_limit, bool silent_limit = false)
	if (isInputStream!InputStream)
{
	// `.InputStream` names the module-scope symbol, not the template parameter of the same name
	auto proxy = interfaceProxy!(.InputStream)(stream);
	return FreeListRef!LimitedInputStream(proxy, byte_limit, silent_limit, true);
}
38 
/** Creates a proxy stream that forwards all writes and counts the bytes written.

	Params:
		output = The stream to forward the written data to
		byte_limit = Optional total write size limit after which an exception is thrown

	Returns:
		A newly allocated `CountingOutputStream` that writes to `output`.
*/
CountingOutputStream createCountingOutputStream(OutputStream)(OutputStream output, ulong byte_limit = ulong.max)
	if (isOutputStream!OutputStream)
{
	// `.OutputStream` names the module-scope symbol, not the template parameter of the same name
	auto proxy = interfaceProxy!(.OutputStream)(output);
	return new CountingOutputStream(proxy, byte_limit, true);
}
50 
/// private
// Same as createCountingOutputStream, but the stream object is wrapped in a FreeListRef
// instead of being allocated with `new`.
FreeListRef!CountingOutputStream createCountingOutputStreamFL(OutputStream)(OutputStream output, ulong byte_limit = ulong.max)
	if (isOutputStream!OutputStream)
{
	// `.OutputStream` names the module-scope symbol, not the template parameter of the same name
	auto proxy = interfaceProxy!(.OutputStream)(output);
	return FreeListRef!CountingOutputStream(proxy, byte_limit, true);
}
57 
58 
/** Creates a stream that fires a callback once the end of the underlying input stream is reached.

	Params:
		input = Source stream to read from
		callback = The callback that is invoked once the source stream has been drained

	Returns:
		A newly allocated `EndCallbackInputStream` that reads from `input`.
*/
EndCallbackInputStream createEndCallbackInputStream(InputStream)(InputStream input, void delegate() @safe callback)
	if (isInputStream!InputStream)
{
	// `.InputStream` names the module-scope symbol, not the template parameter of the same name
	auto proxy = interfaceProxy!(.InputStream)(input);
	return new EndCallbackInputStream(proxy, callback, true);
}
70 
/// private
// Same as createEndCallbackInputStream, but the stream object is wrapped in a FreeListRef
// instead of being allocated with `new`.
FreeListRef!EndCallbackInputStream createEndCallbackInputStreamFL(InputStream)(InputStream input, void delegate() @safe callback)
	if (isInputStream!InputStream)
{
	// `.InputStream` names the module-scope symbol, not the template parameter of the same name
	auto proxy = interfaceProxy!(.InputStream)(input);
	return FreeListRef!EndCallbackInputStream(proxy, callback, true);
}
77 
78 
/**
	Wraps an existing stream, limiting the amount of data that can be read.

	The stream keeps a remaining byte budget that is reduced by every `read`
	and by `increment`. In the default (non-silent) mode, `empty` and
	`leastSize` reflect the remaining budget and `peek` never exposes data
	beyond it. In silent mode, `empty`, `leastSize` and `peek` are forwarded
	to the wrapped stream unchanged. In both modes `onSizeLimitReached` is
	invoked when the budget is exceeded, which throws a `LimitException`
	unless overridden.
*/
class LimitedInputStream : InputStream {
@safe:

	private {
		InterfaceProxy!InputStream m_input;
		ulong m_sizeLimit; // number of bytes that may still be read
		bool m_silentLimit;
	}

	/// private
	this(InterfaceProxy!InputStream stream, ulong byte_limit, bool silent_limit, bool dummy)
	{
		assert(!!stream);
		m_input = stream;
		m_sizeLimit = byte_limit;
		m_silentLimit = silent_limit;
	}

	/// The stream that is wrapped by this one
	@property inout(InterfaceProxy!InputStream) sourceStream() inout { return m_input; }

	/// In silent mode forwards to the wrapped stream, otherwise `true` once the limit is used up.
	@property bool empty() { return m_silentLimit ? m_input.empty : (m_sizeLimit == 0); }

	/// In silent mode forwards to the wrapped stream, otherwise returns the remaining limit.
	@property ulong leastSize() { if( m_silentLimit ) return m_input.leastSize; return m_sizeLimit; }

	/// Forwards to the wrapped stream.
	@property bool dataAvailableForRead() { return m_input.dataAvailableForRead; }

	/** Manually consumes part of the remaining limit without reading data.

		Calls `onSizeLimitReached` if `bytes` exceeds the remaining limit.
	*/
	void increment(ulong bytes)
	{
		if( bytes > m_sizeLimit ) onSizeLimitReached();
		m_sizeLimit -= bytes;
	}

	/** Returns the data peeked from the wrapped stream.

		In non-silent mode the result is truncated to the remaining limit, so
		that it stays consistent with `empty` and `leastSize` and does not
		expose bytes that can never be read through this stream.
	*/
	const(ubyte)[] peek()
	{
		auto data = m_input.peek();
		if (!m_silentLimit && data.length > m_sizeLimit) {
			// the cast cannot truncate: m_sizeLimit < data.length <= size_t.max
			return data[0 .. cast(size_t)m_sizeLimit];
		}
		return data;
	}

	/** Reads from the wrapped stream, consuming the remaining limit.

		`onSizeLimitReached` is called if `dst` is longer than the remaining
		limit and either `mode` is `IOMode.all` or the limit is already used
		up. Otherwise at most as many bytes as the remaining limit are
		requested from the wrapped stream.

		Returns: the number of bytes reported as read by the wrapped stream
	*/
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		import std.algorithm: min;

		if ((mode == IOMode.all || m_sizeLimit == 0) && dst.length > m_sizeLimit) onSizeLimitReached();

		// never request more than what is left of the limit
		const validReadSize = min(dst.length, m_sizeLimit);
		auto ret = m_input.read(dst[0 .. validReadSize], mode);
		m_sizeLimit -= ret;
		return ret;
	}

	alias read = InputStream.read;

	/// Invoked when the limit is exceeded; throws a `LimitException` carrying the remaining limit.
	protected void onSizeLimitReached() @safe {
		throw new LimitException("Size limit reached", m_sizeLimit);
	}
}
135 
unittest { // issue 2575
	import std.exception : assertThrown;
	import vibe.stream.memory : createMemoryStream;

	// 1024 source bytes, each holding the low byte of its own index
	auto source = new ubyte[](1024);
	foreach (idx, ref value; source) value = cast(ubyte) idx;
	auto memStream = createMemoryStream(source, false);

	// IOMode.once and IOMode.immediate: buffers smaller than, equal to and
	// larger than the limit must all yield exactly the first 128 bytes
	static foreach (bufferSize; [100, 128, 200])
	static foreach (ioMode; [IOMode.once, IOMode.immediate])
	{{
		memStream.seek(0);
		auto limited = createLimitedInputStream(memStream, 128);

		ubyte[bufferSize] scratch;
		ubyte[] collected;

		while (!limited.empty) {
			const count = limited.read(scratch[], ioMode);
			collected ~= scratch[0 .. count];
		}

		assert(collected == source[0 .. 128]);
		// the limit is used up, so any further read has to throw
		assertThrown(limited.read(scratch[], ioMode));
	}}

	// IOMode.all: two reads of 64 bytes use up the limit exactly
	{
		memStream.seek(0);
		auto limited = createLimitedInputStream(memStream, 128);

		ubyte[64] scratch;
		ubyte[] collected;

		const firstCount = limited.read(scratch[], IOMode.all);
		collected ~= scratch[0 .. firstCount];
		const secondCount = limited.read(scratch[], IOMode.all);
		collected ~= scratch[0 .. secondCount];

		assert(limited.empty);
		assert(collected == source[0 .. 128]);
		assertThrown(limited.read(scratch[], IOMode.all));
	}

	// IOMode.all: a single request larger than the limit throws
	{
		memStream.seek(0);
		auto limited = createLimitedInputStream(memStream, 128);

		ubyte[256] scratch;
		assertThrown(limited.read(scratch[], IOMode.all));
	}
}
186 
/**
	Wraps an existing output stream, counting the bytes that are written.

	Writes and manual increments are rejected with an exception if they would
	push the byte counter beyond the configured write limit.
*/
class CountingOutputStream : OutputStream {
@safe:

	private {
		ulong m_bytesWritten;
		ulong m_writeLimit;
		InterfaceProxy!OutputStream m_out;
	}

	/// private
	this(InterfaceProxy!OutputStream stream, ulong write_limit, bool dummy)
	{
		assert(!!stream);
		m_writeLimit = write_limit;
		m_out = stream;
	}

	/// Returns the total number of bytes written.
	@property ulong bytesWritten() const { return m_bytesWritten; }

	/// The maximum number of bytes to write
	@property ulong writeLimit() const { return m_writeLimit; }
	/// ditto
	@property void writeLimit(ulong value) { m_writeLimit = value; }

	/** Manually increments the write counter without actually writing data.

		Throws: an exception if the counter would exceed `writeLimit`
	*/
	void increment(ulong bytes)
	{
		enforce(fitsWithinLimit(bytes), "Incrementing past end of output stream.");
		m_bytesWritten += bytes;
	}

	/** Forwards `bytes` to the wrapped stream and adds the written amount to the counter.

		The limit is checked against the full length of `bytes` before anything
		is written.

		Returns: the number of bytes reported as written by the wrapped stream
		Throws: an exception if writing all of `bytes` would exceed `writeLimit`
	*/
	size_t write(in ubyte[] bytes, IOMode mode)
	{
		enforce(fitsWithinLimit(bytes.length), "Writing past end of output stream.");

		auto ret = m_out.write(bytes, mode);
		m_bytesWritten += ret;
		return ret;
	}

	alias write = OutputStream.write;

	/// Flushes the wrapped stream.
	void flush() { m_out.flush(); }
	/// Only flushes the wrapped stream; `finalize` is not forwarded to it.
	void finalize() { m_out.flush(); }

	// Checks whether `count` more bytes stay within the write limit. Comparing
	// against the remaining capacity avoids the wrap-around that
	// `m_bytesWritten + count` is subject to in `ulong` arithmetic.
	private bool fitsWithinLimit(ulong count)
	const {
		// the limit may have been lowered below the current counter via the setter
		if (m_bytesWritten > m_writeLimit) return false;
		return count <= m_writeLimit - m_bytesWritten;
	}
}
237 
238 
/**
	Wraps an existing input stream, counting the bytes that are read.

	All stream queries are forwarded to the wrapped stream unchanged.
*/
class CountingInputStream : InputStream {
@safe:

	private {
		InterfaceProxy!InputStream m_in;
		ulong m_bytesRead;
	}

	/// private
	this(InterfaceProxy!InputStream stream, bool dummy)
	{
		assert(!!stream);
		m_in = stream;
	}

	/// Returns the total number of bytes counted so far.
	@property ulong bytesRead() const { return m_bytesRead; }

	/// Forwards to the wrapped stream.
	@property bool empty()
	{
		return m_in.empty();
	}

	/// ditto
	@property ulong leastSize()
	{
		return m_in.leastSize();
	}

	/// ditto
	@property bool dataAvailableForRead()
	{
		return m_in.dataAvailableForRead;
	}

	/// Manually increments the read counter without actually reading data.
	void increment(ulong bytes)
	{
		m_bytesRead += bytes;
	}

	/// Forwards to the wrapped stream; peeked data is not counted.
	const(ubyte)[] peek()
	{
		return m_in.peek();
	}

	/** Reads from the wrapped stream and adds the returned byte count to the counter.

		Returns: the number of bytes reported as read by the wrapped stream
	*/
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		const count = m_in.read(dst, mode);
		m_bytesRead += count;
		return count;
	}

	alias read = InputStream.read;
}
279 
/**
	Wraps an input stream and calls the given delegate once the stream is empty.

	Note that this function will potentially block after each read operation to
	see if the end has already been reached - this may take as long until either
	new data has arrived or until the connection was closed.

	Once the inner stream has been determined to be empty, the reference to it
	is dropped before the callback is invoked and it is not used again. It can
	thus be safely deleted once the callback is invoked.
*/
class EndCallbackInputStream : InputStream {
@safe:

	private {
		InterfaceProxy!InputStream m_in;
		void delegate() @safe m_callback;
		bool m_eof = false;
	}

	/// private
	this(InterfaceProxy!InputStream input, void delegate() @safe callback, bool dummy)
	{
		m_callback = callback;
		m_in = input;
		// an input that is already empty triggers the callback right away
		checkEOF();
	}

	/// `true` once the end of the inner stream has been detected.
	@property bool empty()
	{
		checkEOF();
		return !m_in;
	}

	/// Forwards to the inner stream, or returns 0 after its end has been detected.
	@property ulong leastSize()
	{
		checkEOF();
		if (!m_in) return 0;
		return m_in.leastSize();
	}

	/// Forwards to the inner stream, or returns `false` after its end has been detected.
	@property bool dataAvailableForRead()
	{
		if (m_in) return m_in.dataAvailableForRead;
		return false;
	}

	/// Forwards to the inner stream, or returns `null` after its end has been detected.
	const(ubyte)[] peek()
	{
		if (m_in) return m_in.peek();
		return null;
	}

	/** Reads from the inner stream and checks for its end afterwards.

		Returns: the number of bytes reported as read by the inner stream
		Throws: an exception if the end of the inner stream was already detected
	*/
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		enforce(!!m_in, "Reading past end of stream.");
		const count = m_in.read(dst, mode);
		checkEOF();
		return count;
	}

	alias read = InputStream.read;

	// Drops the inner stream and fires the callback as soon as the inner
	// stream reports to be empty. Does nothing once the stream was dropped.
	private void checkEOF()
	@safe {
		if (!m_in) return;
		if (!m_in.empty) return;

		// release the inner stream first, so that it is no longer referenced
		// when the callback runs
		m_in = InterfaceProxy!InputStream.init;
		m_callback();
	}
}
352 
/**
	Exception carrying the byte limit of the stream that raised it.
*/
class LimitException : Exception {
@safe:

	private ulong m_limit;

	/** Constructs the exception.

		Params:
			message = The error message
			limit = The byte limit to report through the `limit` property
			next = Optional next exception in the chain
			file = Source file in which the exception was raised
			line = Source line at which the exception was raised
	*/
	this(string message, ulong limit, Throwable next = null, string file = __FILE__, int line = __LINE__)
	{
		super(message, next, file, line);
		// store the limit so that the `limit` property reports it instead of always 0
		m_limit = limit;
	}

	/// The byte limit of the stream that emitted the exception
	@property ulong limit() const { return m_limit; }
}