/**
	Stream interface for passing data between different tasks.

	Copyright: © 2013 Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Sönke Ludwig
*/
module vibe.stream.taskpipe;

public import vibe.core.stream;

import core.sync.mutex;
import core.time;
import std.algorithm : min;
import std.exception;
import vibe.core.core;
import vibe.core.sync;
import vibe.utils.array;


/**
	Implements a unidirectional data pipe between two tasks.

	All operations are forwarded to a private `TaskPipeImpl` instance; this
	class only adapts it to the `ConnectionStream` interface.
*/
final class TaskPipe : ConnectionStream {
	private {
		TaskPipeImpl m_pipe;
	}

	/** Constructs a new pipe ready for use.

		Params:
			grow_when_full = If `true`, a write to a full buffer enlarges the
				buffer instead of waiting for the reader to make room.
	*/
	this(bool grow_when_full = false)
	{
		m_pipe = new TaskPipeImpl(grow_when_full);
	}

	/// Size of the (fixed) FIFO buffer used to transfer data between tasks
	@property size_t bufferSize() const { return m_pipe.bufferSize; }
	/// ditto
	@property void bufferSize(size_t nbytes) { m_pipe.bufferSize = nbytes; }

	/// `true` once `leastSize` reports zero bytes (waits for data or close first).
	@property bool empty() { return leastSize() == 0; }
	/// Waits until data is buffered or the pipe is closed, then returns the buffered byte count.
	@property ulong leastSize() { m_pipe.waitForData(); return m_pipe.fill; }
	/// `true` if at least one byte is currently buffered; never waits.
	@property bool dataAvailableForRead() { return m_pipe.fill > 0; }
	/// `true` as long as the pipe has not been closed.
	@property bool connected() const { return m_pipe.open; }

	/// Closes the pipe and wakes up all tasks waiting on it.
	void close() { m_pipe.close(); }

	/** Waits until data is available, the pipe gets closed, or the timeout elapses.

		Params:
			timeout = Maximum time to wait. Non-positive values and
				`Duration.max` are treated as "no timeout" by the underlying
				implementation.

		Returns:
			`true` if at least one byte can be read after the wait.
	*/
	bool waitForData(Duration timeout)
	{
		if (dataAvailableForRead) return true;
		m_pipe.waitForData(timeout);
		return dataAvailableForRead;
	}

	/// Returns a temporary view of the buffered data (see `TaskPipeImpl.peek`).
	const(ubyte)[] peek() { return m_pipe.peek; }

	/// Reads from the pipe into `dst`; returns the number of bytes read.
	size_t read(scope ubyte[] dst, IOMode mode) { return m_pipe.read(dst, mode); }
	alias read = ConnectionStream.read;

	/// Writes `bytes` to the pipe; returns the number of bytes written.
	size_t write(in ubyte[] bytes, IOMode mode) { return m_pipe.write(bytes, mode); }
	alias write = ConnectionStream.write;

	/// No-op: written data is placed into the transfer buffer directly.
	void flush() {}

	/// Closes the pipe; equivalent to `close`.
	void finalize() { m_pipe.close(); }
}


/**
	Underlying pipe implementation for TaskPipe with no Stream interface.
*/
private final class TaskPipeImpl {
	@safe:

	private {
		Mutex m_mutex;
		InterruptibleTaskCondition m_condition;
		vibe.utils.array.FixedRingBuffer!ubyte m_buffer;
		bool m_closed = false;
		bool m_growWhenFull;
	}

	/** Constructs a new pipe ready for use.

		The transfer buffer starts with a capacity of 2048 bytes.

		Params:
			grow_when_full = If `true`, `write` enlarges a full buffer instead
				of waiting for space.
	*/
	this(bool grow_when_full = false)
	{
		m_mutex = new Mutex;
		() @trusted { m_condition = new InterruptibleTaskCondition(m_mutex); } ();
		m_buffer.capacity = 2048;
		m_growWhenFull = grow_when_full;
	}

	/// Size of the (fixed) buffer used to transfer data between tasks
	@property size_t bufferSize() const { return m_buffer.capacity; }
	/// ditto
	@property void bufferSize(size_t nbytes) { m_buffer.capacity = nbytes; }

	/// Number of bytes currently in the transfer buffer
	@property size_t fill()
	const {
		synchronized (m_mutex) {
			return m_buffer.length;
		}
	}

	/// `true` as long as `close` has not been called.
	@property bool open() const { return !m_closed; }

	/** Closes the pipe.

		Marks the pipe as closed and notifies all tasks waiting on the
		condition so that they can re-evaluate their wait predicate.
	*/
	void close()
	{
		synchronized (m_mutex) m_closed = true;
		() @trusted { m_condition.notifyAll(); } ();
	}

	/** Blocks until at least one byte of data has been written to the pipe.

		The wait also ends when the pipe gets closed or the timeout elapses.

		Params:
			timeout = Maximum time to wait. A non-positive value or
				`Duration.max` disables the timeout.
	*/
	void waitForData(Duration timeout = Duration.max)
	{
		import std.datetime : Clock, SysTime, UTC;
		bool have_timeout = timeout > 0.seconds && timeout != Duration.max;
		SysTime now = Clock.currTime(UTC());
		SysTime timeout_target;
		if (have_timeout) timeout_target = now + timeout;

		synchronized (m_mutex) {
			while (m_buffer.empty && !m_closed && (!have_timeout || now < timeout_target)) {
				if (have_timeout)
					() @trusted { m_condition.wait(timeout_target - now); } ();
				else () @trusted { m_condition.wait(); } ();
				// refresh the clock so that the remaining wait time shrinks
				now = Clock.currTime(UTC());
			}
		}
	}

	/** Writes the given byte array to the pipe.

		Params:
			data = The bytes to write.
			mode = `IOMode.all` waits until everything has been written,
				`IOMode.once` stops at a full buffer once at least one byte
				has been written, `IOMode.immediate` never waits for space.

		Returns:
			The number of bytes that were placed into the transfer buffer.

		Throws:
			An exception (via `enforce`) if the pipe is already closed when
			the call starts.
	*/
	size_t write(const(ubyte)[] data, IOMode mode)
	{
		size_t ret = 0;

		enforce(!m_closed, "Writing to closed task pipe.");

		while (data.length > 0) {
			bool need_signal;
			synchronized (m_mutex) {
				if (m_growWhenFull && m_buffer.full) {
					// Grow by the number of pending bytes, rounded up to the
					// next even number (same result as stepping by two).
					m_buffer.capacity = m_buffer.capacity + data.length + (data.length & 1);
				} else while (m_buffer.full) {
					if (mode == IOMode.immediate || (mode == IOMode.once && ret > 0))
						return ret;
					() @trusted { m_condition.wait(); } ();
				}

				// readers only need a wake-up if they may be waiting on an empty buffer
				need_signal = m_buffer.empty;
				auto len = min(m_buffer.freeSpace, data.length);
				m_buffer.put(data[0 .. len]);
				data = data[len .. $];
				ret += len;
			}
			if (need_signal) () @trusted { m_condition.notifyAll(); } ();
		}
		if (!m_growWhenFull) vibe.core.core.yield();

		return ret;
	}

	/** Returns a temporary view of the beginning of the transfer buffer.

		Note that a call to read invalidates this array slice. The call only
		acquires the pipe's mutex; it does not wait for data to arrive.
	*/
	const(ubyte[]) peek()
	{
		synchronized (m_mutex) {
			return m_buffer.peek();
		}
	}

	/** Reads data into the supplied buffer.

		Params:
			dst = Destination buffer.
			mode = `IOMode.all` blocks until `dst` has been filled completely,
				`IOMode.once` blocks only until at least one byte has been
				read, `IOMode.immediate` never blocks.

		Returns:
			The number of bytes stored in `dst`.

		Throws:
			An exception (via `enforce`) if the pipe is closed and empty
			before the request could be satisfied. A partial read in
			`IOMode.once` or `IOMode.immediate` is returned instead of
			being turned into an exception.
	*/
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		size_t ret = 0;

		while (dst.length > 0) {
			bool need_signal;
			size_t len;
			synchronized (m_mutex) {
				while (m_buffer.empty && !m_closed) {
					if (mode == IOMode.immediate || (mode == IOMode.once && ret > 0))
						return ret;
					() @trusted { m_condition.wait(); } ();
				}

				// An empty buffer at this point means the pipe was closed and
				// has been drained. Bytes already read in a non-exhaustive
				// mode are a valid result and must not be lost to an exception.
				if (m_buffer.empty && ret > 0 && mode != IOMode.all)
					return ret;

				// writers only need a wake-up if they may be waiting on a full buffer
				need_signal = m_buffer.full;
				enforce(!m_buffer.empty, "Reading past end of closed pipe.");
				len = min(dst.length, m_buffer.length);
				m_buffer.read(dst[0 .. len]);
				ret += len;
			}
			if (need_signal) () @trusted { m_condition.notifyAll(); } ();
			dst = dst[len .. $];
		}
		vibe.core.core.yield();

		return ret;
	}
}

unittest { // issue #1501 - deadlock in TaskPipe
	import std.datetime : Clock, UTC;
	import core.time : msecs;

	// test read after write and write after read
	foreach (i; 0 .. 2) {
		auto p = new TaskPipe;
		p.bufferSize = 2048;

		Task a, b;
		a = runTask({ ubyte[2100] buf; if (i == 0) p.read(buf, IOMode.all); else p.write(buf, IOMode.all); });
		b = runTask({ ubyte[2100] buf; if (i == 0) p.write(buf, IOMode.all); else p.read(buf, IOMode.all); });

		auto joiner = runTask({
			auto starttime = Clock.currTime(UTC());
			while (a.running || b.running) {
				if (Clock.currTime(UTC()) - starttime > 500.msecs)
					assert(false, "TaskPipe is dead locked.");
				yield();
			}
		});

		joiner.join();
	}
}

unittest { // a partial IOMode.once read that drains a closed pipe must not throw
	auto t = runTask({
		auto tp = new TaskPipeImpl;
		ubyte[4] src = [1, 2, 3, 4];
		tp.write(src[], IOMode.all);
		tp.close();

		ubyte[16] dst;
		assert(tp.read(dst[], IOMode.once) == 4);
		assert(dst[0 .. 4] == src[]);
	});
	t.join();
}

unittest { // TaskPipeImpl.waitForData must honor its timeout
	auto t = runTask({
		auto tp = new TaskPipeImpl;
		tp.waitForData(10.msecs);
		exitEventLoop();
	});
	runTask({
		sleep(500.msecs);
		assert(!t.running, "TaskPipeImpl.waitForData didn't timeout.");
	});
	runEventLoop();
}