1 /** 2 Zlib input/output streams 3 4 Copyright: © 2012-2013 Sönke Ludwig 5 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 6 Authors: Sönke Ludwig 7 */ 8 module vibe.stream.zlib; 9 10 import vibe.core.stream; 11 import vibe.utils.array; 12 import vibe.internal.freelistref; 13 import vibe.internal.interfaceproxy : InterfaceProxy, interfaceProxy; 14 15 import std.algorithm; 16 import std.exception; 17 import etc.c.zlib; 18 19 import vibe.core.log; 20 21 22 /** Creates a new deflate uncompression stream. 23 */ 24 ZlibInputStream createDeflateInputStream(InputStream)(InputStream source) @safe 25 if (isInputStream!InputStream) 26 { 27 return new ZlibInputStream(interfaceProxy!(.InputStream)(source), ZlibInputStream.HeaderFormat.deflate, true); 28 } 29 30 /// private 31 FreeListRef!ZlibInputStream createDeflateInputStreamFL(InputStream)(InputStream source) @safe 32 if (isInputStream!InputStream) 33 { 34 return FreeListRef!ZlibInputStream(interfaceProxy!(.InputStream)(source), ZlibInputStream.HeaderFormat.deflate, true); 35 } 36 37 /** Creates a new deflate compression stream. 38 */ 39 ZlibOutputStream createDeflateOutputStream(OutputStream)(OutputStream destination) @safe 40 if (isOutputStream!OutputStream) 41 { 42 return new ZlibOutputStream(interfaceProxy!(.OutputStream)(destination), ZlibOutputStream.HeaderFormat.deflate, Z_DEFAULT_COMPRESSION, true); 43 } 44 45 /// private 46 FreeListRef!ZlibOutputStream createDeflateOutputStreamFL(OutputStream)(OutputStream destination) @safe 47 if (isOutputStream!OutputStream) 48 { 49 return FreeListRef!ZlibOutputStream(interfaceProxy!(.OutputStream)(destination), ZlibOutputStream.HeaderFormat.deflate, Z_DEFAULT_COMPRESSION, true); 50 } 51 52 /** Creates a new deflate uncompression stream. 53 */ 54 ZlibInputStream createGzipInputStream(InputStream)(InputStream source) @safe 55 if (isInputStream!InputStream) 56 { 57 return new ZlibInputStream(interfaceProxy!(.InputStream)(source), ZlibInputStream.HeaderFormat.gzip, true); 58 } 59 60 /// private 61 FreeListRef!ZlibInputStream createGzipInputStreamFL(InputStream)(InputStream source) @safe 62 if (isInputStream!InputStream) 63 { 64 return FreeListRef!ZlibInputStream(interfaceProxy!(.InputStream)(source), ZlibInputStream.HeaderFormat.gzip, true); 65 } 66 67 /** Creates a new deflate uncompression stream. 68 */ 69 ZlibOutputStream createGzipOutputStream(OutputStream)(OutputStream destination) @safe 70 if (isOutputStream!OutputStream) 71 { 72 return new ZlibOutputStream(interfaceProxy!(.OutputStream)(destination), ZlibOutputStream.HeaderFormat.gzip, Z_DEFAULT_COMPRESSION, true); 73 } 74 75 /// private 76 FreeListRef!ZlibOutputStream createGzipOutputStreamFL(OutputStream)(OutputStream destination) @safe 77 if (isOutputStream!OutputStream) 78 { 79 return FreeListRef!ZlibOutputStream(interfaceProxy!(.OutputStream)(destination), ZlibOutputStream.HeaderFormat.gzip, Z_DEFAULT_COMPRESSION, true); 80 } 81 82 83 /** 84 Generic zlib output stream. 85 */ 86 class ZlibOutputStream : OutputStream { 87 @safe: 88 89 private { 90 InterfaceProxy!OutputStream m_out; 91 z_stream m_zstream; 92 ubyte[1024] m_outbuffer; 93 //ubyte[4096] m_inbuffer; 94 bool m_finalized = false; 95 } 96 97 enum HeaderFormat { 98 gzip, 99 deflate 100 } 101 102 /// private 103 this(InterfaceProxy!OutputStream dst, HeaderFormat type, int level, bool dummy) 104 { 105 m_out = dst; 106 zlibEnforce(() @trusted { return deflateInit2(&m_zstream, level, Z_DEFLATED, 15 + (type == HeaderFormat.gzip ? 16 : 0), 8, Z_DEFAULT_STRATEGY); } ()); 107 } 108 109 ~this() { 110 if (!m_finalized) 111 () @trusted { deflateEnd(&m_zstream); } (); 112 } 113 114 static if (is(typeof(.OutputStream.outputStreamVersion)) && .OutputStream.outputStreamVersion > 1) { 115 final override size_t write(scope const(ubyte)[] bytes_, IOMode mode) { return doWrite(bytes_, mode); } 116 } else { 117 final override size_t write(in ubyte[] bytes_, IOMode mode) { return doWrite(bytes_, mode); } 118 } 119 120 alias write = OutputStream.write; 121 122 private size_t doWrite(scope const(ubyte)[] data, IOMode mode) 123 { 124 // TODO: support IOMode! 125 if (!data.length) return 0; 126 assert(!m_finalized); 127 assert(m_zstream.avail_in == 0); 128 m_zstream.next_in = () @trusted { return cast(ubyte*)data.ptr; } (); 129 assert(data.length < uint.max); 130 m_zstream.avail_in = cast(uint)data.length; 131 doFlush(Z_NO_FLUSH); 132 assert(m_zstream.avail_in == 0); 133 m_zstream.next_in = null; 134 return data.length; 135 } 136 137 final void flush() 138 { 139 assert(!m_finalized); 140 //doFlush(Z_SYNC_FLUSH); 141 m_out.flush(); 142 } 143 144 final void finalize() 145 { 146 if (m_finalized) return; 147 m_finalized = true; 148 doFlush(Z_FINISH); 149 m_out.flush(); 150 zlibEnforce(() @trusted { return deflateEnd(&m_zstream); }()); 151 } 152 153 private final void doFlush(int how) 154 @safe { 155 while (true) { 156 m_zstream.next_out = &m_outbuffer[0]; 157 m_zstream.avail_out = cast(uint)m_outbuffer.length; 158 //logInfo("deflate %s -> %s (%s)", m_zstream.avail_in, m_zstream.avail_out, how); 159 auto ret = () @trusted { return deflate(&m_zstream, how); } (); 160 //logInfo(" ... %s -> %s", m_zstream.avail_in, m_zstream.avail_out); 161 switch (ret) { 162 default: 163 zlibEnforce(ret); 164 assert(false, "Unknown return value for zlib deflate."); 165 case Z_OK: 166 assert(m_zstream.avail_out < m_outbuffer.length || m_zstream.avail_in == 0); 167 m_out.write(m_outbuffer[0 .. m_outbuffer.length - m_zstream.avail_out]); 168 break; 169 case Z_BUF_ERROR: 170 assert(m_zstream.avail_in == 0); 171 return; 172 case Z_STREAM_END: 173 assert(how == Z_FINISH); 174 m_out.write(m_outbuffer[0 .. m_outbuffer.length - m_zstream.avail_out]); 175 return; 176 } 177 } 178 } 179 } 180 181 182 unittest { 183 import vibe.stream.memory; 184 import vibe.stream.operations; 185 186 auto raw = cast(ubyte[])"Hello, World!\n".dup; 187 ubyte[] gzip = [ 188 0x1F, 0x8B, 0x08, 0x08, 0xAF, 0x12, 0x42, 0x56, 0x00, 0x03, 0x74, 0x65, 0x73, 0x74, 0x2E, 0x74, 189 0x78, 0x74, 0x00, 0xF3, 0x48, 0xCD, 0xC9, 0xC9, 0xD7, 0x51, 0x08, 0xCF, 0x2F, 0xCA, 0x49, 0x51, 190 0xE4, 0x02, 0x00, 0x84, 0x9E, 0xE8, 0xB4, 0x0E, 0x00, 0x00, 0x00]; 191 192 auto gzipin = createGzipInputStream(createMemoryStream(gzip)); 193 assert(gzipin.readAll() == raw); 194 } 195 196 unittest { 197 import vibe.stream.memory; 198 import vibe.stream.operations; 199 200 ubyte[] gzip_partial = [ 201 0x1F, 0x8B, 0x08, 0x08, 0xAF, 0x12, 0x42, 0x56, 0x00, 0x03, 0x74, 0x65, 0x73, 0x74, 0x2E, 0x74, 202 0x78, 0x74, 0x00, 0xF3, 0x48, 0xCD, 0xC9, 0xC9, 0xD7, 0x51, 0x08, 0xCF, 0x2F, 0xCA, 0x49, 0x51, 203 ]; 204 205 auto gzipin = createGzipInputStream(createMemoryStream(gzip_partial)); 206 try { 207 gzipin.readAll(); 208 assert(false, "Expected exception."); 209 } catch (Exception e) {} 210 assert(gzipin.empty); 211 } 212 213 /** 214 Generic zlib input stream. 215 */ 216 class ZlibInputStream : InputStream { 217 @safe: 218 219 import std.zlib; 220 private { 221 InterfaceProxy!InputStream m_in; 222 z_stream m_zstream; 223 FixedRingBuffer!(ubyte, 4096) m_outbuffer; 224 ubyte[1024] m_inbuffer; 225 bool m_finished = false; 226 ulong m_ninflated, n_read; 227 } 228 229 enum HeaderFormat { 230 gzip, 231 deflate, 232 automatic 233 } 234 235 /// private 236 this(InterfaceProxy!InputStream src, HeaderFormat type, bool dummy) 237 { 238 m_in = src; 239 if (m_in.empty) { 240 m_finished = true; 241 } else { 242 int wndbits = 15; 243 if(type == HeaderFormat.gzip) wndbits += 16; 244 else if(type == HeaderFormat.automatic) wndbits += 32; 245 zlibEnforce(() @trusted { return inflateInit2(&m_zstream, wndbits); } ()); 246 readChunk(); 247 } 248 } 249 250 ~this() { 251 if (!m_finished) 252 () @trusted { inflateEnd(&m_zstream); } (); 253 } 254 255 @property bool empty() { return this.leastSize == 0; } 256 257 @property ulong leastSize() 258 { 259 assert(!m_finished || m_in.empty, "Input contains more data than expected."); 260 if (m_outbuffer.length > 0) return m_outbuffer.length; 261 if (m_finished) return 0; 262 readChunk(); 263 assert(m_outbuffer.length || m_finished); 264 265 return m_outbuffer.length; 266 } 267 268 @property bool dataAvailableForRead() 269 { 270 return m_outbuffer.length > 0; 271 } 272 273 const(ubyte)[] peek() { return m_outbuffer.peek(); } 274 275 size_t read(scope ubyte[] dst, IOMode mode) 276 { 277 enforce(dst.length == 0 || !empty, "Reading empty stream"); 278 279 size_t nread = 0; 280 281 while (dst.length > 0) { 282 auto len = min(m_outbuffer.length, dst.length); 283 m_outbuffer.read(dst[0 .. len]); 284 dst = dst[len .. $]; 285 286 nread += len; 287 288 if (!m_outbuffer.length && !m_finished) { 289 if (mode == IOMode.immediate || mode == IOMode.once && !nread) 290 break; 291 readChunk(); 292 } 293 enforce(dst.length == 0 || m_outbuffer.length || !m_finished, "Reading past end of zlib stream."); 294 } 295 296 return nread; 297 } 298 299 alias read = InputStream.read; 300 301 private void readChunk() 302 @safe { 303 assert(m_outbuffer.length == 0, "Buffer must be empty to read the next chunk."); 304 assert(m_outbuffer.peekDst().length > 0); 305 enforce (!m_finished, "Reading past end of zlib stream."); 306 307 m_zstream.next_out = &m_outbuffer.peekDst()[0]; 308 m_zstream.avail_out = cast(uint)m_outbuffer.peekDst().length; 309 310 while (!m_outbuffer.length) { 311 if (m_zstream.avail_in == 0) { 312 auto clen = min(m_inbuffer.length, m_in.leastSize); 313 if (clen == 0) { 314 m_finished = true; 315 throw new Exception("Premature end of compressed input."); 316 } 317 m_in.read(m_inbuffer[0 .. clen]); 318 m_zstream.next_in = &m_inbuffer[0]; 319 m_zstream.avail_in = cast(uint)clen; 320 } 321 auto avins = m_zstream.avail_in; 322 //logInfo("inflate %s -> %s (@%s in @%s)", m_zstream.avail_in, m_zstream.avail_out, m_ninflated, n_read); 323 auto ret = zlibEnforce(() @trusted { return inflate(&m_zstream, Z_SYNC_FLUSH); } ()); 324 //logInfo(" ... %s -> %s", m_zstream.avail_in, m_zstream.avail_out); 325 assert(m_zstream.avail_out != m_outbuffer.peekDst.length || m_zstream.avail_in != avins); 326 m_ninflated += m_outbuffer.peekDst().length - m_zstream.avail_out; 327 n_read += avins - m_zstream.avail_in; 328 m_outbuffer.putN(m_outbuffer.peekDst().length - m_zstream.avail_out); 329 assert(m_zstream.avail_out == 0 || m_zstream.avail_out == m_outbuffer.peekDst().length); 330 331 if (ret == Z_STREAM_END) { 332 m_finished = true; 333 zlibEnforce(() @trusted { return inflateEnd(&m_zstream); }()); 334 enforce(m_in.empty, "Extra data after end of compressed input."); 335 return; 336 } 337 } 338 } 339 } 340 341 unittest { 342 import vibe.stream.memory; 343 344 auto data = new ubyte[5000]; 345 346 auto mos = createMemoryOutputStream(); 347 auto gos = createGzipOutputStream(mos); 348 gos.write(data); 349 gos.finalize(); 350 351 auto ms = createMemoryStream(mos.data, false); 352 auto gis = createGzipInputStream(ms); 353 354 auto result = new ubyte[data.length]; 355 gis.read(result); 356 assert(data == result); 357 } 358 359 private int zlibEnforce(int result) 360 @safe { 361 switch (result) { 362 default: 363 if (result < 0) throw new Exception("unknown zlib error"); 364 else return result; 365 case Z_ERRNO: throw new Exception("zlib errno error"); 366 case Z_STREAM_ERROR: throw new Exception("zlib stream error"); 367 case Z_DATA_ERROR: throw new Exception("zlib data error"); 368 case Z_MEM_ERROR: throw new Exception("zlib memory error"); 369 case Z_BUF_ERROR: throw new Exception("zlib buffer error"); 370 case Z_VERSION_ERROR: throw new Exception("zlib version error"); 371 } 372 }