/** Implements a buffered random access wrapper stream.

	Copyright: © 2020-2021 Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Sönke Ludwig
*/
module vibe.stream.bufferedstream;

import vibe.core.stream;

import std.algorithm;
import std.traits : Unqual;


/** Creates a new buffered stream wrapper.

	Params:
		stream = The stream that is going to be wrapped
		buffer_size = Size of a single buffer segment
		buffer_count = Number of buffer segments
*/
auto bufferedStream(S)(S stream, size_t buffer_size = 16_384, size_t buffer_count = 4)
	if (isRandomAccessStream!S)
	in { assert(buffer_size > 0 && buffer_count > 0); }
do {
	return BufferedStream!S(stream.move, buffer_size, buffer_count);
}
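
// A minimal usage sketch (illustrative addition, not part of the original
// documentation), using the in-memory stream that the unit tests below are
// also based on - any other random access stream works the same way:
@safe unittest {
	import vibe.stream.memory : createMemoryStream;

	auto buf = new ubyte[](128);
	foreach (i, ref b; buf) b = cast(ubyte)i;

	// wrap the source stream using four 16 byte buffer segments
	auto bstr = bufferedStream(createMemoryStream(buf, true, 128), 16, 4);

	ubyte[4] dst;
	bstr.seek(32);
	bstr.read(dst[]); // transparently caches the containing 16 byte chunk
	assert(dst[] == [32, 33, 34, 35]);
	assert(bstr.dataAvailableForRead); // the rest of the chunk is buffered
	bstr.finalize();
}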


/** Random access stream that provides buffering on top of a wrapped stream.

	The source stream is logically split up into chunks of equal size, where
	a defined maximum number of those chunks can be cached in memory. Cached
	chunks are managed using an LRU strategy.
*/
struct BufferedStream(S) {
	import std.experimental.allocator;
	import std.experimental.allocator.mallocator;

	private static struct State {
		S stream;
		int refCount = 1;
		ulong ptr;
		ulong size;
		size_t bufferSize;
		int bufferSizeBits;
		Buffer[] buffers;
		ulong accessCount;
		ubyte[] buffermemory;
		// slice into the matching chunk buffer, beginning at the current ptr
		ubyte[] peekBuffer;

		this(size_t buffer_size, size_t buffer_count, S stream)
		{
			import core.bitop : bsr;

			this.bufferSizeBits = max(bsr(buffer_size), 1);
			this.bufferSize = 1 << this.bufferSizeBits;
			this.buffers = Mallocator.instance.makeArray!Buffer(buffer_count);
			this.buffermemory = Mallocator.instance.makeArray!ubyte(buffer_count * buffer_size);
			foreach (i, ref b; this.buffers)
				b.memory = this.buffermemory[i * buffer_size .. (i+1) * buffer_size];
			this.size = stream.size;
			this.stream = stream.move;
		}

		~this()
		{
			if (this.buffers is null) return;

			if (this.stream.writable)
				flush();

			() @trusted {
				Mallocator.instance.dispose(this.buffermemory);
				Mallocator.instance.dispose(this.buffers);
			} ();
		}

		@disable this(this);

		void flush()
		{
			foreach (i; 0 .. this.buffers.length)
				flushBuffer(i);
			this.stream.flush();
		}

		private size_t bufferChunk(ulong chunk_index)
		{
			auto idx = this.buffers.countUntil!((ref b) => b.chunk == chunk_index);
			if (idx >= 0) return idx;

			auto offset = chunk_index << this.bufferSizeBits;
			if (offset >= this.size) this.size = this.stream.size;
			if (offset >= this.size) throw new Exception("Reading past end of stream.");

			auto newidx = this.buffers.minIndex!((ref a, ref b) => a.lastAccess < b.lastAccess);
			flushBuffer(newidx);

			// clear the peek buffer in case it points into the reused buffer
			if (this.buffers[newidx].chunk == this.ptr >> this.bufferSizeBits)
				this.peekBuffer = null;
			this.buffers[newidx].fill = 0;
			this.buffers[newidx].chunk = chunk_index;
			fillBuffer(newidx, cast(size_t)min(this.size - (chunk_index << this.bufferSizeBits), this.bufferSize));
			return newidx;
		}

		private void fillBuffer(size_t buffer, size_t size)
		{
			auto b = &this.buffers[buffer];
			if (size <= b.fill) return;
			assert(size <= this.bufferSize);

			this.stream.seek((b.chunk << this.bufferSizeBits) + b.fill);
			this.stream.read(b.memory[b.fill .. size]);
			b.fill = size;
			touchBuffer(buffer);
		}

		private void flushBuffer(size_t buffer)
		{
			auto b = &this.buffers[buffer];
			if (!b.fill || !b.dirty) return;
			this.stream.seek(b.chunk << this.bufferSizeBits);
			this.stream.write(b.memory[0 .. b.fill]);
			if ((b.chunk << this.bufferSizeBits) + b.fill > this.size)
				this.size = (b.chunk << this.bufferSizeBits) + b.fill;
			b.dirty = false;
			touchBuffer(buffer);
		}

		private void touchBuffer(size_t buffer)
		{
			this.buffers[buffer].lastAccess = ++this.accessCount;
		}

		private void iterateChunks(B)(ulong offset, scope B[] bytes,
			scope bool delegate(ulong offset, ulong chunk, scope B[] bytes, sizediff_t buffer, size_t buffer_begin, size_t buffer_end) @safe del)
		@safe {
			doIterateChunks!B(offset, bytes, del);
		}

		private void iterateChunks(B)(ulong offset, scope B[] bytes,
			scope bool delegate(ulong offset, ulong chunk, scope B[] bytes, sizediff_t buffer, size_t buffer_begin, size_t buffer_end) @safe nothrow del)
		@safe nothrow {
			doIterateChunks!B(offset, bytes, del);
		}

		private void doIterateChunks(B, DEL)(ulong offset, scope B[] bytes,
			scope DEL del)
		@safe {
			auto begin = offset;
			auto end = offset + bytes.length;

			if (bytes.length == 0) return;

			ulong chunk_begin, chunk_end, chunk_off;
			chunk_begin = begin >> this.bufferSizeBits;
			chunk_end = (end + this.bufferSize - 1) >> this.bufferSizeBits;
			chunk_off = chunk_begin << this.bufferSizeBits;

			foreach (i; chunk_begin .. chunk_end) {
				auto cstart = max(chunk_off, begin);
				auto cend = min(chunk_off + this.bufferSize, end);
				assert(cend > cstart);

				auto buf = this.buffers.countUntil!((ref b) => b.chunk == i);
				auto buf_begin = cast(size_t)(cstart - chunk_off);
				auto buf_end = cast(size_t)(cend - chunk_off);

				auto bytes_chunk = bytes[cast(size_t)(cstart - begin) .. cast(size_t)(cend - begin)];

				if (!del(cstart, i, bytes_chunk, buf, buf_begin, buf_end))
					break;

				chunk_off += this.bufferSize;
			}
		}
	}

	private {
		// makes the stream copyable and makes it small enough to be put into
		// a stream proxy
		State* m_state;
	}

	private this(S stream, size_t buffer_size, size_t buffer_count)
	@safe {
		m_state = new State(buffer_size, buffer_count, stream.move);
	}

	this(this)
	{
		if (m_state) m_state.refCount++;
	}

	~this()
	@safe {
		if (m_state) {
			if (!--m_state.refCount)
				destroy(*m_state);
		}
	}

	@property bool empty() @blocking { return state.ptr >= state.size; }
	@property ulong leastSize() @blocking { return state.size - state.ptr; }
	@property bool dataAvailableForRead() { return peek().length > 0; }
	@property ulong size() const nothrow { return state.size; }
	@property bool readable() const nothrow { return state.stream.readable; }
	@property bool writable() const nothrow { return state.stream.writable; }

	@property ref inout(S) underlying() inout { return state.stream; }

	static if (isClosableRandomAccessStream!S) {
		void close()
		{
			sync();
			state.stream.close();
		}

		@property bool isOpen()
		const {
			return state.stream.isOpen();
		}
	}

	static if (is(typeof(S.init.truncate(ulong.init))))
		void truncate(ulong size)
		{
			sync();
			state.stream.truncate(size);
			state.size = size;
			state.peekBuffer = null;
		}

	const(ubyte)[] peek()
	{
		return state.peekBuffer;
	}

	size_t read(scope ubyte[] dst, IOMode mode)
	@blocking {
		if (dst.length <= state.peekBuffer.length) {
			dst[] = state.peekBuffer[0 .. dst.length];
			state.peekBuffer = state.peekBuffer[dst.length .. $];
			state.ptr += dst.length;
			return dst.length;
		}

		size_t nread = 0;

		// update size if a read past EOF is expected
		if (state.ptr + dst.length > state.size) state.size = state.stream.size;

		ubyte[] newpeek;

		state.iterateChunks!ubyte(state.ptr, dst, (offset, chunk, scope dst_chunk, buf, buf_begin, buf_end) {
			if (buf < 0) {
				if (mode == IOMode.immediate) return false;
				if (mode == IOMode.once && nread) return false;
				buf = state.bufferChunk(chunk);
			} else state.touchBuffer(buf);

			if (state.buffers[buf].fill < buf_end) {
				if (mode == IOMode.immediate) return false;
				if (mode == IOMode.once && nread) return false;
				state.fillBuffer(buf, buf_end);
			}

			// the whole of dst_chunk is now in the buffer
			assert((buf_begin & ((1<<state.bufferSizeBits)-1)) == (offset & ((1<<state.bufferSizeBits)-1)));
			assert(dst_chunk.length <= buf_end - buf_begin);
			dst_chunk[] = state.buffers[buf].memory[buf_begin .. buf_begin + dst_chunk.length];
			nread += dst_chunk.length;

			// any remaining buffer space of the last chunk will be used for
			// quick access on the next read
			newpeek = state.buffers[buf].memory[buf_begin + dst_chunk.length .. state.buffers[buf].fill];

			return true;
		});

		if (mode == IOMode.all && dst.length != nread)
			throw new Exception("Reading past end of stream.");

		state.ptr += nread;
		state.peekBuffer = newpeek;

		return nread;
	}

	void read(scope ubyte[] dst) @blocking { auto n = read(dst, IOMode.all); assert(n == dst.length); }

	size_t write(in ubyte[] bytes, IOMode mode)
	@blocking {
		size_t nwritten = 0;

		ubyte[] newpeek;

		state.iterateChunks!(const(ubyte))(state.ptr, bytes, (offset, chunk, scope src_chunk, buf, buf_begin, buf_end) {
			if (buf < 0) { // write through if not buffered
				if (mode == IOMode.immediate) return false;
				if (mode == IOMode.once && nwritten) return false;

				state.stream.seek(offset);
				state.stream.write(src_chunk);
			} else {
				auto b = &state.buffers[buf];
				b.memory[buf_begin .. buf_begin + src_chunk.length] = src_chunk;
				b.fill = max(b.fill, buf_begin + src_chunk.length);
				b.dirty = true;
			}

			nwritten += src_chunk.length;
			if (offset + src_chunk.length > state.size)
				state.size = offset + src_chunk.length;

			// any remaining valid buffer contents of the last chunk will be
			// used for quick access on the next read
			if (buf >= 0)
				newpeek = state.buffers[buf].memory[buf_begin + src_chunk.length .. state.buffers[buf].fill];

			return true;
		});

		assert(mode != IOMode.all || nwritten == bytes.length);

		state.ptr += nwritten;
		state.peekBuffer = newpeek;

		return nwritten;
	}

	void write(in ubyte[] bytes) @blocking { auto n = write(bytes, IOMode.all); assert(n == bytes.length); }
	void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); }

	void flush() @blocking { state.flush(); }

	/** Flushes and releases all buffers and updates the cached stream size.

		This forces the buffered view of the source stream to be in sync with
		the actual source stream.
	*/
	void sync()
	@blocking {
		flush();
		foreach (ref b; state.buffers) {
			b.chunk = ulong.max;
			b.fill = 0;
		}
		state.size = state.stream.size;
		state.peekBuffer = null;
	}

	void finalize() @blocking { flush(); }

	void seek(ulong offset)
	nothrow {
		// fast path: seeking forward within the current peek buffer window
		if (offset > state.ptr && offset < state.ptr + state.peekBuffer.length) {
			state.peekBuffer = state.peekBuffer[cast(size_t)(offset - state.ptr) .. $];
		} else {
			// otherwise look for a cached chunk covering the new position
			ubyte[1] dummy;
			state.peekBuffer = null;
			state.iterateChunks!ubyte(offset, dummy[], (offset, chunk, scope bytes, buffer, buffer_begin, buffer_end) @safe nothrow {
				if (buffer >= 0 && buffer_begin < state.buffers[buffer].fill) {
					state.peekBuffer = state.buffers[buffer].memory[buffer_begin .. state.buffers[buffer].fill];
					state.touchBuffer(buffer);
				}
				return true;
			});
		}

		// the pointer is only updated after the peek buffer has been
		// adjusted, since the fast path above is relative to the old position
		state.ptr = offset;
	}
	ulong tell() nothrow { return state.ptr; }

	private ref inout(State) state() @trusted nothrow return inout { return *m_state; }
}

mixin validateRandomAccessStream!(BufferedStream!RandomAccessStream);
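
// An illustrative sketch (not part of the original test suite) of the LRU
// chunk caching described in the BufferedStream documentation: with only two
// buffer segments, touching a third chunk evicts the least recently used one.
@safe unittest {
	import vibe.stream.memory : createMemoryStream;

	auto buf = new ubyte[](64);
	foreach (i, ref b; buf) b = cast(ubyte)i;
	auto bstr = bufferedStream(createMemoryStream(buf, true, 64), 16, 2);

	ubyte[1] tmp;
	bstr.seek(0); bstr.read(tmp[]);  // caches chunk 0
	bstr.seek(16); bstr.read(tmp[]); // caches chunk 1
	bstr.seek(32); bstr.read(tmp[]); // caches chunk 2, evicting chunk 0

	bstr.seek(0);
	assert(!bstr.dataAvailableForRead); // chunk 0 was the least recently used
	bstr.seek(16);
	assert(bstr.dataAvailableForRead); // chunk 1 is still cached
}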

@safe unittest {
	import std.exception : assertThrown;
	import vibe.stream.memory : createMemoryStream;
	import vibe.stream.operations : readAll;

	auto buf = new ubyte[](256);
	foreach (i, ref b; buf) b = cast(ubyte)i;
	auto str = createMemoryStream(buf, true, 128);
	auto bstr = bufferedStream(str, 16, 4);

	// test the empty property
	assert(!bstr.empty);
	bstr.readAll();
	assert(bstr.empty);

	bstr.seek(0);

	ubyte[1] bb;

	// test that each byte is readable
	foreach (i; 0 .. 128) {
		bstr.read(bb);
		assert(bb[0] == i);
	}

	// same in reverse
	foreach_reverse (i; 0 .. 128) {
		bstr.seek(i);
		bstr.read(bb);
		assert(bb[0] == i);
	}

	// the first chunk should be cached now
	assert(bstr.dataAvailableForRead);
	assert(bstr.peek().length == 15);
	assert(bstr.peek()[0] == 1);

	// the last one should not
	bstr.seek(126);
	assert(!bstr.dataAvailableForRead);
	assert(bstr.peek().length == 0);
	assert(bstr.leastSize == 2);

	// a read brings it back
	bstr.read(bb);
	assert(bb[0] == 126);
	assert(bstr.dataAvailableForRead);
	assert(bstr.peek() == [127]);

	// the first to third chunks should still be there
	ubyte[3*16-8] mb;
	bstr.seek(0);
	assert(bstr.dataAvailableForRead);
	assert(bstr.read(mb, IOMode.immediate) == mb.length);
	foreach (i, b; mb) assert(i == b);

	// should read only the remaining 8 buffered bytes
	assert(bstr.read(mb, IOMode.immediate) == 8);
	assert(bstr.tell == 3*16);
	bstr.seek(mb.length);

	// should also read only the remaining 8 buffered bytes
	assert(bstr.read(mb, IOMode.once) == 8);
	assert(bstr.tell == 3*16);
	bstr.seek(mb.length);

	// should read the whole buffer, caching the fourth and fifth chunks
	assert(bstr.read(mb, IOMode.all) == mb.length);
	assert(bstr.tell == 2*mb.length);
	foreach (i, b; mb) assert(i + mb.length == b);

	// the first chunk should now be out of cache
	bstr.seek(0);
	assert(!bstr.dataAvailableForRead);

	// reading with immediate should consequently fail
	assert(bstr.read(mb, IOMode.immediate) == 0);

	// the second/third ones should still be in
	bstr.seek(16);
	assert(bstr.dataAvailableForRead);
	bstr.seek(2*16);
	assert(bstr.dataAvailableForRead);

	// reading uncached data followed by cached data should succeed for "once"
	bstr.seek(0);
	assert(bstr.read(mb, IOMode.once) == mb.length);
	foreach (i, b; mb) assert(i == b);

	// the first three and the fifth chunk should now be cached
	bstr.seek(0);
	assert(bstr.dataAvailableForRead);
	bstr.seek(16);
	assert(bstr.dataAvailableForRead);
	bstr.seek(32);
	assert(bstr.dataAvailableForRead);
	bstr.seek(48);
	assert(!bstr.dataAvailableForRead);
	bstr.seek(64);
	assert(bstr.dataAvailableForRead);

	// reading once should read until the end of the cached chunk sequence
	bstr.seek(0);
	assert(bstr.read(mb, IOMode.once) == mb.length);
	foreach (i, b; mb) assert(i == b);

	// produce an EOF error
	bstr.seek(128);
	assertThrown(bstr.read(bb));
	assert(bstr.tell == 128);

	// add more data from the outside
	str.seek(str.size);
	str.write([cast(ubyte)128]);

	// should now succeed
	bstr.read(bb);
	assert(bb[0] == 128);

	// next byte should produce an EOF error again
	assertThrown(bstr.read(bb));

	// add more data from the inside
	bstr.write([ubyte(129)]);

	// should now succeed
	bstr.seek(129);
	bstr.read(bb);
	assert(bb[0] == 129);
	assert(bstr.size == 130);
	assert(str.size == 129);
	bstr.flush();
	assert(str.size == 130);

	// next byte should produce an EOF error again
	bstr.seek(130);
	assertThrown(bstr.read(bb));
	assert(bstr.tell == 130);
	assertThrown(bstr.read(bb, IOMode.once));
	assert(bstr.tell == 130);

	// add more data from the inside (chunk now cached)
	bstr.write([ubyte(130)]);
	assert(bstr.size == 131);
	assert(str.size == 130);

	// should now succeed
	bstr.seek(130);
	bstr.read(bb);
	assert(bb[0] == 130);

	// flush should write through to the file
	bstr.flush();
	assert(str.size == 131);

	// read back the written data from the underlying file
	bstr.sync();
	bstr.seek(129);
	bstr.read(bb);
	assert(bb[0] == 129);
	bstr.read(bb);
	assert(bb[0] == 130);
}
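
// An illustrative sketch (not part of the original test suite) of the sync()
// semantics: cached chunks hide outside modifications of the source stream
// until the buffers are released.
@safe unittest {
	import vibe.stream.memory : createMemoryStream;

	auto buf = new ubyte[](64);
	auto str = createMemoryStream(buf, true, 64);
	auto bstr = bufferedStream(str, 16, 4);

	ubyte[1] b;
	bstr.read(b[]); // caches the first chunk
	assert(b[0] == 0);

	// modify the source stream from the outside
	str.seek(0);
	str.write([cast(ubyte)42]);

	// the buffered view still sees the stale cached contents
	bstr.seek(0);
	bstr.read(b[]);
	assert(b[0] == 0);

	// sync() releases all buffers and re-reads the stream size
	bstr.sync();
	bstr.seek(0);
	bstr.read(b[]);
	assert(b[0] == 42);
}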

@safe unittest { // regression: write after read causes write to be missed during flush
	import std.exception : assertThrown;
	import vibe.stream.memory : createMemoryStream;
	import vibe.stream.operations : readAll;

	auto buf = new ubyte[](256);
	foreach (i, ref b; buf) b = cast(ubyte)i;
	auto str = createMemoryStream(buf, true, 128);
	auto bstr = bufferedStream(str, 16, 4);

	ubyte[1] ob;
	bstr.read(ob[]);
	assert(ob[0] == 0);
	bstr.seek(0);
	bstr.write([cast(ubyte)1]);
	bstr.seek(0);
	bstr.read(ob[]);
	assert(ob[0] == 1);
	bstr.finalize();
	str.seek(0);
	str.read(ob[]);
	assert(ob[0] == 1);
}

@safe unittest { // regression: seeking past the end of the file within the last chunk
	import std.exception : assertThrown;
	import vibe.stream.memory : createMemoryStream;
	import vibe.stream.operations : readAll;

	auto buf = new ubyte[](256);
	foreach (i, ref b; buf) b = cast(ubyte)i;
	auto str = createMemoryStream(buf, true, 1);
	auto bstr = bufferedStream(str, 16, 4);

	ubyte[1] ob;
	bstr.read(ob[]);
	assert(ob[0] == 0);
	bstr.seek(10);
	bstr.write([cast(ubyte)1]);
	bstr.seek(10);
	bstr.read(ob[]);
	assert(ob[0] == 1);
}

unittest {
	static assert(isTruncatableStream!(BufferedStream!TruncatableStream));
	static assert(isClosableRandomAccessStream!(BufferedStream!ClosableRandomAccessStream));
}

private struct Buffer {
	ulong chunk = ulong.max; // chunk index (offset = chunk * state.bufferSize)
	ubyte[] memory;
	ulong lastAccess;
	size_t fill;
	bool dirty;
}