Skip to content

Commit 61ee2f4

Browse files
committed
fix
1 parent c5cd32a commit 61ee2f4

File tree

15 files changed

+448
-494
lines changed

15 files changed

+448
-494
lines changed

broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -3550,7 +3550,7 @@ private boolean processConsumeQueuesForTopic(ConcurrentMap<Integer, ConsumeQueue
35503550

35513551
long minOffsetByTime = 0L;
35523552
try {
3553-
minOffsetByTime = rocksDBMessageStore.getConsumeQueueStore().getOffsetInQueueByTime(topic, queueId, checkpointByStoreTime, BoundaryType.UPPER);
3553+
minOffsetByTime = rocksDBMessageStore.getQueueStore().getOffsetInQueueByTime(topic, queueId, checkpointByStoreTime, BoundaryType.UPPER);
35543554
} catch (Exception e) {
35553555
// ignore
35563556
}

broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ public boolean correctDelayOffset() {
236236
try {
237237
for (int delayLevel : delayLevelTable.keySet()) {
238238
ConsumeQueueInterface cq =
239-
brokerController.getMessageStore().getQueueStore().findOrCreateConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
239+
brokerController.getMessageStore().findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
240240
delayLevel2QueueId(delayLevel));
241241
Long currentDelayOffset = offsetTable.get(delayLevel);
242242
if (currentDelayOffset == null || cq == null) {

broker/src/test/java/org/apache/rocketmq/broker/offset/RocksdbTransferOffsetAndCqTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ public void testRocksdbCqWrite() throws RocksDBException {
135135
return;
136136
}
137137
RocksDBMessageStore kvStore = defaultMessageStore.getRocksDBMessageStore();
138-
ConsumeQueueStoreInterface store = kvStore.getConsumeQueueStore();
138+
ConsumeQueueStoreInterface store = kvStore.getQueueStore();
139139
store.start();
140140
ConsumeQueueInterface rocksdbCq = defaultMessageStore.getRocksDBMessageStore().findConsumeQueue(topic, queueId);
141141
ConsumeQueueInterface fileCq = defaultMessageStore.findConsumeQueue(topic, queueId);

client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java

+5-7
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@
1616
*/
1717
package org.apache.rocketmq.client.impl.consumer;
1818

19+
import java.lang.reflect.Field;
20+
import java.util.ArrayList;
21+
import java.util.Collections;
22+
import java.util.List;
23+
import java.util.TreeMap;
1924
import org.apache.commons.lang3.reflect.FieldUtils;
2025
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
2126
import org.apache.rocketmq.client.exception.MQBrokerException;
@@ -29,12 +34,6 @@
2934
import org.junit.runner.RunWith;
3035
import org.mockito.junit.MockitoJUnitRunner;
3136

32-
import java.lang.reflect.Field;
33-
import java.util.ArrayList;
34-
import java.util.Collections;
35-
import java.util.List;
36-
import java.util.TreeMap;
37-
3837
import static org.assertj.core.api.Assertions.assertThat;
3938
import static org.junit.Assert.assertEquals;
4039
import static org.junit.Assert.assertFalse;
@@ -158,7 +157,6 @@ public void testProcessQueue() {
158157
ProcessQueue processQueue2 = createProcessQueue();
159158
assertEquals(processQueue1.getMsgAccCnt(), processQueue2.getMsgAccCnt());
160159
assertEquals(processQueue1.getTryUnlockTimes(), processQueue2.getTryUnlockTimes());
161-
assertEquals(processQueue1.getLastLockTimestamp(), processQueue2.getLastLockTimestamp());
162160
assertEquals(processQueue1.getLastPullTimestamp(), processQueue2.getLastPullTimestamp());
163161
}
164162

store/src/main/java/org/apache/rocketmq/store/CommitLog.java

+4-7
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package org.apache.rocketmq.store;
1818

1919
import com.google.common.base.Strings;
20+
import com.sun.jna.NativeLong;
21+
import com.sun.jna.Pointer;
2022
import java.net.Inet6Address;
2123
import java.net.InetSocketAddress;
2224
import java.nio.ByteBuffer;
@@ -34,8 +36,6 @@
3436
import java.util.concurrent.TimeoutException;
3537
import java.util.function.Supplier;
3638
import java.util.stream.Collectors;
37-
import com.sun.jna.NativeLong;
38-
import com.sun.jna.Pointer;
3939
import org.apache.rocketmq.common.MixAll;
4040
import org.apache.rocketmq.common.ServiceThread;
4141
import org.apache.rocketmq.common.SystemClock;
@@ -66,7 +66,6 @@
6666
import org.apache.rocketmq.store.logfile.MappedFile;
6767
import org.apache.rocketmq.store.util.LibC;
6868
import org.rocksdb.RocksDBException;
69-
7069
import sun.nio.ch.DirectBuffer;
7170

7271
/**
@@ -408,8 +407,7 @@ else if (!dispatchRequest.isSuccess()) {
408407
log.warn("The commitlog files are deleted, and delete the consume queue files");
409408
this.mappedFileQueue.setFlushedWhere(0);
410409
this.mappedFileQueue.setCommittedWhere(0);
411-
this.defaultMessageStore.getQueueStore().destroy();
412-
this.defaultMessageStore.getQueueStore().loadAfterDestroy();
410+
this.defaultMessageStore.destroyConsumeQueueStore(true);
413411
}
414412
}
415413

@@ -818,8 +816,7 @@ else if (size == 0) {
818816
log.warn("The commitlog files are deleted, and delete the consume queue files");
819817
this.mappedFileQueue.setFlushedWhere(0);
820818
this.mappedFileQueue.setCommittedWhere(0);
821-
this.defaultMessageStore.getQueueStore().destroy();
822-
this.defaultMessageStore.getQueueStore().loadAfterDestroy();
819+
this.defaultMessageStore.destroyConsumeQueueStore(true);
823820
}
824821
}
825822

store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java

+15-2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.rocketmq.store.config.StorePathConfigHelper;
3636
import org.apache.rocketmq.store.logfile.MappedFile;
3737
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
38+
import org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface;
3839
import org.apache.rocketmq.store.queue.CqUnit;
3940
import org.apache.rocketmq.store.queue.FileQueueLifeCycle;
4041
import org.apache.rocketmq.store.queue.MultiDispatchUtils;
@@ -61,6 +62,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCycle {
6162
private static final Logger LOG_ERROR = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
6263

6364
private final MessageStore messageStore;
65+
private final ConsumeQueueStoreInterface consumeQueueStore;
6466

6567
private final MappedFileQueue mappedFileQueue;
6668
private final String topic;
@@ -83,9 +85,20 @@ public ConsumeQueue(
8385
final String storePath,
8486
final int mappedFileSize,
8587
final MessageStore messageStore) {
88+
this(topic, queueId, storePath, mappedFileSize, messageStore, messageStore.getQueueStore());
89+
}
90+
91+
public ConsumeQueue(
92+
final String topic,
93+
final int queueId,
94+
final String storePath,
95+
final int mappedFileSize,
96+
final MessageStore messageStore,
97+
final ConsumeQueueStoreInterface consumeQueueStore) {
8698
this.storePath = storePath;
8799
this.mappedFileSize = mappedFileSize;
88100
this.messageStore = messageStore;
101+
this.consumeQueueStore = consumeQueueStore;
89102

90103
this.topic = topic;
91104
this.queueId = queueId;
@@ -899,14 +912,14 @@ public CqUnit get(long offset) {
899912
@Override
900913
public Pair<CqUnit, Long> getCqUnitAndStoreTime(long index) {
901914
CqUnit cqUnit = get(index);
902-
Long messageStoreTime = this.messageStore.getQueueStore().getStoreTime(cqUnit);
915+
Long messageStoreTime = this.consumeQueueStore.getStoreTime(cqUnit);
903916
return new Pair<>(cqUnit, messageStoreTime);
904917
}
905918

906919
@Override
907920
public Pair<CqUnit, Long> getEarliestUnitAndStoreTime() {
908921
CqUnit cqUnit = getEarliestUnit();
909-
Long messageStoreTime = this.messageStore.getQueueStore().getStoreTime(cqUnit);
922+
Long messageStoreTime = this.consumeQueueStore.getStoreTime(cqUnit);
910923
return new Pair<>(cqUnit, messageStoreTime);
911924
}
912925

0 commit comments

Comments
 (0)