Skip to content

Commit

Permalink
IGNITE-21139 Provide ability to extract a command argument and class …
Browse files Browse the repository at this point in the history
…from management task event
  • Loading branch information
NSAmelchev committed Jan 30, 2024
1 parent 042d4f5 commit 7a95544
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.events;

import java.util.UUID;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.EventType;
import org.apache.ignite.events.TaskEvent;
import org.apache.ignite.internal.visor.VisorTaskArgument;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;

/**
* Management task started event.
*
* @see EventType#EVT_MANAGEMENT_TASK_STARTED
*/
public class ManagementTaskEvent extends TaskEvent {
/** */
private static final long serialVersionUID = 0L;

/** */
private final VisorTaskArgument<?> arg;

/**
* Creates task event with given parameters.
*
* @param node Node.
* @param msg Optional message.
* @param type Event type.
* @param sesId Task session ID.
* @param taskName Task name.
* @param subjId Subject ID.
* @param internal Whether current task belongs to Ignite internal tasks.
* @param taskClsName Name ot the task class.
*/
public ManagementTaskEvent(ClusterNode node, String msg, int type, IgniteUuid sesId, String taskName,
String taskClsName, boolean internal, @Nullable UUID subjId, VisorTaskArgument<?> arg) {
super(node, msg, type, sesId, taskName, taskClsName, internal, subjId);

this.arg = arg;
}

/** @return Task argument. */
public VisorTaskArgument<?> argument() {
return arg;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.compute.ComputeTaskCancelledCheckedException;
import org.apache.ignite.internal.events.ManagementTaskEvent;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
Expand Down Expand Up @@ -717,16 +718,17 @@ else if (task != null) {
&& ctx.grid().commandsRegistry().isCommandTask(taskCls)) {
VisorTaskArgument visorTaskArg = (VisorTaskArgument)arg;

Event evt = new TaskEvent(
Event evt = new ManagementTaskEvent(
ctx.discovery().localNode(),
visorTaskArg != null && visorTaskArg.getArgument() != null
? visorTaskArg.getArgument().toString() : "[]",
EVT_MANAGEMENT_TASK_STARTED,
ses.getId(),
taskCls == null ? null : taskCls.getSimpleName(),
"VisorManagementTask",
taskName,
taskCls == null ? null : taskCls.getName(),
false,
securitySubjectId(ctx)
securitySubjectId(ctx),
visorTaskArg
);

ctx.event().record(evt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.ignite.internal;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -35,6 +37,7 @@
import org.apache.ignite.internal.client.GridClientException;
import org.apache.ignite.internal.client.thin.TestTask;
import org.apache.ignite.internal.dto.IgniteDataTransferObject;
import org.apache.ignite.internal.events.ManagementTaskEvent;
import org.apache.ignite.internal.management.api.ComputeCommand;
import org.apache.ignite.internal.management.api.LocalCommand;
import org.apache.ignite.internal.management.api.RequireTask;
Expand Down Expand Up @@ -109,6 +112,8 @@ public void testNotManagementTask() throws Exception {
* @throws Exception If failed.
*/
private void doTestManagementTask(Class<? extends ComputeTask<?, ?>> cls, boolean expEvt) throws Exception {
TestCommandArg arg = new TestCommandArg("test-arg");

final AtomicReference<TaskEvent> evt = new AtomicReference<>();

final CountDownLatch evtLatch = new CountDownLatch(1);
Expand All @@ -124,11 +129,12 @@ private void doTestManagementTask(Class<? extends ComputeTask<?, ?>> cls, boolea
}, EventType.EVT_MANAGEMENT_TASK_STARTED);

for (ClusterNode node : ignite.cluster().forServers().nodes())
ignite.compute().executeAsync(cls.getName(), new VisorTaskArgument<>(node.id(), new TestCommandArg(), true));
ignite.compute().executeAsync(cls.getName(), new VisorTaskArgument<>(node.id(), arg, true));

if (expEvt) {
assertTrue(evtLatch.await(10000, TimeUnit.MILLISECONDS));
assertTrue(evt.get() instanceof TaskEvent);
assertTrue(evt.get() instanceof ManagementTaskEvent);
assertEquals(arg, ((ManagementTaskEvent)evt.get()).argument().getArgument());
}
else
assertFalse(evtLatch.await(1000, TimeUnit.MILLISECONDS));
Expand Down Expand Up @@ -211,14 +217,45 @@ public static class TestCommandArg extends IgniteDataTransferObject {
/** */
private static final long serialVersionUID = 0;

/** {@inheritDoc} */
@Override protected void writeExternalData(ObjectOutput out) {
/** */
private String param;

/** */
public TestCommandArg(String param) {
this.param = param;
}

/** */
public TestCommandArg() {
// No-op.
}

/** {@inheritDoc} */
@Override protected void readExternalData(byte protoVer, ObjectInput in) {
// No-op.
@Override protected void writeExternalData(ObjectOutput out) throws IOException {
out.writeObject(param);
}

/** {@inheritDoc} */
@Override protected void readExternalData(byte ver, ObjectInput in) throws IOException, ClassNotFoundException {
param = (String)in.readObject();
}

/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;

if (o == null || getClass() != o.getClass())
return false;

TestCommandArg arg = (TestCommandArg)o;

return Objects.equals(param, arg.param);
}

/** {@inheritDoc} */
@Override public int hashCode() {
return Objects.hash(param);
}
}
}

0 comments on commit 7a95544

Please sign in to comment.