/**
	TCP side of the libevent driver

	For the base driver implementation, see `vibe.core.drivers.libevent2`.

	Copyright: © 2012-2015 RejectedSoftware e.K.
	Authors: Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
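
	This module is not normally used directly; connections are typically
	obtained through the `vibe.core.net` API instead. A minimal usage sketch
	(assuming the libevent driver has been selected via the
	`VibeLibeventDriver` version):

	---
	import vibe.core.net;

	listenTCP(8080, (conn) {
		conn.write("Hello!\n");
		conn.close();
	});
	---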
*/
module vibe.core.drivers.libevent2_tcp;

version(VibeLibeventDriver)
{

public import vibe.core.core;

import vibe.core.log;
import vibe.core.drivers.libevent2;
import vibe.core.drivers.utils;
import vibe.internal.freelistref;

import deimos.event2.buffer;
import deimos.event2.bufferevent;
import deimos.event2.bufferevent_ssl;
import deimos.event2.event;
import deimos.event2.util;

import std.algorithm;
import std.encoding : sanitize;
import std.exception;
import std.conv;
import std.string;

import core.stdc.errno;
import core.thread;
import core.sys.posix.netinet.in_;
import core.sys.posix.netinet.tcp;
import core.sys.posix.sys.socket;


private {
	version (Windows) {
		import core.sys.windows.winsock2;
		// make some necessary parts of the socket interface public
		alias in6_addr = core.sys.windows.winsock2.in6_addr;
		alias INADDR_ANY = core.sys.windows.winsock2.INADDR_ANY;
		alias IN6ADDR_ANY = core.sys.windows.winsock2.IN6ADDR_ANY;

		enum EWOULDBLOCK = WSAEWOULDBLOCK;
	} else {
		alias in6_addr = core.sys.posix.netinet.in_.in6_addr;
		alias IN6ADDR_ANY = core.sys.posix.netinet.in_.in6addr_any;
		alias INADDR_ANY = core.sys.posix.netinet.in_.INADDR_ANY;
		alias TCP_NODELAY = core.sys.posix.netinet.tcp.TCP_NODELAY;
	}
}

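/** `TCPConnection` implementation based on a libevent bufferevent.

	Incoming data is staged in a fixed-size ring buffer, and read/write access
	is tracked per task using the acquire/release helpers below.
*/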
package final class Libevent2TCPConnection : TCPConnection {
@safe:

	import vibe.utils.array : FixedRingBuffer;
	private {
		bool m_timeout_triggered;
		TCPContext* m_ctx;
		FixedRingBuffer!(ubyte, 4096, false) m_readBuffer;
		string m_peerAddress;
		bool m_tcpNoDelay = false;
		bool m_tcpKeepAlive = false;
		Duration m_readTimeout;
		char[64] m_peerAddressBuf;
		NetworkAddress m_localAddress, m_remoteAddress;
		event* m_waitDataEvent;
	}

	this(TCPContext* ctx)
	{
		m_ctx = ctx;
		m_waitDataEvent = () @trusted { return event_new(m_ctx.eventLoop, -1, 0, &onTimeout, cast(void*)this); } ();

		assert(!amOwner());

		m_localAddress = ctx.local_addr;
		m_remoteAddress = ctx.remote_addr;

		void* ptr;
		switch (ctx.remote_addr.family) {
			default: throw new Exception("Unsupported address family.");
			case AF_INET: ptr = &ctx.remote_addr.sockAddrInet4.sin_addr; break;
			case AF_INET6: ptr = &ctx.remote_addr.sockAddrInet6.sin6_addr; break;
			version (Posix) {
				case AF_UNIX: ptr = &ctx.remote_addr.sockAddrUnix.sun_path; break;
			}
		}

		if (() @trusted { return evutil_inet_ntop(ctx.remote_addr.family, ptr, m_peerAddressBuf.ptr, m_peerAddressBuf.length); } () !is null)
			m_peerAddress = () @trusted { return cast(string)m_peerAddressBuf[0 .. m_peerAddressBuf[].indexOf('\0')]; } ();

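		// write low watermark 4 kiB: the write callback fires once the output
		// buffer drains below it; read high watermark 64 kiB: libevent stops
		// reading from the socket once that much input is buffered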
		() @trusted {
			bufferevent_setwatermark(m_ctx.event, EV_WRITE, 4096, 65536);
			bufferevent_setwatermark(m_ctx.event, EV_READ, 0, 65536);
		} ();
	}

	/*~this()
	{
		//assert(m_ctx is null, "Leaking TCPContext because it has not been cleaned up and we are not allowed to touch the GC in finalizers..");
	}*/

	@property void tcpNoDelay(bool enabled)
	{
		m_tcpNoDelay = enabled;
		auto fd = m_ctx.socketfd;
		int opt = enabled;
		assert(fd <= int.max, "Socket descriptor > int.max");
		() @trusted { setsockopt(cast(int)fd, IPPROTO_TCP, TCP_NODELAY, cast(char*)&opt, opt.sizeof); } ();
	}
	@property bool tcpNoDelay() const { return m_tcpNoDelay; }

	@property void readTimeout(Duration v)
	{
		m_readTimeout = v;
		if (v == 0.seconds) {
			() @trusted { bufferevent_set_timeouts(m_ctx.event, null, null); } ();
		} else {
			assert(v.total!"seconds" <= int.max);
			timeval toread = v.toTimeVal();
			() @trusted { bufferevent_set_timeouts(m_ctx.event, &toread, null); } ();
		}
	}
	@property Duration readTimeout() const { return m_readTimeout; }

	@property void keepAlive(bool enable)
	{
		m_tcpKeepAlive = enable;
		auto fd = m_ctx.socketfd;
		ubyte opt = enable;
		assert(fd <= int.max, "Socket descriptor > int.max");
		() @trusted { setsockopt(cast(int)fd, SOL_SOCKET, SO_KEEPALIVE, &opt, opt.sizeof); } ();
	}
	@property bool keepAlive() const { return m_tcpKeepAlive; }

	@property NetworkAddress localAddress() const { return m_localAddress; }
	@property NetworkAddress remoteAddress() const { return m_remoteAddress; }

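	// Task ownership helpers: a connection may be read from and written to by
	// at most one task per direction. acquire()/release() claim and give up
	// both directions at once; acquireReader/acquireWriter below handle each
	// direction individually.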
	private void acquire()
	@safe {
		assert(m_ctx, "Trying to acquire a closed TCP connection.");
		assert(m_ctx.readOwner == Task() && m_ctx.writeOwner == Task(), "Trying to acquire a TCP connection that is currently owned.");
		m_ctx.readOwner = m_ctx.writeOwner = Task.getThis();
	}

	private void release()
	@safe {
		if( !m_ctx ) return;
		assert(m_ctx.readOwner != Task() && m_ctx.writeOwner != Task(), "Trying to release a TCP connection that is not owned.");
		assert(m_ctx.readOwner == Task.getThis() && m_ctx.readOwner == m_ctx.writeOwner, "Trying to release a foreign TCP connection.");
		m_ctx.readOwner = m_ctx.writeOwner = Task();
	}

	private bool amOwner()
	@safe {
		return m_ctx !is null && m_ctx.readOwner != Task() && m_ctx.readOwner == Task.getThis() && m_ctx.readOwner == m_ctx.writeOwner;
	}

	/// Closes the connection.
	void close()
	{
		logDebug("TCP close request %s %s", m_ctx !is null, m_ctx ? m_ctx.state : ConnectionState.open);
		if (!m_ctx || m_ctx.state == ConnectionState.activeClose) return;

		if (!getThreadLibeventEventLoop()) {
			import std.stdio;
			() @trusted { stderr.writefln("Warning: Attempt to close dangling TCP connection to %s at shutdown. "
				~ "Please avoid closing connections in GC finalizers.", m_remoteAddress); } ();
			return;
		}

		// set the closing flag
		m_ctx.state = ConnectionState.activeClose;

		// resume any reader, so that the read operation can be ended with a failure
		while (m_ctx.readOwner != Task.init) {
			logTrace("resuming reader first");
			m_ctx.core.yieldAndResumeTask(m_ctx.readOwner);
			logTrace("back (%s)!", m_ctx !is null);
			// test if the resumed task has already closed the connection
			if (!m_ctx) return;
		}

		// acquire read+write access
		acquire();

		scope (exit) cleanup();

		if (m_ctx.event) {
			logDiagnostic("Actively closing TCP connection");
			auto fd = m_ctx.socketfd;

			scope (exit) () @trusted {
				version(Windows) shutdown(m_ctx.socketfd, SD_SEND);
				else shutdown(m_ctx.socketfd, SHUT_WR);
				if (m_ctx.event) bufferevent_free(m_ctx.event);
				logTrace("...socket %d closed.", fd);
			} ();

			m_ctx.shutdown = true;
			() @trusted {
				bufferevent_setwatermark(m_ctx.event, EV_WRITE, 1, 0);
				bufferevent_flush(m_ctx.event, EV_WRITE, bufferevent_flush_mode.BEV_FINISHED);
			} ();
			logTrace("Closing socket %d...", fd);
			auto buf = () @trusted { return bufferevent_get_output(m_ctx.event); } ();
			while (m_ctx.event && () @trusted { return evbuffer_get_length(buf); } () > 0)
				m_ctx.core.yieldForEvent();
		}
	}

	/// The 'connected' status of this connection
	@property bool connected() const { return m_ctx !is null && m_ctx.state == ConnectionState.open && m_ctx.event !is null; }

	@property bool empty() { return leastSize == 0; }

	@property ulong leastSize()
	{
		if (!m_ctx || !m_ctx.event) return 0;
		if (m_readBuffer.length) {
			checkReader();
			return m_readBuffer.length;
		}
		acquireReader();
		scope(exit) releaseReader();
		fillReadBuffer(true, false);
		return m_readBuffer.length;
	}

	@property bool dataAvailableForRead()
	{
		if (!m_ctx || !m_ctx.event) return false;
		checkReader();
		if (!m_readBuffer.length)
			fillReadBuffer(false);
		return m_readBuffer.length > 0;
	}

	@property string peerAddress() const { return m_peerAddress; }

	const(ubyte)[] peek()
	{
		if (!m_ctx || !m_ctx.event) return null;
		checkReader();
		if (!m_readBuffer.length)
			fillReadBuffer(false);
		return m_readBuffer.peek();
	}

	void skip(ulong count)
	{
		checkConnected(false);

		if (m_readBuffer.length >= count) {
			checkReader();
			m_readBuffer.popFrontN(cast(size_t)count);
			if (m_readBuffer.empty) m_readBuffer.clear(); // start filling at index 0 again
			return;
		}

		acquireReader();
		scope(exit) releaseReader();

		while (true) {
			auto nbytes = min(count, m_readBuffer.length);
			m_readBuffer.popFrontN(nbytes);
			if (m_readBuffer.empty) m_readBuffer.clear(); // start filling at index 0 again
			count -= nbytes;

			if (!count) break;

			fillReadBuffer(true);
			checkConnected(false);
		}
	}

	/** Reads as many bytes as 'dst' can hold.
	*/
	size_t read(scope ubyte[] dst, IOMode)
	{
		checkConnected(false);

		if (m_readBuffer.length >= dst.length) {
			checkReader();
			m_readBuffer.read(dst);
			if (m_readBuffer.empty) m_readBuffer.clear(); // start filling at index 0 again
			return dst.length;
		}

		acquireReader();
		scope(exit) releaseReader();

		size_t len = dst.length;

		while (true) {
			auto nbytes = min(dst.length, m_readBuffer.length);
			m_readBuffer.read(dst[0 .. nbytes]);
			if (m_readBuffer.empty) m_readBuffer.clear(); // start filling at index 0 again
			dst = dst[nbytes .. $];

			if (!dst.length) break;

			fillReadBuffer(true);
			checkConnected(false);
		}
		logTrace("read data");

		return len;
	}

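	/** Waits until incoming data becomes available or the given timeout elapses.

		Returns `true` if data arrived in time. A typical consumption pattern
		(illustrative sketch):
		---
		if (conn.waitForData(10.seconds)) {
			auto chunk = conn.peek();
			// ... process chunk, then conn.skip(chunk.length) ...
		}
		---
	*/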
	bool waitForData(Duration timeout)
	{
		if (timeout == 0.seconds)
			logDebug("Warning: use Duration.max as an argument to waitForData() to wait infinitely, not 0.seconds.");

		if (dataAvailableForRead) return true;
		if (!m_ctx || m_ctx.state != ConnectionState.open) return false;

		acquireReader();
		scope(exit) releaseReader();
		m_timeout_triggered = false;

		if (timeout != 0.seconds && timeout != Duration.max) { // 0.seconds is for compatibility with old code
			assert(timeout.total!"seconds"() <= int.max, "Timeouts must not be larger than int.max seconds!");
			timeval t = timeout.toTimeVal();
			logTrace("add timeout event with %d/%d", t.tv_sec, t.tv_usec);
			() @trusted { event_add(m_waitDataEvent, &t); } ();
		}

		logTrace("wait for data");
		while (m_ctx && m_ctx.event) {
			if (m_readBuffer.length) return true;
			if (m_ctx.state != ConnectionState.open) return false;
			try {
				if (fillReadBuffer(true, false, true))
					return false;
			} catch (Exception e) {
				logDiagnostic("Connection error during waitForData: %s", e.msg);
			}
		}

		return false;
	}

	alias write = Stream.write;

	/** Writes the given byte array.
	*/
	size_t write(in ubyte[] bytes, IOMode)
	{
		checkConnected();
		acquireWriter();
		scope(exit) releaseWriter();

		if (!bytes.length) return 0;
		//logTrace("evbuffer_add (fd %d): %s", m_ctx.socketfd, bytes);
		//logTrace("evbuffer_add (fd %d): <%s>", m_ctx.socketfd, cast(string)bytes);
		logTrace("evbuffer_add (fd %d): %d B", m_ctx.socketfd, bytes.length);
		auto outbuf = () @trusted { return bufferevent_get_output(m_ctx.event); } ();
		if (() @trusted { return bufferevent_write(m_ctx.event, cast(char*)bytes.ptr, bytes.length); } () != 0)
			throw new Exception("Failed to write data to buffer");

		// wait until the output buffer has drained down to the low watermark
		while (() @trusted { return evbuffer_get_length(outbuf); } () > 4096) {
			rawYield();
			checkConnected();
		}

		return bytes.length;
	}

	/** Causes any buffered data to be written.
	*/
	void flush()
	{
		checkConnected();
		acquireWriter();
		scope(exit) releaseWriter();
		logTrace("bufferevent_flush");
		() @trusted { bufferevent_flush(m_ctx.event, EV_WRITE, bufferevent_flush_mode.BEV_NORMAL); } ();
	}

	void finalize()
	{
		flush();
	}

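	// Drains data from the bufferevent into m_readBuffer. With block set, the
	// calling task yields until data arrives or the connection goes away.
	// Returns true only when wait_for_timeout is set and the wait-data timeout
	// fired; all other exits return false.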
	private bool fillReadBuffer(bool block, bool throw_on_fail = true, bool wait_for_timeout = false)
	@safe {
		if (m_readBuffer.length) return false;
		m_readBuffer.clear();
		assert(m_readBuffer.peekDst.length > 0);
		while (m_ctx && m_ctx.event) {
			auto nbytes = () @trusted { return bufferevent_read(m_ctx.event, m_readBuffer.peekDst.ptr, m_readBuffer.peekDst.length); } ();
			m_readBuffer.putN(nbytes);
			if (m_readBuffer.length || !block) break;
			if (throw_on_fail) checkConnected(false);
			else if (!m_ctx || !m_ctx.event) return false;
			else if (m_ctx.state != ConnectionState.open
				&& () @trusted { return evbuffer_get_length(bufferevent_get_input(m_ctx.event)); } () == 0)
					return false;
			if (wait_for_timeout && m_timeout_triggered) return true;
			m_ctx.core.yieldForEvent();
		}
		return false;
	}

	private void checkReader() @safe { assert(m_ctx.readOwner == Task(), "Acquiring reader of already owned connection."); }
	private void acquireReader() @safe { checkReader(); m_ctx.readOwner = Task.getThis(); }
	private void releaseReader() @safe { if (!m_ctx) return; assert(m_ctx.readOwner == Task.getThis(), "Releasing reader of unowned connection."); m_ctx.readOwner = Task(); }

	private void acquireWriter() @safe { assert(m_ctx.writeOwner == Task(), "Acquiring writer of already owned connection."); m_ctx.writeOwner = Task.getThis(); }
	private void releaseWriter() @safe { if (!m_ctx) return; assert(m_ctx.writeOwner == Task.getThis(), "Releasing writer of unowned connection."); m_ctx.writeOwner = Task(); }

	private void checkConnected(bool write = true)
	@safe {
		enforce(m_ctx !is null, "Operating on closed TCPConnection.");
		if (m_ctx.event is null) {
			cleanup();
			throw new Exception(format("Connection error while %s TCPConnection.", write ? "writing to" : "reading from"));
		}
		if (m_ctx.state == ConnectionState.activeClose) throw new Exception("Connection was actively closed.");
		enforce(!write || m_ctx.state == ConnectionState.open, "Remote hung up while writing to TCPConnection.");
		if (!write && m_ctx.state == ConnectionState.passiveClose) {
			auto buf = () @trusted { return bufferevent_get_input(m_ctx.event); } ();
			auto data_left = m_readBuffer.length > 0 || () @trusted { return evbuffer_get_length(buf); } () > 0;
			enforce(data_left, "Remote hung up while reading from TCPConnection.");
		}
	}

	private void cleanup()
	@safe {
		() @trusted {
			event_free(m_waitDataEvent);
			TCPContextAlloc.free(m_ctx);
		} ();
		m_ctx = null;
	}
}

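/** `TCPListener` implementation that keeps one `TCPContext` per listening
	socket and frees all of them in `stopListening`.
*/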
final class Libevent2TCPListener : TCPListener {
@safe:

	private {
		TCPContext*[] m_ctx;
		NetworkAddress m_bindAddress;
	}

	this(NetworkAddress bind_address)
	{
		m_bindAddress = bind_address;
	}

	@property NetworkAddress bindAddress()
	{
		return m_bindAddress;
	}

	void addContext(TCPContext* ctx)
	{
		synchronized(this) m_ctx ~= ctx;
	}

	void stopListening()
	{
		synchronized(this)
		{
			foreach (ctx; m_ctx) () @trusted {
				event_free(ctx.listenEvent);
				evutil_closesocket(ctx.socketfd);
				TCPContextAlloc.free(ctx);
			} ();
			m_ctx = null;
		}
	}
}


/**************************************************************************************************/
/* Private types                                                                                  */
/**************************************************************************************************/

package struct TCPContext
{
	@safe:

	this(DriverCore c, event_base* evbase, int sock, bufferevent* evt, NetworkAddress bindaddr, NetworkAddress peeraddr){
		core = c;
		eventLoop = evbase;
		socketfd = sock;
		event = evt;
		local_addr = bindaddr;
		remote_addr = peeraddr;
	}

	this(DriverCore c, event_base* evbase, int sock, bufferevent* evt){
		core = c;
		eventLoop = evbase;
		socketfd = sock;
		event = evt;
	}

	~this()
	{
		magic__ = 0;
	}

	void checkForException() {
		if (auto ex = this.exception) {
			this.exception = null;
			throw ex;
		}
	}

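	// guard value checked by the extern(C) callbacks to detect a freed or
	// corrupted context (the destructor zeroes it)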
	enum MAGIC = 0x1F3EC272;
	uint magic__ = MAGIC;
	DriverCore core;
	event_base* eventLoop;
	void delegate(TCPConnection conn) connectionCallback;
	bufferevent* event;
	deimos.event2.event_struct.event* listenEvent;
	NetworkAddress local_addr;
	NetworkAddress remote_addr;
	bool shutdown = false;
	int socketfd = -1;
	int status = 0;
	const(char)* statusMessage;
	Task readOwner;
	Task writeOwner;
	Exception exception; // set during onSocketEvent calls that were emitted synchronously
	TCPListenOptions listenOptions;
	ConnectionState state;
}
alias TCPContextAlloc = FreeListObjectAlloc!(TCPContext, false, true);

package enum ConnectionState {
	open,         // connection fully open (clear to read and clear to send)
	activeClose,  // TCPConnection.close() was called
	passiveClose, // remote has hung up
}

/**************************************************************************************************/
/* Private functions                                                                              */
/**************************************************************************************************/

package nothrow extern(C)
{
	version (VibeDebugCatchAll) alias UncaughtException = Throwable;
	else alias UncaughtException = Exception;

	// should be a nested static struct in onConnect, but that triggers an ICE in ldc2-0.14.0
	private extern(D) struct ClientTask {
		TCPContext* listen_ctx;
		NetworkAddress bind_addr;
		NetworkAddress remote_addr;
		int sockfd;
		TCPListenOptions options;

		void execute()
		{
			assert(sockfd > 0);
			if( evutil_make_socket_nonblocking(sockfd) ){
				logError("Error setting non-blocking I/O on an incoming connection.");
			}

			auto eventloop = getThreadLibeventEventLoop();
			auto drivercore = getThreadLibeventDriverCore();

			// Initialize a buffered I/O event
			auto buf_event = bufferevent_socket_new(eventloop, sockfd, bufferevent_options.BEV_OPT_CLOSE_ON_FREE);
			if( !buf_event ){
				logError("Error initializing buffered I/O event for fd %d.", sockfd);
				return;
			}

			auto client_ctx = TCPContextAlloc.alloc(drivercore, eventloop, sockfd, buf_event, bind_addr, remote_addr);
			assert(client_ctx.event !is null, "event is null although it was just assigned?");
			bufferevent_setcb(buf_event, &onSocketRead, &onSocketWrite, &onSocketEvent, client_ctx);
			if( bufferevent_enable(buf_event, EV_READ|EV_WRITE) ){
				bufferevent_free(buf_event);
				TCPContextAlloc.free(client_ctx);
				logError("Error enabling buffered I/O event for fd %d.", sockfd);
				return;
			}

			assert(client_ctx.event !is null, "Client task called without event!?");
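			// With disableAutoClose, the connection object is GC-managed and is
			// left open when the handler returns; otherwise a freelist-backed
			// reference is used and the connection is closed in the finally block.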
			if (options & TCPListenOptions.disableAutoClose) {
				auto conn = new Libevent2TCPConnection(client_ctx);
				assert(conn.connected, "Connection closed directly after accept?!");
				logDebug("start task (fd %d).", client_ctx.socketfd);
				try {
					listen_ctx.connectionCallback(conn);
					logDebug("task out (fd %d).", client_ctx.socketfd);
				} catch (Exception e) {
					logWarn("Handling of connection failed: %s", e.msg);
					logDiagnostic("%s", e.toString().sanitize);
				} finally {
					logDebug("task finished.");
					FreeListObjectAlloc!ClientTask.free(&this);
				}
			} else {
				auto conn = FreeListRef!Libevent2TCPConnection(client_ctx);
				assert(conn.connected, "Connection closed directly after accept?!");
				logDebug("start task (fd %d).", client_ctx.socketfd);
				try {
					listen_ctx.connectionCallback(conn);
					logDebug("task out (fd %d).", client_ctx.socketfd);
				} catch (Exception e) {
					logWarn("Handling of connection failed: %s", e.msg);
					logDiagnostic("%s", e.toString().sanitize);
				} finally {
					logDebug("task finished.");
					FreeListObjectAlloc!ClientTask.free(&this);
					conn.close();
				}
			}
		}
	}

	void onConnect(evutil_socket_t listenfd, short evtype, void *arg)
	{
		logTrace("connect callback");
		auto ctx = cast(TCPContext*)arg;
		assert(ctx.magic__ == TCPContext.MAGIC);

		if( !(evtype & EV_READ) ){
			logError("Unknown event type in connect callback: 0x%x", evtype);
			return;
		}

		try {
			// Accept and configure incoming connections (up to 10 connections in one go)
			foreach( i; 0 .. 10 ){
				logTrace("accept");
				assert(listenfd < int.max, "Listen socket descriptor >= int.max?!");
				sockaddr_in6 remote_addr;
				socklen_t addrlen = remote_addr.sizeof;
				auto sockfd_raw = accept(cast(int)listenfd, cast(sockaddr*)&remote_addr, &addrlen);
				logDebug("FD: %s", sockfd_raw);
				static if (typeof(sockfd_raw).max > int.max) assert(sockfd_raw <= int.max || sockfd_raw == ~0);
				auto sockfd = cast(int)sockfd_raw;
				logTrace("accepted %d", sockfd);
				if (sockfd == -1) {
					version(Windows) auto err = evutil_socket_geterror(sockfd);
					else auto err = errno;
					if( err != EWOULDBLOCK && err != EAGAIN && err != 0 ){
						version(Windows)
							logError("Error accepting an incoming connection: %s", to!string(evutil_socket_error_to_string(err)));
						else
							logError("Error accepting an incoming connection: %d", err);
					}
					break;
				}

				auto task = FreeListObjectAlloc!ClientTask.alloc();
				task.listen_ctx = ctx;
				task.bind_addr = ctx.local_addr;
				*cast(sockaddr_in6*)task.remote_addr.sockAddr = remote_addr;
				task.sockfd = sockfd;
				task.options = ctx.listenOptions;

				runTask(&task.execute);
			}
		} catch (UncaughtException e) {
			logWarn("Got exception while accepting new connections: %s", e.msg);
			try logDebug("Full error: %s", e.toString().sanitize());
			catch (Throwable) {}
		}

		logTrace("handled incoming connections...");
	}

	void onSocketRead(bufferevent *buf_event, void *arg)
	{
		auto ctx = cast(TCPContext*)arg;
		assert(ctx.magic__ == TCPContext.MAGIC);
		logTrace("socket %d read event!", ctx.socketfd);

		auto f = ctx.readOwner;
		try {
			if (f && f.running && !ctx.core.isScheduledForResume(f))
				ctx.core.resumeTask(f);
		} catch (UncaughtException e) {
			logWarn("Got exception when resuming task onSocketRead: %s", e.msg);
		}
	}

	void onSocketWrite(bufferevent *buf_event, void *arg)
	{
		try {
			auto ctx = cast(TCPContext*)arg;
			assert(ctx.magic__ == TCPContext.MAGIC);
			assert(ctx.event is buf_event, "Write event on bufferevent that does not match the TCPContext");
			logTrace("socket %d write event (%s)!", ctx.socketfd, ctx.shutdown);
			if (ctx.writeOwner != Task.init && ctx.writeOwner.running && !ctx.core.isScheduledForResume(ctx.writeOwner)) {
				bufferevent_flush(buf_event, EV_WRITE, bufferevent_flush_mode.BEV_FLUSH);
				ctx.core.resumeTask(ctx.writeOwner);
			}
		} catch (UncaughtException e) {
			logWarn("Got exception when resuming task onSocketWrite: %s", e.msg);
		}
	}

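	// Central status callback: translates bufferevent EOF/timeout/error
	// notifications into connection state changes and resumes any tasks that
	// are blocked on the connection.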
	void onSocketEvent(bufferevent *buf_event, short status, void *arg)
	{
		try {
			auto ctx = cast(TCPContext*)arg;
			assert(ctx.magic__ == TCPContext.MAGIC);
			ctx.status = status;
			logDebug("Socket event on fd %d: %d (%s vs %s)", ctx.socketfd, status, cast(void*)buf_event, cast(void*)ctx.event);
			assert(ctx.event is buf_event, "Status event on bufferevent that does not match the TCPContext");

			Exception ex;
			bool free_event = false;

			string errorMessage;
			if (status & BEV_EVENT_EOF) {
				logDebug("Connection was closed by remote peer (fd %d).", ctx.socketfd);
				if (ctx.state != ConnectionState.activeClose)
					ctx.state = ConnectionState.passiveClose;
				evbuffer* buf = bufferevent_get_input(buf_event);
				if (evbuffer_get_length(buf) == 0) free_event = true;
			} else if (status & BEV_EVENT_TIMEOUT) {
				logDebug("Remote host on fd %d timed out.", ctx.socketfd);
				free_event = true;
			} else if (status & BEV_EVENT_ERROR) {
				//auto msg = format("Error %s socket %s",
				//	(status & BEV_EVENT_READING) ? "reading from" : (status & BEV_EVENT_WRITING) ? "writing to" : "on",
				//	ctx.socketfd);
				//ex = new SystemSocketException(msg);
				ctx.statusMessage = evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR());
				free_event = true;
			}

			if (free_event) {
				bufferevent_free(buf_event);
				ctx.event = null;
			}

			ctx.core.eventException = ex;

			// ctx can be destroyed after resuming the reader, so get everything that is required from it first
			auto reader = ctx.readOwner;
			auto writer = ctx.writeOwner;
			auto core = ctx.core;

			if (ex && (reader && reader.fiber.state == Fiber.State.EXEC || writer && writer.fiber.state == Fiber.State.EXEC))
				ctx.exception = ex;

			if (writer && writer.running && writer.fiber.state != Fiber.State.EXEC) {
				logTrace("resuming corresponding write task%s...", ex is null ? "" : " with exception");
				core.resumeTask(writer, ex);
			}

			if (reader && writer != reader && reader.running && !core.isScheduledForResume(reader) && reader.fiber.state != Fiber.State.EXEC) {
				logTrace("resuming corresponding read task%s...", ex is null ? "" : " with exception");
				core.resumeTask(reader, ex);
			}
		} catch (UncaughtException e) {
			logWarn("Got exception when resuming task onSocketEvent: %s", e.msg);
			try logDiagnostic("Full error: %s", e.toString().sanitize); catch (Throwable) {}
		}
	}

	private extern(C) void onTimeout(evutil_socket_t, short events, void* userptr)
	{
		try {
			logTrace("data wait timeout");
			auto conn = cast(Libevent2TCPConnection)userptr;
			conn.m_timeout_triggered = true;
			if (conn.m_ctx) {
				if (conn.m_ctx.readOwner) conn.m_ctx.core.resumeTask(conn.m_ctx.readOwner);
			} else logDebug("waitForData timeout after connection was closed!");
		} catch (UncaughtException e) {
			logWarn("Exception onTimeout: %s", e.msg);
		}
	}
}

/// private
package void removeFromArray(T)(ref T[] array, T item)
{
	foreach( i; 0 .. array.length )
		if( array[i] is item ){
			array = array[0 .. i] ~ array[i+1 .. $];
			return;
		}
}

}