首页 > 学院 > 开发设计 > 正文

HBase 5种写入数据方式

2019-11-09 13:32:18
字体:
来源:转载
供稿:网友
问题导读:1.如何直接使用HTable进行导入?2.如何从HDFS文件导入HBase,继承自Mapper?3.如何读取HBase表写入HBase表中字段?4.如何让MR和HTable结合?Version :hadoop1.2.1; hbaes0.94.16;HBase写入数据方式(参考:《HBase The Definitive Guide》),可以简单分为下面几种:1. 直接使用HTable进行导入,代码如下:package hbase.curd;import java.io.IOException;import java.util.ArrayList;import java.util.List;import java.util.Random;import org.apache.hadoop.hbase.client.HTable;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.util.Bytes;public class PutExample {        /**         * @param args         * @throws IOException          */        PRivate HTable  table = HTableUtil.getHTable("testtable");        public static void main(String[] args) throws IOException {                // TODO Auto-generated method stub                PutExample pe = new PutExample();                pe.putRows();                        }                public void putRows(){                List<Put> puts = new ArrayList<Put>();                for(int i=0;i<10;i++){                        Put put = new Put(Bytes.toBytes("row_"+i));                        Random random = new Random();                                                if(random.nextBoolean()){                                put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("colfam1_qual1_value_"+i));                        }                        if(random.nextBoolean()){                                put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("colfam1_qual1_value_"+i));                        }                        if(random.nextBoolean()){                                put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual3"), Bytes.toBytes("colfam1_qual1_value_"+i));                        }                        if(random.nextBoolean()){                                put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual4"), Bytes.toBytes("colfam1_qual1_value_"+i));                                }                        if(random.nextBoolean()){                                put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual5"), Bytes.toBytes("colfam1_qual1_value_"+i));                        }                        puts.add(put);                }                try{                        table.put(puts);                        table.close();                }catch(Exception e){                        e.printStackTrace();                        return ;                }                System.out.println("done put rows");        }}其中HTableUtil如下:package hbase.curd;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.client.HTable;import org.apache.hadoop.hbase.util.Bytes;public class HTableUtil {        private static HTable table;        private static Configuration conf;                static{                conf =HBaseConfiguration.create();                conf.set("mapred.job.tracker", "hbase:9001");                conf.set("fs.default.name", "hbase:9000");                conf.set("hbase.zookeeper.quorum", "hbase");                try {                        table = new HTable(conf,"testtable");                } catch (IOException e) {                        // TODO Auto-generated catch block                        e.printStackTrace();                }        }        public static Configuration getConf(){                return conf;        }        public static HTable getHTable(String tablename){                if(table==null){                        try {                                table= new HTable(conf,tablename);                        } catch (IOException e) {                                // TODO Auto-generated catch block                                e.printStackTrace();                        }                 }                return table;        }                public static  byte[] gB(String name){                return Bytes.toBytes(name);        }}这一种是没有使用MR的,下面介绍的几种方式都是使用MR的。2.1 从HDFS文件导入HBase,继承自Mapper,代码如下:package hbase.mr;import java.io.IOException;import hbase.curd.HTableUtil;import org.apache.commons.cli.CommandLine;import org.apache.commons.cli.CommandLineParser;import org.apache.commons.cli.HelpFormatter;import org.apache.commons.cli.Option;import org.apache.commons.cli.Options;import org.apache.commons.cli.PosixParser;import org.apache.commons.codec.digest.DigestUtils;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.KeyValue;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;import org.apache.hadoop.hbase.util.Bytes;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.Writable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.util.GenericOptionsParser;public class ImportFromFile {        /**         * 从文件导入到HBase         * @param args         */        public static final String NAME="ImportFromFile";        public enum Counters{LINES}                static class ImportMapper extends Mapper<LongWritable,Text,                ImmutableBytesWritable,Writable>{                private byte[] family =null;                private byte[] qualifier = null;                @Override                protected void setup(Context cxt){                        String column = cxt.getConfiguration().get("conf.column");                        byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));                        family = colkey[0];                        if(colkey.length>1){                                qualifier = colkey[1];                        }                }                @Override                public void map(LongWritable offset,Text line,Context cxt){                        try{                                String lineString= line.toString();                                byte[] rowkey= DigestUtils.md5(lineString);                                Put put = new Put(rowkey);                                put.add(family,qualifier,Bytes.toBytes(lineString));                                cxt.write(new ImmutableBytesWritable(rowkey), put);                                cxt.getCounter(Counters.LINES).increment(1);                        }catch(Exception e){                                e.printStackTrace();                        }                }        }        private static CommandLine parseArgs(String[] args){                Options options = new Options();                Option o = new Option("t" ,"table",true,"table to import into (must exist)");                o.setArgName("table-name");                o.setRequired(true);                options.addOption(o);                                o= new Option("c","column",true,"column to store row data into");                o.setArgName("family:qualifier");                o.setRequired(true);                options.addOption(o);                                o = new Option("i", "input", true,                "the directory or file to read from");                o.setArgName("path-in-HDFS");                o.setRequired(true);                options.addOption(o);                options.addOption("d", "debug", false, "switch on DEBUG log level");                CommandLineParser parser = new PosixParser();                CommandLine cmd = null;                try {                        cmd = parser.parse(options, args);                } catch (Exception e) {                        System.err.println("ERROR: " + e.getMessage() + "/n");                        HelpFormatter formatter = new HelpFormatter();                        formatter.printHelp(NAME + " ", options, true);                        System.exit(-1);                }                return cmd;        }        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {                                Configuration conf = HTableUtil.getConf();                String[] otherArgs = new GenericOptionsParser(conf, initialArg()).getRemainingArgs();                 CommandLine cmd = parseArgs(otherArgs);                String table = cmd.getOptionValue("t");                String input = cmd.getOptionValue("i");                String column = cmd.getOptionValue("c");                conf.set("conf.column", column);                Job job = new Job(conf, "Import from file " + input + " into table " + table);                job.setJarByClass(ImportFromFile.class);                job.setMapperClass(ImportMapper.class);                job.setOutputFormatClass(TableOutputFormat.class);                job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);                job.setOutputKeyClass(ImmutableBytesWritable.class);                job.setOutputValueClass(Writable.class);                job.setNumReduceTasks(0);                 FileInputFormat.addInputPath(job, new Path(input));                System.exit(job.waitForCompletion(true) ? 0 : 1);        }                private static String[] initialArg(){                String []args = new String[6];                args[0]="-c";                args[1]="fam:data";                args[2]="-i";                args[3]="/user/hadoop/input/picdata";                args[4]="-t";                args[5]="testtable";                return args;        }}2.2 读取HBase表写入HBase表中字段,代码如下:package hbase.mr;import hadoop.util.HadoopUtils;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.KeyValue;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.client.Scan;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.hbase.mapreduce.TableMapper;import org.apache.hadoop.hbase.util.Bytes;import org.apache.hadoop.io.Writable;import org.apache.hadoop.mapreduce.Job;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class ParseDriver {        /**         * 把hbase表中数据拷贝到其他表(或本表)相同字段         * @param args         */        enum Counters{                VALID, ROWS, COLS, ERROR        }        private static Logger log = LoggerFactory.getLogger(ParseDriver.class);        static class ParseMapper extends TableMapper<ImmutableBytesWritable,Writable>{                private byte[] columnFamily =null ;                private byte[] columnQualifier =null;                @Override                protected void setup(Context cxt){                        columnFamily = Bytes.toBytes(cxt.getConfiguration().get("conf.columnfamily"));                        columnQualifier = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier"));                }                @Override                 public void map(ImmutableBytesWritable row,Result columns,Context cxt){                        cxt.getCounter(Counters.ROWS).increment(1);                        String value =null;                        try{                                Put put = new Put(row.get());                                for(KeyValue kv : columns.list()){                                        cxt.getCounter(Counters.COLS).increment(1);                                        value= Bytes.toStringBinary(kv.getValue());                                        if(equals(columnQualifier,kv.getQualifier())){  // 过滤column                                                put.add(columnFamily,columnQualifier,kv.getValue());                                                cxt.write(row, put);                                                cxt.getCounter(Counters.VALID).increment(1);                                        }                                }                        }catch(Exception e){                                log.info("Error:"+e.getMessage()+",Row:"+Bytes.toStringBinary(row.get())+                                                ",Value:"+value);                                cxt.getCounter(Counters.ERROR).increment(1);                        }                }                private boolean equals(byte[] a,byte[] b){                        String aStr= Bytes.toString(a);                        String bStr= Bytes.toString(b);                        if(aStr.equals(bStr)){                                return true;                        }                        return false;                }        }                public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {                byte[] columnFamily = Bytes.toBytes("fam");                byte[] columnQualifier = Bytes.toBytes("data");                Scan scan = new Scan ();                scan.addColumn(columnFamily, columnQualifier);                HadoopUtils.initialConf("hbase");                Configuration conf = HadoopUtils.getConf();                conf.set("conf.columnfamily", Bytes.toStringBinary(columnFamily));                conf.set("conf.columnqualifier", Bytes.toStringBinary(columnQualifier));                                String input ="testtable" ;//                String output="testtable1"; //                                                 Job job = new Job(conf,"Parse data in "+input+",write to"+output);                job.setJarByClass(ParseDriver.class);                TableMapReduceUtil.initTableMapperJob(input, scan, ParseMapper.class,                                 ImmutableBytesWritable.class, Put.class,job);                TableMapReduceUtil.initTableReducerJob(output, IdentityTableReducer.class, job);                                System.exit(job.waitForCompletion(true)?0:1);                        }}其中HadoopUtils代码如下:package hadoop.util;import java.io.IOException;import java.net.URI;import java.util.ArrayList;import java.util.List;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.util.LineReader;public class HadoopUtils {        private static Configuration conf;        public  static void initialConf(){                conf = new Configuration();                conf.set("mapred.job.tracker", "hbase:9001");                conf.set("fs.default.name", "hbase:9000");                conf.set("hbase.zookeeper.quorum", "hbase");        }        public  static void initialConf(String host){                conf = new Configuration();                conf.set("mapred.job.tracker", host+":9001");                conf.set("fs.default.name", host+":9000");                conf.set("hbase.zookeeper.quorum", host);        }        public static Configuration getConf(){                if(conf==null){                        initialConf();                }                return conf;        }                public static List<String> readFromHDFS(String fileName) throws IOException {                Configuration conf = getConf();                FileSystem fs = FileSystem.get(URI.create(fileName), conf);                FSDataInputStream hdfsInStream = fs.open(new Path(fileName));                // 按行读取(新版本的方法)                LineReader inLine = new LineReader(hdfsInStream, conf);                Text txtLine = new Text();                                int iResult = inLine.readLine(txtLine); //读取第一行                List<String> list = new ArrayList<String>();                while (iResult > 0 ) {                        list.add(txtLine.toString());                        iResult = inLine.readLine(txtLine);                }                                hdfsInStream.close();                fs.close();                return list;        }}2.3 MR和HTable结合,代码如下:package hbase.mr;import hadoop.util.HadoopUtils;import hbase.mr.AnalyzeDriver.Counters;import java.io.IOException;import java.util.Date;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.KeyValue;import org.apache.hadoop.hbase.client.HTable;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.client.Scan;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.hbase.mapreduce.TableMapper;import org.apache.hadoop.hbase.util.Bytes;import org.apache.hadoop.io.Writable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class ParseSinglePutDriver {        /**         * 使用HTable进行写入         * 把infoTable 表中的 qualifier字段复制到qualifier1字段         * 单个Put         * @param args         */        private static Logger log = LoggerFactory.getLogger(ParseMapper.class);        static class ParseMapper extends TableMapper<ImmutableBytesWritable,Writable>{                private HTable infoTable =null ;                private byte[] columnFamily =null ;                private byte[] columnQualifier =null;                private byte[] columnQualifier1 =null;                @Override                 protected void setup(Context cxt){                        log.info("ParseSinglePutDriver setup,current time: "+new Date());                        try {                                infoTable = new HTable(cxt.getConfiguration(),                                                cxt.getConfiguration().get("conf.infotable"));                                infoTable.setAutoFlush(false);                        } catch (IOException e) {                                log.error("Initial infoTable error:/n"+e.getMessage());                        }                        columnFamily = Bytes.toBytes(cxt.getConfiguration().get("conf.columnfamily"));                        columnQualifier = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier"));                        columnQualifier1 = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier1"));                }                @Override                 protected void cleanup(Context cxt){                        try {                                infoTable.flushCommits();                                log.info("ParseSinglePutDriver cleanup ,current time :"+new Date());                        } catch (IOException e) {                                log.error("infoTable flush commits error:/n"+e.getMessage());                        }                }                @Override                 public void map(ImmutableBytesWritable row,Result columns,Context cxt){                        cxt.getCounter(Counters.ROWS).increment(1);                        String value =null ;                        try{                                Put put = new Put(row.get());                                for(KeyValue kv : columns.list()){                                        cxt.getCounter(Counters.COLS).increment(1);                                        value= Bytes.toStringBinary(kv.getValue());                                        if(equals(columnQualifier,kv.getQualifier())){  // 过滤column                                                put.add(columnFamily,columnQualifier1,kv.getValue());                                                infoTable.put(put);                                        }                                }                        }catch(Exception e){                                log.info("Error:"+e.getMessage()+",Row:"+Bytes.toStringBinary(row.get())+                                                ",Value:"+value);                                cxt.getCounter(Counters.ERROR).increment(1);                        }                }                private boolean equals(byte[] a,byte[] b){                        String aStr= Bytes.toString(a);                        String bStr= Bytes.toString(b);                        if(aStr.equals(bStr)){                                return true;                        }                        return false;                }        }        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {                String input ="testtable";                byte[] columnFamily = Bytes.toBytes("fam");                byte[] columnQualifier = Bytes.toBytes("data");                byte[] columnQualifier1 = Bytes.toBytes("data1");                Scan scan = new Scan ();                scan.addColumn(columnFamily, columnQualifier);                HadoopUtils.initialConf("hbase");                Configuration conf = HadoopUtils.getConf();                conf.set("conf.columnfamily", Bytes.toStringBinary(columnFamily));                conf.set("conf.columnqualifier", Bytes.toStringBinary(columnQualifier));                conf.set("conf.columnqualifier1", Bytes.toStringBinary(columnQualifier1));                conf.set("conf.infotable", input);                                Job job = new Job(conf,"Parse data in "+input+",into tables");                job.setJarByClass(ParseSinglePutDriver.class);                TableMapReduceUtil.initTableMapperJob(input, scan, ParseMapper.class,                                 ImmutableBytesWritable.class, Put.class,job);                        job.setOutputFormatClass(NullOutputFormat.class);                job.setNumReduceTasks(0);                System.exit(job.waitForCompletion(true)?0:1);        }}2.4 上面2.3中的HTable其实也是可以put一个List的,下面的方式就是put一个list的方式,这样效率会高。package hbase.mr;import hadoop.util.HadoopUtils;import hbase.mr.AnalyzeDriver.Counters;import java.io.IOException;import java.util.ArrayList;import java.util.Date;import java.util.List;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.KeyValue;import org.apache.hadoop.hbase.client.HTable;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.client.Scan;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.hbase.mapreduce.TableMapper;import org.apache.hadoop.hbase.util.Bytes;import org.apache.hadoop.io.Writable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class ParseListPutDriver {        /**         * 使用HTable进行写入         * List <Put> 进行测试,查看效率         * 把infoTable 表中的 qualifier字段复制到qualifier1字段         * @param args         */        private static Logger log = LoggerFactory.getLogger(ParseMapper.class);        static class ParseMapper extends TableMapper<ImmutableBytesWritable,Writable>{                private HTable infoTable =null ;                private byte[] columnFamily =null ;                private byte[] columnQualifier =null;                private byte[] columnQualifier1 =null;                private List<Put> list = new ArrayList<Put>();                @Override                 protected void setup(Context cxt){                        log.info("ParseListPutDriver setup,current time: "+new Date());                        try {                                infoTable = new HTable(cxt.getConfiguration(),                                                cxt.getConfiguration().get("conf.infotable"));                                infoTable.setAutoFlush(false);                        } catch (IOException e) {                                log.error("Initial infoTable error:/n"+e.getMessage());                        }                        columnFamily = Bytes.toBytes(cxt.getConfiguration().get("conf.columnfamily"));                        columnQualifier = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier"));                        columnQualifier1 = Bytes.toBytes(cxt.getConfiguration().get("conf.columnqualifier1"));                }                @Override                 protected void cleanup(Context cxt){                        try {                                infoTable.put(list);                                infoTable.flushCommits();                                log.info("ParseListPutDriver cleanup ,current time :"+new Date());                        } catch (IOException e) {                                log.error("infoTable flush commits error:/n"+e.getMessage());                        }                }                @Override                 public void map(ImmutableBytesWritable row,Result columns,Context cxt){                        cxt.getCounter(Counters.ROWS).increment(1);                        String value =null ;                        try{                                Put put = new Put(row.get());                                for(KeyValue kv : columns.list()){                                        cxt.getCounter(Counters.COLS).increment(1);                                        value= Bytes.toStringBinary(kv.getValue());                                        if(equals(columnQualifier,kv.getQualifier())){  // 过滤column                                                put.add(columnFamily,columnQualifier1,kv.getValue());                                                list.add(put);                                        }                                }                        }catch(Exception e){                                log.info("Error:"+e.getMessage()+",Row:"+Bytes.toStringBinary(row.get())+                                                ",Value:"+value);                                cxt.getCounter(Counters.ERROR).increment(1);                        }                }                private boolean equals(byte[] a,byte[] b){                        String aStr= Bytes.toString(a);                        String bStr= Bytes.toString(b);                        if(aStr.equals(bStr)){                                return true;                        }                        return false;                }        }        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {                String input ="testtable";                byte[] columnFamily = Bytes.toBytes("fam");                byte[] columnQualifier = Bytes.toBytes("data");                byte[] columnQualifier1 = Bytes.toBytes("data2");                Scan scan = new Scan ();                scan.addColumn(columnFamily, columnQualifier);                HadoopUtils.initialConf("hbase");                Configuration conf = HadoopUtils.getConf();                conf.set("conf.columnfamily", Bytes.toStringBinary(columnFamily));                conf.set("conf.columnqualifier", Bytes.toStringBinary(columnQualifier));                conf.set("conf.columnqualifier1", Bytes.toStringBinary(columnQualifier1));                conf.set("conf.infotable", input);                                Job job = new Job(conf,"Parse data in "+input+",into tables");                job.setJarByClass(ParseListPutDriver.class);                TableMapReduceUtil.initTableMapperJob(input, scan, ParseMapper.class,                                 ImmutableBytesWritable.class, Put.class,job);                        job.setOutputFormatClass(NullOutputFormat.class);                job.setNumReduceTasks(0);                System.exit(job.waitForCompletion(true)?0:1);        }}数据记录条数为:26632,可以看到下面图片中的时间记录对比:  由于结合了hbase,所以需要在hadoop_home/lib目录下面加些额外的包,其整个包如下(hbase1.0.jar为编译打包的MR程序): 
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表