1 /** Random access stream that caches a source input stream on disk.
2 
3 	Copyright: © 2023 Sönke Ludwig
4 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
5 	Authors: Sönke Ludwig
6 */
7 module vibe.stream.cached;
8 
9 import vibe.core.file;
10 import vibe.core.log;
11 import vibe.core.path;
12 import vibe.core.stream;
13 
14 import std.typecons : Flag, No, Yes;
15 
16 
17 /** Creates a new `CachedStream` instance.
18 
19 	Data will be read as needed from the source stream sequentially and gets
20 	stored in the local file for random access. Note that the reported size
21 	of the stream may change for source streams that do not report their full
22 	size through the `leastSize` property.
23 
24 	Also note that when making the cached file writable, parts of the file that
25 	have not yet been read from the source stream will get overwritten after
26 	write operations to the cached file. Write operations should ideally only be
27 	made after reading the complete source stream.
28 
29 	Params:
30 		source = The source input stream to read from
31 		writable = Optional flag to make the cached file writable
32 		cached_file_path = Explicit path for storing the cached file - if not
33 			given, uses a temporary file that gets deleted upon close
34 */
CachedFileStream!InputStream createCachedFileStream(InputStream)(InputStream source, Flag!"writable" writable = No.writable)
	if (isInputStream!InputStream)
{
	// A default-initialized path makes the stream use a temporary cache file
	// that is removed again when the stream gets closed.
	NativePath no_explicit_path;
	return CachedFileStream!InputStream(source, writable, no_explicit_path);
}
40 /// ditto
CachedFileStream!InputStream createCachedFileStream(InputStream)(InputStream source, NativePath cached_file_path, Flag!"writable" writable = No.writable)
	if (isInputStream!InputStream)
{
	// The file at `cached_file_path` is created (or truncated) and is kept
	// on disk after the stream has been closed.
	auto stream = CachedFileStream!InputStream(source, writable, cached_file_path);
	return stream;
}
46 
47 
/** File backed cached random access stream wrapper.

	Data is copied sequentially from the source input stream into a local
	cache file as needed, and all read operations are then served from that
	file. Copies of this struct share the same underlying state.

	See_also: `createCachedFileStream`
*/
struct CachedFileStream(InputStream)
	if (isInputStream!InputStream)
{
	import std.algorithm.comparison : max, min;
	import std.exception : enforce;

	enum outputStreamVersion = 2;

	private static struct CTX {
		InputStream source;    // stream the data gets pulled from
		FileStream cachedFile; // local file holding the data fetched so far
		ulong readPtr;         // number of bytes copied from `source` so far
		ulong size;            // known source size (`readPtr` + `source.leastSize`)
		bool canWrite;         // whether write operations are permitted
		bool deleteOnClose;    // whether `cachedFile` gets removed in `close`
	}

	private {
		// Shared by all copies of this struct; set to null by `close`.
		CTX* m_ctx;
	}

	private this(InputStream source, bool writable, NativePath cached_file_path)
	{
		m_ctx = new CTX;
		m_ctx.source = source;
		m_ctx.canWrite = writable;
		m_ctx.size = source.leastSize;

		if (cached_file_path == NativePath.init) {
			// no explicit path given - use a temporary file that close() removes
			m_ctx.deleteOnClose = true;
			m_ctx.cachedFile = createTempFile();
		} else m_ctx.cachedFile = openFile(cached_file_path, FileMode.createTrunc);
	}

	@property int fd() const nothrow { return m_ctx.cachedFile.fd; }
	/// Path of the cache file, or `NativePath.init` once the stream has been closed.
	@property NativePath path() const nothrow { return m_ctx ? m_ctx.cachedFile.path : NativePath.init; }
	@property bool isOpen() const nothrow { return m_ctx && m_ctx.cachedFile.isOpen; }
	/// The larger of the cache file size and the known source size; 0 once closed.
	@property ulong size() const nothrow { return m_ctx ? max(m_ctx.cachedFile.size, m_ctx.size) : 0; }
	@property bool readable() const nothrow { return true; }
	/// Whether the stream was created as writable; false once closed.
	@property bool writable() const nothrow { return m_ctx && m_ctx.canWrite; }

	/// Number of bytes between the current position and `size`; 0 once closed.
	@property ulong leastSize()
	@blocking {
		if (!m_ctx) return 0;
		immutable pos = tell();
		immutable total = size();
		return pos < total ? total - pos : 0;
	}

	/** Returns true if the cache file reports available data, or if the
		current position is at the end of the part fetched from the source
		and the source stream reports available data.
	*/
	@property bool dataAvailableForRead()
	{
		if (!m_ctx)
			return false;
		if (m_ctx.cachedFile.dataAvailableForRead)
			return true;
		return tell() == m_ctx.readPtr && m_ctx.source.dataAvailableForRead;
	}
	@property bool empty() @blocking { return leastSize() == 0; }

	/** Closes the cache file, removing it if it is a temporary file, and
		releases the source stream. Does nothing if already closed.
	*/
	void close()
	@blocking {
		if (!m_ctx || !m_ctx.cachedFile.isOpen) return;

		// query the path while the file is still open
		auto remove_path = m_ctx.cachedFile.path;
		m_ctx.cachedFile.close();

		if (m_ctx.deleteOnClose) {
			try removeFile(remove_path);
			catch (Exception e) logException(e, "Failed to remove temporary cached stream file");
		}

		static if (is(InputStream == struct))
			destroy(m_ctx.source);
		destroy(m_ctx.cachedFile);
		m_ctx = null;
	}

	/** Truncates the cache file to `size` bytes.

		Throws: `Exception` if the stream was not created as writable.

		NOTE(review): the source read position is not adjusted, so truncating
		below the amount of data already fetched from the source leaves a
		region that later reads will not refill.
	*/
	void truncate(ulong size)
	@blocking {
		enforce(m_ctx.canWrite, "Attempt to truncate a non-writable cached file stream.");
		m_ctx.cachedFile.truncate(size);
	}

	/// Moves the file position to `offset`, fetching source data up to that point first.
	void seek(ulong offset)
	@blocking {
		readUpTo(offset);
		m_ctx.cachedFile.seek(offset);
	}

	ulong tell() nothrow { return m_ctx.cachedFile.tell(); }

	/** Writes to the cache file at the current position.

		Throws: `Exception` if the stream was not created as writable.
	*/
	size_t write(scope const(ubyte)[] bytes, IOMode mode)
	@blocking {
		// the other write overloads all forward to this one
		enforce(m_ctx.canWrite, "Attempt to write to a non-writable cached file stream.");
		return m_ctx.cachedFile.write(bytes, mode);
	}
	void write(scope const(ubyte)[] bytes) @blocking { auto n = write(bytes, IOMode.all); assert(n == bytes.length); }
	void write(scope const(char)[] bytes) @blocking { write(cast(const(ubyte)[])bytes); }

	void flush() @blocking { m_ctx.cachedFile.flush(); }
	void finalize() @blocking { m_ctx.cachedFile.flush(); }

	/** Returns the source stream's peek buffer when positioned at the end of
		the part fetched from the source, otherwise the cache file's.
	*/
	const(ubyte)[] peek()
	{
		if (m_ctx.cachedFile.tell == m_ctx.readPtr)
			return m_ctx.source.peek;
		return m_ctx.cachedFile.peek;
	}

	/** Reads from the cache file after fetching any missing data from the source.

		NOTE(review): the full `dst.length` range is fetched from the source
		regardless of `mode`, so `IOMode.once`/`IOMode.immediate` reads may
		still wait for the source.
	*/
	size_t read(scope ubyte[] dst, IOMode mode)
	@blocking {
		readUpTo(tell() + dst.length);
		return m_ctx.cachedFile.read(dst, mode);
	}
	void read(scope ubyte[] dst) @blocking { auto n = read(dst, IOMode.all); assert(n == dst.length); }

	// Copies source data into the cache file until `offset` bytes have been
	// fetched or the source reports no more data. The cache file position is
	// restored afterwards.
	private void readUpTo(ulong offset)
	{
		if (offset <= m_ctx.readPtr) return;

		auto ptr = m_ctx.cachedFile.tell;
		scope (exit) m_ctx.cachedFile.seek(ptr);

		// source data is stored at `readPtr`, right after the part fetched so far
		m_ctx.cachedFile.seek(m_ctx.readPtr);

		while (offset > m_ctx.readPtr) {
			auto chunk = min(offset - m_ctx.readPtr, m_ctx.source.leastSize);
			if (chunk == 0) break; // source reports no more data
			pipe(m_ctx.source, m_ctx.cachedFile, chunk);
			m_ctx.readPtr += chunk;
			// refresh the size, as the source may not report its full size upfront
			m_ctx.size = m_ctx.readPtr + m_ctx.source.leastSize;
		}
	}
}
178 
// Instantiates CachedFileStream with `InputStream` (presumably the interface
// from vibe.core.stream, as no other `InputStream` is in scope here) and
// passes it to validateClosableRandomAccessStream - presumably a compile-time
// check of the closable random access stream API; confirm in vibe.core.stream.
mixin validateClosableRandomAccessStream!(CachedFileStream!InputStream);
180 
unittest { // basic random access reading
	import vibe.stream.memory : createMemoryStream;

	auto src = createMemoryStream([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
	auto stream = createCachedFileStream(src);
	auto cache_path = stream.path;
	assert(stream.size == 10);

	// nothing has been fetched from the source yet
	assert(existsFile(cache_path));
	assert(getFileInfo(cache_path).size == 0);

	// reads the next `expected.length` bytes and compares them
	void expectRead(const(ubyte)[] expected)
	{
		auto got = new ubyte[](expected.length);
		stream.read(got);
		assert(got[] == expected[]);
	}

	expectRead([1, 2]);
	assert(getFileInfo(cache_path).size == 2);
	assert(stream.size == 10);
	assert(stream.leastSize == 8);

	expectRead([3, 4, 5, 6, 7, 8, 9, 10]);
	assert(getFileInfo(cache_path).size == 10);
	assert(stream.empty);

	// the temporary cache file is gone after closing
	stream.close();
	assert(!existsFile(cache_path));
}
209 
unittest { // explicit cache file path
	import vibe.stream.memory : createMemoryStream;

	ubyte[] data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
	auto source = createMemoryStream(data);
	auto path = NativePath("test-tmp-cached-file.dat");
	// remove the file from the working directory even if an assertion fails
	scope (exit) if (existsFile(path)) removeFile(path);

	auto cs = createCachedFileStream(source, path);
	// no-op if the stream has already been closed below
	scope (exit) cs.close();

	assert(existsFile(path));
	ubyte[10] buf;
	cs.read(buf);
	assert(buf[] == data[]);
	cs.close();

	// explicitly specified cache files are kept after closing
	assert(existsFile(path));
	assert(readFile(path) == data);
}
225 
unittest { // write operations during read
	import vibe.stream.memory : createMemoryStream;

	auto src = createMemoryStream([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
	auto stream = createCachedFileStream(src, Yes.writable);
	auto cache_path = stream.path;
	assert(stream.size == 10);

	assert(existsFile(cache_path));
	assert(getFileInfo(cache_path).size == 0);

	// reads the next `expected.length` bytes and compares them
	void expectRead(const(ubyte)[] expected)
	{
		auto got = new ubyte[](expected.length);
		stream.read(got);
		assert(got[] == expected[]);
	}

	// builds a fresh ubyte[] from the given values
	ubyte[] bytesOf(ubyte[] values...) { return values.dup; }

	// data written ahead of the source read position is replaced by the
	// source contents once that region gets read
	stream.write(bytesOf(11, 12, 13));
	assert(stream.size == 10);
	expectRead([4, 5, 6]);
	stream.seek(0);
	expectRead([1, 2, 3, 4, 5, 6]);

	// writing beyond the end of the source data grows the stream; only the
	// bytes past the source end survive the subsequent read
	stream.write(bytesOf(14, 15, 16, 17, 18, 19));
	assert(stream.size == 12);
	stream.seek(6);
	expectRead([7, 8, 9, 10, 18, 19]);
	stream.close();
}