/**
	Standard I/O streams

	Copyright: © 2014 Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Sönke Ludwig, Eric Cornelius
*/
module vibe.stream.stdio;

import vibe.core.core;
import vibe.core.log;
import vibe.core.stream;
import vibe.stream.taskpipe;

import std.stdio;
import core.thread;

import std.exception;

/**
	Connection stream backed by a `std.stdio.File`.

	Depending on the flags passed to the constructor, the stream owns a read
	pipe, a write pipe, or both. The actual file access is performed from
	dedicated threads started by `setup`, each of which shuttles data between
	the file and the corresponding `TaskPipe`.
*/
class StdFileStream : ConnectionStream {
	private {
		std.stdio.File m_file;
		TaskPipe m_readPipe;   // null when the stream is not readable
		TaskPipe m_writePipe;  // null when the stream is not writable
		Thread m_readThread;
		Thread m_writeThread;
	}

	/**
		Creates the pipes for the requested directions.

		Params:
			read = create a read pipe, making the stream readable
			write = create a write pipe, making the stream writable
	*/
	this(bool read, bool write)
	{
		if (read) m_readPipe = new TaskPipe;
		if (write) m_writePipe = new TaskPipe;
	}

	/**
		Associates the stream with `file` and starts one worker thread per
		pipe that was created by the constructor.
	*/
	void setup(std.stdio.File file)
	{
		m_file = file;

		if (m_readPipe) {
			m_readThread = new Thread(&readThreadFunc);
			m_readThread.name = "StdFileStream reader";
			m_readThread.start();
		}

		if (m_writePipe) {
			m_writeThread = new Thread(&writeThreadFunc);
			m_writeThread.name = "StdFileStream writer";
			m_writeThread.start();
		}
	}

	/// The underlying file handle passed to `setup`.
	@property std.stdio.File stdFile() { return m_file; }

	/// Throws if the stream is not readable.
	override @property bool empty() { enforceReadable(); return m_readPipe.empty; }

	/// Throws if the stream is not readable.
	override @property ulong leastSize()
	{
		enforceReadable();
		return m_readPipe.leastSize;
	}

	/// Throws if the stream is not readable.
	override @property bool dataAvailableForRead()
	{
		enforceReadable();
		return m_readPipe.dataAvailableForRead;
	}

	/**
		Reports the connection state of the read pipe if the stream is
		readable, otherwise that of the write pipe.

		Write-only streams have no read pipe, so the read pipe must not be
		dereferenced unconditionally. Returns `false` if neither pipe exists.
	*/
	override @property bool connected()
	const {
		if (m_readPipe !is null) return m_readPipe.connected;
		if (m_writePipe !is null) return m_writePipe.connected;
		return false;
	}

	/**
		Closes the write pipe.

		Read-only streams have no write pipe; for those this is a no-op
		instead of a null dereference.
	*/
	override void close()
	{
		if (m_writePipe !is null) m_writePipe.close();
	}

	/// Waits for data on the read pipe. Throws if the stream is not readable.
	override bool waitForData(Duration timeout)
	{
		// Same guard as the other read-side methods, so that a write-only
		// stream reports a proper error instead of dereferencing null.
		enforceReadable();
		return m_readPipe.waitForData(timeout);
	}

	/// Throws if the stream is not readable.
	override const(ubyte)[] peek()
	{
		enforceReadable();
		return m_readPipe.peek();
	}

	/// Throws if the stream is not readable.
	override size_t read(scope ubyte[] dst, IOMode mode)
	{
		enforceReadable();
		return m_readPipe.read(dst, mode);
	}

	alias read = ConnectionStream.read;

	/// Throws if the stream is not writable.
	override size_t write(in ubyte[] bytes_, IOMode mode)
	{
		enforceWritable();
		return m_writePipe.write(bytes_, mode);
	}

	alias write = ConnectionStream.write;

	/// Throws if the stream is not writable.
	override void flush()
	{
		enforceWritable();
		m_writePipe.flush();
	}

	/**
		Flushes and finalizes the write pipe. Does nothing if the write pipe
		is no longer connected. Throws if the stream is not writable.
	*/
	override void finalize()
	{
		enforceWritable();
		if (!m_writePipe.connected) return;
		flush();
		m_writePipe.finalize();
	}

	/// Throws an exception unless a read pipe exists.
	void enforceReadable() @safe { enforce(m_readPipe, "Stream is not readable!"); }

	/// Throws an exception unless a write pipe exists.
	void enforceWritable() @safe { enforce(m_writePipe, "Stream is not writable!"); }

	/// Reader thread entry point: copies data from the file into the read pipe.
	private void readThreadFunc()
	{
		// The task may finish before runTask returns, or only later while the
		// event loop is running. Whichever side gets here first sets the flag;
		// the other side then either skips starting the loop or stops it.
		bool loop_flag = false;
		runTask(() nothrow {
			// NOTE(review): presumably one byte at a time so that input is
			// forwarded without waiting for a larger buffer to fill - confirm.
			ubyte[1] buf;
			try while (!m_file.eof) {
				auto data = m_file.rawRead(buf);
				if (!data.length) break;
				m_readPipe.write(data, IOMode.all);
				vibe.core.core.yield();
			}
			catch (Exception e) logException!(LogLevel.diagnostic)(e, "Failed to read from File");

			try {
				if (m_file.isOpen) m_file.close();
			} catch (Exception e) logException(e, "Failed to close File");
			try m_readPipe.finalize();
			catch (Exception e) assert(false, e.msg);
			if (loop_flag) exitEventLoop();
			else loop_flag = true;
		});
		if (!loop_flag) {
			loop_flag = true;
			runEventLoop();
		}
	}

	/// Writer thread entry point: copies data from the write pipe into the file.
	private void writeThreadFunc()
	{
		import std.algorithm : min;

		// Same hand-over protocol between task and event loop as in
		// readThreadFunc.
		bool loop_flag = false;
		runTask(() nothrow {
			ubyte[1024] buf;
			try while (m_file.isOpen && !m_writePipe.empty) {
				// Transfer at most one buffer's worth per iteration.
				auto len = min(buf.length, m_writePipe.leastSize);
				if (!len) break;
				m_writePipe.read(buf[0 .. len], IOMode.all);
				m_file.rawWrite(buf[0 .. len]);
				vibe.core.core.yield();
			}
			catch (Exception e) logException!(LogLevel.diagnostic)(e, "Failed to write to File");

			try {
				if (m_file.isOpen) m_file.close();
			} catch (Exception e) logException(e, "Failed to close File");
			if (loop_flag) exitEventLoop();
			else loop_flag = true;
		});
		if (!loop_flag) {
			loop_flag = true;
			runEventLoop();
		}
	}
}

/**
	OutputStream that writes to stdout
*/
final class StdoutStream : StdFileStream {
	this() {
		super(false, true);
		setup(stdout);
	}
}

/**
	OutputStream that writes to stderr
*/
final class StderrStream : StdFileStream {
	this() {
		super(false, true);
		setup(stderr);
	}
}

/**
	InputStream that reads from stdin
*/
final class StdinStream : StdFileStream {
	this() {
		super(true, false);
		setup(stdin);
	}
}