Browse Source

Updated: worker2.py supports "timeout" value so we can logout, etc.

Steve Thielemann 5 years ago
parent
commit
90983945e0
3 changed files with 92 additions and 12 deletions
  1. 29 1
      NOTES.txt
  2. 15 11
      rpc_client.py
  3. 48 0
      worker2.py

+ 29 - 1
NOTES.txt

@@ -4,9 +4,37 @@ https://www.rabbitmq.com/getstarted.html
 send.py -> receive.py  https://www.rabbitmq.com/tutorials/tutorial-one-python.html  Hello World.
 uses: hello queue
 
-new_task.py -> worker.py  https://www.rabbitmq.com/tutorials/tutorial-two-python.html Work Queues.
+new_task.py -> worker.py, worker2.py  https://www.rabbitmq.com/tutorials/tutorial-two-python.html Work Queues.
 uses: task_queue
 
 rpc_client.py -> rpc_server.py https://www.rabbitmq.com/tutorials/tutorial-six-python.html RPC Call
 uses: rpc_queue
 
+
+Ok, so save time at the end of the callback, and compare at the start
+of the callback.  If > 10 seconds, clear our throttle buffer!
+
+^ Ok, here's the problem:  Pacer.  We get a message, login, process.
+Then what?  The problem is, we want to logout after 4-5 seconds of 
+nothing to do.
+
+https://github.com/pika/pika/issues/770
+
+Replacing:
+channel.start_consuming()
+with
+while channel._consumer_infos:
+  channel.connection.process_data_events(time_time=1)
+  print("Ok!  I can do something between events / when I have no events.")
+
+See worker2.py for working example of this.
+
+^ This allows us to do something (like see if we are logged in, and if a certain amount of time
+has passed or not.  We'll be able to logout / do other things.
+
+I'm not sure about the 1 second.  I don't need it to be like that.  Maybe 60?  (Every minute?)  or
+
+if we're waiting for work, set at 10, if we're logged out, set to 60.
+
+
+

+ 15 - 11
rpc_client.py

@@ -3,43 +3,47 @@
 import pika
 import uuid
 
+
 class FibonacciRpcClient(object):
     def __init__(self):
         self.connection = pika.BlockingConnection(
-            pika.ConnectionParameters(host='localhost'))
+            pika.ConnectionParameters(host="localhost")
+        )
         self.channel = self.connection.channel()
 
-        result = self.channel.queue_declare(queue='', exclusive=True)
+        result = self.channel.queue_declare(queue="", exclusive=True)
         self.callback_queue = result.method.queue
 
         self.channel.basic_consume(
             queue=self.callback_queue,
             on_message_callback=self.on_response,
-            auto_ack=True)
+            auto_ack=True,
+        )
 
     def on_response(self, ch, method, props, body):
         if self.corr_id == props.correlation_id:
             self.response = body
-    
-    def call(self,n):
+
+    def call(self, n):
         self.response = None
         self.corr_id = str(uuid.uuid4())
         self.channel.basic_publish(
-            exchange='',
-            routing_key='rpc_queue',
+            exchange="",
+            routing_key="rpc_queue",
             properties=pika.BasicProperties(
-                reply_to=self.callback_queue,
-                correlation_id=self.corr_id,
+                reply_to=self.callback_queue, correlation_id=self.corr_id
             ),
-            body=str(n)
+            body=str(n),
         )
         while self.response is None:
             self.connection.process_data_events()
         return int(self.response)
 
+
 fibonacci_rpc = FibonacciRpcClient()
 
-for n in range(10,30, 5):
+
+for n in range(10, 30, 5):
     print(" [x] Requesting fib({0})".format(n))
     response = fibonacci_rpc.call(n)
     print(" [.] Got %r" % response)

+ 48 - 0
worker2.py

@@ -0,0 +1,48 @@
+#!/usr/bin/env python3
+import pika
+import time
+
+connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
+channel = connection.channel()
+
+channel.queue_declare(queue="task_queue", durable=True)
+print(" [*] Waiting for messages. To exit press CTRL+C")
+
+heart = 0
+
+def callback(ch, method, properties, body):
+    global heart
+
+    # instead of b'Hello World!'
+    text = body.decode("utf-8")
+    print(" [x] Received", text)
+    time.sleep(text.count("."))
+    print(" [x] Done")
+    heart = time.time()
+    ch.basic_ack(delivery_tag=method.delivery_tag)
+
+
+channel.basic_qos(prefetch_count=1)
+channel.basic_consume(queue="task_queue", on_message_callback=callback)
+
+# channel.start_consuming()
+
+delay = 60
+
+while channel._consumer_infos:
+    channel.connection.process_data_events(time_limit=delay)
+    print("delay=", delay, "heart=", heart, "clock=", time.time());
+    if ( heart == 0 ):
+        if delay != 60:
+            delay = 60
+            print("Deep sleep...")
+    else:
+        delay = 10
+        delta = time.time() - heart
+        print("delta=", delta)
+        if (delta > 50):
+            print("ZZZzzz...")
+            heart = 0
+
+connection.close()
+