1. Data Model
Schema
Pig Latin expressions operate on relations. The relation that relational operators such as FILTER, FOREACH, GROUP and SPLIT work on is a bag. A bag is a collection of tuples, a tuple is an ordered set of fields, and a field is a piece of data, which can be thought of as a data column.
A schema describes the types that the data conforms to: the names and types of its fields. Users usually declare a schema themselves with an AS clause, or let the load function supply one, for example:
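    -- declare field names and types with AS
    A = foreach X generate .. as field1:chararray, .. as field2:bag{};
    -- PigStorage reads the schema file stored alongside the data
    A = load '..' using PigStorage('\t', '-schema');
    -- AvroStorage derives the schema from the Avro files
    A = load '..' using org.apache.pig.piggybank.storage.avro.AvroStorage();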
If a field's type is not specified, it defaults to bytearray. Operations on an unknown schema behave as follows:
- If a multi-relation operation such as JOIN, COGROUP or CROSS encounters an unknown schema, it treats it as a null schema, so the schema of the result is also null;
- Flattening a bag with an empty inner schema (i.e. :bag{}) gives a result whose schema is null;
- If the two relations in a UNION have inconsistent schemas, the schema of the result is null;
- If a field's schema is null, the field is treated as bytearray.
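To make these rules concrete, here is a toy model in plain Java. It is not Pig's Schema class. SchemaRules, Field, effectiveType, combine and union are all hypothetical names. The source only says that inconsistent UNION schemas give null, so the sketch treats any difference between the two schemas as inconsistent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical, simplified model of the schema rules above; not Pig's Schema class.
final class SchemaRules {
    static final class Field {
        final String name;
        final String type; // null means "no type declared"
        Field(String name, String type) { this.name = name; this.type = type; }
        @Override public boolean equals(Object o) {
            if (!(o instanceof Field)) return false;
            Field f = (Field) o;
            return Objects.equals(name, f.name) && Objects.equals(type, f.type);
        }
        @Override public int hashCode() { return Objects.hash(name, type); }
    }

    // A field without a declared type (or a null field schema) is treated as bytearray.
    static String effectiveType(Field f) {
        return (f == null || f.type == null) ? "bytearray" : f.type;
    }

    // Stands in for JOIN/CROSS: an unknown (null) input schema makes the output schema null.
    static List<Field> combine(List<Field> left, List<Field> right) {
        if (left == null || right == null) return null;
        List<Field> out = new ArrayList<>(left);
        out.addAll(right);
        return out;
    }

    // UNION, simplified: identical schemas are kept, anything else yields null.
    static List<Field> union(List<Field> a, List<Field> b) {
        if (a == null || b == null || !a.equals(b)) return null;
        return new ArrayList<>(a);
    }
}
```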
ΪÁ˱£Ö¤pig½Å±¾ÔËÐеÄÓÐЧÐÔ£¬ÔÚдUDFʱҪÔÚoutputSchema·½·¨ÖÐÖ¸¶¨·µ»Ø½á¹ûµÄschema¡£
Data types
Pig's basic (scalar) data types and the corresponding Java classes:

Complex data types and the corresponding Java classes:
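For reference, the mappings listed in the Pig 0.12 documentation can be written as a small lookup table. This is a sketch. PigTypes and inferJdkType are hypothetical helpers, loosely comparable to what Pig's own DataType.findType does. Class names are kept as strings because DataByteArray, Tuple, DataBag and Joda-Time's DateTime are not on a plain JDK classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Pig type name -> Java class used at runtime, per the Pig 0.12 documentation.
// Class names are strings because DataByteArray, Tuple, DataBag and Joda-Time's
// DateTime are not available on a plain JDK classpath.
final class PigTypes {
    static final Map<String, String> SIMPLE = new LinkedHashMap<>();
    static final Map<String, String> COMPLEX = new LinkedHashMap<>();

    static {
        SIMPLE.put("int", "java.lang.Integer");
        SIMPLE.put("long", "java.lang.Long");
        SIMPLE.put("float", "java.lang.Float");
        SIMPLE.put("double", "java.lang.Double");
        SIMPLE.put("chararray", "java.lang.String");
        SIMPLE.put("bytearray", "org.apache.pig.data.DataByteArray");
        SIMPLE.put("boolean", "java.lang.Boolean");
        SIMPLE.put("datetime", "org.joda.time.DateTime");
        SIMPLE.put("biginteger", "java.math.BigInteger");
        SIMPLE.put("bigdecimal", "java.math.BigDecimal");
        COMPLEX.put("tuple", "org.apache.pig.data.Tuple");
        COMPLEX.put("bag", "org.apache.pig.data.DataBag");
        COMPLEX.put("map", "java.util.Map");
    }

    // Hypothetical helper: infers the Pig type name of a JDK value, or null if unknown.
    static String inferJdkType(Object v) {
        if (v == null) return null;
        String cls = v.getClass().getName();
        for (Map.Entry<String, String> e : SIMPLE.entrySet()) {
            if (e.getValue().equals(cls)) return e.getKey();
        }
        return (v instanceof Map) ? "map" : null;
    }
}
```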

Pig's complex types can be nested. For example, a tuple can contain a tuple, (a, (b, c, d)), and a tuple can contain a bag, (a, {(b,c), (d,e)}). The nesting must still respect each type's definition: a bag can contain only tuples, so {a, {(b),(c)}} is invalid.
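The nesting rule can be checked mechanically. In this hypothetical model, a java.util.List stands for a tuple and a small Bag class stands for a bag. isValid walks the value and rejects any bag element that is not a tuple. Nesting, Bag, tuple and isValid are illustrative names, not Pig API.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model: a tuple is a java.util.List, a bag is the Bag class below.
final class Nesting {
    static final class Bag {
        final List<Object> items;
        Bag(Object... items) { this.items = Arrays.asList(items); }
    }

    static List<Object> tuple(Object... fields) { return Arrays.asList(fields); }

    // Valid if every bag, at any depth, contains only tuples.
    static boolean isValid(Object v) {
        if (v instanceof Bag) {
            for (Object item : ((Bag) v).items) {
                if (!(item instanceof List) || !isValid(item)) return false;
            }
            return true;
        }
        if (v instanceof List) {
            for (Object field : (List<?>) v) {
                if (!isValid(field)) return false;
            }
        }
        return true; // scalars are always valid
    }
}
```

With this model, (a, (b, c, d)) and (a, {(b,c), (d,e)}) are accepted, while {a, {(b),(c)}} is rejected because its first element, a, is not a tuple.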
Pig also has a special value, null. Unlike null in Java or C, it means an unknown or non-existent value. For example, when data is loaded, any field in a row that does not conform to the declared schema is set to null.
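Here is a minimal sketch of that loading behavior. It assumes a tab-separated line and a schema given as type names. Values that are missing or cannot be cast to the declared type become null instead of failing the whole row. NullOnLoad and the handful of types it supports are hypothetical, and this is not PigStorage's actual code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not PigStorage: turns a tab-separated line into typed
// fields, using null for a value that is missing or cannot be cast.
final class NullOnLoad {
    static List<Object> parse(String line, String[] schemaTypes) {
        String[] raw = line.split("\t", -1);
        List<Object> fields = new ArrayList<>();
        for (int i = 0; i < schemaTypes.length; i++) {
            String v = i < raw.length ? raw[i] : null; // missing field -> null
            fields.add(cast(v, schemaTypes[i]));
        }
        return fields;
    }

    private static Object cast(String v, String type) {
        if (v == null) return null;
        try {
            switch (type) {
                case "int": return Integer.valueOf(v.trim());
                case "long": return Long.valueOf(v.trim());
                case "double": return Double.valueOf(v.trim());
                case "chararray": return v;
                default: throw new IllegalArgumentException("unsupported type " + type);
            }
        } catch (NumberFormatException e) {
            return null; // value does not conform to the declared type
        }
    }
}
```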
2. Source code analysis
The following analysis is based on Pig 0.12.
Tuple
In the source of the KEYSET builtin, Tuple objects are created through a factory combined with a singleton:
    // one shared factory instance (singleton)
    private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
    // wrap the value s in a one-field tuple
    Tuple t = TUPLE_FACTORY.newTuple(s);
In fact, TupleFactory is an abstract class that implements the TupleMaker<Tuple> interface. By default, TupleFactory.getInstance() returns a BinSedesTupleFactory object. It can also load a user-supplied TupleFactory subclass: pig.data.tuple.factory.name gives the class name and pig.data.tuple.factory.jar gives the jar that contains it. BinSedesTupleFactory extends TupleFactory, as the class diagram shows:

BinSedesTupleFactory's newTuple method returns a BinSedesTuple object. BinSedesTuple extends DefaultTuple, and DefaultTuple has a List<Object> mFields field. This field is where a tuple's data is actually stored, and mFields is instantiated as an ArrayList<Object>. The class relationships are shown in the class diagram.
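The factory + singleton arrangement and the list-backed tuple can be mimicked in plain Java. All names here are hypothetical stand-ins: SimpleTupleFactory for TupleFactory, DefaultSimpleTupleFactory for BinSedesTupleFactory, and ListTuple for DefaultTuple. The system property example.tuple.factory.name only plays the role that pig.data.tuple.factory.name plays in Pig. Loading from a separate jar is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the pattern above; these classes only mimic Pig's.
interface SimpleTuple {
    Object get(int i);
    int size();
}

// Like DefaultTuple: the fields live in an ArrayList<Object> called mFields.
class ListTuple implements SimpleTuple {
    protected final List<Object> mFields;
    ListTuple(List<Object> fields) { mFields = new ArrayList<>(fields); } // copies the input
    public Object get(int i) { return mFields.get(i); }
    public int size() { return mFields.size(); }
}

abstract class SimpleTupleFactory {
    // Hypothetical property name, standing in for pig.data.tuple.factory.name.
    static final String FACTORY_PROPERTY = "example.tuple.factory.name";
    private static SimpleTupleFactory instance;

    // Lazily creates the singleton: a user-supplied class if the property is set,
    // otherwise the default factory.
    static synchronized SimpleTupleFactory getInstance() {
        if (instance == null) {
            String cls = System.getProperty(FACTORY_PROPERTY);
            if (cls == null) {
                instance = new DefaultSimpleTupleFactory();
            } else {
                try {
                    instance = (SimpleTupleFactory) Class.forName(cls).getDeclaredConstructor().newInstance();
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("cannot load tuple factory " + cls, e);
                }
            }
        }
        return instance;
    }

    abstract SimpleTuple newTuple(List<Object> fields);
}

class DefaultSimpleTupleFactory extends SimpleTupleFactory {
    SimpleTuple newTuple(List<Object> fields) { return new ListTuple(fields); }
}
```

Every caller shares one factory instance, and callers never name the concrete tuple class. That is what lets Pig swap the factory through configuration.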

Bag
A Bag object can be created in the following ways:
    // obtain a default bag through the factory
    BagFactory mBagFactory = BagFactory.getInstance();
    DataBag output = mBagFactory.newDefaultBag();

    // if you know upfront how many tuples you are going to put in this bag
    DataBag bag = new NonSpillableDataBag(m.size());
Like TupleFactory, BagFactory is an abstract class that users can replace with their own implementation. By default, getInstance returns a DefaultBagFactory.
DefaultBagFactory creates three kinds of bag with its newDefaultBag, newSortedBag and newDistinctBag methods:
- default bag: tuples are neither sorted nor deduplicated;
- sorted bag: tuples are kept in order, as defined by the tuple's default comparator or by the comparator passed when the bag is created;
- distinct bag: as the name suggests, duplicate tuples are removed.
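The behavioral difference between the three kinds can be shown with plain Java collections, using strings in place of tuples. This is an illustration, not Pig code. BagKinds is a hypothetical helper, and it keeps everything in memory.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;

// Plain-Java illustration of the three bag flavors (not Pig's classes).
final class BagKinds {
    // default: insertion order and duplicates are both kept
    static List<String> defaultBag(List<String> in) {
        return new ArrayList<>(in);
    }

    // sorted: ordered by the given comparator (natural order if null), duplicates kept
    static List<String> sortedBag(List<String> in, Comparator<String> comp) {
        List<String> out = new ArrayList<>(in);
        Comparator<String> c = (comp != null) ? comp : Comparator.naturalOrder();
        out.sort(c);
        return out;
    }

    // distinct: duplicates removed; LinkedHashSet is used only for a predictable order,
    // whereas DistinctDataBag stores its tuples in a HashSet
    static List<String> distinctBag(List<String> in) {
        return new ArrayList<>(new LinkedHashSet<>(in));
    }
}
```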
The constructors of the three bag classes:
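    // DefaultDataBag: unordered, duplicates kept
    public DefaultDataBag() {
        mContents = new ArrayList<Tuple>();
    }

    // SortedDataBag: falls back to the default comparator when none is given
    public SortedDataBag(Comparator<Tuple> comp) {
        mComp = (comp == null) ? new DefaultComparator() : comp;
        mContents = new ArrayList<Tuple>();
    }

    // DistinctDataBag: the HashSet drops duplicate tuples
    public DistinctDataBag() {
        mContents = new HashSet<Tuple>();
    }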
BagFactory class diagram:

As the DataBag class diagram shows, DefaultAbstractBag is the base class of the three bag types and keeps the tuples in its mContents field, while NonSpillableDataBag implements the DataBag interface directly.
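Stripped down to plain Java, the same layout looks roughly like the sketch below. It has one interface and one abstract base class that owns mContents. The subclasses differ only in the collection they choose, and one further class implements the interface directly with a pre-sized list. All names are hypothetical, tuples are modeled as List<Object>, and spilling to disk is omitted.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;

// Hypothetical skeleton mirroring the DataBag hierarchy described above.
interface BagModel extends Iterable<List<Object>> {
    void add(List<Object> tuple);
    long size();
}

// Base class owning the storage, like DefaultAbstractBag's mContents.
abstract class AbstractBagModel implements BagModel {
    protected Collection<List<Object>> mContents;
    public void add(List<Object> tuple) { mContents.add(tuple); }
    public long size() { return mContents.size(); }
    public Iterator<List<Object>> iterator() { return mContents.iterator(); }
}

class DefaultBagModel extends AbstractBagModel {
    DefaultBagModel() { mContents = new ArrayList<>(); }
}

class DistinctBagModel extends AbstractBagModel {
    DistinctBagModel() { mContents = new HashSet<>(); }
}

// Implements the interface directly with a pre-sized list, the way
// NonSpillableDataBag is used when the tuple count is known upfront.
class FixedSizeBagModel implements BagModel {
    private final List<List<Object>> tuples;
    FixedSizeBagModel(int expected) { tuples = new ArrayList<>(expected); }
    public void add(List<Object> tuple) { tuples.add(tuple); }
    public long size() { return tuples.size(); }
    public Iterator<List<Object>> iterator() { return tuples.iterator(); }
}
```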

3. In practice
We have Avro log data (see the previous post) with the following fields:
1. dvc: the identifier of the user's phone;
2. appUse and appInstall: both Avro maps whose key is the app name and whose value is a Map<String, Object> that includes a timelist field (an ArrayList) recording when the app was used. The format looks like this:
    'dvc': 'imei_123',
    'appUse': {
        'app name1': { ... 'timelist': [...] },
        'app name2': { ... 'timelist': [...] },
        ...
    },
    'appInstall': {
        'app name1': { ... 'timelist': [...] },
        ...
    }
We now want each user's app list (from appInstall) and the number of times each app was opened. The output format is dvc, {(app)}, {(app, frequency)}, i.e. user + app list + usage-count list. With MapReduce this takes the following steps:
1. Key on (dvc, app) and compute the usage count as the value;
2. Key on dvc and merge the different apps of the same user, with (app, fre) as the value;
3. Key on dvc and compute the app list from appInstall;
4. Join the output of step 2 with the output of step 3 and write the result.
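For comparison, the four passes can be simulated in memory with plain Java maps. Each pass is a group-by on its key. This is only a sketch. UserLog and its field shapes are assumptions based on the sample above: a timelist per used app, and a set of installed app names. A real job would run each pass as a separate MapReduce stage.

```java
import java.util.*;

// In-memory sketch of the four passes; each pass is a group-by on its key.
// UserLog and its field shapes are assumptions modeled on the sample log.
final class FourPassPlan {
    static final class UserLog {
        final String dvc;
        final Map<String, List<Long>> appUse;  // app name -> timelist
        final Set<String> appInstall;          // installed app names
        UserLog(String dvc, Map<String, List<Long>> appUse, Set<String> appInstall) {
            this.dvc = dvc;
            this.appUse = appUse;
            this.appInstall = appInstall;
        }
    }

    // Returns dvc -> [installed apps, {app -> frequency}], i.e. dvc, {(app)}, {(app, frequency)}.
    static Map<String, List<Object>> run(List<UserLog> logs) {
        // Pass 1: key (dvc, app), value = number of opens (timelist length), summed.
        Map<List<String>, Long> opens = new LinkedHashMap<>();
        for (UserLog r : logs) {
            for (Map.Entry<String, List<Long>> e : r.appUse.entrySet()) {
                opens.merge(Arrays.asList(r.dvc, e.getKey()), (long) e.getValue().size(), Long::sum);
            }
        }
        // Pass 2: key dvc, value = {(app, fre)}.
        Map<String, Map<String, Long>> freqByDvc = new TreeMap<>();
        for (Map.Entry<List<String>, Long> e : opens.entrySet()) {
            freqByDvc.computeIfAbsent(e.getKey().get(0), k -> new TreeMap<>())
                     .put(e.getKey().get(1), e.getValue());
        }
        // Pass 3: key dvc, value = distinct installed apps.
        Map<String, Set<String>> installedByDvc = new TreeMap<>();
        for (UserLog r : logs) {
            installedByDvc.computeIfAbsent(r.dvc, k -> new TreeSet<>()).addAll(r.appInstall);
        }
        // Pass 4: inner join of pass 2 and pass 3 on dvc.
        Map<String, List<Object>> joined = new TreeMap<>();
        for (Map.Entry<String, Set<String>> e : installedByDvc.entrySet()) {
            Map<String, Long> freq = freqByDvc.get(e.getKey());
            if (freq != null) {
                List<Object> row = new ArrayList<>();
                row.add(e.getValue());
                row.add(freq);
                joined.put(e.getKey(), row);
            }
        }
        return joined;
    }
}
```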
Written in MapReduce, this is somewhat tedious. How can Pig do it more simply? We can write an EVAL UDF for appUse:map[] that returns a bag of (app name, length of timelist) tuples:
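    import java.io.IOException;
    import java.util.Map;

    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class AppTimelist extends EvalFunc<DataBag> {
        private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
        private static final BagFactory BAG_FACTORY = BagFactory.getInstance();

        @SuppressWarnings("unchecked")
        @Override
        public DataBag exec(Tuple input) throws IOException {
            if (input == null || input.size() == 0) {
                return null;
            }
            // appUse: app name -> attributes of that app, including timelist
            Map<String, Map<String, Object>> m = (Map<String, Map<String, Object>>) input.get(0);
            if (m == null) {
                return null;
            }
            DataBag output = BAG_FACTORY.newDefaultBag();
            for (Map.Entry<String, Map<String, Object>> e : m.entrySet()) {
                // timelist arrives as a DataBag; a missing one counts as zero opens
                DataBag timelist = (e.getValue() == null) ? null : (DataBag) e.getValue().get("timelist");
                long size = (timelist == null) ? 0L : timelist.size();
                // one (app, number of opens) tuple per app
                Tuple t = TUPLE_FACTORY.newTuple(2);
                t.set(0, e.getKey());
                t.set(1, size);
                output.add(t);
            }
            return output;
        }
    }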
Pig converts the Java ArrayList into a DataBag, which is why timelist has to be cast to DataBag.
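The core of AppTimelist.exec can be tried outside Pig with plain Java collections. A java.util.List stands in for the DataBag that timelist arrives as, and each output row is an Object[] of (app, size). AppTimelistLogic is a hypothetical helper. Treating a missing timelist as zero opens is an assumption made for this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of what AppTimelist.exec computes; a List stands in for
// the DataBag Pig hands the UDF for timelist. Output rows are (app, size).
final class AppTimelistLogic {
    static List<Object[]> appFrequencies(Map<String, Map<String, Object>> appUse) {
        if (appUse == null) return null; // null in, null out, like the UDF
        List<Object[]> rows = new ArrayList<>();
        for (Map.Entry<String, Map<String, Object>> e : appUse.entrySet()) {
            Object timelist = (e.getValue() == null) ? null : e.getValue().get("timelist");
            // Assumption: a missing timelist counts as zero opens.
            long size = (timelist instanceof List) ? ((List<?>) timelist).size() : 0L;
            rows.add(new Object[] { e.getKey(), size });
        }
        return rows;
    }
}
```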
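For appInstall, KEYSET already yields each record's app names. After grouping by dvc, we need an EVAL UDF that merges those per-record app lists into one deduplicated list (appList):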
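    import java.io.IOException;

    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;

    public class DistinctBag extends EvalFunc<DataBag> {
        private static final BagFactory BAG_FACTORY = BagFactory.getInstance();

        @Override
        public DataBag exec(Tuple input) throws IOException {
            if (input == null || input.size() == 0) {
                return null;
            }
            // grouped: a bag of tuples, each holding one record's applist bag
            DataBag in = (DataBag) input.get(0);
            if (in == null) {
                return null;
            }
            DataBag out = BAG_FACTORY.newDistinctBag();
            for (Tuple tp : in) {
                DataBag applist = (DataBag) tp.get(0);
                if (applist == null) {
                    continue;
                }
                // each element is a one-field (app) tuple; the distinct bag drops duplicates
                for (Tuple app : applist) {
                    out.add(app);
                }
            }
            return out;
        }
    }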
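The merging done by DistinctBag.exec can be sketched the same way with plain collections. In this hypothetical model, each element of the grouped bag is represented directly by its applist, a list of one-field app tuples. A LinkedHashSet plays the role of the distinct bag. DistinctAppsLogic is an illustrative name, not Pig API.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Plain-Java sketch of DistinctBag.exec: lists stand in for Pig's bags and
// tuples, and a LinkedHashSet stands in for the distinct bag.
final class DistinctAppsLogic {
    static Set<String> distinctApps(List<List<List<String>>> grouped) {
        if (grouped == null) return null;
        Set<String> out = new LinkedHashSet<>();
        for (List<List<String>> applist : grouped) {  // one applist per input record
            for (List<String> appTuple : applist) {    // each app tuple is (app)
                out.add(appTuple.get(0));
            }
        }
        return out;
    }
}
```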
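As mentioned above, if an EVAL UDF does not specify the schema of its return value, the result's schema is null. The type information is then lost, and later operations can easily fail with a NullPointerException. Both UDFs therefore override outputSchema: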
    // imports needed by both methods
    import org.apache.pig.data.DataType;
    import org.apache.pig.impl.logicalLayer.FrontendException;
    import org.apache.pig.impl.logicalLayer.schema.Schema;
    import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;

    // AppTimelist.java: exec returns a DataBag, so declare bag{tuple(chararray, long)}
    @Override
    public Schema outputSchema(Schema input) {
        try {
            Schema tupleSchema = new Schema();
            tupleSchema.add(new Schema.FieldSchema(null, DataType.CHARARRAY)); // app
            tupleSchema.add(new Schema.FieldSchema(null, DataType.LONG));      // number of opens
            Schema bagSchema = new Schema(new Schema.FieldSchema(null, tupleSchema, DataType.TUPLE));
            return new Schema(new Schema.FieldSchema(
                    getSchemaName(this.getClass().getName().toLowerCase(), input),
                    bagSchema, DataType.BAG));
        } catch (FrontendException e) {
            return null;
        }
    }

    // DistinctBag.java: the UDF returns a bag of app names
    @Override
    public Schema outputSchema(Schema input) {
        FieldSchema innerFieldSchema = new Schema.FieldSchema(null, DataType.CHARARRAY);
        Schema innerSchema = new Schema(innerFieldSchema);
        Schema bagSchema = null;
        try {
            bagSchema = new Schema(new FieldSchema(null, innerSchema, DataType.BAG));
        } catch (FrontendException e) {
            throw new RuntimeException(e);
        }
        return bagSchema;
    }
Computing the app list:
    define AvroStorage org.apache.pig.piggybank.storage.avro.AvroStorage;
    define DistinctBag com.pig.udf.bag.DistinctBag;

    A = load '..' using AvroStorage();
    B = foreach A generate value.fields.data#'dvc' as dvc:chararray,
                           value.fields.data#'appInstall' as ins:map[map[]];
    -- KEYSET turns each appInstall map into a bag of app names
    C = foreach B generate dvc, KEYSET(ins) as applist;
    D = group C by dvc;
    -- extract applist from grouped D
    E = foreach D {
        projected = foreach $1 generate applist;
        generate group as dvc, projected as grouped;
    }
    F = foreach E generate dvc, DistinctBag(grouped) as applist;
    store F into '..' using AvroStorage();
Counting app usage (number of opens per app):
    define AvroStorage org.apache.pig.piggybank.storage.avro.AvroStorage;
    define AppTimelist com.pig.udf.map.AppTimelist;

    A = load '..' using AvroStorage();
    B = foreach A generate value.fields.data#'dvc' as dvc:chararray,
                           value.fields.data#'appUse' as use:map[map[]];
    -- one (dvc, app, fre) row per app in each record
    C = foreach B generate dvc, flatten(AppTimelist(use)) as (app, fre);
    D = group C by (dvc, app);
    E = foreach D generate flatten(group) as (dvc, app), SUM($1.fre) as fre;
    F = group E by dvc;
    G = foreach F {
        projected = foreach $1 generate app, fre;
        generate group as dvc, projected as appfre;
    }
    store G into '..' using AvroStorage();
Joining the two results on dvc gives the final output.