This repository has been archived by the owner on Sep 29, 2021. It is now read-only.
/
ZooKeeperAgentModel.java
240 lines (214 loc) · 8.17 KB
/
ZooKeeperAgentModel.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
/*-
* -\-\-
* Helios Services
* --
* Copyright (C) 2016 Spotify AB
* --
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* -/-/-
*/
package com.spotify.helios.agent;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.spotify.helios.common.descriptors.Descriptor.parse;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AbstractIdleService;
import com.spotify.helios.common.Json;
import com.spotify.helios.common.descriptors.JobId;
import com.spotify.helios.common.descriptors.Task;
import com.spotify.helios.common.descriptors.TaskStatus;
import com.spotify.helios.common.descriptors.TaskStatusEvent;
import com.spotify.helios.servicescommon.EventSender;
import com.spotify.helios.servicescommon.coordination.Paths;
import com.spotify.helios.servicescommon.coordination.PersistentPathChildrenCache;
import com.spotify.helios.servicescommon.coordination.ZooKeeperClient;
import com.spotify.helios.servicescommon.coordination.ZooKeeperClientProvider;
import com.spotify.helios.servicescommon.coordination.ZooKeeperUpdatingPersistentDirectory;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.curator.framework.state.ConnectionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * The Helios Agent's view into ZooKeeper.
 *
 * <p>This caches ZK state to local disk so the agent can continue to function in the face of a ZK
 * outage.
 */
public class ZooKeeperAgentModel extends AbstractIdleService implements AgentModel {

  private static final Logger log = LoggerFactory.getLogger(ZooKeeperAgentModel.class);

  private static final String TASK_CONFIG_FILENAME = "task-config.json";
  private static final String TASK_STATUS_FILENAME = "task-status.json";

  /**
   * Tasks assigned to this agent, read from {@code Paths.configHostJobs(host)} and persisted to
   * {@code <stateDirectory>/task-config.json}. Keys are node paths under that ZK path (see
   * {@link #jobIdFromTaskPath(String)}).
   */
  private final PersistentPathChildrenCache<Task> tasks;

  /**
   * JSON-serialized {@link TaskStatus} values keyed by {@code JobId.toString()}, associated with
   * {@code Paths.statusHostJobs(host)} and persisted to {@code <stateDirectory>/task-status.json}.
   */
  private final ZooKeeperUpdatingPersistentDirectory taskStatuses;

  /** May be {@code null}; when null, no task history is recorded. */
  private final TaskHistoryWriter historyWriter;

  /** Each sender receives every task status event published by {@link #setTaskStatus}. */
  private final List<EventSender> eventSenders;
  private final String taskStatusEventTopic;

  /** The host name of this agent. */
  private final String agent;

  // Copy-on-write so listeners may be added/removed while fireTasksUpdated() is iterating.
  private final CopyOnWriteArrayList<AgentModel.Listener> listeners = new CopyOnWriteArrayList<>();

  /**
   * Creates the agent model and its local caches. The caches are not started until
   * {@link #startUp()} runs.
   *
   * @param provider             source of ZooKeeper clients
   * @param host                 the agent's host name; must not be null
   * @param stateDirectory       directory holding the local cache files
   * @param historyWriter        optional task history writer; may be null
   * @param eventSenders         senders notified of every task status change
   * @param taskStatusEventTopic topic under which task status events are sent
   * @throws IOException          propagated from setting up the persistent caches
   * @throws InterruptedException propagated from setting up the persistent caches
   */
  public ZooKeeperAgentModel(final ZooKeeperClientProvider provider,
                             final String host,
                             final Path stateDirectory,
                             final TaskHistoryWriter historyWriter,
                             final List<EventSender> eventSenders,
                             final String taskStatusEventTopic)
      throws IOException, InterruptedException {
    // TODO(drewc): we're constructing too many heavyweight things in the ctor, these kinds of
    // things should be passed in/provider'd/etc.
    final ZooKeeperClient client = provider.get("ZooKeeperAgentModel_ctor");
    this.agent = checkNotNull(host);

    final Path taskConfigFile = stateDirectory.resolve(TASK_CONFIG_FILENAME);
    this.tasks = client.pathChildrenCache(Paths.configHostJobs(host), taskConfigFile,
        Json.type(Task.class));
    tasks.addListener(new JobsListener());

    final Path taskStatusFile = stateDirectory.resolve(TASK_STATUS_FILENAME);
    this.taskStatuses = ZooKeeperUpdatingPersistentDirectory.create("agent-model-task-statuses",
        provider,
        taskStatusFile,
        Paths.statusHostJobs(host));

    this.historyWriter = historyWriter;
    this.eventSenders = eventSenders;
    this.taskStatusEventTopic = taskStatusEventTopic;
  }

  /**
   * Starts the task cache, the task status directory and (if present) the history writer, in
   * that order, waiting for each to be running before starting the next.
   */
  @Override
  protected void startUp() throws Exception {
    tasks.startAsync().awaitRunning();
    taskStatuses.startAsync().awaitRunning();
    if (historyWriter != null) {
      historyWriter.startAsync().awaitRunning();
    }
  }

  /**
   * Stops the task cache, the task status directory and (if present) the history writer, in
   * that order, waiting for each to terminate.
   */
  @Override
  protected void shutDown() throws Exception {
    tasks.stopAsync().awaitTerminated();
    taskStatuses.stopAsync().awaitTerminated();
    if (historyWriter != null) {
      historyWriter.stopAsync().awaitTerminated();
    }
  }

  /**
   * Extracts the {@link JobId} from a task node path of the form
   * {@code <Paths.configHostJobs(agent)>/<jobId>}.
   *
   * <p>The prefix is stripped literally. {@link String#replaceFirst} would interpret the prefix,
   * including the host name, as a regular expression. A path that does not start with the
   * prefix is passed through unchanged.
   */
  private JobId jobIdFromTaskPath(final String path) {
    final String prefix = Paths.configHostJobs(agent) + "/";
    final String jobIdString = path.startsWith(prefix) ? path.substring(prefix.length()) : path;
    return JobId.fromString(jobIdString);
  }

  /**
   * Returns the tasks (basically, a pair of {@link JobId} and {@link Task}) for the current agent.
   *
   * @return a freshly allocated, mutable map; changes to it do not affect the model
   */
  @Override
  public Map<JobId, Task> getTasks() {
    final Map<JobId, Task> result = Maps.newHashMap();
    for (final Map.Entry<String, Task> entry : tasks.getNodes().entrySet()) {
      final JobId id = jobIdFromTaskPath(entry.getKey());
      result.put(id, entry.getValue());
    }
    return result;
  }

  /**
   * Returns the {@link TaskStatus}es for all tasks assigned to the current agent.
   *
   * @return a freshly allocated, mutable map keyed by job id
   * @throws RuntimeException wrapping the {@link IOException} if a stored status cannot be parsed
   */
  @Override
  public Map<JobId, TaskStatus> getTaskStatuses() {
    final Map<JobId, TaskStatus> statuses = Maps.newHashMap();
    for (final Map.Entry<String, byte[]> entry : this.taskStatuses.entrySet()) {
      try {
        final JobId id = JobId.fromString(entry.getKey());
        final TaskStatus status = Json.read(entry.getValue(), TaskStatus.class);
        statuses.put(id, status);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
    return statuses;
  }

  /**
   * Set the {@link TaskStatus} for the job identified by {@code jobId}.
   *
   * <p>The status is stored first, then recorded by the history writer (if any), then sent as a
   * {@link TaskStatusEvent} to every configured {@link EventSender}. History-writer failures are
   * logged and otherwise ignored.
   */
  @Override
  public void setTaskStatus(final JobId jobId, final TaskStatus status)
      throws InterruptedException {
    log.debug("setting task status: {}", status);
    taskStatuses.put(jobId.toString(), status.toJsonBytes());
    if (historyWriter != null) {
      try {
        historyWriter.saveHistoryItem(status);
      } catch (Exception e) {
        // Log error here and keep going as saving task history is not critical.
        // This is to prevent bad data in the queue from screwing up the actually important Helios
        // agent operations.
        // The throwable is passed as a trailing argument (not a {} placeholder) so SLF4J logs
        // its stack trace instead of only its toString().
        log.error("Error saving task status {} to ZooKeeper", status, e);
      }
    }
    final TaskStatusEvent event = new TaskStatusEvent(status, System.currentTimeMillis(), agent);
    final byte[] message = event.toJsonBytes();
    for (final EventSender sender : eventSenders) {
      sender.send(taskStatusEventTopic, message);
    }
  }

  /**
   * Get the {@link TaskStatus} for the job identified by {@code jobId}.
   *
   * @return the stored status, or {@code null} if none is stored for {@code jobId}
   * @throws RuntimeException wrapping the {@link IOException} if the stored status cannot be parsed
   */
  @Override
  public TaskStatus getTaskStatus(final JobId jobId) {
    final byte[] data = taskStatuses.get(jobId.toString());
    if (data == null) {
      return null;
    }
    try {
      return parse(data, TaskStatus.class);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Remove the {@link TaskStatus} for the job identified by {@code jobId}.
   */
  @Override
  public void removeTaskStatus(final JobId jobId) throws InterruptedException {
    taskStatuses.remove(jobId.toString());
  }

  /**
   * Add a listener that will be notified when tasks are changed.
   *
   * <p>The listener is invoked once immediately, on the calling thread, so it observes the
   * current state. Exceptions from that initial call propagate to the caller.
   */
  @Override
  public void addListener(final AgentModel.Listener listener) {
    listeners.add(listener);
    listener.tasksChanged(this);
  }

  /**
   * Remove a listener that will be notified when tasks are changed.
   */
  @Override
  public void removeListener(final AgentModel.Listener listener) {
    listeners.remove(listener);
  }

  /**
   * Notifies every registered listener that the task set changed. An exception thrown by one
   * listener is logged and does not prevent the remaining listeners from being notified.
   */
  protected void fireTasksUpdated() {
    for (final AgentModel.Listener listener : listeners) {
      try {
        listener.tasksChanged(this);
      } catch (Exception e) {
        log.error("listener threw exception", e);
      }
    }
  }

  /**
   * Bridges change notifications from the task cache to this model's listeners. Connection state
   * changes are ignored.
   */
  private class JobsListener implements PersistentPathChildrenCache.Listener {

    @Override
    public void nodesChanged(final PersistentPathChildrenCache<?> cache) {
      fireTasksUpdated();
    }

    @Override
    public void connectionStateChanged(final ConnectionState state) {
      // ignore
    }
  }
}