fix issue #41 #43

Open
wants to merge 1 commit into
base: master
10 changes: 6 additions & 4 deletions src/main/java/com/ibm/disni/RdmaActiveEndpointGroup.java
@@ -100,7 +100,7 @@ public IbvQP createQpProvider(C endpoint) throws IOException{

public void allocateResources(C endpoint) throws Exception {
endpoint.allocateResources();
}
}

public void close() throws IOException, InterruptedException {
super.close();
@@ -110,9 +110,11 @@ public void close() throws IOException, InterruptedException {
}

void close(RdmaEndpoint endpoint) throws IOException {
IbvContext context = endpoint.getIdPriv().getVerbs();
RdmaActiveCqProcessor<C> cqProcessor = cqMap.get(context.getCmd_fd());
cqProcessor.unregister(endpoint);
if (endpoint.isResouceAllocated()) {
IbvContext context = endpoint.getIdPriv().getVerbs();
RdmaActiveCqProcessor<C> cqProcessor = cqMap.get(context.getCmd_fd());
cqProcessor.unregister(endpoint);
}
Member

Why do we allow partially allocated ActiveEndpoints to call close() in the first place, only to then check if they have been allocated and prevent the close from proceeding? Rather I suggest we check the allocation state in the endpoint and decide whether or not to de-allocate.

That being said, I'm not convinced that preventing close on partially allocated endpoints is a good idea. What if I do want to close and clean up the resources? I can't do so anymore. Also, this leaves partially allocated endpoints in the group, lingering around forever. I think we should leave it to the application to decide whether they want to close or not, and then make sure the close call closes whatever needs to be closed while not closing what doesn't need to be.

Contributor Author

My intention is not to prevent closing partially allocated endpoints, but to prevent releasing unallocated resources when users decide to close the failed ep.

> Rather I suggest we check the allocation state in the endpoint and decide whether or not to de-allocate.

This is exactly what I am trying to achieve by introducing isResourceAllocated(). An application ep's close() implementation should follow this pattern:
super.close();
if (isResourceAllocated())
release its own resources...

That is, check whether resources have been successfully allocated before releasing the ep's own resources. super.close() can be called without that check because the current close() of RdmaEndpoint works fine with a partially allocated ep, while RdmaActiveEndpoint's close() needs the patch you quoted above to work.
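A minimal sketch of that pattern, for illustration only. `SketchEndpoint` and `SketchAppEndpoint` are hypothetical stand-ins, not DiSNI classes: the base class only records whether allocation completed, and the subclass guards its own cleanup with `isResourceAllocated()` after calling `super.close()`.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the base endpoint (not the real RdmaEndpoint).
class SketchEndpoint {
    private boolean resourcesAllocated;
    private boolean closed;

    // Simulated connect(): may fail before any resource is allocated.
    void connect(boolean failBeforeAllocation) throws IOException {
        if (failBeforeAllocation) {
            throw new IOException("resolve route failed");
        }
        resourcesAllocated = true;
        init(); // application-level setup runs only after allocation
    }

    // Hook for subclasses to set up their own resources.
    protected void init() {}

    boolean isResourceAllocated() { return resourcesAllocated; }

    boolean isClosed() { return closed; }

    // Assumed to be safe on a partially set-up endpoint.
    void close() { closed = true; }
}

// Hypothetical application endpoint following the close() pattern above.
class SketchAppEndpoint extends SketchEndpoint {
    final List<String> log = new ArrayList<>();

    @Override
    protected void init() {
        log.add("buffer registered");
    }

    @Override
    void close() {
        super.close();
        // Release application resources only if they were ever allocated.
        if (isResourceAllocated()) {
            log.add("buffer deregistered");
        }
    }
}
```

With this shape, calling close() on an endpoint whose connect() failed early marks it closed without touching resources that were never set up.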

> Also, this leaves partially allocated endpoints in the group, lingering around forever.

This is not the case. Since the ep throws an exception before allocating resources in

group.allocateResourcesRaw(this);

it is never registered with the cqProcessor. It is also unregistered from the group's clientEndpointMap in
group.unregisterClientEp(this);

Therefore, no residue is left after close().

In summary, users have two choices when catching an exception on connect(): 1) close the failed ep, or 2) try to connect() again. I think the patch properly prevents releasing unallocated resources if users decide to take opt-1, and the logic in connect() can handle the opt-2 situation.
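The two options can be sketched from the caller's side as follows. This is a simplified, synchronous illustration under stated assumptions: `ResumableEndpoint` and `ConnectPolicy` are hypothetical names, the failure is injected through a counter, and there is no waiting on CM events as in the real connect(). It only shows the idea of resuming from the last completed step when an error flag is set.

```java
import java.io.IOException;

// Hypothetical endpoint whose connect() resumes after a failed attempt.
class ResumableEndpoint {
    static final int INITIALIZED = 0, ADDR_RESOLVED = 1, ROUTE_RESOLVED = 2, CONNECTED = 3;

    private int state = INITIALIZED;
    private boolean isError;
    private boolean closed;
    private int routeFailuresLeft; // injected failures, for illustration only
    int addrResolutions;           // counts how often address resolution ran

    ResumableEndpoint(int routeFailures) {
        this.routeFailuresLeft = routeFailures;
    }

    void connect() throws IOException {
        // Re-entry is allowed only when the previous attempt ended in an error.
        if (state != INITIALIZED && !isError) {
            throw new IOException("endpoint already connected");
        }
        isError = false;
        if (state < ADDR_RESOLVED) {
            addrResolutions++;
            state = ADDR_RESOLVED;
        }
        if (state < ROUTE_RESOLVED) {
            if (routeFailuresLeft > 0) {
                routeFailuresLeft--;
                isError = true;
                throw new IOException("resolve route failed");
            }
            state = ROUTE_RESOLVED;
        }
        state = CONNECTED;
    }

    void close() { closed = true; }

    boolean isConnected() { return state == CONNECTED; }

    boolean isClosed() { return closed; }
}

// Hypothetical caller policy: retry a few times (opt-2), then close (opt-1).
class ConnectPolicy {
    static boolean connectOrClose(ResumableEndpoint ep, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                ep.connect();
                return true;
            } catch (IOException e) {
                // opt-2: fall through and try connect() again
            }
        }
        ep.close(); // opt-1: give up and close the failed endpoint
        return false;
    }
}
```

In this sketch a retry skips steps that already completed, so address resolution runs once even if route resolution fails on the first attempt.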

}

public int getMaxWR() {
80 changes: 43 additions & 37 deletions src/main/java/com/ibm/disni/RdmaEndpoint.java
@@ -68,13 +68,15 @@ public class RdmaEndpoint {
private boolean isClosed;
private boolean isInitialized;
private boolean serverSide;

private boolean isError;

protected RdmaEndpoint(RdmaEndpointGroup<? extends RdmaEndpoint> group, RdmaCmId idPriv, boolean serverSide) throws IOException{
this.endpointId = group.getNextId();
this.group = group;
this.idPriv = idPriv;
this.access = IbvMr.IBV_ACCESS_LOCAL_WRITE | IbvMr.IBV_ACCESS_REMOTE_WRITE | IbvMr.IBV_ACCESS_REMOTE_READ;

this.access = IbvMr.IBV_ACCESS_LOCAL_WRITE | IbvMr.IBV_ACCESS_REMOTE_WRITE | IbvMr.IBV_ACCESS_REMOTE_READ;
this.isError = false;

this.qp = null;
this.pd = null;
this.cqProcessor = null;
@@ -84,7 +86,7 @@ protected RdmaEndpoint(RdmaEndpointGroup<? extends RdmaEndpoint> group, RdmaCmId
this.serverSide = serverSide;
logger.info("new client endpoint, id " + endpointId + ", idPriv " + idPriv.getPs());
}

/**
/**
/**
@@ -94,41 +96,44 @@ protected RdmaEndpoint(RdmaEndpointGroup<? extends RdmaEndpoint> group, RdmaCmId
* @param timeout connection timeout
*/
public synchronized void connect(SocketAddress dst, int timeout) throws Exception {
if (connState != CONN_STATE_INITIALIZED) {
if (connState != CONN_STATE_INITIALIZED && !isError) {
throw new IOException("endpoint already connected");
}
idPriv.resolveAddr(null, dst, timeout);
while(connState < CONN_STATE_ADDR_RESOLVED){
wait();
isError = false;
if (connState < CONN_STATE_ADDR_RESOLVED) {
idPriv.resolveAddr(null, dst, timeout);
while (connState < CONN_STATE_ADDR_RESOLVED && !isError) {
wait();
}
if (isError)
throw new IOException("resolve address failed");
}
if (connState != CONN_STATE_ADDR_RESOLVED){
throw new IOException("resolve address failed");
if (connState < CONN_STATE_ROUTE_RESOLVED) {
idPriv.resolveRoute(timeout);
while (connState < CONN_STATE_ROUTE_RESOLVED && !isError) {
wait();
}
if (isError)
throw new IOException("resolve route failed");
}

idPriv.resolveRoute(timeout);
while(connState < CONN_STATE_ROUTE_RESOLVED){
wait();
if (connState < CONN_STATE_RESOURCES_ALLOCATED) {
group.allocateResourcesRaw(this);
while (connState < CONN_STATE_RESOURCES_ALLOCATED && !isError) {
wait();
}
if (isError)
throw new IOException("allocate resource failed");
}
if (connState != CONN_STATE_ROUTE_RESOLVED){
throw new IOException("resolve route failed");
}

group.allocateResourcesRaw(this);
while(connState < CONN_STATE_RESOURCES_ALLOCATED){
wait();
}
if (connState != CONN_STATE_RESOURCES_ALLOCATED){
throw new IOException("resources allocation failed");
}

RdmaConnParam connParam = getConnParam();
idPriv.connect(connParam);
while(connState < CONN_STATE_CONNECTED){

while(connState < CONN_STATE_CONNECTED && !isError){
wait();
}
}

}
if (isError)
throw new IOException("idPriv connect failed");
}

/* (non-Javadoc)
* @see com.ibm.jverbs.endpoints.ICmConsumer#dispatchCmEvent(com.ibm.jverbs.cm.RdmaCmEvent)
*/
@@ -138,28 +143,26 @@ public synchronized void dispatchCmEvent(RdmaCmEvent cmEvent)
int eventType = cmEvent.getEvent();
if (eventType == RdmaCmEvent.EventType.RDMA_CM_EVENT_ADDR_RESOLVED.ordinal()) {
connState = RdmaEndpoint.CONN_STATE_ADDR_RESOLVED;
notifyAll();
} else if (cmEvent.getEvent() == RdmaCmEvent.EventType.RDMA_CM_EVENT_ROUTE_RESOLVED.ordinal()) {
connState = RdmaEndpoint.CONN_STATE_ROUTE_RESOLVED;
notifyAll();
} else if (eventType == RdmaCmEvent.EventType.RDMA_CM_EVENT_ESTABLISHED.ordinal()) {
logger.info("got event type + RDMA_CM_EVENT_ESTABLISHED, srcAddress " + this.getSrcAddr() + ", dstAddress " + this.getDstAddr());
connState = CONN_STATE_CONNECTED;
notifyAll();
} else if (eventType == RdmaCmEvent.EventType.RDMA_CM_EVENT_DISCONNECTED.ordinal()) {
logger.info("got event type + RDMA_CM_EVENT_DISCONNECTED, srcAddress " + this.getSrcAddr() + ", dstAddress " + this.getDstAddr());
connState = CONN_STATE_CLOSED;
notifyAll();
} else if (eventType == RdmaCmEvent.EventType.RDMA_CM_EVENT_CONNECT_REQUEST.ordinal()) {
logger.info("got event type + RDMA_CM_EVENT_CONNECT_REQUEST, srcAddress " + this.getSrcAddr() + ", dstAddress " + this.getDstAddr());
} else {
logger.info("got event type + UNKNOWN, srcAddress " + this.getSrcAddr() + ", dstAddress " + this.getDstAddr());
isError = true;
}
notifyAll();
} catch (Exception e) {
throw new IOException(e);
}
}

public final synchronized void allocateResources() throws IOException {
if (!isInitialized) {
this.pd = group.createProtectionDomainRaw(this);
@@ -200,7 +203,7 @@ public synchronized void close() throws IOException, InterruptedException {
if (isClosed){
return;
}

logger.info("closing client endpoint");
if (connState == CONN_STATE_CONNECTED) {
idPriv.disconnect();
@@ -223,6 +226,9 @@ public synchronized void close() throws IOException, InterruptedException {
public synchronized boolean isConnected() {
return (connState == CONN_STATE_CONNECTED);
}
public synchronized boolean isResouceAllocated() {
return (connState == CONN_STATE_RESOURCES_ALLOCATED);
}

/**
* Checks if the endpoint is closed.