Skip to content

Commit

Permalink
[fix][fn] Add missing version field back to querystate API (#21966)
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangpengcheng authored and Technoboy- committed Jan 27, 2024
1 parent f9e0237 commit 8025763
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,31 @@ public interface ByteBufferStateStore extends StateStore {
*/
CompletableFuture<ByteBuffer> getAsync(String key);

/**
* Retrieve the StateValue for the key.
*
* @param key name of the key
* @return the StateValue.
*/
default StateValue getStateValue(String key) {
return getStateValueAsync(key).join();
}

/**
* Retrieve the StateValue for the key, but don't wait for the operation to be completed.
*
* @param key name of the key
* @return the StateValue.
*/
default CompletableFuture<StateValue> getStateValueAsync(String key) {
return getAsync(key).thenApply(val -> {
if (val != null && val.remaining() >= 0) {
byte[] data = new byte[val.remaining()];
val.get(data);
return new StateValue(data, null, null);
} else {
return null;
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.api.state;

import lombok.AllArgsConstructor;
import lombok.Getter;

@Getter
@AllArgsConstructor
public class StateValue {
private final byte[] value;
private final Long version;
private final Boolean isNumber;
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
<Method name="getSchema"/>
<Bug pattern="EI_EXPOSE_REP"/>
</Match>
<Match>
<Class name="org.apache.pulsar.functions.api.state.StateValue"/>
<Method name="getValue"/>
<Bug pattern="EI_EXPOSE_REP"/>
</Match>
<Match>
<Class name="org.apache.pulsar.functions.api.utils.FunctionRecord$FunctionRecordBuilder"/>
<Method name="properties"/>
Expand All @@ -39,4 +44,8 @@
<Method name="schema"/>
<Bug pattern="EI_EXPOSE_REP2"/>
</Match>
<Match>
<Class name="org.apache.pulsar.functions.api.state.StateValue"/>
<Bug pattern="EI_EXPOSE_REP2"/>
</Match>
</FindBugsFilter>
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.bookkeeper.api.kv.Table;
import org.apache.bookkeeper.api.kv.options.Options;
import org.apache.pulsar.functions.api.StateStoreContext;
import org.apache.pulsar.functions.api.state.StateValue;
import org.apache.pulsar.functions.utils.FunctionCommon;

/**
Expand Down Expand Up @@ -190,4 +191,33 @@ public ByteBuffer get(String key) {
throw new RuntimeException("Failed to retrieve the state value for key '" + key + "'", e);
}
}

@Override
public StateValue getStateValue(String key) {
try {
return result(getStateValueAsync(key));
} catch (Exception e) {
throw new RuntimeException("Failed to retrieve the state value for key '" + key + "'", e);
}
}

@Override
public CompletableFuture<StateValue> getStateValueAsync(String key) {
return table.getKv(Unpooled.wrappedBuffer(key.getBytes(UTF_8))).thenApply(
data -> {
try {
if (data != null && data.value() != null && data.value().readableBytes() >= 0) {
byte[] result = new byte[data.value().readableBytes()];
data.value().readBytes(result);
return new StateValue(result, data.version(), data.isNumber());
}
return null;
} finally {
if (data != null) {
ReferenceCountUtil.safeRelease(data);
}
}
}
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.functions.api.StateStoreContext;
import org.apache.pulsar.functions.api.state.StateValue;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;

Expand Down Expand Up @@ -111,6 +112,20 @@ public CompletableFuture<ByteBuffer> getAsync(String key) {
.orElse(null));
}

@Override
public StateValue getStateValue(String key) {
return getStateValueAsync(key).join();
}

@Override
public CompletableFuture<StateValue> getStateValueAsync(String key) {
return store.get(getPath(key))
.thenApply(optRes ->
optRes.map(x ->
new StateValue(x.getValue(), x.getStat().getVersion(), null))
.orElse(null));
}

@Override
public void incrCounter(String key, long amount) {
incrCounterAsync(key, amount).join();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
import org.apache.bookkeeper.api.kv.Table;
import org.apache.bookkeeper.api.kv.options.Options;
import org.apache.bookkeeper.api.kv.result.DeleteResult;
import org.apache.bookkeeper.api.kv.result.KeyValue;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.functions.api.state.StateValue;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -114,6 +116,24 @@ public void testGetValue() throws Exception {
);
}

@Test
public void testGetStateValue() throws Exception {
KeyValue returnedKeyValue = mock(KeyValue.class);
ByteBuf returnedValue = Unpooled.copiedBuffer("test-value", UTF_8);
when(returnedKeyValue.value()).thenReturn(returnedValue);
when(returnedKeyValue.version()).thenReturn(1l);
when(returnedKeyValue.isNumber()).thenReturn(false);
when(mockTable.getKv(any(ByteBuf.class)))
.thenReturn(FutureUtils.value(returnedKeyValue));
StateValue result = stateContext.getStateValue("test-key");
assertEquals("test-value", new String(result.getValue(), UTF_8));
assertEquals(1l, result.getVersion().longValue());
assertEquals(false, result.getIsNumber().booleanValue());
verify(mockTable, times(1)).getKv(
eq(Unpooled.copiedBuffer("test-key", UTF_8))
);
}

@Test
public void testGetAmount() throws Exception {
when(mockTable.getNumber(any(ByteBuf.class)))
Expand All @@ -132,6 +152,12 @@ public void testGetKeyNotPresent() throws Exception {
assertTrue(result != null);
assertEquals(result.get(), null);

when(mockTable.getKv(any(ByteBuf.class)))
.thenReturn(FutureUtils.value(null));
CompletableFuture<StateValue> stateValueResult = stateContext.getStateValueAsync("test-key");
assertTrue(stateValueResult != null);
assertEquals(stateValueResult.get(), null);

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.testng.Assert.assertTrue;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.functions.api.state.StateValue;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
Expand Down Expand Up @@ -101,6 +102,10 @@ public void testGetKeyNotPresent() throws Exception {
CompletableFuture<ByteBuffer> result = stateContext.getAsync("test-key");
assertTrue(result != null);
assertEquals(result.get(), null);

CompletableFuture<StateValue> stateValueResult = stateContext.getStateValueAsync("test-key");
assertTrue(stateValueResult != null);
assertEquals(stateValueResult.get(), null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.apache.pulsar.common.policies.data.FunctionStatsImpl;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.functions.api.state.StateValue;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.instance.state.DefaultStateStore;
import org.apache.pulsar.functions.proto.Function;
Expand Down Expand Up @@ -1151,23 +1152,29 @@ public FunctionState getFunctionState(final String tenant,

try {
DefaultStateStore store = worker().getStateStoreProvider().getStateStore(tenant, namespace, functionName);
ByteBuffer buf = store.get(key);
if (buf == null) {
StateValue value = store.getStateValue(key);
if (value == null) {
throw new RestException(Status.NOT_FOUND, "key '" + key + "' doesn't exist.");
}
byte[] data = value.getValue();
if (data == null) {
throw new RestException(Status.NOT_FOUND, "key '" + key + "' doesn't exist.");
}

// try to parse the state as a long
// but even if it can be parsed as a long, this number may not be the actual state,
// so we will always return a `stringValue` or `bytesValue` with the number value
ByteBuffer buf = ByteBuffer.wrap(data);

Long number = null;
if (buf.remaining() == Long.BYTES) {
number = buf.getLong();
}
if (Boolean.TRUE.equals(value.getIsNumber())) {
return new FunctionState(key, null, null, number, value.getVersion());
}

if (Utf8.isWellFormed(buf.array())) {
return new FunctionState(key, new String(buf.array(), UTF_8), null, number, null);
if (Utf8.isWellFormed(data)) {
return new FunctionState(key, new String(data, UTF_8), null, number, value.getVersion());
} else {
return new FunctionState(key, null, buf.array(), number, null);
return new FunctionState(key, null, data, number, value.getVersion());
}
} catch (RestException e) {
throw e;
Expand Down Expand Up @@ -1215,7 +1222,7 @@ public void putFunctionState(final String tenant,
try {
DefaultStateStore store = worker().getStateStoreProvider().getStateStore(tenant, namespace, functionName);
ByteBuffer data;
if (StringUtils.isNotEmpty(state.getStringValue())) {
if (state.getStringValue() != null) {
data = ByteBuffer.wrap(state.getStringValue().getBytes(UTF_8));
} else if (state.getByteValue() != null) {
data = ByteBuffer.wrap(state.getByteValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ private void doTestPythonWordCountFunction(String functionName) throws Exception
getFunctionStatus(functionName, numMessages);

// get state
queryState(functionName, "hello", numMessages);
queryState(functionName, "test", numMessages);
queryState(functionName, "hello", numMessages, numMessages - 1);
queryState(functionName, "test", numMessages, numMessages - 1);
for (int i = 0; i < numMessages; i++) {
queryState(functionName, "message-" + i, 1);
queryState(functionName, "message-" + i, 1, 0);
}

// test put state
Expand Down Expand Up @@ -468,7 +468,7 @@ private void getFunctionStatus(String functionName, int numMessages) throws Exce
assertTrue(result.getStdout().contains("\"numSuccessfullyProcessed\" : " + numMessages));
}

private void queryState(String functionName, String key, int amount)
private void queryState(String functionName, String key, int amount, long version)
throws Exception {
ContainerExecResult result = container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
Expand All @@ -480,6 +480,9 @@ private void queryState(String functionName, String key, int amount)
"--key", key
);
assertTrue(result.getStdout().contains("\"numberValue\": " + amount));
assertTrue(result.getStdout().contains("\"version\": " + version));
assertFalse(result.getStdout().contains("stringValue"));
assertFalse(result.getStdout().contains("byteValue"));
}

private void putAndQueryState(String functionName, String key, String state, String expect)
Expand Down

0 comments on commit 8025763

Please sign in to comment.