1 /** 2 This module contains the core functionality of the vibe.d framework. 3 4 See `runApplication` for the main entry point for typical vibe.d 5 server or GUI applications. 6 7 Copyright: © 2012-2016 RejectedSoftware e.K. 8 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 9 Authors: Sönke Ludwig 10 */ 11 module vibe.core.core; 12 13 public import vibe.core.driver; 14 15 import vibe.core.args; 16 import vibe.core.concurrency; 17 import vibe.core.log; 18 import vibe.utils.array; 19 import std.algorithm; 20 import std.conv; 21 import std.encoding; 22 import core.exception; 23 import std.exception; 24 import std.functional; 25 import std.range : empty, front, popFront; 26 import std.string; 27 import std.variant; 28 import std.typecons : Typedef, Tuple, tuple; 29 import core.atomic; 30 import core.sync.condition; 31 import core.sync.mutex; 32 import core.stdc.stdlib; 33 import core.thread; 34 35 alias TaskEventCb = void function(TaskEvent, Task) nothrow; 36 37 version(Posix) 38 { 39 import core.sys.posix.signal; 40 import core.sys.posix.unistd; 41 import core.sys.posix.pwd; 42 43 static if (__traits(compiles, {import core.sys.posix.grp; getgrgid(0);})) { 44 import core.sys.posix.grp; 45 } else { 46 extern (C) { 47 struct group { 48 char* gr_name; 49 char* gr_passwd; 50 gid_t gr_gid; 51 char** gr_mem; 52 } 53 group* getgrgid(gid_t); 54 group* getgrnam(in char*); 55 } 56 } 57 } 58 59 version (Windows) 60 { 61 import core.stdc.signal; 62 } 63 64 65 /**************************************************************************************************/ 66 /* Public functions */ 67 /**************************************************************************************************/ 68 69 /** 70 Performs final initialization and runs the event loop. 
71 72 This function performs three tasks: 73 $(OL 74 $(LI Makes sure that no unrecognized command line options are passed to 75 the application and potentially displays command line help. See also 76 `vibe.core.args.finalizeCommandLineOptions`.) 77 $(LI Performs privilege lowering if required.) 78 $(LI Runs the event loop and blocks until it finishes.) 79 ) 80 81 Params: 82 args_out = Optional parameter to receive unrecognized command line 83 arguments. If left to `null`, an error will be reported if 84 any unrecognized argument is passed. 85 86 See_also: ` vibe.core.args.finalizeCommandLineOptions`, `lowerPrivileges`, 87 `runEventLoop` 88 */ 89 int runApplication(scope void delegate(string[]) args_out = null) 90 { 91 try { 92 string[] args; 93 if (!finalizeCommandLineOptions(args_out is null ? null : &args)) return 0; 94 if (args_out) args_out(args); 95 } catch (Exception e) { 96 logDiagnostic("Error processing command line: %s", e.msg); 97 return 1; 98 } 99 100 lowerPrivileges(); 101 102 logDiagnostic("Running event loop..."); 103 int status; 104 version (VibeDebugCatchAll) { 105 try { 106 status = runEventLoop(); 107 } catch( Throwable th ){ 108 logError("Unhandled exception in event loop: %s", th.msg); 109 logDiagnostic("Full exception: %s", th.toString().sanitize()); 110 return 1; 111 } 112 } else { 113 status = runEventLoop(); 114 } 115 116 logDiagnostic("Event loop exited with status %d.", status); 117 return status; 118 } 119 120 /// A simple echo server, listening on a privileged TCP port. 
unittest {
	import vibe.core.core;
	import vibe.core.net;

	int main()
	{
		// Application specific setup goes first: privileged ports are
		// still available at this point if the process runs as root.
		listenTCP(7, (connection) { connection.pipe(connection); });

		// runApplication takes care of the remaining initialization
		// and then runs the event loop.
		return runApplication();
	}
}

/** The same as above, but performing the initialization sequence manually.

	This allows to skip any additional initialization (opening the listening
	port) if an invalid command line argument or the `--help` switch is
	passed to the application.
*/
unittest {
	import vibe.core.core;
	import vibe.core.net;

	int main()
	{
		// Handle the command line before anything else, so that the
		// application setup can be skipped when it is not needed.
		const proceed = finalizeCommandLineOptions();
		if (!proceed) return 0;

		// Now set up the application itself.
		listenTCP(7, (connection) { connection.pipe(connection); });

		// Lower the privileges (safe to skip for non-server
		// applications) ...
		lowerPrivileges();

		// ... and finally enter the event loop.
		return runEventLoop();
	}
}

/**
	Starts the vibe.d event loop for the calling thread.

	Note that this function is usually called automatically by the vibe.d
	framework. However, if you provide your own `main()` function, you may need
	to call either this or `runApplication` manually.

	The event loop will by default continue running during the whole life time
	of the application, but calling `runEventLoop` multiple times in sequence
	is allowed. Tasks will be started and handled only while the event loop is
	running.

	Returns:
		The returned value is the suggested code to return to the operating
		system from the `main` function.
180 181 See_Also: `runApplication` 182 */ 183 int runEventLoop() 184 { 185 setupSignalHandlers(); 186 187 logDebug("Starting event loop."); 188 s_eventLoopRunning = true; 189 scope (exit) { 190 s_eventLoopRunning = false; 191 s_exitEventLoop = false; 192 st_threadShutdownCondition.notifyAll(); 193 } 194 195 // runs any yield()ed tasks first 196 assert(!s_exitEventLoop); 197 s_exitEventLoop = false; 198 driverCore.notifyIdle(); 199 if (getExitFlag()) return 0; 200 201 // handle exit flag in the main thread to exit when 202 // exitEventLoop(true) is called from a thread) 203 if (Thread.getThis() is st_threads[0].thread) 204 runTask(toDelegate(&watchExitFlag)); 205 206 if (auto err = getEventDriver().runEventLoop() != 0) { 207 if (err == 1) { 208 logDebug("No events active, exiting message loop."); 209 return 0; 210 } 211 logError("Error running event loop: %d", err); 212 return 1; 213 } 214 215 logDebug("Event loop done."); 216 return 0; 217 } 218 219 /** 220 Stops the currently running event loop. 221 222 Calling this function will cause the event loop to stop event processing and 223 the corresponding call to runEventLoop() will return to its caller. 224 225 Params: 226 shutdown_all_threads = If true, exits event loops of all threads - 227 false by default. Note that the event loops of all threads are 228 automatically stopped when the main thread exits, so usually 229 there is no need to set shutdown_all_threads to true. 230 */ 231 void exitEventLoop(bool shutdown_all_threads = false) 232 { 233 logDebug("exitEventLoop called (%s)", shutdown_all_threads); 234 235 assert(s_eventLoopRunning || shutdown_all_threads, 236 "Trying to exit event loop when no loop is running."); 237 238 if (shutdown_all_threads) { 239 atomicStore(st_term, true); 240 st_threadsSignal.emit(); 241 } 242 243 // shutdown the calling thread 244 s_exitEventLoop = true; 245 if (s_eventLoopRunning) getEventDriver().exitEventLoop(); 246 } 247 248 /** 249 Process all pending events without blocking. 
250 251 Checks if events are ready to trigger immediately, and run their callbacks if so. 252 253 Returns: Returns false $(I iff) exitEventLoop was called in the process. 254 */ 255 bool processEvents() 256 { 257 if (!getEventDriver().processEvents()) return false; 258 driverCore.notifyIdle(); 259 return true; 260 } 261 262 /** 263 Sets a callback that is called whenever no events are left in the event queue. 264 265 The callback delegate is called whenever all events in the event queue have been 266 processed. Returning true from the callback will cause another idle event to 267 be triggered immediately after processing any events that have arrived in the 268 meantime. Returning false will instead wait until another event has arrived first. 269 */ 270 void setIdleHandler(void delegate() @safe del) 271 { 272 s_idleHandler = { del(); return false; }; 273 } 274 /// ditto 275 void setIdleHandler(bool delegate() @safe del) 276 { 277 s_idleHandler = del; 278 } 279 280 /// Scheduled for deprecation - use a `@safe` callback instead. 281 void setIdleHandler(void delegate() @system del) 282 @system { 283 s_idleHandler = () @trusted { del(); return false; }; 284 } 285 /// ditto 286 void setIdleHandler(bool delegate() @system del) 287 @system { 288 s_idleHandler = () @trusted => del(); 289 } 290 291 /** 292 Runs a new asynchronous task. 293 294 task will be called synchronously from within the vibeRunTask call. It will 295 continue to run until vibeYield() or any of the I/O or wait functions is 296 called. 297 298 Note that the maximum size of all args must not exceed `maxTaskParameterSize`. 
*/
Task runTask(ARGS...)(void delegate(ARGS) @safe task, ARGS args)
{
	auto info = makeTaskFuncInfo(task, args);
	return runTask_internal(info);
}
/// ditto
Task runTask(ARGS...)(void delegate(ARGS) task, ARGS args)
{
	auto info = makeTaskFuncInfo(task, args);
	return runTask_internal(info);
}

/*
	Starts the task described by `tfi` on a fiber and returns its handle.

	A fiber is taken from the pool of available fibers if one in HOLD state
	can be found there, otherwise a new one is created. In debug builds the
	task event callback (if set) is notified around the initial resume.
*/
private Task runTask_internal(ref TaskFuncInfo tfi)
@safe nothrow {
	import std.typecons : Tuple, tuple;

	// pop pooled fibers until one in HOLD state is found or the pool is empty
	CoreTask fiber;
	while (fiber is null && !s_availableFibers.empty) {
		fiber = s_availableFibers.back;
		s_availableFibers.popBack();
		const state = () @trusted nothrow { return fiber.state; } ();
		if (state != Fiber.State.HOLD) fiber = null;
	}

	// nothing reusable - create a new fiber
	if (fiber is null) {
		if (s_availableFibers.capacity == 0) s_availableFibers.capacity = 1024;
		logDebugV("Creating new fiber...");
		s_fiberCount++;
		fiber = new CoreTask;
	}

	fiber.m_taskFunc = tfi;

	fiber.bumpTaskCounter();
	auto new_task = fiber.task();

	debug Task self = Task.getThis();
	debug if (s_taskEventCallback) {
		if (self != Task.init) () @trusted { s_taskEventCallback(TaskEvent.yield, self); } ();
		() @trusted { s_taskEventCallback(TaskEvent.preStart, new_task); } ();
	}
	driverCore.resumeTask(new_task, null, true);
	debug if (s_taskEventCallback) {
		() @trusted { s_taskEventCallback(TaskEvent.postStart, new_task); } ();
		if (self != Task.init) () @trusted { s_taskEventCallback(TaskEvent.resume, self); } ();
	}

	return new_task;
}

@safe unittest {
	runTask({});
}

/**
	Runs a new asynchronous task in a worker thread.

	Only function pointers with weakly isolated arguments are allowed to be
	able to guarantee thread-safety.
*/
void runWorkerTask(FT, ARGS...)(FT func, auto ref ARGS args)
	if (is(typeof(*func) == function))
{
	// every argument type must be safe to hand over to another thread
	foreach (Arg; ARGS) {
		static assert(isWeaklyIsolated!Arg, "Argument type "~Arg.stringof~" is not safe to pass between threads.");
	}
	runWorkerTask_unsafe(func, args);
}

/// ditto
void runWorkerTask(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
	if (is(typeof(__traits(getMember, object, __traits(identifier, method)))))
{
	foreach (Arg; ARGS) {
		static assert(isWeaklyIsolated!Arg, "Argument type "~Arg.stringof~" is not safe to pass between threads.");
	}
	// bind the method to the shared object and schedule the resulting delegate
	auto bound_method = &__traits(getMember, object, __traits(identifier, method));
	runWorkerTask_unsafe(bound_method, args);
}

/**
	Runs a new asynchronous task in a worker thread, returning the task handle.

	This function will yield and wait for the new task to be created and started
	in the worker thread, then resume and return it.

	Only function pointers with weakly isolated arguments are allowed to be
	able to guarantee thread-safety.
*/
Task runWorkerTaskH(FT, ARGS...)(FT func, auto ref ARGS args)
	if (is(typeof(*func) == function))
{
	foreach (Arg; ARGS) {
		static assert(isWeaklyIsolated!Arg, "Argument type "~Arg.stringof~" is not safe to pass between threads.");
	}

	// message type used to send the worker's task handle back to the caller
	alias PrivateTask = Typedef!(Task, Task.init, __PRETTY_FUNCTION__);
	Task parent = Task.getThis();

	// workaround for runWorkerTaskH to work when called outside of a task:
	// re-enter from within a temporary task and wait for it to finish
	if (parent == Task.init) {
		Task started;
		runTask({ started = runWorkerTaskH(func, args); }).join();
		return started;
	}

	assert(parent != Task.init, "runWorkderTaskH can currently only be called from within a task.");

	// runs in the worker thread: report the own handle, then call `func`
	// (the mixin refers to the parameters `func` and `args` by name)
	static void taskFun(Task caller, FT func, ARGS args) {
		PrivateTask callee = Task.getThis();
		caller.prioritySendCompat(callee);
		mixin(callWithMove!ARGS("func", "args"));
	}

	runWorkerTask_unsafe(&taskFun, parent, func, args);
	return () @trusted { return cast(Task)receiveOnlyCompat!PrivateTask(); } ();
}
/// ditto
Task runWorkerTaskH(alias method, T, ARGS...)(shared(T) object, auto ref ARGS args)
	if (is(typeof(__traits(getMember, object, __traits(identifier, method)))))
{
	foreach (Arg; ARGS) {
		static assert(isWeaklyIsolated!Arg, "Argument type "~Arg.stringof~" is not safe to pass between threads.");
	}

	auto func = &__traits(getMember, object, __traits(identifier, method));
	alias FT = typeof(func);

	// message type used to send the worker's task handle back to the caller
	alias PrivateTask = Typedef!(Task, Task.init, __PRETTY_FUNCTION__);
	Task parent = Task.getThis();

	// workaround for runWorkerTaskH to work when called outside of a task:
	// re-enter from within a temporary task and wait for it to finish
	if (parent == Task.init) {
		Task started;
		runTask({ started = runWorkerTaskH!method(object, args); }).join();
		return started;
	}

	assert(parent != Task.init, "runWorkderTaskH can currently only be called from within a task.");

	// runs in the worker thread: report the own handle, then call `func`
	// (the mixin refers to the parameters `func` and `args` by name)
	static void taskFun(Task caller, FT func, ARGS args) {
		PrivateTask callee = Task.getThis();
		() @trusted { caller.prioritySendCompat(callee); } ();
		mixin(callWithMove!ARGS("func", "args"));
	}

	runWorkerTask_unsafe(&taskFun, parent, func, args);
	return cast(Task)receiveOnlyCompat!PrivateTask();
}

/// Running a worker task using a function
unittest {
	static void workerFunc(int param)
	{
		logInfo("Param: %s", param);
	}

	static void test()
	{
		// single worker task, then one task per worker thread
		runWorkerTask(&workerFunc, 42);
		runWorkerTask(&workerFunc, cast(ubyte)42); // implicit conversion #719
		runWorkerTaskDist(&workerFunc, 42);
		runWorkerTaskDist(&workerFunc, cast(ubyte)42); // implicit conversion #719
	}
}

/// Running a worker task using a class method
unittest {
	static class Test {
		void workerMethod(int param)
		shared {
			logInfo("Param: %s", param);
		}
	}

	static void test()
	{
		auto instance = new shared Test;
		runWorkerTask!(Test.workerMethod)(instance, 42);
		runWorkerTask!(Test.workerMethod)(instance, cast(ubyte)42); // #719
		runWorkerTaskDist!(Test.workerMethod)(instance, 42);
		runWorkerTaskDist!(Test.workerMethod)(instance, cast(ubyte)42); // #719
	}
}

/// Running a worker task using a function and communicating with it
unittest {
	static void workerFunc(Task caller)
	{
		// answer pings until anything else arrives or the counter reaches zero
		int counter = 10;
		while (receiveOnlyCompat!string() == "ping" && --counter) {
			logInfo("pong");
			caller.sendCompat("pong");
		}
		caller.sendCompat("goodbye");

	}

	static void test()
	{
		Task callee = runWorkerTaskH(&workerFunc, Task.getThis);
		// keep pinging for as long as the worker answers with "pong"
		do {
			logInfo("ping");
			callee.sendCompat("ping");
		} while (receiveOnlyCompat!string() == "pong");
	}

	static void work719(int) {}
	static void test719() { runWorkerTaskH(&work719, cast(ubyte)42); }
}

/// Running a worker task using a class method and communicating with it
unittest {
	static class Test {
		void workerMethod(Task caller) shared {
			// answer pings until anything else arrives or the counter reaches zero
			int counter = 10;
			while (receiveOnlyCompat!string() == "ping" && --counter) {
				logInfo("pong");
				caller.sendCompat("pong");
			}
			caller.sendCompat("goodbye");
		}
	}

	static void test()
	{
		auto instance = new shared Test;
		Task callee = runWorkerTaskH!(Test.workerMethod)(instance, Task.getThis());
		// keep pinging for as long as the worker answers with "pong"
		do {
			logInfo("ping");
			callee.sendCompat("ping");
		} while (receiveOnlyCompat!string() == "pong");
	}

	static class Class719 {
		void work(int) shared {}
	}
	static void test719() {
		auto instance = new shared Class719;
		runWorkerTaskH!(Class719.work)(instance, cast(ubyte)42);
	}
}

unittest { // run and join worker task from outside of a task
	__gshared int i = 0;
	auto t = runWorkerTaskH({ sleep(5.msecs); i = 1; });
	// FIXME: joining between threads not yet supported
	//t.join();
	//assert(i == 1);
}

/*
	Queues `callable` together with `args` for execution in a worker thread.

	Only convertibility of the arguments to the callable's parameter types is
	checked here; callers are responsible for the isolation checks.
*/
private void runWorkerTask_unsafe(CALLABLE, ARGS...)(CALLABLE callable, ref ARGS args)
{
	import std.traits : ParameterTypeTuple;
	import vibe.internal.meta.traits : areConvertibleTo;
	import vibe.internal.meta.typetuple;

	alias FARGS = ParameterTypeTuple!CALLABLE;
	static assert(areConvertibleTo!(Group!ARGS, Group!FARGS),
		"Cannot convert arguments '"~ARGS.stringof~"' to function arguments '"~FARGS.stringof~"'.");

	// make sure the worker threads are running before queueing anything
	setupWorkerThreads();

	auto info = makeTaskFuncInfo(callable, args);

	() @trusted {
		// append to the shared queue under the lock, then signal the workers
		synchronized (st_threadsMutex) {
			st_workerTasks ~= info;
		}
		st_threadsSignal.emit();
	} ();
}


/**
	Runs a new asynchronous task in all worker threads concurrently.

	This function is mainly useful for long-living tasks that distribute their
	work across all CPU cores. Only function pointers with weakly isolated
	arguments are allowed to be able to guarantee thread-safety.

	The number of tasks started is guaranteed to be equal to
	`workerThreadCount`.
*/
void runWorkerTaskDist(FT, ARGS...)(FT func, auto ref ARGS args)
	if (is(typeof(*func) == function))
{
	foreach (Arg; ARGS) {
		static assert(isWeaklyIsolated!Arg, "Argument type "~Arg.stringof~" is not safe to pass between threads.");
	}
	runWorkerTaskDist_unsafe(func, args);
}
/// ditto
void runWorkerTaskDist(alias method, T, ARGS...)(shared(T) object, ARGS args)
{
	foreach (Arg; ARGS) {
		static assert(isWeaklyIsolated!Arg, "Argument type "~Arg.stringof~" is not safe to pass between threads.");
	}

	// bind the method to the shared object and schedule the resulting delegate
	auto func = &__traits(getMember, object, __traits(identifier, method));
	runWorkerTaskDist_unsafe(func, args);
}

/*
	Queues `callable` together with `args` once for each worker thread.

	Only convertibility of the arguments to the callable's parameter types is
	checked here; callers are responsible for the isolation checks.
*/
private void runWorkerTaskDist_unsafe(CALLABLE, ARGS...)(ref CALLABLE callable, ref ARGS args)
{
	import std.traits : ParameterTypeTuple;
	import vibe.internal.meta.traits : areConvertibleTo;
	import vibe.internal.meta.typetuple;

	alias FARGS = ParameterTypeTuple!CALLABLE;
	static assert(areConvertibleTo!(Group!ARGS, Group!FARGS),
		"Cannot convert arguments '"~ARGS.stringof~"' to function arguments '"~FARGS.stringof~"'.");

	// make sure the worker threads are running before queueing anything
	setupWorkerThreads();

	auto info = makeTaskFuncInfo(callable, args);

	synchronized (st_threadsMutex) {
		// every worker thread receives its own copy in its private queue
		foreach (ref ctx; st_threads) {
			if (!ctx.isWorker) continue;
			ctx.taskQueue ~= info;
		}
	}
	st_threadsSignal.emit();
}

/*
	Packs a callable and its arguments into a `TaskFuncInfo`.

	The callable and the arguments are stored inside the returned structure;
	its `func` member is set to a trampoline that moves both back out and
	performs the actual call.
*/
private TaskFuncInfo makeTaskFuncInfo(CALLABLE, ARGS...)(ref CALLABLE callable, ref ARGS args)
{
	import std.algorithm : move;
	import std.traits : hasElaborateAssign;

	// wrapper that gives the argument tuple a single, fixed memory layout
	static struct TARGS { ARGS expand; }

	static assert(CALLABLE.sizeof <= TaskFuncInfo.callable.length);
	static assert(TARGS.sizeof <= maxTaskParameterSize,
		"The arguments passed to run(Worker)Task must not exceed "~
		maxTaskParameterSize.to!string~" bytes in total size.");

	// trampoline - `c` and `args` are referenced by name from the mixin below
	static void callDelegate(TaskFuncInfo* info) {
		assert(info.func is &callDelegate);

		// move the stored call data onto the stack
		CALLABLE c;
		TARGS args;
		auto stored_callable = cast(CALLABLE*)info.callable.ptr;
		auto stored_args = cast(TARGS*)info.args.ptr;
		move(*stored_callable, c);
		move(*stored_args, args);

		// reset the info
		info.func = null;

		// make the call
		mixin(callWithMove!ARGS("c", "args.expand"));
	}

	TaskFuncInfo result;
	result.func = &callDelegate;
	static if (hasElaborateAssign!CALLABLE) result.initCallable!CALLABLE();
	static if (hasElaborateAssign!TARGS) result.initArgs!TARGS();

	() @trusted {
		result.typedCallable!CALLABLE = callable;
		// move or copy each argument into the storage, depending on its type
		foreach (i, A; ARGS) {
			static if (needsMove!A) args[i].move(result.typedArgs!TARGS.expand[i]);
			else result.typedArgs!TARGS.expand[i] = args[i];
		}
	} ();
	return result;
}

import core.cpuid : threadsPerCPU;
/**
	Sets up the thread pool used for executing worker tasks.

	This function gives explicit control over the number of worker threads.
	Note, to have an effect the function must be called before any worker
	tasks are started. Otherwise the default number of worker threads
	(`logicalProcessorCount`) will be used automatically.

	Params:
		num = The number of worker threads to initialize. Defaults to
			`logicalProcessorCount`.
	See_also: `runWorkerTask`, `runWorkerTaskH`, `runWorkerTaskDist`
*/
public void setupWorkerThreads(uint num = logicalProcessorCount())
@safe {
	// fast path: this function already ran once
	static bool s_workerThreadsStarted = false;
	if (s_workerThreadsStarted) return;
	s_workerThreadsStarted = true;

	() @trusted {
		synchronized (st_threadsMutex) {
			// do nothing if worker threads are already registered
			if (st_threads.any!(ctx => ctx.isWorker))
				return;

			foreach (index; 0 .. num) {
				auto worker = new Thread(&workerThreadFunc);
				worker.name = format("Vibe Task Worker #%s", index);
				st_threads ~= ThreadContext(worker, true);
				worker.start();
			}
		}
	} ();
}


/**
	Determines the number of logical processors in the system.

	This number includes virtual cores on hyper-threading enabled CPUs.
*/
public @property uint logicalProcessorCount()
{
	version (linux) {
		import core.sys.linux.sys.sysinfo;
		return get_nprocs();
	} else version (OSX) {
		// `sysctlbyname` writes the processor count to `count`; `count_len`
		// only carries the size of that buffer and must not be returned.
		int count;
		size_t count_len = count.sizeof;
		if (sysctlbyname("hw.logicalcpu", &count, &count_len, null, 0) != 0 || count < 1)
			return 1; // query failed - fall back to a single processor
		return cast(uint)count;
	} else version (FreeBSD) {
		// "hw.logicalcpu" is a macOS key; the BSDs report the count as "hw.ncpu"
		int count;
		size_t count_len = count.sizeof;
		if (sysctlbyname("hw.ncpu", &count, &count_len, null, 0) != 0 || count < 1)
			return 1; // query failed - fall back to a single processor
		return cast(uint)count;
	} else version (NetBSD) {
		// "hw.logicalcpu" is a macOS key; the BSDs report the count as "hw.ncpu"
		int count;
		size_t count_len = count.sizeof;
		if (sysctlbyname("hw.ncpu", &count, &count_len, null, 0) != 0 || count < 1)
			return 1; // query failed - fall back to a single processor
		return cast(uint)count;
	} else version (Solaris) {
		return get_nprocs();
	} else version (Windows) {
		import core.sys.windows.windows;
		SYSTEM_INFO sysinfo;
		GetSystemInfo(&sysinfo);
		return sysinfo.dwNumberOfProcessors;
	} else static assert(false, "Unsupported OS!");
}
// C library functions used by logicalProcessorCount
version (OSX) private extern(C) int sysctlbyname(const(char)* name, void* oldp, size_t* oldlen, void* newp, size_t newlen);
version (FreeBSD) private extern(C) int sysctlbyname(const(char)* name, void* oldp, size_t* oldlen, void* newp, size_t newlen);
version (NetBSD) private extern(C) int sysctlbyname(const(char)* name, void* oldp, size_t* oldlen, void* newp, size_t newlen);
version (Solaris) private extern(C) int get_nprocs();

/**
	Suspends the execution of the calling task to let other tasks and events be
	handled.

	Calling this function in short intervals is recommended if long CPU
	computations are carried out by a task. It can also be used in conjunction
	with Signals to implement cross-fiber events with no polling.

	Throws:
		May throw an `InterruptException` if `Task.interrupt()` gets called on
		the calling task.
*/
void yield()
@safe {
	// throw any deferred exceptions
	driverCore.processDeferredExceptions();

	auto current = CoreTask.getThis();
	const in_task = current !is null && current !is CoreTask.ms_coreTask;

	if (!in_task) {
		// Let yielded tasks execute
		() @trusted { driverCore.notifyIdle(); } ();
		return;
	}

	// enqueue the calling task as yielded and suspend it
	assert(!current.m_queue, "Calling yield() when already yielded!?");
	if (!current.m_queue)
		s_yieldedTasks.insertBack(current);
	scope (exit) assert(current.m_queue is null, "Task not removed from yielders queue after being resumed.");
	rawYield();
}


/**
	Yields execution of this task until an event wakes it up again.

	Beware that the task will starve if no event wakes it up.
*/
void rawYield()
@safe {
	driverCore.yieldForEvent();
}

/**
	Suspends the execution of the calling task for the specified amount of time.

	Note that other tasks of the same thread will continue to run during the
	wait time, in contrast to $(D core.thread.Thread.sleep), which shouldn't be
	used in vibe.d applications.
*/
void sleep(Duration timeout)
@safe {
	assert(timeout >= 0.seconds, "Argument to sleep must not be negative.");
	if (timeout > 0.seconds) {
		// arm a one-shot timer without callback and wait for it to fire
		auto timer = setTimer(timeout, null);
		timer.wait();
	}
}
///
unittest {
	import vibe.core.core : sleep;
	import vibe.core.log : logInfo;
	import core.time : msecs;

	void test()
	{
		logInfo("Sleeping for half a second...");
		sleep(500.msecs);
		logInfo("Done sleeping.");
	}
}


/**
	Returns a new armed timer.

	Note that timers can only work if an event loop is running.

	Passing a `@system` callback is scheduled for deprecation. Use a
	`@safe` callback instead.

	Params:
		timeout = Determines the minimum amount of time that elapses before the timer fires.
callback = This delegate will be called when the timer fires
		periodic = Specifies if the timer fires repeatedly or only once

	Returns:
		Returns a Timer object that can be used to identify and modify the timer.

	See_also: createTimer
*/
Timer setTimer(Duration timeout, void delegate() @safe callback, bool periodic = false)
@safe {
	// create the timer unarmed, then arm it with the requested settings
	auto timer = createTimer(callback);
	timer.rearm(timeout, periodic);
	return timer;
}
/// ditto
Timer setTimer(Duration timeout, void delegate() @system callback, bool periodic = false)
@system {
	// wrap the callback so that it can be passed to the `@safe` overload
	return setTimer(timeout, () @trusted { callback(); }, periodic);
}
///
unittest {
	void printTime()
	@safe {
		import std.datetime;
		logInfo("The time is: %s", Clock.currTime());
	}

	void test()
	{
		import vibe.core.core;
		// start a periodic timer that prints the time every second
		setTimer(1.seconds, &printTime, true);
	}
}


/**
	Creates a new timer without arming it.

	Passing a `@system` callback is scheduled for deprecation. Use a
	`@safe` callback instead.

	See_also: setTimer
*/
Timer createTimer(void delegate() @safe callback)
@safe {
	auto driver = getEventDriver();
	auto timer_id = driver.createTimer(callback);
	return Timer(driver, timer_id);
}
/// ditto
Timer createTimer(void delegate() @system callback)
@system {
	// wrap the callback so that it can be passed to the `@safe` overload
	return createTimer(() @trusted { callback(); });
}


/**
	Creates an event to wait on an existing file descriptor.

	The file descriptor usually needs to be a non-blocking socket for this to
	work.

	Params:
		file_descriptor = The Posix file descriptor to watch
		event_mask = Specifies which events will be listened for
		event_mode = Specifies event waiting mode

	Returns:
		Returns a newly created FileDescriptorEvent associated with the given
		file descriptor.
*/
FileDescriptorEvent createFileDescriptorEvent(int file_descriptor, FileDescriptorEvent.Trigger event_mask, FileDescriptorEvent.Mode event_mode = FileDescriptorEvent.Mode.persistent)
{
	// the actual work is done by the active event driver
	return getEventDriver().createFileDescriptorEvent(file_descriptor, event_mask, event_mode);
}


/**
	Sets the stack size to use for tasks.

	The default stack size is set to 512 KiB on 32-bit systems and to 16 MiB
	on 64-bit systems, which is sufficient for most tasks. Tuning this value
	can be used to reduce memory usage for large numbers of concurrent tasks
	or to avoid stack overflows for applications with heavy stack use.

	Note that this function must be called at initialization time, before any
	task is started to have an effect.

	Also note that the stack will initially not consume actual physical memory -
	it just reserves virtual address space. Only once the stack gets actually
	filled up with data will physical memory then be reserved page by page. This
	means that the stack can safely be set to large sizes on 64-bit systems
	without having to worry about memory usage.
*/
void setTaskStackSize(size_t sz)
{
	s_taskStackSize = sz;
}


/**
	The number of worker threads used for processing worker tasks.

	Note that this function will cause the worker threads to be started,
	if they haven't already.

	See_also: `runWorkerTask`, `runWorkerTaskH`, `runWorkerTaskDist`,
		`setupWorkerThreads`
*/
@property size_t workerThreadCount()
	out(n) { assert(n > 0); }
body {
	// starts the worker threads on first use
	setupWorkerThreads();
	return st_threads.count!(ctx => ctx.isWorker);
}


/**
	Disables the signal handlers usually set up by vibe.d.

	During the first call to `runEventLoop`, vibe.d usually sets up a set of
	event handlers for SIGINT, SIGTERM and SIGPIPE.
Since in some situations
	this can be undesirable, this function can be called before the first
	invocation of the event loop to avoid this.

	Calling this function after `runEventLoop` will have no effect.
*/
void disableDefaultSignalHandlers()
{
	synchronized (st_threadsMutex) {
		s_disableSignalHandlers = true;
	}
}

/**
	Sets the effective user and group ID to the ones configured for privilege lowering.

	This function is useful for services run as root to give up on the privileges that
	they only need for initialization (such as listening on ports <= 1024 or opening
	system log files).

	Note that this function is called automatically by vibe.d's default main
	implementation, as well as by `runApplication`.
*/
void lowerPrivileges(string uname, string gname) @safe
{
	// nothing to lower unless running as root
	if (!isRoot()) return;

	if (uname.length == 0 && gname.length == 0) {
		logWarn("Vibe was run as root, and no user/group has been specified for privilege lowering. Running with full permissions.");
		return;
	}

	// Succeeds only if `s` consists entirely of a number starting with a
	// digit; the parsed value is stored in `n`.
	static bool tryParse(T)(string s, out T n)
	{
		import std.conv, std.ascii;
		if (!isDigit(s[0])) return false;
		n = parse!T(s);
		return s.length==0;
	}

	// names that are not purely numeric are resolved by name; -1 is passed
	// on for an ID that was not specified
	int uid = -1;
	int gid = -1;
	if (uname.length != 0 && !tryParse(uname, uid)) uid = getUID(uname);
	if (gname.length != 0 && !tryParse(gname, gid)) gid = getGID(gname);
	setUID(uid, gid);
}

/// ditto
void lowerPrivileges() @safe
{
	lowerPrivileges(s_privilegeLoweringUserName, s_privilegeLoweringGroupName);
}


/**
	Sets a callback that is invoked whenever a task changes its status.

	This function is useful mostly for implementing debuggers that
	analyze the life time of tasks, including task switches. Note that
	the callback will only be called for debug builds.
*/
void setTaskEventCallback(TaskEventCb func)
{
	debug {
		s_taskEventCallback = func;
	}
}


/**
	A version string representing the current vibe.d version
*/
enum vibeVersionString = "0.8.1";


/**
	The maximum combined size of all parameters passed to a task delegate

	See_Also: runTask
*/
enum maxTaskParameterSize = 128;


/**
	Represents a timer.
*/
struct Timer {
@safe:

	private {
		EventDriver m_driver;
		size_t m_id;
		debug uint m_magicNumber = 0x4d34f916;
	}

	// Wraps the timer `id` of `driver`; only used from within this module.
	private this(EventDriver driver, size_t id)
	{
		m_id = id;
		m_driver = driver;
	}

	// Copying acquires another reference to the underlying timer.
	this(this)
	{
		debug assert(m_magicNumber == 0x4d34f916);
		if (m_driver) {
			m_driver.acquireTimer(m_id);
		}
	}

	// Releases the reference again, as long as the driver core still exists.
	~this()
	{
		debug assert(m_magicNumber == 0x4d34f916);
		if (m_driver && driverCore) {
			m_driver.releaseTimer(m_id);
		}
	}

	/// True if the timer is yet to fire.
	@property bool pending()
	{
		return m_driver.isTimerPending(m_id);
	}

	/// The internal ID of the timer.
	@property size_t id() const
	{
		return m_id;
	}

	// Evaluates to true if the timer is bound to a driver.
	bool opCast() const
	{
		return m_driver !is null;
	}

	/** Resets the timer to the specified timeout
	*/
	void rearm(Duration dur, bool periodic = false)
		in { assert(dur > 0.seconds); }
		body {
			m_driver.rearmTimer(m_id, dur, periodic);
		}

	/** Resets the timer and avoids any firing.
	*/
	void stop() nothrow
	{
		m_driver.stopTimer(m_id);
	}

	/** Waits until the timer fires.
	*/
	void wait()
	{
		m_driver.waitTimer(m_id);
	}
}


/**
	Implements a task local storage variable.

	Task local variables, similar to thread local variables, exist separately
	in each task. Consequently, they do not need any form of synchronization
	when accessing them.
1061 1062 Note, however, that each TaskLocal variable will increase the memory footprint 1063 of any task that uses task local storage. There is also an overhead to access 1064 TaskLocal variables, higher than for thread local variables, but generelly 1065 still O(1) (since actual storage acquisition is done lazily the first access 1066 can require a memory allocation with unknown computational costs). 1067 1068 Notice: 1069 FiberLocal instances MUST be declared as static/global thread-local 1070 variables. Defining them as a temporary/stack variable will cause 1071 crashes or data corruption! 1072 1073 Examples: 1074 --- 1075 TaskLocal!string s_myString = "world"; 1076 1077 void taskFunc() 1078 { 1079 assert(s_myString == "world"); 1080 s_myString = "hello"; 1081 assert(s_myString == "hello"); 1082 } 1083 1084 shared static this() 1085 { 1086 // both tasks will get independent storage for s_myString 1087 runTask(&taskFunc); 1088 runTask(&taskFunc); 1089 } 1090 --- 1091 */ 1092 struct TaskLocal(T) 1093 { 1094 private { 1095 size_t m_offset = size_t.max; 1096 size_t m_id; 1097 T m_initValue; 1098 bool m_hasInitValue = false; 1099 } 1100 1101 this(T init_val) { m_initValue = init_val; m_hasInitValue = true; } 1102 1103 @disable this(this); 1104 1105 void opAssign(T value) { this.storage = value; } 1106 1107 @property ref T storage() 1108 { 1109 auto fiber = CoreTask.getThis(); 1110 1111 // lazily register in FLS storage 1112 if (m_offset == size_t.max) { 1113 static assert(T.alignof <= 8, "Unsupported alignment for type "~T.stringof); 1114 assert(CoreTask.ms_flsFill % 8 == 0, "Misaligned fiber local storage pool."); 1115 m_offset = CoreTask.ms_flsFill; 1116 m_id = CoreTask.ms_flsCounter++; 1117 1118 1119 CoreTask.ms_flsFill += T.sizeof; 1120 while (CoreTask.ms_flsFill % 8 != 0) 1121 CoreTask.ms_flsFill++; 1122 } 1123 1124 // make sure the current fiber has enough FLS storage 1125 if (fiber.m_fls.length < CoreTask.ms_flsFill) { 1126 fiber.m_fls.length = 
CoreTask.ms_flsFill + 128; 1127 fiber.m_flsInit.length = CoreTask.ms_flsCounter + 64; 1128 } 1129 1130 // return (possibly default initialized) value 1131 auto data = fiber.m_fls.ptr[m_offset .. m_offset+T.sizeof]; 1132 if (!fiber.m_flsInit[m_id]) { 1133 fiber.m_flsInit[m_id] = true; 1134 import std.traits : hasElaborateDestructor, hasAliasing; 1135 static if (hasElaborateDestructor!T || hasAliasing!T) { 1136 void function(void[], size_t) destructor = (void[] fls, size_t offset){ 1137 static if (hasElaborateDestructor!T) { 1138 auto obj = cast(T*)&fls[offset]; 1139 // call the destructor on the object if a custom one is known declared 1140 obj.destroy(); 1141 } 1142 else static if (hasAliasing!T) { 1143 // zero the memory to avoid false pointers 1144 foreach (size_t i; offset .. offset + T.sizeof) { 1145 ubyte* u = cast(ubyte*)&fls[i]; 1146 *u = 0; 1147 } 1148 } 1149 }; 1150 FLSInfo fls_info; 1151 fls_info.fct = destructor; 1152 fls_info.offset = m_offset; 1153 1154 // make sure flsInfo has enough space 1155 if (fiber.ms_flsInfo.length <= m_id) 1156 fiber.ms_flsInfo.length = m_id + 64; 1157 1158 fiber.ms_flsInfo[m_id] = fls_info; 1159 } 1160 1161 if (m_hasInitValue) { 1162 static if (__traits(compiles, emplace!T(data, m_initValue))) 1163 emplace!T(data, m_initValue); 1164 else assert(false, "Cannot emplace initialization value for type "~T.stringof); 1165 } else emplace!T(data); 1166 } 1167 return (cast(T[])data)[0]; 1168 } 1169 1170 alias storage this; 1171 } 1172 1173 private struct FLSInfo { 1174 void function(void[], size_t) fct; 1175 size_t offset; 1176 void destroy(void[] fls) { 1177 fct(fls, offset); 1178 } 1179 } 1180 1181 /** 1182 High level state change events for a Task 1183 */ 1184 enum TaskEvent { 1185 preStart, /// Just about to invoke the fiber which starts execution 1186 postStart, /// After the fiber has returned for the first time (by yield or exit) 1187 start, /// Just about to start execution 1188 yield, /// Temporarily paused 1189 resume, /// 
Resumed from a prior yield 1190 end, /// Ended normally 1191 fail /// Ended with an exception 1192 } 1193 1194 1195 /**************************************************************************************************/ 1196 /* private types */ 1197 /**************************************************************************************************/ 1198 1199 private class CoreTask : TaskFiber { 1200 import std.bitmanip; 1201 private { 1202 static CoreTask ms_coreTask; 1203 CoreTask m_nextInQueue; 1204 CoreTaskQueue* m_queue; 1205 TaskFuncInfo m_taskFunc; 1206 Exception m_exception; 1207 Task[] m_yielders; 1208 1209 // task local storage 1210 static FLSInfo[] ms_flsInfo; 1211 static size_t ms_flsFill = 0; // thread-local 1212 static size_t ms_flsCounter = 0; 1213 BitArray m_flsInit; 1214 void[] m_fls; 1215 } 1216 1217 static CoreTask getThis() 1218 @safe nothrow { 1219 auto f = () @trusted nothrow { 1220 return Fiber.getThis(); 1221 } (); 1222 if (f) return cast(CoreTask)f; 1223 if (!ms_coreTask) ms_coreTask = new CoreTask; 1224 return ms_coreTask; 1225 } 1226 1227 this() 1228 @trusted nothrow { 1229 super(&run, s_taskStackSize); 1230 } 1231 1232 // expose Fiber.state as @safe on older DMD versions 1233 static if (!__traits(compiles, () @safe { return Fiber.init.state; } ())) 1234 @property State state() @trusted const nothrow { return super.state; } 1235 1236 @property size_t taskCounter() const { return m_taskCounter; } 1237 1238 private void run() 1239 { 1240 version (VibeDebugCatchAll) alias UncaughtException = Throwable; 1241 else alias UncaughtException = Exception; 1242 try { 1243 while(true){ 1244 while (!m_taskFunc.func) { 1245 try { 1246 Fiber.yield(); 1247 } catch( Exception e ){ 1248 logWarn("CoreTaskFiber was resumed with exception but without active task!"); 1249 logDiagnostic("Full error: %s", e.toString().sanitize()); 1250 } 1251 } 1252 1253 auto task = m_taskFunc; 1254 m_taskFunc = TaskFuncInfo.init; 1255 Task handle = this.task; 1256 try { 1257 
m_running = true;
					scope(exit) m_running = false;

					static import std.concurrency;
					std.concurrency.thisTid; // force creation of a new Tid

					debug if (s_taskEventCallback) s_taskEventCallback(TaskEvent.start, handle);
					if (!s_eventLoopRunning) {
						logTrace("Event loop not running at task start - yielding.");
						.yield();
						logTrace("Initial resume of task.");
					}
					task.func(&task);
					debug if (s_taskEventCallback) s_taskEventCallback(TaskEvent.end, handle);
				} catch( Exception e ){
					debug if (s_taskEventCallback) s_taskEventCallback(TaskEvent.fail, handle);
					import std.encoding;
					logCritical("Task terminated with uncaught exception: %s", e.msg);
					logDebug("Full error: %s", e.toString().sanitize());
				}

				this.tidInfo.ident = Tid.init; // reset Tid

				// check for any unhandled deferred exceptions
				// NOTE(review): m_exception is only logged here, not cleared - confirm
				// that it is reset before the fiber is reused for the next task
				if (m_exception !is null) {
					if (cast(InterruptException)m_exception) {
						logDebug("InterruptException not handled by task before exit.");
					} else {
						logCritical("Deferred exception not handled by task before exit: %s", m_exception.msg);
						logDebug("Full error: %s", m_exception.toString().sanitize());
					}
				}

				// schedule all tasks that are waiting in join() for this task
				foreach (t; m_yielders) s_yieldedTasks.insertBack(cast(CoreTask)t.fiber);
				m_yielders.length = 0;

				// make sure that the task does not get left behind in the yielder queue if terminated during yield()
				if (m_queue) {
					s_core.resumeYieldedTasks();
					assert(m_queue is null, "Still in yielder queue at the end of task after resuming all yielders!?");
				}

				// run the registered cleanup of every initialized task local slot
				// and reset its "initialized" flag for memory safety
				foreach (size_t i, ref bool b; m_flsInit) {
					if (b) {
						// Slots without cleanup information may lie beyond the end of
						// ms_flsInfo, so the index has to be strictly below its length.
						if (ms_flsInfo !is null && i < ms_flsInfo.length && ms_flsInfo[i] != FLSInfo.init)
							ms_flsInfo[i].destroy(m_fls);
						b = false;
					}
				}

				// make the fiber available for the next task
				if (s_availableFibers.full)
					s_availableFibers.capacity = 2 * s_availableFibers.capacity;

				// clear the message queue for the next task
				messageQueue.clear();

				s_availableFibers.put(this);
			}
		} catch (UncaughtException th) {
			// the fiber loop itself failed; this fiber is not recycled
			logCritical("CoreTaskFiber was terminated unexpectedly: %s", th.msg);
			logDiagnostic("Full error: %s", th.toString().sanitize());
			s_fiberCount--;
		}
	}

	/** Waits until the task currently executed by this fiber has finished.

		Returns immediately if no task is running. Must be called from the
		thread that owns this fiber and never by the task itself (asserted).
	*/
	override void join()
	{
		auto caller = Task.getThis();
		if (!m_running) return;
		if (caller != Task.init) {
			assert(caller.fiber !is this, "A task cannot join itself.");
			assert(caller.thread is this.thread, "Joining tasks in foreign threads is currently not supported.");
			// the caller gets rescheduled by run() once the task has ended
			m_yielders ~= caller;
		} else assert(() @trusted { return Thread.getThis(); } () is this.thread, "Joining tasks in different threads is not yet supported.");
		// the counter changes when the fiber starts a different task
		auto run_count = m_taskCounter;
		if (caller == Task.init) () @trusted { return s_core; } ().resumeYieldedTasks(); // let the task continue (it must be yielded currently)
		while (m_running && run_count == m_taskCounter) rawYield();
	}

	/** Resumes the task with a new `InterruptException`.

		Must be called from the thread that owns this fiber and never by
		the task itself (asserted).
	*/
	override void interrupt()
	{
		auto caller = Task.getThis();
		if (caller != Task.init) {
			assert(caller != this.task, "A task cannot interrupt itself.");
			assert(caller.thread is this.thread, "Interrupting tasks in different threads is not yet supported.");
		} else assert(Thread.getThis() is this.thread, "Interrupting tasks in different threads is not yet supported.");
		s_core.yieldAndResumeTask(this.task, new InterruptException);
	}

	/// Not implemented, always fails with an assertion.
	override void terminate()
	{
		assert(false, "Not implemented");
	}
}


/**
	`DriverCore` implementation that schedules `CoreTask` fibers.
*/
private class VibeDriverCore : DriverCore {
@safe:

	private {
		Duration m_gcCollectTimeout;    // delay used when re-arming the GC timer
		Timer m_gcTimer;                // triggers collectGarbage, only set up by setupGcTimer
		bool m_ignoreIdleForGC = false; // set after a collection to skip the next re-arm
		Exception m_eventException;     // deferred exception for code running outside of a task
	}

	/// Creates the garbage collection timer and sets its timeout to two seconds.
	private void setupGcTimer()
	{
		m_gcTimer =
createTimer(&collectGarbage);
		m_gcCollectTimeout = dur!"seconds"(2);
	}

	/// Sets the deferred exception for code running outside of a task.
	@property void eventException(Exception e) { m_eventException = e; }

	/// Yields the calling task (or runs one event loop iteration outside of a task) without throwing deferred exceptions.
	void yieldForEventDeferThrow()
	@safe nothrow {
		yieldForEventDeferThrow(Task.getThis());
	}

	/// Throws the deferred exception of the calling context, if there is one.
	void processDeferredExceptions()
	@safe {
		processDeferredExceptions(Task.getThis());
	}

	/// Yields until resumed; deferred exceptions are thrown both before and after yielding.
	void yieldForEvent()
	@safe {
		auto task = Task.getThis();
		processDeferredExceptions(task);
		yieldForEventDeferThrow(task);
		processDeferredExceptions(task);
	}

	/** Resumes a task directly; must not be called from within another task (asserted).

		Params:
			task = The task to resume
			event_exception = Optional exception to be thrown inside of the resumed task
	*/
	void resumeTask(Task task, Exception event_exception = null)
	@safe nothrow {
		assert(Task.getThis() == Task.init, "Calling resumeTask from another task.");
		resumeTask(task, event_exception, false);
	}

	/** Schedules `task` for resumption and yields the calling task.

		When called outside of a task, the task is resumed directly instead.
	*/
	void yieldAndResumeTask(Task task, Exception event_exception = null)
	@safe {
		auto thisct = CoreTask.getThis();

		// not inside of a task: resume directly
		if (thisct is null || thisct is CoreTask.ms_coreTask) {
			resumeTask(task, event_exception);
			return;
		}

		auto otherct = cast(CoreTask)task.fiber;
		assert(!thisct || otherct.thread is thisct.thread, "Resuming task in foreign thread.");
		assert(() @trusted { return otherct.state; } () == Fiber.State.HOLD, "Resuming fiber that is not on HOLD.");

		if (event_exception) otherct.m_exception = event_exception;
		// enqueue the target unless it is already scheduled, then give up control
		if (!otherct.m_queue) s_yieldedTasks.insertBack(otherct);
		yield();
	}

	/// Resumes a task; `initial_resume` permits resuming a task that is not yet marked as running.
	void resumeTask(Task task, Exception event_exception, bool initial_resume)
	@safe nothrow {
		assert(initial_resume || task.running, "Resuming terminated task.");
		resumeCoreTask(cast(CoreTask)task.fiber, event_exception);
	}

	/** Switches execution to the given fiber.

		The fiber must belong to the calling thread and be in the HOLD state
		(asserted). An exception escaping from the fiber is logged.
	*/
	void resumeCoreTask(CoreTask ctask, Exception event_exception = null)
	nothrow @safe {
		assert(ctask.thread is () @trusted { return Thread.getThis(); } (), "Resuming task in foreign thread.");
		assert(() @trusted nothrow { return ctask.state; } () == Fiber.State.HOLD, "Resuming fiber that is not on HOLD");

		if (event_exception) {
			extrap();
			assert(!ctask.m_exception, "Resuming task with exception that is already scheduled to be resumed with exception.");
			ctask.m_exception = event_exception;
		}

		// do nothing if the task is already scheduled to be resumed
		if (ctask.m_queue) return;

		try () @trusted { ctask.call!(Fiber.Rethrow.yes)(); } ();
		catch (Exception e) {
			extrap();

			assert(() @trusted nothrow { return ctask.state; } () == Fiber.State.TERM);
			logError("Task terminated with unhandled exception: %s", e.msg);
			logDebug("Full error: %s", () @trusted { return e.toString().sanitize; } ());
		}
	}

	/** Performs idle processing.

		Repeatedly calls the idle handler (if set), resumes yielded tasks
		and processes driver events for as long as the idle handler requests
		another round or yielded tasks remain, and the exit flag is not set.
		On the main thread the GC timer is re-armed afterwards.
	*/
	void notifyIdle()
	{
		bool again = !getExitFlag();
		while (again) {
			if (s_idleHandler)
				again = s_idleHandler();
			else again = false;

			resumeYieldedTasks();

			again = (again || !s_yieldedTasks.empty) && !getExitFlag();

			// processEvents() returning false is treated as an exit request
			if (again && !getEventDriver().processEvents()) {
				logDebug("Setting exit flag due to driver signalling exit");
				s_exitEventLoop = true;
				return;
			}
		}
		if (!s_yieldedTasks.empty) logDebug("Exiting from idle processing although there are still yielded tasks (exit=%s)", getExitFlag());

		if (() @trusted { return Thread.getThis() is st_mainThread; } ()) {
			// skip one re-arm directly after a collection has run
			if (!m_ignoreIdleForGC && m_gcTimer) {
				m_gcTimer.rearm(m_gcCollectTimeout);
			} else m_ignoreIdleForGC = false;
		}
	}

	/// True if `t` is a running task that is currently enqueued for resumption.
	bool isScheduledForResume(Task t)
	{
		if (t == Task.init) return false;
		if (!t.running) return false;
		auto cf = cast(CoreTask)t.fiber;
		return cf.m_queue !is null;
	}

	/// Resumes the tasks in `s_yieldedTasks`, at most as many as were queued on entry.
	private void resumeYieldedTasks()
	nothrow @safe {
		for (auto limit = s_yieldedTasks.length; limit > 0 && !s_yieldedTasks.empty; limit--) {
			auto tf = s_yieldedTasks.front;
			s_yieldedTasks.popFront();
			// fibers that are not on HOLD are dropped from the queue without being resumed
			if (tf.state == Fiber.State.HOLD) resumeCoreTask(tf);
		}
	}

	/// Yields `task`, or runs one iteration of the event loop if `task` is `Task.init`.
	private void yieldForEventDeferThrow(Task task)
	@safe nothrow {
		if (task != Task.init) {
			debug if (s_taskEventCallback) () @trusted { s_taskEventCallback(TaskEvent.yield, task); } ();
			() @trusted { task.fiber.yield(); } ();
			debug if (s_taskEventCallback) () @trusted { s_taskEventCallback(TaskEvent.resume, task); } ();
			// leave fiber.m_exception untouched, so that it gets thrown on the next yieldForEvent call
		} else {
			assert(!s_eventLoopRunning, "Event processing outside of a fiber should only happen before the event loop is running!?");
			m_eventException = null;
			() @trusted nothrow { resumeYieldedTasks(); } (); // let tasks that yielded because they were started outside of an event loop
			try if (auto err = () @trusted { return getEventDriver().runEventLoopOnce(); } ()) {
				logError("Error running event loop: %d", err);
				assert(err != 1, "No events registered, exiting event loop.");
				assert(false, "Error waiting for events.");
			}
			catch (Exception e) {
				assert(false, "Driver.runEventLoopOnce() threw: "~e.msg);
			}
			// leave m_eventException untouched, so that it gets thrown on the next yieldForEvent call
		}
	}

	/// Clears and throws the deferred exception of `task`, or the one stored for non-task code.
	private void processDeferredExceptions(Task task)
	@safe {
		if (task != Task.init) {
			auto fiber = cast(CoreTask)task.fiber;
			if (auto e = fiber.m_exception) {
				fiber.m_exception = null;
				throw e;
			}
		} else {
			if (auto e = m_eventException) {
				m_eventException = null;
				throw e;
			}
		}
	}

	/// GC timer callback: runs a collection and makes the next idle notification skip the re-arm.
	private void collectGarbage()
	{
		import core.memory;
		logTrace("gc idle collect");
		() @trusted {
			GC.collect();
			GC.minimize();
		} ();
		m_ignoreIdleForGC = true;
	}
}

/// Entry of the `st_threads` list.
private struct ThreadContext {
	Thread thread;
	bool isWorker;
	TaskFuncInfo[] taskQueue; // tasks queued for this specific thread

this(Thread thr, bool worker) { this.thread = thr; this.isWorker = worker; }
}

/**
	Type erased description of a task: an entry point plus raw storage for
	the callable and its arguments.
*/
private struct TaskFuncInfo {
	void function(TaskFuncInfo*) func;  // entry point, receives a pointer to this structure
	void[2*size_t.sizeof] callable;     // raw storage for the callable (two machine words)
	void[maxTaskParameterSize] args;    // raw storage for the arguments

	/// Reinterprets the callable storage as a `C`; `C` must fit into the buffer.
	@property ref C typedCallable(C)()
	@trusted {
		static assert(C.sizeof <= callable.sizeof);
		return *cast(C*)callable.ptr;
	}

	/// Reinterprets the argument storage as an `A`; `A` must fit into the buffer.
	@property ref A typedArgs(A)()
	@trusted {
		static assert(A.sizeof <= args.sizeof);
		return *cast(A*)args.ptr;
	}

	/// Overwrites the callable storage with the bytes of a default-initialized `C`.
	void initCallable(C)()
	@trusted {
		C cinit;
		this.callable[0 .. C.sizeof] = cast(void[])(&cinit)[0 .. 1];
	}

	/// Overwrites the argument storage with the bytes of a default-initialized `A`.
	void initArgs(A)()
	@trusted {
		A ainit;
		this.args[0 .. A.sizeof] = cast(void[])(&ainit)[0 .. 1];
	}
}

alias TaskArgsVariant = VariantN!maxTaskParameterSize;

/**************************************************************************************************/
/* private functions                                                                              */
/**************************************************************************************************/

private {
	// default fiber stack size: 16 MiB if pointers have at least 8 bytes, 512 KiB otherwise
	static if ((void*).sizeof >= 8) enum defaultTaskStackSize = 16*1024*1024;
	else enum defaultTaskStackSize = 512*1024;

	// process wide state (__gshared/shared)
	__gshared VibeDriverCore s_core;
	__gshared size_t s_taskStackSize = defaultTaskStackSize;

	__gshared core.sync.mutex.Mutex st_threadsMutex;
	__gshared ManualEvent st_threadsSignal;
	__gshared Thread st_mainThread;
	__gshared ThreadContext[] st_threads;
	__gshared TaskFuncInfo[] st_workerTasks;
	__gshared Condition st_threadShutdownCondition;
	__gshared debug TaskEventCb s_taskEventCallback;
	shared bool st_term = false; // set by the signal handler and at main thread shutdown

	// thread-local state
	bool s_exitEventLoop = false;
	bool s_eventLoopRunning = false;
	bool delegate() @safe s_idleHandler;
	CoreTaskQueue s_yieldedTasks;
	Variant[string] s_taskLocalStorageGlobal; // for use outside of a task
	FixedRingBuffer!CoreTask s_availableFibers;
	size_t s_fiberCount;

	string s_privilegeLoweringUserName;
	string s_privilegeLoweringGroupName;
	__gshared bool s_disableSignalHandlers = false;
}

/// The global driver core instance (may be null).
private static @property VibeDriverCore driverCore() @trusted nothrow { return s_core; }

/// True if either the thread-local exit flag or the process wide termination flag is set.
private bool getExitFlag()
@trusted nothrow {
	return s_exitEventLoop || atomicLoad(st_term);
}

/**
	Installs the default signal handlers.

	Only the first call has an effect. Nothing is installed if
	`s_disableSignalHandlers` is set at that time.
*/
private void setupSignalHandlers()
{
	__gshared bool s_setup = false;

	// only initialize in main thread
	synchronized (st_threadsMutex) {
		if (s_setup) return;
		s_setup = true;

		if (s_disableSignalHandlers) return;

		logTrace("setup signal handler");
		version(Posix){
			// support proper shutdown using signals
			sigset_t sigset;
			sigemptyset(&sigset);
			sigaction_t siginfo;
			siginfo.sa_handler = &onSignal;
			siginfo.sa_mask = sigset;
			siginfo.sa_flags = SA_RESTART;
			sigaction(SIGINT, &siginfo, null);
			sigaction(SIGTERM, &siginfo, null);

			// SIGPIPE gets a handler that only writes a trace log message
			siginfo.sa_handler = &onBrokenPipe;
			sigaction(SIGPIPE, &siginfo, null);
		}

		version(Windows){
			// WORKAROUND: we don't care about viral @nogc attribute here!
import std.traits;
			// the cast converts the handler to the parameter type expected by signal()
			signal(SIGABRT, cast(ParameterTypeTuple!signal[1])&onSignal);
			signal(SIGTERM, cast(ParameterTypeTuple!signal[1])&onSignal);
			signal(SIGINT, cast(ParameterTypeTuple!signal[1])&onSignal);
		}
	}
}

// per process setup
// Creates the driver core and the thread bookkeeping, registers the main
// thread, sets up the event driver and reads the privilege lowering options.
shared static this()
{
	st_mainThread = Thread.getThis();

	version(Windows){
		version(VibeLibeventDriver) enum need_wsa = true;
		else version(VibeWin32Driver) enum need_wsa = true;
		else enum need_wsa = false;
		static if (need_wsa) {
			logTrace("init winsock");
			// initialize WinSock2
			// NOTE(review): the return value of WSAStartup is not checked
			import core.sys.windows.winsock2;
			WSADATA data;
			WSAStartup(0x0202, &data);

		}
	}

	// COMPILER BUG: Must be some kind of module constructor order issue:
	//    without this, the stdout/stderr handles are not initialized before
	//    the log module is set up.
	import std.stdio; File f; f.close();

	initializeLogModule();

	logTrace("create driver core");

	s_core = new VibeDriverCore;
	st_threadsMutex = new Mutex;
	st_threadShutdownCondition = new Condition(st_threadsMutex);

	// the main thread always occupies index 0 of st_threads
	auto thisthr = Thread.getThis();
	thisthr.name = "Main";
	assert(st_threads.length == 0, "Main thread not the first thread!?");
	st_threads ~= ThreadContext(thisthr, false);

	setupDriver();

	st_threadsSignal = getEventDriver().createManualEvent();

	version(VibeIdleCollect){
		logTrace("setup gc");
		driverCore.setupGcTimer();
	}

	version (VibeNoDefaultArgs) {}
	else {
		readOption("uid|user", &s_privilegeLoweringUserName, "Sets the user name or id used for privilege lowering.");
		readOption("gid|group", &s_privilegeLoweringGroupName, "Sets the group name or id used for privilege lowering.");
	}

	// set up vibe.d compatibility for std.concurrency
	static import std.concurrency;
	std.concurrency.scheduler = new VibedScheduler;
}

// per process teardown
// Deletes the main event driver, warns about tasks that are still queued
// and destroys the driver core.
shared static ~this()
{
	deleteEventDriver();

	size_t tasks_left;

	synchronized (st_threadsMutex) {
		if( !st_workerTasks.empty ) tasks_left = st_workerTasks.length;
	}

	if (!s_yieldedTasks.empty) tasks_left += s_yieldedTasks.length;
	if (tasks_left > 0) {
		logWarn("There were still %d tasks running at exit.", tasks_left);
	}

	destroy(s_core);
	// per thread destructors test s_core for null to detect that this teardown already ran
	s_core = null;
}

// per thread setup
// Registers the thread in st_threads (unless already present) and creates its event driver.
static this()
{
	/// workaround for:
	// object.Exception@src/rt/minfo.d(162): Aborting: Cycle detected between modules with ctors/dtors:
	// vibe.core.core -> vibe.core.drivers.native -> vibe.core.drivers.libasync -> vibe.core.core
	if (Thread.getThis().isDaemon && Thread.getThis().name == "CmdProcessor") return;

	assert(s_core !is null);

	auto thisthr = Thread.getThis();
	synchronized (st_threadsMutex)
		if (!st_threads.any!(c => c.thread is thisthr))
			st_threads ~= ThreadContext(thisthr, false);

	//CoreTask.ms_coreTask = new CoreTask;

	setupDriver();
}

// per thread teardown
// Unregisters the thread from st_threads. The main thread first signals
// termination and waits for all other non-daemon threads.
static ~this()
{
	// Issue #1374: Sometimes Druntime for some reason calls `static ~this` after `shared static ~this`
	if (!s_core) return;

	version(VibeLibasyncDriver) {
		import vibe.core.drivers.libasync;
		if (LibasyncDriver.isControlThread)
			return;
	}
	auto thisthr = Thread.getThis();

	bool is_main_thread = false;

	synchronized (st_threadsMutex) {
		auto idx = st_threads.countUntil!(c => c.thread is thisthr);

		// if we are the main thread, wait for all others before terminating
		is_main_thread = idx == 0;
		if (is_main_thread) { // we are the main thread, wait for others
			atomicStore(st_term, true);
			st_threadsSignal.emit();
			// wait for all non-daemon threads to shut down
			while (st_threads[1 ..
$].any!(th => !th.thread.isDaemon)) {
				logDiagnostic("Main thread still waiting for other threads: %s",
					st_threads[1 .. $].map!(t => t.thread.name ~ (t.isWorker ? " (worker thread)" : "")).join(", "));
				st_threadShutdownCondition.wait();
			}
			logDiagnostic("Main thread exiting");
		}

		// remove this thread's entry by overwriting it with the last one
		assert(idx >= 0, "No more threads registered");
		if (idx >= 0) {
			st_threads[idx] = st_threads[$-1];
			st_threads.length--;
		}
	}

	// delay deletion of the main event driver to "~shared static this()"
	if (!is_main_thread) deleteEventDriver();

	// wake up a main thread that is waiting for this thread to unregister
	st_threadShutdownCondition.notifyAll();
}

/// Creates the event driver for the calling thread, unless it already exists.
package void setupDriver()
{
	if (getEventDriver(true) !is null) return;

	logTrace("create driver");
	setupEventDriver(driverCore);
	logTrace("driver %s created", (cast(Object)getEventDriver()).classinfo.name);
}

/**
	Entry point of worker threads.

	Starts the `handleWorkerTasks` task and runs the event loop until the
	exit flag is set. An uncaught `Error` (or other non-`Exception`
	throwable) aborts the process after it has been logged. An uncaught
	`Exception` is logged; the process is only aborted if logging itself
	fails.
*/
private void workerThreadFunc()
nothrow {
	try {
		assert(s_core !is null);
		if (getExitFlag()) return;
		logDebug("entering worker thread");
		runTask(toDelegate(&handleWorkerTasks));
		logDebug("running event loop");
		if (!getExitFlag()) runEventLoop();
		logDebug("Worker thread exit.");
	} catch (Exception e) {
		// abort only if logging the exception fails as well
		scope (failure) abort();
		logFatal("Worker thread terminated due to uncaught exception: %s", e.msg);
		logDebug("Full error: %s", e.toString().sanitize());
	} catch (Throwable th) {
		// errors leave the program in an undefined state - always abort
		scope (exit) abort();
		// the format string takes two arguments: the message and the error type
		logFatal("Worker thread terminated due to uncaught error: %s (%s)", th.msg, th.classinfo.name);
		logFatal("Error type: %s", th.classinfo.name);
		logDebug("Full error: %s", th.toString().sanitize());
	}
}

/**
	Task that dispatches queued worker tasks on the calling worker thread.

	Runs until the exit flag is set and then requests the thread's event
	loop to exit.
*/
private void handleWorkerTasks()
{
	logDebug("worker thread enter");

	auto thisthr = Thread.getThis();

	logDebug("worker thread loop enter");
	while(true){
		auto emit_count = st_threadsSignal.emitCount;
		TaskFuncInfo task;

		synchronized
(st_threadsMutex) {
			auto idx = st_threads.countUntil!(c => c.thread is thisthr);
			assert(idx >= 0);
			logDebug("worker thread check");

			if (getExitFlag()) {
				if (st_threads[idx].taskQueue.length > 0)
					logWarn("Worker thread shuts down with specific worker tasks left in its queue.");
				// only the last remaining worker thread reports leftover shared tasks
				if (st_threads.count!(c => c.isWorker) == 1 && st_workerTasks.length > 0)
					logWarn("Worker threads shut down with worker tasks still left in the queue.");
				break;
			}

			// the shared queue is checked before the thread specific queue
			if (!st_workerTasks.empty) {
				logDebug("worker thread got task");
				task = st_workerTasks.front;
				st_workerTasks.popFront();
			} else if (!st_threads[idx].taskQueue.empty) {
				logDebug("worker thread got specific task");
				task = st_threads[idx].taskQueue.front;
				st_threads[idx].taskQueue.popFront();
			}
		}

		// run the task outside of the lock, or wait for the next signal if there was none
		if (task.func !is null) runTask_internal(task);
		else emit_count = st_threadsSignal.wait(emit_count);
	}

	logDebug("worker thread exit");
	getEventDriver().exitEventLoop();
}

/// Waits until the exit flag is set and then requests the event loop to exit.
private void watchExitFlag()
{
	auto emit_count = st_threadsSignal.emitCount;
	while (true) {
		synchronized (st_threadsMutex) {
			if (getExitFlag()) break;
		}

		emit_count = st_threadsSignal.wait(emit_count);
	}

	logDebug("main thread exit");
	getEventDriver().exitEventLoop();
}

/// Called whenever an exception is passed to or escapes from a task; only writes a trace log message.
private extern(C) void extrap()
@safe nothrow {
	logTrace("exception trap");
}

/// Signal handler: sets the termination flag and emits the threads signal.
private extern(C) void onSignal(int signal)
nothrow {
	atomicStore(st_term, true);
	try st_threadsSignal.emit(); catch (Throwable) {}

	// NOTE(review): logging from within a signal handler - confirm that this is safe for the active loggers
	logInfo("Received signal %d. Shutting down.", signal);
}

/// SIGPIPE handler: only writes a trace log message.
private extern(C) void onBrokenPipe(int signal)
nothrow {
	logTrace("Broken pipe.");
}

version(Posix)
{
	/// True if the effective user ID is 0.
	private bool isRoot() @safe { return geteuid() == 0; }

	/** Sets the effective group and user ID of the process.

		Negative values leave the corresponding ID unchanged. The group is
		set before the user. Throws if an ID is unknown or cannot be set.

		NOTE(review): only the effective IDs are changed (`setegid`/`seteuid`) -
		confirm whether the real/saved IDs are meant to be kept.
	*/
	private void setUID(int uid, int gid) @safe
	{
		logInfo("Lowering privileges to uid=%d, gid=%d...", uid, gid);
		if (gid >= 0) {
			enforce(() @trusted { return getgrgid(gid); }() !is null, "Invalid group id!");
			enforce(setegid(gid) == 0, "Error setting group id!");
		}
		//if( initgroups(const char *user, gid_t group);
		if (uid >= 0) {
			enforce(() @trusted { return getpwuid(uid); }() !is null, "Invalid user id!");
			enforce(seteuid(uid) == 0, "Error setting user id!");
		}
	}

	/// Looks up the user ID for a user name via `getpwnam`; throws if the name is unknown.
	private int getUID(string name) @safe
	{
		auto pw = () @trusted { return getpwnam(name.toStringz()); }();
		enforce(pw !is null, "Unknown user name: "~name);
		return pw.pw_uid;
	}

	/// Looks up the group ID for a group name via `getgrnam`; throws if the name is unknown.
	private int getGID(string name) @safe
	{
		auto gr = () @trusted { return getgrnam(name.toStringz()); }();
		enforce(gr !is null, "Unknown group name: "~name);
		return gr.gr_gid;
	}
} else version(Windows){
	/// Always false on Windows, so privilege lowering is never attempted.
	private bool isRoot() @safe { return false; }

	/// Not supported on Windows, always throws.
	private void setUID(int uid, int gid) @safe
	{
		enforce(false, "UID/GID not supported on Windows.");
	}

	/// Not supported on Windows, always throws.
	private int getUID(string name) @safe
	{
		enforce(false, "Privilege lowering not supported on Windows.");
		assert(false);
	}

	/// Not supported on Windows, always throws.
	private int getGID(string name) @safe
	{
		enforce(false, "Privilege lowering not supported on Windows.");
		assert(false);
	}
}

/**
	Intrusive singly linked FIFO queue of `CoreTask` instances.

	The links are stored in the tasks themselves (`m_nextInQueue`,
	`m_queue`), so a task can be in at most one queue at a time.
*/
private struct CoreTaskQueue {
	@safe nothrow:

	CoreTask first, last;
	size_t length;

	@disable this(this);

	/// True if the queue contains no tasks.
	@property bool empty() const { return first is null; }

	/// The task at the head of the queue (null if empty).
	@property CoreTask front() { return first; }

	void
insertBack(CoreTask task)
	{
		// appends `task`, which must not be part of any queue (asserted)
		assert(task.m_queue == null, "Task is already scheduled to be resumed!");
		assert(task.m_nextInQueue is null, "Task has m_nextInQueue set without being in a queue!?");
		task.m_queue = &this;
		if (empty)
			first = task;
		else
			last.m_nextInQueue = task;
		last = task;
		length++;
	}

	/// Removes the task at the head of the queue and clears its queue links.
	void popFront()
	{
		// removing the only element also resets the tail
		if (first is last) last = null;
		assert(first && first.m_queue == &this);
		auto next = first.m_nextInQueue;
		first.m_nextInQueue = null;
		first.m_queue = null;
		first = next;
		length--;
	}
}

// mixin string helper to call a function with arguments that potentially have
// to be moved
//
// Builds the string "func(args[0], args[1], ...);" with one indexed argument
// per type in ARGS. ".move" is appended to each argument whose type is not
// copyable.
private string callWithMove(ARGS...)(string func, string args)
{
	import std.string;
	string ret = func ~ "(";
	foreach (i, T; ARGS) {
		if (i > 0) ret ~= ", ";
		ret ~= format("%s[%s]", args, i);
		static if (needsMove!T) ret ~= ".move";
	}
	return ret ~ ");";
}

// Evaluates to true if T cannot be copied and thus has to be passed on using
// its .move property. Instantiation fails with a static assertion if T is
// neither copyable nor provides .move.
private template needsMove(T)
{
	// true if a T parameter can be returned by value
	template isCopyable(T)
	{
		enum isCopyable = __traits(compiles, (T a) { return a; });
	}

	// true if the expression a.move compiles for a T
	template isMoveable(T)
	{
		enum isMoveable = __traits(compiles, (T a) { return a.move; });
	}

	enum needsMove = !isCopyable!T;

	static assert(isCopyable!T || isMoveable!T,
		"Non-copyable type "~T.stringof~" must be movable with a .move property.");
}

unittest {
	enum E { a, move }
	// not copyable, provides .move
	static struct S {
		@disable this(this);
		@property S move() { return S.init; }
	}
	// copyable, .move is not required
	static struct T { @property T move() { return T.init; } }
	static struct U { }
	// neither default constructible nor copyable
	static struct V {
		@disable this();
		@disable this(this);
		@property V move() { return V.init; }
	}
	// not default constructible, but copyable
	static struct W { @disable this(); }

	static assert(needsMove!S);
	static assert(!needsMove!int);
	static assert(!needsMove!string);
	static assert(!needsMove!E);
	static assert(!needsMove!T);
	static assert(!needsMove!U);
	static assert(needsMove!V);
	static assert(!needsMove!W);
}

// DMD currently has no option to set merging of coverage files at compile-time
// This needs to be done via a Druntime API
// As this option is solely for Vibed's internal testsuite, it's hidden behind
// a long version
version(VibedSetCoverageMerge)
shared static this() {
	import core.runtime : dmd_coverSetMerge;
	dmd_coverSetMerge(true);
}