1 /**
2 	Interruptible Task synchronization facilities
3 
4 	Copyright: © 2012-2015 RejectedSoftware e.K.
5 	Authors: Leonid Kramer, Sönke Ludwig, Manuel Frischknecht
6 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
7 */
8 module vibe.core.sync;
9 
10 import std.exception;
11 
12 import vibe.core.driver;
13 
14 import core.atomic;
15 import core.sync.mutex;
16 import core.sync.condition;
17 import std.stdio;
18 import std.traits : ReturnType;
19 
20 
/// Selects what the `ScopedMutexLock` constructor does with its mutex.
enum LockMode {
	/// Call `lock()` from within the constructor.
	lock,
	/// Call `tryLock()` from within the constructor.
	tryLock,
	/// Leave the mutex untouched in the constructor.
	defer
}
26 
/// Common locking interface implemented by the task mutex classes of this
/// module and accepted by `InterruptibleTaskCondition`.
interface Lockable {
	@safe:
	/// Acquires the lock.
	void lock();
	/// Releases the lock.
	void unlock();
	/// Attempts to acquire the lock; the implementations in this module return
	/// `true` if the lock was taken and `false` otherwise.
	bool tryLock();
}
33 
/** RAII lock for the Mutex class.

	Depending on the `LockMode` passed to the constructor, the mutex is locked,
	tentatively locked, or left untouched. If the lock is still held when the
	instance is destroyed, the destructor releases it.
*/
struct ScopedMutexLock
{
@safe:
	@disable this(this);
	private {
		Mutex m_mutex;
		bool m_locked;
		LockMode m_mode; // neither assigned nor read within this struct
	}

	/** Binds this guard to `mutex` and applies the requested `mode`.

		Params:
			mutex = The mutex to operate on; must not be `null`.
			mode = `LockMode.lock` calls `lock()`, `LockMode.tryLock` calls
				`tryLock()` and `LockMode.defer` does nothing.
	*/
	this(core.sync.mutex.Mutex mutex, LockMode mode = LockMode.lock) {
		assert(mutex !is null);
		m_mutex = mutex;

		final switch (mode) {
			case LockMode.lock: lock(); break;
			case LockMode.tryLock: tryLock(); break;
			case LockMode.defer: break;
		}
	}

	/// Releases the mutex if this guard currently holds it.
	~this()
	{
		if( m_locked )
			m_mutex.unlock();
	}

	/// Whether this guard currently holds the mutex.
	@property bool locked() const { return m_locked; }

	/** Releases the mutex.

		Throws: An exception (via `enforce`) if the guard does not hold the lock.
	*/
	void unlock()
	{
		enforce(m_locked);
		m_mutex.unlock();
		m_locked = false;
	}

	/** Attempts to take the mutex.

		Returns: `true` if the mutex was acquired.
		Throws: An exception (via `enforce`) if the guard already holds the lock.
	*/
	bool tryLock()
	{
		enforce(!m_locked);
		return m_locked = () @trusted { return m_mutex.tryLock(); } ();
	}

	/** Takes the mutex.

		Throws: An exception (via `enforce`) if the guard already holds the lock.
	*/
	void lock()
	{
		enforce(!m_locked);
		() @trusted { m_mutex.lock(); } ();
		// Flag ownership only once the mutex is actually held. If lock() throws,
		// the destructor must not unlock a mutex that was never acquired.
		m_locked = true;
	}
}
85 
86 
/*
	Internal helper: runs a procedure while holding a mutex.

	Works with any type that offers `lock()` and `unlock()`, in particular
	$(D core.sync.mutex.Mutex), $(D TaskMutex) and $(D InterruptibleTaskMutex).
	The mutex is released again when $(D PROC) returns or throws.

	Returns:
		Whatever $(D PROC) returns, if anything.
*/
/// private
package(vibe) ReturnType!PROC performLocked(alias PROC, MUTEX)(MUTEX mutex)
{
	mutex.lock();
	try return PROC();
	finally mutex.unlock();
}
104 
///
unittest {
	auto mtx = new TaskMutex;
	int protected_var = 0;
	// the lambda runs while `mtx` is held
	mtx.performLocked!({
		protected_var += 1;
	});
}
113 
114 
/**
	Thread-local semaphore implementation for tasks.

	When the semaphore runs out of concurrent locks, it will suspend. This class
	is used in `vibe.core.connectionpool` to limit the number of concurrent
	connections.
*/
class LocalTaskSemaphore
{
@safe:

	// requires a queue
	import std.container.binaryheap;
	import std.container.array;

	private {
		struct Waiter {
			ManualEvent signal;
			ubyte priority;
			uint seq;
		}

		BinaryHeap!(Array!Waiter, asc) m_waiters;
		uint m_maxLocks;
		uint m_locks;
		uint m_seq;
	}

	/// Creates a semaphore that admits `max_locks` concurrent locks.
	this(uint max_locks)
	{
		m_maxLocks = max_locks;
	}

	/// Maximum number of concurrent locks
	@property void maxLocks(uint max_locks) { m_maxLocks = max_locks; }
	/// ditto
	@property uint maxLocks() const { return m_maxLocks; }

	/** Number of concurrent locks still available

		Yields zero (instead of wrapping around) if `maxLocks` has been lowered
		below the number of locks that are currently held.
	*/
	@property uint available() const { return m_maxLocks > m_locks ? m_maxLocks - m_locks : 0; }

	/** Try to acquire a lock.

		If a lock cannot be acquired immediately, returns `false` and leaves the
		semaphore in its previous state.

		Returns:
			`true` is returned $(I iff) at least one lock is available.
	*/
	bool tryLock()
	{
		if (available > 0)
		{
			m_locks++;
			return true;
		}
		return false;
	}

	/** Acquires a lock.

		Once the limit of concurrent locks is reached, this method will block
		until the number of locks drops below the limit.

		Params:
			priority = Moves the caller ahead in the wait queue: its sequence
				number is lowered by this amount (clamped at zero), and among
				equal sequence numbers the higher priority is served first.
	*/
	void lock(ubyte priority = 0)
	{
		if (tryLock())
			return;

		Waiter w;
		w.signal = getEventDriver().createManualEvent();
		scope(exit)
			() @trusted { return destroy(w.signal); } ();
		w.priority = priority;

		// Rewind before deriving the new sequence number, so that the new
		// waiter is numbered consistently with the ones already queued.
		if (m_seq == uint.max)
			rewindSeq();

		// Clamp at zero instead of letting the unsigned subtraction wrap.
		w.seq = m_seq > priority ? m_seq - priority : 0;
		m_seq++;

		() @trusted { m_waiters.insert(w); } ();
		w.signal.waitUninterruptible(w.signal.emitCount);
	}

	/** Gives up an existing lock.

		If tasks are waiting, the lock is handed directly to the waiter at the
		front of the queue; otherwise the lock count is decremented.
	*/
	void unlock()
	{
		if (m_waiters.length > 0) {
			ManualEvent s = m_waiters.front().signal;
			() @trusted { m_waiters.removeFront(); } ();
			s.emit(); // resume one
		} else m_locks--;
	}

	// if true, a goes after b. ie. b comes out front()
	/// private
	static bool asc(ref Waiter a, ref Waiter b)
	{
		if (a.seq == b.seq) {
			if (a.priority == b.priority) {
				// resolve using the pointer address
				return (cast(size_t)&a.signal) > (cast(size_t) &b.signal);
			}
			// resolve using priority
			return a.priority < b.priority;
		}
		// resolve using seq number
		return a.seq > b.seq;
	}

	/* Shifts all queued sequence numbers and the running counter down by the
	   smallest queued sequence number, keeping their relative order intact.
	   With an empty queue, the counter is reset to zero.
	*/
	private void rewindSeq()
	@trusted {
		import std.algorithm : min;

		Array!Waiter waiters = m_waiters.release();
		// starting from the counter makes an empty queue rewind to zero
		uint min_seq = m_seq;
		foreach (ref waiter; waiters[])
			min_seq = min(waiter.seq, min_seq);
		foreach (ref waiter; waiters[])
			waiter.seq -= min_seq;
		m_seq -= min_seq;
		// a uniform shift preserves the heap order, so the store can be reused
		m_waiters.assume(waiters);
	}
}
239 
240 
/**
	Mutex implementation for fibers.

	Drop-in replacement for a core.sync.mutex.Mutex that does not block the
	event loop under contention. Recursive locking is not supported by this
	mutex.

	Notice:
		The locking methods of this class are annotated nothrow, so a waiting
		$(D lock()) cannot be interrupted using
		$(D vibe.core.task.Task.interrupt()). The corresponding
		$(D InterruptException) will be deferred until the next blocking
		operation yields the event loop.

		Use $(D InterruptibleTaskMutex) as an alternative that can be
		interrupted.

	See_Also: InterruptibleTaskMutex, RecursiveTaskMutex, core.sync.mutex.Mutex
*/
class TaskMutex : core.sync.mutex.Mutex, Lockable {
@safe:

	private TaskMutexImpl!false m_impl;

	this()
	@trusted {
		m_impl.setup();
	}

	this(Object o)
	@trusted {
		m_impl.setup();
		super(o);
	}

	override void lock()
	nothrow {
		m_impl.lock();
	}

	override bool tryLock()
	nothrow {
		return m_impl.tryLock();
	}

	override void unlock()
	nothrow {
		m_impl.unlock();
	}
}
271 
// Exercises a TaskMutex through ScopedMutexLock, `synchronized`,
// performLocked and a `with` statement.
unittest {
	auto mutex = new TaskMutex;

	{
		// the default mode locks in the constructor
		auto lock = ScopedMutexLock(mutex);
		assert(lock.locked);
		assert(mutex.m_impl.m_locked);

		// a second attempt on the already locked mutex must fail
		auto lock2 = ScopedMutexLock(mutex, LockMode.tryLock);
		assert(!lock2.locked);
	}
	// leaving the scope destroyed `lock`, which released the mutex
	assert(!mutex.m_impl.m_locked);

	// tryLock succeeds on the free mutex; explicit unlock releases it
	auto lock = ScopedMutexLock(mutex, LockMode.tryLock);
	assert(lock.locked);
	lock.unlock();
	assert(!lock.locked);

	// the mutex is held inside a synchronized statement
	synchronized(mutex){
		assert(mutex.m_impl.m_locked);
	}
	assert(!mutex.m_impl.m_locked);

	// performLocked holds the mutex for the duration of the lambda
	mutex.performLocked!({
		assert(mutex.m_impl.m_locked);
	});
	assert(!mutex.m_impl.m_locked);

	// a ScopedMutexLock constructed in a `with` header holds the mutex in the body
	with(mutex.ScopedMutexLock) {
		assert(mutex.m_impl.m_locked);
	}
}
304 
// Checks that interrupting a task waiting in TaskMutex.lock() does not throw
// from lock() itself, but from the next yield() afterwards.
unittest { // test deferred throwing
	import vibe.core.core;

	auto mutex = new TaskMutex;
	// t1 takes the mutex and keeps it while sleeping
	auto t1 = runTask({
		try {
			mutex.lock();
			scope (exit) mutex.unlock();
			sleep(20.msecs);
		} catch (Exception e) {
			assert(false, "No exception expected in first task: "~e.msg);
		}
	});

	// t2 has to wait in lock(); the interrupt must only surface in yield()
	auto t2 = runTask({
		try mutex.lock();
		catch (Exception e) {
			assert(false, "No exception supposed to be thrown: "~e.msg);
		}
		scope (exit) mutex.unlock();
		try {
			yield();
			assert(false, "Yield is supposed to have thrown an InterruptException.");
		} catch (InterruptException) {
			// as expected!
		} catch (Exception e) {
			assert(false, "Only InterruptException supposed to be thrown: "~e.msg);
		}
	});

	// controller task: interrupts t2, waits for both tasks and stops the loop
	runTask({
		// mutex is now locked in first task for 20 ms
		// the second tasks is waiting in lock()
		t2.interrupt();
		t1.join();
		t2.join();
		assert(!mutex.m_impl.m_locked); // ensure that the scope(exit) has been executed
		exitEventLoop();
	});

	runEventLoop();
}
347 
// Runs the shared mutex test suite against TaskMutex.
unittest {
	runMutexUnitTests!TaskMutex();
}
351 
352 
/**
	Alternative to $(D TaskMutex) that supports interruption.

	A task waiting in $(D lock()) can be interrupted using
	$(D vibe.core.task.Task.interrupt()). Since the interface is consequently
	not $(D nothrow), this class cannot be used as an object monitor.

	See_Also: $(D TaskMutex), $(D InterruptibleRecursiveTaskMutex)
*/
final class InterruptibleTaskMutex : Lockable {
@safe:

	private TaskMutexImpl!true m_impl;

	this()
	{
		m_impl.setup();
	}

	void lock()
	{
		m_impl.lock();
	}

	bool tryLock()
	nothrow {
		return m_impl.tryLock();
	}

	void unlock()
	nothrow {
		m_impl.unlock();
	}
}
373 
// Runs the shared mutex test suite against InterruptibleTaskMutex.
unittest {
	runMutexUnitTests!InterruptibleTaskMutex();
}
377 
378 
379 
/**
	Recursive mutex implementation for tasks.

	Drop-in replacement for a core.sync.mutex.Mutex that does not block the
	event loop under contention.

	Notice:
		Waiting in $(D lock()) uses the uninterruptible wait of the underlying
		signal, so it cannot be interrupted using
		$(D vibe.core.task.Task.interrupt()). The corresponding
		$(D InterruptException) will be deferred until the next blocking
		operation yields the event loop.

		Use $(D InterruptibleRecursiveTaskMutex) as an alternative that can be
		interrupted.

	See_Also: TaskMutex, core.sync.mutex.Mutex
*/
class RecursiveTaskMutex : core.sync.mutex.Mutex, Lockable {
@safe:

	private RecursiveTaskMutexImpl!false m_impl;

	this()
	{
		m_impl.setup();
	}

	this(Object o)
	{
		m_impl.setup();
		super(o);
	}

	override void lock()
	{
		m_impl.lock();
	}

	override bool tryLock()
	{
		return m_impl.tryLock();
	}

	override void unlock()
	{
		m_impl.unlock();
	}
}
409 
// Runs the shared mutex test suite against RecursiveTaskMutex.
unittest {
	runMutexUnitTests!RecursiveTaskMutex();
}
413 
414 
/**
	Alternative to $(D RecursiveTaskMutex) that supports interruption.

	A task waiting in $(D lock()) can be interrupted using
	$(D vibe.core.task.Task.interrupt()). Since the interface is consequently
	not $(D nothrow), this class cannot be used as an object monitor.

	See_Also: $(D RecursiveTaskMutex), $(D InterruptibleTaskMutex)
*/
final class InterruptibleRecursiveTaskMutex : Lockable {
@safe:

	private RecursiveTaskMutexImpl!true m_impl;

	this()
	{
		m_impl.setup();
	}

	void lock()
	{
		m_impl.lock();
	}

	bool tryLock()
	{
		return m_impl.tryLock();
	}

	void unlock()
	{
		m_impl.unlock();
	}
}
435 
// Runs the shared mutex test suite against InterruptibleRecursiveTaskMutex.
unittest {
	runMutexUnitTests!InterruptibleRecursiveTaskMutex();
}
439 
440 
// Shared test suite for the mutex classes of this module.
//
// `M` must be default constructible, provide lock()/tryLock()/unlock() and
// expose `m_impl.m_locked`. The suite runs a basic lock test, a contention
// test and two interruption tests, each driven by its own event loop run.
private void runMutexUnitTests(M)()
{
	import vibe.core.core;

	auto m = new M;
	Task t1, t2;
	// Starts two tasks that compete for `m`; the flags select which of them
	// expects to be interrupted by the controlling task.
	void runContendedTasks(bool interrupt_t1, bool interrupt_t2) {
		assert(!m.m_impl.m_locked);

		// t1 starts first and acquires the mutex for 20 ms
		// t2 starts second and has to wait in m.lock()
		t1 = runTask({
			assert(!m.m_impl.m_locked);
			m.lock();
			assert(m.m_impl.m_locked);
			if (interrupt_t1) assertThrown!InterruptException(sleep(100.msecs));
			else assertNotThrown(sleep(20.msecs));
			m.unlock();
		});
		t2 = runTask({
			assert(!m.tryLock());
			if (interrupt_t2) {
				// interruptible mutexes throw from lock() without acquiring
				try m.lock();
				catch (InterruptException) return;
				// otherwise the lock was acquired and the exception is deferred
				try yield(); // rethrows any deferred exceptions
				catch (InterruptException) {
					m.unlock();
					return;
				}
				assert(false, "Supposed to have thrown an InterruptException.");
			} else assertNotThrown(m.lock());
			assert(m.m_impl.m_locked);
			sleep(20.msecs);
			m.unlock();
			assert(!m.m_impl.m_locked);
		});
	}

	// basic lock test
	m.performLocked!({
		assert(m.m_impl.m_locked);
	});
	assert(!m.m_impl.m_locked);

	// basic contention test
	runContendedTasks(false, false);
	runTask({
		assert(t1.running && t2.running);
		assert(m.m_impl.m_locked);
		t1.join();
		assert(!t1.running && t2.running);
		yield(); // give t2 a chance to take the lock
		assert(m.m_impl.m_locked);
		t2.join();
		assert(!t2.running);
		assert(!m.m_impl.m_locked);
		exitEventLoop();
	});
	runEventLoop();
	assert(!m.m_impl.m_locked);

	// interruption test #1
	// (t1 is interrupted while holding the lock, t2 then takes over)
	runContendedTasks(true, false);
	runTask({
		assert(t1.running && t2.running);
		assert(m.m_impl.m_locked);
		t1.interrupt();
		t1.join();
		assert(!t1.running && t2.running);
		yield(); // give t2 a chance to take the lock
		assert(m.m_impl.m_locked);
		t2.join();
		assert(!t2.running);
		assert(!m.m_impl.m_locked);
		exitEventLoop();
	});
	runEventLoop();
	assert(!m.m_impl.m_locked);

	// interruption test #2
	// (t2 is interrupted while waiting for the lock held by t1)
	runContendedTasks(false, true);
	runTask({
		assert(t1.running && t2.running);
		assert(m.m_impl.m_locked);
		t2.interrupt();
		t2.join();
		assert(!t2.running);
		// only checked for the interruptible variants
		static if (is(M == InterruptibleTaskMutex) || is (M == InterruptibleRecursiveTaskMutex))
			assert(t1.running && m.m_impl.m_locked);
		t1.join();
		assert(!t1.running);
		assert(!m.m_impl.m_locked);
		exitEventLoop();
	});
	runEventLoop();
	assert(!m.m_impl.m_locked);
}
538 
539 
/**
	Event loop based condition variable or "event" implementation.

	Drop-in replacement for a $(D core.sync.condition.Condition) that avoids
	blocking the event loop while waiting.

	Notice:
		Waiting uses the uninterruptible wait of the underlying signal, so it
		cannot be interrupted using $(D vibe.core.task.Task.interrupt()). The
		corresponding $(D InterruptException) will be deferred until the next
		blocking operation yields to the event loop.

		Use $(D InterruptibleTaskCondition) as an alternative that can be
		interrupted.

		Note that it is generally not safe to use a `TaskCondition` together with an
		interruptible mutex type.

	See_Also: InterruptibleTaskCondition
*/
class TaskCondition : core.sync.condition.Condition {
@safe:

	private TaskConditionImpl!(false, Mutex) m_impl;

	this(core.sync.mutex.Mutex mtx)
	nothrow {
		m_impl.setup(mtx);
		super(mtx);
	}

	override @property Mutex mutex()
	nothrow {
		return m_impl.mutex;
	}

	override void wait()
	{
		m_impl.wait();
	}

	override bool wait(Duration timeout)
	{
		return m_impl.wait(timeout);
	}

	override void notify()
	{
		m_impl.notify();
	}

	override void notifyAll()
	{
		m_impl.notifyAll();
	}
}
573 
/** This example shows the typical usage pattern using a `while` loop to make
	sure that the final condition is reached.
*/
unittest {
	import vibe.core.core;

	// shared between the waiting code below and all worker tasks
	__gshared Mutex mutex;
	__gshared TaskCondition condition;
	__gshared int workers_still_running = 0;

	// setup the task condition
	mutex = new Mutex;
	condition = new TaskCondition(mutex);

	// start up the workers and count how many are running
	foreach (i; 0 .. 4) {
		workers_still_running++;
		runWorkerTask({
			// simulate some work
			sleep(100.msecs);

			// notify the waiter that we're finished
			synchronized (mutex)
				workers_still_running--;
			condition.notify();
		});
	}

	// wait until all tasks have decremented the counter back to zero
	synchronized (mutex) {
		// re-check the counter after every wake-up
		while (workers_still_running > 0)
			condition.wait();
	}
}
608 
609 
/**
	Alternative to `TaskCondition` that supports interruption.

	A task waiting in `wait()` can be interrupted using
	`vibe.core.task.Task.interrupt()`.

	See `TaskCondition` for an example.

	Notice:
		Note that it is generally not safe to use an
		`InterruptibleTaskCondition` together with an interruptible mutex type.

	See_Also: `TaskCondition`
*/
final class InterruptibleTaskCondition {
@safe:

	private TaskConditionImpl!(true, Lockable) m_impl;

	this(Lockable mtx)
	nothrow {
		m_impl.setup(mtx);
	}

	this(core.sync.mutex.Mutex mtx)
	nothrow {
		m_impl.setup(mtx);
	}

	@property Lockable mutex()
	{
		return m_impl.mutex;
	}

	void wait()
	{
		m_impl.wait();
	}

	bool wait(Duration timeout)
	{
		return m_impl.wait(timeout);
	}

	void notify()
	{
		m_impl.notify();
	}

	void notifyAll()
	{
		m_impl.notifyAll();
	}
}
638 
639 
/** Creates a new signal that can be shared between fibers.

	The event is obtained from the current event driver.
*/
ManualEvent createManualEvent()
@safe nothrow {
	auto driver = getEventDriver();
	return driver.createManualEvent();
}
646 
alias LocalManualEvent = ManualEvent;

/** Creates a new signal that can be shared between fibers.

	The event is obtained from the current event driver; `LocalManualEvent`
	is an alias of `ManualEvent`.
*/
LocalManualEvent createLocalManualEvent()
@safe nothrow {
	auto driver = getEventDriver();
	return driver.createManualEvent();
}
655 
/** A manually triggered cross-task event.

	Waiters pass the emit count they last observed; the mutex and condition
	implementations in this module use that to wait for a change of the count.

	Note: the ownership can be shared between multiple fibers and threads.
*/
interface ManualEvent {
@safe:

	/// A counter that is increased with every emit() call
	@property int emitCount() const nothrow;

	/// Emits the signal, waking up all owners of the signal.
	void emit() nothrow;

	/** Acquires ownership and waits until the signal is emitted.

		Throws:
			May throw an $(D InterruptException) if the task gets interrupted
			using $(D Task.interrupt()).
	*/
	void wait();

	/** Acquires ownership and waits until the emit count differs from the given one.

		Returns:
			Presumably the emit count observed when the wait ends; callers in
			this module feed the result back in as the next reference count.
			(TODO confirm against the driver implementations.)

		Throws:
			May throw an $(D InterruptException) if the task gets interrupted
			using $(D Task.interrupt()).
	*/
	int wait(int reference_emit_count);

	/** Acquires ownership and waits until the emit count differs from the given one or until a timeout is reached.

		Returns:
			Presumably the emit count observed when the wait ends; callers in
			this module compare it against `reference_emit_count` to tell an
			emit from a timeout. (TODO confirm against the driver
			implementations.)

		Throws:
			May throw an $(D InterruptException) if the task gets interrupted
			using $(D Task.interrupt()).
	*/
	int wait(Duration timeout, int reference_emit_count);

	/** Same as $(D wait), but defers throwing any $(D InterruptException).

		This method is annotated $(D nothrow) at the expense that it cannot be
		interrupted.
	*/
	int waitUninterruptible(int reference_emit_count) nothrow;

	/// ditto
	int waitUninterruptible(Duration timeout, int reference_emit_count) nothrow;
}
703 
704 
/* Shared implementation of `TaskMutex` and `InterruptibleTaskMutex`.

   The lock state is a single atomic flag; contending tasks register as
   waiters and wait on a `ManualEvent` that `unlock()` emits. `INTERRUPTIBLE`
   selects whether waiting in `lock()` uses the interruptible or the
   uninterruptible wait of the signal.
*/
private struct TaskMutexImpl(bool INTERRUPTIBLE) {
	import std.stdio;
	private {
		shared(bool) m_locked = false;
		shared(uint) m_waiters = 0;
		ManualEvent m_signal;
		debug Task m_owner;
	}

	/// Creates the signal used for waking up waiters; call once before use.
	void setup()
	nothrow {
		m_signal = createManualEvent();
	}


	/// Atomically takes the lock if it is free; returns `true` on success.
	@trusted bool tryLock()
	{
		if (cas(&m_locked, false, true)) {
			debug m_owner = Task.getThis();
			// `this` is a struct, so its address has to be taken explicitly
			version(MutexPrint) writefln("mutex %s lock %s", cast(void*)&this, atomicLoad(m_waiters));
			return true;
		}
		return false;
	}

	/// Takes the lock, waiting on the signal for as long as it is held elsewhere.
	@trusted void lock()
	{
		if (tryLock()) return;
		debug assert(m_owner == Task() || m_owner != Task.getThis(), "Recursive mutex lock.");
		atomicOp!"+="(m_waiters, 1);
		version(MutexPrint) writefln("mutex %s wait %s", cast(void*)&this, atomicLoad(m_waiters));
		scope(exit) atomicOp!"-="(m_waiters, 1);
		// Sample the emit count before retrying, so that an emit between the
		// retry and the wait is not missed.
		auto ecnt = m_signal.emitCount();
		while (!tryLock()) {
			static if (INTERRUPTIBLE) ecnt = m_signal.wait(ecnt);
			else ecnt = m_signal.waitUninterruptible(ecnt);
		}
	}

	/// Releases the lock and emits the signal if any task is registered as waiting.
	@trusted void unlock()
	{
		assert(atomicLoad(m_locked));
		debug {
			assert(m_owner == Task.getThis());
			m_owner = Task();
		}
		atomicStore!(MemoryOrder.rel)(m_locked, false);
		version(MutexPrint) writefln("mutex %s unlock %s", cast(void*)&this, atomicLoad(m_waiters));
		if (atomicLoad(m_waiters) > 0)
			m_signal.emit();
	}
}
757 
/* Shared implementation of `RecursiveTaskMutex` and
   `InterruptibleRecursiveTaskMutex`.

   Owner and recursion count are protected by an internal
   `core.sync.mutex.Mutex`; contending tasks register as waiters and wait on
   a `ManualEvent` that `unlock()` emits. `INTERRUPTIBLE` selects whether
   waiting in `lock()` uses the interruptible or the uninterruptible wait of
   the signal.
*/
private struct RecursiveTaskMutexImpl(bool INTERRUPTIBLE) {
	import std.stdio;
	private {
		core.sync.mutex.Mutex m_mutex;
		Task m_owner;
		size_t m_recCount = 0;
		shared(uint) m_waiters = 0;
		ManualEvent m_signal;
		@property bool m_locked() const { return m_recCount > 0; }
	}

	/// Creates the signal and the internal mutex; call once before use.
	void setup()
	{
		m_signal = createManualEvent();
		m_mutex = new core.sync.mutex.Mutex;
	}

	/** Takes the lock if it is free, or increases the recursion count if the
		calling task already owns it. Returns `true` in both cases.
	*/
	@trusted bool tryLock()
	{
		auto self = Task.getThis();
		return m_mutex.performLocked!({
			if (!m_owner) {
				assert(m_recCount == 0);
				m_recCount = 1;
				m_owner = self;
				return true;
			} else if (m_owner == self) {
				m_recCount++;
				return true;
			}
			return false;
		});
	}

	/// Takes the lock, waiting on the signal for as long as another task owns it.
	@trusted void lock()
	{
		if (tryLock()) return;
		atomicOp!"+="(m_waiters, 1);
		// `this` is a struct, so its address has to be taken explicitly
		version(MutexPrint) writefln("mutex %s wait %s", cast(void*)&this, atomicLoad(m_waiters));
		scope(exit) atomicOp!"-="(m_waiters, 1);
		// Sample the emit count before retrying, so that an emit between the
		// retry and the wait is not missed.
		auto ecnt = m_signal.emitCount();
		while (!tryLock()) {
			static if (INTERRUPTIBLE) ecnt = m_signal.wait(ecnt);
			else ecnt = m_signal.waitUninterruptible(ecnt);
		}
	}

	/** Decreases the recursion count, clears the owner once it reaches zero
		and emits the signal if any task is registered as waiting.
	*/
	@trusted void unlock()
	{
		auto self = Task.getThis();
		m_mutex.performLocked!({
			assert(m_owner == self);
			assert(m_recCount > 0);
			m_recCount--;
			if (m_recCount == 0) {
				m_owner = Task.init;
			}
		});
		version(MutexPrint) writefln("mutex %s unlock %s", cast(void*)&this, atomicLoad(m_waiters));
		if (atomicLoad(m_waiters) > 0)
			m_signal.emit();
	}
}
821 
/* Shared implementation of `TaskCondition` and `InterruptibleTaskCondition`.

   Waiting releases the associated mutex, waits for the signal's emit count to
   change and then re-acquires the mutex. `INTERRUPTIBLE` selects the
   interruptible or the uninterruptible wait of the signal; `LOCKABLE` is the
   type of the associated mutex.
*/
private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) {
	private {
		LOCKABLE m_mutex;

		ManualEvent m_signal;
	}

	static if (is(LOCKABLE == Lockable)) {
		// Adapts a plain core.sync.mutex.Mutex to the Lockable interface.
		final class MutexWrapper : Lockable {
			private core.sync.mutex.Mutex m_mutex;
			this(core.sync.mutex.Mutex mtx) { m_mutex = mtx; }
			@trusted void lock() { m_mutex.lock(); }
			@trusted void unlock() { m_mutex.unlock(); }
			@trusted bool tryLock() { return m_mutex.tryLock(); }
		}

		void setup(core.sync.mutex.Mutex mtx)
		{
			setup(new MutexWrapper(mtx));
		}
	}

	/// Stores the associated mutex and creates the signal.
	void setup(LOCKABLE mtx)
	{
		m_mutex = mtx;
		m_signal = createManualEvent();
	}

	@property LOCKABLE mutex() { return m_mutex; }

	/// Releases the mutex, waits for an emit and re-acquires the mutex.
	@trusted void wait()
	{
		if (auto task_mutex = cast(TaskMutex)m_mutex) {
			assert(task_mutex.m_impl.m_locked);
			debug assert(task_mutex.m_impl.m_owner == Task.getThis());
		}

		// sample the count while still holding the mutex
		auto reference = m_signal.emitCount;
		m_mutex.unlock();
		try {
			static if (INTERRUPTIBLE) m_signal.wait(reference);
			else m_signal.waitUninterruptible(reference);
		} finally {
			m_mutex.lock();
		}
	}

	/** Like `wait()`, but gives up after `timeout`.

		Returns: `true` if the emit count reported by the signal differs from
			the one sampled before waiting.
	*/
	@trusted bool wait(Duration timeout)
	{
		assert(!timeout.isNegative());
		if (auto task_mutex = cast(TaskMutex)m_mutex) {
			assert(task_mutex.m_impl.m_locked);
			debug assert(task_mutex.m_impl.m_owner == Task.getThis());
		}

		// sample the count while still holding the mutex
		auto reference = m_signal.emitCount;
		m_mutex.unlock();
		try {
			static if (INTERRUPTIBLE) return m_signal.wait(timeout, reference) != reference;
			else return m_signal.waitUninterruptible(timeout, reference) != reference;
		} finally {
			m_mutex.lock();
		}
	}

	/// Emits the signal.
	@trusted void notify()
	{
		m_signal.emit();
	}

	/// Emits the signal (same implementation as `notify()`).
	@trusted void notifyAll()
	{
		m_signal.emit();
	}
}
892 
893 /** Contains the shared state of a $(D TaskReadWriteMutex).
894  *
895  *  Since a $(D TaskReadWriteMutex) consists of two actual Mutex
896  *  objects that rely on common memory, this class implements
897  *  the actual functionality of their method calls.
898  *
899  *  The method implementations are based on two static parameters
900  *  ($(D INTERRUPTIBLE) and $(D INTENT)), which are configured through
901  *  template arguments:
902  *
 *  - $(D INTERRUPTIBLE) determines whether the mutex implementation
 *    is interruptible by vibe.d's $(D vibe.core.task.Task.interrupt())
 *    method or not.
906  *
907  *  - $(D INTENT) describes the intent, with which a locking operation is
908  *    performed (i.e. $(D READ_ONLY) or $(D READ_WRITE)). RO locking allows for
909  *    multiple Tasks holding the mutex, whereas RW locking will cause
910  *    a "bottleneck" so that only one Task can write to the protected
911  *    data at once.
912  */
913 private struct ReadWriteMutexState(bool INTERRUPTIBLE)
914 {
915 @safe:
916     /** The policy with which the mutex should operate.
917      *
918      *  The policy determines how the acquisition of the locks is
919      *  performed and can be used to tune the mutex according to the
920      *  underlying algorithm in which it is used.
921      *
922      *  According to the provided policy, the mutex will either favor
923      *  reading or writing tasks and could potentially starve the
924      *  respective opposite.
925      *
926      *  cf. $(D core.sync.rwmutex.ReadWriteMutex.Policy)
927      */
928     enum Policy : int
929     {
930         /** Readers are prioritized, writers may be starved as a result. */
931         PREFER_READERS = 0,
932         /** Writers are prioritized, readers may be starved as a result. */
933         PREFER_WRITERS
934     }
935 
936     /** The intent with which a locking operation is performed.
937      *
938      *  Since both locks share the same underlying algorithms, the actual
939      *  intent with which a lock operation is performed (i.e read/write)
940      *  are passed as a template parameter to each method.
941      */
942     enum LockingIntent : bool
943     {
944         /** Perform a read lock/unlock operation. Multiple reading locks can be
945          *  active at a time. */
946         READ_ONLY = 0,
947         /** Perform a write lock/unlock operation. Only a single writer can
948          *  hold a lock at any given time. */
949         READ_WRITE = 1
950     }
951 
952     private {
953         //Queue counters
954         /** The number of reading tasks waiting for the lock to become available. */
955         shared(uint)  m_waitingForReadLock = 0;
956         /** The number of writing tasks waiting for the lock to become available. */
957         shared(uint)  m_waitingForWriteLock = 0;
958 
959         //Lock counters
960         /** The number of reading tasks that currently hold the lock. */
961         uint  m_activeReadLocks = 0;
962         /** The number of writing tasks that currently hold the lock (binary). */
963         ubyte m_activeWriteLocks = 0;
964 
965         /** The policy determining the lock's behavior. */
966         Policy m_policy;
967 
968         //Queue Events
969         /** The event used to wake reading tasks waiting for the lock while it is blocked. */
970         ManualEvent m_readyForReadLock;
971         /** The event used to wake writing tasks waiting for the lock while it is blocked. */
972         ManualEvent m_readyForWriteLock;
973 
974         /** The underlying mutex that gates the access to the shared state. */
975         Mutex m_counterMutex;
976     }
977 
978     this(Policy policy)
979     {
980         m_policy = policy;
981         m_counterMutex = new Mutex();
982         m_readyForReadLock  = createManualEvent();
983         m_readyForWriteLock = createManualEvent();
984     }
985 
986     @disable this(this);
987 
988     /** The policy with which the lock has been created. */
989     @property policy() const { return m_policy; }
990 
991     version(RWMutexPrint)
992     {
993         /** Print out debug information during lock operations. */
994         void printInfo(string OP, LockingIntent INTENT)() nothrow
995         {
996         	import std.string;
997             try
998             {
999                 import std.stdio;
1000                 writefln("RWMutex: %s (%s), active: RO: %d, RW: %d; waiting: RO: %d, RW: %d",
1001                     OP.leftJustify(10,' '),
1002                     INTENT == LockingIntent.READ_ONLY ? "RO" : "RW",
1003                     m_activeReadLocks,    m_activeWriteLocks,
1004                     m_waitingForReadLock, m_waitingForWriteLock
1005                     );
1006             }
1007             catch (Exception t){}
1008         }
1009     }
1010 
1011     /** An internal shortcut method to determine the queue event for a given intent. */
1012     @property ref auto queueEvent(LockingIntent INTENT)()
1013     {
1014         static if (INTENT == LockingIntent.READ_ONLY)
1015             return m_readyForReadLock;
1016         else
1017             return m_readyForWriteLock;
1018     }
1019 
1020     /** An internal shortcut method to determine the queue counter for a given intent. */
1021     @property ref auto queueCounter(LockingIntent INTENT)()
1022     {
1023         static if (INTENT == LockingIntent.READ_ONLY)
1024             return m_waitingForReadLock;
1025         else
1026             return m_waitingForWriteLock;
1027     }
1028 
1029     /** An internal shortcut method to determine the current emitCount of the queue counter for a given intent. */
1030     int emitCount(LockingIntent INTENT)()
1031     {
1032         return queueEvent!INTENT.emitCount();
1033     }
1034 
1035     /** An internal shortcut method to determine the active counter for a given intent. */
1036     @property ref auto activeCounter(LockingIntent INTENT)()
1037     {
1038         static if (INTENT == LockingIntent.READ_ONLY)
1039             return m_activeReadLocks;
1040         else
1041             return m_activeWriteLocks;
1042     }
1043 
1044     /** An internal shortcut method to wait for the queue event for a given intent.
1045      *
1046      *  This method is used during the `lock()` operation, after a
1047      *  `tryLock()` operation has been unsuccessfully finished.
1048      *  The active fiber will yield and be suspended until the queue event
1049      *  for the given intent will be fired.
1050      */
1051     int wait(LockingIntent INTENT)(int count)
1052     {
1053         static if (INTERRUPTIBLE)
1054             return queueEvent!INTENT.wait(count);
1055         else
1056             return queueEvent!INTENT.waitUninterruptible(count);
1057     }
1058 
1059     /** An internal shortcut method to notify tasks waiting for the lock to become available again.
1060      *
1061      *  This method is called whenever the number of owners of the mutex hits
1062      *  zero; this is basically the counterpart to `wait()`.
1063      *  It wakes any Task currently waiting for the mutex to be released.
1064      */
1065     @trusted void notify(LockingIntent INTENT)()
1066     {
1067         static if (INTENT == LockingIntent.READ_ONLY)
1068         { //If the last reader unlocks the mutex, notify all waiting writers
1069             if (atomicLoad(m_waitingForWriteLock) > 0)
1070                 m_readyForWriteLock.emit();
1071         }
1072         else
1073         { //If a writer unlocks the mutex, notify both readers and writers
1074             if (atomicLoad(m_waitingForReadLock) > 0)
1075                 m_readyForReadLock.emit();
1076 
1077             if (atomicLoad(m_waitingForWriteLock) > 0)
1078                 m_readyForWriteLock.emit();
1079         }
1080     }
1081 
1082     /** An internal method that performs the acquisition attempt in different variations.
1083      *
1084      *  Since both locks rely on a common TaskMutex object which gates the access
1085      *  to their common data acquisition attempts for this lock are more complex
1086      *  than for simple mutex variants. This method will thus be performing the
1087      *  `tryLock()` operation in two variations, depending on the callee:
1088      *
1089      *  If called from the outside ($(D WAIT_FOR_BLOCKING_MUTEX) = false), the method
1090      *  will instantly fail if the underlying mutex is locked (i.e. during another
1091      *  `tryLock()` or `unlock()` operation), in order to guarantee the fastest
1092      *  possible locking attempt.
1093      *
1094      *  If used internally by the `lock()` method ($(D WAIT_FOR_BLOCKING_MUTEX) = true),
1095      *  the operation will wait for the mutex to be available before deciding if
1096      *  the lock can be acquired, since the attempt would anyway be repeated until
1097      *  it succeeds. This will prevent frequent retries under heavy loads and thus
1098      *  should ensure better performance.
1099      */
1100     @trusted bool tryLock(LockingIntent INTENT, bool WAIT_FOR_BLOCKING_MUTEX)()
1101     {
1102         //Log a debug statement for the attempt
1103         version(RWMutexPrint)
1104             printInfo!("tryLock",INTENT)();
1105 
1106         //Try to acquire the lock
1107         static if (!WAIT_FOR_BLOCKING_MUTEX)
1108         {
1109             if (!m_counterMutex.tryLock())
1110                 return false;
1111         }
1112         else
1113             m_counterMutex.lock();
1114 
1115         scope(exit)
1116             m_counterMutex.unlock();
1117 
1118         //Log a debug statement for the attempt
1119         version(RWMutexPrint)
1120             printInfo!("checkCtrs",INTENT)();
1121 
1122         //Check if there's already an active writer
1123         if (m_activeWriteLocks > 0)
1124             return false;
1125 
1126         //If writers are preferred over readers, check whether there
1127         //currently is a writer in the waiting queue and abort if
1128         //that's the case.
1129         static if (INTENT == LockingIntent.READ_ONLY)
1130             if (m_policy.PREFER_WRITERS && m_waitingForWriteLock > 0)
1131                 return false;
1132 
1133         //If we are locking the mutex for writing, make sure that
1134         //there's no reader active.
1135         static if (INTENT == LockingIntent.READ_WRITE)
1136             if (m_activeReadLocks > 0)
1137                 return false;
1138 
1139         //We can successfully acquire the lock!
1140         //Log a debug statement for the success.
1141         version(RWMutexPrint)
1142             printInfo!("lock",INTENT)();
1143 
1144         //Increase the according counter
1145         //(number of active readers/writers)
1146         //and return a success code.
1147         activeCounter!INTENT += 1;
1148         return true;
1149     }
1150 
1151     /** Attempt to acquire the lock for a given intent.
1152      *
1153      *  Returns:
1154      *      `true`, if the lock was successfully acquired;
1155      *      `false` otherwise.
1156      */
1157     @trusted bool tryLock(LockingIntent INTENT)()
1158     {
1159         //Try to lock this mutex without waiting for the underlying
1160         //TaskMutex - fail if it is already blocked.
1161         return tryLock!(INTENT,false)();
1162     }
1163 
1164     /** Acquire the lock for the given intent; yield and suspend until the lock has been acquired. */
1165     @trusted void lock(LockingIntent INTENT)()
1166     {
1167         //Prepare a waiting action before the first
1168         //`tryLock()` call in order to avoid a race
1169         //condition that could lead to the queue notification
1170         //not being fired.
1171         auto count = emitCount!INTENT;
1172         atomicOp!"+="(queueCounter!INTENT,1);
1173         scope(exit)
1174             atomicOp!"-="(queueCounter!INTENT,1);
1175 
1176         //Try to lock the mutex
1177         auto locked = tryLock!(INTENT,true)();
1178         if (locked)
1179             return;
1180 
1181         //Retry until we successfully acquired the lock
1182         while(!locked)
1183         {
1184             version(RWMutexPrint)
1185                 printInfo!("wait",INTENT)();
1186 
1187             count  = wait!INTENT(count);
1188             locked = tryLock!(INTENT,true)();
1189         }
1190     }
1191 
1192     /** Unlock the mutex after a successful acquisition. */
1193     @trusted void unlock(LockingIntent INTENT)()
1194     {
1195         version(RWMutexPrint)
1196             printInfo!("unlock",INTENT)();
1197 
1198         debug assert(activeCounter!INTENT > 0);
1199 
1200         synchronized(m_counterMutex)
1201         {
1202             //Decrement the counter of active lock holders.
1203             //If the counter hits zero, notify waiting Tasks
1204             activeCounter!INTENT -= 1;
1205             if (activeCounter!INTENT == 0)
1206             {
1207                 version(RWMutexPrint)
1208                     printInfo!("notify",INTENT)();
1209 
1210                 notify!INTENT();
1211             }
1212         }
1213     }
1214 }
1215 
/** A read/write mutex for fibers.
 *
 *  This mutex can be used in exchange for a $(D core.sync.rwmutex.ReadWriteMutex),
 *  but does not block the event loop in contention situations. Locking is done
 *  through the `reader` and `writer` members: the `reader` mutex may be held by
 *  multiple readers at once, while the `writer` mutex can only be held by a single
 *  writer at any given time. Locks on `reader` and `writer` are mutually exclusive
 *  (i.e. whenever a writer is active, no readers can be active at the same time,
 *  and vice versa).
 *
 *  Notice:
 *      Mutexes implemented by this class cannot be interrupted
 *      using $(D vibe.core.task.Task.interrupt()). The corresponding
 *      InterruptException will be deferred until the next blocking
 *      operation yields the event loop.
 *
 *      Use $(D InterruptibleTaskReadWriteMutex) as an alternative that can be
 *      interrupted.
 *
 *  cf. $(D core.sync.rwmutex.ReadWriteMutex)
 */
class TaskReadWriteMutex
{
@safe:

    private {
        /** Non-interruptible variant of the shared state implementation. */
        alias State = ReadWriteMutexState!false;
        alias LockingIntent = State.LockingIntent;
        alias READ_ONLY  = LockingIntent.READ_ONLY;
        alias READ_WRITE = LockingIntent.READ_WRITE;

        /** State shared between the reader and the writer mutex. */
        State m_state;
    }

    /** The policy with which the mutex should operate.
     *
     *  The policy controls how lock acquisition is performed and allows
     *  tuning the mutex for the algorithm it is used in.
     *
     *  Depending on the policy, the mutex favors either reading or writing
     *  tasks and could potentially starve the respective opposite.
     *
     *  cf. $(D core.sync.rwmutex.ReadWriteMutex.Policy)
     */
    alias Policy = State.Policy;

    /** Common implementation of both provided mutexes.
     *
     *  The $(D INTENT) template argument specifies whether an instance is
     *  used for read or for write locking. All operations are forwarded to
     *  the shared state of the enclosing object.
     */
    final class Mutex(LockingIntent INTENT): core.sync.mutex.Mutex, Lockable
    {
        /** Try to lock the mutex. cf. $(D core.sync.mutex.Mutex) */
        override bool tryLock()
        {
            return m_state.tryLock!INTENT();
        }

        /** Lock the mutex. cf. $(D core.sync.mutex.Mutex) */
        override void lock()
        {
            m_state.lock!INTENT();
        }

        /** Unlock the mutex. cf. $(D core.sync.mutex.Mutex) */
        override void unlock()
        {
            m_state.unlock!INTENT();
        }
    }

    /// Mutex type used for read locking.
    alias Reader = Mutex!READ_ONLY;
    /// Mutex type used for write locking.
    alias Writer = Mutex!READ_WRITE;

    /// The mutex to lock for read access.
    Reader reader;
    /// The mutex to lock for write access.
    Writer writer;

    /** Creates the shared state with the given policy, followed by the
     *  reader and writer mutexes.
     */
    this(Policy policy = Policy.PREFER_WRITERS)
    {
        m_state = State(policy);
        reader = new Reader;
        writer = new Writer;
    }

    /** The policy with which the lock has been created. */
    @property Policy policy() const
    {
        return m_state.policy;
    }
}
1295 
/** Interruptible alternative to $(D TaskReadWriteMutex).
 *
 *  This class supports the use of $(D vibe.core.task.Task.interrupt()) while
 *  waiting in the `lock()` method.
 *
 *  cf. $(D core.sync.rwmutex.ReadWriteMutex)
 */
class InterruptibleTaskReadWriteMutex
{
@safe:

    private {
        /** Interruptible variant of the shared state implementation. */
        alias State = ReadWriteMutexState!true;
        alias LockingIntent = State.LockingIntent;
        alias READ_ONLY  = LockingIntent.READ_ONLY;
        alias READ_WRITE = LockingIntent.READ_WRITE;

        /** State shared between the reader and the writer mutex. */
        State m_state;
    }

    /** The policy with which the mutex should operate.
     *
     *  The policy controls how lock acquisition is performed and allows
     *  tuning the mutex for the algorithm it is used in.
     *
     *  Depending on the policy, the mutex favors either reading or writing
     *  tasks and could potentially starve the respective opposite.
     *
     *  cf. $(D core.sync.rwmutex.ReadWriteMutex.Policy)
     */
    alias Policy = State.Policy;

    /** Common implementation of both provided mutexes.
     *
     *  The $(D INTENT) template argument specifies whether an instance is
     *  used for read or for write locking. All operations are forwarded to
     *  the shared state of the enclosing object.
     */
    final class Mutex(LockingIntent INTENT): core.sync.mutex.Mutex, Lockable
    {
        /** Try to lock the mutex. cf. $(D core.sync.mutex.Mutex) */
        override bool tryLock()
        {
            return m_state.tryLock!INTENT();
        }

        /** Lock the mutex. cf. $(D core.sync.mutex.Mutex) */
        override void lock()
        {
            m_state.lock!INTENT();
        }

        /** Unlock the mutex. cf. $(D core.sync.mutex.Mutex) */
        override void unlock()
        {
            m_state.unlock!INTENT();
        }
    }

    /// Mutex type used for read locking.
    alias Reader = Mutex!READ_ONLY;
    /// Mutex type used for write locking.
    alias Writer = Mutex!READ_WRITE;

    /// The mutex to lock for read access.
    Reader reader;
    /// The mutex to lock for write access.
    Writer writer;

    /** Creates the shared state with the given policy, followed by the
     *  reader and writer mutexes.
     */
    this(Policy policy = Policy.PREFER_WRITERS)
    {
        m_state = State(policy);
        reader = new Reader;
        writer = new Writer;
    }

    /** The policy with which the lock has been created. */
    @property Policy policy() const
    {
        return m_state.policy;
    }
}