/**
	Wrapper streams which count the number of bytes or limit the stream based on the number of
	transferred bytes.

	Copyright: © 2012 Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Sönke Ludwig
*/
module vibe.stream.counting;

public import vibe.core.stream;

import std.exception;
import vibe.internal.interfaceproxy;
import vibe.internal.freelistref : FreeListRef;


/** Constructs a limited stream from an existing input stream.

	Params:
		stream = the input stream to be wrapped
		byte_limit = the maximum number of bytes readable from the constructed stream
		silent_limit = if set, the stream will behave exactly like the original stream, but
			will throw an exception as soon as the limit is reached.

	Returns:
		A newly allocated `LimitedInputStream` wrapping `stream`.
*/
LimitedInputStream createLimitedInputStream(InputStream)(InputStream stream, ulong byte_limit, bool silent_limit = false)
	if (isInputStream!InputStream)
{
	return new LimitedInputStream(interfaceProxy!(.InputStream)(stream), byte_limit, silent_limit, true);
}

/// private
FreeListRef!LimitedInputStream createLimitedInputStreamFL(InputStream)(InputStream stream, ulong byte_limit, bool silent_limit = false)
	if (isInputStream!InputStream)
{
	return FreeListRef!LimitedInputStream(interfaceProxy!(.InputStream)(stream), byte_limit, silent_limit, true);
}

/** Creates a proxy stream that counts the number of bytes written.

	Params:
		output = The stream to forward the written data to
		byte_limit = Optional total write size limit after which an exception is thrown

	Returns:
		A newly allocated `CountingOutputStream` wrapping `output`.
*/
CountingOutputStream createCountingOutputStream(OutputStream)(OutputStream output, ulong byte_limit = ulong.max)
	if (isOutputStream!OutputStream)
{
	return new CountingOutputStream(interfaceProxy!(.OutputStream)(output), byte_limit, true);
}

/// private
FreeListRef!CountingOutputStream createCountingOutputStreamFL(OutputStream)(OutputStream output, ulong byte_limit = ulong.max)
	if (isOutputStream!OutputStream)
{
	return FreeListRef!CountingOutputStream(interfaceProxy!(.OutputStream)(output), byte_limit, true);
}


/** Creates a stream that fires a callback once the end of the underlying input stream is reached.

	Params:
		input = Source stream to read from
		callback = The callback that is invoked once the source stream has been drained

	Returns:
		A newly allocated `EndCallbackInputStream` wrapping `input`.
*/
EndCallbackInputStream createEndCallbackInputStream(InputStream)(InputStream input, void delegate() @safe callback)
	if (isInputStream!InputStream)
{
	return new EndCallbackInputStream(interfaceProxy!(.InputStream)(input), callback, true);
}

/// private
FreeListRef!EndCallbackInputStream createEndCallbackInputStreamFL(InputStream)(InputStream input, void delegate() @safe callback)
	if (isInputStream!InputStream)
{
	return FreeListRef!EndCallbackInputStream(interfaceProxy!(.InputStream)(input), callback, true);
}


/**
	Wraps an existing stream, limiting the amount of data that can be read.

	The wrapper keeps a remaining byte budget that is decreased by every
	successful `read` and by every call to `increment`.
*/
class LimitedInputStream : InputStream {
@safe:

	private {
		InterfaceProxy!InputStream m_input;
		ulong m_sizeLimit; // remaining number of bytes that may still be read
		bool m_silentLimit;
	}

	/// private
	this(InterfaceProxy!InputStream stream, ulong byte_limit, bool silent_limit, bool dummy)
	{
		assert(!!stream);
		m_input = stream;
		m_sizeLimit = byte_limit;
		m_silentLimit = silent_limit;
	}

	/// The stream that is wrapped by this one
	@property inout(InterfaceProxy!InputStream) sourceStream() inout { return m_input; }

	/** Reports whether the stream counts as exhausted.

		In silent mode this forwards to the wrapped stream; otherwise the stream
		is empty exactly when the remaining byte budget is zero.
	*/
	@property bool empty() { return m_silentLimit ? m_input.empty : (m_sizeLimit == 0); }

	/** Returns the wrapped stream's `leastSize` in silent mode and the
		remaining byte budget otherwise.
	*/
	@property ulong leastSize() { if( m_silentLimit ) return m_input.leastSize; return m_sizeLimit; }

	/// Forwards to the wrapped stream.
	@property bool dataAvailableForRead() { return m_input.dataAvailableForRead; }

	/** Consumes `bytes` of the remaining budget without reading any data.

		Calls `onSizeLimitReached` if `bytes` exceeds the remaining budget.
	*/
	void increment(ulong bytes)
	{
		if( bytes > m_sizeLimit ) onSizeLimitReached();
		m_sizeLimit -= bytes;
	}

	/** Forwards to the wrapped stream.

		NOTE(review): the returned slice is not clamped to the remaining byte
		budget, so it may be longer than `leastSize` in non-silent mode -
		confirm that this is intended.
	*/
	const(ubyte)[] peek() { return m_input.peek(); }

	/** Reads from the wrapped stream without exceeding the remaining budget.

		`onSizeLimitReached` is invoked when `dst` is longer than the remaining
		budget and either `mode` is `IOMode.all` or the budget is already
		exhausted. In all other cases at most `min(dst.length, remaining)` bytes
		are requested from the wrapped stream, and the budget is reduced by the
		number of bytes actually read.

		Returns:
			The number of bytes read, as reported by the wrapped stream.
	*/
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		import std.algorithm: min;

		if ((mode == IOMode.all || m_sizeLimit == 0) && dst.length > m_sizeLimit) onSizeLimitReached();

		const validReadSize = min(dst.length, m_sizeLimit);
		auto ret = m_input.read(dst[0 .. validReadSize], mode);
		m_sizeLimit -= ret;
		return ret;
	}

	alias read = InputStream.read;

	/** Called when a read or increment would exceed the limit.

		The default implementation throws a `LimitException` that carries the
		byte budget remaining at the time of the call.
	*/
	protected void onSizeLimitReached() @safe {
		throw new LimitException("Size limit reached", m_sizeLimit);
	}
}

unittest { // issue 2575
	import vibe.stream.memory : createMemoryStream;
	import std.exception : assertThrown;

	auto buf = new ubyte[](1024);
	foreach (i, ref b; buf) b = cast(ubyte) i;
	auto input = createMemoryStream(buf, false);

	// test IOMode.once and IOMode.immediate
	static foreach (bufferSize; [100, 128, 200])
	static foreach (ioMode; [IOMode.once, IOMode.immediate])
	{{
		input.seek(0);
		auto limitedStream = createLimitedInputStream(input, 128);

		ubyte[] result;

		ubyte[bufferSize] buffer;
		while (!limitedStream.empty) {
			const chunk = limitedStream.read(buffer[], ioMode);
			result ~= buffer[0 .. chunk];
		}

		assert(result[] == buf[0 .. 128]);
		assertThrown(limitedStream.read(buffer[], ioMode));
	}}

	// test IOMode.all normal operation
	{
		input.seek(0);
		auto limitedStream = createLimitedInputStream(input, 128);

		ubyte[] result;
		ubyte[64] buffer;
		result ~= buffer[0 .. limitedStream.read(buffer[], IOMode.all)];
		result ~= buffer[0 .. limitedStream.read(buffer[], IOMode.all)];
		assert(limitedStream.empty);
		assert(result[] == buf[0 .. 128]);
		assertThrown(limitedStream.read(buffer[], IOMode.all));
	}

	// test IOMode.all reading over size limit
	{
		input.seek(0);
		auto limitedStream = createLimitedInputStream(input, 128);

		ubyte[256] buffer;
		assertThrown(limitedStream.read(buffer[], IOMode.all));
	}
}

/**
	Wraps an existing output stream, counting the bytes that are written.

	An optional write limit can be set; writes or manual increments that would
	push the counter past the limit are rejected with an exception.
*/
class CountingOutputStream : OutputStream {
@safe:

	private {
		ulong m_bytesWritten;
		ulong m_writeLimit;
		InterfaceProxy!OutputStream m_out;
	}

	/// private
	this(InterfaceProxy!OutputStream stream, ulong write_limit, bool dummy)
	{
		assert(!!stream);
		m_writeLimit = write_limit;
		m_out = stream;
	}

	/// Returns the total number of bytes written.
	@property ulong bytesWritten() const { return m_bytesWritten; }

	/// The maximum number of bytes to write
	@property ulong writeLimit() const { return m_writeLimit; }
	/// ditto
	@property void writeLimit(ulong value) { m_writeLimit = value; }

	/** Manually increments the write counter without actually writing data.

		Throws:
			An exception (via `enforce`) if the counter would exceed `writeLimit`.
	*/
	void increment(ulong bytes)
	{
		enforce(fitsWithinLimit(bytes), "Incrementing past end of output stream.");
		m_bytesWritten += bytes;
	}

	static if (is(typeof(.OutputStream.outputStreamVersion)) && .OutputStream.outputStreamVersion > 1) {
		override size_t write(scope const(ubyte)[] bytes_, IOMode mode) { return doWrite(bytes_, mode); }
	} else {
		override size_t write(in ubyte[] bytes_, IOMode mode) { return doWrite(bytes_, mode); }
	}

	alias write = OutputStream.write;

	/** Forwards `bytes` to the wrapped stream and adds the number of bytes
		it reports as written to the counter.

		The whole of `bytes` must fit into the remaining limit, otherwise an
		exception is thrown (via `enforce`) before anything is written.
	*/
	private size_t doWrite(scope const(ubyte)[] bytes, IOMode mode)
	{
		enforce(fitsWithinLimit(bytes.length), "Writing past end of output stream.");

		auto ret = m_out.write(bytes, mode);
		m_bytesWritten += ret;
		return ret;
	}

	/** Checks whether `count` more bytes still fit into the write limit.

		The comparison is made against the remaining budget rather than
		against `m_bytesWritten + count`, because that sum can wrap around
		`ulong.max` and would then wrongly pass the check.
	*/
	private bool fitsWithinLimit(ulong count) const
	{
		// The limit may have been lowered below the current counter via the
		// `writeLimit` setter; guard the subtraction against underflow.
		if (m_bytesWritten > m_writeLimit) return false;
		return count <= m_writeLimit - m_bytesWritten;
	}

	/// Flushes the wrapped stream.
	void flush() { m_out.flush(); }

	/** Flushes the wrapped stream.

		NOTE(review): this calls `flush`, not `finalize`, on the wrapped
		stream - presumably so that the wrapped stream stays usable; confirm.
	*/
	void finalize() { m_out.flush(); }
}


/**
	Wraps an existing input stream, counting the bytes that are read.
*/
class CountingInputStream : InputStream {
@safe:

	private {
		ulong m_bytesRead;
		InterfaceProxy!InputStream m_in;
	}

	/// private
	this(InterfaceProxy!InputStream stream, bool dummy)
	{
		assert(!!stream);
		m_in = stream;
	}

	/// Returns the total number of bytes counted so far.
	@property ulong bytesRead() const { return m_bytesRead; }

	/// Forwards to the wrapped stream.
	@property bool empty() { return m_in.empty(); }
	/// ditto
	@property ulong leastSize() { return m_in.leastSize(); }
	/// ditto
	@property bool dataAvailableForRead() { return m_in.dataAvailableForRead; }

	/// Manually increments the read counter without actually reading data.
	void increment(ulong bytes)
	{
		m_bytesRead += bytes;
	}

	/// Forwards to the wrapped stream; peeked data is not counted.
	const(ubyte)[] peek() { return m_in.peek(); }

	/** Reads from the wrapped stream and adds the number of bytes it reports
		as read to the counter.
	*/
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		auto ret = m_in.read(dst, mode);
		m_bytesRead += ret;
		return ret;
	}

	alias read = InputStream.read;
}

/**
	Wraps an input stream and calls the given delegate once the stream is empty.

	Note that this function will potentially block after each read operation to
	see if the end has already been reached - this may take as long until either
	new data has arrived or until the connection was closed.

	The stream will also guarantee that the inner stream is not used after it
	has been determined to be empty. It can thus be safely deleted once the
	callback is invoked.
*/
class EndCallbackInputStream : InputStream {
@safe:

	private {
		InterfaceProxy!InputStream m_in; // reset to `.init` once the end was detected
		bool m_eof = false;
		void delegate() @safe m_callback;
	}

	/// private
	this(InterfaceProxy!InputStream input, void delegate() @safe callback, bool dummy)
	{
		m_in = input;
		m_callback = callback;
		// An already empty input triggers the callback right away.
		checkEOF();
	}

	/// Checks the wrapped stream for its end and reports whether it was reached.
	@property bool empty()
	{
		checkEOF();
		return !m_in;
	}

	/// Returns the wrapped stream's `leastSize`, or 0 once the end was reached.
	@property ulong leastSize()
	{
		checkEOF();
		if( m_in ) return m_in.leastSize();
		return 0;
	}

	/// Forwards to the wrapped stream; `false` once the end was reached.
	@property bool dataAvailableForRead()
	{
		if( !m_in ) return false;
		return m_in.dataAvailableForRead;
	}

	/// Forwards to the wrapped stream; `null` once the end was reached.
	const(ubyte)[] peek()
	{
		if( !m_in ) return null;
		return m_in.peek();
	}

	/** Reads from the wrapped stream and then checks whether its end was reached.

		Throws:
			An exception (via `enforce`) if the end was already reached before the call.
	*/
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		enforce(!!m_in, "Reading past end of stream.");
		auto ret = m_in.read(dst, mode);
		checkEOF();
		return ret;
	}

	alias read = InputStream.read;

	/** Drops the reference to the wrapped stream and invokes the callback as
		soon as the wrapped stream reports being empty.

		The reference is cleared before the callback runs, so later calls
		return early and the callback is invoked at most once.
	*/
	private void checkEOF()
	@safe {
		if( !m_in ) return;
		if( m_in.empty ){
			m_in = InterfaceProxy!InputStream.init;
			m_callback();
		}
	}
}

/**
	Exception carrying a byte limit value, thrown by `LimitedInputStream` when
	its size limit is exceeded.
*/
class LimitException : Exception {
@safe:

	private ulong m_limit;

	/**
		Params:
			message = Description of the error
			limit = The limit value to report through `limit`
			next = Optional chained throwable
			file = Source file recorded for the exception
			line = Source line recorded for the exception
	*/
	this(string message, ulong limit, Throwable next = null, string file = __FILE__, int line = __LINE__)
	{
		super(message, next, file, line);
		// Previously the argument was dropped, so `limit` always returned 0.
		m_limit = limit;
	}

	/// The byte limit of the stream that emitted the exception
	@property ulong limit() const { return m_limit; }
}