1 /** 2 Functions and structures for dealing with threads and concurrent access. 3 4 This module is modeled after std.concurrency, but provides a fiber-aware alternative 5 to it. All blocking operations will yield the calling fiber instead of blocking it. 6 7 Copyright: © 2013-2016 RejectedSoftware e.K. 8 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 9 Authors: Sönke Ludwig 10 */ 11 module vibe.core.concurrency; 12 13 import core.time; 14 import std.traits; 15 import std.typecons; 16 import std.typetuple; 17 import std.variant; 18 import std.string; 19 import vibe.core.task; 20 import vibe.internal.allocator; 21 import vibe.internal.meta.traits : StripHeadConst; 22 23 public import std.concurrency; 24 25 private extern (C) pure nothrow void _d_monitorenter(Object h); 26 private extern (C) pure nothrow void _d_monitorexit(Object h); 27 28 /** 29 Locks the given shared object and returns a ScopedLock for accessing any unshared members. 30 31 Using this function will ensure that there are no data races. For this reason, the class 32 type T is required to contain no unshared or unisolated aliasing. 
See_Also: core.concurrency.isWeaklyIsolated
*/
ScopedLock!T lock(T : const(Object))(shared(T) object)
pure nothrow @safe {
	return ScopedLock!T(object);
}
/// ditto
void lock(T : const(Object))(shared(T) object, scope void delegate(scope T) accessor)
{
	// This overload is intentionally not marked `nothrow`: `accessor` is not a
	// `nothrow` delegate, so calling it from a function explicitly declared
	// `nothrow` is rejected by the compiler as soon as the template is
	// instantiated.
	//
	// The monitor is held by the ScopedLock for the duration of the callback and
	// is released by its destructor, which also runs if `accessor` throws.
	auto l = lock(object);
	accessor(l.unsafeGet());
}

///
unittest {
	import vibe.core.concurrency;

	static class Item {
		private double m_value;

		this(double value) pure { m_value = value; }

		@property double value() const pure { return m_value; }
	}

	static class Manager {
		private {
			string m_name;
			Isolated!(Item) m_ownedItem;
			Isolated!(shared(Item)[]) m_items;
		}

		pure this(string name)
		{
			m_name = name;
			auto itm = makeIsolated!Item(3.5);
			m_ownedItem = itm.move;
		}

		void addItem(shared(Item) item) pure { m_items ~= item; }

		double getTotalValue()
		const pure {
			double sum = 0;

			// lock() is required to access shared objects
			foreach (itm; m_items.unsafeGet) {
				auto l = itm.lock();
				sum += l.value;
			}

			// owned objects can be accessed without locking
			sum += m_ownedItem.value;

			return sum;
		}
	}

	void test()
	{
		import std.stdio;

		auto man = cast(shared)new Manager("My manager");
		{
			auto l = man.lock();
			l.addItem(new shared(Item)(1.5));
			l.addItem(new shared(Item)(0.5));
		}

		writefln("Total value: %s", man.lock().getTotalValue());
	}
}


/**
	Proxy structure that keeps the monitor of the given object locked until it
	goes out of scope.

	Any unshared members of the object are safely accessible during this time. The usual
	way to use it is by calling lock.
See_Also: lock
*/
struct ScopedLock(T)
{
	static assert(is(T == class), "ScopedLock is only usable with classes.");
//	static assert(isWeaklyIsolated!(FieldTypeTuple!T), T.stringof~" contains non-immutable, non-shared references. Accessing it in a multi-threaded environment is not safe.");

	// Unshared reference to the locked object; Rebindable so that const class types can be stored.
	private Rebindable!T m_ref;

	// Copying would unlock the monitor more than once.
	@disable this(this);

	/// Enters the monitor of `obj`, which must not be null.
	this(shared(T) obj)
		pure nothrow @trusted
	{
		assert(obj !is null, "Attempting to lock null object.");
		m_ref = cast(T)obj;

		auto target = getObject();
		_d_monitorenter(target);
		assert(target.__monitor !is null);
	}

	/// Leaves the monitor that was entered by the constructor.
	~this()
		pure nothrow @trusted
	{
		assert(m_ref !is null);

		auto target = getObject();
		assert(target.__monitor !is null);
		_d_monitorexit(target);
	}

	/**
		Returns an unshared reference to the locked object.

		Note that using this function breaks type safety. Be sure to not escape the reference beyond
		the life time of the lock.
	*/
	@property inout(T) unsafeGet()
		inout nothrow
	{
		return m_ref;
	}

	/// Forwards member access on the lock to the locked object.
	inout(T) opDot()
		inout nothrow
	{
		return m_ref;
	}
	//pragma(msg, "In ScopedLock!("~T.stringof~")");
	//pragma(msg, isolatedRefMethods!T());
//	mixin(isolatedAggregateMethodsString!T());

	// Strips all qualifiers from the stored reference so that it can be passed to the monitor functions.
	private Object getObject()
		pure nothrow
	{
		static if (is(Rebindable!T == struct)) {
			return cast(Unqual!T)m_ref.get();
		} else {
			return cast(Unqual!T)m_ref;
		}
	}
}


/**
	Creates a new isolated object.

	Isolated objects contain no mutable aliasing outside of their own reference tree. They can thus
	be safely converted to immutable and they can be safely passed between threads.

	The function returns an instance of Isolated that will allow proxied access to the members of
	the object, as well as providing means to convert the object to immutable or to an ordinary
	mutable object.
*/
pure Isolated!T makeIsolated(T, ARGS...)(ARGS args)
{
	// classes: allocate with `new` and wrap the reference
	static if (is(T == class)) return Isolated!T(new T(args));
	// structs: construct in place and return by value
	else static if (is(T == struct)) return T(args);
	// pointers to structs: allocate the pointed-to struct and wrap the pointer
	else static if (isPointer!T && is(PointerTarget!T == struct)) {
		alias TB = PointerTarget!T;
		return Isolated!T(new TB(args));
	} else static assert(false, "makeIsolated works only for class and (pointer to) struct types.");
}

///
unittest {
	import vibe.core.concurrency;
	import vibe.core.core;

	static class Item {
		double value;
		string name;
	}

	static void modifyItem(Isolated!Item itm)
	{
		itm.value = 1.3;
		// TODO: send back to initiating thread
	}

	void test()
	{
		immutable(Item)[] items;

		// create immutable item procedurally
		auto itm = makeIsolated!Item();
		itm.value = 2.4;
		itm.name = "Test";
		items ~= itm.freeze();

		// send isolated item to other thread
		auto itm2 = makeIsolated!Item();
		runWorkerTask(&modifyItem, itm2.move());
		// ...
	}
}

// Checks the result types of makeIsolated for classes, structs and struct pointers.
unittest {
	static class C { this(int x) pure {} }
	static struct S { this(int x) pure {} }

	alias CI = typeof(makeIsolated!C(0));
	alias SI = typeof(makeIsolated!S(0));
	alias SPI = typeof(makeIsolated!(S*)(0));
	static assert(isStronglyIsolated!CI);
	static assert(is(CI == IsolatedRef!C));
	static assert(isStronglyIsolated!SI);
	static assert(is(SI == S));
	static assert(isStronglyIsolated!SPI);
	static assert(is(SPI == IsolatedRef!S));
}


/**
	Creates a new isolated array.

	Params:
		size = Number of elements of the new array.

	Returns:
		An isolated array whose length has been set to `size`.
*/
pure Isolated!(T[]) makeIsolatedArray(T)(size_t size)
{
	Isolated!(T[]) ret;
	ret.length = size;
	// the wrapper is moved out instead of copied
	return ret.move();
}

///
unittest {
	import vibe.core.concurrency;
	import vibe.core.core;

	static void compute(Task tid, Isolated!(double[]) array, size_t start_index)
	{
		foreach( i; 0 .. array.length )
			array[i] = (start_index + i) * 0.5;

		sendCompat(tid, array.move());
	}

	void test()
	{
		import std.stdio;

		// compute contents of an array using multiple threads
		auto arr = makeIsolatedArray!double(256);

		// partition the array (no copying takes place)
		size_t[] indices = [64, 128, 192, 256];
		Isolated!(double[])[] subarrays = arr.splice(indices);

		// start processing in threads
		Task[] tids;
		foreach (i, idx; indices)
			tids ~= runWorkerTaskH(&compute, Task.getThis(), subarrays[i].move(), idx);

		// collect results
		auto resultarrays = new Isolated!(double[])[tids.length];
		foreach( i, tid; tids )
			resultarrays[i] = receiveOnlyCompat!(Isolated!(double[])).move();

		// BUG: the arrays must be sorted here, but since there is no way to tell
		// from where something was received, this is difficult here.

		// merge results (no copying takes place again)
		foreach( i; 1 .. resultarrays.length )
			resultarrays[0].merge(resultarrays[i]);

		// convert the final result to immutable
		auto result = resultarrays[0].freeze();

		writefln("Result: %s", result);
	}
}


/**
	Unsafe facility to assume that an existing reference is unique.

	No check is performed; the given value is simply wrapped in `Isolated!T`.
*/
Isolated!T assumeIsolated(T)(T object)
{
	return Isolated!T(object);
}

/**
	Encapsulates the given type in a way that guarantees memory isolation.
See_Also: makeIsolated, makeIsolatedArray
*/
template Isolated(T)
{
	// Types that are already weakly isolated need no wrapper at all.
	static if( isWeaklyIsolated!T ){
		alias Isolated = T;
	} else static if( is(T == class) ){
		alias Isolated = IsolatedRef!T;
	} else static if( isPointer!T ){
		alias Isolated = IsolatedRef!(PointerTarget!T);
	} else static if( isDynamicArray!T ){
		alias Isolated = IsolatedArray!(typeof(T.init[0]));
	} else static if( isAssociativeArray!T ){
		alias Isolated = IsolatedAssociativeArray!(KeyType!T, ValueType!T);
	} else static assert(false, T.stringof~": Unsupported type for Isolated!T - must be class, pointer, array or associative array.");
}


// unit test fails with DMD 2.064 due to some cyclic import regression
unittest
{
	static class CE {}
	static struct SE {}

	static assert(is(Isolated!CE == IsolatedRef!CE));
	static assert(is(Isolated!(SE*) == IsolatedRef!SE));
	static assert(is(Isolated!(SE[]) == IsolatedArray!SE));
	version(EnablePhobosFails){
		// AAs don't work because they are impure
		static assert(is(Isolated!(SE[string]) == IsolatedAssociativeArray!(string, SE)));
	}
}


/// private
private struct IsolatedRef(T)
{
	pure:
	static assert(isWeaklyIsolated!(FieldTypeTuple!T), T.stringof ~ " contains non-immutable/non-shared references. Isolation cannot be guaranteed.");
	// Marker members that are detected by isWeaklyIsolated/isStronglyIsolated.
	enum __isWeakIsolatedType = true;
	static if( isStronglyIsolated!(FieldTypeTuple!T) )
		enum __isIsolatedType = true;

	alias BaseType = T;

	// Classes are stored as plain references, everything else as a pointer.
	static if( is(T == class) ){
		alias Tref = T;
		alias Tiref = immutable(T);
	} else {
		alias Tref = T*;
		alias Tiref = immutable(T)*;
	}

	private Tref m_ref;

	//mixin isolatedAggregateMethods!T;
	//pragma(msg, isolatedAggregateMethodsString!T());
	// Generates forwarding accessors/methods for the members of T.
	mixin(isolatedAggregateMethodsString!T());

	@disable this(this);

	private this(Tref obj)
	{
		m_ref = obj;
	}

	/// Takes over the reference held by `src`; `src` is set to null.
	this(ref IsolatedRef src)
	{
		m_ref = src.m_ref;
		src.m_ref = null;
	}

	/// Takes over the reference held by `src`; `src` is set to null.
	void opAssign(ref IsolatedRef src)
	{
		m_ref = src.m_ref;
		src.m_ref = null;
	}

	/**
		Returns the raw reference.

		Note that using this function breaks type safety. Be sure to not escape the reference.
	*/
	inout(Tref) unsafeGet() inout { return m_ref; }

	/**
		Move the contained reference to a new IsolatedRef.

		Since IsolatedRef is not copyable, using this function may be necessary when
		passing a reference to a function or when returning it. The reference in
		this instance will be set to null after the call returns.
	*/
	IsolatedRef move() { auto r = m_ref; m_ref = null; return IsolatedRef(r); }
	/// ditto
	void move(ref IsolatedRef target) { target.m_ref = m_ref; m_ref = null; }

	/**
		Convert the isolated reference to a normal mutable reference.

		The reference in this instance will be set to null after the call returns.
	*/
	Tref extract()
	{
		auto ret = m_ref;
		m_ref = null;
		return ret;
	}

	/**
		Converts the isolated reference to immutable.

		The reference in this instance will be set to null after the call has returned.
		Note that this method is only available for strongly isolated references,
		which means references that do not contain shared aliasing.
	*/
	Tiref freeze()()
	{
		static assert(isStronglyIsolated!(FieldTypeTuple!T), "freeze() can only be called on strongly isolated values, but "~T.stringof~" contains shared references.");
		auto ret = m_ref;
		m_ref = null;
		return cast(immutable)ret;
	}

	/**
		Performs an up- or down-cast of the reference and moves it to a new IsolatedRef instance.

		The reference in this instance will be set to null after the call has returned.
	*/
	U opCast(U)()
		if (isInstanceOf!(IsolatedRef, U) && (is(U.BaseType : BaseType) || is(BaseType : U.BaseType)))
	{
		auto r = U(cast(U.BaseType)m_ref);
		m_ref = null;
		return r;
	}

	/**
		Determines if the contained reference is non-null.

		This method allows Isolated references to be used in boolean expressions without having to
		extract the reference.
	*/
	U opCast(U)() const if(is(U == bool)) { return m_ref !is null; }
}


/// private
private struct IsolatedArray(T)
{
	static assert(isWeaklyIsolated!T, T.stringof ~ " contains non-immutable references. Isolation cannot be guaranteed.");
	// Marker members that are detected by isWeaklyIsolated/isStronglyIsolated.
	enum __isWeakIsolatedType = true;
	static if( isStronglyIsolated!T )
		enum __isIsolatedType = true;

	alias BaseType = T[];

	private T[] m_array;

	mixin isolatedArrayMethods!T;

	@disable this(this);

	/**
		Returns the raw reference.

		Note that using this function breaks type safety. Be sure to not escape the reference.
	*/
	inout(T[]) unsafeGet() inout { return m_array; }

	/// Moves the array to a new IsolatedArray; this instance is set to null.
	IsolatedArray!T move() pure { auto r = m_array; m_array = null; return IsolatedArray(r); }
	/// Moves the array into `target`; this instance is set to null.
	void move(ref IsolatedArray target) pure { target.m_array = m_array; m_array = null; }

	/// Returns the array as a normal mutable slice; this instance is set to null.
	T[] extract()
	pure {
		auto arr = m_array;
		m_array = null;
		return arr;
	}

	/// Converts the array to immutable; this instance is set to null.
	immutable(T)[] freeze()() pure
	{
		static assert(isStronglyIsolated!T, "Freeze can only be called on strongly isolated values, but "~T.stringof~" contains shared references.");
		auto arr = m_array;
		m_array = null;
		return cast(immutable)arr;
	}


	/**
		Splits the array into individual slices at the given indices.

		The indices must be in ascending order. Any items that are larger than
		the last given index will remain in this IsolatedArray.
	*/
	IsolatedArray!T[] splice(in size_t[] indices...) pure
		in {
			//import std.algorithm : isSorted;
			assert(indices.length > 0, "At least one splice index must be given.");
			//assert(isSorted(indices), "Indices must be in ascending order.");
			assert(indices[$-1] <= m_array.length, "Splice index out of bounds.");
		}
		body {
			auto ret = new IsolatedArray!T[indices.length];
			// lidx is the start of the next slice, i.e. the end of the previous one
			size_t lidx = 0;
			foreach( i, sidx; indices ){
				ret[i].m_array = m_array[lidx .. sidx];
				lidx = sidx;
			}
			// keep whatever lies behind the last index
			m_array = m_array[lidx .. $];
			return ret;
		}

	/**
		Joins a directly neighbouring array partition with this array.

		`array` must start where this array ends or end where this array
		starts. Its length is set to zero afterwards.
	*/
	void merge(ref IsolatedArray!T array) pure
		in {
			assert(array.m_array.ptr == m_array.ptr+m_array.length || array.m_array.ptr+array.length == m_array.ptr,
				"Argument to merge() must be a neighbouring array partition.");
		}
		body {
			if( array.m_array.ptr == m_array.ptr + m_array.length ){
				// `array` follows this array: extend to the right
				m_array = m_array.ptr[0 .. m_array.length + array.length];
			} else {
				// `array` precedes this array: start at its first element
				m_array = array.m_array.ptr[0 .. m_array.length + array.length];
			}
			array.m_array.length = 0;
		}
}


/// private
// NOTE(review): unlike IsolatedRef and IsolatedArray, this struct does not
// declare `@disable this(this)` - confirm whether copying is intended.
private struct IsolatedAssociativeArray(K, V)
{
	pure:
	static assert(isWeaklyIsolated!K, "Key type has aliasing. Memory isolation cannot be guaranteed.");
	static assert(isWeaklyIsolated!V, "Value type has aliasing. Memory isolation cannot be guaranteed.");

	// Marker members that are detected by isWeaklyIsolated/isStronglyIsolated.
	enum __isWeakIsolatedType = true;
	static if( isStronglyIsolated!K && isStronglyIsolated!V )
		enum __isIsolatedType = true;

	alias BaseType = V[K];

	private {
		V[K] m_aa;
	}

	mixin isolatedAssociativeArrayMethods!(K, V);

	/**
		Returns the raw reference.

		Note that using this function breaks type safety. Be sure to not escape the reference.
	*/
	inout(V[K]) unsafeGet() inout { return m_aa; }

	/// Moves the associative array to a new instance; this instance is set to null.
	IsolatedAssociativeArray move() { auto r = m_aa; m_aa = null; return IsolatedAssociativeArray(r); }
	/// Moves the associative array into `target`; this instance is set to null.
	void move(ref IsolatedAssociativeArray target) { target.m_aa = m_aa; m_aa = null; }

	/// Returns the associative array as a normal mutable value; this instance is set to null.
	V[K] extract()
	{
		auto arr = m_aa;
		m_aa = null;
		return arr;
	}

	// Freezing is only offered if both key and value type are strongly isolated.
	static if( is(typeof(IsolatedAssociativeArray.__isIsolatedType)) ){
		/// Converts to an associative array with immutable values; this instance is set to null.
		immutable(V)[K] freeze()
		{
			auto arr = m_aa;
			m_aa = null;
			return cast(immutable(V)[K])(arr);
		}

		/// Converts to a fully immutable associative array; this instance is set to null.
		immutable(V[K]) freeze2()
		{
			auto arr = m_aa;
			m_aa = null;
			return cast(immutable(V[K]))(arr);
		}
	}
}


/** Encapsulates a reference in a way that disallows escaping it or any contained references.
*/
template ScopedRef(T)
{
	static if( isAggregateType!T ) alias ScopedRef = ScopedRefAggregate!T;
	else static if( isAssociativeArray!T ) alias ScopedRef = ScopedRefAssociativeArray!T;
	else static if( isArray!T ) alias ScopedRef = ScopedRefArray!T;
	else static if( isBasicType!T ) alias ScopedRef = ScopedRefBasic!T;
	else static assert(false, "Unsupported type for ScopedRef: "~T.stringof);
}

/// private
// Non-copyable wrapper around a pointer to a basic type value.
private struct ScopedRefBasic(T)
{
	private T* m_ref;

	@disable this(this);

	this(ref T tref) pure { m_ref = &tref; }

	//void opAssign(T value) { *m_ref = value; }

	ref T unsafeGet() pure { return *m_ref; }

	alias unsafeGet this;
}

/// private
// Non-copyable wrapper around a pointer to an aggregate value.
private struct ScopedRefAggregate(T)
{
	private T* m_ref;

	@disable this(this);

	this(ref T tref) pure { m_ref = &tref; }

	//void opAssign(T value) { *m_ref = value; }

	ref T unsafeGet() pure { return *m_ref; }

	// shared aggregates are only accessible through lock(), all others get
	// generated forwarding members
	static if( is(T == shared) ){
		auto lock() pure { return .lock(unsafeGet()); }
	} else {
		mixin(isolatedAggregateMethodsString!T());
		//mixin isolatedAggregateMethods!T;
	}
}

/// private
// Non-copyable wrapper around a pointer to an array.
private struct ScopedRefArray(T)
{
	alias V = typeof(T.init[0]) ;
	private T* m_ref;

	// `m_array` is the name that isolatedArrayMethods operates on
	private @property ref T m_array() pure { return *m_ref; }
	private @property ref const(T) m_array() const pure { return *m_ref; }

	mixin isolatedArrayMethods!(V, !is(T == const) && !is(T == immutable));

	@disable this(this);

	this(ref T tref) pure { m_ref = &tref; }

	//void opAssign(T value) { *m_ref = value; }

	ref T unsafeGet() pure { return *m_ref; }
}

/// private
// Non-copyable wrapper around a pointer to an associative array.
//
// The template takes the associative array type itself, matching the
// `ScopedRefAssociativeArray!T` instantiation in ScopedRef. Key and value
// types are derived from it.
private struct ScopedRefAssociativeArray(T)
{
	alias K = KeyType!T;
	alias V = ValueType!T;
	private T* m_ref;

	// `m_aa` is the name that isolatedAssociativeArrayMethods operates on. A
	// single inout accessor serves the mutable, const and inout methods of
	// that mixin alike.
	private @property ref inout(T) m_aa() inout pure { return *m_ref; }

	mixin isolatedAssociativeArrayMethods!(K, V);

	@disable this(this);

	this(ref T tref) pure { m_ref = &tref; }

	//void opAssign(T value) { *m_ref = value; }

	ref T unsafeGet() pure { return *m_ref; }

}

/******************************************************************************/
/* COMMON MIXINS FOR NON-REF-ESCAPING WRAPPER STRUCTS                         */
/******************************************************************************/

/// private
/*private mixin template(T) isolatedAggregateMethods
{
	mixin(isolatedAggregateMethodsString!T());
}*/

/// private
// Builds D source code with forwarding members for the public fields and
// methods of T. The generated code accesses the wrapped value through a
// member called `m_ref` and is meant to be mixed into a wrapper struct.
private string isolatedAggregateMethodsString(T)()
{
	import vibe.internal.meta.traits;

	string ret = generateModuleImports!T();
	//pragma(msg, "Type '"~T.stringof~"'");
	foreach( mname; __traits(allMembers, T) ){
		static if (isPublicMember!(T, mname)) {
			static if (isRWPlainField!(T, mname)) {
				alias mtype = typeof(__traits(getMember, T, mname)) ;
				auto mtypename = fullyQualifiedName!mtype;
				//pragma(msg, "  field " ~ mname ~ " : " ~ mtype.stringof);
				// getters hand out ScopedRef wrappers instead of the raw field
				ret ~= "@property ScopedRef!(const("~mtypename~")) "~mname~"() const pure { return ScopedRef!(const("~mtypename~"))(m_ref."~mname~"); }\n";
				ret ~= "@property ScopedRef!("~mtypename~") "~mname~"() pure { return ScopedRef!("~mtypename~")(m_ref."~mname~"); }\n";
				static if( !is(mtype == const) && !is(mtype == immutable) ){
					static if( isWeaklyIsolated!mtype ){
						ret ~= "@property void "~mname~"("~mtypename~" value) pure { m_ref."~mname~" = value; }\n";
					} else {
						ret ~= "@property void "~mname~"(AT)(AT value) pure { static assert(isWeaklyIsolated!AT); m_ref."~mname~" = value.unsafeGet(); }\n";
					}
				}
			} else {
				foreach( method; __traits(getOverloads, T, mname) ){
					alias ftype = FunctionTypeOf!method;

					// only pure functions are allowed (or they could escape references to global variables)
// don't allow non-isolated references to be escaped
					if( functionAttributes!ftype & FunctionAttribute.pure_ &&
						isWeaklyIsolated!(ReturnType!ftype) )
					{
						static if( __traits(isStaticFunction, method) ){
							//pragma(msg, "  static method " ~ mname ~ " : " ~ ftype.stringof);
							ret ~= "static "~fullyQualifiedName!(ReturnType!ftype)~" "~mname~"(";
							foreach( i, P; ParameterTypeTuple!ftype ){
								if( i > 0 ) ret ~= ", ";
								ret ~= fullyQualifiedName!P ~ " p"~i.stringof;
							}
							ret ~= "){ return "~fullyQualifiedName!T~"."~mname~"(";
							foreach( i, P; ParameterTypeTuple!ftype ){
								if( i > 0 ) ret ~= ", ";
								ret ~= "p"~i.stringof;
							}
							ret ~= "); }\n";
						} else if (mname != "__ctor") {
							//pragma(msg, "  normal method " ~ mname ~ " : " ~ ftype.stringof);
							if( is(ftype == const) ) ret ~= "const ";
							if( is(ftype == shared) ) ret ~= "shared ";
							if( is(ftype == immutable) ) ret ~= "immutable ";
							if( functionAttributes!ftype & FunctionAttribute.pure_ ) ret ~= "pure ";
							if( functionAttributes!ftype & FunctionAttribute.property ) ret ~= "@property ";
							ret ~= fullyQualifiedName!(ReturnType!ftype)~" "~mname~"(";
							foreach( i, P; ParameterTypeTuple!ftype ){
								if( i > 0 ) ret ~= ", ";
								ret ~= fullyQualifiedName!P ~ " p"~i.stringof;
							}
							ret ~= "){ return m_ref."~mname~"(";
							foreach( i, P; ParameterTypeTuple!ftype ){
								if( i > 0 ) ret ~= ", ";
								ret ~= "p"~i.stringof;
							}
							ret ~= "); }\n";
						}
					}
				}
			}
		} //else pragma(msg, "  non-public field " ~ mname);
	}
	return ret;
}


/// private
// Array-like interface shared by the isolated/scoped array wrappers. The
// mixing-in type must provide a member named `m_array`. Elements are only
// handed out wrapped in ScopedRef so that no raw reference can escape.
private mixin template isolatedArrayMethods(T, bool mutableRef = true)
{
	@property size_t length() const pure { return m_array.length; }

	@property bool empty() const pure { return m_array.length == 0; }

	static if( mutableRef ){
		@property void length(size_t value) pure { m_array.length = value; }


		void opCatAssign(T item) pure
		{
			static if( isCopyable!T ) m_array ~= item;
			else {
				// non-copyable items cannot be appended with ~=, so grow first and assign
				m_array.length++;
				m_array[$-1] = item;
			}
		}

		void opCatAssign(IsolatedArray!T array) pure
		{
			static if( isCopyable!T ) m_array ~= array.m_array;
			else {
				size_t start = m_array.length;
				m_array.length += array.length;
				foreach( i, ref itm; array.m_array )
					m_array[start+i] = itm;
			}
		}
	}

	ScopedRef!(const(T)) opIndex(size_t idx) const pure { return ScopedRef!(const(T))(m_array[idx]); }
	ScopedRef!T opIndex(size_t idx) pure { return ScopedRef!T(m_array[idx]); }

	static if( !is(T == const) && !is(T == immutable) )
		void opIndexAssign(T value, size_t idx) pure { m_array[idx] = value; }

	// The delegates passed to opApply are cast to `pure` so that they can be
	// called from these pure methods.

	/// Iterates over index and element.
	int opApply(int delegate(ref size_t, ref ScopedRef!T) del)
	pure {
		foreach( idx, ref v; m_array ){
			auto noref = ScopedRef!T(v);
			if( auto ret = (cast(int delegate(ref size_t, ref ScopedRef!T) pure)del)(idx, noref) )
				return ret;
		}
		return 0;
	}

	/// ditto
	int opApply(int delegate(ref size_t, ref ScopedRef!(const(T))) del)
	const pure {
		foreach( idx, ref v; m_array ){
			auto noref = ScopedRef!(const(T))(v);
			if( auto ret = (cast(int delegate(ref size_t, ref ScopedRef!(const(T))) pure)del)(idx, noref) )
				return ret;
		}
		return 0;
	}

	/// Iterates over the elements only.
	int opApply(int delegate(ref ScopedRef!T) del)
	pure {
		// Iterate by reference, like the indexed overloads: ScopedRef stores the
		// address of its argument, so a by-value loop variable would make it
		// refer to a temporary copy instead of the array element (and would not
		// compile for non-copyable element types).
		foreach( ref v; m_array ){
			auto noref = ScopedRef!T(v);
			if( auto ret = (cast(int delegate(ref ScopedRef!T) pure)del)(noref) )
				return ret;
		}
		return 0;
	}

	/// ditto
	int opApply(int delegate(ref ScopedRef!(const(T))) del)
	const pure {
		// by reference for the same reason as in the mutable overload
		foreach( ref v; m_array ){
			auto noref = ScopedRef!(const(T))(v);
			if( auto ret = (cast(int delegate(ref ScopedRef!(const(T))) pure)del)(noref) )
				return ret;
		}
		return 0;
	}
}


/// private
// Associative array interface shared by the isolated/scoped AA wrappers. The
// mixing-in type must provide a member named `m_aa`.
//
// NOTE(review): the opApply overloads below pass `k` and `v` directly to
// delegates that are declared with ScopedRef parameters, without wrapping them
// the way isolatedArrayMethods does - confirm whether these can be
// instantiated at all.
private mixin template isolatedAssociativeArrayMethods(K, V, bool mutableRef = true)
{
	@property size_t length() const pure { return m_aa.length; }
	@property bool empty() const pure { return m_aa.length == 0; }

	static if( !is(V == const) && !is(V == immutable) )
		void opIndexAssign(V value, K key) pure { m_aa[key] = value; }

	inout(V) opIndex(K key) inout pure { return m_aa[key]; }

	int opApply(int delegate(ref ScopedRef!K, ref ScopedRef!V) del)
	pure {
		foreach( ref k, ref v; m_aa )
			if( auto ret = (cast(int delegate(ref ScopedRef!K, ref ScopedRef!V) pure)del)(k, v) )
				return ret;
		return 0;
	}

	int opApply(int delegate(ref ScopedRef!V) del)
	pure {
		foreach( ref v; m_aa )
			if( auto ret = (cast(int delegate(ref ScopedRef!V) pure)del)(v) )
				return ret;
		return 0;
	}

	int opApply(int delegate(ref ScopedRef!(const(K)), ref ScopedRef!(const(V))) del)
	const pure {
		foreach( ref k, ref v; m_aa )
			if( auto ret = (cast(int delegate(ref ScopedRef!(const(K)), ref ScopedRef!(const(V))) pure)del)(k, v) )
				return ret;
		return 0;
	}

	int opApply(int delegate(ref ScopedRef!(const(V))) del)
	const pure {
		foreach( v; m_aa )
			if( auto ret = (cast(int delegate(ref ScopedRef!(const(V))) pure)del)(v) )
				return ret;
		return 0;
	}
}


/******************************************************************************/
/* UTILITY FUNCTIONALITY                                                      */
/******************************************************************************/

// private
// Returns the `static import` statements for all modules that are reachable
// from the members of T.
private @property string generateModuleImports(T)()
{
	bool[string] visited;
	//pragma(msg, "generateModuleImports "~T.stringof);
	return generateModuleImportsImpl!T(visited);
}

// Recursive worker for generateModuleImports. TYPES lists the types that are
// currently being processed further up the recursion, `visited` the modules
// for which an import has already been emitted.
private @property string generateModuleImportsImpl(T, TYPES...)(ref bool[string] visited)
{
	string ret;

	//pragma(msg, T);
	//pragma(msg, TYPES);

	static if( !haveTypeAlready!(T, TYPES) ){
		// emits each module only once
		void addModule(string mod){
			if( mod !in visited ){
				ret ~= "static import "~mod~";\n";
				visited[mod] = true;
			}
		}

		static if( isAggregateType!T && !is(typeof(T.__isWeakIsolatedType)) ){ // hack to avoid a recursive template instantiation when Isolated!T is passed to moduleName
			addModule(moduleName!T);

			foreach( member; __traits(allMembers, T) ){
				//static if( isPublicMember!(T, member) ){
					static if( !is(typeof(__traits(getMember, T, member))) ){
						// ignore sub types
					} else static if( !is(FunctionTypeOf!(__traits(getMember, T, member)) == function) ){
						// fields: recurse into the field type
						alias mtype = typeof(__traits(getMember, T, member)) ;
						ret ~= generateModuleImportsImpl!(mtype, T, TYPES)(visited);
					} else static if( is(T == class) || is(T == interface) ){
						// methods: recurse into return and parameter types
						foreach( overload; MemberFunctionsTuple!(T, member) ){
							ret ~= generateModuleImportsImpl!(ReturnType!overload, T, TYPES)(visited);
							foreach( P; ParameterTypeTuple!overload )
								ret ~= generateModuleImportsImpl!(P, T, TYPES)(visited);
						}
					} // TODO: handle structs!
				//}
			}
		}
		else static if( isPointer!T ) ret ~= generateModuleImportsImpl!(PointerTarget!T, T, TYPES)(visited);
		else static if( isArray!T ) ret ~= generateModuleImportsImpl!(typeof(T.init[0]), T, TYPES)(visited);
		else static if( isAssociativeArray!T ) ret ~= generateModuleImportsImpl!(KeyType!T, T, TYPES)(visited) ~ generateModuleImportsImpl!(ValueType!T, T, TYPES)(visited);
	}

	return ret;
}

template haveTypeAlready(T, TYPES...)
{
	// linear search for T in TYPES
	static if( TYPES.length == 0 ) enum haveTypeAlready = false;
	else static if( is(T == TYPES[0]) ) enum haveTypeAlready = true;
	else alias haveTypeAlready = haveTypeAlready!(T, TYPES[1 ..$]);
}


/******************************************************************************/
/* Additional traits useful for handling isolated data                        */
/******************************************************************************/

/**
	Determines if the given list of types has any non-immutable aliasing outside of their object tree.

	The types in particular may only contain plain data, pointers or arrays to immutable data, or references
	encapsulated in `vibe.core.concurrency.Isolated`.
*/
template isStronglyIsolated(T...)
{
	static if (T.length == 0) enum bool isStronglyIsolated = true;
	// type lists are split in halves and checked recursively
	else static if (T.length > 1) enum bool isStronglyIsolated = isStronglyIsolated!(T[0 .. $/2]) && isStronglyIsolated!(T[$/2 .. $]);
	else {
		static if (is(T[0] == immutable)) enum bool isStronglyIsolated = true;
		else static if(isInstanceOf!(Rebindable, T[0])) enum bool isStronglyIsolated = isStronglyIsolated!(typeof(T[0].get()));
		// marker member declared by the Isolated wrapper types
		else static if (is(typeof(T[0].__isIsolatedType))) enum bool isStronglyIsolated = true;
		else static if (is(T[0] == class)) enum bool isStronglyIsolated = false;
		else static if (is(T[0] == interface)) enum bool isStronglyIsolated = false; // can't know if the implementation is isolated
		else static if (is(T[0] == delegate)) enum bool isStronglyIsolated = false; // can't know to what a delegate points
		else static if (isDynamicArray!(T[0])) enum bool isStronglyIsolated = is(typeof(T[0].init[0]) == immutable);
		else static if (isAssociativeArray!(T[0])) enum bool isStronglyIsolated = false; // TODO: be less strict here
		else static if (isSomeFunction!(T[0])) enum bool isStronglyIsolated = true; // functions are immutable
		else static if (isPointer!(T[0])) enum bool isStronglyIsolated = is(typeof(*T[0].init) == immutable);
		// other aggregates are isolated if all of their fields are
		else static if (isAggregateType!(T[0])) enum bool isStronglyIsolated = isStronglyIsolated!(FieldTypeTuple!(T[0]));
		else enum bool isStronglyIsolated = true;
	}
}


/**
	Determines if the given list of types has any non-immutable and unshared aliasing outside of their object tree.

	The types in particular may only contain plain data, pointers or arrays to immutable or shared data, or references
	encapsulated in `vibe.core.concurrency.Isolated`. Values that do not have unshared and unisolated aliasing are safe to be passed
	between threads.
*/
template isWeaklyIsolated(T...)
{
	static if (T.length == 0) enum bool isWeaklyIsolated = true;
	// type lists are split in halves and checked recursively
	else static if (T.length > 1) enum bool isWeaklyIsolated = isWeaklyIsolated!(T[0 .. $/2]) && isWeaklyIsolated!(T[$/2 .. $]);
	else {
		static if(is(T[0] == immutable)) enum bool isWeaklyIsolated = true;
		else static if (is(T[0] == shared)) enum bool isWeaklyIsolated = true;
		else static if (isInstanceOf!(Rebindable, T[0])) enum bool isWeaklyIsolated = isWeaklyIsolated!(typeof(T[0].get()));
		else static if (is(T[0] == Tid)) enum bool isWeaklyIsolated = true; // Tid/MessageBox is not properly annotated with shared
		else static if (is(T[0] : Throwable)) enum bool isWeaklyIsolated = true; // WARNING: this is unsafe, but needed for send/receive!
		// marker members declared by the Isolated wrapper types
		else static if (is(typeof(T[0].__isIsolatedType))) enum bool isWeaklyIsolated = true;
		else static if (is(typeof(T[0].__isWeakIsolatedType))) enum bool isWeaklyIsolated = true;
		else static if (is(T[0] == class)) enum bool isWeaklyIsolated = false;
		else static if (is(T[0] == interface)) enum bool isWeaklyIsolated = false; // can't know if the implementation is isolated
		else static if (is(T[0] == delegate)) enum bool isWeaklyIsolated = T[0].stringof.endsWith(" shared"); // can't know to what a delegate points - FIXME: use something better than a string comparison
		else static if (isDynamicArray!(T[0])) enum bool isWeaklyIsolated = is(typeof(T[0].init[0]) == immutable);
		else static if (isAssociativeArray!(T[0])) enum bool isWeaklyIsolated = false; // TODO: be less strict here
		else static if (isSomeFunction!(T[0])) enum bool isWeaklyIsolated = true; // functions are immutable
		else static if (isPointer!(T[0])) enum bool isWeaklyIsolated = is(typeof(*T[0].init) == immutable) || is(typeof(*T[0].init) == shared);
		// other aggregates are isolated if all of their fields are
		else static if (isAggregateType!(T[0])) enum bool isWeaklyIsolated = isWeaklyIsolated!(FieldTypeTuple!(T[0]));
		else enum bool isWeaklyIsolated = true;
	}
}

unittest {
	static class A { int x; string y; }

	static struct B {
		string a; // strongly isolated
		Isolated!A b; // strongly isolated
		version(EnablePhobosFails)
		Isolated!(Isolated!A[]) c; // strongly isolated
		version(EnablePhobosFails)
		Isolated!(Isolated!A[string]) c; // AA implementation does not like this
		version(EnablePhobosFails)
		Isolated!(int[string]) d; // strongly isolated
	}

	static struct C {
		string a; // strongly isolated
		shared(A) b; // weakly isolated
		Isolated!A c; // strongly isolated
		shared(A*) d; // weakly isolated
		shared(A[]) e; // weakly isolated
		shared(A[string]) f; // weakly isolated
	}

	static struct D { A a; } // not isolated
	static struct E { void delegate() a; } // not isolated
	static struct F { void function() a; } // strongly isolated (functions are immutable)
	static struct G { void test(); } // strongly isolated
	static struct H { A[] a; } // not isolated
	static interface I {}

	static assert(!isStronglyIsolated!A);
	static assert(isStronglyIsolated!(FieldTypeTuple!A));
	static assert(isStronglyIsolated!B);
	static assert(!isStronglyIsolated!C);
	static assert(!isStronglyIsolated!D);
	static assert(!isStronglyIsolated!E);
	static assert(isStronglyIsolated!F);
	static assert(isStronglyIsolated!G);
	static assert(!isStronglyIsolated!H);
	static assert(!isStronglyIsolated!I);

	static assert(!isWeaklyIsolated!A);
	static assert(isWeaklyIsolated!(FieldTypeTuple!A));
	static assert(isWeaklyIsolated!B);
	static assert(isWeaklyIsolated!C);
	static assert(!isWeaklyIsolated!D);
	static assert(!isWeaklyIsolated!E);
	static assert(isWeaklyIsolated!F);
	static assert(isWeaklyIsolated!G);
	static assert(!isWeaklyIsolated!H);
	static assert(!isWeaklyIsolated!I);
}

unittest {
	static assert(isWeaklyIsolated!Tid);
}


/// Evaluates to true if a value of type T can be used as a by-value foreach variable.
template isCopyable(T)
{
	static if( __traits(compiles, {foreach( t; [T.init]){}}) ) enum isCopyable = true;
	else enum isCopyable = false;
}


/******************************************************************************/
/* Future (promise) support                                                   */
/******************************************************************************/

/**
	Represents a value that will be computed asynchronously.

	This type uses $(D alias this) to enable transparent access to the result
	value.
1081 */ 1082 struct Future(T) { 1083 import vibe.internal.freelistref : FreeListRef; 1084 1085 private { 1086 FreeListRef!(shared(T)) m_result; 1087 Task m_task; 1088 } 1089 1090 /// Checks if the values was fully computed. 1091 @property bool ready() const { return !m_task.running; } 1092 1093 /** Returns the computed value. 1094 1095 This function waits for the computation to finish, if necessary, and 1096 then returns the final value. In case of an uncaught exception 1097 happening during the computation, the exception will be thrown 1098 instead. 1099 */ 1100 ref T getResult() 1101 { 1102 if (!ready) m_task.join(); 1103 assert(ready, "Task still running after join()!?"); 1104 return *cast(T*)&m_result.get(); // casting away shared is safe, because this is a unique reference 1105 } 1106 1107 alias getResult this; 1108 1109 private void init() 1110 { 1111 m_result = FreeListRef!(shared(T))(); 1112 } 1113 } 1114 1115 1116 /** 1117 Starts an asynchronous computation and returns a future for the result value. 1118 1119 If the supplied callable and arguments are all weakly isolated, 1120 $(D vibe.core.core.runWorkerTask) will be used to perform the computation. 1121 Otherwise, $(D vibe.core.core.runTask) will be used. 1122 1123 Params: 1124 callable: A callable value, can be either a function, a delegate, or a 1125 user defined type that defines an $(D opCall). 1126 args: Arguments to pass to the callable. 1127 1128 Returns: 1129 Returns a $(D Future) object that can be used to access the result. 
1130 1131 See_also: $(D isWeaklyIsolated) 1132 */ 1133 Future!(StripHeadConst!(ReturnType!CALLABLE)) async(CALLABLE, ARGS...)(CALLABLE callable, ARGS args) 1134 if (is(typeof(callable(args)) == ReturnType!CALLABLE)) 1135 { 1136 import vibe.internal.freelistref : FreeListRef; 1137 1138 import vibe.core.core; 1139 import std.functional : toDelegate; 1140 1141 alias RET = StripHeadConst!(ReturnType!CALLABLE); 1142 Future!RET ret; 1143 ret.init(); 1144 static void compute(FreeListRef!(shared(RET)) dst, CALLABLE callable, ARGS args) { 1145 dst = cast(shared(RET))callable(args); 1146 } 1147 static if (isWeaklyIsolated!CALLABLE && isWeaklyIsolated!ARGS) { 1148 ret.m_task = runWorkerTaskH(&compute, ret.m_result, callable, args); 1149 } else { 1150 ret.m_task = runTask(toDelegate(&compute), ret.m_result, callable, args); 1151 } 1152 return ret; 1153 } 1154 1155 /// 1156 unittest { 1157 import vibe.core.core; 1158 import vibe.core.log; 1159 1160 void test() 1161 { 1162 auto val = async({ 1163 logInfo("Starting to compute value in worker task."); 1164 sleep(500.msecs); // simulate some lengthy computation 1165 logInfo("Finished computing value in worker task."); 1166 return 32; 1167 }); 1168 1169 logInfo("Starting computation in main task"); 1170 sleep(200.msecs); // simulate some lengthy computation 1171 logInfo("Finished computation in main task. 
Waiting for async value."); 1172 logInfo("Result: %s", val.getResult()); 1173 } 1174 } 1175 1176 /// 1177 unittest { 1178 int sum(int a, int b) 1179 { 1180 return a + b; 1181 } 1182 1183 static int sum2(int a, int b) 1184 { 1185 return a + b; 1186 } 1187 1188 void test() 1189 { 1190 // Using a delegate will use runTask internally 1191 assert(async(&sum, 2, 3).getResult() == 5); 1192 1193 // Using a static function will use runTaskWorker internally, 1194 // if all arguments are weakly isolated 1195 assert(async(&sum2, 2, 3).getResult() == 5); 1196 } 1197 } 1198 1199 unittest { 1200 import vibe.core.core : sleep; 1201 1202 auto f = async({ 1203 immutable byte b = 1; 1204 return b; 1205 }); 1206 sleep(10.msecs); // let it finish first 1207 assert(f.getResult() == 1); 1208 1209 // currently not possible because Task.join only works within a single thread. 1210 /*f = async({ 1211 immutable byte b = 2; 1212 sleep(10.msecs); // let the caller wait a little 1213 return b; 1214 }); 1215 assert(f.getResult() == 1);*/ 1216 } 1217 1218 /******************************************************************************/ 1219 /******************************************************************************/ 1220 /* std.concurrency compatible interface for message passing */ 1221 /******************************************************************************/ 1222 /******************************************************************************/ 1223 1224 enum ConcurrencyPrimitive { 1225 task, // Task run in the caller's thread (`runTask`) 1226 workerTask, // Task run in the worker thread pool (`runWorkerTask`) 1227 thread // Separate thread 1228 } 1229 1230 /** Sets the concurrency primitive to use for `śtd.concurrency.spawn()`. 1231 1232 By default, `spawn()` will start a thread for each call, mimicking the 1233 default behavior of `std.concurrency`. 
1234 */ 1235 void setConcurrencyPrimitive(ConcurrencyPrimitive primitive) 1236 { 1237 import core.atomic : atomicStore; 1238 atomicStore(st_concurrencyPrimitive, primitive); 1239 } 1240 1241 private shared ConcurrencyPrimitive st_concurrencyPrimitive = ConcurrencyPrimitive.thread; 1242 1243 void send(ARGS...)(Task task, ARGS args) { std.concurrency.send(task.tidInfo.ident, args); } 1244 void send(ARGS...)(Tid tid, ARGS args) { std.concurrency.send(tid, args); } 1245 void prioritySend(ARGS...)(Task task, ARGS args) { std.concurrency.prioritySend(task.tidInfo.ident, args); } 1246 void prioritySend(ARGS...)(Tid tid, ARGS args) { std.concurrency.prioritySend(tid, args); } 1247 1248 package class VibedScheduler : Scheduler { 1249 import core.sync.mutex; 1250 import vibe.core.core; 1251 import vibe.core.sync; 1252 1253 override void start(void delegate() op) { op(); } 1254 override void spawn(void delegate() op) 1255 { 1256 import core.thread : Thread; 1257 1258 final switch (st_concurrencyPrimitive) with (ConcurrencyPrimitive) { 1259 case task: runTask(op); break; 1260 case workerTask: 1261 static void wrapper(shared(void delegate()) op) { 1262 (cast(void delegate())op)(); 1263 } 1264 runWorkerTask(&wrapper, cast(shared)op); 1265 break; 1266 case thread: 1267 auto t = new Thread(op); 1268 t.start(); 1269 break; 1270 } 1271 } 1272 override void yield() {} 1273 override @property ref ThreadInfo thisInfo() { return Task.getThis().tidInfo; } 1274 override TaskCondition newCondition(Mutex m) 1275 { 1276 scope (failure) assert(false); 1277 version (VibeLibasyncDriver) { 1278 import vibe.core.drivers.libasync; 1279 if (LibasyncDriver.isControlThread) 1280 return null; 1281 } 1282 setupDriver(); 1283 return new TaskCondition(m); 1284 } 1285 } 1286 1287 1288 // Compatibility implementation of `send` using vibe.d's own std.concurrency implementation 1289 void sendCompat(ARGS...)(Task tid, ARGS args) 1290 { 1291 assert (tid != Task(), "Invalid task handle"); 1292 static 
assert(args.length > 0, "Need to send at least one value."); 1293 foreach(A; ARGS){ 1294 static assert(isWeaklyIsolated!A, "Only objects with no unshared or unisolated aliasing may be sent, not "~A.stringof~"."); 1295 } 1296 tid.messageQueue.send(Variant(IsolatedValueProxyTuple!ARGS(args))); 1297 } 1298 1299 // Compatibility implementation of `prioritySend` using vibe.d's own std.concurrency implementation 1300 void prioritySendCompat(ARGS...)(Task tid, ARGS args) 1301 { 1302 assert (tid != Task(), "Invalid task handle"); 1303 static assert(args.length > 0, "Need to send at least one value."); 1304 foreach(A; ARGS){ 1305 static assert(isWeaklyIsolated!A, "Only objects with no unshared or unisolated aliasing may be sent, not "~A.stringof~"."); 1306 } 1307 tid.messageQueue.prioritySend(Variant(IsolatedValueProxyTuple!ARGS(args))); 1308 } 1309 1310 // TODO: handle special exception types 1311 1312 // Compatibility implementation of `receive` using vibe.d's own std.concurrency implementation 1313 void receiveCompat(OPS...)(OPS ops) 1314 { 1315 auto tid = Task.getThis(); 1316 assert(tid != Task.init, "Cannot receive task messages outside of a task."); 1317 tid.messageQueue.receive(opsFilter(ops), opsHandler(ops)); 1318 } 1319 1320 // Compatibility implementation of `receiveOnly` using vibe.d's own std.concurrency implementation 1321 auto receiveOnlyCompat(ARGS...)() 1322 { 1323 import std.algorithm : move; 1324 ARGS ret; 1325 1326 receiveCompat( 1327 (ARGS val) { move(val, ret); }, 1328 (LinkTerminated e) { throw e; }, 1329 (OwnerTerminated e) { throw e; }, 1330 (Variant val) { throw new MessageMismatch(format("Unexpected message type %s, expected %s.", val.type, ARGS.stringof)); } 1331 ); 1332 1333 static if(ARGS.length == 1) return ret[0]; 1334 else return tuple(ret); 1335 } 1336 1337 // Compatibility implementation of `receiveTimeout` using vibe.d's own std.concurrency implementation 1338 bool receiveTimeoutCompat(OPS...)(Duration timeout, OPS ops) 1339 { 1340 auto 
tid = Task.getThis(); 1341 assert(tid != Task.init, "Cannot receive task messages outside of a task."); 1342 return tid.messageQueue.receiveTimeout!OPS(timeout, opsFilter(ops), opsHandler(ops)); 1343 } 1344 1345 // Compatibility implementation of `setMailboxSize` using vibe.d's own std.concurrency implementation 1346 void setMaxMailboxSizeCompat(Task tid, size_t messages, OnCrowding on_crowding) 1347 { 1348 final switch(on_crowding){ 1349 case OnCrowding.block: setMaxMailboxSizeCompat(tid, messages, null); break; 1350 case OnCrowding.throwException: setMaxMailboxSizeCompat(tid, messages, &onCrowdingThrow); break; 1351 case OnCrowding.ignore: setMaxMailboxSizeCompat(tid, messages, &onCrowdingDrop); break; 1352 } 1353 } 1354 1355 // Compatibility implementation of `setMailboxSize` using vibe.d's own std.concurrency implementation 1356 void setMaxMailboxSizeCompat(Task tid, size_t messages, bool function(Task) on_crowding) 1357 { 1358 tid.messageQueue.setMaxSize(messages, on_crowding); 1359 } 1360 1361 unittest { 1362 static class CLS {} 1363 static assert(is(typeof(sendCompat(Task.init, makeIsolated!CLS())))); 1364 static assert(is(typeof(sendCompat(Task.init, 1)))); 1365 static assert(is(typeof(sendCompat(Task.init, 1, "str", makeIsolated!CLS())))); 1366 static assert(!is(typeof(sendCompat(Task.init, new CLS)))); 1367 static assert(is(typeof(receiveCompat((Isolated!CLS){})))); 1368 static assert(is(typeof(receiveCompat((int){})))); 1369 static assert(is(typeof(receiveCompat!(void delegate(int, string, Isolated!CLS))((int, string, Isolated!CLS){})))); 1370 static assert(!is(typeof(receiveCompat((CLS){})))); 1371 } 1372 1373 private bool onCrowdingThrow(Task tid){ 1374 import std.concurrency : Tid; 1375 throw new MailboxFull(Tid()); 1376 } 1377 1378 private bool onCrowdingDrop(Task tid){ 1379 return false; 1380 } 1381 1382 private struct IsolatedValueProxyTuple(T...) 
1383 { 1384 staticMap!(IsolatedValueProxy, T) fields; 1385 1386 this(ref T values) 1387 { 1388 foreach (i, Ti; T) { 1389 static if (isInstanceOf!(IsolatedSendProxy, IsolatedValueProxy!Ti)) { 1390 fields[i] = IsolatedValueProxy!Ti(values[i].unsafeGet()); 1391 } else fields[i] = values[i]; 1392 } 1393 } 1394 } 1395 1396 private template IsolatedValueProxy(T) 1397 { 1398 static if (isInstanceOf!(IsolatedRef, T) || isInstanceOf!(IsolatedArray, T) || isInstanceOf!(IsolatedAssociativeArray, T)) { 1399 alias IsolatedValueProxy = IsolatedSendProxy!(T.BaseType); 1400 } else { 1401 alias IsolatedValueProxy = T; 1402 } 1403 } 1404 1405 /+unittest { 1406 static class Test {} 1407 void test() { 1408 Task.getThis().send(new immutable Test, makeIsolated!Test()); 1409 } 1410 }+/ 1411 1412 private struct IsolatedSendProxy(T) { alias BaseType = T; T value; } 1413 1414 private bool callBool(F, T...)(F fnc, T args) 1415 { 1416 static string caller(string prefix) 1417 { 1418 import std.conv; 1419 string ret = prefix ~ "fnc("; 1420 foreach (i, Ti; T) { 1421 static if (i > 0) ret ~= ", "; 1422 static if (isInstanceOf!(IsolatedSendProxy, Ti)) ret ~= "assumeIsolated(args["~to!string(i)~"].value)"; 1423 else ret ~= "args["~to!string(i)~"]"; 1424 } 1425 ret ~= ");"; 1426 return ret; 1427 } 1428 static assert(is(ReturnType!F == bool) || is(ReturnType!F == void), 1429 "Message handlers must return either bool or void."); 1430 static if (is(ReturnType!F == bool)) mixin(caller("return ")); 1431 else { 1432 mixin(caller("")); 1433 return true; 1434 } 1435 } 1436 1437 private bool delegate(Variant) @safe opsFilter(OPS...)(OPS ops) 1438 { 1439 return (Variant msg) @trusted { // Variant 1440 if (msg.convertsTo!Throwable) return true; 1441 foreach (i, OP; OPS) 1442 if (matchesHandler!OP(msg)) 1443 return true; 1444 return false; 1445 }; 1446 } 1447 1448 private void delegate(Variant) @safe opsHandler(OPS...)(OPS ops) 1449 { 1450 return (Variant msg) @trusted { // Variant 1451 foreach (i, OP; OPS) { 
1452 alias PTypes = ParameterTypeTuple!OP; 1453 if (matchesHandler!OP(msg)) { 1454 static if (PTypes.length == 1 && is(PTypes[0] == Variant)) { 1455 if (callBool(ops[i], msg)) return; // WARNING: proxied isolated values will go through verbatim! 1456 } else { 1457 auto msgt = msg.get!(IsolatedValueProxyTuple!PTypes); 1458 if (callBool(ops[i], msgt.fields)) return; 1459 } 1460 } 1461 } 1462 if (msg.convertsTo!Throwable) 1463 throw msg.get!Throwable(); 1464 }; 1465 } 1466 1467 private bool matchesHandler(F)(Variant msg) 1468 { 1469 alias PARAMS = ParameterTypeTuple!F; 1470 if (PARAMS.length == 1 && is(PARAMS[0] == Variant)) return true; 1471 else return msg.convertsTo!(IsolatedValueProxyTuple!PARAMS); 1472 }