Skip to content

Commit 3409e9d

Browse files
committed
make report handling synchronous for now
1 parent be9fdf1 commit 3409e9d

3 files changed

Lines changed: 17 additions & 37 deletions

File tree

src/ThingSet.Common.Transports.Can/CanClientTransport.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ private int ReadCanFdFrame(out uint canId, out byte length, out byte[] data)
132132
return read;
133133
}
134134

135-
protected override ValueTask HandleIncomingPublicationsAsync()
135+
protected override void HandleIncomingPublications()
136136
{
137137
List<SocketCanException> exceptions = new List<SocketCanException>();
138138
try
@@ -166,7 +166,6 @@ protected override ValueTask HandleIncomingPublicationsAsync()
166166
}
167167
Thread.Sleep(ExceptionBackoffInterval);
168168
}
169-
return ValueTask.CompletedTask;
170169
}
171170

172171
private void NotifyControl(uint canId, ReadOnlyMemory<byte> body)

src/ThingSet.Common.Transports.Ip/IpClientTransport.cs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
*
44
* SPDX-License-Identifier: Apache-2.0
55
*/
6-
using System.Buffers;
7-
using System.Formats.Cbor;
86
using System.Net;
97
using System.Net.Sockets;
108
using System.Threading.Tasks;
@@ -68,16 +66,17 @@ protected override void Dispose(bool disposing)
6866
_udpClient.Dispose();
6967
}
7068

71-
protected async override ValueTask HandleIncomingPublicationsAsync()
69+
protected override void HandleIncomingPublications()
7270
{
7371
try
7472
{
75-
UdpReceiveResult data = await _udpClient.ReceiveAsync();
76-
MessageType messageType = (MessageType)(data.Buffer[0] & 0xF0);
77-
byte sequenceNumber = (byte)(data.Buffer[0] & 0x0F);
78-
byte messageNumber = data.Buffer[1];
79-
ReceiveBuffer receiveBuffer = GetOrCreateBuffer(data.RemoteEndPoint);
80-
_reportParser.TryParse(sequenceNumber, messageNumber, messageType, receiveBuffer, data.Buffer, NotifyReport);
73+
IPEndPoint? sender = null;
74+
byte[] data = _udpClient.Receive(ref sender);
75+
MessageType messageType = (MessageType)(data[0] & 0xF0);
76+
byte sequenceNumber = (byte)(data[0] & 0x0F);
77+
byte messageNumber = data[1];
78+
ReceiveBuffer receiveBuffer = GetOrCreateBuffer(sender);
79+
_reportParser.TryParse(sequenceNumber, messageNumber, messageType, receiveBuffer, data, NotifyReport);
8180
}
8281
catch (SocketException)
8382
{

src/ThingSet.Common/Transports/ClientTransportBase.cs

Lines changed: 8 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,10 @@
44
* SPDX-License-Identifier: Apache-2.0
55
*/
66
using System;
7-
using System.Buffers;
87
using System.Collections.Concurrent;
9-
using System.Diagnostics.CodeAnalysis;
108
using System.Formats.Cbor;
119
using System.Threading;
1210
using System.Threading.Tasks;
13-
using Microsoft.VisualBasic;
1411
using ThingSet.Common.Protocols.Binary;
1512

1613
namespace ThingSet.Common.Transports;
@@ -88,11 +85,11 @@ protected async void RunSubscriptionThread()
8885
{
8986
while (_runSubscriptionThread)
9087
{
91-
await HandleIncomingPublicationsAsync();
88+
HandleIncomingPublications();
9289
}
9390
}
9491

95-
protected abstract ValueTask HandleIncomingPublicationsAsync();
92+
protected abstract void HandleIncomingPublications();
9693

9794
protected ReceiveBuffer GetOrCreateBuffer(TEndpoint endpoint)
9895
{
@@ -140,23 +137,14 @@ public bool TryParse(byte sequenceNumber, byte messageNumber, TMessageType messa
140137
}
141138
if (IsLast(messageType))
142139
{
143-
byte[] copy = ArrayPool<byte>.Shared.Rent(buffer.Position);
144-
try
140+
ReadOnlyMemory<byte> memory = buffer.Buffer;
141+
CborReader reader = new CborReader(memory.Slice(1), CborConformanceMode.Lax, allowMultipleRootLevelValues: true);
142+
ulong? eui = null;
143+
if (buffer.Buffer[0] == (byte)ThingSetRequest.ReportEnhanced)
145144
{
146-
buffer.CopyTo(copy);
147-
ReadOnlyMemory<byte> memory = copy;
148-
CborReader reader = new CborReader(memory.Slice(1), CborConformanceMode.Lax, allowMultipleRootLevelValues: true);
149-
ulong? eui = null;
150-
if (buffer.Buffer[0] == (byte)ThingSetRequest.ReportEnhanced)
151-
{
152-
eui = reader.ReadUInt64();
153-
}
154-
notifier(eui, reader);
155-
}
156-
finally
157-
{
158-
ArrayPool<byte>.Shared.Return(copy);
145+
eui = reader.ReadUInt64();
159146
}
147+
notifier(eui, reader);
160148
buffer.Reset();
161149
return true;
162150
}
@@ -196,11 +184,5 @@ public void Append(Span<byte> source)
196184
source.CopyTo(destination.Slice(Position));
197185
Position += source.Length;
198186
}
199-
200-
public void CopyTo(Span<byte> target)
201-
{
202-
Span<byte> source = Buffer;
203-
source.Slice(0, Position).CopyTo(target);
204-
}
205187
}
206188
}

0 commit comments

Comments
 (0)