Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 32 additions & 15 deletions VisualPinball.Engine.Mpf/MpfClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ public class MpfClient
private MpfHardwareService.MpfHardwareServiceClient _client;

private static readonly Logger Logger = LogManager.GetCurrentClassLogger();
private Thread _commandsThread;
private AsyncServerStreamingCall<Commands> _commandStream;
private Task _receiveCommandsTask;
private Task _writeSwitchChangeTask;
private AsyncClientStreamingCall<SwitchChanges, EmptyResponse> _switchStream;
private CancellationTokenSource _cts;

public void Connect(string serverIpPort = "127.0.0.1:50051")
{
Expand All @@ -55,29 +56,32 @@ public void StartGame(Dictionary<string, bool> initialSwitches, bool handleStrea
}

Logger.Info("Starting player with machine state: " + ms);
_commandStream = _client.Start(ms);

_cts = new CancellationTokenSource();

if (handleStream) {
_commandsThread = new Thread(ReceiveCommands) { IsBackground = true };
_commandsThread.Start();
AsyncServerStreamingCall<Commands> commandsStream = _client.Start(ms);
_receiveCommandsTask = Task.Run(() => ReceiveCommands(commandsStream, _cts.Token));
}

_switchStream = _client.SendSwitchChanges();
}

public async Task Switch(string swName, bool swValue)
{
await _switchStream.RequestStream.WriteAsync(new SwitchChanges
_writeSwitchChangeTask = _switchStream.RequestStream.WriteAsync(new SwitchChanges
{SwitchNumber = swName, SwitchState = swValue});
await _writeSwitchChangeTask;
_writeSwitchChangeTask = null;
}

private async void ReceiveCommands()
private async Task ReceiveCommands(AsyncServerStreamingCall<Commands> commandsStream, CancellationToken ct)
{
Logger.Info("Client started, retrieving commands...");

try {
while (await _commandStream.ResponseStream.MoveNext()) {
var commands = _commandStream.ResponseStream.Current;
while (await commandsStream.ResponseStream.MoveNext(ct)) {
var commands = commandsStream.ResponseStream.Current;
switch (commands.CommandCase) {
case Commands.CommandOneofCase.None:
break;
Expand Down Expand Up @@ -107,9 +111,11 @@ private async void ReceiveCommands()
}
}
}

catch(RpcException e) {
Logger.Error($"Unable to retrieve commands: Status={e.Status}");
if (!ct.IsCancellationRequested)
Logger.Error($"Unable to retrieve commands: Status={e.Status}");
} finally {
commandsStream.Dispose();
}
}

Expand All @@ -122,10 +128,21 @@ public MachineDescription GetMachineDescription()
public void Shutdown()
{
Logger.Info("Shutting down...");
_client.Quit(new QuitRequest());
_commandStream?.Dispose();
_channel.ShutdownAsync().Wait();
Logger.Info("All down.");
if (_channel.State == ChannelState.Ready)
_client.Quit(new QuitRequest());
_cts?.Cancel();
_cts?.Dispose();
_cts = null;
if (!_receiveCommandsTask.Wait(500))
Logger.Warn("Receive commands task shutdown timed out after 500ms");
_receiveCommandsTask = null;
if (_writeSwitchChangeTask != null && !_writeSwitchChangeTask.Wait(500))
Logger.Warn("Write switch change task shutdown timed out after 500ms");
_writeSwitchChangeTask = null;
_switchStream?.Dispose();
if (!_channel.ShutdownAsync().Wait(500))
Logger.Warn("GRPC channel shutdown timed out after 500ms");
Logger.Info("All down.");
}
}
}