Skip to content

Commit

Permalink
Single buffer for small add requests (#3783)
Browse files Browse the repository at this point in the history
* Single buffer for small add requests

* Fixed checkstyle

* Fixed handling of CompositeByteBuf

* Fixed merge issues

* Fixed merge issues

* WIP

* Fixed test and removed dead code

* Removed unused import

* Fixed BookieJournalTest

* removed unused import

* fix the checkstyle

* fix failed test

* fix failed test

---------

Co-authored-by: chenhang <[email protected]>
  • Loading branch information
merlimat and hangc0276 authored Mar 20, 2023
1 parent e19cb9d commit 234b817
Show file tree
Hide file tree
Showing 20 changed files with 187 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import static org.apache.bookkeeper.replication.ReplicationStats.WRITE_DATA_LATENCY;

import com.google.common.util.concurrent.RateLimiter;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCounted;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -403,17 +405,24 @@ public void readComplete(int rc, LedgerHandle lh,
numEntriesRead.inc();
numBytesRead.registerSuccessfulValue(dataLength);

ByteBufList toSend = lh.getDigestManager()
ReferenceCounted toSend = lh.getDigestManager()
.computeDigestAndPackageForSending(entryId,
lh.getLastAddConfirmed(), entry.getLength(),
Unpooled.wrappedBuffer(data, 0, data.length));
Unpooled.wrappedBuffer(data, 0, data.length),
lh.getLedgerKey(),
0
);
if (replicationThrottle != null) {
updateAverageEntrySize(toSend.readableBytes());
if (toSend instanceof ByteBuf) {
updateAverageEntrySize(((ByteBuf) toSend).readableBytes());
} else if (toSend instanceof ByteBufList) {
updateAverageEntrySize(((ByteBufList) toSend).readableBytes());
}
}
for (BookieId newBookie : newBookies) {
long startWriteEntryTime = MathUtils.nowInNano();
bkc.getBookieClient().addEntry(newBookie, lh.getId(),
lh.getLedgerKey(), entryId, ByteBufList.clone(toSend),
lh.getLedgerKey(), entryId, toSend,
multiWriteCallback, dataLength, BookieProtocol.FLAG_RECOVERY_ADD,
false, WriteFlag.NONE);
writeDataLatency.registerSuccessfulEvent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
Expand All @@ -38,7 +39,6 @@
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -56,7 +56,7 @@ class PendingAddOp implements WriteCallback {
private static final Logger LOG = LoggerFactory.getLogger(PendingAddOp.class);

ByteBuf payload;
ByteBufList toSend;
ReferenceCounted toSend;
AddCallbackWithLatency cb;
Object ctx;
long entryId;
Expand Down Expand Up @@ -242,9 +242,10 @@ public synchronized void initiate() {
checkNotNull(lh);
checkNotNull(lh.macManager);

int flags = isRecoveryAdd ? FLAG_RECOVERY_ADD | FLAG_HIGH_PRIORITY : FLAG_NONE;
this.toSend = lh.macManager.computeDigestAndPackageForSending(
entryId, lh.lastAddConfirmed, currentLedgerLength,
payload);
payload, lh.ledgerKey, flags);
// ownership of RefCounted ByteBuf was passed to computeDigestAndPackageForSending
payload = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.apache.bookkeeper.auth.AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME;

import com.google.protobuf.ByteString;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -39,6 +40,7 @@
import org.apache.bookkeeper.auth.ClientAuthProvider;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
import org.apache.bookkeeper.util.ByteBufList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -358,8 +360,10 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
} else {
waitingForAuth.add(msg);
}
} else if (msg instanceof ByteBuf || msg instanceof ByteBufList) {
waitingForAuth.add(msg);
} else {
LOG.info("dropping write of message {}", msg);
LOG.info("[{}] dropping write of message {}", ctx.channel(), msg);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/
package org.apache.bookkeeper.proto;

import io.netty.util.ReferenceCounted;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -139,7 +140,7 @@ void writeLac(BookieId address, long ledgerId, byte[] masterKey,
* {@link org.apache.bookkeeper.client.api.WriteFlag}
*/
void addEntry(BookieId address, long ledgerId, byte[] masterKey,
long entryId, ByteBufList toSend, WriteCallback cb, Object ctx,
long entryId, ReferenceCounted toSend, WriteCallback cb, Object ctx,
int options, boolean allowFastFail, EnumSet<WriteFlag> writeFlags);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.util.EnumSet;
Expand Down Expand Up @@ -288,7 +289,7 @@ public void addEntry(final BookieId addr,
final long ledgerId,
final byte[] masterKey,
final long entryId,
final ByteBufList toSend,
final ReferenceCounted toSend,
final WriteCallback cb,
final Object ctx,
final int options,
Expand Down Expand Up @@ -357,7 +358,7 @@ private static class ChannelReadyForAddEntryCallback
private final Handle<ChannelReadyForAddEntryCallback> recyclerHandle;

private BookieClientImpl bookieClient;
private ByteBufList toSend;
private ReferenceCounted toSend;
private long ledgerId;
private long entryId;
private BookieId addr;
Expand All @@ -369,7 +370,7 @@ private static class ChannelReadyForAddEntryCallback
private EnumSet<WriteFlag> writeFlags;

static ChannelReadyForAddEntryCallback create(
BookieClientImpl bookieClient, ByteBufList toSend, long ledgerId,
BookieClientImpl bookieClient, ReferenceCounted toSend, long ledgerId,
long entryId, BookieId addr, Object ctx,
WriteCallback cb, int options, byte[] masterKey, boolean allowFastFail,
EnumSet<WriteFlag> writeFlags) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,23 +110,7 @@ public Object encode(Object msg, ByteBufAllocator allocator)
return msg;
}
BookieProtocol.Request r = (BookieProtocol.Request) msg;
if (r instanceof BookieProtocol.AddRequest) {
BookieProtocol.AddRequest ar = (BookieProtocol.AddRequest) r;
ByteBufList data = ar.getData();

int totalHeaderSize = 4 // for the request header
+ BookieProtocol.MASTER_KEY_LENGTH; // for the master key

int totalPayloadSize = totalHeaderSize + data.readableBytes();
ByteBuf buf = allocator.buffer(totalHeaderSize + 4 /* frame size */);
buf.writeInt(totalPayloadSize); // Frame header
buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), r.getOpCode(), r.getFlags()));
buf.writeBytes(r.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH);

ar.recycle();
data.prepend(buf);
return data;
} else if (r instanceof BookieProtocol.ReadRequest) {
if (r instanceof BookieProtocol.ReadRequest) {
int totalHeaderSize = 4 // for request type
+ 8 // for ledgerId
+ 8; // for entryId
Expand Down Expand Up @@ -437,7 +421,9 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
if (LOG.isTraceEnabled()) {
LOG.trace("Encode request {} to channel {}.", msg, ctx.channel());
}
if (msg instanceof BookkeeperProtocol.Request) {
if (msg instanceof ByteBuf || msg instanceof ByteBufList) {
ctx.write(msg, promise);
} else if (msg instanceof BookkeeperProtocol.Request) {
ctx.write(reqV3.encode(msg, ctx.alloc()), promise);
} else if (msg instanceof BookieProtocol.Request) {
ctx.write(reqPreV3.encode(msg, ctx.alloc()), promise);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
import org.apache.bookkeeper.util.ByteBufList;

/**
* The packets of the Bookie protocol all have a 4-byte integer indicating the
Expand Down Expand Up @@ -252,58 +251,6 @@ public String toString() {
public void recycle() {}
}

/**
 * A Request that adds data.
 *
 * <p>Instances are pooled: they are obtained through {@code create(...)}, which pulls an
 * object from {@code RECYCLER}, and are handed back to the pool by {@code recycle()}.
 * {@code create(...)} retains the payload and {@code recycle()} releases it, so every
 * created request has to be recycled exactly once to keep the payload's reference
 * count balanced.
 */
class AddRequest extends Request {
// Payload of the add; retained in create() and released in recycle().
ByteBufList data;

/**
 * Takes an instance from {@code RECYCLER} and populates it as an ADDENTRY request.
 *
 * @param protocolVersion protocol version stored on the request
 * @param ledgerId ledger the entry is added to
 * @param entryId id of the entry being added
 * @param flags request flags (see {@code isRecoveryAdd()})
 * @param masterKey master key stored on the request; the array is not copied
 * @param data payload; {@code retain()} is called on it, the matching release
 *             happens in {@code recycle()}
 * @return the populated request
 */
static AddRequest create(byte protocolVersion, long ledgerId,
long entryId, short flags, byte[] masterKey,
ByteBufList data) {
AddRequest add = RECYCLER.get();
add.protocolVersion = protocolVersion;
add.opCode = ADDENTRY;
add.ledgerId = ledgerId;
add.entryId = entryId;
add.flags = flags;
add.masterKey = masterKey;
// Take our own reference on the payload; it is dropped again in recycle().
add.data = data.retain();
return add;
}

/**
 * Returns the payload as a new {@code ByteBufList} produced by {@code ByteBufList.clone},
 * not the stored instance itself.
 */
ByteBufList getData() {
// We need to have different ByteBufList instances for each bookie write
return ByteBufList.clone(data);
}

/**
 * @return true when the {@code FLAG_RECOVERY_ADD} bit is set in this request's flags
 */
boolean isRecoveryAdd() {
return (flags & FLAG_RECOVERY_ADD) == FLAG_RECOVERY_ADD;
}

// Handle used to return this instance to RECYCLER.
private final Handle<AddRequest> recyclerHandle;
// Private: instances are only created by RECYCLER.newObject().
private AddRequest(Handle<AddRequest> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}

// Object pool backing create(); builds a new AddRequest when it has none to hand out.
private static final Recycler<AddRequest> RECYCLER = new Recycler<AddRequest>() {
@Override
protected AddRequest newObject(Handle<AddRequest> handle) {
return new AddRequest(handle);
}
};

/**
 * Clears the ledger id, entry id, master key and payload, releases the payload reference
 * taken in {@code create(...)}, and returns this instance to the pool.
 *
 * <p>{@code protocolVersion}, {@code opCode} and {@code flags} are not reset here; they are
 * overwritten by the next {@code create(...)} call.
 */
@Override
public void recycle() {
ledgerId = -1;
entryId = -1;
masterKey = null;
// Balances the retain() done in create().
ReferenceCountUtil.release(data);
data = null;
recyclerHandle.recycle(this);
}
}

/**
* This is similar to an add request, but it is used when processing the request on the bookie side.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
Expand Down Expand Up @@ -771,7 +772,7 @@ void forceLedger(final long ledgerId, ForceLedgerCallback cb, Object ctx) {
* @param writeFlags
* WriteFlags
*/
void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBufList toSend, WriteCallback cb,
void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ReferenceCounted toSend, WriteCallback cb,
Object ctx, final int options, boolean allowFastFail, final EnumSet<WriteFlag> writeFlags) {
Object request = null;
CompletionKey completionKey = null;
Expand All @@ -782,9 +783,12 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBuf
return;
}
completionKey = acquireV2Key(ledgerId, entryId, OperationType.ADD_ENTRY);
request = BookieProtocol.AddRequest.create(
BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
(short) options, masterKey, toSend);

if (toSend instanceof ByteBuf) {
request = ((ByteBuf) toSend).retainedDuplicate();
} else {
request = ByteBufList.clone((ByteBufList) toSend);
}
} else {
final long txnId = getTxnId();
completionKey = new V3CompletionKey(txnId, OperationType.ADD_ENTRY);
Expand All @@ -799,11 +803,14 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBuf
}

ByteString body = null;
if (toSend.hasArray()) {
body = UnsafeByteOperations.unsafeWrap(toSend.array(), toSend.arrayOffset(), toSend.readableBytes());
ByteBufList bufToSend = (ByteBufList) toSend;

if (bufToSend.hasArray()) {
body = UnsafeByteOperations.unsafeWrap(bufToSend.array(), bufToSend.arrayOffset(),
bufToSend.readableBytes());
} else {
for (int i = 0; i < toSend.size(); i++) {
ByteString piece = UnsafeByteOperations.unsafeWrap(toSend.getBuffer(i).nioBuffer());
for (int i = 0; i < bufToSend.size(); i++) {
ByteString piece = UnsafeByteOperations.unsafeWrap(bufToSend.getBuffer(i).nioBuffer());
// use ByteString.concat to avoid byte[] allocation when toSend has multiple ByteBufs
body = (body == null) ? piece : body.concat(piece);
}
Expand Down Expand Up @@ -1143,14 +1150,6 @@ private void writeAndFlush(final Channel channel,
StringUtils.requestToString(request));

errorOut(key, BKException.Code.TooManyRequestsException);

// If the request is a V2 add request, we retained the data's reference when creating the AddRequest
// object. To avoid the object leak, we need to release the reference if we met any errors
// before sending it.
if (request instanceof BookieProtocol.AddRequest) {
BookieProtocol.AddRequest ar = (BookieProtocol.AddRequest) request;
ar.recycle();
}
return;
}

Expand Down
Loading

0 comments on commit 234b817

Please sign in to comment.