keras用得比较多,但是有时keras用起来不是很顺手。因为keras是基于theano或TensorFlow的,所以趁假期有空,先看看theano。
Classifying MNIST digits using Logistic Regression 官方文档:http://deeplearning.net/tutorial/logreg.html
一. 任务描述
用Logistic Regression回归做手写体数字识别,用theano实现。
二. 问题建模
Logistic Regression是一个线性的分类器,该模型包含权值矩阵W和偏移向量b。则输入数据x对应着分类为i的概率可以表示为: P(Y=i|x,W,b)=softmaxi(Wx+b)=eWix+bi∑jeWjx+bj 最后预测的类别为: yPRed=argmaxip(Y=i|x,W,b) 对于数据集D似然函数L,损失函数l可以表示为: L(θ={W,b},D)=∑i=0|D|log(P(Y=y(i)|x(i),W,b)) l(θ={W,b},D)=−L(θ={W,b},D)
三. 数据集
本实验用的数据集和之前一篇博客用的数据集一样,具体可以参考:http://blog.csdn.net/whai362/article/details/51813404 对于每个数字是一张28×28的灰度图,这里的做法是直接拉成784维的向量,然后归一化作为输入数据x,具体处理方法详见代码。
四. 依赖的第三方库
Theano 0.8.2OpenCV 3.0Python 2.7五. 实验代码
file_util.pyimport osdef read_dir(root, suffix = 'null'): file_path_list = [] for file_path, dirs, files in os.walk(root): for file in files: if suffix != 'null' and not (file.find(suffix) >= 0 and file.find(suffix) < len(file)): continue file_path_list.append(os.path.join(file_path, file)) file_path_list.sort() return file_path_listdef read_file(file_path): file_object = open(file_path, 'r') file_content = file_object.read() file_object.close() return file_content.decode('utf-8', 'ignore')def write_file(file_path, file_content): file_object = open(file_path, 'w') file_object.write(file_content) file_object.close()theano_logistic.pyimport theanoimport theano.tensor as Timport cv2import numpyimport timeitimport file_utilclass LogisticRegression(object): def __init__(self, input, n_in, n_out): self.W = theano.shared( value = numpy.zeros((n_in, n_out), dtype = theano.config.floatX), name = 'W', borrow = True) self.b = theano.shared( value = numpy.zeros((n_out,), dtype = theano.config.floatX), name = 'b', borrow = True) self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b) self.y_pred = T.argmax(self.p_y_given_x, axis = 1) self.params = [self.W, self.b] self.input = input def negative_log_likelihood(self, y): return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]), y]) def errors(self, y): if y.ndim != self.y_pred.ndim: raise TypeError( 'y should have the same shape as self.y_pred', ('y', y.type, 'y_pred', self.y_pred.type)) if y.dtype.startswith('int'): return T.mean(T.neq(self.y_pred, y)) else: raise NotImplementedError()def shared_data(data, borrow=True): shared = theano.shared( numpy.asarray(data, dtype = theano.config.floatX), borrow = borrow) return shareddef load_data(data_path): img_path_list = file_util.read_dir(data_path) img_num = len(img_path_list) data = [] label = [] for img_path in img_path_list: img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE) data.append(img.reshape((img.shape[0] * img.shape[1])) / 255.0) label.append((int)(img_path.split('/')[-1].split('.')[0])) data = numpy.mat(data) label = numpy.array(label) # print data.shape # print label.shape idx = numpy.random.permutation(img_num) data = data[idx] label = label[idx] train_data, valid_data, test_data = data[:img_num / 3], data[img_num / 3:img_num * 2 / 3], data[img_num * 2 / 3:] train_label, valid_label, test_label = label[:img_num / 3], label[img_num / 3:img_num * 2 / 3], label[img_num * 2 / 3:] # print train_data.shape # print valid_data.shape # print test_data.shape return train_data, train_label, valid_data, valid_label, test_data, test_labeldef sgd_optimization_mnist(data_path, learning_rate = 0.13, n_epochs = 1000, batch_size = 600): train_data, train_label, valid_data, valid_label, test_data, test_label = load_data(data_path) n_train_batches = train_data.shape[0] // batch_size n_valid_batches = valid_data.shape[0] // batch_size n_test_batches = test_data.shape[0] // batch_size print('Building the model...') index = T.lscalar() x = T.matrix('x') y = T.ivector('y') classifier = LogisticRegression(input = x, n_in = 28 * 28, n_out = 10) cost = classifier.negative_log_likelihood(y) test_model = theano.function( inputs = [index], outputs = classifier.errors(y), givens = { x: shared_data(test_data)[index * batch_size: (index + 1) * batch_size], y: T.cast(shared_data(test_label), 'int32')[index * batch_size: (index + 1) * batch_size] }) validate_model = theano.function( inputs = [index], outputs = classifier.errors(y), givens = { x: shared_data(valid_data)[index * batch_size: (index + 1) * batch_size], y: T.cast(shared_data(valid_label), 'int32')[index * batch_size: (index + 1) * batch_size] }) g_W = T.grad(cost = cost, wrt = classifier.W) g_b = T.grad(cost = cost, wrt = classifier.b) updates = [(classifier.W, classifier.W - learning_rate * g_W), (classifier.b, classifier.b - learning_rate * g_b)] train_model = theano.function( inputs = [index], outputs = cost, updates = updates, givens = { x: shared_data(train_data)[index * batch_size: (index + 1) * batch_size], y: T.cast(shared_data(train_label), 'int32')[index * batch_size: (index + 1) * batch_size] }) patience = 5000 patience_increase = 2 improvement_threshold = 0.995 validation_frequency = min(n_train_batches, patience // 2) best_validation_loss = numpy.inf test_score = 0. start_time = timeit.default_timer() done_looping = False epoch = 0 while (epoch < n_epochs) and (not done_looping): epoch = epoch + 1 for minibatch_index in range(n_train_batches): minibatch_avg_cost = train_model(minibatch_index) iter_num = (epoch - 1) * n_train_batches + minibatch_index if (iter_num + 1) % validation_frequency == 0: validation_losses = [validate_model(i) for i in range(n_valid_batches)] this_validation_loss = numpy.mean(validation_losses) print( 'epoch %i, minibatch %i/%i, validation error %f %%' % (epoch, minibatch_index + 1, n_train_batches, this_validation_loss * 100.)) if this_validation_loss < best_validation_loss: if this_validation_loss < best_validation_loss * improvement_threshold: patience = max(patience, iter_num * patience_increase) best_validation_loss = this_validation_loss test_losses = [test_model(i) for i in range(n_test_batches)] test_score = numpy.mean(test_losses) print(( ' epoch %i, minibatch %i/%i, test error of best model %f %%') % (epoch, minibatch_index + 1, n_train_batches, test_score * 100.)) if patience <= iter_num: done_looping = True break end_time = timeit.default_timer() print( ('Optimization complete with best validation score of %f %%, with test performance %f %%') % (best_validation_loss * 100., test_score * 100.)) print('The code run for %d epochs, with %f epochs/sec' % (epoch, 1. * epoch / (end_time - start_time))) # print n_train_batches, n_valid_batches, n_test_batchesif __name__ == '__main__': numpy.random.seed(123456) sgd_optimization_mnist('E:/ML/KNN/mnist_data/')六. 实验结果
如有错误,请指正。