Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GG-37994 Task event can record attributes. #2994

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.apache.ignite.events;

import java.util.Map;
import java.util.UUID;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.util.typedef.internal.S;
Expand Down Expand Up @@ -81,6 +82,9 @@ public class TaskEvent extends EventAdapter {
/** */
private final UUID subjId;

/** */
private Map<Object, Object> attributes;

/** {@inheritDoc} */
@Override public String shortDisplay() {
return name() + ": taskName=" + taskName;
Expand All @@ -97,12 +101,28 @@ public class TaskEvent extends EventAdapter {
* @param subjId Subject ID.
*/
public TaskEvent(ClusterNode node, String msg, int type, IgniteUuid sesId, String taskName, String taskClsName,
boolean internal, @Nullable UUID subjId) {
boolean internal, @Nullable UUID subjId) {
this(node, msg, type, sesId, taskName, taskClsName, null, internal, subjId);
}

/**
* Creates task event with given parameters.
*
* @param node Node.
* @param msg Optional message.
* @param type Event type.
* @param sesId Task session ID.
* @param taskName Task name.
* @param subjId Subject ID.
*/
public TaskEvent(ClusterNode node, String msg, int type, IgniteUuid sesId, String taskName, String taskClsName,
Map<Object, Object> attributes, boolean internal, @Nullable UUID subjId) {
super(node, msg, type);

this.sesId = sesId;
this.taskName = taskName;
this.taskClsName = taskClsName;
this.attributes = attributes;
this.internal = internal;
this.subjId = subjId;
}
Expand Down Expand Up @@ -134,6 +154,10 @@ public IgniteUuid taskSessionId() {
return sesId;
}

public Map<Object, Object> attributes() {
return attributes;
}

/**
* Returns {@code true} if task is created by Ignite and is used for system needs.
*
Expand Down Expand Up @@ -162,6 +186,7 @@ public boolean internal() {
"nodeId8", U.id8(node().id()),
"msg", message(),
"type", name(),
"attributes", attributes(),
"tstamp", timestamp());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1742,17 +1742,7 @@ private void processTaskSessionRequest(UUID nodeId, GridTaskSessionRequest req)
U.resolveClassLoader(ses.getClassLoader(), ctx.config()));

if (ctx.event().isRecordable(EVT_TASK_SESSION_ATTR_SET)) {
Event evt = new TaskEvent(
ctx.discovery().localNode(),
"Changed attributes: " + attrs,
EVT_TASK_SESSION_ATTR_SET,
ses.getId(),
ses.getTaskName(),
ses.getTaskClassName(),
false,
null);

ctx.event().record(evt);
recordTaskEvent(attrs, ses);
}

synchronized (ses) {
Expand Down Expand Up @@ -2442,4 +2432,37 @@ else if (jobId == null)
public long computeJobWorkerInterruptTimeout() {
return computeJobWorkerInterruptTimeout.getOrDefault(ctx.config().getFailureDetectionTimeout());
}

/**
* @param attrs Attributes set.
* @param ses Internal session.
*/
public void recordTaskEvent(Map<?, ?> attrs, GridTaskSessionImpl ses) {
Event evt;

if (ses.isFullSupport()) {
evt = new TaskEvent(
ctx.discovery().localNode(),
"Changed attributes: " + attrs,
EVT_TASK_SESSION_ATTR_SET,
ses.getId(),
ses.getTaskName(),
ses.getTaskClassName(),
ses.getAttributes(),
false,
null);
} else {
evt = new TaskEvent(
ctx.discovery().localNode(),
"Changed attributes: " + attrs,
EVT_TASK_SESSION_ATTR_SET,
ses.getId(),
ses.getTaskName(),
ses.getTaskClassName(),
false,
null);
}

ctx.event().record(evt);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1074,17 +1074,7 @@ private void sendSessionAttributes(Map<?, ?> attrs, GridTaskSessionImpl ses)
}

if (ctx.event().isRecordable(EVT_TASK_SESSION_ATTR_SET)) {
Event evt = new TaskEvent(
ctx.discovery().localNode(),
"Changed attributes: " + attrs,
EVT_TASK_SESSION_ATTR_SET,
ses.getId(),
ses.getTaskName(),
ses.getTaskClassName(),
false,
null);

ctx.event().record(evt);
recordTaskEvent(attrs, ses);
}

notifyTaskStatusMonitors(ComputeTaskStatus.snapshot(ses), false);
Expand Down Expand Up @@ -1690,4 +1680,37 @@ public Map<ComputeJobStatusEnum, Long> jobStatuses(IgniteUuid sesId) {
else
return taskWorker.jobStatuses();
}

/**
* @param attrs Attributes set.
* @param ses Internal session.
*/
public void recordTaskEvent(Map<?, ?> attrs, GridTaskSessionImpl ses) {
Event evt;

if (ses.isFullSupport()) {
evt = new TaskEvent(
ctx.discovery().localNode(),
"Changed attributes: " + attrs,
EVT_TASK_SESSION_ATTR_SET,
ses.getId(),
ses.getTaskName(),
ses.getTaskClassName(),
ses.getAttributes(),
false,
null);
} else {
evt = new TaskEvent(
ctx.discovery().localNode(),
"Changed attributes: " + attrs,
EVT_TASK_SESSION_ATTR_SET,
ses.getId(),
ses.getTaskName(),
ses.getTaskClassName(),
false,
null);
}

ctx.event().record(evt);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1576,15 +1576,29 @@ void onNodeLeft(UUID nodeId) {
*/
private void recordTaskEvent(int evtType, String msg) {
if (!internal && ctx.event().isRecordable(evtType)) {
Event evt = new TaskEvent(
ctx.discovery().localNode(),
msg,
evtType,
ses.getId(),
ses.getTaskName(),
ses.getTaskClassName(),
internal,
subjId);
Event evt;
if (ses.isFullSupport()) {
evt = new TaskEvent(
ctx.discovery().localNode(),
msg,
evtType,
ses.getId(),
ses.getTaskName(),
ses.getTaskClassName(),
ses.getAttributes(),
internal,
subjId);
} else {
evt = new TaskEvent(
ctx.discovery().localNode(),
msg,
evtType,
ses.getId(),
ses.getTaskName(),
ses.getTaskClassName(),
internal,
subjId);
}

ctx.event().record(evt);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
Expand All @@ -32,14 +34,24 @@
import org.apache.ignite.compute.ComputeTaskSession;
import org.apache.ignite.compute.ComputeTaskSessionFullSupport;
import org.apache.ignite.compute.ComputeTaskSplitAdapter;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.TaskEvent;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.resources.TaskSessionResource;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonTest;
import org.junit.Test;

import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.events.EventType.EVTS_TASK_EXECUTION;
import static org.apache.ignite.events.EventType.EVT_TASK_STARTED;

/**
*
*/
Expand All @@ -51,11 +63,23 @@ public class GridSessionSetTaskAttributeSelfTest extends GridCommonAbstractTest
/** */
public static final int EXEC_COUNT = 5;

/** */
private static final String INT_PARAM_NAME = "customIntParameter";

/** */
private static final String TXT_PARAM_NAME = "customTextParameter";

/** */
public GridSessionSetTaskAttributeSelfTest() {
super(true);
}

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
return super.getConfiguration(igniteInstanceName)
.setIncludeEventTypes(EVTS_TASK_EXECUTION);
}

/**
* @throws Exception if failed.
*/
Expand Down Expand Up @@ -101,6 +125,52 @@ public void testMultiThreaded() throws Exception {
fail();
}

/**
* Check parameters propagation from session to events
* @throws Exception
*/
@Test
public void testAttributesToEventsPropagation() throws Exception {
final int intParam = 1;
final String textParam = "text";

IgniteEx n = startGrid(0);

CountDownLatch latch = new CountDownLatch(1);

n.cluster().state(ACTIVE);

IgnitePredicate<TaskEvent> lsnr = evt -> {
log.info("Received task event [evt=" + evt.name() + ", taskName=" + evt.taskName() + ", taskAttributes=" + evt.attributes() + ']');

if (evt.type() == EVT_TASK_STARTED) {
assertTrue(evt.attributes().isEmpty());
} else {
assertEquals(intParam, evt.attributes().get(INT_PARAM_NAME));
if (evt.attributes().size() > 1)
assertEquals(textParam, evt.attributes().get(TXT_PARAM_NAME));
}

latch.countDown();

return true;
};

n.events().localListen(lsnr, EVTS_TASK_EXECUTION);

// Generate task events.
IgniteFuture<Void> taskFut0 = n.compute().runAsync(new CallableWithSessionAttributes(intParam, textParam));

taskFut0.get(5, TimeUnit.SECONDS);

latch.await(5, TimeUnit.SECONDS);

assertEquals(0, latch.getCount());

// Unsubscribe local task event listener.
n.events().stopLocalListen(lsnr);
}

/**
* @param num Number.
*/
Expand All @@ -114,6 +184,33 @@ private void checkTask(int num) {
assert (Integer)res == SPLIT_COUNT : "Invalid result [num=" + num + ", fut=" + fut + ']';
}

/**
*
*/
@ComputeTaskSessionFullSupport
private static class CallableWithSessionAttributes implements IgniteRunnable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: Callable but implements Runnable.


@TaskSessionResource
private ComputeTaskSession ses;

private int numericParameter;

private String textParameter;

public CallableWithSessionAttributes(int numericParameter, String textParameter) {
this.numericParameter = numericParameter;
this.textParameter = textParameter;
}

@Override public void run() {
if (numericParameter != 0)
ses.setAttribute(INT_PARAM_NAME, numericParameter);

if (textParameter != null)
ses.setAttribute(TXT_PARAM_NAME, textParameter);
}
}

/**
*
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ public boolean removeMessageListener(GridTopic topic, @Nullable GridMessageListe
public void triggerTaskEvent(int type, String taskName, IgniteUuid taskSesId, String msg) {
assert type > 0;

triggerEvent(new TaskEvent(locNode, msg, type, taskSesId, taskName, null, false, null));
triggerEvent(new TaskEvent(locNode, msg, type, taskSesId, taskName, null, null, false, null));
}

/**
Expand Down