Skip to content
This repository has been archived by the owner on May 8, 2023. It is now read-only.

Commit

Permalink
Merge pull request #80 from pcdummy/natsdumper_improvements
Browse files Browse the repository at this point in the history
Natsdumper improvements
  • Loading branch information
regner authored Aug 19, 2017
2 parents 0052f63 + 05faf03 commit 165f531
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 16 deletions.
131 changes: 116 additions & 15 deletions cmd/natsdumper/natsdumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@ import (
"encoding/json"
"flag"
"fmt"
"strings"

nats "github.com/nats-io/go-nats"
"github.com/regner/albiondata-client/lib"
)

var natsURL string
var (
natsURL string
natsChannels string
)

func init() {
flag.StringVar(
Expand All @@ -18,28 +22,47 @@ func init() {
"nats://public:[email protected]:4222",
"NATS URL to connect to.",
)

flag.StringVar(
&natsChannels,
"c",
fmt.Sprintf("%s,%s", lib.NatsMarketOrdersDeduped, lib.NatsGoldPricesDeduped),
fmt.Sprintf("NATS channels to connect to, comma seperated. Can be '%s', '%s', '%s', '%s'",
lib.NatsMarketOrdersDeduped, lib.NatsGoldPricesDeduped, lib.NatsMarketOrdersIngest, lib.NatsGoldPricesIngest,
),
)
}

func dumpMarketOrders(m *nats.Msg) {
morders := &lib.MarketUpload{}
if err := json.Unmarshal(m.Data, morders); err != nil {
func subscribeMarketOrdersIngest(nc *nats.Conn) {
fmt.Printf("mi Subscribing %s\n", lib.NatsMarketOrdersIngest)
marketCh := make(chan *nats.Msg, 64)
marketSub, err := nc.ChanSubscribe(lib.NatsMarketOrdersIngest, marketCh)
if err != nil {
fmt.Printf("%v\n", err)
return
}
defer marketSub.Unsubscribe()

for _, order := range morders.Orders {
jb, _ := json.Marshal(order)
fmt.Printf("%d %s\n", order.LocationID, string(jb))
for {
select {
case msg := <-marketCh:
morders := &lib.MarketUpload{}
if err := json.Unmarshal(msg.Data, morders); err != nil {
fmt.Printf("%v\n", err)
}

for _, order := range morders.Orders {
jb, _ := json.Marshal(order)
fmt.Printf("mi %s\n", string(jb))
}
}
}
}

func main() {
flag.Parse()

nc, _ := nats.Connect(natsURL)
defer nc.Close()

func subscribeMarketOrdersDeduped(nc *nats.Conn) {
fmt.Printf("md Subscribing %s\n", lib.NatsMarketOrdersDeduped)
marketCh := make(chan *nats.Msg, 64)
marketSub, err := nc.ChanSubscribe(lib.NatsGoldPricesDeduped, marketCh)
marketSub, err := nc.ChanSubscribe(lib.NatsMarketOrdersDeduped, marketCh)
if err != nil {
fmt.Printf("%v\n", err)
return
Expand All @@ -49,7 +72,85 @@ func main() {
for {
select {
case msg := <-marketCh:
dumpMarketOrders(msg)
fmt.Printf("md %s\n", string(msg.Data))
}
}
}

func subscribeGoldPricesIngest(nc *nats.Conn) {
fmt.Printf("gi Subscribing %s\n", lib.NatsGoldPricesIngest)
goldCh := make(chan *nats.Msg, 64)
goldSub, err := nc.ChanSubscribe(lib.NatsGoldPricesIngest, goldCh)
if err != nil {
fmt.Printf("%v\n", err)
return
}
defer goldSub.Unsubscribe()

for {
select {
case msg := <-goldCh:
fmt.Printf("gi %s\n", string(msg.Data))
}
}
}

func subscribeGoldPricesDeduped(nc *nats.Conn) {
fmt.Printf("gd Subscribing %s\n", lib.NatsGoldPricesDeduped)
goldCh := make(chan *nats.Msg, 64)
goldSub, err := nc.ChanSubscribe(lib.NatsGoldPricesDeduped, goldCh)
if err != nil {
fmt.Printf("%v\n", err)
return
}
defer goldSub.Unsubscribe()

for {
select {
case msg := <-goldCh:
fmt.Printf("gd %s\n", string(msg.Data))
}
}
}

func main() {
flag.Parse()

nc, _ := nats.Connect(natsURL)
defer nc.Close()

chans := strings.Split(natsChannels, ",")

if len(chans) > 1 {
goChans := 0

for _, goChan := range chans {
switch goChan {
case lib.NatsMarketOrdersIngest:
go subscribeMarketOrdersIngest(nc)
case lib.NatsMarketOrdersDeduped:
go subscribeMarketOrdersDeduped(nc)
case lib.NatsGoldPricesIngest:
go subscribeGoldPricesIngest(nc)
case lib.NatsGoldPricesDeduped:
go subscribeGoldPricesDeduped(nc)
}

goChans = goChans + 1
if goChans > len(chans)-2 {
break
}
}
}

switch chans[len(chans)-1] {
case lib.NatsMarketOrdersIngest:
subscribeMarketOrdersIngest(nc)
case lib.NatsMarketOrdersDeduped:
subscribeMarketOrdersDeduped(nc)
case lib.NatsGoldPricesIngest:
subscribeGoldPricesIngest(nc)
case lib.NatsGoldPricesDeduped:
subscribeGoldPricesDeduped(nc)
}
}
2 changes: 1 addition & 1 deletion lib/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type MarketOrder struct {
Expires string `json:"Expires"`
}

// MarketUpload contains a list of orders and the location where the orders are from
// MarketUpload contains a list of orders
type MarketUpload struct {
Orders []*MarketOrder `json:"Orders"`
}

0 comments on commit 165f531

Please sign in to comment.