1 /**
2 	Zlib input/output streams
3 
4 	Copyright: © 2012-2013 Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig
7 */
8 module vibe.stream.zlib;
9 
10 import vibe.core.stream;
11 import vibe.utils.array;
12 import vibe.internal.freelistref;
13 import vibe.internal.interfaceproxy : InterfaceProxy, interfaceProxy;
14 
15 import std.algorithm;
16 import std.exception;
17 import etc.c.zlib;
18 
19 import vibe.core.log;
20 
21 
22 /** Creates a new deflate uncompression stream.
23 */
24 ZlibInputStream createDeflateInputStream(InputStream)(InputStream source) @safe
25 	if (isInputStream!InputStream)
26 {
27 	return new ZlibInputStream(interfaceProxy!(.InputStream)(source), ZlibInputStream.HeaderFormat.deflate, true);
28 }
29 
30 /// private
31 FreeListRef!ZlibInputStream createDeflateInputStreamFL(InputStream)(InputStream source) @safe
32 	if (isInputStream!InputStream)
33 {
34 	return FreeListRef!ZlibInputStream(interfaceProxy!(.InputStream)(source), ZlibInputStream.HeaderFormat.deflate, true);
35 }
36 
37 /** Creates a new deflate compression stream.
38 */
39 ZlibOutputStream createDeflateOutputStream(OutputStream)(OutputStream destination) @safe
40 	if (isOutputStream!OutputStream)
41 {
42 	return new ZlibOutputStream(interfaceProxy!(.OutputStream)(destination), ZlibOutputStream.HeaderFormat.deflate, Z_DEFAULT_COMPRESSION, true);
43 }
44 
45 /// private
46 FreeListRef!ZlibOutputStream createDeflateOutputStreamFL(OutputStream)(OutputStream destination) @safe
47 	if (isOutputStream!OutputStream)
48 {
49 	return FreeListRef!ZlibOutputStream(interfaceProxy!(.OutputStream)(destination), ZlibOutputStream.HeaderFormat.deflate, Z_DEFAULT_COMPRESSION, true);
50 }
51 
52 /** Creates a new deflate uncompression stream.
53 */
54 ZlibInputStream createGzipInputStream(InputStream)(InputStream source) @safe
55 	if (isInputStream!InputStream)
56 {
57 	return new ZlibInputStream(interfaceProxy!(.InputStream)(source), ZlibInputStream.HeaderFormat.gzip, true);
58 }
59 
60 /// private
61 FreeListRef!ZlibInputStream createGzipInputStreamFL(InputStream)(InputStream source) @safe
62 	if (isInputStream!InputStream)
63 {
64 	return FreeListRef!ZlibInputStream(interfaceProxy!(.InputStream)(source), ZlibInputStream.HeaderFormat.gzip, true);
65 }
66 
67 /** Creates a new deflate uncompression stream.
68 */
69 ZlibOutputStream createGzipOutputStream(OutputStream)(OutputStream destination) @safe
70 	if (isOutputStream!OutputStream)
71 {
72 	return new ZlibOutputStream(interfaceProxy!(.OutputStream)(destination), ZlibOutputStream.HeaderFormat.gzip, Z_DEFAULT_COMPRESSION, true);
73 }
74 
75 /// private
76 FreeListRef!ZlibOutputStream createGzipOutputStreamFL(OutputStream)(OutputStream destination) @safe
77 	if (isOutputStream!OutputStream)
78 {
79 	return FreeListRef!ZlibOutputStream(interfaceProxy!(.OutputStream)(destination), ZlibOutputStream.HeaderFormat.gzip, Z_DEFAULT_COMPRESSION, true);
80 }
81 
82 
83 /**
84 	Writes any data compressed in deflate format to the specified output stream.
85 */
86 deprecated("Use createDeflateOutputStream instead.")
87 final class DeflateOutputStream : ZlibOutputStream {
88 	@safe this(OutputStream dst)
89 	{
90 		super(dst, HeaderFormat.deflate);
91 	}
92 }
93 
94 
95 /**
96 	Writes any data compressed in gzip format to the specified output stream.
97 */
98 deprecated("Use createGzipOutputStream instead.")
99 final class GzipOutputStream : ZlibOutputStream {
100 	@safe this(OutputStream dst)
101 	{
102 		super(dst, HeaderFormat.gzip);
103 	}
104 }
105 
106 /**
107 	Generic zlib output stream.
108 */
109 class ZlibOutputStream : OutputStream {
110 @safe:
111 
112 	private {
113 		InterfaceProxy!OutputStream m_out;
114 		z_stream m_zstream;
115 		ubyte[1024] m_outbuffer;
116 		//ubyte[4096] m_inbuffer;
117 		bool m_finalized = false;
118 	}
119 
120 	enum HeaderFormat {
121 		gzip,
122 		deflate
123 	}
124 
125 	deprecated("Use createGzipOutputStream/createDeflateOutputStream instead.")
126 	this(OutputStream dst, HeaderFormat type, int level = Z_DEFAULT_COMPRESSION)
127 	{
128 		this(interfaceProxy!OutputStream(dst), type, level, true);
129 	}
130 
131 	/// private
132 	this(InterfaceProxy!OutputStream dst, HeaderFormat type, int level, bool dummy)
133 	{
134 		m_out = dst;
135 		zlibEnforce(() @trusted { return deflateInit2(&m_zstream, level, Z_DEFLATED, 15 + (type == HeaderFormat.gzip ? 16 : 0), 8, Z_DEFAULT_STRATEGY); } ());
136 	}
137 
138 	~this() {
139 		if (!m_finalized)
140 			() @trusted { deflateEnd(&m_zstream); } ();
141 	}
142 
143 	final size_t write(in ubyte[] data, IOMode mode)
144 	{
145 		// TODO: support IOMode!
146 		if (!data.length) return 0;
147 		assert(!m_finalized);
148 		assert(m_zstream.avail_in == 0);
149 		m_zstream.next_in = () @trusted { return cast(ubyte*)data.ptr; } ();
150 		assert(data.length < uint.max);
151 		m_zstream.avail_in = cast(uint)data.length;
152 		doFlush(Z_NO_FLUSH);
153 		assert(m_zstream.avail_in == 0);
154 		m_zstream.next_in = null;
155 		return data.length;
156 	}
157 
158 	alias write = OutputStream.write;
159 
160 	final void flush()
161 	{
162 		assert(!m_finalized);
163 		//doFlush(Z_SYNC_FLUSH);
164 		m_out.flush();
165 	}
166 
167 	final void finalize()
168 	{
169 		if (m_finalized) return;
170 		m_finalized = true;
171 		doFlush(Z_FINISH);
172 		m_out.flush();
173 		zlibEnforce(() @trusted { return deflateEnd(&m_zstream); }());
174 	}
175 
176 	private final void doFlush(int how)
177 	@safe {
178 		while (true) {
179 			m_zstream.next_out = &m_outbuffer[0];
180 			m_zstream.avail_out = cast(uint)m_outbuffer.length;
181 			//logInfo("deflate %s -> %s (%s)", m_zstream.avail_in, m_zstream.avail_out, how);
182 			auto ret = () @trusted { return deflate(&m_zstream, how); } ();
183 			//logInfo("    ... %s -> %s", m_zstream.avail_in, m_zstream.avail_out);
184 			switch (ret) {
185 				default:
186 					zlibEnforce(ret);
187 					assert(false, "Unknown return value for zlib deflate.");
188 				case Z_OK:
189 					assert(m_zstream.avail_out < m_outbuffer.length || m_zstream.avail_in == 0);
190 					m_out.write(m_outbuffer[0 .. m_outbuffer.length - m_zstream.avail_out]);
191 					break;
192 				case Z_BUF_ERROR:
193 					assert(m_zstream.avail_in == 0);
194 					return;
195 				case Z_STREAM_END:
196 					assert(how == Z_FINISH);
197 					m_out.write(m_outbuffer[0 .. m_outbuffer.length - m_zstream.avail_out]);
198 					return;
199 			}
200 		}
201 	}
202 }
203 
204 
205 /**
206 	Takes an input stream that contains data in deflate compressed format and outputs the
207 	uncompressed data.
208 */
209 deprecated("Use createDeflateInputStream instead.")
210 class DeflateInputStream : ZlibInputStream {
211 	@safe this(InputStream dst)
212 	{
213 		super(dst, HeaderFormat.deflate);
214 	}
215 }
216 
217 
218 /**
219 	Takes an input stream that contains data in gzip compressed format and outputs the
220 	uncompressed data.
221 */
222 deprecated("Use createGzipInputStream instead.")
223 class GzipInputStream : ZlibInputStream {
224 	this(InputStream dst)
225 	@safe {
226 		super(dst, HeaderFormat.gzip);
227 	}
228 }
229 
230 unittest {
231 	import vibe.stream.memory;
232 	import vibe.stream.operations;
233 
234 	auto raw = cast(ubyte[])"Hello, World!\n".dup;
235 	ubyte[] gzip = [
236 		0x1F, 0x8B, 0x08, 0x08, 0xAF, 0x12, 0x42, 0x56, 0x00, 0x03, 0x74, 0x65, 0x73, 0x74, 0x2E, 0x74,
237 		0x78, 0x74, 0x00, 0xF3, 0x48, 0xCD, 0xC9, 0xC9, 0xD7, 0x51, 0x08, 0xCF, 0x2F, 0xCA, 0x49, 0x51,
238 		0xE4, 0x02, 0x00, 0x84, 0x9E, 0xE8, 0xB4, 0x0E, 0x00, 0x00, 0x00];
239 
240 	auto gzipin = createGzipInputStream(createMemoryStream(gzip));
241 	assert(gzipin.readAll() == raw);
242 }
243 
244 unittest {
245 	import vibe.stream.memory;
246 	import vibe.stream.operations;
247 
248 	ubyte[] gzip_partial = [
249 		0x1F, 0x8B, 0x08, 0x08, 0xAF, 0x12, 0x42, 0x56, 0x00, 0x03, 0x74, 0x65, 0x73, 0x74, 0x2E, 0x74,
250 		0x78, 0x74, 0x00, 0xF3, 0x48, 0xCD, 0xC9, 0xC9, 0xD7, 0x51, 0x08, 0xCF, 0x2F, 0xCA, 0x49, 0x51,
251 	];
252 
253 	auto gzipin = createGzipInputStream(createMemoryStream(gzip_partial));
254 	try {
255 		gzipin.readAll();
256 		assert(false, "Expected exception.");
257 	} catch (Exception e) {}
258 	assert(gzipin.empty);
259 }
260 
261 /**
262 	Generic zlib input stream.
263 */
264 class ZlibInputStream : InputStream {
265 @safe:
266 
267 	import std.zlib;
268 	private {
269 		InterfaceProxy!InputStream m_in;
270 		z_stream m_zstream;
271 		FixedRingBuffer!(ubyte, 4096) m_outbuffer;
272 		ubyte[1024] m_inbuffer;
273 		bool m_finished = false;
274 		ulong m_ninflated, n_read;
275 	}
276 
277 	enum HeaderFormat {
278 		gzip,
279 		deflate,
280 		automatic
281 	}
282 
283 	deprecated("Use createGzipInputStream/createDeflateInputStream instead.")
284 	this(InputStream src, HeaderFormat type)
285 	{
286 		this(interfaceProxy!InputStream(src), type, true);
287 	}
288 
289 	/// private
290 	this(InterfaceProxy!InputStream src, HeaderFormat type, bool dummy)
291 	{
292 		m_in = src;
293 		if (m_in.empty) {
294 			m_finished = true;
295 		} else {
296 			int wndbits = 15;
297 			if(type == HeaderFormat.gzip) wndbits += 16;
298 			else if(type == HeaderFormat.automatic) wndbits += 32;
299 			zlibEnforce(() @trusted { return inflateInit2(&m_zstream, wndbits); } ());
300 			readChunk();
301 		}
302 	}
303 
304 	~this() {
305 		if (!m_finished)
306 			() @trusted { inflateEnd(&m_zstream); } ();
307 	}
308 
309 	@property bool empty() { return this.leastSize == 0; }
310 
311 	@property ulong leastSize()
312 	{
313 		assert(!m_finished || m_in.empty, "Input contains more data than expected.");
314 		if (m_outbuffer.length > 0) return m_outbuffer.length;
315 		if (m_finished) return 0;
316 		readChunk();
317 		assert(m_outbuffer.length || m_finished);
318 
319 		return m_outbuffer.length;
320 	}
321 
322 	@property bool dataAvailableForRead()
323 	{
324 		return m_outbuffer.length > 0;
325 	}
326 
327 	const(ubyte)[] peek() { return m_outbuffer.peek(); }
328 
329 	size_t read(scope ubyte[] dst, IOMode mode)
330 	{
331 		enforce(dst.length == 0 || !empty, "Reading empty stream");
332 
333 		size_t nread = 0;
334 
335 		while (dst.length > 0) {
336 			auto len = min(m_outbuffer.length, dst.length);
337 			m_outbuffer.read(dst[0 .. len]);
338 			dst = dst[len .. $];
339 
340 			nread += len;
341 
342 			if (!m_outbuffer.length && !m_finished) {
343 				if (mode == IOMode.immediate || mode == IOMode.once && !nread)
344 					break;
345 				readChunk();
346 			}
347 			enforce(dst.length == 0 || m_outbuffer.length || !m_finished, "Reading past end of zlib stream.");
348 		}
349 
350 		return nread;
351 	}
352 
353 	alias read = InputStream.read;
354 
355 	private void readChunk()
356 	@safe {
357 		assert(m_outbuffer.length == 0, "Buffer must be empty to read the next chunk.");
358 		assert(m_outbuffer.peekDst().length > 0);
359 		enforce (!m_finished, "Reading past end of zlib stream.");
360 
361 		m_zstream.next_out = &m_outbuffer.peekDst()[0];
362 		m_zstream.avail_out = cast(uint)m_outbuffer.peekDst().length;
363 
364 		while (!m_outbuffer.length) {
365 			if (m_zstream.avail_in == 0) {
366 				auto clen = min(m_inbuffer.length, m_in.leastSize);
367 				if (clen == 0) {
368 					m_finished = true;
369 					throw new Exception("Premature end of compressed input.");
370 				}
371 				m_in.read(m_inbuffer[0 .. clen]);
372 				m_zstream.next_in = &m_inbuffer[0];
373 				m_zstream.avail_in = cast(uint)clen;
374 			}
375 			auto avins = m_zstream.avail_in;
376 			//logInfo("inflate %s -> %s (@%s in @%s)", m_zstream.avail_in, m_zstream.avail_out, m_ninflated, n_read);
377 			auto ret = zlibEnforce(() @trusted { return inflate(&m_zstream, Z_SYNC_FLUSH); } ());
378 			//logInfo("    ... %s -> %s", m_zstream.avail_in, m_zstream.avail_out);
379 			assert(m_zstream.avail_out != m_outbuffer.peekDst.length || m_zstream.avail_in != avins);
380 			m_ninflated += m_outbuffer.peekDst().length - m_zstream.avail_out;
381 			n_read += avins - m_zstream.avail_in;
382 			m_outbuffer.putN(m_outbuffer.peekDst().length - m_zstream.avail_out);
383 			assert(m_zstream.avail_out == 0 || m_zstream.avail_out == m_outbuffer.peekDst().length);
384 
385 			if (ret == Z_STREAM_END) {
386 				m_finished = true;
387 				zlibEnforce(() @trusted { return inflateEnd(&m_zstream); }());
388 				enforce(m_in.empty, "Extra data after end of compressed input.");
389 				return;
390 			}
391 		}
392 	}
393 }
394 
395 unittest {
396 	import vibe.stream.memory;
397 
398 	auto data = new ubyte[5000];
399 
400 	auto mos = createMemoryOutputStream();
401 	auto gos = createGzipOutputStream(mos);
402 	gos.write(data);
403 	gos.finalize();
404 
405 	auto ms = createMemoryStream(mos.data, false);
406 	auto gis = createGzipInputStream(ms);
407 
408 	auto result = new ubyte[data.length];
409 	gis.read(result);
410 	assert(data == result);
411 }
412 
413 private int zlibEnforce(int result)
414 @safe {
415 	switch (result) {
416 		default:
417 			if (result < 0) throw new Exception("unknown zlib error");
418 			else return result;
419 		case Z_ERRNO: throw new Exception("zlib errno error");
420 		case Z_STREAM_ERROR: throw new Exception("zlib stream error");
421 		case Z_DATA_ERROR: throw new Exception("zlib data error");
422 		case Z_MEM_ERROR: throw new Exception("zlib memory error");
423 		case Z_BUF_ERROR: throw new Exception("zlib buffer error");
424 		case Z_VERSION_ERROR: throw new Exception("zlib version error");
425 	}
426 }