Skip to content

powerful_agent.py

PowerfulAgent

create_notification_broker(self, pub_address, sub_address, options=None)

Starts a pub-sub notifications broker

Parameters:

Name Type Description Default
pub_address str

agents publish to this address to notify other agents

required
sub_address str

agents listen on this address for notifications

required

Returns:

Type Description

connections (pub, sub)

Source code in agents/powerful_agent.py
def create_notification_broker(self, pub_address, sub_address, options=None):
    """Start a pub-sub notifications broker.

    Binds an XPUB socket on ``sub_address`` and an XSUB socket on
    ``pub_address``, then wires the two together so that whatever one
    socket's observable emits is sent out through the other. The two
    forwarding pipelines are appended to ``self._disposables``.

    Args:
        pub_address (str): agents publish to this address to notify other agents
        sub_address (str): agents listen on this address for notifications
        options (dict, optional): passed to ``bind_socket`` for both sockets;
            an empty dict is used when omitted.

    Returns:
        tuple: ``(xsub, xpub)`` -- the socket bound on ``pub_address`` followed
        by the socket bound on ``sub_address``.
    """
    options = {} if options is None else options

    # Subscribers connect to the XPUB side; publishers connect to the XSUB side.
    subscriber_side = self.bind_socket(zmq.XPUB, options, sub_address)
    publisher_side = self.bind_socket(zmq.XSUB, options, pub_address)

    def forward_to_subscribers(frame):
        return subscriber_side.send(frame)

    def forward_to_publishers(frame):
        return publisher_side.send(frame)

    # Items arriving on the XSUB side go out through XPUB ...
    downstream = Pipeline()(
        publisher_side.observable, subscribe=forward_to_subscribers
    )
    self._disposables.append(downstream)

    # ... and items arriving on the XPUB side go back out through XSUB.
    upstream = Pipeline()(
        subscriber_side.observable, subscribe=forward_to_publishers
    )
    self._disposables.append(upstream)

    return publisher_side, subscriber_side

create_notification_client(self, pub_address, sub_address, options=None, topics='')

Creates 2 connections (pub, sub) to a notifications broker

Parameters:

Name Type Description Default
pub_address str

publish to this address to notify other agents

required
sub_address str

listen on this address for notifications

required

Returns:

Type Description

connections (pub, sub)

Source code in agents/powerful_agent.py
def create_notification_client(
    self, pub_address, sub_address, options=None, topics=""
):
    """Create the two connections (pub, sub) to a notifications broker.

    Connects a PUB socket to ``pub_address`` and a SUB socket to
    ``sub_address``, subscribes the SUB socket to ``topics``, and maps the
    SUB connection's observable through ``Message.decode``.

    Args:
        pub_address (str): publish to this address to notify other agents
        sub_address (str): listen on this address for notifications
        options (dict, optional): passed to ``connect_socket`` for both
            sockets; an empty dict is used when omitted.
        topics (str): passed to the SUB socket's ``subscribe``; defaults to
            the empty string.

    Returns:
        tuple: ``(pub, sub)`` where ``sub`` is the SUB connection updated so
        that its ``"observable"`` entry emits decoded messages.
    """
    options = {} if options is None else options

    publisher = self.connect_socket(zmq.PUB, options, pub_address)
    subscriber = self.connect_socket(zmq.SUB, options, sub_address)
    subscriber.socket.subscribe(topics)

    # Swap the raw observable for one that yields decoded messages.
    decoded_stream = subscriber.observable.pipe(ops.map(Message.decode))
    return publisher, subscriber.update({"observable": decoded_stream})

shutdown(self)

Shutdown procedure, call super().shutdown() if overriding

Source code in agents/powerful_agent.py
def shutdown(self):
    """Shutdown procedure, call super().shutdown() if overriding.

    Stops the ZAP authenticator when one is set, disposes every entry in
    ``self._disposables`` in order, then delegates to the parent class.
    """
    authenticator = self._zap
    if authenticator:
        self.log.info("stopping ZMQ Authenticator ...")
        authenticator.stop()

    for disposable in self._disposables:
        self.log.info(f"disposing {disposable} ...")
        disposable.dispose()

    super().shutdown()

start_authenticator(self, domain='*', whitelist=None, blacklist=None, certificates_path=None)

Starts ZAP Authenticator in thread

configure_curve must be called every time certificates are added or removed, in order to update the Authenticator’s state

Parameters:

Name Type Description Default
certificates_path str

path to client public keys to allow

None
whitelist list[str]

ip addresses to whitelist

None
domain str

domain to apply authentication

'*'
Source code in agents/powerful_agent.py
def start_authenticator(
    self, domain="*", whitelist=None, blacklist=None, certificates_path=None
):
    """Start a ZAP authenticator in a thread.

    configure_curve must be called every time certificates are added or
    removed, in order to update the authenticator's state.

    Args:
        domain (str): domain to apply authentication
        whitelist (list[str]): ip addresses to whitelist; when given, it is
            used and ``blacklist`` is ignored.
        blacklist (list[str]): ip addresses to deny; only consulted when
            ``whitelist`` is None.
        certificates_path (str): path to client public keys to allow; a
            falsy value selects ``CURVE_ALLOW_ANY``.

    Returns:
        The started ``ThreadAuthenticator``, also stored on ``self._zap``.
    """
    key_location = certificates_path or CURVE_ALLOW_ANY

    authenticator = ThreadAuthenticator(self.zmq_context, log=self.log)
    self._zap = authenticator
    authenticator.start()

    if whitelist is None and blacklist is not None:
        authenticator.deny(*blacklist)
    else:
        # Either an explicit whitelist, or neither list was supplied, in
        # which case allow() is called with no addresses.
        authenticator.allow(*(() if whitelist is None else whitelist))

    authenticator.configure_curve(domain=domain, location=key_location)
    return authenticator