HDDS-13891. SCM-based health monitoring and batch processing in Recon #9258
devmadhuu wants to merge 38 commits into apache:master
Conversation
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTaskV2.java
...hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
…to run the replication manager logic in Recon itself.
.../src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHealthSchemaManagerV2.java
...ration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconTasksV2MultiNode.java
    0L, 0L, 1000);

    // Should be empty in normal operation (all replicas healthy)
    assertEquals(0, underReplicatedContainers.size());
Is there a test that reproduces a situation with different types of unhealthy containers? Here both tests expect 0 unhealthy containers.
Updated TestReconTasks — I updated the tests to cover actual unhealthy-container scenarios with ContainerHealthTaskV2 instead of only query-path checks that return 0.
- Kept SCM sync coverage
- Added/updated end-to-end unhealthy transitions for V2:
- MISSING detection after DN shutdown + cleanup after DN restart
- EMPTY_MISSING detection (with key count 0) + cleanup after restart
Updated hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestContainerHealthTask.java
    Multibinder.newSetBinder(binder(), ReconSchemaDefinition.class);
    schemaBinder.addBinding().to(UtilizationSchemaDefinition.class);
    schemaBinder.addBinding().to(ContainerSchemaDefinition.class);
    schemaBinder.addBinding().to(ContainerSchemaDefinitionV2.class);
What code is going to delete 'ContainerSchemaDefinition' from the existing DB during the upgrade?
Is there a plan to migrate v1 -> v2 rows, to avoid a delay in displaying the status after upgrade?
There is no separate V2 schema; since the new schema is the same as the old legacy table, no changes are needed during upgrade. The existing upgrade action classes InitialConstraintUpgradeAction and ContainerReplicaMismatchAction should suffice.
.../recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/UnhealthyContainerMetadata.java
...cm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/NoOpsReplicationQueue.java
dombizita left a comment
Thank you for working on this @devmadhuu! This is quite a big PR; I went through mostly the REPLICA_MISMATCH-related changes (which look good), but tried to go over the whole change set, except the test changes.
Two overall comments: I believe you have a design document for this solution; please share it on the Jira and in the PR description too. Also, if you used any kind of AI tool, please mention it in the description (as the ASF asks for that). Thanks!
    @SuppressWarnings("checkstyle:ParameterNumber")
    public UnhealthyContainerMetadata(long containerID, String containerState,
        long unhealthySince, long expectedReplicaCount, long actualReplicaCount,
        long replicaDeltaCount, String reason, List<ContainerHistory> replicas,
        UUID pipelineID, long keyCount) {
Why did you change from UnhealthyContainers to this parameter list?
Thanks for pointing that out. I was doing some testing and refactoring and forgot to revert this. I reverted it to the previous UnhealthyContainers-based constructor pattern; ContainerEndpoint now constructs the jOOQ-generated POJO and passes it into UnhealthyContainerMetadata.
    ReconReplicationManagerReport report,
    List<ContainerInfo> allContainers) {

      long currentTime = System.currentTimeMillis();
This means that every time we call processAll() it'll use the current time as inStateSince? That makes inStateSince irrelevant, as it'll always be the last timestamp the task ran, not the time since the container entered that state.
You are right. I fixed this. We now read existing rows first and preserve in_state_since for the same (containerId, state). We only use current scan time when the container enters a new state.
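A minimal sketch of that preserve-or-stamp logic (illustrative names, not the actual Recon classes), assuming the existing rows are preloaded into a map keyed by (containerId, state):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch (not the actual Recon code) of the fix described above:
// keep the existing in_state_since for a (containerId, state) pair already in
// the table, and use the current scan time only when the state is new.
public class InStateSinceSketch {

  static String key(long containerId, String state) {
    return containerId + "#" + state;
  }

  // 'existing' maps (containerId, state) -> in_state_since as read from the DB.
  static long resolveInStateSince(Map<String, Long> existing,
      long containerId, String state, long scanTime) {
    return existing.getOrDefault(key(containerId, state), scanTime);
  }
}
```

With this shape, a container that stays MISSING across scans keeps its original timestamp, while a container newly entering UNDER_REPLICATED gets the current scan time.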
    /**
     * POJO representing a record in UNHEALTHY_CONTAINERS table.
     */
    public static class UnhealthyContainerRecordV2 {
Previously these classes in Recon were generated by jOOQ, weren't they? Like UnhealthyContainersRecord:
package org.apache.ozone.recon.schema.generated.tables.records;
import javax.annotation.Generated;
import org.apache.ozone.recon.schema.generated.tables.UnhealthyContainersTable;
import org.jooq.Field;
import org.jooq.Record2;
import org.jooq.Record7;
import org.jooq.Row7;
import org.jooq.impl.UpdatableRecordImpl;
/**
* This class is generated by jOOQ.
*/
@Generated(
value = {
"http://www.jooq.org",
"jOOQ version:3.11.9"
},
comments = "This class is generated by jOOQ"
)
@SuppressWarnings({ "all", "unchecked", "rawtypes" })
public class UnhealthyContainersRecord extends UpdatableRecordImpl<UnhealthyContainersRecord> implements Record7<Long, String, Long, Integer, Integer, Integer, String> {
Are you intentionally moving away from that pattern?
Also, it was UnhealthyContainersRecord and now you are using UnhealthyContainerRecordV2; 'container' was plural before, so it might be good to follow that.
No. For API transport we still use the jOOQ-generated UnhealthyContainers at the endpoint boundary. UnhealthyContainerRecordV2 remains an internal service DTO used by the batch write/read flow and does not replace generated DB records globally.
    public void testUnhealthyContainersFilteredResponse()
        throws IOException, TimeoutException {
      String missing = UnHealthyContainerStates.MISSING.toString();
      String emptyMissing = UnHealthyContainerStates.EMPTY_MISSING.toString();
I'd keep it the way it was, it's easier to maintain.
Agreed. I switched the test back to enum-driven state values (UnHealthyContainerStates.EMPTY_MISSING/NEGATIVE_SIZE) for easier maintenance.
    public String getJdbcUrl() {
      return "jdbc:derby:" + tempDir.getAbsolutePath() +
    -     File.separator + "derby_recon.db";
    +     File.separator + "derby_recon.db;create=true";
    }

    @Override
    public String getUserName() {
    -   return null;
    +   return "RECON";
Why are these changes needed?
Good catch. I was doing some performance testing with multiple users and multiple DBs; those changes were not needed for this PR. I reverted them to keep the test DB wiring unchanged from the baseline.
    try {
      initializeAndRunTask();
      long elapsed = Time.monotonicNow() - cycleStart;
      long sleepMs = Math.max(0, interval - elapsed);
To avoid a continuous loop, we can enforce a minimum interval of 1 minute before the next run.
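A sketch of the suggested floor, assuming a hypothetical MIN_INTERVAL_MS constant of one minute (the class and method names are illustrative, not existing code):

```java
// Illustrative sketch of the suggested minimum interval: never sleep less than
// one minute between cycles, even if a run overshoots its configured interval.
public class TaskLoopSleep {

  static final long MIN_INTERVAL_MS = 60_000L;

  static long nextSleepMs(long intervalMs, long elapsedMs) {
    // With Math.max(0, ...) alone, a slow cycle yields sleepMs == 0 and the
    // task spins continuously; the floor guarantees a pause between runs.
    return Math.max(MIN_INTERVAL_MS, intervalMs - elapsedMs);
  }
}
```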
     * @param container The container ID to record
     */
    @Override
    public void incrementAndSample(ContainerHealthState stat, ContainerInfo container) {
We can remove the other methods and keep just two methods for the replica-mismatch state.
     */
    @InterfaceAudience.Private
    @Metrics(about = "ContainerHealthTaskV2 Metrics", context = OzoneConsts.OZONE)
    public final class ContainerHealthTaskV2Metrics {
We can remove V2 from all class names and variable names.
    @Override
    public int hashCode() {
      return Long.hashCode(containerId) * 31 + state.hashCode();
We can use return Objects.hash(field1, field2); to combine the two fields, rather than a hand-rolled implementation.
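For illustration, a minimal key class using Objects.hash as suggested (the field names mirror the reviewed snippet; the class itself is hypothetical):

```java
import java.util.Objects;

// Illustrative sketch: combine the two key fields with Objects.hash instead of
// a hand-rolled 31-multiplier expression, keeping equals/hashCode in sync.
public class RecordKey {
  private final long containerId;
  private final String state;

  RecordKey(long containerId, String state) {
    this.containerId = containerId;
    this.state = state;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof RecordKey)) {
      return false;
    }
    RecordKey that = (RecordKey) o;
    return containerId == that.containerId && Objects.equals(state, that.state);
  }

  @Override
  public int hashCode() {
    // Objects.hash boxes its arguments, but is clearer and harder to get wrong.
    return Objects.hash(containerId, state);
  }
}
```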
    int to = Math.min(from + MAX_DELETE_CHUNK_SIZE, containerIds.size());
    List<Long> chunk = containerIds.subList(from, to);

    try {
The delete and insert for the same container should be in the same transaction.
    }

    private void persistUnhealthyRecords(
        List<Long> containerIdsToDelete,
Perform the delete and insert as one atomic transaction.
        recordsToInsert, chunkContainerIdSet);
    totalReplicaMismatchCount += chunkReplicaMismatchCount;
    totalStats.add(chunkStats);
    persistUnhealthyRecords(chunkContainerIds, recordsToInsert);
We can pass the existingInStateSinceByContainerAndState values to improve deletion performance when few or no entries are present in the DB.
    for (int from = 0; from < allContainers.size(); from += PERSIST_CHUNK_SIZE) {
      int to = Math.min(from + PERSIST_CHUNK_SIZE, allContainers.size());
      List<Long> chunkContainerIds = collectContainerIds(allContainers, from, to);
We can use only chunkContainerIdSet; chunkContainerIds is not required.
    ProcessingStats chunkStats = new ProcessingStats();
    Set<Long> negativeSizeRecorded = new HashSet<>();

    report.forEachContainerByState((state, cid) -> {
We can have the SCM default report with sample-limit 0, so no additional capture of state vs. containerId is needed.
For replica_mismatch, another list can be passed; it need not be part of the report.
And here, we can loop over allContainers from 'from' to 'to' with index access since it is an ArrayList (no need to loop over the report), and the chunkContainerIdSet 'if' check can be avoided.
ArafatKhan2198 left a comment
Some comments on the patch @devmadhuu
        Set<Long> negativeSizeRecorded,
        ProcessingStats stats) throws ContainerNotFoundException {
      switch (state) {
        case MISSING:
SCM's handler chain can emit composite health states like:
UNHEALTHY_UNDER_REPLICATED
QUASI_CLOSED_STUCK_MISSING
QUASI_CLOSED_STUCK_UNDER_REPLICATED
MISSING_UNDER_REPLICATED
etc.
But the switch statement in handleScmStateContainer() only handles 4 states: MISSING, UNDER_REPLICATED, OVER_REPLICATED, MIS_REPLICATED. Everything else falls into default: break; and is silently thrown away.
This means a container that is both quasi-closed-stuck AND has no replicas (QUASI_CLOSED_STUCK_MISSING) will never appear in the Recon UI or the UNHEALTHY_CONTAINERS table.
Are these composite states intentionally excluded from V2? Or should we map them to their base state (e.g., QUASI_CLOSED_STUCK_UNDER_REPLICATED → store as UNDER_REPLICATED with an appropriate reason string)?
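One possible base-state mapping, sketched as a hypothetical helper (the state names come from the comment above; this is not existing Recon code, just an illustration of the suggested fallback):

```java
// Illustrative sketch: map SCM's composite health states down to the four base
// states the V2 switch currently handles, instead of dropping them silently.
// The original composite name could still be stored as the "reason" string.
public class BaseStateMapper {

  static String toBaseState(String state) {
    // Composite names end with their base state, so suffix checks suffice.
    if (state.endsWith("UNDER_REPLICATED")) {
      return "UNDER_REPLICATED";
    }
    if (state.endsWith("OVER_REPLICATED")) {
      return "OVER_REPLICATED";
    }
    if (state.endsWith("MISSING")) {
      return "MISSING";
    }
    if (state.endsWith("MIS_REPLICATED")) {
      return "MIS_REPLICATED";
    }
    return state; // unknown: pass through rather than throw away
  }
}
```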
          handleMissingContainer(containerId, currentTime,
              existingInStateSinceByContainerAndState, recordsToInsert, stats);
          break;
        case UNDER_REPLICATED:
Shouldn't we have a state for REPLICA_MISMATCH also?
        List<UnhealthyContainerRecordV2> recordsToInsert,
        Set<Long> negativeSizeRecorded,
        ProcessingStats stats) throws ContainerNotFoundException {
      switch (state) {
Remember, in our offline discussion we tried checking what happens if we attempt to add a state to the Derby table that violates the allowed-state constraint. I believe this switch case will prevent a new state from being added to the database.
    healthSchemaManager.batchDeleteSCMStatesForContainers(containerIdsToDelete);

    LOG.info("Inserting {} unhealthy container records", recordsToInsert.size());
    healthSchemaManager.insertUnhealthyContainerRecords(recordsToInsert);
    }
healthSchemaManager.batchDeleteSCMStatesForContainers(containerIdsToDelete); // step 1
healthSchemaManager.insertUnhealthyContainerRecords(recordsToInsert); // step 2
These two calls are not in the same transaction. If Recon crashes between step 1 and step 2, all old health data is gone but the new data was never written. The API would return 0 unhealthy containers until the next scan runs. Also, if someone queries the API between step 1 and step 2, they get empty or partial results.
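A sketch of the atomic alternative in plain SQL (illustrative only; in the Java code this would typically go through jOOQ's DSLContext.transaction so both statements commit or roll back together). The column names are assumed from the schema discussed in this PR:

```sql
-- Hypothetical sketch: both statements inside one transaction, so a crash
-- between them rolls the delete back, and a concurrent API query never sees
-- the empty window between delete and insert. COMMIT makes both visible at
-- once (assumes autocommit is off on this connection).
DELETE FROM UNHEALTHY_CONTAINERS
  WHERE container_id IN (?, ?, ?);
INSERT INTO UNHEALTHY_CONTAINERS (container_id, container_state, in_state_since)
  VALUES (?, ?, ?);
COMMIT;
```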
    for (int from = 0; from < allContainers.size(); from += PERSIST_CHUNK_SIZE) {
      int to = Math.min(from + PERSIST_CHUNK_SIZE, allContainers.size());
      List<Long> chunkContainerIds = collectContainerIds(allContainers, from, to);
This collects every single container ID in the cluster (healthy and unhealthy) and runs DELETE statements for all of them. On a cluster with 1 million containers, that means:
- Allocating a list with 1M entries
- Running 1,000 chunked DELETE statements
- Most of those containers are healthy and have no rows in the table, so the DELETEs are wasted work
Suggestion: Instead, just delete all rows by state: DELETE FROM UNHEALTHY_CONTAINERS WHERE container_state IN (...) — one statement, no chunking, much faster.
    // Call inherited processContainer - this runs SCM's health check chain
    // readOnly=true ensures no commands are generated
    processContainer(container, nullQueue, report, true);
Minor suggestion:
processContainer(container, nullQueue, report, true); // calls getContainerReplicas() internally
Set<ContainerReplica> replicas = containerManager.getContainerReplicas(cid); // calls again
Every container's replicas are fetched twice: once inside the inherited processContainer() and once for the REPLICA_MISMATCH check. On a 1 million container cluster, that's 2M replica lookups. We can extract the replicas once and pass them to both operations.
    dslContext.createIndex("idx_state_container_id")
        .on(DSL.table(UNHEALTHY_CONTAINERS_TABLE_NAME),
            DSL.field(name(CONTAINER_STATE)),
            DSL.field(name(CONTAINER_ID)))
if (!TABLE_EXISTS_CHECK.test(conn, UNHEALTHY_CONTAINERS_TABLE_NAME)) {
createUnhealthyContainersTable(); // creates table + composite index
}
The composite index idx_state_container_id is created inside createUnhealthyContainersTable(). This method is only called if the table doesn't exist.
If someone upgrades an existing Recon deployment, the UNHEALTHY_CONTAINERS table already exists (from V1), so this entire method is skipped. The new composite index is never created on existing clusters; they're stuck with the old single-column index and get none of the 43–67× performance improvement.
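A hypothetical upgrade-action sketch in SQL (Derby has no CREATE INDEX IF NOT EXISTS, so an upgrade step would first check SYS.SYSCONGLOMERATES for the index name and skip creation if it is already present):

```sql
-- Hypothetical upgrade step for existing deployments where the table (and its
-- old single-column index) already exist: create only the composite index.
-- Guard first, e.g. SELECT 1 FROM SYS.SYSCONGLOMERATES
--   WHERE CONGLOMERATENAME = 'IDX_STATE_CONTAINER_ID' AND ISINDEX
CREATE INDEX idx_state_container_id
  ON UNHEALTHY_CONTAINERS (container_state, container_id);
```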
What changes were proposed in this pull request?
This PR implements ContainerHealthTaskV2 by extending SCM's ReplicationManager for use in Recon. This approach evaluates container health locally using SCM's proven health check logic without requiring network communication between SCM and Recon.

Design
https://docs.google.com/document/d/1iea0eC4IpPa4Qpmc47Ae3KyneFCZ_fyyuhbZwqrR3cM/edit?pli=1&tab=t.0#heading=h.986yaoz7wnxv
Implementation Approach
Introduces ContainerHealthTaskV2, a new implementation that determines container health states by:
- Extending SCM's ReplicationManager as ReconReplicationManager
- Running processAll() to evaluate all containers using SCM's proven health check logic

Container Health States Detected
ContainerHealthTaskV2 detects 5 distinct health states:
- SCM health states (inherited): MISSING, UNDER_REPLICATED, OVER_REPLICATED, MIS_REPLICATED
- Recon-specific health state: REPLICA_MISMATCH
Implementation: ReconReplicationManager first runs SCM's health checks, then additionally checks for REPLICA_MISMATCH by comparing checksums across replicas. This ensures both replication health and data integrity are monitored.
Testing
- Tests verify unhealthy container records in the UNHEALTHY_CONTAINERS table

Database Schema
Uses the existing UNHEALTHY_CONTAINERS_V2 table with support for all 5 health states. Each record includes the container ID, the container state, and in_state_since.
Some code optimizations in this PR for Recon's ContainerHealthTask were done using the Cursor AI tool.

What is the link to the Apache JIRA
https://issues.apache.org/jira/browse/HDDS-13891
How was this patch tested?
Added JUnit test cases and tested using a local Docker cluster.
Recon UNHEALTHY_CONTAINERS Table: Performance Optimisations

Summary
This PR also improves the read throughput of the UNHEALTHY_CONTAINERS Derby table by 43–67×, fixes a latent ERROR XBCM4 crash that would occur on any cluster large enough to trigger a >2,000-container DELETE in one statement, and removes a redundant Java-side sort that was executing on every paginated API response.
Changes

1. New composite index: ContainerSchemaDefinition.java

Old index: single column (container_state)
New index: composite (container_state, container_id)

Why this matters for paginated reads:
With the old single-column index, Derby had to:
- scan all entries matching container_state = ? (up to 200K entries), and
- sort the results by container_id on every single page call, an O(n) operation repeated once per page.

With the composite index, Derby jumps directly to (state, minContainerId) and reads the next LIMIT entries sequentially: O(1) per page regardless of cursor position or total row count. The composite index also covers COUNT(*) WHERE container_state = ? and GROUP BY container_state queries via its leading column prefix, so those queries retain their index-only access path.

2. ContainerHealthSchemaManagerV2.getUnhealthyContainers(): two fixes

a) Conditional Java-side sort removed for forward pagination
For the common forward-pagination path (minContainerId), the SQL ORDER BY container_id ASC already delivers sorted rows. The redundant Java sort was calling Comparator.comparingLong on every page (up to 200 pages per state per request).

b) JDBC fetch-size hint added
Derby's default JDBC fetch size is 1 row per wire call. For a 5,000-row page this meant 5,000 individual JDBC fetch round-trips inside the driver before any data reached the application layer. Setting fetchSize(limit) pre-buffers the full page in a single JDBC call.

3. ContainerHealthSchemaManagerV2.batchDeleteSCMStatesForContainers(): internal chunking

Bug fixed: Derby's SQL compiler generates a Java class per prepared statement. A WHERE container_id IN (N values) predicate combined with the 7-state container_state IN (…) predicate generates an expression tree whose compiled bytecode can exceed the JVM 65,535-byte per-method limit (ERROR XBCM4).

The method previously delegated chunking to callers. On a large cluster, ReconReplicationManager.persistUnhealthyRecords() passes the full container list in one call, which would crash Derby on any cluster with >2,000 containers in a single scan batch.

Fix: the method now chunks internally at MAX_DELETE_CHUNK_SIZE = 1,000 IDs per SQL statement. Callers can pass a list of any size; the method is safe by construction.
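The internal chunking in point 3 can be sketched as follows (illustrative class; the constant value comes from the text above):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the internal chunking: split an arbitrarily large ID
// list into sub-lists of at most MAX_DELETE_CHUNK_SIZE, so each prepared
// DELETE statement stays under Derby's 64 KB compiled-method limit.
public class DeleteChunker {

  static final int MAX_DELETE_CHUNK_SIZE = 1000;

  static List<List<Long>> chunks(List<Long> containerIds) {
    List<List<Long>> result = new ArrayList<>();
    for (int from = 0; from < containerIds.size(); from += MAX_DELETE_CHUNK_SIZE) {
      int to = Math.min(from + MAX_DELETE_CHUNK_SIZE, containerIds.size());
      // subList is a view; each chunk would feed one DELETE ... IN (...) call.
      result.add(containerIds.subList(from, to));
    }
    return result;
  }
}
```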
Performance Test (added in last commit)

A new test class TestUnhealthyContainersDerbyPerformance benchmarks all operations at 1 million records (5 states × 200,000 container IDs).

Test environment: macOS 14 (Apple M-series), JDK 8, Derby 10.14 embedded, derby.storage.pageCacheSize = 20,000 (~80 MB page cache).

Results: baseline vs. optimised, measured for:
- COUNT(*) total
- COUNT by state (avg of 5)
- GROUP BY summary (all states)
- COUNT by state after delete (avg)

Raw logs: optimised run output.