1 /** Implements a buffered random access wrapper stream.
2
3 Copyright: © 2020-2021 Sönke Ludwig
4 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
5 Authors: Sönke Ludwig
6 */
7 module vibe.stream.bufferedstream;
8
9 import vibe.core.stream;
10
11 import std.algorithm;
12 import std.traits : Unqual;
13
14
15 /** Creates a new buffered stream wrapper.
16
17 Params:
18 stream = The stream that is going to be wrapped
19 buffer_size = Size of a single buffer segment
20 buffer_count = Number of buffer segments
21 */
22 auto bufferedStream(S)(S stream, size_t buffer_size = 16_384, size_t buffer_count = 4)
23 if (isRandomAccessStream!S)
24 in { assert(buffer_size > 0 && buffer_count > 0); }
25 do {
26 return BufferedStream!S(stream.move, buffer_size, buffer_count);
27 }
28
29
30 /** Random access stream that provides buffering on top of a wrapped stream.
31
32 The source stream is logically split up into chunks of equal size, where
33 a defined maximum number of those chunks can be cached in memory. Cached
34 chunks are managed using a LRU strategy.
35 */
36 struct BufferedStream(S) {
37 import std.experimental.allocator;
38 import std.experimental.allocator.mallocator;
39
40 private static struct State {
41 S stream;
42 int refCount = 1;
43 ulong ptr;
44 ulong size;
45 size_t bufferSize;
46 Buffer[] buffers;
47 ulong accessCount;
48 ubyte[] buffermemory;
49
50 this(size_t buffer_size, size_t buffer_count, S stream)
51 {
52 this.bufferSize = buffer_size;
53 this.buffers = Mallocator.instance.makeArray!Buffer(buffer_count);
54 this.buffermemory = Mallocator.instance.makeArray!ubyte(buffer_count * buffer_size);
55 foreach (i, ref b; this.buffers)
56 b.memory = this.buffermemory[i * buffer_size .. (i+1) * buffer_size];
57 this.size = stream.size;
58 this.stream = stream.move;
59 }
60
61 ~this()
62 {
63 if (this.buffers is null) return;
64
65 if (this.stream.writable)
66 flush();
67
68 () @trusted {
69 Mallocator.instance.dispose(this.buffermemory);
70 Mallocator.instance.dispose(this.buffers);
71 } ();
72 }
73
74 @disable this(this);
75
76 void flush()
77 {
78 foreach (i; 0 .. this.buffers.length)
79 flushBuffer(i);
80 this.stream.flush();
81 }
82
83 private size_t bufferChunk(ulong chunk_index)
84 {
85 auto idx = this.buffers.countUntil!((ref b) => b.chunk == chunk_index);
86 if (idx >= 0) return idx;
87
88 auto offset = chunk_index * this.bufferSize;
89 if (offset >= this.size) this.size = this.stream.size;
90 if (offset >= this.size) throw new Exception("Reading past end of stream.");
91
92 auto newidx = this.buffers.minIndex!((ref a, ref b) => a.lastAccess < b.lastAccess);
93 flushBuffer(newidx);
94
95 this.buffers[newidx].fill = 0;
96 this.buffers[newidx].chunk = chunk_index;
97 fillBuffer(newidx, min(this.size - chunk_index*this.bufferSize, this.bufferSize));
98 return newidx;
99 }
100
101 private void fillBuffer(size_t buffer, size_t size)
102 {
103 auto b = &this.buffers[buffer];
104 if (size <= b.fill) return;
105 assert(size <= this.bufferSize);
106
107 this.stream.seek(b.chunk*this.bufferSize + b.fill);
108 this.stream.read(b.memory[b.fill .. size]);
109 b.fill = size;
110 touchBuffer(buffer);
111 }
112
113 private void flushBuffer(size_t buffer)
114 {
115 auto b = &this.buffers[buffer];
116 if (!b.fill || !b.dirty) return;
117 this.stream.seek(b.chunk * this.bufferSize);
118 this.stream.write(b.memory[0 .. b.fill]);
119 if (b.chunk * this.bufferSize + b.fill > this.size)
120 this.size = b.chunk * this.bufferSize + b.fill;
121 b.dirty = false;
122 touchBuffer(buffer);
123 }
124
125 private void touchBuffer(size_t buffer)
126 {
127 this.buffers[buffer].lastAccess = ++this.accessCount;
128 }
129
130 private void iterateChunks(B)(ulong offset, scope B[] bytes,
131 scope bool delegate(ulong offset, scope B[] bytes, sizediff_t buffer, size_t buffer_begin, size_t buffer_end) @safe del)
132 @safe {
133 auto begin = offset;
134 auto end = offset + bytes.length;
135
136 if (bytes.length == 0) return;
137
138 auto chunk_begin = begin / this.bufferSize;
139 auto chunk_end = (end + this.bufferSize - 1) / this.bufferSize;
140
141 foreach (i; chunk_begin .. chunk_end) {
142 auto chunk_off = i * this.bufferSize;
143 auto cstart = max(chunk_off, begin);
144 auto cend = min(chunk_off + this.bufferSize, end);
145 assert(cend > cstart);
146
147 auto buf = this.buffers.countUntil!((ref b) => b.chunk == i);
148 auto buf_begin = cast(size_t)(cstart - chunk_off);
149 auto buf_end = cast(size_t)(cend - chunk_off);
150
151 auto bytes_chunk = bytes[cast(size_t)(cstart - begin) .. cast(size_t)(cend - begin)];
152
153 if (!del(cstart, bytes_chunk, buf, buf_begin, buf_end))
154 break;
155 }
156 }
157 }
158
159 private {
160 // makes the stream copyable and makes it small enough to be put into
161 // a stream proxy
162 State* m_state;
163 }
164
165 private this(S stream, size_t buffer_size, size_t buffer_count)
166 @safe {
167 m_state = new State(buffer_size, buffer_count, stream.move);
168 }
169
170 this(this)
171 {
172 if (m_state) m_state.refCount++;
173 }
174
175 ~this()
176 @safe {
177 if (m_state) {
178 if (!--m_state.refCount)
179 destroy(*m_state);
180 }
181 }
182
183 @property bool empty() @blocking { return state.ptr >= state.size; }
184 @property ulong leastSize() @blocking { return state.size - state.ptr; }
185 @property bool dataAvailableForRead() { return peek().length > 0; }
186 @property ulong size() const nothrow { return state.size; }
187 @property bool readable() const nothrow { return state.stream.readable; }
188 @property bool writable() const nothrow { return state.stream.writable; }
189
190 @property ref inout(S) underlying() inout { return state.stream; }
191
192 static if (isClosableRandomAccessStream!S) {
193 void close()
194 {
195 sync();
196 state.stream.close();
197 }
198
199 @property bool isOpen()
200 const {
201 return state.stream.isOpen();
202 }
203 }
204
205 static if (is(typeof(S.init.truncate(ulong.init))))
206 void truncate(ulong size)
207 {
208 sync();
209 state.stream.truncate(size);
210 state.size = size;
211 }
212
213 const(ubyte)[] peek()
214 {
215 auto limit = (state.ptr / state.bufferSize + 1) * state.bufferSize;
216 auto dummy = () @trusted { return (cast(const(ubyte)*)null)[0 .. cast(size_t)(limit - state.ptr)]; } ();
217
218 const(ubyte)[] ret;
219 state.iterateChunks!(const(ubyte))(state.ptr, dummy, (offset, scope _, buf, buf_begin, buf_end) {
220 if (buf >= 0) {
221 auto b = &state.buffers[buf];
222 ret = b.memory[buf_begin .. min(buf_end, b.fill)];
223 state.touchBuffer(buf);
224 }
225 return false;
226 });
227 return ret;
228 }
229
230 size_t read(scope ubyte[] dst, IOMode mode)
231 @blocking {
232 size_t nread = 0;
233
234 // update size if a read past EOF is expected
235 if (state.ptr + dst.length > state.size) state.size = state.stream.size;
236
237 state.iterateChunks!ubyte(state.ptr, dst, (offset, scope dst_chunk, buf, buf_begin, buf_end) {
238 if (buf < 0) {
239 if (mode == IOMode.immediate) return false;
240 if (mode == IOMode.once && nread) return false;
241 buf = state.bufferChunk(offset / state.bufferSize);
242 } else state.touchBuffer(buf);
243
244 if (state.buffers[buf].fill < buf_end) {
245 if (mode == IOMode.immediate) return false;
246 if (mode == IOMode.once && nread) return false;
247 state.fillBuffer(buf, buf_end);
248 }
249
250 // the whole of dst_chunk is now in the buffer
251 assert(buf_begin % state.bufferSize == offset % state.bufferSize);
252 assert(dst_chunk.length <= buf_end - buf_begin);
253 dst_chunk[] = state.buffers[buf].memory[buf_begin .. buf_begin + dst_chunk.length];
254 nread += dst_chunk.length;
255
256 return true;
257 });
258
259 if (mode == IOMode.all && dst.length != nread)
260 throw new Exception("Reading past end of stream.");
261
262 state.ptr += nread;
263
264 return nread;
265 }
266
267 void read(scope ubyte[] dst) @blocking { auto n = read(dst, IOMode.all); assert(n == dst.length); }
268
269 size_t write(in ubyte[] bytes, IOMode mode)
270 @blocking {
271 size_t nwritten = 0;
272
273 state.iterateChunks!(const(ubyte))(state.ptr, bytes, (offset, scope src_chunk, buf, buf_begin, buf_end) {
274 if (buf < 0) { // write through if not buffered
275 if (mode == IOMode.immediate) return false;
276 if (mode == IOMode.once && nwritten) return false;
277
278 state.stream.seek(offset);
279 state.stream.write(src_chunk);
280 } else {
281 auto b = &state.buffers[buf];
282 b.memory[buf_begin .. buf_begin + src_chunk.length] = src_chunk;
283 b.fill = max(b.fill, buf_begin + src_chunk.length);
284 b.dirty = true;
285 }
286
287 nwritten += src_chunk.length;
288 if (offset + src_chunk.length > state.size)
289 state.size = offset + src_chunk.length;
290
291 return true;
292 });
293
294 assert(mode != IOMode.all || nwritten == bytes.length);
295
296 state.ptr += nwritten;
297
298 return nwritten;
299 }
300
301 void write(in ubyte[] bytes) @blocking { auto n = write(bytes, IOMode.all); assert(n == bytes.length); }
302 void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); }
303
304 void flush() @blocking { state.flush(); }
305
306 /** Flushes and releases all buffers and updates the buffer size.
307
308 This forces the buffered view of the source stream to be in sync with
309 the actual source stream.
310 */
311 void sync()
312 @blocking {
313 flush();
314 foreach (ref b; state.buffers) {
315 b.chunk = ulong.max;
316 b.fill = 0;
317 }
318 state.size = state.stream.size;
319 }
320
321 void finalize() @blocking { flush(); }
322
323 void seek(ulong offset) { state.ptr = offset; }
324 ulong tell() nothrow { return state.ptr; }
325
326 private ref inout(State) state() @trusted nothrow return inout { return *m_state; }
327 }
328
329 mixin validateRandomAccessStream!(BufferedStream!RandomAccessStream);
330
331 @safe unittest {
332 import std.exception : assertThrown;
333 import vibe.stream.memory : createMemoryStream;
334 import vibe.stream.operations : readAll;
335
336 auto buf = new ubyte[](256);
337 foreach (i, ref b; buf) b = cast(ubyte)i;
338 auto str = createMemoryStream(buf, true, 128);
339 auto bstr = bufferedStream(str, 16, 4);
340
341 // test empty method
342 assert(!bstr.empty);
343 bstr.readAll();
344 assert(bstr.empty);
345
346 bstr.seek(0);
347
348 ubyte[1] bb;
349
350 // test that each byte is readable
351 foreach (i; 0 .. 128) {
352 bstr.read(bb);
353 assert(bb[0] == i);
354 }
355
356 // same in reverse
357 foreach_reverse (i; 0 .. 128) {
358 bstr.seek(i);
359 bstr.read(bb);
360 assert(bb[0] == i);
361 }
362
363 // the first chunk should be cached now
364 assert(bstr.dataAvailableForRead);
365 assert(bstr.peek().length == 15);
366 assert(bstr.peek()[0] == 1);
367
368 // the last one should not
369 bstr.seek(126);
370 assert(!bstr.dataAvailableForRead);
371 assert(bstr.peek().length == 0);
372 assert(bstr.leastSize == 2);
373
374 // a read brings it back
375 bstr.read(bb);
376 assert(bb[0] == 126);
377 assert(bstr.dataAvailableForRead);
378 assert(bstr.peek() == [127]);
379
380 // the first to third ones should still be there
381 ubyte[3*16-8] mb;
382 bstr.seek(0);
383 assert(bstr.dataAvailableForRead);
384 assert(bstr.read(mb, IOMode.immediate) == mb.length);
385 foreach (i, b; mb) assert(i == b);
386
387 // should read only the remaining 8 buffered bytes
388 assert(bstr.read(mb, IOMode.immediate) == 8);
389 assert(bstr.tell == 3*16);
390 bstr.seek(mb.length);
391
392 // should also read only the remaining 8 buffered bytes
393 assert(bstr.read(mb, IOMode.once) == 8);
394 assert(bstr.tell == 3*16);
395 bstr.seek(mb.length);
396
397 // should read the whole buffer, caching the fourth and fifth chunk
398 assert(bstr.read(mb, IOMode.all) == mb.length);
399 assert(bstr.tell == 2*mb.length);
400 foreach (i, b; mb) assert(i + mb.length == b);
401
402 // the first chunk should now be out of cache
403 bstr.seek(0);
404 assert(!bstr.dataAvailableForRead);
405
406 // reading with immediate should consequently fail
407 assert(bstr.read(mb, IOMode.immediate) == 0);
408
409 // the second/third ones should still be in
410 bstr.seek(16);
411 assert(bstr.dataAvailableForRead);
412 bstr.seek(2*16);
413 assert(bstr.dataAvailableForRead);
414
415 // reading uncached data followed by cached data should succeed for "once"
416 bstr.seek(0);
417 assert(bstr.read(mb, IOMode.once) == mb.length);
418 foreach (i, b; mb) assert(i == b);
419
420 // the first three and the fifth chunk should now be cached
421 bstr.seek(0);
422 assert(bstr.dataAvailableForRead);
423 bstr.seek(16);
424 assert(bstr.dataAvailableForRead);
425 bstr.seek(32);
426 assert(bstr.dataAvailableForRead);
427 bstr.seek(48);
428 assert(!bstr.dataAvailableForRead);
429 bstr.seek(64);
430 assert(bstr.dataAvailableForRead);
431
432 // reading once should read until the end of the cached chunk sequence
433 bstr.seek(0);
434 assert(bstr.read(mb, IOMode.once) == mb.length);
435 foreach (i, b; mb) assert(i == b);
436
437 // produce an EOF error
438 bstr.seek(128);
439 assertThrown(bstr.read(bb));
440 assert(bstr.tell == 128);
441
442 // add more data from the outside
443 str.seek(str.size);
444 str.write([cast(ubyte)128]);
445
446 // should now succeed
447 bstr.read(bb);
448 assert(bb[0] == 128);
449
450 // next byte should produce an EOF error again
451 assertThrown(bstr.read(bb));
452
453 // add more data from the inside
454 bstr.write([ubyte(129)]);
455
456 // should now succeed
457 bstr.seek(129);
458 bstr.read(bb);
459 assert(bb[0] == 129);
460 assert(bstr.size == 130);
461 assert(str.size == 129);
462 bstr.flush();
463 assert(str.size == 130);
464
465 // next byte should produce an EOF error again
466 bstr.seek(130);
467 assertThrown(bstr.read(bb));
468 assert(bstr.tell == 130);
469 assertThrown(bstr.read(bb, IOMode.once));
470 assert(bstr.tell == 130);
471
472 // add more data from the inside (chunk now cached)
473 bstr.write([ubyte(130)]);
474 assert(bstr.size == 131);
475 assert(str.size == 130);
476
477 // should now succeed
478 bstr.seek(130);
479 bstr.read(bb);
480 assert(bb[0] == 130);
481
482 // flush should write though to the file
483 bstr.flush();
484 assert(str.size == 131);
485
486 // read back the written data from the underlying file
487 bstr.sync();
488 bstr.seek(129);
489 bstr.read(bb);
490 assert(bb[0] == 129);
491 bstr.read(bb);
492 assert(bb[0] == 130);
493 }
494
495 unittest {
496 static assert(isTruncatableStream!(BufferedStream!TruncatableStream));
497 static assert(isClosableRandomAccessStream!(BufferedStream!ClosableRandomAccessStream));
498 }
499
500 private struct Buffer {
501 ulong chunk = ulong.max; // chunk index (offset = chunk * state.bufferSize)
502 ubyte[] memory;
503 ulong lastAccess;
504 size_t fill;
505 bool dirty;
506 }