本文共 5833 字,大约阅读时间需要 19 分钟。
1.map
package mr.hdfstoHbase.HbaseTOHbase;import org.apache.hadoop.hbase.Cell;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableMapper;import org.apache.hadoop.hbase.util.Bytes;import org.apache.hadoop.io.Text;import java.io.IOException;import java.util.HashMap;import java.util.List;public class HbasetoHbaseMapper extends TableMapper{ private Text mapKey = new Text(); private MyTableWritable mapValue = new MyTableWritable(); @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { String str = Bytes.toString(key.get()); mapKey.set(str); List cells = value.listCells(); HashMap | cellMap = new HashMap<>(); for (Cell tmp : cells ) { String family = Bytes.toString(tmp.getFamilyArray(), tmp.getFamilyOffset(), tmp.getFamilyLength()); String qKey = Bytes.toString(tmp.getQualifierArray(), tmp.getQualifierOffset(), tmp.getQualifierLength()); String values = Bytes.toString(tmp.getValueArray(), tmp.getValueOffset(), tmp.getValueLength()); if (family.equals("info")) { cellMap.put(qKey, values); } } mapValue.setId(Integer.valueOf(str)); mapValue.setName(cellMap.get("name")); mapValue.setAge(Integer.valueOf(cellMap.get("age"))); context.write(mapKey, mapValue); }}
2.reduce
package mr.hdfstoHbase.HbaseTOHbase;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.mapreduce.TableReducer;import org.apache.hadoop.hbase.util.Bytes;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import java.io.IOException;/** * 从hbase写入hbase中 * 例子:hadoop:student 到 hadoop:mystu ** public abstract class TableReducer
*/public class HbaseToHbaseReducer extends TableReducer { @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { for (MyTableWritable tmp : values) { Put put = new Put(Bytes.toBytes(key.toString())); put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("id"), Bytes.toBytes(tmp.getId())); put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"), Bytes.toBytes(tmp.getName())); put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("age"), Bytes.toBytes(tmp.getAge())); context.write(NullWritable.get(), put); } }}
3.runner
package mr.hdfstoHbase.HbaseTOHbase;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.client.Scan;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import java.io.IOException;public class HbaseToHbaseRunner { public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException { System.setProperty("hadoop.home.dir", "E:\\software\\bigdate\\hadoop-2.6.0-cdh5.15.0\\hadoop-2.6.0-cdh5.15.0"); Configuration conf = new Configuration(); //hdfs入口 conf.set("fs.defaultFS", "hdfs://wang:9000"); conf.set("zookeeper.znode.parent", "/hbase"); conf.set("hbase.zookeeper.quorum", "wang"); conf.set("hbase.zookeeper.property.clientPort", "2181"); Job job = Job.getInstance(conf); job.setJobName("HbaseToHbaseRunnerJob"); job.setJarByClass(HbaseToHbaseRunner.class); //设置输入类型,以及hbase初始化 /* public static void initTableMapperJob(String table, Scan scan, Class mapper, Class outputKeyClass, Class outputValueClass, Job job)*/ TableMapReduceUtil.initTableMapperJob("hadoop:student",//hbase表的名称 new Scan(),//全表扫描 HbasetoHbaseMapper.class, Text.class, MyTableWritable.class, job); /* public static void initTableReducerJob(String table, Class reducer, Job job)*/ TableMapReduceUtil.initTableReducerJob( "hadoop:mystu", HbaseToHbaseReducer.class, job ); //执行任务 job.waitForCompletion(true); }}
4.自定义类
package mr.hdfstoHbase.HbaseTOHbase;import org.apache.hadoop.hbase.mapreduce.TableMapper;import org.apache.hadoop.io.Writable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;public class MyTableWritable implements Writable { //private String id=null; private int id = 0; private String name = null; private int age = 0; @Override public void write(DataOutput out) throws IOException { // out.writeUTF(id); out.writeInt(id); out.writeUTF(name); out.writeInt(age); } @Override public void readFields(DataInput in) throws IOException { // id=in.readUTF(); id = in.readInt(); name = in.readUTF(); age = in.readInt(); }/* public String getId() { return id; } public void setId(String id) { this.id = id; }*/ public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } @Override public String toString() { return "MyTableWritable{" + "id='" + id + '\'' + ", name='" + name + '\'' + ", age=" + age + '}'; }}
转载地址:http://wgjxi.baihongyu.com/