こんにちは。mlbotmanです。
2021年末に開催されていたBotterのAdventCalendarを知り、JMLさんの記事が素晴らしかったので自分も試してみたいということで挑戦してみます。
参考にさせていただいた元記事はこちら。
JMLさんはおそらく経験値豊富なんだろうなと想像しているのですが、
の流れで本番稼働されています。時は金なりを実践されており素晴らしいですね。理論上1日早く動かせれば1日分利益が出せるわけです。
経験値の差、度胸の差を感じてしまいますが、今回はデータの収集を行なってみて、実現可能性がどれくらいあるのかというところを確認していきます。
元記事ではFTXを利用されているようです。現在FTXは日本からの登録を弾いているようなので、Binanceで似た戦略が可能かというのを確認していきます。
元記事にあったXAUTペアはBinanceに上場されていないため、PAXG/USDTのペアでデータ取得してみます。
PAXGはXAUTと同じく現物のゴールドとペッグされたステーブルコインです。
実装の方針は以下としました。
いきなりですがコードは以下です。ざざっと勢いで書いているため、実際にbot化して動かす際には、websocketのコネクションロスト時の対応等の追加が必要です。
以上なのですが、せっかくのBlog形式なのでコードの説明と何を考えたのかも備忘録として残しておきます。
データを取得する際は、可能な限り素早くデータを取得するべきと考えます。bot側から能動的にアクセスしにいくREST APIに対してwebsocketはセッションを一度はってしまえばserver側からデータが垂れ流されてくるので速度面でとても有利です。
該当の処理を実装しているのは以下のコード。
app/fetcher.py
import os
from dotenv import dotenv_values
from rich import print
import asyncio
from binance import ThreadedWebsocketManager
from orator import DatabaseManager
from orator import Model
from models.mini_ticker import MiniTicker
from models.agg_trade import AggTrade
from models.depth import Depth
from loguru import logger
import time
config = dotenv_values('.env')
logger.add(
'logs/fetcher.ltsv',
format="{time:YYYY-MM-DDTHH:mm:ss} \t{file} \t{name}:{function}:{line} \t{level} \t{message}",
rotation='512 MB', retention=100
)
def main():
db_config = {
'default': 'postgres',
'postgres': {
'driver': 'postgres',
'host': config['db_host'],
'database': config['db_name'],
'user': config['db_user'],
'password': config['db_pass'],
'prefix': ''
}
}
db = DatabaseManager(db_config)
Model.set_connection_resolver(db)
twm = ThreadedWebsocketManager(api_key=config['api_key'], api_secret=config['api_secret'])
twm.start()
def handle_socket_message(msg):
# print(msg)
try:
if 'depth' in msg['stream']:
depth = Depth()
depth.save_data(msg)
if 'miniTicker' in msg['stream']:
mini_ticker = MiniTicker()
mini_ticker.save_data(msg)
if 'aggTrade' in msg['stream']:
agg_trade = AggTrade()
agg_trade.save_data(msg)
except Exception as e:
logger.error(e)
streams = config['streams'].split(',')
twm.start_multiplex_socket(callback=handle_socket_message, streams=streams)
twm.join()
if __name__ == '__main__':
main()
処理としては multiplex socketをスレッドで立ち上げ、msgのstreamに応じてPG(PostgreSQL)へ書き込んでいるだけです。
今回はdepth/miniTicker/aggTradeのデータを保存していますが、髭取りがいけるかどうかという話であれば、おそらくaggTradeのデータで検証が可能では?と想定しています。
MT5側から以下のようなデータを受け付けるサーバー側APIをはやしています。
curl -X 'POST' \
'https://example.com:3000/currency_pair' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"name": "string",
"time": 0,
"bid": 0,
"ask": 0,
"last": 0,
"volume": 0,
"time_msc": 0
}'
そんなに大量のデータを受け付けるわけでは有りませんが、念の為asyncにしています。
FastAPI今回始めて触ってみたのですが、REST APIの実装は言うまでもなく、動かせるドキュメンテーション自動で生成してくれたりと本当に便利でした。
app/recv.py
from locale import currency
import os
from dotenv import dotenv_values
from rich import print
import asyncio
from orator import DatabaseManager
from orator import Model
from models.mini_ticker import MiniTicker
from models.agg_trade import AggTrade
from models.depth import Depth
from models.currency_pair_log import CurrencyPairLog
from loguru import logger
import time
from fastapi import FastAPI
from models.currency_pair import CurrencyPair
config = dotenv_values('.env')
logger.add(
'logs/recv.ltsv',
format="{time:YYYY-MM-DDTHH:mm:ss} \t{file} \t{name}:{function}:{line} \t{level} \t{message}",
rotation='512 MB', retention=100
)
db_config = {
'default': 'postgres',
'postgres': {
'driver': 'postgres',
'host': config['db_host'],
'database': config['db_name'],
'user': config['db_user'],
'password': config['db_pass'],
'prefix': ''
}
}
db = DatabaseManager(db_config)
Model.set_connection_resolver(db)
app = FastAPI()
@app.post("/currency_pair")
async def create_currency_pair(currency_pair: CurrencyPair):
msg = currency_pair.to_msg()
cpl = CurrencyPairLog()
cpl.save_data(msg)
return {'res': 'ok', 'name': currency_pair.name, 'bid': currency_pair.bid, 'ask': currency_pair.ask}
次回は取得したデータを元に、髭取りが可能なのかを調査してみたいと思います。
皆さんに爆益あれ。
(おしまい)