/**
	Wrapper streams which count the number of bytes or limit the stream based on the number of
	transferred bytes.

	Copyright: © 2012 Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Sönke Ludwig
*/
module vibe.stream.counting;

public import vibe.core.stream;

import std.exception;
import vibe.internal.interfaceproxy;
import vibe.internal.freelistref : FreeListRef;


/** Wraps an input stream so that only a bounded number of bytes can be read from it.

	Params:
		stream = the input stream to be wrapped
		byte_limit = the maximum number of bytes readable from the constructed stream
		silent_limit = if set, the stream will behave exactly like the original stream, but
			will throw an exception as soon as the limit is reached.

	Returns:
		A newly allocated `LimitedInputStream` that reads from `stream`.
*/
LimitedInputStream createLimitedInputStream(InputStream)(InputStream stream, ulong byte_limit, bool silent_limit = false)
	if (isInputStream!InputStream)
{
	// The template parameter shadows the interface of the same name, hence the
	// leading dot to reach the module-scope `InputStream`.
	auto source = interfaceProxy!(.InputStream)(stream);
	return new LimitedInputStream(source, byte_limit, silent_limit, true);
}

/// private
FreeListRef!LimitedInputStream createLimitedInputStreamFL(InputStream)(InputStream stream, ulong byte_limit, bool silent_limit = false)
	if (isInputStream!InputStream)
{
	// Same construction as createLimitedInputStream, but wrapped in a FreeListRef.
	auto source = interfaceProxy!(.InputStream)(stream);
	return FreeListRef!LimitedInputStream(source, byte_limit, silent_limit, true);
}

/** Creates a proxy stream that counts the number of bytes written.
The proxy forwards each write to `output` and adds the number of bytes written to a running total.

	Params:
		output = The stream to forward the written data to
		byte_limit = Optional total write size limit after which an exception is thrown
*/
CountingOutputStream createCountingOutputStream(OutputStream)(OutputStream output, ulong byte_limit = ulong.max)
	if (isOutputStream!OutputStream)
{
	// The template parameter shadows the interface of the same name, hence the
	// leading dot to reach the module-scope `OutputStream`.
	auto sink = interfaceProxy!(.OutputStream)(output);
	return new CountingOutputStream(sink, byte_limit, true);
}

/// private
FreeListRef!CountingOutputStream createCountingOutputStreamFL(OutputStream)(OutputStream output, ulong byte_limit = ulong.max)
	if (isOutputStream!OutputStream)
{
	// Same construction as createCountingOutputStream, but wrapped in a FreeListRef.
	auto sink = interfaceProxy!(.OutputStream)(output);
	return FreeListRef!CountingOutputStream(sink, byte_limit, true);
}


/** Creates a stream that fires a callback once the end of the underlying input stream is reached.

	Params:
		input = Source stream to read from
		callback = The callback that is invoked once the source stream has been drained
*/
EndCallbackInputStream createEndCallbackInputStream(InputStream)(InputStream input, void delegate() @safe callback)
	if (isInputStream!InputStream)
{
	auto source = interfaceProxy!(.InputStream)(input);
	return new EndCallbackInputStream(source, callback, true);
}

/// private
FreeListRef!EndCallbackInputStream createEndCallbackInputStreamFL(InputStream)(InputStream input, void delegate() @safe callback)
	if (isInputStream!InputStream)
{
	// Same construction as createEndCallbackInputStream, but wrapped in a FreeListRef.
	auto source = interfaceProxy!(.InputStream)(input);
	return FreeListRef!EndCallbackInputStream(source, callback, true);
}


/**
	Wraps an existing stream, limiting the amount of data that can be read.
*/
class LimitedInputStream : InputStream {
@safe:

	private {
		InterfaceProxy!InputStream m_input;
		// Remaining byte budget; reduced by read() and increment().
		ulong m_sizeLimit;
		// When set, empty/leastSize/peek are forwarded from the wrapped stream
		// unchanged instead of reflecting the remaining budget.
		bool m_silentLimit;
	}

	deprecated("Use createLimitedInputStream instead.")
	this(InputStream stream, ulong byte_limit, bool silent_limit = false)
	{
		this(interfaceProxy!InputStream(stream), byte_limit, silent_limit, true);
	}

	/// private
	// NOTE(review): `dummy` is unused; it presumably only disambiguates this
	// overload from the deprecated constructor above - confirm.
	this(InterfaceProxy!InputStream stream, ulong byte_limit, bool silent_limit, bool dummy)
	{
		assert(!!stream);
		m_input = stream;
		m_sizeLimit = byte_limit;
		m_silentLimit = silent_limit;
	}

	/// The stream that is wrapped by this one
	@property inout(InterfaceProxy!InputStream) sourceStream() inout { return m_input; }

	/// In silent mode mirrors the wrapped stream, otherwise true once the byte budget is used up.
	@property bool empty() { return m_silentLimit ? m_input.empty : (m_sizeLimit == 0); }

	/// In silent mode mirrors the wrapped stream, otherwise the remaining byte budget.
	@property ulong leastSize() { if( m_silentLimit ) return m_input.leastSize; return m_sizeLimit; }

	@property bool dataAvailableForRead() { return m_input.dataAvailableForRead; }

	/** Manually consumes part of the byte budget without reading any data.

		Calls `onSizeLimitReached` (which throws by default) if `bytes` exceeds
		the remaining budget.
	*/
	void increment(ulong bytes)
	{
		if( bytes > m_sizeLimit ) onSizeLimitReached();
		m_sizeLimit -= bytes;
	}

	/** Returns the data that the wrapped stream currently offers for peeking.

		Outside of silent mode the slice is cut down to the remaining byte
		budget, so that it never exposes more bytes than `leastSize` reports
		and than `read` would accept.
	*/
	const(ubyte)[] peek()
	{
		auto data = m_input.peek();
		// data.length > m_sizeLimit guarantees that the limit fits into size_t.
		if (!m_silentLimit && data.length > m_sizeLimit)
			data = data[0 .. cast(size_t)m_sizeLimit];
		return data;
	}

	/** Reads from the wrapped stream and charges the bytes read against the budget.

		The limit check uses the size of `dst`, not the number of bytes that end
		up being read: a request larger than the remaining budget triggers
		`onSizeLimitReached` before any data is read.
	*/
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		if (dst.length > m_sizeLimit) onSizeLimitReached();
		auto ret = m_input.read(dst, mode);
		m_sizeLimit -= ret;
		return ret;
	}

	alias read = InputStream.read;

	/// Invoked when a request exceeds the remaining budget; throws a `LimitException`.
	protected void onSizeLimitReached() @safe {
		throw new LimitException("Size limit reached", m_sizeLimit);
	}
}


/**
	Wraps an existing output stream, counting the bytes that are written.
*/
class CountingOutputStream : OutputStream {
@safe:

	private {
		// Running total of bytes accounted for by write() and increment().
		ulong m_bytesWritten;
		// Upper bound for m_bytesWritten; ulong.max when created without a limit.
		ulong m_writeLimit;
		InterfaceProxy!OutputStream m_out;
	}

	deprecated("Use createCountingOutputStream instead.")
	this(OutputStream stream, ulong write_limit = ulong.max)
	{
		this(interfaceProxy!OutputStream(stream), write_limit, true);
	}

	/// private
	// NOTE(review): `dummy` is unused; it presumably only disambiguates this
	// overload from the deprecated constructor above - confirm.
	this(InterfaceProxy!OutputStream stream, ulong write_limit, bool dummy)
	{
		assert(!!stream);
		m_writeLimit = write_limit;
		m_out = stream;
	}

	/// Returns the total number of bytes written.
	@property ulong bytesWritten() const { return m_bytesWritten; }

	/// The maximum number of bytes to write
	@property ulong writeLimit() const { return m_writeLimit; }
	/// ditto
	@property void writeLimit(ulong value) { m_writeLimit = value; }

	/** Manually increments the write counter without actually writing data.

		Throws: An exception if the counter would exceed `writeLimit`.
	*/
	void increment(ulong bytes)
	{
		enforce(fitsWithinLimit(bytes), "Incrementing past end of output stream.");
		m_bytesWritten += bytes;
	}

	/** Forwards `bytes` to the wrapped stream and adds the number of bytes written to the counter.

		The limit check is made against the full length of `bytes` before
		anything is written.

		Throws: An exception if writing all of `bytes` would exceed `writeLimit`.
	*/
	size_t write(in ubyte[] bytes, IOMode mode)
	{
		enforce(fitsWithinLimit(bytes.length), "Writing past end of output stream.");

		auto ret = m_out.write(bytes, mode);
		m_bytesWritten += ret;
		return ret;
	}

	alias write = OutputStream.write;

	void flush() { m_out.flush(); }
	// NOTE(review): only flushes the wrapped stream instead of finalizing it -
	// presumably because the wrapper does not own that stream; confirm.
	void finalize() { m_out.flush(); }

	// Overflow-safe form of `m_bytesWritten + count <= m_writeLimit`: the plain
	// sum can wrap around for large values and would then wrongly pass the check.
	// The first operand covers a limit that was lowered below the current count.
	private bool fitsWithinLimit(ulong count) const
	{
		return m_bytesWritten <= m_writeLimit && count <= m_writeLimit - m_bytesWritten;
	}
}


/**
	Wraps an existing input stream, counting the bytes that are read.
*/
class CountingInputStream : InputStream {
@safe:

	private {
		// Running total of bytes accounted for by read() and increment().
		ulong m_bytesRead;
		InterfaceProxy!InputStream m_in;
	}

	deprecated("Use createCountingInputStream instead.")
	this(InputStream stream)
	{
		this(interfaceProxy!InputStream(stream), true);
	}

	/// private
	// NOTE(review): `dummy` is unused; it presumably only disambiguates this
	// overload from the deprecated constructor above - confirm.
	this(InterfaceProxy!InputStream stream, bool dummy)
	{
		assert(!!stream);
		m_in = stream;
	}

	/// Returns the total number of bytes read.
	@property ulong bytesRead() const { return m_bytesRead; }

	@property bool empty() { return m_in.empty(); }
	@property ulong leastSize() { return m_in.leastSize(); }
	@property bool dataAvailableForRead() { return m_in.dataAvailableForRead; }

	/** Manually increments the read counter without actually reading data.
	*/
	void increment(ulong bytes)
	{
		m_bytesRead += bytes;
	}

	const(ubyte)[] peek() { return m_in.peek(); }

	/// Reads from the wrapped stream and adds the number of bytes read to the counter.
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		auto ret = m_in.read(dst, mode);
		m_bytesRead += ret;
		return ret;
	}

	alias read = InputStream.read;
}

/** Creates a proxy stream that counts the number of bytes read.

	Params:
		input = The stream to read the data from
*/
CountingInputStream createCountingInputStream(InputStream)(InputStream input)
	if (isInputStream!InputStream)
{
	// The template parameter shadows the interface of the same name, hence the
	// leading dot to reach the module-scope `InputStream`.
	return new CountingInputStream(interfaceProxy!(.InputStream)(input), true);
}

/// private
FreeListRef!CountingInputStream createCountingInputStreamFL(InputStream)(InputStream input)
	if (isInputStream!InputStream)
{
	return FreeListRef!CountingInputStream(interfaceProxy!(.InputStream)(input), true);
}

/**
	Wraps an input stream and calls the given delegate once the stream is empty.

	Note that this function will potentially block after each read operation to
	see if the end has already been reached - this may take as long until either
	new data has arrived or until the connection was closed.

	The stream will also guarantee that the inner stream is not used after it
	has been determined to be empty. It can thus be safely deleted once the
	callback is invoked.
*/
class EndCallbackInputStream : InputStream {
@safe:

	private {
		// Reset to its init value (which tests as false) once the end of the
		// stream has been detected.
		InterfaceProxy!InputStream m_in;
		// NOTE(review): never read or written in the code visible here.
		bool m_eof = false;
		void delegate() @safe m_callback;
	}

	deprecated("use createEndCallbackInputStream instead.")
	this(InputStream input, void delegate() @safe callback)
	{
		this(interfaceProxy!InputStream(input), callback, true);
	}

	/// private
	// NOTE(review): `dummy` is unused; it presumably only disambiguates this
	// overload from the deprecated constructor above - confirm.
	this(InterfaceProxy!InputStream input, void delegate() @safe callback, bool dummy)
	{
		m_in = input;
		m_callback = callback;
		// An input that is already empty triggers the callback during construction.
		checkEOF();
	}

	/// True once the wrapped stream has been found to be empty.
	@property bool empty()
	{
		checkEOF();
		return !m_in;
	}

	/// Forwards to the wrapped stream, or returns 0 after the end was reached.
	@property ulong leastSize()
	{
		checkEOF();
		if( m_in ) return m_in.leastSize();
		return 0;
	}

	@property bool dataAvailableForRead()
	{
		if( !m_in ) return false;
		return m_in.dataAvailableForRead;
	}

	const(ubyte)[] peek()
	{
		if( !m_in ) return null;
		return m_in.peek();
	}

	/** Reads from the wrapped stream and then checks whether its end was reached.

		Throws: An exception if the end of the stream has already been detected.
	*/
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		enforce(!!m_in, "Reading past end of stream.");
		auto ret = m_in.read(dst, mode);
		checkEOF();
		return ret;
	}

	alias read = InputStream.read;

	// Drops the reference to the wrapped stream and invokes the callback once
	// the stream reports empty. The reference is cleared before the callback
	// runs, so later calls return early and the callback fires only once.
	private void checkEOF()
	@safe {
		if( !m_in ) return;
		if( m_in.empty ){
			m_in = InterfaceProxy!InputStream.init;
			m_callback();
		}
	}
}

/**
	Exception that carries a byte limit along with its message.

	`LimitedInputStream.onSizeLimitReached` throws it when a request exceeds
	the remaining size limit.
*/
class LimitException : Exception {
@safe:

	private ulong m_limit;

	/**
		Params:
			message = the exception message
			limit = the byte limit to report through `limit`
			next = optional chained throwable
			file = source file to report, defaults to the caller's file
			line = source line to report, defaults to the caller's line
	*/
	this(string message, ulong limit, Throwable next = null, string file = __FILE__, int line = __LINE__)
	{
		super(message, next, file, line);
		// Previously never stored, so `limit` always returned 0.
		m_limit = limit;
	}

	/// The byte limit of the stream that emitted the exception
	@property ulong limit() const { return m_limit; }
}