-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathauthenticated_connection.go
107 lines (84 loc) · 3.11 KB
/
authenticated_connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package pluto
import (
"sync"
"time"
"github.com/google/uuid"
"go.uber.org/zap"
)
// Package-level registry of connections that have passed authentication.
var (
	// AuthenticatedConnections holds every authenticated connection, keyed by
	// the connection's ID. Access it while holding AuthenticatedConnectionsMutex.
	AuthenticatedConnections = map[uuid.UUID]AuthenticatedConnection{}

	// AuthenticatedConnectionsMutex guards AuthenticatedConnections.
	AuthenticatedConnectionsMutex = &sync.RWMutex{}
)
// AuthenticatedConnection is an AcceptedConnection together with the producer
// identity and credential that were recorded for it when authentication
// succeeded.
type AuthenticatedConnection struct {
// AcceptedConnection is embedded, so its fields and methods (for example ID)
// are promoted onto AuthenticatedConnection.
AcceptedConnection
// Producer is the producer taken from the authentication request.
Producer Identifier
// ProducerCredential is the credential that was validated for Producer.
ProducerCredential Credential
}
var authenticator = NewConditionalProcessor(&ConnectionDecoder{
MaxDecode: 1,
ReadDeadline: time.Second * 2,
ProcessableBuilder: func(context Processable, new OutComingProcessable) Processable { return &new },
Processor: NewInlineProcessor(func(processable Processable) (result Processable, succeed bool) {
defer func() { succeed = recover() == nil }()
ApplicationLogger.Debug(ApplicationLog{
Message: "Authentication request",
})
var authenticatedConnection AuthenticatedConnection
// Authenticate Connection
{
connectionID := uuid.MustParse(processable.GetBody().(map[string]any)["connection_id"].(string))
connectionToken := processable.GetBody().(map[string]any)["connection_token"].(string)
AcceptedConnectionsMutex.RLock()
acceptedConnection, found := AcceptedConnections[connectionID]
if !found || acceptedConnection.Token != connectionToken {
AcceptedConnectionsMutex.RUnlock()
return processable, false
}
AcceptedConnectionsMutex.RUnlock()
authenticatedConnection.AcceptedConnection = acceptedConnection
}
// Authenticate Producer
{
p := processable.(*OutComingProcessable)
validated, err := p.ProducerCredential.Validate(p.GetProducer())
if err != nil {
Log.Debug("Validate producer credential", zap.Error(err))
return processable, false
}
if !validated {
// TODO: Write to the client that the credential is not valid
return processable, false
}
authenticatedConnection.Producer = p.Producer
authenticatedConnection.ProducerCredential = p.ProducerCredential
}
// Move the connection from the accepted connections to the authenticated connections
{
AcceptedConnectionsMutex.Lock()
AuthenticatedConnectionsMutex.Lock()
delete(AcceptedConnections, authenticatedConnection.AcceptedConnection.ID)
AuthenticatedConnections[authenticatedConnection.ID] = authenticatedConnection
AuthenticatedConnectionsMutex.Unlock()
AcceptedConnectionsMutex.Unlock()
}
return processable, true
}),
}).Fail(ProcessorBucket{Processors: []Processor{
NewInlineProcessor(func(processable Processable) (Processable, bool) {
defer func() { recover() }()
connectionID := processable.GetBody().(map[string]any)["connection_id"].(uuid.UUID)
connectionToken := processable.GetBody().(map[string]any)["connection_token"].(string)
AcceptedConnectionsMutex.RLock()
acceptedConnection, found := AcceptedConnections[connectionID]
if !found || acceptedConnection.Token != connectionToken {
AcceptedConnectionsMutex.RUnlock()
return processable, false
}
AcceptedConnectionsMutex.RUnlock()
_ = acceptedConnection.Close()
AcceptedConnectionsMutex.Lock()
delete(AcceptedConnections, acceptedConnection.ID)
AcceptedConnectionsMutex.Unlock()
return processable, false
}),
}})