HDDS-10431. Merge recent commits from master (7c8160fe) to HDDS-7593 #6286

Merged 111 commits into HDDS-7593 on Mar 7, 2024. Changes from all commits:
039dea9
HDDS-10229. Fixes for Grafana dashboards (#6120)
kerneltime Feb 8, 2024
3d8365f
HDDS-10296. Orphan blocks during overwrite of key. (#6180)
sumitagrawl Feb 8, 2024
06399b5
HDDS-10226. Refactor OMRequestTestUtils.createOmKeyInfo (#6184)
Galsza Feb 8, 2024
e93e781
HDDS-10291. Set simple properties in MiniOzoneCluster via Configurati…
adoroszlai Feb 8, 2024
5715aee
HDDS-10328. Support cross realm Kerberos out of box. (#6192)
jojochuang Feb 8, 2024
601fd41
HDDS-10322. Make VolumeArgs immutable (#6193)
adoroszlai Feb 8, 2024
c1efa33
HDDS-8627. Recon - API for Count of deletePending directories (#5037)
ArafatKhan2198 Feb 9, 2024
15b62de
HDDS-10333. RocksDB logger not closed (#6200)
adoroszlai Feb 9, 2024
75df6c1
HDDS-9843. Ozone client high memory (heap) utilization (#6153)
duongkame Feb 9, 2024
d3e2e59
HDDS-10319. Also consider bucket layout deciding whether to normalize…
devmadhuu Feb 9, 2024
2f2234c
HDDS-10262. Encapsulate SnapshotCache inside OmSnapshotManager (#6135)
Cyrill Feb 9, 2024
af25a48
HDDS-10340. Skip ci on dashboard updates (#6203)
kerneltime Feb 9, 2024
47ef84c
HDDS-10318. Add OM client protocol metrics to Ozone - ListKey Metrics…
tanvipenumudy Feb 9, 2024
7c79246
HDDS-10256. Retry block allocation when SCM is in safe mode. (#6189)
ashishkumar50 Feb 10, 2024
c35e99f
HDDS-10250. Use SnapshotId as key in SnapshotCache (#6139)
hemantk-12 Feb 10, 2024
ffd8221
HDDS-10343. Remove dependency on jsr305 (#6208)
adoroszlai Feb 12, 2024
cd00691
HDDS-10218. Speed up TestSstFilteringService (#6196)
raju-balpande Feb 12, 2024
45c853c
HDDS-10325. Make BucketArgs immutable (#6205)
adoroszlai Feb 12, 2024
c289c67
HDDS-10344. Schedule dependabot for weekend (#6209)
adoroszlai Feb 12, 2024
68662a7
HDDS-10347. Bump jacoco to 0.8.11 (#6214)
dependabot[bot] Feb 12, 2024
bacb184
HDDS-10356. Bump exec-maven-plugin to 3.1.1 (#6215)
dependabot[bot] Feb 13, 2024
7370676
HDDS-9680. Use md5 hash of multipart object part's content as ETag (#…
vtutrinov Feb 13, 2024
3c4683e
HDDS-9738. Display startTime, pipeline and container counts for decom…
Tejaskriya Feb 13, 2024
44adf80
HDDS-10359. Recursively deleting volume with OBS bucket shows error d…
ashishkumar50 Feb 14, 2024
3876852
HDDS-10368. Bump guice to 6.0.0 (#6212)
dependabot[bot] Feb 14, 2024
78fc781
HDDS-10369. Set Times API doesn't work with linked buckets. (#6220)
sadanand48 Feb 15, 2024
6194d42
HDDS-10301. Recon - Fold the pipeline info for a DN on Datanode page.…
smitajoshi12 Feb 15, 2024
2348784
HDDS-10288. Checksum to support direct buffers (#6162)
duongkame Feb 15, 2024
cce2f96
HDDS-10339. Add S3 API level dashboard (#6202)
kerneltime Feb 15, 2024
e0f3ae1
HDDS-10391. Bump joda-time to 2.12.7 (#6230)
dependabot[bot] Feb 18, 2024
c672453
HDDS-10387. Fix parameter number warning in KeyOutputStream and relat…
szetszwo Feb 20, 2024
932a0ac
HDDS-10342. Reduce code duplication in MiniOzoneCluster builders (#6206)
adoroszlai Feb 20, 2024
c8e6cab
HDDS-10345. No need to sort excluded datanodes during Ratis pipeline …
siddhantsangwan Feb 20, 2024
a1f8390
HDDS-10010. Support snapshot rename operation (#6006)
Cyrill Feb 20, 2024
db1561a
HDDS-10385. Memory leak for thread local usages in OMClientRequest. (…
sumitagrawl Feb 21, 2024
652b9bc
HDDS-10401. Bump commons-compress to 1.26.0 (#6240)
dependabot[bot] Feb 21, 2024
3a01fea
HDDS-10406. Bump aws-java-sdk to 1.12.661 (#6249)
rohit-kb Feb 21, 2024
9c187df
HDDS-10398. Remove deleted_blocks table in container schema V2 and V3…
ChenSammi Feb 22, 2024
c9d3b23
HDDS-10397. Restrict legacy bucket directory deletion through sh comm…
ashishkumar50 Feb 22, 2024
5f6306d
HDDS-10395. Fix eTag compatibility issues for MPU (#6235)
ivandika3 Feb 22, 2024
45d420a
HDDS-10293. IllegalArgumentException: containerSize Negative (#6178)
ArafatKhan2198 Feb 22, 2024
f0b75b7
HDDS-10383. Introduce a Provider for client-side thread resources pas…
xichen01 Feb 22, 2024
6dfd7d4
HDDS-10408. NPE causes OM crash in Snapshot Purge request (#6250)
aswinshakil Feb 22, 2024
b537a6a
HDDS-10396. Encapsulate fields in WithMetadata and subclasses (#6238)
adoroszlai Feb 22, 2024
d883d7d
HDDS-10149. New JNI layer for RawSSTFileReader & RawSSTFileReaderIter…
swamirishi Feb 22, 2024
284846f
HDDS-10363. HDDS-9388 broke encryption. (#6219)
jojochuang Feb 23, 2024
83cb2b7
HDDS-10320. Introduce factory to configure MiniOzoneCluster's datanod…
adoroszlai Feb 23, 2024
13b635c
HDDS-10410. Avoid creating ChunkInfo. (#6258)
szetszwo Feb 23, 2024
babf85c
HDDS-10403. CopyObject should set ETag based on the key content (#6251)
ivandika3 Feb 23, 2024
e9f11f0
HDDS-10404. Ozone admin reconfig command fails with security enabled …
ChenSammi Feb 23, 2024
0bac7ef
HDDS-10405. ozone admin has hard-coded info loglevel (#6254)
adoroszlai Feb 23, 2024
c325315
HDDS-10418. Bump commons-io to 2.15.1 (#6266)
dependabot[bot] Feb 24, 2024
decacde
HDDS-10419. Bump maven-gpg-plugin to 3.1.0 (#6211)
dependabot[bot] Feb 24, 2024
f62a8e3
HDDS-10420. Bump restrict-imports-enforcer-rule to 2.5.0 (#6264)
dependabot[bot] Feb 24, 2024
0cd6b3b
HDDS-10399. IndexOutOfBoundsException when shallow listing empty dire…
SaketaChalamchala Feb 25, 2024
dc9bd61
HDDS-10365. Fix description for `ozone getconf ozonemanagers` (#6263)
david1859168 Feb 25, 2024
df68290
HDDS-10214. Update supported versions in security policy up to 1.4.0 …
ivandika3 Feb 25, 2024
84c6e4d
HDDS-10384. RPC client Reusing thread resources. (#6270)
xichen01 Feb 26, 2024
50d43e8
HDDS-10394. Fix parameter number warning in om.helpers (#6271)
adoroszlai Feb 26, 2024
1b48186
HDDS-10415. Remove duplicate HA MiniOzoneCluster factory method (#6261)
adoroszlai Feb 26, 2024
9c2fb3a
HDDS-10346. Make test cases in TestSstFilteringService independent (#…
raju-balpande Feb 26, 2024
2d77fb4
HDDS-10423. Datanode fails to start with invalid checksum size settin…
adoroszlai Feb 27, 2024
f6d455f
HDDS-10360. Make cleanupTest compatible with enableFileSystemPath val…
ashishkumar50 Feb 27, 2024
083a45e
Revert "HDDS-10384. RPC client Reusing thread resources. (#6270)" (#6…
adoroszlai Feb 27, 2024
7939faf
HDDS-815. Rename HDDS config keys prefixed with dfs. (#6274)
sarvekshayr Feb 27, 2024
0e413c9
HDDS-10428. OzoneClientConfig#validate does not get called (#6282)
ChenSammi Feb 27, 2024
54548aa
HDDS-10327. S3G does not work in a single-node deployment (#6257)
Tejaskriya Feb 27, 2024
d3c5cce
Merge remote-tracking branch 'asf/master' into HDDS-7593
smengcl Feb 27, 2024
9fb61ff
HDDS-10413. Recon - UnsupportedOperationException while merging Incre…
devmadhuu Feb 27, 2024
1e98ebb
HDDS-10432. Hadoop FS client write(byte[], int, int) is very slow in …
duongkame Feb 27, 2024
e0bf7b4
HDDS-10370. Recon - Handle the pre-existing missing empty containers …
devmadhuu Feb 28, 2024
8c4ab8e
HDDS-8683. Container balancer thread interrupt may not work (#6179)
Tejaskriya Feb 28, 2024
f440654
HDDS-10416. Move HA-specific settings to MiniOzoneHAClusterImpl.Build…
adoroszlai Feb 28, 2024
4da5a64
HDDS-10425. Increase OM transaction index for non-Ratis based on exis…
whbing Feb 28, 2024
543c9e7
HDDS-9235. ReplicationManager metrics not collected after restart. (#…
aswinshakil Feb 28, 2024
1830fe2
HDDS-10367. Fix possible NPE in listKeysLight, listStatus, listStatus…
ivanzlenko Feb 28, 2024
aa68aec
HDDS-10324. Metadata are not updated when keys are overwritten. (#6273)
ArafatKhan2198 Feb 28, 2024
4095ef1
HDDS-10278. Simplify tests using assertDoesNotThrow (#6291)
wzhallright Feb 28, 2024
8fcd039
HDDS-10437. Rename method to getContainersPendingReplication (#6293)
Tejaskriya Feb 28, 2024
01f8d62
HDDS-10331. Rename Java constants of ex-DFS config keys (#6290)
sarvekshayr Feb 28, 2024
1eeaa0b
HDDS-10144. Zero-Copy in replication (#6049)
guohao-rosicky Feb 28, 2024
c3271b8
HDDS-10144. (addendum) Zero-Copy in replication
adoroszlai Feb 28, 2024
083e914
HDDS-10433. Add Prometheus scrape target for Datanodes in Compose v2 …
kerneltime Feb 28, 2024
804366a
HDDS-6713. Avoid the need to cast to MiniOzoneHAClusterImpl (#6295)
adoroszlai Feb 29, 2024
384103a
HDDS-10282. Fix pagination on the OM DB Insights page in Recon (#6190)
dombizita Feb 29, 2024
0a5fc69
HDDS-7810. Support namespace summaries (du, dist & counts) for OBJECT…
ArafatKhan2198 Mar 1, 2024
11fddc4
HDDS-10041. Do not start the daemon inside the OzoneManagerDoubleBuff…
szetszwo Mar 1, 2024
3a872b4
HDDS-10447. Extract helper methods from Ozone native ACL unit tests (…
adoroszlai Mar 1, 2024
2710129
HDDS-10453. Bump httpclient to 4.5.14 (#6311)
dependabot[bot] Mar 2, 2024
61dbb08
HDDS-10455. Bump protobuf-maven-plugin to 0.6.1 (#6313)
dependabot[bot] Mar 2, 2024
11c5eb8
HDDS-10456. Bump slf4j to 2.0.12 (#6312)
dependabot[bot] Mar 2, 2024
b513cdc
HDDS-10439. Remove setConf from MiniOzoneCluster public interface (#6…
adoroszlai Mar 4, 2024
a65991f
HDDS-10444. Reduce string concatenation in ContainerImporter#importCo…
jianghuazhu Mar 4, 2024
650e777
HDDS-10459. Bump snappy-java to 1.1.10.5 (#6324)
vtutrinov Mar 4, 2024
a248ed1
HDDS-10329. [Snapshot] Add unit-test for recreating snapshots with de…
jyotirmoy-gh Mar 5, 2024
ff1e414
HDDS-10430. Race condition around Pipeline#nodesInOrder (#6316)
adoroszlai Mar 5, 2024
f7a421b
HDDS-10446. Refactor Node2ObjectsMap, Node2PipelineMap, Node2Containe…
Montura Mar 5, 2024
8059213
HDDS-10458. Mention `ozone admin datanode status decommission` in doc…
Tejaskriya Mar 5, 2024
b69674c
HDDS-10412. Prefix ACL check needs to resolve the bucket link (#6268)
ivandika3 Mar 5, 2024
f16b1af
HDDS-10424. Improve error message for prefix without trailing slash (…
sarvekshayr Mar 5, 2024
9a6ece2
HDDS-10457. Remove dependency commons-pool2 (#6317)
adoroszlai Mar 5, 2024
4243721
HDDS-10450. Add GitHub actions labeler for the reconciliation feature…
errose28 Mar 5, 2024
87d8d61
HDDS-10467. Reduce metrics visibility (#6332)
adoroszlai Mar 6, 2024
2f05353
HDDS-10384. RPC client reusing thread resources. (#6326)
xichen01 Mar 6, 2024
418528a
HDDS-10460. Refine audit logging for bucket property update operation…
tanvipenumudy Mar 6, 2024
309e459
HDDS-10472. Audit log should include EC replication config (#6338)
adoroszlai Mar 6, 2024
1d81c32
Merge remote-tracking branch 'asf/HDDS-7593' into HDDS-7593
smengcl Mar 6, 2024
140c5de
HDDS-9343. Shift sortDatanodes logic to OM (#5391)
tanvipenumudy Mar 6, 2024
a145dd5
HDDS-9343. (addendum) Shift sortDatanodes logic to OM (#5391)
adoroszlai Mar 7, 2024
7c8160f
HDDS-10482. OMRequestTestUtils.createOmKeyInfo should set key modific…
smengcl Mar 7, 2024
3cadb24
Merge remote-tracking branch 'asf/master' into HDDS-7593
smengcl Mar 7, 2024
2 changes: 2 additions & 0 deletions .github/dependabot.yml
@@ -38,5 +38,7 @@ updates:
directory: "/"
schedule:
interval: "weekly"
day: "saturday"
time: "07:00" # UTC
pull-request-branch-name:
separator: "-"
21 changes: 21 additions & 0 deletions .github/labeler.yml
@@ -0,0 +1,21 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Configuration for .github/workflows/label-pr.yml

# This rule can be deleted once the container reconciliation feature branch is merged.
container-reconciliation:
- base-branch: HDDS-10239-container-reconciliation

29 changes: 29 additions & 0 deletions .github/workflows/label-pr.yml
@@ -0,0 +1,29 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This workflow reads its configuration from the .github/labeler.yml file.
name: pull-request-labeler
on:
- pull_request_target

jobs:
labeler:
permissions:
contents: read
pull-requests: write
runs-on: ubuntu-latest
steps:
- uses: actions/labeler@v5

9 changes: 6 additions & 3 deletions SECURITY.md
@@ -5,13 +5,16 @@
The first stable release of Apache Ozone is 1.0, the previous alpha and beta releases are not supported by the community.

| Version | Supported |
| ------------- | ------------------ |
|---------------| ------------------ |
| 0.3.0 (alpha) | :x: |
| 0.4.0 (alpha) | :x: |
| 0.4.1 (alpha) | :x: |
| 0.5.0 (beta) | :x: |
| 1.0 | :white_check_mark: |
| 1.1 | :white_check_mark: |
| 1.0.0 | :x: |
| 1.1.0 | :x: |
| 1.2.1 | :x: |
| 1.3.0 | :x: |
| 1.4.0 | :white_check_mark: |

## Reporting a Vulnerability

12 changes: 12 additions & 0 deletions dev-support/ci/selective_ci_checks.bats
@@ -57,6 +57,18 @@ load bats-assert/load.bash
assert_output -p needs-kubernetes-tests=false
}

@test "dashboard only" {
run dev-support/ci/selective_ci_checks.sh 039dea9

assert_output -p 'basic-checks=["rat"]'
assert_output -p needs-build=false
assert_output -p needs-compile=false
assert_output -p needs-compose-tests=false
assert_output -p needs-dependency-check=false
assert_output -p needs-integration-tests=false
assert_output -p needs-kubernetes-tests=false
}

@test "compose and robot" {
run dev-support/ci/selective_ci_checks.sh b83039eef

2 changes: 2 additions & 0 deletions dev-support/ci/selective_ci_checks.sh
@@ -233,6 +233,7 @@ function get_count_compose_files() {
local ignore_array=(
"^hadoop-ozone/dist/src/main/k8s"
"^hadoop-ozone/dist/src/main/license"
"^hadoop-ozone/dist/src/main/compose/common/grafana/dashboards"
"\.md$"
)
filter_changed_files true
@@ -494,6 +495,7 @@ function get_count_misc_files() {
"\.md$"
"findbugsExcludeFile.xml"
"/NOTICE$"
"^hadoop-ozone/dist/src/main/compose/common/grafana/dashboards"
)
local ignore_array=(
"^.github/workflows/post-commit.yml"
@@ -111,28 +111,23 @@ public void recordWriteChunk(Pipeline pipeline, long chunkSizeBytes) {
totalWriteChunkBytes.incr(chunkSizeBytes);
}

@VisibleForTesting
public MutableCounterLong getTotalWriteChunkBytes() {
MutableCounterLong getTotalWriteChunkBytes() {
return totalWriteChunkBytes;
}

@VisibleForTesting
public MutableCounterLong getTotalWriteChunkCalls() {
MutableCounterLong getTotalWriteChunkCalls() {
return totalWriteChunkCalls;
}

@VisibleForTesting
public Map<PipelineID, MutableCounterLong> getWriteChunkBytesByPipeline() {
Map<PipelineID, MutableCounterLong> getWriteChunkBytesByPipeline() {
return writeChunkBytesByPipeline;
}

@VisibleForTesting
public Map<PipelineID, MutableCounterLong> getWriteChunkCallsByPipeline() {
Map<PipelineID, MutableCounterLong> getWriteChunkCallsByPipeline() {
return writeChunkCallsByPipeline;
}

@VisibleForTesting
public Map<UUID, MutableCounterLong> getWriteChunksCallsByLeaders() {
Map<UUID, MutableCounterLong> getWriteChunksCallsByLeaders() {
return writeChunksCallsByLeaders;
}
}
@@ -201,6 +201,13 @@ public enum ChecksumCombineMode {
// 3 concurrent stripe read should be enough.
private int ecReconstructStripeReadPoolLimit = 10 * 3;

@Config(key = "ec.reconstruct.stripe.write.pool.limit",
defaultValue = "30",
description = "Thread pool max size for parallelly write" +
" available ec chunks to reconstruct the whole stripe.",
tags = ConfigTag.CLIENT)
private int ecReconstructStripeWritePoolLimit = 10 * 3;

@Config(key = "checksum.combine.mode",
defaultValue = "COMPOSITE_CRC",
description = "The combined checksum type [MD5MD5CRC / COMPOSITE_CRC] "
@@ -233,7 +240,7 @@ public enum ChecksumCombineMode {
private boolean incrementalChunkList = true;

@PostConstruct
private void validate() {
public void validate() {
Preconditions.checkState(streamBufferSize > 0);
Preconditions.checkState(streamBufferFlushSize > 0);
Preconditions.checkState(streamBufferMaxSize > 0);
@@ -396,6 +403,14 @@ public int getEcReconstructStripeReadPoolLimit() {
return ecReconstructStripeReadPoolLimit;
}

public void setEcReconstructStripeWritePoolLimit(int poolLimit) {
this.ecReconstructStripeWritePoolLimit = poolLimit;
}

public int getEcReconstructStripeWritePoolLimit() {
return ecReconstructStripeWritePoolLimit;
}

public void setFsDefaultBucketLayout(String bucketLayout) {
if (!bucketLayout.isEmpty()) {
this.fsDefaultBucketLayout = bucketLayout;
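The OzoneClientConfig hunks above change `validate()` from private to public (HDDS-10428, "OzoneClientConfig#validate does not get called"): when nothing triggers the `@PostConstruct` hook, a public method at least lets callers run the checks explicitly. A reduced sketch of that idea, with a hypothetical class name and a single field standing in for the real config's many Preconditions checks:

```java
// Hypothetical, reduced sketch of the validate() visibility fix: one field
// stands in for the many checks the real OzoneClientConfig.validate() runs.
public class ClientConfigSketch {
  private int streamBufferSize = 4 * 1024 * 1024;

  public void setStreamBufferSize(int size) {
    this.streamBufferSize = size;
  }

  // Public so callers can enforce invariants explicitly when no framework
  // invokes the @PostConstruct hook after construction or mutation.
  public void validate() {
    if (streamBufferSize <= 0) {
      throw new IllegalStateException(
          "streamBufferSize must be > 0, got " + streamBufferSize);
    }
  }

  public static void main(String[] args) {
    ClientConfigSketch config = new ClientConfigSketch();
    config.validate(); // passes with the default
    config.setStreamBufferSize(0);
    try {
      config.validate();
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```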
@@ -166,8 +166,8 @@ private synchronized void connectToDatanode(DatanodeDetails dn)
// port.
int port = dn.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
if (port == 0) {
port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
port = config.getInt(OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT,
OzoneConfigKeys.HDDS_CONTAINER_IPC_PORT_DEFAULT);
}

// Add credential context to the client call
@@ -83,8 +83,8 @@ public static XceiverClientRatis newXceiverClientRatis(
org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline,
ConfigurationSource ozoneConf, ClientTrustManager trustManager) {
final String rpcType = ozoneConf
.get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
.get(ScmConfigKeys.HDDS_CONTAINER_RATIS_RPC_TYPE_KEY,
ScmConfigKeys.HDDS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new
SecurityConfig(ozoneConf), trustManager);
@@ -73,7 +73,7 @@ SortedMap<Long, List<BUFFER>> getCommitIndexMap() {
return commitIndexMap;
}

void updateCommitInfoMap(long index, List<BUFFER> buffers) {
synchronized void updateCommitInfoMap(long index, List<BUFFER> buffers) {
commitIndexMap.computeIfAbsent(index, k -> new LinkedList<>())
.addAll(buffers);
}
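The hunk above makes `updateCommitInfoMap` synchronized (part of HDDS-10430, "Race condition around Pipeline#nodesInOrder" era fixes). The map's concrete type is not shown in this hunk; the sketch below assumes a plain `TreeMap`, which offers no thread safety and so needs external synchronization when a writer thread and a response-handler thread update the same index concurrently. Class and payload names are illustrative, not the real Ozone types:

```java
import java.util.LinkedList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical, simplified sketch: TreeMap is not thread-safe, so the update
// method is synchronized to protect concurrent writer/response-handler calls.
public class CommitInfoMapSketch {
  private final SortedMap<Long, List<String>> commitIndexMap = new TreeMap<>();

  public synchronized void updateCommitInfoMap(long index, List<String> buffers) {
    commitIndexMap.computeIfAbsent(index, k -> new LinkedList<>()).addAll(buffers);
  }

  public synchronized int entriesAt(long index) {
    List<String> entries = commitIndexMap.get(index);
    return entries == null ? 0 : entries.size();
  }

  public static void main(String[] args) throws InterruptedException {
    CommitInfoMapSketch sketch = new CommitInfoMapSketch();
    Thread writer = new Thread(() -> sketch.updateCommitInfoMap(7L, List.of("buf-a")));
    Thread handler = new Thread(() -> sketch.updateCommitInfoMap(7L, List.of("buf-b")));
    writer.start();
    handler.start();
    writer.join();
    handler.join();
    // Both buffers land under index 7; without synchronization this
    // interleaving could corrupt the TreeMap's internal structure.
    System.out.println("buffers at index 7: " + sketch.entriesAt(7L));
  }
}
```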
@@ -27,9 +27,9 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -157,7 +157,8 @@ public BlockOutputStream(
BufferPool bufferPool,
OzoneClientConfig config,
Token<? extends TokenIdentifier> token,
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
Supplier<ExecutorService> blockOutputStreamResourceProvider
) throws IOException {
this.xceiverClientFactory = xceiverClientManager;
this.config = config;
@@ -199,8 +200,7 @@ public BlockOutputStream(
(long) flushPeriod * streamBufferArgs.getStreamBufferSize() == streamBufferArgs
.getStreamBufferFlushSize());

// A single thread executor handle the responses of async requests
responseExecutor = Executors.newSingleThreadExecutor();
this.responseExecutor = blockOutputStreamResourceProvider.get();
bufferList = null;
totalDataFlushedLength = 0;
writtenDataLength = 0;
@@ -682,7 +682,6 @@ public void cleanup(boolean invalidateClient) {
bufferList.clear();
}
bufferList = null;
responseExecutor.shutdown();
}

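The BlockOutputStream hunks above (HDDS-10383/HDDS-10384) replace a per-stream `Executors.newSingleThreadExecutor()` with a `Supplier<ExecutorService>` passed in by the caller, and drop the `responseExecutor.shutdown()` from `cleanup()`: the stream now borrows a shared pool instead of owning one. A minimal sketch of that ownership change, with hypothetical class names standing in for the real Ozone types:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical names; illustrates only the executor-ownership change.
public class SharedExecutorDemo {

  static class Stream implements AutoCloseable {
    private final ExecutorService responseExecutor;

    Stream(Supplier<ExecutorService> provider) {
      // Borrowed from the provider; the stream does not own it.
      this.responseExecutor = provider.get();
    }

    void handleResponse(Runnable r) {
      responseExecutor.execute(r);
    }

    @Override
    public void close() {
      // Unlike the old code, do NOT shut the executor down here:
      // other streams may still be using the shared pool.
    }
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService shared = Executors.newSingleThreadExecutor();
    Supplier<ExecutorService> provider = () -> shared;

    try (Stream s1 = new Stream(provider); Stream s2 = new Stream(provider)) {
      s1.handleResponse(() -> System.out.println("response for stream 1"));
      s2.handleResponse(() -> System.out.println("response for stream 2"));
    } // closing streams leaves the shared pool running

    shared.shutdown(); // the owner, not the streams, shuts it down
    shared.awaitTermination(5, TimeUnit.SECONDS);
  }
}
```

The payoff is one response thread (or a bounded pool) serving many concurrent streams rather than one thread per open block stream, which is the heap/thread pressure HDDS-9843 and HDDS-10384 target.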
@@ -111,6 +111,7 @@ void releaseBuffer(ChunkBuffer chunkBuffer) {
}

public void clearBufferPool() {
bufferList.forEach(ChunkBuffer::close);
bufferList.clear();
currentBufferIndex = -1;
}
@@ -24,6 +24,7 @@
*/
package org.apache.hadoop.hdds.scm.storage;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.ozone.common.ChunkBuffer;
@@ -32,6 +33,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;

/**
* This class executes watchForCommit on ratis pipeline and releases
@@ -42,8 +44,8 @@ class CommitWatcher extends AbstractCommitWatcher<ChunkBuffer> {
private final BufferPool bufferPool;

// future Map to hold up all putBlock futures
private final ConcurrentMap<Long, CompletableFuture<
ContainerCommandResponseProto>> futureMap = new ConcurrentHashMap<>();
private final ConcurrentMap<Long, CompletableFuture<ContainerCommandResponseProto>>
futureMap = new ConcurrentHashMap<>();

CommitWatcher(BufferPool bufferPool, XceiverClientSpi xceiverClient) {
super(xceiverClient);
@@ -67,11 +69,24 @@ void releaseBuffers(long index) {
+ totalLength + ": existing = " + futureMap.keySet());
}

ConcurrentMap<Long, CompletableFuture<
ContainerCommandResponseProto>> getFutureMap() {
@VisibleForTesting
ConcurrentMap<Long, CompletableFuture<ContainerCommandResponseProto>> getFutureMap() {
return futureMap;
}

public void putFlushFuture(long flushPos, CompletableFuture<ContainerCommandResponseProto> flushFuture) {
futureMap.compute(flushPos,
(key, previous) -> previous == null ? flushFuture :
previous.thenCombine(flushFuture, (prev, curr) -> curr));
}


public void waitOnFlushFutures() throws InterruptedException, ExecutionException {
// wait for all the transactions to complete
CompletableFuture.allOf(futureMap.values().toArray(
new CompletableFuture[0])).get();
}

@Override
public void cleanup() {
super.cleanup();
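The CommitWatcher hunk above does more than move `putFlushFuture`/`waitOnFlushFutures` behind the watcher: it replaces a plain `put` into the future map with a `compute` that merges futures via `thenCombine`, so two flushes landing on the same position chain together instead of the second silently replacing the first. A minimal sketch of that merge pattern, with a hypothetical class name, `String` payloads in place of `ContainerCommandResponseProto`, and `join()` in place of the real code's checked `get()`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified sketch of the future-merging pattern in the diff.
public class FlushFutureMap {
  private final ConcurrentMap<Long, CompletableFuture<String>> futureMap =
      new ConcurrentHashMap<>();

  // Merge rather than overwrite: two flushes at the same position are chained
  // with thenCombine, so neither acknowledgement is lost.
  public void putFlushFuture(long flushPos, CompletableFuture<String> flushFuture) {
    futureMap.compute(flushPos,
        (key, previous) -> previous == null ? flushFuture
            : previous.thenCombine(flushFuture, (prev, curr) -> curr));
  }

  // Completes only after every tracked flush, including merged ones, is done.
  public void waitOnFlushFutures() {
    CompletableFuture.allOf(futureMap.values().toArray(new CompletableFuture[0])).join();
  }

  public int size() {
    return futureMap.size();
  }

  public static void main(String[] args) {
    FlushFutureMap map = new FlushFutureMap();
    CompletableFuture<String> first = new CompletableFuture<>();
    CompletableFuture<String> second = new CompletableFuture<>();
    map.putFlushFuture(100L, first);
    map.putFlushFuture(100L, second); // merged with the first, not overwritten
    first.complete("ack-1");
    second.complete("ack-2");
    map.waitOnFlushFutures();         // returns only after both acks arrive
    System.out.println("flush positions tracked: " + map.size());
  }
}
```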
@@ -44,6 +44,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.putBlockAsync;
@@ -75,10 +77,11 @@ public ECBlockOutputStream(
BufferPool bufferPool,
OzoneClientConfig config,
Token<? extends TokenIdentifier> token,
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
Supplier<ExecutorService> executorServiceSupplier
) throws IOException {
super(blockID, xceiverClientManager,
pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs);
pipeline, bufferPool, config, token, clientMetrics, streamBufferArgs, executorServiceSupplier);
// In EC stream, there will be only one node in pipeline.
this.datanodeDetails = pipeline.getClosestNode();
}
@@ -37,6 +37,8 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

/**
* An {@link OutputStream} used by the REST service in combination with the
@@ -65,8 +67,8 @@ public class RatisBlockOutputStream extends BlockOutputStream
/**
* Creates a new BlockOutputStream.
*
* @param blockID block ID
* @param bufferPool pool of buffers
* @param blockID block ID
* @param bufferPool pool of buffers
*/
@SuppressWarnings("checkstyle:ParameterNumber")
public RatisBlockOutputStream(
@@ -76,10 +78,11 @@ public RatisBlockOutputStream(
BufferPool bufferPool,
OzoneClientConfig config,
Token<? extends TokenIdentifier> token,
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs
ContainerClientMetrics clientMetrics, StreamBufferArgs streamBufferArgs,
Supplier<ExecutorService> blockOutputStreamResourceProvider
) throws IOException {
super(blockID, xceiverClientManager, pipeline,
bufferPool, config, token, clientMetrics, streamBufferArgs);
bufferPool, config, token, clientMetrics, streamBufferArgs, blockOutputStreamResourceProvider);
this.commitWatcher = new CommitWatcher(bufferPool, getXceiverClient());
}

@@ -110,16 +113,13 @@ void updateCommitInfo(XceiverClientReply reply, List<ChunkBuffer> buffers) {
}

@Override
void putFlushFuture(long flushPos,
CompletableFuture<ContainerCommandResponseProto> flushFuture) {
commitWatcher.getFutureMap().put(flushPos, flushFuture);
void putFlushFuture(long flushPos, CompletableFuture<ContainerCommandResponseProto> flushFuture) {
commitWatcher.putFlushFuture(flushPos, flushFuture);
}

@Override
void waitOnFlushFutures() throws InterruptedException, ExecutionException {
// wait for all the transactions to complete
CompletableFuture.allOf(commitWatcher.getFutureMap().values().toArray(
new CompletableFuture[0])).get();
commitWatcher.waitOnFlushFutures();
}

@Override