
Serializing Java objects to HDFS in Spark



Abstract: A common requirement in Spark applications is to serialize a Java object and store it on HDFS, especially a model computed with MLlib, so that the model can be reused later. The example below reads data from HBase inside a Spark job, trains a word2vec model, and stores the model on HDFS.
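Before the full example, here is a minimal sketch of the underlying pattern: write any Serializable object to HDFS through an ObjectOutputStream wrapped around the stream returned by FileSystem.create, and read it back with an ObjectInputStream. This is not the article's original code; the object name HdfsObjectStore and the path argument are placeholders, and the Hadoop Configuration is assumed to already point at the target HDFS (for example via fs.defaultFS).

// Minimal sketch: persist a Serializable object to HDFS and load it back.
// HdfsObjectStore and the path are hypothetical names for illustration.
import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HdfsObjectStore {
  def save(obj: AnyRef, conf: Configuration, path: String): Unit = {
    val fs = FileSystem.get(conf)
    // create() overwrites an existing file and returns a writable stream
    val oos = new ObjectOutputStream(fs.create(new Path(path)))
    try oos.writeObject(obj) finally oos.close()
  }

  def load[T](conf: Configuration, path: String): T = {
    val fs = FileSystem.get(conf)
    val ois = new ObjectInputStream(fs.open(new Path(path)))
    try ois.readObject().asInstanceOf[T] finally ois.close()
  }
}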

Without further ado, here is the complete example. Environment: Spark 1.4 + HBase 0.98.

import org.apache.spark.storage.StorageLevel
import scala.collection.JavaConverters._
import java.io.File
import java.io.FileInputStream
import java.io.FileOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.net.URI
import java.util.Date
import org.ansj.library.UserDefineLibrary
import org.ansj.splitWord.analysis.NlpAnalysis
import org.ansj.splitWord.analysis.ToAnalysis
import org.apache.hadoop.fs.FSDataInputStream
import org.apache.hadoop.fs.FSDataOutputStream
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.filter.FilterList
import org.apache.hadoop.hbase.filter.PageFilter
import org.apache.hadoop.hbase.filter.RegexStringComparator
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import com.feheadline.fespark.db.Neo4jManager
import com.feheadline.fespark.util.Env
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
import scala.math.log
import scala.io.Source

object Word2VecDemo {

  // Serialize an HBase Scan so it can be passed to TableInputFormat via the configuration
  def convertScanToString(scan: Scan) = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Word2Vec Demo")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.set("spark.kryoserializer.buffer", "256m")
    sparkConf.set("spark.kryoserializer.buffer.max", "2046m")
    sparkConf.set("spark.akka.frameSize", "500")
    sparkConf.set("spark.rpc.askTimeout", "30")

    val sc = new SparkContext(sparkConf)

    // HBase connection and source table
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "myzookeeper")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "crawled")

    // Keep only rows whose data:article column has at least 1500 characters, capped at 100 rows
    val scan = new Scan()
    val filterList: FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL)
    val comp: RegexStringComparator = new RegexStringComparator(""".{1500,}""")
    val articleFilter: SingleColumnValueFilter = new SingleColumnValueFilter(
      "data".getBytes,
      "article".getBytes,
      CompareOp.EQUAL,
      comp
    )
    filterList.addFilter(articleFilter)
    filterList.addFilter(new PageFilter(100))

    scan.setFilter(filterList)
    scan.setCaching(50)
    scan.setCacheBlocks(false)
    hbaseConf.set(TableInputFormat.SCAN, convertScanToString(scan))

    val crawledRDD = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result]
    )

    val articlesRDD = crawledRDD.filter {
      case (_, result) =>
        val content = Bytes.toString(result.getValue("data".getBytes, "article".getBytes))
        content != null
    }

    // Segment each article into words with the ansj analyzer
    val wordsInDoc = articlesRDD.map {
      case (_, result) =>
        val content = Bytes.toString(result.getValue("data".getBytes, "article".getBytes))
        if (content != null) ToAnalysis.parse(content).asScala.map(_.getName).toSeq
        else Seq("")
    }

    val filteredWordsInDoc = wordsInDoc.filter(_.nonEmpty)

    // Train the word2vec model
    val word2vec = new Word2Vec()
    val model = word2vec.fit(filteredWordsInDoc)

    // --------------------------------- The key part ---------------------------------
    // Serialize the model above and store it on HDFS
    val hadoopConf = sc.hadoopConfiguration
    hadoopConf.set("fs.defaultFS", "hdfs://myhadoop:9000/")
    val fileSystem = FileSystem.get(hadoopConf)
    val path = new Path("/user/hadoop/data/mllib/word2vec-object")
    // fileSystem.create already returns a writable stream; the extra FSDataOutputStream
    // wrapper is kept from the original code
    val oos = new ObjectOutputStream(new FSDataOutputStream(fileSystem.create(path)))
    oos.writeObject(model)
    oos.close

    // Another program can deserialize the object straight from HDFS and use the model
    val ois = new ObjectInputStream(new FSDataInputStream(fileSystem.open(path)))
    val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
    ois.close

    /*
     * You can also copy the serialized file from HDFS to the local filesystem
     * and load the model in a plain Scala program:
     *
     * import java.io._
     * import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
     * val ois = new ObjectInputStream(new FileInputStream("/home/cherokee/tmp/word2vec-object"))
     * val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
     * ois.close
     */
    // ---------------------------------------------------------------------------------
  }
}
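Once deserialized, sample_model behaves like a freshly trained Word2VecModel. As a hypothetical usage sketch (the query word is made up for illustration and must exist in the training vocabulary, otherwise findSynonyms throws an exception):

// Hypothetical usage of the deserialized model; "经济" is an example query word
val synonyms = sample_model.findSynonyms("经济", 10)
synonyms.foreach { case (word, similarity) => println(s"$word : $similarity") }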

Thanks for reading; I hope this helps.

