public final class AepBusConnection extends UtlListElement implements IEventHandler
The AEP bus connection class. An instance of this class is created for
each bus configured for an AEP application. This class is responsible for
- Opening the bus connection including retrying the open in case of open
failure and scheduling reopens when the connection fails.
- Dispatching events to the application indicating when channels change
state from offline to online and vice versa
- Handling and routing of events from the binding to the application.
| Modifier and Type | Class and Description |
|---|---|
static interface |
AepBusConnection.MessageTracer
Message trace callback
|
static class |
AepBusConnection.State
Enumerates the different states the bus handler can be in
|
| Modifier and Type | Field and Description |
|---|---|
static String |
PROPNAME_BINDING_OPEN_RETRY_INTERVAL
Configures the interval to wait between retries to establish the messaging
connection (i.e. open the binding) on binding failure
|
static int |
PROPNAME_BINDING_OPEN_RETRY_INTERVAL_DEFAULT
The default value for the
PROPNAME_BINDING_OPEN_RETRY_INTERVAL property |
static String |
PROPNAME_DETACHED
Configures whether the commits should be performed in a detached thread
|
static boolean |
PROPNAME_DETACHED_DEFAULT
The default value for the
PROPNAME_DETACHED property |
static String |
PROPNAME_DETACHED_QUEUE_DEPTH
Configures the depth of the queue used for detached commits
|
static int |
PROPNAME_DETACHED_QUEUE_DEPTH_DEFAULT
The default value for the
PROPNAME_DETACHED_QUEUE_DEPTH property |
static String |
PROPNAME_DETACHED_QUEUE_DRAINER_AFFINITY_MASK
Configures the affinity mask of the drainer of the queue used for detached commits
|
static String |
PROPNAME_DETACHED_QUEUE_DRAINER_AFFINITY_MASK_DEFAULT
The default value for the
PROPNAME_DETACHED_QUEUE_DRAINER_AFFINITY_MASK property |
static String |
PROPNAME_DETACHED_QUEUE_OFFER_STRATEGY
Configures the offer strategy of the queue used for detached commits
|
static String |
PROPNAME_DETACHED_QUEUE_OFFER_STRATEGY_DEFAULT
The default value for the
PROPNAME_DETACHED_QUEUE_OFFER_STRATEGY property |
static String |
PROPNAME_DETACHED_QUEUE_WAIT_STRATEGY
Configures the wait strategy of the queue used for detached commits
|
static String |
PROPNAME_DETACHED_QUEUE_WAIT_STRATEGY_DEFAULT
The default value for the
PROPNAME_DETACHED_QUEUE_WAIT_STRATEGY property
A default value of null means that the default is resolved by UtlDisruptor.getWaitStrategy(java.lang.String, boolean) |
static String |
PROPNAME_ENABLE_OUTBOUND_LOG
Configures whether outbound logging is enabled
|
static boolean |
PROPNAME_ENABLE_OUTBOUND_LOG_DEFAULT
The default value for the
PROPNAME_ENABLE_OUTBOUND_LOG property |
static String |
PROPNAME_OUTBOUND_LOG_AUTO_REPAIR
Configures whether outbound log should be auto-repaired, if corrupt, on startup
|
static String |
PROPNAME_OUTBOUND_LOG_DIRECTORY
Configures the directory in which to create the outbound log
|
static String |
PROPNAME_OUTBOUND_LOG_FLUSH_ON_COMMIT
Configures whether outbound logging should flush on commit
|
static String |
PROPNAME_OUTBOUND_LOG_FLUSH_USING_MAPPED_MEMORY
Configures whether outbound logging should flush using memory mapped IO
|
static String |
PROPNAME_OUTBOUND_LOG_INITIAL_LOG_LENGTH
Configures the initial length of the outbound log
|
static String |
PROPNAME_OUTBOUND_LOG_NAME
Configures the name of the outbound log
|
static String |
PROPNAME_OUTBOUND_LOG_PAGE_SIZE
Configures the page size of the outbound log
|
static String |
PROPNAME_OUTBOUND_LOG_WRITE_BUFFER_SIZE
Configures the outbound log write buffer size (auto flush size)
|
static String |
PROPNAME_OUTBOUND_LOG_ZERO_OUT_INITIAL
Configures whether to zero out the outbound log on creation
|
static String |
PROPNAME_PROVIDER_DESCRIPTOR
Configures the messaging bus provider descriptor string
|
static String |
PROPNAME_RECONNECT_ON_FAIL
Configures whether the bus connection will automatically attempt to restablish
the bus binding when it fails.
|
static boolean |
PROPNAME_RECONNECT_ON_FAIL_DEFAULT
The default value for the
PROPNAME_RECONNECT_ON_FAIL property |
static String |
PROPNAME_SEND_EXCEPTION_HANDLING_POLICY
Configures the behaviour of the bus connection when it encounters an exception
when sending an outbound message (i.e. committing a message batch).
|
static String |
PROPNAME_SEND_EXCEPTION_HANDLING_POLICY_DEFAULT
The default value for the
PROPNAME_SEND_EXCEPTION_HANDLING_POLICY property |
static String |
PROPNAME_USER_NAME
Configures the user name to be used with the messaging connection (bus binding).
|
count, head, next, prev| Modifier and Type | Method and Description |
|---|---|
void |
close(Exception cause,
boolean preserveChannelJoins)
Close a bus connection.
|
void |
commit(com.neeve.aep.AepBusConnection.OutboundQueue.Slice slice,
AepSendCommitCompletionEvent event,
long sendTs)
Complete and commit a message batch
|
void |
commit(AepSendCommitCompletionEvent event,
long sendTs)
Complete and commit current message batch.
|
com.neeve.aep.AepBusConnection.OutboundQueue.Slice |
complete(com.neeve.aep.AepBusConnection.OutboundQueue.Slice slice)
Close current message batch but do not commit.
|
static AepBusConnection |
create(String appName,
int id,
MessageBusDescriptor busDescriptor,
Map<String,AepEngineDescriptor.ChannelConfig> channelsConfig,
IEventHandler eventHandler,
MessageBusBinding.SequenceNumberGenerator snoGenerator,
Properties props,
AepBusConnection.MessageTracer msgTracer,
String tracePrefix)
Create a bus connection
|
static AepBusConnection |
create(String appName,
MessageBusDescriptor busDescriptor,
Map<String,AepEngineDescriptor.ChannelConfig> channelsConfig,
IEventHandler eventHandler,
Properties props)
Create a bus connection
This method invokes
create(appName, 0, busDescriptor, channelsConfig, eventHandler, null, props, null, null)
|
void |
enque(MessageChannel channel,
IRogMessage message,
int flags)
Enque a message in a bus connection's outbound queue.
|
String |
getAppName()
Get the name of the app using the bus connection
The app name is the app name supplied to the
create(java.lang.String, int, com.neeve.sma.MessageBusDescriptor, java.util.Map<java.lang.String, com.neeve.aep.AepEngineDescriptor.ChannelConfig>, com.neeve.event.IEventHandler, com.neeve.sma.MessageBusBinding.SequenceNumberGenerator, java.util.Properties, com.neeve.aep.AepBusConnection.MessageTracer, java.lang.String) method
|
MessageBusBinding |
getBusBinding()
Get the message bus binding managed by an AEP bus connection.
|
MessageBusDescriptor |
getBusDescriptor()
Get the descriptor of the message bus binding managed by a bus connection.
|
MessageChannel |
getChannel(String channelName)
Get the message channel managed by an AEP bus connection.
|
int |
getDisruptorCapacity()
Get a bus connection's disruptor capacity
This method returns the capacity of the disruptor used by a
bus connection operating in detached mode.
|
String |
getDisruptorClaimStrategy()
Get the claim strategy of a bus connection's disruptor
This method returns the claim strategy of the disruptor used
by a bus connection operating in detached mode.
|
int |
getDisruptorRemaining()
Get number of slots remaining in a bus connection's disruptor
This method returns the number of free slots remaining in the
disruptor used by a bus connection operating in detached mode.
|
String |
getDisruptorWaitStrategy()
Get the wait strategy of a bus connection's disruptor
This method returns the wait strategy of the disruptor used
by a bus connection operating in detached mode.
|
int |
getId()
Get a bus connection's id
|
String |
getName()
Get a bus connection's fully qualified name.
|
AepBusConnection.State |
getState()
Get a bus connection's state.
|
AepBusConnectionStats |
getStats()
Get a bus connection's statistics object.
|
String |
getUserName()
Get the user name that the bus connection uses when opening the underlying bus binding
The user name is the app name unless overriden by a
PROPNAME_USER_NAME configuration
property
|
boolean |
isAcksRequireFlush()
Get whether the binding managed by a bus connection requires flush on acks
|
boolean |
isDetachedCommit()
Get whhether a bus connection is operating in detached mode
|
boolean |
isInternal()
Get whether a bus connection is internal to AEP
An
AepEngine creates internal bus bindings for control information. |
boolean |
isReconnectOnFail()
Get whether a bus connection has been configured for reconnection on failure
|
void |
onEvent(Event event)
Implementation of
IEventHandler.onEvent(com.neeve.event.Event)
Note: This method is for internal use by the bus connection. |
void |
open()
Open and start a bus connection.
|
EAepException |
openNoStart()
Open but do not start a bus connection
This method opens a bus connection.
|
void |
resetSavepoint()
Reset the current savepoint.
|
void |
rollback()
Rollback current message batch.
|
void |
rollback(int savepoint)
Rollback current message batch to the given savepoint.
|
void |
send(MessageChannel channel,
IRogMessage message,
int flags)
Send a message
|
void |
setSavepoint(int savepoint)
Set a savepoint.
|
void |
start()
Start a bus connection.
|
String |
toString()
Get a string representation of a bus connection
|
count, insertAfter, insertBefore, isLinked, next, previous, unlink, wipepublic static final String PROPNAME_USER_NAME
public static final String PROPNAME_PROVIDER_DESCRIPTOR
public static final String PROPNAME_RECONNECT_ON_FAIL
public static final boolean PROPNAME_RECONNECT_ON_FAIL_DEFAULT
PROPNAME_RECONNECT_ON_FAIL propertypublic static final String PROPNAME_SEND_EXCEPTION_HANDLING_POLICY
AepEngine#MessageSendExceptionHandlingPolicy for permissible values
for this property.public static final String PROPNAME_SEND_EXCEPTION_HANDLING_POLICY_DEFAULT
PROPNAME_SEND_EXCEPTION_HANDLING_POLICY propertypublic static final String PROPNAME_BINDING_OPEN_RETRY_INTERVAL
public static final int PROPNAME_BINDING_OPEN_RETRY_INTERVAL_DEFAULT
PROPNAME_BINDING_OPEN_RETRY_INTERVAL propertypublic static final String PROPNAME_DETACHED
public static final boolean PROPNAME_DETACHED_DEFAULT
PROPNAME_DETACHED propertypublic static final String PROPNAME_DETACHED_QUEUE_DEPTH
public static final int PROPNAME_DETACHED_QUEUE_DEPTH_DEFAULT
PROPNAME_DETACHED_QUEUE_DEPTH propertypublic static final String PROPNAME_DETACHED_QUEUE_OFFER_STRATEGY
public static final String PROPNAME_DETACHED_QUEUE_OFFER_STRATEGY_DEFAULT
PROPNAME_DETACHED_QUEUE_OFFER_STRATEGY propertypublic static final String PROPNAME_DETACHED_QUEUE_WAIT_STRATEGY
public static final String PROPNAME_DETACHED_QUEUE_WAIT_STRATEGY_DEFAULT
PROPNAME_DETACHED_QUEUE_WAIT_STRATEGY property
A default value of null means that the default is resolved by UtlDisruptor.getWaitStrategy(java.lang.String, boolean)
public static final String PROPNAME_DETACHED_QUEUE_DRAINER_AFFINITY_MASK
public static final String PROPNAME_DETACHED_QUEUE_DRAINER_AFFINITY_MASK_DEFAULT
PROPNAME_DETACHED_QUEUE_DRAINER_AFFINITY_MASK propertypublic static final String PROPNAME_ENABLE_OUTBOUND_LOG
public static final boolean PROPNAME_ENABLE_OUTBOUND_LOG_DEFAULT
PROPNAME_ENABLE_OUTBOUND_LOG propertypublic static final String PROPNAME_OUTBOUND_LOG_DIRECTORY
public static final String PROPNAME_OUTBOUND_LOG_NAME
public static final String PROPNAME_OUTBOUND_LOG_FLUSH_ON_COMMIT
public static final String PROPNAME_OUTBOUND_LOG_FLUSH_USING_MAPPED_MEMORY
public static final String PROPNAME_OUTBOUND_LOG_WRITE_BUFFER_SIZE
public static final String PROPNAME_OUTBOUND_LOG_AUTO_REPAIR
public static final String PROPNAME_OUTBOUND_LOG_INITIAL_LOG_LENGTH
public static final String PROPNAME_OUTBOUND_LOG_ZERO_OUT_INITIAL
public static final String PROPNAME_OUTBOUND_LOG_PAGE_SIZE
public static final AepBusConnection create(String appName, int id, MessageBusDescriptor busDescriptor, Map<String,AepEngineDescriptor.ChannelConfig> channelsConfig, IEventHandler eventHandler, MessageBusBinding.SequenceNumberGenerator snoGenerator, Properties props, AepBusConnection.MessageTracer msgTracer, String tracePrefix) throws Exception
appName - The name of the app instantiating the bus connection. The
bus connection uses this name in conjunction with the bus descriptor name
to create a unique name for the app (across all its bus connections)
and as the default for the user name to use with the underlying SMA
bus binding. This parameter cannot be null.id - A user assigned unique id of the bus connection. The bus connection
does not use this value. It is for sole use by the user for situations
in which the caller is using multiple bus connections and needs a numeric
mechanism to uniquely identify each bus connection.busDescriptor - The descriptor descrbing the bus connection to
establish. This parameter cannot be null.channelConfig - A map describing the app configuration for each
of the channels in the bus that the app is interested in. This parameter
cannot be null.eventHandler - The handler of events dispatched by the bus connection.
This parameter cannot be null.snoGenerator - A sequence number generator. This parameter can be
null in which case a non-persistent sequence number generator is assigned
by the bus connection.props - A property table with properties used to configure the bus
connection. This parameter can be null.msgTracer - A message tracer to use to trace messages. This parameter
can be null.tracePrefix - A prefix string to use with trace. This parameter can
be null.Exceptionpublic static final AepBusConnection create(String appName, MessageBusDescriptor busDescriptor, Map<String,AepEngineDescriptor.ChannelConfig> channelsConfig, IEventHandler eventHandler, Properties props) throws Exception
This method invokes create(appName, 0, busDescriptor, channelsConfig, eventHandler, null, props, null, null)
Exceptionpublic final String getName()
A bus connection's fully qualified name is the [appName].[busName]
public final String getAppName()
The app name is the app name supplied to the create(java.lang.String, int, com.neeve.sma.MessageBusDescriptor, java.util.Map<java.lang.String, com.neeve.aep.AepEngineDescriptor.ChannelConfig>, com.neeve.event.IEventHandler, com.neeve.sma.MessageBusBinding.SequenceNumberGenerator, java.util.Properties, com.neeve.aep.AepBusConnection.MessageTracer, java.lang.String) method
public final String getUserName()
The user name is the app name unless overriden by a PROPNAME_USER_NAME configuration
property
public final int getId()
public final AepBusConnection.State getState()
public final AepBusConnectionStats getStats()
public final MessageBusDescriptor getBusDescriptor()
public final boolean isInternal()
An AepEngine creates internal bus bindings for control information. This
method tests if a bus connection such.
public final boolean isReconnectOnFail()
public final boolean isAcksRequireFlush()
public final boolean isDetachedCommit()
public final int getDisruptorCapacity()
This method returns the capacity of the disruptor used by a bus connection operating in detached mode.
public final int getDisruptorRemaining()
This method returns the number of free slots remaining in the disruptor used by a bus connection operating in detached mode.
public final String getDisruptorClaimStrategy()
This method returns the claim strategy of the disruptor used by a bus connection operating in detached mode.
public final String getDisruptorWaitStrategy()
This method returns the wait strategy of the disruptor used by a bus connection operating in detached mode.
public final MessageBusBinding getBusBinding()
The binding returned by this method can be null in case the connection is in the open process.
Note: This method is for internal use. Developers should never need to use this method. To gain access to a binding, developers should wait for the appropriate lifecycle event that notifies of binding creation/open
public final MessageChannel getChannel(String channelName)
channelName - The name of the channel to retrieve
The channel returned by this method can be null in case the connection is in the open process.
Note: This method is for internal use. Developers should never need to use this method. To gain access to a channel, developers should wait for the appropriate lifecycle event that notifies of channel creation/open
public final void open()
throws EAepException
This method opens and starts a bus connection. The method effectively
invokes openNoStart() followed by start() and throws an
exception if any of these operations encounters an error. The semantics
of the exception thrown is the same as the semantics of the exception
returned by openNoStart()
EAepExceptionpublic final EAepException openNoStart()
This method opens a bus connection. The method starts the background
binding opener thread, opens the outbound queue, sets the state to
AepBusConnection.State.OPENING and creates the bus binding. The method then
updates the state of the connection to AepBusConnection.State.OPEN if the
binding was created successfully. If the binding create fails, then
the method leaves the state as AepBusConnection.State.OPENING and returns
the exception encountered wrapped in an EAepException.
This method does not start the underlying messaging binding. To do
so, the caller needs to subsequently invoke the start() method.
Note: This method is intended for use with start() by advanced
users who want to have fine control on the behavior of the system across
the various types of failures that could be encountered during open and
separate control over the opening and starting of the connection. The
#openAndStart method is a simpler and more traditional method
for use by users who wish to use a single method to open and start the
connection and throw an exception if one is encountered during either
of these operations.
EAepException that wraps around the exception
encountered in creating the binding. A wrapped SmaException
indicates that a temporary error was encouneterd during the creation
of the binding i.e. a retry could fix the error while a wrapped
RuntimeException indicates a permanent error was encountered.public final void start()
This method starts the underlying message bus binding
public final void enque(MessageChannel channel, IRogMessage message, int flags)
channel - The channel through which the enqueued message should
be sent when the enqueued message batch is committedmessage - The message to enqueuflags - The flags to use when sending the message through the
channel.public final com.neeve.aep.AepBusConnection.OutboundQueue.Slice complete(com.neeve.aep.AepBusConnection.OutboundQueue.Slice slice)
This method closes the current message batch (the outbound slice to
which messages have been enqueued till this method was invoked) and starts
a new message batch. The method returns the message batch. It is
up to the user to invoke commit(com.neeve.aep.AepBusConnection.OutboundQueue.Slice, com.neeve.aep.AepSendCommitCompletionEvent, long) at a later point to send those
messages outbound.
public final void commit(com.neeve.aep.AepBusConnection.OutboundQueue.Slice slice,
AepSendCommitCompletionEvent event,
long sendTs)
slice - The message batch to commit. This message batch must
be obtained from the call to complete(com.neeve.aep.AepBusConnection.OutboundQueue.Slice)event - The event to dispatch on completion of the commit. This
event is dispatched when message stability event has been received
for all messages in the supplied batch.sendTs - The send time to be stamped on each message in the
committed batch.
This method commits a message batch (slice) returned by a prior
invocation to complete(com.neeve.aep.AepBusConnection.OutboundQueue.Slice) or the current active batch if no
batch is provided. The act of committing a slice sends each message
in the batch through the underlying bus binding.
public final void commit(AepSendCommitCompletionEvent event, long sendTs)
This method invokes commit(null, event, sendTs)
public final void setSavepoint(int savepoint)
savepoint - The savepoint to set
Savepoints are a monotonically increasing set of values that are used to demarcate sections of the current message batch. It is provided to enable portions of the current batch to be rolled back. Rollback to a savepoint is performed by pruning elements in the current batch backwards from the end of the batch that have savepoint values greater or equal to the savepoint to where the rollback is to be performed. For example, if savepoints 1, 2 and 3 have been set of a message batch and the user rolls back to savepoint #2, then all elements in the batch tagged with savepoint #2 and #3 are removed i.e. only elements with savepoint=1 are left in the batch remain
This method sets the current savepoint. The savepoint set here is tagged to all messages enqueued subsequent to the invocation of this method.
IllegalArgumentException - Thrown if the supplied savepoint is less
than zero or less than the current savepointpublic final void resetSavepoint()
This method forcibly resets the current savepoint to 0. This method should be used carefully since it is up to the user to ensure that no messages are in the current message batch with savepoint > 0 since messages enqueued subsequent to this method invocation will have a savepoint value less than previous messages which can cause rollback to not behave in the expected way.
public final void rollback()
This method empties the current message batch entirely.
public final void rollback(int savepoint)
savepoint - The savepoint to roll back to.public final void send(MessageChannel channel, IRogMessage message, int flags) throws SmaException
channel - The message channel to send the message throughmessage - The message to sendflags - The flags to use when sending the message through the channel.event - The completion event to dispatch on send stabilization. This
parameter can be null in which case no stabilization event is dispatched
on completion
This method is used to send a message
SmaExceptionpublic final void close(Exception cause, boolean preserveChannelJoins)
cause - The reason why the bus connection is being closed (null
means normal shutdown)preserveChannelJoins - Indicates whether or not joined channels' subscriptions
should be preservedpublic final void onEvent(Event event)
IEventHandler.onEvent(com.neeve.event.Event)
Note: This method is for internal use by the bus connection. It is used to
receive events from the underlying message bus binding. Developers
should never invoke this method directly.
onEvent in interface IEventHandlerevent - The event to be handled.Copyright © 2019 N5 Technologies, Inc. All Rights Reserved.