1 /** 2 Multicasts an input stream to multiple output streams. 3 4 Copyright: © 2014-2020 Sönke Ludwig 5 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 6 Authors: Eric Cornelius 7 */ 8 module vibe.stream.multicast; 9 10 import vibe.core.core; 11 import vibe.core.stream; 12 13 import std.exception; 14 15 16 /** Creates a new multicast stream based on the given set of output streams. 17 */ 18 MulticastStream!OutputStreams createMulticastStream(OutputStreams...)(OutputStreams output_streams) 19 if (!is(OutputStreams[0] == MulticastMode)) 20 { 21 return MulticastStream!OutputStreams(output_streams, MulticastMode.serial); 22 } 23 /// ditto 24 MulticastStream!OutputStreams createMulticastStream(OutputStreams...) 25 (MulticastMode mode, OutputStreams output_streams) 26 { 27 return MulticastStream!OutputStreams(output_streams, mode); 28 } 29 30 unittest { 31 import vibe.stream.memory : createMemoryOutputStream; 32 import std.traits : EnumMembers; 33 34 createMulticastStream(nullSink, nullSink); 35 36 ubyte[] bts = [1, 2, 3, 4]; 37 38 foreach (m; EnumMembers!MulticastMode) { 39 auto s1 = createMemoryOutputStream(); 40 auto s2 = createMemoryOutputStream(); 41 auto ms = createMulticastStream(m, s1, s2); 42 ms.write(bts[0 .. 3]); 43 ms.write(bts[3 .. 4]); 44 ms.flush(); 45 assert(s1.data == bts); 46 assert(s2.data == bts); 47 } 48 } 49 50 51 struct MulticastStream(OutputStreams...) { 52 import std.algorithm : swap; 53 54 private { 55 OutputStreams m_outputs; 56 Task[] m_tasks; 57 } 58 59 private this(ref OutputStreams outputs, MulticastMode mode) 60 { 61 foreach (i, T; OutputStreams) 62 swap(outputs[i], m_outputs[i]); 63 64 if (mode == MulticastMode.parallel) 65 m_tasks.length = outputs.length - 1; 66 } 67 68 void finalize() 69 @safe @blocking { 70 flush(); 71 } 72 73 void flush() 74 @safe @blocking { 75 if (m_tasks.length > 0) { 76 Exception ex; 77 foreach (i, T; OutputStreams[1 .. $]) 78 m_tasks[i] = runTask({ 79 try m_outputs[i+1].flush(); 80 catch (Exception e) ex = e; 81 }); 82 m_outputs[0].flush(); 83 foreach (t; m_tasks) t.join(); 84 if (ex) throw ex; 85 } else { 86 foreach (i, T; OutputStreams) 87 m_outputs[i].flush(); 88 } 89 } 90 91 size_t write(in ubyte[] bytes, IOMode mode) 92 @safe @blocking { 93 if (!m_outputs.length) return bytes.length; 94 95 if (m_tasks.length > 0) { 96 Exception ex; 97 foreach (i, T; OutputStreams[1 .. $]) 98 m_tasks[i] = runTask({ 99 try m_outputs[i+1].write(bytes, mode); 100 catch (Exception e) ex = e; 101 }); 102 auto ret = m_outputs[0].write(bytes, mode); 103 foreach (t; m_tasks) t.join(); 104 if (ex) throw ex; 105 return ret; 106 } else { 107 auto ret = m_outputs[0].write(bytes, mode); 108 foreach (i, T; OutputStreams[1 .. $]) 109 m_outputs[i+1].write(bytes[0 .. ret]); 110 return ret; 111 } 112 } 113 void write(in ubyte[] bytes) @blocking { auto n = write(bytes, IOMode.all); assert(n == bytes.length); } 114 void write(in char[] bytes) @blocking { write(cast(const(ubyte)[])bytes); } 115 } 116 117 enum MulticastMode { 118 /// Output streams are written in serial order 119 serial, 120 /// Output streams are written in parallel using multiple tasks 121 parallel 122 } 123 124 mixin validateOutputStream!(MulticastStream!NullOutputStream);