Hbase关键的几个点

一. 什么时候需要HBase

  1. 半结构化或非结构化数据

    对于数据结构字段不够确定或杂乱无章很难按一个概念去进行抽取的数据适合用HBase。当业务发展需要增加存储比如一个用户的email,phone,address信息时RDBMS需要停机维护,而HBase支持动态增加.

  2. 记录非常稀疏

    RDBMS的行有多少列是固定的,为null的列浪费了存储空间。而如上文提到的,HBase为null的Column不会被存储,这样既节省了空间又提高了读性能。

  3. 多版本数据

    根据Row key和Column key定位到的Value可以有任意数量的版本值,因此对于需要存储变动历史记录的数据,用HBase就非常方便了。对于某一值,业务上一般只需要最新的值,但有时可能需要查询到历史值。

  4. 超大数据量

    当数据量越来越大,RDBMS数据库撑不住了,就出现了读写分离策略,通过一个Master专门负责写操作,多个Slave负责读操作,服务器成本倍增。随着压力增加,Master撑不住了,这时就要分库了,把关联不大的数据分开部署,一些join查询不能用了,需要借助中间层。随着数据量的进一步增加,一个表的记录越来越大,查询就变得很慢,于是又得搞分表,比如按ID取模分成多个表以减少单个表的记录数。经历过这些事的人都知道过程是多么的折腾。采用HBase就简单了,只需要加机器即可,HBase会自动水平切分扩展,跟Hadoop的无缝集成保障了其数据可靠性(HDFS)和海量数据分析的高性能(MapReduce)

二. HTable一些基本概念

  1. Row key

    行主键, HBase不支持条件查询和Order by等查询,读取记录只能按Row key(及其range)或全表扫描,因此Row key需要根据业务来设计以利用其存储排序特性(Table按Row key字典序排序如1,10,100,11,2)提高性能。

  2. Column Family(列族)

    在表创建时声明,每个Column Family为一个存储单元。

  3. Column(列)

    HBase的每个列都属于一个列族,以列族名为前缀,如列article:title和article:content属于article列族,author:name和author:nickname属于author列族。

    Column不用创建表时定义即可以动态新增,同一Column Family的Columns会群聚在一个存储单元上,并依Column key排序,因此设计时应将具有相同I/O特性的Column设计在一个Column Family上以提高性能。

  4. Timestamp

    HBase通过row和column确定一份数据,这份数据的值可能有多个版本,不同版本的值按照时间倒序排序,即最新的数据排在最前面,查询时默认返回最新版本。Timestamp默认为系统当前时间(精确到毫秒),也可以在写入数据时指定该值。

  5. Value

    每个值通过4个键唯一索引,tableName+RowKey+ColumnKey+Timestamp=>value

  6. 存储类型

    • TableName 是字符串
    • RowKey 和 ColumnName 是二进制值(Java 类型 byte[])
    • Timestamp 是一个 64 位整数(Java 类型 long)
    • value 是一个字节数组(Java类型 byte[])。

将HTable的存储结构理解为

hbase-data

即HTable按Row key自动排序,每个Row包含任意数量个Columns,Columns之间按Column key自动排序,每个Column包含任意数量个Values。理解该存储结构将有助于查询结果的迭代。

三. 模式设计应遵循的原则

  1. 列族的数量以及列族的势

    列族的数量越少越好,牵扯到了hbase的flushing;同一个表中不同列族所存储的记录数量的差别也需要考虑(列族的势),会造成记录数量少的列族的数据分散在多个region上,影响查询效率。

  2. 行键的设计

    避免使用时序或者单调(递增/递减)行键,否则会导致连续到来的数据会被分配到统一region中。

  3. 尽量最小化行键和列族的大小

    避免hbase的索引过大,加重系统存储的负担

  4. 版本的数量

    HColumnDescriptor设置版本的数量,避免设置过大,版本保留过多。

Kafka学习笔记 - Consumer开发的一些关键点

Kafka的consumer是以pull的形式获取消息数据的。不同于队列和发布-订阅模式,kafka采用了consumer group的模式。通常的,一般采用一个consumer中的一个group对应一个业务,配合多个producer提供数据。

pic

一. 消费过的数据无法再次消费

在user level上,一旦消费过topic里的数据,那么就无法再次用同一个groupid消费同一组数据。如果想要再次消费数据,要么换另一个groupid,要么使用镜像:

pic

此外,low level的api提供了一些机制去设置partion和offset。

二. offset管理

kafka会记录offset到zk中。但是,zk client api对zk的频繁写入是一个低效的操作。0.8.2 kafka引入了native offset storage,将offset管理从zk移出,并且可以做到水平扩展。其原理就是利用了kafka的compacted topic,offset以consumer group,topic与partion的组合作为key直接提交到compacted topic中。同时Kafka又在内存中维护了的三元组来维护最新的offset信息,consumer来取最新offset信息的时候直接内存里拿即可。当然,kafka允许你快速的checkpoint最新的offset信息到磁盘上。

三. stream

This API is centered around iterators, implemented by the KafkaStream class. Each KafkaStream represents the stream of messages from one or more partitions on one or more servers. Each stream is used for single threaded processing, so the client can provide the number of desired streams in the create call. Thus a stream may represent the merging of multiple server partitions (to correspond to the number of processing threads), but each partition only goes to one stream.

根据官方文档所说,stream即指的是来自一个或多个服务器上的一个或者多个partition的消息。每一个stream都对应一个单线程处理。因此,client能够设置满足自己需求的stream数目。总之,一个stream也许代表了多个服务器partion的消息的聚合,但是每一个partition都只能到一个stream。

四. consumer和partition

  1. 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数
  2. 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀
  3. 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同
  4. 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化
  5. High-level接口中获取不到数据的时候是会block的

负载低的情况下可以每个线程消费多个partition。但负载高的情况下,Consumer 线程数最好和Partition数量保持一致。如果还是消费不过来,应该再开 Consumer 进程,进程内线程数同样和分区数一致。(多谢 @shadyxu 指出)

五. high-level的consumer工具

  1. bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker –group pv

    可以看到当前group offset的状况。

  2. bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest config/consumer.properties page_visits

     3个参数, 
     [earliest | latest],表示将offset置到哪里 
     consumer.properties ,这里是配置文件的路径 
     topic,topic名,这里是page_visits
    

六. SimpleConsumer

kafka的low-level接口,使用场景:

  1. 读取一个消息多次。
  2. 在一个进程中仅仅消费某一个topic中几个partition的数据.
  3. 管理事务以确保一个消息处理且仅仅被处理一次。

用这个接口需要注意一下几点:

  1. 在应用中必须跟踪记录offset以确保能够确定上次消费到的位置。
  2. 必须设置哪一个broker是要操作的topic和partition的leader。
  3. 必须自己控制broker的leader的改变。

使用步骤:

  1. 找出一个active状态的broker并且找出哪一个broker是那些topic和partition的leader,必须知道读哪个topic的哪个partition。
  2. 找到负责该partition的broker leader,从而找到存有该partition副本的那个broker。
  3. 自己去写request并fetch数据。
  4. 获取数据。
  5. 需要识别和处理broker leader的改变。

Kafka学习笔记 - 使用与配置

目录

本文一、二部分内容主要来自官方文档。

一. 使用

  1. 下载代码

    https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.2.0/kafka_2.10-0.8.2.0.tgz

     tar -xzf kafka_2.10-0.8.2.0.tgz
     cd kafka_2.10-0.8.2.0
    
  2. 启动服务器

    kafka依赖zookeeper,所以需要首先安装并启动zookeeper。可以使用kafka自带的zookeeper。

     bin/zookeeper-server-start.sh config/zookeeper.properties
    

    然后即可启动kafka

     bin/kafka-server-start.sh config/server.properties
    
  3. 创建topic

    消息传输需要指定topic。所以首先要创建一个topic。

     bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    

    之后,可以看到已经创建的topic.其中的replication-factor指的是复制因子,即log冗余的份数,这里的数字不能大于broker的数量。

     bin/kafka-topics.sh --list --zookeeper localhost:2181
    

    也可以不用手动创建topic,只需要配置broker的时候设置为auto-create topic when a non-existent topic is published to.

  4. 发送消息

    kafka提供了一个命令行客户端,可以从一个文件或者标准输入里读取并发送到kafka集群。默认的,每一行都作为一个单独的消息。

     bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
    

    在命令行输入消息并回车即可发送消息。

  5. 启动一个消费者

    kafka也提供了一个命令行消费者,接受消息并打印到标准输出。

     bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
    
  6. 设置多broker集群

    首先需要为每一个broker创建一个配置文件。

     cp config/server.properties config/server-1.properties 
     cp config/server.properties config/server-2.properties
    
     config/server-1.properties:
         broker.id=1
         port=9093
         log.dirs=/tmp/kafka-logs-1
    
     config/server-2.properties:
         broker.id=2
         port=9094
         log.dirs=/tmp/kafka-logs-2  
    

    然后启动这两个结点:

     bin/kafka-server-start.sh config/server-1.properties &
     bin/kafka-server-start.sh config/server-2.properties &
    

    现在一共有了三个结点,三个broker,那么这样就可以形成一个集群。

    创建一个复制引子为3的topic

     bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
    

    如果想查看目前这个topic的partion在broker上的分布情况

     bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
    

二. 关键配置

2.1 broker

  • broker.id: broker的唯一标识符,集群环境该值不可重复。
  • log.dirs: 一个用逗号分隔的目录列表,可以有多个,用来为Kafka存储数据。每当需要为一个新的partition分配一个目录时,会选择当前的存储partition最少的目录来存储。
  • zookeeper.connect:zookeeper访问地址,多个地址用’,’隔开
  • message.max.bytes: server能接收的一条消息的最大的大小。这个属性跟consumer使用的最大fetch大小是一致的,这很重要,否则一个不守规矩的producer会发送一个太大的消息。默认值:1000000。

2.2 producer

  • metadata.broker.list: kafka的broker列表,格式为host1:port1,host2:port2
  • request.required.acks:用来控制一个produce请求怎样才能算完成,准确的说,是有多少broker必须已经提交数据到log文件,并向leader发送ack,可以设置如下的值:
    • 0,意味着producer永远不会等待一个来自broker的ack,这就是0.7版本的行为。这个选项提供了最低的延迟,但是持久化的保证是最弱的,当server挂掉的时候会丢失一些数据。
    • 1,意味着在leader replication已经接收到数据后,producer会得到一个ack。这个选项提供了更好的持久性。
    • -1,意味着在所有的ISR都接收到数据后,producer才得到一个ack。这个选项提供了最好的持久性,只要还有一个replication存活,那么数据就不会丢失。
  • producer.type:决定消息是否应在一个后台线程异步发送。async表示异步发送;sync表示同步发送。设置为async则允许批量发送请求,这回带来更高的吞吐量,但是client的机器挂了的话会丢失还没有发送的数据。
  • serializer.class: 消息的序列化使用的class,如kafka.serializer.StringEncoder

更多细节参见kafka.consumer.ProducerConfig类。

2.3 consumer

  • group.id: 唯一的指明了consumer的group的名字,group名一样的进程属于同一个consumer group。
  • zookeeper.connect: 通broker的配置
  • consumer.id:consumer的唯一标识符,如果没有设置的话则自动生成。
  • fetch.message.max.bytes:每一个获取某个topic的某个partition的请求,得到最大的字节数,每一个partition的要被读取的数据会加载入内存,所以这可以帮助控制consumer使用的内存。这个值的设置不能小于在server端设置的最大消息的字节数,否则producer可能会发送大于consumer可以获取的字节数限制的消息。默认值:1024 * 1024。
  • fetch.min.bytes:一个fetch请求最少要返回多少字节的数据,如果数据量比这个配置少,则会等待,直到有足够的数据为止。默认值:1。
  • fetch.wait.max.ms:在server回应fetch请求前,如果消息不足,就是说小于fetch.min.bytes时,server最多阻塞的时间。如果超时,消息将立即发送给consumer。默认值:100。
  • socket.receive.buffer.bytes: socket的receiver buffer的字节大小。默认值:64 * 1024。

更多细节参见kafka.consumer.ConsumerConfig类。

三. Storm-kafka使用

Kafka很多使用场景是输出消息到Storm的,Storm本身也提供了storm-kafka的包,在使用Storm的KafkaSpout时需要注意以下几点:

  • 在采用基于SimpleConsumer的消费端实现时,我们遇到过一个情况是大量的轮询导致整个环境网络的流量异常,原因是该topic一直没有新消息,consumer端的轮询没有设置等待参数,也没有在client线程里判断进行一个短暂的sleep。几乎是以死循环的方式不断跟server端通讯,尽管每次的数据包很小,但只要有几个这样的消费端足以引起网络流量的异常。这里需要设置maxWait参数,但是此参数必须与minBytes配合使用才有效。但是在storm-kafka的KafkaUtils中的fetchMessages方法中对minBytes没有设置,因此即使设置了maxWait也没有效果。这里需要自己重写KafkaUtils来解决。

      FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).                    clientId(config.clientId).maxWait(config.fetchMaxWait).minBytes(1).build(); // 此处是修复了原来代码里没有设置minBytes
    
  • 修复了上述问题后,后来还是遇到网络流量异常的情况,后来在追踪KafkaSpout源码的过程中,发现当kafka中的消息过大时,如果不设置合适的bufferSizeBytes以及fetchSizeBytes(至少要大于kafka中最大消息的大小),那么很容易造成客户端由于bufferSizeBytes或者fetchSize设置过小,无法将消息放入buffer中也不能成功fetch而不停地去轮询服务端,从而导致网络流量异常。

Kafka学习笔记 - 介绍

什么是kafka

最近公司需要上基于nginx log的数据统计系统。其中一个重要的结点即分布式日志收集。在调研了多种方案之后,最终确定了flume+kafka+storm+hbase的系统架构。其中kafka则是linkedin一个专门为日志而产生的service。官方文档上如是说:Kafka是一个分布式、分区、冗余的commit日志service。它提供了一种特殊设计的消息系统功能。

关于SpringMVC中model的attribute无法指定别名的解决方案

最近由于项目需要,发现spring mvc在绑定参数时有这么一个缺陷。

Url: http://localhost:8080/api/test?user_name=testUser

Controller:

@Controller
@RequestMapping("/api")
public class ApiController extends BaseController {

    @RequestMapping(value = "/test", headers = "Accept=application/json")
    public void authUser(ModelMap modelMap, Account acc) {
        ResultPack.packOk(modelMap);
    }
}

public class Account{
    private static final long serialVersionUID = 750752375611621980L;

    private long id;
    private String userName;
    private String password;
    private AccountType type = AccountType.ADMIN;
    private long timeTag;
    private int status = 1;
    ...
    ...
}

user_name无法映射到acc的userName上。如果使用json的方式,可以使用JsonProperty注解来解决。否则,spring貌似没提供解决方案。

于是追踪了一下spring mvc的源代码,发现可以通过重写ServletModelAttributeMethodProcessor来支持这个功能。

Github: https://github.com/superhj1987/spring-mvc-model-attribute-alias

工作总结@2014

突然发觉已经是2015年的1月15号了,即兴补上一篇2014年的总结吧。

对自己来说,今年最大的事情莫过于离开一座城市,到达另一座城市,开始了新的职业生涯。

进入新的公司,一个创业公司,截然不同的运作方式让我一开始有点措手不及。相比之前在大公司,小公司更需要一个人的快速成长以及自我约束,以及那种随叫随到、不怕脏累的奋斗精神。而技术层面,要尽最大化压榨硬件资源,用有限的硬件资源达到最大的性能。这些都让自己的架构方式和代码编写不得不去改变、去适应,这也算是一种成长吧。公司的基础架构、公共组件、项目管理、技术体系、项目架构都是一个初级的水平,改变这些是一个很难很长的路,但又不得不做。到现在,在做这些改变的过程中,自己的基础知识得到了巩固、架构能力也有了一定的提升,技术视野也开阔了一些。熟悉了公司的流程和整体的氛围,也算融入了这个团队,要做的还有很多,阻力也有很多。一切都在逼迫自己去学习、去思考、去提高。这也是与以前相比,给自己最大动力的事情。

2015年,工作上希望自己能做到这些

  • 合理设计并实现整个公司的基础架构
  • 构建合理的项目管理流程、监督机制
  • 提升团队的整体水平
  • 保证产品的研发进度以及线上稳定性
  • 招一些优秀的人加入

自身方面,希望能做到这些:

  • 提升自身的技术水平和视野
  • 深入学习一门技术:docker netty kafka rabbitmq elasticsearch solr
  • 阅读至少五本非技术书籍

Stay hungry,stay foolish!

SpringMVC的controller传递HttpServletResponse参数的那点事

    @RequestMapping(value = "cardDown", method = RequestMethod.GET, headers = "Accept=text/html")
    public void cardDown(ModelMap modelMap, HttpServletRequest request, HttpServletResponse response, String id, int status){
    ......
    }

之前在使用Spring mvc的时候发现这么一回事:在spring mvc的controller的参数里如果有HttpServletResponse(类似上面的代码),那么必须有返回值框架才会去在执行完handler后去搜索相应的viewResolver和view从而展现数据。如果没有返回值,那么默认就是返回null的。我初步推测框架的处理过程大致如此:如果controller参数里传递HttpServletResposne的话,框架就认为视图由handler自己生成可以不参于这个过程,但是如果handler有返回值的话,那么仍然认为还需要参与到视图生成的过程中。

ShellShock这点事

前言

在微博上看到最近安全界爆出了一个危害比之前的“心脏流血”(Heartbleed Bug)还要大很多的Bash代码注入漏洞:CVE-2014-6271 “shellshock”漏洞,然后随之而来一系列相关漏洞。详情可以看这些链接:CVE-2014-6271CVE-2014-7169CVE-2014-7186CVE-2014-7187CVE-2014-6277。世界上Linux服务器的占有份额是很大的,而bash又是Linux不可或缺的一个部分。可想而知,这个漏洞的破坏力有多大。这个从名字上就可以看出来,ShellShock是医学上的一种严重的疾病,中文叫做“弹震症”,指的是受到爆炸冲击后导致浑身颤抖、思维混乱等症状。这个命名很形象地反映了问题的严重性。

Nginx源码分析之启动过程

nginx的启动过程代码主要分布在src/core以及src/os/unix目录下。启动流程的函数调用序列:main(src/core/nginx.c)→ngx_init_cycle(src/core/ngx_cycle.c)→ngx_master_process_cycle(src/os/)。nginx的启动过程就是围绕着这三个函数进行的。

main函数的处理过程总体上可以概括如下:

Nginx负载均衡

目录

本文最新更新于2016.11.01

一 特点

1.1 应用情况

Nginx做为一个强大的Web服务器软件,具有高性能、高并发性和低内存占用的特点。此外,其也能够提供强大的反向代理功能。俄罗斯大约有超过20%的虚拟主机采用Nginx作为反向代理服务器,在国内也有腾讯、新浪、网易等多家网站在使用Nginx作为反向代理服务器。据Netcraft统计,世界上最繁忙的网站中有11.48%使用Nginx作为其服务器或者代理服务器。基于反向代理的功能,Nginx作为负载均衡主要有以下几点理由:

  1. 高并发连接
  2. 内存消耗少
  3. 配置文件非常简单
  4. 成本低廉
  5. 支持Rewrite重写规则
  6. 内置的健康检查功能
  7. 节省带宽
  8. 稳定性高