This content originally appeared on DEV Community and was authored by Paozakai
Colab-ready: improved stable pipeline with Optuna + LGB + XGB + stacking
1) ก่อนรัน ให้อัปโหลดไฟล์ผ่าน Colab UI: /content/train.csv, /content/test.csv, /content/sample_submission.csv
2) Copy-paste ทั้งหมดนี้ใน cell เดียวแล้วรัน
-------------------------
ติดตั้งไลบรารี (ครั้งแรก)
!pip install --quiet lightgbm xgboost scikit-learn pandas numpy matplotlib optuna
-------------------------
import warnings
warnings.filterwarnings('ignore')
import os
import math
import time
import numpy as np
import pandas as pd
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_squared_error
from sklearn.linear_model import Ridge
import lightgbm as lgb
import xgboost as xgb
import optuna
from google.colab import files
from sklearn.preprocessing import StandardScaler
SEED = 42
np.random.seed(SEED)
-------------------------
Paths (Upload files via Colab file upload UI with these exact names)
TRAIN_PATH = "/content/train (1).csv"
TEST_PATH = "/content/test (1).csv"
SAMPLE_SUB_PATH = "/content/sample_submission (1).csv"
-------------------------
Load data
for p in [TRAIN_PATH, TEST_PATH, SAMPLE_SUB_PATH]:
if not os.path.exists(p):
raise FileNotFoundError(f"ไม่พบไฟล์: {p}. กรุณาอัปโหลดไฟล์ไปที่ Colab แล้วตั้งชื่อให้ตรง (train.csv, test.csv, sample_submission.csv)")
train = pd.read_csv(TRAIN_PATH)
test = pd.read_csv(TEST_PATH)
sample_sub = pd.read_csv(SAMPLE_SUB_PATH)
print("train shape:", train.shape)
print("test shape:", test.shape)
print("sample_sub shape:", sample_sub.shape)
print("ตัวอย่าง train:")
print(train.head())
Ensure sorted by id
train = train.sort_values('id').reset_index(drop=True)
test = test.sort_values('id').reset_index(drop=True)
Convert price to float
train['price'] = train['price'].astype(float)
Ensure test has price column (NaN)
if 'price' not in test.columns:
test['price'] = np.nan
else:
test['price'] = test['price'].astype(float)
Concat to build features that correctly use history
all_df = pd.concat([train[['id','price']], test[['id','price']]], ignore_index=True)
-------------------------
Feature engineering function (expanded but robust)
def create_features_full(df):
df = df.copy()
# lags (short, medium, seasonal)
lags = [1,2,3,5,7,14,21,28,30,60]
for l in lags:
df[f'lag_{l}'] = df['price'].shift(l)
# rolling windows
windows = [3,5,7,14,21,30,60]
for w in windows:
df[f'roll_mean_{w}'] = df['price'].shift(1).rolling(window=w, min_periods=1).mean()
df[f'roll_std_{w}'] = df['price'].shift(1).rolling(window=w, min_periods=1).std().fillna(0)
df[f'roll_med_{w}'] = df['price'].shift(1).rolling(window=w, min_periods=1).median().fillna(method='bfill').fillna(0)
# ewm
for span in [3,7,14,30]:
df[f'ewm_{span}'] = df['price'].shift(1).ewm(span=span, adjust=False).mean()
# expanding
df['expanding_mean'] = df['price'].shift(1).expanding().mean()
# diffs
df['diff_1'] = df['price'] - df['price'].shift(1)
df['diff_2'] = df['price'] - df['price'].shift(2)
df['pct_change_1'] = df['price'].pct_change(1)
df['pct_change_3'] = df['price'].pct_change(3)
# season proxies
df['id_mod7'] = df['id'] % 7
df['id_mod30'] = df['id'] % 30
df['day_of_year'] = df['id'] % 365
# ratio features
df['lag1_div_lag7'] = df['price'].shift(1) / df['price'].shift(7).replace(0, np.nan)
# sanitize
df = df.replace([np.inf, -np.inf], np.nan)
return df
all_feat = create_features_full(all_df)
train_feat = all_feat.iloc[:len(train)].reset_index(drop=True)
test_feat = all_feat.iloc[len(train):].reset_index(drop=True)
-------------------------
Prepare training rows: dropna on engineered features
TARGET = 'price'
train_model = train_feat.dropna().reset_index(drop=True)
train_model['target'] = np.log1p(train_model['price'].astype(float)) # log1p target
FEATURES = [c for c in train_model.columns if c not in ['price','target']]
print("จำนวน features:", len(FEATURES))
Validation split: use last VAL_SIZE rows as validation (time-series holdout)
VAL_SIZE = min(300, int(len(train_model) * 0.1))
train_X = train_model[FEATURES].iloc[:-VAL_SIZE].reset_index(drop=True)
train_y = train_model['target'].iloc[:-VAL_SIZE].reset_index(drop=True)
val_X = train_model[FEATURES].iloc[-VAL_SIZE:].reset_index(drop=True)
val_y = train_model['target'].iloc[-VAL_SIZE:].reset_index(drop=True)
Prepare test features (some NaNs possible; fill conservatively)
test_X = test_feat[FEATURES].copy().reset_index(drop=True)
test_X = test_X.fillna(method='ffill').fillna(method='bfill').fillna(0)
print("train_X shape:", train_X.shape, "val_X shape:", val_X.shape, "test_X shape:", test_X.shape)
-------------------------
Optuna objective: robust (catch exceptions and return inf when fail)
def lgb_objective(trial):
try:
param = {
'objective': 'regression',
'metric': 'rmse',
'boosting_type': 'gbdt',
'verbosity': -1,
'seed': SEED,
'learning_rate': trial.suggest_float('learning_rate', 0.005, 0.05, log=True),
'num_leaves': trial.suggest_int('num_leaves', 20, 128),
'min_data_in_leaf': trial.suggest_int('min_data_in_leaf', 10, 200),
'feature_fraction': trial.suggest_float('feature_fraction', 0.5, 1.0),
'bagging_fraction': trial.suggest_float('bagging_fraction', 0.5, 1.0),
'bagging_freq': trial.suggest_int('bagging_freq', 1, 7),
'lambda_l1': trial.suggest_float('lambda_l1', 0.0, 5.0),
'lambda_l2': trial.suggest_float('lambda_l2', 0.0, 5.0)
}
# small time-series CV inside objective to estimate performance
cv = TimeSeriesSplit(n_splits=3)
rmses = []
for tr_idx, va_idx in cv.split(train_X):
dtr = lgb.Dataset(train_X.iloc[tr_idx], label=train_y.iloc[tr_idx])
dva = lgb.Dataset(train_X.iloc[va_idx], label=train_y.iloc[va_idx], reference=dtr)
bst = lgb.train(param, dtr, num_boost_round=1500, valid_sets=[dva],
callbacks=[lgb.early_stopping(50, verbose=False)])
pred = bst.predict(train_X.iloc[va_idx], num_iteration=bst.best_iteration)
rmses.append(np.sqrt(mean_squared_error(train_y.iloc[va_idx], pred)))
return float(np.mean(rmses))
except Exception as e:
# print to debug but don't fail the trial
print("Trial failed with exception:", e)
return float("inf")
Run optuna study (small number trials by default; increase if you want)
study = optuna.create_study(direction='minimize', sampler=optuna.samplers.TPESampler(seed=SEED))
try:
study.optimize(lgb_objective, n_trials=20, timeout=300) # n_trials or timeout (seconds)
except Exception as e:
print("Optuna run exception:", e)
Safely extract best params
best_params = {}
try:
if study.best_trial is not None and study.best_trial.values is not None:
best_params = study.best_params
except Exception as e:
print("No valid best_params from Optuna; fallback to defaults.", e)
print("best_params:", best_params)
Default fallback params (merge)
lgb_params = {
'objective': 'regression',
'metric': 'rmse',
'boosting_type': 'gbdt',
'verbosity': -1,
'seed': SEED,
'learning_rate': best_params.get('learning_rate', 0.03),
'num_leaves': int(best_params.get('num_leaves', 64)),
'min_data_in_leaf': int(best_params.get('min_data_in_leaf', 20)),
'feature_fraction': best_params.get('feature_fraction', 0.8),
'bagging_fraction': best_params.get('bagging_fraction', 0.8),
'bagging_freq': int(best_params.get('bagging_freq', 5)),
'lambda_l1': best_params.get('lambda_l1', 0.0),
'lambda_l2': best_params.get('lambda_l2', 0.0)
}
-------------------------
Train final LightGBM on train_X with early stopping on validation
dtrain = lgb.Dataset(train_X, label=train_y)
dval = lgb.Dataset(val_X, label=val_y, reference=dtrain)
print("\nTraining final LightGBM...")
lgbm = lgb.train(lgb_params, dtrain, num_boost_round=3000, valid_sets=[dval],
callbacks=[lgb.early_stopping(100, verbose=100)])
val_pred_lgb_log = lgbm.predict(val_X, num_iteration=lgbm.best_iteration)
val_rmse_lgb_log = np.sqrt(mean_squared_error(val_y, val_pred_lgb_log))
print("LightGBM val RMSE (log1p):", val_rmse_lgb_log)
-------------------------
Train XGBoost (stable set of hyperparams)
print("\nTraining XGBoost...")
dtrain_xgb = xgb.DMatrix(train_X, label=train_y)
dval_xgb = xgb.DMatrix(val_X, label=val_y)
dtest_xgb = xgb.DMatrix(test_X)
xgb_params = {
'objective': 'reg:squarederror',
'eval_metric': 'rmse',
'eta': 0.03,
'max_depth': 6,
'subsample': 0.8,
'colsample_bytree': 0.8,
'seed': SEED
}
model_xgb = xgb.train(xgb_params, dtrain_xgb, num_boost_round=3000, evals=[(dval_xgb, 'val')],
early_stopping_rounds=100, verbose_eval=100)
val_pred_xgb_log = model_xgb.predict(dval_xgb, iteration_range=(0, model_xgb.best_iteration))
val_rmse_xgb_log = np.sqrt(mean_squared_error(val_y, val_pred_xgb_log))
print("XGBoost val RMSE (log1p):", val_rmse_xgb_log)
-------------------------
Create OOF predictions for stacking (TimeSeriesSplit)
n_splits = 5
tscv = TimeSeriesSplit(n_splits=n_splits)
oof_lgb = np.zeros(len(train_X))
oof_xgb = np.zeros(len(train_X))
fold = 0
print("\nCreating OOF predictions for stacking...")
for tr_idx, va_idx in tscv.split(train_X):
fold += 1
print("Fold", fold)
# lightgbm fold
dtr = lgb.Dataset(train_X.iloc[tr_idx], label=train_y.iloc[tr_idx])
dva = lgb.Dataset(train_X.iloc[va_idx], label=train_y.iloc[va_idx], reference=dtr)
bst = lgb.train(lgb_params, dtr, num_boost_round=3000, valid_sets=[dva],
callbacks=[lgb.early_stopping(100, verbose=False)])
oof_lgb[va_idx] = bst.predict(train_X.iloc[va_idx], num_iteration=bst.best_iteration)
# xgb fold
dxtr = xgb.DMatrix(train_X.iloc[tr_idx], label=train_y.iloc[tr_idx])
dxva = xgb.DMatrix(train_X.iloc[va_idx], label=train_y.iloc[va_idx])
bst_x = xgb.train(xgb_params, dxtr, num_boost_round=3000, evals=[(dxva,'val')],
early_stopping_rounds=100, verbose_eval=False)
oof_xgb[va_idx] = bst_x.predict(dxva, iteration_range=(0, bst_x.best_iteration))
Meta-model (Ridge)
stack_X = np.vstack([oof_lgb, oof_xgb]).T
meta = Ridge(alpha=1.0)
meta.fit(stack_X, train_y)
print("Meta-model trained (Ridge).")
-------------------------
Iterative forecasting for test using recursive approach
We'll maintain 'history' (train prices + predicted test prices) and create features per step
history = pd.concat([train[['id','price']], pd.DataFrame({'id': test['id'].values, 'price': [np.nan]*len(test)})], ignore_index=True)
def make_row_features(history_df, idx):
# Build features for row idx based on history up to idx-1
tmp = history_df.loc[:idx].copy().reset_index(drop=True)
tmp = create_features_full(tmp)
# last row is current
feat_row = tmp.iloc[-1:][FEATURES].fillna(0)
return feat_row
print("\nForecasting test iteratively (recursive)...")
preds_test_log = []
for i in range(len(train), len(history)):
feat = make_row_features(history, i)
# LGB predict (log-space)
try:
p_lgb = lgbm.predict(feat, num_iteration=lgbm.best_iteration)[0]
except Exception as e:
p_lgb = val_pred_lgb_log.mean() # fallback
try:
dx = xgb.DMatrix(feat)
p_xgb = model_xgb.predict(dx, iteration_range=(0, model_xgb.best_iteration))[0]
except Exception as e:
p_xgb = val_pred_xgb_log.mean()
# meta predict (input are log-space preds)
meta_in = np.array([[p_lgb, p_xgb]])
p_meta_log = meta.predict(meta_in)[0]
# convert to price space for storing in history
p_price = float(np.expm1(p_meta_log))
if p_price < 0:
p_price = 0.0
history.at[i, 'price'] = p_price
preds_test_log.append(p_meta_log)
preds_test_price = np.expm1(np.array(preds_test_log))
ensure length
if len(preds_test_price) != len(test):
preds_test_price = np.resize(preds_test_price, len(test))
-------------------------
Final submission
submission = sample_sub.copy()
assume column name for predictions is 'price' or second column
if 'price' in submission.columns:
submission['price'] = preds_test_price
else:
submission.iloc[:,1] = preds_test_price
submission['price'] = submission['price'].round(3)
submission.to_csv('submission.csv', index=False)
print("\nSaved submission.csv")
Diagnostics: compute final stacked val RMSE in price space
val_pred_lgb_price = np.expm1(val_pred_lgb_log)
val_pred_xgb_price = np.expm1(val_pred_xgb_log)
val_stack = np.vstack([val_pred_lgb_log, val_pred_xgb_log]).T
val_meta_pred_log = meta.predict(val_stack)
val_meta_rmse_price = np.sqrt(mean_squared_error(np.expm1(val_y), np.expm1(val_meta_pred_log)))
print("Final stacked val RMSE (price space):", val_meta_rmse_price)
print("LightGBM val RMSE (price space):", np.sqrt(mean_squared_error(np.expm1(val_y), val_pred_lgb_price)))
print("XGBoost val RMSE (price space):", np.sqrt(mean_squared_error(np.expm1(val_y), val_pred_xgb_price)))
download file in Colab
files.download('submission.csv')
print("Done. ดาวน์โหลด submission.csv ได้เลย")
เริ่มต้นโหลดไฟล์ train.csv, test.csv และ sample_submission.csv
ตรวจสอบว่าไฟล์อยู่ครบและชื่อถูกต้อง
เรียงข้อมูลตาม id แล้วแปลง price ให้เป็น float
รวม train และ test เข้าด้วยกันเพื่อทำ feature engineering ให้ต่อเนื่อง
สร้างฟีเจอร์จากราคา เช่น lag, rolling mean, rolling std, ewm, diff, pct_change และฟีเจอร์ seasonality
ทำความสะอาดข้อมูลแทนค่า inf และ NaN
แยกกลับเป็น train_feat และ test_feat
ลบแถวที่ยังมี NaN (เฉพาะฝั่ง train)
สร้างคอลัมน์ target โดยใช้ log1p(price)
แบ่งข้อมูลฝั่ง train เป็น train_X และ val_X โดยใช้ข้อมูลชุดสุดท้ายเป็น validation
ตั้งค่า Optuna เพื่อค้นหา LightGBM parameters ที่เหมาะที่สุด
รัน Optuna แบบ time-series cross-validation 3 รอบ
ดึงค่าพารามิเตอร์ที่ดีที่สุด (ถ้า error จะใช้ค่า default)
เทรน LightGBM บน train_X และหยุดเมื่อ validation ไม่ดีขึ้น
เทรน XGBoost ด้วย early stopping เช่นกัน
สร้าง OOF predictions ด้วย TimeSeriesSplit 5 fold สำหรับใช้ฝึก meta-model
เทรน Ridge Regression เพื่อใช้เป็น meta-model ของ Stacking
เริ่มพยากรณ์ข้อมูล test แบบวนทีละแถว (recursive)
ทุกครั้งที่ทำนายได้ จะเติมค่าที่ได้กลับเข้า history แล้วสร้างฟีเจอร์ใหม่
รวมผลทำนายของ LightGBM และ XGBoost ผ่าน meta-model
นำผลทั้งหมดมาใส่ในไฟล์ submission.csv
บันทึกไฟล์และพร้อมส่ง
Data Preprocessing (อธิบายการเตรียมข้อมูล)
1. โหลดไฟล์
ดึง train, test และ sample_submission เข้ามา แล้วตรวจสอบว่าไฟล์อยู่ในโฟลเดอร์และชื่อถูกต้อง
2. จัดเรียงข้อมูลตาม id
เนื่องจากเป็นข้อมูลแบบ time-series การเรียงลำดับให้ถูกต้องสำคัญมาก
3. รวม train + test ชั่วคราว
ทำเพื่อให้การสร้างฟีเจอร์ที่อิงข้อมูลย้อนหลัง เช่น lag หรือ rolling ไม่ขาดช่วง
4. Feature Engineering หลักที่ใช้
• Lag features เช่น lag_1, lag_3, lag_7, lag_30
ใช้เก็บราคาย้อนหลังตามจำนวนวันต่างๆ
• Rolling windows ค่าเฉลี่ย, ส่วนเบี่ยงเบนมาตรฐาน และ median
• EWMA (Exponential Weighted Mean)
• Expanding mean แนวโน้มสะสม
• Diff และ percentage change เพื่อตรวจจับการเปลี่ยนแปลง
• Seasonality เช่น id mod 7 (รูปแบบรายสัปดาห์), id mod 30 (วงรอบเดือน)
• Ratio feature เช่น lag1 / lag7
5. ลบค่า NaN ที่ฝั่ง train
เฉพาะ train เท่านั้นที่ต้องทำให้สะอาด ส่วน test ใช้ ffill + bfill
6. สร้าง target = log1p(price)
ช่วยให้โมเดลเรียนรู้ได้ง่ายขึ้นและลดความแปรปรวนในข้อมูลราคา
Sequential Model + Hyperparameters
สเต็ปการสร้างโมเดลมี 3 ชั้น
⸻
1) LightGBM (Base Model ตัวที่ 1)
ใช้ Optuna ค้นหา:
• learning_rate
• num_leaves
• min_data_in_leaf
• feature_fraction
• bagging_fraction
• lambda_l1, lambda_l2
เทรนด้วย early stopping เพื่อป้องกัน overfitting
ใช้ metric = RMSE
⸻
2) XGBoost (Base Model ตัวที่ 2)
ตั้งค่าพารามิเตอร์แบบคงที่ให้ค่อนข้าง stable เช่น
• eta = 0.03
• max_depth = 6
• subsample = 0.8
• colsample_bytree = 0.8
เทรนด้วย early stopping เช่นกัน
⸻
3) Ridge Regression (Meta-model)
ใช้รวมผลของทั้ง LGB + XGB
โดยป้อนค่าทำนายของสองโมเดลเป็น feature
สามารถลด noise และเฉลี่ยความแม่นยำของทั้งสองโมเดลให้ดีขึ้น
Validation ที่ใช้ใน Pipeline นี้
ตรวจสอบโมเดล 3 ระดับ
1) Hold-out Validation (Time-series split)
ใช้ข้อมูลช่วงท้ายสุดประมาณ 10% เป็น validation
เหมาะกับข้อมูลที่มีลำดับเวลา
2) Optuna Cross-validation (TimeSeriesSplit 3 fold)
ช่วยให้เลือก hyperparameter ที่เหมาะที่สุดโดยไม่เสี่ยงต่อการฟลุ๊ค
3) OOF validation สำหรับ stacking (5 fold)
ใช้สร้างข้อมูลฝึก meta-model แบบไม่เกิด data leakage
ทำให้ stacking มีความแม่นยำขึ้น
This content originally appeared on DEV Community and was authored by Paozakai
Paozakai | Sciencx (2025-11-21T02:15:31+00:00) Stock Price Prediction With Machine Learning Model. Retrieved from https://www.scien.cx/2025/11/21/stock-price-prediction-with-machine-learning-model/
Please log in to upload a file.
There are no updates yet.
Click the Upload button above to add an update.