Skip to content

[ISSUE #9254] Refactor CQ-related in DefaultMessageStorage #9256

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: develop
Choose a base branch
from

Conversation

qianye1001
Copy link
Contributor

Which Issue(s) This PR Fixes

Fixes #9254

Brief Description

How Did You Test This Change?

@qianye1001 qianye1001 marked this pull request as draft March 17, 2025 12:56
@codecov-commenter
Copy link

codecov-commenter commented Mar 18, 2025

Codecov Report

Attention: Patch coverage is 51.82927% with 316 lines in your changes missing coverage. Please review.

Project coverage is 48.12%. Comparing base (c5cd32a) to head (c0852e7).
Report is 20 commits behind head on develop.

Files with missing lines Patch % Lines
...rocketmq/store/queue/CombineConsumeQueueStore.java 41.10% 136 Missing and 13 partials ⚠️
...apache/rocketmq/store/queue/ConsumeQueueStore.java 50.00% 77 Missing and 6 partials ⚠️
...rocketmq/store/queue/RocksDBConsumeQueueStore.java 47.05% 26 Missing and 1 partial ⚠️
...etmq/store/metrics/RocksDBStoreMetricsManager.java 0.00% 16 Missing ⚠️
...pache/rocketmq/store/dledger/DLedgerCommitLog.java 46.66% 4 Missing and 4 partials ⚠️
...ache/rocketmq/store/queue/RocksDBConsumeQueue.java 75.00% 8 Missing ⚠️
...main/java/org/apache/rocketmq/store/CommitLog.java 66.66% 2 Missing and 3 partials ⚠️
...org/apache/rocketmq/store/DefaultMessageStore.java 87.80% 2 Missing and 3 partials ⚠️
...ache/rocketmq/store/config/MessageStoreConfig.java 75.00% 5 Missing ⚠️
...ocketmq/store/queue/AbstractConsumeQueueStore.java 69.23% 3 Missing and 1 partial ⚠️
... and 4 more
Additional details and impacted files
@@             Coverage Diff             @@
##             develop    #9256    +/-   ##
===========================================
  Coverage      48.11%   48.12%            
- Complexity     12110    12218   +108     
===========================================
  Files           1322     1324     +2     
  Lines          93126    93525   +399     
  Branches       11940    12011    +71     
===========================================
+ Hits           44812    45013   +201     
- Misses         42775    42955   +180     
- Partials        5539     5557    +18     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@qianye1001 qianye1001 changed the title [ISSUE #9254] Refactor DefaultMessageStorage type/enhancement [ISSUE #9254] Refactor DefaultMessageStorage Mar 18, 2025
@qianye1001 qianye1001 marked this pull request as ready for review March 18, 2025 03:31
@qianye1001 qianye1001 changed the title [ISSUE #9254] Refactor DefaultMessageStorage [ISSUE #9254] Refactor CQ-related thread in DefaultMessageStorage Mar 18, 2025
@qianye1001 qianye1001 changed the title [ISSUE #9254] Refactor CQ-related thread in DefaultMessageStorage [ISSUE #9254] Refactor CQ-related in DefaultMessageStorage Mar 19, 2025
@qianye1001 qianye1001 force-pushed the Refactor-DefaultMessageStore branch 2 times, most recently from ad190f2 to 61ee2f4 Compare March 25, 2025 07:03
@qianye1001 qianye1001 force-pushed the Refactor-DefaultMessageStore branch 2 times, most recently from dc014b5 to a8996fa Compare April 10, 2025 07:03
@qianye1001 qianye1001 force-pushed the Refactor-DefaultMessageStore branch from da78405 to 59ee17a Compare April 18, 2025 03:33
@@ -1204,4 +1217,18 @@ public long estimateMessageCount(long from, long to, MessageFilter filter) {
log.debug("Result={}, raw={}, match={}, sample={}", result, raw, match, sample);
return result;
}

public void initializeWithOffset(long offset) {
destroy();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling destroy method in init, it's hard to get the point.

}
}

public boolean checkAssignOffset(boolean initializeOffset) throws RocksDBException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about rename to initAssginOffsetForAllConsumeQueueStores, and add javadoc here for readability.

@@ -520,7 +516,7 @@ private void compensateForHA(ConcurrentMap<String, Long> cqOffsetTable) {
}

@Override
public void destroy() {
public void destroy(boolean loadAfterDestroy) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here no need to handle switch for loadAfterDestroy ?

final RocksDBConsumeQueueStore consumeQueueStore,
final String topic, final int queueId) {
this.messageStoreConfig = messageStoreConfig;
this.consumeQueueStore = consumeQueueStore;
this.topic = topic;
this.queueId = queueId;
}

public RocksDBConsumeQueue(final String topic, final int queueId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This construct function is only used for passing params.

return this.consumeQueueStore.getMinOffsetInQueue(topic, queueId);
} catch (RocksDBException e) {
ERROR_LOG.error("getMinOffsetInQueue Failed. topic: {}, queueId: {}", topic, queueId, e);
return -1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make sure here the default value is -1, but not 0.

@@ -480,4 +486,21 @@ public CqUnit nextAndRelease() {
}
}
}

public void initializeWithOffset(long offset) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used for mount --> main in container mode? If so, add some java docs here.


// update the max and min offset
if (offset > 0) {
this.consumeQueueStore.updateCqOffset(topic, queueId, 0L, offset - 1, true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about use two methods for updating CQ offset for min and max offsets.

if (spaceFull || timeUp) {
cleanExpired(minOffset);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add some java docs here, to delete the CQ Units whose offset is little than min physical offset in commitLog.

.getStorePathConsumeQueue(messageStoreConfig.getStorePathRootDir());
double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
if (logicsRatio > diskSpaceWarningLevelRatio) {
boolean diskOk = messageStore.getRunningFlags().getAndMakeLogicDiskFull();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about rename to diskMayBeFullSoon ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Enhancement] Refactor CQ-related thread in DefaultMessageStorage
4 participants