all9 분석 도구
민광석
2024-11-17 17:10
181
0
0
본문
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
from prophet import Prophet
from sklearn.ensemble import RandomForestRegressor
import xgboost as xgb
import lightgbm as lgb
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from sklearn.preprocessing import MinMaxScaler, OneHotEncoder
from sklearn.model_selection import GridSearchCV, TimeSeriesSplit
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.feature_extraction import FeatureHasher
import statsmodels.api as sm
from neuralprophet import NeuralProphet
from scipy.stats import ttest_ind, wilcoxon, pearsonr
from scipy.sparse import csr_matrix, hstack
import warnings
import re
import logging
import sys
from multiprocessing import cpu_count
import gc
# 로그 설정
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(levelname)s:%(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler("modeling.log")
]
)
warnings.filterwarnings("ignore")
class DataProcessor:
def __init__(self):
self.df_sales = None
self.df_life = None
self.merged_df = None
self.grouped_df = None
def get_data_from_db(self, query):
try:
db_user = ''
db_password = ''
db_host = ''
db_name = 'g2bsaledb'
engine = create_engine(f'mysql+pymysql://{db_user}:{db_password}@{db_host}/{db_name}')
with engine.connect() as conn:
df = pd.read_sql(query, conn)
logging.info(f"쿼리 성공: {query.splitlines()[0].strip()}...")
return df
except Exception as e:
logging.error(f"데이터베이스 연결 오류: {e}")
return None
def optimize_dtypes(self, df):
try:
for col in df.select_dtypes(include=['int64']).columns:
df[col] = df[col].astype('int32')
for col in df.select_dtypes(include=['float64']).columns:
df[col] = df[col].astype('float32')
return df
except Exception as e:
logging.error(f"데이터 타입 최적화 오류: {e}")
return df
def apply_one_hot_encoding(self, df, columns):
try:
for col in columns:
df[col] = df[col].astype(str)
logging.info(f"{col} 고유값 개수: {df[col].nunique()}")
logging.info(f"{col} 샘플 값: {df[col].unique()[:5]}")
ohe = OneHotEncoder(drop='first', sparse_output=True, dtype=np.int8)
ohe_features = ohe.fit_transform(df[columns])
logging.info(f"원-핫 인코딩 변환 후 형태: {ohe_features.shape}")
return ohe_features
except Exception as e:
logging.error(f"원-핫 인코딩 오류: {e}")
return None
def apply_feature_hashing(self, df, columns, n_features=100): # n_features 기본값을 100으로 변경
try:
# 각 컬럼별로 별도의 리스트 생성
text_features = []
for idx in range(len(df)):
# 각 행의 모든 컬럼 값을 리스트로 만듦
row_features = [str(df[col].iloc[idx]) for col in columns]
text_features.append(row_features)
logging.info(f"특성 해싱 입력 데이터 샘플: {text_features[:2]}")
logging.info(f"특성 해싱 입력 데이터 형태: {len(text_features)} rows")
hasher = FeatureHasher(n_features=n_features, input_type='string')
hashed_features = hasher.transform(text_features)
logging.info(f"특성 해싱 변환 후 형태: {hashed_features.shape}")
return hashed_features
except Exception as e:
logging.error(f"특성 해싱 오류: {e}")
logging.error(f"columns: {columns}")
logging.error(f"데이터 샘플: {df[columns].head()}")
return None
def convert_date(self, date_str):
try:
date = pd.to_datetime(str(date_str), format='%Y%m%d')
if date.year < 1900 or date.year > 2100:
logging.warning(f"비정상적인 날짜: {date_str}")
return None
return date
except Exception as e:
logging.error(f"날짜 변환 오류 - 입력값: {date_str}, 오류: {e}")
return None
def preprocess_data(self):
if self.df_sales is None or self.df_life is None:
logging.error("데이터가 충분하지 않습니다.")
return None
try:
logging.info(f"df_sales 행 수 (원본): {len(self.df_sales)}")
logging.info(f"df_life 행 수 (원본): {len(self.df_life)}")
# 날짜 변환
self.df_sales['납품요구접수일자'] = self.df_sales['납품요구접수일자'].apply(self.convert_date)
logging.info(f"날짜 범위: {self.df_sales['납품요구접수일자'].min()} ~ {self.df_sales['납품요구접수일자'].max()}")
# 결측치 처리
self.df_sales.dropna(subset=['물품분류번호', '납품요구접수일자', '수요기관코드', '품명'], inplace=True)
self.df_life.dropna(subset=['물품분류번호', '내용연수'], inplace=True)
self.df_sales['물품분류번호'] = self.df_sales['물품분류번호'].astype(str).str.strip()
self.df_life['물품분류번호'] = self.df_life['물품분류번호'].astype(str).str.strip()
logging.info(f"df_sales 행 수 (dropna 후): {len(self.df_sales)}")
logging.info(f"df_life 행 수 (dropna 후): {len(self.df_life)}")
logging.info(f"df_sales '물품분류번호' 데이터 타입: {self.df_sales['물품분류번호'].dtype}")
logging.info(f"df_life '물품분류번호' 데이터 타입: {self.df_life['물품분류번호'].dtype}")
self.merged_df = pd.merge(self.df_sales, self.df_life, on='물품분류번호', how='inner')
logging.info(f"병합된 데이터 행 수: {len(self.merged_df)}")
self.merged_df = self.optimize_dtypes(self.merged_df)
ohe_columns = ['물품분류번호']
ohe_features = self.apply_one_hot_encoding(self.merged_df, ohe_columns)
hash_columns = ['수요기관코드', '품명']
hashed_features = self.apply_feature_hashing(self.merged_df, hash_columns, n_features=100)
numeric_features = self.merged_df.select_dtypes(include=['int32', 'float32']).values
if ohe_features is not None and hashed_features is not None:
final_features = hstack([
csr_matrix(numeric_features),
ohe_features,
hashed_features
])
logging.info(f"전처리된 데이터의 최종 형태: {final_features.shape}")
return final_features
else:
logging.error("특성 변환 중 오류가 발생했습니다.")
return None
except Exception as e:
logging.error(f"데이터 전처리 오류: {e}")
return None
import tensorflow as tf
class ForecastModels:
def __init__(self, features, original_df):
try:
if features is None or original_df is None:
raise ValueError("입력 데이터가 없습니다.")
if features.shape[0] != len(original_df):
raise ValueError("특성과 원본 데이터의 길이가 일치하지 않습니다.")
self.features = features
self.original_df = original_df
self.predictions = {}
self.split_index = int(features.shape[0] * 0.8)
# sparse matrix를 분할할 때 올바른 방법 사용
self.X_train = features[:self.split_index]
self.X_test = features[self.split_index:]
self.y_train = original_df['내용연수'].values[:self.split_index]
self.y_test = original_df['내용연수'].values[self.split_index:]
# TimeSeriesSplit 설정
self.tscv = TimeSeriesSplit(n_splits=5)
logging.info(f"학습 데이터 형태: {self.X_train.shape}")
logging.info(f"테스트 데이터 형태: {self.X_test.shape}")
logging.info(f"학습 레이블 수: {len(self.y_train)}")
logging.info(f"테스트 레이블 수: {len(self.y_test)}")
except Exception as e:
logging.error(f"ForecastModels 초기화 오류: {e}")
raise
def run_prophet(self):
try:
logging.info("Prophet 모델 학습 시작...")
# 날짜별 내용연수의 평균 계산
df_prophet = self.original_df.groupby('납품요구접수일자')['내용연수'].agg(['mean', 'count']).reset_index()
df_prophet.columns = ['ds', 'y', 'count']
# 날짜별 통계 로깅
daily_stats = df_prophet.head()
logging.info(f"일별 구매 통계:\n{daily_stats}")
train_df = df_prophet[:self.split_index].copy()
model = Prophet(
yearly_seasonality=True,
weekly_seasonality=True,
daily_seasonality=True,
interval_width=0.95
)
model.fit(train_df[['ds', 'y']])
future_dates = df_prophet[self.split_index:]['ds']
future = pd.DataFrame({'ds': future_dates})
forecast = model.predict(future)
# 예측값을 원래 데이터의 길이에 맞게 확장
full_predictions = np.repeat(forecast['yhat'].values, df_prophet[self.split_index:]['count'])
self.predictions['Prophet'] = full_predictions[:len(self.y_test)]
mse = mean_squared_error(self.y_test, self.predictions['Prophet'])
logging.info(f"Prophet MSE: {mse:.4f}")
del model, forecast
gc.collect()
except Exception as e:
logging.error(f"Prophet 모델 오류: {e}")
def run_random_forest(self):
try:
logging.info("RandomForest 모델 학습 시작...")
model = RandomForestRegressor(
n_estimators=100,
max_depth=7,
n_jobs=-1,
random_state=42
)
# 교차 검증
cv_scores = []
for train_idx, val_idx in self.tscv.split(self.X_train):
X_cv_train = self.X_train[train_idx]
X_cv_val = self.X_train[val_idx]
y_cv_train = self.y_train[train_idx]
y_cv_val = self.y_train[val_idx]
model.fit(X_cv_train, y_cv_train)
pred = model.predict(X_cv_val)
cv_scores.append(mean_squared_error(y_cv_val, pred))
logging.info(f"RandomForest 교차 검증 MSE: {np.mean(cv_scores):.4f} (±{np.std(cv_scores):.4f})")
# 전체 학습 및 예측
model.fit(self.X_train, self.y_train)
self.predictions['RandomForest'] = model.predict(self.X_test)
mse = mean_squared_error(self.y_test, self.predictions['RandomForest'])
logging.info(f"RandomForest MSE: {mse:.4f}")
# 특성 중요도
feature_importance = pd.DataFrame({
'feature': range(self.X_train.shape[1]),
'importance': model.feature_importances_
})
feature_importance = feature_importance.sort_values('importance', ascending=False)
logging.info(f"상위 10개 중요 특성:\n{feature_importance.head(10)}")
del model
gc.collect()
except Exception as e:
logging.error(f"RandomForest 모델 오류: {e}")
def run_xgboost(self):
try:
logging.info("XGBoost 모델 학습 시작...")
model = xgb.XGBRegressor(
n_estimators=100,
learning_rate=0.1,
max_depth=7,
n_jobs=-1,
random_state=42
)
# 교차 검증
cv_scores = []
for train_idx, val_idx in self.tscv.split(self.X_train):
X_cv_train = self.X_train[train_idx]
X_cv_val = self.X_train[val_idx]
y_cv_train = self.y_train[train_idx]
y_cv_val = self.y_train[val_idx]
model.fit(X_cv_train, y_cv_train)
pred = model.predict(X_cv_val)
cv_scores.append(mean_squared_error(y_cv_val, pred))
logging.info(f"XGBoost 교차 검증 MSE: {np.mean(cv_scores):.4f} (±{np.std(cv_scores):.4f})")
# 전체 학습 및 예측
model.fit(self.X_train, self.y_train)
self.predictions['XGBoost'] = model.predict(self.X_test)
mse = mean_squared_error(self.y_test, self.predictions['XGBoost'])
logging.info(f"XGBoost MSE: {mse:.4f}")
del model
gc.collect()
except Exception as e:
logging.error(f"XGBoost 모델 오류: {e}")
def run_lightgbm(self):
try:
logging.info("LightGBM 모델 학습 시작...")
params = {
'objective': 'regression',
'metric': 'mse',
'num_leaves': 127,
'max_depth': 7,
'learning_rate': 0.1,
'feature_fraction': 0.8,
'bagging_fraction': 0.8,
'bagging_freq': 5,
'min_data_in_leaf': 100,
'n_estimators': 100,
'verbose': -1
}
model = lgb.LGBMRegressor(**params)
# 교차 검증
cv_scores = []
for train_idx, val_idx in self.tscv.split(self.X_train):
X_cv_train = self.X_train[train_idx]
X_cv_val = self.X_train[val_idx]
y_cv_train = self.y_train[train_idx]
y_cv_val = self.y_train[val_idx]
model.fit(X_cv_train, y_cv_train)
pred = model.predict(X_cv_val)
cv_scores.append(mean_squared_error(y_cv_val, pred))
logging.info(f"LightGBM 교차 검증 MSE: {np.mean(cv_scores):.4f} (±{np.std(cv_scores):.4f})")
# 전체 학습 및 예측
model.fit(self.X_train, self.y_train)
self.predictions['LightGBM'] = model.predict(self.X_test)
mse = mean_squared_error(self.y_test, self.predictions['LightGBM'])
logging.info(f"LightGBM MSE: {mse:.4f}")
del model
gc.collect()
except Exception as e:
logging.error(f"LightGBM 모델 오류: {e}")
def run_lstm(self):
try:
logging.info("LSTM 모델 학습 시작...")
# 데이터 정규화
scaler = MinMaxScaler()
X_train_scaled = scaler.fit_transform(self.X_train.toarray())
X_test_scaled = scaler.transform(self.X_test.toarray())
# LSTM 입력 형태로 변환
X_train_reshaped = X_train_scaled.reshape((X_train_scaled.shape[0], 1, X_train_scaled.shape[1]))
X_test_reshaped = X_test_scaled.reshape((X_test_scaled.shape[0], 1, X_test_scaled.shape[1]))
# 모델 구성
model = Sequential([
LSTM(64, input_shape=(1, X_train_scaled.shape[1]), return_sequences=True),
Dropout(0.2),
LSTM(32),
Dropout(0.2),
Dense(16, activation='relu'),
Dense(1)
])
model.compile(optimizer='adam', loss='mse')
# 학습
batch_size = 1024
epochs = 10
model.fit(
X_train_reshaped,
self.y_train,
batch_size=batch_size,
epochs=epochs,
verbose=0
)
# 예측
self.predictions['LSTM'] = model.predict(X_test_reshaped, batch_size=batch_size).flatten()
mse = mean_squared_error(self.y_test, self.predictions['LSTM'])
logging.info(f"LSTM MSE: {mse:.4f}")
del model
gc.collect()
except Exception as e:
logging.error(f"LSTM 모델 오류: {e}")
def run_neural_prophet(self):
try:
logging.info("NeuralProphet 모델 학습 시작...")
# 데이터 준비
df_prophet = self.original_df.groupby('납품요구접수일자')['내용연수'].mean().reset_index()
df_prophet.columns = ['ds', 'y']
# 날짜별 통계 로깅
daily_stats = df_prophet.head()
logging.info(f"NeuralProphet 일별 통계:\n{daily_stats}")
# 데이터 샘플링 (필요한 경우)
sample_size = min(100000, len(df_prophet))
df_prophet = df_prophet.sample(n=sample_size, random_state=42) if len(df_prophet) > sample_size else df_prophet
train_df = df_prophet[:self.split_index].copy()
model = NeuralProphet(
batch_size=64,
learning_rate=0.01,
epochs=50,
daily_seasonality=True,
weekly_seasonality=True,
yearly_seasonality=True
)
# 학습
model.fit(train_df)
# 예측
future = model.make_future_dataframe(df_prophet[self.split_index:])
forecast = model.predict(future)
# 예측값을 원래 데이터의 길이에 맞게 확장
predictions = np.repeat(forecast['yhat1'].values, df_prophet[self.split_index:]['count'])
self.predictions['NeuralProphet'] = predictions[:len(self.y_test)]
mse = mean_squared_error(self.y_test, self.predictions['NeuralProphet'])
logging.info(f"NeuralProphet MSE: {mse:.4f}")
del model, forecast
gc.collect()
except Exception as e:
logging.error(f"NeuralProphet 모델 오류: {e}")
def run_arima(self):
try:
logging.info("ARIMA 모델 학습 시작...")
# 시계열 데이터 준비
ts_series = pd.Series(
self.y_train,
index=self.original_df['납품요구접수일자'].values[:self.split_index]
)
# ARIMA 모델 학습
model = sm.tsa.ARIMA(ts_series, order=(2,1,2))
results = model.fit()
# 예측
forecast = results.forecast(steps=len(self.y_test))
self.predictions['ARIMA'] = forecast.values
mse = mean_squared_error(self.y_test, self.predictions['ARIMA'])
logging.info(f"ARIMA MSE: {mse:.4f}")
del model, results, forecast
gc.collect()
except Exception as e:
logging.error(f"ARIMA 모델 오류: {e}")
def run_all_models(self):
try:
logging.info("모든 모델 학습 시작...")
models = {
'Prophet': self.run_prophet,
'RandomForest': self.run_random_forest,
'XGBoost': self.run_xgboost,
'LightGBM': self.run_lightgbm,
'LSTM': self.run_lstm,
'NeuralProphet': self.run_neural_prophet,
'ARIMA': self.run_arima
}
for name, func in models.items():
try:
logging.info(f"{name} 모델 학습 중...")
func()
gc.collect()
except Exception as e:
logging.error(f"{name} 모델 실행 실패: {e}")
continue
logging.info("모든 모델 학습 완료")
# 모델 성능 요약
for model_name, predictions in self.predictions.items():
if len(predictions) == len(self.y_test):
mse = mean_squared_error(self.y_test, predictions)
mae = mean_absolute_error(self.y_test, predictions)
r2 = r2_score(self.y_test, predictions)
logging.info(f"{model_name} 최종 성능:")
logging.info(f"- MSE: {mse:.4f}")
logging.info(f"- MAE: {mae:.4f}")
logging.info(f"- R2: {r2:.4f}")
except Exception as e:
logging.error(f"모델 학습 중 오류 발생: {e}")
import traceback
logging.error(traceback.format_exc())
class ReportGenerator:
def __init__(self, original_df, predictions, split_index):
self.original_df = original_df
self.predictions = predictions
self.split_index = split_index
self.y_test = original_df['내용연수'].values[split_index:]
def generate_model_performance_report(self):
try:
logging.info("모델 성능 보고서 생성 시작...")
performance_metrics = {}
for model_name, pred in self.predictions.items():
if len(pred) == len(self.y_test):
metrics = {
'MSE': mean_squared_error(self.y_test, pred),
'MAE': mean_absolute_error(self.y_test, pred),
'R2': r2_score(self.y_test, pred)
}
performance_metrics[model_name] = metrics
df_metrics = pd.DataFrame(performance_metrics).T
df_metrics.to_csv('model_performance_metrics.csv', encoding='utf-8-sig')
logging.info("모델 성능 보고서가 생성되었습니다.")
logging.info("\n" + str(df_metrics))
except Exception as e:
logging.error(f"모델 성능 보고서 생성 오류: {e}")
def generate_statistical_analysis_report(self):
try:
logging.info("통계 분석 보고서 생성 시작...")
statistical_tests = {}
for model_name, pred in self.predictions.items():
if len(pred) == len(self.y_test):
t_stat, t_pval = ttest_ind(self.y_test, pred)
w_stat, w_pval = wilcoxon(self.y_test, pred)
corr, p_val = pearsonr(self.y_test, pred)
statistical_tests[model_name] = {
't_statistic': t_stat,
't_pvalue': t_pval,
'wilcoxon_statistic': w_stat,
'wilcoxon_pvalue': w_pval,
'correlation': corr,
'correlation_pvalue': p_val
}
df_stats = pd.DataFrame(statistical_tests).T
df_stats.to_csv('statistical_analysis.csv', encoding='utf-8-sig')
logging.info("통계 분석 보고서가 생성되었습니다.")
logging.info("\n" + str(df_stats))
except Exception as e:
logging.error(f"통계 분석 보고서 생성 오류: {e}")
def generate_detailed_statistical_analysis_report(self):
try:
logging.info("상세 통계 분석 보고서 생성 시작...")
detailed_stats = {}
for model_name, pred in self.predictions.items():
if len(pred) == len(self.y_test):
detailed_stats[model_name] = {
'Mean_Actual': np.mean(self.y_test),
'Mean_Predicted': np.mean(pred),
'Std_Actual': np.std(self.y_test),
'Std_Predicted': np.std(pred),
'Min_Actual': np.min(self.y_test),
'Min_Predicted': np.min(pred),
'Max_Actual': np.max(self.y_test),
'Max_Predicted': np.max(pred)
}
df_detailed = pd.DataFrame(detailed_stats).T
df_detailed.to_csv('detailed_statistical_analysis.csv', encoding='utf-8-sig')
logging.info("상세 통계 분석 보고서가 생성되었습니다.")
logging.info("\n" + str(df_detailed))
except Exception as e:
logging.error(f"상세 통계 분석 보고서 생성 오류: {e}")
def generate_life_performance_report(self):
try:
model_report = pd.DataFrame({
'품명': self.original_df['품명'],
'Actual': self.original_df['내용연수']
})
for model_name, pred in self.predictions.items():
if len(pred) < len(model_report):
pred = np.append(pred, [np.nan] * (len(model_report) - len(pred)))
model_report[model_name] = pred
model_report.to_csv('model_life_performance_report.csv',
index=False,
encoding='utf-8-sig')
logging.info("모델 연수 성능 비교 보고서가 CSV 파일로 생성되었습니다.")
except Exception as e:
logging.error(f"모델 연수 성능 보고서 생성 오류: {e}")
def generate_all_reports(self):
try:
self.generate_model_performance_report()
self.generate_statistical_analysis_report()
self.generate_detailed_statistical_analysis_report()
self.generate_life_performance_report()
logging.info("모든 보고서 생성이 완료되었습니다.")
except Exception as e:
logging.error(f"보고서 생성 중 오류 발생: {e}")
if __name__ == "__main__":
try:
# 데이터 처리
data_processor = DataProcessor()
# SQL 쿼리
query_sales = """
SELECT
CAST(납품요구접수일자 AS CHAR) AS 납품요구접수일자,
CAST(물품분류번호 AS CHAR) AS 물품분류번호,
품명,
수요기관코드
FROM
g2b_data
"""
query_life = """
SELECT
물품분류번호,
내용연수
FROM
g2blifedb
"""
# 데이터 로드 및 처리
logging.info("판매 데이터 로딩 시작...")
data_processor.df_sales = data_processor.get_data_from_db(query_sales)
logging.info(f"판매 데이터 로딩 완료. 행 수: {len(data_processor.df_sales) if data_processor.df_sales is not None else 0}")
logging.info("수명 데이터 로딩 시작...")
data_processor.df_life = data_processor.get_data_from_db(query_life)
logging.info(f"수명 데이터 로딩 완료. 행 수: {len(data_processor.df_life) if data_processor.df_life is not None else 0}")
df_sparse = data_processor.preprocess_data()
if df_sparse is not None and df_sparse.shape[0] > 0:
# 모델 예측
forecast_models = ForecastModels(df_sparse, data_processor.merged_df)
forecast_models.run_all_models()
# 보고서 생성
report_generator = ReportGenerator(data_processor.merged_df, forecast_models.predictions, forecast_models.split_index)
report_generator.generate_all_reports()
# 메모리 최적화
del data_processor, forecast_models
gc.collect()
else:
logging.error("유효한 데이터가 없습니다.")
except Exception as e:
logging.error(f"메인 실행 오류: {e}")
0
로그인 후 추천 또는 비추천하실 수 있습니다.
민광석
회원등급 : 최고관리자
포인트 1,800
경험치 233
[레벨 2] - 진행률
9%
가입일
2024-10-21 11:52:45
댓글목록0