/** Random access stream that caches a source input stream on disk.

	Copyright: © 2023 Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Sönke Ludwig
*/
module vibe.stream.cached;

import vibe.core.file;
import vibe.core.log;
import vibe.core.path;
import vibe.core.stream;

import std.typecons : Flag, No, Yes;


/** Creates a new `CachedStream` instance.

	Data will be read as needed from the source stream sequentially and gets
	stored in the local file for random access. Note that the reported size
	of the stream may change for source streams that do not report their full
	size through the `leastSize` property.

	Also note that when making the cached file writable, parts of the file that
	have not yet been read from the source stream will get overwritten after
	write operations to the cached file. Write operations should ideally only be
	made after reading the complete source stream.

	Params:
		source = The source input stream to read from
		writable = Optional flag to make the cached file writable
		cached_file_path = Explicit path for storing the cached file - if not
			given, uses a temporary file that gets deleted upon close
*/
CachedFileStream!InputStream createCachedFileStream(InputStream)(InputStream source, Flag!"writable" writable = No.writable)
	if (isInputStream!InputStream)
{
	return CachedFileStream!InputStream(source, writable, NativePath.init);
}
/// ditto
CachedFileStream!InputStream createCachedFileStream(InputStream)(InputStream source, NativePath cached_file_path, Flag!"writable" writable = No.writable)
	if (isInputStream!InputStream)
{
	return CachedFileStream!InputStream(source, writable, cached_file_path);
}


/** File backed cached random access stream wrapper.

	Data is pulled sequentially from the source stream on demand and appended
	to a backing file, which then serves all random access operations. After
	`close` has been called, the size related properties (`size`, `leastSize`,
	`empty`) as well as `isOpen`, `writable` and `dataAvailableForRead` can
	still be queried safely; all other operations require an open stream.

	See_also: `createCachedFileStream`
*/
struct CachedFileStream(InputStream)
	if (isInputStream!InputStream)
{
	import std.algorithm.comparison : max, min;

	enum outputStreamVersion = 2;

	// Shared state, referenced through a pointer so that copies of the
	// struct operate on the same source stream and cache file.
	private static struct CTX {
		InputStream source;   // stream that the data is pulled from
		FileStream cachedFile; // backing file holding the data read so far
		ulong readPtr;        // number of bytes transferred from source to cachedFile
		ulong size;           // total size estimate (readPtr + source.leastSize)
		bool canWrite;        // value reported by the `writable` property
		bool deleteOnClose;   // set when a temporary file is used
	}

	private {
		CTX* m_ctx;
	}

	/*	Sets up the shared context and opens the backing file.

		An empty (`NativePath.init`) path selects a temporary file that gets
		removed in `close`, any other path is created/truncated and kept.
	*/
	private this(InputStream source, bool writable, NativePath cached_file_path)
	{
		m_ctx = new CTX;
		m_ctx.source = source;
		m_ctx.canWrite = writable;
		m_ctx.size = source.leastSize;

		if (cached_file_path == NativePath.init) {
			m_ctx.deleteOnClose = true;
			m_ctx.cachedFile = createTempFile();
		} else {
			m_ctx.cachedFile = openFile(cached_file_path, FileMode.createTrunc);
		}
	}

	/// File descriptor of the backing cache file.
	@property int fd() const nothrow { return m_ctx.cachedFile.fd; }
	/// Path of the backing cache file.
	@property NativePath path() const nothrow { return m_ctx.cachedFile.path; }
	/// Determines whether the stream has not been closed yet.
	@property bool isOpen() const nothrow { return m_ctx && m_ctx.cachedFile.isOpen; }
	/** The larger one of the cache file size and the current size estimate
		of the source stream, or zero for a closed stream.
	*/
	@property ulong size() const nothrow { return m_ctx ? max(m_ctx.cachedFile.size, m_ctx.size) : 0; }
	/// Always `true`.
	@property bool readable() const nothrow { return true; }
	/** Reports the `writable` flag passed at construction time.

		Returns `false` for a closed stream instead of dereferencing the
		released context, matching the behavior of `isOpen` and `size`.
	*/
	// NOTE(review): the flag is only reported here - the `write` methods do
	// not check it. Confirm whether enforcement is intended.
	@property bool writable() const nothrow { return m_ctx !is null && m_ctx.canWrite; }

	/** Number of bytes between the current position and the reported size.

		Returns zero if the position lies beyond the reported size or if the
		stream has been closed.
	*/
	@property ulong leastSize()
	@blocking {
		// a closed stream has no context to query, report it as exhausted
		if (!m_ctx) return 0;

		const pos = tell();
		const total = this.size;
		return pos > total ? 0 : total - pos;
	}

	/** Determines whether data is available for reading.

		This is the case if the cache file reports available data, or if the
		current position equals the amount of data cached so far and the
		source stream reports available data.
	*/
	@property bool dataAvailableForRead()
	{
		if (!m_ctx)
			return false;
		if (m_ctx.cachedFile.dataAvailableForRead)
			return true;
		if (tell() == m_ctx.readPtr && m_ctx.source.dataAvailableForRead)
			return true;
		return false;
	}

	/// Determines whether `leastSize` is zero.
	@property bool empty() @blocking { return leastSize() == 0; }

	/** Closes the cache file and releases the internal state.

		If a temporary cache file is used, it gets removed. A failure to
		remove it is logged and otherwise ignored. Calling `close` on an
		already closed stream does nothing.
	*/
	void close()
	@blocking {
		if (!m_ctx || !m_ctx.cachedFile.isOpen) return;

		// remember the path before closing, so that the temporary file can
		// be removed afterwards
		const remove_path = m_ctx.cachedFile.path;

		m_ctx.cachedFile.close();
		if (m_ctx.deleteOnClose) {
			try removeFile(remove_path);
			catch (Exception e) logException(e, "Failed to remove temporary cached stream file");
		}

		// only struct based source streams are destroyed explicitly
		static if (is(InputStream == struct))
			destroy(m_ctx.source);
		destroy(m_ctx.cachedFile);
		m_ctx = null;
	}

	/// Truncates the backing cache file to the given size.
	void truncate(ulong size) @blocking { m_ctx.cachedFile.truncate(size); }

	/** Moves the current position to `offset`.

		All source data up to `offset` that has not been cached, yet, is read
		from the source stream first.
	*/
	void seek(ulong offset)
	@blocking {
		readUpTo(offset);
		m_ctx.cachedFile.seek(offset);
	}

	/// Current position within the cache file.
	ulong tell() nothrow { return m_ctx.cachedFile.tell(); }

	/** Writes to the cache file at the current position.

		Note that data written beyond the amount of source data cached so far
		gets overwritten once that part of the source stream is read.
	*/
	size_t write(scope const(ubyte)[] bytes, IOMode mode) @blocking { return m_ctx.cachedFile.write(bytes, mode); }
	/// ditto
	void write(scope const(ubyte)[] bytes) @blocking { auto n = write(bytes, IOMode.all); assert(n == bytes.length); }
	/// ditto
	void write(scope const(char)[] bytes) @blocking { write(cast(const(ubyte)[])bytes); }

	/// Flushes the cache file.
	void flush() @blocking { m_ctx.cachedFile.flush(); }
	/// Flushes the cache file (same operation as `flush`).
	void finalize() @blocking { m_ctx.cachedFile.flush(); }

	/** Returns the peek buffer of the source stream while the current
		position equals the amount of cached data, and the peek buffer of
		the cache file otherwise.
	*/
	const(ubyte)[] peek()
	{
		if (m_ctx.cachedFile.tell == m_ctx.readPtr)
			return m_ctx.source.peek;
		return m_ctx.cachedFile.peek;
	}

	/** Reads from the current position.

		The source stream is first read up to the end of the requested range
		(as far as it has data available), then the read request is served
		from the cache file.
	*/
	size_t read(scope ubyte[] dst, IOMode mode)
	@blocking {
		readUpTo(tell() + dst.length);
		return m_ctx.cachedFile.read(dst, mode);
	}
	/// ditto
	void read(scope ubyte[] dst) @blocking { auto n = read(dst, IOMode.all); assert(n == dst.length); }

	/*	Transfers source data to the cache file until `offset` bytes have
		been cached or the source stream reports no more data.

		The position of the cache file is restored when done.
	*/
	private void readUpTo(ulong offset)
	{
		if (offset <= m_ctx.readPtr) return;

		// restore the caller's position on any exit path
		auto ptr = m_ctx.cachedFile.tell;
		scope (exit) m_ctx.cachedFile.seek(ptr);

		// new source data is always stored at the end of the cached range
		m_ctx.cachedFile.seek(m_ctx.readPtr);

		while (offset > m_ctx.readPtr) {
			auto chunk = min(offset - m_ctx.readPtr, m_ctx.source.leastSize);
			if (chunk == 0) break; // source has no more data to offer
			pipe(m_ctx.source, m_ctx.cachedFile, chunk);
			m_ctx.readPtr += chunk;
			// refresh the size estimate, as leastSize may change while reading
			m_ctx.size = m_ctx.readPtr + m_ctx.source.leastSize;
		}
	}
}

mixin validateClosableRandomAccessStream!(CachedFileStream!InputStream);

unittest { // basic random access reading
	import vibe.stream.memory : createMemoryStream;
	auto source = createMemoryStream([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
	auto cs = createCachedFileStream(source);
	auto path = cs.path;
	assert(cs.size == 10);

	assert(existsFile(path) && getFileInfo(path).size == 0);

	void testRead(const(ubyte)[] expected)
	{
		auto buf = new ubyte[](expected.length);
		cs.read(buf);
		assert(buf[] == expected[]);
	}

	testRead([1, 2]);
	assert(getFileInfo(path).size == 2);
	assert(cs.size == 10);
	assert(cs.leastSize == 8);

	testRead([3, 4, 5, 6, 7, 8, 9, 10]);
	assert(getFileInfo(path).size == 10);
	assert(cs.empty);

	cs.close();
	assert(!existsFile(path));
}

unittest { // explicit cache file path
	import vibe.stream.memory : createMemoryStream;
	ubyte[] data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
	auto source = createMemoryStream(data);
	auto path = NativePath("test-tmp-cached-file.dat");
	auto cs = createCachedFileStream(source, path);
	assert(existsFile(path));
	ubyte[10] buf;
	cs.read(buf);
	assert(buf[] == data[]);
	cs.close();
	assert(existsFile(path));
	assert(readFile(path) == data);
	removeFile(path);
}

unittest { // write operations during read
	import vibe.stream.memory : createMemoryStream;
	auto source = createMemoryStream([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
	auto cs = createCachedFileStream(source, Yes.writable);
	auto path = cs.path;
	assert(cs.size == 10);

	assert(existsFile(path) && getFileInfo(path).size == 0);

	void testRead(const(ubyte)[] expected)
	{
		auto buf = new ubyte[](expected.length);
		cs.read(buf);
		assert(buf[] == expected[]);
	}

	ubyte[] bts(ubyte[] bts...) { return bts.dup; }

	cs.write(bts(11, 12, 13));
	assert(cs.size == 10);
	testRead([4, 5, 6]);
	cs.seek(0);
	testRead([1, 2, 3, 4, 5, 6]);

	cs.write(bts(14, 15, 16, 17, 18, 19));
	assert(cs.size == 12);
	cs.seek(6);
	testRead([7, 8, 9, 10, 18, 19]);
	cs.close();
}

unittest { // state queries on a closed stream
	import vibe.stream.memory : createMemoryStream;
	auto source = createMemoryStream([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
	auto cs = createCachedFileStream(source, Yes.writable);
	assert(cs.isOpen);
	assert(cs.writable);

	cs.close();
	assert(!cs.isOpen);
	assert(!cs.writable);
	assert(cs.size == 0);
	assert(cs.leastSize == 0);
	assert(cs.empty);
	assert(!cs.dataAvailableForRead);

	// closing twice is a no-op
	cs.close();
	assert(!cs.isOpen);
}