Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GG-37994 Task event can record attributes. #2994

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public class TaskEvent extends EventAdapter {
* @param subjId Subject ID.
*/
public TaskEvent(ClusterNode node, String msg, int type, IgniteUuid sesId, String taskName, String taskClsName,
boolean internal, @Nullable UUID subjId) {
boolean internal, @Nullable UUID subjId) {
super(node, msg, type);

this.sesId = sesId;
Expand Down
103 changes: 103 additions & 0 deletions modules/core/src/main/java/org/apache/ignite/events/TaskEventV2.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright 2019 GridGain Systems, Inc. and Contributors.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2023

*
* Licensed under the GridGain Community Edition License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.events;

import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;

import java.util.Map;
import java.util.UUID;

/**
* Grid task event.
* <p>
* Grid events are used for notification about what happens within the grid. Note that by
* design Ignite keeps all events generated on the local node locally and it provides
* APIs for performing a distributed queries across multiple nodes:
* <ul>
* <li>
* {@link org.apache.ignite.IgniteEvents#remoteQuery(org.apache.ignite.lang.IgnitePredicate, long, int...)} -
* asynchronously querying events occurred on the nodes specified, including remote nodes.
* </li>
* <li>
* {@link org.apache.ignite.IgniteEvents#localQuery(org.apache.ignite.lang.IgnitePredicate, int...)} -
* querying only local events stored on this local node.
* </li>
* <li>
* {@link org.apache.ignite.IgniteEvents#localListen(org.apache.ignite.lang.IgnitePredicate, int...)} -
* listening to local grid events (events from remote nodes not included).
* </li>
* </ul>
* User can also wait for events using method {@link org.apache.ignite.IgniteEvents#waitForLocal(org.apache.ignite.lang.IgnitePredicate, int...)}.
* <h1 class="header">Events and Performance</h1>
* Note that by default all events in Ignite are enabled and therefore generated and stored
* by whatever event storage SPI is configured. Ignite can and often does generate thousands events per seconds
* under the load and therefore it creates a significant additional load on the system. If these events are
* not needed by the application this load is unnecessary and leads to significant performance degradation.
* <p>
* It is <b>highly recommended</b> to enable only those events that your application logic requires
* by using {@link org.apache.ignite.configuration.IgniteConfiguration#getIncludeEventTypes()} method in Ignite configuration. Note that certain
* events are required for Ignite's internal operations and such events will still be generated but not stored by
* event storage SPI if they are disabled in Ignite configuration.
* @see EventType#EVT_TASK_FAILED
* @see EventType#EVT_TASK_FINISHED
* @see EventType#EVT_TASK_REDUCED
* @see EventType#EVT_TASK_STARTED
* @see EventType#EVT_TASK_SESSION_ATTR_SET
* @see EventType#EVT_TASK_TIMEDOUT
* @see EventType#EVTS_TASK_EXECUTION
*/
public class TaskEventV2 extends TaskEvent {
/** */
private static final long serialVersionUID = 0L;

/** */
private Map<Object, Object> attributes;

/**
* Creates task event with given parameters.
*
* @param node Node.
* @param msg Optional message.
* @param type Event type.
* @param sesId Task session ID.
* @param taskName Task name.
* @param subjId Subject ID.
*/
ashapkin marked this conversation as resolved.
Show resolved Hide resolved
public TaskEventV2(ClusterNode node, String msg, int type, IgniteUuid sesId, String taskName, String taskClsName,
boolean internal, @Nullable UUID subjId, Map<Object, Object> attributes) {
super(node, msg, type, sesId, taskName, taskClsName, internal, subjId);
this.attributes = attributes;
}

public Map<Object, Object> attributes() {
ashapkin marked this conversation as resolved.
Show resolved Hide resolved
return attributes;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(TaskEventV2.class, this,
"nodeId8", U.id8(node().id()),
"msg", message(),
"type", name(),
"attributes", attributes(),
"tstamp", timestamp());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,10 @@ public enum IgniteFeatures {
NEW_DR_FST_COMMANDS(67),

/** This feature enables checking a cache generation for incoming cache messages (see GridCacheIdMessage). */
CHECK_CACHE_GENERATION(68);
CHECK_CACHE_GENERATION(68),

/** This feature enables attributes to be included into TaskEvents. */
TASK_EVT_ATTRIBUTE_SUPPORT(69);

/**
* Unique feature identifier.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.ignite.events.Event;
import org.apache.ignite.events.JobEvent;
import org.apache.ignite.events.TaskEvent;
import org.apache.ignite.events.TaskEventV2;
import org.apache.ignite.internal.GridJobCancelRequest;
import org.apache.ignite.internal.GridJobContextImpl;
import org.apache.ignite.internal.GridJobExecuteRequest;
Expand Down Expand Up @@ -114,6 +115,8 @@
import static org.apache.ignite.internal.GridTopic.TOPIC_JOB_CANCEL;
import static org.apache.ignite.internal.GridTopic.TOPIC_JOB_SIBLINGS;
import static org.apache.ignite.internal.GridTopic.TOPIC_TASK;
import static org.apache.ignite.internal.IgniteFeatures.TASK_EVT_ATTRIBUTE_SUPPORT;
import static org.apache.ignite.internal.IgniteFeatures.allNodesSupport;
import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.makeUpdateListener;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
Expand Down Expand Up @@ -1742,17 +1745,7 @@ private void processTaskSessionRequest(UUID nodeId, GridTaskSessionRequest req)
U.resolveClassLoader(ses.getClassLoader(), ctx.config()));

if (ctx.event().isRecordable(EVT_TASK_SESSION_ATTR_SET)) {
Event evt = new TaskEvent(
ctx.discovery().localNode(),
"Changed attributes: " + attrs,
EVT_TASK_SESSION_ATTR_SET,
ses.getId(),
ses.getTaskName(),
ses.getTaskClassName(),
false,
null);

ctx.event().record(evt);
recordTaskEvent(attrs, ses);
}

synchronized (ses) {
Expand Down Expand Up @@ -2442,4 +2435,37 @@ else if (jobId == null)
public long computeJobWorkerInterruptTimeout() {
return computeJobWorkerInterruptTimeout.getOrDefault(ctx.config().getFailureDetectionTimeout());
}

/**
* @param attrs Attributes set.
* @param ses Internal session.
*/
public void recordTaskEvent(Map<?, ?> attrs, GridTaskSessionImpl ses) {
Event evt;

if (allNodesSupport(ctx, TASK_EVT_ATTRIBUTE_SUPPORT)) {
evt = new TaskEventV2(
ctx.discovery().localNode(),
"Changed attributes: " + attrs,
EVT_TASK_SESSION_ATTR_SET,
ses.getId(),
ses.getTaskName(),
ses.getTaskClassName(),
false,
null,
ses.isFullSupport() ? ses.getAttributes() : null);
} else {
evt = new TaskEvent(
ctx.discovery().localNode(),
"Changed attributes: " + attrs,
EVT_TASK_SESSION_ATTR_SET,
ses.getId(),
ses.getTaskName(),
ses.getTaskClassName(),
false,
null);
}

ctx.event().record(evt);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.TaskEvent;
import org.apache.ignite.events.TaskEventV2;
import org.apache.ignite.internal.ComputeTaskInternalFuture;
import org.apache.ignite.internal.GridJobExecuteResponse;
import org.apache.ignite.internal.GridJobSiblingImpl;
Expand Down Expand Up @@ -101,6 +102,8 @@
import static org.apache.ignite.internal.GridTopic.TOPIC_JOB_SIBLINGS;
import static org.apache.ignite.internal.GridTopic.TOPIC_TASK;
import static org.apache.ignite.internal.GridTopic.TOPIC_TASK_CANCEL;
import static org.apache.ignite.internal.IgniteFeatures.TASK_EVT_ATTRIBUTE_SUPPORT;
import static org.apache.ignite.internal.IgniteFeatures.allNodesSupport;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isPersistenceEnabled;
import static org.apache.ignite.internal.processors.metric.GridMetricManager.SYS_METRICS;
Expand Down Expand Up @@ -834,17 +837,34 @@ else if (task != null) {
if (ctx.event().isRecordable(EVT_MANAGEMENT_TASK_STARTED) && dep.visorManagementTask(task, taskCls)) {
VisorTaskArgument visorTaskArgument = (VisorTaskArgument)arg;

Event evt = new TaskEvent(
ctx.discovery().localNode(),
visorTaskArgument != null && visorTaskArgument.getArgument() != null
? visorTaskArgument.getArgument().toString() : "[]",
EVT_MANAGEMENT_TASK_STARTED,
ses.getId(),
taskCls == null ? null : taskCls.getSimpleName(),
"VisorManagementTask",
false,
subjId
);
Event evt;

if (allNodesSupport(ctx, TASK_EVT_ATTRIBUTE_SUPPORT)) {
evt = new TaskEventV2(
ctx.discovery().localNode(),
visorTaskArgument != null && visorTaskArgument.getArgument() != null
? visorTaskArgument.getArgument().toString() : "[]",
EVT_MANAGEMENT_TASK_STARTED,
ses.getId(),
taskCls == null ? null : taskCls.getSimpleName(),
"VisorManagementTask",
false,
subjId,
null
);
} else {
evt = new TaskEvent(
ctx.discovery().localNode(),
visorTaskArgument != null && visorTaskArgument.getArgument() != null
? visorTaskArgument.getArgument().toString() : "[]",
EVT_MANAGEMENT_TASK_STARTED,
ses.getId(),
taskCls == null ? null : taskCls.getSimpleName(),
"VisorManagementTask",
false,
subjId
);
}

ctx.event().record(evt);
}
Expand Down Expand Up @@ -1074,17 +1094,7 @@ private void sendSessionAttributes(Map<?, ?> attrs, GridTaskSessionImpl ses)
}

if (ctx.event().isRecordable(EVT_TASK_SESSION_ATTR_SET)) {
Event evt = new TaskEvent(
ctx.discovery().localNode(),
"Changed attributes: " + attrs,
EVT_TASK_SESSION_ATTR_SET,
ses.getId(),
ses.getTaskName(),
ses.getTaskClassName(),
false,
null);

ctx.event().record(evt);
recordTaskEvent(attrs, ses);
}

notifyTaskStatusMonitors(ComputeTaskStatus.snapshot(ses), false);
Expand Down Expand Up @@ -1690,4 +1700,37 @@ public Map<ComputeJobStatusEnum, Long> jobStatuses(IgniteUuid sesId) {
else
return taskWorker.jobStatuses();
}

/**
* @param attrs Attributes set.
* @param ses Internal session.
*/
public void recordTaskEvent(Map<?, ?> attrs, GridTaskSessionImpl ses) {
Event evt;

if (allNodesSupport(ctx, TASK_EVT_ATTRIBUTE_SUPPORT)) {
evt = new TaskEventV2(
ctx.discovery().localNode(),
"Changed attributes: " + attrs,
EVT_TASK_SESSION_ATTR_SET,
ses.getId(),
ses.getTaskName(),
ses.getTaskClassName(),
false,
null,
ses.isFullSupport() ? ses.getAttributes() : null);
} else {
evt = new TaskEvent(
ctx.discovery().localNode(),
"Changed attributes: " + attrs,
EVT_TASK_SESSION_ATTR_SET,
ses.getId(),
ses.getTaskName(),
ses.getTaskClassName(),
false,
null);
}

ctx.event().record(evt);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.ignite.events.Event;
import org.apache.ignite.events.JobEvent;
import org.apache.ignite.events.TaskEvent;
import org.apache.ignite.events.TaskEventV2;
import org.apache.ignite.internal.ComputeTaskInternalFuture;
import org.apache.ignite.internal.GridInternalException;
import org.apache.ignite.internal.GridJobCancelRequest;
Expand Down Expand Up @@ -105,6 +106,8 @@
import static org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT;
import static org.apache.ignite.internal.GridTopic.TOPIC_JOB;
import static org.apache.ignite.internal.GridTopic.TOPIC_JOB_CANCEL;
import static org.apache.ignite.internal.IgniteFeatures.TASK_EVT_ATTRIBUTE_SUPPORT;
import static org.apache.ignite.internal.IgniteFeatures.allNodesSupport;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
import static org.apache.ignite.internal.processors.job.ComputeJobStatusEnum.CANCELLED;
Expand Down Expand Up @@ -1576,15 +1579,29 @@ void onNodeLeft(UUID nodeId) {
*/
private void recordTaskEvent(int evtType, String msg) {
if (!internal && ctx.event().isRecordable(evtType)) {
Event evt = new TaskEvent(
ctx.discovery().localNode(),
msg,
evtType,
ses.getId(),
ses.getTaskName(),
ses.getTaskClassName(),
internal,
subjId);
Event evt;
if (allNodesSupport(ctx, TASK_EVT_ATTRIBUTE_SUPPORT)) {
evt = new TaskEventV2(
ctx.discovery().localNode(),
msg,
evtType,
ses.getId(),
ses.getTaskName(),
ses.getTaskClassName(),
internal,
subjId,
ses.isFullSupport() ? ses.getAttributes() : null);
} else {
evt = new TaskEvent(
ctx.discovery().localNode(),
msg,
evtType,
ses.getId(),
ses.getTaskName(),
ses.getTaskClassName(),
internal,
subjId);
}

ctx.event().record(evt);
}
Expand Down