Notifications

import threading
import time

import zmq

from agents import Message, PowerfulAgent


class NotificationBroker(PowerfulAgent):
    def setup(self, name=None, pub_address=None, sub_address=None):
        self.create_notification_broker(pub_address, sub_address)


class Sender(PowerfulAgent):
    def setup(self, name=None, pub_address=None, sub_address=None):
        self.counter = 0
        self.pub, self.sub = self.create_notification_client(pub_address, sub_address)

        # begin sending forever, add to managed threads for graceful cleanup
        t = threading.Thread(target=self.send_forever)
        self.threads.append(t)
        t.start()

    def send_forever(self):
        # use exit event to gracefully exit loop and graceful cleanup
        while not self.exit_event.is_set():
            time.sleep(1)
            self.counter += 1
            self.log.info(f"publishing: {self.counter}")
            self.pub.send(Message.notification(payload=self.counter))


class Listener(PowerfulAgent):
    def setup(self, name=None, pub_address=None, sub_address=None):
        self.pub, self.sub = self.create_notification_client(pub_address, sub_address)
        self.sub.observable.subscribe(
            lambda x: self.log.info(f"received: { x['payload'] }")
        )


if __name__ == "__main__":
    broker = NotificationBroker(
        name="broker",
        pub_address="tcp://0.0.0.0:5000",
        sub_address="tcp://0.0.0.0:5001",
    )
    sender = Sender(
        name="sender",
        pub_address="tcp://0.0.0.0:5000",
        sub_address="tcp://0.0.0.0:5001",
    )
    listener = Listener(
        name="listener",
        pub_address="tcp://0.0.0.0:5000",
        sub_address="tcp://0.0.0.0:5001",
    )