일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- 파이썬
- myposition
- Public_Trading_Records
- Query_Index_Price_Kline
- 데이터불러오기
- 프리미엄지수
- Query_Kline
- xgboost
- Python
- 백테스팅
- open_interest
- latest_big_deal
- 파아썬
- 자동매매
- 코인
- orderbook
- 바이비트
- 머신러닝
- 호가창
- Bybit
- Query_Premium_Index_Kline
- 변동성돌파
- Machine Learning
- place_active_order
- 비트코인
- 모멘텀지표
- kline
- 롱숏비율
- bitcoin
- API
- Today
- Total
돈벌고싶다
나만의 코인 단타 투자법 - 2. 머신러닝을 통한 매수/매도 알림 시스템 구축 본문
서론
내가 쓰고 있는 다양한 보조 시스템을 공유하고자 한다. 이번 글에서 공유할 보조 시스템은 머신러닝을 통한 매수/매도 알림 시스템이다. 성능이 좋아 공유 여부를 많이 고민하다가, 어차피 해당 코드를 진짜로 활용하여 매매에 활용하는 사람은 적을 것이라 판단하여, 데이터 및 모델을 너프하여 공유하도록 결정했다. 이것만 보고 매수 매도를 하는 것이 아니기 때문에, 해당 코드를 통해 자동매매시스템을 구현해서는 안된다. 필자의 경우 보조지표가 생겨날수록 포지션 진입에 대한 근거가 강하게 생기고, 그럴수록 과감한 행동을 통해 적절한 순간에 매수 매도를 할 수 있었다. 독자 분들도 본인의 매매 기법이 아직 완벽하다고 생각들지 않는다면 추가하여 활용해보는 것도 좋아 보인다.
프로세스(전략) 설명
일단 모델은 다음과 같이 학습시켰다.
- 매수 컬럼명을 'long', 매도 컬럼명을 'short' 으로 설정한다.
- 매수(long) : 해당 row 기준 후로 10분동안 정해진 가격(코드의 경우 5%) 이상 상승하는 경우 1, 아닌 경우 0
- 매도(short) : 해당 row 기준 후로 10분동안 정해진 가격(코드의 경우 -5%) 이상 하락하는 경우 1, 아닌 경우 0
- Data engineering 후 XGBoost를 활용하여 classification 진행
코드
1. 라이브러리 호출
# 라이브러리
from pybit import usdt_perpetual
import numpy as np
import pandas as pd
import schedule
import time
import datetime
import calendar
import math
import ccxt
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, roc_curve, confusion_matrix, classification_report
from sklearn.model_selection import train_test_split, StratifiedKFold, KFold
import xgboost as xgb
import seaborn as sns
2. 각종 함수 정의
def get_binance_data(symbol, interval, end_date):
btc_ohlcv = binance.fetch_ohlcv(symbol, interval, limit=1000, params={'endTime':end_date})
df = pd.DataFrame(btc_ohlcv, columns=['datetime', 'open', 'high', 'low', 'close', 'volume'])
df['datetime'] = pd.to_datetime(df['datetime'], unit='ms')
df.set_index('datetime', inplace=True)
return df
def to_mstimestamp(string):
string = datetime.datetime.strptime(string, "%Y-%m-%d %H:%M:%S")
string = datetime.datetime.timestamp(string)
string = int(string) * 1000
return string
# MACD
def fnMACD(m_Df, column, m_NumFast=12, m_NumSlow=26, m_NumSignal=9):
m_Df['EMAFast'] = m_Df[column].ewm(span = m_NumFast, min_periods = m_NumFast - 1).mean()
m_Df['EMASlow'] = m_Df[column].ewm(span = m_NumSlow, min_periods = m_NumSlow - 1).mean()
m_Df[f'MACD_{column}'] = m_Df['EMAFast'] - m_Df['EMASlow']
m_Df[f'MACDSignal_{column}'] = m_Df[f'MACD_{column}'].ewm(span = m_NumSignal, min_periods = m_NumSignal-1).mean()
m_Df[f'MACDDiff_{column}'] = m_Df[f'MACD_{column}'] - m_Df[f'MACDSignal_{column}']
del m_Df['EMAFast']
del m_Df['EMASlow']
return m_Df
# stochastic
def get_stochastic(df, n=15):
df['fast_k'] = ((df['close'] - df['low'].rolling(n).min()) / (df['high'].rolling(n).max() - df['low'].rolling(n).min())) * 100
df['slow_k'] = df['fast_k'].rolling(n).mean()
df['slow_d'] = df['slow_k'].rolling(n).mean()
df['stochasticDiff'] = df['slow_k'] - df['slow_d']
return df
# moving average 계산
def get_moving_average(df, column):
df[f'ma5_{column}'] = df[column].rolling(window=5).mean()
df[f'ma10_{column}'] = df[column].rolling(window=10).mean()
df[f'ma15_{column}'] = df[column].rolling(window=15).mean()
df[f'ma20_{column}'] = df[column].rolling(window=20).mean()
df[f'ma25_{column}'] = df[column].rolling(window=25).mean()
df[f'ma30_{column}'] = df[column].rolling(window=30).mean()
df[f'ma60_{column}'] = df[column].rolling(window=60).mean()
df[f'ma5_{column}_compare'] = df.apply(lambda x: 0 if x['close'] < x[f'ma5_{column}'] else 1, axis=1)
df[f'ma10_{column}_compare'] = df.apply(lambda x: 0 if x['close'] < x[f'ma10_{column}'] else 1, axis=1)
df[f'ma15_{column}_compare'] = df.apply(lambda x: 0 if x['close'] < x[f'ma15_{column}'] else 1, axis=1)
df[f'ma20_{column}_compare'] = df.apply(lambda x: 0 if x['close'] < x[f'ma20_{column}'] else 1, axis=1)
df[f'ma25_{column}_compare'] = df.apply(lambda x: 0 if x['close'] < x[f'ma25_{column}'] else 1, axis=1)
df[f'ma30_{column}_compare'] = df.apply(lambda x: 0 if x['close'] < x[f'ma30_{column}'] else 1, axis=1)
df[f'ma60_{column}_compare'] = df.apply(lambda x: 0 if x['close'] < x[f'ma60_{column}'] else 1, axis=1)
df[f'ma_{column}_trend'] = df[f'ma5_{column}_compare'] + df[f'ma10_{column}_compare'] + df[f'ma15_{column}_compare'] + df[f'ma20_{column}_compare'] + df[f'ma25_{column}_compare'] + df[f'ma30_{column}_compare'] + df[f'ma60_{column}_compare']
del df[f'ma5_{column}']
del df[f'ma10_{column}']
del df[f'ma15_{column}']
del df[f'ma20_{column}']
del df[f'ma25_{column}']
del df[f'ma30_{column}']
del df[f'ma60_{column}']
return df
# momentum 계산
def get_momentum(df, column):
df[f'momentum10_{column}'] = df[column].pct_change(10)
df[f'ma_momentum10_{column}'] = df[f'momentum10_{column}'].rolling(window=9).mean()
df[f'momentum10_{column}_compare'] = df.apply(lambda x: 0 if x[f'momentum10_{column}'] < x[f'ma_momentum10_{column}'] else 1, axis=1)
df[f'momentum20_{column}'] = df[column].pct_change(20)
df[f'ma_momentum20_{column}'] = df[f'momentum20_{column}'].rolling(window=9).mean()
df[f'momentum20_{column}_compare'] = df.apply(lambda x: 0 if x[f'momentum20_{column}'] < x[f'ma_momentum20_{column}'] else 1, axis=1)
df[f'momentum30_{column}'] = df[column].pct_change(30)
df[f'ma_momentum30_{column}'] = df[f'momentum30_{column}'].rolling(window=9).mean()
df[f'momentum30_{column}_compare'] = df.apply(lambda x: 0 if x[f'momentum30_{column}'] < x[f'ma_momentum30_{column}'] else 1, axis=1)
df[f'momentum_{column}_trend'] = df[f'momentum10_{column}_compare'] + df[f'momentum20_{column}_compare'] + df[f'momentum30_{column}_compare']
return df
def get_rsi(df, period=14):
U = np.where(df['close'].diff(1) > 0, df['close'].diff(1), 0)
D = np.where(df['close'].diff(1) < 0, df['close'].diff(1) *(-1), 0)
AU = pd.DataFrame(U, index=df.index).rolling(window=period).mean()
AD = pd.DataFrame(D, index=df.index).rolling(window=period).mean()
RSI = AU / (AD+AU) *100
df['RSI'] = RSI
return df
# xgboost
def build_xgboost(split_num, train, target, test, test_target, class_names):
params = {
'tree_method':'gpu_hist',
'colsample_bytree': 0.7,
'subsample': 0.8,
'eta': 0.04,
'max_depth': 8,
'eval_metric':'auc',
'objective':'binary:logistic',
'num_class':1
}
# return train pred prob and test pred prob
train_pred, test_pred = np.zeros((train.shape[0], 1)), np.zeros((test.shape[0], 1))
cv_accuracy = []
n_iter = 1
skf = StratifiedKFold(n_splits=split_num, shuffle=True, random_state=2021)
for train_idx, val_idx in skf.split(train, target):
print(f'\n-------------------------------- {n_iter} 번째 fold --------------------------------\n')
# split train, validation set
X = train[train_idx]
y = target[train_idx]
valid_x = train[val_idx]
valid_y = target[val_idx]
d_train = xgb.DMatrix(X, y)
d_valid = xgb.DMatrix(valid_x, valid_y)
d_temp = xgb.DMatrix(valid_x)
d_test = xgb.DMatrix(test)
watchlist = [(d_train, 'train'), (d_valid, 'valid')]
#run traning
model = xgb.train(params, d_train, 1000, watchlist,
early_stopping_rounds=25,
verbose_eval=50)
# save feat
train_pred[val_idx] = model.predict(d_temp).reshape(-1,1)
test_pred += model.predict(d_test).reshape(-1,1)/split_num
# 반복 시 마다 정확도 측정
accuracy = np.round(accuracy_score(valid_y, np.around(train_pred[val_idx])), 4)
print(classification_report(valid_y, np.around(train_pred[val_idx]), target_names=class_names))
# 해당 fold 최종 결과
print(f'\n{n_iter}번째 교차검증 정확도 : {accuracy}, 학습 데이터 크기 : {X.shape[0]}, 검증 데이터 크기 : {valid_x.shape[0]}')
cv_accuracy.append(accuracy)
n_iter += 1
print(f'\n-------------------------------- 최종 모델 평가 --------------------------------\n')
# 오차 행렬을 만듭니다.
matrix = confusion_matrix(test_target, np.around(test_pred))
# 판다스 데이터프레임을 만듭니다.
dataframe = pd.DataFrame(matrix, index=class_names, columns=class_names)
sns.heatmap(dataframe, annot=True, cbar=None, cmap="Blues") # 히트맵 생성
plt.title("Confusion Matrix"), plt.tight_layout()
plt.ylabel("True Class"), plt.xlabel("Predicted Class")
plt.show()
# 개별 iteration별 정확도를 합하여 평균 정확도 계산
print('\n# 평균검증 정확도: ', np.mean(cv_accuracy))
return test_pred
3. 데이터 불러오기
now = datetime.datetime.utcnow() + datetime.timedelta(hours=9)
symbol = 'ETHUSDT'
interval = '1m'
binance = ccxt.binance()
dfs = []
df = get_binance_data(symbol, interval, int(datetime.datetime.timestamp(now))*1000)
dfs.append(df)
try:
for i in range(60):
df = get_binance_data(symbol, interval, int(datetime.datetime.timestamp(df.index[0]))*1000)
dfs.append(df)
time.sleep(0.2)
except:
pass
df = pd.concat(dfs)
# df = df.sort_index()
필자의 경우 6만개의 row만 불러왔다. 더 정밀한 모델링을 원할 경우 개개인의 상황에 따라 더 불러오는 것이 맞다. 왜냐면 이후에 불균형한 데이터를 맞추기 위해서 train에 활용할 데이터를 소량만 sampling 할 것이기 때문이다. 나의 경우 10분 이내로 5% 이상의 변동성을 가지는 데이터는 61000개 중 3000개가 안됐기 때문에, 학습은 6000개로 이루어졌다고 보면 된다.
4. long, short 정의
profit = 0.05 # 5% 수익이 가능한 위치인지 확인
reverage = 10 # 레버리지
fee = 0.06 # 수수료
df['lowest'] = df['low'].rolling(window=10).min()
df['highest'] = df['high'].rolling(window=10).max()
# ((profit + fee/50) / reverage) : 살때, 팔때 수수료가 두번 나오므로 fee / 100 * 2, 수익률과 수수료 모두 레버리지 영향을 받으므로 전체에 대해 /reverage
df['long'] = df.apply(lambda x: 1 if x['highest'] - x['close'] > x['close'] * ((profit + fee/50) / reverage) else 0, axis=1)
df['short'] = df.apply(lambda x: 1 if x['close'] - x['lowest'] > x['close'] * ((profit + fee/50) / reverage) else 0, axis=1)
df = df.iloc[::-1]
- 매수(long) : 해당 row 기준 후로 10분동안 정해진 가격(코드의 경우 5%) 이상 상승하는 경우 1, 아닌 경우 0
- 매도(short) : 해당 row 기준 후로 10분동안 정해진 가격(코드의 경우 -5%) 이상 하락하는 경우 1, 아닌 경우 0
5. Data Engineering
df = get_stochastic(df)
df = get_rsi(df)
df = get_iip(df)
column_list = ['open', 'high', 'low', 'close', 'volume']
for column in column_list:
df = get_moving_average(df, column)
df = get_momentum(df, column)
df = fnMACD(df, column)
df = df.dropna()
df.reset_index(drop=True, inplace=True)
6. 매수(long) 학습
train = df[:int(len(df)*0.7)].reset_index(drop=True)
test = df[int(len(df)*0.7):].reset_index(drop=True)
train0 = train[train['long']==0].sample(3000)
train1 = train[train['long']==1]
train = pd.concat([train0, train1])
train_df, train_y = train.drop(['lowest', 'highest', 'long', 'short', 'open', 'close', 'high', 'low'], axis=1), train['long']
test_df, test_y = test.drop(['lowest', 'highest', 'long', 'short', 'open', 'close', 'high', 'low'], axis=1), test['long']
# 모델 학습 진행
test['pred_long'] = build_xgboost(5, train_df.values, train_y.values, test_df.values, test_y, ['0', '1'])
7. 매도(short) 학습
train = df[:int(len(df)*0.7)].reset_index(drop=True)
train0 = train[train['short']==0].sample(3000)
train1 = train[train['short']==1]
train = pd.concat([train0, train1])
train_df, train_y = train.drop(['lowest', 'highest', 'long', 'short', 'open', 'close', 'high', 'low'], axis=1), train['short']
test_df, test_y = test.drop(['lowest', 'highest', 'long', 'short', 'open', 'close', 'high', 'low', 'pred_long'], axis=1), test['short']
# 모델 학습 진행
test['pred_short'] = build_xgboost(5, train_df.values, train_y.values, test_df.values, test_y, ['0', '1'])
8. Conclusion
def get_signal(data):
data.reset_index(drop=True, inplace=True)
buy_signal = []
sell_signal = []
for i in range(0, len(data['pred_long'])):
if data['pred_long'][i] > 0.5:
buy_signal.append(data['close'][i])
sell_signal.append(np.nan)
elif data['pred_short'][i] > 0.5:
buy_signal.append(np.nan)
sell_signal.append(data['close'][i])
else:
buy_signal.append(np.nan)
sell_signal.append(np.nan)
return (buy_signal, sell_signal)
def check(data):
plt.figure(figsize=(12, 5))
plt.plot(data['close'], label='close price', alpha=0.5)
plt.scatter(data.index, data['buy'], color='green', label='buy signal', marker='^', alpha=1)
plt.scatter(data.index, data['sell'], color='red', label='sell signal', marker='v', alpha=1)
plt.title('close price')
plt.xlabel('date')
plt.ylabel('close price')
plt.legend(loc='upper left')
plt.show()
test['buy'] = get_signal(test)[0]
test['sell'] = get_signal(test)[1]
check(test[16000:])
알아보기 쉽게 최근 2000분 동안 매수 매도 신호가 언제 주어졌는지 시각화해보았다(전체를 시각화할 경우 너무 오밀조밀하게 모여있어 구분이 어려움).
생각보다 성능이 좋다. 항상 맞는 것은 아니기에 주의해야 한다. 나는 다음과 같은 순간에 매매할 경우 잘되는 경향이 있었다:
- 연속으로 시그널이 생길경우 매매
- 매도 / 매수 시그널이 모두 생길 경우 ignore
- 큰 [상승, 하락]의 [중간, 이후]에 생기는 시그널은 무시
- 저항선, 지지선을 계속해서 주시하며 매매
매수매도를 위한 보조지표로 활용할 경우 큰 도움이 될 수 있다. 다시 한번 말하지만 해당 지표만을 이용하여 자동매매 프로그램을 구현해서는 안된다. 다른 다양한 지표를 활용하여 복합적으로 현 상황을 이해하고 매수/매도를 해야한다. 독자 분들도 유용하게 사용하여 코인장에서 건승했으면 좋겠다.
'투자법 공유' 카테고리의 다른 글
나만의 코인 단타 투자법 - 1. 우리의 자동매매 프로그램, 왜 수익을 내지 못하는가? (2) | 2023.03.31 |
---|