Class Sealed
public sealed class LoopbackTransport : SharpMeter.Transport.ITransport

Namespace: SharpMeter.Transport

In-memory loopback transport built on System.IO.Pipelines, intended for testing. Data written via LoopbackTransport.SendAsync on one instance can be read back via LoopbackTransport.ReceiveAsync on the paired transport instance.

Inheritance

Implements: SharpMeter.Transport.ITransport

Constructors

Name | Description
LoopbackTransport(…) | Initializes a new instance of LoopbackTransport.

LoopbackTransport(Pipe outbound, Pipe inbound, ILogger<LoopbackTransport> logger)

LoopbackTransport.LoopbackTransport(Pipe outbound, Pipe inbound, ILogger<LoopbackTransport> logger)

Initializes a new instance of LoopbackTransport.

Parameters

Name | Type | Description
outbound | Pipe | Pipe for data sent by this side (written to by SendAsync).
inbound | Pipe | Pipe for data received by this side (read from by ReceiveAsync).
logger | ILogger<SharpMeter.Transport.LoopbackTransport> | The logger instance.

Properties

Name | Description
IsConnected | Gets a value indicating whether the transport connection is open.

IsConnected

bool LoopbackTransport.IsConnected { get; private set; }

Gets a value indicating whether the transport connection is open.

Methods

Name | Description
ConnectAsync(CancellationToken cancellationToken) | Opens the transport connection.
CreatePair(ILogger<LoopbackTransport> logger) static | Creates a paired set of loopback transports where one side's sends become the other side's receives.
DisconnectAsync(CancellationToken cancellationToken) | Closes the transport connection.
DisposeAsync() | Completes the outbound writer and the inbound reader, then disconnects.
ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken) | Receives raw bytes from the transport.
SendAsync(ReadOnlyMemory<byte> data, CancellationToken cancellationToken) | Sends raw bytes over the transport.
SendControlAsync(byte controlByte, CancellationToken cancellationToken) | Sends a single control byte (ACK, NAK, CAN).

ConnectAsync(CancellationToken cancellationToken)

ValueTask<Result<bool>> LoopbackTransport.ConnectAsync(CancellationToken cancellationToken = default)

Opens the transport connection.

Parameters

Name | Type | Description
cancellationToken | CancellationToken | Cancellation token.

Returns: A result indicating success or transport error.

CreatePair(ILogger<LoopbackTransport> logger)

(LoopbackTransport Client, LoopbackTransport Server) LoopbackTransport.CreatePair(ILogger<LoopbackTransport> logger)

Creates a paired set of loopback transports where one side's sends become the other side's receives.

Parameters

Name | Type | Description
logger | ILogger<SharpMeter.Transport.LoopbackTransport> | The logger instance.

Returns: A tuple of (client, server) transport instances.
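
A round trip through a pair can be sketched as follows. This is a minimal illustration rather than a canonical usage pattern: it assumes the Microsoft.Extensions.Logging.Abstractions package is referenced (for NullLogger), and it deliberately does not inspect the returned Result<T> values, because the members of that type are not shown on this page.

```csharp
using System;
using Microsoft.Extensions.Logging.Abstractions;
using SharpMeter.Transport;

// NullLogger discards all log output; any ILogger<LoopbackTransport> would do.
var (client, server) = LoopbackTransport.CreatePair(NullLogger<LoopbackTransport>.Instance);

// Both sides start disconnected; SendAsync and ReceiveAsync return a
// transport error until ConnectAsync has been called.
await client.ConnectAsync();
await server.ConnectAsync();

// Bytes sent by the client land in the pipe the server reads from.
await client.SendAsync(new byte[] { 0x01, 0x02, 0x03 });

var buffer = new byte[16];
await server.ReceiveAsync(buffer);

Console.WriteLine(Convert.ToHexString(buffer, 0, 3)); // prints "010203"

// Disposing completes each side's writer and reader.
await client.DisposeAsync();
await server.DisposeAsync();
```

The same pair works in the opposite direction: bytes sent by the server are read by the client.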

DisconnectAsync(CancellationToken cancellationToken)

ValueTask LoopbackTransport.DisconnectAsync(CancellationToken cancellationToken = default)

Closes the transport connection.

Parameters

Name | Type | Description
cancellationToken | CancellationToken | Cancellation token.

ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken)

ValueTask<Result<int>> LoopbackTransport.ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)

Receives raw bytes from the transport.

Parameters

Name | Type | Description
buffer | Memory<byte> | The buffer to receive into.
cancellationToken | CancellationToken | Cancellation token.

Returns: A result containing the number of bytes received, or a transport error.
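
Judging by the source listing on this page, a single receive copies at most buffer.Length bytes and advances the pipe reader only past what it copied, so leftover bytes stay queued for the next call. The sketch below reproduces that behaviour with a bare System.IO.Pipelines Pipe instead of the transport itself. ReceiveOnceAsync is a hypothetical helper written for this illustration, not part of SharpMeter.

```csharp
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Threading.Tasks;

var pipe = new Pipe();

// Writer side: five bytes become readable once the write is flushed.
await pipe.Writer.WriteAsync(new byte[] { 1, 2, 3, 4, 5 });

// Reader side: the caller's buffer only has room for three bytes.
var buffer = new byte[3];
int first = await ReceiveOnceAsync(pipe.Reader, buffer);  // copies 1, 2, 3
int second = await ReceiveOnceAsync(pipe.Reader, buffer); // copies 4, 5

Console.WriteLine($"{first} then {second}"); // prints "3 then 2"

// Hypothetical helper mirroring the copy-and-advance logic of ReceiveAsync.
static async Task<int> ReceiveOnceAsync(PipeReader reader, Memory<byte> destination)
{
    ReadResult result = await reader.ReadAsync();
    ReadOnlySequence<byte> sequence = result.Buffer;

    // An empty, completed read means the writer is done: report zero bytes.
    if (sequence.IsEmpty && result.IsCompleted)
        return 0;

    int count = (int)Math.Min(sequence.Length, destination.Length);
    ReadOnlySequence<byte> slice = sequence.Slice(0, count);
    slice.CopyTo(destination.Span);

    // Mark only the copied bytes as consumed; the rest stays in the pipe.
    reader.AdvanceTo(sequence.GetPosition(count));
    return count;
}
```

Callers that expect a message of a known length therefore need to loop until they have collected enough bytes, as they would with a socket.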

SendAsync(ReadOnlyMemory<byte> data, CancellationToken cancellationToken)

ValueTask<Result<bool>> LoopbackTransport.SendAsync(ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default)

Sends raw bytes over the transport.

Parameters

Name | Type | Description
data | ReadOnlyMemory<byte> | The data to send.
cancellationToken | CancellationToken | Cancellation token.

Returns: A result indicating success or transport error.
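
The source listing on this page shows the send path as the usual PipeWriter sequence: request memory, copy, advance, flush. The following sketch walks through those four steps on a standalone Pipe, assuming nothing beyond System.IO.Pipelines. The payload bytes are arbitrary and carry no protocol meaning.

```csharp
using System;
using System.IO.Pipelines;

var pipe = new Pipe();
byte[] payload = { 0x01, 0x02, 0x03 };

// 1. Ask the writer for a block of at least payload.Length bytes.
Memory<byte> target = pipe.Writer.GetMemory(payload.Length);

// 2. Copy the payload into the writer's memory.
payload.AsMemory().CopyTo(target);

// 3. Tell the writer how many bytes were actually written.
pipe.Writer.Advance(payload.Length);

// 4. Flush so the bytes become visible to the reader.
await pipe.Writer.FlushAsync();

ReadResult read = await pipe.Reader.ReadAsync();
long available = read.Buffer.Length;
pipe.Reader.AdvanceTo(read.Buffer.End);

Console.WriteLine(available); // prints "3"
```

Until the flush in step 4 completes, the reader does not see the data, which is why SendAsync awaits it before reporting success.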

SendControlAsync(byte controlByte, CancellationToken cancellationToken)

ValueTask LoopbackTransport.SendControlAsync(byte controlByte, CancellationToken cancellationToken = default)

Sends a single control byte (ACK, NAK, CAN).

Parameters

Name | Type | Description
controlByte | byte | The control byte to send.
cancellationToken | CancellationToken | Cancellation token.
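
A control byte travels the same path as any other payload, so the peer picks it up with ReceiveAsync. The sketch below assumes the standard ASCII values for ACK, NAK and CAN; this page does not state which values the library's protocol layer expects, so treat the constants as illustrative. It also assumes the Microsoft.Extensions.Logging.Abstractions package is referenced for NullLogger.

```csharp
using System;
using Microsoft.Extensions.Logging.Abstractions;
using SharpMeter.Transport;

// Assumed values: the standard ASCII control codes. Check them against the
// protocol layer in use before relying on them.
const byte Ack = 0x06;
const byte Nak = 0x15;
const byte Can = 0x18;

var (client, server) = LoopbackTransport.CreatePair(NullLogger<LoopbackTransport>.Instance);
await client.ConnectAsync();
await server.ConnectAsync();

// The server acknowledges; the client sees exactly one byte on its inbound pipe.
await server.SendControlAsync(Ack);

var buffer = new byte[1];
await client.ReceiveAsync(buffer);

Console.WriteLine(buffer[0] == Ack); // prints "True"
```

Note that SendControlAsync returns a plain ValueTask: in the source listing it forwards to SendAsync and discards the result, so a failed send (for example on a disconnected transport) is not reported to the caller.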

Type Relationships

classDiagram
    style LoopbackTransport fill:#f9f,stroke:#333,stroke-width:2px
    LoopbackTransport ..|> ITransport : implements

View Source
/// <summary>
///     In-memory loopback transport using <see cref = "System.IO.Pipelines"/> for testing.
///     Data written via <see cref = "SendAsync"/> can be read back via <see cref = "ReceiveAsync"/>
///     on the paired transport instance.
/// </summary>
public sealed partial class LoopbackTransport : ITransport
{
#region IAsyncDisposable
    /// <inheritdoc/>
    public async ValueTask DisposeAsync()
    {
        await _outbound.Writer.CompleteAsync();
        await _inbound.Reader.CompleteAsync();
        await DisconnectAsync();
    }

#endregion
#region Log
    private static partial class Log
    {
        [LoggerMessage(Level = LogLevel.Debug, Message = "Loopback transport connected")]
        public static partial void Connected(ILogger logger);
        [LoggerMessage(Level = LogLevel.Debug, Message = "Loopback transport disconnected")]
        public static partial void Disconnected(ILogger logger);
        [LoggerMessage(Level = LogLevel.Debug, Message = "Loopback TX [{Length} bytes]: {Data}")]
        public static partial void DataSent(ILogger logger, int length, string data);
        [LoggerMessage(Level = LogLevel.Debug, Message = "Loopback RX [{Length} bytes]: {Data}")]
        public static partial void DataReceived(ILogger logger, int length, string data);
        [LoggerMessage(Level = LogLevel.Error, Message = "Loopback send failed")]
        public static partial void SendFailed(ILogger logger, Exception ex);
        [LoggerMessage(Level = LogLevel.Error, Message = "Loopback receive failed")]
        public static partial void ReceiveFailed(ILogger logger, Exception ex);
    }

#endregion
#region Fields
    private readonly Pipe _outbound;
    private readonly Pipe _inbound;
    private readonly ILogger<LoopbackTransport> _logger;
#endregion
#region Constructor
    /// <summary>
    ///     Initializes a new instance of <see cref = "LoopbackTransport"/>.
    /// </summary>
    /// <param name = "outbound">Pipe for data sent by this side (written to by SendAsync).</param>
    /// <param name = "inbound">Pipe for data received by this side (read from by ReceiveAsync).</param>
    /// <param name = "logger">The logger instance.</param>
    public LoopbackTransport(Pipe outbound, Pipe inbound, ILogger<LoopbackTransport> logger)
    {
        _outbound = outbound;
        _inbound = inbound;
        _logger = logger;
    }

    /// <summary>
    ///     Creates a paired set of loopback transports where one side's sends become the other side's receives.
    /// </summary>
    /// <param name = "logger">The logger instance.</param>
    /// <returns>A tuple of (client, server) transport instances.</returns>
    public static (LoopbackTransport Client, LoopbackTransport Server) CreatePair(ILogger<LoopbackTransport> logger)
    {
        var clientToServer = new Pipe();
        var serverToClient = new Pipe();
        var client = new LoopbackTransport(clientToServer, serverToClient, logger);
        var server = new LoopbackTransport(serverToClient, clientToServer, logger);
        return (client, server);
    }

#endregion
#region ITransport Members
    /// <inheritdoc/>
    public bool IsConnected { get; private set; }

    /// <inheritdoc/>
    public ValueTask<Result<bool>> ConnectAsync(CancellationToken cancellationToken = default)
    {
        IsConnected = true;
        Log.Connected(_logger);
        return ValueTask.FromResult<Result<bool>>(true);
    }

    /// <inheritdoc/>
    public ValueTask DisconnectAsync(CancellationToken cancellationToken = default)
    {
        IsConnected = false;
        Log.Disconnected(_logger);
        return ValueTask.CompletedTask;
    }

    /// <inheritdoc/>
    public async ValueTask<Result<bool>> SendAsync(ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default)
    {
        if (!IsConnected)
            return PsemError.Transport("Loopback transport is not connected");
        try
        {
            Memory<byte> result = _outbound.Writer.GetMemory(data.Length);
            data.CopyTo(result);
            _outbound.Writer.Advance(data.Length);
            await _outbound.Writer.FlushAsync(cancellationToken);
            if (_logger.IsEnabled(LogLevel.Debug))
            {
                var hex = Convert.ToHexString(data.Span);
                Log.DataSent(_logger, data.Length, hex);
            }

            return true;
        }
        catch (ObjectDisposedException ex)
        {
            Log.SendFailed(_logger, ex);
            return PsemError.Transport($"Loopback send failed: {ex.Message}");
        }
        catch (InvalidOperationException ex)
        {
            Log.SendFailed(_logger, ex);
            return PsemError.Transport($"Loopback send failed: {ex.Message}");
        }
    }

    /// <inheritdoc/>
    public async ValueTask<Result<int>> ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
    {
        if (!IsConnected)
            return PsemError.Transport("Loopback transport is not connected");
        try
        {
            ReadResult readResult = await _inbound.Reader.ReadAsync(cancellationToken);
            ReadOnlySequence<byte> sequence = readResult.Buffer;
            if (sequence.IsEmpty && readResult.IsCompleted)
                return 0;
            var bytesToCopy = (int)Math.Min(sequence.Length, buffer.Length);
            ReadOnlySequence<byte> sliced = sequence.Slice(0, bytesToCopy);
            var offset = 0;
            foreach (ReadOnlyMemory<byte> segment in sliced)
            {
                segment.Span.CopyTo(buffer.Span[offset..]);
                offset += segment.Length;
            }

            _inbound.Reader.AdvanceTo(sequence.GetPosition(bytesToCopy));
            if (_logger.IsEnabled(LogLevel.Debug))
            {
                var hex = Convert.ToHexString(buffer.Span[..bytesToCopy]);
                Log.DataReceived(_logger, bytesToCopy, hex);
            }

            return bytesToCopy;
        }
        catch (OperationCanceledException)
        {
            return PsemError.Timeout("loopback receive");
        }
        catch (ObjectDisposedException ex)
        {
            Log.ReceiveFailed(_logger, ex);
            return PsemError.Transport($"Loopback receive failed: {ex.Message}");
        }
        catch (InvalidOperationException ex)
        {
            Log.ReceiveFailed(_logger, ex);
            return PsemError.Transport($"Loopback receive failed: {ex.Message}");
        }
    }

    /// <inheritdoc/>
    public async ValueTask SendControlAsync(byte controlByte, CancellationToken cancellationToken = default)
    {
        var buf = new[]
        {
            controlByte
        };
        await SendAsync(buf, cancellationToken);
    }
#endregion
}