1 /**
2 	Driver implementation for the libevent library
3 
4 	Libevent is a well-established event notification library.
5 	It is currently the default driver for Vibe.d
6 
7 	See_Also:
8 		`vibe.core.driver` = interface definition
9 		http://libevent.org/ = Official website
10 		`vibe.core.drivers.libevent2_tcp` = Implementation of TCPConnection and TCPListener
11 
12 	Copyright: © 2012-2015 RejectedSoftware e.K.
13 	Authors: Sönke Ludwig
14 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
15 */
16 module vibe.core.drivers.libevent2;
17 
18 version(VibeLibeventDriver)
19 {
20 
21 import vibe.core.driver;
22 import vibe.core.drivers.libevent2_tcp;
23 import vibe.core.drivers.threadedfile;
24 import vibe.core.drivers.timerqueue;
25 import vibe.core.drivers.utils;
26 import vibe.core.log;
27 import vibe.internal.meta.traits : synchronizedIsNothrow;
28 import vibe.utils.array : ArraySet;
29 import vibe.utils.hashmap;
30 import vibe.internal.allocator;
31 import vibe.internal.freelistref;
32 
33 import core.memory;
34 import core.atomic;
35 import core.stdc.config;
36 import core.stdc.errno;
37 import core.stdc.stdlib;
38 import core.sync.condition;
39 import core.sync.mutex;
40 import core.sync.rwmutex;
41 import core.sys.posix.netinet.in_;
42 import core.sys.posix.netinet.tcp;
43 import core.thread;
44 import deimos.event2.bufferevent;
45 import deimos.event2.dns;
46 import deimos.event2.event;
47 import deimos.event2.thread;
48 import deimos.event2.util;
49 import std.conv;
50 import std.datetime;
51 import std.exception;
52 import std.string;
53 
54 
55 version (Windows)
56 {
57 	version(VibePragmaLib) pragma(lib, "event2");
58 	pragma(lib, "ws2_32.lib");
59 }
60 else
61 	version(VibePragmaLib) pragma(lib, "event");
62 
63 version(Windows)
64 {
65 	import core.sys.windows.winsock2;
66 
67 	alias EWOULDBLOCK = WSAEWOULDBLOCK;
68 }
69 
70 version(OSX)
71 {
72 	static if (__VERSION__ < 2077)
73 	{
74 		enum IP_ADD_MEMBERSHIP = 12;
75 		enum IP_MULTICAST_LOOP = 11;
76 	}
77 	else
78 		import core.sys.darwin.netinet.in_ : IP_ADD_MEMBERSHIP, IP_MULTICAST_LOOP;
79 } else version(FreeBSD)
80 {
81 	static if (__VERSION__ < 2077)
82 	{
83 		enum IP_ADD_MEMBERSHIP  = 12;
84 		enum IP_MULTICAST_LOOP  = 11;
85 	}
86 	else
87 		import core.sys.freebsd.netinet.in_ : IP_ADD_MEMBERSHIP, IP_MULTICAST_LOOP;
88 } else version(linux)
89 {
90 	static if (__VERSION__ < 2077)
91 	{
92 		enum IP_ADD_MEMBERSHIP =  35;
93 		enum IP_MULTICAST_LOOP =  34;
94 	}
95 	else
96 		import core.sys.linux.netinet.in_ : IP_ADD_MEMBERSHIP, IP_MULTICAST_LOOP;
97 } else version(Windows)
98 {
99 	// IP_ADD_MEMBERSHIP and IP_MULTICAST_LOOP are included in winsock(2) import above
100 }
101 
102 final class Libevent2Driver : EventDriver {
103 @safe:
104 
105 	import std.container : DList;
106 	import std.datetime : Clock;
107 
108 	private {
109 		DriverCore m_core;
110 		event_base* m_eventLoop;
111 		evdns_base* m_dnsBase;
112 		bool m_exit = false;
113 		ArraySet!size_t m_ownedObjects;
114 		debug Thread m_ownerThread;
115 
116 		event* m_timerEvent;
117 		SysTime m_timerTimeout = SysTime.max;
118 		TimerQueue!TimerInfo m_timers;
119 		DList!AddressInfo m_addressInfoCache;
120 		size_t m_addressInfoCacheLength = 0;
121 
122 		bool m_running = false; // runEventLoop in progress?
123 
124 		IAllocator m_allocator;
125 	}
126 
127 	this(DriverCore core) @trusted nothrow
128 	{
129 		debug m_ownerThread = () @trusted { return Thread.getThis(); } ();
130 		m_core = core;
131 		s_driverCore = core;
132 
133 		m_allocator = Mallocator.instance.allocatorObject;
134 		s_driver = this;
135 
136 		synchronized if (!s_threadObjectsMutex) {
137 			s_threadObjectsMutex = new Mutex;
138 			s_threadObjects.setAllocator(m_allocator);
139 
140 			// set the malloc/free versions of our runtime so we don't run into trouble
141 			// because the libevent DLL uses a different one.
142 			event_set_mem_functions(&lev_alloc, &lev_realloc, &lev_free);
143 
144 			evthread_lock_callbacks lcb;
145 			lcb.lock_api_version = EVTHREAD_LOCK_API_VERSION;
146 			lcb.supported_locktypes = EVTHREAD_LOCKTYPE_RECURSIVE|EVTHREAD_LOCKTYPE_READWRITE;
147 			lcb.alloc = &lev_alloc_mutex;
148 			lcb.free = &lev_free_mutex;
149 			lcb.lock = &lev_lock_mutex;
150 			lcb.unlock = &lev_unlock_mutex;
151 			evthread_set_lock_callbacks(&lcb);
152 
153 			evthread_condition_callbacks ccb;
154 			ccb.condition_api_version = EVTHREAD_CONDITION_API_VERSION;
155 			ccb.alloc_condition = &lev_alloc_condition;
156 			ccb.free_condition = &lev_free_condition;
157 			ccb.signal_condition = &lev_signal_condition;
158 			ccb.wait_condition = &lev_wait_condition;
159 			evthread_set_condition_callbacks(&ccb);
160 
161 			evthread_set_id_callback(&lev_get_thread_id);
162 		}
163 
164 		// initialize libevent
165 		logDiagnostic("libevent version: %s", event_get_version());
166 		m_eventLoop = event_base_new();
167 		s_eventLoop = m_eventLoop;
168 		logDiagnostic("libevent is using %s for events.", event_base_get_method(m_eventLoop));
169 		evthread_make_base_notifiable(m_eventLoop);
170 
171 		m_dnsBase = evdns_base_new(m_eventLoop, 1);
172 		if( !m_dnsBase ) logError("Failed to initialize DNS lookup.");
173 		evdns_base_set_option(m_dnsBase, "randomize-case:", "0");
174 
175 		string hosts_file;
176 		version (Windows) hosts_file = `C:\Windows\System32\drivers\etc\hosts`;
177 		else hosts_file = `/etc/hosts`;
178 		if (existsFile(hosts_file)) {
179 			if (evdns_base_load_hosts(m_dnsBase, hosts_file.toStringz()) != 0)
180 				logError("Failed to load hosts file at %s", hosts_file);
181 		}
182 
183 		m_timerEvent = () @trusted { return event_new(m_eventLoop, -1, EV_TIMEOUT, &onTimerTimeout, cast(void*)this); } ();
184 	}
185 
186 	void dispose()
187 	{
188 		debug assert(() @trusted { return Thread.getThis(); } () is m_ownerThread, "Event loop destroyed in foreign thread.");
189 
190 		() @trusted { event_free(m_timerEvent); } ();
191 
192 		// notify all other living objects about the shutdown
193 		synchronized (() @trusted { return s_threadObjectsMutex; } ()) {
194 			// destroy all living objects owned by this driver
195 			foreach (ref key; m_ownedObjects) {
196 				assert(key);
197 				auto obj = () @trusted { return cast(Libevent2Object)cast(void*)key; } ();
198 				debug assert(obj.m_ownerThread is m_ownerThread, "Owned object with foreign thread ID detected.");
199 				debug assert(obj.m_driver is this, "Owned object with foreign driver reference detected.");
200 				key = 0;
201 				() @trusted { destroy(obj); } ();
202 			}
203 
204 			ref getThreadObjects() @trusted { return s_threadObjects; }
205 
206 			foreach (ref key; getThreadObjects()) {
207 				assert(key);
208 				auto obj = () @trusted { return cast(Libevent2Object)cast(void*)key; } ();
209 				debug assert(obj.m_ownerThread !is m_ownerThread, "Live object of this thread detected after all owned mutexes have been destroyed.");
210 				debug assert(obj.m_driver !is this, "Live object of this driver detected with different thread ID after all owned mutexes have been destroyed.");
211 				// WORKAROUND for a possible race-condition in case of concurrent GC collections
212 				// Since this only occurs on shutdown and rarely, this should be an acceptable
213 				// "solution" until this is all switched to RC.
214 				if (auto me = cast(Libevent2ManualEvent)obj)
215 					if (!me.m_mutex) continue;
216 				obj.onThreadShutdown();
217 			}
218 		}
219 
220 		// shutdown libevent for this thread
221 		() @trusted {
222 			evdns_base_free(m_dnsBase, 1);
223 			event_base_free(m_eventLoop);
224 		} ();
225 		s_eventLoop = null;
226 		s_alreadyDeinitialized = true;
227 	}
228 
229 	@property event_base* eventLoop() nothrow { return m_eventLoop; }
230 	@property evdns_base* dnsEngine() nothrow { return m_dnsBase; }
231 
232 	int runEventLoop()
233 	{
234 		m_running = true;
235 		scope (exit) m_running = false;
236 
237 		int ret;
238 		m_exit = false;
239 		while (!m_exit && (ret = () @trusted { return event_base_loop(m_eventLoop, EVLOOP_ONCE); } ()) == 0) {
240 			processTimers();
241 			() @trusted { return s_driverCore; } ().notifyIdle();
242 		}
243 		m_exit = false;
244 		return ret;
245 	}
246 
247 	int runEventLoopOnce()
248 	{
249 		auto ret = () @trusted { return event_base_loop(m_eventLoop, EVLOOP_ONCE); } ();
250 		processTimers();
251 		m_core.notifyIdle();
252 		return ret;
253 	}
254 
255 	bool processEvents()
256 	{
257 		logDebugV("process events with exit == %s", m_exit);
258 		() @trusted { event_base_loop(m_eventLoop, EVLOOP_NONBLOCK|EVLOOP_ONCE); } ();
259 		processTimers();
260 		logDebugV("processed events with exit == %s", m_exit);
261 		if (m_exit) {
262 			// leave the flag set, if the event loop is still running to let it exit, too
263 			if (!m_running) m_exit = false;
264 			return false;
265 		}
266 		return true;
267 	}
268 
269 	void exitEventLoop()
270 	{
271 		logDebug("Libevent2Driver.exitEventLoop called");
272 		m_exit = true;
273 		enforce(() @trusted { return event_base_loopbreak(m_eventLoop); } () == 0, "Failed to exit libevent event loop.");
274 	}
275 
276 	ThreadedFileStream openFile(Path path, FileMode mode)
277 	{
278 		return new ThreadedFileStream(path, mode);
279 	}
280 
281 	DirectoryWatcher watchDirectory(Path path, bool recursive)
282 	{
283 		version (linux) return new InotifyDirectoryWatcher(m_core, path, recursive);
284 		assert(false, "watchDirectory is not yet implemented in the libevent driver.");
285 	}
286 
287 	NetworkAddress resolveHost(string host, ushort family = AF_UNSPEC, bool use_dns = true)
288 	{
289 		assert(m_dnsBase);
290 
291 		foreach (ai; m_addressInfoCache)
292 			if (ai.host == host && ai.family == family && ai.useDNS == use_dns)
293 				return ai.address;
294 
295 		evutil_addrinfo hints;
296 		hints.ai_family = family;
297 		if (!use_dns) {
298 			//When this flag is set, we only resolve numeric IPv4 and IPv6
299 			//addresses; if the nodename would require a name lookup, we instead
300 			//give an EVUTIL_EAI_NONAME error.
301 			hints.ai_flags = EVUTIL_AI_NUMERICHOST;
302 		}
303 
304 		logDebug("dnsresolve %s", host);
305 		GetAddrInfoMsg msg;
306 		msg.core = m_core;
307 		evdns_getaddrinfo_request* dnsReq = () @trusted { return evdns_getaddrinfo(m_dnsBase, toStringz(host), null,
308 			&hints, &onAddrInfo, &msg); } ();
309 
310 		// wait if the request couldn't be fulfilled instantly
311 		if (!msg.done) {
312 			assert(dnsReq !is null);
313 			msg.task = Task.getThis();
314 			logDebug("dnsresolve yield");
315 			while (!msg.done) m_core.yieldForEvent();
316 		}
317 
318 		logDebug("dnsresolve ret");
319 		enforce(msg.err == DNS_ERR_NONE, format("Failed to lookup host '%s': %s", host, () @trusted { return evutil_gai_strerror(msg.err); } ()));
320 
321 		if (m_addressInfoCacheLength >= 10) m_addressInfoCache.removeFront();
322 		else m_addressInfoCacheLength++;
323 		m_addressInfoCache.insertBack(AddressInfo(msg.addr, host, family, use_dns));
324 		return msg.addr;
325 	}
326 
327 	Libevent2TCPConnection connectTCP(NetworkAddress addr, NetworkAddress bind_addr)
328 	{
329 		assert(addr.family == bind_addr.family, "Mismatching bind and target address.");
330 
331 		auto sockfd_raw = () @trusted { return socket(addr.family, SOCK_STREAM, 0); } ();
332 		// on Win64 socket() returns a 64-bit value but libevent expects an int
333 		static if (typeof(sockfd_raw).max > int.max) assert(sockfd_raw <= int.max || sockfd_raw == ~0);
334 		auto sockfd = cast(int)sockfd_raw;
335 		socketEnforce(sockfd != -1, "Failed to create socket.");
336 
337 		socketEnforce(() @trusted { return bind(sockfd, bind_addr.sockAddr, bind_addr.sockAddrLen); } () == 0, "Failed to bind socket.");
338 
339 		if (() @trusted { return evutil_make_socket_nonblocking(sockfd); } ())
340 			throw new Exception("Failed to make socket non-blocking.");
341 
342 		auto buf_event = () @trusted { return bufferevent_socket_new(m_eventLoop, sockfd, bufferevent_options.BEV_OPT_CLOSE_ON_FREE); } ();
343 		if (!buf_event) throw new Exception("Failed to create buffer event for socket.");
344 
345 		auto cctx = () @trusted { return TCPContextAlloc.alloc(m_core, m_eventLoop, sockfd, buf_event, bind_addr, addr); } ();
346 		scope(failure) () @trusted {
347 			if (cctx.event) bufferevent_free(cctx.event);
348 			TCPContextAlloc.free(cctx);
349 		} ();
350 		() @trusted { bufferevent_setcb(buf_event, &onSocketRead, &onSocketWrite, &onSocketEvent, cctx); } ();
351 		if (() @trusted { return bufferevent_enable(buf_event, EV_READ|EV_WRITE); } ())
352 			throw new Exception("Error enabling buffered I/O event for socket.");
353 
354 		cctx.readOwner = Task.getThis();
355 		scope(exit) cctx.readOwner = Task();
356 
357 		assert(cctx.exception is null);
358 		socketEnforce(() @trusted { return bufferevent_socket_connect(buf_event, addr.sockAddr, addr.sockAddrLen); } () == 0,
359 			"Failed to connect to " ~ addr.toString());
360 
361 		try {
362 			cctx.checkForException();
363 
364 			// TODO: cctx.remote_addr6 = ...;
365 
366 			while (cctx.status == 0)
367 				m_core.yieldForEvent();
368 		} catch (InterruptException e) {
369 			throw e;
370 		} catch (Exception e) {
371 			throw new Exception(format("Failed to connect to %s: %s", addr.toString(), e.msg));
372 		}
373 
374 		logTrace("Connect result status: %d", cctx.status);
375 		enforce(cctx.status == BEV_EVENT_CONNECTED, cctx.statusMessage
376 			? format("Failed to connect to host %s: %s", addr.toString(), cctx.statusMessage)
377 			: format("Failed to connect to host %s: %s", addr.toString(), cctx.status));
378 
379 		socklen_t balen = bind_addr.sockAddrLen;
380 		socketEnforce(() @trusted { return getsockname(sockfd, bind_addr.sockAddr, &balen); } () == 0, "getsockname failed.");
381 		cctx.local_addr = bind_addr;
382 
383 		return new Libevent2TCPConnection(cctx);
384 	}
385 
386 	Libevent2TCPListener listenTCP(ushort port, void delegate(TCPConnection conn) @safe connection_callback, string address, TCPListenOptions options)
387 	{
388 		auto bind_addr = resolveHost(address, AF_UNSPEC, false);
389 		bind_addr.port = port;
390 
391 		auto listenfd_raw = () @trusted { return socket(bind_addr.family, SOCK_STREAM, 0); } ();
392 		// on Win64 socket() returns a 64-bit value but libevent expects an int
393 		static if (typeof(listenfd_raw).max > int.max) assert(listenfd_raw <= int.max || listenfd_raw == ~0);
394 		auto listenfd = cast(int)listenfd_raw;
395 		socketEnforce(listenfd != -1, "Error creating listening socket");
396 		int tmp_reuse = 1;
397 		socketEnforce(() @trusted { return setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &tmp_reuse, tmp_reuse.sizeof); } () == 0,
398 			"Error enabling socket address reuse on listening socket");
399 		version (linux) {
400 			if (options & TCPListenOptions.reusePort) {
401 				if (() @trusted { return setsockopt(listenfd, SOL_SOCKET, SO_REUSEPORT, &tmp_reuse, tmp_reuse.sizeof); } ()) {
402 					if (errno != EINVAL && errno != ENOPROTOOPT) {
403 						socketEnforce(false, "Error enabling socket port reuse on listening socket");
404 					}
405 				}
406 			}
407 		}
408 		socketEnforce(() @trusted { return bind(listenfd, bind_addr.sockAddr, bind_addr.sockAddrLen); } () == 0,
409 			"Error binding listening socket");
410 
411 		socketEnforce(() @trusted { return listen(listenfd, 128); } () == 0,
412 			"Error listening to listening socket");
413 
414 		// Set socket for non-blocking I/O
415 		enforce(() @trusted { return evutil_make_socket_nonblocking(listenfd); } () == 0,
416 			"Error setting listening socket to non-blocking I/O.");
417 
418 		socklen_t balen = bind_addr.sockAddrLen;
419 		socketEnforce(() @trusted { return getsockname(listenfd, bind_addr.sockAddr, &balen); } () == 0, "getsockname failed.");
420 
421 		auto ret = new Libevent2TCPListener(bind_addr);
422 
423 		static final class HandlerContext {
424 			Libevent2TCPListener listener;
425 			int listenfd;
426 			NetworkAddress bind_addr;
427 			void delegate(TCPConnection) @safe connection_callback;
428 			TCPListenOptions options;
429 		}
430 
431 		auto hc = new HandlerContext;
432 		hc.listener = ret;
433 		hc.listenfd = listenfd;
434 		hc.bind_addr = bind_addr;
435 		hc.connection_callback = connection_callback;
436 		hc.options = options;
437 
438 		static void setupConnectionHandler(shared(HandlerContext) handler_context_)
439 		@safe {
440 			auto handler_context = () @trusted { return cast(HandlerContext)handler_context_; } ();
441 			auto evloop = getThreadLibeventEventLoop();
442 			auto core = getThreadLibeventDriverCore();
443 			// Add an event to wait for connections
444 			auto ctx = () @trusted { return TCPContextAlloc.alloc(core, evloop, handler_context.listenfd, null, handler_context.bind_addr, NetworkAddress()); } ();
445 			scope(failure) () @trusted { TCPContextAlloc.free(ctx); } ();
446 			ctx.connectionCallback = handler_context.connection_callback;
447 			ctx.listenEvent = () @trusted { return event_new(evloop, handler_context.listenfd, EV_READ | EV_PERSIST, &onConnect, ctx); } ();
448 			ctx.listenOptions = handler_context.options;
449 			enforce(() @trusted { return event_add(ctx.listenEvent, null); } () == 0,
450 				"Error scheduling connection event on the event loop.");
451 			handler_context.listener.addContext(ctx);
452 		}
453 
454 		// FIXME: the API needs improvement with proper shared annotations, so the the following casts are not necessary
455 		if (options & TCPListenOptions.distribute) () @trusted { return runWorkerTaskDist(&setupConnectionHandler, cast(shared)hc); } ();
456 		else setupConnectionHandler(() @trusted { return cast(shared)hc; } ());
457 
458 		return ret;
459 	}
460 
461 	Libevent2UDPConnection listenUDP(ushort port, string bind_address = "0.0.0.0")
462 	{
463 		NetworkAddress bindaddr = resolveHost(bind_address, AF_UNSPEC, false);
464 		bindaddr.port = port;
465 
466 		return new Libevent2UDPConnection(bindaddr, this);
467 	}
468 
469 	Libevent2ManualEvent createManualEvent()
470 	{
471 		return new Libevent2ManualEvent(this);
472 	}
473 
474 	Libevent2FileDescriptorEvent createFileDescriptorEvent(int fd, FileDescriptorEvent.Trigger events, FileDescriptorEvent.Mode mode)
475 	{
476 		return new Libevent2FileDescriptorEvent(this, fd, events, mode);
477 	}
478 
479 	size_t createTimer(void delegate() @safe callback) { return m_timers.create(TimerInfo(callback)); }
480 
481 	void acquireTimer(size_t timer_id) { m_timers.getUserData(timer_id).refCount++; }
482 	void releaseTimer(size_t timer_id)
483 	nothrow {
484 		debug assert(m_ownerThread is () @trusted { return Thread.getThis(); } ());
485 		if (!--m_timers.getUserData(timer_id).refCount)
486 			m_timers.destroy(timer_id);
487 	}
488 
489 	bool isTimerPending(size_t timer_id) { return m_timers.isPending(timer_id); }
490 
491 	void rearmTimer(size_t timer_id, Duration dur, bool periodic)
492 	{
493 		debug assert(m_ownerThread is () @trusted { return Thread.getThis(); } ());
494 		if (!isTimerPending(timer_id)) acquireTimer(timer_id);
495 		m_timers.schedule(timer_id, dur, periodic);
496 		rescheduleTimerEvent(Clock.currTime(UTC()));
497 	}
498 
499 	void stopTimer(size_t timer_id)
500 	{
501 		logTrace("Stopping timer %s", timer_id);
502 		if (m_timers.isPending(timer_id)) {
503 			m_timers.unschedule(timer_id);
504 			releaseTimer(timer_id);
505 		}
506 	}
507 
508 	void waitTimer(size_t timer_id)
509 	{
510 		debug assert(m_ownerThread is () @trusted { return Thread.getThis(); } ());
511 		while (true) {
512 			assert(!m_timers.isPeriodic(timer_id), "Cannot wait for a periodic timer.");
513 			if (!m_timers.isPending(timer_id)) return;
514 			auto data = () @trusted { return &m_timers.getUserData(timer_id); } ();
515 			assert(data.owner == Task.init, "Waiting for the same timer from multiple tasks is not supported.");
516 			data.owner = Task.getThis();
517 			scope (exit) m_timers.getUserData(timer_id).owner = Task.init;
518 			m_core.yieldForEvent();
519 		}
520 	}
521 
522 	private void processTimers()
523 	{
524 		if (!m_timers.anyPending) return;
525 
526 		logTrace("Processing due timers");
527 		// process all timers that have expired up to now
528 		auto now = Clock.currTime(UTC());
529 		m_timers.consumeTimeouts(now, (timer, periodic, ref data) @safe {
530 			Task owner = data.owner;
531 			auto callback = data.callback;
532 
533 			logTrace("Timer %s fired (%s/%s)", timer, owner != Task.init, callback !is null);
534 
535 			if (!periodic) releaseTimer(timer);
536 
537 			if (owner && owner.running) m_core.resumeTask(owner);
538 			if (callback) () @trusted { runTask(callback); } ();
539 		});
540 
541 		rescheduleTimerEvent(now);
542 	}
543 
544 	private void rescheduleTimerEvent(SysTime now)
545 	{
546 		auto next = m_timers.getFirstTimeout();
547 		if (next == SysTime.max || next == m_timerTimeout) return;
548 
549 		m_timerTimeout = now;
550 		auto dur = next - now;
551 		() @trusted { event_del(m_timerEvent); } ();
552 		assert(dur.total!"seconds"() <= int.max);
553 		dur += 9.hnsecs(); // round up to the next usec to avoid premature timer events
554 		timeval tvdur = dur.toTimeVal();
555 		() @trusted { event_add(m_timerEvent, &tvdur); } ();
556 		assert(() @trusted { return event_pending(m_timerEvent, EV_TIMEOUT, null); } ());
557 		logTrace("Rescheduled timer event for %s seconds", dur.total!"usecs" * 1e-6);
558 	}
559 
560 	private static nothrow extern(C)
561 	void onTimerTimeout(evutil_socket_t, short events, void* userptr)
562 	{
563 		import std.encoding : sanitize;
564 
565 		logTrace("timer event fired");
566 		auto drv = () @trusted { return cast(Libevent2Driver)userptr; } ();
567 		try drv.processTimers();
568 		catch (Exception e) {
569 			logError("Failed to process timers: %s", e.msg);
570 			try logDiagnostic("Full error: %s", () @trusted { return e.toString().sanitize; } ());
571 			catch (Exception e) {
572 				logError("Failed to process timers: %s", e.msg);
573 			}
574 		}
575 	}
576 
577 	private static nothrow extern(C) void onAddrInfo(int err, evutil_addrinfo* res, void* arg)
578 	{
579 		auto msg = () @trusted { return cast(GetAddrInfoMsg*)arg; } ();
580 		msg.err = err;
581 		msg.done = true;
582 		if (err == DNS_ERR_NONE) {
583 			assert(res !is null);
584 			scope (exit) () @trusted { evutil_freeaddrinfo(res); } ();
585 
586 			// Note that we are only returning the first address and ignoring the
587 			// rest. Ideally we should return all of the NetworkAddress
588 			msg.addr.family = cast(ushort)res.ai_family;
589 			assert(res.ai_addrlen == msg.addr.sockAddrLen());
590 			switch (msg.addr.family) {
591 				case AF_INET:
592 					auto sock4 = cast(sockaddr_in*)res.ai_addr;
593 					msg.addr.sockAddrInet4.sin_addr.s_addr = sock4.sin_addr.s_addr;
594 					break;
595 				case AF_INET6:
596 					auto sock6 = () @trusted { return cast(sockaddr_in6*)res.ai_addr; } ();
597 					msg.addr.sockAddrInet6.sin6_addr.s6_addr = sock6.sin6_addr.s6_addr;
598 					break;
599 				default:
600 					logDiagnostic("DNS lookup yielded unknown address family: %s", msg.addr.family);
601 					err = DNS_ERR_UNKNOWN;
602 					break;
603 			}
604 		}
605 		if (msg.task && msg.task.running) {
606 			try msg.core.resumeTask(msg.task);
607 			catch (Exception e) logWarn("Error resuming DNS query task: %s", e.msg);
608 		}
609 	}
610 
611 	private void registerObject(Libevent2Object obj)
612 	nothrow {
613 		debug assert(() @trusted { return Thread.getThis(); } () is m_ownerThread, "Event object created in foreign thread.");
614 		auto key = () @trusted { return cast(size_t)cast(void*)obj; } ();
615 		m_ownedObjects.insert(key);
616 		if (obj.m_threadObject)
617 			() @trusted {
618 				scope (failure) assert(false); // synchronized is not nothrow
619 				synchronized (s_threadObjectsMutex)
620 					s_threadObjects.insert(key);
621 			} ();
622 	}
623 
624 	private void unregisterObject(Libevent2Object obj)
625 	nothrow {
626 		scope (failure) assert(false); // synchronized is not nothrow
627 
628 		auto key = () @trusted { return cast(size_t)cast(void*)obj; } ();
629 		m_ownedObjects.remove(key);
630 		if (obj.m_threadObject)
631 			() @trusted {
632 				synchronized (s_threadObjectsMutex)
633 					s_threadObjects.remove(key);
634 			} ();
635 	}
636 }
637 
638 private struct TimerInfo {
639 	size_t refCount = 1;
640 	void delegate() @safe callback;
641 	Task owner;
642 
643 	this(void delegate() @safe callback) @safe { this.callback = callback; }
644 }
645 
646 struct AddressInfo {
647 	NetworkAddress address;
648 	string host;
649 	ushort family;
650 	bool useDNS;
651 }
652 
653 
654 private struct GetAddrInfoMsg {
655 	NetworkAddress addr;
656 	bool done = false;
657 	int err = 0;
658 	DriverCore core;
659 	Task task;
660 }
661 
662 private class Libevent2Object {
663 	protected Libevent2Driver m_driver;
664 	debug private Thread m_ownerThread;
665 	private bool m_threadObject;
666 
667 	this(Libevent2Driver driver, bool thread_object)
668 	nothrow @safe {
669 		m_threadObject = thread_object;
670 		m_driver = driver;
671 		m_driver.registerObject(this);
672 		debug m_ownerThread = driver.m_ownerThread;
673 	}
674 
675 	~this()
676 	@trusted {
677 		// NOTE: m_driver will always be destroyed deterministically
678 		//       in static ~this(), so it can be used here safely
679 		m_driver.unregisterObject(this);
680 	}
681 
682 	protected void onThreadShutdown() @safe {}
683 }
684 
685 /// private
686 struct ThreadSlot {
687 	Libevent2Driver driver;
688 	deimos.event2.event.event* event;
689 	ArraySet!Task tasks;
690 }
691 /// private
692 alias ThreadSlotMap = HashMap!(Thread, ThreadSlot);
693 
694 final class Libevent2ManualEvent : Libevent2Object, ManualEvent {
695 @safe:
696 
697 	private {
698 		shared(int) m_emitCount = 0;
699 		core.sync.mutex.Mutex m_mutex;
700 		ThreadSlotMap m_waiters;
701 	}
702 
703 	this(Libevent2Driver driver)
704 	nothrow {
705 		super(driver, true);
706 		scope (failure) assert(false);
707 		m_mutex = new core.sync.mutex.Mutex;
708 		m_waiters = ThreadSlotMap(driver.m_allocator);
709 	}
710 
711 	~this()
712 	{
713 		m_mutex = null; // Optimistic race-condition detection (see Libevent2Driver.dispose())
714 		foreach (ref m_waiters.Value ts; m_waiters)
715 			() @trusted { event_free(ts.event); } ();
716 	}
717 
718 	void emit()
719 	{
720 		static if (!synchronizedIsNothrow)
721 			scope (failure) assert(0, "Internal error: function should be nothrow");
722 
723 		() @trusted { atomicOp!"+="(m_emitCount, 1); } ();
724 		synchronized (m_mutex) {
725 			foreach (ref m_waiters.Value sl; m_waiters)
726 				() @trusted { event_active(sl.event, 0, 0); } ();
727 		}
728 	}
729 
730 	void wait() { wait(m_emitCount); }
731 	int wait(int reference_emit_count) { return  doWait!true(reference_emit_count); }
732 	int wait(Duration timeout, int reference_emit_count) { return doWait!true(timeout, reference_emit_count); }
733 	int waitUninterruptible(int reference_emit_count) { return  doWait!false(reference_emit_count); }
734 	int waitUninterruptible(Duration timeout, int reference_emit_count) { return doWait!false(timeout, reference_emit_count); }
735 
736 	void acquire()
737 	{
738 		auto task = Task.getThis();
739 		auto thread = task == Task() ? () @trusted { return Thread.getThis(); } () : task.thread;
740 
741 		synchronized (m_mutex) {
742 			if (thread !in m_waiters) {
743 				ThreadSlot slot;
744 				slot.driver = cast(Libevent2Driver)getEventDriver();
745 				slot.event = () @trusted { return event_new(slot.driver.eventLoop, -1, EV_PERSIST, &onSignalTriggered, cast(void*)this); } ();
746 				() @trusted { event_add(slot.event, null); } ();
747 				m_waiters[thread] = slot;
748 			}
749 
750 			if (task != Task()) {
751 				assert(task !in m_waiters[thread].tasks, "Double acquisition of signal.");
752 				m_waiters[thread].tasks.insert(task);
753 			}
754 		}
755 	}
756 
757 	void release()
758 	{
759 		auto self = Task.getThis();
760 		if (self == Task()) return;
761 
762 		synchronized (m_mutex) {
763 			assert(self.thread in m_waiters && self in m_waiters[self.thread].tasks,
764 				"Releasing non-acquired signal.");
765 			m_waiters[self.thread].tasks.remove(self);
766 		}
767 	}
768 
769 	bool amOwner()
770 	{
771 		auto self = Task.getThis();
772 		if (self == Task()) return false;
773 		synchronized (m_mutex) {
774 			if (self.thread !in m_waiters) return false;
775 			return self in m_waiters[self.thread].tasks;
776 		}
777 	}
778 
779 	@property int emitCount() const @trusted { return atomicLoad(m_emitCount); }
780 
781 	protected override void onThreadShutdown()
782 	{
783 		auto thr = () @trusted { return Thread.getThis(); } ();
784 		synchronized (m_mutex) {
785 			if (thr in m_waiters) {
786 				() @trusted { event_free(m_waiters[thr].event); } ();
787 				m_waiters.remove(thr);
788 			}
789 		}
790 	}
791 
792 	private int doWait(bool INTERRUPTIBLE)(int reference_emit_count)
793 	{
794 		static if (!INTERRUPTIBLE) scope (failure) assert(false); // still some function calls not marked nothrow
795 		assert(!amOwner());
796 
797 		auto ec = this.emitCount;
798 		if (ec != reference_emit_count) return ec;
799 
800 		acquire();
801 		scope(exit) release();
802 
803 		while (ec == reference_emit_count) {
804 			static if (INTERRUPTIBLE) getThreadLibeventDriverCore().yieldForEvent();
805 			else getThreadLibeventDriverCore().yieldForEventDeferThrow();
806 			ec = this.emitCount;
807 		}
808 		return ec;
809 	}
810 
811 	private int doWait(bool INTERRUPTIBLE)(Duration timeout, int reference_emit_count)
812 	{
813 		static if (!INTERRUPTIBLE) scope (failure) assert(false); // still some function calls not marked nothrow
814 		assert(!amOwner());
815 
816 		auto ec = this.emitCount;
817 		if (ec != reference_emit_count) return ec;
818 
819 		acquire();
820 		scope(exit) release();
821 		auto tm = m_driver.createTimer(null);
822 		scope (exit) m_driver.releaseTimer(tm);
823 		m_driver.m_timers.getUserData(tm).owner = Task.getThis();
824 		m_driver.rearmTimer(tm, timeout, false);
825 
826 		while (ec == reference_emit_count) {
827 			static if (INTERRUPTIBLE) getThreadLibeventDriverCore().yieldForEvent();
828 			else getThreadLibeventDriverCore().yieldForEventDeferThrow();
829 			ec = this.emitCount;
830 			if (!m_driver.isTimerPending(tm)) break;
831 		}
832 		return ec;
833 	}
834 
835 	private static nothrow extern(C)
836 	void onSignalTriggered(evutil_socket_t, short events, void* userptr)
837 	{
838 		import std.encoding : sanitize;
839 
840 		try {
841 			auto sig = () @trusted { return cast(Libevent2ManualEvent)userptr; } ();
842 			auto thread = () @trusted { return Thread.getThis(); } ();
843 			auto core = getThreadLibeventDriverCore();
844 
845 			ArraySet!Task lst;
846 			synchronized (sig.m_mutex) {
847 				assert(thread in sig.m_waiters);
848 				lst = sig.m_waiters[thread].tasks.dup;
849 			}
850 
851 			foreach (l; lst)
852 				core.resumeTask(l);
853 		} catch (Exception e) {
854 			logError("Exception while handling signal event: %s", e.msg);
855 			try logDiagnostic("Full error: %s", () @trusted { return sanitize(e.msg); } ());
856 			catch(Exception) {}
857 			debug assert(false);
858 		}
859 	}
860 }
861 
862 
863 final class Libevent2FileDescriptorEvent : Libevent2Object, FileDescriptorEvent {
864 @safe:
865 
866 	private {
867 		int m_fd;
868 		deimos.event2.event.event* m_event;
869 		bool m_persistent;
870 		Trigger m_activeEvents;
871 		Task m_waiter;
872 	}
873 
874 	this(Libevent2Driver driver, int file_descriptor, Trigger events, Mode mode)
875 	{
876 		assert(events != Trigger.none);
877 		super(driver, false);
878 		m_fd = file_descriptor;
879 		m_persistent = mode != Mode.nonPersistent;
880 		short evts = 0;
881 		if (events & Trigger.read) evts |= EV_READ;
882 		if (events & Trigger.write) evts |= EV_WRITE;
883 		if (m_persistent) evts |= EV_PERSIST;
884 		if (mode == Mode.edgeTriggered) evts |= EV_ET;
885 		m_event = () @trusted { return event_new(driver.eventLoop, file_descriptor, evts, &onFileTriggered, cast(void*)this); } ();
886 		if (m_persistent) () @trusted { event_add(m_event, null); } ();
887 	}
888 
889 	~this()
890 	{
891 		() @trusted { event_free(m_event); } ();
892 	}
893 
894 	Trigger wait(Trigger which)
895 	{
896 		assert(!m_waiter, "Only one task may wait on a Libevent2FileEvent.");
897 		m_waiter = Task.getThis();
898 		scope (exit) {
899 			m_waiter = Task.init;
900 			m_activeEvents &= ~which;
901 		}
902 
903 		while ((m_activeEvents & which) == Trigger.none) {
904 			if (!m_persistent) () @trusted { event_add(m_event, null); } ();
905 			getThreadLibeventDriverCore().yieldForEvent();
906 		}
907 		return m_activeEvents & which;
908 	}
909 
910 	Trigger wait(Duration timeout, Trigger which)
911 	{
912 		assert(!m_waiter, "Only one task may wait on a Libevent2FileEvent.");
913 		m_waiter = Task.getThis();
914 		scope (exit) {
915 			m_waiter = Task.init;
916 			m_activeEvents &= ~which;
917 		}
918 
919 		auto tm = m_driver.createTimer(null);
920 		scope (exit) m_driver.releaseTimer(tm);
921 		m_driver.m_timers.getUserData(tm).owner = Task.getThis();
922 		m_driver.rearmTimer(tm, timeout, false);
923 
924 		while ((m_activeEvents & which) == Trigger.none) {
925 			if (!m_persistent) () @trusted { event_add(m_event, null); } ();
926 			getThreadLibeventDriverCore().yieldForEvent();
927 			if (!m_driver.isTimerPending(tm)) break;
928 		}
929 		return m_activeEvents & which;
930 	}
931 
932 	private static nothrow extern(C)
933 	void onFileTriggered(evutil_socket_t fd, short events, void* userptr)
934 	{
935 		import std.encoding : sanitize;
936 
937 		try {
938 			auto core = getThreadLibeventDriverCore();
939 			auto evt = () @trusted { return cast(Libevent2FileDescriptorEvent)userptr; } ();
940 
941 			evt.m_activeEvents = Trigger.none;
942 			if (events & EV_READ) evt.m_activeEvents |= Trigger.read;
943 			if (events & EV_WRITE) evt.m_activeEvents |= Trigger.write;
944 			if (evt.m_waiter) core.resumeTask(evt.m_waiter);
945 		} catch (Exception e) {
946 			logError("Exception while handling file event: %s", e.msg);
947 			try logDiagnostic("Full error: %s", () @trusted { return sanitize(e.msg); } ());
948 			catch(Exception) {}
949 			debug assert(false);
950 		}
951 	}
952 }
953 
954 
955 final class Libevent2UDPConnection : UDPConnection {
956 @safe:
957 
958 	private {
959 		Libevent2Driver m_driver;
960 		TCPContext* m_ctx;
961 		NetworkAddress m_bindAddress;
962 		string m_bindAddressString;
963 		bool m_canBroadcast = false;
964 	}
965 
966 	this(NetworkAddress bind_addr, Libevent2Driver driver)
967 	{
968 		m_driver = driver;
969 
970 		auto sockfd_raw = () @trusted { return socket(bind_addr.family, SOCK_DGRAM, IPPROTO_UDP); } ();
971 		// on Win64 socket() returns a 64-bit value but libevent expects an int
972 		static if (typeof(sockfd_raw).max > int.max) assert(sockfd_raw <= int.max || sockfd_raw == ~0);
973 		auto sockfd = cast(int)sockfd_raw;
974 		socketEnforce(sockfd != -1, "Failed to create socket.");
975 
976 		enforce(() @trusted { return evutil_make_socket_nonblocking(sockfd); } () == 0, "Failed to make socket non-blocking.");
977 
978 		int tmp_reuse = 1;
979 		socketEnforce(() @trusted { return setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &tmp_reuse, tmp_reuse.sizeof); } () == 0,
980 			"Error enabling socket address reuse on listening socket");
981 
982 		// bind the socket to a local inteface/port
983 		socketEnforce(() @trusted { return bind(sockfd, bind_addr.sockAddr, bind_addr.sockAddrLen); } () == 0, "Failed to bind UDP socket.");
984 		// read back the actual bind address
985 		socklen_t balen = bind_addr.sockAddrLen;
986 		socketEnforce(() @trusted { return getsockname(sockfd, bind_addr.sockAddr, &balen); } () == 0, "getsockname failed.");
987 
988 		// generate the bind address string
989 		m_bindAddress = bind_addr;
990 		char[64] buf;
991 		void* ptr;
992 		if( bind_addr.family == AF_INET ) ptr = &bind_addr.sockAddrInet4.sin_addr;
993 		else ptr = &bind_addr.sockAddrInet6.sin6_addr;
994 		() @trusted { evutil_inet_ntop(bind_addr.family, ptr, buf.ptr, buf.length); } ();
995 		m_bindAddressString = () @trusted { return to!string(buf.ptr); } ();
996 
997 		// create a context for storing connection information
998 		m_ctx = () @trusted { return TCPContextAlloc.alloc(driver.m_core, driver.m_eventLoop, sockfd, null, bind_addr, NetworkAddress()); } ();
999 		scope(failure) () @trusted { TCPContextAlloc.free(m_ctx); } ();
1000 		m_ctx.listenEvent = () @trusted { return event_new(driver.m_eventLoop, sockfd, EV_READ|EV_PERSIST, &onUDPRead, m_ctx); } ();
1001 		if (!m_ctx.listenEvent) throw new Exception("Failed to create buffer event for socket.");
1002 	}
1003 
1004 	@property string bindAddress() const { return m_bindAddressString; }
1005 	@property NetworkAddress localAddress() const { return m_bindAddress; }
1006 
1007 	@property bool canBroadcast() const { return m_canBroadcast; }
1008 	@property void canBroadcast(bool val)
1009 	{
1010 		int tmp_broad = val;
1011 		enforce(() @trusted { return setsockopt(m_ctx.socketfd, SOL_SOCKET, SO_BROADCAST, &tmp_broad, tmp_broad.sizeof); } () == 0,
1012 			"Failed to change the socket broadcast flag.");
1013 		m_canBroadcast = val;
1014 	}
1015 
1016 
1017 	bool amOwner() {
1018 		return m_ctx !is null && m_ctx.readOwner != Task() && m_ctx.readOwner == Task.getThis() && m_ctx.readOwner == m_ctx.writeOwner;
1019 	}
1020 
1021 	void acquire()
1022 	{
1023 		assert(m_ctx, "Trying to acquire a closed UDP connection.");
1024 		assert(m_ctx.readOwner == Task() && m_ctx.writeOwner == Task(),
1025 			"Trying to acquire a UDP connection that is currently owned.");
1026 		m_ctx.readOwner = m_ctx.writeOwner = Task.getThis();
1027 	}
1028 
1029 	void release()
1030 	{
1031 		if (!m_ctx) return;
1032 		assert(m_ctx.readOwner == Task.getThis() && m_ctx.readOwner == m_ctx.writeOwner,
1033 			"Trying to release a UDP connection that is not owned by the current task.");
1034 		m_ctx.readOwner = m_ctx.writeOwner = Task.init;
1035 	}
1036 
1037 	void close()
1038 	{
1039 		if (!m_ctx) return;
1040 		acquire();
1041 
1042 		if (m_ctx.listenEvent) () @trusted { event_free(m_ctx.listenEvent); } ();
1043 		() @trusted { TCPContextAlloc.free(m_ctx); } ();
1044 		m_ctx = null;
1045 	}
1046 
1047 	void connect(string host, ushort port)
1048 	{
1049 		NetworkAddress addr = m_driver.resolveHost(host, m_ctx.local_addr.family);
1050 		addr.port = port;
1051 		connect(addr);
1052 	}
1053 
1054 	void connect(NetworkAddress addr)
1055 	{
1056 		enforce(() @trusted { return .connect(m_ctx.socketfd, addr.sockAddr, addr.sockAddrLen); } () == 0, "Failed to connect UDP socket."~to!string(getLastSocketError()));
1057 	}
1058 
1059 	void send(in ubyte[] data, in NetworkAddress* peer_address = null)
1060 	{
1061 		sizediff_t ret;
1062 		assert(data.length <= int.max);
1063 		if( peer_address ){
1064 			ret = () @trusted { return .sendto(m_ctx.socketfd, data.ptr, cast(int)data.length, 0, peer_address.sockAddr, peer_address.sockAddrLen); } ();
1065 		} else {
1066 			ret = () @trusted { return .send(m_ctx.socketfd, data.ptr, cast(int)data.length, 0); } ();
1067 		}
1068 		logTrace("send ret: %s, %s", ret, getLastSocketError());
1069 		enforce(ret >= 0, "Error sending UDP packet.");
1070 		enforce(ret == data.length, "Unable to send full packet.");
1071 	}
1072 
1073 	ubyte[] recv(ubyte[] buf = null, NetworkAddress* peer_address = null)
1074 	{
1075 		return recv(Duration.max, buf, peer_address);
1076 	}
1077 
1078 	ubyte[] recv(Duration timeout, ubyte[] buf = null, NetworkAddress* peer_address = null)
1079 	{
1080 		size_t tm = size_t.max;
1081 		if (timeout >= 0.seconds && timeout != Duration.max) {
1082 			tm = m_driver.createTimer(null);
1083 			m_driver.m_timers.getUserData(tm).owner = Task.getThis();
1084 			m_driver.rearmTimer(tm, timeout, false);
1085 		}
1086 
1087 		acquire();
1088 		// TODO: adds the event only when we actually read to avoid event loop
1089 		// spinning when data is available, see #715. Since this may be
1090 		// performance critical, a proper benchmark should be performed!
1091 		enforce(() @trusted { return event_add(m_ctx.listenEvent, null); } () == 0);
1092 
1093 		scope (exit) {
1094 			() @trusted { event_del(m_ctx.listenEvent); } ();
1095 			release();
1096 			if (tm != size_t.max) m_driver.releaseTimer(tm);
1097 		}
1098 
1099 		if (buf.length == 0) buf.length = 65507;
1100 
1101 		NetworkAddress from;
1102 		from.family = m_ctx.local_addr.family;
1103 		assert(buf.length <= int.max);
1104 		while (true) {
1105 			socklen_t addr_len = from.sockAddrLen;
1106 			auto ret = () @trusted { return .recvfrom(m_ctx.socketfd, buf.ptr, cast(int)buf.length, 0, from.sockAddr, &addr_len); } ();
1107 			if (ret > 0) {
1108 				if( peer_address ) *peer_address = from;
1109 				return buf[0 .. ret];
1110 			}
1111 			if (ret < 0) {
1112 				auto err = getLastSocketError();
1113 				if (err != EWOULDBLOCK) {
1114 					logDebugV("UDP recv err: %s", err);
1115 					throw new Exception("Error receiving UDP packet.");
1116 				}
1117 				if (timeout != Duration.max) {
1118 					enforce(timeout > 0.seconds && m_driver.isTimerPending(tm), "UDP receive timeout.");
1119 				}
1120 			}
1121 			m_ctx.core.yieldForEvent();
1122 		}
1123 	}
1124 
1125 	override void addMembership(ref NetworkAddress multiaddr)
1126 	{
1127 		if (multiaddr.family == AF_INET)
1128 		{
1129 			version (Windows)
1130 			{
1131 				alias in_addr = core.sys.windows.winsock2.in_addr;
1132 			} else
1133 			{
1134 				static import core.sys.posix.arpa.inet;
1135 				alias in_addr = core.sys.posix.arpa.inet.in_addr;
1136 			}
1137 			struct ip_mreq {
1138 				in_addr imr_multiaddr;   /* IP multicast address of group */
1139 				in_addr imr_interface;   /* local IP address of interface */
1140 			}
1141 			auto inaddr = in_addr();
1142 			inaddr.s_addr = htonl(INADDR_ANY);
1143 			auto mreq = ip_mreq(multiaddr.sockAddrInet4.sin_addr, inaddr);
1144 			enforce(() @trusted { return setsockopt (m_ctx.socketfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, ip_mreq.sizeof); } () == 0,
1145 				"Failed to add to multicast group");
1146 		} else
1147 		{
1148 			version (Windows)
1149 			{
1150 				alias in6_addr = core.sys.windows.winsock2.in6_addr;
1151 				struct ipv6_mreq {
1152 					in6_addr ipv6mr_multiaddr;
1153 					uint ipv6mr_interface;
1154 				}
1155 			}
1156 			auto mreq = ipv6_mreq(multiaddr.sockAddrInet6.sin6_addr, 0);
1157 			enforce(() @trusted { return setsockopt (m_ctx.socketfd, IPPROTO_IP, IPV6_JOIN_GROUP, &mreq, ipv6_mreq.sizeof); } () == 0,
1158 				"Failed to add to multicast group");
1159 		}
1160 	}
1161 
1162 	@property void multicastLoopback(bool loop)
1163 	{
1164 		int tmp_loop = loop;
1165 		enforce(() @trusted { return setsockopt (m_ctx.socketfd, IPPROTO_IP, IP_MULTICAST_LOOP, &tmp_loop, tmp_loop.sizeof); } () == 0,
1166 			"Failed to add to multicast loopback");
1167 	}
1168 
1169 	private static nothrow extern(C) void onUDPRead(evutil_socket_t sockfd, short evts, void* arg)
1170 	{
1171 		auto ctx = () @trusted { return cast(TCPContext*)arg; } ();
1172 		logTrace("udp socket %d read event!", ctx.socketfd);
1173 
1174 		try {
1175 			auto f = ctx.readOwner;
1176 			if (f && f.running)
1177 				ctx.core.resumeTask(f);
1178 		} catch( Exception e ){
1179 			logError("Exception onUDPRead: %s", e.msg);
1180 			debug assert(false);
1181 		}
1182 	}
1183 }
1184 
1185 /******************************************************************************/
1186 /* InotifyDirectoryWatcher                                                    */
1187 /******************************************************************************/
1188 
1189 version (linux)
1190 final class InotifyDirectoryWatcher : DirectoryWatcher {
1191 @safe:
1192 
1193 	import core.sys.posix.fcntl, core.sys.posix.unistd, core.sys.linux.sys.inotify;
1194 	import std.file;
1195 
1196 	private {
1197 		Path m_path;
1198 		string[int] m_watches;
1199 		bool m_recursive;
1200 		int m_handle;
1201 		DriverCore m_core;
1202 		Task m_owner;
1203 	}
1204 
1205 	this(DriverCore core, Path path, bool recursive)
1206 	{
1207 		m_core = core;
1208 		m_recursive = recursive;
1209 		m_path = path;
1210 
1211 		enum IN_NONBLOCK = 0x800; // value in core.sys.linux.sys.inotify is incorrect
1212 		m_handle = () @trusted { return inotify_init1(IN_NONBLOCK); } ();
1213 		errnoEnforce(m_handle != -1, "Failed to initialize inotify.");
1214 
1215 		auto spath = m_path.toString();
1216 		addWatch(spath);
1217 		if (recursive && spath.isDir)
1218 		{
1219 			() @trusted {
1220 				foreach (de; spath.dirEntries(SpanMode.shallow))
1221 					if (de.isDir) addWatch(de.name);
1222 			} ();
1223 		}
1224 	}
1225 
1226 	~this()
1227 	{
1228 		errnoEnforce(() @trusted { return close(m_handle); } () == 0);
1229 	}
1230 
1231 	@property Path path() const { return m_path; }
1232 	@property bool recursive() const { return m_recursive; }
1233 
1234 	void release()
1235 	@safe {
1236 		assert(m_owner == Task.getThis(), "Releasing DirectoyWatcher that is not owned by the calling task.");
1237 		m_owner = Task();
1238 	}
1239 
1240 	void acquire()
1241 	@safe {
1242 		assert(m_owner == Task(), "Acquiring DirectoyWatcher that is already owned.");
1243 		m_owner = Task.getThis();
1244 	}
1245 
1246 	bool amOwner()
1247 	@safe {
1248 		return m_owner == Task.getThis();
1249 	}
1250 
1251 	bool readChanges(ref DirectoryChange[] dst, Duration timeout)
1252 	{
1253 		import core.stdc.stdio : FILENAME_MAX;
1254 		import core.stdc.string : strlen;
1255 
1256 		acquire();
1257 		scope(exit) release();
1258 
1259 		ubyte[inotify_event.sizeof + FILENAME_MAX + 1] buf = void;
1260 		auto nread = () @trusted { return read(m_handle, buf.ptr, buf.sizeof); } ();
1261 
1262 		if (nread == -1 && errno == EAGAIN)
1263 		{
1264 			if (!waitReadable(m_handle, timeout))
1265 				return false;
1266 			nread = () @trusted { return read(m_handle, buf.ptr, buf.sizeof); } ();
1267 		}
1268 		errnoEnforce(nread != -1, "Error while reading inotify handle.");
1269 		assert(nread > 0);
1270 
1271 		dst.length = 0;
1272 		do
1273 		{
1274 			for (size_t i = 0; i < nread;) {
1275 				auto ev = &(cast(inotify_event[])buf[i .. i+inotify_event.sizeof])[0];
1276 				if (ev.wd !in m_watches) {
1277 					logDebug("Got unknown inotify watch ID %s. Ignoring.", ev.wd);
1278 					continue;
1279 				}
1280 
1281 				DirectoryChangeType type;
1282 				if (ev.mask & (IN_CREATE|IN_MOVED_TO))
1283 					type = DirectoryChangeType.added;
1284 				else if (ev.mask & (IN_DELETE|IN_DELETE_SELF|IN_MOVE_SELF|IN_MOVED_FROM))
1285 					type = DirectoryChangeType.removed;
1286 				else if (ev.mask & IN_MODIFY)
1287 					type = DirectoryChangeType.modified;
1288 
1289 				import std.path : buildPath;
1290 				auto name = () @trusted { return ev.name.ptr[0 .. ev.name.ptr.strlen]; } ();
1291 				auto path = Path(buildPath(m_watches[ev.wd], name));
1292 
1293 				dst ~= DirectoryChange(type, path);
1294 
1295 				i += inotify_event.sizeof + ev.len;
1296 			}
1297 			nread = () @trusted { return read(m_handle, buf.ptr, buf.sizeof); } ();
1298 			errnoEnforce(nread != -1 || errno == EAGAIN, "Error while reading inotify handle.");
1299 		} while (nread > 0);
1300 		return true;
1301 	}
1302 
1303 	private bool waitReadable(int fd, Duration timeout)
1304 	@safe {
1305 		static struct Args { InotifyDirectoryWatcher watcher; bool readable, timeout; }
1306 
1307 		static extern(System) void cb(int fd, short what, void* p) {
1308 			with (() @trusted { return cast(Args*)p; } ()) {
1309 				if (what & EV_READ) readable = true;
1310 				if (what & EV_TIMEOUT) timeout = true;
1311 				if (watcher.m_owner)
1312 					watcher.m_core.resumeTask(watcher.m_owner);
1313 			}
1314 		}
1315 
1316 		auto loop = getThreadLibeventEventLoop();
1317 		auto args = Args(this);
1318 		auto ev = () @trusted { return event_new(loop, fd, EV_READ, &cb, &args); } ();
1319 		scope(exit) () @trusted { event_free(ev); } ();
1320 
1321 		if (!timeout.isNegative) {
1322 			auto tv = timeout.toTimeVal();
1323 			() @trusted { event_add(ev, &tv); } ();
1324 		} else {
1325 			() @trusted { event_add(ev, null); } ();
1326 		}
1327 		while (!args.readable && !args.timeout)
1328 			m_core.yieldForEvent();
1329 		return args.readable;
1330 	}
1331 
1332 	private void addWatch(string path)
1333 	@safe {
1334 		enum EVENTS = IN_CREATE | IN_DELETE | IN_DELETE_SELF | IN_MODIFY |
1335 			IN_MOVE_SELF | IN_MOVED_FROM | IN_MOVED_TO;
1336 		immutable wd = () @trusted { return inotify_add_watch(m_handle, path.toStringz, EVENTS); } ();
1337 		errnoEnforce(wd != -1, "Failed to add inotify watch.");
1338 		m_watches[wd] = path;
1339 	}
1340 }
1341 
1342 
1343 private {
1344 
1345 	event_base* s_eventLoop; // TLS
1346 	Libevent2Driver s_driver;
1347 	__gshared DriverCore s_driverCore;
1348 	// protects s_threadObjects and the m_ownerThread and m_driver fields of Libevent2Object
1349 	__gshared Mutex s_threadObjectsMutex;
1350 	__gshared ArraySet!size_t s_threadObjects;
1351 	debug __gshared size_t[void*] s_mutexes;
1352 	debug __gshared Mutex s_mutexesLock;
1353 	bool s_alreadyDeinitialized = false;
1354 }
1355 
1356 package event_base* getThreadLibeventEventLoop() @safe nothrow
1357 {
1358 	return s_eventLoop;
1359 }
1360 
1361 package DriverCore getThreadLibeventDriverCore() @trusted nothrow
1362 {
1363 	return s_driverCore;
1364 }
1365 
1366 private int getLastSocketError() @trusted nothrow
1367 {
1368 	version(Windows) {
1369 		return WSAGetLastError();
1370 	} else {
1371 		import core.stdc.errno;
1372 		return errno;
1373 	}
1374 }
1375 
1376 struct LevCondition {
1377 	Condition cond;
1378 	LevMutex* mutex;
1379 }
1380 
1381 struct LevMutex {
1382 	core.sync.mutex.Mutex mutex;
1383 	ReadWriteMutex rwmutex;
1384 }
1385 
1386 alias LevConditionAlloc = FreeListObjectAlloc!(LevCondition, false);
1387 alias LevMutexAlloc = FreeListObjectAlloc!(LevMutex, false);
1388 alias MutexAlloc = FreeListObjectAlloc!(core.sync.mutex.Mutex, false);
1389 alias ReadWriteMutexAlloc = FreeListObjectAlloc!(ReadWriteMutex, false);
1390 alias ConditionAlloc = FreeListObjectAlloc!(Condition, false);
1391 
1392 private nothrow extern(C)
1393 {
1394 	version (VibeDebugCatchAll) alias UncaughtException = Throwable;
1395 	else alias UncaughtException = Exception;
1396 
1397 	void* lev_alloc(size_t size)
1398 	{
1399 		try {
1400 			auto mem = s_driver.m_allocator.allocate(size+size_t.sizeof);
1401 			if (!mem.ptr) return null;
1402 			*cast(size_t*)mem.ptr = size;
1403 			return mem.ptr + size_t.sizeof;
1404 		} catch (UncaughtException th) {
1405 			logWarn("Exception in lev_alloc: %s", th.msg);
1406 			return null;
1407 		}
1408 	}
1409 	void* lev_realloc(void* p, size_t newsize)
1410 	{
1411 		try {
1412 			if( !p ) return lev_alloc(newsize);
1413 			auto oldsize = *cast(size_t*)(p-size_t.sizeof);
1414 			auto oldmem = (p-size_t.sizeof)[0 .. oldsize+size_t.sizeof];
1415 			auto newmem = oldmem;
1416 			if (!s_driver.m_allocator.reallocate(newmem, newsize+size_t.sizeof))
1417 				return null;
1418 			*cast(size_t*)newmem.ptr = newsize;
1419 			return newmem.ptr + size_t.sizeof;
1420 		} catch (UncaughtException th) {
1421 			logWarn("Exception in lev_realloc: %s", th.msg);
1422 			return null;
1423 		}
1424 	}
1425 	void lev_free(void* p)
1426 	{
1427 		try {
1428 			auto size = *cast(size_t*)(p-size_t.sizeof);
1429 			auto mem = (p-size_t.sizeof)[0 .. size+size_t.sizeof];
1430 			s_driver.m_allocator.deallocate(mem);
1431 		} catch (UncaughtException th) {
1432 			logCritical("Exception in lev_free: %s", th.msg);
1433 			assert(false);
1434 		}
1435 	}
1436 
1437 	void* lev_alloc_mutex(uint locktype)
1438 	{
1439 		try {
1440 			auto ret = LevMutexAlloc.alloc();
1441 			if( locktype == EVTHREAD_LOCKTYPE_READWRITE ) ret.rwmutex = ReadWriteMutexAlloc.alloc();
1442 			else ret.mutex = MutexAlloc.alloc();
1443 			//logInfo("alloc mutex %s", cast(void*)ret);
1444 			debug if (!s_mutexesLock) s_mutexesLock = new Mutex;
1445 			debug synchronized (s_mutexesLock) s_mutexes[cast(void*)ret] = 0;
1446 			return ret;
1447 		} catch (UncaughtException th) {
1448 			logWarn("Exception in lev_alloc_mutex: %s", th.msg);
1449 			return null;
1450 		}
1451 	}
1452 
1453 	void lev_free_mutex(void* lock, uint locktype)
1454 	{
1455 		try {
1456 			import core.runtime;
1457 			//logInfo("free mutex %s: %s", cast(void*)lock, defaultTraceHandler());
1458 			debug synchronized (s_mutexesLock) {
1459 				auto pl = lock in s_mutexes;
1460 				assert(pl !is null);
1461 				assert(*pl == 0);
1462 				s_mutexes.remove(lock);
1463 			}
1464 			auto lm = cast(LevMutex*)lock;
1465 			if (lm.mutex) MutexAlloc.free(lm.mutex);
1466 			if (lm.rwmutex) ReadWriteMutexAlloc.free(lm.rwmutex);
1467 			LevMutexAlloc.free(lm);
1468 		} catch (UncaughtException th) {
1469 			logCritical("Exception in lev_free_mutex: %s", th.msg);
1470 			assert(false);
1471 		}
1472 	}
1473 
1474 	int lev_lock_mutex(uint mode, void* lock)
1475 	{
1476 		try {
1477 			//logInfo("lock mutex %s", cast(void*)lock);
1478 			debug synchronized (s_mutexesLock) {
1479 				auto pl = lock in s_mutexes;
1480 				assert(pl !is null, "Unknown lock handle");
1481 				(*pl)++;
1482 			}
1483 			auto mtx = cast(LevMutex*)lock;
1484 
1485 			assert(mtx !is null, "null lock");
1486 			assert(mtx.mutex !is null || mtx.rwmutex !is null, "lock contains no mutex");
1487 			if( mode & EVTHREAD_WRITE ){
1488 				if( mode & EVTHREAD_TRY ) return mtx.rwmutex.writer().tryLock() ? 0 : 1;
1489 				else mtx.rwmutex.writer().lock();
1490 			} else if( mode & EVTHREAD_READ ){
1491 				if( mode & EVTHREAD_TRY ) return mtx.rwmutex.reader().tryLock() ? 0 : 1;
1492 				else mtx.rwmutex.reader().lock();
1493 			} else {
1494 				assert(mtx.mutex !is null, "lock mutex is null");
1495 				if( mode & EVTHREAD_TRY ) return mtx.mutex.tryLock() ? 0 : 1;
1496 				else mtx.mutex.lock();
1497 			}
1498 			return 0;
1499 		} catch (UncaughtException th) {
1500 			logWarn("Exception in lev_lock_mutex: %s", th.msg);
1501 			return -1;
1502 		}
1503 	}
1504 
1505 	int lev_unlock_mutex(uint mode, void* lock)
1506 	{
1507 		try {
1508 			//logInfo("unlock mutex %s", cast(void*)lock);
1509 			debug synchronized (s_mutexesLock) {
1510 				auto pl = lock in s_mutexes;
1511 				assert(pl !is null, "Unknown lock handle");
1512 				assert(*pl > 0, "Unlocking unlocked mutex");
1513 				(*pl)--;
1514 			}
1515 
1516 			auto mtx = cast(LevMutex*)lock;
1517 
1518 			if( mode & EVTHREAD_WRITE ){
1519 				mtx.rwmutex.writer().unlock();
1520 			} else if( mode & EVTHREAD_READ ){
1521 				mtx.rwmutex.reader().unlock();
1522 			} else {
1523 				mtx.mutex.unlock();
1524 			}
1525 			return 0;
1526 		} catch (UncaughtException th ) {
1527 			logWarn("Exception in lev_unlock_mutex: %s", th.msg);
1528 			return -1;
1529 		}
1530 	}
1531 
1532 	void* lev_alloc_condition(uint condtype)
1533 	{
1534 		try {
1535 			return LevConditionAlloc.alloc();
1536 		} catch (UncaughtException th) {
1537 			logWarn("Exception in lev_alloc_condition: %s", th.msg);
1538 			return null;
1539 		}
1540 	}
1541 
1542 	void lev_free_condition(void* cond)
1543 	{
1544 		try {
1545 			auto lc = cast(LevCondition*)cond;
1546 			if (lc.cond) ConditionAlloc.free(lc.cond);
1547 			LevConditionAlloc.free(lc);
1548 		} catch (UncaughtException th) {
1549 			logCritical("Exception in lev_free_condition: %s", th.msg);
1550 			assert(false);
1551 		}
1552 	}
1553 
1554 	int lev_signal_condition(void* cond, int broadcast)
1555 	{
1556 		try {
1557 			auto c = cast(LevCondition*)cond;
1558 			if( c.cond ) c.cond.notifyAll();
1559 			return 0;
1560 		} catch (UncaughtException th) {
1561 			logWarn("Exception in lev_signal_condition: %s", th.msg);
1562 			return -1;
1563 		}
1564 	}
1565 
1566 	int lev_wait_condition(void* cond, void* lock, const(timeval)* timeout)
1567 	{
1568 		try {
1569 			auto c = cast(LevCondition*)cond;
1570 			if( c.mutex is null ) c.mutex = cast(LevMutex*)lock;
1571 			assert(c.mutex.mutex !is null); // RW mutexes are not supported for conditions!
1572 			assert(c.mutex is lock);
1573 			if( c.cond is null ) c.cond = ConditionAlloc.alloc(c.mutex.mutex);
1574 			if( timeout ){
1575 				if( !c.cond.wait(dur!"seconds"(timeout.tv_sec) + dur!"usecs"(timeout.tv_usec)) )
1576 					return 1;
1577 			} else c.cond.wait();
1578 			return 0;
1579 		} catch (UncaughtException th) {
1580 			logWarn("Exception in lev_wait_condition: %s", th.msg);
1581 			return -1;
1582 		}
1583 	}
1584 
1585 	c_ulong lev_get_thread_id()
1586 	{
1587 		try return cast(c_ulong)cast(void*)Thread.getThis();
1588 		catch (UncaughtException th) {
1589 			logWarn("Exception in lev_get_thread_id: %s", th.msg);
1590 			return 0;
1591 		}
1592 	}
1593 }
1594 
1595 package timeval toTimeVal(Duration dur)
1596 @safe {
1597 	timeval tvdur;
1598 	dur.split!("seconds", "usecs")(tvdur.tv_sec, tvdur.tv_usec);
1599 	return tvdur;
1600 }
1601 
1602 }