1 /**
2 	Wrapper streams which count the number of bytes or limit the stream based on the number of
3 	transferred bytes.
4 
5 	Copyright: © 2012 Sönke Ludwig
6 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
7 	Authors: Sönke Ludwig
8 */
9 module vibe.stream.counting;
10 
11 public import vibe.core.stream;
12 
13 import std.exception;
14 import vibe.internal.interfaceproxy;
15 import vibe.internal.freelistref : FreeListRef;
16 
17 
18 /** Constructs a limited stream from an existing input stream.
19 
20 	Params:
21 		stream = the input stream to be wrapped
22 		byte_limit = the maximum number of bytes readable from the constructed stream
23 		silent_limit = if set, the stream will behave exactly like the original stream, but
24 			will throw an exception as soon as the limit is reached.
25 */
26 LimitedInputStream createLimitedInputStream(InputStream)(InputStream stream, ulong byte_limit, bool silent_limit = false)
27 	if (isInputStream!InputStream)
28 {
29 	return new LimitedInputStream(interfaceProxy!(.InputStream)(stream), byte_limit, silent_limit, true);
30 }
31 
32 /// private
33 FreeListRef!LimitedInputStream createLimitedInputStreamFL(InputStream)(InputStream stream, ulong byte_limit, bool silent_limit = false)
34 	if (isInputStream!InputStream)
35 {
36 	return FreeListRef!LimitedInputStream(interfaceProxy!(.InputStream)(stream), byte_limit, silent_limit, true);
37 }
38 
39 /** Creates a proxy stream that counts the number of bytes written.
40 
41 	Params:
42 		output = The stream to forward the written data to
43 		byte_limit = Optional total write size limit after which an exception is thrown
44 */
45 CountingOutputStream createCountingOutputStream(OutputStream)(OutputStream output, ulong byte_limit = ulong.max)
46 	if (isOutputStream!OutputStream)
47 {
48 	return new CountingOutputStream(interfaceProxy!(.OutputStream)(output), byte_limit, true);
49 }
50 
51 /// private
52 FreeListRef!CountingOutputStream createCountingOutputStreamFL(OutputStream)(OutputStream output, ulong byte_limit = ulong.max)
53 	if (isOutputStream!OutputStream)
54 {
55 	return FreeListRef!CountingOutputStream(interfaceProxy!(.OutputStream)(output), byte_limit, true);
56 }
57 
58 
59 /** Creates a stream that fires a callback once the end of the underlying input stream is reached.
60 
61 	Params:
62 		input = Source stream to read from
63 		callback = The callback that is invoked one the source stream has been drained
64 */
65 EndCallbackInputStream createEndCallbackInputStream(InputStream)(InputStream input, void delegate() @safe callback)
66 	if (isInputStream!InputStream)
67 {
68 	return new EndCallbackInputStream(interfaceProxy!(.InputStream)(input), callback, true);
69 }
70 
71 /// private
72 FreeListRef!EndCallbackInputStream createEndCallbackInputStreamFL(InputStream)(InputStream input, void delegate() @safe callback)
73 	if (isInputStream!InputStream)
74 {
75 	return FreeListRef!EndCallbackInputStream(interfaceProxy!(.InputStream)(input), callback, true);
76 }
77 
78 
79 /**
80 	Wraps an existing stream, limiting the amount of data that can be read.
81 */
82 class LimitedInputStream : InputStream {
83 @safe:
84 
85 	private {
86 		InterfaceProxy!InputStream m_input;
87 		ulong m_sizeLimit;
88 		bool m_silentLimit;
89 	}
90 
91 	deprecated("Use createLimitedInputStream instead.")
92 	this(InputStream stream, ulong byte_limit, bool silent_limit = false)
93 	{
94 		this(interfaceProxy!InputStream(stream), byte_limit, silent_limit, true);
95 	}
96 
97 	/// private
98 	this(InterfaceProxy!InputStream stream, ulong byte_limit, bool silent_limit, bool dummy)
99 	{
100 		assert(!!stream);
101 		m_input = stream;
102 		m_sizeLimit = byte_limit;
103 		m_silentLimit = silent_limit;
104 	}
105 
106 	/// The stream that is wrapped by this one
107 	@property inout(InterfaceProxy!InputStream) sourceStream() inout { return m_input; }
108 
109 	@property bool empty() { return m_silentLimit ? m_input.empty : (m_sizeLimit == 0); }
110 
111 	@property ulong leastSize() { if( m_silentLimit ) return m_input.leastSize; return m_sizeLimit; }
112 
113 	@property bool dataAvailableForRead() { return m_input.dataAvailableForRead; }
114 
115 	void increment(ulong bytes)
116 	{
117 		if( bytes > m_sizeLimit ) onSizeLimitReached();
118 		m_sizeLimit -= bytes;
119 	}
120 
121 	const(ubyte)[] peek() { return m_input.peek(); }
122 
123 	size_t read(scope ubyte[] dst, IOMode mode)
124 	{
125 		if (dst.length > m_sizeLimit) onSizeLimitReached();
126 		auto ret = m_input.read(dst, mode);
127 		m_sizeLimit -= ret;
128 		return ret;
129 	}
130 
131 	alias read = InputStream.read;
132 
133 	protected void onSizeLimitReached() @safe {
134 		throw new LimitException("Size limit reached", m_sizeLimit);
135 	}
136 }
137 
138 
139 /**
140 	Wraps an existing output stream, counting the bytes that are written.
141 */
142 class CountingOutputStream : OutputStream {
143 @safe:
144 
145 	private {
146 		ulong m_bytesWritten;
147 		ulong m_writeLimit;
148 		InterfaceProxy!OutputStream m_out;
149 	}
150 
151 	deprecated("Use createCountingOutputStream instead.")
152 	this(OutputStream stream, ulong write_limit = ulong.max)
153 	{
154 		this(interfaceProxy!OutputStream(stream), write_limit, true);
155 	}
156 
157 	/// private
158 	this(InterfaceProxy!OutputStream stream, ulong write_limit, bool dummy)
159 	{
160 		assert(!!stream);
161 		m_writeLimit = write_limit;
162 		m_out = stream;
163 	}
164 
165 	/// Returns the total number of bytes written.
166 	@property ulong bytesWritten() const { return m_bytesWritten; }
167 
168 	/// The maximum number of bytes to write
169 	@property ulong writeLimit() const { return m_writeLimit; }
170 	/// ditto
171 	@property void writeLimit(ulong value) { m_writeLimit = value; }
172 
173 	/** Manually increments the write counter without actually writing data.
174 	*/
175 	void increment(ulong bytes)
176 	{
177 		enforce(m_bytesWritten + bytes <= m_writeLimit, "Incrementing past end of output stream.");
178 		m_bytesWritten += bytes;
179 	}
180 
181 	size_t write(in ubyte[] bytes, IOMode mode)
182 	{
183 		enforce(m_bytesWritten + bytes.length <= m_writeLimit, "Writing past end of output stream.");
184 
185 		auto ret = m_out.write(bytes, mode);
186 		m_bytesWritten += ret;
187 		return ret;
188 	}
189 
190 	alias write = OutputStream.write;
191 
192 	void flush() { m_out.flush(); }
193 	void finalize() { m_out.flush(); }
194 }
195 
196 
197 /**
198 	Wraps an existing input stream, counting the bytes that are written.
199 */
200 class CountingInputStream : InputStream {
201 @safe:
202 
203 	private {
204 		ulong m_bytesRead;
205 		InterfaceProxy!InputStream m_in;
206 	}
207 
208 	deprecated("Use createCountingOutputStream instead.")
209 	this(InputStream stream)
210 	{
211 		this(interfaceProxy!InputStream(stream), true);
212 	}
213 
214 	/// private
215 	this(InterfaceProxy!InputStream stream, bool dummy)
216 	{
217 		assert(!!stream);
218 		m_in = stream;
219 	}
220 
221 	@property ulong bytesRead() const { return m_bytesRead; }
222 
223 	@property bool empty() { return m_in.empty(); }
224 	@property ulong leastSize() { return m_in.leastSize();  }
225 	@property bool dataAvailableForRead() { return m_in.dataAvailableForRead; }
226 
227 	void increment(ulong bytes)
228 	{
229 		m_bytesRead += bytes;
230 	}
231 
232 	const(ubyte)[] peek() { return m_in.peek(); }
233 
234 	size_t read(scope ubyte[] dst, IOMode mode)
235 	{
236 		auto ret = m_in.read(dst, mode);
237 		m_bytesRead += ret;
238 		return ret;
239 	}
240 
241 	alias read = InputStream.read;
242 }
243 
244 /**
245 	Wraps an input stream and calls the given delegate once the stream is empty.
246 
247 	Note that this function will potentially block after each read operation to
248 	see if the end has already been reached - this may take as long until either
249 	new data has arrived or until the connection was closed.
250 
251 	The stream will also guarantee that the inner stream is not used after it
252 	has been determined to be empty. It can thus be safely deleted once the
253 	callback is invoked.
254 */
255 class EndCallbackInputStream : InputStream {
256 @safe:
257 
258 	private {
259 		InterfaceProxy!InputStream m_in;
260 		bool m_eof = false;
261 		void delegate() @safe m_callback;
262 	}
263 
264 	deprecated("use createEndCallbackInputStream instead.")
265 	this(InputStream input, void delegate() @safe callback)
266 	{
267 		this(interfaceProxy!InputStream(input), callback, true);
268 	}
269 
270 	/// private
271 	this(InterfaceProxy!InputStream input, void delegate() @safe callback, bool dummy)
272 	{
273 		m_in = input;
274 		m_callback = callback;
275 		checkEOF();
276 	}
277 
278 	@property bool empty()
279 	{
280 		checkEOF();
281 		return !m_in;
282 	}
283 
284 	@property ulong leastSize()
285 	{
286 		checkEOF();
287 		if( m_in ) return m_in.leastSize();
288 		return 0;
289 	}
290 
291 	@property bool dataAvailableForRead()
292 	{
293 		if( !m_in ) return false;
294 		return m_in.dataAvailableForRead;
295 	}
296 
297 	const(ubyte)[] peek()
298 	{
299 		if( !m_in ) return null;
300 		return m_in.peek();
301 	}
302 
303 	size_t read(scope ubyte[] dst, IOMode mode)
304 	{
305 		enforce(!!m_in, "Reading past end of stream.");
306 		auto ret = m_in.read(dst, mode);
307 		checkEOF();
308 		return ret;
309 	}
310 
311 	alias read = InputStream.read;
312 
313 	private void checkEOF()
314 	@safe {
315 		if( !m_in ) return;
316 		if( m_in.empty ){
317 			m_in = InterfaceProxy!InputStream.init;
318 			m_callback();
319 		}
320 	}
321 }
322 
323 class LimitException : Exception {
324 @safe:
325 
326 	private ulong m_limit;
327 
328 	this(string message, ulong limit, Throwable next = null, string file = __FILE__, int line = __LINE__)
329 	{
330 		super(message, next, file, line);
331 	}
332 
333 	/// The byte limit of the stream that emitted the exception
334 	@property ulong limit() const { return m_limit; }
335 }