1 /**
2 	Multicasts an input stream to multiple output streams.
3 
4 	Copyright: © 2014-2020 Sönke Ludwig
5 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
6 	Authors: Eric Cornelius
7 */
8 module vibe.stream.multicast;
9 
10 import vibe.core.core;
11 import vibe.core.stream;
12 
13 import std.exception;
14 
15 
/** Builds a `MulticastStream` that forwards written data to each of the given
	output streams.

	The overload without a `mode` argument uses `MulticastMode.serial`.

	Params:
		mode = Selects whether the outputs are processed one after another or
			using multiple tasks
		output_streams = The streams that receive the written data

	Returns:
		A `MulticastStream` instantiated for the types of `output_streams`.
*/
MulticastStream!OutputStreams createMulticastStream(OutputStreams...)(OutputStreams output_streams)
	if (!is(OutputStreams[0] == MulticastMode))
{
	// No mode given: fall back to serial processing.
	return typeof(return)(output_streams, MulticastMode.serial);
}
/// ditto
MulticastStream!OutputStreams createMulticastStream(OutputStreams...)
	(MulticastMode mode, OutputStreams output_streams)
{
	// The stream constructor takes the outputs first, followed by the mode.
	return typeof(return)(output_streams, mode);
}
29 
unittest {
	import std.traits : EnumMembers;
	import vibe.stream.memory : createMemoryOutputStream;

	// The overload without an explicit mode must be instantiable.
	createMulticastStream(nullSink, nullSink);

	ubyte[] payload = [1, 2, 3, 4];

	// In every mode, both outputs have to end up with the complete payload,
	// even when it is written in several pieces.
	foreach (mode; EnumMembers!MulticastMode) {
		auto first = createMemoryOutputStream();
		auto second = createMemoryOutputStream();
		auto multicast = createMulticastStream(mode, first, second);

		multicast.write(payload[0 .. 3]);
		multicast.write(payload[3 .. 4]);
		multicast.flush();

		assert(first.data == payload);
		assert(second.data == payload);
	}
}
49 
50 
/** Output stream that forwards every write and flush to a fixed set of
	underlying output streams.

	Instances are created with `createMulticastStream`. In serial mode the
	outputs are processed one after another from the calling task. In parallel
	mode the first output is processed by the calling task while each further
	output is processed in its own task; the call returns only after all of
	those tasks have been joined.
*/
struct MulticastStream(OutputStreams...) {
	import std.algorithm : swap;

	private {
		OutputStreams m_outputs;
		// One slot per output after the first. Stays empty in serial mode,
		// which is what `flush`/`write` use to select the serial code path.
		Task[] m_tasks;
	}

	/* Takes over the given outputs by swapping them into the member fields
	   (the arguments are left holding the fields' previous, default state).
	*/
	private this(ref OutputStreams outputs, MulticastMode mode)
	{
		foreach (i, T; OutputStreams)
			swap(outputs[i], m_outputs[i]);

		// Tasks are only needed for the outputs after the first one. The
		// compile-time guard also avoids the size_t underflow of
		// `length - 1` for an empty set of outputs.
		static if (OutputStreams.length > 1) {
			if (mode == MulticastMode.parallel)
				m_tasks.length = OutputStreams.length - 1;
		}
	}

	/** Flushes all outputs.

		This only calls `flush`; `finalize` is not invoked on the underlying
		output streams.
	*/
	void finalize()
	@safe @blocking {
		flush();
	}

	/** Flushes every underlying output stream.

		Throws:
			In serial mode, the first exception thrown by an output is
			propagated immediately. In parallel mode, all tasks are joined
			first; an exception from the first output then takes precedence
			over an exception recorded by one of the tasks.
	*/
	void flush()
	@safe @blocking {
		static if (OutputStreams.length > 1) {
			if (m_tasks.length > 0) {
				Exception task_ex;
				foreach (i, T; OutputStreams[1 .. $])
					m_tasks[i] = runTask({
						try m_outputs[i+1].flush();
						catch (Exception e) task_ex = e;
					});

				// Do not let a failure of the first output skip the join
				// below - the tasks still reference this call's state.
				Exception primary_ex;
				try m_outputs[0].flush();
				catch (Exception e) primary_ex = e;

				foreach (t; m_tasks) t.join();

				if (primary_ex) throw primary_ex;
				if (task_ex) throw task_ex;
				return;
			}
		}

		foreach (i, T; OutputStreams)
			m_outputs[i].flush();
	}

	/** Writes `bytes` to every underlying output stream.

		In serial mode, the first output is written using `mode` and the
		number of bytes it accepted is then written to each remaining output.
		In parallel mode, every output is written with `bytes` and `mode`.

		NOTE(review): in parallel mode the byte counts returned by the outputs
		after the first are discarded, so for modes other than `IOMode.all` the
		outputs may have consumed different amounts - confirm whether callers
		depend on this.

		Params:
			bytes = The data to write
			mode = I/O mode that is passed on to the output streams

		Returns:
			The number of bytes accepted by the first output, or
			`bytes.length` if there are no outputs at all.

		Throws:
			In serial mode, the first exception thrown by an output is
			propagated immediately. In parallel mode, all tasks are joined
			first; an exception from the first output then takes precedence
			over an exception recorded by one of the tasks.
	*/
	size_t write(in ubyte[] bytes, IOMode mode)
	@safe @blocking {
		static if (OutputStreams.length == 0) {
			// Nothing to forward to: report everything as written.
			return bytes.length;
		} else {
			static if (OutputStreams.length > 1) {
				if (m_tasks.length > 0) {
					Exception task_ex;
					foreach (i, T; OutputStreams[1 .. $])
						m_tasks[i] = runTask({
							try m_outputs[i+1].write(bytes, mode);
							catch (Exception e) task_ex = e;
						});

					// The tasks reference `bytes`, so they must be joined
					// before leaving this function, even if the first
					// output fails.
					size_t written;
					Exception primary_ex;
					try written = m_outputs[0].write(bytes, mode);
					catch (Exception e) primary_ex = e;

					foreach (t; m_tasks) t.join();

					if (primary_ex) throw primary_ex;
					if (task_ex) throw task_ex;
					return written;
				}
			}

			auto ret = m_outputs[0].write(bytes, mode);
			// Keep the remaining outputs in step with what the first one
			// actually accepted.
			foreach (i, T; OutputStreams[1 .. $])
				m_outputs[i+1].write(bytes[0 .. ret]);
			return ret;
		}
	}
	/// Writes all of `bytes` using `IOMode.all`.
	void write(in ubyte[] bytes) @blocking { auto n = write(bytes, IOMode.all); assert(n == bytes.length); }
	/// Writes the raw bytes of the given character array.
	void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); }
}
116 
/** Selects how a `MulticastStream` distributes write and flush operations to
	its output streams.
*/
enum MulticastMode {
	/// Output streams are processed one after another, in the order in which
	/// they were passed to `createMulticastStream`
	serial,
	/// Output streams are processed in parallel: the first one by the calling
	/// task and each further one by a separate task
	parallel
}
123 
// Instantiates MulticastStream with a single NullOutputStream and hands it to
// validateOutputStream (declared outside this file) - presumably a
// compile-time check that the type satisfies the output stream interface;
// confirm against vibe.core.stream.
mixin validateOutputStream!(MulticastStream!NullOutputStream);