Skip to content

Commit a8b28e7

Browse files
committed
client transport refactor; support enhanced reports;
1 parent fe8309b commit a8b28e7

11 files changed

Lines changed: 359 additions & 272 deletions

File tree

src/ThingSet.Client/ThingSetClient.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,9 +210,8 @@ public IEnumerable<ThingSetNode> GetNodes(ThingSetNodeEnumerationOptions options
210210
}
211211
}
212212

213-
private void OnReportReceived(ReadOnlyMemory<byte> buffer)
213+
private void OnReportReceived(ulong? eui, CborReader reader)
214214
{
215-
CborReader reader = new CborReader(buffer);
216215
try
217216
{
218217
if (CborDeserialiser.Read(reader) is Dictionary<object, object> map)

src/ThingSet.Common.Transports.Can/CanClientTransport.cs

Lines changed: 78 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,23 @@
1111
using System.Threading.Tasks;
1212
using SocketCANSharp;
1313
using SocketCANSharp.Network;
14-
using ThingSet.Common.Protocols.Binary;
1514

1615
namespace ThingSet.Common.Transports.Can;
1716

1817
/// <summary>
1918
/// CAN transport for ThingSet clients. Supports request/response and
2019
/// asynchronous reports.
2120
/// </summary>
22-
public class CanClientTransport : CanTransportBase, IClientTransport
21+
public class CanClientTransport : ClientTransportBase<byte>, IClientTransport
2322
{
2423
private static readonly TimeSpan ExceptionBackoffInterval = TimeSpan.FromSeconds(5);
2524
private const int ExceptionThreshold = 12;
2625

2726
private delegate int CanFrameReader(out uint canId, out byte length, out byte[] data);
2827

28+
private readonly ThingSetCanInterface _canInterface;
29+
private bool _disposeInterface;
30+
2931
private readonly IsoTpCanSocket _requestResponseSocket;
3032
private readonly RawCanSocket _subscriptionSocket;
3133

@@ -34,23 +36,21 @@ public class CanClientTransport : CanTransportBase, IClientTransport
3436
private readonly byte _destinationBridge;
3537
private readonly byte _destinationNodeAddress;
3638

37-
private readonly Dictionary<byte, ReceiveBuffer> _buffersByNodeAddress = new Dictionary<byte, ReceiveBuffer>();
38-
39-
private readonly Thread _subscriptionThread;
40-
private bool _runSubscriptionThread = true;
41-
42-
private Action<ReadOnlyMemory<byte>>? _callback;
39+
private readonly CanReportParser _reportParser = new CanReportParser();
4340

4441
public CanClientTransport(ThingSetCanInterface canInterface, byte destinationNodeAddress, bool leaveOpen) : this(canInterface, CanID.LocalBridge, destinationNodeAddress, leaveOpen)
4542
{
4643
}
4744

48-
public CanClientTransport(ThingSetCanInterface canInterface, byte destinationBridge, byte destinationNodeAddress, bool leaveOpen) : base(canInterface, leaveOpen)
45+
public CanClientTransport(ThingSetCanInterface canInterface, byte destinationBridge, byte destinationNodeAddress, bool leaveOpen)
4946
{
47+
_canInterface = canInterface;
48+
_disposeInterface = !leaveOpen;
49+
5050
_destinationBridge = destinationBridge;
5151
_destinationNodeAddress = destinationNodeAddress;
5252

53-
_requestResponseSocket = CreateIsoTpCanSocket(canInterface.IsFdMode);
53+
_requestResponseSocket = IsoTpCanSocketFactory.CreateIsoTpCanSocket(canInterface.IsFdMode);
5454
_subscriptionSocket = new RawCanSocket
5555
{
5656
CanFilters = new[]
@@ -63,15 +63,11 @@ public CanClientTransport(ThingSetCanInterface canInterface, byte destinationBri
6363
};
6464

6565
_canFrameReader = canInterface.IsFdMode ? ReadCanFdFrame : ReadCanFrame;
66-
67-
_subscriptionThread = new Thread(RunSubscriptionThread)
68-
{
69-
IsBackground = true,
70-
Name = $"Subscription {_destinationNodeAddress:x}",
71-
};
7266
}
7367

74-
public ValueTask ConnectAsync()
68+
protected override string Address => $"{_destinationNodeAddress:x}";
69+
70+
public override ValueTask ConnectAsync()
7571
{
7672
_requestResponseSocket.Bind(_canInterface.Interface,
7773
txId: CanID.CreateCanID(MessageType.RequestResponse, MessagePriority.Channel, _destinationBridge, _canInterface.NodeAddress, _destinationNodeAddress),
@@ -82,29 +78,20 @@ public ValueTask ConnectAsync()
8278
return ValueTask.CompletedTask;
8379
}
8480

85-
public ValueTask SubscribeAsync(Action<ReadOnlyMemory<byte>> callback)
81+
protected override ValueTask SubscribeAsync()
8682
{
87-
if (_callback is not null)
88-
{
89-
throw new InvalidOperationException("There is already a subscription established.");
90-
}
91-
92-
_callback = callback;
93-
9483
Console.WriteLine("Binding subscription socket");
9584
_subscriptionSocket.Bind(_canInterface.Interface);
96-
_subscriptionThread.Start();
97-
9885
return ValueTask.CompletedTask;
9986
}
10087

101-
public bool Write(byte[] buffer, int length)
88+
public override bool Write(byte[] buffer, int length)
10289
{
10390
int written = LibcNativeMethods.Write(_requestResponseSocket.SafeHandle, buffer, length);
10491
return written == length;
10592
}
10693

107-
public int Read(byte[] buffer)
94+
public override int Read(byte[] buffer)
10895
{
10996
try
11097
{
@@ -119,14 +106,12 @@ public int Read(byte[] buffer)
119106

120107
protected override void Dispose(bool disposing)
121108
{
122-
base.Dispose(disposing);
123-
_runSubscriptionThread = false;
124-
if (_subscriptionThread.IsAlive)
125-
{
126-
_subscriptionThread.Join(1000);
127-
}
128109
_subscriptionSocket.Dispose();
129110
_requestResponseSocket.Dispose();
111+
if (_disposeInterface)
112+
{
113+
_canInterface.Dispose();
114+
}
130115
}
131116

132117
private int ReadCanFrame(out uint canId, out byte length, out byte[] data)
@@ -147,116 +132,86 @@ private int ReadCanFdFrame(out uint canId, out byte length, out byte[] data)
147132
return read;
148133
}
149134

150-
private void RunSubscriptionThread()
135+
protected override ValueTask HandleIncomingPublicationsAsync()
151136
{
152-
while (_runSubscriptionThread)
137+
List<SocketCanException> exceptions = new List<SocketCanException>();
138+
try
153139
{
154-
List<SocketCanException> exceptions = new List<SocketCanException>();
155-
try
140+
int read = _canFrameReader(out uint canId, out byte length, out byte[] data);
141+
if (read > 0)
156142
{
157-
int read = _canFrameReader(out uint canId, out byte length, out byte[] data);
158-
if (read > 0)
143+
exceptions.Clear();
144+
switch (CanID.GetType(canId))
159145
{
160-
exceptions.Clear();
161-
switch (CanID.GetType(canId))
162-
{
163-
case MessageType.SingleFrameReport:
164-
NotifyItem(canId, data);
165-
break;
166-
case MessageType.MultiFrameReport:
167-
byte source = CanID.GetSource(canId);
168-
byte messageNumber = CanID.GetMessageNumber(canId);
169-
byte sequence = CanID.GetSequenceNumber(canId);
170-
if (!_buffersByNodeAddress.TryGetValue(source, out ReceiveBuffer? buffer))
171-
{
172-
_buffersByNodeAddress[source] = buffer = new ReceiveBuffer();
173-
}
174-
MultiFrameMessageType type = CanID.GetMultiFrameMessageType(canId);
175-
if (type == MultiFrameMessageType.Single || type == MultiFrameMessageType.First)
176-
{
177-
buffer.Started = true;
178-
buffer.MessageNumber = messageNumber;
179-
}
180-
else if (buffer.MessageNumber != messageNumber)
181-
{
182-
buffer.Reset();
183-
continue;
184-
}
185-
else if (!buffer.Started)
186-
{
187-
buffer.Reset();
188-
continue;
189-
}
190-
191-
if (sequence == (buffer.Sequence++ & 0xf))
192-
{
193-
ReadOnlySpan<byte> span = data;
194-
Span<byte> target = buffer.Buffer;
195-
span.CopyTo(target.Slice(buffer.Position));
196-
buffer.Position += length;
197-
if (type == MultiFrameMessageType.Single || type == MultiFrameMessageType.Last)
198-
{
199-
ReadOnlyMemory<byte> memory = buffer.Buffer;
200-
if (buffer.Buffer[0] == (byte)ThingSetRequest.Report)
201-
{
202-
NotifyReport(memory.Slice(1, buffer.Position - 1));
203-
}
204-
buffer.Reset();
205-
}
206-
}
207-
else
208-
{
209-
// invalid sequence; reset
210-
}
211-
break;
212-
}
146+
case MessageType.SingleFrameReport:
147+
NotifyItem(canId, data);
148+
break;
149+
case MessageType.MultiFrameReport:
150+
MultiFrameMessageType type = CanID.GetMultiFrameMessageType(canId);
151+
byte sequenceNumber = CanID.GetSequenceNumber(canId);
152+
byte messageNumber = CanID.GetMessageNumber(canId);
153+
byte source = CanID.GetSource(canId);
154+
ReceiveBuffer buffer = _buffersBySender.GetOrAdd(source, _ => new ReceiveBuffer());
155+
if (_reportParser.TryParse(sequenceNumber, messageNumber, type, buffer, data, out ulong? eui, out CborReader? reader))
156+
{
157+
NotifyReport(eui, reader);
158+
}
159+
break;
213160
}
214161
}
215-
catch (SocketCanException scex)
162+
}
163+
catch (SocketCanException scex)
164+
{
165+
exceptions.Add(scex);
166+
if (exceptions.Count > ExceptionThreshold)
216167
{
217-
exceptions.Add(scex);
218-
if (exceptions.Count > ExceptionThreshold)
219-
{
220-
throw new AggregateException($"Multiple errors occurred while reading from CAN interface {_canInterface.Interface.Name}.");
221-
}
222-
Thread.Sleep(ExceptionBackoffInterval);
168+
throw new AggregateException($"Multiple errors occurred while reading from CAN interface {_canInterface.Interface.Name}.", exceptions);
223169
}
170+
Thread.Sleep(ExceptionBackoffInterval);
224171
}
225-
}
226-
227-
private void NotifyReport(ReadOnlyMemory<byte> body)
228-
{
229-
CborReader reader = new CborReader(body, CborConformanceMode.Lax, allowMultipleRootLevelValues: true);
230-
reader.ReadUInt32();
231-
_callback?.Invoke(body.Slice(body.Length - reader.BytesRemaining));
172+
return ValueTask.CompletedTask;
232173
}
233174

234175
private void NotifyItem(uint canId, ReadOnlyMemory<byte> body)
235176
{
236177
byte[] buffer = new byte[body.Length + 4];
237178
buffer[0] = 0xA1; // map with 1 element
238-
buffer[1] = 0x19; // ushort
239-
Span<byte> span = buffer;
240-
BinaryPrimitives.WriteUInt16BigEndian(span.Slice(2), CanID.GetDataID(canId));
179+
ushort id = CanID.GetDataID(canId);
180+
int headerLength;
181+
if (id <= 23)
182+
{
183+
buffer[1] = (byte)id;
184+
headerLength = 2;
185+
}
186+
else if (id < 256)
187+
{
188+
buffer[1] = 0x18;
189+
buffer[2] = (byte)id;
190+
headerLength = 3;
191+
}
192+
else
193+
{
194+
buffer[1] = 0x19; // ushort
195+
Span<byte> span = buffer;
196+
BinaryPrimitives.WriteUInt16BigEndian(span.Slice(2), id);
197+
headerLength = 4;
198+
}
241199
ReadOnlyMemory<byte> source = body;
242200
Memory<byte> memory = buffer;
243-
source.CopyTo(memory.Slice(4));
244-
_callback?.Invoke(buffer);
201+
source.CopyTo(memory.Slice(headerLength));
202+
NotifyReport(null, new CborReader(memory));
245203
}
246204

247-
private class ReceiveBuffer
205+
private class CanReportParser : ReportParser<MultiFrameMessageType>
248206
{
249-
public byte[] Buffer = new byte[32768];
250-
public int Position;
251-
public byte Sequence;
252-
public byte MessageNumber;
253-
public bool Started;
207+
protected override bool IsFirst(MultiFrameMessageType type)
208+
{
209+
return type == MultiFrameMessageType.First || type == MultiFrameMessageType.Last;
210+
}
254211

255-
public void Reset()
212+
protected override bool IsLast(MultiFrameMessageType type)
256213
{
257-
Position = 0;
258-
Sequence = 0;
259-
Started = false;
214+
return type == MultiFrameMessageType.First || type == MultiFrameMessageType.Last;
260215
}
261216
}
262217
}

src/ThingSet.Common.Transports.Can/CanServerTransport.cs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,13 @@
1313

1414
namespace ThingSet.Common.Transports.Can;
1515

16-
public class CanServerTransport : CanTransportBase, IServerTransport
16+
public class CanServerTransport : ServerTransportBase, IServerTransport
1717
{
1818
private readonly ConcurrentDictionary<byte, IsoTpCanSocket> _peerSocketsById = new();
1919
private readonly ConcurrentDictionary<byte, Thread> _peerSocketThreadsById = new();
2020

21+
private readonly ThingSetCanInterface _canInterface;
22+
2123
private readonly AddressClaimListener _addressClaimListener;
2224
private readonly RawCanSocket _publishSocket;
2325

@@ -30,8 +32,10 @@ public class CanServerTransport : CanTransportBase, IServerTransport
3032

3133
private Func<object, Memory<byte>, Memory<byte>>? _messageCallback;
3234

33-
public CanServerTransport(ThingSetCanInterface canInterface) : base(canInterface, leaveOpen: false)
35+
public CanServerTransport(ThingSetCanInterface canInterface)
3436
{
37+
_canInterface = canInterface;
38+
3539
_publishSocket = new RawCanSocket
3640
{
3741
EnableCanFdFrames = canInterface.IsFdMode,
@@ -41,9 +45,9 @@ public CanServerTransport(ThingSetCanInterface canInterface) : base(canInterface
4145
_addressClaimListener.AddressClaimed += OnAddressClaimed;
4246
}
4347

44-
public event EventHandler<ErrorEventArgs>? Error;
48+
public override event EventHandler<ErrorEventArgs>? Error;
4549

46-
public ValueTask ListenAsync(Func<object, Memory<byte>, Memory<byte>> callback)
50+
public override ValueTask ListenAsync(Func<object, Memory<byte>, Memory<byte>> callback)
4751
{
4852
_addressClaimListener.Listen();
4953

@@ -73,7 +77,7 @@ public void PublishControl(ushort id, byte[] buffer)
7377
}
7478
}
7579

76-
public void PublishReport(byte[] buffer)
80+
public override void PublishReport(byte[] buffer)
7781
{
7882
int pos = 0;
7983
byte sequenceNumber = 0;
@@ -135,7 +139,6 @@ private int WriteFdFrame(uint canId, byte[] buffer)
135139

136140
protected override void Dispose(bool disposing)
137141
{
138-
base.Dispose(disposing);
139142
_runPeerSocketHandlers = false;
140143
foreach (Thread thread in _peerSocketThreadsById.Values)
141144
{
@@ -145,6 +148,7 @@ protected override void Dispose(bool disposing)
145148
{
146149
socket.Dispose();
147150
}
151+
_canInterface.Dispose();
148152
}
149153

150154
private void RunPeerSocketHandler(object? state)
@@ -199,7 +203,7 @@ private Thread CreateAndStartPeerSocketThread(byte peerId, IsoTpCanSocket socket
199203

200204
private IsoTpCanSocket CreateAndBindIsoTpCanSocket(byte targetId)
201205
{
202-
IsoTpCanSocket socket = CreateIsoTpCanSocket(_canInterface.IsFdMode);
206+
IsoTpCanSocket socket = IsoTpCanSocketFactory.CreateIsoTpCanSocket(_canInterface.IsFdMode);
203207
socket.Bind(_canInterface.Interface,
204208
txId: CanID.CreateCanID(MessageType.RequestResponse, MessagePriority.Channel, _canInterface.NodeAddress, targetId),
205209
rxId: CanID.CreateCanID(MessageType.RequestResponse, MessagePriority.Channel, targetId, _canInterface.NodeAddress));

0 commit comments

Comments
 (0)