func (this *Mongodb) StoreRecords(dbName, collection, storeDbName, tmpCollection string, queryMap, selecter bson.M) (err error) { session := this.GetSession() defer session.Close() iter := session.DB(dbName).C(collection).Find(queryMap).Select(selecter).Iter() oneMap := make(bson.M) historyMapSlice := make([]interface{}, 0, MAX_BULK_WRITE_FILE_LEN) flag := false for iter.Next(&oneMap) { tmpMap := DeepCopy(oneMap)//go深度拷贝 historyMapSlice = append(historyMapSlice, tmpMap.(bson.M)) if len(historyMapSlice) >= MAX_BULK_WRITE_FILE_LEN {//写入最大长度 this.BulkInsert(storeDbName, tmpCollection, historyMapSlice) historyMapSlice = make([]interface{}, 0, MAX_BULK_WRITE_FILE_LEN) flag = true } } if err = iter.Close(); err != nil { log.Error("StoreRecords,iter.Close() fail, db:", dbName, ", Collection:", collection, ", queryCondition:", queryMap, ", selector:", selecter, ", ErrInfo:", err) } if 0 == len(historyMapSlice) && !flag { err = errors.New("not found StoreRecords data") return } this.BulkInsert(storeDbName, tmpCollection, historyMapSlice) return}func (this *Mongodb) BulkInsert(dataBase, collection string, allRecords []interface{}) (err error) { if 0 == len(allRecords) { log.Info("bulkInsert: 0 == len(allRecords)! db:", dataBase, "collection:", collection) return } bulkInsert := func(c *mgo.Collection) error { bulk := c.Bulk() bulk.Unordered() bulk.Insert(allRecords...) _, err := bulk.Run() return err } err = this.WitchCollection(dataBase, collection, bulkInsert) if err != nil { log.Error("mongodb BulkInsert, BulkInsert Failed, db:", dataBase, ", Collection:", collection, ", ErrInfo:", err) } return}
新闻热点
疑难解答