/**
	Wrapper streams which count the number of bytes or limit the stream based on the number of
	transferred bytes.

	Copyright: © 2012 Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Sönke Ludwig
*/
module vibe.stream.counting;

public import vibe.core.stream;

import std.exception;
import vibe.internal.interfaceproxy;
import vibe.internal.freelistref : FreeListRef;


/** Wraps an input stream in a `LimitedInputStream` allocated with `new`.

	Params:
		stream = the input stream to be wrapped
		byte_limit = the maximum number of bytes readable from the constructed stream
		silent_limit = if set, the stream will behave exactly like the original stream, but
			will throw an exception as soon as the limit is reached.

	Returns:
		The newly constructed limited stream.
*/
LimitedInputStream createLimitedInputStream(InputStream)(InputStream stream, ulong byte_limit, bool silent_limit = false)
	if (isInputStream!InputStream)
{
	// `.InputStream` names the module-level interface; the unqualified name
	// is shadowed by this function's template parameter.
	auto source = interfaceProxy!(.InputStream)(stream);
	return new LimitedInputStream(source, byte_limit, silent_limit, true);
}

/// private
FreeListRef!LimitedInputStream createLimitedInputStreamFL(InputStream)(InputStream stream, ulong byte_limit, bool silent_limit = false)
	if (isInputStream!InputStream)
{
	// Same as createLimitedInputStream, but the instance is wrapped in a FreeListRef.
	auto source = interfaceProxy!(.InputStream)(stream);
	return FreeListRef!LimitedInputStream(source, byte_limit, silent_limit, true);
}

/** Creates a proxy stream that counts the number of bytes written.
40 41 Params: 42 output = The stream to forward the written data to 43 byte_limit = Optional total write size limit after which an exception is thrown 44 */ 45 CountingOutputStream createCountingOutputStream(OutputStream)(OutputStream output, ulong byte_limit = ulong.max) 46 if (isOutputStream!OutputStream) 47 { 48 return new CountingOutputStream(interfaceProxy!(.OutputStream)(output), byte_limit, true); 49 } 50 51 /// private 52 FreeListRef!CountingOutputStream createCountingOutputStreamFL(OutputStream)(OutputStream output, ulong byte_limit = ulong.max) 53 if (isOutputStream!OutputStream) 54 { 55 return FreeListRef!CountingOutputStream(interfaceProxy!(.OutputStream)(output), byte_limit, true); 56 } 57 58 59 /** Creates a stream that fires a callback once the end of the underlying input stream is reached. 60 61 Params: 62 input = Source stream to read from 63 callback = The callback that is invoked one the source stream has been drained 64 */ 65 EndCallbackInputStream createEndCallbackInputStream(InputStream)(InputStream input, void delegate() @safe callback) 66 if (isInputStream!InputStream) 67 { 68 return new EndCallbackInputStream(interfaceProxy!(.InputStream)(input), callback, true); 69 } 70 71 /// private 72 FreeListRef!EndCallbackInputStream createEndCallbackInputStreamFL(InputStream)(InputStream input, void delegate() @safe callback) 73 if (isInputStream!InputStream) 74 { 75 return FreeListRef!EndCallbackInputStream(interfaceProxy!(.InputStream)(input), callback, true); 76 } 77 78 79 /** 80 Wraps an existing stream, limiting the amount of data that can be read. 
*/
class LimitedInputStream : InputStream {
@safe:

	private {
		InterfaceProxy!InputStream m_input;
		// Number of bytes that may still be consumed; decremented by read() and increment().
		ulong m_sizeLimit;
		bool m_silentLimit;
	}

	/// private
	this(InterfaceProxy!InputStream stream, ulong byte_limit, bool silent_limit, bool dummy)
	{
		assert(!!stream);
		m_input = stream;
		m_sizeLimit = byte_limit;
		m_silentLimit = silent_limit;
	}

	/// The stream that is wrapped by this one
	@property inout(InterfaceProxy!InputStream) sourceStream() inout { return m_input; }

	/** Reports the end of the stream.

		In silent mode this forwards to the wrapped stream; otherwise the
		stream counts as empty once the remaining byte budget is zero.
	*/
	@property bool empty() { return m_silentLimit ? m_input.empty : (m_sizeLimit == 0); }

	/** Returns the wrapped stream's `leastSize` in silent mode and the
		remaining byte budget otherwise.
	*/
	@property ulong leastSize() { if( m_silentLimit ) return m_input.leastSize; return m_sizeLimit; }

	/// Forwards to the wrapped stream.
	@property bool dataAvailableForRead() { return m_input.dataAvailableForRead; }

	/** Manually consumes `bytes` from the remaining byte budget without reading.

		Calls `onSizeLimitReached` if `bytes` exceeds the remaining budget.
	*/
	void increment(ulong bytes)
	{
		if( bytes > m_sizeLimit ){
			onSizeLimitReached();
			// Only reached when an overridden onSizeLimitReached() returns
			// instead of throwing: clamp so the budget cannot wrap around.
			bytes = m_sizeLimit;
		}
		m_sizeLimit -= bytes;
	}

	/** Returns the data that the wrapped stream offers for peeking.

		Unless the stream is in silent mode, the returned slice is truncated
		to the remaining byte budget, so that it agrees with `empty` and
		`leastSize` and never exposes data beyond the limit.
	*/
	const(ubyte)[] peek()
	{
		auto data = m_input.peek();
		if( !m_silentLimit && data.length > m_sizeLimit )
			data = data[0 .. cast(size_t)m_sizeLimit]; // m_sizeLimit < data.length, so the cast is lossless
		return data;
	}

	/** Reads from the wrapped stream, never consuming more than the remaining byte budget.

		`onSizeLimitReached` is called if `dst` is larger than the remaining
		budget and either `mode` is `IOMode.all` or the budget is already zero.
		For other modes the request is shortened to the remaining budget.

		Returns:
			The number of bytes reported by the wrapped stream's `read`.
	*/
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		import std.algorithm: min;

		if ((mode == IOMode.all || m_sizeLimit == 0) && dst.length > m_sizeLimit) onSizeLimitReached();

		const validReadSize = min(dst.length, m_sizeLimit);
		auto ret = m_input.read(dst[0 .. validReadSize], mode);
		m_sizeLimit -= ret;
		return ret;
	}

	alias read = InputStream.read;

	/** Hook that is invoked when an operation would exceed the limit.

		The default implementation throws a `LimitException` carrying the
		currently remaining byte budget.
	*/
	protected void onSizeLimitReached() @safe {
		throw new LimitException("Size limit reached", m_sizeLimit);
	}
}

unittest { // issue 2575
	import vibe.stream.memory : createMemoryStream;
	import std.exception : assertThrown;

	auto buf = new ubyte[](1024);
	foreach (i, ref b; buf) b = cast(ubyte) i;
	auto input = createMemoryStream(buf, false);

	// test IOMode.once and IOMode.immediate
	static foreach (bufferSize; [100, 128, 200])
	static foreach (ioMode; [IOMode.once, IOMode.immediate])
	{{
		input.seek(0);
		auto limitedStream = createLimitedInputStream(input, 128);

		ubyte[] result;

		ubyte[bufferSize] buffer;
		while (!limitedStream.empty) {
			const chunk = limitedStream.read(buffer[], ioMode);
			result ~= buffer[0 .. chunk];
		}

		assert(result[] == buf[0 .. 128]);
		assertThrown(limitedStream.read(buffer[], ioMode));
	}}

	// test IOMode.all normal operation
	{
		input.seek(0);
		auto limitedStream = createLimitedInputStream(input, 128);

		ubyte[] result;
		ubyte[64] buffer;
		result ~= buffer[0 .. limitedStream.read(buffer[], IOMode.all)];
		result ~= buffer[0 .. limitedStream.read(buffer[], IOMode.all)];
		assert(limitedStream.empty);
		assert(result[] == buf[0 .. 128]);
		assertThrown(limitedStream.read(buffer[], IOMode.all));
	}

	// test IOMode.all reading over size limit
	{
		input.seek(0);
		auto limitedStream = createLimitedInputStream(input, 128);

		ubyte[256] buffer;
		assertThrown(limitedStream.read(buffer[], IOMode.all));
	}

	// test that peek() does not expose data beyond the limit
	{
		input.seek(0);
		auto limitedStream = createLimitedInputStream(input, 128);

		const peeked = limitedStream.peek();
		assert(peeked.length <= 128);
		assert(peeked == buf[0 .. peeked.length]);
	}
}

/**
	Wraps an existing output stream, counting the bytes that are written.
*/
class CountingOutputStream : OutputStream {
@safe:

	private {
		ulong m_bytesWritten;
		ulong m_writeLimit;
		InterfaceProxy!OutputStream m_out;
	}

	/// private
	this(InterfaceProxy!OutputStream stream, ulong write_limit, bool dummy)
	{
		assert(!!stream);
		m_writeLimit = write_limit;
		m_out = stream;
	}

	/// Returns the total number of bytes written.
	@property ulong bytesWritten() const { return m_bytesWritten; }

	/// The maximum number of bytes to write
	@property ulong writeLimit() const { return m_writeLimit; }
	/// ditto
	@property void writeLimit(ulong value) { m_writeLimit = value; }

	/** Manually increments the write counter without actually writing data.

		Throws:
			An exception (via `enforce`) if the counter would exceed `writeLimit`.
	*/
	void increment(ulong bytes)
	{
		enforce(fitsWithinLimit(bytes), "Incrementing past end of output stream.");
		m_bytesWritten += bytes;
	}

	/** Forwards `bytes` to the wrapped stream and adds the number of bytes
		it reports as written to the counter.

		The limit is checked against the full length of `bytes` before
		anything is written, regardless of `mode`.

		Throws:
			An exception (via `enforce`) if writing all of `bytes` would exceed `writeLimit`.
	*/
	size_t write(in ubyte[] bytes, IOMode mode)
	{
		enforce(fitsWithinLimit(bytes.length), "Writing past end of output stream.");

		auto ret = m_out.write(bytes, mode);
		m_bytesWritten += ret;
		return ret;
	}

	alias write = OutputStream.write;

	/// Flushes the wrapped stream.
	void flush() { m_out.flush(); }

	/// Flushes the wrapped stream; the wrapped stream's `finalize` is not called here.
	// NOTE(review): presumably intentional so that the wrapped stream stays usable - confirm.
	void finalize() { m_out.flush(); }

	// Tests whether `count` more bytes fit below the write limit. Written as
	// a subtraction so that the check cannot overflow for large `count`
	// values, which the plain sum `m_bytesWritten + count` could.
	private bool fitsWithinLimit(ulong count)
	const {
		return m_bytesWritten <= m_writeLimit && count <= m_writeLimit - m_bytesWritten;
	}
}


/**
	Wraps an existing input stream, counting the bytes that are read.
*/
class CountingInputStream : InputStream {
@safe:

	private {
		ulong m_bytesRead;
		InterfaceProxy!InputStream m_in;
	}

	/// private
	this(InterfaceProxy!InputStream stream, bool dummy)
	{
		assert(!!stream);
		m_in = stream;
	}

	/// Total of the byte counts returned by `read` plus any manual increments.
	@property ulong bytesRead()
	const {
		return m_bytesRead;
	}

	/// Forwards to the wrapped stream.
	@property bool empty()
	{
		return m_in.empty;
	}

	/// Forwards to the wrapped stream.
	@property ulong leastSize()
	{
		return m_in.leastSize;
	}

	/// Forwards to the wrapped stream.
	@property bool dataAvailableForRead()
	{
		return m_in.dataAvailableForRead;
	}

	/// Manually adds `bytes` to the read counter without reading any data.
	void increment(ulong bytes)
	{
		m_bytesRead = m_bytesRead + bytes;
	}

	/// Forwards to the wrapped stream; peeking does not change the counter.
	const(ubyte)[] peek()
	{
		return m_in.peek();
	}

	/// Reads from the wrapped stream and adds the returned byte count to the counter.
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		const count = m_in.read(dst, mode);
		m_bytesRead += count;
		return count;
	}

	alias read = InputStream.read;
}

/**
	Wraps an input stream and calls the given delegate once the stream is empty.

	Note that this function will potentially block after each read operation to
	see if the end has already been reached - this may take as long until either
	new data has arrived or until the connection was closed.

	The stream will also guarantee that the inner stream is not used after it
	has been determined to be empty. It can thus be safely deleted once the
	callback is invoked.
*/
class EndCallbackInputStream : InputStream {
@safe:

	private {
		// Reset to its `.init` value once the wrapped stream has reported empty.
		InterfaceProxy!InputStream m_in;
		bool m_eof = false;
		void delegate() @safe m_callback;
	}

	/// private
	this(InterfaceProxy!InputStream input, void delegate() @safe callback, bool dummy)
	{
		m_in = input;
		m_callback = callback;
		// The callback fires right away if the stream is already empty.
		checkEOF();
	}

	/// Checks the wrapped stream for its end and reports whether it has been reached.
	@property bool empty()
	{
		checkEOF();
		return !m_in;
	}

	/// Returns the wrapped stream's `leastSize`, or 0 once the end has been reached.
	@property ulong leastSize()
	{
		checkEOF();
		if( m_in ) return m_in.leastSize();
		return 0;
	}

	/// Forwards to the wrapped stream; `false` once the end has been reached.
	@property bool dataAvailableForRead()
	{
		if( !m_in ) return false;
		return m_in.dataAvailableForRead;
	}

	/// Forwards to the wrapped stream; `null` once the end has been reached.
	const(ubyte)[] peek()
	{
		if( !m_in ) return null;
		return m_in.peek();
	}

	/** Reads from the wrapped stream and then checks whether its end has been reached.

		Throws:
			An exception (via `enforce`) if the end has already been reached.
	*/
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		enforce(!!m_in, "Reading past end of stream.");
		auto ret = m_in.read(dst, mode);
		checkEOF();
		return ret;
	}

	alias read = InputStream.read;

	// Drops the reference to the wrapped stream and invokes the callback the
	// first time the stream reports empty; later calls return immediately.
	private void checkEOF()
	@safe {
		if( !m_in ) return;
		if( m_in.empty ){
			// Reset before invoking the callback so the wrapped stream is
			// no longer referenced while the callback runs.
			m_in = InterfaceProxy!InputStream.init;
			m_callback();
		}
	}
}

/**
	Exception that carries a byte limit value.

	Thrown by `LimitedInputStream.onSizeLimitReached`.
*/
class LimitException : Exception {
@safe:

	private ulong m_limit;

	/**
		Params:
			message = The exception message
			limit = The limit value made available through the `limit` property
			next = Optional next exception in the chain
			file = Source file reported for the exception
			line = Source line reported for the exception
	*/
	this(string message, ulong limit, Throwable next = null, string file = __FILE__, int line = __LINE__)
	{
		super(message, next, file, line);
		// Store the value; without this assignment `limit` would always yield 0.
		m_limit = limit;
	}

	/// The byte limit of the stream that emitted the exception
	@property ulong limit() const { return m_limit; }
}