/** Implements a buffered random access wrapper stream.

	Copyright: © 2020-2021 Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Sönke Ludwig
*/
module vibe.stream.bufferedstream;

import vibe.core.stream;

import std.algorithm;
import std.traits : Unqual;


/** Creates a new buffered stream wrapper.

	Params:
		stream = The stream that is going to be wrapped
		buffer_size = Size of a single buffer segment
		buffer_count = Number of buffer segments
*/
auto bufferedStream(S)(S stream, size_t buffer_size = 16_384, size_t buffer_count = 4)
	if (isRandomAccessStream!S)
	in { assert(buffer_size > 0 && buffer_count > 0); }
do {
	return BufferedStream!S(stream.move, buffer_size, buffer_count);
}
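
/// Minimal usage sketch - wraps a `vibe.stream.memory` stream purely for
/// illustration; any random access stream works the same way.
unittest {
	import vibe.stream.memory : createMemoryStream;

	auto stream = bufferedStream(createMemoryStream(new ubyte[](128), true, 128), 16, 4);
	ubyte[8] buf;
	stream.seek(0);
	stream.read(buf); // loads the first 16 byte chunk into the cache and copies from it
	assert(stream.tell == 8);
}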


/** Random access stream that provides buffering on top of a wrapped stream.

	The source stream is logically split up into chunks of equal size, where
	a defined maximum number of those chunks can be cached in memory. Cached
	chunks are managed using an LRU strategy.
*/
struct BufferedStream(S) {
	import std.experimental.allocator;
	import std.experimental.allocator.mallocator;

	private static struct State {
		S stream;
		int refCount = 1;
		ulong ptr;
		ulong size;
		size_t bufferSize;
		Buffer[] buffers;
		ulong accessCount;
		ubyte[] buffermemory;

		this(size_t buffer_size, size_t buffer_count, S stream)
		{
			this.bufferSize = buffer_size;
			this.buffers = Mallocator.instance.makeArray!Buffer(buffer_count);
			this.buffermemory = Mallocator.instance.makeArray!ubyte(buffer_count * buffer_size);
			foreach (i, ref b; this.buffers)
				b.memory = this.buffermemory[i * buffer_size .. (i+1) * buffer_size];
			this.size = stream.size;
			this.stream = stream.move;
		}

		~this()
		{
			if (this.buffers is null) return;

			if (this.stream.writable)
				flush();

			() @trusted {
				Mallocator.instance.dispose(this.buffermemory);
				Mallocator.instance.dispose(this.buffers);
			} ();
		}

		@disable this(this);

		void flush()
		{
			foreach (i; 0 .. this.buffers.length)
				flushBuffer(i);
			this.stream.flush();
		}
		// Returns the index of the buffer that holds the given chunk,
		// fetching the chunk from the source stream (and evicting the
		// least recently used buffer) if it is not cached yet.
		private size_t bufferChunk(ulong chunk_index)
		{
			auto idx = this.buffers.countUntil!((ref b) => b.chunk == chunk_index);
			if (idx >= 0) return idx;

			auto offset = chunk_index * this.bufferSize;
			if (offset >= this.size) this.size = this.stream.size;
			if (offset >= this.size) throw new Exception("Reading past end of stream.");

			auto newidx = this.buffers.minIndex!((ref a, ref b) => a.lastAccess < b.lastAccess);
			flushBuffer(newidx);

			this.buffers[newidx].fill = 0;
			this.buffers[newidx].chunk = chunk_index;
			fillBuffer(newidx, min(this.size - chunk_index*this.bufferSize, this.bufferSize));
			return newidx;
		}

		// Reads from the source stream until the buffer contains at least
		// `size` bytes.
		private void fillBuffer(size_t buffer, size_t size)
		{
			auto b = &this.buffers[buffer];
			if (size <= b.fill) return;
			assert(size <= this.bufferSize);

			this.stream.seek(b.chunk*this.bufferSize + b.fill);
			this.stream.read(b.memory[b.fill .. size]);
			b.fill = size;
			touchBuffer(buffer);
		}

		// Writes the buffer contents back to the source stream if dirty.
		private void flushBuffer(size_t buffer)
		{
			auto b = &this.buffers[buffer];
			if (!b.fill || !b.dirty) return;
			this.stream.seek(b.chunk * this.bufferSize);
			this.stream.write(b.memory[0 .. b.fill]);
			if (b.chunk * this.bufferSize + b.fill > this.size)
				this.size = b.chunk * this.bufferSize + b.fill;
			b.dirty = false;
			touchBuffer(buffer);
		}

		// Marks the buffer as most recently used.
		private void touchBuffer(size_t buffer)
		{
			this.buffers[buffer].lastAccess = ++this.accessCount;
		}

		// Maps the byte range [offset, offset + bytes.length) onto the
		// affected chunks and invokes `del` once per chunk, passing the
		// cached buffer index (or -1 if the chunk is not cached) along
		// with the corresponding sub-slice of `bytes`.
		private void iterateChunks(B)(ulong offset, scope B[] bytes,
			scope bool delegate(ulong offset, scope B[] bytes, sizediff_t buffer, size_t buffer_begin, size_t buffer_end) @safe del)
		@safe {
			auto begin = offset;
			auto end = offset + bytes.length;

			if (bytes.length == 0) return;

			auto chunk_begin = begin / this.bufferSize;
			auto chunk_end = (end + this.bufferSize - 1) / this.bufferSize;

			foreach (i; chunk_begin .. chunk_end) {
				auto chunk_off = i * this.bufferSize;
				auto cstart = max(chunk_off, begin);
				auto cend = min(chunk_off + this.bufferSize, end);
				assert(cend > cstart);

				auto buf = this.buffers.countUntil!((ref b) => b.chunk == i);
				auto buf_begin = cast(size_t)(cstart - chunk_off);
				auto buf_end = cast(size_t)(cend - chunk_off);

				auto bytes_chunk = bytes[cast(size_t)(cstart - begin) .. cast(size_t)(cend - begin)];

				if (!del(cstart, bytes_chunk, buf, buf_begin, buf_end))
					break;
			}
		}
	}

	private {
		// makes the stream copyable and makes it small enough to be put into
		// a stream proxy
		State* m_state;
	}

	private this(S stream, size_t buffer_size, size_t buffer_count)
	@safe {
		m_state = new State(buffer_size, buffer_count, stream.move);
	}

	this(this)
	{
		if (m_state) m_state.refCount++;
	}

	~this()
	@safe {
		if (m_state) {
			if (!--m_state.refCount)
				destroy(*m_state);
		}
	}

	@property bool empty() @blocking { return state.ptr >= state.size; }
	@property ulong leastSize() @blocking { return state.size - state.ptr; }
	@property bool dataAvailableForRead() { return peek().length > 0; }
	@property ulong size() const nothrow { return state.size; }
	@property bool readable() const nothrow { return state.stream.readable; }
	@property bool writable() const nothrow { return state.stream.writable; }

	@property ref inout(S) underlying() inout { return state.stream; }

	static if (isClosableRandomAccessStream!S) {
		void close()
		{
			sync();
			state.stream.close();
		}

		@property bool isOpen()
		const {
			return state.stream.isOpen();
		}
	}

	static if (is(typeof(S.init.truncate(ulong.init))))
		void truncate(ulong size)
		{
			sync();
			state.stream.truncate(size);
			state.size = size;
		}

	const(ubyte)[] peek()
	{
		auto limit = (state.ptr / state.bufferSize + 1) * state.bufferSize;
		// dummy only carries the length of the range up to the next chunk
		// boundary - it is never dereferenced within iterateChunks
		auto dummy = () @trusted { return (cast(const(ubyte)*)null)[0 .. cast(size_t)(limit - state.ptr)]; } ();

		const(ubyte)[] ret;
		state.iterateChunks!(const(ubyte))(state.ptr, dummy, (offset, scope _, buf, buf_begin, buf_end) {
			if (buf >= 0) {
				auto b = &state.buffers[buf];
				ret = b.memory[buf_begin .. min(buf_end, b.fill)];
				state.touchBuffer(buf);
			}
			return false;
		});
		return ret;
	}

	size_t read(scope ubyte[] dst, IOMode mode)
	@blocking {
		size_t nread = 0;

		// update size if a read past EOF is expected
		if (state.ptr + dst.length > state.size) state.size = state.stream.size;

		state.iterateChunks!ubyte(state.ptr, dst, (offset, scope dst_chunk, buf, buf_begin, buf_end) {
			if (buf < 0) {
				if (mode == IOMode.immediate) return false;
				if (mode == IOMode.once && nread) return false;
				buf = state.bufferChunk(offset / state.bufferSize);
			} else state.touchBuffer(buf);

			if (state.buffers[buf].fill < buf_end) {
				if (mode == IOMode.immediate) return false;
				if (mode == IOMode.once && nread) return false;
				state.fillBuffer(buf, buf_end);
			}

			// the whole of dst_chunk is now in the buffer
			assert(buf_begin % state.bufferSize == offset % state.bufferSize);
			assert(dst_chunk.length <= buf_end - buf_begin);
			dst_chunk[] = state.buffers[buf].memory[buf_begin .. buf_begin + dst_chunk.length];
			nread += dst_chunk.length;

			return true;
		});

		if (mode == IOMode.all && dst.length != nread)
			throw new Exception("Reading past end of stream.");

		state.ptr += nread;

		return nread;
	}

	void read(scope ubyte[] dst) @blocking { auto n = read(dst, IOMode.all); assert(n == dst.length); }

	size_t write(in ubyte[] bytes, IOMode mode)
	@blocking {
		size_t nwritten = 0;

		state.iterateChunks!(const(ubyte))(state.ptr, bytes, (offset, scope src_chunk, buf, buf_begin, buf_end) {
			if (buf < 0) { // write through if not buffered
				if (mode == IOMode.immediate) return false;
				if (mode == IOMode.once && nwritten) return false;

				state.stream.seek(offset);
				state.stream.write(src_chunk);
			} else {
				auto b = &state.buffers[buf];
				b.memory[buf_begin .. buf_begin + src_chunk.length] = src_chunk;
				b.fill = max(b.fill, buf_begin + src_chunk.length);
				b.dirty = true;
			}

			nwritten += src_chunk.length;
			if (offset + src_chunk.length > state.size)
				state.size = offset + src_chunk.length;

			return true;
		});

		assert(mode != IOMode.all || nwritten == bytes.length);

		state.ptr += nwritten;

		return nwritten;
	}

	void write(in ubyte[] bytes) @blocking { auto n = write(bytes, IOMode.all); assert(n == bytes.length); }
	void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); }

	void flush() @blocking { state.flush(); }

	/** Flushes and releases all buffers and updates the cached stream size.

		This forces the buffered view of the source stream to be in sync with
		the actual source stream.
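
		A minimal sketch (using a `vibe.stream.memory` stream purely for
		illustration):
		---
		import vibe.stream.memory : createMemoryStream;

		auto ms = createMemoryStream(new ubyte[](4), true, 0);
		auto bs = bufferedStream(ms, 4, 1);
		ms.write([ubyte(1)]); // modifies the source behind the wrapper's back
		bs.sync();            // drop cached chunks and re-read the source size
		assert(bs.size == 1);
		---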
	*/
	void sync()
	@blocking {
		flush();
		foreach (ref b; state.buffers) {
			b.chunk = ulong.max;
			b.fill = 0;
		}
		state.size = state.stream.size;
	}

	void finalize() @blocking { flush(); }

	void seek(ulong offset) { state.ptr = offset; }
	ulong tell() nothrow { return state.ptr; }

	private ref inout(State) state() @trusted nothrow return inout { return *m_state; }
}
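
/// Illustrates the LRU chunk cache - a sketch using small, arbitrary buffer
/// dimensions and a `vibe.stream.memory` source.
unittest {
	import vibe.stream.memory : createMemoryStream;

	// two cache segments of four bytes each over a 16 byte stream
	auto bs = bufferedStream(createMemoryStream(new ubyte[](16), true, 16), 4, 2);
	ubyte[4] tmp;
	bs.read(tmp); // caches chunk 0
	bs.read(tmp); // caches chunk 1
	bs.read(tmp); // evicts chunk 0 (least recently used) in favor of chunk 2
	bs.seek(0);
	assert(!bs.dataAvailableForRead); // chunk 0 is no longer cached
	bs.seek(4);
	assert(bs.dataAvailableForRead);  // chunk 1 is still cached
}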

mixin validateRandomAccessStream!(BufferedStream!RandomAccessStream);

@safe unittest {
	import std.exception : assertThrown;
	import vibe.stream.memory : createMemoryStream;
	import vibe.stream.operations : readAll;

	auto buf = new ubyte[](256);
	foreach (i, ref b; buf) b = cast(ubyte)i;
	auto str = createMemoryStream(buf, true, 128);
	auto bstr = bufferedStream(str, 16, 4);

	// test empty method
	assert(!bstr.empty);
	bstr.readAll();
	assert(bstr.empty);

	bstr.seek(0);

	ubyte[1] bb;

	// test that each byte is readable
	foreach (i; 0 .. 128) {
		bstr.read(bb);
		assert(bb[0] == i);
	}

	// same in reverse
	foreach_reverse (i; 0 .. 128) {
		bstr.seek(i);
		bstr.read(bb);
		assert(bb[0] == i);
	}

	// the first chunk should be cached now
	assert(bstr.dataAvailableForRead);
	assert(bstr.peek().length == 15);
	assert(bstr.peek()[0] == 1);

	// the last one should not
	bstr.seek(126);
	assert(!bstr.dataAvailableForRead);
	assert(bstr.peek().length == 0);
	assert(bstr.leastSize == 2);

	// a read brings it back
	bstr.read(bb);
	assert(bb[0] == 126);
	assert(bstr.dataAvailableForRead);
	assert(bstr.peek() == [127]);

	// the first to third ones should still be there
	ubyte[3*16-8] mb;
	bstr.seek(0);
	assert(bstr.dataAvailableForRead);
	assert(bstr.read(mb, IOMode.immediate) == mb.length);
	foreach (i, b; mb) assert(i == b);

	// should read only the remaining 8 buffered bytes
	assert(bstr.read(mb, IOMode.immediate) == 8);
	assert(bstr.tell == 3*16);
	bstr.seek(mb.length);

	// should also read only the remaining 8 buffered bytes
	assert(bstr.read(mb, IOMode.once) == 8);
	assert(bstr.tell == 3*16);
	bstr.seek(mb.length);

	// should read the whole buffer, caching the fourth and fifth chunk
	assert(bstr.read(mb, IOMode.all) == mb.length);
	assert(bstr.tell == 2*mb.length);
	foreach (i, b; mb) assert(i + mb.length == b);

	// the first chunk should now be out of cache
	bstr.seek(0);
	assert(!bstr.dataAvailableForRead);

	// reading with immediate should consequently fail
	assert(bstr.read(mb, IOMode.immediate) == 0);

	// the second/third ones should still be in
	bstr.seek(16);
	assert(bstr.dataAvailableForRead);
	bstr.seek(2*16);
	assert(bstr.dataAvailableForRead);

	// reading uncached data followed by cached data should succeed for "once"
	bstr.seek(0);
	assert(bstr.read(mb, IOMode.once) == mb.length);
	foreach (i, b; mb) assert(i == b);

	// the first three and the fifth chunk should now be cached
	bstr.seek(0);
	assert(bstr.dataAvailableForRead);
	bstr.seek(16);
	assert(bstr.dataAvailableForRead);
	bstr.seek(32);
	assert(bstr.dataAvailableForRead);
	bstr.seek(48);
	assert(!bstr.dataAvailableForRead);
	bstr.seek(64);
	assert(bstr.dataAvailableForRead);

	// reading once should read until the end of the cached chunk sequence
	bstr.seek(0);
	assert(bstr.read(mb, IOMode.once) == mb.length);
	foreach (i, b; mb) assert(i == b);

	// produce an EOF error
	bstr.seek(128);
	assertThrown(bstr.read(bb));
	assert(bstr.tell == 128);

	// add more data from the outside
	str.seek(str.size);
	str.write([cast(ubyte)128]);

	// should now succeed
	bstr.read(bb);
	assert(bb[0] == 128);

	// next byte should produce an EOF error again
	assertThrown(bstr.read(bb));

	// add more data from the inside
	bstr.write([ubyte(129)]);

	// should now succeed
	bstr.seek(129);
	bstr.read(bb);
	assert(bb[0] == 129);
	assert(bstr.size == 130);
	assert(str.size == 129);
	bstr.flush();
	assert(str.size == 130);

	// next byte should produce an EOF error again
	bstr.seek(130);
	assertThrown(bstr.read(bb));
	assert(bstr.tell == 130);
	assertThrown(bstr.read(bb, IOMode.once));
	assert(bstr.tell == 130);

	// add more data from the inside (chunk now cached)
	bstr.write([ubyte(130)]);
	assert(bstr.size == 131);
	assert(str.size == 130);

	// should now succeed
	bstr.seek(130);
	bstr.read(bb);
	assert(bb[0] == 130);

	// flush should write through to the file
	bstr.flush();
	assert(str.size == 131);

	// read back the written data from the underlying file
	bstr.sync();
	bstr.seek(129);
	bstr.read(bb);
	assert(bb[0] == 129);
	bstr.read(bb);
	assert(bb[0] == 130);
}

unittest {
	static assert(isTruncatableStream!(BufferedStream!TruncatableStream));
	static assert(isClosableRandomAccessStream!(BufferedStream!ClosableRandomAccessStream));
}

private struct Buffer {
	ulong chunk = ulong.max; // chunk index (offset = chunk * state.bufferSize)
	ubyte[] memory;
	ulong lastAccess;
	size_t fill;
	bool dirty;
}