1 /**
2 	Standard I/O streams
3 
4 	Copyright: © 2014 Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Sönke Ludwig, Eric Cornelius
7 */
8 module vibe.stream.stdio;
9 
10 import vibe.core.core;
11 import vibe.core.log;
12 import vibe.core.stream;
13 import vibe.stream.taskpipe;
14 
15 import std.stdio;
16 import core.thread;
17 
18 import std.exception;
19 
/**
	Stream wrapper around a `std.stdio.File`.

	The blocking file operations are carried out on dedicated threads (one for
	reading and one for writing, both started by `setup`). Data is exchanged
	with those threads through `TaskPipe` instances: the read thread fills
	`m_readPipe`, the write thread drains `m_writePipe`, and the stream methods
	of this class operate on the pipes only.

	A stream can be readable, writable, or both, depending on the flags passed
	to the constructor. Calling a read method on a stream that is not readable
	(or a write method on one that is not writable) throws an exception.
*/
class StdFileStream : ConnectionStream {
	private {
		std.stdio.File m_file;
		TaskPipe m_readPipe;   // null when the stream is not readable
		TaskPipe m_writePipe;  // null when the stream is not writable
		Thread m_readThread;
		Thread m_writeThread;
	}

	/**
		Allocates the pipes for the requested directions.

		No file is attached and no thread is started here; call `setup` for
		that.

		Params:
			read = create the pipe used for reading from the file
			write = create the pipe used for writing to the file
	*/
	this(bool read, bool write)
	{
		if (read) m_readPipe = new TaskPipe;
		if (write) m_writePipe = new TaskPipe;
	}

	/**
		Attaches `file` and starts one worker thread per existing pipe.

		Params:
			file = the file handle the worker threads read from / write to
	*/
	void setup(std.stdio.File file)
	{
		m_file = file;

		if (m_readPipe) {
			m_readThread = new Thread(&readThreadFunc);
			m_readThread.name = "StdFileStream reader";
			m_readThread.start();
		}

		if (m_writePipe) {
			m_writeThread = new Thread(&writeThreadFunc);
			m_writeThread.name = "StdFileStream writer";
			m_writeThread.start();
		}
	}

	/// The file handle that was passed to `setup`.
	@property std.stdio.File stdFile() { return m_file; }

	/// Forwards to the read pipe. Throws if the stream is not readable.
	override @property bool empty() { enforceReadable(); return m_readPipe.empty; }

	/// Forwards to the read pipe. Throws if the stream is not readable.
	override @property ulong leastSize()
	{
		enforceReadable();
		return m_readPipe.leastSize;
	}

	/// Forwards to the read pipe. Throws if the stream is not readable.
	override @property bool dataAvailableForRead()
	{
		enforceReadable();
		return m_readPipe.dataAvailableForRead;
	}

	/**
		Connection state of the underlying pipe.

		Reports the state of the read pipe when the stream is readable,
		otherwise the state of the write pipe. Returns `false` if the stream
		has neither pipe.
	*/
	override @property bool connected()
	const {
		// Write-only streams have no read pipe, so it must not be
		// dereferenced unconditionally.
		if (m_readPipe !is null) return m_readPipe.connected;
		if (m_writePipe !is null) return m_writePipe.connected;
		return false;
	}

	/**
		Closes the write pipe, if there is one.

		On a read-only stream this is a no-op; the read pipe is finalized by
		the read thread once it stops reading.
	*/
	override void close()
	{
		// Read-only streams have no write pipe.
		if (m_writePipe !is null) m_writePipe.close();
	}

	/// Forwards to the read pipe. Throws if the stream is not readable.
	override bool waitForData(Duration timeout)
	{
		enforceReadable();
		return m_readPipe.waitForData(timeout);
	}

	/// Forwards to the read pipe. Throws if the stream is not readable.
	override const(ubyte)[] peek()
	{
		enforceReadable();
		return m_readPipe.peek();
	}

	/// Reads from the read pipe into `dst`. Throws if the stream is not readable.
	override size_t read(scope ubyte[] dst, IOMode mode)
	{
		enforceReadable();
		return m_readPipe.read(dst, mode);
	}

	alias read = ConnectionStream.read;

	// The signature of `write` depends on the version of the OutputStream
	// interface that is being compiled against; both variants share `doWrite`.
	static if (is(typeof(.OutputStream.outputStreamVersion)) && .OutputStream.outputStreamVersion > 1) {
		override size_t write(scope const(ubyte)[] bytes_, IOMode mode) { return doWrite(bytes_, mode); }
	} else {
		override size_t write(in ubyte[] bytes_, IOMode mode) { return doWrite(bytes_, mode); }
	}

	alias write = ConnectionStream.write;

	/// Writes `bytes_` into the write pipe. Throws if the stream is not writable.
	private size_t doWrite(scope const(ubyte)[] bytes_, IOMode mode)
	@safe {
		enforceWritable();
		return m_writePipe.write(bytes_, mode);
	}

	/// Flushes the write pipe. Throws if the stream is not writable.
	override void flush()
	{
		enforceWritable();
		m_writePipe.flush();
	}

	/**
		Flushes and finalizes the write pipe.

		Does nothing if the write pipe is no longer connected. Throws if the
		stream is not writable.
	*/
	override void finalize()
	{
		enforceWritable();
		if (!m_writePipe.connected) return;
		flush();
		m_writePipe.finalize();
	}

	/// Throws an exception if the stream was created without a read pipe.
	void enforceReadable() @safe { enforce(m_readPipe, "Stream is not readable!"); }
	/// Throws an exception if the stream was created without a write pipe.
	void enforceWritable() @safe { enforce(m_writePipe, "Stream is not writable!"); }

	/**
		Entry point of the read thread.

		Runs a task that copies data from the file into the read pipe until
		end of file, an empty read, or an exception. Afterwards the file is
		closed (if still open) and the read pipe is finalized.
	*/
	private void readThreadFunc()
	{
		// Coordinates task and event loop: whichever of "task finished" and
		// "about to start the event loop" happens second sees the flag set.
		// If the task already ran to completion inside runTask, no event loop
		// is started; otherwise the task stops the loop once it is done.
		bool loop_flag = false;
		runTask(() nothrow {
			// NOTE(review): the one-byte buffer presumably makes each byte
			// available to the consumer as soon as it is read, instead of
			// waiting for a larger buffer to fill - confirm.
			ubyte[1] buf;
			try while (!m_file.eof) {
				auto data = m_file.rawRead(buf);
				if (!data.length) break;
				m_readPipe.write(data, IOMode.all);
				vibe.core.core.yield();
			}
			catch (Exception e) logException!(LogLevel.diagnostic)(e, "Failed to read from File");

			try {
				if (m_file.isOpen) m_file.close();
			} catch (Exception e) logException(e, "Failed to close File");
			try m_readPipe.finalize();
			catch (Exception e) assert(false, e.msg);
			if (loop_flag) exitEventLoop();
			else loop_flag = true;
		});
		if (!loop_flag) {
			loop_flag = true;
			runEventLoop();
		}
	}

	/**
		Entry point of the write thread.

		Runs a task that copies data from the write pipe to the file in chunks
		of up to 1024 bytes, until the file is closed, the pipe is empty, or an
		exception occurs. Afterwards the file is closed (if still open).
	*/
	private void writeThreadFunc()
	{
		import std.algorithm : min;

		// Same task/event loop hand-shake as in readThreadFunc.
		bool loop_flag = false;
		runTask(() nothrow {
			ubyte[1024] buf;
			try while (m_file.isOpen && !m_writePipe.empty) {
				// leastSize is a ulong; the result is bounded by buf.length,
				// so narrowing it to size_t for slicing is lossless.
				auto len = cast(size_t) min(buf.length, m_writePipe.leastSize);
				if (!len) break;
				m_writePipe.read(buf[0 .. len], IOMode.all);
				m_file.rawWrite(buf[0 .. len]);
				vibe.core.core.yield();
			}
			catch (Exception e) logException!(LogLevel.diagnostic)(e, "Failed to write to File");

			try {
				if (m_file.isOpen) m_file.close();
			} catch (Exception e) logException(e, "Failed to close File");
			if (loop_flag) exitEventLoop();
			else loop_flag = true;
		});
		if (!loop_flag) {
			loop_flag = true;
			runEventLoop();
		}
	}
}
174 
/**
	Write-only stream bound to the process' standard output.

	Constructing an instance attaches `stdout` and starts the writer thread of
	the base class.
*/
final class StdoutStream : StdFileStream {
	/// Creates the stream without a read pipe and attaches it to `stdout`.
	this()
	{
		enum readable = false;
		enum writable = true;

		super(readable, writable);
		setup(std.stdio.stdout);
	}
}
184 
/**
	Write-only stream bound to the process' standard error output.

	Constructing an instance attaches `stderr` and starts the writer thread of
	the base class.
*/
final class StderrStream : StdFileStream {
	/// Creates the stream without a read pipe and attaches it to `stderr`.
	this()
	{
		enum readable = false;
		enum writable = true;

		super(readable, writable);
		setup(std.stdio.stderr);
	}
}
194 
/**
	Read-only stream bound to the process' standard input.

	Constructing an instance attaches `stdin` and starts the reader thread of
	the base class.
*/
final class StdinStream : StdFileStream {
	/// Creates the stream without a write pipe and attaches it to `stdin`.
	this()
	{
		enum readable = true;
		enum writable = false;

		super(readable, writable);
		setup(std.stdio.stdin);
	}
}