/**
	Standard I/O streams

	Copyright: © 2014 RejectedSoftware e.K.
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Sönke Ludwig, Eric Cornelius
*/
module vibe.stream.stdio;

import vibe.core.core;
import vibe.core.stream;
import vibe.stream.taskpipe;

import std.stdio;
import core.thread;

import std.exception;

/**
	Connection stream backed by a `std.stdio.File`.

	Depending on the flags passed to the constructor, the stream owns a read
	pipe, a write pipe, or both. `setup` starts one dedicated thread per
	existing pipe; the reader thread copies data from the file into the read
	pipe and the writer thread copies data from the write pipe into the file.
	All stream operations of this class act on the pipes only.

	Read-side operations throw if the stream was created without a read pipe,
	write-side operations throw if it was created without a write pipe.
*/
class StdFileStream : ConnectionStream {
	private {
		std.stdio.File m_file;
		TaskPipe m_readPipe;   // null for streams that are not readable
		TaskPipe m_writePipe;  // null for streams that are not writable
		Thread m_readThread;
		Thread m_writeThread;
	}

	/**
		Creates the pipes for the requested directions.

		Params:
			read = create a read pipe (the stream becomes readable)
			write = create a write pipe (the stream becomes writable)
	*/
	this(bool read, bool write)
	{
		if (read) m_readPipe = new TaskPipe;
		if (write) m_writePipe = new TaskPipe;
	}

	/**
		Associates the stream with `file` and starts the worker threads.

		A reader thread is started if a read pipe exists and a writer thread
		is started if a write pipe exists.
	*/
	void setup(std.stdio.File file)
	{
		m_file = file;

		if (m_readPipe) {
			m_readThread = new Thread(&readThreadFunc);
			m_readThread.name = "StdFileStream reader";
			m_readThread.start();
		}

		if (m_writePipe) {
			m_writeThread = new Thread(&writeThreadFunc);
			m_writeThread.name = "StdFileStream writer";
			m_writeThread.start();
		}
	}

	override @property bool empty() { enforceReadable(); return m_readPipe.empty; }

	override @property ulong leastSize()
	{
		enforceReadable();
		return m_readPipe.leastSize;
	}

	override @property bool dataAvailableForRead()
	{
		enforceReadable();
		return m_readPipe.dataAvailableForRead;
	}

	/**
		Reports the connection state of the underlying pipe.

		The read pipe is consulted when it exists; write-only streams report
		the state of their write pipe instead of dereferencing a null read
		pipe. A stream without any pipe is never connected.
	*/
	override @property bool connected()
	const {
		if (m_readPipe !is null) return m_readPipe.connected;
		if (m_writePipe !is null) return m_writePipe.connected;
		return false;
	}

	/**
		Closes the write pipe.

		Does nothing for streams that have no write pipe, so that closing a
		read-only stream does not dereference a null pipe.
	*/
	override void close()
	{
		if (m_writePipe !is null) m_writePipe.close();
	}

	/// Waits on the read pipe for data; throws if the stream is not readable.
	override bool waitForData(Duration timeout)
	{
		enforceReadable();
		return m_readPipe.waitForData(timeout);
	}

	override const(ubyte)[] peek()
	{
		enforceReadable();
		return m_readPipe.peek();
	}

	override size_t read(scope ubyte[] dst, IOMode mode)
	{
		enforceReadable();
		return m_readPipe.read(dst, mode);
	}

	alias read = ConnectionStream.read;

	override size_t write(in ubyte[] bytes_, IOMode mode)
	{
		enforceWritable();
		return m_writePipe.write(bytes_, mode);
	}

	alias write = ConnectionStream.write;

	override void flush()
	{
		enforceWritable();
		m_writePipe.flush();
	}

	/// Flushes and finalizes the write pipe unless it is already disconnected.
	override void finalize()
	{
		enforceWritable();
		if (!m_writePipe.connected) return;
		flush();
		m_writePipe.finalize();
	}

	/// Throws if the stream was created without a read pipe.
	void enforceReadable() @safe { enforce(m_readPipe, "Stream is not readable!"); }
	/// Throws if the stream was created without a write pipe.
	void enforceWritable() @safe { enforce(m_writePipe, "Stream is not writable!"); }

	/*
		Body of the reader thread: runs a task that copies the file's contents
		into the read pipe until EOF or an empty read.

		`loop_flag` coordinates the task with this function: whichever of the
		two reaches its check first sets the flag. If the task has already
		finished when `runTask` returns, no event loop is started; otherwise
		the event loop is run here and the task stops it on exit.
	*/
	private void readThreadFunc()
	{
		bool loop_flag = false;
		runTask({
			// NOTE(review): presumably a single byte so that rawRead returns as
			// soon as any input is available instead of waiting for a full
			// buffer - confirm before enlarging.
			ubyte[1] buf;
			scope(exit) {
				if (m_file.isOpen) m_file.close();
				m_readPipe.finalize();
				if (loop_flag) exitEventLoop();
				else loop_flag = true;
			}
			while (!m_file.eof) {
				auto data = m_file.rawRead(buf);
				if (!data.length) break;
				m_readPipe.write(data, IOMode.all);
				vibe.core.core.yield();
			}
		});
		if (!loop_flag) {
			loop_flag = true;
			runEventLoop();
		}
	}

	/*
		Body of the writer thread: runs a task that drains the write pipe into
		the file in chunks of at most 1024 bytes, until the file is closed or
		the pipe reports empty.

		`loop_flag` is used in the same way as in `readThreadFunc`.
	*/
	private void writeThreadFunc()
	{
		import std.algorithm : min;

		bool loop_flag = false;
		runTask({
			ubyte[1024] buf;
			scope(exit) {
				if (m_file.isOpen) m_file.close();
				if (loop_flag) exitEventLoop();
				else loop_flag = true;
			}
			while (m_file.isOpen && !m_writePipe.empty) {
				// leastSize is a ulong; the result is bounded by buf.length, so
				// narrowing to size_t is lossless and keeps the slices valid on
				// 32-bit targets.
				immutable len = cast(size_t) min(buf.length, m_writePipe.leastSize);
				if (!len) break;
				m_writePipe.read(buf[0 .. len], IOMode.all);
				m_file.rawWrite(buf[0 .. len]);
				vibe.core.core.yield();
			}
		});
		if (!loop_flag) {
			loop_flag = true;
			runEventLoop();
		}
	}
}

/**
	OutputStream that writes to stdout
*/
final class StdoutStream : StdFileStream {
	this() {
		super(false, true);
		setup(stdout);
	}
}

/**
	OutputStream that writes to stderr
*/
final class StderrStream : StdFileStream {
	this() {
		super(false, true);
		setup(stderr);
	}
}

/**
	InputStream that reads from stdin
*/
final class StdinStream : StdFileStream {
	this() {
		super(true, false);
		setup(stdin);
	}
}