1. If you just want the rows formatted as JSON, use this directly:
val rdd = df.toJSON.rdd
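For instance, a minimal sketch of step 1 (the local SparkSession and the Person case class are illustrative assumptions):

import org.apache.spark.sql.SparkSession

case class Person(name: String, age: Int)   // hypothetical sample type

val spark = SparkSession.builder().appName("df-to-json-rdd").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(Person("Alice", 30), Person("Bob", 25)).toDF()
val jsonRdd = df.toJSON.rdd                 // RDD[String], one JSON object per row
jsonRdd.collect().foreach(println)          // e.g. {"name":"Alice","age":30}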
2. If you need a specific output format, define custom functions as follows:
// Imports needed for StructField, StructType, Row and DataFrame
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types._

// Format a single field entry as "name":value (string values are quoted).
// formatItem and formatStruct reference each other, so paste them together
// (e.g. with :paste in spark-shell).
def formatItem(p: (StructField, Any)): String = {
  p match {
    case (sf, a) => sf.dataType match {
      case StringType    => "\"" + sf.name + "\":\"" + a + "\""
      case IntegerType   => "\"" + sf.name + "\":" + a
      case LongType      => "\"" + sf.name + "\":" + a
      case StructType(s) => "\"" + sf.name + "\":" + formatStruct(s, a.asInstanceOf[Row])
    }
  }
}

// Format a whole Row as a JSON-style object
def formatStruct(schema: Seq[StructField], r: Row): String = {
  val paired = schema.zip(r.toSeq)
  "{" + paired.foldLeft("")((s, p) => (if (s == "") "" else s + ", ") + formatItem(p)) + "}"
}

// Format every row of a DataFrame, yielding an RDD[String]
def formatDataFrame(st: StructType, srdd: DataFrame) = {
  srdd.rdd.map(formatStruct(st.fields, _))
}
Usage example:
val strings = formatDataFrame(df.schema, df)
strings.foreach { println }
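For context, a runnable sketch under illustrative assumptions (the schema, sample rows, and local SparkSession below are hypothetical), exercising the String, Integer, and nested Struct branches handled above:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("custom-format").master("local[*]").getOrCreate()

// Hypothetical nested schema: string and integer columns plus a struct column
val schema = StructType(Seq(
  StructField("name", StringType),
  StructField("age", IntegerType),
  StructField("address", StructType(Seq(
    StructField("city", StringType),
    StructField("zip", StringType))))))

val rows = Seq(
  Row("Alice", 30, Row("Beijing", "100000")),
  Row("Bob", 25, Row("Shanghai", "200000")))

val people = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)

val formatted = formatDataFrame(people.schema, people)
formatted.collect().foreach(println)
// e.g. {"name":"Alice", "age":30, "address":{"city":"Beijing", "zip":"100000"}}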
Summary of conversions between RDD, DataFrame and Dataset (a combined sketch follows below):
1. RDD -> Dataset: val ds = rdd.toDS()
2. RDD -> DataFrame: val df = spark.read.json(rdd)
3. Dataset -> RDD: val rdd = ds.rdd
4. Dataset -> DataFrame: val df = ds.toDF()
5. DataFrame -> RDD: val rdd = df.toJSON.rdd
6. DataFrame -> Dataset: val ds = df.toJSON
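A combined sketch of these conversions, again under illustrative assumptions (local SparkSession, hypothetical Person case class); for item 2, the Dataset[String] variant of spark.read.json is used here, fed by the JSON strings produced in items 5 and 6:

import org.apache.spark.sql.SparkSession

case class Person(name: String, age: Long)       // hypothetical sample type

val spark = SparkSession.builder().appName("conversions").master("local[*]").getOrCreate()
import spark.implicits._                         // encoders for toDS()/toDF()

val rdd = spark.sparkContext.parallelize(Seq(Person("Alice", 30L), Person("Bob", 25L)))

val ds      = rdd.toDS()                // 1. RDD -> Dataset[Person]
val dsRdd   = ds.rdd                    // 3. Dataset -> RDD[Person]
val df      = ds.toDF()                 // 4. Dataset -> DataFrame
val jsonRdd = df.toJSON.rdd             // 5. DataFrame -> RDD[String] of JSON
val jsonDs  = df.toJSON                 // 6. DataFrame -> Dataset[String]
val df2     = spark.read.json(jsonDs)   // 2. JSON strings back to a DataFrame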
Reprinted from http://www.CUOXin.com/ciade/