From 908402a5135305c052103e794fe506d1e3c3dac1 Mon Sep 17 00:00:00 2001 From: Michael Meling Date: Mon, 1 Aug 2022 19:11:46 +0100 Subject: [PATCH 1/4] Resolved problem with StackOverflowException for .Net6.0 by using StreamThreads library to flatten the recursion in receive. The flattened code has been put into StreamThreader.cs --- websocket-sharp/StreamThreader.cs | 169 +++++++++ .../StreamThreads/BackgroundState.cs | 20 ++ .../StreamThreads/IteratorReturnVariable.cs | 61 ++++ .../StreamThreads/StreamExtensions.cs | 183 ++++++++++ websocket-sharp/StreamThreads/StreamState.cs | 24 ++ .../StreamThreads/StreamStateAwait.cs | 328 ++++++++++++++++++ .../StreamThreads/StreamStateBackground.cs | 57 +++ .../StreamThreads/StreamStateContinue.cs | 20 ++ .../StreamThreads/StreamStateError.cs | 29 ++ .../StreamThreads/StreamStateLambda.cs | 49 +++ .../StreamThreads/StreamStateReturn.cs | 52 +++ .../StreamThreads/StreamStateSwitch.cs | 32 ++ .../StreamThreads/StreamStateWaitForever.cs | 8 + websocket-sharp/WebSocket.cs | 46 +-- websocket-sharp/WebSocketFrame.cs | 175 +--------- websocket-sharp/websocket-sharp.csproj | 32 +- 16 files changed, 1072 insertions(+), 213 deletions(-) create mode 100644 websocket-sharp/StreamThreader.cs create mode 100644 websocket-sharp/StreamThreads/BackgroundState.cs create mode 100644 websocket-sharp/StreamThreads/IteratorReturnVariable.cs create mode 100644 websocket-sharp/StreamThreads/StreamExtensions.cs create mode 100644 websocket-sharp/StreamThreads/StreamState.cs create mode 100644 websocket-sharp/StreamThreads/StreamStateAwait.cs create mode 100644 websocket-sharp/StreamThreads/StreamStateBackground.cs create mode 100644 websocket-sharp/StreamThreads/StreamStateContinue.cs create mode 100644 websocket-sharp/StreamThreads/StreamStateError.cs create mode 100644 websocket-sharp/StreamThreads/StreamStateLambda.cs create mode 100644 websocket-sharp/StreamThreads/StreamStateReturn.cs create mode 100644 websocket-sharp/StreamThreads/StreamStateSwitch.cs create mode 100644 websocket-sharp/StreamThreads/StreamStateWaitForever.cs diff --git a/websocket-sharp/StreamThreader.cs b/websocket-sharp/StreamThreader.cs new file mode 100644 index 000000000..d0bd2489d --- /dev/null +++ b/websocket-sharp/StreamThreader.cs @@ -0,0 +1,169 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Net.Sockets; +using StreamThreads; +using static StreamThreads.StreamExtensions; + +namespace WebSocketSharp +{ + internal class StreamThreader + { + public WebSocket socket; + public NetworkStream Stream; + public int buffersize = 1000000; + + private MemoryStream _stream = new MemoryStream(); + private StreamState state; + private byte[] buff; + + public void Run() + { + buff = new byte[buffersize]; + state = MainLoop().Await(); + + Stream.BeginRead(buff, 0, buffersize, StreamReader, null); + + } + + private void StreamReader(IAsyncResult at) + { + int len = Stream.EndRead(at); + if (len == 0) return; + + var oldpos = _stream.Position; + _stream.Position = _stream.Length; + _stream.WriteBytes(buff.SubArray(0, len), len); + _stream.Position = oldpos; + + if (!Loop()) return; + + if (_stream.Position != 0) + { + var oldstream = _stream; + _stream.CopyTo(_stream = new MemoryStream(), (int)(_stream.Length - _stream.Position)); + oldstream.Dispose(); + _stream.Position = 0; + } + + Stream.BeginRead(buff, 0, buffersize, StreamReader, null); + } + + private bool Loop() + { + long curpos = _stream.Position; + while (true) + { + if (_stream.Position == _stream.Length) + break; + + if (state.Loop()) return false; + + if (curpos == _stream.Position) + break; + + curpos = _stream.Position; + } + + return true; + } + + private IEnumerable MainLoop() + { + while (true) + { + + yield return ReadStream(2).Await(out var header); + + var frame = WebSocketFrame.processHeader(header); + + yield return ReadExtendedPayloadLength(frame).Await(); + + yield return ReadMaskingKey(frame).Await(); + + yield return ReadPayloadData(frame).Await(); + + if (!socket.processReceivedFrame(frame)) + continue; + + if (!socket.HasMessage) + continue; + + socket._inMessage = false; + + if (socket._readyState != WebSocketState.Open) + break; + + socket.message(); + } + + } + + private IEnumerable> ReadStream(int len) + { + yield return WaitFor(() => _stream.Length - _stream.Position >= len); + + byte[] bytes = new byte[len]; + _stream.Read(bytes, 0, len); + + yield return Return(bytes); + } + + private IEnumerable ReadExtendedPayloadLength(WebSocketFrame frame) + { + var len = frame.ExtendedPayloadLengthWidth; + if (len == 0) + { + frame._extPayloadLength = WebSocket.EmptyBytes; + yield break; + } + + yield return ReadStream(len).Await(out var bytes); + + if (bytes.Value.Length != len) + throw new WebSocketException("The extended payload length of a frame cannot be read from the stream."); + + frame._extPayloadLength = bytes; + } + + private IEnumerable ReadMaskingKey(WebSocketFrame frame) + { + var len = frame.IsMasked ? 4 : 0; + if (len == 0) + { + frame._maskingKey = WebSocket.EmptyBytes; + yield break; + } + + yield return ReadStream(len).Await(out var bytes); + + if (bytes.Value.Length != len) + throw new WebSocketException("The masking key of a frame cannot be read from the stream."); + + frame._maskingKey = bytes; + } + + private IEnumerable ReadPayloadData(WebSocketFrame frame) + { + var exactLen = frame.ExactPayloadLength; + + if (exactLen > PayloadData.MaxLength) + throw new WebSocketException(CloseStatusCode.TooBig, "A frame has too long payload length."); + + if (exactLen == 0) + { + frame._payloadData = PayloadData.Empty; + yield break; + } + + yield return ReadStream((int)exactLen).Await(out var bytes); + + if (bytes.Value.Length != (int)exactLen) + throw new WebSocketException("The payload data of a frame cannot be read from the stream."); + + frame._payloadData = new PayloadData(bytes, (long)exactLen); + } + + + } +} diff --git a/websocket-sharp/StreamThreads/BackgroundState.cs b/websocket-sharp/StreamThreads/BackgroundState.cs new file mode 100644 index 000000000..61b548413 --- /dev/null +++ b/websocket-sharp/StreamThreads/BackgroundState.cs @@ -0,0 +1,20 @@ +using System; +using System.Collections.Generic; + +namespace StreamThreads +{ + internal class BackgroundState + { + internal bool Enabled = true; + internal bool SwitchState; + internal Action Lambda; + internal Predicate Condition; + internal StreamState BackgroundLoop; + internal IEnumerator SwitchFunction; + } + + internal class BackgroundState : BackgroundState + { + internal new IEnumerator> SwitchFunction; + } +} \ No newline at end of file diff --git a/websocket-sharp/StreamThreads/IteratorReturnVariable.cs b/websocket-sharp/StreamThreads/IteratorReturnVariable.cs new file mode 100644 index 000000000..32ceaebdc --- /dev/null +++ b/websocket-sharp/StreamThreads/IteratorReturnVariable.cs @@ -0,0 +1,61 @@ +using System; + +namespace StreamThreads +{ + public enum IteratorStates { Inactive, Running, Ended, Terminated, Faulted } + + public class IteratorReturnVariable + { + private object _value; + private bool _hasvalue = false; + + public IteratorStates IteratorState = IteratorStates.Inactive; + + public object Value + { + get + { + if (!_hasvalue) + throw new Exception("Variable value has not yet been set"); + + return _value; + } + + set + { + _value = value; + _hasvalue = true; + } + } + public bool HasValue() + { + return _hasvalue; + } + public bool HasEnded => IteratorState == IteratorStates.Ended; + public bool WasTerminated => IteratorState == IteratorStates.Terminated; + public bool IsRunning => IteratorState == IteratorStates.Running; + public bool Faulted => IteratorState == IteratorStates.Faulted; + public override string ToString() + { + return $"{_value}"; + } + public StreamState Await() + { + return new StreamStateLambda(() => !(IteratorState == IteratorStates.Inactive || IteratorState == IteratorStates.Running)); + } + } + + public class IteratorReturnVariable : IteratorReturnVariable + { + public new T Value + { + get => (T)base.Value; + set => base.Value = value; + } + + public static implicit operator T(IteratorReturnVariable v) + { + return v.Value; + } + } +} \ No newline at end of file diff --git a/websocket-sharp/StreamThreads/StreamExtensions.cs b/websocket-sharp/StreamThreads/StreamExtensions.cs new file mode 100644 index 000000000..e00670f99 --- /dev/null +++ b/websocket-sharp/StreamThreads/StreamExtensions.cs @@ -0,0 +1,183 @@ +using System; +using System.Collections.Generic; + +namespace StreamThreads +{ + public static class StreamExtensions + { + + public static readonly StreamState OK = new StreamState(); + public static readonly StreamState WaitForever = new StreamStateWaitForever(); + + [ThreadStatic] + private static DateTime _lastrun; + public static double SecondsSinceLast + { + get + { + var now = DateTime.Now; + return (now - _lastrun).TotalSeconds; + } + set + { + _lastrun = DateTime.Now - TimeSpan.FromSeconds(value); + } + } + + public static IEnumerable Until(this IEnumerable me, Predicate condition) + { + if (condition()) yield break; + + foreach (var item in me) + { + yield return item; + + if (condition()) yield break; + } + } + public static IEnumerable While(this IEnumerable me, Predicate condition) + { + var itr = me.GetEnumerator(); + while (true) + { + if (!condition()) + yield return OK; + else + { + if (!itr.MoveNext()) + yield break; + else + yield return itr.Current; + } + } + } + public static IEnumerable ExitOnError(this IEnumerable me) + { + var itr = me.GetEnumerator(); + while (true) + { + try + { + if (!itr.MoveNext()) yield break; + } + catch (Exception e) + { + Console.WriteLine(e.Message); + yield break; + } + + yield return itr.Current; + + } + } + public static IEnumerable ResumeOnError(this IEnumerable me) + { + var itr = me.GetEnumerator(); + while (true) + { + try + { + if (!itr.MoveNext()) yield break; + } + catch (Exception) { } + + yield return itr.Current; + } + } + public static IEnumerable RestartOnError(this IEnumerable me) + { + int maxretries = 1; + var itr = me.GetEnumerator(); + while (true) + { + try + { + if (!itr.MoveNext()) yield break; + maxretries = 1; + + } + catch (Exception) + { + if (--maxretries < 0) + throw; + + itr = me.GetEnumerator(); + } + + if (itr.Current != null) + yield return itr.Current; + + } + } + public static StreamState Await(this IEnumerable c) + { + return new StreamStateAwait(c, null); + } + public static StreamState Await(this IEnumerable c, out IteratorReturnVariable returnvalue) + { + return new StreamStateAwait(c, returnvalue = new IteratorReturnVariable()); + } + public static StreamStateAwait Await(this IEnumerable> c, out IteratorReturnVariable returnvalue) + { + return new StreamStateAwait(c, returnvalue = new IteratorReturnVariable()); + } + public static StreamState Background(this IEnumerable c) + { + return c.Background(out var notused); + } + public static StreamState Background(this IEnumerable c, out IteratorReturnVariable returnvalue) + { + return new StreamStateBackground(c.Await(out returnvalue)); + } + public static StreamState Background(Action lambda) + { + return new StreamStateBackground(lambda); + } + public static StreamStateBackground Background(this IEnumerable> c, out IteratorReturnVariable returnvalue) + { + return new StreamStateBackground(c.Await(out returnvalue)); + } + public static StreamState OnError(this IEnumerable c) + { + return new StreamStateError(c); + } + public static StreamState SwitchOnCondition(this IEnumerable c, Predicate condition) + { + return new StreamStateSwitch(c, condition); + } + public static StreamStateReturn Return(object returnvalue) + { + return new StreamStateReturn(returnvalue); + } + public static StreamStateReturn Return(T returnvalue) + { + return new StreamStateReturn(returnvalue); + } + public static StreamState Sleep(int millis) + { + var t = DateTime.Now + TimeSpan.FromMilliseconds(millis); + + return new StreamStateLambda(() => DateTime.Now > t); + } + public static StreamState WaitFor(Predicate trigger) + { + if (trigger()) + return new StreamStateContinue(); + else + return new StreamStateLambda(trigger); + } + + public static StreamState WaitFor(Predicate trigger) + { + if (trigger()) + return new StreamStateContinue(); + else + return new StreamStateLambda(trigger); + } + + public static void SimulatedError(double probability = 0.1) + { + if (new Random().NextDouble() > 1 - probability) throw new Exception("Simulated Error"); + } + } +} \ No newline at end of file diff --git a/websocket-sharp/StreamThreads/StreamState.cs b/websocket-sharp/StreamThreads/StreamState.cs new file mode 100644 index 000000000..2ac9ef3ad --- /dev/null +++ b/websocket-sharp/StreamThreads/StreamState.cs @@ -0,0 +1,24 @@ +namespace StreamThreads +{ + public delegate bool Predicate(); + internal enum StateTypes { Normal, Background, Error, Switch, Return, + Continue + } + + public class StreamState + { + public virtual bool Loop() => true; + + public virtual void Terminate() + { + + } + + internal virtual StateTypes StateType => StateTypes.Normal; + } + + public class StreamState : StreamState + { + + } +} diff --git a/websocket-sharp/StreamThreads/StreamStateAwait.cs b/websocket-sharp/StreamThreads/StreamStateAwait.cs new file mode 100644 index 000000000..44ffc29ff --- /dev/null +++ b/websocket-sharp/StreamThreads/StreamStateAwait.cs @@ -0,0 +1,328 @@ +using System; +using System.Collections; +using System.Collections.Generic; + +namespace StreamThreads +{ + + public class StreamStateAwait : StreamState + { + internal override StateTypes StateType => StateTypes.Continue; + + public IteratorReturnVariable ReturnValue; + internal IEnumerator Iterator; + internal IEnumerable ErrorHandler; + internal List BackgroundThreads = new List(); + + public StreamStateAwait(IEnumerable c, IteratorReturnVariable returnvalue) : base() + { + Iterator = c.GetEnumerator(); + ErrorHandler = null; + ReturnValue = returnvalue; + + if (returnvalue != null) + ReturnValue.IteratorState = IteratorStates.Running; + } + + public override bool Loop() + { + while (true) + { + try + { + bool continueonce = false; + bool running = true; + if (Iterator.Current == null) + { + running = Iterator.MoveNext(); + continueonce = running && Iterator.Current.StateType == StateTypes.Continue; + } + else if (Iterator.Current.Loop()) + { + running = Iterator.MoveNext(); + continueonce = running && Iterator.Current.StateType == StateTypes.Continue; + } + + exitfunction: + if (!running) + { + if (Iterator.Current != null) + Iterator.Current.Terminate(); + + foreach (var item in BackgroundThreads) + { + item.BackgroundLoop.Terminate(); + } + + if (ReturnValue != null) + ReturnValue.IteratorState = IteratorStates.Ended; + return true; + } + + switch (Iterator.Current.StateType) + { + case StateTypes.Continue: + if (continueonce) + continue; + else + break; + case StateTypes.Error: + ErrorHandler = ((StreamStateError)Iterator.Current).OnError; + continue; + + case StateTypes.Switch: + var sm = new BackgroundState() + { + SwitchState = true, + SwitchFunction = ((StreamStateSwitch)Iterator.Current).OnSwitch.GetEnumerator(), + Condition = ((StreamStateSwitch)Iterator.Current).Condition + }; + BackgroundThreads.Add(sm); + continue; + + case StateTypes.Background: + var bgs = new BackgroundState() + { + BackgroundLoop = ((StreamStateBackground)Iterator.Current).Background, + Lambda = ((StreamStateBackground)Iterator.Current).Lambda + }; + BackgroundThreads.Add(bgs); + continue; + + case StateTypes.Return: + if (ReturnValue != null) + + ReturnValue.Value = ((IStreamStateReturn)Iterator.Current).GetValue(); + + running = false; + goto exitfunction; + + default: + break; + } + + for (int i = 0; i < BackgroundThreads.Count; i++) + { + var item = BackgroundThreads[i]; + try + { + if (!item.Enabled) continue; + + if (item.Condition.Invoke()) + { + item.Lambda.Invoke(); + + if (item.SwitchState) + { + Iterator = item.SwitchFunction; + BackgroundThreads.Clear(); + ErrorHandler = null; + } + else if (item.BackgroundLoop != null) + { + if (item.BackgroundLoop.Loop()) + { + BackgroundThreads.RemoveAt(i); + } + } + } + } + catch (Exception) + { + BackgroundThreads.RemoveAt(i--); + throw; + } + } + + return false; + } + catch (Exception e) + { + Console.WriteLine(e.StackTrace); + + if (ErrorHandler != null) + { + Iterator = ErrorHandler.GetEnumerator(); + BackgroundThreads.Clear(); + ErrorHandler = null; + } + else + { + if (ReturnValue != null) + ReturnValue.IteratorState = IteratorStates.Faulted; + throw; + } + } + } + + } + public override void Terminate() + { + foreach (var item in BackgroundThreads) + { + item.BackgroundLoop.Terminate(); + } + } + } + + public class StreamStateAwait : StreamState + { + internal override StateTypes StateType => StateTypes.Continue; + + public IteratorReturnVariable ReturnValue; + internal IEnumerator> Iterator; + internal IEnumerable> ErrorHandler; + internal List BackgroundThreads = new List(); + + public StreamStateAwait(IEnumerable> c, IteratorReturnVariable returnvalue) : base() + { + Iterator = c.GetEnumerator(); + ErrorHandler = null; + ReturnValue = returnvalue; + + if (returnvalue != null) + ReturnValue.IteratorState = IteratorStates.Running; + } + + public override bool Loop() + { + while (true) + { + try + { + bool continueonce = false; + bool running = true; + if (Iterator.Current == null) + { + running = Iterator.MoveNext(); + continueonce = running && Iterator.Current.StateType == StateTypes.Continue; + } + else if (Iterator.Current.Loop()) + { + running = Iterator.MoveNext(); + continueonce = running && Iterator.Current.StateType == StateTypes.Continue; + } + + exitfunction: + if (!running) + { + Iterator.Current.Terminate(); + + foreach (var item in BackgroundThreads) + { + item.BackgroundLoop.Terminate(); + } + + if (ReturnValue != null) + ReturnValue.IteratorState = IteratorStates.Ended; + return true; + } + + switch (Iterator.Current.StateType) + { + case StateTypes.Continue: + if (continueonce) + continue; + else + break; + case StateTypes.Error: + ErrorHandler = ((StreamStateError)Iterator.Current).OnError; + continue; + + case StateTypes.Switch: + var sm = new BackgroundState() + { + SwitchState = true, + SwitchFunction = ((StreamStateSwitch)Iterator.Current).OnSwitch.GetEnumerator(), + Condition = ((StreamStateSwitch)Iterator.Current).Condition + }; + BackgroundThreads.Add(sm); + continue; + + case StateTypes.Background: + var bgs = new BackgroundState() + { + BackgroundLoop = ((StreamStateBackground)Iterator.Current).Background, + Lambda = ((StreamStateBackground)Iterator.Current).Lambda + }; + BackgroundThreads.Add(bgs); + continue; + + case StateTypes.Return: + if (ReturnValue != null) + + ReturnValue.Value = ((IStreamStateReturn)Iterator.Current).GetValue(); + + running = false; + goto exitfunction; + + default: + break; + } + + for (int i = 0; i < BackgroundThreads.Count; i++) + { + var item = BackgroundThreads[i]; + try + { + if (!item.Enabled) continue; + + if (item.Condition.Invoke()) + { + item.Lambda.Invoke(); + + if (item.SwitchState) + { + Iterator = (IEnumerator>)item.SwitchFunction; + BackgroundThreads.Clear(); + ErrorHandler = null; + } + else if (item.BackgroundLoop != null) + { + if (item.BackgroundLoop.Loop()) + { + BackgroundThreads.RemoveAt(i); + } + } + } + } + catch (Exception) + { + BackgroundThreads.RemoveAt(i--); + throw; + } + } + + return false; + } + catch (Exception e) + { + Console.WriteLine(e.StackTrace); + + if (ErrorHandler != null) + { + Iterator = ErrorHandler.GetEnumerator(); + BackgroundThreads.Clear(); + ErrorHandler = null; + } + else + { + if (ReturnValue != null) + ReturnValue.IteratorState = IteratorStates.Faulted; + throw; + } + } + } + + } + public override void Terminate() + { + foreach (var item in BackgroundThreads) + { + item.BackgroundLoop.Terminate(); + } + } + + } + +} diff --git a/websocket-sharp/StreamThreads/StreamStateBackground.cs b/websocket-sharp/StreamThreads/StreamStateBackground.cs new file mode 100644 index 000000000..d0c4eb8a4 --- /dev/null +++ b/websocket-sharp/StreamThreads/StreamStateBackground.cs @@ -0,0 +1,57 @@ +using System; + +namespace StreamThreads +{ + public class StreamStateBackground : StreamState + { + public StreamState Background; + public Action Lambda; + + public StreamStateBackground(StreamState background) + { + Background = background; + Lambda = null; + } + + public StreamStateBackground(Action lambda) + { + Background = null; + Lambda = lambda; + } + + public StreamStateBackground(StreamState background, Action lambda) + { + Background = background; + Lambda = lambda; + } + + internal override StateTypes StateType => StateTypes.Background; + } + public class StreamStateBackground : StreamState + { + public StreamState Background; + public Action Lambda; + + public StreamStateBackground(StreamState background) + { + Background = background; + Lambda = null; + } + + public StreamStateBackground(Action lambda) + { + Background = null; + Lambda = lambda; + } + + public StreamStateBackground(StreamState background, Action lambda) + { + Background = background; + Lambda = lambda; + } + + internal override StateTypes StateType => StateTypes.Background; + } + + +} diff --git a/websocket-sharp/StreamThreads/StreamStateContinue.cs b/websocket-sharp/StreamThreads/StreamStateContinue.cs new file mode 100644 index 000000000..2b6ee699a --- /dev/null +++ b/websocket-sharp/StreamThreads/StreamStateContinue.cs @@ -0,0 +1,20 @@ +namespace StreamThreads +{ + internal class StreamStateContinue : StreamState + { + internal override StateTypes StateType => StateTypes.Continue; + public override bool Loop() + { + return true; + } + } + + internal class StreamStateContinue : StreamState + { + internal override StateTypes StateType => StateTypes.Continue; + public override bool Loop() + { + return true; + } + } +} \ No newline at end of file diff --git a/websocket-sharp/StreamThreads/StreamStateError.cs b/websocket-sharp/StreamThreads/StreamStateError.cs new file mode 100644 index 000000000..d0bc283d2 --- /dev/null +++ b/websocket-sharp/StreamThreads/StreamStateError.cs @@ -0,0 +1,29 @@ +using System.Collections.Generic; + +namespace StreamThreads +{ + public class StreamStateError : StreamState + { + public IEnumerable OnError; + + public StreamStateError(IEnumerable onError) + { + OnError = onError; + } + + internal override StateTypes StateType => StateTypes.Error; + } + + public class StreamStateError : StreamState + { + public IEnumerable> OnError; + + public StreamStateError(IEnumerable> onError) + { + OnError = onError; + } + + internal override StateTypes StateType => StateTypes.Error; + } + +} diff --git a/websocket-sharp/StreamThreads/StreamStateLambda.cs b/websocket-sharp/StreamThreads/StreamStateLambda.cs new file mode 100644 index 000000000..bcdd0e6f4 --- /dev/null +++ b/websocket-sharp/StreamThreads/StreamStateLambda.cs @@ -0,0 +1,49 @@ +using System; + +namespace StreamThreads +{ + public class StreamStateLambda : StreamState + { + internal Action TerminateLambda; + internal Predicate Lambda; + + public StreamStateLambda(Predicate lambdaloop) : base() + { + Lambda = lambdaloop; + TerminateLambda = null; + } + + public override bool Loop() + { + return Lambda(); + } + + public override void Terminate() + { + TerminateLambda.Invoke(); + } + } + public class StreamStateLambda : StreamState + { + internal Action TerminateLambda; + internal Predicate Lambda; + + public StreamStateLambda(Predicate lambdaloop) : base() + { + Lambda = lambdaloop; + TerminateLambda = null; + } + + public override bool Loop() + { + return Lambda(); + } + + public override void Terminate() + { + TerminateLambda.Invoke(); + } + } + + +} diff --git a/websocket-sharp/StreamThreads/StreamStateReturn.cs b/websocket-sharp/StreamThreads/StreamStateReturn.cs new file mode 100644 index 000000000..14a85a469 --- /dev/null +++ b/websocket-sharp/StreamThreads/StreamStateReturn.cs @@ -0,0 +1,52 @@ +namespace StreamThreads +{ + public interface IStreamStateReturn + { + object GetValue(); + } + + public class StreamStateReturn : StreamState, IStreamStateReturn + { + internal override StateTypes StateType => StateTypes.Return; + + public object Return; + + public StreamStateReturn() + { + + } + + public StreamStateReturn(object ret) + { + Return = ret; + } + + public object GetValue() + { + return Return; + } + } + + public class StreamStateReturn : StreamState, IStreamStateReturn + { + internal override StateTypes StateType => StateTypes.Return; + + public T Return; + + public StreamStateReturn() + { + } + + public StreamStateReturn(T ret) + { + Return = ret; + } + + public object GetValue() + { + return Return; + } + } + + +} diff --git a/websocket-sharp/StreamThreads/StreamStateSwitch.cs b/websocket-sharp/StreamThreads/StreamStateSwitch.cs new file mode 100644 index 000000000..ffb57b90f --- /dev/null +++ b/websocket-sharp/StreamThreads/StreamStateSwitch.cs @@ -0,0 +1,32 @@ +using System.Collections.Generic; + +namespace StreamThreads +{ + public class StreamStateSwitch : StreamState + { + public IEnumerable OnSwitch; + internal Predicate Condition; + + public StreamStateSwitch(IEnumerable onSwitch, Predicate condition) + { + OnSwitch = onSwitch; + Condition = condition; + } + + internal override StateTypes StateType => StateTypes.Switch; + } + public class StreamStateSwitch : StreamState + { + public IEnumerable OnSwitch; + internal Predicate Condition; + + public StreamStateSwitch(IEnumerable onSwitch, Predicate condition) + { + OnSwitch = onSwitch; + Condition = condition; + } + + internal override StateTypes StateType => StateTypes.Switch; + } + +} diff --git a/websocket-sharp/StreamThreads/StreamStateWaitForever.cs b/websocket-sharp/StreamThreads/StreamStateWaitForever.cs new file mode 100644 index 000000000..b26970a2e --- /dev/null +++ b/websocket-sharp/StreamThreads/StreamStateWaitForever.cs @@ -0,0 +1,8 @@ +namespace StreamThreads +{ + public class StreamStateWaitForever : StreamState + { + public override bool Loop() => false; + + } +} diff --git a/websocket-sharp/WebSocket.cs b/websocket-sharp/WebSocket.cs index 3b6619da8..0f090fcbe 100644 --- a/websocket-sharp/WebSocket.cs +++ b/websocket-sharp/WebSocket.cs @@ -95,7 +95,7 @@ public class WebSocket : IDisposable private Func _handshakeRequestChecker; private bool _ignoreExtensions; private bool _inContinuation; - private volatile bool _inMessage; + internal volatile bool _inMessage; private volatile Logger _logger; private static readonly int _maxRetryCountForConnect; private Action _message; @@ -109,7 +109,7 @@ public class WebSocket : IDisposable private bool _protocolsRequested; private NetworkCredential _proxyCredentials; private Uri _proxyUri; - private volatile WebSocketState _readyState; + internal volatile WebSocketState _readyState; private ManualResetEvent _receivingExited; private int _retryCountForConnect; private bool _secure; @@ -1548,7 +1548,7 @@ private void init () _readyState = WebSocketState.Connecting; } - private void message () + internal void message () { MessageEventArgs e = null; lock (_forMessageEventQueue) { @@ -1766,7 +1766,7 @@ private bool processPongFrame (WebSocketFrame frame) return true; } - private bool processReceivedFrame (WebSocketFrame frame) + internal bool processReceivedFrame (WebSocketFrame frame) { string msg; if (!checkReceivedFrame (frame, out msg)) @@ -2307,39 +2307,15 @@ private void startReceiving () _pongReceived = new ManualResetEvent (false); _receivingExited = new ManualResetEvent (false); - Action receive = null; - receive = - () => - WebSocketFrame.ReadFrameAsync ( - _stream, - false, - frame => { - if (!processReceivedFrame (frame) || _readyState == WebSocketState.Closed) { - var exited = _receivingExited; - if (exited != null) - exited.Set (); - - return; - } - - // Receive next asap because the Ping or Close needs a response to it. - receive (); - - if (_inMessage || !HasMessage || _readyState != WebSocketState.Open) - return; - - message (); - }, - ex => { - _logger.Fatal (ex.ToString ()); - fatal ("An exception has occurred while receiving.", ex); - } - ); - - receive (); + var st = new StreamThreader() + { + Stream = _stream as NetworkStream, + socket = this + }; + st.Run(); } - // As client + // As client private bool validateSecWebSocketExtensionsServerHeader (string value) { if (value == null) diff --git a/websocket-sharp/WebSocketFrame.cs b/websocket-sharp/WebSocketFrame.cs index cbc53f32c..903f61ec7 100644 --- a/websocket-sharp/WebSocketFrame.cs +++ b/websocket-sharp/WebSocketFrame.cs @@ -45,12 +45,12 @@ internal class WebSocketFrame : IEnumerable { #region Private Fields - private byte[] _extPayloadLength; + internal byte[] _extPayloadLength; private Fin _fin; private Mask _mask; - private byte[] _maskingKey; + internal byte[] _maskingKey; private Opcode _opcode; - private PayloadData _payloadData; + internal PayloadData _payloadData; private byte _payloadLength; private Rsv _rsv1; private Rsv _rsv2; @@ -451,7 +451,7 @@ private static string print (WebSocketFrame frame) ); } - private static WebSocketFrame processHeader (byte[] header) + internal static WebSocketFrame processHeader (byte[] header) { if (header.Length != 2) { var msg = "The header part of a frame could not be read."; @@ -543,40 +543,6 @@ private static WebSocketFrame readExtendedPayloadLength ( return frame; } - private static void readExtendedPayloadLengthAsync ( - Stream stream, - WebSocketFrame frame, - Action completed, - Action error - ) - { - var len = frame.ExtendedPayloadLengthWidth; - - if (len == 0) { - frame._extPayloadLength = WebSocket.EmptyBytes; - - completed (frame); - - return; - } - - stream.ReadBytesAsync ( - len, - bytes => { - if (bytes.Length != len) { - var msg = "The extended payload length of a frame could not be read."; - - throw new WebSocketException (msg); - } - - frame._extPayloadLength = bytes; - - completed (frame); - }, - error - ); - } - private static WebSocketFrame readHeader (Stream stream) { var bytes = stream.ReadBytes (2); @@ -584,21 +550,6 @@ private static WebSocketFrame readHeader (Stream stream) return processHeader (bytes); } - private static void readHeaderAsync ( - Stream stream, Action completed, Action error - ) - { - stream.ReadBytesAsync ( - 2, - bytes => { - var frame = processHeader (bytes); - - completed (frame); - }, - error - ); - } - private static WebSocketFrame readMaskingKey ( Stream stream, WebSocketFrame frame ) @@ -623,40 +574,6 @@ private static WebSocketFrame readMaskingKey ( return frame; } - private static void readMaskingKeyAsync ( - Stream stream, - WebSocketFrame frame, - Action completed, - Action error - ) - { - if (!frame.IsMasked) { - frame._maskingKey = WebSocket.EmptyBytes; - - completed (frame); - - return; - } - - var len = 4; - - stream.ReadBytesAsync ( - len, - bytes => { - if (bytes.Length != len) { - var msg = "The masking key of a frame could not be read."; - - throw new WebSocketException (msg); - } - - frame._maskingKey = bytes; - - completed (frame); - }, - error - ); - } - private static WebSocketFrame readPayloadData ( Stream stream, WebSocketFrame frame ) @@ -691,53 +608,6 @@ private static WebSocketFrame readPayloadData ( return frame; } - private static void readPayloadDataAsync ( - Stream stream, - WebSocketFrame frame, - Action completed, - Action error - ) - { - var exactLen = frame.ExactPayloadLength; - - if (exactLen > PayloadData.MaxLength) { - var msg = "A frame has too long payload length."; - - throw new WebSocketException (CloseStatusCode.TooBig, msg); - } - - if (exactLen == 0) { - frame._payloadData = PayloadData.Empty; - - completed (frame); - - return; - } - - var len = (long) exactLen; - - Action comp = - bytes => { - if (bytes.LongLength != len) { - var msg = "The payload data of a frame could not be read."; - - throw new WebSocketException (msg); - } - - frame._payloadData = new PayloadData (bytes, len); - - completed (frame); - }; - - if (frame._payloadLength < 127) { - stream.ReadBytesAsync ((int) exactLen, comp, error); - - return; - } - - stream.ReadBytesAsync (len, 1024, comp, error); - } - private static string utf8Decode (byte[] bytes) { try { @@ -798,43 +668,6 @@ internal static WebSocketFrame ReadFrame (Stream stream, bool unmask) return frame; } - internal static void ReadFrameAsync ( - Stream stream, - bool unmask, - Action completed, - Action error - ) - { - readHeaderAsync ( - stream, - frame => - readExtendedPayloadLengthAsync ( - stream, - frame, - frame1 => - readMaskingKeyAsync ( - stream, - frame1, - frame2 => - readPayloadDataAsync ( - stream, - frame2, - frame3 => { - if (unmask) - frame3.Unmask (); - - completed (frame3); - }, - error - ), - error - ), - error - ), - error - ); - } - internal void Unmask () { if (_mask == Mask.Off) diff --git a/websocket-sharp/websocket-sharp.csproj b/websocket-sharp/websocket-sharp.csproj index 0860c0313..44dca06e5 100644 --- a/websocket-sharp/websocket-sharp.csproj +++ b/websocket-sharp/websocket-sharp.csproj @@ -1,5 +1,5 @@ - - + + Debug AnyCPU @@ -12,6 +12,11 @@ v3.5 true websocket-sharp.snk + + + + + 3.5 true @@ -22,6 +27,7 @@ prompt 4 false + MinimumRecommendedRules.ruleset none @@ -30,6 +36,7 @@ prompt 4 false + MinimumRecommendedRules.ruleset true @@ -40,6 +47,7 @@ prompt 4 false + MinimumRecommendedRules.ruleset none @@ -54,6 +62,7 @@ + MinimumRecommendedRules.ruleset @@ -67,6 +76,19 @@ + + + + + + + + + + + + + @@ -141,9 +163,5 @@ - - - - - + \ No newline at end of file From 56ee35cfccace82db82f090ec634dba564748884 Mon Sep 17 00:00:00 2001 From: Michael Meling Date: Fri, 12 Aug 2022 22:33:30 +0100 Subject: [PATCH 2/4] Error handling --- websocket-sharp/StreamThreader.cs | 36 +++++++++++++++++++------------ websocket-sharp/WebSocket.cs | 4 ++-- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/websocket-sharp/StreamThreader.cs b/websocket-sharp/StreamThreader.cs index d0bd2489d..12f213e35 100644 --- a/websocket-sharp/StreamThreader.cs +++ b/websocket-sharp/StreamThreader.cs @@ -28,25 +28,33 @@ public void Run() private void StreamReader(IAsyncResult at) { - int len = Stream.EndRead(at); - if (len == 0) return; + try + { + int len = Stream.EndRead(at); + if (len == 0) return; + + var oldpos = _stream.Position; + _stream.Position = _stream.Length; + _stream.WriteBytes(buff.SubArray(0, len), len); + _stream.Position = oldpos; - var oldpos = _stream.Position; - _stream.Position = _stream.Length; - _stream.WriteBytes(buff.SubArray(0, len), len); - _stream.Position = oldpos; + if (!Loop()) return; - if (!Loop()) return; + if (_stream.Position != 0) + { + var oldstream = _stream; + _stream.CopyTo(_stream = new MemoryStream(), (int)(_stream.Length - _stream.Position)); + oldstream.Dispose(); + _stream.Position = 0; + } - if (_stream.Position != 0) + Stream.BeginRead(buff, 0, buffersize, StreamReader, null); + } + catch (Exception ex) { - var oldstream = _stream; - _stream.CopyTo(_stream = new MemoryStream(), (int)(_stream.Length - _stream.Position)); - oldstream.Dispose(); - _stream.Position = 0; + socket._logger.Fatal(ex.ToString()); + socket.fatal("An exception has occurred while receiving.", ex); } - - Stream.BeginRead(buff, 0, buffersize, StreamReader, null); } private bool Loop() diff --git a/websocket-sharp/WebSocket.cs b/websocket-sharp/WebSocket.cs index 0f090fcbe..c714693f5 100644 --- a/websocket-sharp/WebSocket.cs +++ b/websocket-sharp/WebSocket.cs @@ -96,7 +96,7 @@ public class WebSocket : IDisposable private bool _ignoreExtensions; private bool _inContinuation; internal volatile bool _inMessage; - private volatile Logger _logger; + internal volatile Logger _logger; private static readonly int _maxRetryCountForConnect; private Action _message; private Queue _messageEventQueue; @@ -1508,7 +1508,7 @@ private void error (string message, Exception exception) } } - private void fatal (string message, Exception exception) + internal void fatal (string message, Exception exception) { var code = exception is WebSocketException ? ((WebSocketException) exception).Code From b93827ee49ccc8f2a5a3b9f2dcbc22e0e7014e84 Mon Sep 17 00:00:00 2001 From: Michael Meling Date: Wed, 17 Aug 2022 16:33:18 +0100 Subject: [PATCH 3/4] Corrected WSS error --- websocket-sharp/StreamThreader.cs | 2 +- websocket-sharp/WebSocket.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/websocket-sharp/StreamThreader.cs b/websocket-sharp/StreamThreader.cs index 12f213e35..e2b4a2f45 100644 --- a/websocket-sharp/StreamThreader.cs +++ b/websocket-sharp/StreamThreader.cs @@ -10,7 +10,7 @@ namespace WebSocketSharp internal class StreamThreader { public WebSocket socket; - public NetworkStream Stream; + public Stream Stream; public int buffersize = 1000000; private MemoryStream _stream = new MemoryStream(); diff --git a/websocket-sharp/WebSocket.cs b/websocket-sharp/WebSocket.cs index c714693f5..2cca1d87f 100644 --- a/websocket-sharp/WebSocket.cs +++ b/websocket-sharp/WebSocket.cs @@ -2309,7 +2309,7 @@ private void startReceiving () var st = new StreamThreader() { - Stream = _stream as NetworkStream, + Stream = _stream, socket = this }; st.Run(); From 9877a262c570846b7c2999f192ca3c2483b2edd1 Mon Sep 17 00:00:00 2001 From: Michael Meling Date: Thu, 8 Dec 2022 22:28:16 +0100 Subject: [PATCH 4/4] Fixed issue with Close() --- websocket-sharp/StreamThreader.cs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/websocket-sharp/StreamThreader.cs b/websocket-sharp/StreamThreader.cs index e2b4a2f45..89b861b5b 100644 --- a/websocket-sharp/StreamThreader.cs +++ b/websocket-sharp/StreamThreader.cs @@ -22,7 +22,18 @@ public void Run() buff = new byte[buffersize]; state = MainLoop().Await(); - Stream.BeginRead(buff, 0, buffersize, StreamReader, null); + try + { + Stream.BeginRead(buff, 0, buffersize, StreamReader, null); + } + catch (ObjectDisposedException) + { + } + catch (Exception ex) + { + socket._logger.Fatal(ex.ToString()); + socket.fatal("An exception has occurred while receiving.", ex); + } } @@ -50,6 +61,9 @@ private void StreamReader(IAsyncResult at) Stream.BeginRead(buff, 0, buffersize, StreamReader, null); } + catch (ObjectDisposedException) + { + } catch (Exception ex) { socket._logger.Fatal(ex.ToString());