Skip to content

Commit

Permalink
chores: change PostParam and Headers input data type
Browse files Browse the repository at this point in the history
  • Loading branch information
ranjanrak committed Jul 1, 2022
1 parent c9b03bf commit 0fc99d9
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 96 deletions.
107 changes: 53 additions & 54 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ go get -u github.com/ranjanrak/dead-letter-queue
package main

import (
"net/http"
"net/url"
"fmt"
"net/http"
"net/url"

deadletterqueue "github.com/ranjanrak/dead-letter-queue"
deadletterqueue "github.com/ranjanrak/dead-letter-queue"
)
func main() {
// Create new HTTP request queue instance
Expand All @@ -43,39 +44,44 @@ func main() {
Ctx: nil,
QueueName: "",
DeadHTTP: []int{400, 403, 429, 500, 502},
})
})

// Add post params
postParam := url.Values{}
postParam.Add("exchange", "NSE")
postParam.Add("tradingsymbol", "TCS")
postParam.Add("transaction_type", "BUY")
postParam.Add("quantity", "1")
postParam.Add("product", "CNC")
postParam.Add("order_type", "MARKET")
postParam.Add("validity", "DAY")

// Add request header
var headers http.Header = map[string][]string{}
headers.Add("x-kite-version", "3")
headers.Add("authorization", "token api_key:access_token")
headers.Add("content-type", "application/x-www-form-urlencoded")

// Request message
queueMsg := deadletterqueue.InputMsg{
Name: "Place TCS Order",
Url: "https://api.kite.trade/orders/regular",
ReqMethod: "POST",
PostParam: map[string]interface{}{
"exchange": "NSE",
"tradingsymbol": "TCS",
"transaction_type": "BUY",
"quantity": 1,
"product": "CNC",
"order_type": "MARKET",
"validity": "DAY",
},
Headers: map[string]interface{}{
"x-kite-version": 3,
"authorization": "token abcd123:efgh1234",
"content-type": "application/x-www-form-urlencoded",
},
}

// worker that adds message to http queue
err := httpQueue.AddMessage(queueMsg)
if err != nil {
fmt.Printf("Error adding msg in the request queue : %v", err)
}

// worker that executes http request queues
httpQueue.ExecuteQueue()

// worker that executes only dead letter http queues
httpQueue.ExecuteDeadQueue()
reqMsgOrd := deadletterqueue.InputMsg{
Name: "Place TCS Order",
Url: "https://api.kite.trade/orders/regular",
ReqMethod: "POST",
PostParam: postParam,
Headers: headers,
}

// worker that adds message to redis queue
err := httpQueue.AddMessage(reqMsgOrd)
if err != nil {
fmt.Printf("Error adding msg in the request queue : %v", err)
}

// worker that executes http request queues
httpQueue.ExecuteQueue()

// worker that executes dead letter http queues
httpQueue.ExecuteDeadQueue()
}
```

Expand All @@ -85,28 +91,21 @@ Request represents an HTTP request with all parameters.

### Adding message

Adding an HTTP message to the HTTP queue with all parameters.
Adding an HTTP message to the request queue with all parameters.

```go
// Add request header
var headers http.Header = map[string][]string{}
headers.Add("x-kite-version", "3")
headers.Add("authorization", "token api_key:access_token")

// Request message
queueMsg := deadletterqueue.InputMsg{
Name: "Place TCS Order",
Url: "https://api.kite.trade/orders/regular",
ReqMethod: "POST",
PostParam: map[string]interface{}{
"exchange": "NSE",
"tradingsymbol": "TCS",
"transaction_type": "BUY",
"quantity": 1,
"product": "CNC",
"order_type": "MARKET",
"validity": "DAY",
},
Headers: map[string]interface{}{
"x-kite-version": 3,
"authorization": "token abcd123:efgh1234",
"content-type": "application/x-www-form-urlencoded",
},
Name: "Fetch order book",
Url: "https://api.kite.trade/orders",
ReqMethod: "GET",
PostParam: nil,
Headers: headers,
}
err := httpQueue.AddMessage(queueMsg)
if err != nil {
Expand All @@ -116,7 +115,7 @@ if err != nil {

### Delete message from the request queue

Delete request message available in HTTP queue before it's execution with the input message `Name`.
Delete request message available in the queue before it's execution with the input message `Name`.

```go
err := httpQueue.DeleteReqMsg("Place TCS Order")
Expand Down
28 changes: 8 additions & 20 deletions dead_letter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
Expand All @@ -29,8 +28,8 @@ type InputMsg struct {
Name string
Url string
ReqMethod string
PostParam map[string]interface{}
Headers map[string]interface{}
PostParam url.Values
Headers http.Header
}

// Client represents interface for redis queue
Expand Down Expand Up @@ -115,30 +114,19 @@ func (c *Client) ExecuteQueueName(qName string) {

// RawExecute performs the HTTP request based on request params
func (c *Client) RawExecute(msgParam InputMsg, qName string) {
// Add all POST or PUT params as url.values map
postData := url.Values{}
if msgParam.PostParam != nil {
for key, value := range msgParam.PostParam {
valueStr := fmt.Sprintf("%v", value)
postData.Add(key, valueStr)
}
}
var postBody io.Reader
if msgParam.ReqMethod == "POST" || msgParam.ReqMethod == "PUT" {
// convert post params map into “URL encoded” form
paramsEncoded := postData.Encode()
postBody = bytes.NewReader([]byte(paramsEncoded))
// convert post params map into “URL encoded”
if msgParam.PostParam != nil {
paramsEncoded := msgParam.PostParam.Encode()
postBody = bytes.NewReader([]byte(paramsEncoded))
}
}
req, _ := http.NewRequest(msgParam.ReqMethod, msgParam.Url, postBody)

// Add all request headers to the http request
reqHeader := http.Header{}
if msgParam.Headers != nil {
for name, value := range msgParam.Headers {
valueStr := fmt.Sprintf("%v", value)
reqHeader.Add(name, valueStr)
}
req.Header = reqHeader
req.Header = msgParam.Headers
}

res, err := http.DefaultClient.Do(req)
Expand Down
75 changes: 53 additions & 22 deletions dead_letter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"testing"

"github.com/go-redis/redis/v8"
Expand Down Expand Up @@ -33,25 +36,30 @@ func MockRedis() {
func TestAddMessage(t *testing.T) {
// Initialize the mock redis
MockRedis()

// Add post params
postParam := url.Values{}
postParam.Add("exchange", "NSE")
postParam.Add("tradingsymbol", "TCS")
postParam.Add("transaction_type", "BUY")
postParam.Add("quantity", "1")
postParam.Add("product", "CNC")
postParam.Add("order_type", "MARKET")
postParam.Add("validity", "DAY")

// Add request header
var headers http.Header = map[string][]string{}
headers.Add("x-kite-version", "3")
headers.Add("authorization", "token api_key:access_token")
headers.Add("content-type", "application/x-www-form-urlencoded")

// Request message
reqMsgOrd = InputMsg{
Name: "Place TCS Order",
Url: "https://api.kite.trade/orders/regular",
ReqMethod: "POST",
PostParam: map[string]interface{}{
"exchange": "NSE",
"tradingsymbol": "TCS",
"transaction_type": "BUY",
"quantity": 1,
"product": "CNC",
"order_type": "MARKET",
"validity": "DAY",
},
Headers: map[string]interface{}{
"x-kite-version": 3,
"authorization": "token abcd123:efgh1234",
"content-type": "application/x-www-form-urlencoded",
},
PostParam: postParam,
Headers: headers,
}
// mock to set reqMsg for AddMessage call
mock.ExpectSet("ReqQueue", structToJson([]InputMsg{reqMsgOrd}), 0).SetVal("OK")
Expand All @@ -61,19 +69,23 @@ func TestAddMessage(t *testing.T) {
}

func TestDeleteReqMsg(t *testing.T) {
// Add post params
postParam := url.Values{}
postParam.Add("api_key", "api_key")
postParam.Add("request_token", "request_token")
postParam.Add("checksum", "checksum")

// Add request header
var headers http.Header = map[string][]string{}
headers.Add("x-kite-version", "3")

// Request message
reqMsgSess = InputMsg{
Name: "Post session token",
Url: "https://api.kite.trade/session/token",
ReqMethod: "POST",
PostParam: map[string]interface{}{
"api_key": "api_key",
"request_token": "request_token",
"checksum": "checksum",
},
Headers: map[string]interface{}{
"x-kite-version": 3,
},
PostParam: postParam,
Headers: headers,
}
// Add mock to add both reqMsg and reqMsgSess in ReqQueue
mock.ExpectGet("ReqQueue").SetVal(string(structToJson([]InputMsg{reqMsgOrd, reqMsgSess})))
Expand All @@ -100,6 +112,25 @@ func TestDeleteDeadMsg(t *testing.T) {
assert.Nil(t, err)
}

func TestMessageStatus(t *testing.T) {
// Load mock response
mockOrders, err := ioutil.ReadFile("./mockdata/orderbook_response.json")
if err != nil {
t.Errorf("Error while fetching orderbook_response. %v", err)
}
// Mock to fetch request status
mock.ExpectGet("Fetch order book").SetVal(string(mockOrders))
// Check response status for executed message
response, err := cli.MessageStatus("Fetch order book")
if err != nil {
fmt.Printf("Error %v", err)
}
var mockStruct map[string]interface{}
json.Unmarshal([]byte(response), &mockStruct)
//assert
assert.Equal(t, mockStruct["status"], "success", "Fetch order book request failed.")
}

// structToString parses struct to json for redis mock
func structToJson(msg []InputMsg) []byte {
jsonMessage, err := json.Marshal(msg)
Expand Down
1 change: 1 addition & 0 deletions mockdata/orderbook_response.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"status":"success","data":[{"placed_by":"Test Client","order_id":"220516001538500","exchange_order_id":"1300000011463164","parent_order_id":null,"status":"COMPLETE","status_message":null,"status_message_raw":null,"order_timestamp":"2022-05-16 11:54:18","exchange_update_timestamp":"2022-05-16 11:54:18","exchange_timestamp":"2022-05-16 11:54:18","variety":"regular","modified":false,"exchange":"NSE","tradingsymbol":"SBIN","instrument_token":779521,"order_type":"MARKET","transaction_type":"BUY","validity":"DAY","validity_ttl":0,"product":"CNC","quantity":1,"disclosed_quantity":0,"price":0,"trigger_price":0,"average_price":455.5,"filled_quantity":1,"pending_quantity":0,"cancelled_quantity":0,"market_protection":0,"meta":{},"tag":null,"guid":"ABCDEFGHIJKLM0123"}]}

0 comments on commit 0fc99d9

Please sign in to comment.