Skip to content

Commit

Permalink
Merge pull request #125 from kaleido-io/deactivate
Browse files Browse the repository at this point in the history
Add /deactivatepool API for deleting listeners
  • Loading branch information
peterbroadhurst committed May 24, 2023
2 parents aba1386 + 58de133 commit a1429fd
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 16 deletions.
16 changes: 16 additions & 0 deletions src/event-stream/event-stream.service.ts
Expand Up @@ -321,6 +321,22 @@ export class EventStreamService {
);
}

async deleteSubscriptionByName(ctx: Context, streamId: string, name: string) {
const existingSubscriptions = await this.getSubscriptions(ctx);
const sub = existingSubscriptions.find(s => s.name === name && s.stream === streamId);
if (!sub) {
this.logger.log(`No subscription found for ${name}`);
return false;
}
await lastValueFrom(
this.http.delete(
new URL(`/subscriptions/${sub.id}`, this.baseUrl).href,
this.requestOptions(ctx),
),
);
return true;
}

connect(
url: string,
topic: string,
Expand Down
2 changes: 1 addition & 1 deletion src/main.ts
Expand Up @@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import { NestApplicationOptions, ShutdownSignal, ValidationPipe } from '@nestjs/common';
import { ShutdownSignal, ValidationPipe } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { NestFactory } from '@nestjs/core';
import { DocumentBuilder, SwaggerModule } from '@nestjs/swagger';
Expand Down
33 changes: 22 additions & 11 deletions src/tokens/tokens.controller.ts
Expand Up @@ -28,10 +28,11 @@ import {
TokenMint,
TokenPool,
TokenPoolActivate,
TokenPoolDeactivate,
TokenTransfer,
} from './tokens.interfaces';
import { TokensService } from './tokens.service';
import { RequestContext } from '../request-context/request-context.decorator';
import { Context, RequestContext } from '../request-context/request-context.decorator';

@Controller()
export class TokensController {
Expand All @@ -40,7 +41,7 @@ export class TokensController {
@Post('init')
@HttpCode(204)
@ApiOperation({ summary: 'Perform one-time initialization (if not auto-initialized)' })
async init(@RequestContext() ctx) {
async init(@RequestContext() ctx: Context) {
await this.service.init(ctx);
}

Expand All @@ -53,21 +54,31 @@ export class TokensController {
})
@ApiBody({ type: TokenPool })
@ApiResponse({ status: 202, type: AsyncResponse })
createPool(@RequestContext() ctx, @Body() dto: TokenPool) {
createPool(@RequestContext() ctx: Context, @Body() dto: TokenPool) {
return this.service.createPool(ctx, dto);
}

@Post('activatepool')
@HttpCode(204)
@ApiOperation({
summary: 'Activate a token pool to begin receiving transfer events',
summary: 'Activate a token pool to begin receiving transfer and approval events',
description: 'Will retrigger the token-pool event for this pool as a side-effect',
})
@ApiBody({ type: TokenPoolActivate })
async activatePool(@RequestContext() ctx, @Body() dto: TokenPoolActivate) {
async activatePool(@RequestContext() ctx: Context, @Body() dto: TokenPoolActivate) {
await this.service.activatePool(ctx, dto);
}

@Post('deactivatepool')
@HttpCode(204)
@ApiOperation({
summary: 'Deactivate a token pool to delete all listeners and stop receiving events',
})
@ApiBody({ type: TokenPoolDeactivate })
async deactivatePool(@RequestContext() ctx: Context, @Body() dto: TokenPoolDeactivate) {
await this.service.deactivatePool(ctx, dto);
}

@Post('mint')
@HttpCode(202)
@ApiOperation({
Expand All @@ -77,7 +88,7 @@ export class TokensController {
})
@ApiBody({ type: TokenMint })
@ApiResponse({ status: 202, type: AsyncResponse })
mint(@RequestContext() ctx, @Body() dto: TokenMint) {
mint(@RequestContext() ctx: Context, @Body() dto: TokenMint) {
return this.service.mint(ctx, dto);
}

Expand All @@ -99,7 +110,7 @@ export class TokensController {
})
@ApiBody({ type: TokenApproval })
@ApiResponse({ status: 202, type: AsyncResponse })
approve(@RequestContext() ctx, @Body() dto: TokenApproval) {
approve(@RequestContext() ctx: Context, @Body() dto: TokenApproval) {
return this.service.approval(ctx, dto);
}

Expand All @@ -112,7 +123,7 @@ export class TokensController {
})
@ApiBody({ type: TokenBurn })
@ApiResponse({ status: 202, type: AsyncResponse })
burn(@RequestContext() ctx, @Body() dto: TokenBurn) {
burn(@RequestContext() ctx: Context, @Body() dto: TokenBurn) {
return this.service.burn(ctx, dto);
}

Expand All @@ -125,21 +136,21 @@ export class TokensController {
})
@ApiBody({ type: TokenTransfer })
@ApiResponse({ status: 202, type: AsyncResponse })
transfer(@RequestContext() ctx, @Body() dto: TokenTransfer) {
transfer(@RequestContext() ctx: Context, @Body() dto: TokenTransfer) {
return this.service.transfer(ctx, dto);
}

@Get('balance')
@ApiOperation({ summary: 'Retrieve a token balance' })
@ApiResponse({ status: 200, type: TokenBalance })
balance(@RequestContext() ctx, @Query() query: TokenBalanceQuery) {
balance(@RequestContext() ctx: Context, @Query() query: TokenBalanceQuery) {
return this.service.balance(ctx, query);
}

@Get('receipt/:id')
@ApiOperation({ summary: 'Retrieve the result of an async operation' })
@ApiResponse({ status: 200, type: EventStreamReply })
getReceipt(@RequestContext() ctx, @Param('id') id: string) {
getReceipt(@RequestContext() ctx: Context, @Param('id') id: string) {
return this.blockchain.getReceipt(ctx, id);
}
}
12 changes: 9 additions & 3 deletions src/tokens/tokens.interfaces.ts
Expand Up @@ -187,11 +187,17 @@ export class TokenPoolActivate {

@ApiProperty()
@IsOptional()
config?: any;
config?: TokenPoolConfig;

@ApiProperty({ description: requestIdDescription })
@ApiProperty()
@IsOptional()
requestId?: string;
poolData?: string;
}

export class TokenPoolDeactivate {
@ApiProperty()
@IsNotEmpty()
poolLocator: string;

@ApiProperty()
@IsOptional()
Expand Down
64 changes: 63 additions & 1 deletion src/tokens/tokens.service.ts
Expand Up @@ -14,7 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import { Injectable, Logger } from '@nestjs/common';
import { Injectable, Logger, NotFoundException } from '@nestjs/common';
import { EventStreamService } from '../event-stream/event-stream.service';
import { EventStream, EventStreamSubscription } from '../event-stream/event-stream.interfaces';
import { EventStreamProxyGateway } from '../eventstream-proxy/eventstream-proxy.gateway';
Expand All @@ -33,6 +33,7 @@ import {
TokenMint,
TokenPool,
TokenPoolActivate,
TokenPoolDeactivate,
TokenTransfer,
} from './tokens.interfaces';
import {
Expand Down Expand Up @@ -325,6 +326,67 @@ export class TokensService {
await Promise.all(promises);
}

async deactivatePool(ctx: Context, dto: TokenPoolDeactivate) {
const tokenCreateEvent = this.mapper.getCreateEvent();
const stream = await this.getStream(ctx);

const promises: Promise<boolean>[] = [];
if (tokenCreateEvent?.name !== undefined) {
promises.push(
this.eventstream.deleteSubscriptionByName(
ctx,
stream.id,
packSubscriptionName(
this.instancePath,
dto.poolLocator,
tokenCreateEvent.name,
dto.poolData,
),
),
);
}

promises.push(
...[
this.eventstream.deleteSubscriptionByName(
ctx,
stream.id,
packSubscriptionName(
this.instancePath,
dto.poolLocator,
TransferSingle.name,
dto.poolData,
),
),
this.eventstream.deleteSubscriptionByName(
ctx,
stream.id,
packSubscriptionName(
this.instancePath,
dto.poolLocator,
TransferBatch.name,
dto.poolData,
),
),
this.eventstream.deleteSubscriptionByName(
ctx,
stream.id,
packSubscriptionName(
this.instancePath,
dto.poolLocator,
ApprovalForAll.name,
dto.poolData,
),
),
],
);

const results = await Promise.all(promises);
if (results.every(deleted => !deleted)) {
throw new NotFoundException('No listeners found');
}
}

checkInterface(dto: CheckInterfaceRequest): CheckInterfaceResponse {
const wrapMethods = (methods: IAbiMethod[]): TokenInterface => {
return { format: InterfaceFormat.ABI, methods };
Expand Down

0 comments on commit a1429fd

Please sign in to comment.