I use Keras quite a lot, but sometimes it is not very convenient to work with. Since Keras runs on top of Theano or TensorFlow, I decided to use the holiday break to take a look at Theano first.
Classifying MNIST digits using Logistic Regression. Official tutorial: http://deeplearning.net/tutorial/logreg.html
1. Task Description
Recognize handwritten digits with logistic regression, implemented in Theano.
2. Problem Formulation
Logistic regression is a linear classifier whose parameters are a weight matrix $W$ and a bias vector $b$. The probability that an input $x$ belongs to class $i$ is

$$P(Y=i \mid x, W, b) = \mathrm{softmax}_i(Wx + b) = \frac{e^{W_i x + b_i}}{\sum_j e^{W_j x + b_j}}$$

The predicted class is the most probable one:

$$y_{pred} = \underset{i}{\operatorname{argmax}}\, P(Y=i \mid x, W, b)$$

For a dataset $\mathcal{D}$, the log-likelihood $\mathcal{L}$ and the loss $\ell$ are

$$\mathcal{L}(\theta=\{W,b\}, \mathcal{D}) = \sum_{i=0}^{|\mathcal{D}|} \log P\bigl(Y=y^{(i)} \mid x^{(i)}, W, b\bigr)$$

$$\ell(\theta=\{W,b\}, \mathcal{D}) = -\mathcal{L}(\theta=\{W,b\}, \mathcal{D})$$
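To make the formulas concrete, here is a minimal NumPy sketch (not from the tutorial; the toy shapes, the fake input, and the helper name softmax are mine):

import numpy

def softmax(z):
    e = numpy.exp(z - z.max())   # subtract the max for numerical stability
    return e / e.sum()

numpy.random.seed(0)
W = numpy.random.randn(784, 10) * 0.01   # toy weights, shaped like the model below
b = numpy.zeros(10)
x = numpy.random.rand(784)               # a fake "image" in [0, 1]

p = softmax(numpy.dot(x, W) + b)   # P(Y=i | x, W, b) for i = 0..9
y_pred = p.argmax()                # predicted class
loss = -numpy.log(p[3])            # per-sample loss if the true label were 3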
3. Dataset
This experiment uses the same dataset as an earlier post; see http://blog.csdn.net/whai362/article/details/51813404 for details. Each digit is a 28×28 grayscale image, which is flattened into a 784-dimensional vector and normalized to form the input $x$; the exact preprocessing is in the code below.
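For illustration, the per-image preprocessing boils down to the following sketch ('digit.png' is a made-up file name; the cv2.IMREAD_GRAYSCALE flag matches the one used in the full code):

import cv2

img = cv2.imread('digit.png', cv2.IMREAD_GRAYSCALE)  # 28x28 array, values 0..255
x = img.reshape(28 * 28) / 255.0                     # 784-dim vector, scaled to [0, 1]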
4. Dependencies
Theano 0.8.2
OpenCV 3.0
Python 2.7

5. Code
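Before the full listing, here is a minimal sketch of the Theano pattern everything below builds on: parameters live in shared variables, T.grad differentiates the symbolic cost, and theano.function compiles a step that applies the updates. The toy quadratic cost is made up for illustration; the real code additionally uses givens to splice minibatches of shared data into the graph.

import numpy
import theano
import theano.tensor as T

x = T.vector('x')
w = theano.shared(numpy.zeros(3, dtype = theano.config.floatX), name = 'w')
cost = T.sum((w - x) ** 2)            # toy quadratic cost
g_w = T.grad(cost = cost, wrt = w)    # symbolic gradient of the cost w.r.t. w
train = theano.function(
    inputs = [x],
    outputs = cost,
    updates = [(w, w - 0.1 * g_w)])   # one gradient descent step per call
for i in range(100):
    train(numpy.ones(3, dtype = theano.config.floatX))
print(w.get_value())                  # approaches [1, 1, 1]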
file_util.py:

import os

def read_dir(root, suffix = 'null'):
    # collect the sorted paths of all files under root, optionally
    # keeping only files whose name contains the given suffix
    file_path_list = []
    for file_path, dirs, files in os.walk(root):
        for file in files:
            if suffix != 'null' and not (file.find(suffix) >= 0 and file.find(suffix) < len(file)):
                continue
            file_path_list.append(os.path.join(file_path, file))
    file_path_list.sort()
    return file_path_list

def read_file(file_path):
    file_object = open(file_path, 'r')
    file_content = file_object.read()
    file_object.close()
    return file_content.decode('utf-8', 'ignore')

def write_file(file_path, file_content):
    file_object = open(file_path, 'w')
    file_object.write(file_content)
    file_object.close()

theano_logistic.py:

import theano
import theano.tensor as T
import cv2
import numpy
import timeit

import file_util

class LogisticRegression(object):
    def __init__(self, input, n_in, n_out):
        # model parameters: weight matrix W (n_in x n_out) and bias vector b
        self.W = theano.shared(
            value = numpy.zeros((n_in, n_out), dtype = theano.config.floatX),
            name = 'W',
            borrow = True)
        self.b = theano.shared(
            value = numpy.zeros((n_out,), dtype = theano.config.floatX),
            name = 'b',
            borrow = True)
        # P(Y=i | x, W, b) and the resulting class prediction
        self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b)
        self.y_pred = T.argmax(self.p_y_given_x, axis = 1)
        self.params = [self.W, self.b]
        self.input = input

    def negative_log_likelihood(self, y):
        # mean (not sum) of the per-example negative log-likelihood
        return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]), y])

    def errors(self, y):
        # fraction of misclassified examples in the batch
        if y.ndim != self.y_pred.ndim:
            raise TypeError(
                'y should have the same shape as self.y_pred',
                ('y', y.type, 'y_pred', self.y_pred.type))
        if y.dtype.startswith('int'):
            return T.mean(T.neq(self.y_pred, y))
        else:
            raise NotImplementedError()

def shared_data(data, borrow = True):
    shared = theano.shared(
        numpy.asarray(data, dtype = theano.config.floatX),
        borrow = borrow)
    return shared

def load_data(data_path):
    img_path_list = file_util.read_dir(data_path)
    img_num = len(img_path_list)
    data = []
    label = []
    for img_path in img_path_list:
        img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
        # flatten the 28x28 image into a 784-dim vector and scale to [0, 1]
        data.append(img.reshape((img.shape[0] * img.shape[1])) / 255.0)
        # the label is encoded in the file name
        label.append(int(img_path.split('/')[-1].split('.')[0]))
    data = numpy.mat(data)
    label = numpy.array(label)
    # shuffle, then split 1/3 : 1/3 : 1/3 into train/validation/test sets
    idx = numpy.random.permutation(img_num)
    data = data[idx]
    label = label[idx]
    train_data, valid_data, test_data = \
        data[: img_num // 3], data[img_num // 3 : img_num * 2 // 3], data[img_num * 2 // 3 :]
    train_label, valid_label, test_label = \
        label[: img_num // 3], label[img_num // 3 : img_num * 2 // 3], label[img_num * 2 // 3 :]
    return train_data, train_label, valid_data, valid_label, test_data, test_label

def sgd_optimization_mnist(data_path, learning_rate = 0.13, n_epochs = 1000, batch_size = 600):
    train_data, train_label, valid_data, valid_label, test_data, test_label = load_data(data_path)
    n_train_batches = train_data.shape[0] // batch_size
    n_valid_batches = valid_data.shape[0] // batch_size
    n_test_batches = test_data.shape[0] // batch_size

    print('Building the model...')
    index = T.lscalar()
    x = T.matrix('x')
    y = T.ivector('y')
    classifier = LogisticRegression(input = x, n_in = 28 * 28, n_out = 10)
    cost = classifier.negative_log_likelihood(y)
    # compiled functions: givens splices the minibatch selected by index
    # out of the shared datasets
    test_model = theano.function(
        inputs = [index],
        outputs = classifier.errors(y),
        givens = {
            x: shared_data(test_data)[index * batch_size: (index + 1) * batch_size],
            y: T.cast(shared_data(test_label), 'int32')[index * batch_size: (index + 1) * batch_size]
        })
    validate_model = theano.function(
        inputs = [index],
        outputs = classifier.errors(y),
        givens = {
            x: shared_data(valid_data)[index * batch_size: (index + 1) * batch_size],
            y: T.cast(shared_data(valid_label), 'int32')[index * batch_size: (index + 1) * batch_size]
        })

    # gradients of the cost w.r.t. the parameters, and the SGD update rule
    g_W = T.grad(cost = cost, wrt = classifier.W)
    g_b = T.grad(cost = cost, wrt = classifier.b)
    updates = [(classifier.W, classifier.W - learning_rate * g_W),
               (classifier.b, classifier.b - learning_rate * g_b)]
    train_model = theano.function(
        inputs = [index],
        outputs = cost,
        updates = updates,
        givens = {
            x: shared_data(train_data)[index * batch_size: (index + 1) * batch_size],
            y: T.cast(shared_data(train_label), 'int32')[index * batch_size: (index + 1) * batch_size]
        })

    # early-stopping parameters
    patience = 5000
    patience_increase = 2
    improvement_threshold = 0.995
    validation_frequency = min(n_train_batches, patience // 2)

    best_validation_loss = numpy.inf
    test_score = 0.
    start_time = timeit.default_timer()
    done_looping = False
    epoch = 0
    while (epoch < n_epochs) and (not done_looping):
        epoch = epoch + 1
        for minibatch_index in range(n_train_batches):
            minibatch_avg_cost = train_model(minibatch_index)
            iter_num = (epoch - 1) * n_train_batches + minibatch_index
            if (iter_num + 1) % validation_frequency == 0:
                validation_losses = [validate_model(i) for i in range(n_valid_batches)]
                this_validation_loss = numpy.mean(validation_losses)
                print(
                    'epoch %i, minibatch %i/%i, validation error %f %%' %
                    (epoch, minibatch_index + 1, n_train_batches, this_validation_loss * 100.))
                if this_validation_loss < best_validation_loss:
                    # significant improvement: allow training to run longer
                    if this_validation_loss < best_validation_loss * improvement_threshold:
                        patience = max(patience, iter_num * patience_increase)
                    best_validation_loss = this_validation_loss
                    test_losses = [test_model(i) for i in range(n_test_batches)]
                    test_score = numpy.mean(test_losses)
                    print(
                        '    epoch %i, minibatch %i/%i, test error of best model %f %%' %
                        (epoch, minibatch_index + 1, n_train_batches, test_score * 100.))
            if patience <= iter_num:
                done_looping = True
                break
    end_time = timeit.default_timer()
    print(
        'Optimization complete with best validation score of %f %%, with test performance %f %%' %
        (best_validation_loss * 100., test_score * 100.))
    print('The code ran for %d epochs, with %f epochs/sec' %
        (epoch, 1. * epoch / (end_time - start_time)))

if __name__ == '__main__':
    numpy.random.seed(123456)
    sgd_optimization_mnist('E:/ML/KNN/mnist_data/')

6. Results

If you spot any mistakes, please point them out.