/
kafka_consumer.py
97 lines (81 loc) · 2.9 KB
/
kafka_consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import logging
from kafka import KafkaConsumer
from base64 import decodebytes
from PIL import Image
import tensorflow as tf
import numpy as np
import json
import requests
import pickle
from slack_sdk import WebClient
from seldon_core.seldon_client import SeldonClient
import os
# Configure the root logger to emit records at INFO level and above.
logging.basicConfig(level=logging.INFO)
# Inference backend selector read from the CONFIG environment variable;
# consumer() recognizes "TENSORFLOW" and "SELDON" (the default).
config = os.getenv("CONFIG", "SELDON")
# Slack credentials and destination channel used by send_photo_to_slack().
# os.getenv returns None for either value when the variable is unset.
slack_bot_auth_token = os.getenv("SLACK_BOT_TOKEN")
slack_channel_id = os.getenv("SLACK_CHANNEL_ID")
def send_photo_to_slack(result):
    """Upload ``foo.png`` to Slack with the prediction as the comment.

    Reads ``foo.png`` from the current working directory (the file that
    ``consumer`` writes for each Kafka message) and uploads it to the channel
    given by the module-level ``slack_channel_id``, authenticating with
    ``slack_bot_auth_token``.

    Args:
        result: Prediction text; it is rendered with ``f"{result}"`` and used
            as the initial comment of the upload.
    """
    # Read through a context manager so the file handle is closed
    # deterministically instead of being left to the garbage collector.
    with open("foo.png", "rb") as photo:
        image = photo.read()

    client = WebClient(slack_bot_auth_token)  # OAuth bot token
    # NOTE(review): files_upload is the legacy upload method in slack_sdk;
    # confirm whether this should migrate to files_upload_v2.
    client.files_upload(
        channels=slack_channel_id,  # destination channel id
        initial_comment=f"{result}",
        filename="dog photo",
        content=image,
    )
def send_client_request(seldon_client, image):
    """Request a prediction for ``image`` from the ``seldon-dogbreed`` deployment.

    Args:
        seldon_client: Object exposing a ``predict`` method that accepts the
            ``data``, ``deployment_name`` and ``payload_type`` keywords.
        image: Payload forwarded unchanged as ``data``, sent with payload
            type ``ndarray``.

    Returns:
        Whatever ``seldon_client.predict`` returns.
    """
    request_kwargs = {
        "data": image,
        "deployment_name": "seldon-dogbreed",
        "payload_type": "ndarray",
    }
    return seldon_client.predict(**request_kwargs)
def predict_seldon(image):
    """Classify ``image`` through the Seldon deployment and return its label.

    Sends ``image`` to the Seldon gateway at ``localhost:9000`` over REST,
    takes the arg-max of the returned scores along axis 1, and maps the index
    of the first row to a label using ``./models/labels.pickle``.

    Args:
        image: Batch of image data forwarded to ``send_client_request``.
            Only the prediction for the first row is used.

    Returns:
        The label text after its last ``"."``, with underscores replaced by
        spaces.

    Raises:
        RuntimeError: If the Seldon response carries no ``data.ndarray``
            payload (for example, when the request failed).
    """
    sc = SeldonClient(
        gateway="seldon",
        transport="rest",
        gateway_endpoint="localhost:9000",
    )
    prediction = send_client_request(sc, image)

    # A failed call can leave the response empty or without a "data" entry;
    # fail with a clear message rather than an AttributeError on None.
    payload = prediction.response or {}
    scores = (payload.get("data") or {}).get("ndarray")
    if scores is None:
        raise RuntimeError(
            f"Seldon response has no data.ndarray payload: {payload!r}"
        )

    pred = tf.argmax(scores, axis=1)

    # NOTE: pickle.load can execute arbitrary code; the labels file must come
    # from a trusted source.
    with open("./models/labels.pickle", "rb") as handle:
        class_to_idx = pickle.load(handle)
    # The pickle maps label -> index; invert it to look labels up by index.
    idx_to_class = {idx: name for name, idx in class_to_idx.items()}

    label = idx_to_class[pred.numpy()[0]]
    return label.split(".")[-1].replace("_", " ")
def predict_tfserve(image, *, timeout=30.0):
    """Classify ``image`` through TensorFlow Serving and return its label.

    Posts ``image`` to the ``dog_model`` predict endpoint on
    ``localhost:8501``, takes the arg-max of the returned predictions along
    axis 1, and maps the index of the first row to a label using
    ``./models/labels.pickle``.

    Args:
        image: Array-like batch exposing ``tolist()``; sent as the
            ``instances`` field of the request. Only the prediction for the
            first row is used.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled server would block the caller indefinitely.

    Returns:
        The label text after its last ``"."``, with underscores replaced by
        spaces.

    Raises:
        requests.exceptions.RequestException: On connection failure, timeout,
            or a non-2xx HTTP status.
    """
    url = "http://localhost:8501/v1/models/dog_model:predict"
    data = json.dumps(
        {"signature_name": "serving_default", "instances": image.tolist()}
    )
    headers = {"Content-Type": "application/json"}
    response = requests.post(url, data=data, headers=headers, timeout=timeout)

    # Print the raw body before checking the status so that a server-side
    # error message is visible when the request is rejected.
    print(response.text)
    # Surface HTTP errors directly instead of a KeyError on "predictions".
    response.raise_for_status()

    prediction = response.json()["predictions"]
    pred = tf.argmax(prediction, axis=1)

    # NOTE: pickle.load can execute arbitrary code; the labels file must come
    # from a trusted source.
    with open("./models/labels.pickle", "rb") as handle:
        class_to_idx = pickle.load(handle)
    # The pickle maps label -> index; invert it to look labels up by index.
    idx_to_class = {idx: name for name, idx in class_to_idx.items()}

    label = idx_to_class[pred.numpy()[0]]
    return label.split(".")[-1].replace("_", " ")
def consumer():
    """Consume images from the ``dogtopic`` Kafka topic and report predictions.

    For every message: base64-decode the payload into ``foo.png``, load it as
    a 224x224 image, add a leading batch axis, run it through the backend
    selected by the module-level ``config``, and post the result to Slack via
    ``send_photo_to_slack``.

    Raises:
        ValueError: If ``config`` is neither ``"TENSORFLOW"`` nor
            ``"SELDON"``.
    """
    # Resolve the backend once, before consuming anything. An unrecognized
    # value previously left ``result`` unbound (or stale from the previous
    # message) inside the loop.
    if config == "TENSORFLOW":
        backend_name, predict = "Tensorflow", predict_tfserve
    elif config == "SELDON":
        backend_name, predict = "SELDON", predict_seldon
    else:
        raise ValueError(
            f"Unsupported CONFIG {config!r}; expected 'TENSORFLOW' or 'SELDON'"
        )

    # Named ``messages`` so the local does not shadow this function's name.
    messages = KafkaConsumer('dogtopic')
    for message in messages:
        # Message values are base64-encoded image bytes; write them to disk
        # because both load_img and send_photo_to_slack read "foo.png".
        with open("foo.png", "wb") as f:
            f.write(decodebytes(message.value))

        img = tf.keras.utils.load_img("foo.png", target_size=(224, 224))
        input_arr = tf.keras.utils.img_to_array(img)
        image = input_arr[None, ...]  # prepend a batch axis

        print(backend_name)
        result = predict(image)
        send_photo_to_slack(result)
# Start the consume loop only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    consumer()