-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcopyTrader.py
274 lines (233 loc) · 10.2 KB
/
copyTrader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
import pandas as pd
import json
import time
from textblob import TextBlob
import ccxt
# Configuration
API_KEY = 'your_api_key' # Exchange API key
API_SECRET = 'your_api_secret' # Exchange API secret
EXCHANGE_ID = 'binance' # Exchange platform (e.g., 'binance', 'kraken')
# Blockchain Explorer API Key (for wallet monitoring)
BLOCKCHAIN_API_KEY = 'your_blockchain_api_key'
# Copy trading wallets to monitor
COPY_TRADE_WALLETS = ['wallet_address_1', 'wallet_address_2']
# Trade Parameters
TRADE_AMOUNT = 0.001 # Base trade amount (adjust dynamically as needed)
TAKE_PROFIT_PERCENT = 1.5 # Take profit percentage
STOP_LOSS_PERCENT = 1.0 # Stop loss percentage
# Sniping Conditions
SNIPING_CONDITIONS = {
"price_change": 5.0, # Minimum percentage price change for sniping
"volume_spike": 2.0, # Volume spike multiplier for sniping
}
# Logging Configuration
LOG_LEVEL = 'DEBUG' # Set to 'DEBUG', 'INFO', or 'ERROR'
LOG_FILE_PATH = 'trading_bot.log' # Path to log file
# Timeframe Configuration
TIMEFRAME = '1m' # Default timeframe for fetching candles
DAILY_TIMEFRAME = '1d' # Timeframe for daily trend analysis
VOLATILITY_PERIOD = 20 # Period for volatility calculation
MOMENTUM_PERIOD = 10 # Period for momentum calculation
# Initialize the exchange outside of conditional block
exchange = None
if CCXT_AVAILABLE:
exchange = getattr(ccxt, EXCHANGE_ID)({
'apiKey': API_KEY,
'secret': API_SECRET,
'enableRateLimit': True,
})
# Dynamic data population from CSV or JSON
def fetch_trading_symbols_from_file(file_path, file_type='csv'):
"""Fetch and filter trading symbols from a file (CSV or JSON)."""
try:
if file_type == 'csv':
df = pd.read_csv(file_path)
symbols = df['symbol'].tolist()
elif file_type == 'json':
with open(file_path, 'r') as f:
data = json.load(f)
symbols = data.get('symbols', [])
else:
raise ValueError("Unsupported file type. Use 'csv' or 'json'.")
print(f"Fetched {len(symbols)} symbols from {file_path}.")
return symbols
except Exception as e:
print(f"Error fetching symbols from file: {e}")
return []
# Load symbols dynamically from a file (replace 'symbols.csv' with your file path)
tokens = fetch_trading_symbols_from_file('symbols.csv', file_type='csv')
# Ensure fallback logic in case file-based fetching fails
if not tokens:
print("Failed to load symbols from file. Falling back to default dynamic fetching.")
def fetch_trading_symbols(quote_currency='USDT', max_symbols=10):
"""Fetch and filter trading symbols dynamically from the exchange."""
if not exchange:
print("Exchange is not initialized. Cannot fetch symbols.")
return []
try:
exchange.load_markets()
symbols = [symbol for symbol in exchange.symbols if symbol.endswith(f"/{quote_currency}")]
print(f"Fetched {len(symbols)} symbols matching {quote_currency}.")
return symbols[:max_symbols] # Limit the number of symbols
except Exception as e:
print(f"Error fetching trading symbols: {e}")
return []
tokens = fetch_trading_symbols()
# Initialize data structure
data = {
'symbol': tokens,
'open_side': [None] * len(tokens),
'index_pos': list(range(len(tokens))),
'open_size': [0] * len(tokens),
'open_bool': [False] * len(tokens),
'long': [None] * len(tokens)
}
positions_df = pd.DataFrame(data)
# Helper functions
def fetch_latest_candle(symbol, timeframe=TIMEFRAME):
"""Fetch the latest candle for the given symbol and timeframe."""
if not exchange:
print("Exchange is not initialized. Cannot fetch candle data.")
return None
candles = exchange.fetch_ohlcv(symbol, timeframe=timeframe, limit=2)
return candles[-1] # Return the most recent candle
def calculate_daily_trend(symbol):
"""Analyze the daily trend for better-informed trading decisions."""
if not exchange:
print("Exchange is not initialized. Cannot calculate daily trend.")
return None
candles = exchange.fetch_ohlcv(symbol, timeframe=DAILY_TIMEFRAME, limit=5) # Fetch last 5 daily candles
if len(candles) < 5:
return None
# Simple trend analysis: compare the closing prices over 5 days
closes = [c[4] for c in candles]
trend = "up" if closes[-1] > closes[0] else "down"
print(f"Daily trend for {symbol}: {trend}")
return trend
def calculate_volatility(prices, period=VOLATILITY_PERIOD):
"""Calculate volatility using standard deviation."""
if len(prices) < period:
return None
return pd.Series(prices).pct_change().rolling(period).std().iloc[-1]
def calculate_rsi(prices, period=14):
"""Calculate the Relative Strength Index (RSI)."""
if len(prices) < period:
return None
gains = [prices[i] - prices[i - 1] for i in range(1, len(prices)) if prices[i] > prices[i - 1]]
losses = [prices[i - 1] - prices[i] for i in range(1, len(prices)) if prices[i] < prices[i - 1]]
avg_gain = sum(gains[-period:]) / period
avg_loss = sum(losses[-period:]) / period
if avg_loss == 0:
return 100
rs = avg_gain / avg_loss
rsi = 100 - (100 / (1 + rs))
return rsi
def perform_sentiment_analysis(text):
"""Perform sentiment analysis on a given text."""
analysis = TextBlob(text)
sentiment = analysis.sentiment.polarity
return sentiment
def calculate_momentum(prices, period=MOMENTUM_PERIOD):
"""Calculate momentum for trend-following logic."""
if len(prices) < period:
return None
return prices[-1] - prices[-period] # Momentum as price difference
def place_order(side, amount, symbol):
"""Place a market order."""
if not exchange:
print("Exchange is not initialized. Cannot place order.")
return None
try:
order = exchange.create_order(
symbol=symbol,
type='market',
side=side,
amount=amount
)
return order
except Exception as e:
print(f"Error placing {side} order: {e}")
return None
def monitor_copy_trade_wallets():
"""Monitor selected wallets and copy their trades."""
print("Monitoring wallets for copy trading...")
for wallet in COPY_TRADE_WALLETS:
try:
trades = get_wallet_trades(wallet) # Fetch trades from blockchain
for trade in trades:
print(f"Wallet {wallet} executed trade: {trade}")
place_order(trade['side'], trade['amount'], trade['symbol'])
except Exception as e:
print(f"Error monitoring wallet {wallet}: {e}")
def get_wallet_trades(wallet):
"""Fetch trades from the blockchain for a given wallet."""
# Replace the following with real API or blockchain explorer integration
# Example: Using Etherscan API or similar for fetching wallet transactions
try:
transactions = [] # Replace with real API call result
# Simulate parsing transactions into trade actions
trades = []
for tx in transactions:
if tx['to'] in COPY_TRADE_WALLETS: # Example condition
trades.append({
'symbol': tx['token'],
'side': 'buy' if tx['type'] == 'incoming' else 'sell',
'amount': tx['amount']
})
return trades
except Exception as e:
print(f"Error fetching trades for wallet {wallet}: {e}")
return []
def pairs_trading(symbols):
"""Execute pairs trading strategy based on price spreads."""
prices = [fetch_latest_candle(symbol)[4] for symbol in symbols]
if len(prices) == 2:
spread = prices[0] - prices[1]
print(f"Pair Spread: {spread}")
if spread > PAIR_SPREAD_THRESHOLD: # Arbitrary threshold
print("Spread too wide: Short first asset, Long second asset")
place_order('sell', TRADE_AMOUNT, symbols[0])
place_order('buy', TRADE_AMOUNT, symbols[1])
elif spread < -PAIR_SPREAD_THRESHOLD:
print("Spread too negative: Long first asset, Short second asset")
place_order('buy', TRADE_AMOUNT, symbols[0])
place_order('sell', TRADE_AMOUNT, symbols[1])
def find_open_positions(df):
"""Find positions where open_bool is True and return the symbols."""
open_positions = df[df['open_bool'] == True]
symbols = open_positions['symbol'].tolist()
print(f"Open positions found: {symbols}")
return symbols
def token_sniping(symbols):
"""Sniping logic to monitor tokens for specific conditions."""
for symbol in symbols:
candle = fetch_latest_candle(symbol)
if candle is None:
continue
price_change = ((candle[4] - candle[1]) / candle[1]) * 100 # % price change
volume_spike = candle[5] / sum([c[5] for c in exchange.fetch_ohlcv(symbol, timeframe=TIMEFRAME, limit=20)]) # Volume spike ratio
if price_change >= SNIPING_CONDITIONS["price_change"]:
print(f"Sniping opportunity detected for {symbol}: Price change {price_change:.2f}%")
place_order('buy', TRADE_AMOUNT, symbol)
if volume_spike >= SNIPING_CONDITIONS["volume_spike"]:
print(f"Volume spike detected for {symbol}: {volume_spike:.2f}x average volume")
place_order('buy', TRADE_AMOUNT, symbol)
def dollar_cost_averaging(symbol):
"""Implement Dollar-Cost Averaging (DCA) logic."""
print(f"Executing DCA for {symbol} with amount {DCA_AMOUNT}")
place_order('buy', DCA_AMOUNT, symbol)
pnl_tracker = {"realized_pnl": 0.0, "unrealized_pnl": 0.0}
def update_pnl(symbol, entry_price, current_price, amount, side):
"""Update PnL tracking."""
if side == "buy":
pnl_tracker["unrealized_pnl"] = (current_price - entry_price) * amount
elif side == "sell":
pnl_tracker["realized_pnl"] += (current_price - entry_price) * amount
print(f"PnL Update: Realized: {pnl_tracker['realized_pnl']:.2f}, Unrealized: {pnl_tracker['unrealized_pnl']:.2f}")
def trading_logic():
"""Main trading logic with enhanced features."""
print("Starting trading bot...")
monitor_copy_trade_wallets()
# Add additional logic for trading here
if __name__ == "__main__":
trading_logic()