Skip to content

Commit

Permalink
issue #40: fixed in Spark; added optional Laplace correction and an atomic-double instance counter #CHANGELOG
Browse files Browse the repository at this point in the history
  • Loading branch information
rcabanasdepaz committed Oct 21, 2016
1 parent e3a0b4c commit 5f66fcd
Show file tree
Hide file tree
Showing 13 changed files with 1,114 additions and 9 deletions.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Empty file.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions sparklink/pom.xml
Expand Up @@ -54,5 +54,11 @@
<artifactId>spark-sql_2.10</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Expand Up @@ -18,6 +18,7 @@
package eu.amidst.sparklink.core.learning;


import com.google.common.util.concurrent.AtomicDouble;
import eu.amidst.core.datastream.DataInstance;
import eu.amidst.core.exponentialfamily.EF_BayesianNetwork;
import eu.amidst.core.exponentialfamily.SufficientStatistics;
Expand Down Expand Up @@ -59,11 +60,23 @@ public class ParallelMaximumLikelihood implements ParameterLearningAlgorithm, Se
*/
/** Accumulated sum of sufficient statistics across data partitions (rebuilt on each run, hence transient). */
protected transient SufficientStatistics sumSS;

/** Represents the data instance count. AtomicDouble so concurrent partition updates are safe. */
protected AtomicDouble numInstances;

/** Represents whether Laplace correction (i.e. MAP estimation) is used. */
protected boolean laplace = true;



/**
 * Initializes the learning process: builds the exponential-family network from the DAG
 * and resets the accumulated sufficient statistics and instance counter.
 *
 * With Laplace correction enabled (MAP estimation) the statistics start from the prior
 * pseudo-counts and the instance count starts at 1; otherwise both start at zero.
 */
public void initLearning() {
    efBayesianNetwork = new EF_BayesianNetwork(dag);
    // The old unconditional init assignment was redundant: both branches below
    // overwrite sumSS, so it only wasted an allocation.
    if (laplace) {
        sumSS = efBayesianNetwork.createInitSufficientStatistics();
        numInstances = new AtomicDouble(1.0); //Initial counts
    } else {
        sumSS = efBayesianNetwork.createZeroSufficientStatistics();
        numInstances = new AtomicDouble(0.0); //Initial counts
    }
}

Expand Down Expand Up @@ -110,16 +123,13 @@ public double updateModel(DataSpark dataUpdate) {

//this.sumSS = computeSufficientStatistics(dataUpdate, efBayesianNetwork);

this.sumSS = dataUpdate.getDataSet()
sumSS.sum(dataUpdate.getDataSet()
.mapPartitions( iter -> sufficientStatisticsMap(iter, this.efBayesianNetwork))
.reduce(ParallelMaximumLikelihood::sufficientStatisticsReduce);
.reduce(ParallelMaximumLikelihood::sufficientStatisticsReduce));

//Add the prior
sumSS.sum(efBayesianNetwork.createInitSufficientStatistics());

// FIXME: Maybe a generic method from the class, what about caching?
numInstances = dataSpark.getDataSet().count();
numInstances++;//Initial counts
numInstances.addAndGet(dataSpark.getDataSet().count());


return this.getLogMarginalProbability();
Expand Down Expand Up @@ -149,7 +159,7 @@ public BayesianNetwork getLearntBayesianNetwork() {
//Normalize the sufficient statistics
SufficientStatistics normalizedSS = efBayesianNetwork.createZeroSufficientStatistics();
normalizedSS.copy(sumSS);
normalizedSS.divideBy(numInstances);
normalizedSS.divideBy(numInstances.get());

efBayesianNetwork.setMomentParameters(normalizedSS);
return efBayesianNetwork.toBayesianNetwork(dag);
Expand Down
@@ -0,0 +1,89 @@
/*
*
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.
* See the NOTICE file distributed with this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*
*
*/

package eu.amidst.flinklink.core.learning.parametric;


import eu.amidst.core.datastream.DataInstance;
import eu.amidst.core.datastream.DataStream;
import eu.amidst.core.distribution.Multinomial;
import eu.amidst.core.io.BayesianNetworkLoader;
import eu.amidst.core.io.DataStreamWriter;
import eu.amidst.core.models.BayesianNetwork;
import eu.amidst.core.models.DAG;
import eu.amidst.core.utils.BayesianNetworkSampler;
import eu.amidst.core.utils.DAGGenerator;
import eu.amidst.core.variables.Variable;
import eu.amidst.sparklink.core.data.DataSpark;
import eu.amidst.sparklink.core.io.DataSparkLoader;
import eu.amidst.sparklink.core.learning.ParallelMaximumLikelihood;
import junit.framework.TestCase;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.SQLContext;
import org.junit.Assert;

import java.io.IOException;

/**
 * Integration test for {@link ParallelMaximumLikelihood}: learns a naive-Bayes model
 * from a simulated dataset on a local Spark context and checks the learnt class prior.
 *
 * Created by andresmasegosa on 2/9/15.
 */
public class ParallelMaximumLikelihoodTest extends TestCase {

    /** Tolerance for comparing learnt probabilities (doubles must not be compared with ==). */
    private static final double DELTA = 1e-9;

    public void testingMLParallelWI() throws Exception {

        // NOTE(review): this class is declared in package eu.amidst.flinklink... but it
        // exercises the sparklink module — confirm the intended package/location.
        SparkConf conf = new SparkConf().setAppName("SparkLink!").setMaster("local");
        SparkContext sc = new SparkContext(conf);
        try {
            SQLContext sqlContext = new SQLContext(sc);

            //Path to dataset
            String path = "datasets/simulated/WI_samples.json";

            //Create an AMIDST object for managing the data
            DataSpark dataSpark = DataSparkLoader.open(sqlContext, path);

            //Learning algorithm
            ParallelMaximumLikelihood parameterLearningAlgorithm = new ParallelMaximumLikelihood();

            //We fix the BN structure
            DAG dag = DAGGenerator.getNaiveBayesStructure(dataSpark.getAttributes(), "W");

            parameterLearningAlgorithm.setDAG(dag);

            //We set the batch size which will be employed to learn the model in parallel
            parameterLearningAlgorithm.setBatchSize(100);
            //We set the data which is going to be used for learning the parameters
            parameterLearningAlgorithm.setDataSpark(dataSpark);
            //We perform the learning
            parameterLearningAlgorithm.runLearning();
            //And we get the model
            BayesianNetwork bn = parameterLearningAlgorithm.getLearntBayesianNetwork();

            System.out.println(bn);

            Multinomial dist = bn.getConditionalDistribution(bn.getVariables().getVariableByName("W"));

            double[] p = dist.getProbabilities();

            // Compare with a tolerance instead of exact floating-point equality.
            Assert.assertEquals(0.6998001998001998, p[0], DELTA);
            Assert.assertEquals(0.3001998001998002, p[1], DELTA);
        } finally {
            // Release the local Spark context so repeated runs in the same JVM don't conflict.
            sc.stop();
        }
    }
}

0 comments on commit 5f66fcd

Please sign in to comment.