大数据很火,很多人都想插一杠子,很多人都买了《Hadoop权威指南》,都看了《Hadoop in Action》,但可能还是没有一个有血有肉的认识,因为实践才能出真知。正好最近参加了一个大数据的比赛,虽然最后可能方法有些问题,但是做完这个题目,对大数据确实有了一个更好的认识。就像提交完成后才老师说的:大数据不过如此。 比赛题目在这里: http://note.youdao.com/share/?id=4716c99c69215bd79da3cb74bf5d1584&type=note
题目给出的是一个SQL语句,语义上共分为3不
题目就是要求我们尽可能快的用大数据的方式进行查询得到结果。我们讨论认为Hbase、Hive或者是impala都是通用性工具,运行效果肯定比不上我们自定义的Hadoop程序,所以我们尝试不使用这些工具,直接用 hdfs和hadoop实现(结果表明这样可能效果并不是很好,这个最后再讨论)。
因为分组标准聚合的标准不同,所以一般认为不能通过一遍扫描统计两个表的字段,但是考虑LAC的取值只能是intFirstLac和intEndLac,CI的取值只能是intFirstCi和intEndCi,所以前两次查询的分组标准有很大联系,在更新的时候也是直接通过两者是否相等来统计的,所以我们总结出一遍扫描来统计所有字段的方法。 对于一条记录,有两种情况:
因此,对于每一行数据(包含95个字段),map输出两个
因为人比较多,所以各自专注于不同的功能,包括 Java版本、Python版本、Awk程序用于结果分析。这里只分析 Java版本的实现。
Key由LAC和CI作为属性,这里定义了 readFields
和write
方法用于序列化,compareTo
用于比较,定义 equals/hashCode
保证在比较相等的时候有正确的语义。最后,定义了rawComparator,当比较 key 的时候就可以直接从字符数组比较,而不需要首先解析出对象再比较
public class Key implements WritableComparable<Key>{
int lac, ci;
public Key(){}
public Key(int lac, int ci) {
//...
}
public void readFields(DataInput in) throws IOException {
//...
}
public void write(DataOutput out) throws IOException {
//...
}
public int compareTo(Key k) {
//...
}
@Override
public boolean equals(Object obj){
//...
}
public int hashCode(){
//..
}
public String toString(){
//...
}
public static class Comparator extends WritableComparator{
public Comparator(){
super(Key.class);
}
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
int lac1 = readInt(b1, s1);
int lac2 = readInt(b2, s2);
if(lac1 != lac2) return lac1 - lac2;
int ci1 = readInt(b1, s1 + 4);
int ci2 = readInt(b2, s2 + 4);
return ci1 - ci2;
}
}
static{
WritableComparator.define(Key.class, new Comparator());
}
}
Value和Key类似,不过Value有重载了toString方法,用于最后reduce输出的时候调用。所以关于比例的计算可以写在这里。
在 hadoop2.x 中counter可以这么使用 context.getCounter("user_defined", "ok1").increment(1);
public class MyMapper extends Mapper<LongWritable, Text, Key, Value> {
public MyMapper() {
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// split the fields
// parse the fields and set to attribute
// caculate lac and ci
// if satisfy condition 1, set value1's field to 1
// if satisfy condition 2, set value2's filed to 1
// if key1 == key2, merge value1 and value2
// context.write value1/value2
}
}
需要注意的一点是 Writable是易变的。因为在Value字段中有一个set对象,在readFields时要特别注意,首先要清空这个set,因为Value会被重用。也就是说在reduce的Iterable<Value> vlist
中每一个vlist中的值的对象是同一个,只是值在readFields时被改变。
protected void reduce(Key k, Iterable<Value> vlist, Context context){
// accumulate and write
}