/** Implements a buffered random access wrapper stream.

	Copyright: © 2020-2021 Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Sönke Ludwig
*/
module vibe.stream.bufferedstream;

import vibe.core.stream;

import std.algorithm;
import std.traits : Unqual;


/** Creates a new buffered stream wrapper.

	Params:
		stream = The stream that is going to be wrapped
		buffer_size = Size of a single buffer segment
		buffer_count = Number of buffer segments
*/
auto bufferedStream(S)(S stream, size_t buffer_size = 16_384, size_t buffer_count = 4)
	if (isRandomAccessStream!S)
	in { assert(buffer_size > 0 && buffer_count > 0); }
do {
	return BufferedStream!S(stream.move, buffer_size, buffer_count);
}
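
/// Minimal usage sketch (illustrative): wraps a memory stream here, but any
/// random access stream, such as a `FileStream`, works the same way.
@safe unittest {
	import vibe.stream.memory : createMemoryStream;

	auto str = createMemoryStream(new ubyte[](64), true, 64);
	auto bstr = bufferedStream(str, 16, 4);

	// writes go through the chunk cache (or straight to the source for
	// chunks that are not currently buffered)
	bstr.write([cast(ubyte)1, 2, 3]);

	// reads are served from the cached chunks whenever possible
	ubyte[3] bb;
	bstr.seek(0);
	bstr.read(bb[]);
	assert(bb[] == [1, 2, 3]);

	// flush any pending writes back to the wrapped stream
	bstr.finalize();
}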


/** Random access stream that provides buffering on top of a wrapped stream.

	The source stream is logically split up into chunks of equal size, where
	a defined maximum number of those chunks can be cached in memory. Cached
	chunks are managed using an LRU strategy.
*/
struct BufferedStream(S) {
	import std.experimental.allocator;
	import std.experimental.allocator.mallocator;

	private static struct State {
		S stream;
		int refCount = 1;
		ulong ptr;
		ulong size;
		size_t bufferSize;
		int bufferSizeBits;
		Buffer[] buffers;
		ulong accessCount;
		ubyte[] buffermemory;
		// slice into the matching chunk buffer, beginning at the current ptr
		ubyte[] peekBuffer;

		this(size_t buffer_size, size_t buffer_count, S stream)
		{
			import core.bitop : bsr;

			this.bufferSizeBits = max(bsr(buffer_size), 1);
			this.bufferSize = 1 << this.bufferSizeBits;
			this.buffers = Mallocator.instance.makeArray!Buffer(buffer_count);
			this.buffermemory = Mallocator.instance.makeArray!ubyte(buffer_count * buffer_size);
			foreach (i, ref b; this.buffers)
				b.memory = this.buffermemory[i * buffer_size .. (i+1) * buffer_size];
			this.size = stream.size;
			this.stream = stream.move;
		}

		~this()
		{
			if (this.buffers is null) return;

			if (this.stream.writable)
				flush();

			() @trusted {
				Mallocator.instance.dispose(this.buffermemory);
				Mallocator.instance.dispose(this.buffers);
			} ();
		}

		@disable this(this);

		void flush()
		{
			foreach (i; 0 .. this.buffers.length)
				flushBuffer(i);
			this.stream.flush();
		}

		private size_t bufferChunk(ulong chunk_index)
		{
			auto idx = this.buffers.countUntil!((ref b) => b.chunk == chunk_index);
			if (idx >= 0) return idx;

			auto offset = chunk_index << this.bufferSizeBits;
			if (offset >= this.size) this.size = this.stream.size;
			if (offset >= this.size) throw new Exception("Reading past end of stream.");

			auto newidx = this.buffers.minIndex!((ref a, ref b) => a.lastAccess < b.lastAccess);
			flushBuffer(newidx);

			// clear peek buffer in case it points to the reused buffer
			if (this.buffers[newidx].chunk == this.ptr >> this.bufferSizeBits)
				this.peekBuffer = null;
			this.buffers[newidx].fill = 0;
			this.buffers[newidx].chunk = chunk_index;
			fillBuffer(newidx, min(this.size - (chunk_index << this.bufferSizeBits), this.bufferSize));
			return newidx;
		}

		private void fillBuffer(size_t buffer, size_t size)
		{
			auto b = &this.buffers[buffer];
			if (size <= b.fill) return;
			assert(size <= this.bufferSize);

			this.stream.seek((b.chunk << this.bufferSizeBits) + b.fill);
			this.stream.read(b.memory[b.fill .. size]);
			b.fill = size;
			touchBuffer(buffer);
		}

		private void flushBuffer(size_t buffer)
		{
			auto b = &this.buffers[buffer];
			if (!b.fill || !b.dirty) return;
			this.stream.seek(b.chunk << this.bufferSizeBits);
			this.stream.write(b.memory[0 .. b.fill]);
			if ((b.chunk << this.bufferSizeBits) + b.fill > this.size)
				this.size = (b.chunk << this.bufferSizeBits) + b.fill;
			b.dirty = false;
			touchBuffer(buffer);
		}

		private void touchBuffer(size_t buffer)
		{
			this.buffers[buffer].lastAccess = ++this.accessCount;
		}
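
		// Worked example of the chunk mapping (illustrative only): with the
		// default buffer_size of 16_384 bytes, bufferSizeBits is 14, so byte
		// offset 50_000 falls into chunk 50_000 >> 14 = 3, which covers the
		// byte range [49_152, 65_536); within that chunk's buffer the byte
		// sits at offset 50_000 & 16_383 = 848. iterateChunks below walks a
		// given byte range chunk by chunk along these boundaries.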
		private void iterateChunks(B)(ulong offset, scope B[] bytes,
			scope bool delegate(ulong offset, ulong chunk, scope B[] bytes, sizediff_t buffer, size_t buffer_begin, size_t buffer_end) @safe del)
		@safe {
			doIterateChunks!B(offset, bytes, del);
		}

		private void iterateChunks(B)(ulong offset, scope B[] bytes,
			scope bool delegate(ulong offset, ulong chunk, scope B[] bytes, sizediff_t buffer, size_t buffer_begin, size_t buffer_end) @safe nothrow del)
		@safe nothrow {
			doIterateChunks!B(offset, bytes, del);
		}

		private void doIterateChunks(B, DEL)(ulong offset, scope B[] bytes,
			scope DEL del)
		@safe {
			auto begin = offset;
			auto end = offset + bytes.length;

			if (bytes.length == 0) return;

			ulong chunk_begin, chunk_end, chunk_off;
			chunk_begin = begin >> this.bufferSizeBits;
			chunk_end = (end + this.bufferSize - 1) >> this.bufferSizeBits;
			chunk_off = chunk_begin << this.bufferSizeBits;

			foreach (i; chunk_begin .. chunk_end) {
				auto cstart = max(chunk_off, begin);
				auto cend = min(chunk_off + this.bufferSize, end);
				assert(cend > cstart);

				auto buf = this.buffers.countUntil!((ref b) => b.chunk == i);
				auto buf_begin = cast(size_t)(cstart - chunk_off);
				auto buf_end = cast(size_t)(cend - chunk_off);

				auto bytes_chunk = bytes[cast(size_t)(cstart - begin) .. cast(size_t)(cend - begin)];

				if (!del(cstart, i, bytes_chunk, buf, buf_begin, buf_end))
					break;

				chunk_off += this.bufferSize;
			}
		}
	}

	private {
		// makes the stream copyable and makes it small enough to be put into
		// a stream proxy
		State* m_state;
	}

	private this(S stream, size_t buffer_size, size_t buffer_count)
	@safe {
		m_state = new State(buffer_size, buffer_count, stream.move);
	}

	this(this)
	{
		if (m_state) m_state.refCount++;
	}

	~this()
	@safe {
		if (m_state) {
			if (!--m_state.refCount)
				destroy(*m_state);
		}
	}

	@property bool empty() @blocking { return state.ptr >= state.size; }
	@property ulong leastSize() @blocking { return state.size - state.ptr; }
	@property bool dataAvailableForRead() { return peek().length > 0; }
	@property ulong size() const nothrow { return state.size; }
	@property bool readable() const nothrow { return state.stream.readable; }
	@property bool writable() const nothrow { return state.stream.writable; }

	@property ref inout(S) underlying() inout { return state.stream; }

	static if (isClosableRandomAccessStream!S) {
		void close()
		{
			sync();
			state.stream.close();
		}

		@property bool isOpen()
		const {
			return state.stream.isOpen();
		}
	}

	static if (is(typeof(S.init.truncate(ulong.init))))
		void truncate(ulong size)
		{
			sync();
			state.stream.truncate(size);
			state.size = size;
			state.peekBuffer = null;
		}

	const(ubyte)[] peek()
	{
		return state.peekBuffer;
	}

	size_t read(scope ubyte[] dst, IOMode mode)
	@blocking {
		if (dst.length <= state.peekBuffer.length) {
			dst[] = state.peekBuffer[0 .. dst.length];
			state.peekBuffer = state.peekBuffer[dst.length .. $];
			state.ptr += dst.length;
			return dst.length;
		}

		size_t nread = 0;

		// update size if a read past EOF is expected
		if (state.ptr + dst.length > state.size) state.size = state.stream.size;

		ubyte[] newpeek;

		state.iterateChunks!ubyte(state.ptr, dst, (offset, chunk, scope dst_chunk, buf, buf_begin, buf_end) {
			if (buf < 0) {
				if (mode == IOMode.immediate) return false;
				if (mode == IOMode.once && nread) return false;
				buf = state.bufferChunk(chunk);
			} else state.touchBuffer(buf);

			if (state.buffers[buf].fill < buf_end) {
				if (mode == IOMode.immediate) return false;
				if (mode == IOMode.once && nread) return false;
				state.fillBuffer(buf, buf_end);
			}

			// the whole of dst_chunk is now in the buffer
			assert((buf_begin & ((1<<state.bufferSizeBits)-1)) == (offset & ((1<<state.bufferSizeBits)-1)));
			assert(dst_chunk.length <= buf_end - buf_begin);
			dst_chunk[] = state.buffers[buf].memory[buf_begin .. buf_begin + dst_chunk.length];
			nread += dst_chunk.length;

			// any remaining buffer space of the last chunk will be used for
			// quick access on the next read
			newpeek = state.buffers[buf].memory[buf_begin + dst_chunk.length .. state.buffers[buf].fill];

			return true;
		});

		if (mode == IOMode.all && dst.length != nread)
			throw new Exception("Reading past end of stream.");

		state.ptr += nread;
		state.peekBuffer = newpeek;

		return nread;
	}

	void read(scope ubyte[] dst) @blocking { auto n = read(dst, IOMode.all); assert(n == dst.length); }

	size_t write(in ubyte[] bytes, IOMode mode)
	@blocking {
		size_t nwritten = 0;

		ubyte[] newpeek;

		state.iterateChunks!(const(ubyte))(state.ptr, bytes, (offset, chunk, scope src_chunk, buf, buf_begin, buf_end) {
			if (buf < 0) { // write through if not buffered
				if (mode == IOMode.immediate) return false;
				if (mode == IOMode.once && nwritten) return false;

				state.stream.seek(offset);
				state.stream.write(src_chunk);
			} else {
				auto b = &state.buffers[buf];
				b.memory[buf_begin .. buf_begin + src_chunk.length] = src_chunk;
				b.fill = max(b.fill, buf_begin + src_chunk.length);
				b.dirty = true;
			}

			nwritten += src_chunk.length;
			if (offset + src_chunk.length > state.size)
				state.size = offset + src_chunk.length;

			// any remaining buffer space of the last chunk will be used for
			// quick access on the next read
			if (buf >= 0)
				newpeek = state.buffers[buf].memory[buf_begin + src_chunk.length .. $];

			return true;
		});

		assert(mode != IOMode.all || nwritten == bytes.length);

		state.ptr += nwritten;
		state.peekBuffer = newpeek;

		return nwritten;
	}

	void write(in ubyte[] bytes) @blocking { auto n = write(bytes, IOMode.all); assert(n == bytes.length); }
	void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); }

	void flush() @blocking { state.flush(); }

	/** Flushes and releases all buffers and updates the cached stream size.

		This forces the buffered view of the source stream to be in sync with
		the actual source stream.
	*/
	void sync()
	@blocking {
		flush();
		foreach (ref b; state.buffers) {
			b.chunk = ulong.max;
			b.fill = 0;
		}
		state.size = state.stream.size;
		state.peekBuffer = null;
	}

	void finalize() @blocking { flush(); }

	void seek(ulong offset)
	nothrow {
		if (offset > state.ptr && offset < state.ptr + state.peekBuffer.length) {
			// seeking forward within the current peek buffer - just advance the slice
			state.peekBuffer = state.peekBuffer[cast(size_t)(offset - state.ptr) .. $];
		} else {
			ubyte[1] dummy;
			state.peekBuffer = null;
			state.iterateChunks!ubyte(offset, dummy[], (offset, chunk, scope bytes, buffer, buffer_begin, buffer_end) @safe nothrow {
				if (buffer >= 0 && buffer_begin < state.buffers[buffer].fill) {
					state.peekBuffer = state.buffers[buffer].memory[buffer_begin .. state.buffers[buffer].fill];
					state.touchBuffer(buffer);
				}
				return true;
			});
		}

		// update the pointer only after the peek buffer has been computed, so
		// that the comparisons above are relative to the old position
		state.ptr = offset;
	}
	ulong tell() nothrow { return state.ptr; }

	private ref inout(State) state() @trusted nothrow return inout { return *m_state; }
}

mixin validateRandomAccessStream!(BufferedStream!RandomAccessStream);
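
// Illustrative sketch of the LRU chunk cache described above: with only two
// buffer segments, touching a third chunk evicts the least recently used one.
@safe unittest {
	import vibe.stream.memory : createMemoryStream;

	auto buf = new ubyte[](64);
	auto str = createMemoryStream(buf, true, 64);
	auto bstr = bufferedStream(str, 16, 2); // at most two 16 byte chunks cached

	ubyte[1] bb;
	bstr.read(bb[]);  // caches chunk 0
	bstr.seek(16);
	bstr.read(bb[]);  // caches chunk 1
	bstr.seek(32);
	bstr.read(bb[]);  // caches chunk 2, evicting chunk 0 as least recently used

	bstr.seek(0);
	assert(!bstr.dataAvailableForRead); // chunk 0 has been evicted
	bstr.seek(16);
	assert(bstr.dataAvailableForRead);  // chunk 1 is still cached
}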

@safe unittest {
	import std.exception : assertThrown;
	import vibe.stream.memory : createMemoryStream;
	import vibe.stream.operations : readAll;

	auto buf = new ubyte[](256);
	foreach (i, ref b; buf) b = cast(ubyte)i;
	auto str = createMemoryStream(buf, true, 128);
	auto bstr = bufferedStream(str, 16, 4);

	// test empty method
	assert(!bstr.empty);
	bstr.readAll();
	assert(bstr.empty);

	bstr.seek(0);

	ubyte[1] bb;

	// test that each byte is readable
	foreach (i; 0 .. 128) {
		bstr.read(bb);
		assert(bb[0] == i);
	}

	// same in reverse
	foreach_reverse (i; 0 .. 128) {
		bstr.seek(i);
		bstr.read(bb);
		assert(bb[0] == i);
	}

	// the first chunk should be cached now
	assert(bstr.dataAvailableForRead);
	assert(bstr.peek().length == 15);
	assert(bstr.peek()[0] == 1);

	// the last one should not
	bstr.seek(126);
	assert(!bstr.dataAvailableForRead);
	assert(bstr.peek().length == 0);
	assert(bstr.leastSize == 2);

	// a read brings it back
	bstr.read(bb);
	assert(bb[0] == 126);
	assert(bstr.dataAvailableForRead);
	assert(bstr.peek() == [127]);

	// the first to third chunks should still be there
	ubyte[3*16-8] mb;
	bstr.seek(0);
	assert(bstr.dataAvailableForRead);
	assert(bstr.read(mb, IOMode.immediate) == mb.length);
	foreach (i, b; mb) assert(i == b);

	// should read only the remaining 8 buffered bytes
	assert(bstr.read(mb, IOMode.immediate) == 8);
	assert(bstr.tell == 3*16);
	bstr.seek(mb.length);

	// should also read only the remaining 8 buffered bytes
	assert(bstr.read(mb, IOMode.once) == 8);
	assert(bstr.tell == 3*16);
	bstr.seek(mb.length);

	// should read the whole buffer, caching the fourth and fifth chunks
	assert(bstr.read(mb, IOMode.all) == mb.length);
	assert(bstr.tell == 2*mb.length);
	foreach (i, b; mb) assert(i + mb.length == b);

	// the first chunk should now be out of cache
	bstr.seek(0);
	assert(!bstr.dataAvailableForRead);

	// reading with immediate should consequently fail
	assert(bstr.read(mb, IOMode.immediate) == 0);

	// the second/third ones should still be in
	bstr.seek(16);
	assert(bstr.dataAvailableForRead);
	bstr.seek(2*16);
	assert(bstr.dataAvailableForRead);

	// reading uncached data followed by cached data should succeed for "once"
	bstr.seek(0);
	assert(bstr.read(mb, IOMode.once) == mb.length);
	foreach (i, b; mb) assert(i == b);

	// the first three and the fifth chunk should now be cached
	bstr.seek(0);
	assert(bstr.dataAvailableForRead);
	bstr.seek(16);
	assert(bstr.dataAvailableForRead);
	bstr.seek(32);
	assert(bstr.dataAvailableForRead);
	bstr.seek(48);
	assert(!bstr.dataAvailableForRead);
	bstr.seek(64);
	assert(bstr.dataAvailableForRead);

	// reading once should read until the end of the cached chunk sequence
	bstr.seek(0);
	assert(bstr.read(mb, IOMode.once) == mb.length);
	foreach (i, b; mb) assert(i == b);

	// produce an EOF error
	bstr.seek(128);
	assertThrown(bstr.read(bb));
	assert(bstr.tell == 128);

	// add more data from the outside
	str.seek(str.size);
	str.write([cast(ubyte)128]);

	// should now succeed
	bstr.read(bb);
	assert(bb[0] == 128);

	// next byte should produce an EOF error again
	assertThrown(bstr.read(bb));

	// add more data from the inside
	bstr.write([ubyte(129)]);

	// should now succeed
	bstr.seek(129);
	bstr.read(bb);
	assert(bb[0] == 129);
	assert(bstr.size == 130);
	assert(str.size == 129);
	bstr.flush();
	assert(str.size == 130);

	// next byte should produce an EOF error again
	bstr.seek(130);
	assertThrown(bstr.read(bb));
	assert(bstr.tell == 130);
	assertThrown(bstr.read(bb, IOMode.once));
	assert(bstr.tell == 130);

	// add more data from the inside (chunk now cached)
	bstr.write([ubyte(130)]);
	assert(bstr.size == 131);
	assert(str.size == 130);

	// should now succeed
	bstr.seek(130);
	bstr.read(bb);
	assert(bb[0] == 130);

	// flush should write through to the file
	bstr.flush();
	assert(str.size == 131);

	// read back the written data from the underlying file
	bstr.sync();
	bstr.seek(129);
	bstr.read(bb);
	assert(bb[0] == 129);
	bstr.read(bb);
	assert(bb[0] == 130);
}
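
// Illustrative sketch: sync() discards the cached chunks so that changes made
// to the source stream from the outside become visible through the wrapper.
@safe unittest {
	import vibe.stream.memory : createMemoryStream;

	auto buf = new ubyte[](64);
	auto str = createMemoryStream(buf, true, 64);
	auto bstr = bufferedStream(str, 16, 2);

	ubyte[1] bb;
	bstr.read(bb[]);
	assert(bb[0] == 0); // chunk 0 is now cached

	// modify the wrapped stream directly
	str.seek(0);
	str.write([cast(ubyte)42]);

	// the buffered view still serves the cached (stale) byte
	bstr.seek(0);
	bstr.read(bb[]);
	assert(bb[0] == 0);

	// after sync() the outside change is visible
	bstr.sync();
	bstr.seek(0);
	bstr.read(bb[]);
	assert(bb[0] == 42);
}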

@safe unittest { // regression: write after read causes the write to be missed during flush
	import std.exception : assertThrown;
	import vibe.stream.memory : createMemoryStream;
	import vibe.stream.operations : readAll;

	auto buf = new ubyte[](256);
	foreach (i, ref b; buf) b = cast(ubyte)i;
	auto str = createMemoryStream(buf, true, 128);
	auto bstr = bufferedStream(str, 16, 4);

	ubyte[1] ob;
	bstr.read(ob[]);
	assert(ob[0] == 0);
	bstr.seek(0);
	bstr.write([cast(ubyte)1]);
	bstr.seek(0);
	bstr.read(ob[]);
	assert(ob[0] == 1);
	bstr.finalize();
	str.seek(0);
	str.read(ob[]);
	assert(ob[0] == 1);
}

@safe unittest { // regression: seeking past the end of the file within the last chunk
	import std.exception : assertThrown;
	import vibe.stream.memory : createMemoryStream;
	import vibe.stream.operations : readAll;

	auto buf = new ubyte[](256);
	foreach (i, ref b; buf) b = cast(ubyte)i;
	auto str = createMemoryStream(buf, true, 1);
	auto bstr = bufferedStream(str, 16, 4);

	ubyte[1] ob;
	bstr.read(ob[]);
	assert(ob[0] == 0);
	bstr.seek(10);
	bstr.write([cast(ubyte)1]);
	bstr.seek(10);
	bstr.read(ob[]);
	assert(ob[0] == 1);
}

unittest {
	static assert(isTruncatableStream!(BufferedStream!TruncatableStream));
	static assert(isClosableRandomAccessStream!(BufferedStream!ClosableRandomAccessStream));
}

private struct Buffer {
	ulong chunk = ulong.max; // chunk index (offset = chunk * state.bufferSize)
	ubyte[] memory;
	ulong lastAccess;
	size_t fill;
	bool dirty;
}