Skip to content

Commit

Permalink
fixes some bugs in integration tests
Browse files Browse the repository at this point in the history
- adds extensive log tracing
- trace full resolve paths
- work around found issue automerge/automerge-repo#343
  to accept Automerge-repo double-response on REQ
  • Loading branch information
heckj committed Apr 22, 2024
1 parent 31a404a commit 4a536c2
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 11 deletions.
10 changes: 9 additions & 1 deletion Sources/AutomergeRepo/InternalDocHandle.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import struct Automerge.ChangeHash
import class Automerge.Document
import struct Automerge.SyncState
import struct Foundation.Data
import OSLog

final class InternalDocHandle {
enum DocHandleState {
Expand Down Expand Up @@ -37,7 +38,14 @@ final class InternalDocHandle {

let id: DocumentId
var doc: Automerge.Document?
var state: DocHandleState
/// The current lifecycle state of the document handle.
///
/// Transitions are traced through `Logger.repo` to support debugging of the
/// resolve path. (A leftover `print("X")` debug statement that fired on every
/// transition into `.unavailable` has been removed — it wrote stray output to
/// stdout in production builds and duplicated what the trace log already captures.)
var state: DocHandleState {
    willSet {
        Logger.repo.trace("updating state of \(self.id) to \(String(describing: newValue))")
    }
}
var remoteHeads: [STORAGE_ID: Set<Automerge.ChangeHash>]
var syncStates: [PEER_ID: SyncState]

Expand Down
14 changes: 13 additions & 1 deletion Sources/AutomergeRepo/Networking/NetworkSubsystem.swift
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,11 @@ public final class NetworkSubsystem {
// invariant that there should be a valid doc handle available from the repo
throw Errors.Unavailable(id: id)
}

Logger.repo.trace(" - Initiating remote fetch for \(id)")
let newDocument = Document()
for adapter in adapters {
for peerConnection in adapter.peeredConnections {
Logger.repo.trace(" - requesting \(id) from peer \(peerConnection.peerId) at \(peerConnection.endpoint)")
// upsert the requested document into the list by peer
if var existingList = requestedDocuments[id] {
existingList.append(peerConnection.peerId)
Expand All @@ -80,6 +81,7 @@ public final class NetworkSubsystem {
}
}
}
Logger.repo.trace(" - remote fetch for \(id) finished")
}

func send(message: SyncV1Msg, to: PEER_ID?) async {
Expand Down Expand Up @@ -141,14 +143,24 @@ extension NetworkSubsystem: NetworkEventReceiver {
)
return
}
Logger.network.trace("Received \(event.debugDescription) event")
if let peersRequested = requestedDocuments[docId] {
Logger.network.trace("We've requested \(docId) from \(peersRequested.count) peers:")
for p in peersRequested {
Logger.network.trace(" - Peer: \(p)")
}
// if we receive an unavailable from one peer, record it and wait until
// we receive unavailable from all available peers before marking it unavailable
let remainingPeersPending = peersRequested.filter { peerId in
// include the peers OTHER than the one sending the unavailable msg
peerId != unavailableMsg.senderId
}
Logger.network.trace("Removing the sending peer, there are \(remainingPeersPending.count) remaining:")
for p in remainingPeersPending {
Logger.network.trace(" - Peer: \(p)")
}
if remainingPeersPending.isEmpty {
Logger.network.trace("No further peers with requests outstanding, so marking document \(docId) as unavailable")
await repo.markDocUnavailable(id: docId)
requestedDocuments.removeValue(forKey: docId)
} else {
Expand Down
18 changes: 10 additions & 8 deletions Sources/AutomergeRepo/Networking/Providers/WebSocketProvider.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public final class WebSocketProvider: NetworkProvider {
var endpoint: URL?
var peered: Bool

public init(_ config: WebSocketProviderConfiguration = .default) {
public nonisolated init(_ config: WebSocketProviderConfiguration = .default) {
self.config = config
peeredConnections = []
delegate = nil
Expand Down Expand Up @@ -176,7 +176,7 @@ public final class WebSocketProvider: NetworkProvider {
// Race a timeout against receiving a Peer message from the other side
// of the WebSocket connection. If we fail that race, shut down the connection
// and move into a .closed connectionState
let websocketMsg = try await nextMessage(withTimeout: .seconds(3.5))
let websocketMsg = try await nextMessage(on: webSocketTask, withTimeout: .seconds(3.5))

// Now that we have the WebSocket message, figure out if we got what we expected.
// For the sync protocol handshake phase, it's essentially "peer or die" since
Expand Down Expand Up @@ -217,6 +217,7 @@ public final class WebSocketProvider: NetworkProvider {
// throw error on cancel
// otherwise return the msg
private func nextMessage(
on webSocketTask: URLSessionWebSocketTask,
withTimeout: ContinuousClock.Instant
.Duration?
) async throws -> URLSessionWebSocketTask.Message {
Expand All @@ -233,12 +234,12 @@ public final class WebSocketProvider: NetworkProvider {
// going into the receive loop.
try Task.checkCancellation()

// check the invariants
guard let webSocketTask
else {
throw SyncV1Msg.Errors
.ConnectionClosed(errorDescription: "Attempting to wait for a websocket message when the task is nil")
}
// // check the invariants
// guard let webSocketTask
// else {
// throw SyncV1Msg.Errors
// .ConnectionClosed(errorDescription: "Attempting to wait for a websocket message when the task is nil")
// }

// Race a timeout against receiving a Peer message from the other side
// of the WebSocket connection. If we fail that race, shut down the connection
Expand Down Expand Up @@ -337,6 +338,7 @@ public final class WebSocketProvider: NetworkProvider {
// in this method (all handling of them should happen before getting here)
// - .leave invokes the disconnect, and associated messages to the delegate
// - otherwise forward the message to the delegate to work with
Logger.webSocket.trace("WebSocket received: \(msg.debugDescription)")
switch msg {
case let .leave(msg):
Logger.webSocket.trace("\(msg.senderId) requests to kill the connection")
Expand Down
30 changes: 29 additions & 1 deletion Sources/AutomergeRepo/Repo.swift
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,10 @@ public final class Repo {
.warning("Invalid documentId \(msg.documentId) received in a sync message \(msg.debugDescription)")
return
}
Logger.repo.trace(" - Sync request received for document \(docId)")
do {
if handles[docId] == nil {
Logger.repo.trace(" - No recorded handle for \(docId), creating one")
// There is no in-memory handle for the document being synced, so this is a request
// to create a local copy of the document encapsulated in the sync message.
let newDocument = Document()
Expand All @@ -202,6 +204,7 @@ public final class Repo {
_ = try await resolveDocHandle(id: docId)
}
guard let handle = handles[docId] else { fatalError("HANDLE DOESN'T EXIST") }
Logger.repo.trace(" - working on handle for \(docId), state: \(String(describing: handle.state))")
let docFromHandle = handle.doc ?? Document()
let syncState = syncState(id: docId, peer: msg.senderId)
// Apply the request message as a sync update
Expand Down Expand Up @@ -447,11 +450,16 @@ public final class Repo {
guard let handle = handles[id] else {
fatalError("No stored document handle for document id: \(id)")
}
Logger.repo.trace("Updated contents of document \(id), state: \(String(describing: handle.state))")
if handle.state == .requesting {
handle.state = .ready
}
assert(handle.state == .ready)
// Automerge-repo https://github.com/automerge/automerge-repo/issues/343 is sending two responses,
// the first being UNAVAILABLE, which we use to change the state, but that triggers this unexpected
assertion, when we later receive the SYNC update to set the document as expected
assert(handle.state == .ready || handle.state == .unavailable)
handle.doc = doc
handle.state = .ready
if let storage {
do {
try await withThrowingTaskGroup(of: Void.self) { group in
Expand Down Expand Up @@ -515,18 +523,22 @@ public final class Repo {

private func resolveDocHandle(id: DocumentId) async throws -> DocHandle {
if let handle: InternalDocHandle = handles[id] {
Logger.repo.trace("RESOLVE document id \(id) [\(String(describing: handle.state))]")
switch handle.state {
case .idle:
if handle.doc != nil {
// if there's an Automerge document in memory, jump to ready
handle.state = .ready
Logger.repo.trace("RESOLVE: :: \(id) -> [\(String(describing: handle.state))]")
// STRUCT ONLY handles[id] = handle
} else {
// otherwise, first attempt to load it from persistent storage
// (if available)
handle.state = .loading
Logger.repo.trace("RESOLVE: :: \(id) -> [\(String(describing: handle.state))]")
// STRUCT ONLY handles[id] = handle
}
Logger.repo.trace("RESOLVE: :: continuing to resolve")
return try await resolveDocHandle(id: id)
case .loading:
// Do we have the document
Expand All @@ -553,6 +565,7 @@ public final class Repo {
// peers there's a new document before jumping to the 'ready' state
handle.state = .ready
// STRUCT ONLY handles[id] = handle
Logger.repo.trace("RESOLVE: :: \(id) -> [\(String(describing: handle.state))]")
return DocHandle(id: id, doc: docFromHandle)
} else {
// We don't have the underlying Automerge document, so attempt
Expand All @@ -562,44 +575,59 @@ public final class Repo {
if let doc = try await loadFromStorage(id: id) {
handle.state = .ready
// STRUCT ONLY handles[id] = handle
Logger.repo.trace("RESOLVED! :: \(id) -> [\(String(describing: handle.state))]")
return DocHandle(id: id, doc: doc)
} else {
handle.state = .requesting
// STRUCT ONLY handles[id] = handle
pendingRequestReadAttempts[id] = 0
Logger.repo.trace("RESOLVE: :: \(id) -> [\(String(describing: handle.state))]")
Logger.repo.trace("RESOLVE: :: starting remote fetch")
try await network.startRemoteFetch(id: handle.id)
Logger.repo.trace("RESOLVE: :: continuing to resolve")
return try await resolveDocHandle(id: id)
}
}
case .requesting:
guard let updatedHandle = handles[id] else {
Logger.repo.trace("RESOLVED - X :: Missing \(id) -> [UNAVAILABLE]")
throw Errors.DocUnavailable(id: handle.id)
}
if let doc = updatedHandle.doc, updatedHandle.state == .ready {
Logger.repo.trace("RESOLVED! :: \(id) -> [\(String(describing: handle.state))]")
return DocHandle(id: id, doc: doc)
} else {
guard let previousRequests = pendingRequestReadAttempts[id] else {
Logger.repo.trace("RESOLVED - X :: Missing \(id) from pending request read attempts -> [UNAVAILABLE]")
throw Errors.DocUnavailable(id: id)
}
if previousRequests < maxRetriesForFetch {
// we are racing against the receipt of a network result
// to see what we get at the end
Logger.repo.trace(" :: \(id) -> [\(String(describing: handle.state))]")
Logger.repo.trace(" :: check # \(previousRequests) (of \(self.maxRetriesForFetch), waiting \(self.pendingRequestWaitDuration) seconds for remote fetch")
try await Task.sleep(for: pendingRequestWaitDuration)
Logger.repo.trace("RESOLVE: :: continuing to resolve")
return try await resolveDocHandle(id: id)
} else {
Logger.repo.trace("RESOLVED - X :: failed waiting \(previousRequests) of \(self.maxRetriesForFetch) requests for \(id) -> [UNAVAILABLE]")
throw Errors.DocUnavailable(id: id)
}
}
case .ready:
guard let doc = handle.doc else { fatalError("DocHandle state is ready, but ._doc is null") }
Logger.repo.trace("RESOLVED! :: \(id) [\(String(describing: handle.state))]")
return DocHandle(id: id, doc: doc)
case .unavailable:
Logger.repo.trace("RESOLVED - X :: \(id) -> [MARKED UNAVAILABLE]")
throw Errors.DocUnavailable(id: handle.id)
case .deleted:
Logger.repo.trace("RESOLVED - X :: \(id) -> [MARKED DELETED]")
throw Errors.DocDeleted(id: handle.id)
}
} else {
throw Errors.DocUnavailable(id: id)
Logger.repo.error("RESOLVED - X :: Error Resolving document: Repo doesn't have a handle for \(id).")
}
}
}

0 comments on commit 4a536c2

Please sign in to comment.