Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 12 additions & 41 deletions core/pva/src/main/java/org/epics/pva/client/FieldRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,74 +53,45 @@ class FieldRequest
*/
public FieldRequest(final String request)
{
this(0, false, request);
this(RecordOptions.DEFAULT, request);
}

/** Parse field request for "monitor" with optional pipeline
* @param pipeline Number of elements for 'pipeline' mode, 0 to disable
/** Parse plain field request for "get"
* @param pipeline pipeline flag
* @param request Examples:
* "", "field()",
* "value", "field(value)",
* "field(value, timeStamp.userTag)"
*/
public FieldRequest(final int pipeline, final String request)
public FieldRequest(final int pipeline, String request)
{
this(pipeline, false, request);
this(RecordOptions.builder().pipeline(pipeline).build(), request);
}

/** Parse field request for "put" with optional completion
* @param completion Perform a write that triggers processing and only returns on completion?
/** Parse plain field request for "get"
* @param completion Completion flag
* @param request Examples:
* "", "field()",
* "value", "field(value)",
* "field(value, timeStamp.userTag)"
*/
public FieldRequest(final boolean completion, final String request)
public FieldRequest(final boolean completion, String request)
{
this(0, completion, request);
this(RecordOptions.builder().completion(completion).build(), request);
}


/** Parse field request
* @param pipeline Number of elements for 'pipeline' mode, 0 to disable
* @param completion Perform a write that triggers processing and only returns on completion?
* @param recordOptions Options for record monitoring
* @param request Examples:
* "", "field()",
* "value", "field(value)",
* "field(value, timeStamp.userTag)"
*/
private FieldRequest(final int pipeline, final boolean completion, final String request)
public FieldRequest(final RecordOptions recordOptions, final String request)
{
if (pipeline > 0 && completion)
throw new IllegalStateException("Cannot use both 'pipeline' (for get) " +
"and 'completion' (for put) within same request");
final List<PVAData> items = new ArrayList<>();

if (pipeline > 0)
{
// record._options.pipeline=true
// 'pvmonitor' encodes as PVAString 'true', not PVABool
items.add(
new PVAStructure("record", "",
new PVAStructure("_options", "",
new PVABool("pipeline", true),
new PVAInt("queueSize", pipeline)
)));
}
else if (completion)
{
// Similar to Channel Access put-callback:
// Process passive record (could also use "true" to always process),
// then block until processing completes
// record._options.process="passive"
// record._options.block=true
items.add(
new PVAStructure("record", "",
new PVAStructure("_options", "",
new PVAString("process", "passive"),
new PVABool("block", true)
)));
}
final List<PVAData> items = new ArrayList<>(recordOptions.structureItems());

// XXX Not using any client type registry,
// but (re-)defining from 1 each time
Expand Down
32 changes: 24 additions & 8 deletions core/pva/src/main/java/org/epics/pva/client/MonitorRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,15 @@ class MonitorRequest implements AutoCloseable, RequestEncoder, ResponseHandler

private final int request_id;

private final RecordOptions recordOptions;

/** Next request to send, cycling from INIT to START.
* Cancel() then sets it to DESTROY.
*/
private volatile byte state = PVAHeader.CMD_SUB_INIT;

private volatile PVAStructure data;

private final int pipeline;
private final AtomicInteger received_updates = new AtomicInteger();

/** @param channel Channel to 'monitor'
Expand All @@ -62,10 +63,21 @@ class MonitorRequest implements AutoCloseable, RequestEncoder, ResponseHandler
* @throws Exception on error
*/
public MonitorRequest(final PVAChannel channel, final String request, final int pipeline, final MonitorListener listener) throws Exception
{
this(channel, request, RecordOptions.builder().pipeline(pipeline).build(), listener);
}

/** @param channel Channel to 'monitor'
* @param request Request string to monitor only selected fields of PV
* @param recordOptions Number of updates that server should pipeline, 0 to disable
* @param listener Listener to invoke with received updates
* @throws Exception on error
*/
public MonitorRequest(final PVAChannel channel, final String request, final RecordOptions recordOptions, final MonitorListener listener) throws Exception
{
this.channel = channel;
this.request = request;
this.pipeline = pipeline;
this.recordOptions = recordOptions;
this.listener = listener;
this.request_id = channel.getClient().allocateRequestID();
channel.getTCP().submit(this, this);
Expand All @@ -77,6 +89,10 @@ public int getRequestID()
return request_id;
}

private int pipeline() {
return recordOptions.pipeline();
}

@Override
public void encodeRequest(final byte version, final ByteBuffer buffer) throws Exception
{
Expand All @@ -91,18 +107,18 @@ public void encodeRequest(final byte version, final ByteBuffer buffer) throws Ex
buffer.putInt(channel.getSID());
buffer.putInt(request_id);

if (pipeline > 0)
if (pipeline() > 0)
state = (byte) (PVAHeader.CMD_SUB_PIPELINE | PVAHeader.CMD_SUB_INIT);
buffer.put(state);

// For pipeline, add record._options.pipeline=true to request
final FieldRequest field_request = new FieldRequest(pipeline, request);
final FieldRequest field_request = new FieldRequest(recordOptions, request);
logger.log(Level.FINE, () -> "Monitor INIT request " + field_request);
field_request.encodeType(buffer);
field_request.encode(buffer);
// Encode pipeline 'nfree'
if (pipeline > 0)
buffer.putInt(pipeline);
if (pipeline() > 0)
buffer.putInt(pipeline());
buffer.putInt(size_offset, buffer.position() - payload_start);
}
else if (state == PVAHeader.CMD_SUB_PIPELINE)
Expand Down Expand Up @@ -200,8 +216,8 @@ else if (subcmd == PVAHeader.CMD_SUB_DESTROY)
// With a responsive client, this jumps nfree up to the original 'pipeline' count.
// With a slow client, for example stuck in listener.handleMonitor(),
// the server will stop after sending nfree updates.
if (pipeline > 0 &&
received_updates.incrementAndGet() >= pipeline/2)
if (pipeline() > 0 &&
received_updates.incrementAndGet() >= pipeline()/2)
{
state = PVAHeader.CMD_SUB_PIPELINE;
channel.getTCP().submit(this, this);
Expand Down
20 changes: 19 additions & 1 deletion core/pva/src/main/java/org/epics/pva/client/PVAChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -335,11 +335,29 @@ public AutoCloseable subscribe(final String request, final MonitorListener liste
* @throws Exception on error
*/
public AutoCloseable subscribe(final String request, final int pipeline, final MonitorListener listener) throws Exception
{
return subscribe(request, RecordOptions.builder().pipeline(pipeline).build(), listener);
}

/** Start a subscription with different record options
*
* <p>Asks the server to send a certain number of 'pipelined' updates.
* Client automatically requests more updates as soon as half the pipelined updates
* are received. In case the client gets overloaded and cannot do this,
* the server will thus pause after sending the pipelined updates.
*
* @param request Request, "" for all fields, or "field_a, field_b.subfield"
* @param recordOptions Number of updates to recordOptions
* @param listener Will be invoked with channel and latest value
* @return {@link AutoCloseable}, used to close the subscription
* @throws Exception on error
*/
public AutoCloseable subscribe(final String request, final RecordOptions recordOptions, final MonitorListener listener) throws Exception
{
// MonitorRequest submits itself to TCPHandler
// and registers as response handler,
// so we can later retrieve it via its requestID
final MonitorRequest subscription = new MonitorRequest(this, request, pipeline, listener);
final MonitorRequest subscription = new MonitorRequest(this, request, recordOptions, listener);
subscriptions.add(subscription);
return subscription;
}
Expand Down
108 changes: 108 additions & 0 deletions core/pva/src/main/java/org/epics/pva/client/RecordOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package org.epics.pva.client;

import org.epics.pva.data.PVABool;
import org.epics.pva.data.PVAData;
import org.epics.pva.data.PVAInt;
import org.epics.pva.data.PVAString;
import org.epics.pva.data.PVAStructure;

import java.util.ArrayList;
import java.util.List;

public class RecordOptions {
private static final String STRUCTURE_RECORD_NAME = "record";
private static final String STRUCTURE_OPTIONS_NAME = "_options";

public static final RecordOptions DEFAULT = new RecordOptions(false, 0, null);
private final boolean completion;
private final int pipeline;
private final DBEMask dbeMask;

/** DBE mask to use for monitoring
* */
public enum DBEMask {
DBE_NOTHING(0),
DBE_VALUE(1),
DBE_ARCHIVE(2),
DBE_ALARM(4);

private final int mask;
DBEMask(final int mask) { this.mask = mask; }
public int getMask() { return mask; }
}

private RecordOptions(boolean completion, int pipeline, DBEMask dbeMask) {
if (pipeline > 0 && completion)
throw new IllegalStateException("Cannot use both 'pipeline' (for get) " +
"and 'completion' (for put) within same request");

this.completion = completion;
this.pipeline = pipeline;
this.dbeMask = dbeMask;

}
public static Builder builder() { return new Builder(); }

public static class Builder {
private boolean completion;
private int pipeline;
private DBEMask dbeMask = null;
public Builder() {
}

Builder completion(boolean completion) {
this.completion = completion;
return this;
}
Builder pipeline(int pipeline) {
this.pipeline = pipeline;
return this;
}
Builder dbeMask(DBEMask dbeMask) {
this.dbeMask = dbeMask;
return this;
}
public RecordOptions build() {
return new RecordOptions(completion, pipeline, dbeMask);
}
}

public boolean completion() { return completion; }
public int pipeline() { return pipeline; }
public DBEMask dbeMask() { return dbeMask; }

public List<PVAData> structureItems() {
List<PVAData> items = new ArrayList<>();

if (pipeline > 0) {
// record._options.pipeline=true
// 'pvmonitor' encodes as PVAString 'true', not PVABool
items.add(
new PVAStructure(STRUCTURE_RECORD_NAME, "",
new PVAStructure(STRUCTURE_OPTIONS_NAME, "",
new PVABool("pipeline", true),
new PVAInt("queueSize", pipeline)
)));
} else if (completion) {
// Similar to Channel Access put-callback:
// Process passive record (could also use "true" to always process),
// then block until processing completes
// record._options.process="passive"
// record._options.block=true
items.add(
new PVAStructure(STRUCTURE_RECORD_NAME, "",
new PVAStructure(STRUCTURE_OPTIONS_NAME, "",
new PVAString("process", "passive"),
new PVABool("block", true)
)));
}
if (dbeMask != null) {
items.add(
new PVAStructure(STRUCTURE_RECORD_NAME, "",
new PVAStructure(STRUCTURE_OPTIONS_NAME, "",
new PVAInt("DBE", dbeMask.getMask())
)));
}
return items;
}
}
45 changes: 45 additions & 0 deletions core/pva/src/test/java/org/epics/pva/client/ClientDemo.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,51 @@ public void testMonitor() throws Exception
}
}

/**
* Test monitoring only the alarm updates.
* Requires using softIocPVX
*/
@Test
public void testMonitorAlarm() throws Exception
{
// Create a client (auto-close)
try (final PVAClient pva = new PVAClient())
{
// Handler for received values
final CountDownLatch done = new CountDownLatch(300000);
final MonitorListener handle_values = (channel, changes, overruns, data) ->
{
System.out.println(channel.getName() + " = " + data);
done.countDown();
};

// When channel (re-)connects, subscribe.
// When channel disconnects, subscription is automatically dropped.
final ClientChannelListener handle_state = (channel, state) ->
{
if (state == ClientChannelState.CONNECTED)
try
{
channel.subscribe("", RecordOptions.builder().dbeMask(RecordOptions.DBEMask.DBE_ALARM).build(), handle_values);
}
catch (Exception ex)
{
ex.printStackTrace();
}
else
System.out.println(channel.getName() + ": " + state);
};

// Create channel which then subscribes on connect
final PVAChannel ch1 = pva.getChannel("demo", handle_state);

done.await();

// Close channels
ch1.close();
}
}


@Test
public void testPipeline() throws Exception
Expand Down
18 changes: 18 additions & 0 deletions core/pva/src/test/resources/demo.db
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,24 @@ record(calc, "ramp$(N)")
field(DESC, "ramp going up")
}

record(calc, "demo$(N)") {
field(SCAN, "1 second")
field(INPA, "demo$(N).VAL")
field(CALC, "(A<100)?A+1:0")
field(HIHI, "90")
field(HHSV, "MAJOR")
field(HIGH, "60")
field(HSV, "MINOR")
field(LOW, "30")
field(LSV, "MINOR")
field(LOLO, "10")
field(LLSV, "MAJOR")
field(HOPR, "125")
field(ADEL, "0")
field(MDEL, "0")
}


record(calc, "saw$(N)")
{
field(SCAN, "1 second")
Expand Down