Browse Source

first commit

Steve Thielemann 5 years ago
commit
de943ad3c4
14 changed files with 229 additions and 0 deletions
  1. 9 0
      .gitignore
  2. 8 0
      NOTES.txt
  3. 17 0
      docker-compose.yml
  4. 3 0
      list_queues.sh
  5. 3 0
      list_unack.sh
  6. 20 0
      new_task.py
  7. 21 0
      receive.py
  8. 4 0
      req.txt
  9. 6 0
      reset_mq.sh
  10. 44 0
      rpc_client.py
  11. 42 0
      rpc_server.py
  12. 12 0
      send.py
  13. 13 0
      send_tasks.sh
  14. 27 0
      worker.py

+ 9 - 0
.gitignore

@@ -0,0 +1,9 @@
+bin
+include
+lib
+lib64
+share
+pyvenv.cfg
+pip-selfcheck.json
+.vscode
+rabbitmq-data

+ 8 - 0
NOTES.txt

@@ -0,0 +1,8 @@
+
+https://www.rabbitmq.com/getstarted.html
+
+send.py -> receive.py  https://www.rabbitmq.com/tutorials/tutorial-one-python.html  Hello World.
+
+new_task.py -> worker.py  https://www.rabbitmq.com/tutorials/tutorial-two-python.html Work Queues.
+
+rpc_client.py -> rpc_server.py https://www.rabbitmq.com/tutorials/tutorial-six-python.html RPC Call

+ 17 - 0
docker-compose.yml

@@ -0,0 +1,17 @@
+version: "2"
+
+services:
+  rabbit:
+    image: rabbitmq:3.7-alpine
+    ports: 
+      - "5672:5672"
+    # persistent data needs a consistent hostname
+    hostname: crunchy_carrot
+    environment:
+      # RABBITMQ_DEFAULT_USER: rabbit
+      # RABBITMQ_DEFAULT_PASS: carrot
+      RABBITMQ_HIPE_COMPILE: 1
+    volumes:
+      - ./rabbitmq-data:/var/lib/rabbitmq
+      # data will be stored in /var/lib/rabbitmq/mnesia/rabbit@HOSTNAME
+

+ 3 - 0
list_queues.sh

@@ -0,0 +1,3 @@
+#!/bin/bash
+
+docker-compose exec rabbit rabbitmqctl list_queues

+ 3 - 0
list_unack.sh

@@ -0,0 +1,3 @@
+#!/bin/bash
+
+docker-compose exec rabbit rabbitmqctl list_queues name messages_ready messages_unacknowledged

+ 20 - 0
new_task.py

@@ -0,0 +1,20 @@
+#!/usr/bin/env python3
+import pika
+import sys
+
+message = " ".join(sys.argv[1:]) or "Hello World!"
+
+connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
+channel = connection.channel()
+
+channel.queue_declare(queue="task_queue", durable=True)
+channel.basic_publish(
+    exchange="",
+    routing_key="task_queue",
+    body=message,
+    properties=pika.BasicProperties(delivery_mode=2),  # make message persistent
+)
+print(" [x] Sent {0}".format(message))
+
+connection.close()
+

+ 21 - 0
receive.py

@@ -0,0 +1,21 @@
+#!/usr/bin/env python3
+import pika
+
+connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
+channel = connection.channel()
+
+channel.queue_declare(queue="hello")
+
+
+def callback(ch, method, properties, body):
+    # instead of b'Hello World!'
+    text = body.decode("utf-8")
+    print(" [x] Received", text)
+
+
+channel.basic_consume(queue="hello", auto_ack=True, on_message_callback=callback)
+print(" [*] Waiting for messages. To exit press CTRL+C")
+channel.start_consuming()
+
+connection.close()
+

+ 4 - 0
req.txt

@@ -0,0 +1,4 @@
+black
+pylint
+pika
+

+ 6 - 0
reset_mq.sh

@@ -0,0 +1,6 @@
+#!/bin/bash
+
+sudo rm -rf rabbitmq-data
+mkdir rabbitmq-data
+sudo chown 100 rabbitmq-data
+

+ 44 - 0
rpc_client.py

@@ -0,0 +1,44 @@
+#!/usr/bin/env python
+
+import pika
+import uuid
+
+class FibonacciRpcClient(object):
+    def __init__(self):
+        self.connection = pika.BlockingConnection(
+            pika.ConnectionParameters(host='localhost'))
+        self.channel = self.connection.channel()
+
+        result = self.channel.queue_declare(queue='', exclusive=True)
+        self.callback_queue = result.method.queue
+
+        self.channel.basic_consume(
+            queue=self.callback_queue,
+            on_message_callback=self.on_response,
+            auto_ack=True)
+
+    def on_response(self, ch, method, props, body):
+        if self.corr_id == props.correlation_id:
+            self.response = body
+    
+    def call(self,n):
+        self.response = None
+        self.corr_id = str(uuid.uuid4())
+        self.channel.basic_publish(
+            exchange='',
+            routing_key='rpc_queue',
+            properties=pika.BasicProperties(
+                reply_to=self.callback_queue,
+                correlation_id=self.corr_id,
+            ),
+            body=str(n)
+        )
+        while self.response is None:
+            self.connection.process_data_events()
+        return int(self.response)
+
+fibonacci_rpc = FibonacciRpcClient()
+
+print(" [x] Requesting fib(30)")
+response = fibonacci_rpc.call(30)
+print(" [.] Got %r" % response)

+ 42 - 0
rpc_server.py

@@ -0,0 +1,42 @@
+#!/usr/bin/env python
+
+import pika
+
+# https://www.rabbitmq.com/tutorials/tutorial-six-python.html
+
+connection = pika.BlockingConnection(
+    pika.ConnectionParameters(host='localhost'))
+
+channel = connection.channel()
+
+channel.queue_declare(queue='rpc_queue')
+
+def fib(n):
+    if n== 0:
+        return 0
+    elif n == 1:
+        return 1
+    else:
+        return fib(n-1) + fib(n-2)
+
+def on_request(ch, method, props, body):
+    n = int(body)
+
+    print(" [.] fib(%s)" % n)
+    response = fib(n)
+
+    ch.basic_publish(exchange='',
+        routing_key=props.reply_to,
+        properties=pika.BasicProperties(correlation_id = props.correlation_id),
+        body=str(response))
+
+    ch.basic_ack(delivery_tag=method.delivery_tag)
+
+
+channel.basic_qos(prefetch_count=1)
+
+channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
+
+print( "[x] Awaiting RPC requests")
+
+channel.start_consuming()

+ 12 - 0
send.py

@@ -0,0 +1,12 @@
+#!/usr/bin/env python3
+import pika
+
+connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
+channel = connection.channel()
+
+channel.queue_declare(queue="hello")
+channel.basic_publish(exchange="", routing_key="hello", body="Hello World!")
+print(" [x] Send 'Hello World!'")
+
+connection.close()
+

+ 13 - 0
send_tasks.sh

@@ -0,0 +1,13 @@
+#!/bin/bash
+
+./new_task.py task 1.
+./new_task.py task 2..
+./new_task.py task 3...
+./new_task.py task 4....
+./new_task.py task 5...
+./new_task.py task 6..
+./new_task.py task 7.
+./new_task.py task 8..
+./new_task.py task 9...
+./new_task.py task 10....
+

+ 27 - 0
worker.py

@@ -0,0 +1,27 @@
+#!/usr/bin/env python3
+import pika
+import time
+
+connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
+channel = connection.channel()
+
+channel.queue_declare(queue="task_queue", durable=True)
+print(" [*] Waiting for messages. To exit press CTRL+C")
+
+
+def callback(ch, method, properties, body):
+    # instead of b'Hello World!'
+    text = body.decode("utf-8")
+    print(" [x] Received", text)
+    time.sleep(text.count("."))
+    print(" [x] Done")
+    ch.basic_ack(delivery_tag=method.delivery_tag)
+
+
+channel.basic_qos(prefetch_count=1)
+channel.basic_consume(queue="task_queue", on_message_callback=callback)
+
+channel.start_consuming()
+
+connection.close()
+