/**
	Driver implementation for the libevent library

	Libevent is a well-established event notification library.
	It is currently the default driver for Vibe.d

	See_Also:
		`vibe.core.driver` = interface definition
		http://libevent.org/ = Official website
		`vibe.core.drivers.libevent2_tcp` = Implementation of TCPConnection and TCPListener

	Copyright: © 2012-2015 RejectedSoftware e.K.
	Authors: Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
*/
module vibe.core.drivers.libevent2;

version(VibeLibeventDriver)
{

import vibe.core.driver;
import vibe.core.drivers.libevent2_tcp;
import vibe.core.drivers.threadedfile;
import vibe.core.drivers.timerqueue;
import vibe.core.drivers.utils;
import vibe.core.log;
import vibe.internal.meta.traits : synchronizedIsNothrow;
import vibe.utils.array : ArraySet;
import vibe.utils.hashmap;
import vibe.internal.allocator;
import vibe.internal.freelistref;

import core.memory;
import core.atomic;
import core.stdc.config;
import core.stdc.errno;
import core.stdc.stdlib;
import core.sync.condition;
import core.sync.mutex;
import core.sync.rwmutex;
import core.sys.posix.netinet.in_;
import core.sys.posix.netinet.tcp;
import core.thread;
import deimos.event2.bufferevent;
import deimos.event2.dns;
import deimos.event2.event;
import deimos.event2.thread;
import deimos.event2.util;
import std.conv;
import std.datetime;
import std.exception;
import std.string;


version (Windows)
{
	version(VibePragmaLib) pragma(lib, "event2");
	pragma(lib, "ws2_32.lib");
}
else
	version(VibePragmaLib) pragma(lib, "event");

version(Windows)
{
	import core.sys.windows.winsock2;

	alias EWOULDBLOCK = WSAEWOULDBLOCK;
}

// Multicast socket option constants were only added to druntime in 2.077;
// provide the platform-specific numeric values for older compilers.
version(OSX)
{
	static if (__VERSION__ < 2077)
	{
		enum IP_ADD_MEMBERSHIP = 12;
		enum IP_MULTICAST_LOOP = 11;
	}
	else
		import core.sys.darwin.netinet.in_ : IP_ADD_MEMBERSHIP, IP_MULTICAST_LOOP;
} else version(FreeBSD)
{
	static if (__VERSION__ < 2077)
	{
		enum IP_ADD_MEMBERSHIP = 12;
		enum IP_MULTICAST_LOOP = 11;
	}
	else
		import core.sys.freebsd.netinet.in_ : IP_ADD_MEMBERSHIP, IP_MULTICAST_LOOP;
} else version(linux)
{
	static if (__VERSION__ < 2077)
	{
		enum IP_ADD_MEMBERSHIP = 35;
		enum IP_MULTICAST_LOOP = 34;
	}
	else
		import core.sys.linux.netinet.in_ : IP_ADD_MEMBERSHIP, IP_MULTICAST_LOOP;
} else version(Windows)
{
	// IP_ADD_MEMBERSHIP and IP_MULTICAST_LOOP are included in winsock(2) import above
}

/// EventDriver implementation on top of libevent 2.x.
final class Libevent2Driver : EventDriver {
	@safe:

	import std.container : DList;
	import std.datetime : Clock;

	private {
		DriverCore m_core;
		event_base* m_eventLoop;
		evdns_base* m_dnsBase;
		bool m_exit = false;
		ArraySet!size_t m_ownedObjects;
		debug Thread m_ownerThread;

		event* m_timerEvent;
		SysTime m_timerTimeout = SysTime.max;
		TimerQueue!TimerInfo m_timers;
		DList!AddressInfo m_addressInfoCache;
		size_t m_addressInfoCacheLength = 0;

		bool m_running = false; // runEventLoop in progress?

		IAllocator m_allocator;
	}

	this(DriverCore core) @trusted nothrow
	{
		debug m_ownerThread = () @trusted { return Thread.getThis(); } ();
		m_core = core;
		s_driverCore = core;

		m_allocator = Mallocator.instance.allocatorObject;
		s_driver = this;

		// One-time, process-global libevent setup (allocator and threading hooks)
		synchronized if (!s_threadObjectsMutex) {
			s_threadObjectsMutex = new Mutex;
			s_threadObjects.setAllocator(m_allocator);

			// set the malloc/free versions of our runtime so we don't run into trouble
			// because the libevent DLL uses a different one.
			event_set_mem_functions(&lev_alloc, &lev_realloc, &lev_free);

			evthread_lock_callbacks lcb;
			lcb.lock_api_version = EVTHREAD_LOCK_API_VERSION;
			lcb.supported_locktypes = EVTHREAD_LOCKTYPE_RECURSIVE|EVTHREAD_LOCKTYPE_READWRITE;
			lcb.alloc = &lev_alloc_mutex;
			lcb.free = &lev_free_mutex;
			lcb.lock = &lev_lock_mutex;
			lcb.unlock = &lev_unlock_mutex;
			evthread_set_lock_callbacks(&lcb);

			evthread_condition_callbacks ccb;
			ccb.condition_api_version = EVTHREAD_CONDITION_API_VERSION;
			ccb.alloc_condition = &lev_alloc_condition;
			ccb.free_condition = &lev_free_condition;
			ccb.signal_condition = &lev_signal_condition;
			ccb.wait_condition = &lev_wait_condition;
			evthread_set_condition_callbacks(&ccb);

			evthread_set_id_callback(&lev_get_thread_id);
		}

		// initialize libevent
		logDiagnostic("libevent version: %s", event_get_version());
		m_eventLoop = event_base_new();
		s_eventLoop = m_eventLoop;
		logDiagnostic("libevent is using %s for events.", event_base_get_method(m_eventLoop));
		evthread_make_base_notifiable(m_eventLoop);

		m_dnsBase = evdns_base_new(m_eventLoop, 1);
		if( !m_dnsBase ) logError("Failed to initialize DNS lookup.");
		evdns_base_set_option(m_dnsBase, "randomize-case:", "0");

		string hosts_file;
		version (Windows) hosts_file = `C:\Windows\System32\drivers\etc\hosts`;
		else hosts_file = `/etc/hosts`;
		if (existsFile(hosts_file)) {
			if (evdns_base_load_hosts(m_dnsBase, hosts_file.toStringz()) != 0)
				logError("Failed to load hosts file at %s", hosts_file);
		}

		m_timerEvent = () @trusted { return event_new(m_eventLoop, -1, EV_TIMEOUT, &onTimerTimeout, cast(void*)this); } ();
	}

	/// Tears down all objects owned by this driver and frees the libevent bases.
	void dispose()
	{
		debug assert(() @trusted { return Thread.getThis(); } () is m_ownerThread, "Event loop destroyed in foreign thread.");

		() @trusted { event_free(m_timerEvent); } ();

		// notify all other living objects about the shutdown
		synchronized (() @trusted { return s_threadObjectsMutex; } ()) {
			// destroy all living objects owned by this driver
			foreach (ref key; m_ownedObjects) {
				assert(key);
				auto obj = () @trusted { return cast(Libevent2Object)cast(void*)key; } ();
				debug assert(obj.m_ownerThread is m_ownerThread, "Owned object with foreign thread ID detected.");
				debug assert(obj.m_driver is this, "Owned object with foreign driver reference detected.");
				key = 0;
				() @trusted { destroy(obj); } ();
			}

			ref getThreadObjects() @trusted { return s_threadObjects; }

			foreach (ref key; getThreadObjects()) {
				assert(key);
				auto obj = () @trusted { return cast(Libevent2Object)cast(void*)key; } ();
				debug assert(obj.m_ownerThread !is m_ownerThread, "Live object of this thread detected after all owned mutexes have been destroyed.");
				debug assert(obj.m_driver !is this, "Live object of this driver detected with different thread ID after all owned mutexes have been destroyed.");
				// WORKAROUND for a possible race-condition in case of concurrent GC collections
				// Since this only occurs on shutdown and rarely, this should be an acceptable
				// "solution" until this is all switched to RC.
				if (auto me = cast(Libevent2ManualEvent)obj)
					if (!me.m_mutex) continue;
				obj.onThreadShutdown();
			}
		}

		// shutdown libevent for this thread
		() @trusted {
			evdns_base_free(m_dnsBase, 1);
			event_base_free(m_eventLoop);
		} ();
		s_eventLoop = null;
		s_alreadyDeinitialized = true;
	}

	@property event_base* eventLoop() nothrow { return m_eventLoop; }
	@property evdns_base* dnsEngine() nothrow { return m_dnsBase; }

	/// Runs the event loop until exitEventLoop() is called or no events remain.
	int runEventLoop()
	{
		m_running = true;
		scope (exit) m_running = false;

		int ret;
		m_exit = false;
		while (!m_exit && (ret = () @trusted { return event_base_loop(m_eventLoop, EVLOOP_ONCE); } ()) == 0) {
			processTimers();
			() @trusted { return s_driverCore; } ().notifyIdle();
		}
		m_exit = false;
		return ret;
	}

	/// Processes a single batch of events (blocking) and returns libevent's result code.
	int runEventLoopOnce()
	{
		auto ret = () @trusted { return event_base_loop(m_eventLoop, EVLOOP_ONCE); } ();
		processTimers();
		m_core.notifyIdle();
		return ret;
	}

	/// Processes pending events without blocking; returns false if an exit was requested.
	bool processEvents()
	{
		logDebugV("process events with exit == %s", m_exit);
		() @trusted { event_base_loop(m_eventLoop, EVLOOP_NONBLOCK|EVLOOP_ONCE); } ();
		processTimers();
		logDebugV("processed events with exit == %s", m_exit);
		if (m_exit) {
			// leave the flag set, if the event loop is still running to let it exit, too
			if (!m_running) m_exit = false;
			return false;
		}
		return true;
	}

	void exitEventLoop()
	{
		logDebug("Libevent2Driver.exitEventLoop called");
		m_exit = true;
		enforce(() @trusted { return event_base_loopbreak(m_eventLoop); } () == 0, "Failed to exit libevent event loop.");
	}

	ThreadedFileStream openFile(Path path, FileMode mode)
	{
		return new ThreadedFileStream(path, mode);
	}

	DirectoryWatcher watchDirectory(Path path, bool recursive)
	{
		version (linux) return new InotifyDirectoryWatcher(m_core, path, recursive);
		assert(false, "watchDirectory is not yet implemented in the libevent driver.");
	}

	/// Resolves a host name, using a small LRU-style cache of the last 10 lookups.
	NetworkAddress resolveHost(string host, ushort family = AF_UNSPEC, bool use_dns = true)
	{
		assert(m_dnsBase);

		foreach (ai; m_addressInfoCache)
			if (ai.host == host && ai.family == family && ai.useDNS == use_dns)
				return ai.address;

		evutil_addrinfo hints;
		hints.ai_family = family;
		if (!use_dns) {
			//When this flag is set, we only resolve numeric IPv4 and IPv6
			//addresses; if the nodename would require a name lookup, we instead
			//give an EVUTIL_EAI_NONAME error.
			hints.ai_flags = EVUTIL_AI_NUMERICHOST;
		}

		logDebug("dnsresolve %s", host);
		GetAddrInfoMsg msg;
		msg.core = m_core;
		evdns_getaddrinfo_request* dnsReq = () @trusted { return evdns_getaddrinfo(m_dnsBase, toStringz(host), null,
			&hints, &onAddrInfo, &msg); } ();

		// wait if the request couldn't be fulfilled instantly
		if (!msg.done) {
			assert(dnsReq !is null);
			msg.task = Task.getThis();
			logDebug("dnsresolve yield");
			while (!msg.done) m_core.yieldForEvent();
		}

		logDebug("dnsresolve ret");
		enforce(msg.err == DNS_ERR_NONE, format("Failed to lookup host '%s': %s", host, () @trusted { return evutil_gai_strerror(msg.err); } ()));

		if (m_addressInfoCacheLength >= 10) m_addressInfoCache.removeFront();
		else m_addressInfoCacheLength++;
		m_addressInfoCache.insertBack(AddressInfo(msg.addr, host, family, use_dns));
		return msg.addr;
	}

	/// Establishes a TCP connection to `addr`, bound locally to `bind_addr`.
	Libevent2TCPConnection connectTCP(NetworkAddress addr, NetworkAddress bind_addr)
	{
		assert(addr.family == bind_addr.family, "Mismatching bind and target address.");

		auto sockfd_raw = () @trusted { return socket(addr.family, SOCK_STREAM, 0); } ();
		// on Win64 socket() returns a 64-bit value but libevent expects an int
		static if (typeof(sockfd_raw).max > int.max) assert(sockfd_raw <= int.max || sockfd_raw == ~0);
		auto sockfd = cast(int)sockfd_raw;
335 socketEnforce(sockfd != -1, "Failed to create socket."); 336 337 socketEnforce(() @trusted { return bind(sockfd, bind_addr.sockAddr, bind_addr.sockAddrLen); } () == 0, "Failed to bind socket."); 338 339 if (() @trusted { return evutil_make_socket_nonblocking(sockfd); } ()) 340 throw new Exception("Failed to make socket non-blocking."); 341 342 auto buf_event = () @trusted { return bufferevent_socket_new(m_eventLoop, sockfd, bufferevent_options.BEV_OPT_CLOSE_ON_FREE); } (); 343 if (!buf_event) throw new Exception("Failed to create buffer event for socket."); 344 345 auto cctx = () @trusted { return TCPContextAlloc.alloc(m_core, m_eventLoop, sockfd, buf_event, bind_addr, addr); } (); 346 scope(failure) () @trusted { 347 if (cctx.event) bufferevent_free(cctx.event); 348 TCPContextAlloc.free(cctx); 349 } (); 350 () @trusted { bufferevent_setcb(buf_event, &onSocketRead, &onSocketWrite, &onSocketEvent, cctx); } (); 351 if (() @trusted { return bufferevent_enable(buf_event, EV_READ|EV_WRITE); } ()) 352 throw new Exception("Error enabling buffered I/O event for socket."); 353 354 cctx.readOwner = Task.getThis(); 355 scope(exit) cctx.readOwner = Task(); 356 357 assert(cctx.exception is null); 358 socketEnforce(() @trusted { return bufferevent_socket_connect(buf_event, addr.sockAddr, addr.sockAddrLen); } () == 0, 359 "Failed to connect to " ~ addr.toString()); 360 361 try { 362 cctx.checkForException(); 363 364 // TODO: cctx.remote_addr6 = ...; 365 366 while (cctx.status == 0) 367 m_core.yieldForEvent(); 368 } catch (InterruptException e) { 369 throw e; 370 } catch (Exception e) { 371 throw new Exception(format("Failed to connect to %s: %s", addr.toString(), e.msg)); 372 } 373 374 logTrace("Connect result status: %d", cctx.status); 375 enforce(cctx.status == BEV_EVENT_CONNECTED, cctx.statusMessage 376 ? 
format("Failed to connect to host %s: %s", addr.toString(), cctx.statusMessage) 377 : format("Failed to connect to host %s: %s", addr.toString(), cctx.status)); 378 379 socklen_t balen = bind_addr.sockAddrLen; 380 socketEnforce(() @trusted { return getsockname(sockfd, bind_addr.sockAddr, &balen); } () == 0, "getsockname failed."); 381 cctx.local_addr = bind_addr; 382 383 return new Libevent2TCPConnection(cctx); 384 } 385 386 Libevent2TCPListener listenTCP(ushort port, void delegate(TCPConnection conn) @safe connection_callback, string address, TCPListenOptions options) 387 { 388 auto bind_addr = resolveHost(address, AF_UNSPEC, false); 389 bind_addr.port = port; 390 391 auto listenfd_raw = () @trusted { return socket(bind_addr.family, SOCK_STREAM, 0); } (); 392 // on Win64 socket() returns a 64-bit value but libevent expects an int 393 static if (typeof(listenfd_raw).max > int.max) assert(listenfd_raw <= int.max || listenfd_raw == ~0); 394 auto listenfd = cast(int)listenfd_raw; 395 socketEnforce(listenfd != -1, "Error creating listening socket"); 396 int tmp_reuse = 1; 397 socketEnforce(() @trusted { return setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &tmp_reuse, tmp_reuse.sizeof); } () == 0, 398 "Error enabling socket address reuse on listening socket"); 399 version (linux) { 400 if (options & TCPListenOptions.reusePort) { 401 if (() @trusted { return setsockopt(listenfd, SOL_SOCKET, SO_REUSEPORT, &tmp_reuse, tmp_reuse.sizeof); } ()) { 402 if (errno != EINVAL && errno != ENOPROTOOPT) { 403 socketEnforce(false, "Error enabling socket port reuse on listening socket"); 404 } 405 } 406 } 407 } 408 socketEnforce(() @trusted { return bind(listenfd, bind_addr.sockAddr, bind_addr.sockAddrLen); } () == 0, 409 "Error binding listening socket"); 410 411 socketEnforce(() @trusted { return listen(listenfd, 128); } () == 0, 412 "Error listening to listening socket"); 413 414 // Set socket for non-blocking I/O 415 enforce(() @trusted { return 
evutil_make_socket_nonblocking(listenfd); } () == 0, 416 "Error setting listening socket to non-blocking I/O."); 417 418 socklen_t balen = bind_addr.sockAddrLen; 419 socketEnforce(() @trusted { return getsockname(listenfd, bind_addr.sockAddr, &balen); } () == 0, "getsockname failed."); 420 421 auto ret = new Libevent2TCPListener(bind_addr); 422 423 static final class HandlerContext { 424 Libevent2TCPListener listener; 425 int listenfd; 426 NetworkAddress bind_addr; 427 void delegate(TCPConnection) @safe connection_callback; 428 TCPListenOptions options; 429 } 430 431 auto hc = new HandlerContext; 432 hc.listener = ret; 433 hc.listenfd = listenfd; 434 hc.bind_addr = bind_addr; 435 hc.connection_callback = connection_callback; 436 hc.options = options; 437 438 static void setupConnectionHandler(shared(HandlerContext) handler_context_) 439 @safe { 440 auto handler_context = () @trusted { return cast(HandlerContext)handler_context_; } (); 441 auto evloop = getThreadLibeventEventLoop(); 442 auto core = getThreadLibeventDriverCore(); 443 // Add an event to wait for connections 444 auto ctx = () @trusted { return TCPContextAlloc.alloc(core, evloop, handler_context.listenfd, null, handler_context.bind_addr, NetworkAddress()); } (); 445 scope(failure) () @trusted { TCPContextAlloc.free(ctx); } (); 446 ctx.connectionCallback = handler_context.connection_callback; 447 ctx.listenEvent = () @trusted { return event_new(evloop, handler_context.listenfd, EV_READ | EV_PERSIST, &onConnect, ctx); } (); 448 ctx.listenOptions = handler_context.options; 449 enforce(() @trusted { return event_add(ctx.listenEvent, null); } () == 0, 450 "Error scheduling connection event on the event loop."); 451 handler_context.listener.addContext(ctx); 452 } 453 454 // FIXME: the API needs improvement with proper shared annotations, so the the following casts are not necessary 455 if (options & TCPListenOptions.distribute) () @trusted { return runWorkerTaskDist(&setupConnectionHandler, cast(shared)hc); } 
(); 456 else setupConnectionHandler(() @trusted { return cast(shared)hc; } ()); 457 458 return ret; 459 } 460 461 Libevent2UDPConnection listenUDP(ushort port, string bind_address = "0.0.0.0") 462 { 463 NetworkAddress bindaddr = resolveHost(bind_address, AF_UNSPEC, false); 464 bindaddr.port = port; 465 466 return new Libevent2UDPConnection(bindaddr, this); 467 } 468 469 Libevent2ManualEvent createManualEvent() 470 { 471 return new Libevent2ManualEvent(this); 472 } 473 474 Libevent2FileDescriptorEvent createFileDescriptorEvent(int fd, FileDescriptorEvent.Trigger events, FileDescriptorEvent.Mode mode) 475 { 476 return new Libevent2FileDescriptorEvent(this, fd, events, mode); 477 } 478 479 size_t createTimer(void delegate() @safe callback) { return m_timers.create(TimerInfo(callback)); } 480 481 void acquireTimer(size_t timer_id) { m_timers.getUserData(timer_id).refCount++; } 482 void releaseTimer(size_t timer_id) 483 nothrow { 484 debug assert(m_ownerThread is () @trusted { return Thread.getThis(); } ()); 485 if (!--m_timers.getUserData(timer_id).refCount) 486 m_timers.destroy(timer_id); 487 } 488 489 bool isTimerPending(size_t timer_id) { return m_timers.isPending(timer_id); } 490 491 void rearmTimer(size_t timer_id, Duration dur, bool periodic) 492 { 493 debug assert(m_ownerThread is () @trusted { return Thread.getThis(); } ()); 494 if (!isTimerPending(timer_id)) acquireTimer(timer_id); 495 m_timers.schedule(timer_id, dur, periodic); 496 rescheduleTimerEvent(Clock.currTime(UTC())); 497 } 498 499 void stopTimer(size_t timer_id) 500 { 501 logTrace("Stopping timer %s", timer_id); 502 if (m_timers.isPending(timer_id)) { 503 m_timers.unschedule(timer_id); 504 releaseTimer(timer_id); 505 } 506 } 507 508 void waitTimer(size_t timer_id) 509 { 510 debug assert(m_ownerThread is () @trusted { return Thread.getThis(); } ()); 511 while (true) { 512 assert(!m_timers.isPeriodic(timer_id), "Cannot wait for a periodic timer."); 513 if (!m_timers.isPending(timer_id)) return; 514 
auto data = () @trusted { return &m_timers.getUserData(timer_id); } (); 515 assert(data.owner == Task.init, "Waiting for the same timer from multiple tasks is not supported."); 516 data.owner = Task.getThis(); 517 scope (exit) m_timers.getUserData(timer_id).owner = Task.init; 518 m_core.yieldForEvent(); 519 } 520 } 521 522 private void processTimers() 523 { 524 if (!m_timers.anyPending) return; 525 526 logTrace("Processing due timers"); 527 // process all timers that have expired up to now 528 auto now = Clock.currTime(UTC()); 529 m_timers.consumeTimeouts(now, (timer, periodic, ref data) @safe { 530 Task owner = data.owner; 531 auto callback = data.callback; 532 533 logTrace("Timer %s fired (%s/%s)", timer, owner != Task.init, callback !is null); 534 535 if (!periodic) releaseTimer(timer); 536 537 if (owner && owner.running) m_core.resumeTask(owner); 538 if (callback) () @trusted { runTask(callback); } (); 539 }); 540 541 rescheduleTimerEvent(now); 542 } 543 544 private void rescheduleTimerEvent(SysTime now) 545 { 546 auto next = m_timers.getFirstTimeout(); 547 if (next == SysTime.max || next == m_timerTimeout) return; 548 549 m_timerTimeout = now; 550 auto dur = next - now; 551 () @trusted { event_del(m_timerEvent); } (); 552 assert(dur.total!"seconds"() <= int.max); 553 dur += 9.hnsecs(); // round up to the next usec to avoid premature timer events 554 timeval tvdur = dur.toTimeVal(); 555 () @trusted { event_add(m_timerEvent, &tvdur); } (); 556 assert(() @trusted { return event_pending(m_timerEvent, EV_TIMEOUT, null); } ()); 557 logTrace("Rescheduled timer event for %s seconds", dur.total!"usecs" * 1e-6); 558 } 559 560 private static nothrow extern(C) 561 void onTimerTimeout(evutil_socket_t, short events, void* userptr) 562 { 563 import std.encoding : sanitize; 564 565 logTrace("timer event fired"); 566 auto drv = () @trusted { return cast(Libevent2Driver)userptr; } (); 567 try drv.processTimers(); 568 catch (Exception e) { 569 logError("Failed to process timers: 
%s", e.msg); 570 try logDiagnostic("Full error: %s", () @trusted { return e.toString().sanitize; } ()); 571 catch (Exception e) { 572 logError("Failed to process timers: %s", e.msg); 573 } 574 } 575 } 576 577 private static nothrow extern(C) void onAddrInfo(int err, evutil_addrinfo* res, void* arg) 578 { 579 auto msg = () @trusted { return cast(GetAddrInfoMsg*)arg; } (); 580 msg.err = err; 581 msg.done = true; 582 if (err == DNS_ERR_NONE) { 583 assert(res !is null); 584 scope (exit) () @trusted { evutil_freeaddrinfo(res); } (); 585 586 // Note that we are only returning the first address and ignoring the 587 // rest. Ideally we should return all of the NetworkAddress 588 msg.addr.family = cast(ushort)res.ai_family; 589 assert(res.ai_addrlen == msg.addr.sockAddrLen()); 590 switch (msg.addr.family) { 591 case AF_INET: 592 auto sock4 = cast(sockaddr_in*)res.ai_addr; 593 msg.addr.sockAddrInet4.sin_addr.s_addr = sock4.sin_addr.s_addr; 594 break; 595 case AF_INET6: 596 auto sock6 = () @trusted { return cast(sockaddr_in6*)res.ai_addr; } (); 597 msg.addr.sockAddrInet6.sin6_addr.s6_addr = sock6.sin6_addr.s6_addr; 598 break; 599 default: 600 logDiagnostic("DNS lookup yielded unknown address family: %s", msg.addr.family); 601 err = DNS_ERR_UNKNOWN; 602 break; 603 } 604 } 605 if (msg.task && msg.task.running) { 606 try msg.core.resumeTask(msg.task); 607 catch (Exception e) logWarn("Error resuming DNS query task: %s", e.msg); 608 } 609 } 610 611 private void registerObject(Libevent2Object obj) 612 nothrow { 613 debug assert(() @trusted { return Thread.getThis(); } () is m_ownerThread, "Event object created in foreign thread."); 614 auto key = () @trusted { return cast(size_t)cast(void*)obj; } (); 615 m_ownedObjects.insert(key); 616 if (obj.m_threadObject) 617 () @trusted { 618 scope (failure) assert(false); // synchronized is not nothrow 619 synchronized (s_threadObjectsMutex) 620 s_threadObjects.insert(key); 621 } (); 622 } 623 624 private void unregisterObject(Libevent2Object 
obj) 625 nothrow { 626 scope (failure) assert(false); // synchronized is not nothrow 627 628 auto key = () @trusted { return cast(size_t)cast(void*)obj; } (); 629 m_ownedObjects.remove(key); 630 if (obj.m_threadObject) 631 () @trusted { 632 synchronized (s_threadObjectsMutex) 633 s_threadObjects.remove(key); 634 } (); 635 } 636 } 637 638 private struct TimerInfo { 639 size_t refCount = 1; 640 void delegate() @safe callback; 641 Task owner; 642 643 this(void delegate() @safe callback) @safe { this.callback = callback; } 644 } 645 646 struct AddressInfo { 647 NetworkAddress address; 648 string host; 649 ushort family; 650 bool useDNS; 651 } 652 653 654 private struct GetAddrInfoMsg { 655 NetworkAddress addr; 656 bool done = false; 657 int err = 0; 658 DriverCore core; 659 Task task; 660 } 661 662 private class Libevent2Object { 663 protected Libevent2Driver m_driver; 664 debug private Thread m_ownerThread; 665 private bool m_threadObject; 666 667 this(Libevent2Driver driver, bool thread_object) 668 nothrow @safe { 669 m_threadObject = thread_object; 670 m_driver = driver; 671 m_driver.registerObject(this); 672 debug m_ownerThread = driver.m_ownerThread; 673 } 674 675 ~this() 676 @trusted { 677 // NOTE: m_driver will always be destroyed deterministically 678 // in static ~this(), so it can be used here safely 679 m_driver.unregisterObject(this); 680 } 681 682 protected void onThreadShutdown() @safe {} 683 } 684 685 /// private 686 struct ThreadSlot { 687 Libevent2Driver driver; 688 deimos.event2.event.event* event; 689 ArraySet!Task tasks; 690 } 691 /// private 692 alias ThreadSlotMap = HashMap!(Thread, ThreadSlot); 693 694 final class Libevent2ManualEvent : Libevent2Object, ManualEvent { 695 @safe: 696 697 private { 698 shared(int) m_emitCount = 0; 699 core.sync.mutex.Mutex m_mutex; 700 ThreadSlotMap m_waiters; 701 } 702 703 this(Libevent2Driver driver) 704 nothrow { 705 super(driver, true); 706 scope (failure) assert(false); 707 m_mutex = new core.sync.mutex.Mutex; 
708 m_waiters = ThreadSlotMap(driver.m_allocator); 709 } 710 711 ~this() 712 { 713 m_mutex = null; // Optimistic race-condition detection (see Libevent2Driver.dispose()) 714 foreach (ref m_waiters.Value ts; m_waiters) 715 () @trusted { event_free(ts.event); } (); 716 } 717 718 void emit() 719 { 720 static if (!synchronizedIsNothrow) 721 scope (failure) assert(0, "Internal error: function should be nothrow"); 722 723 () @trusted { atomicOp!"+="(m_emitCount, 1); } (); 724 synchronized (m_mutex) { 725 foreach (ref m_waiters.Value sl; m_waiters) 726 () @trusted { event_active(sl.event, 0, 0); } (); 727 } 728 } 729 730 void wait() { wait(m_emitCount); } 731 int wait(int reference_emit_count) { return doWait!true(reference_emit_count); } 732 int wait(Duration timeout, int reference_emit_count) { return doWait!true(timeout, reference_emit_count); } 733 int waitUninterruptible(int reference_emit_count) { return doWait!false(reference_emit_count); } 734 int waitUninterruptible(Duration timeout, int reference_emit_count) { return doWait!false(timeout, reference_emit_count); } 735 736 void acquire() 737 { 738 auto task = Task.getThis(); 739 auto thread = task == Task() ? 
() @trusted { return Thread.getThis(); } () : task.thread; 740 741 synchronized (m_mutex) { 742 if (thread !in m_waiters) { 743 ThreadSlot slot; 744 slot.driver = cast(Libevent2Driver)getEventDriver(); 745 slot.event = () @trusted { return event_new(slot.driver.eventLoop, -1, EV_PERSIST, &onSignalTriggered, cast(void*)this); } (); 746 () @trusted { event_add(slot.event, null); } (); 747 m_waiters[thread] = slot; 748 } 749 750 if (task != Task()) { 751 assert(task !in m_waiters[thread].tasks, "Double acquisition of signal."); 752 m_waiters[thread].tasks.insert(task); 753 } 754 } 755 } 756 757 void release() 758 { 759 auto self = Task.getThis(); 760 if (self == Task()) return; 761 762 synchronized (m_mutex) { 763 assert(self.thread in m_waiters && self in m_waiters[self.thread].tasks, 764 "Releasing non-acquired signal."); 765 m_waiters[self.thread].tasks.remove(self); 766 } 767 } 768 769 bool amOwner() 770 { 771 auto self = Task.getThis(); 772 if (self == Task()) return false; 773 synchronized (m_mutex) { 774 if (self.thread !in m_waiters) return false; 775 return self in m_waiters[self.thread].tasks; 776 } 777 } 778 779 @property int emitCount() const @trusted { return atomicLoad(m_emitCount); } 780 781 protected override void onThreadShutdown() 782 { 783 auto thr = () @trusted { return Thread.getThis(); } (); 784 synchronized (m_mutex) { 785 if (thr in m_waiters) { 786 () @trusted { event_free(m_waiters[thr].event); } (); 787 m_waiters.remove(thr); 788 } 789 } 790 } 791 792 private int doWait(bool INTERRUPTIBLE)(int reference_emit_count) 793 { 794 static if (!INTERRUPTIBLE) scope (failure) assert(false); // still some function calls not marked nothrow 795 assert(!amOwner()); 796 797 auto ec = this.emitCount; 798 if (ec != reference_emit_count) return ec; 799 800 acquire(); 801 scope(exit) release(); 802 803 while (ec == reference_emit_count) { 804 static if (INTERRUPTIBLE) getThreadLibeventDriverCore().yieldForEvent(); 805 else 
getThreadLibeventDriverCore().yieldForEventDeferThrow(); 806 ec = this.emitCount; 807 } 808 return ec; 809 } 810 811 private int doWait(bool INTERRUPTIBLE)(Duration timeout, int reference_emit_count) 812 { 813 static if (!INTERRUPTIBLE) scope (failure) assert(false); // still some function calls not marked nothrow 814 assert(!amOwner()); 815 816 auto ec = this.emitCount; 817 if (ec != reference_emit_count) return ec; 818 819 acquire(); 820 scope(exit) release(); 821 auto tm = m_driver.createTimer(null); 822 scope (exit) m_driver.releaseTimer(tm); 823 m_driver.m_timers.getUserData(tm).owner = Task.getThis(); 824 m_driver.rearmTimer(tm, timeout, false); 825 826 while (ec == reference_emit_count) { 827 static if (INTERRUPTIBLE) getThreadLibeventDriverCore().yieldForEvent(); 828 else getThreadLibeventDriverCore().yieldForEventDeferThrow(); 829 ec = this.emitCount; 830 if (!m_driver.isTimerPending(tm)) break; 831 } 832 return ec; 833 } 834 835 private static nothrow extern(C) 836 void onSignalTriggered(evutil_socket_t, short events, void* userptr) 837 { 838 import std.encoding : sanitize; 839 840 try { 841 auto sig = () @trusted { return cast(Libevent2ManualEvent)userptr; } (); 842 auto thread = () @trusted { return Thread.getThis(); } (); 843 auto core = getThreadLibeventDriverCore(); 844 845 ArraySet!Task lst; 846 synchronized (sig.m_mutex) { 847 assert(thread in sig.m_waiters); 848 lst = sig.m_waiters[thread].tasks.dup; 849 } 850 851 foreach (l; lst) 852 core.resumeTask(l); 853 } catch (Exception e) { 854 logError("Exception while handling signal event: %s", e.msg); 855 try logDiagnostic("Full error: %s", () @trusted { return sanitize(e.msg); } ()); 856 catch(Exception) {} 857 debug assert(false); 858 } 859 } 860 } 861 862 863 final class Libevent2FileDescriptorEvent : Libevent2Object, FileDescriptorEvent { 864 @safe: 865 866 private { 867 int m_fd; 868 deimos.event2.event.event* m_event; 869 bool m_persistent; 870 Trigger m_activeEvents; 871 Task m_waiter; 872 } 873 
874 this(Libevent2Driver driver, int file_descriptor, Trigger events, Mode mode) 875 { 876 assert(events != Trigger.none); 877 super(driver, false); 878 m_fd = file_descriptor; 879 m_persistent = mode != Mode.nonPersistent; 880 short evts = 0; 881 if (events & Trigger.read) evts |= EV_READ; 882 if (events & Trigger.write) evts |= EV_WRITE; 883 if (m_persistent) evts |= EV_PERSIST; 884 if (mode == Mode.edgeTriggered) evts |= EV_ET; 885 m_event = () @trusted { return event_new(driver.eventLoop, file_descriptor, evts, &onFileTriggered, cast(void*)this); } (); 886 if (m_persistent) () @trusted { event_add(m_event, null); } (); 887 } 888 889 ~this() 890 { 891 () @trusted { event_free(m_event); } (); 892 } 893 894 Trigger wait(Trigger which) 895 { 896 assert(!m_waiter, "Only one task may wait on a Libevent2FileEvent."); 897 m_waiter = Task.getThis(); 898 scope (exit) { 899 m_waiter = Task.init; 900 m_activeEvents &= ~which; 901 } 902 903 while ((m_activeEvents & which) == Trigger.none) { 904 if (!m_persistent) () @trusted { event_add(m_event, null); } (); 905 getThreadLibeventDriverCore().yieldForEvent(); 906 } 907 return m_activeEvents & which; 908 } 909 910 Trigger wait(Duration timeout, Trigger which) 911 { 912 assert(!m_waiter, "Only one task may wait on a Libevent2FileEvent."); 913 m_waiter = Task.getThis(); 914 scope (exit) { 915 m_waiter = Task.init; 916 m_activeEvents &= ~which; 917 } 918 919 auto tm = m_driver.createTimer(null); 920 scope (exit) m_driver.releaseTimer(tm); 921 m_driver.m_timers.getUserData(tm).owner = Task.getThis(); 922 m_driver.rearmTimer(tm, timeout, false); 923 924 while ((m_activeEvents & which) == Trigger.none) { 925 if (!m_persistent) () @trusted { event_add(m_event, null); } (); 926 getThreadLibeventDriverCore().yieldForEvent(); 927 if (!m_driver.isTimerPending(tm)) break; 928 } 929 return m_activeEvents & which; 930 } 931 932 private static nothrow extern(C) 933 void onFileTriggered(evutil_socket_t fd, short events, void* userptr) 934 { 
935 import std.encoding : sanitize; 936 937 try { 938 auto core = getThreadLibeventDriverCore(); 939 auto evt = () @trusted { return cast(Libevent2FileDescriptorEvent)userptr; } (); 940 941 evt.m_activeEvents = Trigger.none; 942 if (events & EV_READ) evt.m_activeEvents |= Trigger.read; 943 if (events & EV_WRITE) evt.m_activeEvents |= Trigger.write; 944 if (evt.m_waiter) core.resumeTask(evt.m_waiter); 945 } catch (Exception e) { 946 logError("Exception while handling file event: %s", e.msg); 947 try logDiagnostic("Full error: %s", () @trusted { return sanitize(e.msg); } ()); 948 catch(Exception) {} 949 debug assert(false); 950 } 951 } 952 } 953 954 955 final class Libevent2UDPConnection : UDPConnection { 956 @safe: 957 958 private { 959 Libevent2Driver m_driver; 960 TCPContext* m_ctx; 961 NetworkAddress m_bindAddress; 962 string m_bindAddressString; 963 bool m_canBroadcast = false; 964 } 965 966 this(NetworkAddress bind_addr, Libevent2Driver driver) 967 { 968 m_driver = driver; 969 970 auto sockfd_raw = () @trusted { return socket(bind_addr.family, SOCK_DGRAM, IPPROTO_UDP); } (); 971 // on Win64 socket() returns a 64-bit value but libevent expects an int 972 static if (typeof(sockfd_raw).max > int.max) assert(sockfd_raw <= int.max || sockfd_raw == ~0); 973 auto sockfd = cast(int)sockfd_raw; 974 socketEnforce(sockfd != -1, "Failed to create socket."); 975 976 enforce(() @trusted { return evutil_make_socket_nonblocking(sockfd); } () == 0, "Failed to make socket non-blocking."); 977 978 int tmp_reuse = 1; 979 socketEnforce(() @trusted { return setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &tmp_reuse, tmp_reuse.sizeof); } () == 0, 980 "Error enabling socket address reuse on listening socket"); 981 982 // bind the socket to a local inteface/port 983 socketEnforce(() @trusted { return bind(sockfd, bind_addr.sockAddr, bind_addr.sockAddrLen); } () == 0, "Failed to bind UDP socket."); 984 // read back the actual bind address 985 socklen_t balen = bind_addr.sockAddrLen; 986 
		socketEnforce(() @trusted { return getsockname(sockfd, bind_addr.sockAddr, &balen); } () == 0, "getsockname failed.");

		// generate the textual bind address (IPv4 or IPv6)
		m_bindAddress = bind_addr;
		char[64] buf;
		void* ptr;
		if( bind_addr.family == AF_INET ) ptr = &bind_addr.sockAddrInet4.sin_addr;
		else ptr = &bind_addr.sockAddrInet6.sin6_addr;
		() @trusted { evutil_inet_ntop(bind_addr.family, ptr, buf.ptr, buf.length); } ();
		m_bindAddressString = () @trusted { return to!string(buf.ptr); } ();

		// create a context for storing connection information
		m_ctx = () @trusted { return TCPContextAlloc.alloc(driver.m_core, driver.m_eventLoop, sockfd, null, bind_addr, NetworkAddress()); } ();
		scope(failure) () @trusted { TCPContextAlloc.free(m_ctx); } ();
		// the listen event is armed lazily in recv(), not here
		m_ctx.listenEvent = () @trusted { return event_new(driver.m_eventLoop, sockfd, EV_READ|EV_PERSIST, &onUDPRead, m_ctx); } ();
		if (!m_ctx.listenEvent) throw new Exception("Failed to create buffer event for socket.");
	}

	/// Textual representation of the local address the socket is bound to.
	@property string bindAddress() const { return m_bindAddressString; }
	/// The actual local address, including the resolved port.
	@property NetworkAddress localAddress() const { return m_bindAddress; }

	/// Whether sending to broadcast addresses is enabled (SO_BROADCAST).
	@property bool canBroadcast() const { return m_canBroadcast; }
	/// ditto
	@property void canBroadcast(bool val)
	{
		int tmp_broad = val;
		enforce(() @trusted { return setsockopt(m_ctx.socketfd, SOL_SOCKET, SO_BROADCAST, &tmp_broad, tmp_broad.sizeof); } () == 0,
			"Failed to change the socket broadcast flag.");
		m_canBroadcast = val;
	}


	/// Returns true iff the calling task holds both read and write ownership.
	bool amOwner() {
		return m_ctx !is null && m_ctx.readOwner != Task() && m_ctx.readOwner == Task.getThis() && m_ctx.readOwner == m_ctx.writeOwner;
	}

	/// Takes exclusive (read+write) ownership for the calling task.
	void acquire()
	{
		assert(m_ctx, "Trying to acquire a closed UDP connection.");
		assert(m_ctx.readOwner == Task() && m_ctx.writeOwner == Task(),
			"Trying to acquire a UDP connection that is currently owned.");
		m_ctx.readOwner = m_ctx.writeOwner = Task.getThis();
	}

	/// Releases the ownership taken by acquire(). No-op after close().
	void release()
	{
		if (!m_ctx) return;
		assert(m_ctx.readOwner == Task.getThis() && m_ctx.readOwner == m_ctx.writeOwner,
			"Trying to release a UDP connection that is not owned by the current task.");
		m_ctx.readOwner = m_ctx.writeOwner = Task.init;
	}

	/// Frees the listen event and the context; the connection becomes unusable.
	void close()
	{
		if (!m_ctx) return;
		acquire();

		if (m_ctx.listenEvent) () @trusted { event_free(m_ctx.listenEvent); } ();
		() @trusted { TCPContextAlloc.free(m_ctx); } ();
		m_ctx = null;
	}

	/// Resolves `host` (using the bound address family) and connects to it.
	void connect(string host, ushort port)
	{
		NetworkAddress addr = m_driver.resolveHost(host, m_ctx.local_addr.family);
		addr.port = port;
		connect(addr);
	}

	/// Sets the default peer address for send()/recv() via connect(2).
	void connect(NetworkAddress addr)
	{
		enforce(() @trusted { return .connect(m_ctx.socketfd, addr.sockAddr, addr.sockAddrLen); } () == 0, "Failed to connect UDP socket."~to!string(getLastSocketError()));
	}

	/** Sends a single datagram.

		Params:
			data = payload; must fit into one datagram (enforced to int.max here)
			peer_address = explicit destination, or null to use the connected peer
	*/
	void send(in ubyte[] data, in NetworkAddress* peer_address = null)
	{
		sizediff_t ret;
		assert(data.length <= int.max);
		if( peer_address ){
			ret = () @trusted { return .sendto(m_ctx.socketfd, data.ptr, cast(int)data.length, 0, peer_address.sockAddr, peer_address.sockAddrLen); } ();
		} else {
			ret = () @trusted { return .send(m_ctx.socketfd, data.ptr, cast(int)data.length, 0); } ();
		}
		logTrace("send ret: %s, %s", ret, getLastSocketError());
		enforce(ret >= 0, "Error sending UDP packet.");
		// UDP is all-or-nothing; a short send indicates a problem
		enforce(ret == data.length, "Unable to send full packet.");
	}

	/// Receives a single datagram with no time limit.
	ubyte[] recv(ubyte[] buf = null, NetworkAddress* peer_address = null)
	{
		return recv(Duration.max, buf, peer_address);
	}

	/** Receives a single datagram, waiting at most `timeout`.

		Params:
			timeout = maximum wait; `Duration.max` waits forever.
				NOTE(review): a negative finite timeout skips timer creation and
				then fails the timeout enforce() below on the first EWOULDBLOCK —
				effectively an immediate "UDP receive timeout"; confirm intended.
			buf = receive buffer; if empty, 65507 bytes are allocated
				(the maximum UDP/IPv4 payload)
			peer_address = receives the sender address if non-null

		Returns: the slice of `buf` containing the received payload.
	*/
	ubyte[] recv(Duration timeout, ubyte[] buf = null, NetworkAddress* peer_address = null)
	{
		size_t tm = size_t.max; // size_t.max == "no timer armed"
		if (timeout >= 0.seconds && timeout != Duration.max) {
			tm = m_driver.createTimer(null);
			m_driver.m_timers.getUserData(tm).owner = Task.getThis();
			m_driver.rearmTimer(tm, timeout, false);
		}

		acquire();
		// TODO: add the event only when we actually need to read, to avoid event
		//       loop spinning when data is available, see #715. Since this may be
		//       performance critical, a proper benchmark should be performed!
		enforce(() @trusted { return event_add(m_ctx.listenEvent, null); } () == 0);

		scope (exit) {
			() @trusted { event_del(m_ctx.listenEvent); } ();
			release();
			if (tm != size_t.max) m_driver.releaseTimer(tm);
		}

		if (buf.length == 0) buf.length = 65507;

		NetworkAddress from;
		from.family = m_ctx.local_addr.family;
		assert(buf.length <= int.max);
		while (true) {
			socklen_t addr_len = from.sockAddrLen;
			auto ret = () @trusted { return .recvfrom(m_ctx.socketfd, buf.ptr, cast(int)buf.length, 0, from.sockAddr, &addr_len); } ();
			if (ret > 0) {
				if( peer_address ) *peer_address = from;
				return buf[0 .. ret];
			}
			if (ret < 0) {
				auto err = getLastSocketError();
				if (err != EWOULDBLOCK) {
					logDebugV("UDP recv err: %s", err);
					throw new Exception("Error receiving UDP packet.");
				}
				// would block: check the deadline before yielding again
				if (timeout != Duration.max) {
					enforce(timeout > 0.seconds && m_driver.isTimerPending(tm), "UDP receive timeout.");
				}
			}
			m_ctx.core.yieldForEvent();
		}
	}

	/// Joins the multicast group given by `multiaddr` on the default interface.
	override void addMembership(ref NetworkAddress multiaddr)
	{
		if (multiaddr.family == AF_INET)
		{
			version (Windows)
			{
				alias in_addr = core.sys.windows.winsock2.in_addr;
			} else
			{
				static import core.sys.posix.arpa.inet;
				alias in_addr = core.sys.posix.arpa.inet.in_addr;
			}
			// local mirror of the C ip_mreq struct (not declared in the bindings)
			struct ip_mreq {
				in_addr imr_multiaddr; /* IP multicast address of group */
				in_addr imr_interface; /* local IP address of interface */
			}
			auto inaddr = in_addr();
			inaddr.s_addr = htonl(INADDR_ANY); // let the kernel pick the interface
			auto mreq = ip_mreq(multiaddr.sockAddrInet4.sin_addr,
inaddr); 1144 enforce(() @trusted { return setsockopt (m_ctx.socketfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, ip_mreq.sizeof); } () == 0, 1145 "Failed to add to multicast group"); 1146 } else 1147 { 1148 version (Windows) 1149 { 1150 alias in6_addr = core.sys.windows.winsock2.in6_addr; 1151 struct ipv6_mreq { 1152 in6_addr ipv6mr_multiaddr; 1153 uint ipv6mr_interface; 1154 } 1155 } 1156 auto mreq = ipv6_mreq(multiaddr.sockAddrInet6.sin6_addr, 0); 1157 enforce(() @trusted { return setsockopt (m_ctx.socketfd, IPPROTO_IP, IPV6_JOIN_GROUP, &mreq, ipv6_mreq.sizeof); } () == 0, 1158 "Failed to add to multicast group"); 1159 } 1160 } 1161 1162 @property void multicastLoopback(bool loop) 1163 { 1164 int tmp_loop = loop; 1165 enforce(() @trusted { return setsockopt (m_ctx.socketfd, IPPROTO_IP, IP_MULTICAST_LOOP, &tmp_loop, tmp_loop.sizeof); } () == 0, 1166 "Failed to add to multicast loopback"); 1167 } 1168 1169 private static nothrow extern(C) void onUDPRead(evutil_socket_t sockfd, short evts, void* arg) 1170 { 1171 auto ctx = () @trusted { return cast(TCPContext*)arg; } (); 1172 logTrace("udp socket %d read event!", ctx.socketfd); 1173 1174 try { 1175 auto f = ctx.readOwner; 1176 if (f && f.running) 1177 ctx.core.resumeTask(f); 1178 } catch( Exception e ){ 1179 logError("Exception onUDPRead: %s", e.msg); 1180 debug assert(false); 1181 } 1182 } 1183 } 1184 1185 /******************************************************************************/ 1186 /* InotifyDirectoryWatcher */ 1187 /******************************************************************************/ 1188 1189 version (linux) 1190 final class InotifyDirectoryWatcher : DirectoryWatcher { 1191 @safe: 1192 1193 import core.sys.posix.fcntl, core.sys.posix.unistd, core.sys.linux.sys.inotify; 1194 import std.file; 1195 1196 private { 1197 Path m_path; 1198 string[int] m_watches; 1199 bool m_recursive; 1200 int m_handle; 1201 DriverCore m_core; 1202 Task m_owner; 1203 } 1204 1205 this(DriverCore core, Path path, bool 
recursive) 1206 { 1207 m_core = core; 1208 m_recursive = recursive; 1209 m_path = path; 1210 1211 enum IN_NONBLOCK = 0x800; // value in core.sys.linux.sys.inotify is incorrect 1212 m_handle = () @trusted { return inotify_init1(IN_NONBLOCK); } (); 1213 errnoEnforce(m_handle != -1, "Failed to initialize inotify."); 1214 1215 auto spath = m_path.toString(); 1216 addWatch(spath); 1217 if (recursive && spath.isDir) 1218 { 1219 () @trusted { 1220 foreach (de; spath.dirEntries(SpanMode.shallow)) 1221 if (de.isDir) addWatch(de.name); 1222 } (); 1223 } 1224 } 1225 1226 ~this() 1227 { 1228 errnoEnforce(() @trusted { return close(m_handle); } () == 0); 1229 } 1230 1231 @property Path path() const { return m_path; } 1232 @property bool recursive() const { return m_recursive; } 1233 1234 void release() 1235 @safe { 1236 assert(m_owner == Task.getThis(), "Releasing DirectoyWatcher that is not owned by the calling task."); 1237 m_owner = Task(); 1238 } 1239 1240 void acquire() 1241 @safe { 1242 assert(m_owner == Task(), "Acquiring DirectoyWatcher that is already owned."); 1243 m_owner = Task.getThis(); 1244 } 1245 1246 bool amOwner() 1247 @safe { 1248 return m_owner == Task.getThis(); 1249 } 1250 1251 bool readChanges(ref DirectoryChange[] dst, Duration timeout) 1252 { 1253 import core.stdc.stdio : FILENAME_MAX; 1254 import core.stdc.string : strlen; 1255 1256 acquire(); 1257 scope(exit) release(); 1258 1259 ubyte[inotify_event.sizeof + FILENAME_MAX + 1] buf = void; 1260 auto nread = () @trusted { return read(m_handle, buf.ptr, buf.sizeof); } (); 1261 1262 if (nread == -1 && errno == EAGAIN) 1263 { 1264 if (!waitReadable(m_handle, timeout)) 1265 return false; 1266 nread = () @trusted { return read(m_handle, buf.ptr, buf.sizeof); } (); 1267 } 1268 errnoEnforce(nread != -1, "Error while reading inotify handle."); 1269 assert(nread > 0); 1270 1271 dst.length = 0; 1272 do 1273 { 1274 for (size_t i = 0; i < nread;) { 1275 auto ev = &(cast(inotify_event[])buf[i .. 
i+inotify_event.sizeof])[0]; 1276 if (ev.wd !in m_watches) { 1277 logDebug("Got unknown inotify watch ID %s. Ignoring.", ev.wd); 1278 continue; 1279 } 1280 1281 DirectoryChangeType type; 1282 if (ev.mask & (IN_CREATE|IN_MOVED_TO)) 1283 type = DirectoryChangeType.added; 1284 else if (ev.mask & (IN_DELETE|IN_DELETE_SELF|IN_MOVE_SELF|IN_MOVED_FROM)) 1285 type = DirectoryChangeType.removed; 1286 else if (ev.mask & IN_MODIFY) 1287 type = DirectoryChangeType.modified; 1288 1289 import std.path : buildPath; 1290 auto name = () @trusted { return ev.name.ptr[0 .. ev.name.ptr.strlen]; } (); 1291 auto path = Path(buildPath(m_watches[ev.wd], name)); 1292 1293 dst ~= DirectoryChange(type, path); 1294 1295 i += inotify_event.sizeof + ev.len; 1296 } 1297 nread = () @trusted { return read(m_handle, buf.ptr, buf.sizeof); } (); 1298 errnoEnforce(nread != -1 || errno == EAGAIN, "Error while reading inotify handle."); 1299 } while (nread > 0); 1300 return true; 1301 } 1302 1303 private bool waitReadable(int fd, Duration timeout) 1304 @safe { 1305 static struct Args { InotifyDirectoryWatcher watcher; bool readable, timeout; } 1306 1307 static extern(System) void cb(int fd, short what, void* p) { 1308 with (() @trusted { return cast(Args*)p; } ()) { 1309 if (what & EV_READ) readable = true; 1310 if (what & EV_TIMEOUT) timeout = true; 1311 if (watcher.m_owner) 1312 watcher.m_core.resumeTask(watcher.m_owner); 1313 } 1314 } 1315 1316 auto loop = getThreadLibeventEventLoop(); 1317 auto args = Args(this); 1318 auto ev = () @trusted { return event_new(loop, fd, EV_READ, &cb, &args); } (); 1319 scope(exit) () @trusted { event_free(ev); } (); 1320 1321 if (!timeout.isNegative) { 1322 auto tv = timeout.toTimeVal(); 1323 () @trusted { event_add(ev, &tv); } (); 1324 } else { 1325 () @trusted { event_add(ev, null); } (); 1326 } 1327 while (!args.readable && !args.timeout) 1328 m_core.yieldForEvent(); 1329 return args.readable; 1330 } 1331 1332 private void addWatch(string path) 1333 @safe { 1334 
		enum EVENTS = IN_CREATE | IN_DELETE | IN_DELETE_SELF | IN_MODIFY |
			IN_MOVE_SELF | IN_MOVED_FROM | IN_MOVED_TO;
		immutable wd = () @trusted { return inotify_add_watch(m_handle, path.toStringz, EVENTS); } ();
		errnoEnforce(wd != -1, "Failed to add inotify watch.");
		m_watches[wd] = path;
	}
}


private {

	event_base* s_eventLoop; // TLS
	Libevent2Driver s_driver;
	__gshared DriverCore s_driverCore;
	// protects s_threadObjects and the m_ownerThread and m_driver fields of Libevent2Object
	__gshared Mutex s_threadObjectsMutex;
	__gshared ArraySet!size_t s_threadObjects;
	// debug-only accounting: per-mutex lock depth, see lev_lock_mutex and friends
	debug __gshared size_t[void*] s_mutexes;
	debug __gshared Mutex s_mutexesLock;
	bool s_alreadyDeinitialized = false;
}

/// Returns the calling thread's libevent event loop (TLS).
package event_base* getThreadLibeventEventLoop() @safe nothrow
{
	return s_eventLoop;
}

/// Returns the process-wide DriverCore set up by the driver constructor.
package DriverCore getThreadLibeventDriverCore() @trusted nothrow
{
	return s_driverCore;
}

// Platform-independent errno/WSAGetLastError accessor.
private int getLastSocketError() @trusted nothrow
{
	version(Windows) {
		return WSAGetLastError();
	} else {
		import core.stdc.errno;
		return errno;
	}
}

// Condition variable wrapper handed to libevent as an opaque pointer.
// `cond` is created lazily on first wait, bound to `mutex`.
struct LevCondition {
	Condition cond;
	LevMutex* mutex;
}

// Lock wrapper handed to libevent as an opaque pointer; exactly one of the
// two members is set, depending on the requested lock type.
struct LevMutex {
	core.sync.mutex.Mutex mutex;
	ReadWriteMutex rwmutex;
}

alias LevConditionAlloc = FreeListObjectAlloc!(LevCondition, false);
alias LevMutexAlloc = FreeListObjectAlloc!(LevMutex, false);
alias MutexAlloc = FreeListObjectAlloc!(core.sync.mutex.Mutex, false);
alias ReadWriteMutexAlloc = FreeListObjectAlloc!(ReadWriteMutex, false);
alias ConditionAlloc = FreeListObjectAlloc!(Condition, false);

// Memory and threading callbacks handed to libevent (presumably registered via
// event_set_mem_functions/evthread_set_* elsewhere in this file — not visible
// in this chunk). They must be nothrow extern(C) and must not let exceptions
// escape into libevent.
private nothrow extern(C)
{
	version (VibeDebugCatchAll) alias UncaughtException = Throwable;
	else alias UncaughtException = Exception;

	// malloc replacement: stores the block size in a size_t prefix so that
	// lev_realloc/lev_free can recover it (the D allocator API needs the size).
	void* lev_alloc(size_t size)
	{
		try {
			auto mem = s_driver.m_allocator.allocate(size+size_t.sizeof);
			if (!mem.ptr) return null;
			*cast(size_t*)mem.ptr = size;
			return mem.ptr + size_t.sizeof;
		} catch (UncaughtException th) {
			logWarn("Exception in lev_alloc: %s", th.msg);
			return null;
		}
	}
	// realloc replacement; p == null degenerates to lev_alloc.
	void* lev_realloc(void* p, size_t newsize)
	{
		try {
			if( !p ) return lev_alloc(newsize);
			auto oldsize = *cast(size_t*)(p-size_t.sizeof);
			auto oldmem = (p-size_t.sizeof)[0 .. oldsize+size_t.sizeof];
			auto newmem = oldmem;
			if (!s_driver.m_allocator.reallocate(newmem, newsize+size_t.sizeof))
				return null;
			*cast(size_t*)newmem.ptr = newsize;
			return newmem.ptr + size_t.sizeof;
		} catch (UncaughtException th) {
			logWarn("Exception in lev_realloc: %s", th.msg);
			return null;
		}
	}
	// free replacement; reads the size prefix written by lev_alloc/lev_realloc.
	// NOTE(review): p == null would fault here — libevent apparently never
	// passes null, but confirm before relying on it.
	void lev_free(void* p)
	{
		try {
			auto size = *cast(size_t*)(p-size_t.sizeof);
			auto mem = (p-size_t.sizeof)[0 .. size+size_t.sizeof];
			s_driver.m_allocator.deallocate(mem);
		} catch (UncaughtException th) {
			logCritical("Exception in lev_free: %s", th.msg);
			assert(false);
		}
	}

	// Creates a LevMutex for libevent; read/write lock type gets a
	// ReadWriteMutex, anything else a plain Mutex.
	void* lev_alloc_mutex(uint locktype)
	{
		try {
			auto ret = LevMutexAlloc.alloc();
			if( locktype == EVTHREAD_LOCKTYPE_READWRITE ) ret.rwmutex = ReadWriteMutexAlloc.alloc();
			else ret.mutex = MutexAlloc.alloc();
			//logInfo("alloc mutex %s", cast(void*)ret);
			debug if (!s_mutexesLock) s_mutexesLock = new Mutex;
			debug synchronized (s_mutexesLock) s_mutexes[cast(void*)ret] = 0;
			return ret;
		} catch (UncaughtException th) {
			logWarn("Exception in lev_alloc_mutex: %s", th.msg);
			return null;
		}
	}

	// Destroys a LevMutex; in debug builds verifies it is not currently locked.
	void lev_free_mutex(void* lock, uint locktype)
	{
		try {
			import core.runtime;
			//logInfo("free mutex %s: %s", cast(void*)lock, defaultTraceHandler());
			debug synchronized (s_mutexesLock) {
				auto pl = lock in s_mutexes;
				assert(pl !is null);
				assert(*pl == 0);
				s_mutexes.remove(lock);
			}
			auto lm = cast(LevMutex*)lock;
			if (lm.mutex) MutexAlloc.free(lm.mutex);
			if (lm.rwmutex) ReadWriteMutexAlloc.free(lm.rwmutex);
			LevMutexAlloc.free(lm);
		} catch (UncaughtException th) {
			logCritical("Exception in lev_free_mutex: %s", th.msg);
			assert(false);
		}
	}

	// Locks per libevent convention: returns 0 on success, nonzero on a failed
	// try-lock, -1 on error. EVTHREAD_WRITE/READ select the rwmutex side,
	// otherwise the plain mutex is used.
	int lev_lock_mutex(uint mode, void* lock)
	{
		try {
			//logInfo("lock mutex %s", cast(void*)lock);
			debug synchronized (s_mutexesLock) {
				auto pl = lock in s_mutexes;
				assert(pl !is null, "Unknown lock handle");
				(*pl)++;
			}
			auto mtx = cast(LevMutex*)lock;

			assert(mtx !is null, "null lock");
			assert(mtx.mutex !is null || mtx.rwmutex !is null, "lock contains no mutex");
			if( mode & EVTHREAD_WRITE ){
				if( mode & EVTHREAD_TRY ) return mtx.rwmutex.writer().tryLock() ? 0 : 1;
				else mtx.rwmutex.writer().lock();
			} else if( mode & EVTHREAD_READ ){
				if( mode & EVTHREAD_TRY ) return mtx.rwmutex.reader().tryLock() ? 0 : 1;
				else mtx.rwmutex.reader().lock();
			} else {
				assert(mtx.mutex !is null, "lock mutex is null");
				if( mode & EVTHREAD_TRY ) return mtx.mutex.tryLock() ? 0 : 1;
				else mtx.mutex.lock();
			}
			return 0;
		} catch (UncaughtException th) {
			logWarn("Exception in lev_lock_mutex: %s", th.msg);
			return -1;
		}
	}

	// Unlock counterpart of lev_lock_mutex; 0 on success, -1 on error.
	int lev_unlock_mutex(uint mode, void* lock)
	{
		try {
			//logInfo("unlock mutex %s", cast(void*)lock);
			debug synchronized (s_mutexesLock) {
				auto pl = lock in s_mutexes;
				assert(pl !is null, "Unknown lock handle");
				assert(*pl > 0, "Unlocking unlocked mutex");
				(*pl)--;
			}

			auto mtx = cast(LevMutex*)lock;

			if( mode & EVTHREAD_WRITE ){
				mtx.rwmutex.writer().unlock();
			} else if( mode & EVTHREAD_READ ){
				mtx.rwmutex.reader().unlock();
			} else {
				mtx.mutex.unlock();
			}
			return 0;
		} catch (UncaughtException th ) {
			logWarn("Exception in lev_unlock_mutex: %s", th.msg);
			return -1;
		}
	}

	// Allocates a LevCondition; the inner Condition is created lazily in
	// lev_wait_condition once the paired mutex is known.
	void* lev_alloc_condition(uint condtype)
	{
		try {
			return LevConditionAlloc.alloc();
		} catch (UncaughtException th) {
			logWarn("Exception in lev_alloc_condition: %s", th.msg);
			return null;
		}
	}

	void lev_free_condition(void* cond)
	{
		try {
			auto lc = cast(LevCondition*)cond;
			if (lc.cond) ConditionAlloc.free(lc.cond);
			LevConditionAlloc.free(lc);
		} catch (UncaughtException th) {
			logCritical("Exception in lev_free_condition: %s", th.msg);
			assert(false);
		}
	}

	// Signals the condition; a condition never waited on (c.cond is null) is a
	// no-op. Always broadcasts regardless of the `broadcast` argument.
	int lev_signal_condition(void* cond, int broadcast)
	{
		try {
			auto c = cast(LevCondition*)cond;
			if( c.cond ) c.cond.notifyAll();
			return 0;
		} catch (UncaughtException th) {
			logWarn("Exception in lev_signal_condition: %s", th.msg);
			return -1;
		}
	}

	// Waits on the condition; returns 0 on wakeup, 1 on timeout, -1 on error.
	// Binds the condition to `lock` on first use; only plain mutexes are
	// supported (not rwmutexes).
	int lev_wait_condition(void* cond, void* lock, const(timeval)* timeout)
	{
		try {
			auto c = cast(LevCondition*)cond;
			if( c.mutex is null ) c.mutex = cast(LevMutex*)lock;
			assert(c.mutex.mutex !is null); // RW mutexes are not supported for conditions!
			assert(c.mutex is lock);
			if( c.cond is null ) c.cond = ConditionAlloc.alloc(c.mutex.mutex);
			if( timeout ){
				if( !c.cond.wait(dur!"seconds"(timeout.tv_sec) + dur!"usecs"(timeout.tv_usec)) )
					return 1;
			} else c.cond.wait();
			return 0;
		} catch (UncaughtException th) {
			logWarn("Exception in lev_wait_condition: %s", th.msg);
			return -1;
		}
	}

	// Thread id callback: uses the Thread object address as a stable id.
	c_ulong lev_get_thread_id()
	{
		try return cast(c_ulong)cast(void*)Thread.getThis();
		catch (UncaughtException th) {
			logWarn("Exception in lev_get_thread_id: %s", th.msg);
			return 0;
		}
	}
}

/// Converts a Duration to a C timeval (seconds + microseconds).
package timeval toTimeVal(Duration dur)
@safe {
	timeval tvdur;
	dur.split!("seconds", "usecs")(tvdur.tv_sec, tvdur.tv_usec);
	return tvdur;
}

}