Skip to content

Commit

Permalink
Fix cancellation of running tests #55
Browse files Browse the repository at this point in the history
Fixes #55
  • Loading branch information
basilevs committed Apr 17, 2024
1 parent c11291a commit 534fa84
Show file tree
Hide file tree
Showing 10 changed files with 125 additions and 36 deletions.
Expand Up @@ -49,6 +49,9 @@ public IProcess execute(Command command) throws CoreException {
// public IProcess execute(final Command scriptlet, IPipe in, IPipe out)
// throws CoreException
public IProcess execute(Command scriptlet, IPipe in, IPipe out) throws CoreException {
if (isClosed()) {
throw new CoreException(Status.CANCEL_STATUS);
}
final ICommandService svc = scriptlet instanceof ProcInstance ? new ProcInstanceService()
: CorePlugin.getScriptletManager().getScriptletService(scriptlet);
final IPipe tinput = in == null ? createPipe().close(Status.OK_STATUS) : in;
Expand Down
Expand Up @@ -71,7 +71,7 @@ public synchronized IStatus waitFor(long timeout, IProgressMonitor monitor) thro
}

public synchronized boolean isAlive() {
return status == null;
return status == null && !session.isClosed();
}

public ISession getSession() {
Expand Down
Expand Up @@ -48,6 +48,9 @@ public IStatus service(Command command, IProcess process)
List<Object> contentOutput = new ArrayList<Object>();

for (int i = 0; i < times; i++) {
if (!process.isAlive()) {
throw new CoreException(Status.CANCEL_STATUS);
}
IPipe input = process.getSession().createPipe();
for (Object o : content)
input.write(o);
Expand Down
Expand Up @@ -51,6 +51,9 @@ public IStatus service(Command command, IProcess process) throws InterruptedExce
ISession session = process.getSession();
try {
for (int i = 0; i < times; i++) {
if (!process.isAlive()) {
throw new CoreException(Status.CANCEL_STATUS);
}
if (i + 1 == times && !t.isNoScreenshot()) {
session.putProperty(NO_SCREENSHOT, null);
} else {
Expand Down
Expand Up @@ -10,12 +10,16 @@
*******************************************************************************/
package org.eclipse.rcptt.ecl.server.tcp;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;

import org.eclipse.core.runtime.CoreException;
import org.eclipse.core.runtime.ILog;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Platform;
import org.eclipse.core.runtime.Status;
import org.eclipse.rcptt.ecl.core.Command;
import org.eclipse.rcptt.ecl.internal.core.CorePlugin;
Expand All @@ -24,12 +28,16 @@
import org.eclipse.rcptt.ecl.runtime.IPipe;
import org.eclipse.rcptt.ecl.runtime.IProcess;
import org.eclipse.rcptt.ecl.runtime.ISession;
import org.osgi.framework.FrameworkUtil;

final class SessionRequestHandler implements Runnable {
private final Socket socket;
private final ISession session;
private final BufferedInputStream inputStream;
private final int defaultTimeout;
private final static ILog LOG = Platform.getLog(FrameworkUtil.getBundle(SessionRequestHandler.class));

SessionRequestHandler(Socket socket, boolean useJobs) {
SessionRequestHandler(Socket socket, boolean useJobs) throws IOException {
// super("ECL tcp session:" + socket.getPort());
this.socket = socket;
try {
Expand All @@ -38,11 +46,13 @@ final class SessionRequestHandler implements Runnable {
CorePlugin.log(e);
}
this.session = EclRuntime.createSession(useJobs);
this.inputStream = new BufferedInputStream(socket.getInputStream());
this.defaultTimeout = socket.getSoTimeout();
}

public void run() {
try {
IPipe pipe = CoreUtils.createEMFPipe(socket.getInputStream(),
IPipe pipe = CoreUtils.createEMFPipe(inputStream,
socket.getOutputStream());
while (!Thread.currentThread().isInterrupted()
&& !socket.isClosed()) {
Expand Down Expand Up @@ -116,11 +126,15 @@ private IPipe readInput(IPipe pipe) throws CoreException {
}

private IStatus writeOutput(IPipe pipe, IProcess process)
throws CoreException {
throws CoreException, IOException {
Object object;
do {
object = process.getOutput().take(Long.MAX_VALUE);
if (object instanceof IStatus) {
object = process.getOutput().take(100);
if (object == null) {
if (!isConnected()) {
throw new CoreException(Status.CANCEL_STATUS);
}
} else if (object instanceof IStatus) {
try {
return process.waitFor();
} catch (InterruptedException e) {
Expand All @@ -129,8 +143,22 @@ private IStatus writeOutput(IPipe pipe, IProcess process)
} else {
pipe.write(object);
}

} while (true);
}

private boolean isConnected() throws IOException {
inputStream.mark(10);
socket.setSoTimeout(1);
try {
return inputStream.read() >= 0 && inputStream.read() >= 0;
} catch (SocketTimeoutException e) {
return true;
} finally {
socket.setSoTimeout(defaultTimeout);
inputStream.reset();
}
}

public void recover(Socket client) {
}
Expand Down
@@ -1,19 +1,20 @@
package org.eclipse.rcptt.ecl.client.tcp.tests;


import java.util.function.BiFunction;
import java.util.function.Function;

import org.eclipse.core.runtime.CoreException;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Status;
import org.eclipse.rcptt.ecl.core.Command;
import org.eclipse.rcptt.ecl.runtime.ICommandService;
import org.eclipse.rcptt.ecl.runtime.IProcess;

import com.google.common.base.Function;

public class EclInjectedCommandService implements ICommandService {
public static Function<Command, IStatus> delegate = new Function<Command, IStatus>() {
public static BiFunction<Command, IProcess, IStatus> delegate = new BiFunction<Command, IProcess, IStatus>() {
@Override
public IStatus apply(Command ignored) {
public IStatus apply(Command ignored, IProcess ignored2) {
return Status.OK_STATUS;
}
};
Expand All @@ -23,7 +24,16 @@ public EclInjectedCommandService() {

@Override
public IStatus service(Command command, IProcess context) throws InterruptedException, CoreException {
return delegate.apply(command);
return delegate.apply(command, context);
}


public static void inject(Function<Command, IStatus> injection) {
delegate = (command, process) -> injection.apply(command);
}

public static void inject(BiFunction<Command, IProcess, IStatus> injection) {
delegate = injection;
}

}
Expand Up @@ -13,9 +13,15 @@
import com.google.common.base.Function;
import com.google.common.io.Closer;

import static org.junit.Assert.assertTrue;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.core.runtime.CoreException;
import org.eclipse.core.runtime.IStatus;
Expand Down Expand Up @@ -66,12 +72,7 @@ public void close() throws IOException {
}
}
});
EclInjectedCommandService.delegate = new Function<Command, IStatus>() {
@Override
public IStatus apply(Command ignored) {
return Status.OK_STATUS;
}
};
EclInjectedCommandService.inject(ignored -> Status.OK_STATUS);
}

@After
Expand All @@ -81,12 +82,7 @@ public void closeServer() throws CoreException, IOException {

@Test
public void simpleExecution() throws CoreException, InterruptedException {
EclInjectedCommandService.delegate = new Function<Command, IStatus>() {
@Override
public IStatus apply(Command ignored) {
return new Status(IStatus.INFO, "id", "message");
}
};
EclInjectedCommandService.inject(ignored -> new Status(IStatus.INFO, "id", "message"));
IStatus status = executeCommand();
Assert.assertFalse(status.getMessage(), status.matches(IStatus.ERROR | IStatus.WARNING | IStatus.CANCEL));
Assert.assertTrue(status.matches(IStatus.INFO));
Expand All @@ -102,12 +98,7 @@ private IStatus executeCommand() throws CoreException, InterruptedException {

@Test
public void errorIsReported() throws CoreException, InterruptedException {
EclInjectedCommandService.delegate = new Function<Command, IStatus>() {
@Override
public IStatus apply(Command ignored) {
return new Status(IStatus.ERROR, "id", "message");
}
};
EclInjectedCommandService.inject(ignored -> new Status(IStatus.ERROR, "id", "message"));
IStatus status = executeCommand();
Assert.assertTrue(status.matches(IStatus.ERROR));
Assert.assertEquals("message", status.getMessage());
Expand All @@ -121,13 +112,10 @@ private static void absurdFunctionThatThrows() {

@Test
public void istatusPropagatesTraces() throws CoreException, InterruptedException {
EclInjectedCommandService.delegate = new Function<Command, IStatus>() {
@Override
public IStatus apply(Command ignored) {
EclInjectedCommandService.inject( ignored -> {
absurdFunctionThatThrows();
return Status.OK_STATUS;
}
};
});
IStatus status = executeCommand();
Throwable e = status.getException();
while (! (e instanceof CoreException)) {
Expand Down Expand Up @@ -173,4 +161,32 @@ public void testCommandExecution001() throws Throwable {
}
TestCase.assertEquals(count1, count2);
}

@Test(timeout=10000)
public void processDiesIfSessionIsClosed() throws CoreException, InterruptedException {
AtomicBoolean isAlive = new AtomicBoolean();
CountDownLatch start = new CountDownLatch(1);
CountDownLatch stop = new CountDownLatch(1);
EclInjectedCommandService.inject((ignored, process) -> {
do {
isAlive.set(process.isAlive());
start.countDown();
} while (isAlive.get());
stop.countDown();
return Status.CANCEL_STATUS;
});

CompletableFuture.runAsync(() -> {
try {
executeCommand();
} catch (CoreException | InterruptedException e) {
throw new AssertionError(e);
}
});
assertTrue(start.await(1, TimeUnit.SECONDS));
assertTrue(isAlive.get());
session.close();
assertTrue(stop.await(1, TimeUnit.SECONDS));
Assert.assertFalse(isAlive.get());
}
}
Expand Up @@ -47,6 +47,20 @@ public DataExecutable(AutLaunch launch, IQ7NamedElement element,
}
this.launch = launch;
executionMonitor = new NullProgressMonitor();
addListener(new Listener() {

@Override
public void onStatusChange(Executable executable) {
if (getStatus() == State.COMPLETED) {
executionMonitor.setCanceled(true);
}
}

@Override
public void updateSessionCounters(Executable executable, IStatus status) {
}

});
}

public AutLaunch getAut() {
Expand Down
Expand Up @@ -30,7 +30,13 @@ public IStatus service(Command command, IProcess context)
int ms = wait.getMs();
if (ms < 0)
return TeslaImplPlugin.err("Negative delay is not permitted");
Thread.sleep(ms); // any exceptions will be handled by session
long stop = System.currentTimeMillis() + ms;
while (System.currentTimeMillis() < stop) {
if (!context.isAlive()) {
throw new CoreException(Status.CANCEL_STATUS);
}
Thread.sleep(100); // any exceptions will be handled by session
}
return Status.OK_STATUS;
}

Expand Down
Expand Up @@ -12,7 +12,9 @@

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;

import org.eclipse.core.runtime.CoreException;
import org.eclipse.rcptt.ecl.client.tcp.EclTcpSession;
import org.eclipse.rcptt.ecl.debug.runtime.SuspendListener;
import org.eclipse.rcptt.ecl.debug.runtime.SuspendManager;
Expand Down Expand Up @@ -66,8 +68,12 @@ public void start() {

private void testLocalEclServer() {
try {
new EclTcpSession(InetAddress.getByName("localhost"), ecl);
new EclTcpSession(InetAddress.getByName("localhost"), ecl).close();
Activator.info("Verified that local ECL server is working");
} catch ( CoreException e) {
Activator.err(e, "Error testing a local ECL server. Something is blocking connection");
} catch (UnknownHostException e) {
Activator.err(e, "Error testing a local ECL server. Something is blocking connection");
} catch (IOException e) {
Activator.err(e, "Error testing a local ECL server. Something is blocking connection");
}
Expand Down

0 comments on commit 534fa84

Please sign in to comment.