돈벌고싶다

나만의 코인 단타 투자법 - 2. 머신러닝을 통한 매수/매도 알림 시스템 구축 본문

투자법 공유

나만의 코인 단타 투자법 - 2. 머신러닝을 통한 매수/매도 알림 시스템 구축

coinwithpython 2023. 4. 10. 01:45
728x90
반응형

서론

내가 쓰고 있는 다양한 보조 시스템을 공유하고자 한다. 이번 글에서 공유할 보조 시스템은 머신러닝을 통한 매수/매도 알림 시스템이다. 성능이 좋아 공유 여부를 많이 고민하다가, 어차피 해당 코드를 진짜로 활용하여 매매에 활용하는 사람은 적을 것이라 판단하여, 데이터 및 모델을 너프하여 공유하도록 결정했다. 이것만 보고 매수 매도를 하는 것이 아니기 때문에, 해당 코드를 통해 자동매매시스템을 구현해서는 안된다. 필자의 경우 보조지표가 생겨날수록 포지션 진입에 대한 근거가 강하게 생기고, 그럴수록 과감한 행동을 통해 적절한 순간에 매수 매도를 할 수 있었다. 독자 분들도 본인의 매매 기법이 아직 완벽하다고 생각들지 않는다면 추가하여 활용해보는 것도 좋아 보인다.


프로세스(전략) 설명

일단 모델은 다음과 같이 학습시켰다.

  1. 매수 컬럼명을 'long', 매도 컬럼명을 'short' 으로 설정한다.
  2. 매수(long) : 해당 row 기준 후로 10분동안 정해진 가격(코드의 경우 5%) 이상 상승하는 경우 1, 아닌 경우 0
  3. 매도(short) : 해당 row 기준 후로 10분동안 정해진 가격(코드의 경우 -5%) 이상 하락하는 경우 1, 아닌 경우 0
  4. Data engineering 후 XGBoost를 활용하여 classification 진행

코드

1. 라이브러리 호출

# 라이브러리
from pybit import usdt_perpetual
import numpy as np
import pandas as pd
import schedule
import time
import datetime
import calendar
import math
import ccxt
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, roc_curve, confusion_matrix, classification_report
from sklearn.model_selection import train_test_split, StratifiedKFold, KFold
import xgboost as xgb
import seaborn as sns

 

2. 각종 함수 정의

def get_binance_data(symbol, interval, end_date):
    btc_ohlcv = binance.fetch_ohlcv(symbol, interval, limit=1000, params={'endTime':end_date})
    df = pd.DataFrame(btc_ohlcv, columns=['datetime', 'open', 'high', 'low', 'close', 'volume'])
    df['datetime'] = pd.to_datetime(df['datetime'], unit='ms')
    df.set_index('datetime', inplace=True)
    return df

def to_mstimestamp(string):
    string = datetime.datetime.strptime(string, "%Y-%m-%d %H:%M:%S")
    string = datetime.datetime.timestamp(string)
    string = int(string) * 1000
    return string

# MACD
def fnMACD(m_Df, column, m_NumFast=12, m_NumSlow=26, m_NumSignal=9):
    m_Df['EMAFast'] = m_Df[column].ewm(span = m_NumFast, min_periods = m_NumFast - 1).mean()
    m_Df['EMASlow'] = m_Df[column].ewm(span = m_NumSlow, min_periods = m_NumSlow - 1).mean()
    m_Df[f'MACD_{column}'] = m_Df['EMAFast'] - m_Df['EMASlow']
    m_Df[f'MACDSignal_{column}'] = m_Df[f'MACD_{column}'].ewm(span = m_NumSignal, min_periods = m_NumSignal-1).mean()
    m_Df[f'MACDDiff_{column}'] = m_Df[f'MACD_{column}'] - m_Df[f'MACDSignal_{column}']
    del m_Df['EMAFast']
    del m_Df['EMASlow']
    return m_Df

# stochastic
def get_stochastic(df, n=15):
    df['fast_k'] = ((df['close'] - df['low'].rolling(n).min()) / (df['high'].rolling(n).max() - df['low'].rolling(n).min())) * 100
    df['slow_k'] = df['fast_k'].rolling(n).mean()
    df['slow_d'] = df['slow_k'].rolling(n).mean()
    df['stochasticDiff'] = df['slow_k'] - df['slow_d']
    return df

# moving average 계산
def get_moving_average(df, column):
    df[f'ma5_{column}'] = df[column].rolling(window=5).mean()
    df[f'ma10_{column}'] = df[column].rolling(window=10).mean()
    df[f'ma15_{column}'] = df[column].rolling(window=15).mean()
    df[f'ma20_{column}'] = df[column].rolling(window=20).mean()
    df[f'ma25_{column}'] = df[column].rolling(window=25).mean()
    df[f'ma30_{column}'] = df[column].rolling(window=30).mean()
    df[f'ma60_{column}'] = df[column].rolling(window=60).mean()
    df[f'ma5_{column}_compare'] = df.apply(lambda x: 0 if x['close'] < x[f'ma5_{column}'] else 1, axis=1)
    df[f'ma10_{column}_compare'] = df.apply(lambda x: 0 if x['close'] < x[f'ma10_{column}'] else 1, axis=1)
    df[f'ma15_{column}_compare'] = df.apply(lambda x: 0 if x['close'] < x[f'ma15_{column}'] else 1, axis=1)
    df[f'ma20_{column}_compare'] = df.apply(lambda x: 0 if x['close'] < x[f'ma20_{column}'] else 1, axis=1)
    df[f'ma25_{column}_compare'] = df.apply(lambda x: 0 if x['close'] < x[f'ma25_{column}'] else 1, axis=1)
    df[f'ma30_{column}_compare'] = df.apply(lambda x: 0 if x['close'] < x[f'ma30_{column}'] else 1, axis=1)
    df[f'ma60_{column}_compare'] = df.apply(lambda x: 0 if x['close'] < x[f'ma60_{column}'] else 1, axis=1)
    df[f'ma_{column}_trend'] = df[f'ma5_{column}_compare'] + df[f'ma10_{column}_compare'] + df[f'ma15_{column}_compare'] + df[f'ma20_{column}_compare'] + df[f'ma25_{column}_compare'] + df[f'ma30_{column}_compare'] + df[f'ma60_{column}_compare']
    del df[f'ma5_{column}']
    del df[f'ma10_{column}']
    del df[f'ma15_{column}']
    del df[f'ma20_{column}']
    del df[f'ma25_{column}']
    del df[f'ma30_{column}']
    del df[f'ma60_{column}']
    return df

# momentum 계산
def get_momentum(df, column):
    df[f'momentum10_{column}'] = df[column].pct_change(10)
    df[f'ma_momentum10_{column}'] = df[f'momentum10_{column}'].rolling(window=9).mean()
    df[f'momentum10_{column}_compare'] = df.apply(lambda x: 0 if x[f'momentum10_{column}'] < x[f'ma_momentum10_{column}'] else 1, axis=1)
    df[f'momentum20_{column}'] = df[column].pct_change(20)
    df[f'ma_momentum20_{column}'] = df[f'momentum20_{column}'].rolling(window=9).mean()
    df[f'momentum20_{column}_compare'] = df.apply(lambda x: 0 if x[f'momentum20_{column}'] < x[f'ma_momentum20_{column}'] else 1, axis=1)
    df[f'momentum30_{column}'] = df[column].pct_change(30)
    df[f'ma_momentum30_{column}'] = df[f'momentum30_{column}'].rolling(window=9).mean()
    df[f'momentum30_{column}_compare'] = df.apply(lambda x: 0 if x[f'momentum30_{column}'] < x[f'ma_momentum30_{column}'] else 1, axis=1)
    df[f'momentum_{column}_trend'] = df[f'momentum10_{column}_compare'] + df[f'momentum20_{column}_compare'] + df[f'momentum30_{column}_compare']
    return df

def get_rsi(df, period=14):
    U = np.where(df['close'].diff(1) > 0, df['close'].diff(1), 0)
    D = np.where(df['close'].diff(1) < 0, df['close'].diff(1) *(-1), 0)
    AU = pd.DataFrame(U, index=df.index).rolling(window=period).mean()
    AD = pd.DataFrame(D, index=df.index).rolling(window=period).mean()
    RSI = AU / (AD+AU) *100
    df['RSI'] = RSI
    return df
    
# xgboost
def build_xgboost(split_num, train, target, test, test_target, class_names):
    
    params = {
                'tree_method':'gpu_hist',
                'colsample_bytree': 0.7,
                'subsample': 0.8,
                'eta': 0.04,
                'max_depth': 8,
                'eval_metric':'auc',
                'objective':'binary:logistic',
                'num_class':1
                }
    
    # return train pred prob and test pred prob 
    train_pred, test_pred = np.zeros((train.shape[0], 1)), np.zeros((test.shape[0], 1))
    cv_accuracy = []
    n_iter = 1
    
    skf = StratifiedKFold(n_splits=split_num, shuffle=True, random_state=2021)
    for train_idx, val_idx in skf.split(train, target):

        print(f'\n-------------------------------- {n_iter} 번째 fold --------------------------------\n')
        
        # split train, validation set
        X = train[train_idx]
        y = target[train_idx]
        valid_x = train[val_idx]
        valid_y = target[val_idx]

        d_train = xgb.DMatrix(X, y)
        d_valid = xgb.DMatrix(valid_x, valid_y)
        d_temp = xgb.DMatrix(valid_x)
        d_test = xgb.DMatrix(test)
        
        watchlist = [(d_train, 'train'), (d_valid, 'valid')]
        
        #run traning
        model = xgb.train(params, d_train, 1000, watchlist, 
                        early_stopping_rounds=25,
                        verbose_eval=50)

        # save feat
        train_pred[val_idx] = model.predict(d_temp).reshape(-1,1)
        test_pred += model.predict(d_test).reshape(-1,1)/split_num
        
        # 반복 시 마다 정확도 측정
        accuracy = np.round(accuracy_score(valid_y, np.around(train_pred[val_idx])), 4)
        print(classification_report(valid_y, np.around(train_pred[val_idx]), target_names=class_names))
        
        # 해당 fold 최종 결과
        print(f'\n{n_iter}번째 교차검증 정확도 : {accuracy}, 학습 데이터 크기 : {X.shape[0]}, 검증 데이터 크기 : {valid_x.shape[0]}')
        cv_accuracy.append(accuracy)
        n_iter += 1
        
    print(f'\n-------------------------------- 최종 모델 평가 --------------------------------\n')
    
    # 오차 행렬을 만듭니다.
    matrix = confusion_matrix(test_target, np.around(test_pred))

    # 판다스 데이터프레임을 만듭니다.
    dataframe = pd.DataFrame(matrix, index=class_names, columns=class_names)
    sns.heatmap(dataframe, annot=True, cbar=None, cmap="Blues") # 히트맵 생성
    plt.title("Confusion Matrix"), plt.tight_layout()
    plt.ylabel("True Class"), plt.xlabel("Predicted Class")
    plt.show()

    # 개별 iteration별 정확도를 합하여 평균 정확도 계산
    print('\n# 평균검증 정확도: ', np.mean(cv_accuracy))
    return test_pred

 

3. 데이터 불러오기

now = datetime.datetime.utcnow() + datetime.timedelta(hours=9)
symbol = 'ETHUSDT'
interval = '1m'
binance = ccxt.binance()

dfs = []
df = get_binance_data(symbol, interval, int(datetime.datetime.timestamp(now))*1000)
dfs.append(df)

try:
    for i in range(60):
        df = get_binance_data(symbol, interval, int(datetime.datetime.timestamp(df.index[0]))*1000)
        dfs.append(df)
        time.sleep(0.2)
except:
    pass

df = pd.concat(dfs)
# df = df.sort_index()

필자의 경우 6만개의 row만 불러왔다. 더 정밀한 모델링을 원할 경우 개개인의 상황에 따라 더 불러오는 것이 맞다. 왜냐면 이후에 불균형한 데이터를 맞추기 위해서 train에 활용할 데이터를 소량만 sampling 할 것이기 때문이다. 나의 경우 10분 이내로 5% 이상의 변동성을 가지는 데이터는 61000개 중 3000개가 안됐기 때문에, 학습은 6000개로 이루어졌다고 보면 된다.

 

4. long, short 정의

profit = 0.05      # 5% 수익이 가능한 위치인지 확인
reverage = 10      # 레버리지
fee = 0.06         # 수수료

df['lowest'] = df['low'].rolling(window=10).min()
df['highest'] = df['high'].rolling(window=10).max()

# ((profit + fee/50) / reverage) : 살때, 팔때 수수료가 두번 나오므로 fee / 100 * 2, 수익률과 수수료 모두 레버리지 영향을 받으므로 전체에 대해 /reverage
df['long'] = df.apply(lambda x: 1 if x['highest'] - x['close'] > x['close'] * ((profit + fee/50) / reverage) else 0, axis=1)
df['short'] = df.apply(lambda x: 1 if x['close'] - x['lowest'] > x['close'] * ((profit + fee/50) / reverage) else 0, axis=1)

df = df.iloc[::-1]
  • 매수(long) : 해당 row 기준 후로 10분동안 정해진 가격(코드의 경우 5%) 이상 상승하는 경우 1, 아닌 경우 0
  • 매도(short) : 해당 row 기준 후로 10분동안 정해진 가격(코드의 경우 -5%) 이상 하락하는 경우 1, 아닌 경우 0

 

5. Data Engineering

df = get_stochastic(df)
df = get_rsi(df)
df = get_iip(df)
column_list = ['open', 'high', 'low', 'close', 'volume']
for column in column_list:
    df = get_moving_average(df, column)
    df = get_momentum(df, column)
    df = fnMACD(df, column)

df = df.dropna()
df.reset_index(drop=True, inplace=True)

 

6. 매수(long) 학습

train = df[:int(len(df)*0.7)].reset_index(drop=True)
test = df[int(len(df)*0.7):].reset_index(drop=True)

train0 = train[train['long']==0].sample(3000)
train1 = train[train['long']==1]
train = pd.concat([train0, train1])

train_df, train_y = train.drop(['lowest', 'highest', 'long', 'short', 'open', 'close', 'high', 'low'], axis=1), train['long']
test_df, test_y = test.drop(['lowest', 'highest', 'long', 'short', 'open', 'close', 'high', 'low'], axis=1), test['long']

# 모델 학습 진행
test['pred_long'] = build_xgboost(5, train_df.values, train_y.values, test_df.values, test_y, ['0', '1'])

평균검증 정확도 : 0.8753

 

7. 매도(short) 학습

train = df[:int(len(df)*0.7)].reset_index(drop=True)

train0 = train[train['short']==0].sample(3000)
train1 = train[train['short']==1]
train = pd.concat([train0, train1])

train_df, train_y = train.drop(['lowest', 'highest', 'long', 'short', 'open', 'close', 'high', 'low'], axis=1), train['short']
test_df, test_y = test.drop(['lowest', 'highest', 'long', 'short', 'open', 'close', 'high', 'low', 'pred_long'], axis=1), test['short']

# 모델 학습 진행
test['pred_short'] = build_xgboost(5, train_df.values, train_y.values, test_df.values, test_y, ['0', '1'])

평균검증 정확도 : 0.8407

 

8. Conclusion

def get_signal(data):
    data.reset_index(drop=True, inplace=True)
    buy_signal = []
    sell_signal = []
    for i in range(0, len(data['pred_long'])):
        if data['pred_long'][i] > 0.5:
            buy_signal.append(data['close'][i])
            sell_signal.append(np.nan)
        elif data['pred_short'][i] > 0.5:
            buy_signal.append(np.nan)
            sell_signal.append(data['close'][i])
        else:
            buy_signal.append(np.nan)
            sell_signal.append(np.nan)
    return (buy_signal, sell_signal)

def check(data):
    plt.figure(figsize=(12, 5))
    plt.plot(data['close'], label='close price', alpha=0.5)
    plt.scatter(data.index, data['buy'], color='green', label='buy signal', marker='^', alpha=1)
    plt.scatter(data.index, data['sell'], color='red', label='sell signal', marker='v', alpha=1)
    plt.title('close price')
    plt.xlabel('date')
    plt.ylabel('close price')
    plt.legend(loc='upper left')
    plt.show()
test['buy'] = get_signal(test)[0]
test['sell'] = get_signal(test)[1]
check(test[16000:])

알아보기 쉽게 최근 2000분 동안 매수 매도 신호가 언제 주어졌는지 시각화해보았다(전체를 시각화할 경우 너무 오밀조밀하게 모여있어 구분이 어려움).

 

 

 

생각보다 성능이 좋다. 항상 맞는 것은 아니기에 주의해야 한다. 나는 다음과 같은 순간에 매매할 경우 잘되는 경향이 있었다:

  1. 연속으로 시그널이 생길경우 매매
  2. 매도 / 매수 시그널이 모두 생길 경우 ignore
  3. 큰 [상승, 하락]의 [중간, 이후]에 생기는 시그널은 무시
  4. 저항선, 지지선을 계속해서 주시하며 매매 

매수매도를 위한 보조지표로 활용할 경우 큰 도움이 될 수 있다. 다시 한번 말하지만 해당 지표만을 이용하여 자동매매 프로그램을 구현해서는 안된다. 다른 다양한 지표를 활용하여 복합적으로 현 상황을 이해하고 매수/매도를 해야한다. 독자 분들도 유용하게 사용하여 코인장에서 건승했으면 좋겠다.

 

 

728x90
반응형
Comments