Skip to content

Commit c6bdec7

Browse files
authored
Merge pull request #51 from swarwick/master
Issue #47 and Issue #48
2 parents dbfa153 + 6c275e8 commit c6bdec7

21 files changed

+112
-71
lines changed

core/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
group 'com.epam.cme'
2-
version '2.0.2'
2+
version '2.1.0'
33

44
apply plugin: 'java'
55

core/src/main/java/com/epam/cme/mdp3/MdpPacket.java

-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
import com.epam.cme.mdp3.sbe.message.SbeBuffer;
1616
import com.epam.cme.mdp3.sbe.message.SbeBufferImpl;
17-
import com.epam.cme.mdp3.sbe.message.SbeConstants;
1817
import com.epam.cme.mdp3.sbe.message.SbeMessage;
1918

2019
import java.nio.ByteBuffer;

core/src/main/java/com/epam/cme/mdp3/core/channel/MdpFeedException.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@
1212

1313
package com.epam.cme.mdp3.core.channel;
1414

15-
public class
16-
MdpFeedException extends Exception {
15+
public class MdpFeedException extends Exception {
16+
17+
private static final long serialVersionUID = -6425092985801210129L;
18+
1719
public MdpFeedException() {
1820
super();
1921
}

core/src/main/java/com/epam/cme/mdp3/core/channel/MdpFeedWorker.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@
3737

3838
public class MdpFeedWorker implements Runnable {
3939
private static final Logger logger = LoggerFactory.getLogger(MdpFeedWorker.class);
40-
private static final int ACTIVE_MARK = 1;
41-
private static final int SHUTDOWN_MARK = 2;
4240
public static final int RCV_BUFFER_SIZE = 4*1024*1024;
4341

4442
private final ConnectionCfg cfg;
@@ -167,7 +165,7 @@ public void run() {
167165

168166
private void select(final ByteBuffer byteBuffer, final MdpPacket mdpPacket) throws IOException {
169167
if (selector.select() > 0) {
170-
Iterator selectedKeys = selector.selectedKeys().iterator();
168+
Iterator<?> selectedKeys = selector.selectedKeys().iterator();
171169
while (selectedKeys.hasNext()) {
172170
final SelectionKey key = (SelectionKey) selectedKeys.next();
173171
selectedKeys.remove();

core/src/main/java/com/epam/cme/mdp3/sbe/message/SbeBuffer.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ public interface SbeBuffer {
2626
*/
2727
void wrapForParse(final ByteBuffer bb);
2828

29-
void copyTo(BytesStore store);
29+
void copyTo(BytesStore<?, ?> store);
3030

31-
void copyTo(int offset, BytesStore store, int len);
31+
void copyTo(int offset, BytesStore<?, ?> store, int len);
3232

33-
void copyFrom(BytesStore store);
33+
void copyFrom(BytesStore<?, ?> store);
3434

3535
void copyFrom(SbeBuffer buffer);
3636

core/src/main/java/com/epam/cme/mdp3/sbe/message/SbeBufferImpl.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
public class SbeBufferImpl extends AbstractSbeBuffer implements SbeBuffer {
2121
public static final byte BYTE_MASK = (byte) 0xff;
22-
protected BytesStore bytes;
22+
protected BytesStore<?, ?> bytes;
2323

2424
@Override
2525
public void wrap(final SbeBuffer sb) {
@@ -31,17 +31,17 @@ public void wrap(final SbeBuffer sb) {
3131
}
3232

3333
@Override
34-
public void copyTo(final BytesStore store) {
34+
public void copyTo(final BytesStore<?, ?> store) {
3535
store.write(0, this.bytes, 0, this.length());
3636
}
3737

3838
@Override
39-
public void copyTo(final int offset, final BytesStore store, final int len) {
39+
public void copyTo(final int offset, final BytesStore<?, ?> store, final int len) {
4040
store.write(0, this.bytes, offset, len);
4141
}
4242

4343
@Override
44-
public void copyFrom(BytesStore store) {
44+
public void copyFrom(BytesStore<?, ?> store) {
4545
this.bytes.write(0, store, 0, store.length());
4646
this.length = store.length();
4747
}

core/src/main/java/com/epam/cme/mdp3/sbe/message/meta/SbePrimitiveType.java

+4
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ public long getMaxValue() {
8686
public long getNullValue() {
8787
return nullValue;
8888
}
89+
90+
public String getJavaType() {
91+
return javaType;
92+
}
8993

9094
public boolean isNull(final long value) {
9195
return value == this.nullValue;

core/src/main/java/com/epam/cme/mdp3/sbe/schema/MdpMessageTypeBuildException.java

+2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
package com.epam.cme.mdp3.sbe.schema;
1414

1515
public class MdpMessageTypeBuildException extends Exception {
16+
private static final long serialVersionUID = -6385550323191516252L;
17+
1618
public MdpMessageTypeBuildException() {
1719
super();
1820
}

core/src/main/java/com/epam/cme/mdp3/sbe/schema/SchemaUnmarshallingException.java

+2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
package com.epam.cme.mdp3.sbe.schema;
1414

1515
public class SchemaUnmarshallingException extends Exception {
16+
private static final long serialVersionUID = 1944148130312471541L;
17+
1618
public SchemaUnmarshallingException() {
1719
super();
1820
}

mbp-only/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ plugins {
77
id "me.champeau.gradle.jmh" version "0.3.1"
88
}
99
group = 'com.epam.cme'
10-
version = '2.0.2'
10+
version = '2.1.0'
1111
sourceCompatibility = 1.8
1212
targetCompatibility = 1.8
1313

mbp-with-mbo/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ plugins {
66
}
77

88
group = 'com.epam.cme'
9-
version = '2.0.2'
9+
version = '2.1.0'
1010
sourceCompatibility = 1.8
1111
targetCompatibility = 1.8
1212

mbp-with-mbo/src/main/java/com/epam/cme/mdp3/ChannelListener.java

+16-6
Original file line numberDiff line numberDiff line change
@@ -21,29 +21,39 @@ public interface ChannelListener extends CoreChannelListener {
2121
*
2222
* Only when MBO is enabled.
2323
*
24+
* @param mdpMessage The full MDP Message
2425
* @param channelId ID of MDP Channel
25-
* @param matchEventIndicator MDP Event indicator (bitset, @see <a href="http://www.cmegroup.com/confluence/display/EPICSANDBOX/MDP+3.0+-+Market+Data+Incremental+Refresh">MDP 3.0 - Market Data Incremental Refresh</a>)
2626
* @param secDesc Security description
2727
* @param msgSeqNum Message sequence number of message.
2828
* @param securityId Security ID
2929
* @param orderEntry MBO Entry of Group from MDP Incremental Refresh Message. It can not be null.
3030
* @param mdEntry MBP Entry of Group from MDP Incremental Refresh Message. It can be null when MBO Incremental Refresh is received in separated template.
3131
*/
32-
void onIncrementalMBORefresh(final String channelId, final short matchEventIndicator, final int securityId,
33-
final String secDesc, final long msgSeqNum, final FieldSet orderEntry, final FieldSet mdEntry);
32+
void onIncrementalMBORefresh(final MdpMessage mdpMessage, final String channelId, final int securityId, final String secDesc, final long msgSeqNum, final FieldSet orderEntry, final FieldSet mdEntry);
3433

3534
/**
3635
*
36+
* @param mdpMessage The full MDP Message
3737
* @param channelId ID of MDP Channel
38-
* @param matchEventIndicator MDP Event indicator (bitset, @see <a href="http://www.cmegroup.com/confluence/display/EPICSANDBOX/MDP+3.0+-+Market+Data+Incremental+Refresh">MDP 3.0 - Market Data Incremental Refresh</a>)
3938
* @param securityId Security ID
4039
* @param secDesc Security description
4140
* @param msgSeqNum Message sequence number of message.
4241
* @param mdEntry MBP Entry of Group from MDP Incremental Refresh Message. It can not be null.
4342
*/
44-
void onIncrementalMBPRefresh(final String channelId, final short matchEventIndicator, final int securityId,
45-
final String secDesc, final long msgSeqNum, final FieldSet mdEntry);
43+
void onIncrementalMBPRefresh(final MdpMessage mdpMessage, final String channelId, final int securityId, final String secDesc, final long msgSeqNum, final FieldSet mdEntry);
4644

45+
/**
46+
* Called when a Incremental MsgSeqNum has been fully processed
47+
* This callback will be called for each securityId found in the MsgSeqNum packet
48+
*
49+
* @param mdpMessage The full MDP Message
50+
* @param channelId ID of MDP Channel
51+
* @param securityId Security ID
52+
* @param secDesc Security description
53+
* @param msgSeqNum Message sequence number of message.
54+
*/
55+
void onIncrementalComplete(final MdpMessage mdpMessage, final String channelId, final int securityId, final String secDesc, final long msgSeqNum);
56+
4757
/**
4858
* Called when MDP Snapshot Full Refresh Message for MBO is received and processed.
4959
*

mbp-with-mbo/src/main/java/com/epam/cme/mdp3/VoidChannelListener.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,17 @@ default void onSecurityStatus(String channelId, int securityId, MdpMessage secSt
6060
}
6161

6262
@Override
63-
default void onIncrementalMBORefresh(String channelId, short matchEventIndicator, int securityId, String secDesc, long msgSeqNum, FieldSet orderEntry, FieldSet mdEntry) {
63+
default void onIncrementalMBORefresh(MdpMessage mdpMessage, String channelId, int securityId, String secDesc, long msgSeqNum, FieldSet orderEntry, FieldSet mdEntry) {
6464

6565
}
6666

6767
@Override
68-
default void onIncrementalMBPRefresh(String channelId, short matchEventIndicator, int securityId, String secDesc, long msgSeqNum, FieldSet mdEntry) {
68+
default void onIncrementalMBPRefresh(MdpMessage mdpMessage, String channelId, int securityId, String secDesc, long msgSeqNum, FieldSet mdEntry) {
69+
70+
}
71+
72+
@Override
73+
default void onIncrementalComplete(MdpMessage mdpMessage, String channelId, int securityId, String secDesc, long msgSeqNum) {
6974

7075
}
7176

mbp-with-mbo/src/main/java/com/epam/cme/mdp3/channel/MdpChannelBuilder.java

-3
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import com.epam.cme.mdp3.sbe.schema.MdpMessageTypes;
2424

2525
import java.net.URI;
26-
import java.util.Arrays;
2726
import java.util.HashMap;
2827
import java.util.List;
2928
import java.util.Map;
@@ -35,7 +34,6 @@ public class MdpChannelBuilder {
3534
private final String channelId;
3635
private URI cfgURI;
3736
private URI schemaURI;
38-
private MdpMessageTypes mdpMessageTypes;
3937
private Map<FeedType, String> feedANetworkInterfaces = new HashMap<>();
4038
private Map<FeedType, String> feedBNetworkInterfaces = new HashMap<>();
4139
private ChannelListener channelListener;
@@ -66,7 +64,6 @@ public MdpChannelBuilder setConfiguration(final URI cfgURI) {
6664

6765
public MdpChannelBuilder setSchema(final URI schemaURI) throws MdpMessageTypeBuildException {
6866
this.schemaURI = schemaURI;
69-
this.mdpMessageTypes = new MdpMessageTypes(this.schemaURI);
7067
return this;
7168
}
7269

mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/ChannelControllerRouter.java

+16
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.epam.cme.mdp3.sbe.schema.MdpMessageTypes;
2222
import org.slf4j.Logger;
2323
import org.slf4j.LoggerFactory;
24+
import org.agrona.collections.IntHashSet;
2425

2526
import java.util.List;
2627
import java.util.function.Consumer;
@@ -39,6 +40,7 @@ public class ChannelControllerRouter implements MdpChannelController {
3940
private final String channelId;
4041
private List<Integer> mboIncrementMessageTemplateIds;
4142
private List<Integer> mboSnapshotMessageTemplateIds;
43+
private IntHashSet securityIds = new IntHashSet();
4244

4345
public ChannelControllerRouter(String channelId, InstrumentManager instrumentManager,
4446
MdpMessageTypes mdpMessageTypes, List<ChannelListener> channelListeners,
@@ -131,14 +133,25 @@ protected void routeMBPEntry(int securityId, MdpMessage mdpMessage, MdpGroupEntr
131133
}
132134
}
133135

136+
protected void routeIncrementalComplete(IntHashSet securityIds, MdpMessage mdpMessage, long msgSeqNum) {
137+
for (int securityId : securityIds) {
138+
InstrumentController instrumentController = instrumentManager.getInstrumentController(securityId);
139+
if (instrumentController != null) {
140+
instrumentController.handleIncrementalComplete(mdpMessage, msgSeqNum);
141+
}
142+
}
143+
}
144+
134145
private void handleIncrementalMessage(MdpMessage mdpMessage, MdpGroup mdpGroup, MdpGroupEntry mdpGroupEntry, long msgSeqNum){
135146
if (isIncrementalMessageSupported(mdpMessage)) {
147+
securityIds.clear();
136148
if (isIncrementOnlyForMBO(mdpMessage)) {
137149
mdpMessage.getGroup(MdConstants.NO_MD_ENTRIES, mdpGroup);
138150
while (mdpGroup.hashNext()) {
139151
mdpGroup.next();
140152
mdpGroup.getEntry(mdpGroupEntry);
141153
int securityId = getSecurityId(mdpGroupEntry);
154+
securityIds.add(securityId);
142155
routeMBOEntry(securityId, mdpMessage, mdpGroupEntry, null, msgSeqNum);
143156
}
144157
} else {
@@ -151,6 +164,7 @@ private void handleIncrementalMessage(MdpMessage mdpMessage, MdpGroup mdpGroup,
151164
emptyBookConsumers.forEach(mdpMessageConsumer -> mdpMessageConsumer.accept(mdpMessage));
152165
} else {
153166
int securityId = mdEntry.getInt32(MdConstants.SECURITY_ID);
167+
securityIds.add(securityId);
154168
routeMBPEntry(securityId, mdpMessage, mdEntry, msgSeqNum);
155169
}
156170
}
@@ -163,10 +177,12 @@ private void handleIncrementalMessage(MdpMessage mdpMessage, MdpGroup mdpGroup,
163177
short entryNum = mdpGroupEntry.getUInt8(MdConstants.REFERENCE_ID);
164178
noMdEntriesGroup.getEntry(entryNum, mdEntry);
165179
int securityId = mdEntry.getInt32(MdConstants.SECURITY_ID);
180+
securityIds.add(securityId);
166181
routeMBOEntry(securityId, mdpMessage, mdpGroupEntry, mdEntry, msgSeqNum);
167182
}
168183
}
169184
}
185+
routeIncrementalComplete(securityIds, mdpMessage, msgSeqNum);
170186
}
171187
}
172188

mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/GapChannelController.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -78,13 +78,13 @@ public GapChannelController(List<ChannelListener> channelListeners, ChannelContr
7878
}
7979

8080
@Override
81-
public List<Integer> getMBOIncrementMessageTemplateIds() {
82-
return mboIncrementMessageTemplateIds == null ? MdpChannelController.super.getMBOIncrementMessageTemplateIds() : mboIncrementMessageTemplateIds;
81+
public List<Integer> getMBOIncrementMessageTemplateIds() {
82+
return mboIncrementMessageTemplateIds == null ? MdpChannelController.super.getMBOIncrementMessageTemplateIds() : mboIncrementMessageTemplateIds;
8383
}
8484

8585
@Override
8686
public List<Integer> getMBOSnapshotMessageTemplateIds() {
87-
return mboSnapshotMessageTemplateIds == null ? MdpChannelController.super.getMBOSnapshotMessageTemplateIds() : mboSnapshotMessageTemplateIds;
87+
return mboSnapshotMessageTemplateIds == null ? MdpChannelController.super.getMBOSnapshotMessageTemplateIds() : mboSnapshotMessageTemplateIds;
8888
}
8989

9090
@Override
@@ -143,6 +143,8 @@ public void handleSnapshotPacket(MdpFeedContext feedContext, MdpPacket mdpPacket
143143
target.handleSnapshotPacket(feedContext, mdpPacket);
144144
}
145145
break;
146+
default:
147+
break;
146148
}
147149
} finally {
148150
lock.unlock();
@@ -199,6 +201,8 @@ public void handleIncrementalPacket(MdpFeedContext feedContext, MdpPacket mdpPac
199201
feedContext.getFeedType(), feedContext.getFeed(), currentState, pkgSequence);
200202
}
201203
break;
204+
default:
205+
break;
202206
}
203207
} finally {
204208
lock.unlock();

mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/InstrumentController.java

+10-6
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
import com.epam.cme.mdp3.ChannelListener;
1616
import com.epam.cme.mdp3.MdpGroupEntry;
1717
import com.epam.cme.mdp3.MdpMessage;
18-
import com.epam.cme.mdp3.sbe.message.SbeConstants;
19-
2018
import java.util.List;
2119

2220
public class InstrumentController {
@@ -35,18 +33,24 @@ public InstrumentController(List<ChannelListener> listeners, String channelId, i
3533

3634
public void handleMBOIncrementMDEntry(MdpMessage mdpMessage, MdpGroupEntry orderIDEntry, MdpGroupEntry mdEntry, long msgSeqNum){
3735
if(enable) {
38-
short matchEventIndicator = mdpMessage.getUInt8(SbeConstants.MATCHEVENTINDICATOR_TAG);
3936
for (ChannelListener channelListener : listeners) {
40-
channelListener.onIncrementalMBORefresh(channelId, matchEventIndicator, securityId, secDesc, msgSeqNum, orderIDEntry, mdEntry);
37+
channelListener.onIncrementalMBORefresh(mdpMessage, channelId, securityId, secDesc, msgSeqNum, orderIDEntry, mdEntry);
4138
}
4239
}
4340
}
4441

4542
public void handleMBPIncrementMDEntry(MdpMessage mdpMessage, MdpGroupEntry mdEntry, long msgSeqNum){
4643
if(enable) {
47-
short matchEventIndicator = mdpMessage.getUInt8(SbeConstants.MATCHEVENTINDICATOR_TAG);
4844
for (ChannelListener channelListener : listeners) {
49-
channelListener.onIncrementalMBPRefresh(channelId, matchEventIndicator, securityId, secDesc, msgSeqNum, mdEntry);
45+
channelListener.onIncrementalMBPRefresh(mdpMessage, channelId, securityId, secDesc, msgSeqNum, mdEntry);
46+
}
47+
}
48+
}
49+
50+
public void handleIncrementalComplete(MdpMessage mdpMessage, long msgSeqNum) {
51+
if(enable) {
52+
for (ChannelListener channelListener : listeners) {
53+
channelListener.onIncrementalComplete(mdpMessage, channelId, securityId, secDesc, msgSeqNum);
5054
}
5155
}
5256
}

mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/OffHeapSnapshotCycleHandler.java

+1-13
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ private void clearArray(LongArray array) {
123123
}
124124

125125
private class LongArray {
126-
private BytesStore bytes;
126+
private BytesStore<?, ?> bytes;
127127
private long length;
128128

129129
public LongArray(long length) {
@@ -149,14 +149,6 @@ public void setValue(long index, long value) {
149149
}
150150
}
151151

152-
public BytesStore getBytes() {
153-
return bytes;
154-
}
155-
156-
public void setBytes(BytesStore bytes) {
157-
this.bytes = bytes;
158-
}
159-
160152
public long getLength() {
161153
return length;
162154
}
@@ -183,10 +175,6 @@ public T getValue() {
183175
return value;
184176
}
185177

186-
public void setValue(T value) {
187-
this.value = value;
188-
}
189-
190178
@Override
191179
public boolean equals(Object o) {
192180
if (this == o) return true;

0 commit comments

Comments
 (0)