1 /**
2 	Zlib input/output streams
3 
4 	Copyright: © 2012-2013 Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.stream.zlib;
9 
10 import vibe.core.stream;
11 import vibe.utils.array;
12 import vibe.internal.freelistref;
13 import vibe.internal.interfaceproxy : InterfaceProxy, interfaceProxy;
14 
15 import std.algorithm;
16 import std.exception;
17 import etc.c.zlib;
18 
19 import vibe.core.log;
20 
21 
/** Creates a new deflate uncompression stream.

	Params:
		source = Input stream that provides the compressed data

	Returns:
		A newly allocated `ZlibInputStream` using `HeaderFormat.deflate`
		that reads from `source`.
*/
ZlibInputStream createDeflateInputStream(InputStream)(InputStream source) @safe
	if (isInputStream!InputStream)
{
	alias Format = ZlibInputStream.HeaderFormat;
	// `.InputStream` is the module level interface; the bare name is the template parameter
	auto proxy = interfaceProxy!(.InputStream)(source);
	return new ZlibInputStream(proxy, Format.deflate, true);
}
29 
/// private
// Same as createDeflateInputStream, but the stream object is wrapped in a
// FreeListRef instead of being allocated with `new`.
FreeListRef!ZlibInputStream createDeflateInputStreamFL(InputStream)(InputStream source) @safe
	if (isInputStream!InputStream)
{
	alias Format = ZlibInputStream.HeaderFormat;
	auto proxy = interfaceProxy!(.InputStream)(source);
	return FreeListRef!ZlibInputStream(proxy, Format.deflate, true);
}
36 
/** Creates a new deflate compression stream.

	Params:
		destination = Output stream that receives the compressed data

	Returns:
		A newly allocated `ZlibOutputStream` using `HeaderFormat.deflate` and
		the `Z_DEFAULT_COMPRESSION` level that writes to `destination`.
*/
ZlibOutputStream createDeflateOutputStream(OutputStream)(OutputStream destination) @safe
	if (isOutputStream!OutputStream)
{
	alias Format = ZlibOutputStream.HeaderFormat;
	// `.OutputStream` is the module level interface; the bare name is the template parameter
	auto proxy = interfaceProxy!(.OutputStream)(destination);
	return new ZlibOutputStream(proxy, Format.deflate, Z_DEFAULT_COMPRESSION, true);
}
44 
/// private
// Same as createDeflateOutputStream, but the stream object is wrapped in a
// FreeListRef instead of being allocated with `new`.
FreeListRef!ZlibOutputStream createDeflateOutputStreamFL(OutputStream)(OutputStream destination) @safe
	if (isOutputStream!OutputStream)
{
	alias Format = ZlibOutputStream.HeaderFormat;
	auto proxy = interfaceProxy!(.OutputStream)(destination);
	return FreeListRef!ZlibOutputStream(proxy, Format.deflate, Z_DEFAULT_COMPRESSION, true);
}
51 
/** Creates a new gzip uncompression stream.

	Params:
		source = Input stream that provides the gzip compressed data

	Returns:
		A newly allocated `ZlibInputStream` using `HeaderFormat.gzip`
		that reads from `source`.
*/
ZlibInputStream createGzipInputStream(InputStream)(InputStream source) @safe
	if (isInputStream!InputStream)
{
	alias Format = ZlibInputStream.HeaderFormat;
	// `.InputStream` is the module level interface; the bare name is the template parameter
	auto proxy = interfaceProxy!(.InputStream)(source);
	return new ZlibInputStream(proxy, Format.gzip, true);
}
59 
/// private
// Same as createGzipInputStream, but the stream object is wrapped in a
// FreeListRef instead of being allocated with `new`.
FreeListRef!ZlibInputStream createGzipInputStreamFL(InputStream)(InputStream source) @safe
	if (isInputStream!InputStream)
{
	alias Format = ZlibInputStream.HeaderFormat;
	auto proxy = interfaceProxy!(.InputStream)(source);
	return FreeListRef!ZlibInputStream(proxy, Format.gzip, true);
}
66 
/** Creates a new gzip compression stream.

	Params:
		destination = Output stream that receives the gzip compressed data

	Returns:
		A newly allocated `ZlibOutputStream` using `HeaderFormat.gzip` and
		the `Z_DEFAULT_COMPRESSION` level that writes to `destination`.
*/
ZlibOutputStream createGzipOutputStream(OutputStream)(OutputStream destination) @safe
	if (isOutputStream!OutputStream)
{
	alias Format = ZlibOutputStream.HeaderFormat;
	// `.OutputStream` is the module level interface; the bare name is the template parameter
	auto proxy = interfaceProxy!(.OutputStream)(destination);
	return new ZlibOutputStream(proxy, Format.gzip, Z_DEFAULT_COMPRESSION, true);
}
74 
/// private
// Same as createGzipOutputStream, but the stream object is wrapped in a
// FreeListRef instead of being allocated with `new`.
FreeListRef!ZlibOutputStream createGzipOutputStreamFL(OutputStream)(OutputStream destination) @safe
	if (isOutputStream!OutputStream)
{
	alias Format = ZlibOutputStream.HeaderFormat;
	auto proxy = interfaceProxy!(.OutputStream)(destination);
	return FreeListRef!ZlibOutputStream(proxy, Format.gzip, Z_DEFAULT_COMPRESSION, true);
}
81 
82 
/**
	Generic zlib output stream.

	All data written to this stream is compressed with zlib's deflate
	routines and the compressed bytes are forwarded to the wrapped output
	stream. `finalize` has to be called once all data has been written, as it
	is the only place where the end of the compressed stream gets emitted.
*/
class ZlibOutputStream : OutputStream {
@safe:

	private {
		InterfaceProxy!OutputStream m_out;
		z_stream m_zstream;
		// staging area for compressed data on its way to m_out
		ubyte[1024] m_outbuffer;
		//ubyte[4096] m_inbuffer;
		// set by finalize(); no further writes are allowed afterwards
		bool m_finalized = false;
	}

	/// Selects the container format that is written around the deflate data.
	enum HeaderFormat {
		gzip,
		deflate
	}

	/// private
	this(InterfaceProxy!OutputStream dst, HeaderFormat type, int level, bool dummy)
	{
		m_out = dst;
		// 15 window bits is the maximum window size; adding 16 makes zlib write a gzip wrapper
		zlibEnforce(() @trusted { return deflateInit2(&m_zstream, level, Z_DEFLATED, 15 + (type == HeaderFormat.gzip ? 16 : 0), 8, Z_DEFAULT_STRATEGY); } ());
	}

	~this() {
		// finalize() releases the zlib state itself, so only clean up if it never ran
		if (!m_finalized)
			() @trusted { deflateEnd(&m_zstream); } ();
	}

	static if (is(typeof(.OutputStream.outputStreamVersion)) && .OutputStream.outputStreamVersion > 1) {
		final override size_t write(scope const(ubyte)[] bytes_, IOMode mode) { return doWrite(bytes_, mode); }
	} else {
		final override size_t write(in ubyte[] bytes_, IOMode mode) { return doWrite(bytes_, mode); }
	}

	alias write = OutputStream.write;

	/*
		Compresses `data` and forwards any produced output to the destination.

		The `mode` argument is currently ignored, the whole input is always
		consumed. Returns the number of bytes consumed, i.e. `data.length`.
	*/
	private size_t doWrite(scope const(ubyte)[] data, IOMode mode)
	{
		// TODO: support IOMode!
		if (!data.length) return 0;
		assert(!m_finalized);
		assert(m_zstream.avail_in == 0);

		immutable total = data.length;

		// never leave a pointer into the caller's buffer behind, not even when writing fails
		scope (exit) m_zstream.next_in = null;

		while (data.length > 0) {
			// z_stream.avail_in is a 32-bit counter, so larger inputs are fed in slices
			// instead of being silently truncated by the cast
			immutable chunk = cast(uint)min(data.length, uint.max);
			m_zstream.next_in = () @trusted { return cast(ubyte*)data.ptr; } ();
			m_zstream.avail_in = chunk;
			doFlush(Z_NO_FLUSH);
			assert(m_zstream.avail_in == 0);
			data = data[chunk .. $];
		}

		return total;
	}

	/**
		Flushes the destination stream.

		Note that the compressor itself is not flushed (the sync flush is
		disabled), so data still buffered inside of zlib is not written out.
	*/
	final void flush()
	{
		assert(!m_finalized);
		//doFlush(Z_SYNC_FLUSH);
		m_out.flush();
	}

	/**
		Finishes the compressed stream and flushes the destination stream.

		Calling this method more than once has no effect. No data may be
		written after the stream has been finalized.

		Throws:
			An `Exception` if zlib reports an error. Whatever the destination
			stream throws while writing or flushing is passed through.
	*/
	final void finalize()
	{
		if (m_finalized) return;
		m_finalized = true;

		{
			// The destructor skips deflateEnd once m_finalized is set, so the zlib
			// state must be released here if emitting the stream end fails.
			scope (failure) { () @trusted { deflateEnd(&m_zstream); } (); }
			doFlush(Z_FINISH);
			m_out.flush();
		}

		zlibEnforce(() @trusted { return deflateEnd(&m_zstream); }());
	}

	/*
		Runs deflate() with the given flush mode until zlib has nothing more to
		do, writing every filled output buffer to the destination stream.
	*/
	private final void doFlush(int how)
	@safe {
		while (true) {
			// hand the complete staging buffer to zlib for each round
			m_zstream.next_out = &m_outbuffer[0];
			m_zstream.avail_out = cast(uint)m_outbuffer.length;
			//logInfo("deflate %s -> %s (%s)", m_zstream.avail_in, m_zstream.avail_out, how);
			auto ret = () @trusted { return deflate(&m_zstream, how); } ();
			//logInfo("    ... %s -> %s", m_zstream.avail_in, m_zstream.avail_out);
			switch (ret) {
				default:
					// throws for all error codes
					zlibEnforce(ret);
					assert(false, "Unknown return value for zlib deflate.");
				case Z_OK:
					assert(m_zstream.avail_out < m_outbuffer.length || m_zstream.avail_in == 0);
					m_out.write(m_outbuffer[0 .. m_outbuffer.length - m_zstream.avail_out]);
					break;
				case Z_BUF_ERROR:
					// no progress was possible: all input is consumed and all output written
					assert(m_zstream.avail_in == 0);
					return;
				case Z_STREAM_END:
					assert(how == Z_FINISH);
					m_out.write(m_outbuffer[0 .. m_outbuffer.length - m_zstream.avail_out]);
					return;
			}
		}
	}
}
180 
181 
// Inflating a gzip compressed byte sequence must yield the original text.
unittest {
	import vibe.stream.memory;
	import vibe.stream.operations;

	ubyte[] compressed = [
		0x1F, 0x8B, 0x08, 0x08, 0xAF, 0x12, 0x42, 0x56, 0x00, 0x03, 0x74, 0x65, 0x73, 0x74, 0x2E, 0x74,
		0x78, 0x74, 0x00, 0xF3, 0x48, 0xCD, 0xC9, 0xC9, 0xD7, 0x51, 0x08, 0xCF, 0x2F, 0xCA, 0x49, 0x51,
		0xE4, 0x02, 0x00, 0x84, 0x9E, 0xE8, 0xB4, 0x0E, 0x00, 0x00, 0x00];
	auto expected = cast(ubyte[])"Hello, World!\n".dup;

	auto stream = createGzipInputStream(createMemoryStream(compressed));
	auto inflated = stream.readAll();
	assert(inflated == expected);
}
195 
// Reading a gzip stream that ends prematurely must throw and leave the
// stream in the empty state.
unittest {
	import vibe.stream.memory;
	import vibe.stream.operations;

	ubyte[] truncated = [
		0x1F, 0x8B, 0x08, 0x08, 0xAF, 0x12, 0x42, 0x56, 0x00, 0x03, 0x74, 0x65, 0x73, 0x74, 0x2E, 0x74,
		0x78, 0x74, 0x00, 0xF3, 0x48, 0xCD, 0xC9, 0xC9, 0xD7, 0x51, 0x08, 0xCF, 0x2F, 0xCA, 0x49, 0x51,
	];

	auto stream = createGzipInputStream(createMemoryStream(truncated));

	bool threw = false;
	try {
		stream.readAll();
	} catch (Exception e) {
		threw = true;
	}

	assert(threw, "Expected exception.");
	assert(stream.empty);
}
212 
/**
	Generic zlib input stream.

	Wraps a source stream that contains zlib compressed data and provides the
	uncompressed bytes through the `InputStream` interface.
*/
class ZlibInputStream : InputStream {
@safe:

	import std.zlib;
	private {
		InterfaceProxy!InputStream m_in;
		z_stream m_zstream;
		// inflated data that has not been handed out to the reader yet
		FixedRingBuffer!(ubyte, 4096) m_outbuffer;
		// compressed data fetched from m_in, waiting to be inflated
		ubyte[1024] m_inbuffer;
		// set as soon as no further chunks can be inflated
		bool m_finished = false;
		// running totals of inflated and consumed bytes
		ulong m_ninflated, n_read;
	}

	/// Selects the container format that is expected around the deflate data.
	enum HeaderFormat {
		gzip,
		deflate,
		automatic
	}

	/// private
	this(InterfaceProxy!InputStream src, HeaderFormat type, bool dummy)
	{
		m_in = src;
		if (m_in.empty) {
			// nothing to decompress - zlib is never initialized in this case
			m_finished = true;
		} else {
			// 15 is the maximum window size; zlib documents +16 for gzip decoding
			// and +32 for automatic zlib/gzip header detection
			int wndbits = 15;
			if(type == HeaderFormat.gzip) wndbits += 16;
			else if(type == HeaderFormat.automatic) wndbits += 32;
			zlibEnforce(() @trusted { return inflateInit2(&m_zstream, wndbits); } ());
			readChunk();
		}
	}

	~this() {
		// every code path that sets m_finished has already released the zlib state
		if (!m_finished)
			() @trusted { inflateEnd(&m_zstream); } ();
	}

	/// Determines if the end of the uncompressed data has been reached.
	@property bool empty() { return this.leastSize == 0; }

	/**
		Returns the number of uncompressed bytes that can be read for sure.

		If no inflated data is buffered, the next chunk is read from the
		source stream first. A return value of zero signals the end of the
		stream.
	*/
	@property ulong leastSize()
	{
		assert(!m_finished || m_in.empty, "Input contains more data than expected.");
		if (m_outbuffer.length > 0) return m_outbuffer.length;
		if (m_finished) return 0;
		readChunk();
		assert(m_outbuffer.length || m_finished);

		return m_outbuffer.length;
	}

	/// Determines if already inflated data is waiting in the internal buffer.
	@property bool dataAvailableForRead()
	{
		return m_outbuffer.length > 0;
	}

	/// Returns the currently buffered inflated data without consuming it.
	const(ubyte)[] peek() { return m_outbuffer.peek(); }

	/**
		Reads uncompressed data into `dst`.

		Params:
			dst = Buffer receiving the uncompressed data
			mode = `IOMode.all` fills the whole buffer, `IOMode.once` and
				`IOMode.immediate` return as soon as the buffered data has
				been handed out

		Returns:
			The number of bytes stored in `dst`.

		Throws:
			An `Exception` if `dst` is non-empty and the stream is empty, if
			the end of the stream is hit before the request is satisfied, or
			if decompressing the next chunk fails.
	*/
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		// NOTE: evaluating `empty` may already inflate the first chunk
		enforce(dst.length == 0 || !empty, "Reading empty stream");

		size_t nread = 0;

		while (dst.length > 0) {
			auto len = min(m_outbuffer.length, dst.length);
			m_outbuffer.read(dst[0 .. len]);
			dst = dst[len .. $];

			nread += len;

			if (!m_outbuffer.length && !m_finished) {
				// Refilling the buffer reads from the source stream. Never do that in
				// immediate mode, and in once mode only if nothing was delivered yet.
				if (mode == IOMode.immediate || (mode == IOMode.once && nread > 0))
					break;
				readChunk();
			}
			enforce(dst.length == 0 || m_outbuffer.length || !m_finished, "Reading past end of zlib stream.");
		}

		return nread;
	}

	alias read = InputStream.read;

	/*
		Inflates compressed input until at least one byte of output has been
		produced or the end of the compressed stream has been reached.

		Must only be called while the output buffer is empty.
	*/
	private void readChunk()
	@safe {
		assert(m_outbuffer.length == 0, "Buffer must be empty to read the next chunk.");
		assert(m_outbuffer.peekDst().length > 0);
		enforce (!m_finished, "Reading past end of zlib stream.");

		// let zlib write directly into the free space of the output buffer
		m_zstream.next_out = &m_outbuffer.peekDst()[0];
		m_zstream.avail_out = cast(uint)m_outbuffer.peekDst().length;

		while (!m_outbuffer.length) {
			if (m_zstream.avail_in == 0) {
				// all buffered input is consumed, fetch the next piece from the source
				immutable clen = cast(size_t)min(m_inbuffer.length, m_in.leastSize);
				if (clen == 0) {
					// The destructor skips inflateEnd once m_finished is set, so the
					// zlib state has to be released here to avoid leaking it.
					m_finished = true;
					() @trusted { inflateEnd(&m_zstream); } ();
					throw new Exception("Premature end of compressed input.");
				}
				m_in.read(m_inbuffer[0 .. clen]);
				m_zstream.next_in = &m_inbuffer[0];
				m_zstream.avail_in = cast(uint)clen;
			}
			auto avins = m_zstream.avail_in;
			//logInfo("inflate %s -> %s (@%s in @%s)", m_zstream.avail_in, m_zstream.avail_out, m_ninflated, n_read);
			auto ret = zlibEnforce(() @trusted { return inflate(&m_zstream, Z_SYNC_FLUSH); } ());
			//logInfo("    ... %s -> %s", m_zstream.avail_in, m_zstream.avail_out);

			// Z_NEED_DICT is a positive code and thus passes zlibEnforce. No dictionary
			// is ever supplied, so inflate() would make no further progress and this
			// loop would never terminate.
			enforce(ret != Z_NEED_DICT, "zlib stream requires a preset dictionary.");

			assert(m_zstream.avail_out != m_outbuffer.peekDst.length || m_zstream.avail_in != avins);
			m_ninflated += m_outbuffer.peekDst().length - m_zstream.avail_out;
			n_read += avins - m_zstream.avail_in;
			// commit the bytes that zlib has written into the output buffer
			m_outbuffer.putN(m_outbuffer.peekDst().length - m_zstream.avail_out);
			assert(m_zstream.avail_out == 0 || m_zstream.avail_out == m_outbuffer.peekDst().length);

			if (ret == Z_STREAM_END) {
				m_finished = true;
				zlibEnforce(() @trusted { return inflateEnd(&m_zstream); }());
				// NOTE(review): only the source stream is checked here. Bytes that were
				// already fetched into m_inbuffer but not consumed by inflate() go
				// unnoticed - confirm whether that is intended.
				enforce(m_in.empty, "Extra data after end of compressed input.");
				return;
			}
		}
	}
}
340 
// Compressing a buffer and decompressing the result must reproduce the
// original data, also when it exceeds the size of the internal buffers.
unittest {
	import vibe.stream.memory;

	auto original = new ubyte[5000];

	// compress into memory
	auto sink = createMemoryOutputStream();
	auto compressor = createGzipOutputStream(sink);
	compressor.write(original);
	compressor.finalize();

	// decompress again
	auto decompressor = createGzipInputStream(createMemoryStream(sink.data, false));
	auto restored = new ubyte[original.length];
	decompressor.read(restored);

	assert(original == restored);
}
358 
/*
	Converts zlib error codes into exceptions.

	Params:
		result = Return value of a zlib function

	Returns:
		`result` itself, if it is not negative.

	Throws:
		An `Exception` naming the kind of error, if `result` is negative.
*/
private int zlibEnforce(int result)
@safe {
	// all zlib error codes are negative, anything else is handed back to the caller
	if (result >= 0) return result;

	string message;
	switch (result) {
		case Z_ERRNO: message = "zlib errno error"; break;
		case Z_STREAM_ERROR: message = "zlib stream error"; break;
		case Z_DATA_ERROR: message = "zlib data error"; break;
		case Z_MEM_ERROR: message = "zlib memory error"; break;
		case Z_BUF_ERROR: message = "zlib buffer error"; break;
		case Z_VERSION_ERROR: message = "zlib version error"; break;
		default: message = "unknown zlib error"; break;
	}
	throw new Exception(message);
}