Hadoop实战:SQL语句转换为hadoop程序

November 17, 2013 posted in [other]

引入

大数据很火,很多人都想插一杠子,很多人都买了《Hadoop权威指南》,都看了《Hadoop in Action》,但可能还是没有一个有血有肉的认识,因为实践才能出真知。正好最近参加了一个大数据的比赛,虽然最后可能方法有些问题,但是做完这个题目,对大数据确实有了一个更好的认识。就像提交完成后才老师说的:大数据不过如此。 比赛题目在这里: http://note.youdao.com/share/?id=4716c99c69215bd79da3cb74bf5d1584&type=note

题目分析

题目给出的是一个SQL语句,语义上共分为3不

题目就是要求我们尽可能快的用大数据的方式进行查询得到结果。我们讨论认为Hbase、Hive或者是impala都是通用性工具,运行效果肯定比不上我们自定义的Hadoop程序,所以我们尝试不使用这些工具,直接用 hdfs和hadoop实现(结果表明这样可能效果并不是很好,这个最后再讨论)。

代码思想

因为分组标准聚合的标准不同,所以一般认为不能通过一遍扫描统计两个表的字段,但是考虑LAC的取值只能是intFirstLac和intEndLac,CI的取值只能是intFirstCi和intEndCi,所以前两次查询的分组标准有很大联系,在更新的时候也是直接通过两者是否相等来统计的,所以我们总结出一遍扫描来统计所有字段的方法。 对于一条记录,有两种情况:

  1. LAC=intFirstLac,CI=intFirstCi。这时前两个聚合统计的分组标准一致,所以可以直接统计所有相关字段
  2. LAC=intEndLac,CI=intEndCi。此时,两次聚合统计的标准不一致,所以应该输出两条结果记录,一条对应于第一个SQL语句,第二个SQL中的字段为0,一条对应于第二个SQL语句,第一个SQL中的字段为0

因此,对于每一行数据(包含95个字段),map输出两个,第一个对应于第一个sql语句,第二个对应于第二个sql语句,当这两个值的key相同时可以进行合并。最后在combine和reduce的时候累加每个key对应的value字段并得到了最后的统计结果。

代码实现

因为人比较多,所以各自专注于不同的功能,包括 Java版本、Python版本、Awk程序用于结果分析。这里只分析 Java版本的实现。

key

Key由LAC和CI作为属性,这里定义了 readFieldswrite方法用于序列化,compareTo用于比较,定义 equals/hashCode 保证在比较相等的时候有正确的语义。最后,定义了rawComparator,当比较 key 的时候就可以直接从字符数组比较,而不需要首先解析出对象再比较

public class Key implements WritableComparable<Key>{
    int lac, ci;
    public Key(){}
    public Key(int lac, int ci) {
        //...
    }

    public void readFields(DataInput in) throws IOException {
        //...
    }

    public void write(DataOutput out) throws IOException {
        //...
    }

    public int compareTo(Key k) {
        //...
    }

    @Override
    public boolean equals(Object obj){
        //...
    }

    public int hashCode(){
        //..
    }

    public String toString(){
        //...
    }

    public static class Comparator extends WritableComparator{

        public Comparator(){
            super(Key.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            int lac1 = readInt(b1, s1);
            int lac2 = readInt(b2, s2);
            if(lac1 != lac2) return lac1 - lac2;
            int ci1 = readInt(b1, s1 + 4);
            int ci2 = readInt(b2, s2 + 4);
            return ci1 - ci2;
        }
    }

    static{
        WritableComparator.define(Key.class, new Comparator());
    }
}

Value

Value和Key类似,不过Value有重载了toString方法,用于最后reduce输出的时候调用。所以关于比例的计算可以写在这里。

Maper

在 hadoop2.x 中counter可以这么使用 context.getCounter("user_defined", "ok1").increment(1);

public class MyMapper extends Mapper<LongWritable, Text, Key, Value> {
    public MyMapper() {
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // split the fields
        // parse the fields and set to attribute
        // caculate lac and ci
        // if satisfy condition 1, set value1's field to 1
        // if satisfy condition 2, set value2's filed to 1
        // if key1 == key2, merge value1 and value2
        // context.write value1/value2
    }
}

Conbiner/Reducer

需要注意的一点是 Writable是易变的。因为在Value字段中有一个set对象,在readFields时要特别注意,首先要清空这个set,因为Value会被重用。也就是说在reduce的Iterable<Value> vlist中每一个vlist中的值的对象是同一个,只是值在readFields时被改变。

protected void reduce(Key k, Iterable<Value> vlist, Context context){
    // accumulate and write
}