博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
从 HBase 到 HBase 的数据传输(MapReduce)
阅读量:4160 次
发布时间:2019-05-26

本文共 5833 字,大约阅读时间需要 19 分钟。

1.map

package mr.hdfstoHbase.HbaseTOHbase;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Mapper that reads rows from the source HBase table and emits one
 * {@link MyTableWritable} per row, keyed by the row key as {@link Text}.
 *
 * <p>Only cells in the {@code info} column family are considered. The row key
 * is parsed as the integer id, and the {@code name} and {@code age} qualifiers
 * (decoded as strings) supply the remaining fields.
 *
 * <p>Rows that cannot be converted (no cells, missing {@code name}/{@code age},
 * or a non-numeric row key/age) are skipped and counted under the
 * {@code HbasetoHbaseMapper} counter group instead of failing the whole task.
 */
public class HbasetoHbaseMapper extends TableMapper<Text, MyTableWritable> {

    private static final String INFO_FAMILY = "info";
    private static final String COUNTER_GROUP = "HbasetoHbaseMapper";

    // Reused across map() calls; the framework serializes them on context.write().
    private final Text mapKey = new Text();
    private final MyTableWritable mapValue = new MyTableWritable();

    /**
     * Converts one HBase row into a (row key, {@link MyTableWritable}) pair.
     *
     * @param key     the row key of the current row
     * @param value   the cells of the current row
     * @param context the task context used to emit output and update counters
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // Honour offset/length: the backing array may be larger than the row key.
        String rowKey = Bytes.toString(key.get(), key.getOffset(), key.getLength());

        // listCells() returns null (not an empty list) when the Result has no cells.
        List<Cell> cells = value.listCells();
        if (cells == null) {
            context.getCounter(COUNTER_GROUP, "EMPTY_ROWS").increment(1);
            return;
        }

        Map<String, String> cellMap = new HashMap<>();
        for (Cell cell : cells) {
            String family = Bytes.toString(
                    cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
            if (!INFO_FAMILY.equals(family)) {
                continue;
            }
            String qualifier = Bytes.toString(
                    cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
            String cellValue = Bytes.toString(
                    cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
            cellMap.put(qualifier, cellValue);
        }

        String name = cellMap.get("name");
        String ageText = cellMap.get("age");
        if (name == null || ageText == null) {
            context.getCounter(COUNTER_GROUP, "MISSING_COLUMNS").increment(1);
            return;
        }

        int id;
        int age;
        try {
            id = Integer.parseInt(rowKey);
            age = Integer.parseInt(ageText);
        } catch (NumberFormatException e) {
            // Skip the malformed row but keep it visible in the job counters.
            context.getCounter(COUNTER_GROUP, "MALFORMED_NUMBERS").increment(1);
            return;
        }

        mapKey.set(rowKey);
        mapValue.setId(id);
        mapValue.setName(name);
        mapValue.setAge(age);
        context.write(mapKey, mapValue);
    }
}

2.reduce

package mr.hdfstoHbase.HbaseTOHbase;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
 * Reducer that writes the mapped records into the target HBase table
 * (example: from {@code hadoop:student} to {@code hadoop:mystu}).
 *
 * <p>For reference, the base class is declared as
 * {@code public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
 * extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation>}.
 *
 * <p>Each incoming value becomes one {@link Put} on the row named by the
 * reduce key, with columns {@code info:id}, {@code info:name} and
 * {@code info:age}. Note that {@code id} and {@code age} are written with
 * {@code Bytes.toBytes(int)}, i.e. as 4-byte binary integers, not as text.
 */
public class HbaseToHbaseReducer extends TableReducer<Text, MyTableWritable, NullWritable> {

    // Column coordinates are constant, so encode them once.
    private static final byte[] INFO_FAMILY = Bytes.toBytes("info");
    private static final byte[] ID_QUALIFIER = Bytes.toBytes("id");
    private static final byte[] NAME_QUALIFIER = Bytes.toBytes("name");
    private static final byte[] AGE_QUALIFIER = Bytes.toBytes("age");

    /**
     * Emits one {@link Put} per value for the row identified by {@code key}.
     *
     * @param key     the target row key
     * @param values  the records mapped to this row key
     * @param context the task context used to emit the mutations
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<MyTableWritable> values, Context context)
            throws IOException, InterruptedException {
        // The row key is the same for every value in this group.
        byte[] rowKey = Bytes.toBytes(key.toString());

        for (MyTableWritable record : values) {
            Put put = new Put(rowKey);
            put.addColumn(INFO_FAMILY, ID_QUALIFIER, Bytes.toBytes(record.getId()));

            // Bytes.toBytes(String) dereferences its argument, so skip an absent name.
            String name = record.getName();
            if (name != null) {
                put.addColumn(INFO_FAMILY, NAME_QUALIFIER, Bytes.toBytes(name));
            }

            put.addColumn(INFO_FAMILY, AGE_QUALIFIER, Bytes.toBytes(record.getAge()));
            context.write(NullWritable.get(), put);
        }
    }
}

3.runner

package mr.hdfstoHbase.HbaseTOHbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

/**
 * Driver for the HBase-to-HBase copy job: scans {@code hadoop:student} with
 * {@link HbasetoHbaseMapper} and writes to {@code hadoop:mystu} with
 * {@link HbaseToHbaseReducer}.
 */
public class HbaseToHbaseRunner {

    /**
     * Configures and runs the job, then exits with status 0 on success and 1 on failure.
     *
     * @param args unused
     * @throws InterruptedException   if the job is interrupted while running
     * @throws IOException            if the job cannot be set up or submitted
     * @throws ClassNotFoundException if a job class cannot be found
     */
    public static void main(String[] args)
            throws InterruptedException, IOException, ClassNotFoundException {
        System.setProperty("hadoop.home.dir",
                "E:\\software\\bigdate\\hadoop-2.6.0-cdh5.15.0\\hadoop-2.6.0-cdh5.15.0");

        Configuration conf = new Configuration();
        // HDFS entry point.
        conf.set("fs.defaultFS", "hdfs://wang:9000");
        conf.set("zookeeper.znode.parent", "/hbase");
        conf.set("hbase.zookeeper.quorum", "wang");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        Job job = Job.getInstance(conf);
        job.setJobName("HbaseToHbaseRunnerJob");
        job.setJarByClass(HbaseToHbaseRunner.class);

        // Full-table scan. Larger caching reduces RPC round trips, and block caching
        // is disabled because a one-off MapReduce scan would only churn the cache.
        Scan scan = new Scan();
        scan.setCaching(500);
        scan.setCacheBlocks(false);

        // Configure the input side and initialise HBase for the mapper.
        // Signature for reference:
        //   public static void initTableMapperJob(String table, Scan scan,
        //           Class<? extends TableMapper> mapper, Class<?> outputKeyClass,
        //           Class<?> outputValueClass, Job job)
        TableMapReduceUtil.initTableMapperJob(
                "hadoop:student", // source HBase table name
                scan,
                HbasetoHbaseMapper.class,
                Text.class,
                MyTableWritable.class,
                job);

        // Configure the output side.
        // Signature for reference:
        //   public static void initTableReducerJob(String table,
        //           Class<? extends TableReducer> reducer, Job job)
        TableMapReduceUtil.initTableReducerJob(
                "hadoop:mystu",
                HbaseToHbaseReducer.class,
                job);

        // Run the job and surface its outcome to the calling shell/scheduler.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}

4.自定义类

package mr.hdfstoHbase.HbaseTOHbase;

import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Writable value object carrying one record (id, name, age) between the map
 * and reduce phases.
 *
 * <p>Wire format, in order: {@code int id}, {@code UTF name}, {@code int age}.
 * {@link #readFields(DataInput)} must read the fields in exactly the order
 * {@link #write(DataOutput)} writes them.
 */
public class MyTableWritable implements Writable {

    private int id = 0;
    private String name = null;
    private int age = 0;

    /**
     * Serializes this record.
     *
     * <p>An unset ({@code null}) name is written as the empty string, because
     * {@link DataOutput#writeUTF(String)} throws on {@code null}; such a record
     * therefore deserializes with an empty name.
     *
     * @param out the sink to write to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(id);
        out.writeUTF(name == null ? "" : name);
        out.writeInt(age);
    }

    /**
     * Restores this record from its serialized form.
     *
     * @param in the source to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readInt();
        name = in.readUTF();
        age = in.readInt();
    }

    /** @return the record id */
    public int getId() {
        return id;
    }

    /** @param id the record id */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the name, or {@code null} if it has not been set */
    public String getName() {
        return name;
    }

    /** @param name the name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the age */
    public int getAge() {
        return age;
    }

    /** @param age the age */
    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "MyTableWritable{" +
                "id='" + id + '\'' +
                ", name='" + name + '\'' +
                ", age=" + age +
                '}';
    }
}

 

转载地址:http://wgjxi.baihongyu.com/

你可能感兴趣的文章
【Python基础6】格式化字符串
查看>>
【Python基础7】字典
查看>>
【Python基础8】函数参数
查看>>
【Python基础9】浅谈深浅拷贝及变量赋值
查看>>
Jenkins定制一个具有筛选功能的列表视图
查看>>
【Python基础10】探索模块
查看>>
【Python】将txt文件转换为html
查看>>
[Linux]Shell脚本实现按照模块信息拆分文件内容
查看>>
idea添加gradle模块报错The project is already registered
查看>>
在C++中如何实现模板函数的外部调用
查看>>
在C++中,关键字explicit有什么作用
查看>>
C++中异常的处理方法以及使用了哪些关键字
查看>>
内存分配的形式有哪些? C++
查看>>
什么是内存泄露,如何避免内存泄露 C++
查看>>
栈和堆的空间大小 C++
查看>>
什么是缓冲区溢出 C++
查看>>
sizeof C++
查看>>
使用指针有哪些好处? C++
查看>>
引用还是指针?
查看>>
checkio-non unique elements
查看>>