This repository has been archived by the owner on Jun 22, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 61
/
MesosCluster.java
133 lines (113 loc) · 4.94 KB
/
MesosCluster.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package org.apache.mesos.mini;
import com.github.dockerjava.api.InternalServerErrorException;
import com.github.dockerjava.api.command.CreateContainerCmd;
import com.github.dockerjava.api.model.PortBinding;
import com.mashape.unirest.http.Unirest;
import com.mashape.unirest.http.exceptions.UnirestException;
import org.apache.log4j.Logger;
import org.apache.mesos.mini.docker.DockerProxy;
import org.apache.mesos.mini.docker.DockerUtil;
import org.apache.mesos.mini.docker.PrivateDockerRegistry;
import org.apache.mesos.mini.mesos.MesosClusterConfig;
import org.apache.mesos.mini.mesos.MesosContainer;
import org.apache.mesos.mini.state.State;
import org.apache.mesos.mini.util.MesosClusterStateResponse;
import org.apache.mesos.mini.util.Predicate;
import org.json.JSONObject;
import org.junit.rules.ExternalResource;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import static com.jayway.awaitility.Awaitility.await;
/**
 * JUnit rule that starts a Mesos cluster in Docker. Starting the cluster sets up the docker proxy and a private
 * docker registry, then launches the Mesos container and waits for the configured number of slaves. Once started,
 * users can add their own images to the private registry.
 */
public class MesosCluster extends ExternalResource {

    private static final Logger LOGGER = Logger.getLogger(MesosCluster.class);

    /** Timeout applied by {@link #waitForState(Predicate)} when the caller does not supply one. */
    private static final int DEFAULT_STATE_TIMEOUT_SECONDS = 20;

    private final MesosClusterConfig config;

    /** Used for every container this class creates itself, and stopped in {@link #after()}. */
    private final DockerUtil dockerUtil;

    /** Set by {@link #start()}; {@code null} until then. */
    private MesosContainer mesosContainer;

    /**
     * @param config cluster configuration; its {@code dockerClient} is used for all docker operations
     */
    public MesosCluster(MesosClusterConfig config) {
        this.config = config;
        this.dockerUtil = new DockerUtil(config.dockerClient);
    }

    /**
     * Starts the docker proxy, the private registry and the Mesos container, then blocks until
     * {@code config.numberOfSlaves} slaves are reported by the master.
     * <p>
     * If anything fails, {@code dockerUtil.stop()} is invoked before the original failure is re-thrown, because
     * {@link #after()} is not run when {@link #before()} throws.
     */
    public void start() {
        try {
            DockerProxy dockerProxy = new DockerProxy(config.dockerClient);
            dockerProxy.start();

            // Pulls registry images and start container
            PrivateDockerRegistry privateDockerRegistry = new PrivateDockerRegistry(config.dockerClient, this.config);
            privateDockerRegistry.startPrivateRegistryContainer();

            // start the container
            mesosContainer = new MesosContainer(config.dockerClient, this.config);
            mesosContainer.startMesosLocalContainer(privateDockerRegistry.getContainerId());

            // wait until the given number of slaves are registered
            new MesosClusterStateResponse(mesosContainer.getMesosMasterURL(), config.numberOfSlaves).waitFor();
        } catch (Throwable e) {
            LOGGER.error("Error during startup", e);
            try {
                dockerUtil.stop(); // remove all created docker containers (not handled by after then re-throwing e)
            } catch (RuntimeException cleanupFailure) {
                // Keep the startup failure as the primary exception; a cleanup problem must not hide it.
                e.addSuppressed(cleanupFailure);
            }
            throw e;
        }
    }

    /**
     * Pull and start a docker image. The {@code latest} tag is pulled and the container is named after the image
     * with {@code /} replaced by {@code _}.
     * <p>
     * The container is created through the same {@code DockerUtil} that {@link #after()} stops, so it is meant to be
     * destroyed when the Mesos cluster is shut down.
     * NOTE(review): this relies on {@code DockerUtil.stop()} removing the containers it created - confirm.
     *
     * @param containerName The name of the image
     * @param portBindings  optional port bindings for the container; may be omitted or {@code null}
     * @return The id of the container
     */
    public String addAndStartContainer(String containerName, PortBinding... portBindings) {
        dockerUtil.pullImage(containerName, "latest");

        String name = containerName.replace("/", "_");
        CreateContainerCmd command = config.dockerClient.createContainerCmd(containerName).withName(name);
        if (portBindings != null) {
            command.withPortBindings(portBindings);
        }
        return dockerUtil.createAndStart(command);
    }

    /**
     * Pull and start a docker image without any port bindings. See
     * {@link #addAndStartContainer(String, PortBinding...)} for details.
     *
     * @param containerName The name of the image
     * @return The id of the container
     */
    public String addAndStartContainer(String containerName) {
        // Explicit cast: pass a null array rather than relying on an inexact varargs call.
        return addAndStartContainer(containerName, (PortBinding[]) null);
    }

    /**
     * Fetches {@code state.json} from the Mesos master and parses it with {@link State#fromJSON}.
     *
     * @return the parsed cluster state
     * @throws UnirestException      if the HTTP request fails
     * @throws IllegalStateException if the cluster has not been started
     */
    public State getStateInfo() throws UnirestException {
        String json = Unirest.get(stateUrl()).asString().getBody();
        return State.fromJSON(json);
    }

    /**
     * Fetches {@code state.json} from the Mesos master as a raw JSON object.
     *
     * @return the JSON object returned by the master
     * @throws UnirestException      if the HTTP request fails
     * @throws IllegalStateException if the cluster has not been started
     */
    public JSONObject getStateInfoJSON() throws UnirestException {
        return Unirest.get(stateUrl()).asJson().getBody().getObject();
    }

    /**
     * @return the master URL as reported by the Mesos container (without the {@code http://} scheme prefix that the
     *         state accessors prepend)
     * @throws IllegalStateException if the cluster has not been started
     */
    public String getMesosMasterURL() {
        return masterUrl();
    }

    /**
     * Polls the cluster state until the predicate accepts it or the timeout elapses.
     * <p>
     * A failed state request is treated as "not ready yet" and polled again rather than aborting the wait.
     *
     * @param predicate condition the cluster state has to satisfy
     * @param seconds   maximum time to wait, in seconds
     */
    public void waitForState(final Predicate<State> predicate, int seconds) {
        await().atMost(seconds, TimeUnit.SECONDS).until(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                try {
                    return predicate.test(MesosCluster.this.getStateInfo());
                } catch (InternalServerErrorException | UnirestException e) {
                    // getStateInfo() reports HTTP failures as UnirestException. Letting it escape would end the
                    // wait on the first failed request instead of retrying.
                    LOGGER.error(e);
                    // This probably means that the mesos cluster isn't ready yet..
                    return false;
                }
            }
        });
    }

    /**
     * Same as {@link #waitForState(Predicate, int)} with a timeout of 20 seconds.
     *
     * @param predicate condition the cluster state has to satisfy
     */
    public void waitForState(Predicate<State> predicate) {
        waitForState(predicate, DEFAULT_STATE_TIMEOUT_SECONDS);
    }

    /** Starts the cluster before the test(s) run. */
    @Override
    protected void before() throws Throwable {
        start();
    }

    /** Stops the containers handled by this cluster's {@code DockerUtil} after the test(s) have run. */
    @Override
    protected void after() {
        dockerUtil.stop();
    }

    /** Returns the master URL, failing with a clear message when {@link #start()} has not run yet. */
    private String masterUrl() {
        if (mesosContainer == null) {
            throw new IllegalStateException("Mesos cluster has not been started");
        }
        return mesosContainer.getMesosMasterURL();
    }

    /** Builds the URL of the master's {@code state.json} endpoint. */
    private String stateUrl() {
        return "http://" + masterUrl() + "/state.json";
    }
}