/**
	Wrapper streams which count the number of bytes or limit the stream based on the number of
	transferred bytes.

	Copyright: © 2012 Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Sönke Ludwig
*/
module vibe.stream.counting;

public import vibe.core.stream;

import std.exception;
import vibe.internal.interfaceproxy;
import vibe.internal.freelistref : FreeListRef;


/** Constructs a limited stream from an existing input stream.

	Params:
		stream = the input stream to be wrapped
		byte_limit = the maximum number of bytes readable from the constructed stream
		silent_limit = if set, the stream will behave exactly like the original stream, but
			will throw an exception as soon as the limit is reached.
*/
LimitedInputStream createLimitedInputStream(InputStream)(InputStream stream, ulong byte_limit, bool silent_limit = false)
	if (isInputStream!InputStream)
{
	return new LimitedInputStream(interfaceProxy!(.InputStream)(stream), byte_limit, silent_limit, true);
}

/// private
FreeListRef!LimitedInputStream createLimitedInputStreamFL(InputStream)(InputStream stream, ulong byte_limit, bool silent_limit = false)
	if (isInputStream!InputStream)
{
	return FreeListRef!LimitedInputStream(interfaceProxy!(.InputStream)(stream), byte_limit, silent_limit, true);
}

/** Creates a proxy stream that counts the number of bytes written.

	Params:
		output = The stream to forward the written data to
		byte_limit = Optional total write size limit after which an exception is thrown
*/
CountingOutputStream createCountingOutputStream(OutputStream)(OutputStream output, ulong byte_limit = ulong.max)
	if (isOutputStream!OutputStream)
{
	return new CountingOutputStream(interfaceProxy!(.OutputStream)(output), byte_limit, true);
}

/// private
FreeListRef!CountingOutputStream createCountingOutputStreamFL(OutputStream)(OutputStream output, ulong byte_limit = ulong.max)
	if (isOutputStream!OutputStream)
{
	return FreeListRef!CountingOutputStream(interfaceProxy!(.OutputStream)(output), byte_limit, true);
}

/** Creates a proxy stream that counts the number of bytes read.

	Params:
		input = The stream to read the data from
*/
CountingInputStream createCountingInputStream(InputStream)(InputStream input)
	if (isInputStream!InputStream)
{
	return new CountingInputStream(interfaceProxy!(.InputStream)(input), true);
}

/// private
FreeListRef!CountingInputStream createCountingInputStreamFL(InputStream)(InputStream input)
	if (isInputStream!InputStream)
{
	return FreeListRef!CountingInputStream(interfaceProxy!(.InputStream)(input), true);
}


/** Creates a stream that fires a callback once the end of the underlying input stream is reached.

	Params:
		input = Source stream to read from
		callback = The callback that is invoked once the source stream has been drained
*/
EndCallbackInputStream createEndCallbackInputStream(InputStream)(InputStream input, void delegate() @safe callback)
	if (isInputStream!InputStream)
{
	return new EndCallbackInputStream(interfaceProxy!(.InputStream)(input), callback, true);
}

/// private
FreeListRef!EndCallbackInputStream createEndCallbackInputStreamFL(InputStream)(InputStream input, void delegate() @safe callback)
	if (isInputStream!InputStream)
{
	return FreeListRef!EndCallbackInputStream(interfaceProxy!(.InputStream)(input), callback, true);
}


/**
	Wraps an existing stream, limiting the amount of data that can be read.

	In the default (non-silent) mode the stream reports itself as empty once
	the limit has been used up. In silent mode `empty` and `leastSize` are
	forwarded to the wrapped stream and exceeding the limit is only reported
	through `onSizeLimitReached`.
*/
class LimitedInputStream : InputStream {
@safe:

	private {
		InterfaceProxy!InputStream m_input;
		ulong m_sizeLimit; // number of bytes that may still be read
		bool m_silentLimit;
	}

	deprecated("Use createLimitedInputStream instead.")
	this(InputStream stream, ulong byte_limit, bool silent_limit = false)
	{
		this(interfaceProxy!InputStream(stream), byte_limit, silent_limit, true);
	}

	/// private
	this(InterfaceProxy!InputStream stream, ulong byte_limit, bool silent_limit, bool dummy)
	{
		assert(!!stream);
		m_input = stream;
		m_sizeLimit = byte_limit;
		m_silentLimit = silent_limit;
	}

	/// The stream that is wrapped by this one
	@property inout(InterfaceProxy!InputStream) sourceStream() inout { return m_input; }

	/// In silent mode mirrors the wrapped stream, otherwise true once the limit is used up.
	@property bool empty() { return m_silentLimit ? m_input.empty : (m_sizeLimit == 0); }

	/// In silent mode mirrors the wrapped stream, otherwise the number of bytes left until the limit.
	@property ulong leastSize() { if( m_silentLimit ) return m_input.leastSize; return m_sizeLimit; }

	@property bool dataAvailableForRead() { return m_input.dataAvailableForRead; }

	/** Manually consumes a part of the remaining limit without reading data.

		Calls `onSizeLimitReached` if `bytes` exceeds the remaining limit.
	*/
	void increment(ulong bytes)
	{
		if( bytes > m_sizeLimit ) onSizeLimitReached();
		m_sizeLimit -= bytes;
	}

	/** Returns the data buffered by the wrapped stream.

		In non-silent mode the returned slice is clamped to the remaining
		limit, so that it never exposes bytes which `empty`/`leastSize`
		report as being outside of this stream.
	*/
	const(ubyte)[] peek()
	{
		auto buffered = m_input.peek();
		if (!m_silentLimit && buffered.length > m_sizeLimit) {
			// m_sizeLimit < buffered.length here, so it always fits into size_t
			buffered = buffered[0 .. cast(size_t) m_sizeLimit];
		}
		return buffered;
	}

	/** Reads from the wrapped stream without exceeding the remaining limit.

		`onSizeLimitReached` is invoked if `dst` is larger than the remaining
		limit and either `IOMode.all` was requested or the limit is already
		used up. For the other modes the read is shortened to the remaining
		limit.

		Returns: The number of bytes actually read.
	*/
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		import std.algorithm : min;

		if ((mode == IOMode.all || m_sizeLimit == 0) && dst.length > m_sizeLimit)
			onSizeLimitReached();

		// never larger than dst.length, so the conversion to size_t is lossless
		const validReadSize = cast(size_t) min(dst.length, m_sizeLimit);
		auto ret = m_input.read(dst[0 .. validReadSize], mode);
		m_sizeLimit -= ret;
		return ret;
	}

	alias read = InputStream.read;

	/// Throws a `LimitException` carrying the limit that is remaining at this point.
	protected void onSizeLimitReached() @safe {
		throw new LimitException("Size limit reached", m_sizeLimit);
	}
}

unittest { // issue 2575
	import vibe.stream.memory : createMemoryStream;
	import std.exception : assertThrown;

	auto buf = new ubyte[](1024);
	foreach (i, ref b; buf) b = cast(ubyte) i;
	auto input = createMemoryStream(buf, false);

	// test IOMode.once and IOMode.immediate
	static foreach (bufferSize; [100, 128, 200])
	static foreach (ioMode; [IOMode.once, IOMode.immediate])
	{{
		input.seek(0);
		auto limitedStream = createLimitedInputStream(input, 128);

		ubyte[] result;

		ubyte[bufferSize] buffer;
		while (!limitedStream.empty) {
			const chunk = limitedStream.read(buffer[], ioMode);
			result ~= buffer[0 .. chunk];
		}

		assert(result[] == buf[0 .. 128]);
		assertThrown(limitedStream.read(buffer[], ioMode));
	}}

	// test IOMode.all normal operation
	{
		input.seek(0);
		auto limitedStream = createLimitedInputStream(input, 128);

		ubyte[] result;
		ubyte[64] buffer;
		result ~= buffer[0 .. limitedStream.read(buffer[], IOMode.all)];
		result ~= buffer[0 .. limitedStream.read(buffer[], IOMode.all)];
		assert(limitedStream.empty);
		assert(result[] == buf[0 .. 128]);
		assertThrown(limitedStream.read(buffer[], IOMode.all));
	}

	// test IOMode.all reading over size limit
	{
		input.seek(0);
		auto limitedStream = createLimitedInputStream(input, 128);

		ubyte[256] buffer;
		assertThrown(limitedStream.read(buffer[], IOMode.all));
	}
}

unittest { // peek must not expose data beyond the limit in non-silent mode
	import vibe.stream.memory : createMemoryStream;

	auto buf = new ubyte[](1024);
	auto input = createMemoryStream(buf, false);
	auto limitedStream = createLimitedInputStream(input, 128);
	assert(limitedStream.peek().length <= 128);
}

/**
	Wraps an existing output stream, counting the bytes that are written.

	An optional write limit can be set; attempts to write or increment past
	it fail with an exception raised by `enforce`.
*/
class CountingOutputStream : OutputStream {
@safe:

	private {
		ulong m_bytesWritten;
		ulong m_writeLimit;
		InterfaceProxy!OutputStream m_out;
	}

	deprecated("Use createCountingOutputStream instead.")
	this(OutputStream stream, ulong write_limit = ulong.max)
	{
		this(interfaceProxy!OutputStream(stream), write_limit, true);
	}

	/// private
	this(InterfaceProxy!OutputStream stream, ulong write_limit, bool dummy)
	{
		assert(!!stream);
		m_writeLimit = write_limit;
		m_out = stream;
	}

	/// Returns the total number of bytes written.
	@property ulong bytesWritten() const { return m_bytesWritten; }

	/// The maximum number of bytes to write
	@property ulong writeLimit() const { return m_writeLimit; }
	/// ditto
	@property void writeLimit(ulong value) { m_writeLimit = value; }

	/** Manually increments the write counter without actually writing data.
	*/
	void increment(ulong bytes)
	{
		enforce(fitsWithinLimit(bytes), "Incrementing past end of output stream.");
		m_bytesWritten += bytes;
	}

	/** Forwards `bytes` to the wrapped stream and adds the written amount to the counter.

		Throws: An exception if the full length of `bytes` would exceed the write limit.

		Returns: The number of bytes reported as written by the wrapped stream.
	*/
	size_t write(in ubyte[] bytes, IOMode mode)
	{
		enforce(fitsWithinLimit(bytes.length), "Writing past end of output stream.");

		auto ret = m_out.write(bytes, mode);
		m_bytesWritten += ret;
		return ret;
	}

	alias write = OutputStream.write;

	void flush() { m_out.flush(); }
	// NOTE(review): forwards to flush() rather than finalize() on the wrapped
	// stream - presumably so that the wrapped stream stays usable; confirm.
	void finalize() { m_out.flush(); }

	// Checks `m_bytesWritten + count <= m_writeLimit` without risking a
	// wrap-around of the ulong addition. The first condition covers the case
	// where the limit was lowered below the amount already written.
	private bool fitsWithinLimit(ulong count) const
	{
		return m_bytesWritten <= m_writeLimit && count <= m_writeLimit - m_bytesWritten;
	}
}


/**
	Wraps an existing input stream, counting the bytes that are read.
*/
class CountingInputStream : InputStream {
@safe:

	private {
		ulong m_bytesRead;
		InterfaceProxy!InputStream m_in;
	}

	deprecated("Use createCountingInputStream instead.")
	this(InputStream stream)
	{
		this(interfaceProxy!InputStream(stream), true);
	}

	/// private
	this(InterfaceProxy!InputStream stream, bool dummy)
	{
		assert(!!stream);
		m_in = stream;
	}

	/// Returns the total number of bytes read (including manual increments).
	@property ulong bytesRead() const { return m_bytesRead; }

	@property bool empty() { return m_in.empty(); }
	@property ulong leastSize() { return m_in.leastSize(); }
	@property bool dataAvailableForRead() { return m_in.dataAvailableForRead; }

	/// Manually increments the read counter without actually reading data.
	void increment(ulong bytes)
	{
		m_bytesRead += bytes;
	}

	const(ubyte)[] peek() { return m_in.peek(); }

	/// Reads from the wrapped stream and adds the number of bytes read to the counter.
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		auto ret = m_in.read(dst, mode);
		m_bytesRead += ret;
		return ret;
	}

	alias read = InputStream.read;
}

/**
	Wraps an input stream and calls the given delegate once the stream is empty.

	Note that this function will potentially block after each read operation to
	see if the end has already been reached - this may take as long until either
	new data has arrived or until the connection was closed.

	The stream will also guarantee that the inner stream is not used after it
	has been determined to be empty. It can thus be safely deleted once the
	callback is invoked.
*/
class EndCallbackInputStream : InputStream {
@safe:

	private {
		InterfaceProxy!InputStream m_in; // reset to .init once the end has been detected
		bool m_eof = false;
		void delegate() @safe m_callback;
	}

	deprecated("use createEndCallbackInputStream instead.")
	this(InputStream input, void delegate() @safe callback)
	{
		this(interfaceProxy!InputStream(input), callback, true);
	}

	/// private
	this(InterfaceProxy!InputStream input, void delegate() @safe callback, bool dummy)
	{
		m_in = input;
		m_callback = callback;
		// an already empty input triggers the callback right away
		checkEOF();
	}

	@property bool empty()
	{
		checkEOF();
		return !m_in;
	}

	@property ulong leastSize()
	{
		checkEOF();
		if( m_in ) return m_in.leastSize();
		return 0;
	}

	@property bool dataAvailableForRead()
	{
		if( !m_in ) return false;
		return m_in.dataAvailableForRead;
	}

	const(ubyte)[] peek()
	{
		if( !m_in ) return null;
		return m_in.peek();
	}

	/** Reads from the wrapped stream and checks for the end of the stream afterwards.

		Throws: An exception if the end of the stream has already been detected.
	*/
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		enforce(!!m_in, "Reading past end of stream.");
		auto ret = m_in.read(dst, mode);
		checkEOF();
		return ret;
	}

	alias read = InputStream.read;

	// Drops the reference to the wrapped stream and invokes the callback the
	// first time the wrapped stream reports being empty.
	private void checkEOF()
	@safe {
		if( !m_in ) return;
		if( m_in.empty ){
			// reset before invoking the callback, so that the inner stream
			// is no longer referenced by the time the callback runs
			m_in = InterfaceProxy!InputStream.init;
			m_callback();
		}
	}
}

/**
	Exception thrown by `LimitedInputStream` when its size limit is exceeded.
*/
class LimitException : Exception {
@safe:

	private ulong m_limit;

	this(string message, ulong limit, Throwable next = null, string file = __FILE__, int line = __LINE__)
	{
		super(message, next, file, line);
		m_limit = limit;
	}

	/// The byte limit of the stream that emitted the exception
	@property ulong limit() const { return m_limit; }
}

unittest { // the limit passed to the constructor must be retrievable
	auto ex = new LimitException("Size limit reached", 128);
	assert(ex.limit == 128);
	assert(ex.msg == "Size limit reached");
}