File size: 4,244 Bytes
e4e4574 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
"""Kraken provider implementation"""
from __future__ import annotations
from typing import List
from datetime import datetime
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from core.base_provider import BaseProvider
from core.models import OHLCV, Price
class KrakenProvider(BaseProvider):
    """Market-data provider backed by Kraken's public REST API.

    Only USD-quoted pairs are supported: every requested symbol is reduced
    to its base asset and then mapped to a Kraken ``<BASE>USD`` pair name.
    """

    # Candle interval label -> Kraken ``interval`` request value (minutes).
    # Labels missing from this table fall back to 60 in ``fetch_ohlcv``.
    INTERVAL_MAP = {
        "1m": 1,
        "5m": 5,
        "15m": 15,
        "1h": 60,
        "4h": 240,
        "1d": 1440,
        "1w": 10080,
    }

    # Base asset -> Kraken USD pair name. Assets that are not listed here
    # are requested as the plain "<BASE>USD" form.
    SYMBOL_MAP = {
        "BTC": "XXBTZUSD",
        "ETH": "XETHZUSD",
        "SOL": "SOLUSD",
        "XRP": "XXRPZUSD",
        "ADA": "ADAUSD",
        "DOT": "DOTUSD",
        "LINK": "LINKUSD",
        "LTC": "XLTCZUSD",
        "BCH": "BCHUSD",
        "MATIC": "MATICUSD",
        "AVAX": "AVAXUSD",
        "XLM": "XXLMZUSD",
    }

    # Quote currencies that are treated as "USD" when a caller passes an
    # explicit BASE/QUOTE or BASE-QUOTE symbol.
    _USD_QUOTES = ("USDT", "USD")

    def __init__(self):
        """Configure the provider name, Kraken public base URL and timeout."""
        super().__init__(
            name="kraken",
            base_url="https://api.kraken.com/0/public",
            timeout=10,
        )

    def _base_symbol(self, symbol: str) -> str:
        """Reduce a caller-supplied symbol to its upper-case base asset.

        Accepts forms such as ``"btc"``, ``"BTCUSDT"``, ``"BTC/USDT"``,
        ``"BTC-USD"`` and ``"BTCUSD"``.
        """
        raw = symbol.upper()

        # With an explicit separator the base/quote split is unambiguous,
        # so "BTC/USD" and "USDT/USD" resolve to "BTC" and "USDT".
        for separator in ("/", "-"):
            base, found, quote = raw.partition(separator)
            if found and base and quote in self._USD_QUOTES:
                return base

        # Unseparated input: strip separators and any "USDT" marker.
        cleaned = raw.replace("/", "").replace("USDT", "").replace("-", "")

        # "BTCUSD" would otherwise become the invalid pair "BTCUSDUSD".
        # A trailing "USD" is only dropped when what remains is a known
        # asset, so tickers that genuinely end in "USD" are left alone.
        if (
            cleaned not in self.SYMBOL_MAP
            and cleaned.endswith("USD")
            and cleaned[:-3] in self.SYMBOL_MAP
        ):
            return cleaned[:-3]
        return cleaned

    def _normalize_symbol(self, symbol: str) -> str:
        """Return the Kraken USD pair name for ``symbol``.

        Known assets use the name in ``SYMBOL_MAP``; anything else is
        requested as ``"<BASE>USD"``.
        """
        base = self._base_symbol(symbol)
        return self.SYMBOL_MAP.get(base, f"{base}USD")

    async def fetch_ohlcv(self, symbol: str, interval: str, limit: int) -> List[OHLCV]:
        """Fetch the most recent ``limit`` candles for ``symbol`` from Kraken.

        Args:
            symbol: Asset or pair in any form ``_normalize_symbol`` accepts.
            interval: Key of ``INTERVAL_MAP``; unknown values fall back to
                60 minutes.
            limit: Maximum number of candles to return. A value of zero or
                less yields an empty list without contacting Kraken.

        Returns:
            Candles in the order Kraken supplies them (oldest first), with
            timestamps converted to milliseconds.

        Raises:
            Exception: If Kraken reports an error or returns no OHLC series.
        """
        if limit is not None and limit <= 0:
            return []

        url = f"{self.base_url}/OHLC"
        params = {
            "pair": self._normalize_symbol(symbol),
            "interval": self.INTERVAL_MAP.get(interval, 60),
        }
        data = await self._make_request(url, params)

        errors = data.get("error")
        if errors:
            raise Exception(f"Kraken error: {errors}")

        # The result holds the candle series under the pair name plus a
        # "last" cursor entry; the first key that is not "last" is the series.
        result = data.get("result", {})
        pair_key = next((key for key in result if key != "last"), None)
        if not pair_key:
            raise Exception("No OHLC data returned from Kraken")
        ohlc_data = result[pair_key]

        # Kraken lists candles oldest first, so the newest `limit` entries
        # are at the END of the list. Slicing from the front would return
        # stale history and never the current candle.
        recent = ohlc_data if limit is None else ohlc_data[-limit:]

        # Kraken candle layout:
        # [time, open, high, low, close, vwap, volume, count]
        return [
            OHLCV(
                timestamp=int(candle[0]) * 1000,  # seconds -> milliseconds
                open=float(candle[1]),
                high=float(candle[2]),
                low=float(candle[3]),
                close=float(candle[4]),
                volume=float(candle[6]),
            )
            for candle in recent
        ]

    async def fetch_prices(self, symbols: List[str]) -> List[Price]:
        """Fetch current USD prices for ``symbols`` from the Kraken ticker.

        Args:
            symbols: Assets or pairs in any form ``_normalize_symbol``
                accepts. An empty list yields an empty result without
                contacting Kraken.

        Returns:
            One ``Price`` per ticker entry in Kraken's response.

        Raises:
            Exception: If Kraken reports an error.
        """
        if not symbols:
            return []

        # Remember which base asset each requested pair stands for, so that
        # assets outside SYMBOL_MAP can still be reported under their full
        # ticker. The dict also de-duplicates repeated requests.
        requested_base = {}
        for requested in symbols:
            requested_base.setdefault(
                self._normalize_symbol(requested), self._base_symbol(requested)
            )

        url = f"{self.base_url}/Ticker"
        params = {"pair": ",".join(requested_base)}
        data = await self._make_request(url, params)

        errors = data.get("error")
        if errors:
            raise Exception(f"Kraken error: {errors}")

        result = data.get("result", {})
        known_base = {pair: base for base, pair in self.SYMBOL_MAP.items()}
        # All tickers come from the same response, so they share a timestamp.
        fetched_at = datetime.now().isoformat()

        prices = []
        for pair_key, ticker in result.items():
            # Prefer the curated mapping, then the symbol that was actually
            # requested. The 3-character prefix is a last resort for keys
            # Kraken renamed in its response.
            base_symbol = (
                known_base.get(pair_key)
                or requested_base.get(pair_key)
                or pair_key[:3]
            )

            # Kraken ticker fields: c = last trade [price, lot volume],
            # v = volume [today, last 24 hours], o = opening price.
            last_price = float(ticker["c"][0])
            volume_24h = float(ticker["v"][1]) * last_price  # in quote currency
            open_price = float(ticker["o"])

            # NOTE(review): Kraken documents "o" as today's opening price,
            # so this is the change since the daily open rather than a
            # rolling 24h change - confirm this is acceptable to consumers.
            if open_price > 0:
                change_24h = (last_price - open_price) / open_price * 100
            else:
                change_24h = 0

            prices.append(Price(
                symbol=base_symbol,
                name=base_symbol,
                price=last_price,
                priceUsd=last_price,
                change24h=change_24h,
                volume24h=volume_24h,
                lastUpdate=fetched_at,
            ))
        return prices
|