博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Pig系统分析(8)-Pig可扩展性
阅读量:5031 次
发布时间:2019-06-12

本文共 5104 字,大约阅读时间需要 17 分钟。

本文是Pig系统分析系列中的最后一篇了,主要讨论怎样扩展Pig功能。不仅介绍Pig本身提供的UDFs扩展机制,还从架构上探讨Pig扩展可能性。

补充说明:前些天同事发现twitter推动的Pig On Spark项目:,准备研究下。

UDFs

通过UDFs(用户自己定义函数),能够自己定义数据处理方法,扩展Pig功能。实际上,UDFS除了使用之前须要register/define外。和内置函数没什么不同。

主要的EvalFunc

以内置的ABS函数为例:

public class ABS extends EvalFunc
{ /** * java level API * @param input expectsa single numeric value * @return output returns a single numeric value, absolute value of the argument */ public Double exec(Tuple input) throws IOException { if (input == null || input.size() == 0) return null; Double d; try{ d = DataType.toDouble(input.get(0)); } catch (NumberFormatException nfe){ System.err.println("Failed to process input; error -" + nfe.getMessage()); return null; } catch (Exception e){ throw new IOException("Caught exception processing input row", e); } return Math.abs(d); } …… public Schema outputSchema(Schema input) ; public List
getArgToFuncMapping() throws FrontendException; }
  1. 函数都继承EvalFunc接口,泛型參数Double代表返回类型。

  2. exec方法:输入參数类型为元组,代表一行记录。
  3. outputSchema方法:用于处理输入和输出Schema
  4. getArgToFuncMapping:用于支持各种数据类型重载。

聚合函数

EvalFuc方法也能实现聚合函数,这是由于group操作对每一个分组都返回一条记录,每组中包括一个Bag,所以exec方法中迭代处理Bag中记录就可以。

以Count函数为例:

public Long exec(Tuple input) throws IOException {    try {        DataBag bag = (DataBag)input.get(0);        if(bag==null)            return null;        Iterator it = bag.iterator();        long cnt = 0;        while (it.hasNext()){            Tuple t = (Tuple)it.next();            if (t != null && t.size() > 0 && t.get(0) != null )                cnt++;        }        return cnt;    } catch (ExecException ee) {        throw ee;    } catch (Exception e) {        int errCode = 2106;                       String msg = "Error while computing count in " + this.getClass().getSimpleName();        throw new ExecException(msg, errCode, PigException.BUG, e);    }}

Algebraic 和Accumulator 接口

如前所述,具备algebraic性质的聚合函数在Map-Reduce过程中能被Combiner优化。直观来理解,具备algebraic性质的函数处理过程能被分为三部分:initial(初始化,处理部分输入数据)、intermediate(中间过程,处理初始化过程的结果)和final(收尾,处理中间过程的结果)。

比方COUNT函数,初始化过程为count计数操作。中间过程和收尾为sum求和操作。更进一步。假设函数在这三个阶段中都能进行同样的操作,那么函数具备distributive性质。比方SUM函数。

Pig提供了Algebraic 接口:

public interface Algebraic{    /**     * Get the initial function.     * @return A function name of f_init. f_init shouldbe an eval func.     * The return type off_init.exec() has to be Tuple     */    public String getInitial();     /**     * Get the intermediatefunction.     * @return A function name of f_intermed. f_intermedshould be an eval func.     * The return type off_intermed.exec() has to be Tuple     */    public String getIntermed();     /**     * Get the final function.     * @return A function name of f_final. f_final shouldbe an eval func parametrized by     * the same datum as the evalfunc implementing this interface.     */    public String getFinal();}
当中每一个方法都返回EvalFunc实现类的名称。

继续以COUNT函数为例,COUNT实现了Algebraic接口。针对下面语句:

input= load 'data' as (x, y);grpd= group input by x;cnt= foreach grpd generate group, COUNT(input);storecnt into 'result';
Pig会重写MR运行计划:

Mapload,foreach(group,COUNT.Initial)Combineforeach(group,COUNT.Intermediate)Reduceforeach(group,COUNT.Final),store
Algebraic 接口通过Combiner优化降低传输数据量,而Accumulator接口则关注的是内存使用量。UDF实现Accumulator接口后,Pig保证全部key相同的数据(通过Shuffle)以增量的形式传递给UDF(默认pig.accumulative.batchsize=20000)。相同。COUNT也实现了Accumulator接口。

/* Accumulator interface implementation */    private long intermediateCount = 0L;    @Override    public void accumulate(Tuple b) throws IOException {       try {           DataBag bag = (DataBag)b.get(0);           Iterator it = bag.iterator();           while (it.hasNext()){                Tuple t = (Tuple)it.next();                if (t != null && t.size() > 0 && t.get(0) != null) {                    intermediateCount += 1;                }            }       } catch (ExecException ee) {           throw ee;       } catch (Exception e) {           int errCode = 2106;           String msg = "Error while computing min in " + this.getClass().getSimpleName();           throw new ExecException(msg, errCode, PigException.BUG, e);                 }    }     @Override    public void cleanup() {       intermediateCount = 0L;    }    @Override    /*    *当前key都被处理完之后被调用    */    public Long getValue() {       return intermediateCount;    }

前后端数据传递

通过UDFs构造函数传递数据是最简单的方法。然后通过define语句定义UDF实例时指定构造方法參数。但有些情况下。比方数据在执行期才产生,或者数据不能用String格式表达,这时候就得使用UDFContext了。

UDF通过getUDFContext方法获取保存在ThreadLoacl中的UDFContext实例。

UDFContext包括下面信息:

  1. jconf:Hadoop Configuration。

  2. clientSysProps:系统属性。
  3. HashMap<UDFContextKey,Properties> udfConfs:用户自己保存的属性,当中UDFContextKey由UDF类名生成。

UDFs运行流程

Pig架构可扩展性

Pig哲学之三——Pigs Live Anywhere。

理论上。Pig并不被限定执行在Hadoop框架上,有几个能够參考的实现和提议。

  1. Pigen。Pig on Tez。,架构图例如以下:
  2. Pig的后端抽象层:。

    眼下已经实现了PigLatin执行在Galago上。

參考资料

Pig官网:

Pig paper at SIGMOD 2008:Building a High Level DataflowSystem on top of MapReduce:The Pig Experience

Programming.Pig:Dataflow.Scripting.with.Hadoop(2011.9).Alan.Gates

 

转载于:https://www.cnblogs.com/wzzkaifa/p/6805110.html

你可能感兴趣的文章
百度编辑器图片在线流量返回url改动
查看>>
我对你的期望有点过了
查看>>
微信小程序wx:key以及wx:key=" *this"详解:
查看>>
下拉框比较符
查看>>
2.2.5 因子的使用
查看>>
css选择器
查看>>
photoplus
查看>>
Python 拓展之推导式
查看>>
[Leetcode] DP-- 474. Ones and Zeroes
查看>>
80X86寄存器详解<转载>
查看>>
c# aop讲解
查看>>
iterable与iterator
查看>>
返回顶部(动画)
查看>>
webpack+react+antd 单页面应用实例
查看>>
Confluence 6 SQL Server 数据库驱动修改
查看>>
Confluence 6 通过 SSL 或 HTTPS 运行 - 备注和问题解决
查看>>
【47.76%】【Round #380B】Spotlights
查看>>
Git(使用码云)
查看>>
分享Java web 开发必游之路
查看>>
IIS初始化(预加载),解决第一次访问慢,程序池被回收问题(转载)
查看>>