求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Modeler   Code  
会员   
 
  
 
 
     
   
分享到
HBase用一个MR同时写入两张表
 

作者:fansy1990 ,发布于2013-2-21 ,来源:CSDN

 

以前在学习HBase的时候,也曾想过是否可以在一个MR中同时写入两个表,但以前在网上找的时候都找不到比较相关的答案,这两天又重新找了下,居然有类似的实现,然后就自己参考着写了下,基本可以运行,下面就详细说下思路:

原始数据如下:

  fansy,22,blog.csdu.net/fansy1990
  tom,25,blog.csdu.net/tom1987
  kate,23,blog.csdu.net/kate1989
  jake,20,blog.csdu.net/jake1992
  john,35,blog.csdu.net/john1977
  ben,30,blog.csdu.net/ben1982 

第一列代表name,dierlie代表age,disanlie代表webPage;要做的事情是把name和age存入表1,name和webPage存入表2;下面贴代码:

ImportToHB.java:

package org.fansy.multipletables;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * write to multiple tables
 * @author fansy
 *
 */
public class ImportToHB extends Configured implements Tool{


	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(new ImportToHB(), args);
        System.exit(exitCode);
	}

	@Override
	public int run(String[] args) throws Exception {
		if(args.length!=7){
			System.err.println("wrong args length:"+args.length);
		//	System.out.println();			
			System.out.println("Usage: <input> <table1> <table1-fam> <table1-qua> "+
		"<table2> <table2-fam> <table2-qua>");
			System.exit(-1);
		}
		Configuration conf=new Configuration();
		conf.set("TABLE1", args[1]);
		conf.set("T1-FAM", args[2]);
		conf.set("T1-QUA", args[3]);
		conf.set("TABLE2", args[4]);
		conf.set("T2-FAM", args[5]);
		conf.set("T2-QUA", args[6]);
		Job job = new Job(conf);
        job.setJarByClass(ImportToHB.class);
        job.setMapperClass(MapperHB.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Writable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setOutputFormatClass(MultiTableOutputFormat.class);
        job.setNumReduceTasks(0);

        if(job.waitForCompletion(true)){
        	return 0;
        }
		return -1;
	}
} 

MapperHB.java:

package org.fansy.multipletables;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;

public class MapperHB extends Mapper<LongWritable,Text,ImmutableBytesWritable,Writable>{
	private byte[] table1;
	private byte[] table2;
	private byte[] t1_fam;
	private byte[] t1_qua;
	private byte[] t2_fam;
	private byte[] t2_qua;
	
	public  void setup(Context context){
		table1=Bytes.toBytes(context.getConfiguration().get("TABLE1"));
		table2=Bytes.toBytes(context.getConfiguration().get("TABLE2"));
		t1_fam=Bytes.toBytes(context.getConfiguration().get("T1-FAM"));
		t1_qua=Bytes.toBytes(context.getConfiguration().get("T1-QUA"));
		t2_fam=Bytes.toBytes(context.getConfiguration().get("T2-FAM"));
		t2_qua=Bytes.toBytes(context.getConfiguration().get("T2-QUA"));
	}
	
	public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
		String[] info=value.toString().split(",");
		if(info.length!=3){
			return;
		}
		String name=info[0];
		String age=info[1];
		String webPage=info[2];
		
		// write to the first table row = name+age, value=age;
		 ImmutableBytesWritable putTable = new ImmutableBytesWritable(table1);
		 Put put = new Put(Bytes.toBytes(name+","+age));
		 put.add(t1_fam,t1_qua,Bytes.toBytes(age));
		 context.write(putTable, put);
		 
		 // write to the second table row=name+webPage,value=webPage
		  putTable = new ImmutableBytesWritable(table2);
		 put = new Put(Bytes.toBytes(name+","+webPage));
		 put.add(t2_fam,t2_qua,Bytes.toBytes(webPage));
		 context.write(putTable, put);
	}
} 

上面的代码只用了一个Mapper,同时写入两个HBase表中。这里的要点是设置Mapper的输出key和value的类型,按照上面的代码类型为:ImmutableBytesWritable和Writable,而且在job的声明处要设置输出类型:job.setOutputFormatClass(MultiTableOutputFormat.class);

如何运行上面的程序?

  (1)在HBase中创建两张表:
  create 'table1','info'
  create 'table2','info'
  (2)ImportToHB的输入参数如下:
  hdfs://master:9000/user/fansy/input/info.dat table1 info age table2 info webPage
  (3)直接在eclipse中运行 

运行后在HBase中察看输出的数据如下:

相关文章 相关文档 相关视频



我们该如何设计数据库
数据库设计经验谈
数据库设计过程
数据库编程总结
数据库性能调优技巧
数据库性能调整
数据库性能优化讲座
数据库系统性能调优系列
高性能数据库设计与优化
高级数据库架构师
数据仓库和数据挖掘技术
Hadoop原理、部署与性能调优
 
分享到
 
 


MySQL索引背后的数据结构
MySQL性能调优与架构设计
SQL Server数据库备份与恢复
让数据库飞起来 10大DB2优化
oracle的临时表空间写满磁盘
数据库的跨平台设计
更多...   


并发、大容量、高性能数据库
高级数据库架构设计师
Hadoop原理与实践
Oracle 数据仓库
数据仓库和数据挖掘
Oracle数据库开发与管理


GE 区块链技术与实现培训
航天科工某子公司 Nodejs高级应用开发
中盛益华 卓越管理者必须具备的五项能力
某信息技术公司 Python培训
某博彩IT系统厂商 易用性测试与评估
中国邮储银行 测试成熟度模型集成(TMMI)
中物院 产品经理与产品管理
更多...