1 /** 2 Interruptible Task synchronization facilities 3 4 Copyright: © 2012-2015 RejectedSoftware e.K. 5 Authors: Leonid Kramer, Sönke Ludwig, Manuel Frischknecht 6 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 7 */ 8 module vibe.core.sync; 9 10 import std.exception; 11 12 import vibe.core.driver; 13 14 import core.atomic; 15 import core.sync.mutex; 16 import core.sync.condition; 17 import std.stdio; 18 import std.traits : ReturnType; 19 20 21 enum LockMode { 22 lock, 23 tryLock, 24 defer 25 } 26 27 interface Lockable { 28 @safe: 29 void lock(); 30 void unlock(); 31 bool tryLock(); 32 } 33 34 /** RAII lock for the Mutex class. 35 */ 36 struct ScopedMutexLock 37 { 38 @safe: 39 @disable this(this); 40 private { 41 Mutex m_mutex; 42 bool m_locked; 43 LockMode m_mode; 44 } 45 46 this(core.sync.mutex.Mutex mutex, LockMode mode = LockMode.lock) { 47 assert(mutex !is null); 48 m_mutex = mutex; 49 50 final switch (mode) { 51 case LockMode.lock: lock(); break; 52 case LockMode.tryLock: tryLock(); break; 53 case LockMode.defer: break; 54 } 55 } 56 57 ~this() 58 { 59 if( m_locked ) 60 m_mutex.unlock(); 61 } 62 63 @property bool locked() const { return m_locked; } 64 65 void unlock() 66 { 67 enforce(m_locked); 68 m_mutex.unlock(); 69 m_locked = false; 70 } 71 72 bool tryLock() 73 { 74 enforce(!m_locked); 75 return m_locked = () @trusted { return m_mutex.tryLock(); } (); 76 } 77 78 void lock() 79 { 80 enforce(!m_locked); 81 m_locked = true; 82 () @trusted { m_mutex.lock(); } (); 83 } 84 } 85 86 87 /* 88 Only for internal use: 89 Ensures that a mutex is locked while executing the given procedure. 90 91 This function works for all kinds of mutexes, in particular for 92 $(D core.sync.mutex.Mutex), $(D TaskMutex) and $(D InterruptibleTaskMutex). 93 94 Returns: 95 Returns the value returned from $(D PROC), if any. 96 */ 97 /// private 98 package(vibe) ReturnType!PROC performLocked(alias PROC, MUTEX)(MUTEX mutex) 99 { 100 mutex.lock(); 101 scope (exit) mutex.unlock(); 102 return PROC(); 103 } 104 105 /// 106 unittest { 107 int protected_var = 0; 108 auto mtx = new TaskMutex; 109 mtx.performLocked!({ 110 protected_var++; 111 }); 112 } 113 114 115 /** 116 Thread-local semaphore implementation for tasks. 117 118 When the semaphore runs out of concurrent locks, it will suspend. This class 119 is used in `vibe.core.connectionpool` to limit the number of concurrent 120 connections. 121 */ 122 class LocalTaskSemaphore 123 { 124 @safe: 125 126 // requires a queue 127 import std.container.binaryheap; 128 import std.container.array; 129 130 private { 131 struct Waiter { 132 ManualEvent signal; 133 ubyte priority; 134 uint seq; 135 } 136 137 BinaryHeap!(Array!Waiter, asc) m_waiters; 138 uint m_maxLocks; 139 uint m_locks; 140 uint m_seq; 141 } 142 143 this(uint max_locks) 144 { 145 m_maxLocks = max_locks; 146 } 147 148 /// Maximum number of concurrent locks 149 @property void maxLocks(uint max_locks) { m_maxLocks = max_locks; } 150 /// ditto 151 @property uint maxLocks() const { return m_maxLocks; } 152 153 /// Number of concurrent locks still available 154 @property uint available() const { return m_maxLocks - m_locks; } 155 156 /** Try to acquire a lock. 157 158 If a lock cannot be acquired immediately, returns `false` and leaves the 159 semaphore in its previous state. 160 161 Returns: 162 `true` is returned $(I iff) the number of available locks is greater 163 than one. 164 */ 165 bool tryLock() 166 { 167 if (available > 0) 168 { 169 m_locks++; 170 return true; 171 } 172 return false; 173 } 174 175 /** Acquires a lock. 176 177 Once the limit of concurrent locks is reached, this method will block 178 until the number of locks drops below the limit. 179 */ 180 void lock(ubyte priority = 0) 181 { 182 import std.algorithm : min; 183 184 if (tryLock()) 185 return; 186 187 Waiter w; 188 w.signal = getEventDriver().createManualEvent(); 189 scope(exit) 190 () @trusted { return destroy(w.signal); } (); 191 w.priority = priority; 192 w.seq = min(0, m_seq - w.priority); 193 if (++m_seq == uint.max) 194 rewindSeq(); 195 196 () @trusted { m_waiters.insert(w); } (); 197 w.signal.waitUninterruptible(w.signal.emitCount); 198 } 199 200 /** Gives up an existing lock. 201 */ 202 void unlock() 203 { 204 if (m_waiters.length > 0) { 205 ManualEvent s = m_waiters.front().signal; 206 () @trusted { m_waiters.removeFront(); } (); 207 s.emit(); // resume one 208 } else m_locks--; 209 } 210 211 // if true, a goes after b. ie. b comes out front() 212 /// private 213 static bool asc(ref Waiter a, ref Waiter b) 214 { 215 if (a.seq == b.seq) { 216 if (a.priority == b.priority) { 217 // resolve using the pointer address 218 return (cast(size_t)&a.signal) > (cast(size_t) &b.signal); 219 } 220 // resolve using priority 221 return a.priority < b.priority; 222 } 223 // resolve using seq number 224 return a.seq > b.seq; 225 } 226 227 private void rewindSeq() 228 @trusted { 229 Array!Waiter waiters = m_waiters.release(); 230 ushort min_seq; 231 import std.algorithm : min; 232 foreach (ref waiter; waiters[]) 233 min_seq = min(waiter.seq, min_seq); 234 foreach (ref waiter; waiters[]) 235 waiter.seq -= min_seq; 236 m_waiters.assume(waiters); 237 } 238 } 239 240 241 /** 242 Mutex implementation for fibers. 243 244 This mutex type can be used in exchange for a core.sync.mutex.Mutex, but 245 does not block the event loop when contention happens. Note that this 246 mutex does not allow recursive locking. 247 248 Notice: 249 Because this class is annotated nothrow, it cannot be interrupted 250 using $(D vibe.core.task.Task.interrupt()). The corresponding 251 $(D InterruptException) will be deferred until the next blocking 252 operation yields the event loop. 253 254 Use $(D InterruptibleTaskMutex) as an alternative that can be 255 interrupted. 256 257 See_Also: InterruptibleTaskMutex, RecursiveTaskMutex, core.sync.mutex.Mutex 258 */ 259 class TaskMutex : core.sync.mutex.Mutex, Lockable { 260 @safe: 261 262 private TaskMutexImpl!false m_impl; 263 264 this(Object o) @trusted { m_impl.setup(); super(o); } 265 this() @trusted { m_impl.setup(); } 266 267 override bool tryLock() nothrow { return m_impl.tryLock(); } 268 override void lock() nothrow { m_impl.lock(); } 269 override void unlock() nothrow { m_impl.unlock(); } 270 } 271 272 unittest { 273 auto mutex = new TaskMutex; 274 275 { 276 auto lock = ScopedMutexLock(mutex); 277 assert(lock.locked); 278 assert(mutex.m_impl.m_locked); 279 280 auto lock2 = ScopedMutexLock(mutex, LockMode.tryLock); 281 assert(!lock2.locked); 282 } 283 assert(!mutex.m_impl.m_locked); 284 285 auto lock = ScopedMutexLock(mutex, LockMode.tryLock); 286 assert(lock.locked); 287 lock.unlock(); 288 assert(!lock.locked); 289 290 synchronized(mutex){ 291 assert(mutex.m_impl.m_locked); 292 } 293 assert(!mutex.m_impl.m_locked); 294 295 mutex.performLocked!({ 296 assert(mutex.m_impl.m_locked); 297 }); 298 assert(!mutex.m_impl.m_locked); 299 300 with(mutex.ScopedMutexLock) { 301 assert(mutex.m_impl.m_locked); 302 } 303 } 304 305 unittest { // test deferred throwing 306 import vibe.core.core; 307 308 auto mutex = new TaskMutex; 309 auto t1 = runTask({ 310 try { 311 mutex.lock(); 312 scope (exit) mutex.unlock(); 313 sleep(20.msecs); 314 } catch (Exception e) { 315 assert(false, "No exception expected in first task: "~e.msg); 316 } 317 }); 318 319 auto t2 = runTask({ 320 try mutex.lock(); 321 catch (Exception e) { 322 assert(false, "No exception supposed to be thrown: "~e.msg); 323 } 324 scope (exit) mutex.unlock(); 325 try { 326 yield(); 327 assert(false, "Yield is supposed to have thrown an InterruptException."); 328 } catch (InterruptException) { 329 // as expected! 330 } catch (Exception e) { 331 assert(false, "Only InterruptException supposed to be thrown: "~e.msg); 332 } 333 }); 334 335 runTask({ 336 // mutex is now locked in first task for 20 ms 337 // the second tasks is waiting in lock() 338 t2.interrupt(); 339 t1.join(); 340 t2.join(); 341 assert(!mutex.m_impl.m_locked); // ensure that the scope(exit) has been executed 342 exitEventLoop(); 343 }); 344 345 runEventLoop(); 346 } 347 348 unittest { 349 runMutexUnitTests!TaskMutex(); 350 } 351 352 353 /** 354 Alternative to $(D TaskMutex) that supports interruption. 355 356 This class supports the use of $(D vibe.core.task.Task.interrupt()) while 357 waiting in the $(D lock()) method. However, because the interface is not 358 $(D nothrow), it cannot be used as an object monitor. 359 360 See_Also: $(D TaskMutex), $(D InterruptibleRecursiveTaskMutex) 361 */ 362 final class InterruptibleTaskMutex : Lockable { 363 @safe: 364 365 private TaskMutexImpl!true m_impl; 366 367 this() { m_impl.setup(); } 368 369 bool tryLock() nothrow { return m_impl.tryLock(); } 370 void lock() { m_impl.lock(); } 371 void unlock() nothrow { m_impl.unlock(); } 372 } 373 374 unittest { 375 runMutexUnitTests!InterruptibleTaskMutex(); 376 } 377 378 379 380 /** 381 Recursive mutex implementation for tasks. 382 383 This mutex type can be used in exchange for a core.sync.mutex.Mutex, but 384 does not block the event loop when contention happens. 385 386 Notice: 387 Because this class is annotated nothrow, it cannot be interrupted 388 using $(D vibe.core.task.Task.interrupt()). The corresponding 389 $(D InterruptException) will be deferred until the next blocking 390 operation yields the event loop. 391 392 Use $(D InterruptibleRecursiveTaskMutex) as an alternative that can be 393 interrupted. 394 395 See_Also: TaskMutex, core.sync.mutex.Mutex 396 */ 397 class RecursiveTaskMutex : core.sync.mutex.Mutex, Lockable { 398 @safe: 399 400 private RecursiveTaskMutexImpl!false m_impl; 401 402 this(Object o) { m_impl.setup(); super(o); } 403 this() { m_impl.setup(); } 404 405 override bool tryLock() { return m_impl.tryLock(); } 406 override void lock() { m_impl.lock(); } 407 override void unlock() { m_impl.unlock(); } 408 } 409 410 unittest { 411 runMutexUnitTests!RecursiveTaskMutex(); 412 } 413 414 415 /** 416 Alternative to $(D RecursiveTaskMutex) that supports interruption. 417 418 This class supports the use of $(D vibe.core.task.Task.interrupt()) while 419 waiting in the $(D lock()) method. However, because the interface is not 420 $(D nothrow), it cannot be used as an object monitor. 421 422 See_Also: $(D RecursiveTaskMutex), $(D InterruptibleTaskMutex) 423 */ 424 final class InterruptibleRecursiveTaskMutex : Lockable { 425 @safe: 426 427 private RecursiveTaskMutexImpl!true m_impl; 428 429 this() { m_impl.setup(); } 430 431 bool tryLock() { return m_impl.tryLock(); } 432 void lock() { m_impl.lock(); } 433 void unlock() { m_impl.unlock(); } 434 } 435 436 unittest { 437 runMutexUnitTests!InterruptibleRecursiveTaskMutex(); 438 } 439 440 441 private void runMutexUnitTests(M)() 442 { 443 import vibe.core.core; 444 445 auto m = new M; 446 Task t1, t2; 447 void runContendedTasks(bool interrupt_t1, bool interrupt_t2) { 448 assert(!m.m_impl.m_locked); 449 450 // t1 starts first and acquires the mutex for 20 ms 451 // t2 starts second and has to wait in m.lock() 452 t1 = runTask({ 453 assert(!m.m_impl.m_locked); 454 m.lock(); 455 assert(m.m_impl.m_locked); 456 if (interrupt_t1) assertThrown!InterruptException(sleep(100.msecs)); 457 else assertNotThrown(sleep(20.msecs)); 458 m.unlock(); 459 }); 460 t2 = runTask({ 461 assert(!m.tryLock()); 462 if (interrupt_t2) { 463 try m.lock(); 464 catch (InterruptException) return; 465 try yield(); // rethrows any deferred exceptions 466 catch (InterruptException) { 467 m.unlock(); 468 return; 469 } 470 assert(false, "Supposed to have thrown an InterruptException."); 471 } else assertNotThrown(m.lock()); 472 assert(m.m_impl.m_locked); 473 sleep(20.msecs); 474 m.unlock(); 475 assert(!m.m_impl.m_locked); 476 }); 477 } 478 479 // basic lock test 480 m.performLocked!({ 481 assert(m.m_impl.m_locked); 482 }); 483 assert(!m.m_impl.m_locked); 484 485 // basic contention test 486 runContendedTasks(false, false); 487 runTask({ 488 assert(t1.running && t2.running); 489 assert(m.m_impl.m_locked); 490 t1.join(); 491 assert(!t1.running && t2.running); 492 yield(); // give t2 a chance to take the lock 493 assert(m.m_impl.m_locked); 494 t2.join(); 495 assert(!t2.running); 496 assert(!m.m_impl.m_locked); 497 exitEventLoop(); 498 }); 499 runEventLoop(); 500 assert(!m.m_impl.m_locked); 501 502 // interruption test #1 503 runContendedTasks(true, false); 504 runTask({ 505 assert(t1.running && t2.running); 506 assert(m.m_impl.m_locked); 507 t1.interrupt(); 508 t1.join(); 509 assert(!t1.running && t2.running); 510 yield(); // give t2 a chance to take the lock 511 assert(m.m_impl.m_locked); 512 t2.join(); 513 assert(!t2.running); 514 assert(!m.m_impl.m_locked); 515 exitEventLoop(); 516 }); 517 runEventLoop(); 518 assert(!m.m_impl.m_locked); 519 520 // interruption test #2 521 runContendedTasks(false, true); 522 runTask({ 523 assert(t1.running && t2.running); 524 assert(m.m_impl.m_locked); 525 t2.interrupt(); 526 t2.join(); 527 assert(!t2.running); 528 static if (is(M == InterruptibleTaskMutex) || is (M == InterruptibleRecursiveTaskMutex)) 529 assert(t1.running && m.m_impl.m_locked); 530 t1.join(); 531 assert(!t1.running); 532 assert(!m.m_impl.m_locked); 533 exitEventLoop(); 534 }); 535 runEventLoop(); 536 assert(!m.m_impl.m_locked); 537 } 538 539 540 /** 541 Event loop based condition variable or "event" implementation. 542 543 This class can be used in exchange for a $(D core.sync.condition.Condition) 544 to avoid blocking the event loop when waiting. 545 546 Notice: 547 Because this class is annotated nothrow, it cannot be interrupted 548 using $(D vibe.core.task.Task.interrupt()). The corresponding 549 $(D InterruptException) will be deferred until the next blocking 550 operation yields to the event loop. 551 552 Use $(D InterruptibleTaskCondition) as an alternative that can be 553 interrupted. 554 555 Note that it is generally not safe to use a `TaskCondition` together with an 556 interruptible mutex type. 557 558 See_Also: InterruptibleTaskCondition 559 */ 560 class TaskCondition : core.sync.condition.Condition { 561 @safe: 562 563 private TaskConditionImpl!(false, Mutex) m_impl; 564 565 this(core.sync.mutex.Mutex mtx) nothrow { m_impl.setup(mtx); super(mtx); } 566 567 override @property Mutex mutex() nothrow { return m_impl.mutex; } 568 override void wait() { m_impl.wait(); } 569 override bool wait(Duration timeout) { return m_impl.wait(timeout); } 570 override void notify() { m_impl.notify(); } 571 override void notifyAll() { m_impl.notifyAll(); } 572 } 573 574 /** This example shows the typical usage pattern using a `while` loop to make 575 sure that the final condition is reached. 576 */ 577 unittest { 578 import vibe.core.core; 579 580 __gshared Mutex mutex; 581 __gshared TaskCondition condition; 582 __gshared int workers_still_running = 0; 583 584 // setup the task condition 585 mutex = new Mutex; 586 condition = new TaskCondition(mutex); 587 588 // start up the workers and count how many are running 589 foreach (i; 0 .. 4) { 590 workers_still_running++; 591 runWorkerTask({ 592 // simulate some work 593 sleep(100.msecs); 594 595 // notify the waiter that we're finished 596 synchronized (mutex) 597 workers_still_running--; 598 condition.notify(); 599 }); 600 } 601 602 // wait until all tasks have decremented the counter back to zero 603 synchronized (mutex) { 604 while (workers_still_running > 0) 605 condition.wait(); 606 } 607 } 608 609 610 /** 611 Alternative to `TaskCondition` that supports interruption. 612 613 This class supports the use of `vibe.core.task.Task.interrupt()` while 614 waiting in the `wait()` method. 615 616 See `TaskCondition` for an example. 617 618 Notice: 619 Note that it is generally not safe to use an 620 `InterruptibleTaskCondition` together with an interruptible mutex type. 621 622 See_Also: `TaskCondition` 623 */ 624 final class InterruptibleTaskCondition { 625 @safe: 626 627 private TaskConditionImpl!(true, Lockable) m_impl; 628 629 this(core.sync.mutex.Mutex mtx) nothrow { m_impl.setup(mtx); } 630 this(Lockable mtx) nothrow { m_impl.setup(mtx); } 631 632 @property Lockable mutex() { return m_impl.mutex; } 633 void wait() { m_impl.wait(); } 634 bool wait(Duration timeout) { return m_impl.wait(timeout); } 635 void notify() { m_impl.notify(); } 636 void notifyAll() { m_impl.notifyAll(); } 637 } 638 639 640 /** Creates a new signal that can be shared between fibers. 641 */ 642 ManualEvent createManualEvent() 643 @safe nothrow { 644 return getEventDriver().createManualEvent(); 645 } 646 647 /** Creates a new signal that can be shared between fibers. 648 */ 649 LocalManualEvent createLocalManualEvent() 650 @safe nothrow { 651 return getEventDriver().createManualEvent(); 652 } 653 654 alias LocalManualEvent = ManualEvent; 655 656 /** A manually triggered cross-task event. 657 658 Note: the ownership can be shared between multiple fibers and threads. 659 */ 660 interface ManualEvent { 661 @safe: 662 663 /// A counter that is increased with every emit() call 664 @property int emitCount() const nothrow; 665 666 /// Emits the signal, waking up all owners of the signal. 667 void emit() nothrow; 668 669 /** Acquires ownership and waits until the signal is emitted. 670 671 Throws: 672 May throw an $(D InterruptException) if the task gets interrupted 673 using $(D Task.interrupt()). 674 */ 675 void wait(); 676 677 /** Acquires ownership and waits until the emit count differs from the given one. 678 679 Throws: 680 May throw an $(D InterruptException) if the task gets interrupted 681 using $(D Task.interrupt()). 682 */ 683 int wait(int reference_emit_count); 684 685 /** Acquires ownership and waits until the emit count differs from the given one or until a timeout is reached. 686 687 Throws: 688 May throw an $(D InterruptException) if the task gets interrupted 689 using $(D Task.interrupt()). 690 */ 691 int wait(Duration timeout, int reference_emit_count); 692 693 /** Same as $(D wait), but defers throwing any $(D InterruptException). 694 695 This method is annotated $(D nothrow) at the expense that it cannot be 696 interrupted. 697 */ 698 int waitUninterruptible(int reference_emit_count) nothrow; 699 700 /// ditto 701 int waitUninterruptible(Duration timeout, int reference_emit_count) nothrow; 702 } 703 704 705 private struct TaskMutexImpl(bool INTERRUPTIBLE) { 706 import std.stdio; 707 private { 708 shared(bool) m_locked = false; 709 shared(uint) m_waiters = 0; 710 ManualEvent m_signal; 711 debug Task m_owner; 712 } 713 714 void setup() 715 nothrow { 716 m_signal = createManualEvent(); 717 } 718 719 720 @trusted bool tryLock() 721 { 722 if (cas(&m_locked, false, true)) { 723 debug m_owner = Task.getThis(); 724 version(MutexPrint) writefln("mutex %s lock %s", cast(void*)this, atomicLoad(m_waiters)); 725 return true; 726 } 727 return false; 728 } 729 730 @trusted void lock() 731 { 732 if (tryLock()) return; 733 debug assert(m_owner == Task() || m_owner != Task.getThis(), "Recursive mutex lock."); 734 atomicOp!"+="(m_waiters, 1); 735 version(MutexPrint) writefln("mutex %s wait %s", cast(void*)this, atomicLoad(m_waiters)); 736 scope(exit) atomicOp!"-="(m_waiters, 1); 737 auto ecnt = m_signal.emitCount(); 738 while (!tryLock()) { 739 static if (INTERRUPTIBLE) ecnt = m_signal.wait(ecnt); 740 else ecnt = m_signal.waitUninterruptible(ecnt); 741 } 742 } 743 744 @trusted void unlock() 745 { 746 assert(m_locked); 747 debug { 748 assert(m_owner == Task.getThis()); 749 m_owner = Task(); 750 } 751 atomicStore!(MemoryOrder.rel)(m_locked, false); 752 version(MutexPrint) writefln("mutex %s unlock %s", cast(void*)this, atomicLoad(m_waiters)); 753 if (atomicLoad(m_waiters) > 0) 754 m_signal.emit(); 755 } 756 } 757 758 private struct RecursiveTaskMutexImpl(bool INTERRUPTIBLE) { 759 import std.stdio; 760 private { 761 core.sync.mutex.Mutex m_mutex; 762 Task m_owner; 763 size_t m_recCount = 0; 764 shared(uint) m_waiters = 0; 765 ManualEvent m_signal; 766 @property bool m_locked() const { return m_recCount > 0; } 767 } 768 769 void setup() 770 { 771 m_signal = createManualEvent(); 772 m_mutex = new core.sync.mutex.Mutex; 773 } 774 775 @trusted bool tryLock() 776 { 777 auto self = Task.getThis(); 778 return m_mutex.performLocked!({ 779 if (!m_owner) { 780 assert(m_recCount == 0); 781 m_recCount = 1; 782 m_owner = self; 783 return true; 784 } else if (m_owner == self) { 785 m_recCount++; 786 return true; 787 } 788 return false; 789 }); 790 } 791 792 @trusted void lock() 793 { 794 if (tryLock()) return; 795 atomicOp!"+="(m_waiters, 1); 796 version(MutexPrint) writefln("mutex %s wait %s", cast(void*)this, atomicLoad(m_waiters)); 797 scope(exit) atomicOp!"-="(m_waiters, 1); 798 auto ecnt = m_signal.emitCount(); 799 while (!tryLock()) { 800 static if (INTERRUPTIBLE) ecnt = m_signal.wait(ecnt); 801 else ecnt = m_signal.waitUninterruptible(ecnt); 802 } 803 } 804 805 @trusted void unlock() 806 { 807 auto self = Task.getThis(); 808 m_mutex.performLocked!({ 809 assert(m_owner == self); 810 assert(m_recCount > 0); 811 m_recCount--; 812 if (m_recCount == 0) { 813 m_owner = Task.init; 814 } 815 }); 816 version(MutexPrint) writefln("mutex %s unlock %s", cast(void*)this, atomicLoad(m_waiters)); 817 if (atomicLoad(m_waiters) > 0) 818 m_signal.emit(); 819 } 820 } 821 822 private struct TaskConditionImpl(bool INTERRUPTIBLE, LOCKABLE) { 823 private { 824 LOCKABLE m_mutex; 825 826 ManualEvent m_signal; 827 } 828 829 static if (is(LOCKABLE == Lockable)) { 830 final class MutexWrapper : Lockable { 831 private core.sync.mutex.Mutex m_mutex; 832 this(core.sync.mutex.Mutex mtx) { m_mutex = mtx; } 833 @trusted void lock() { m_mutex.lock(); } 834 @trusted void unlock() { m_mutex.unlock(); } 835 @trusted bool tryLock() { return m_mutex.tryLock(); } 836 } 837 838 void setup(core.sync.mutex.Mutex mtx) 839 { 840 setup(new MutexWrapper(mtx)); 841 } 842 } 843 844 void setup(LOCKABLE mtx) 845 { 846 m_mutex = mtx; 847 m_signal = createManualEvent(); 848 } 849 850 @property LOCKABLE mutex() { return m_mutex; } 851 852 @trusted void wait() 853 { 854 if (auto tm = cast(TaskMutex)m_mutex) { 855 assert(tm.m_impl.m_locked); 856 debug assert(tm.m_impl.m_owner == Task.getThis()); 857 } 858 859 auto refcount = m_signal.emitCount; 860 m_mutex.unlock(); 861 scope(exit) m_mutex.lock(); 862 static if (INTERRUPTIBLE) m_signal.wait(refcount); 863 else m_signal.waitUninterruptible(refcount); 864 } 865 866 @trusted bool wait(Duration timeout) 867 { 868 assert(!timeout.isNegative()); 869 if (auto tm = cast(TaskMutex)m_mutex) { 870 assert(tm.m_impl.m_locked); 871 debug assert(tm.m_impl.m_owner == Task.getThis()); 872 } 873 874 auto refcount = m_signal.emitCount; 875 m_mutex.unlock(); 876 scope(exit) m_mutex.lock(); 877 878 static if (INTERRUPTIBLE) return m_signal.wait(timeout, refcount) != refcount; 879 else return m_signal.waitUninterruptible(timeout, refcount) != refcount; 880 } 881 882 @trusted void notify() 883 { 884 m_signal.emit(); 885 } 886 887 @trusted void notifyAll() 888 { 889 m_signal.emit(); 890 } 891 } 892 893 /** Contains the shared state of a $(D TaskReadWriteMutex). 894 * 895 * Since a $(D TaskReadWriteMutex) consists of two actual Mutex 896 * objects that rely on common memory, this class implements 897 * the actual functionality of their method calls. 898 * 899 * The method implementations are based on two static parameters 900 * ($(D INTERRUPTIBLE) and $(D INTENT)), which are configured through 901 * template arguments: 902 * 903 * - $(D INTERRUPTIBLE) determines whether the mutex implementation 904 * are interruptible by vibe.d's $(D vibe.core.task.Task.interrupt()) 905 * method or not. 906 * 907 * - $(D INTENT) describes the intent, with which a locking operation is 908 * performed (i.e. $(D READ_ONLY) or $(D READ_WRITE)). RO locking allows for 909 * multiple Tasks holding the mutex, whereas RW locking will cause 910 * a "bottleneck" so that only one Task can write to the protected 911 * data at once. 912 */ 913 private struct ReadWriteMutexState(bool INTERRUPTIBLE) 914 { 915 @safe: 916 /** The policy with which the mutex should operate. 917 * 918 * The policy determines how the acquisition of the locks is 919 * performed and can be used to tune the mutex according to the 920 * underlying algorithm in which it is used. 921 * 922 * According to the provided policy, the mutex will either favor 923 * reading or writing tasks and could potentially starve the 924 * respective opposite. 925 * 926 * cf. $(D core.sync.rwmutex.ReadWriteMutex.Policy) 927 */ 928 enum Policy : int 929 { 930 /** Readers are prioritized, writers may be starved as a result. */ 931 PREFER_READERS = 0, 932 /** Writers are prioritized, readers may be starved as a result. */ 933 PREFER_WRITERS 934 } 935 936 /** The intent with which a locking operation is performed. 937 * 938 * Since both locks share the same underlying algorithms, the actual 939 * intent with which a lock operation is performed (i.e read/write) 940 * are passed as a template parameter to each method. 941 */ 942 enum LockingIntent : bool 943 { 944 /** Perform a read lock/unlock operation. Multiple reading locks can be 945 * active at a time. */ 946 READ_ONLY = 0, 947 /** Perform a write lock/unlock operation. Only a single writer can 948 * hold a lock at any given time. */ 949 READ_WRITE = 1 950 } 951 952 private { 953 //Queue counters 954 /** The number of reading tasks waiting for the lock to become available. */ 955 shared(uint) m_waitingForReadLock = 0; 956 /** The number of writing tasks waiting for the lock to become available. */ 957 shared(uint) m_waitingForWriteLock = 0; 958 959 //Lock counters 960 /** The number of reading tasks that currently hold the lock. */ 961 uint m_activeReadLocks = 0; 962 /** The number of writing tasks that currently hold the lock (binary). */ 963 ubyte m_activeWriteLocks = 0; 964 965 /** The policy determining the lock's behavior. */ 966 Policy m_policy; 967 968 //Queue Events 969 /** The event used to wake reading tasks waiting for the lock while it is blocked. */ 970 ManualEvent m_readyForReadLock; 971 /** The event used to wake writing tasks waiting for the lock while it is blocked. */ 972 ManualEvent m_readyForWriteLock; 973 974 /** The underlying mutex that gates the access to the shared state. */ 975 Mutex m_counterMutex; 976 } 977 978 this(Policy policy) 979 { 980 m_policy = policy; 981 m_counterMutex = new Mutex(); 982 m_readyForReadLock = createManualEvent(); 983 m_readyForWriteLock = createManualEvent(); 984 } 985 986 @disable this(this); 987 988 /** The policy with which the lock has been created. */ 989 @property policy() const { return m_policy; } 990 991 version(RWMutexPrint) 992 { 993 /** Print out debug information during lock operations. */ 994 void printInfo(string OP, LockingIntent INTENT)() nothrow 995 { 996 import std.string; 997 try 998 { 999 import std.stdio; 1000 writefln("RWMutex: %s (%s), active: RO: %d, RW: %d; waiting: RO: %d, RW: %d", 1001 OP.leftJustify(10,' '), 1002 INTENT == LockingIntent.READ_ONLY ? "RO" : "RW", 1003 m_activeReadLocks, m_activeWriteLocks, 1004 m_waitingForReadLock, m_waitingForWriteLock 1005 ); 1006 } 1007 catch (Exception t){} 1008 } 1009 } 1010 1011 /** An internal shortcut method to determine the queue event for a given intent. */ 1012 @property ref auto queueEvent(LockingIntent INTENT)() 1013 { 1014 static if (INTENT == LockingIntent.READ_ONLY) 1015 return m_readyForReadLock; 1016 else 1017 return m_readyForWriteLock; 1018 } 1019 1020 /** An internal shortcut method to determine the queue counter for a given intent. */ 1021 @property ref auto queueCounter(LockingIntent INTENT)() 1022 { 1023 static if (INTENT == LockingIntent.READ_ONLY) 1024 return m_waitingForReadLock; 1025 else 1026 return m_waitingForWriteLock; 1027 } 1028 1029 /** An internal shortcut method to determine the current emitCount of the queue counter for a given intent. */ 1030 int emitCount(LockingIntent INTENT)() 1031 { 1032 return queueEvent!INTENT.emitCount(); 1033 } 1034 1035 /** An internal shortcut method to determine the active counter for a given intent. */ 1036 @property ref auto activeCounter(LockingIntent INTENT)() 1037 { 1038 static if (INTENT == LockingIntent.READ_ONLY) 1039 return m_activeReadLocks; 1040 else 1041 return m_activeWriteLocks; 1042 } 1043 1044 /** An internal shortcut method to wait for the queue event for a given intent. 1045 * 1046 * This method is used during the `lock()` operation, after a 1047 * `tryLock()` operation has been unsuccessfully finished. 1048 * The active fiber will yield and be suspended until the queue event 1049 * for the given intent will be fired. 1050 */ 1051 int wait(LockingIntent INTENT)(int count) 1052 { 1053 static if (INTERRUPTIBLE) 1054 return queueEvent!INTENT.wait(count); 1055 else 1056 return queueEvent!INTENT.waitUninterruptible(count); 1057 } 1058 1059 /** An internal shortcut method to notify tasks waiting for the lock to become available again. 1060 * 1061 * This method is called whenever the number of owners of the mutex hits 1062 * zero; this is basically the counterpart to `wait()`. 1063 * It wakes any Task currently waiting for the mutex to be released. 1064 */ 1065 @trusted void notify(LockingIntent INTENT)() 1066 { 1067 static if (INTENT == LockingIntent.READ_ONLY) 1068 { //If the last reader unlocks the mutex, notify all waiting writers 1069 if (atomicLoad(m_waitingForWriteLock) > 0) 1070 m_readyForWriteLock.emit(); 1071 } 1072 else 1073 { //If a writer unlocks the mutex, notify both readers and writers 1074 if (atomicLoad(m_waitingForReadLock) > 0) 1075 m_readyForReadLock.emit(); 1076 1077 if (atomicLoad(m_waitingForWriteLock) > 0) 1078 m_readyForWriteLock.emit(); 1079 } 1080 } 1081 1082 /** An internal method that performs the acquisition attempt in different variations. 1083 * 1084 * Since both locks rely on a common TaskMutex object which gates the access 1085 * to their common data acquisition attempts for this lock are more complex 1086 * than for simple mutex variants. This method will thus be performing the 1087 * `tryLock()` operation in two variations, depending on the callee: 1088 * 1089 * If called from the outside ($(D WAIT_FOR_BLOCKING_MUTEX) = false), the method 1090 * will instantly fail if the underlying mutex is locked (i.e. during another 1091 * `tryLock()` or `unlock()` operation), in order to guarantee the fastest 1092 * possible locking attempt. 1093 * 1094 * If used internally by the `lock()` method ($(D WAIT_FOR_BLOCKING_MUTEX) = true), 1095 * the operation will wait for the mutex to be available before deciding if 1096 * the lock can be acquired, since the attempt would anyway be repeated until 1097 * it succeeds. This will prevent frequent retries under heavy loads and thus 1098 * should ensure better performance. 1099 */ 1100 @trusted bool tryLock(LockingIntent INTENT, bool WAIT_FOR_BLOCKING_MUTEX)() 1101 { 1102 //Log a debug statement for the attempt 1103 version(RWMutexPrint) 1104 printInfo!("tryLock",INTENT)(); 1105 1106 //Try to acquire the lock 1107 static if (!WAIT_FOR_BLOCKING_MUTEX) 1108 { 1109 if (!m_counterMutex.tryLock()) 1110 return false; 1111 } 1112 else 1113 m_counterMutex.lock(); 1114 1115 scope(exit) 1116 m_counterMutex.unlock(); 1117 1118 //Log a debug statement for the attempt 1119 version(RWMutexPrint) 1120 printInfo!("checkCtrs",INTENT)(); 1121 1122 //Check if there's already an active writer 1123 if (m_activeWriteLocks > 0) 1124 return false; 1125 1126 //If writers are preferred over readers, check whether there 1127 //currently is a writer in the waiting queue and abort if 1128 //that's the case. 1129 static if (INTENT == LockingIntent.READ_ONLY) 1130 if (m_policy.PREFER_WRITERS && m_waitingForWriteLock > 0) 1131 return false; 1132 1133 //If we are locking the mutex for writing, make sure that 1134 //there's no reader active. 1135 static if (INTENT == LockingIntent.READ_WRITE) 1136 if (m_activeReadLocks > 0) 1137 return false; 1138 1139 //We can successfully acquire the lock! 1140 //Log a debug statement for the success. 1141 version(RWMutexPrint) 1142 printInfo!("lock",INTENT)(); 1143 1144 //Increase the according counter 1145 //(number of active readers/writers) 1146 //and return a success code. 1147 activeCounter!INTENT += 1; 1148 return true; 1149 } 1150 1151 /** Attempt to acquire the lock for a given intent. 1152 * 1153 * Returns: 1154 * `true`, if the lock was successfully acquired; 1155 * `false` otherwise. 1156 */ 1157 @trusted bool tryLock(LockingIntent INTENT)() 1158 { 1159 //Try to lock this mutex without waiting for the underlying 1160 //TaskMutex - fail if it is already blocked. 1161 return tryLock!(INTENT,false)(); 1162 } 1163 1164 /** Acquire the lock for the given intent; yield and suspend until the lock has been acquired. */ 1165 @trusted void lock(LockingIntent INTENT)() 1166 { 1167 //Prepare a waiting action before the first 1168 //`tryLock()` call in order to avoid a race 1169 //condition that could lead to the queue notification 1170 //not being fired. 1171 auto count = emitCount!INTENT; 1172 atomicOp!"+="(queueCounter!INTENT,1); 1173 scope(exit) 1174 atomicOp!"-="(queueCounter!INTENT,1); 1175 1176 //Try to lock the mutex 1177 auto locked = tryLock!(INTENT,true)(); 1178 if (locked) 1179 return; 1180 1181 //Retry until we successfully acquired the lock 1182 while(!locked) 1183 { 1184 version(RWMutexPrint) 1185 printInfo!("wait",INTENT)(); 1186 1187 count = wait!INTENT(count); 1188 locked = tryLock!(INTENT,true)(); 1189 } 1190 } 1191 1192 /** Unlock the mutex after a successful acquisition. */ 1193 @trusted void unlock(LockingIntent INTENT)() 1194 { 1195 version(RWMutexPrint) 1196 printInfo!("unlock",INTENT)(); 1197 1198 debug assert(activeCounter!INTENT > 0); 1199 1200 synchronized(m_counterMutex) 1201 { 1202 //Decrement the counter of active lock holders. 1203 //If the counter hits zero, notify waiting Tasks 1204 activeCounter!INTENT -= 1; 1205 if (activeCounter!INTENT == 0) 1206 { 1207 version(RWMutexPrint) 1208 printInfo!("notify",INTENT)(); 1209 1210 notify!INTENT(); 1211 } 1212 } 1213 } 1214 } 1215 1216 /** A ReadWriteMutex implementation for fibers. 1217 * 1218 * This mutex can be used in exchange for a $(D core.sync.mutex.ReadWriteMutex), 1219 * but does not block the event loop in contention situations. The `reader` and `writer` 1220 * members are used for locking. Locking the `reader` mutex allows access to multiple 1221 * readers at once, while the `writer` mutex only allows a single writer to lock it at 1222 * any given time. Locks on `reader` and `writer` are mutually exclusive (i.e. whenever a 1223 * writer is active, no readers can be active at the same time, and vice versa). 1224 * 1225 * Notice: 1226 * Mutexes implemented by this class cannot be interrupted 1227 * using $(D vibe.core.task.Task.interrupt()). The corresponding 1228 * InterruptException will be deferred until the next blocking 1229 * operation yields the event loop. 1230 * 1231 * Use $(D InterruptibleTaskReadWriteMutex) as an alternative that can be 1232 * interrupted. 1233 * 1234 * cf. $(D core.sync.mutex.ReadWriteMutex) 1235 */ 1236 class TaskReadWriteMutex 1237 { 1238 @safe: 1239 1240 private { 1241 alias State = ReadWriteMutexState!false; 1242 alias LockingIntent = State.LockingIntent; 1243 alias READ_ONLY = LockingIntent.READ_ONLY; 1244 alias READ_WRITE = LockingIntent.READ_WRITE; 1245 1246 /** The shared state used by the reader and writer mutexes. */ 1247 State m_state; 1248 } 1249 1250 /** The policy with which the mutex should operate. 1251 * 1252 * The policy determines how the acquisition of the locks is 1253 * performed and can be used to tune the mutex according to the 1254 * underlying algorithm in which it is used. 1255 * 1256 * According to the provided policy, the mutex will either favor 1257 * reading or writing tasks and could potentially starve the 1258 * respective opposite. 1259 * 1260 * cf. $(D core.sync.rwmutex.ReadWriteMutex.Policy) 1261 */ 1262 alias Policy = State.Policy; 1263 1264 /** A common baseclass for both of the provided mutexes. 1265 * 1266 * The intent for the according mutex is specified through the 1267 * $(D INTENT) template argument, which determines if a mutex is 1268 * used for read or write locking. 1269 */ 1270 final class Mutex(LockingIntent INTENT): core.sync.mutex.Mutex, Lockable 1271 { 1272 /** Try to lock the mutex. cf. $(D core.sync.mutex.Mutex) */ 1273 override bool tryLock() { return m_state.tryLock!INTENT(); } 1274 /** Lock the mutex. cf. $(D core.sync.mutex.Mutex) */ 1275 override void lock() { m_state.lock!INTENT(); } 1276 /** Unlock the mutex. cf. $(D core.sync.mutex.Mutex) */ 1277 override void unlock() { m_state.unlock!INTENT(); } 1278 } 1279 alias Reader = Mutex!READ_ONLY; 1280 alias Writer = Mutex!READ_WRITE; 1281 1282 Reader reader; 1283 Writer writer; 1284 1285 this(Policy policy = Policy.PREFER_WRITERS) 1286 { 1287 m_state = State(policy); 1288 reader = new Reader(); 1289 writer = new Writer(); 1290 } 1291 1292 /** The policy with which the lock has been created. */ 1293 @property Policy policy() const { return m_state.policy; } 1294 } 1295 1296 /** Alternative to $(D TaskReadWriteMutex) that supports interruption. 1297 * 1298 * This class supports the use of $(D vibe.core.task.Task.interrupt()) while 1299 * waiting in the `lock()` method. 1300 * 1301 * cf. $(D core.sync.mutex.ReadWriteMutex) 1302 */ 1303 class InterruptibleTaskReadWriteMutex 1304 { 1305 @safe: 1306 1307 private { 1308 alias State = ReadWriteMutexState!true; 1309 alias LockingIntent = State.LockingIntent; 1310 alias READ_ONLY = LockingIntent.READ_ONLY; 1311 alias READ_WRITE = LockingIntent.READ_WRITE; 1312 1313 /** The shared state used by the reader and writer mutexes. */ 1314 State m_state; 1315 } 1316 1317 /** The policy with which the mutex should operate. 1318 * 1319 * The policy determines how the acquisition of the locks is 1320 * performed and can be used to tune the mutex according to the 1321 * underlying algorithm in which it is used. 1322 * 1323 * According to the provided policy, the mutex will either favor 1324 * reading or writing tasks and could potentially starve the 1325 * respective opposite. 1326 * 1327 * cf. $(D core.sync.rwmutex.ReadWriteMutex.Policy) 1328 */ 1329 alias Policy = State.Policy; 1330 1331 /** A common baseclass for both of the provided mutexes. 1332 * 1333 * The intent for the according mutex is specified through the 1334 * $(D INTENT) template argument, which determines if a mutex is 1335 * used for read or write locking. 1336 * 1337 */ 1338 final class Mutex(LockingIntent INTENT): core.sync.mutex.Mutex, Lockable 1339 { 1340 /** Try to lock the mutex. cf. $(D core.sync.mutex.Mutex) */ 1341 override bool tryLock() { return m_state.tryLock!INTENT(); } 1342 /** Lock the mutex. cf. $(D core.sync.mutex.Mutex) */ 1343 override void lock() { m_state.lock!INTENT(); } 1344 /** Unlock the mutex. cf. $(D core.sync.mutex.Mutex) */ 1345 override void unlock() { m_state.unlock!INTENT(); } 1346 } 1347 alias Reader = Mutex!READ_ONLY; 1348 alias Writer = Mutex!READ_WRITE; 1349 1350 Reader reader; 1351 Writer writer; 1352 1353 this(Policy policy = Policy.PREFER_WRITERS) 1354 { 1355 m_state = State(policy); 1356 reader = new Reader(); 1357 writer = new Writer(); 1358 } 1359 1360 /** The policy with which the lock has been created. */ 1361 @property Policy policy() const { return m_state.policy; } 1362 }