1 /**
2 	Zlib input/output streams
3 
4 	Copyright: © 2012-2013 Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.stream.zlib;
9 
10 import vibe.core.stream;
11 import vibe.utils.array;
12 import vibe.internal.freelistref;
13 import vibe.internal.interfaceproxy : InterfaceProxy, interfaceProxy;
14 
15 import std.algorithm;
16 import std.exception;
17 import etc.c.zlib;
18 
19 import vibe.core.log;
20 
21 
/** Creates a new deflate uncompression stream.

	Params:
		source = Input stream that supplies the compressed data

	Returns:
		A newly allocated `ZlibInputStream` that reads from `source` using
		`ZlibInputStream.HeaderFormat.deflate`.
*/
ZlibInputStream createDeflateInputStream(InputStream)(InputStream source) @safe
	if (isInputStream!InputStream)
{
	alias Format = ZlibInputStream.HeaderFormat;
	auto proxied = interfaceProxy!(.InputStream)(source);
	return new ZlibInputStream(proxied, Format.deflate, true);
}
29 
/// private
// Same as createDeflateInputStream, but the stream is returned wrapped in a
// FreeListRef instead of being allocated with `new`.
FreeListRef!ZlibInputStream createDeflateInputStreamFL(InputStream)(InputStream source) @safe
	if (isInputStream!InputStream)
{
	alias Format = ZlibInputStream.HeaderFormat;
	alias StreamRef = FreeListRef!ZlibInputStream;
	auto proxied = interfaceProxy!(.InputStream)(source);
	return StreamRef(proxied, Format.deflate, true);
}
36 
/** Creates a new deflate compression stream.

	Params:
		destination = Output stream that receives the compressed data

	Returns:
		A newly allocated `ZlibOutputStream` that writes to `destination`
		using `ZlibOutputStream.HeaderFormat.deflate` and the
		`Z_DEFAULT_COMPRESSION` level.
*/
ZlibOutputStream createDeflateOutputStream(OutputStream)(OutputStream destination) @safe
	if (isOutputStream!OutputStream)
{
	alias Format = ZlibOutputStream.HeaderFormat;
	auto proxied = interfaceProxy!(.OutputStream)(destination);
	return new ZlibOutputStream(proxied, Format.deflate, Z_DEFAULT_COMPRESSION, true);
}
44 
/// private
// Same as createDeflateOutputStream, but the stream is returned wrapped in a
// FreeListRef instead of being allocated with `new`.
FreeListRef!ZlibOutputStream createDeflateOutputStreamFL(OutputStream)(OutputStream destination) @safe
	if (isOutputStream!OutputStream)
{
	alias Format = ZlibOutputStream.HeaderFormat;
	alias StreamRef = FreeListRef!ZlibOutputStream;
	auto proxied = interfaceProxy!(.OutputStream)(destination);
	return StreamRef(proxied, Format.deflate, Z_DEFAULT_COMPRESSION, true);
}
51 
/** Creates a new gzip uncompression stream.

	Params:
		source = Input stream that supplies the gzip compressed data

	Returns:
		A newly allocated `ZlibInputStream` that reads from `source` using
		`ZlibInputStream.HeaderFormat.gzip`.
*/
ZlibInputStream createGzipInputStream(InputStream)(InputStream source) @safe
	if (isInputStream!InputStream)
{
	alias Format = ZlibInputStream.HeaderFormat;
	auto proxied = interfaceProxy!(.InputStream)(source);
	return new ZlibInputStream(proxied, Format.gzip, true);
}
59 
/// private
// Same as createGzipInputStream, but the stream is returned wrapped in a
// FreeListRef instead of being allocated with `new`.
FreeListRef!ZlibInputStream createGzipInputStreamFL(InputStream)(InputStream source) @safe
	if (isInputStream!InputStream)
{
	alias Format = ZlibInputStream.HeaderFormat;
	alias StreamRef = FreeListRef!ZlibInputStream;
	auto proxied = interfaceProxy!(.InputStream)(source);
	return StreamRef(proxied, Format.gzip, true);
}
66 
/** Creates a new gzip compression stream.

	Params:
		destination = Output stream that receives the gzip compressed data

	Returns:
		A newly allocated `ZlibOutputStream` that writes to `destination`
		using `ZlibOutputStream.HeaderFormat.gzip` and the
		`Z_DEFAULT_COMPRESSION` level.
*/
ZlibOutputStream createGzipOutputStream(OutputStream)(OutputStream destination) @safe
	if (isOutputStream!OutputStream)
{
	alias Format = ZlibOutputStream.HeaderFormat;
	auto proxied = interfaceProxy!(.OutputStream)(destination);
	return new ZlibOutputStream(proxied, Format.gzip, Z_DEFAULT_COMPRESSION, true);
}
74 
/// private
// Same as createGzipOutputStream, but the stream is returned wrapped in a
// FreeListRef instead of being allocated with `new`.
FreeListRef!ZlibOutputStream createGzipOutputStreamFL(OutputStream)(OutputStream destination) @safe
	if (isOutputStream!OutputStream)
{
	alias Format = ZlibOutputStream.HeaderFormat;
	alias StreamRef = FreeListRef!ZlibOutputStream;
	auto proxied = interfaceProxy!(.OutputStream)(destination);
	return StreamRef(proxied, Format.gzip, Z_DEFAULT_COMPRESSION, true);
}
81 
82 
/**
	Generic zlib output stream.

	Everything passed to `write` is run through zlib's `deflate` and the
	compressed bytes are forwarded to the wrapped output stream. `finalize`
	terminates the compressed stream; the destructor only releases the zlib
	state and does not emit any pending data.
*/
class ZlibOutputStream : OutputStream {
@safe:

	private {
		InterfaceProxy!OutputStream m_out;
		z_stream m_zstream;
		// staging buffer that receives the output of each deflate() call
		ubyte[1024] m_outbuffer;
		//ubyte[4096] m_inbuffer;
		// set once finalize() has run; the zlib state is released at that point
		bool m_finalized = false;
	}

	/// Selects the wrapper format that is passed on to `deflateInit2`.
	enum HeaderFormat {
		gzip,
		deflate
	}

	/// private
	// `dummy` is not used by the constructor body.
	// Throws an Exception if deflateInit2 reports an error.
	this(InterfaceProxy!OutputStream dst, HeaderFormat type, int level, bool dummy)
	{
		m_out = dst;
		// window bits of 15, plus 16 to request a gzip wrapper
		zlibEnforce(() @trusted { return deflateInit2(&m_zstream, level, Z_DEFLATED, 15 + (type == HeaderFormat.gzip ? 16 : 0), 8, Z_DEFAULT_STRATEGY); } ());
	}

	~this() {
		// finalize() already released the zlib state if it was called
		if (!m_finalized)
			() @trusted { deflateEnd(&m_zstream); } ();
	}

	/** Compresses `data` and writes the resulting bytes to the wrapped stream.

		The `mode` argument is currently ignored; the whole buffer is always
		consumed.

		Returns:
			`data.length`
	*/
	final size_t write(in ubyte[] data, IOMode mode)
	{
		// TODO: support IOMode!
		if (!data.length) return 0;
		assert(!m_finalized);
		assert(m_zstream.avail_in == 0);

		// z_stream.avail_in is a 32-bit counter, so larger buffers are fed in
		// several pieces instead of silently truncating the length
		size_t offset = 0;
		while (offset < data.length) {
			immutable chunklen = min(data.length - offset, cast(size_t)uint.max);
			m_zstream.next_in = () @trusted { return cast(ubyte*)data.ptr + offset; } ();
			m_zstream.avail_in = cast(uint)chunklen;
			doFlush(Z_NO_FLUSH);
			assert(m_zstream.avail_in == 0);
			offset += chunklen;
		}

		// do not keep a pointer into the caller's buffer
		m_zstream.next_in = null;
		return data.length;
	}

	alias write = OutputStream.write;

	/** Flushes the wrapped output stream.

		Data that is still buffered inside of zlib is not forced out (the
		sync flush is disabled).
	*/
	final void flush()
	{
		assert(!m_finalized);
		//doFlush(Z_SYNC_FLUSH);
		m_out.flush();
	}

	/** Finishes the compressed stream, flushes the wrapped stream and
		releases the zlib state.

		Calling this method more than once has no effect.
	*/
	final void finalize()
	{
		if (m_finalized) return;
		m_finalized = true;

		{
			// The destructor skips deflateEnd once m_finalized is set, so the
			// zlib state must be released here if writing the tail fails.
			scope (failure) () @trusted { deflateEnd(&m_zstream); } ();
			doFlush(Z_FINISH);
			m_out.flush();
		}

		zlibEnforce(() @trusted { return deflateEnd(&m_zstream); }());
	}

	// Repeatedly calls deflate() with the given flush mode and forwards the
	// produced bytes to m_out until zlib reports that it cannot make further
	// progress (Z_BUF_ERROR) or that the stream has ended (Z_STREAM_END).
	private final void doFlush(int how)
	@safe {
		while (true) {
			// hand the whole staging buffer to zlib for each call
			m_zstream.next_out = &m_outbuffer[0];
			m_zstream.avail_out = cast(uint)m_outbuffer.length;
			//logInfo("deflate %s -> %s (%s)", m_zstream.avail_in, m_zstream.avail_out, how);
			auto ret = () @trusted { return deflate(&m_zstream, how); } ();
			//logInfo("    ... %s -> %s", m_zstream.avail_in, m_zstream.avail_out);
			switch (ret) {
				default:
					// throws for negative codes
					zlibEnforce(ret);
					assert(false, "Unknown return value for zlib deflate.");
				case Z_OK:
					assert(m_zstream.avail_out < m_outbuffer.length || m_zstream.avail_in == 0);
					m_out.write(m_outbuffer[0 .. m_outbuffer.length - m_zstream.avail_out]);
					break;
				case Z_BUF_ERROR:
					// nothing left to do for this flush mode
					assert(m_zstream.avail_in == 0);
					return;
				case Z_STREAM_END:
					assert(how == Z_FINISH);
					m_out.write(m_outbuffer[0 .. m_outbuffer.length - m_zstream.avail_out]);
					return;
			}
		}
	}
}
174 
175 
// Decompresses a complete gzip stream and compares it to the known plain text.
unittest {
	import vibe.stream.memory;
	import vibe.stream.operations;

	ubyte[] compressed = [
		0x1F, 0x8B, 0x08, 0x08, 0xAF, 0x12, 0x42, 0x56, 0x00, 0x03, 0x74, 0x65, 0x73, 0x74, 0x2E, 0x74,
		0x78, 0x74, 0x00, 0xF3, 0x48, 0xCD, 0xC9, 0xC9, 0xD7, 0x51, 0x08, 0xCF, 0x2F, 0xCA, 0x49, 0x51,
		0xE4, 0x02, 0x00, 0x84, 0x9E, 0xE8, 0xB4, 0x0E, 0x00, 0x00, 0x00];
	auto expected = cast(ubyte[])"Hello, World!\n".dup;

	auto stream = createGzipInputStream(createMemoryStream(compressed));
	auto decoded = stream.readAll();
	assert(decoded == expected);
}
189 
// Reading a gzip stream that is cut off must throw and leave the stream empty.
unittest {
	import vibe.stream.memory;
	import vibe.stream.operations;

	// same data as a complete gzip stream, but with the trailing bytes missing
	ubyte[] truncated = [
		0x1F, 0x8B, 0x08, 0x08, 0xAF, 0x12, 0x42, 0x56, 0x00, 0x03, 0x74, 0x65, 0x73, 0x74, 0x2E, 0x74,
		0x78, 0x74, 0x00, 0xF3, 0x48, 0xCD, 0xC9, 0xC9, 0xD7, 0x51, 0x08, 0xCF, 0x2F, 0xCA, 0x49, 0x51,
	];

	auto stream = createGzipInputStream(createMemoryStream(truncated));

	bool threw = false;
	try stream.readAll();
	catch (Exception e) threw = true;

	assert(threw, "Expected exception.");
	assert(stream.empty);
}
206 
/**
	Generic zlib input stream.

	Reads compressed data from the wrapped input stream, runs it through
	zlib's `inflate` and makes the uncompressed bytes available for reading.
*/
class ZlibInputStream : InputStream {
@safe:

	import std.zlib;
	private {
		InterfaceProxy!InputStream m_in;
		z_stream m_zstream;
		// holds inflated bytes that have not been handed to the reader yet
		FixedRingBuffer!(ubyte, 4096) m_outbuffer;
		// staging buffer for compressed bytes read from m_in
		ubyte[1024] m_inbuffer;
		// set once no more data will be inflated; the zlib state is released at that point
		bool m_finished = false;
		// running totals of inflated output bytes and consumed input bytes
		ulong m_ninflated, n_read;
	}

	/// Selects the header format that is expected in the compressed input.
	enum HeaderFormat {
		gzip,
		deflate,
		automatic
	}

	/// private
	// `dummy` is not used by the constructor body. Unless the source is
	// empty, the first chunk is inflated right away, so that exceptions for
	// invalid input can already be thrown from here.
	this(InterfaceProxy!InputStream src, HeaderFormat type, bool dummy)
	{
		m_in = src;
		if (m_in.empty) {
			// nothing to decompress - zlib is never initialized in this case
			m_finished = true;
		} else {
			// window bits of 15; +16 expects a gzip header, +32 enables
			// automatic header detection
			int wndbits = 15;
			if(type == HeaderFormat.gzip) wndbits += 16;
			else if(type == HeaderFormat.automatic) wndbits += 32;
			zlibEnforce(() @trusted { return inflateInit2(&m_zstream, wndbits); } ());
			readChunk();
		}
	}

	~this() {
		// every code path that sets m_finished has either released the zlib
		// state already or never initialized it
		if (!m_finished)
			() @trusted { inflateEnd(&m_zstream); } ();
	}

	/// Determines whether no more uncompressed data can be read.
	@property bool empty() { return this.leastSize == 0; }

	/** Returns the number of uncompressed bytes that are buffered.

		If the buffer is empty and the stream has not finished, the next
		chunk is inflated first, which may throw.
	*/
	@property ulong leastSize()
	{
		assert(!m_finished || m_in.empty, "Input contains more data than expected.");
		if (m_outbuffer.length > 0) return m_outbuffer.length;
		if (m_finished) return 0;
		readChunk();
		assert(m_outbuffer.length || m_finished);

		return m_outbuffer.length;
	}

	/// Determines whether uncompressed data is already buffered.
	@property bool dataAvailableForRead()
	{
		return m_outbuffer.length > 0;
	}

	/// Returns the result of `peek()` on the internal buffer of inflated data.
	const(ubyte)[] peek() { return m_outbuffer.peek(); }

	/** Reads uncompressed data into `dst`.

		Params:
			dst = Buffer to fill
			mode = `IOMode.immediate` only returns data that is already
				buffered, `IOMode.once` stops as soon as the buffer runs empty
				after at least one byte has been read, any other mode keeps
				inflating until `dst` is filled

		Returns:
			The number of bytes stored in `dst`.

		Throws:
			An exception if the stream is empty, if the end of the stream is
			reached before `dst` is filled, or if inflating fails.
	*/
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		enforce(dst.length == 0 || !empty, "Reading empty stream");

		size_t nread = 0;

		while (dst.length > 0) {
			auto len = min(m_outbuffer.length, dst.length);
			m_outbuffer.read(dst[0 .. len]);
			dst = dst[len .. $];

			nread += len;

			if (!m_outbuffer.length && !m_finished) {
				// IOMode.once must return as soon as some data has been read,
				// instead of inflating further chunks to fill dst
				if (mode == IOMode.immediate || (mode == IOMode.once && nread > 0))
					break;
				readChunk();
			}
			// NOTE(review): this also fires for IOMode.once/immediate when the
			// stream ends before dst is full - confirm that this is intended
			enforce(dst.length == 0 || m_outbuffer.length || !m_finished, "Reading past end of zlib stream.");
		}

		return nread;
	}

	alias read = InputStream.read;

	// Inflates input until at least one byte is in m_outbuffer or the end of
	// the compressed stream is reached. Must only be called while
	// m_outbuffer is empty.
	private void readChunk()
	@safe {
		assert(m_outbuffer.length == 0, "Buffer must be empty to read the next chunk.");
		assert(m_outbuffer.peekDst().length > 0);
		enforce (!m_finished, "Reading past end of zlib stream.");

		// let zlib write directly into the free space of the ring buffer
		m_zstream.next_out = &m_outbuffer.peekDst()[0];
		m_zstream.avail_out = cast(uint)m_outbuffer.peekDst().length;

		while (!m_outbuffer.length) {
			if (m_zstream.avail_in == 0) {
				// refill the input buffer; the value never exceeds
				// m_inbuffer.length, so the cast cannot truncate
				auto clen = cast(size_t)min(m_inbuffer.length, m_in.leastSize);
				if (clen == 0) {
					m_finished = true;
					// The destructor skips inflateEnd once m_finished is set,
					// so the zlib state has to be released before throwing.
					() @trusted { inflateEnd(&m_zstream); } ();
					throw new Exception("Premature end of compressed input.");
				}
				m_in.read(m_inbuffer[0 .. clen]);
				m_zstream.next_in = &m_inbuffer[0];
				m_zstream.avail_in = cast(uint)clen;
			}
			auto avins = m_zstream.avail_in;
			//logInfo("inflate %s -> %s (@%s in @%s)", m_zstream.avail_in, m_zstream.avail_out, m_ninflated, n_read);
			auto ret = zlibEnforce(() @trusted { return inflate(&m_zstream, Z_SYNC_FLUSH); } ());
			//logInfo("    ... %s -> %s", m_zstream.avail_in, m_zstream.avail_out);
			// inflate must have either produced output or consumed input
			assert(m_zstream.avail_out != m_outbuffer.peekDst.length || m_zstream.avail_in != avins);
			m_ninflated += m_outbuffer.peekDst().length - m_zstream.avail_out;
			n_read += avins - m_zstream.avail_in;
			// commit the bytes that zlib wrote into the ring buffer
			m_outbuffer.putN(m_outbuffer.peekDst().length - m_zstream.avail_out);
			assert(m_zstream.avail_out == 0 || m_zstream.avail_out == m_outbuffer.peekDst().length);

			if (ret == Z_STREAM_END) {
				m_finished = true;
				zlibEnforce(() @trusted { return inflateEnd(&m_zstream); }());
				enforce(m_in.empty, "Extra data after end of compressed input.");
				return;
			}
		}
	}
}
334 
// Round trip: compress a zero-filled buffer with gzip and decompress it again.
unittest {
	import vibe.stream.memory;

	auto original = new ubyte[5000];

	// compress into an in-memory sink
	auto sink = createMemoryOutputStream();
	auto compressor = createGzipOutputStream(sink);
	compressor.write(original);
	compressor.finalize();

	// decompress the produced bytes again
	auto decompressor = createGzipInputStream(createMemoryStream(sink.data, false));
	auto roundtrip = new ubyte[original.length];
	decompressor.read(roundtrip);

	assert(original == roundtrip);
}
352 
// Converts a zlib status code into an exception.
//
// Non-negative codes are returned unchanged. Negative codes throw an
// Exception with a message that names the error, or "unknown zlib error" for
// codes that are not listed below.
private int zlibEnforce(int result)
@safe {
	if (result >= 0) return result;

	string message;
	switch (result) {
		case Z_ERRNO: message = "zlib errno error"; break;
		case Z_STREAM_ERROR: message = "zlib stream error"; break;
		case Z_DATA_ERROR: message = "zlib data error"; break;
		case Z_MEM_ERROR: message = "zlib memory error"; break;
		case Z_BUF_ERROR: message = "zlib buffer error"; break;
		case Z_VERSION_ERROR: message = "zlib version error"; break;
		default: message = "unknown zlib error"; break;
	}
	throw new Exception(message);
}