User aggregators #95

Status: Open. Wants to merge 31 commits into base: master.

31 commits
917e118
Merge branch 'master' into boa_evaluator
Jan 31, 2017
a6e1c3d
Initial commit for User Defined Aggregation support
Feb 1, 2017
a31e918
Fixing bug : Only one user defined aggregator runs in the presence of…
Feb 2, 2017
e4b0f05
Removing unnecessary prints
Feb 2, 2017
ba6b2ff
Fixing a bug: Filter non aggregator functions from list
Feb 2, 2017
1829489
Merge branch 'boa_evaluator' into user_Aggregators
Feb 2, 2017
de27178
Fixing a test case as code generation has changed.
Feb 2, 2017
a7fd7b6
Updating latest code generation string template.
Feb 2, 2017
81e8f37
Fixing bug in UserDefinedCode generating process. Fixing fullyqualifi…
Feb 3, 2017
2972425
adding naive bayes example using user defined aggregation
Feb 3, 2017
b161ab2
Allowing creation of arrays of nested and complex types
Feb 12, 2017
81af781
Adding capability to convert a tuple into array if possible. If tuple…
Feb 12, 2017
3295b15
code for matrix transpose, inverse, summation and subtraction suppor…
Feb 13, 2017
a05a385
Adding machine learning examples codes in test directory
Feb 14, 2017
ff5b37b
Adding matrix operations
Feb 14, 2017
6b6aa9f
Fixing bug in getCol method in matrix operations
Feb 15, 2017
0da11ef
linear regression optimized and unoptimized code
Feb 18, 2017
737060d
adding neural network without back propagation
Feb 19, 2017
db0a04f
Changes in MatrixOperations and Adding Print facility for debugging H…
Feb 19, 2017
98eb3ac
removing merge conflicts
Feb 19, 2017
fb23150
adding back propagation in neural
Feb 20, 2017
491adfc
adding pca
Feb 21, 2017
72711ce
adding optimized pca
Feb 22, 2017
edf12ff
adding new machine learning algorithms
Feb 22, 2017
656775d
Adding changes to support options as user defined aggregations
Feb 26, 2017
e691a5b
Changes to support serialization of ml model in Boa
Feb 26, 2017
390fc86
Storing the class as part of model
Feb 26, 2017
6ecf209
Adding serialization support for the model using simple json
Feb 26, 2017
fec8ee8
adding support for loading ml model
Feb 27, 2017
739eb3c
Allowing options in user defined aggregator class
Mar 2, 2017
459000f
adding training model usage
Mar 3, 2017
6 changes: 6 additions & 0 deletions .idea/vcs.xml


172 changes: 172 additions & 0 deletions examples/naive.boa
@@ -0,0 +1,172 @@
p: Project = input;
Review comment (Member):
Move this to 'test/known-good/'? We don't have an examples directory, and if you are going to put code examples in there, we might as well use them as test cases.

type fv = {a:int, b:int, c:int, d:int};
type stats = {a_stat:float, b_stat:float, c_stat:float};
type complete_stat = {avg: stats, dev: stats};
type Data = {training: fv, testing: fv};
splitRatio : float = 0.67;

naive := function(vals : array of Data) : float {
train : array of fv;
test : array of fv;

spearated: map[int] of array of fv; # classified per value
summaries : map[int] of complete_stat;

# separate the training and testing datasets
foreach(i:int; def(vals[i])) {
if(def(train)) {
train = train + {vals[i].training};
} else {
train = {vals[i].training};
}
if(def(test)) {
test = test+ {vals[i].testing};
} else {
test = {vals[i].testing};
}

}


# classify training datasets
foreach(i:int; def(train[i])) {
temp : array of fv = {train[i]};
if(!haskey(spearated, train[i].d)) {
spearated[train[i].d] = temp;
} else {
spearated[train[i].d] = spearated[train[i].d] + temp;
}
}

# all the classes
classes : array of int = keys(spearated);

# summarize data from training dataset
foreach(i:int; def(classes[i])) {
# calculate mean
feature_mean : stats = {0.0, 0.0, 0.0};
foreach(j:int; def(spearated[classes[i]][j])) {
feature_mean.a_stat = feature_mean.a_stat + spearated[classes[i]][j].a;
feature_mean.b_stat = feature_mean.b_stat + spearated[classes[i]][j].b;
feature_mean.c_stat = feature_mean.c_stat + spearated[classes[i]][j].c;
}
feature_mean.a_stat = feature_mean.a_stat / len(spearated[classes[i]]);
feature_mean.b_stat = feature_mean.b_stat / len(spearated[classes[i]]);
feature_mean.c_stat = feature_mean.c_stat / len(spearated[classes[i]]);


# calculate sd
feature_sd : stats = {0.0, 0.0, 0.0};
foreach(j:int; def(spearated[classes[i]][j])) {
feature_sd.a_stat = feature_sd.a_stat + pow(spearated[classes[i]][j].a - feature_mean.a_stat, 2);
feature_sd.b_stat = feature_sd.b_stat + pow(spearated[classes[i]][j].b - feature_mean.b_stat, 2);
feature_sd.c_stat = feature_sd.c_stat + pow(spearated[classes[i]][j].c - feature_mean.c_stat, 2);
}
feature_sd.a_stat = sqrt(feature_sd.a_stat / len(spearated[classes[i]]));
feature_sd.b_stat = sqrt(feature_sd.b_stat / len(spearated[classes[i]]));
feature_sd.c_stat = sqrt(feature_sd.c_stat / len(spearated[classes[i]]));

# summarized a class
summaries[classes[i]] = {feature_mean, feature_sd};
}


predictions: array of int;
predictions = new(predictions, len(test), -1);

# predict for each test data
foreach(i:int; def(test[i])) {
probabilities : map[int] of float;
foreach(j: int; def(classes[j])) {
probabilities[classes[j]] = 1.0;
mean := summaries[classes[j]].avg;
deviation := summaries[classes[j]].dev;
# Gaussian likelihood per feature; each term must use that feature's own deviation
probabilities[classes[j]] = probabilities[classes[j]] * (1 / (sqrt(2 * 3.14) * deviation.a_stat)) * (exp(-1 * ((pow((1.0 * test[i].a) - mean.a_stat, 2)) / (2 * pow(deviation.a_stat, 2)))));
probabilities[classes[j]] = probabilities[classes[j]] * (1 / (sqrt(2 * 3.14) * deviation.b_stat)) * (exp(-1 * ((pow((1.0 * test[i].b) - mean.b_stat, 2)) / (2 * pow(deviation.b_stat, 2)))));
probabilities[classes[j]] = probabilities[classes[j]] * (1 / (sqrt(2 * 3.14) * deviation.c_stat)) * (exp(-1 * ((pow((1.0 * test[i].c) - mean.c_stat, 2)) / (2 * pow(deviation.c_stat, 2)))));
}

bestProb : float = 0.0;
bestLab : int = -1;
foreach(j: int; def(classes[j])) {
if ((bestLab == -1) || (bestProb < probabilities[classes[j]])) {
bestProb = probabilities[classes[j]];
bestLab = classes[j];
}
}
predictions[i] = bestLab;
}

correct : float = 0.0;
foreach(i:int; def(test[i])) {
if(predictions[i] == test[i].d) {
correct = correct + 1.0;
}
}
return correct/len(test) * 100;
};

scale := function(ast: int, method: int, class: int) : int {
total : int = 0;
if(ast > 1000) {
total++;
}
if(method > 500) {
total++;
}
if(class > 50) {
total++;
}
return total;
};


naive_bayes : output naive of Data;

# count ast nodes

astCount := 0;
classCount := 0;
methodCount := 0;
visit(p, visitor {
# only look at the latest snapshot
before n: CodeRepository -> {
snapshot := getsnapshot(n);
foreach (i: int; def(snapshot[i]))
visit(snapshot[i]);
stop;
}
before node: Declaration -> {
if (node.kind == TypeKind.CLASS) {
classCount++;
foreach (i: int; def(node.methods[i])) {
methodCount++;
}
}
}
# by default, count all visited nodes
before _ -> astCount++;
# these nodes are not part of the AST, so do nothing when visiting
before Project, ChangedFile -> ;
});



dummy : fv = {0, 0, 0, 0};
nondummy : fv = {astCount, methodCount, classCount, scale(astCount, methodCount, classCount)};
data1: Data = {nondummy, dummy};
data2: Data = {dummy, nondummy};
if(rand() > splitRatio)
naive_bayes << data1;
else
naive_bayes << data2;


if(rand() > splitRatio)
naive_bayes << data1;
else
naive_bayes << data2;


if(rand() > splitRatio)
naive_bayes << data1;
else
naive_bayes << data2;
Binary file added lib/gson-2.8.0.jar
Binary file added lib/guava-21.0.jar
Binary file added lib/jama-1.0.3.jar
16 changes: 15 additions & 1 deletion src/antlr/Boa.g
@@ -179,7 +179,7 @@ outputType returns [OutputType ast]
locals [int l, int c]
@init { $l = getStartLine(); $c = getStartColumn(); }
@after { $ast.setPositions($l, $c, getEndLine(), getEndColumn()); }
: OUTPUT (tk=SET { $ast = new OutputType(new Identifier($tk.text)); } | id=identifier { $ast = new OutputType($id.ast); }) (LPAREN el=expressionList RPAREN { $ast.setArgs($el.list); })? (LBRACKET m=component RBRACKET { $ast.addIndice($m.ast); })* OF m=component { $ast.setType($m.ast); } (WEIGHT m=component { $ast.setWeight($m.ast); })? (FORMAT LPAREN el=expressionList RPAREN)?
: OUTPUT (tk=SET { $ast = new OutputType(new Identifier($tk.text)); } | id=identifier { $ast = new OutputType($id.ast); }) (LPAREN vl=vardeclList RPAREN { $ast.setParams($vl.list); })? (LPAREN el=expressionList RPAREN { $ast.setArgs($el.list); })? (LBRACKET m=component RBRACKET { $ast.addIndice($m.ast); })* OF m=component { $ast.setType($m.ast); } (WEIGHT m=component { $ast.setWeight($m.ast); })? (FORMAT LPAREN el=expressionList RPAREN)?
;

functionType returns [FunctionType ast]
@@ -429,6 +429,20 @@ expressionList returns [ArrayList<Expression> list]
| e=expression { $list.add($e.ast); } ({ notifyErrorListeners("error: ',' expected"); } e=expression { $list.add($e.ast); } | COMMA e=expression { $list.add($e.ast); })*
;


useraggParamDeclaration returns [VarDeclStatement ast]
locals [int l, int c]
@init { $l = getStartLine(); $c = getStartColumn(); }
@after { $ast.setPositions($l, $c, getEndLine(), getEndColumn()); }
: v=forVariableDeclaration { $ast = $v.ast; }
;

vardeclList returns [ArrayList<VarDeclStatement> list]
@init { $list = new ArrayList<VarDeclStatement>(); }
: e=useraggParamDeclaration { $list.add($e.ast); } (COMMA e=useraggParamDeclaration { $list.add($e.ast); })*
| e=useraggParamDeclaration { $list.add($e.ast); } ({ notifyErrorListeners("error: ',' expected"); } e=useraggParamDeclaration { $list.add($e.ast); } | COMMA e=useraggParamDeclaration { $list.add($e.ast); })*
;

conjunction returns [Conjunction ast]
locals [int l, int c]
@init { $l = getStartLine(); $c = getStartColumn(); }
5 changes: 5 additions & 0 deletions src/java/boa/BoaEnumInterface.java
@@ -0,0 +1,5 @@
package boa;

public interface BoaEnumInterface {
Object getValue();
}
14 changes: 14 additions & 0 deletions src/java/boa/BoaTup.java
@@ -0,0 +1,14 @@
package boa;

import java.io.IOException;
import java.util.Collection;


public interface BoaTup {
public String[] getValues();
public byte[] serialize(Object o) throws IOException;
public Object getValue(String f);
public String toString();
public <T> T[] asArray(T[] type);
public String[] getFieldNames();
}
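To make the new interface concrete, here is a self-contained sketch of what a compiler-generated tuple class satisfying `BoaTup` might look like. The class name `FvTup`, its field layout (modeled on `type fv = {a:int, b:int, c:int, d:int}` from `examples/naive.boa`), and the use of plain Java serialization are illustrative assumptions, not what the PR's code generator actually emits; the interface is abridged to compile standalone, without the `boa` package.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;

// Abridged copy of the BoaTup interface introduced in this PR
// (package declaration dropped so the sketch compiles standalone).
interface BoaTup {
    String[] getValues();
    byte[] serialize(Object o) throws IOException;
    Object getValue(String f);
    <T> T[] asArray(T[] type);
    String[] getFieldNames();
}

// Hypothetical generated class for `type fv = {a:int, b:int, c:int, d:int}`.
class FvTup implements BoaTup, Serializable {
    final long a, b, c, d;

    FvTup(long a, long b, long c, long d) {
        this.a = a; this.b = b; this.c = c; this.d = d;
    }

    @Override public String[] getFieldNames() {
        return new String[] { "a", "b", "c", "d" };
    }

    @Override public String[] getValues() {
        return new String[] {
            Long.toString(a), Long.toString(b), Long.toString(c), Long.toString(d)
        };
    }

    @Override public Object getValue(String f) {
        switch (f) {
            case "a": return a;
            case "b": return b;
            case "c": return c;
            case "d": return d;
            default:  return null;
        }
    }

    @Override public byte[] serialize(Object o) throws IOException {
        // Plain Java serialization as a stand-in for whatever scheme the PR uses.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    @Override public <T> T[] asArray(T[] type) {
        // Mirrors the "convert a tuple into an array if possible" commit:
        // all fields share one type, so they can be copied into an array.
        Long[] fields = { a, b, c, d };
        return Arrays.asList(fields).toArray(type);
    }

    @Override public String toString() {
        return Arrays.toString(getValues());
    }
}
```

Note that `asArray` can only succeed when every field has the same type, which matches the commit message's "if possible" caveat.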
55 changes: 55 additions & 0 deletions src/java/boa/aggregators/Aggregator.java
@@ -18,10 +18,15 @@

import java.io.IOException;

import boa.BoaTup;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer.Context;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import boa.functions.BoaCasts;
import boa.io.EmitKey;
import boa.io.EmitValue;
@@ -38,6 +43,29 @@ public abstract class Aggregator {
private Context context;
private EmitKey key;
private boolean combining;
private final static Set<String> inBuiltAggs = new HashSet<String>();

static {
inBuiltAggs.add("sum");
inBuiltAggs.add("top");
inBuiltAggs.add("maximum");
inBuiltAggs.add("minimum");
inBuiltAggs.add("max");
inBuiltAggs.add("min");
inBuiltAggs.add("collection");
inBuiltAggs.add("mean");
inBuiltAggs.add("median");
inBuiltAggs.add("stdev");
inBuiltAggs.add("quantile");
inBuiltAggs.add("kurtosis");
inBuiltAggs.add("histogram");
inBuiltAggs.add("graphCSV");
inBuiltAggs.add("set");
inBuiltAggs.add("bottom");
inBuiltAggs.add("skewness");
inBuiltAggs.add("confidence");
inBuiltAggs.add("variance");
}

/**
* Construct an Aggregator.
@@ -93,6 +121,13 @@ public void aggregate(final double data) throws IOException, InterruptedExceptio
this.aggregate(BoaCasts.doubleToString(data), null);
}

public void aggregate(final BoaTup data, final String metadata) throws IOException, InterruptedException, FinishedException {
}

public void aggregate(final BoaTup data) throws IOException, InterruptedException, FinishedException {
this.aggregate(data, null);
}

@SuppressWarnings("unchecked")
protected void collect(final String data, final String metadata) throws IOException, InterruptedException {
if (this.combining)
@@ -107,6 +142,22 @@ protected void collect(final String data) throws IOException, InterruptedExcepti
this.collect(data, null);
}

protected void collect(final BoaTup data) throws IOException, InterruptedException {
this.collect(data.toString(), null);
}

protected void collect(final BoaTup[] data) throws IOException, InterruptedException {
this.collect(Arrays.toString(data), null);
}

protected void collect(final double[] data) throws IOException, InterruptedException {
this.collect(Arrays.toString(data), null);
}

protected void collect(final long[] data) throws IOException, InterruptedException {
this.collect(Arrays.toString(data), null);
}

@SuppressWarnings("unchecked")
protected void collect(final long data, final String metadata) throws IOException, InterruptedException {
this.collect(BoaCasts.longToString(data), metadata);
@@ -157,4 +208,8 @@ public void setKey(final EmitKey key) {
public EmitKey getKey() {
return this.key;
}

public final static boolean isUserDefinedAggregator(String name) {
return !Aggregator.inBuiltAggs.contains(name);
}
}
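The new `isUserDefinedAggregator` check is a plain set-membership test: any output-type name not in the fixed list of built-ins is treated as user defined. A standalone sketch of the same logic (names copied from the diff above, class name `AggregatorNames` is hypothetical):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Standalone sketch of the built-in-aggregator check added in this PR.
class AggregatorNames {
    private static final Set<String> inBuiltAggs = new HashSet<>(Arrays.asList(
            "sum", "top", "maximum", "minimum", "max", "min", "collection",
            "mean", "median", "stdev", "quantile", "kurtosis", "histogram",
            "graphCSV", "set", "bottom", "skewness", "confidence", "variance"));

    static boolean isUserDefinedAggregator(String name) {
        // Anything outside the fixed set is assumed to be user defined.
        return !inBuiltAggs.contains(name);
    }
}
```

One consequence of this name-based design: a user function that happens to share a name with a built-in aggregator would be classified as built in, so user aggregator names must avoid the reserved set.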
25 changes: 25 additions & 0 deletions src/java/boa/aggregators/UserDefinedAggregator.java
@@ -0,0 +1,25 @@
package boa.aggregators;


import boa.compiler.UserDefinedAggregators;
import boa.datagen.util.FileIO;
import com.google.gson.Gson;

import java.io.*;

@AggregatorSpec(name = "UserDefinedAgg", formalParameters = { "any", "any" }, type = "UserDefined", canCombine = false)
public abstract class UserDefinedAggregator extends Aggregator {

public void store(Object object) {
final File output = new File(UserDefinedAggregators.getFileName());
final String dest = output.getAbsolutePath() + "/";
output.mkdir();
writeAsJSON(object, dest + UserDefinedAggregators.getFileName() + ".model");
}

private void writeAsJSON(Object object, String path) {
Gson writer = new Gson();
FileIO.writeFileContents(new File(path), writer.toJson(object));
}
}
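The persistence pattern here is: create a directory named after the job and write the serialized model into a `.model` file inside it. A minimal stdlib-only sketch of the same pattern (Gson replaced by a caller-supplied string so the sketch has no dependencies; the method and parameter names are assumptions for illustration):

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

// Sketch of the model-persistence pattern used by UserDefinedAggregator.store():
// create a per-job directory and drop a "<job>.model" file into it.
class ModelStore {
    static File store(String jobName, String serializedModel, File baseDir) throws IOException {
        File output = new File(baseDir, jobName);
        output.mkdirs(); // like output.mkdir() in the PR, but creates parents too
        File model = new File(output, jobName + ".model");
        Files.write(model.toPath(), serializedModel.getBytes(StandardCharsets.UTF_8));
        return model;
    }
}
```

In the PR itself the serialized string comes from `Gson.toJson(object)`; the sketch takes it as an argument only to stay dependency-free.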
3 changes: 3 additions & 0 deletions src/java/boa/compiler/BoaCompiler.java
@@ -155,6 +155,8 @@ public void syntaxError(Recognizer<?, ?> recognizer, Object offendingSymbol, int

try {
if (!parserErrorListener.hasError) {
UserDefinedAggregators.setFileName(f.getName());
UserDefinedAggregators.setJobName("Job" + jobName);
new TypeCheckingVisitor().start(p, new SymbolTable());

final TaskClassifyingVisitor simpleVisitor = new TaskClassifyingVisitor();
@@ -241,6 +243,7 @@ public void syntaxError(Recognizer<?, ?> recognizer, Object offendingSymbol, int
st.add("jobs", jobs);
st.add("jobnames", jobnames);
st.add("combineTables", CodeGeneratingVisitor.combineAggregatorStrings);
st.add("userDeclAgg", CodeGeneratingVisitor.userAggregatorDeclStrings);
st.add("reduceTables", CodeGeneratingVisitor.reduceAggregatorStrings);
st.add("splitsize", isSimple ? 64 * 1024 * 1024 : 10 * 1024 * 1024);
if(DefaultProperties.localDataPath != null) {