From aca59e8d864fb3c15c7b48c5c0533c6a65fc50a7 Mon Sep 17 00:00:00 2001 From: Anairkoen Schno Date: Sat, 7 Dec 2019 21:58:00 -0600 Subject: [PATCH] Added a SingleThreadTaskScheduler --- IPA.Loader/IPA.Loader.csproj | 1 + .../Async/SingleThreadTaskScheduler.cs | 175 ++++++++++++++++++ 2 files changed, 176 insertions(+) create mode 100644 IPA.Loader/Utilities/Async/SingleThreadTaskScheduler.cs diff --git a/IPA.Loader/IPA.Loader.csproj b/IPA.Loader/IPA.Loader.csproj index a73ed13e..9e0a7a0e 100644 --- a/IPA.Loader/IPA.Loader.csproj +++ b/IPA.Loader/IPA.Loader.csproj @@ -127,6 +127,7 @@ + diff --git a/IPA.Loader/Utilities/Async/SingleThreadTaskScheduler.cs b/IPA.Loader/Utilities/Async/SingleThreadTaskScheduler.cs new file mode 100644 index 00000000..605e1dc6 --- /dev/null +++ b/IPA.Loader/Utilities/Async/SingleThreadTaskScheduler.cs @@ -0,0 +1,175 @@ +using System; +using System.Collections.Generic; +using System.Collections.Concurrent; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace IPA.Utilities.Async +{ + /// + /// A single-threaded task scheduler that runs all of its tasks on the same thread. + /// + public class SingleThreadTaskScheduler : TaskScheduler, IDisposable + { + private readonly Thread runThread = new Thread(ExecuteTasksS); + private readonly BlockingCollection tasks = new BlockingCollection(); + private readonly CancellationTokenSource exitTokenSource = new CancellationTokenSource(); + + /// + /// Starts the thread that executes tasks scheduled with this + /// + /// Thrown if this object has already been disposed. + public void StartThread() + { + ThrowIfDisposed(); + + runThread.Start(); + } + + /// + /// Terminates the runner thread, and waits for the currently running task to complete. + /// + /// + /// After this method returns, this object has been disposed and is no longer in a valid state. + /// + /// an of s that did not execute + /// Thrown if this object has already been disposed. + public IEnumerable Exit() + { + ThrowIfDisposed(); + + tasks.CompleteAdding(); + exitTokenSource.Cancel(); + runThread.Join(); + + var retTasks = new List(); + retTasks.AddRange(tasks); + + Dispose(true); + return retTasks; + } + + /// + /// Waits for the runner thread to complete all tasks in the queue, then exits. + /// + /// + /// After this method returns, this object has been disposed and is no longer in a valid state. + /// + /// Thrown if this object has already been disposed. + public void Join() + { + ThrowIfDisposed(); + + tasks.CompleteAdding(); + runThread.Join(); + Dispose(true); + } + + /// + /// Throws a . + /// + /// nothing + /// Always. + protected override IEnumerable GetScheduledTasks() + { + // this is only for debuggers which we can't use sooooo + throw new NotSupportedException(); + } + + /// + /// Queues a given to this scheduler. The must> be + /// scheduled for this by the runtime. + /// + /// the to queue + /// Thrown if this object has already been disposed. + protected override void QueueTask(Task task) + { + ThrowIfDisposed(); + + tasks.Add(task); + } + + /// + /// Rejects any attempts to execute a task inline. + /// + /// + /// This task scheduler always runs its tasks on the thread that it manages, therefore it doesn't + /// make sense to run it inline. + /// + /// the task to attempt to execute + /// whether the task was previously queued to this scheduler + /// + /// Thrown if this object has already been disposed. + protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) + { + ThrowIfDisposed(); + + return false; + } + + private void ThrowIfDisposed() + { + if (disposedValue) + throw new InvalidOperationException("Object already disposed"); + } + + private void ExecuteTasks() + { + ThrowIfDisposed(); + + var token = exitTokenSource.Token; + + try + { + // while we are still accepting tasks, and we can pull out a task with an infinite wait duration + while (!tasks.IsCompleted && tasks.TryTake(out var task, -1, token)) + { + TryExecuteTask(task); + } + } + catch (OperationCanceledException) + { + // TryTake was cancelled, we'll just leave + } + } + + private static void ExecuteTasksS(object param) + { + var self = param as SingleThreadTaskScheduler; + self.ExecuteTasks(); + } + + #region IDisposable Support + private bool disposedValue = false; // To detect redundant calls + + /// + /// Disposes this object. + /// + /// whether or not to dispose managed objects + protected virtual void Dispose(bool disposing) + { + if (!disposedValue) + { + if (disposing) + { + exitTokenSource.Dispose(); + tasks.Dispose(); + } + + disposedValue = true; + } + } + + /// + /// Disposes this object. This puts the object into an unusable state. + /// + public void Dispose() + { + // Do not change this code. Put cleanup code in Dispose(bool disposing) above. + Dispose(true); + } + #endregion + } +}