/**
	Zlib input/output streams

	Copyright: © 2012-2013 Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Sönke Ludwig
*/
module vibe.stream.zlib;

import vibe.core.stream;
import vibe.utils.array;
import vibe.internal.freelistref;
import vibe.internal.interfaceproxy : InterfaceProxy, interfaceProxy;

import std.algorithm;
import std.exception;
import etc.c.zlib;

import vibe.core.log;


/** Creates a stream that decompresses deflate formatted data read from `source`.

	The returned object is allocated with `new`.
*/
ZlibInputStream createDeflateInputStream(InputStream)(InputStream source) @safe
	if (isInputStream!InputStream)
{
	alias Format = ZlibInputStream.HeaderFormat;
	return new ZlibInputStream(interfaceProxy!(.InputStream)(source), Format.deflate, true);
}

/// private
FreeListRef!ZlibInputStream createDeflateInputStreamFL(InputStream)(InputStream source) @safe
	if (isInputStream!InputStream)
{
	alias Format = ZlibInputStream.HeaderFormat;
	return FreeListRef!ZlibInputStream(interfaceProxy!(.InputStream)(source), Format.deflate, true);
}

/** Creates a stream that compresses everything written to it in deflate format
	and forwards the result to `destination`.

	The default zlib compression level is used.
*/
ZlibOutputStream createDeflateOutputStream(OutputStream)(OutputStream destination) @safe
	if (isOutputStream!OutputStream)
{
	alias Format = ZlibOutputStream.HeaderFormat;
	return new ZlibOutputStream(interfaceProxy!(.OutputStream)(destination), Format.deflate, Z_DEFAULT_COMPRESSION, true);
}

/// private
FreeListRef!ZlibOutputStream createDeflateOutputStreamFL(OutputStream)(OutputStream destination) @safe
	if (isOutputStream!OutputStream)
{
	alias Format = ZlibOutputStream.HeaderFormat;
	return FreeListRef!ZlibOutputStream(interfaceProxy!(.OutputStream)(destination), Format.deflate, Z_DEFAULT_COMPRESSION, true);
}

/** Creates a stream that decompresses gzip formatted data read from `source`.
*/
ZlibInputStream createGzipInputStream(InputStream)(InputStream source) @safe
	if (isInputStream!InputStream)
{
	alias Format = ZlibInputStream.HeaderFormat;
	return new ZlibInputStream(interfaceProxy!(.InputStream)(source), Format.gzip, true);
}

/// private
FreeListRef!ZlibInputStream createGzipInputStreamFL(InputStream)(InputStream source) @safe
	if (isInputStream!InputStream)
{
	alias Format = ZlibInputStream.HeaderFormat;
	return FreeListRef!ZlibInputStream(interfaceProxy!(.InputStream)(source), Format.gzip, true);
}

/** Creates a stream that compresses everything written to it in gzip format
	and forwards the result to `destination`.

	The default zlib compression level is used.
*/
ZlibOutputStream createGzipOutputStream(OutputStream)(OutputStream destination) @safe
	if (isOutputStream!OutputStream)
{
	alias Format = ZlibOutputStream.HeaderFormat;
	return new ZlibOutputStream(interfaceProxy!(.OutputStream)(destination), Format.gzip, Z_DEFAULT_COMPRESSION, true);
}

/// private
FreeListRef!ZlibOutputStream createGzipOutputStreamFL(OutputStream)(OutputStream destination) @safe
	if (isOutputStream!OutputStream)
{
	alias Format = ZlibOutputStream.HeaderFormat;
	return FreeListRef!ZlibOutputStream(interfaceProxy!(.OutputStream)(destination), Format.gzip, Z_DEFAULT_COMPRESSION, true);
}


/**
	Output stream that forwards everything written to it, compressed in deflate
	format, to the wrapped output stream.
*/
deprecated("Use createDeflateOutputStream instead.")
final class DeflateOutputStream : ZlibOutputStream {
	this(OutputStream dst)
	@safe {
		super(dst, HeaderFormat.deflate);
	}
}


/**
	Output stream that forwards everything written to it, compressed in gzip
	format, to the wrapped output stream.
*/
deprecated("Use createGzipOutputStream instead.")
final class GzipOutputStream : ZlibOutputStream {
	this(OutputStream dst)
	@safe {
		super(dst, HeaderFormat.gzip);
	}
}

/**
	Generic zlib output stream.
*/
class ZlibOutputStream : OutputStream {
@safe:

	private {
		/// Sink that receives the compressed bytes.
		InterfaceProxy!OutputStream m_out;
		/// zlib deflate state; owned by this object until `finalize` or the destructor runs.
		z_stream m_zstream;
		/// Scratch space that `deflate` writes into before the bytes are forwarded to `m_out`.
		ubyte[1024] m_outbuffer;
		//ubyte[4096] m_inbuffer;
		/// Set once `finalize` has been entered; no further writes are allowed afterwards.
		bool m_finalized = false;
	}

	/// Framing that is emitted around the compressed data.
	enum HeaderFormat {
		gzip,
		deflate
	}

	deprecated("Use createGzipOutputStream/createDeflateOutputStream instead.")
	this(OutputStream dst, HeaderFormat type, int level = Z_DEFAULT_COMPRESSION)
	{
		this(interfaceProxy!OutputStream(dst), type, level, true);
	}

	/// private
	this(InterfaceProxy!OutputStream dst, HeaderFormat type, int level, bool dummy)
	{
		m_out = dst;
		// 15 window bits; adding 16 makes zlib emit a gzip header/trailer instead of a zlib one
		immutable int wndbits = 15 + (type == HeaderFormat.gzip ? 16 : 0);
		zlibEnforce(() @trusted { return deflateInit2(&m_zstream, level, Z_DEFLATED, wndbits, 8, Z_DEFAULT_STRATEGY); } ());
	}

	~this() {
		// finalize() releases the zlib state itself, so only clean up if it never ran
		if (!m_finalized)
			() @trusted { deflateEnd(&m_zstream); } ();
	}

	/** Compresses `data` and forwards any produced output to the wrapped stream.

		The `mode` argument is currently ignored; the whole buffer is always
		consumed.

		Returns: `data.length`
	*/
	final size_t write(in ubyte[] data, IOMode mode)
	{
		// TODO: support IOMode!
		if (!data.length) return 0;
		assert(!m_finalized);
		assert(m_zstream.avail_in == 0);

		// Never leave zlib pointing into the caller's buffer if the sink throws,
		// otherwise a later deflate() call would read from memory we do not own.
		scope (failure) {
			m_zstream.next_in = null;
			m_zstream.avail_in = 0;
		}

		// z_stream.avail_in is a 32-bit counter, so feed larger buffers in
		// several pieces instead of truncating the length.
		size_t offset = 0;
		while (offset < data.length) {
			immutable size_t remaining = data.length - offset;
			immutable uint chunk = remaining < uint.max ? cast(uint)remaining : uint.max;
			m_zstream.next_in = () @trusted { return cast(ubyte*)data.ptr + offset; } ();
			m_zstream.avail_in = chunk;
			doFlush(Z_NO_FLUSH);
			assert(m_zstream.avail_in == 0);
			offset += chunk;
		}

		m_zstream.next_in = null;
		return data.length;
	}

	alias write = OutputStream.write;

	/** Flushes the wrapped output stream.

		Data that zlib still holds internally is not forced out here (no sync
		flush is performed); it is only emitted by `finalize`.
	*/
	final void flush()
	{
		assert(!m_finalized);
		//doFlush(Z_SYNC_FLUSH);
		m_out.flush();
	}

	/** Finishes the compressed stream, flushes the wrapped stream and releases
		the zlib state. Calling it again is a no-op.

		Throws: `Exception` if zlib reports an error; exceptions of the wrapped
			stream are passed through.
	*/
	final void finalize()
	{
		if (m_finalized) return;
		m_finalized = true;

		{
			// m_finalized is already set, so the destructor will not clean up
			// for us - release the zlib state here if flushing fails.
			scope (failure) () @trusted { deflateEnd(&m_zstream); } ();
			doFlush(Z_FINISH);
			m_out.flush();
		}

		zlibEnforce(() @trusted { return deflateEnd(&m_zstream); }());
	}

	/** Runs `deflate` with the given flush mode until it has nothing more to
		emit, forwarding each filled portion of `m_outbuffer` to the sink.
	*/
	private final void doFlush(int how)
	@safe {
		while (true) {
			m_zstream.next_out = &m_outbuffer[0];
			m_zstream.avail_out = cast(uint)m_outbuffer.length;
			//logInfo("deflate %s -> %s (%s)", m_zstream.avail_in, m_zstream.avail_out, how);
			auto ret = () @trusted { return deflate(&m_zstream, how); } ();
			//logInfo("    ... %s -> %s", m_zstream.avail_in, m_zstream.avail_out);
			switch (ret) {
				default:
					zlibEnforce(ret);
					assert(false, "Unknown return value for zlib deflate.");
				case Z_OK:
					assert(m_zstream.avail_out < m_outbuffer.length || m_zstream.avail_in == 0);
					m_out.write(m_outbuffer[0 .. m_outbuffer.length - m_zstream.avail_out]);
					// leaves the switch only; deflate is called again until it reports no progress
					break;
				case Z_BUF_ERROR:
					// no progress possible: all input consumed and nothing left to emit
					assert(m_zstream.avail_in == 0);
					return;
				case Z_STREAM_END:
					assert(how == Z_FINISH);
					m_out.write(m_outbuffer[0 .. m_outbuffer.length - m_zstream.avail_out]);
					return;
			}
		}
	}
}


/**
	Takes an input stream that contains data in deflate compressed format and outputs the
	uncompressed data.
*/
deprecated("Use createDeflateInputStream instead.")
class DeflateInputStream : ZlibInputStream {
	this(InputStream dst)
	@safe {
		super(dst, HeaderFormat.deflate);
	}
}


/**
	Input stream that reads gzip compressed data from the wrapped stream and
	yields the uncompressed bytes.
*/
deprecated("Use createGzipInputStream instead.")
class GzipInputStream : ZlibInputStream {
	@safe this(InputStream dst)
	{
		super(dst, HeaderFormat.gzip);
	}
}

// A complete gzip member decodes to the original text.
unittest {
	import vibe.stream.memory;
	import vibe.stream.operations;

	ubyte[] compressed = [
		0x1F, 0x8B, 0x08, 0x08, 0xAF, 0x12, 0x42, 0x56, 0x00, 0x03, 0x74, 0x65, 0x73, 0x74, 0x2E, 0x74,
		0x78, 0x74, 0x00, 0xF3, 0x48, 0xCD, 0xC9, 0xC9, 0xD7, 0x51, 0x08, 0xCF, 0x2F, 0xCA, 0x49, 0x51,
		0xE4, 0x02, 0x00, 0x84, 0x9E, 0xE8, 0xB4, 0x0E, 0x00, 0x00, 0x00];
	auto expected = cast(ubyte[])"Hello, World!\n".dup;

	auto stream = createGzipInputStream(createMemoryStream(compressed));
	auto decoded = stream.readAll();
	assert(decoded == expected);
}

// A truncated gzip member must raise an exception and leave the stream empty.
unittest {
	import vibe.stream.memory;
	import vibe.stream.operations;

	ubyte[] truncated = [
		0x1F, 0x8B, 0x08, 0x08, 0xAF, 0x12, 0x42, 0x56, 0x00, 0x03, 0x74, 0x65, 0x73, 0x74, 0x2E, 0x74,
		0x78, 0x74, 0x00, 0xF3, 0x48, 0xCD, 0xC9, 0xC9, 0xD7, 0x51, 0x08, 0xCF, 0x2F, 0xCA, 0x49, 0x51,
	];

	auto stream = createGzipInputStream(createMemoryStream(truncated));
	bool threw = false;
	try stream.readAll();
	catch (Exception) threw = true;
	assert(threw, "Expected exception.");
	assert(stream.empty);
}

/**
	Generic zlib input stream.
*/
class ZlibInputStream : InputStream {
@safe:

	import std.zlib;
	private {
		/// Source of the compressed bytes.
		InterfaceProxy!InputStream m_in;
		/// zlib inflate state; released when the stream ends or in the destructor.
		z_stream m_zstream;
		/// Decompressed bytes that have not been handed to the reader yet.
		FixedRingBuffer!(ubyte, 4096) m_outbuffer;
		/// Staging area for compressed bytes fetched from `m_in`.
		ubyte[1024] m_inbuffer;
		/// True once no further decompressed data can be produced.
		bool m_finished = false;
		/// Running totals of produced and consumed bytes (diagnostics only).
		ulong m_ninflated, n_read;
	}

	/// Framing expected around the compressed data.
	enum HeaderFormat {
		gzip,
		deflate,
		automatic
	}

	deprecated("Use createGzipInputStream/createDeflateInputStream instead.")
	this(InputStream src, HeaderFormat type)
	{
		this(interfaceProxy!InputStream(src), type, true);
	}

	/// private
	this(InterfaceProxy!InputStream src, HeaderFormat type, bool dummy)
	{
		m_in = src;
		if (m_in.empty) {
			// nothing to decompress; zlib is never initialized in this case
			m_finished = true;
		} else {
			// 15 window bits; +16 selects gzip framing, +32 lets zlib detect the framing itself
			int wndbits = 15;
			if(type == HeaderFormat.gzip) wndbits += 16;
			else if(type == HeaderFormat.automatic) wndbits += 32;
			zlibEnforce(() @trusted { return inflateInit2(&m_zstream, wndbits); } ());
			readChunk();
		}
	}

	~this() {
		// every path that sets m_finished after initialization has already called inflateEnd
		if (!m_finished)
			() @trusted { inflateEnd(&m_zstream); } ();
	}

	/// True if no more decompressed data can be read.
	@property bool empty() { return this.leastSize == 0; }

	/** Number of decompressed bytes that are known to be readable.

		Decompresses another chunk if the internal buffer is drained and the
		stream has not ended, so this call may read from the source stream.
	*/
	@property ulong leastSize()
	{
		assert(!m_finished || m_in.empty, "Input contains more data than expected.");
		if (m_outbuffer.length > 0) return m_outbuffer.length;
		if (m_finished) return 0;
		readChunk();
		assert(m_outbuffer.length || m_finished);

		return m_outbuffer.length;
	}

	/// True if already decompressed data is buffered.
	@property bool dataAvailableForRead()
	{
		return m_outbuffer.length > 0;
	}

	/// Returns the buffered decompressed data without consuming it.
	const(ubyte)[] peek() { return m_outbuffer.peek(); }

	/** Reads decompressed data into `dst`.

		With `IOMode.all` the whole of `dst` is filled. With `IOMode.immediate`
		only already buffered data is returned. With `IOMode.once` the call
		returns as soon as the buffered data has been delivered instead of
		decompressing further chunks.

		Returns: the number of bytes stored in `dst`.
		Throws: `Exception` when reading from an empty stream, when the
			compressed data ends before `dst` could be filled, or on zlib errors.
	*/
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		// `empty` refills the buffer if necessary, so at least one byte is available below
		enforce(dst.length == 0 || !empty, "Reading empty stream");

		size_t nread = 0;

		while (dst.length > 0) {
			auto len = min(m_outbuffer.length, dst.length);
			m_outbuffer.read(dst[0 .. len]);
			dst = dst[len .. $];

			nread += len;

			if (!m_outbuffer.length && !m_finished) {
				// Fetching another chunk may block on the source: never do that in
				// immediate mode, and in once mode only while nothing was delivered yet.
				if (mode == IOMode.immediate || (mode == IOMode.once && nread > 0))
					break;
				readChunk();
			}
			enforce(dst.length == 0 || m_outbuffer.length || !m_finished, "Reading past end of zlib stream.");
		}

		return nread;
	}

	alias read = InputStream.read;

	/** Decompresses until at least one byte is available in `m_outbuffer` or
		the end of the compressed stream is reached.

		Throws: `Exception` on premature end of input, on trailing data after
			the compressed stream, or on zlib errors.
	*/
	private void readChunk()
	@safe {
		assert(m_outbuffer.length == 0, "Buffer must be empty to read the next chunk.");
		assert(m_outbuffer.peekDst().length > 0);
		enforce (!m_finished, "Reading past end of zlib stream.");

		m_zstream.next_out = &m_outbuffer.peekDst()[0];
		m_zstream.avail_out = cast(uint)m_outbuffer.peekDst().length;

		while (!m_outbuffer.length) {
			if (m_zstream.avail_in == 0) {
				// zlib consumed everything it was given - fetch more compressed bytes
				auto clen = min(m_inbuffer.length, m_in.leastSize);
				if (clen == 0) {
					m_finished = true;
					// The destructor skips cleanup once m_finished is set, so the
					// zlib state has to be released here to avoid leaking it.
					() @trusted { inflateEnd(&m_zstream); } ();
					throw new Exception("Premature end of compressed input.");
				}
				m_in.read(m_inbuffer[0 .. clen]);
				m_zstream.next_in = &m_inbuffer[0];
				m_zstream.avail_in = cast(uint)clen;
			}
			auto avins = m_zstream.avail_in;
			//logInfo("inflate %s -> %s (@%s in @%s)", m_zstream.avail_in, m_zstream.avail_out, m_ninflated, n_read);
			auto ret = zlibEnforce(() @trusted { return inflate(&m_zstream, Z_SYNC_FLUSH); } ());
			//logInfo("    ... %s -> %s", m_zstream.avail_in, m_zstream.avail_out);
			// inflate must have made progress on at least one side
			assert(m_zstream.avail_out != m_outbuffer.peekDst.length || m_zstream.avail_in != avins);
			immutable produced = m_outbuffer.peekDst().length - m_zstream.avail_out;
			m_ninflated += produced;
			n_read += avins - m_zstream.avail_in;
			m_outbuffer.putN(produced);
			assert(m_zstream.avail_out == 0 || m_zstream.avail_out == m_outbuffer.peekDst().length);

			if (ret == Z_STREAM_END) {
				m_finished = true;
				zlibEnforce(() @trusted { return inflateEnd(&m_zstream); }());
				enforce(m_in.empty, "Extra data after end of compressed input.");
				return;
			}
		}
	}
}

// Round trip: data larger than the internal buffers survives gzip compression and decompression.
unittest {
	import vibe.stream.memory;

	auto data = new ubyte[5000];

	auto mos = createMemoryOutputStream();
	auto gos = createGzipOutputStream(mos);
	gos.write(data);
	gos.finalize();

	auto ms = createMemoryStream(mos.data, false);
	auto gis = createGzipInputStream(ms);

	auto result = new ubyte[data.length];
	gis.read(result);
	assert(data == result);
}

/** Converts zlib error codes into exceptions.

	Returns: `result` unchanged if it is not negative.
	Throws: `Exception` with a message naming the error for any negative code.
*/
private int zlibEnforce(int result)
@safe {
	switch (result) {
		case Z_ERRNO: throw new Exception("zlib errno error");
		case Z_STREAM_ERROR: throw new Exception("zlib stream error");
		case Z_DATA_ERROR: throw new Exception("zlib data error");
		case Z_MEM_ERROR: throw new Exception("zlib memory error");
		case Z_BUF_ERROR: throw new Exception("zlib buffer error");
		case Z_VERSION_ERROR: throw new Exception("zlib version error");
		default:
			if (result < 0) throw new Exception("unknown zlib error");
			return result;
	}
}