[Pig Source Code Analysis] On Pig's Data Model

Author: Zhou Mingyao   Source: IBM   Published: 2016-2-26
 

1. Data Model

Schema

Pig Latin expressions operate on relations. The relation that relational operators such as FILTER, FOREACH, GROUP and SPLIT work on is a bag. A bag is a collection of tuples, a tuple is an ordered set of fields, and a field is a piece of data, which you can think of as a data column.

A schema is the type layout the data follows: the names and types of its fields. Users usually define a schema with an as clause, or let the load function supply it, for example:

A = foreach X generate .. as field1:chararray, .. as field2:bag{};
A = load '..' using PigStorage('\t', '-schema');
A = load '..' using org.apache.pig.piggybank.storage.avro.AvroStorage();

If a field's type is not specified, it defaults to bytearray. When an unknown schema is involved:

If a multi-relation operation such as join/cogroup/cross encounters an unknown schema, it treats it as a null schema, so the schema of the result is also null;

If you flatten a bag with an empty inner schema (i.e. :bag{}), the schema of the result is null;

If the two relations in a union have different schemas, the schema of the result is null;

If a field's schema is null, the field is treated as bytearray.
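The four rules above can be restated as a toy model in plain Java. This is a sketch under simplifying assumptions: a schema is just a list of type names, null stands for an unknown schema, join output is a plain concatenation (Pig also prefixes field names with the relation alias), and the union rule follows the text literally without modeling any schema merging. SchemaRules is a hypothetical name, not a Pig class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Toy model of Pig's null-schema rules (hypothetical, not Pig's classes).
// A schema is a list of field type names; null means "unknown schema".
final class SchemaRules {
    // join/cogroup/cross: any unknown input schema makes the output schema null.
    // Otherwise this sketch simply concatenates the input fields.
    static List<String> joinSchema(List<String> left, List<String> right) {
        if (left == null || right == null) {
            return null;
        }
        List<String> out = new ArrayList<>(left);
        out.addAll(right);
        return out;
    }

    // flatten of a bag whose inner schema is empty (:bag{}) yields a null schema.
    static List<String> flattenSchema(List<String> bagInner) {
        return (bagInner == null || bagInner.isEmpty()) ? null : bagInner;
    }

    // union of relations with different schemas yields a null schema.
    static List<String> unionSchema(List<String> a, List<String> b) {
        return Objects.equals(a, b) ? a : null;
    }

    // A field with an unknown (null) schema is treated as bytearray.
    static String effectiveType(String fieldType) {
        return fieldType == null ? "bytearray" : fieldType;
    }
}
```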

To keep a Pig script working correctly, a UDF should declare the schema of its return value in its outputSchema method.

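Data Types

Pig's basic data types and the corresponding Java classes:

int: java.lang.Integer
long: java.lang.Long
float: java.lang.Float
double: java.lang.Double
chararray: java.lang.String
bytearray: org.apache.pig.data.DataByteArray
boolean: java.lang.Boolean
datetime: org.joda.time.DateTime
biginteger: java.math.BigInteger
bigdecimal: java.math.BigDecimal

Pig's complex data types and the corresponding Java classes:

tuple: org.apache.pig.data.Tuple
bag: org.apache.pig.data.DataBag
map: java.util.Map<String, Object>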

Pig's complex data types can be nested, for example a tuple inside a tuple, (a, (b, c, d)), or a bag inside a tuple, (a, {(b,c), (d,e)}). The nesting must still respect each type's definition, though: a bag can only be a collection of tuples, so {a, {(b),(c)}} is illegal.
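To make the nesting rule concrete, here is a small checker in plain Java. Tuples are modeled as java.util.List and bags as a hypothetical Bag class; neither is Pig's own Tuple or DataBag, and maps are left out of this sketch.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical bag type for illustration: just holds its elements.
final class Bag {
    final List<Object> elements;

    Bag(Object... elements) {
        this.elements = Arrays.asList(elements);
    }
}

final class Nesting {
    // Build a tuple (modeled as a List) from its fields.
    static List<Object> tuple(Object... fields) {
        return Arrays.asList(fields);
    }

    // A value is well-formed if every bag, at any depth, contains only tuples.
    static boolean isValid(Object value) {
        if (value instanceof Bag) {
            for (Object e : ((Bag) value).elements) {
                if (!(e instanceof List) || !isValid(e)) {
                    return false; // bags may hold tuples only
                }
            }
            return true;
        }
        if (value instanceof List) {
            for (Object field : (List<?>) value) {
                if (!isValid(field)) {
                    return false; // a field may be an atom, a tuple or a bag
                }
            }
            return true;
        }
        return true; // atoms such as strings and numbers
    }
}
```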

Pig also has a special value, null. Unlike null in Java or C, it means the value is unknown or non-existent. For example, when data is loaded, if a field in some row does not conform to the declared schema, that field is set to null.
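The load behavior can be sketched as follows. The parsing rules here (tab-separated input, only int/long/double conversion, missing trailing fields read as null) are simplifying assumptions for illustration, not PigStorage's actual implementation; LoadSketch and parseRow are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical loader sketch: a field that does not fit its declared type
// becomes null instead of failing the whole row.
final class LoadSketch {
    static List<Object> parseRow(String line, List<String> types) {
        String[] raw = line.split("\t", -1);
        List<Object> row = new ArrayList<>();
        for (int i = 0; i < types.size(); i++) {
            // In this sketch, a missing trailing field is also read as null.
            String field = i < raw.length ? raw[i] : null;
            row.add(convert(field, types.get(i)));
        }
        return row;
    }

    static Object convert(String field, String type) {
        if (field == null) {
            return null;
        }
        try {
            switch (type) {
                case "int": return Integer.valueOf(field);
                case "long": return Long.valueOf(field);
                case "double": return Double.valueOf(field);
                default: return field; // chararray and others kept as text here
            }
        } catch (NumberFormatException e) {
            return null; // the value does not conform to the schema
        }
    }
}
```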

2. Source Code Analysis

The analysis below is based on Pig 0.12.

Tuple

In the KEYSET source, Tuple objects are created with the factory + singleton design pattern:

private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
Tuple t = TUPLE_FACTORY.newTuple(s);

In fact, TupleFactory is an abstract class that implements the interface TupleMaker<Tuple>. By default, TupleFactory.getInstance() returns a BinSedesTupleFactory object, and it also supports loading a user-supplied TupleFactory subclass (pig.data.tuple.factory.name gives the class name, pig.data.tuple.factory.jar the jar that contains it). BinSedesTupleFactory extends TupleFactory:

[Figure: TupleFactory class diagram]

BinSedesTupleFactory.newTuple returns a BinSedesTuple object. BinSedesTuple extends DefaultTuple, which has a field List<Object> mFields; this is where the tuple's data is stored, and mFields holds an ArrayList<Object>. The class relationships:

[Figure: Tuple class diagram]
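The factory + singleton arrangement can be sketched in plain Java. DemoTuple, DemoTupleFactory, DefaultDemoTupleFactory and the property name demo.tuple.factory.name are hypothetical stand-ins for DefaultTuple, TupleFactory, BinSedesTupleFactory and pig.data.tuple.factory.name; the real classes have many more methods, and this sketch ignores the jar-loading part.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for DefaultTuple: the fields live in an ArrayList<Object>.
class DemoTuple {
    protected final List<Object> mFields;

    DemoTuple(List<?> fields) {
        mFields = new ArrayList<>(fields); // copy, so callers may reuse their list
    }

    Object get(int i) { return mFields.get(i); }

    int size() { return mFields.size(); }
}

// Stand-in for TupleFactory: an abstract, lazily created singleton whose
// concrete class can be overridden through a system property.
abstract class DemoTupleFactory {
    private static DemoTupleFactory instance;

    static synchronized DemoTupleFactory getInstance() {
        if (instance == null) {
            String name = System.getProperty("demo.tuple.factory.name");
            try {
                instance = (name == null)
                        ? new DefaultDemoTupleFactory() // the default implementation
                        : (DemoTupleFactory) Class.forName(name)
                                .getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot load tuple factory " + name, e);
            }
        }
        return instance;
    }

    abstract DemoTuple newTuple(List<?> fields);
}

// Stand-in for BinSedesTupleFactory.
final class DefaultDemoTupleFactory extends DemoTupleFactory {
    @Override
    DemoTuple newTuple(List<?> fields) {
        return new DemoTuple(fields);
    }
}
```

Because the sketch's constructor copies the list, a caller can clear and refill one buffer between newTuple calls. The AppTimelist UDF in the next section reuses its result list in the same way, which is safe because the real TupleFactory.newTuple(List) is documented as copying the list.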

Bag

There are several ways to create a Bag object:

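import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.NonSpillableDataBag;

// 1. Through the factory
BagFactory mBagFactory = BagFactory.getInstance();
DataBag output = mBagFactory.newDefaultBag();

// 2. Directly, if you know upfront how many tuples you are going to put in this bag
// (m is the collection being converted; it is not defined in this snippet)
DataBag bag = new NonSpillableDataBag(m.size());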

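Like TupleFactory, BagFactory is an abstract class that users can replace with their own implementation; getInstance returns a DefaultBagFactory by default.

DefaultBagFactory provides newDefaultBag, newSortedBag and newDistinctBag, which create three kinds of bag:

In a default bag, tuples are neither sorted nor deduplicated;

In a sorted bag, tuples are kept in order, as defined by the default tuple comparator or by the comparator passed in when the bag is created;

A distinct bag, as the name suggests, contains no duplicate tuples.

The constructors of the three bag classes are: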

public DefaultDataBag() {
    mContents = new ArrayList<Tuple>();
}

public SortedDataBag(Comparator<Tuple> comp) {
    mComp = (comp == null) ? new DefaultComparator() : comp;
    mContents = new ArrayList<Tuple>();
}

public DistinctDataBag() {
    mContents = new HashSet<Tuple>();
}
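The behavioral difference between the three kinds of bag can be tried out with plain Java collections. BagKinds is a hypothetical helper: tuples are List<String>, and the storage choices mirror the constructors above (ArrayList, ArrayList plus a comparator, HashSet), but spilling to disk is omitted, sorting is done once up front, and compareFields is only a stand-in for Pig's DefaultComparator.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;

// Hypothetical sketch of default, sorted and distinct bag semantics.
final class BagKinds {
    // Default bag: insertion order, duplicates kept.
    static Collection<List<String>> defaultBag(List<List<String>> input) {
        return new ArrayList<>(input);
    }

    // Sorted bag: ordered by the given comparator, or a default one if null.
    static List<List<String>> sortedBag(List<List<String>> input,
                                        Comparator<List<String>> comp) {
        Comparator<List<String>> order = (comp != null) ? comp : BagKinds::compareFields;
        List<List<String>> contents = new ArrayList<>(input);
        contents.sort(order);
        return contents;
    }

    // Distinct bag: duplicates removed, iteration order unspecified.
    static Collection<List<String>> distinctBag(List<List<String>> input) {
        return new HashSet<>(input);
    }

    // Default comparator stand-in: field by field, then shorter tuple first.
    static int compareFields(List<String> a, List<String> b) {
        for (int i = 0; i < Math.min(a.size(), b.size()); i++) {
            int c = a.get(i).compareTo(b.get(i));
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(a.size(), b.size());
    }
}
```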

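BagFactory class diagram:

[Figure: BagFactory class diagram]

DefaultAbstractBag is the base class of the three bag types and has a field mContents that stores the tuples; NonSpillableDataBag implements the DataBag interface directly. DataBag class diagram:

[Figure: DataBag class diagram]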

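3. Hands-on Example

We have Avro log data (see the previous article) with the following fields:

1. dvc identifies the user's phone;

2. appUse and appInstall are both Avro Map types. The key is the app name, and the value is a Map<String, Object> that contains a field timelist (of type ArrayList) recording usage times. The format is as follows: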

'dvc': 'imei_123',
'appUse': {
    'app name1': {
        ...
        'timelist': [...]
    },
    'app name2': {
        ...
        'timelist': [...]
    },
    ...
},
'appInstall': {
    'app name1': {
        ...
        'timelist': [...]
    },
    ...
}

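Now we want, for each user, the app list and how many times each app was opened, output in the format dvc, {(app)}, {(app, frequency)}, i.e. user + app list + usage-count list. With MapReduce this takes the following steps:

1. With (dvc, app) as the key, compute the usage count as the value;

2. With dvc as the key, merge the different apps of the same user; the value is (app, fre);

3. With dvc as the key, compute the app list from appInstall;

4. Join the output of step 2 with the output of step 3, then write the result.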
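For comparison, the four steps can be simulated in memory with plain Java. This only illustrates the data flow and is not Hadoop code: LogRecord, AppRow and AppStatsPipeline are hypothetical names, each timelist is reduced to a List<Long>, and step 4 is assumed to be an inner join on dvc.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical, simplified log entry.
final class LogRecord {
    final String dvc;
    final Map<String, List<Long>> appUse;     // app name -> timelist
    final Map<String, List<Long>> appInstall; // app name -> timelist

    LogRecord(String dvc, Map<String, List<Long>> appUse, Map<String, List<Long>> appInstall) {
        this.dvc = dvc;
        this.appUse = appUse;
        this.appInstall = appInstall;
    }
}

// One output row: dvc, {(app)}, {(app, frequency)}.
final class AppRow {
    final String dvc;
    final TreeSet<String> apps;
    final TreeMap<String, Long> frequency;

    AppRow(String dvc, TreeSet<String> apps, TreeMap<String, Long> frequency) {
        this.dvc = dvc;
        this.apps = apps;
        this.frequency = frequency;
    }
}

final class AppStatsPipeline {
    static List<AppRow> run(List<LogRecord> records) {
        // Step 1: key (dvc, app) -> total number of timelist entries.
        Map<List<String>, Long> usage = new HashMap<>();
        for (LogRecord r : records) {
            r.appUse.forEach((app, timelist) ->
                    usage.merge(List.of(r.dvc, app), (long) timelist.size(), Long::sum));
        }
        // Step 2: regroup by dvc -> (app, fre).
        Map<String, TreeMap<String, Long>> byUser = new TreeMap<>();
        usage.forEach((key, fre) ->
                byUser.computeIfAbsent(key.get(0), k -> new TreeMap<>()).put(key.get(1), fre));
        // Step 3: dvc -> distinct installed apps.
        Map<String, TreeSet<String>> installed = new TreeMap<>();
        for (LogRecord r : records) {
            installed.computeIfAbsent(r.dvc, k -> new TreeSet<>()).addAll(r.appInstall.keySet());
        }
        // Step 4: inner join of steps 2 and 3 on dvc.
        List<AppRow> out = new ArrayList<>();
        byUser.forEach((dvc, freq) -> {
            TreeSet<String> apps = installed.get(dvc);
            if (apps != null) {
                out.add(new AppRow(dvc, apps, freq));
            }
        });
        return out;
    }
}
```

Each intermediate map plays the role of one MapReduce job's output, which is why the plain MapReduce version needs several chained jobs.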

Doing this with MapReduce is clearly somewhat cumbersome. How can Pig do it instead? We can write an EVAL UDF for appUse:map[] that returns (app name, length of timelist):

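import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

// Returns a bag of (app name, number of timelist entries) for an appUse map.
public class AppTimelist extends EvalFunc<DataBag> {
    private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
    private static final BagFactory BAG_FACTORY = BagFactory.getInstance();

    @SuppressWarnings("unchecked")
    @Override
    public DataBag exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0) {
            return null;
        }
        Map<String, Map<String, Object>> m = (Map<String, Map<String, Object>>) input.get(0);
        if (m == null) {
            return null;
        }

        DataBag output = BAG_FACTORY.newDefaultBag();
        List<Object> result = new ArrayList<Object>();
        for (Map.Entry<String, Map<String, Object>> e : m.entrySet()) {
            Map<String, Object> fields = e.getValue();
            // Pig turns the Avro array (a Java ArrayList) into a DataBag, hence the cast.
            DataBag timelist = (fields == null) ? null : (DataBag) fields.get("timelist");
            long size = (timelist == null) ? 0L : timelist.size();
            result.clear();
            result.add(e.getKey());
            result.add(size);
            // newTuple(List) copies the list, so reusing result here is safe.
            output.add(TUPLE_FACTORY.newTuple(result));
        }
        return output;
    }
}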

Pig converts the Java ArrayList into a DataBag, so timelist has to be cast to DataBag.
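Why the cast works, and why size() gives the number of timelist entries, can be sketched as follows, assuming each element of the Avro array ends up as one single-field tuple in the bag. Lists stand in for Pig's Tuple and DataBag here, and ArrayToBag is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: one tuple per array element, so the bag's
// size equals the original list's length.
final class ArrayToBag {
    static List<List<Object>> toBag(List<?> avroArray) {
        List<List<Object>> bag = new ArrayList<>();
        for (Object element : avroArray) {
            List<Object> tuple = new ArrayList<>();
            tuple.add(element); // a single field per tuple
            bag.add(tuple);
        }
        return bag;
    }
}
```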

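For appInstall:map[], KEYSET extracts each record's app names, and a second EVAL UDF merges the per-record lists into one deduplicated bag (appList):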

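import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

// Merges a bag of per-record app lists into one bag without duplicates.
public class DistinctBag extends EvalFunc<DataBag> {
    private static final BagFactory BAG_FACTORY = BagFactory.getInstance();

    @Override
    public DataBag exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0) {
            return null;
        }
        DataBag in = (DataBag) input.get(0);
        if (in == null) {
            return null;
        }

        DataBag out = BAG_FACTORY.newDistinctBag();
        for (Tuple tp : in) {
            // Each tuple carries one field: the applist bag produced by KEYSET.
            DataBag applist = (DataBag) tp.get(0);
            if (applist == null) {
                continue;
            }
            for (Tuple app : applist) {
                out.add(app);
            }
        }
        return out;
    }
}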

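As mentioned above, if an EVAL UDF does not declare a schema for its return value, the result's schema is null. The type information is then lost, and later operations easily fail with a NullPointerException. The outputSchema methods of the two UDFs: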

// Both methods need:
// import org.apache.pig.data.DataType;
// import org.apache.pig.impl.logicalLayer.FrontendException;
// import org.apache.pig.impl.logicalLayer.schema.Schema;
// import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;

// AppTimelist.java
@Override
public Schema outputSchema(Schema input) {
    try {
        // Fields of each output tuple: (app name, timelist size).
        Schema tupleSchema = new Schema();
        tupleSchema.add(new Schema.FieldSchema(null, DataType.CHARARRAY));
        tupleSchema.add(new Schema.FieldSchema(null, DataType.LONG));
        // exec() returns a DataBag, so declare a bag of such tuples rather than a tuple.
        Schema bagSchema = new Schema(new Schema.FieldSchema(null, tupleSchema, DataType.TUPLE));
        return new Schema(new Schema.FieldSchema(
                getSchemaName(this.getClass().getName().toLowerCase(), input),
                bagSchema, DataType.BAG));
    } catch (FrontendException e) {
        return null;
    }
}

// DistinctBag.java
@Override
public Schema outputSchema(Schema input) {
    // A bag whose tuples hold a single chararray (the app name).
    FieldSchema innerFieldSchema = new Schema.FieldSchema(null, DataType.CHARARRAY);
    Schema innerSchema = new Schema(innerFieldSchema);
    try {
        return new Schema(new FieldSchema(null, innerSchema, DataType.BAG));
    } catch (FrontendException e) {
        throw new RuntimeException(e);
    }
}

Computing each user's app list:

define AvroStorage org.apache.pig.piggybank.storage.avro.AvroStorage;
define DistinctBag com.pig.udf.bag.DistinctBag;
A = load '..' using AvroStorage();
B = foreach A generate value.fields.data#'dvc' as dvc:chararray, value.fields.data#'appInstall' as ins:map[map[]];
C = foreach B generate dvc, KEYSET(ins) as applist;
D = group C by dvc;
-- extract applist from grouped D
E = foreach D {
projected = foreach $1 generate applist;
generate group as dvc, projected as grouped;
}
F = foreach E generate dvc, DistinctBag(grouped) as applist;
store F into '..' using AvroStorage();

Computing each user's app usage counts:

define AvroStorage org.apache.pig.piggybank.storage.avro.AvroStorage;
define AppTimelist com.pig.udf.map.AppTimelist;
A = load '..' using AvroStorage();
B = foreach A generate value.fields.data#'dvc' as dvc:chararray, value.fields.data#'appUse' as use:map[map[]];
C = foreach B generate dvc, flatten(AppTimelist(use)) as (app, fre);
D = group C by (dvc, app);
E = foreach D generate flatten(group) as (dvc, app), SUM($1.fre) as fre;
F = group E by dvc;
G = foreach F {
projected = foreach $1 generate app, fre;
generate group as dvc, projected as appfre;
}
store G into '..' using AvroStorage();

Joining the two outputs on dvc gives the final result.

   