diff --git a/core/pva/src/main/java/org/epics/pva/client/FieldRequest.java b/core/pva/src/main/java/org/epics/pva/client/FieldRequest.java index 1de85548cb..1ab5b0c6c8 100644 --- a/core/pva/src/main/java/org/epics/pva/client/FieldRequest.java +++ b/core/pva/src/main/java/org/epics/pva/client/FieldRequest.java @@ -53,74 +53,45 @@ class FieldRequest */ public FieldRequest(final String request) { - this(0, false, request); + this(RecordOptions.DEFAULT, request); } - /** Parse field request for "monitor" with optional pipeline - * @param pipeline Number of elements for 'pipeline' mode, 0 to disable + /** Parse plain field request for "get" + * @param pipeline pipeline flag * @param request Examples: * "", "field()", * "value", "field(value)", * "field(value, timeStamp.userTag)" */ - public FieldRequest(final int pipeline, final String request) + public FieldRequest(final int pipeline, String request) { - this(pipeline, false, request); + this(RecordOptions.builder().pipeline(pipeline).build(), request); } - /** Parse field request for "put" with optional completion - * @param completion Perform a write that triggers processing and only returns on completion? + /** Parse plain field request for "get" + * @param completion Completion flag * @param request Examples: * "", "field()", * "value", "field(value)", * "field(value, timeStamp.userTag)" */ - public FieldRequest(final boolean completion, final String request) + public FieldRequest(final boolean completion, String request) { - this(0, completion, request); + this(RecordOptions.builder().completion(completion).build(), request); } /** Parse field request - * @param pipeline Number of elements for 'pipeline' mode, 0 to disable - * @param completion Perform a write that triggers processing and only returns on completion? + * @param recordOptions Options for record monitoring * @param request Examples: * "", "field()", * "value", "field(value)", * "field(value, timeStamp.userTag)" */ - private FieldRequest(final int pipeline, final boolean completion, final String request) + public FieldRequest(final RecordOptions recordOptions, final String request) { - if (pipeline > 0 && completion) - throw new IllegalStateException("Cannot use both 'pipeline' (for get) " + - "and 'completion' (for put) within same request"); - final List items = new ArrayList<>(); - if (pipeline > 0) - { - // record._options.pipeline=true - // 'pvmonitor' encodes as PVAString 'true', not PVABool - items.add( - new PVAStructure("record", "", - new PVAStructure("_options", "", - new PVABool("pipeline", true), - new PVAInt("queueSize", pipeline) - ))); - } - else if (completion) - { - // Similar to Channel Access put-callback: - // Process passive record (could also use "true" to always process), - // then block until processing completes - // record._options.process="passive" - // record._options.block=true - items.add( - new PVAStructure("record", "", - new PVAStructure("_options", "", - new PVAString("process", "passive"), - new PVABool("block", true) - ))); - } + final List items = new ArrayList<>(recordOptions.structureItems()); // XXX Not using any client type registry, // but (re-)defining from 1 each time diff --git a/core/pva/src/main/java/org/epics/pva/client/MonitorRequest.java b/core/pva/src/main/java/org/epics/pva/client/MonitorRequest.java index 660706040c..d55291d70d 100644 --- a/core/pva/src/main/java/org/epics/pva/client/MonitorRequest.java +++ b/core/pva/src/main/java/org/epics/pva/client/MonitorRequest.java @@ -45,6 +45,8 @@ class MonitorRequest implements AutoCloseable, RequestEncoder, ResponseHandler private final int request_id; + private final RecordOptions recordOptions; + /** Next request to send, cycling from INIT to START. * Cancel() then sets it to DESTROY. */ @@ -52,7 +54,6 @@ class MonitorRequest implements AutoCloseable, RequestEncoder, ResponseHandler private volatile PVAStructure data; - private final int pipeline; private final AtomicInteger received_updates = new AtomicInteger(); /** @param channel Channel to 'monitor' @@ -62,10 +63,21 @@ class MonitorRequest implements AutoCloseable, RequestEncoder, ResponseHandler * @throws Exception on error */ public MonitorRequest(final PVAChannel channel, final String request, final int pipeline, final MonitorListener listener) throws Exception + { + this(channel, request, RecordOptions.builder().pipeline(pipeline).build(), listener); + } + + /** @param channel Channel to 'monitor' + * @param request Request string to monitor only selected fields of PV + * @param recordOptions Number of updates that server should pipeline, 0 to disable + * @param listener Listener to invoke with received updates + * @throws Exception on error + */ + public MonitorRequest(final PVAChannel channel, final String request, final RecordOptions recordOptions, final MonitorListener listener) throws Exception { this.channel = channel; this.request = request; - this.pipeline = pipeline; + this.recordOptions = recordOptions; this.listener = listener; this.request_id = channel.getClient().allocateRequestID(); channel.getTCP().submit(this, this); @@ -77,6 +89,10 @@ public int getRequestID() return request_id; } + private int pipeline() { + return recordOptions.pipeline(); + } + @Override public void encodeRequest(final byte version, final ByteBuffer buffer) throws Exception { @@ -91,18 +107,18 @@ public void encodeRequest(final byte version, final ByteBuffer buffer) throws Ex buffer.putInt(channel.getSID()); buffer.putInt(request_id); - if (pipeline > 0) + if (pipeline() > 0) state = (byte) (PVAHeader.CMD_SUB_PIPELINE | PVAHeader.CMD_SUB_INIT); buffer.put(state); // For pipeline, add record._options.pipeline=true to request - final FieldRequest field_request = new FieldRequest(pipeline, request); + final FieldRequest field_request = new FieldRequest(recordOptions, request); logger.log(Level.FINE, () -> "Monitor INIT request " + field_request); field_request.encodeType(buffer); field_request.encode(buffer); // Encode pipeline 'nfree' - if (pipeline > 0) - buffer.putInt(pipeline); + if (pipeline() > 0) + buffer.putInt(pipeline()); buffer.putInt(size_offset, buffer.position() - payload_start); } else if (state == PVAHeader.CMD_SUB_PIPELINE) @@ -200,8 +216,8 @@ else if (subcmd == PVAHeader.CMD_SUB_DESTROY) // With a responsive client, this jumps nfree up to the original 'pipeline' count. // With a slow client, for example stuck in listener.handleMonitor(), // the server will stop after sending nfree updates. - if (pipeline > 0 && - received_updates.incrementAndGet() >= pipeline/2) + if (pipeline() > 0 && + received_updates.incrementAndGet() >= pipeline()/2) { state = PVAHeader.CMD_SUB_PIPELINE; channel.getTCP().submit(this, this); diff --git a/core/pva/src/main/java/org/epics/pva/client/PVAChannel.java b/core/pva/src/main/java/org/epics/pva/client/PVAChannel.java index e79bae9ca0..ced2f37e09 100644 --- a/core/pva/src/main/java/org/epics/pva/client/PVAChannel.java +++ b/core/pva/src/main/java/org/epics/pva/client/PVAChannel.java @@ -335,11 +335,29 @@ public AutoCloseable subscribe(final String request, final MonitorListener liste * @throws Exception on error */ public AutoCloseable subscribe(final String request, final int pipeline, final MonitorListener listener) throws Exception + { + return subscribe(request, RecordOptions.builder().pipeline(pipeline).build(), listener); + } + + /** Start a subscription with different record options + * + *

Asks the server to send a certain number of 'pipelined' updates. + * Client automatically requests more updates as soon as half the pipelined updates + * are received. In case the client gets overloaded and cannot do this, + * the server will thus pause after sending the pipelined updates. + * + * @param request Request, "" for all fields, or "field_a, field_b.subfield" + * @param recordOptions Number of updates to recordOptions + * @param listener Will be invoked with channel and latest value + * @return {@link AutoCloseable}, used to close the subscription + * @throws Exception on error + */ + public AutoCloseable subscribe(final String request, final RecordOptions recordOptions, final MonitorListener listener) throws Exception { // MonitorRequest submits itself to TCPHandler // and registers as response handler, // so we can later retrieve it via its requestID - final MonitorRequest subscription = new MonitorRequest(this, request, pipeline, listener); + final MonitorRequest subscription = new MonitorRequest(this, request, recordOptions, listener); subscriptions.add(subscription); return subscription; } diff --git a/core/pva/src/main/java/org/epics/pva/client/RecordOptions.java b/core/pva/src/main/java/org/epics/pva/client/RecordOptions.java new file mode 100644 index 0000000000..e8c94f839f --- /dev/null +++ b/core/pva/src/main/java/org/epics/pva/client/RecordOptions.java @@ -0,0 +1,108 @@ +package org.epics.pva.client; + +import org.epics.pva.data.PVABool; +import org.epics.pva.data.PVAData; +import org.epics.pva.data.PVAInt; +import org.epics.pva.data.PVAString; +import org.epics.pva.data.PVAStructure; + +import java.util.ArrayList; +import java.util.List; + +public class RecordOptions { + private static final String STRUCTURE_RECORD_NAME = "record"; + private static final String STRUCTURE_OPTIONS_NAME = "_options"; + + public static final RecordOptions DEFAULT = new RecordOptions(false, 0, null); + private final boolean completion; + private final int pipeline; + private final DBEMask dbeMask; + + /** DBE mask to use for monitoring + * */ + public enum DBEMask { + DBE_NOTHING(0), + DBE_VALUE(1), + DBE_ARCHIVE(2), + DBE_ALARM(4); + + private final int mask; + DBEMask(final int mask) { this.mask = mask; } + public int getMask() { return mask; } + } + + private RecordOptions(boolean completion, int pipeline, DBEMask dbeMask) { + if (pipeline > 0 && completion) + throw new IllegalStateException("Cannot use both 'pipeline' (for get) " + + "and 'completion' (for put) within same request"); + + this.completion = completion; + this.pipeline = pipeline; + this.dbeMask = dbeMask; + + } + public static Builder builder() { return new Builder(); } + + public static class Builder { + private boolean completion; + private int pipeline; + private DBEMask dbeMask = null; + public Builder() { + } + + Builder completion(boolean completion) { + this.completion = completion; + return this; + } + Builder pipeline(int pipeline) { + this.pipeline = pipeline; + return this; + } + Builder dbeMask(DBEMask dbeMask) { + this.dbeMask = dbeMask; + return this; + } + public RecordOptions build() { + return new RecordOptions(completion, pipeline, dbeMask); + } + } + + public boolean completion() { return completion; } + public int pipeline() { return pipeline; } + public DBEMask dbeMask() { return dbeMask; } + + public List structureItems() { + List items = new ArrayList<>(); + + if (pipeline > 0) { + // record._options.pipeline=true + // 'pvmonitor' encodes as PVAString 'true', not PVABool + items.add( + new PVAStructure(STRUCTURE_RECORD_NAME, "", + new PVAStructure(STRUCTURE_OPTIONS_NAME, "", + new PVABool("pipeline", true), + new PVAInt("queueSize", pipeline) + ))); + } else if (completion) { + // Similar to Channel Access put-callback: + // Process passive record (could also use "true" to always process), + // then block until processing completes + // record._options.process="passive" + // record._options.block=true + items.add( + new PVAStructure(STRUCTURE_RECORD_NAME, "", + new PVAStructure(STRUCTURE_OPTIONS_NAME, "", + new PVAString("process", "passive"), + new PVABool("block", true) + ))); + } + if (dbeMask != null) { + items.add( + new PVAStructure(STRUCTURE_RECORD_NAME, "", + new PVAStructure(STRUCTURE_OPTIONS_NAME, "", + new PVAInt("DBE", dbeMask.getMask()) + ))); + } + return items; + } +} diff --git a/core/pva/src/test/java/org/epics/pva/client/ClientDemo.java b/core/pva/src/test/java/org/epics/pva/client/ClientDemo.java index c4e3e5ac0d..693ad17dcc 100644 --- a/core/pva/src/test/java/org/epics/pva/client/ClientDemo.java +++ b/core/pva/src/test/java/org/epics/pva/client/ClientDemo.java @@ -162,6 +162,51 @@ public void testMonitor() throws Exception } } + /** + * Test monitoring only the alarm updates. + * Requires using softIocPVX + */ + @Test + public void testMonitorAlarm() throws Exception + { + // Create a client (auto-close) + try (final PVAClient pva = new PVAClient()) + { + // Handler for received values + final CountDownLatch done = new CountDownLatch(300000); + final MonitorListener handle_values = (channel, changes, overruns, data) -> + { + System.out.println(channel.getName() + " = " + data); + done.countDown(); + }; + + // When channel (re-)connects, subscribe. + // When channel disconnects, subscription is automatically dropped. + final ClientChannelListener handle_state = (channel, state) -> + { + if (state == ClientChannelState.CONNECTED) + try + { + channel.subscribe("", RecordOptions.builder().dbeMask(RecordOptions.DBEMask.DBE_ALARM).build(), handle_values); + } + catch (Exception ex) + { + ex.printStackTrace(); + } + else + System.out.println(channel.getName() + ": " + state); + }; + + // Create channel which then subscribes on connect + final PVAChannel ch1 = pva.getChannel("demo", handle_state); + + done.await(); + + // Close channels + ch1.close(); + } + } + @Test public void testPipeline() throws Exception diff --git a/core/pva/src/test/resources/demo.db b/core/pva/src/test/resources/demo.db index cc573117ee..d28a4a1426 100644 --- a/core/pva/src/test/resources/demo.db +++ b/core/pva/src/test/resources/demo.db @@ -14,6 +14,24 @@ record(calc, "ramp$(N)") field(DESC, "ramp going up") } +record(calc, "demo$(N)") { +field(SCAN, "1 second") +field(INPA, "demo$(N).VAL") +field(CALC, "(A<100)?A+1:0") +field(HIHI, "90") +field(HHSV, "MAJOR") +field(HIGH, "60") +field(HSV, "MINOR") +field(LOW, "30") +field(LSV, "MINOR") +field(LOLO, "10") +field(LLSV, "MAJOR") +field(HOPR, "125") +field(ADEL, "0") +field(MDEL, "0") +} + + record(calc, "saw$(N)") { field(SCAN, "1 second")