Commit 3cadb24
Merge remote-tracking branch 'asf/master' into HDDS-7593
smengcl committed Mar 7, 2024
2 parents 1d81c32 + 7c8160f commit 3cadb24
Showing 256 changed files with 5,838 additions and 2,084 deletions.
21 changes: 21 additions & 0 deletions .github/labeler.yml
@@ -0,0 +1,21 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Configuration for .github/workflows/label-pr.yml

# This rule can be deleted once the container reconciliation feature branch is merged.
container-reconciliation:
- base-branch: HDDS-10239-container-reconciliation

29 changes: 29 additions & 0 deletions .github/workflows/label-pr.yml
@@ -0,0 +1,29 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This workflow reads its configuration from the .github/labeler.yml file.
name: pull-request-labeler
on:
- pull_request_target

jobs:
labeler:
permissions:
contents: read
pull-requests: write
runs-on: ubuntu-latest
steps:
- uses: actions/labeler@v5

@@ -111,28 +111,23 @@ public void recordWriteChunk(Pipeline pipeline, long chunkSizeBytes) {
totalWriteChunkBytes.incr(chunkSizeBytes);
}

-@VisibleForTesting
-public MutableCounterLong getTotalWriteChunkBytes() {
+MutableCounterLong getTotalWriteChunkBytes() {
return totalWriteChunkBytes;
}

-@VisibleForTesting
-public MutableCounterLong getTotalWriteChunkCalls() {
+MutableCounterLong getTotalWriteChunkCalls() {
return totalWriteChunkCalls;
}

-@VisibleForTesting
-public Map<PipelineID, MutableCounterLong> getWriteChunkBytesByPipeline() {
+Map<PipelineID, MutableCounterLong> getWriteChunkBytesByPipeline() {
return writeChunkBytesByPipeline;
}

-@VisibleForTesting
-public Map<PipelineID, MutableCounterLong> getWriteChunkCallsByPipeline() {
+Map<PipelineID, MutableCounterLong> getWriteChunkCallsByPipeline() {
return writeChunkCallsByPipeline;
}

-@VisibleForTesting
-public Map<UUID, MutableCounterLong> getWriteChunksCallsByLeaders() {
+Map<UUID, MutableCounterLong> getWriteChunksCallsByLeaders() {
return writeChunksCallsByLeaders;
}
}
@@ -166,8 +166,8 @@ private synchronized void connectToDatanode(DatanodeDetails dn)
// port.
int port = dn.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
if (port == 0) {
-port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
-    OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
+port = config.getInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT,
+    OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT_DEFAULT);
}

// Add credential context to the client call
@@ -83,8 +83,8 @@ public static XceiverClientRatis newXceiverClientRatis(
org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline,
ConfigurationSource ozoneConf, ClientTrustManager trustManager) {
final String rpcType = ozoneConf
-.get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
-    ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
+.get(ScmConfigKeys.HDDS_CONTAINER_RATIS_RPC_TYPE_KEY,
+    ScmConfigKeys.HDDS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new
SecurityConfig(ozoneConf), trustManager);
@@ -73,7 +73,7 @@ SortedMap<Long, List<BUFFER>> getCommitIndexMap() {
return commitIndexMap;
}

-void updateCommitInfoMap(long index, List<BUFFER> buffers) {
+synchronized void updateCommitInfoMap(long index, List<BUFFER> buffers) {
commitIndexMap.computeIfAbsent(index, k -> new LinkedList<>())
.addAll(buffers);
}
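
Note on the synchronized qualifier added above: getCommitIndexMap() exposes a SortedMap, and computeIfAbsent(...).addAll(...) is not atomic on a non-concurrent map, so two threads recording buffers for the same commit index could interleave and lose entries. A minimal sketch of the guarded pattern, assuming a plain TreeMap backs the map (the class name and element type are hypothetical):

import java.util.LinkedList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

class CommitIndexSketch {
  // TreeMap is not thread-safe: without external locking, concurrent
  // computeIfAbsent/addAll calls can drop buffers or corrupt the tree.
  private final SortedMap<Long, List<byte[]>> commitIndexMap = new TreeMap<>();

  synchronized void updateCommitInfoMap(long index, List<byte[]> buffers) {
    commitIndexMap.computeIfAbsent(index, k -> new LinkedList<>())
        .addAll(buffers);
  }
}
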
@@ -27,7 +27,6 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
@@ -201,8 +200,7 @@ public BlockOutputStream(
(long) flushPeriod * streamBufferArgs.getStreamBufferSize() == streamBufferArgs
.getStreamBufferFlushSize());

-// A single thread executor handle the responses of async requests
-responseExecutor = Executors.newSingleThreadExecutor();
+this.responseExecutor = blockOutputStreamResourceProvider.get();
bufferList = null;
totalDataFlushedLength = 0;
writtenDataLength = 0;
@@ -684,7 +682,6 @@ public void cleanup(boolean invalidateClient) {
bufferList.clear();
}
bufferList = null;
-responseExecutor.shutdown();
}

/**
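
The two hunks above are one change: the stream no longer creates its own single-thread response executor, it borrows one from blockOutputStreamResourceProvider, so the matching shutdown() disappears from cleanup(). A sketch of the resulting ownership model, assuming the provider is a Supplier<ExecutorService> (names other than responseExecutor are hypothetical):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

class ResponseExecutorSketch {
  private final ExecutorService responseExecutor;

  ResponseExecutorSketch(Supplier<ExecutorService> provider) {
    // Borrow a shared executor instead of creating a per-stream one.
    this.responseExecutor = provider.get();
  }

  void cleanup() {
    // Deliberately no responseExecutor.shutdown(): the provider owns the
    // executor's lifecycle, and other streams may still be using it.
  }

  public static void main(String[] args) {
    // One executor serves every stream; the owner shuts it down once.
    ExecutorService shared = Executors.newSingleThreadExecutor();
    ResponseExecutorSketch s1 = new ResponseExecutorSketch(() -> shared);
    ResponseExecutorSketch s2 = new ResponseExecutorSketch(() -> shared);
    s1.cleanup();
    s2.cleanup();
    shared.shutdown();
  }
}
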
@@ -24,6 +24,7 @@
*/
package org.apache.hadoop.hdds.scm.storage;

+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.ozone.common.ChunkBuffer;
@@ -32,6 +33,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;

/**
* This class executes watchForCommit on ratis pipeline and releases
@@ -42,8 +44,8 @@ class CommitWatcher extends AbstractCommitWatcher<ChunkBuffer> {
private final BufferPool bufferPool;

// future Map to hold up all putBlock futures
-private final ConcurrentMap<Long, CompletableFuture<
-    ContainerCommandResponseProto>> futureMap = new ConcurrentHashMap<>();
+private final ConcurrentMap<Long, CompletableFuture<ContainerCommandResponseProto>>
+    futureMap = new ConcurrentHashMap<>();

CommitWatcher(BufferPool bufferPool, XceiverClientSpi xceiverClient) {
super(xceiverClient);
@@ -67,11 +69,24 @@ void releaseBuffers(long index) {
+ totalLength + ": existing = " + futureMap.keySet());
}

-ConcurrentMap<Long, CompletableFuture<
-    ContainerCommandResponseProto>> getFutureMap() {
+@VisibleForTesting
+ConcurrentMap<Long, CompletableFuture<ContainerCommandResponseProto>> getFutureMap() {
return futureMap;
}

+public void putFlushFuture(long flushPos, CompletableFuture<ContainerCommandResponseProto> flushFuture) {
+  futureMap.compute(flushPos,
+      (key, previous) -> previous == null ? flushFuture :
+          previous.thenCombine(flushFuture, (prev, curr) -> curr));
+}
+
+public void waitOnFlushFutures() throws InterruptedException, ExecutionException {
+  // wait for all the transactions to complete
+  CompletableFuture.allOf(futureMap.values().toArray(
+      new CompletableFuture[0])).get();
+}

@Override
public void cleanup() {
super.cleanup();
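
The interesting part of this file is the new putFlushFuture: the old caller did commitWatcher.getFutureMap().put(flushPos, flushFuture), so a second flush at the same position silently replaced (and orphaned) the first future. compute plus thenCombine chains them instead, so the stored future completes only after every flush registered at that position has completed. A standalone sketch of the pattern, with String standing in for ContainerCommandResponseProto:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;

class FlushFutureSketch {
  private final ConcurrentMap<Long, CompletableFuture<String>> futureMap =
      new ConcurrentHashMap<>();

  // Store the first future for a position, or chain onto the existing one;
  // the combined future keeps the latest response's result.
  void putFlushFuture(long flushPos, CompletableFuture<String> flushFuture) {
    futureMap.compute(flushPos,
        (key, previous) -> previous == null ? flushFuture
            : previous.thenCombine(flushFuture, (prev, curr) -> curr));
  }

  void waitOnFlushFutures() throws InterruptedException, ExecutionException {
    // Completes only when every chained future at every position is done.
    CompletableFuture.allOf(
        futureMap.values().toArray(new CompletableFuture[0])).get();
  }

  public static void main(String[] args) throws Exception {
    FlushFutureSketch sketch = new FlushFutureSketch();
    CompletableFuture<String> first = new CompletableFuture<>();
    CompletableFuture<String> second = new CompletableFuture<>();
    sketch.putFlushFuture(42L, first);
    sketch.putFlushFuture(42L, second); // same position: chained, not replaced
    first.complete("r1");
    second.complete("r2");
    sketch.waitOnFlushFutures(); // returns once both flushes have completed
  }
}
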
@@ -113,16 +113,13 @@ void updateCommitInfo(XceiverClientReply reply, List<ChunkBuffer> buffers) {
}

@Override
-void putFlushFuture(long flushPos,
-    CompletableFuture<ContainerCommandResponseProto> flushFuture) {
-  commitWatcher.getFutureMap().put(flushPos, flushFuture);
+void putFlushFuture(long flushPos, CompletableFuture<ContainerCommandResponseProto> flushFuture) {
+  commitWatcher.putFlushFuture(flushPos, flushFuture);
}

@Override
void waitOnFlushFutures() throws InterruptedException, ExecutionException {
-// wait for all the transactions to complete
-CompletableFuture.allOf(commitWatcher.getFutureMap().values().toArray(
-    new CompletableFuture[0])).get();
+commitWatcher.waitOnFlushFutures();
}

@Override
@@ -39,6 +39,11 @@ public void write(@Nonnull byte[] byteArray) throws IOException {
write(ByteBuffer.wrap(byteArray));
}

+@Override
+public void write(@Nonnull byte[] byteArray, int off, int len) throws IOException {
+  write(ByteBuffer.wrap(byteArray), off, len);
+}

@Override
public void write(int b) throws IOException {
write(new byte[]{(byte) b});
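
Why the new override matters: OutputStream's inherited write(byte[], int, int) loops over write(int) one byte at a time, so any writer layered on this class would have degraded to per-byte calls. Delegating to the ByteBuffer overload keeps bulk writes on the bulk path. A minimal sketch, assuming the class's primitive operation is a ByteBuffer-based write (as the delegation above implies; the class name is hypothetical):

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

abstract class ByteBufferSink extends OutputStream {
  // Assumed primitive, mirroring the overload the diff delegates to.
  abstract void write(ByteBuffer buf, int off, int len) throws IOException;

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    // Wrap once and hand off; OutputStream's default implementation
    // would call write(int) once per byte instead.
    write(ByteBuffer.wrap(b), off, len);
  }

  @Override
  public void write(int b) throws IOException {
    write(new byte[] {(byte) b}, 0, 1);
  }
}
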
4 changes: 0 additions & 4 deletions hadoop-hdds/common/pom.xml
@@ -135,10 +135,6 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
</dependency>


-<dependency>
-  <groupId>org.apache.commons</groupId>
-  <artifactId>commons-pool2</artifactId>
-</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk18on</artifactId>
@@ -154,6 +154,14 @@ public String getReplication() {
+ chunkKB();
}

+/** Similar to {@link #getReplication()}, but applies to proto structure, without any validation. */
+public static String toString(HddsProtos.ECReplicationConfig proto) {
+  return proto.getCodec() + EC_REPLICATION_PARAMS_DELIMITER
+      + proto.getData() + EC_REPLICATION_PARAMS_DELIMITER
+      + proto.getParity() + EC_REPLICATION_PARAMS_DELIMITER
+      + proto.getEcChunkSize();
+}

public HddsProtos.ECReplicationConfig toProto() {
return HddsProtos.ECReplicationConfig.newBuilder()
.setData(data)
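
Usage note for the new static helper: it formats the proto directly, bypassing the validation done when constructing an ECReplicationConfig, and it prints the raw ecChunkSize where getReplication() prints the kilobyte form via chunkKB(). A hedged example (the builder setters and the "-" delimiter are assumptions based on the surrounding code):

HddsProtos.ECReplicationConfig proto = HddsProtos.ECReplicationConfig.newBuilder()
    .setCodec("RS")        // assumed string codec field
    .setData(3)
    .setParity(2)
    .setEcChunkSize(1048576)
    .build();
// Renders roughly as "RS-3-2-1048576": raw chunk bytes, no validation.
String compact = ECReplicationConfig.toString(proto);
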
@@ -326,65 +326,65 @@ private static void addDeprecatedKeys() {
new DeprecationDelta("hdds.datanode.replication.work.dir",
OZONE_CONTAINER_COPY_WORKDIR),
new DeprecationDelta("dfs.container.chunk.write.sync",
-OzoneConfigKeys.DFS_CONTAINER_CHUNK_WRITE_SYNC_KEY),
+OzoneConfigKeys.HDDS_CONTAINER_CHUNK_WRITE_SYNC_KEY),
new DeprecationDelta("dfs.container.ipc",
-OzoneConfigKeys.DFS_CONTAINER_IPC_PORT),
+OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT),
new DeprecationDelta("dfs.container.ipc.random.port",
-OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT),
+OzoneConfigKeys.HDDS_CONTAINER_IPC_RANDOM_PORT),
new DeprecationDelta("dfs.container.ratis.admin.port",
-OzoneConfigKeys.DFS_CONTAINER_RATIS_ADMIN_PORT),
+OzoneConfigKeys.HDDS_CONTAINER_RATIS_ADMIN_PORT),
new DeprecationDelta("dfs.container.ratis.datanode.storage.dir",
-OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR),
+OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATANODE_STORAGE_DIR),
new DeprecationDelta("dfs.container.ratis.datastream.enabled",
-OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLED),
+OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATASTREAM_ENABLED),
new DeprecationDelta("dfs.container.ratis.datastream.port",
-OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_PORT),
+OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATASTREAM_PORT),
new DeprecationDelta("dfs.container.ratis.datastream.random.port",
-OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT),
+OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT),
new DeprecationDelta("dfs.container.ratis.enabled",
-ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY),
+ScmConfigKeys.HDDS_CONTAINER_RATIS_ENABLED_KEY),
new DeprecationDelta("dfs.container.ratis.ipc",
-OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT),
+OzoneConfigKeys.HDDS_CONTAINER_RATIS_IPC_PORT),
new DeprecationDelta("dfs.container.ratis.ipc.random.port",
-OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT),
+OzoneConfigKeys.HDDS_CONTAINER_RATIS_IPC_RANDOM_PORT),
new DeprecationDelta("dfs.container.ratis.leader.pending.bytes.limit",
-ScmConfigKeys.DFS_CONTAINER_RATIS_LEADER_PENDING_BYTES_LIMIT),
+ScmConfigKeys.HDDS_CONTAINER_RATIS_LEADER_PENDING_BYTES_LIMIT),
new DeprecationDelta("dfs.container.ratis.log.appender.queue.byte-limit",
-ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT),
+ScmConfigKeys.HDDS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT),
new DeprecationDelta("dfs.container.ratis.log.appender.queue.num-elements",
-ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS),
+ScmConfigKeys.HDDS_CONTAINER_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS),
new DeprecationDelta("dfs.container.ratis.log.purge.gap",
-ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_PURGE_GAP),
+ScmConfigKeys.HDDS_CONTAINER_RATIS_LOG_PURGE_GAP),
new DeprecationDelta("dfs.container.ratis.log.queue.byte-limit",
-ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_BYTE_LIMIT),
+ScmConfigKeys.HDDS_CONTAINER_RATIS_LOG_QUEUE_BYTE_LIMIT),
new DeprecationDelta("dfs.container.ratis.log.queue.num-elements",
-ScmConfigKeys.DFS_CONTAINER_RATIS_LOG_QUEUE_NUM_ELEMENTS),
+ScmConfigKeys.HDDS_CONTAINER_RATIS_LOG_QUEUE_NUM_ELEMENTS),
new DeprecationDelta("dfs.container.ratis.num.container.op.executors",
-ScmConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_KEY),
+ScmConfigKeys.HDDS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_KEY),
new DeprecationDelta("dfs.container.ratis.num.write.chunk.threads.per.volume",
-ScmConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_PER_VOLUME),
+ScmConfigKeys.HDDS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_PER_VOLUME),
new DeprecationDelta("dfs.container.ratis.replication.level",
-ScmConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY),
+ScmConfigKeys.HDDS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY),
new DeprecationDelta("dfs.container.ratis.rpc.type",
-ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY),
+ScmConfigKeys.HDDS_CONTAINER_RATIS_RPC_TYPE_KEY),
new DeprecationDelta("dfs.container.ratis.segment.preallocated.size",
-ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY),
+ScmConfigKeys.HDDS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY),
new DeprecationDelta("dfs.container.ratis.segment.size",
-ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY),
+ScmConfigKeys.HDDS_CONTAINER_RATIS_SEGMENT_SIZE_KEY),
new DeprecationDelta("dfs.container.ratis.server.port",
-OzoneConfigKeys.DFS_CONTAINER_RATIS_SERVER_PORT),
+OzoneConfigKeys.HDDS_CONTAINER_RATIS_SERVER_PORT),
new DeprecationDelta("dfs.container.ratis.statemachinedata.sync.retries",
-ScmConfigKeys.DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_RETRIES),
+ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_RETRIES),
new DeprecationDelta("dfs.container.ratis.statemachinedata.sync.timeout",
-ScmConfigKeys.DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_TIMEOUT),
+ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_TIMEOUT),
new DeprecationDelta("dfs.container.ratis.statemachine.max.pending.apply-transactions",
-ScmConfigKeys.DFS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS),
+ScmConfigKeys.HDDS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS),
new DeprecationDelta("dfs.ratis.leader.election.minimum.timeout.duration",
-ScmConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY),
+ScmConfigKeys.HDDS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY),
new DeprecationDelta("dfs.ratis.server.retry-cache.timeout.duration",
-ScmConfigKeys.DFS_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DURATION_KEY),
+ScmConfigKeys.HDDS_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DURATION_KEY),
new DeprecationDelta("dfs.ratis.snapshot.threshold",
-ScmConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY)
+ScmConfigKeys.HDDS_RATIS_SNAPSHOT_THRESHOLD_KEY)
});
}

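
Each DeprecationDelta registers an old-to-new key mapping with Hadoop's Configuration deprecation framework: reads and writes of a deprecated dfs.* key are transparently redirected to its hdds.* replacement, with a one-time warning logged. A sketch of the mechanism (the literal key strings are assumptions standing in for the renamed constants):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.DeprecationDelta;

public class DeprecationDemo {
  public static void main(String[] args) {
    // Registering a delta redirects the old key to the new one globally.
    Configuration.addDeprecations(new DeprecationDelta[] {
        new DeprecationDelta("dfs.container.ipc", "hdds.container.ipc"),
    });
    Configuration conf = new Configuration(false);
    conf.set("dfs.container.ipc", "9859"); // old key: logs a deprecation warning
    // The value is visible under both keys; new code reads only the new key.
    System.out.println(conf.get("hdds.container.ipc")); // prints 9859
  }
}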
