import numpy as np
import pandas as pd
import os, sys
import random
import re
import matplotlib as mpl
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.colors import BoundaryNorm
from keras import models
from keras import layers
from keras.optimizers import SGD, Adadelta
from keras.callbacks import EarlyStopping, ModelCheckpoint
import datetime
import h5py
from sklearn.metrics import roc_curve, auc
from mpl_toolkits.axes_grid1.inset_locator import inset_axes, InsetPosition  # axes_grid (no "1") was removed from matplotlib
%matplotlib inline
pd.set_option('display.max_columns',40)
def show_images(indice, ref_images, new_images, sub_images, title=None, Npanel=None, labels=None, classes=None, labelAsValue=True, labelAsFloat=False, save_name=None):
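    """Display sources as (ref, new, sub) cutout triplets, 10 per figure.

    Each figure is a 5x6 grid of 29x29 pixel images; titles show the
    label/score (or the source index) for every triplet.
    """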
    if Npanel is None:
        Npanel = (len(indice) - 1) // 10 + 1  # 10 sources (30 subplots) per figure
for ii in range(Npanel):
fig = plt.figure(figsize=(14, 14))
        if title is not None:
fig.suptitle(title+' (%d)' % (ii+1), fontsize=16)
n = 0
        for i in indice[ii*10:(ii+1)*10]:
ref_ax = fig.add_subplot(5, 6, 3*n+0+1)
ref_ax.set_xticks([])
ref_ax.set_yticks([])
ref_ax.imshow(ref_images[i].reshape(29,29),
cmap=plt.cm.gray,
#norm=BoundaryNorm(np.linspace(ref_images[i].min(), ref_images[i].min()+0.2, 256), ncolors=256),
interpolation='nearest')
new_ax = fig.add_subplot(5, 6, 3*n+1+1)
new_ax.set_xticks([])
new_ax.set_yticks([])
new_ax.imshow(new_images[i].reshape(29,29),
cmap=plt.cm.gray,
#norm=BoundaryNorm(np.linspace(ref_images[i].min(), ref_images[i].mean()+500, 256), ncolors=256),
interpolation='nearest')
sub_ax = fig.add_subplot(5, 6, 3*n+2+1)
sub_ax.set_xticks([])
sub_ax.set_yticks([])
sub_ax.imshow(sub_images[i].reshape(29,29),
cmap=plt.cm.gray,
#norm=BoundaryNorm(np.linspace(ref_images[i].min(), ref_images[i].min()+2.0, 256), ncolors=256),
interpolation='nearest')
if labels is not None and classes is not None:
ref_ax.set_title('ref:{:1.0f}/{:.2f} ({})'.format(labels[i], classes[i], i))
new_ax.set_title('new:{:1.0f}/{:.2f} ({})'.format(labels[i], classes[i], i))
sub_ax.set_title('sub:{:1.0f}/{:.2f} ({})'.format(labels[i], classes[i], i))
#ax.set_title('%d' % (i))
elif labels is not None:
if labelAsValue:
if labelAsFloat:
ref_ax.set_title('ref:%f' % (labels[i]))
new_ax.set_title('new:%f' % (labels[i]))
sub_ax.set_title('sub:%f' % (labels[i]))
else:
ref_ax.set_title('ref:{}'.format(labels[i]))
new_ax.set_title('new:{}'.format(labels[i]))
sub_ax.set_title('sub:{}'.format(labels[i]))
else:
ref_ax.set_title('ref:%d' % (i))
new_ax.set_title('new:%d' % (i))
sub_ax.set_title('sub:%d' % (i))
else:
ref_ax.set_title('ref:%d' % (i))
new_ax.set_title('new:%d' % (i))
sub_ax.set_title('sub:%d' % (i))
n += 1
if save_name is not None: plt.savefig(save_name+'_{}.png'.format(ii))
plt.show()
plt.close()
artificial_real_params = pd.read_csv('data/artificial_real.csv')
print('Loaded:', artificial_real_params.shape)
artifact_params = pd.read_csv('data/artifact.csv')
print('Loaded:', artifact_params.shape)
artificial_real_images = np.load('data/artificial_real.npy')
print('Loaded:', artificial_real_images.shape)
artifact_images = np.load('data/artifact.npy')
print('Loaded:', artifact_images.shape)
print(artificial_real_params[artificial_real_params['fits_det_id']==111].shape)
print(artificial_real_params[(artificial_real_params['fits_det_id']==111)&
                             (artificial_real_params['object_type']=='rand_artificial_real')].shape)
print(artificial_real_params[(artificial_real_params['fits_det_id']==111)&
                             (artificial_real_params['object_type']=='galx_artificial_real')].shape)
artificial_real_params[:5]
index = artificial_real_params[(artificial_real_params['fits_det_id']==111)&
(artificial_real_params['object_type']=='galx_artificial_real')].index.values
show_images(index[:10],*np.split(artificial_real_images, 3, axis=3),
#save_name='psstar_and_dist_less_than_3arcsec',
title='Artificial Real (DET 111: Around Galaxy)')
index = artificial_real_params[(artificial_real_params['fits_det_id']==111)&
(artificial_real_params['object_type']=='rand_artificial_real')].index.values
show_images(index[:10],*np.split(artificial_real_images, 3, axis=3),
#save_name='psstar_and_dist_less_than_3arcsec',
title='Artificial Real (DET 111: Random)')
def normalize_image(ref_image_nd, new_image_nd, sub_image_nd):
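    """Min-max scale each cutout to [0, 1] independently.

    Assumes every cutout has max > min; a perfectly flat image would
    divide by zero here.
    """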
ref_image_nd = ref_image_nd.reshape(ref_image_nd.shape[0],ref_image_nd.shape[1])
new_image_nd = new_image_nd.reshape(new_image_nd.shape[0],new_image_nd.shape[1])
sub_image_nd = sub_image_nd.reshape(sub_image_nd.shape[0],sub_image_nd.shape[1])
ref_image_nd = (ref_image_nd-ref_image_nd.min())/(ref_image_nd.max()-ref_image_nd.min())
new_image_nd = (new_image_nd-new_image_nd.min())/(new_image_nd.max()-new_image_nd.min())
sub_image_nd = (sub_image_nd-sub_image_nd.min())/(sub_image_nd.max()-sub_image_nd.min())
return [ref_image_nd, new_image_nd, sub_image_nd]
def make_learning_data(artificial_real_images, artifact_images, use_number_of_sources=14000):
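    """Build train/val/test sets with a 5:1:1 split per class.

    With the default 14000 sources per class this yields 20000 training,
    4000 validation and 4000 test images once both classes are stacked.
    """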
train_split = int(use_number_of_sources*5./7.)
val_split = int(train_split+use_number_of_sources*1./7.)
test_split = int(val_split+use_number_of_sources*1./7.)
    artificial_real_labels = np.ones(test_split, dtype=bool)   # class 1: real (np.bool was removed from NumPy)
    artifact_labels = np.zeros(test_split, dtype=bool)         # class 0: artifact
learning_data = {}
learning_data['image_train'] = np.concatenate([artificial_real_images[0 :train_split],
artifact_images[0 :train_split]], axis=0)
learning_data['image_val'] = np.concatenate([artificial_real_images[train_split:val_split ],
artifact_images[train_split:val_split ]], axis=0)
learning_data['image_test'] = np.concatenate([artificial_real_images[val_split :test_split ],
artifact_images[val_split :test_split ]], axis=0)
learning_data['label_train'] = np.concatenate([artificial_real_labels[0 :train_split],
artifact_labels[0 :train_split]], axis=0)
learning_data['label_val'] = np.concatenate([artificial_real_labels[train_split:val_split ],
artifact_labels[train_split:val_split ]], axis=0)
learning_data['label_test'] = np.concatenate([artificial_real_labels[val_split :test_split ],
artifact_labels[val_split :test_split ]], axis=0)
#learning_data['param_train'] = np.concatenate([artificial_real_params[0 :train_split],
# artifact_params[0 :train_split]], axis=0)
#learning_data['param_val'] = np.concatenate([artificial_real_params[train_split:val_split ],
# artifact_params[train_split:val_split ]], axis=0)
#learning_data['param_test'] = np.concatenate([artificial_real_params[val_split :test_split ],
# artifact_params[val_split :test_split ]], axis=0)
    print('num of data : (train,val,test) = ({},{},{})'.format(learning_data['image_train'].shape[0],
                                                               learning_data['image_val'].shape[0],
                                                               learning_data['image_test'].shape[0]))
return learning_data
def make_ml_model(det_num, learning_data, output_directory_path):
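    """Train a small CNN (two conv/pool stages + dense layers, sigmoid output)
    for real-vs-artifact classification. The lowest-val_loss model is
    checkpointed to model<det_num>.hdf5 and early stopping halts stalled runs.
    """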
input_shape = (29, 29, 3)
epochs = 100
model = models.Sequential()
model.add(layers.Conv2D(32, kernel_size=(5,5), activation='relu', input_shape=input_shape))
model.add(layers.MaxPooling2D(pool_size=(2,2)))
model.add(layers.Conv2D(64, (3,3), activation='relu'))
model.add(layers.MaxPooling2D(pool_size=(2,2)))
model.add(layers.Dropout(0.3))
model.add(layers.Flatten())
model.add(layers.Dense(128, activation='relu'))
model.add(layers.Dense(64, activation='relu'))
model.add(layers.Dense(1, activation='sigmoid'))
model.compile(loss='binary_crossentropy', optimizer=Adadelta(), metrics=['accuracy'])
early_stopping = EarlyStopping(monitor='val_loss', patience=10, verbose=1)
checkpoint = ModelCheckpoint(filepath=os.path.join(output_directory_path,'model{}.hdf5'.format(det_num)),
save_best_only=True)
hist = model.fit(learning_data['image_train'], learning_data['label_train'],
epochs=epochs, batch_size=64, shuffle=True, verbose=False,
validation_data=(learning_data['image_val'], learning_data['label_val']),
callbacks=[early_stopping, checkpoint])
return model, hist
def evaluate_model(det_num, output_directory_path, model, hist, learning_data):
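    """Save learning curves, write train/val/test loss and accuracy, and draw
    the test-set ROC curve with an inset zoomed to FPR < 0.1, TPR > 0.9.
    """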
fig_file_name = str(det_num) + '_accuracy_loss.png'
fig_file_path = os.path.join(output_directory_path, fig_file_name)
fig = plt.figure()
fig.suptitle('Model accuracy/loss')
    acc_key = 'acc' if 'acc' in hist.history else 'accuracy'  # history key name differs between Keras versions
    plt.plot(range(len(hist.history['val_' + acc_key])), hist.history['val_' + acc_key], label='val_acc')
    plt.plot(range(len(hist.history['val_loss'])), hist.history['val_loss'], label='val_loss')
    plt.plot(range(len(hist.history[acc_key])), hist.history[acc_key], '--', label='train_acc')
    plt.plot(range(len(hist.history['loss'])), hist.history['loss'], '--', label='train_loss')
plt.legend()
plt.xlabel('epoch')
plt.ylabel('accuracy / loss')
plt.savefig(fig_file_path, dpi=150)
#plt.show()
plt.close()
dat_file_name = str(det_num) + '_accuracy_loss.dat'
dat_file_path = os.path.join(output_directory_path, dat_file_name)
    with open(dat_file_path, mode='w') as file_handle:
        loss_and_accuracy = model.evaluate(learning_data['image_train'], learning_data['label_train'], verbose=False)
        file_handle.write('Training Set   : loss %5.4f accuracy %5.4f\n' % (loss_and_accuracy[0], loss_and_accuracy[1]))
        loss_and_accuracy = model.evaluate(learning_data['image_val'], learning_data['label_val'], verbose=False)
        file_handle.write('Validation Set : loss %5.4f accuracy %5.4f\n' % (loss_and_accuracy[0], loss_and_accuracy[1]))
        loss_and_accuracy = model.evaluate(learning_data['image_test'], learning_data['label_test'], verbose=False)
        file_handle.write('Test Set       : loss %5.4f accuracy %5.4f\n' % (loss_and_accuracy[0], loss_and_accuracy[1]))
fig_file_name = str(det_num) + '_roc_curve.png'
fig_file_path = os.path.join(output_directory_path, fig_file_name)
    prob = model.predict(learning_data['image_test'], verbose=False).ravel()  # predict_proba was removed from newer Keras; ravel flattens (N,1) for roc_curve
fpr, tpr, thresholds = roc_curve(learning_data['label_test'], prob)
val_auc = auc(fpr, tpr)
fig = plt.figure(figsize=(8,6))
fig.suptitle("ROC curve for Test Set (AUC=%6.4f)" % (val_auc), fontsize=16)
ax = fig.add_subplot(111)
ax.plot(fpr, tpr, color='red')
ax.set_xlabel("False Positive Rate")
ax.set_ylabel("True Positive Rate")
ax.set_ylim(-0.05,1.05)
ax.set_xlim(-0.05,1.05)
ax.grid()
ax.set_aspect('equal')
iaxes = inset_axes(ax, width="50%", height="50%", loc=4)
ip = InsetPosition(ax, [0.4,0.1,0.5,0.5])
iaxes.set_axes_locator(ip)
iaxes.plot(fpr, tpr, color='red')
iaxes.set_xticks(np.arange(0.0, 0.10, 0.020))
iaxes.set_xticks(np.arange(0.0, 0.10, 0.005), minor=True)
iaxes.set_yticks(np.arange(0.90, 1.0, 0.020))
iaxes.set_yticks(np.arange(0.90, 1.0, 0.005), minor=True)
iaxes.set_xlim(0.00, 0.10)
iaxes.set_ylim(0.90, 1.00)
iaxes.grid()
iaxes.set_aspect('equal')
plt.savefig(fig_file_path, dpi=150)
#plt.show()
plt.close()
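# Detector ids combine quadrant and chip number as q*100 + num
# (e.g. quadrant 1, chip 11 -> DET 111), built from the lists below.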
det_q_list = [1,2,3,4]
det_num_list = [11,12,13,14,15,16,
21,22,23,24,25,26,
31,32,33,34,35,
41,42,43,44]
det_num_list = [q*100+num for q in det_q_list for num in det_num_list]
output_root_directory_path = 'models'
np.random.seed(20200325)
random.seed(20200325)
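# Per-detector training loop: draw 7000 random + 7000 galaxy-centered
# artificial reals and 14000 artifacts, normalize each channel, then
# build the splits, fit the CNN and write the evaluation products.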
for det_num in det_num_list:
output_directory_path = os.path.join(output_root_directory_path, str(det_num))
if not os.path.isdir(output_directory_path): os.makedirs(output_directory_path)
    print('DET: {}'.format(det_num))
indice_artificial_real_rand = artificial_real_params[(artificial_real_params['fits_det_id']==det_num)&
(artificial_real_params['object_type']=='rand_artificial_real')].index.values
indice_artificial_real_galx = artificial_real_params[(artificial_real_params['fits_det_id']==det_num)&
(artificial_real_params['object_type']=='galx_artificial_real')].index.values
indice_artifact = artifact_params[(artifact_params['fits_det_id']==det_num)&
(artifact_params['object_type']=='artifact')].index.values
    # random.sample needs a plain sequence (ndarrays are rejected on Python >= 3.11)
    indice_artificial_real = np.array(random.sample(list(indice_artificial_real_rand), 7000) +
                                      random.sample(list(indice_artificial_real_galx), 7000))
    indice_artifact = np.array(random.sample(list(indice_artifact), 14000))
np.random.shuffle(indice_artificial_real)
np.random.shuffle(indice_artifact)
artificial_real_normalized_images = np.array([np.stack(normalize_image(*np.split(images, 3, axis=2)), axis=-1)
for images in artificial_real_images[indice_artificial_real]])
artifact_normalized_images = np.array([np.stack(normalize_image(*np.split(images, 3, axis=2)), axis=-1)
for images in artifact_images[indice_artifact]])
learning_data = make_learning_data(artificial_real_normalized_images, artifact_normalized_images)
model, hist = make_ml_model(det_num, learning_data, output_directory_path)
evaluate_model(det_num, output_directory_path, model, hist, learning_data)
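# A minimal sketch (an illustration, not part of the pipeline above) of how a
# checkpointed per-detector model could score fresh cutouts. It assumes the
# training loop has completed, so det_num, output_directory_path and
# learning_data still refer to the last detector; the test split stands in
# for new candidate images normalized the same way as the training data.
from keras.models import load_model

clf = load_model(os.path.join(output_directory_path, 'model{}.hdf5'.format(det_num)))
scores = clf.predict(learning_data['image_test']).ravel()  # sigmoid output = P(real)
print('DET {}: {} of {} candidates classified real'.format(det_num, (scores > 0.5).sum(), len(scores)))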