/**
	Stream interface for passing data between different tasks.

	Copyright: © 2013 Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Sönke Ludwig
*/
module vibe.stream.taskpipe;

public import vibe.core.stream;

import core.sync.mutex;
import core.time;
import std.algorithm : min;
import std.exception;
import vibe.core.core;
import vibe.core.sync;
import vibe.utils.array;


/**
	Implements a unidirectional data pipe between two tasks.

	This class is a thin `ConnectionStream` facade; every operation is
	forwarded to a private `TaskPipeImpl` instance.
*/
final class TaskPipe : ConnectionStream {
	private {
		TaskPipeImpl m_pipe;
	}

	/** Constructs a new pipe ready for use.

		Params:
			grow_when_full = If `true`, writes enlarge the transfer buffer
				instead of waiting for the reader once it is full.
	*/
	this(bool grow_when_full = false)
	{
		m_pipe = new TaskPipeImpl(grow_when_full);
	}

	/// Size of the (fixed) FIFO buffer used to transfer data between tasks
	@property size_t bufferSize() const { return m_pipe.bufferSize; }
	/// ditto
	@property void bufferSize(size_t nbytes) { m_pipe.bufferSize = nbytes; }

	/// Waits (see `leastSize`) and reports whether no data is left to read.
	@property bool empty() { return leastSize() == 0; }
	/// Waits without timeout until data is buffered or the pipe is closed, then returns the buffered byte count.
	@property ulong leastSize() { m_pipe.waitForData(); return m_pipe.fill; }
	/// Non-waiting check for buffered data.
	@property bool dataAvailableForRead() { return m_pipe.fill > 0; }
	/// `true` as long as `close`/`finalize` has not been called.
	@property bool connected() const { return m_pipe.open; }

	/// Marks the pipe as closed and wakes up all waiting tasks.
	void close() { m_pipe.close(); }

	/** Waits until data is available, the pipe is closed, or the timeout elapses.

		Returns: `true` if at least one byte can be read afterwards.
	*/
	bool waitForData(Duration timeout)
	{
		if (dataAvailableForRead) return true;
		m_pipe.waitForData(timeout);
		return dataAvailableForRead;
	}

	/// Returns a temporary view of the buffered data (see `TaskPipeImpl.peek`).
	const(ubyte)[] peek() { return m_pipe.peek; }

	/// Reads from the pipe into `dst` (see `TaskPipeImpl.read`).
	size_t read(scope ubyte[] dst, IOMode mode) { return m_pipe.read(dst, mode); }
	alias read = ConnectionStream.read;

	// The parameter qualifiers of OutputStream.write differ between stream API versions.
	static if (is(typeof(.OutputStream.outputStreamVersion)) && .OutputStream.outputStreamVersion > 1) {
		override size_t write(scope const(ubyte)[] bytes, IOMode mode) { return m_pipe.write(bytes, mode); }
	} else {
		override size_t write(in ubyte[] bytes, IOMode mode) { return m_pipe.write(bytes, mode); }
	}
	alias write = ConnectionStream.write;

	/// No-op; written data is placed directly into the transfer buffer.
	void flush() {}
	/// Closes the pipe.
	void finalize() { m_pipe.close(); }
}


/**
	Underlying pipe implementation for TaskPipe with no Stream interface.
*/
private final class TaskPipeImpl {
	@safe:

	private {
		Mutex m_mutex;
		InterruptibleTaskCondition m_condition;
		vibe.utils.array.FixedRingBuffer!ubyte m_buffer;
		bool m_closed = false;
		bool m_growWhenFull;
	}

	/** Constructs a new pipe ready for use.

		The transfer buffer starts out with a capacity of 2048 bytes.

		Params:
			grow_when_full = If `true`, `write` enlarges the buffer instead
				of waiting once it is full.
	*/
	this(bool grow_when_full = false)
	{
		m_mutex = new Mutex;
		() @trusted { m_condition = new InterruptibleTaskCondition(m_mutex); } ();
		m_buffer.capacity = 2048;
		m_growWhenFull = grow_when_full;
	}

	/// Size of the (fixed) buffer used to transfer data between tasks
	@property size_t bufferSize() const { return m_buffer.capacity; }
	/// ditto
	@property void bufferSize(size_t nbytes) { m_buffer.capacity = nbytes; }

	/// Number of bytes currently in the transfer buffer
	@property size_t fill()
	const {
		synchronized (m_mutex) {
			return m_buffer.length;
		}
	}

	/// `true` until `close` has been called.
	@property bool open() const { return !m_closed; }

	/** Closes the pipe.

		Sets the closed flag and notifies all tasks waiting on the pipe.
	*/
	void close()
	{
		synchronized (m_mutex) m_closed = true;
		() @trusted { m_condition.notifyAll(); } ();
	}

	/** Blocks until at least one byte of data has been written to the pipe.

		Also returns when the pipe gets closed or when the timeout elapses.
		A non-positive timeout and `Duration.max` are both treated as
		"no timeout".
	*/
	void waitForData(Duration timeout = Duration.max)
	{
		import std.datetime : Clock, SysTime, UTC;
		bool have_timeout = timeout > 0.seconds && timeout != Duration.max;
		SysTime now = Clock.currTime(UTC());
		SysTime timeout_target;
		if (have_timeout) timeout_target = now + timeout;

		synchronized (m_mutex) {
			while (m_buffer.empty && !m_closed && (!have_timeout || now < timeout_target)) {
				if (have_timeout)
					() @trusted { m_condition.wait(timeout_target - now); } ();
				else () @trusted { m_condition.wait(); } ();
				// re-sample the clock so the next wait only covers the remaining time
				now = Clock.currTime(UTC());
			}
		}
	}

	/** Writes the given byte array to the pipe.

		With `IOMode.immediate`, or with `IOMode.once` after at least one
		byte was written, the call returns early instead of waiting for free
		buffer space. If the pipe was created with `grow_when_full`, a full
		buffer is enlarged instead of waited on.

		Returns: The number of bytes that were written.

		Throws: `Exception` if the pipe is already closed, or if it is closed
			while waiting for free buffer space.
	*/
	size_t write(scope const(ubyte)[] data, IOMode mode)
	{
		size_t ret = 0;

		enforce(!m_closed, "Writing to closed task pipe.");

		while (data.length > 0) {
			bool need_signal;
			synchronized (m_mutex) {
				if (m_growWhenFull && m_buffer.full) {
					// Grow by the remaining payload rounded up to an even number of
					// bytes. This is the value the former "+= 2" search loop arrived
					// at, computed directly instead of in O(data.length) steps.
					immutable extra = data.length + (data.length & 1);
					m_buffer.capacity = m_buffer.capacity + extra;
				} else while (m_buffer.full) {
					if (mode == IOMode.immediate || (mode == IOMode.once && ret > 0))
						return ret;
					if (m_closed) throw new Exception("Pipe closed while writing data");
					() @trusted { m_condition.wait(); } ();
				}

				// only a transition from empty to non-empty requires waking up waiters
				need_signal = m_buffer.empty;
				auto len = min(m_buffer.freeSpace, data.length);
				m_buffer.put(data[0 .. len]);
				data = data[len .. $];
				ret += len;
			}
			if (need_signal) () @trusted { m_condition.notifyAll(); } ();
		}
		if (!m_growWhenFull) vibe.core.core.yield();

		return ret;
	}

	/** Returns a temporary view of the beginning of the transfer buffer.

		Note that a call to read invalidates this array slice. The slice is
		obtained while holding the pipe mutex, but is used by the caller
		without it.
	*/
	const(ubyte[]) peek()
	{
		synchronized (m_mutex) {
			return m_buffer.peek();
		}
	}

	/** Reads data into the supplied buffer.

		With `IOMode.all` the call blocks until `dst` has been filled
		completely. With `IOMode.immediate`, or with `IOMode.once` after at
		least one byte was read, the call returns early instead of waiting
		for more data.

		Returns: The number of bytes copied into `dst`.

		Throws: `Exception` if the pipe is closed and drained before any byte
			was read, or, for `IOMode.all`, before `dst` was filled.
	*/
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		size_t ret = 0;

		while (dst.length > 0) {
			bool need_signal;
			size_t len;
			synchronized (m_mutex) {
				while (m_buffer.empty && !m_closed) {
					if (mode == IOMode.immediate || (mode == IOMode.once && ret > 0))
						return ret;
					() @trusted { m_condition.wait(); } ();
				}

				// The pipe is closed and drained. If a partial read is acceptable
				// for the requested mode, report the bytes that were already
				// copied instead of discarding them by throwing.
				if (m_buffer.empty && ret > 0 && mode != IOMode.all)
					return ret;

				// only a transition from full to non-full requires waking up writers
				need_signal = m_buffer.full;
				enforce(!m_buffer.empty, "Reading past end of closed pipe.");
				len = min(dst.length, m_buffer.length);
				m_buffer.read(dst[0 .. len]);
				ret += len;
			}
			if (need_signal) () @trusted { m_condition.notifyAll(); } ();
			dst = dst[len .. $];
		}
		vibe.core.core.yield();

		return ret;
	}
}

unittest { // issue #1501 - deadlock in TaskPipe
	import std.datetime : Clock, UTC;
	import core.time : msecs;

	// test read after write and write after read
	foreach (i; 0 .. 2) {
		auto p = new TaskPipe;
		p.bufferSize = 2048;

		Task a, b;
		a = runTask({
			ubyte[2100] buf;
			try {
				if (i == 0) p.read(buf, IOMode.all);
				else p.write(buf, IOMode.all);
			} catch (Exception e) assert(false, e.msg);
		});
		b = runTask({
			ubyte[2100] buf;
			try {
				if (i == 0) p.write(buf, IOMode.all);
				else p.read(buf, IOMode.all);
			} catch (Exception e) assert(false, e.msg);
		});

		auto joiner = runTask({
			try {
				auto starttime = Clock.currTime(UTC());
				while (a.running || b.running) {
					if (Clock.currTime(UTC()) - starttime > 500.msecs)
						assert(false, "TaskPipe is dead locked.");
					yield();
				}
			} catch (Exception e) assert(false, e.msg);
		});

		joiner.join();
	}
}

unittest { // TaskPipeImpl.waitForData must return once its timeout has elapsed
	auto t = runTask({
		try {
			auto tp = new TaskPipeImpl;
			tp.waitForData(10.msecs);
			exitEventLoop();
		} catch (Exception e) assert(false, e.msg);
	});
	runTask({
		sleepUninterruptible(500.msecs);
		assert(!t.running, "TaskPipeImpl.waitForData didn't timeout.");
	});
	runEventLoop();
}