
GCS Sink Connector unable to write Parquet when data is published with the Pulsar Golang client; works fine with the Java client #196

Open
mdsadimnadeem opened this issue Dec 29, 2021 · 2 comments

@mdsadimnadeem

@freeznet When I write data using the Pulsar Golang client (the program below is what I use to publish to the Pulsar topic), publishing and consuming both work, but the StreamNative GCS sink connector cannot write the data as Parquet to GCS and I get the exception below. The same flow works fine with the Java client.

Slack Thread on Apache Pulsar Community : https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1640579795494300

Exception stack trace:

```
16:28:54.381 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/sadim-test-json3][/pulsar-poc/cloud-storage-sink] Subscribed to topic on pulsar-poc-broker-1.pulsar-poc-broker.pulsar-poc.svc.cluster.local/10.248.1.83:6650 -- consumer: 43
16:28:54.436 [pulsar-io-cloud-storage-sink-flush-0] INFO org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Flushing [SinkRecord(sourceRecord=PulsarRecord(topicName=Optional[persistent://public/default/sadim-test-json3], partition=0, message=Optional[org.apache.pulsar.client.impl.TopicMessageImpl@23bbe054], schema=AUTO_CONSUME({schemaVersion=,schemaType=BYTES}{schemaVersion=org.apache.pulsar.common.protocol.schema.LatestVersion@2dd916ed,schemaType=JSON}), failFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$322/0x0000000840688840@14f36ab9, ackFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$321/0x0000000840688440@495fc424), value=[B@50d82707)] buffered records to blob store
16:28:54.436 [pulsar-io-cloud-storage-sink-flush-0] ERROR org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Caught unexpected exception:
org.apache.avro.SchemaParseException: Cannot parse schema
at org.apache.avro.Schema.parse(Schema.java:1633) ~[java-instance.jar:?]
at org.apache.avro.Schema$Parser.parse(Schema.java:1430) ~[java-instance.jar:?]
at org.apache.avro.Schema$Parser.parse(Schema.java:1418) ~[java-instance.jar:?]
at org.apache.pulsar.io.jcloud.util.AvroRecordUtil.convertToAvroSchema(AvroRecordUtil.java:103) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
at org.apache.pulsar.io.jcloud.format.ParquetFormat.initSchema(ParquetFormat.java:63) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.unsafeFlush(BlobStoreAbstractSink.java:239) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
at org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink.flush(BlobStoreAbstractSink.java:219) ~[7k9dpWmnkw8dXf7zEFCwXw/:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
16:28:55.788 [pulsar-io-cloud-storage-sink-flush-0] INFO org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Skip flushing, because the pending flush queue is empty ...

accuknox-mysql-helm-temp/pulsar-test-dec22-test1/public/default/CiliumTelemetryDemo27Dec/2021-12-27
```
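The parse failure happens in `AvroRecordUtil.convertToAvroSchema`: the sink takes the schema string that the producer registered on the topic and hands it to `org.apache.avro.Schema$Parser`, which rejects it. So the first thing worth checking is what the Go client actually uploads. `pulsar-admin schemas get persistent://public/default/sadim-test-json3` shows the stored schema, and the sketch below (using the same schema definition string as `writePulsar` further down) prints the string the client would register. This is a diagnostic sketch, not connector code:

```go
package main

import (
	"fmt"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	// The same schema definition string used by writePulsar() below.
	def := "{\"type\":\"record\",\"name\":\"testJSON\",\"namespace\":\"test2\"," +
		"\"fields\":[{\"name\":\"InputJson\",\"type\":\"testJSON\"}]}"

	s := pulsar.NewJSONSchema(def, map[string]string{"InputJson": "testJSON"})

	// This is the string the broker stores and the sink later feeds
	// to the Avro parser.
	fmt.Println(s.GetSchemaInfo().Schema)
}
```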

Golang client code:

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"strconv"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	fmt.Println("hello world")
	for i := 0; i < 500; i++ {
		fmt.Println("sadim" + strconv.Itoa(i))
		writePulsar("ciliumjson", "sadim111")
		fmt.Println("sadimend" + strconv.Itoa(i))
	}
	fmt.Println("hello world 2")
}

type AccuknoxJsonWrapperObject struct {
	msg AccuknoxFinalJsonWrapperObject `json:"msg"`
}

type AccuknoxFinalJsonWrapperObject struct {
	inputjson string
}

var (
	exampleSchemaDef = "{\"type\":\"record\",\"name\":\"Example2\",\"namespace\":\"test2\",\"fields\":[{\"name\":\"inputjson\",\"type\":\"string\"}]}"
)

func PublishToPulsarTopic(inputjson string, topicName string) (result string) {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:               "pulsar://localhost:6650",
		OperationTimeout:  30 * time.Second,
		ConnectionTimeout: 30 * time.Second,
	})
	if err != nil {
		fmt.Printf("Could not instantiate Pulsar client: %v\n", err)
	}

	defer client.Close()

	JsonWrapperObject2 := AccuknoxFinalJsonWrapperObject{inputjson}
	fmt.Println("sadim1")
	fmt.Println(JsonWrapperObject2)
	JsonWrapperObject := AccuknoxJsonWrapperObject{JsonWrapperObject2}
	fmt.Println("sadim2")
	fmt.Println(JsonWrapperObject)

	file, _ := json.Marshal(JsonWrapperObject2)
	fmt.Println("sadim3")
	fmt.Println(file)
	file2, _ := json.Marshal(JsonWrapperObject)
	fmt.Println("sadim4")
	fmt.Println(file2)

	properties := make(map[string]string)
	properties["pulsar"] = "hello"
	fmt.Println("sadim5")
	fmt.Println(properties)

	jsonSchemaWithProperties := pulsar.NewJSONSchema(exampleSchemaDef, properties)
	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic:  topicName,
		Schema: jsonSchemaWithProperties,
	})
	if err != nil {
		fmt.Println(err)
	}

	_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
		Value: JsonWrapperObject2,
	})

	defer producer.Close()

	if err != nil {
		fmt.Println("Failed to publish message", err)
	}
	fmt.Println("Published message")

	return ""
}

type testJSON struct {
	InputJson string
}

var (
	// Note: the field type "testJSON" refers back to the record's own name.
	exampleSchemaDef5 = "{\"type\":\"record\",\"name\":\"testJSON\",\"namespace\":\"test2\"," +
		"\"fields\":[{\"name\":\"InputJson\",\"type\":\"testJSON\"}]}"
)

var (
	exSchema = "{\"type\":\"record\",\"schema\":\"\",\"properties\":{\"InputJson\":\"testJSON\"}}"
)

func writePulsar(inputjson string, topicName string) (result string) {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650",
	})
	if err != nil {
		fmt.Println(err)
	}
	defer client.Close()

	//a2 := testJSON{InputJson: inputjson}
	//properties := map[string]testJSON{"InputJson" : a2}
	properties2 := make(map[string]string)
	properties2["InputJson"] = "testJSON"
	jsonSchemaWithProperties := pulsar.NewJSONSchema(exampleSchemaDef5, properties2)
	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic:  topicName,
		Schema: jsonSchemaWithProperties,
	})
	if err != nil {
		fmt.Println(err)
	}

	_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
		Value: &testJSON{
			InputJson: inputjson,
		},
	})
	if err != nil {
		fmt.Println(err)
	}
	producer.Close()
	return ""
}
```
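Two things stand out in the schema definitions above (hedged, since I cannot test against your cluster): `exampleSchemaDef5` declares the field `InputJson` with type `testJSON`, the record's own name, making the schema self-referential even though the struct actually carries a plain string in that field; and hand-escaping quotes inside interpreted string literals is easy to get wrong. Whatever the sink's parser is choking on, a flat record with primitive field types is the safe shape. A minimal sketch matching `testJSON{InputJson string}`, using a raw string literal so no escaping is needed:

```go
// Sketch only: a flat record with one string field, matching the
// testJSON struct above. A raw string literal (backticks) avoids the
// quote-escaping of the interpreted-string versions.
var exampleSchemaDefFixed = `{
	"type": "record",
	"name": "testJSON",
	"namespace": "test2",
	"fields": [
		{"name": "InputJson", "type": "string"}
	]
}`
```

With a flat record like this, `ParquetFormat.initSchema` receives an ordinary one-field Avro record, which is also the shape the working Java client ends up registering.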

Working Java client code (the sink writes this client's data to GCS as Parquet without issue):

```java
package accuknox.datalake;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.schema.JSONSchema;

public class App3 {
    public static void main(String[] args) throws InterruptedException, IOException {
        PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
        /*
         * Producer producer = client.newProducer(JSONSchema.of(String.class))
         *         .topic("persistent://public/default/cilium-telemetry-4").create();
         */
        Producer<AccuknoxJsonObject> producer = client
                .newProducer(JSONSchema.of(AccuknoxJsonObject.class))
                .topic("CiliumTelemetryDemo27Dec")
                .create();
        //Producer<byte[]> producer = client.newProducer().topic("cilium-test1").create();
        FileReader fr = null;
        BufferedReader br = null;
        AccuknoxJsonObject jsonObject = null;
        for (int i = 0; i <= 90; i++) {
            try {
                File file = new File("./knoxfeedermockdata/flow.txt");
                //File file = new File("./knoxfeedermockdata/cilium_alerts_tableMockData.txt");
                //File file = new File("./knoxfeedermockdata/cilium_v3_All.txt");
                System.out.println(file.getAbsolutePath());
                fr = new FileReader(file);
                br = new BufferedReader(fr);
                String line;
                while ((line = br.readLine()) != null) {
                    System.out.println(line);
                    jsonObject = new AccuknoxJsonObject(line);
                    producer.send(jsonObject);
                }
                System.gc();
                fr.close(); // closes the stream and releases the resources
            } catch (IOException e) {
                System.err.print(e);
            } catch (NullPointerException e2) {
                System.err.print(e2);
            } finally {
                try {
                    if (br != null)
                        br.close();
                    if (fr != null)
                        fr.close();
                } catch (NullPointerException ex) {
                    System.err.print(ex);
                }
            }
            //Thread.sleep(200);
        }
        producer.close();
        client.close();
    }
}
```
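`AccuknoxJsonObject` is not included in the issue, but since it is constructed from a single line of text, it presumably wraps one `String` field. `JSONSchema.of(AccuknoxJsonObject.class)` derives a valid Avro record schema from the class by reflection, which is likely why the sink parses it without trouble. The Go client has no equivalent generator, so the same schema has to be written by hand. Assuming a single string field (the field name below is hypothetical), the Java-registered schema would look roughly like:

```go
// Hypothetical: the real field name and type depend on AccuknoxJsonObject,
// which is not shown in this issue.
var accuknoxSchemaDef = `{
	"type": "record",
	"name": "AccuknoxJsonObject",
	"namespace": "accuknox.datalake",
	"fields": [
		{"name": "inputjson", "type": "string"}
	]
}`
```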

cc: @sijie

@mdsadimnadeem
Author

Even a sample Golang Pulsar producer whose published data the StreamNative GCS sink connector writes as Parquet to GCS would work for us; we can reuse the same syntax. @freeznet @sijie
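A minimal producer along these lines is probably the shape to start from. This is a sketch (service URL, topic, record name, and field names are placeholders) and has not been verified against the GCS sink connector:

```go
package main

import (
	"context"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

// Event is the Go struct serialized as JSON; its fields must line up
// with the Avro schema definition below.
type Event struct {
	InputJson string `json:"InputJson"`
}

// A flat Avro record with a primitive field type, written as a raw
// string literal so no quote escaping is needed.
const eventSchemaDef = `{
	"type": "record",
	"name": "Event",
	"namespace": "test2",
	"fields": [
		{"name": "InputJson", "type": "string"}
	]
}`

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: "pulsar://localhost:6650", // placeholder
	})
	if err != nil {
		log.Fatalf("could not create client: %v", err)
	}
	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic:  "persistent://public/default/sadim-test-json3", // placeholder
		Schema: pulsar.NewJSONSchema(eventSchemaDef, nil),
	})
	if err != nil {
		log.Fatalf("could not create producer: %v", err)
	}
	defer producer.Close()

	if _, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
		Value: &Event{InputJson: `{"hello":"world"}`},
	}); err != nil {
		log.Fatalf("send failed: %v", err)
	}
	log.Println("published one message")
}
```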
