Skip to content

Commit

Permalink
fix issue #41
Browse files Browse the repository at this point in the history
introduce isResourceAllocated() and support reconnect.
  • Loading branch information
lynus committed Jan 23, 2019
1 parent aa27636 commit 2586e0c
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 51 deletions.
10 changes: 6 additions & 4 deletions src/main/java/com/ibm/disni/RdmaActiveEndpointGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public IbvQP createQpProvider(C endpoint) throws IOException{

public void allocateResources(C endpoint) throws Exception {
endpoint.allocateResources();
}
}

public void close() throws IOException, InterruptedException {
super.close();
Expand All @@ -110,9 +110,11 @@ public void close() throws IOException, InterruptedException {
}

void close(RdmaEndpoint endpoint) throws IOException {
IbvContext context = endpoint.getIdPriv().getVerbs();
RdmaActiveCqProcessor<C> cqProcessor = cqMap.get(context.getCmd_fd());
cqProcessor.unregister(endpoint);
if (endpoint.isResouceAllocated()) {
IbvContext context = endpoint.getIdPriv().getVerbs();
RdmaActiveCqProcessor<C> cqProcessor = cqMap.get(context.getCmd_fd());
cqProcessor.unregister(endpoint);
}
}

public int getMaxWR() {
Expand Down
101 changes: 54 additions & 47 deletions src/main/java/com/ibm/disni/RdmaEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,15 @@ public class RdmaEndpoint {
private boolean isClosed;
private boolean isInitialized;
private boolean serverSide;

private boolean isError;

protected RdmaEndpoint(RdmaEndpointGroup<? extends RdmaEndpoint> group, RdmaCmId idPriv, boolean serverSide) throws IOException{
this.endpointId = group.getNextId();
this.group = group;
this.idPriv = idPriv;
this.access = IbvMr.IBV_ACCESS_LOCAL_WRITE | IbvMr.IBV_ACCESS_REMOTE_WRITE | IbvMr.IBV_ACCESS_REMOTE_READ;

this.access = IbvMr.IBV_ACCESS_LOCAL_WRITE | IbvMr.IBV_ACCESS_REMOTE_WRITE | IbvMr.IBV_ACCESS_REMOTE_READ;
this.isError = false;

this.qp = null;
this.pd = null;
this.cqProcessor = null;
Expand All @@ -84,7 +86,7 @@ protected RdmaEndpoint(RdmaEndpointGroup<? extends RdmaEndpoint> group, RdmaCmId
this.serverSide = serverSide;
logger.info("new client endpoint, id " + endpointId + ", idPriv " + idPriv.getPs());
}

/**
/**
/**
Expand All @@ -94,41 +96,44 @@ protected RdmaEndpoint(RdmaEndpointGroup<? extends RdmaEndpoint> group, RdmaCmId
* @param timeout connection timeout
*/
public synchronized void connect(SocketAddress dst, int timeout) throws Exception {
if (connState != CONN_STATE_INITIALIZED) {
if (connState != CONN_STATE_INITIALIZED && !isError) {
throw new IOException("endpoint already connected");
}
idPriv.resolveAddr(null, dst, timeout);
while(connState < CONN_STATE_ADDR_RESOLVED){
wait();
isError = false;
if (connState < CONN_STATE_ADDR_RESOLVED) {
idPriv.resolveAddr(null, dst, timeout);
while (connState < CONN_STATE_ADDR_RESOLVED && !isError) {
wait();
}
if (isError)
throw new IOException("resolve address failed");
}
if (connState != CONN_STATE_ADDR_RESOLVED){
throw new IOException("resolve address failed");
if (connState < CONN_STATE_ROUTE_RESOLVED) {
idPriv.resolveRoute(timeout);
while (connState < CONN_STATE_ROUTE_RESOLVED && !isError) {
wait();
}
if (isError)
throw new IOException("resolve route failed");
}

idPriv.resolveRoute(timeout);
while(connState < CONN_STATE_ROUTE_RESOLVED){
wait();
if (connState < CONN_STATE_RESOURCES_ALLOCATED) {
group.allocateResourcesRaw(this);
while (connState < CONN_STATE_RESOURCES_ALLOCATED && !isError) {
wait();
}
if (isError)
throw new IOException("allocate resource failed");
}
if (connState != CONN_STATE_ROUTE_RESOLVED){
throw new IOException("resolve route failed");
}

group.allocateResourcesRaw(this);
while(connState < CONN_STATE_RESOURCES_ALLOCATED){
wait();
}
if (connState != CONN_STATE_RESOURCES_ALLOCATED){
throw new IOException("resolve route failed");
}

RdmaConnParam connParam = getConnParam();
idPriv.connect(connParam);
while(connState < CONN_STATE_CONNECTED){

while(connState < CONN_STATE_CONNECTED && !isError){
wait();
}
}

}
if (isError)
throw new IOException("idPriv connect failed");
}

/* (non-Javadoc)
* @see com.ibm.jverbs.endpoints.ICmConsumer#dispatchCmEvent(com.ibm.jverbs.cm.RdmaCmEvent)
*/
Expand All @@ -138,28 +143,27 @@ public synchronized void dispatchCmEvent(RdmaCmEvent cmEvent)
int eventType = cmEvent.getEvent();
if (eventType == RdmaCmEvent.EventType.RDMA_CM_EVENT_ADDR_RESOLVED.ordinal()) {
connState = RdmaEndpoint.CONN_STATE_ADDR_RESOLVED;
notifyAll();
} else if (cmEvent.getEvent() == RdmaCmEvent.EventType.RDMA_CM_EVENT_ROUTE_RESOLVED.ordinal()) {
connState = RdmaEndpoint.CONN_STATE_ROUTE_RESOLVED;
notifyAll();
} else if (eventType == RdmaCmEvent.EventType.RDMA_CM_EVENT_ESTABLISHED.ordinal()) {
logger.info("got event type + RDMA_CM_EVENT_ESTABLISHED, srcAddress " + this.getSrcAddr() + ", dstAddress " + this.getDstAddr());
connState = CONN_STATE_CONNECTED;
notifyAll();
} else if (eventType == RdmaCmEvent.EventType.RDMA_CM_EVENT_DISCONNECTED.ordinal()) {
logger.info("got event type + RDMA_CM_EVENT_DISCONNECTED, srcAddress " + this.getSrcAddr() + ", dstAddress " + this.getDstAddr());
connState = CONN_STATE_CLOSED;
notifyAll();
} else if (eventType == RdmaCmEvent.EventType.RDMA_CM_EVENT_CONNECT_REQUEST.ordinal()) {
logger.info("got event type + RDMA_CM_EVENT_CONNECT_REQUEST, srcAddress " + this.getSrcAddr() + ", dstAddress " + this.getDstAddr());
} else {
System.err.println("got other event: " + eventType);
logger.info("got event type + UNKNOWN, srcAddress " + this.getSrcAddr() + ", dstAddress " + this.getDstAddr());
isError = true;
}
notifyAll();
} catch (Exception e) {
throw new IOException(e);
}
}

public final synchronized void allocateResources() throws IOException {
if (!isInitialized) {
this.pd = group.createProtectionDomainRaw(this);
Expand All @@ -176,31 +180,31 @@ synchronized void accept() throws Exception {
group.allocateResourcesRaw(this);
while(connState < CONN_STATE_RESOURCES_ALLOCATED){
wait();
}
}
if (connState != CONN_STATE_RESOURCES_ALLOCATED){
throw new IOException("resolve route failed");
}
}

RdmaConnParam connParam = getConnParam();
idPriv.accept(connParam);
idPriv.accept(connParam);
while(connState < CONN_STATE_CONNECTED){
wait();
}
}
}

/**
* Close this endpoint.
*
* This closes the connection and free's all the resources, e.g., queue pair.
* @throws InterruptedException
* Close this endpoint.
*
* This closes the connection and free's all the resources, e.g., queue pair.
* @throws InterruptedException
*
* @throws Exception the exception
*/
public synchronized void close() throws IOException, InterruptedException {
if (isClosed){
return;
}

logger.info("closing client endpoint");
if (connState == CONN_STATE_CONNECTED) {
idPriv.disconnect();
Expand All @@ -214,7 +218,7 @@ public synchronized void close() throws IOException, InterruptedException {
isClosed = true;
logger.info("closing client done");
}

/**
* Checks if the endpoint is connected.
*
Expand All @@ -223,6 +227,9 @@ public synchronized void close() throws IOException, InterruptedException {
public synchronized boolean isConnected() {
return (connState == CONN_STATE_CONNECTED);
}
public synchronized boolean isResouceAllocated() {
return (connState == CONN_STATE_RESOURCES_ALLOCATED);
}

/**
* Checks if the endpoint is closed.
Expand Down

0 comments on commit 2586e0c

Please sign in to comment.