
Schema validation fails while deserializing data serialized by the Encoders.bean serializer using Kryo #1075

Open
ams45 opened this issue Apr 30, 2024 · 1 comment


ams45 commented Apr 30, 2024

Describe the bug
For a stateful Spark Structured Streaming query I use the RocksDB state store. The existing state data was serialized by Spark's Encoders.bean serializer. To keep backward compatibility while changing the state schema, I switched the serializer to Encoders.kryo (with the other registration changes shown below) and used CompatibleFieldSerializer, but the query failed with the error below.
```
- Provided value schema: StructType(StructField(groupState,StructType(StructField(value,BinaryType,true)),true))
- Existing value schema: StructType(StructField(groupState,StructType(StructField(count,LongType,true),StructField(word,StringType,true),StructField(wordLen,LongType,true),StructField(wordsTs,ArrayType(LongType,true),true)),true))
If you want to force running query without schema validation, please set spark.sql.streaming.stateStore.stateSchemaCheck to false.
Please note running query with incompatible schema could cause indeterministic behavior.
```
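The mismatch is visible directly from the encoders: Encoders.bean maps each bean property to a typed column, while Encoders.kryo stores the whole object as a single BinaryType column, so the provided and existing state schemas can never line up once the encoder is switched. A minimal sketch (assuming the `StateInfo` bean described below is on the classpath):

```java
import org.apache.spark.sql.Encoders;

public class EncoderSchemaCheck {
    public static void main(String[] args) {
        // Bean encoder: one typed column per bean property,
        // e.g. StructField(count,LongType,...), StructField(word,StringType,...)
        System.out.println(Encoders.bean(StateInfo.class).schema().treeString());

        // Kryo encoder: the whole object is opaque bytes,
        // i.e. StructType(StructField(value,BinaryType,true))
        System.out.println(Encoders.kryo(StateInfo.class).schema().treeString());
    }
}
```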

To Reproduce
Register a CompatibleFieldSerializer for the state class, as in the registrator below.
```java
public class CustomKryoRegistrator implements KryoRegistrator {

    @Override
    public void registerClasses(Kryo kryo) {
        //kryo.register(StateInfo.class, new StateInfoSerdes()); // custom serdes
        kryo.register(StateInfo.class, new CompatibleFieldSerializer<>(kryo, StateInfo.class)); // built-in implementation
        kryo.register(java.util.Collections.emptyList().getClass());
        kryo.register(java.util.HashMap.class);
        kryo.register(java.util.LinkedHashMap.class);
    }
}
```
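This registrator is wired in via `spark.kryo.registrator` in `createSparkSession()` below, together with `spark.kryo.registrationRequired=true` so that any unregistered class fails fast.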

Create a StateInfo class for the state data with the fields below.

```java
private String word;
private Long count;
private List<Long> wordsTs;
private Long wordLen;
private Map<String, LinkedHashMap<String, String>> hashSrcEvts;
```
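For reference, a minimal sketch of the full bean as the query below uses it; the constructors and getters are assumptions inferred from the calls in the program (Encoders.bean relies on JavaBean-style accessors and a no-arg constructor, and Kryo also needs a way to instantiate the class):

```java
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StateInfo implements Serializable {
    private String word;
    private Long count;
    private List<Long> wordsTs;
    private Long wordLen;
    // field added in step 2 of the reproduction steps below; absent from the originally persisted state
    private Map<String, LinkedHashMap<String, String>> hashSrcEvts;

    public StateInfo() {} // no-arg constructor, assumed for Encoders.bean and Kryo

    public StateInfo(String word, Long count, List<Long> wordsTs, Long wordLen,
            Map<String, LinkedHashMap<String, String>> hashSrcEvts) {
        this.word = word;
        this.count = count;
        this.wordsTs = wordsTs;
        this.wordLen = wordLen;
        this.hashSrcEvts = hashSrcEvts;
    }

    public String getWord() { return word; }
    public Long getCount() { return count; }
    public List<Long> getWordsTs() { return wordsTs; }
    public Long getWordLen() { return wordLen; }
    public Map<String, LinkedHashMap<String, String>> getHashSrcEvts() { return hashSrcEvts; }
    // setters omitted for brevity; Encoders.bean also expects matching setters
}
```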

Write a sample word-count Spark streaming query:
```java
public class SparkStreamingWordCount {

    public static void start() throws StreamingQueryException {

        SparkSession spark = createSparkSession();
        List<String> values = Arrays.asList("29--2141741193=1713852879594", "48--2141741193=1713853467453");
        LinkedHashMap<String, String> linkedHashMap = new LinkedHashMap<String, String>();
        values.forEach(v -> linkedHashMap.put(v.split("=")[0], v.split("=")[1]));

        // Define the function to update state and compute word count
        MapGroupsWithStateFunction<String, String, StateInfo, StateOutput> stateAggregationFunc =
            new MapGroupsWithStateFunction<String, String, StateInfo, StateOutput>() {

                private static final long serialVersionUID = 1L;
                private StateInfo state;

                @Override
                public StateOutput call(String key, Iterator<String> values, GroupState<StateInfo> oldState) throws Exception {

                    state = oldState.exists() ? oldState.get() : new StateInfo(key, 0L, Collections.emptyList(), 0L, Collections.emptyMap());
                    //state = oldState.exists() ? oldState.get() : new StateInfo(key, 0L, Collections.emptyList(), 0L);

                    while (values.hasNext()) {
                        String wordKey = values.next();
                        state = oldState.exists() ? oldState.get() : new StateInfo(key, 0L, Collections.emptyList(), 0L, Collections.emptyMap());
                        //state = oldState.exists() ? oldState.get() : new StateInfo(key, 0L, Collections.emptyList(), 0L);
                        StateInfo updatedState = new StateInfo(key, state.getCount() + 1, Collections.emptyList(), Long.valueOf(key.length()), populateHashSrcEvts(key, state.getHashSrcEvts()));
                        //StateInfo updatedState = new StateInfo(key, state.getCount() + 1, Collections.emptyList(), Long.valueOf(key.length()));

                        oldState.update(updatedState);
                    }
                    return new StateOutput(key, state.getCount());
                }

                private Map<String, LinkedHashMap<String, String>> populateHashSrcEvts(String key, Map<String, LinkedHashMap<String, String>> map) {
                    Map<String, LinkedHashMap<String, String>> map2 = new HashMap<String, LinkedHashMap<String, String>>();
                    map2.put(key, linkedHashMap);
                    return map2;
                }
            };

        Dataset<Row> kafkaStream = spark
                .readStream()
                .format("kafka")
                .options(getKafkaParams())
                .option("subscribe", "topic1")
                .option("maxOffsetsPerTrigger", 1000)
                .option("checkpointLocation", "/Users/amol/rocksdb/")
                .load();

        Dataset<String> stringStream = kafkaStream.selectExpr("CAST(value AS STRING)").as(Encoders.STRING());

        Dataset<StateOutput> wordCounts = null;
        try {
            // Generate running word count
            wordCounts = stringStream
                    .groupByKey((MapFunction<String, String>) value -> value, Encoders.STRING())
                    .mapGroupsWithState(
                            stateAggregationFunc,
                            Encoders.kryo(StateInfo.class),
                            Encoders.bean(StateOutput.class));
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }

        // Start running the query that prints the running counts to the console
        StreamingQuery query = null;
        try {
            query = wordCounts.writeStream()
                    .outputMode("update")
                    .format("console")
                    .option("checkpointLocation", "/Users/amol/rocksdb/")
                    .start();
            System.out.println("spark streaming query started...");
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

        query.awaitTermination();
        System.out.println("spark streaming query stopped...");
    }

    private static SparkSession createSparkSession() {
        SparkSession spark = SparkSession
                .builder()
                .appName("SparkStreamingWordCount")
                .master("local[*]")
                .getOrCreate();
        spark.sparkContext().conf().set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider");
        spark.sparkContext().conf().set("spark.sql.streaming.stateStore.rocksdb.trackTotalNumberOfRows", "false");
        spark.sparkContext().conf().set("spark.sql.streaming.stateStore.stateSchemaCheck", "false");
        spark.sparkContext().conf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        spark.sparkContext().conf().set("spark.kryo.registrationRequired", "true");
        spark.sparkContext().conf().set("spark.kryo.classesToRegister", "com.test.rockdb.StateInfo");
        spark.sparkContext().conf().set("spark.kryo.referenceTracking", "false");
        spark.sparkContext().conf().set("spark.kryo.registrator", CustomKryoRegistrator.class.getName());

        System.out.println("spark session created");
        return spark;
    }

    private static Map<String, String> getKafkaParams() {
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("kafka.bootstrap.servers", "localhost:9092");
        kafkaParams.put("group.id", "consumer-group");
        return kafkaParams;
    }
}
```

Now, to reproduce the issue:

  1. Create a state with the bean serializer.
  2. Change the StateInfo class to add a new field, e.g. private Map<String, LinkedHashMap<String, String>> hashSrcEvts;
  3. Change the serializer to Kryo.
  4. Generate an event and try to update the state using the Kryo serializer (see the sketch after this list).
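Concretely, the only query change between steps 1 and 4 is the state encoder passed to mapGroupsWithState (a sketch using the names from the program above):

```java
// Step 1: the initial runs persist state with the bean encoder
wordCounts = stringStream
        .groupByKey((MapFunction<String, String>) value -> value, Encoders.STRING())
        .mapGroupsWithState(stateAggregationFunc,
                Encoders.bean(StateInfo.class),   // state schema: one typed column per field
                Encoders.bean(StateOutput.class));

// Step 4: after adding hashSrcEvts, restart against the same checkpoint with Kryo
wordCounts = stringStream
        .groupByKey((MapFunction<String, String>) value -> value, Encoders.STRING())
        .mapGroupsWithState(stateAggregationFunc,
                Encoders.kryo(StateInfo.class),   // state schema: single BinaryType column; the schema check fails here
                Encoders.bean(StateOutput.class));
```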

Environment:

  • OS: macOS (Eclipse)
  • JDK Version: 1.8
  • Kryo Version: 4.0.2

Additional context
This issue arises while evolving the RocksDB state structure in Spark Structured Streaming: the existing RocksDB state data was serialized by the bean serializer, and the schema change (adding/removing a field in StateInfo) is attempted with Kryo's CompatibleFieldSerializer.

theigl (Collaborator) commented May 7, 2024

Thanks for the report @ams45. Can you try to reproduce this using Kryo alone? At the moment the setup is much too complicated for me to look into.

The best approach would be to write a simple unit test without dependencies that reproduces your exact Kryo setup. You can serialize the original StateInfo class and then deserialize it into a StateInfoNew class with the new field.

Something like this but with the exact Kryo config:

```java
@Test
void testChange () {
    Kryo kryo = new Kryo();
    kryo.register(StateInfo.class);
    kryo.register(StateInfoNew.class);

    final StateInfo si = new StateInfo();
    Output output = new Output(512);
    kryo.writeObject(output, si);

    Input input = new Input(output.getBuffer());
    final StateInfoNew o = kryo.readObject(input, StateInfoNew.class);
    assertNotNull(o);
}
```
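With the exact config from this issue, that would presumably mean registering both classes with CompatibleFieldSerializer before the round trip, mirroring the registrator above (a sketch; StateInfoNew stands for the evolved class):

```java
kryo.register(StateInfo.class, new CompatibleFieldSerializer<>(kryo, StateInfo.class));
kryo.register(StateInfoNew.class, new CompatibleFieldSerializer<>(kryo, StateInfoNew.class));
```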
