-
Notifications
You must be signed in to change notification settings - Fork 11.8k
[ISSUE #9254] Refactor CQ-related in DefaultMessageStorage #9256
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
[ISSUE #9254] Refactor CQ-related in DefaultMessageStorage #9256
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## develop #9256 +/- ##
===========================================
Coverage 48.11% 48.12%
- Complexity 12110 12218 +108
===========================================
Files 1322 1324 +2
Lines 93126 93525 +399
Branches 11940 12011 +71
===========================================
+ Hits 44812 45013 +201
- Misses 42775 42955 +180
- Partials 5539 5557 +18 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
ad190f2
to
61ee2f4
Compare
dc014b5
to
a8996fa
Compare
da78405
to
59ee17a
Compare
store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java
Outdated
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java
Outdated
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
Outdated
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
Outdated
Show resolved
Hide resolved
broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
Outdated
Show resolved
Hide resolved
@@ -1204,4 +1217,18 @@ public long estimateMessageCount(long from, long to, MessageFilter filter) { | |||
log.debug("Result={}, raw={}, match={}, sample={}", result, raw, match, sample); | |||
return result; | |||
} | |||
|
|||
public void initializeWithOffset(long offset) { | |||
destroy(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Calling destroy method in init, it's hard to get the point.
store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/metrics/RocksDBStoreMetricsManager.java
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java
Outdated
Show resolved
Hide resolved
store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java
Outdated
Show resolved
Hide resolved
} | ||
} | ||
|
||
public boolean checkAssignOffset(boolean initializeOffset) throws RocksDBException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about rename to initAssginOffsetForAllConsumeQueueStores, and add javadoc here for readability.
@@ -520,7 +516,7 @@ private void compensateForHA(ConcurrentMap<String, Long> cqOffsetTable) { | |||
} | |||
|
|||
@Override | |||
public void destroy() { | |||
public void destroy(boolean loadAfterDestroy) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here no need to handle switch for loadAfterDestroy ?
final RocksDBConsumeQueueStore consumeQueueStore, | ||
final String topic, final int queueId) { | ||
this.messageStoreConfig = messageStoreConfig; | ||
this.consumeQueueStore = consumeQueueStore; | ||
this.topic = topic; | ||
this.queueId = queueId; | ||
} | ||
|
||
public RocksDBConsumeQueue(final String topic, final int queueId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This construct function is only used for passing params.
return this.consumeQueueStore.getMinOffsetInQueue(topic, queueId); | ||
} catch (RocksDBException e) { | ||
ERROR_LOG.error("getMinOffsetInQueue Failed. topic: {}, queueId: {}", topic, queueId, e); | ||
return -1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To make sure here the default value is -1, but not 0.
@@ -480,4 +486,21 @@ public CqUnit nextAndRelease() { | |||
} | |||
} | |||
} | |||
|
|||
public void initializeWithOffset(long offset) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is used for mount --> main in container mode? If so, add some java docs here.
|
||
// update the max and min offset | ||
if (offset > 0) { | ||
this.consumeQueueStore.updateCqOffset(topic, queueId, 0L, offset - 1, true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about use two methods for updating CQ offset for min and max offsets.
if (spaceFull || timeUp) { | ||
cleanExpired(minOffset); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add some java docs here, to delete the CQ Units whose offset is little than min physical offset in commitLog.
.getStorePathConsumeQueue(messageStoreConfig.getStorePathRootDir()); | ||
double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics); | ||
if (logicsRatio > diskSpaceWarningLevelRatio) { | ||
boolean diskOk = messageStore.getRunningFlags().getAndMakeLogicDiskFull(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about rename to diskMayBeFullSoon ?
Which Issue(s) This PR Fixes
Fixes #9254
Brief Description
How Did You Test This Change?