Zap Curve Authentication

import os
import tempfile
import threading
import time

import zmq

from agents import Agent, Message, PowerfulAgent


class NotificationBroker(PowerfulAgent):
    def setup(
        self,
        name=None,
        pub_address=None,
        sub_address=None,
        private_key=None,
        client_certificates_path=None,
    ):

        # configure public key auth/encryption if private_key is provided
        options = self.curve_server_config(private_key) if private_key else {}
        self.create_notification_broker(pub_address, sub_address, options=options)

        # start authenticator if client_certificates_path is provided
        if client_certificates_path:
            self.auth = self.start_authenticator(
                domain="*", certificates_path=client_certificates_path
            )


class Sender(PowerfulAgent):
    def setup(
        self,
        name=None,
        pub_address=None,
        sub_address=None,
        private_key=None,
        public_key=None,
        server_public_key=None,
    ):
        # configure public key auth/encryption if keys are provided
        if private_key and public_key and server_public_key:
            options = self.curve_client_config(
                server_public_key, public_key, private_key
            )
        else:
            options = {}
        self.counter = 0
        self.pub, self.sub = self.create_notification_client(
            pub_address, sub_address, options=options
        )

        # begin sending forever, add to managed threads for graceful cleanup
        t = threading.Thread(target=self.send_forever)
        self.threads.append(t)
        t.start()

    def send_forever(self):
        # use exit event to gracefully exit loop and graceful cleanup
        while not self.exit_event.is_set():
            time.sleep(1)
            self.counter += 1
            self.log.info(f"publishing: {self.counter}")
            self.pub.send(Message.notification(payload=self.counter))


class Listener(PowerfulAgent):
    def setup(
        self,
        name=None,
        pub_address=None,
        sub_address=None,
        private_key=None,
        public_key=None,
        server_public_key=None,
    ):
        # configure public key auth/encryption if keys are provided
        if private_key and public_key and server_public_key:
            options = self.curve_client_config(
                server_public_key, public_key, private_key
            )
        else:
            options = {}
        self.pub, self.sub = self.create_notification_client(
            pub_address, sub_address, options=options
        )
        self.sub.observable.subscribe(
            lambda x: self.log.info(f"received: { x['payload'] }")
        )


if __name__ == "__main__":

    with tempfile.TemporaryDirectory() as trusted_keys_path, tempfile.TemporaryDirectory() as untrusted_keys_path:

        # create key pairs in corresponding directories
        Agent.create_curve_certificates(trusted_keys_path, "server")
        Agent.create_curve_certificates(trusted_keys_path, "listener")
        Agent.create_curve_certificates(untrusted_keys_path, "listener2")

        # load key pairs
        server_public_key, server_private_key = Agent.load_curve_certificate(
            os.path.join(trusted_keys_path, "server.key_secret")
        )
        listener_public_key, listener_private_key = Agent.load_curve_certificate(
            os.path.join(trusted_keys_path, "listener.key_secret")
        )
        listener2_public_key, listener2_private_key = Agent.load_curve_certificate(
            os.path.join(untrusted_keys_path, "listener2.key_secret")
        )

        broker = NotificationBroker(
            name="broker",
            pub_address="tcp://127.0.0.1:5000",
            sub_address="tcp://127.0.0.1:5001",
            private_key=server_private_key,
            client_certificates_path=trusted_keys_path,
        )
        sender = Sender(
            name="sender",
            pub_address="tcp://127.0.0.1:5000",
            sub_address="tcp://127.0.0.1:5001",
            private_key=server_private_key,
            public_key=server_public_key,
            server_public_key=server_public_key,
        )
        listener = Listener(
            name="listener",
            pub_address="tcp://127.0.0.1:5000",
            sub_address="tcp://127.0.0.1:5001",
            private_key=listener_private_key,
            public_key=listener_public_key,
            server_public_key=server_public_key,
        )
        listener2 = Listener(
            name="listener2",
            pub_address="tcp://127.0.0.1:5000",
            sub_address="tcp://127.0.0.1:5001",
            private_key=listener2_private_key,
            public_key=listener2_public_key,
            server_public_key=server_public_key,
        )