`
wbj0110
  • 浏览: 1553148 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

Hbase和solr在海量数据查询中的应用

阅读更多

对于历史数据的查询,在数据规模不大的情况下,可以用传统的关系型数据库,如oracle,mysql等,可以利用他们提供的索引功能,实现高效的查询。

但是当数据上升到一定规模后,用传统的关系型数据库就不太合适了,当然可以把数据存到分布式数据库HBase中。

HBase目前只支持对rowkey的一级索引,对于二级索引还不支持,当然可以把所有要索引的字段都拼接到rowkey中,根据hbase的filter功能进行查询,

如按照查找一个用户的一段时间的订单,rowkey可以这样设计,rowkey="100022333||2012-12-23:10:20||orderNum",由于hbase在存储时,默认按照rowkey进行排序,这样一个用户的历史数据会集中在一个region中,这样便于顺序的查找。用这种方式的一个弊端是对分页的功能支持的不好,分页所用的总count数量和rowNum,可以在corprocessor中实现记录数量的汇总,但是对于从哪条条记录开始rowNum,则不太好支持,并且总记录数量的汇总需要单独用coprocessor的endpoint来实现,这样就增加了计算量;如果放在客户端做分页,对海量数据来说,是不可行的。

当然可以在Hbase中对该表生成对应的索引表,有几个二级索引就有几张表,如主表的rowkey设计为rowkey="orderNum",那么索引表就是rowkey="usetNum||orderdate",cf="orderNum",同样也会有分页的问题。

下面提出一种sorl+hbase的方式,solr建立索引(solr支持分页的操作),hbase存储数据。

在往HBase写数据的过程中,建立solr索引,可以在HBase的coprocessor的observer功能中实现

public class SorlIndexCoprocessorObserver extends BaseRegionObserver {
 /**
  * 建立solr索引
  */
 public void postPut(final ObserverContext<RegionCoprocessorEnvironment> e,
   final Put put, final WALEdit edit, final boolean writeToWAL)
   throws IOException {
  byte[] rowKey = put.getRow();
  String rowKeyStr = new String(rowKey, "UTF-8");
  List<KeyValue> kv = put
    .get("cf".getBytes(), Bytes.toBytes("orderTime"));
  String orderTime = new String(kv.get(0).getValue(), "UTF-8");

  List<KeyValue> kv2 = put.get("cf".getBytes(), Bytes.toBytes("userNum"));
  String userNum = new String(kv2.get(0).getValue(), "UTF-8");

  String solrUrl = "http://10.1.1.57:8082/solr";
  SolrServer solr = null;
  try {
   solr = new CommonsHttpSolrServer(solrUrl);
  } catch (MalformedURLException err) {
   // TODO Auto-generated catch block
   err.printStackTrace();
  }
  SolrInputDocument siDoc = new SolrInputDocument();
  siDoc.addField("id", rowKeyStr);
  siDoc.addField("rowkey", rowKeyStr);
  siDoc.addField("orderTime", orderTime);
  siDoc.addField("userNum", userNum);
  try {
   solr.add(siDoc);
  } catch (SolrServerException e1) {
   // TODO Auto-generated catch block
   e1.printStackTrace();
  } catch (IOException e2) {
   // TODO Auto-generated catch block
   e2.printStackTrace();
  }
  try {
   solr.commit();
  } catch (SolrServerException e3) {
   e3.printStackTrace();
  } catch (IOException e1) {
   e1.printStackTrace();
  }

 }

配置coprocessor

alter ‘t1′, METHOD => ‘table_att', ‘coprocessor'=> ‘hdfs:///xxx.jar|com.newcosoft.hadoop.hbase.SorlIndexCoprocessorObserver|1001

一个表上可以配置多个协同处理器,一个序列会自动增长进行标识。加载协同处理器(可以说是过滤程序)需要符合以下规则:

[coprocessor jar file location] | class name | [priority] | [arguments] 

对于solr中的commit操作,commit提交后,索引flush到硬盘上,并触发listener,创造新的insexSearcher(新的insexReader,从硬盘中加载索引),这样后续的查询就用新的insexsearcher了对查询性能影响比较大。在批量导入的情况下,可以导入完成后,单独调用sorl的commit操作。

此处采用solr的master,slave方式,master提供索引构建,多个slave提供索引查询;对于master的commit操作,会产生一个新的snapshot,

slave上的Snappuller程序一般是在crontab上面执行的,它会去master询问,有没有新版的snapshot;一旦发现新的版本,slave就会把它下载下来,然后snapinstall。

当一个新的searcher被open的时候,会有一个缓存预热的过程,预热之后,新的索引才会交付使用;这里会控制Snappuller程序的执行频率,solr的优化这里不做深入。

从solr中根据索引字段以及startrow、pagesize查找出对应的历史订单orderNum list后,遍历该list,去hbase中进行查询

 String solrUrlSlave = "http://10.1.1.59:8082/solr";
  SolrServer solr2 = null;
  try {
   solr2 = new CommonsHttpSolrServer(solrUrlSlave);
  } catch (MalformedURLException e) {
   // TODO Auto-generated catch block
   e.printStackTrace();
  }
  
  String queryString = "q=userNum:1111002&orderDate=2012-10-12&startrow=1&pagesize=10&sort=orderDate+desc";
  SolrParams solrParams = SolrRequestParsers
    .parseQueryString(queryString);
  try {
   QueryResponse rsp = solr2.query(solrParams);
   SolrDocumentList docList=rsp.getResults();
//遍历该docList,到HBASE中查找   

//TODO
  } catch (SolrServerException e) {
   e.printStackTrace();
  }

 

当然关于solr的索引,需要考虑到sorl的HA,有很多策略,这里不做介绍了,后面出单独的章节做阐述

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics