OpenFaaS Lab 4

Async functions and publishing data to Kafka

Posted by Krystian Wojcicki on Monday, July 29, 2019 Tags: Guide   3 minute read

Lab 4

  • Async functions and pushing to Kafka

To push to kafka we first need to have a Kafka running.

helm repo add incubator
kubectl create ns kafka
helm upgrade kf incubator/kafka \
    --install \
    --namespace kafka \
    --set imageTag=4.1.3 \
    --set persistence.enabled=false
kubectl -n kafka apply -f - <<EOF
apiVersion: v1
kind: Pod
  name: testclient
  namespace: kafka
  - name: kafka
    image: confluentinc/cp-kafka:5.0.1
    - sh
    - -c
    - "exec tail -f /dev/null"

Now we will create a function sleep-produce that will sleep for a bit and then produce a message to kafka.

faas new --lang python3 sleep-produce

Add the following to sleep-produce

import time
from kafka import KafkaProducer

def handle(req):
    """handle a request to the function
        req (str): request body


    producer = KafkaProducer(bootstrap_servers='kf-kafka.kafka:9092')
    producer.send('topic_name', b'some_message_bytes')

    print("Sent message to topic")
    return req

With kafka-python in ./sleep-produce/requirements.txt

Now lets deploy it

faas build -f sleep-produce.yml
kind load docker-image sleep-produce:latest
faas deploy -f sleep-produce.yml --gateway $ip:31112

Next to test it will we need another console window to consume from kafka

kubectl -n kafka exec -it testclient -- kafka-console-consumer --bootstrap-server kf-kafka:9092 --topic topic_name --from-beginning

This will continue listening until you press ctrl+c. In your original window you can now invoke sleep-produce and see the resulting message in your Kafka consumer window.

kwojcicki@ubuntu:~/workspace/openfaas-workshops$ kubectl -n kafka exec -it testclient -- kafka-console-consumer --bootstrap-server kf-kafka:9092 --topic topic_name --from-beginning

You may have noticed all the functions invocations so far have been synchronous. But what if want to invoke a function async. In OpenFaas its as simple as added async to the curl call and you are done!

With your Kafka Consumer shell still open try the following

kwojcicki@ubuntu:~/workspace/openfaas-workshops$ time curl -XPOST $ip:31112/async-function/sleep-produce

real    0m0.011s
user    0m0.009s
sys     0m0.000s

Unlike before the curl call returns immediately and the actual function invocation is queued up and will be called at a later time.

To cleanup the Kafka + testpod containers run the following

kubectl -n kafka delete po testclient
helm delete --purge kf