/** Implements a buffered random access wrapper stream.

	Copyright: © 2020-2021 Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Sönke Ludwig
*/
module vibe.stream.bufferedstream;

import vibe.core.stream;

import std.algorithm;
import std.traits : Unqual;


/** Creates a new buffered stream wrapper.

	Params:
		stream = The stream that is going to be wrapped
		buffer_size = Size of a single buffer segment
		buffer_count = Number of buffer segments

	Returns:
		A `BufferedStream!S` that takes ownership of `stream` (it is moved
		into the wrapper).
*/
auto bufferedStream(S)(S stream, size_t buffer_size = 16_384, size_t buffer_count = 4)
	if (isRandomAccessStream!S)
	in { assert(buffer_size > 0 && buffer_count > 0); }
do {
	return BufferedStream!S(stream.move, buffer_size, buffer_count);
}


/** Random access stream that provides buffering on top of a wrapped stream.

	The source stream is logically split up into chunks of equal size, where
	a defined maximum number of those chunks can be cached in memory. Cached
	chunks are managed using an LRU strategy.

	Copies of a `BufferedStream` share the same reference counted state, so
	that the read/write position and the buffers are common to all copies.
*/
struct BufferedStream(S) {
	import std.experimental.allocator;
	import std.experimental.allocator.mallocator;

	// State shared between all copies of a BufferedStream instance.
	private static struct State {
		S stream;             // the wrapped stream
		int refCount = 1;     // number of BufferedStream copies referencing this state
		ulong ptr;            // current logical read/write position
		ulong size;           // cached logical size, including unflushed buffered writes
		size_t bufferSize;    // size of a single chunk/buffer in bytes
		Buffer[] buffers;     // chunk buffers (malloc allocated)
		ulong accessCount;    // monotonic counter used to generate LRU time stamps
		ubyte[] buffermemory; // backing storage that all `buffers[i].memory` slices point into

		this(size_t buffer_size, size_t buffer_count, S stream)
		{
			this.bufferSize = buffer_size;
			this.buffers = Mallocator.instance.makeArray!Buffer(buffer_count);
			this.buffermemory = Mallocator.instance.makeArray!ubyte(buffer_count * buffer_size);
			foreach (i, ref b; this.buffers)
				b.memory = this.buffermemory[i * buffer_size .. (i+1) * buffer_size];
			this.size = stream.size;
			this.stream = stream.move;
		}

		~this()
		{
			// nothing to do for a default initialized state
			if (this.buffers is null) return;

			if (this.stream.writable)
				flush();

			() @trusted {
				Mallocator.instance.dispose(this.buffermemory);
				Mallocator.instance.dispose(this.buffers);
			} ();
		}

		@disable this(this);

		/// Writes all dirty buffers to the wrapped stream and flushes it.
		void flush()
		{
			foreach (i; 0 .. this.buffers.length)
				flushBuffer(i);
			this.stream.flush();
		}

		/** Re-reads the size of the wrapped stream.

			The extent of dirty (not yet flushed) buffers is taken into
			account, so that appended data that only exists in a buffer is
			not dropped from the logical stream size.
		*/
		private void updateSize()
		{
			ulong sz = this.stream.size;
			foreach (ref b; this.buffers)
				if (b.dirty && b.fill > 0)
					sz = max(sz, b.chunk * this.bufferSize + b.fill);
			this.size = sz;
		}

		/** Ensures that the given chunk is held in a buffer.

			If the chunk is not cached yet, the least recently used buffer
			is flushed and reused for it.

			Returns: The index of the buffer holding the chunk.
			Throws: An `Exception` if the chunk starts at or after the end
				of the stream.
		*/
		private size_t bufferChunk(ulong chunk_index)
		{
			auto idx = this.buffers.countUntil!((ref b) => b.chunk == chunk_index);
			if (idx >= 0) return idx;

			auto offset = chunk_index * this.bufferSize;
			// the wrapped stream may have grown in the meantime
			if (offset >= this.size) updateSize();
			if (offset >= this.size) throw new Exception("Reading past end of stream.");

			// reuse the least recently used buffer
			auto newidx = this.buffers.minIndex!((ref a, ref b) => a.lastAccess < b.lastAccess);
			flushBuffer(newidx);

			this.buffers[newidx].fill = 0;
			this.buffers[newidx].chunk = chunk_index;
			fillBuffer(newidx, min(this.size - chunk_index*this.bufferSize, this.bufferSize));
			return newidx;
		}

		/** Reads data from the wrapped stream, so that at least the first
			`size` bytes of the buffer are valid.
		*/
		private void fillBuffer(size_t buffer, size_t size)
		{
			auto b = &this.buffers[buffer];
			if (size <= b.fill) return;
			assert(size <= this.bufferSize);

			// only the part that is not in the buffer yet is read
			this.stream.seek(b.chunk*this.bufferSize + b.fill);
			this.stream.read(b.memory[b.fill .. size]);
			b.fill = size;
			touchBuffer(buffer);
		}

		/// Writes the contents of a buffer to the wrapped stream if it is dirty.
		private void flushBuffer(size_t buffer)
		{
			auto b = &this.buffers[buffer];
			if (!b.fill || !b.dirty) return;
			this.stream.seek(b.chunk * this.bufferSize);
			this.stream.write(b.memory[0 .. b.fill]);
			if (b.chunk * this.bufferSize + b.fill > this.size)
				this.size = b.chunk * this.bufferSize + b.fill;
			b.dirty = false;
			touchBuffer(buffer);
		}

		/// Marks a buffer as the most recently used one.
		private void touchBuffer(size_t buffer)
		{
			this.buffers[buffer].lastAccess = ++this.accessCount;
		}

		/** Splits a byte range into per-chunk pieces and invokes a callback
			for each of them.

			Params:
				offset = Stream offset of the first byte of `bytes`
				bytes = The byte range to partition
				del = Callback invoked for each chunk that overlaps the
					range. It receives the stream offset of the piece, the
					matching sub slice of `bytes`, the index of the buffer
					caching the chunk (negative if it is not cached) and the
					begin/end offsets of the piece within the chunk.
					Returning `false` stops the iteration.
		*/
		private void iterateChunks(B)(ulong offset, scope B[] bytes,
			scope bool delegate(ulong offset, scope B[] bytes, sizediff_t buffer, size_t buffer_begin, size_t buffer_end) @safe del)
		@safe {
			auto begin = offset;
			auto end = offset + bytes.length;

			if (bytes.length == 0) return;

			auto chunk_begin = begin / this.bufferSize;
			auto chunk_end = (end + this.bufferSize - 1) / this.bufferSize;

			foreach (i; chunk_begin .. chunk_end) {
				auto chunk_off = i * this.bufferSize;
				auto cstart = max(chunk_off, begin);
				auto cend = min(chunk_off + this.bufferSize, end);
				assert(cend > cstart);

				auto buf = this.buffers.countUntil!((ref b) => b.chunk == i);
				auto buf_begin = cast(size_t)(cstart - chunk_off);
				auto buf_end = cast(size_t)(cend - chunk_off);

				auto bytes_chunk = bytes[cast(size_t)(cstart - begin) .. cast(size_t)(cend - begin)];

				if (!del(cstart, bytes_chunk, buf, buf_begin, buf_end))
					break;
			}
		}
	}

	private {
		// makes the stream copyable and makes it small enough to be put into
		// a stream proxy
		State* m_state;
	}

	private this(S stream, size_t buffer_size, size_t buffer_count)
	@safe {
		m_state = new State(buffer_size, buffer_count, stream.move);
	}

	this(this)
	{
		if (m_state) m_state.refCount++;
	}

	~this()
	@safe {
		if (m_state) {
			// the last copy flushes and releases the shared state
			if (!--m_state.refCount)
				destroy(*m_state);
		}
	}

	/// Determines whether the read position is at or beyond the known end of the stream.
	@property bool empty() @blocking { return state.ptr >= state.size; }
	/// Number of bytes between the read position and the known end of the stream (zero if positioned at or past the end).
	@property ulong leastSize() @blocking { return state.ptr < state.size ? state.size - state.ptr : 0; }
	/// Determines whether `peek` would return a non-empty slice.
	@property bool dataAvailableForRead() { return peek().length > 0; }
	/// The cached logical size of the stream.
	@property ulong size() const nothrow { return state.size; }
	/// Forwards to the `readable` property of the wrapped stream.
	@property bool readable() const nothrow { return state.stream.readable; }
	/// Forwards to the `writable` property of the wrapped stream.
	@property bool writable() const nothrow { return state.stream.writable; }

	/// Accesses the wrapped stream.
	@property ref inout(S) underlying() inout { return state.stream; }

	static if (isClosableRandomAccessStream!S) {
		/// Flushes all buffers and closes the wrapped stream.
		void close()
		{
			sync();
			state.stream.close();
		}

		/// Forwards to the `isOpen` property of the wrapped stream.
		@property bool isOpen()
		const {
			return state.stream.isOpen();
		}
	}

	static if (is(typeof(S.init.truncate(ulong.init))))
		/// Flushes and releases all buffers and truncates the wrapped stream.
		void truncate(ulong size)
		{
			sync();
			state.stream.truncate(size);
			state.size = size;
		}

	/** Returns the buffered bytes that are available at the current read
		position without touching the wrapped stream.

		The returned slice never extends beyond the end of the current chunk
		and is empty if the chunk at the read position is not cached.
	*/
	const(ubyte)[] peek()
	{
		auto limit = (state.ptr / state.bufferSize + 1) * state.bufferSize;
		// only the length of this slice is used, it is never dereferenced
		auto dummy = () @trusted { return (cast(const(ubyte)*)null)[0 .. cast(size_t)(limit - state.ptr)]; } ();

		const(ubyte)[] ret;
		state.iterateChunks!(const(ubyte))(state.ptr, dummy, (offset, scope _, buf, buf_begin, buf_end) {
			if (buf >= 0) {
				auto b = &state.buffers[buf];
				// the read position may lie beyond the filled part of the
				// buffer (e.g. after seeking past the end of the stream)
				auto avail_end = min(buf_end, b.fill);
				if (avail_end > buf_begin)
					ret = b.memory[buf_begin .. avail_end];
				state.touchBuffer(buf);
			}
			return false;
		});
		return ret;
	}

	/** Reads bytes from the current read position.

		Params:
			dst = Destination buffer
			mode = `IOMode.all` fills the whole of `dst` or throws,
				`IOMode.once` fetches at most one uncached piece and
				`IOMode.immediate` only returns data that is already buffered

		Returns: The number of bytes that were read into `dst`.
		Throws: An `Exception` if `mode` is `IOMode.all` and not all bytes
			could be read.
	*/
	size_t read(scope ubyte[] dst, IOMode mode)
	@blocking {
		size_t nread = 0;

		// update size if a read past EOF is expected
		if (state.ptr + dst.length > state.size) state.updateSize();

		state.iterateChunks!ubyte(state.ptr, dst, (offset, scope dst_chunk, buf, buf_begin, buf_end) {
			if (buf < 0) {
				if (mode == IOMode.immediate) return false;
				if (mode == IOMode.once && nread) return false;
				buf = state.bufferChunk(offset / state.bufferSize);
			} else state.touchBuffer(buf);

			if (state.buffers[buf].fill < buf_end) {
				if (mode == IOMode.immediate) return false;
				if (mode == IOMode.once && nread) return false;
				state.fillBuffer(buf, buf_end);
			}

			// the whole of dst_chunk is now in the buffer
			assert(buf_begin % state.bufferSize == offset % state.bufferSize);
			assert(dst_chunk.length <= buf_end - buf_begin);
			dst_chunk[] = state.buffers[buf].memory[buf_begin .. buf_begin + dst_chunk.length];
			nread += dst_chunk.length;

			return true;
		});

		if (mode == IOMode.all && dst.length != nread)
			throw new Exception("Reading past end of stream.");

		state.ptr += nread;

		return nread;
	}

	/// ditto
	void read(scope ubyte[] dst) @blocking { auto n = read(dst, IOMode.all); assert(n == dst.length); }

	/** Writes bytes at the current position.

		Data that falls into a cached chunk is written to the buffer and
		only reaches the wrapped stream when the buffer gets flushed. Data
		for uncached chunks is written through to the wrapped stream.

		Returns: The number of bytes that were written.
	*/
	size_t write(in ubyte[] bytes, IOMode mode)
	@blocking {
		size_t nwritten = 0;

		state.iterateChunks!(const(ubyte))(state.ptr, bytes, (offset, scope src_chunk, buf, buf_begin, buf_end) {
			if (buf < 0) { // write through if not buffered
				if (mode == IOMode.immediate) return false;
				if (mode == IOMode.once && nwritten) return false;

				state.stream.seek(offset);
				state.stream.write(src_chunk);
			} else {
				auto b = &state.buffers[buf];
				// A write that starts beyond the valid part of the buffer
				// leaves a gap that becomes part of the flushed data - zero
				// it, so that no stale bytes of a previously cached chunk
				// end up in the stream.
				if (buf_begin > b.fill)
					b.memory[b.fill .. buf_begin] = 0;
				b.memory[buf_begin .. buf_begin + src_chunk.length] = src_chunk;
				b.fill = max(b.fill, buf_begin + src_chunk.length);
				b.dirty = true;
			}

			nwritten += src_chunk.length;
			if (offset + src_chunk.length > state.size)
				state.size = offset + src_chunk.length;

			return true;
		});

		assert(mode != IOMode.all || nwritten == bytes.length);

		state.ptr += nwritten;

		return nwritten;
	}

	/// ditto
	void write(in ubyte[] bytes) @blocking { auto n = write(bytes, IOMode.all); assert(n == bytes.length); }
	/// ditto
	void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); }

	/// Writes all dirty buffers to the wrapped stream and flushes it.
	void flush() @blocking { state.flush(); }

	/** Flushes and releases all buffers and updates the buffer size.

		This forces the buffered view of the source stream to be in sync with
		the actual source stream.
	*/
	void sync()
	@blocking {
		flush();
		foreach (ref b; state.buffers) {
			b.chunk = ulong.max;
			b.fill = 0;
		}
		state.size = state.stream.size;
	}

	/// Flushes all buffered data to the wrapped stream.
	void finalize() @blocking { flush(); }

	/// Sets the read/write position. No I/O is performed.
	void seek(ulong offset) { state.ptr = offset; }
	/// Returns the current read/write position.
	ulong tell() nothrow { return state.ptr; }

	private ref inout(State) state() @trusted nothrow return inout { return *m_state; }
}

mixin validateRandomAccessStream!(BufferedStream!RandomAccessStream);

@safe unittest {
	import std.exception : assertThrown;
	import vibe.stream.memory : createMemoryStream;
	import vibe.stream.operations : readAll;

	auto buf = new ubyte[](256);
	foreach (i, ref b; buf) b = cast(ubyte)i;
	auto str = createMemoryStream(buf, true, 128);
	auto bstr = bufferedStream(str, 16, 4);

	// test empty method
	assert(!bstr.empty);
	bstr.readAll();
	assert(bstr.empty);

	bstr.seek(0);

	ubyte[1] bb;

	// test that each byte is readable
	foreach (i; 0 .. 128) {
		bstr.read(bb);
		assert(bb[0] == i);
	}

	// same in reverse
	foreach_reverse (i; 0 .. 128) {
		bstr.seek(i);
		bstr.read(bb);
		assert(bb[0] == i);
	}

	// the first chunk should be cached now
	assert(bstr.dataAvailableForRead);
	assert(bstr.peek().length == 15);
	assert(bstr.peek()[0] == 1);

	// the last one should not
	bstr.seek(126);
	assert(!bstr.dataAvailableForRead);
	assert(bstr.peek().length == 0);
	assert(bstr.leastSize == 2);

	// a read brings it back
	bstr.read(bb);
	assert(bb[0] == 126);
	assert(bstr.dataAvailableForRead);
	assert(bstr.peek() == [127]);

	// the first to third ones should still be there
	ubyte[3*16-8] mb;
	bstr.seek(0);
	assert(bstr.dataAvailableForRead);
	assert(bstr.read(mb, IOMode.immediate) == mb.length);
	foreach (i, b; mb) assert(i == b);

	// should read only the remaining 8 buffered bytes
	assert(bstr.read(mb, IOMode.immediate) == 8);
	assert(bstr.tell == 3*16);
	bstr.seek(mb.length);

	// should also read only the remaining 8 buffered bytes
	assert(bstr.read(mb, IOMode.once) == 8);
	assert(bstr.tell == 3*16);
	bstr.seek(mb.length);

	// should read the whole buffer, caching the fourth and fifth chunk
	assert(bstr.read(mb, IOMode.all) == mb.length);
	assert(bstr.tell == 2*mb.length);
	foreach (i, b; mb) assert(i + mb.length == b);

	// the first chunk should now be out of cache
	bstr.seek(0);
	assert(!bstr.dataAvailableForRead);

	// reading with immediate should consequently fail
	assert(bstr.read(mb, IOMode.immediate) == 0);

	// the second/third ones should still be in
	bstr.seek(16);
	assert(bstr.dataAvailableForRead);
	bstr.seek(2*16);
	assert(bstr.dataAvailableForRead);

	// reading uncached data followed by cached data should succeed for "once"
	bstr.seek(0);
	assert(bstr.read(mb, IOMode.once) == mb.length);
	foreach (i, b; mb) assert(i == b);

	// the first three and the fifth chunk should now be cached
	bstr.seek(0);
	assert(bstr.dataAvailableForRead);
	bstr.seek(16);
	assert(bstr.dataAvailableForRead);
	bstr.seek(32);
	assert(bstr.dataAvailableForRead);
	bstr.seek(48);
	assert(!bstr.dataAvailableForRead);
	bstr.seek(64);
	assert(bstr.dataAvailableForRead);

	// reading once should read until the end of the cached chunk sequence
	bstr.seek(0);
	assert(bstr.read(mb, IOMode.once) == mb.length);
	foreach (i, b; mb) assert(i == b);

	// produce an EOF error
	bstr.seek(128);
	assertThrown(bstr.read(bb));
	assert(bstr.tell == 128);

	// add more data from the outside
	str.seek(str.size);
	str.write([cast(ubyte)128]);

	// should now succeed
	bstr.read(bb);
	assert(bb[0] == 128);

	// next byte should produce an EOF error again
	assertThrown(bstr.read(bb));

	// add more data from the inside
	bstr.write([ubyte(129)]);

	// should now succeed
	bstr.seek(129);
	bstr.read(bb);
	assert(bb[0] == 129);
	assert(bstr.size == 130);
	assert(str.size == 129);
	bstr.flush();
	assert(str.size == 130);

	// next byte should produce an EOF error again
	bstr.seek(130);
	assertThrown(bstr.read(bb));
	assert(bstr.tell == 130);
	assertThrown(bstr.read(bb, IOMode.once));
	assert(bstr.tell == 130);

	// add more data from the inside (chunk now cached)
	bstr.write([ubyte(130)]);
	assert(bstr.size == 131);
	assert(str.size == 130);

	// should now succeed
	bstr.seek(130);
	bstr.read(bb);
	assert(bb[0] == 130);

	// flush should write through to the file
	bstr.flush();
	assert(str.size == 131);

	// read back the written data from the underlying file
	bstr.sync();
	bstr.seek(129);
	bstr.read(bb);
	assert(bb[0] == 129);
	bstr.read(bb);
	assert(bb[0] == 130);
}

@safe unittest { // positions past the end and unflushed appended data
	import vibe.stream.memory : createMemoryStream;

	auto buf = new ubyte[](64);
	auto str = createMemoryStream(buf, true, 20);
	auto bstr = bufferedStream(str, 16, 2);

	// cache the partially filled last chunk
	ubyte[1] bb;
	bstr.seek(16);
	bstr.read(bb);

	// seeking past the end must neither underflow leastSize, nor break peek
	bstr.seek(30);
	assert(bstr.empty);
	assert(bstr.leastSize == 0);
	assert(!bstr.dataAvailableForRead);
	assert(bstr.peek().length == 0);

	// append a byte that only lives in the buffer for now
	bstr.seek(20);
	bstr.write([ubyte(42)]);
	assert(bstr.size == 21);
	assert(str.size == 20);

	// a read attempt past the end must not drop the buffered byte from the size
	ubyte[2] b2;
	bstr.seek(20);
	assert(bstr.read(b2, IOMode.immediate) == 0);
	assert(bstr.size == 21);
	assert(!bstr.empty);
	assert(bstr.leastSize == 1);

	bstr.read(bb);
	assert(bb[0] == 42);
}

unittest {
	static assert(isTruncatableStream!(BufferedStream!TruncatableStream));
	static assert(isClosableRandomAccessStream!(BufferedStream!ClosableRandomAccessStream));
}

// A single cache slot holding (part of) one chunk of the wrapped stream.
private struct Buffer {
	ulong chunk = ulong.max; // chunk index (offset = chunk * state.bufferSize), ulong.max if unused
	ubyte[] memory;          // storage of the buffer, bufferSize bytes long
	ulong lastAccess;        // LRU time stamp, higher means more recently used
	size_t fill;             // number of valid bytes at the start of `memory`
	bool dirty;              // true if `memory` contains data that has not been flushed yet
}