powerful_agent.py
PowerfulAgent
create_notification_broker(self, pub_address, sub_address, options=None)
Starts a pub-sub notifications broker
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pub_address |
str |
agents publish to this address to notify other agents |
required |
sub_address |
str |
agents listen on this address for notifications |
required |
Returns:
Type | Description |
---|---|
|
connections (pub, sub) |
Source code in agents/powerful_agent.py
def create_notification_broker(self, pub_address, sub_address, options=None):
    """Start a pub-sub notifications broker.

    An XPUB socket is bound on ``sub_address`` and an XSUB socket on
    ``pub_address``. Each socket's observable is then subscribed, through a
    ``Pipeline``, with a callback that sends the received item on the other
    socket. Both pipeline results are appended to ``self._disposables``.

    Args:
        pub_address (str): agents publish to this address to notify other agents
        sub_address (str): agents listen on this address for notifications
        options (dict, optional): passed unchanged to ``bind_socket`` for both
            sockets; an empty dict is used when omitted.

    Returns:
        tuple: ``(xsub, xpub)`` -- the connection bound on ``pub_address``
        followed by the connection bound on ``sub_address``.
    """
    socket_options = {} if options is None else options

    # XPUB (subscriber-facing) is bound before XSUB (publisher-facing).
    subscriber_side = self.bind_socket(zmq.XPUB, socket_options, sub_address)
    publisher_side = self.bind_socket(zmq.XSUB, socket_options, pub_address)

    def forward_to_subscribers(item):
        # Look up ``send`` at call time, once per forwarded item.
        return subscriber_side.send(item)

    def forward_to_publishers(item):
        return publisher_side.send(item)

    downstream = Pipeline()(
        publisher_side.observable, subscribe=forward_to_subscribers
    )
    self._disposables.append(downstream)

    upstream = Pipeline()(
        subscriber_side.observable, subscribe=forward_to_publishers
    )
    self._disposables.append(upstream)

    return publisher_side, subscriber_side
create_notification_client(self, pub_address, sub_address, options=None, topics='')
Creates 2 connections (pub, sub) to a notifications broker
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pub_address |
str |
publish to this address to notify other agents |
required |
sub_address |
str |
listen on this address for notifications |
required |
Returns:
Type | Description |
---|---|
|
connections (pub, sub) |
Source code in agents/powerful_agent.py
def create_notification_client(
    self, pub_address, sub_address, options=None, topics=""
):
    """Create a (pub, sub) pair of connections to a notifications broker.

    Args:
        pub_address (str): publish to this address to notify other agents
        sub_address (str): listen on this address for notifications
        options (dict, optional): passed unchanged to ``connect_socket`` for
            both sockets; an empty dict is used when omitted.
        topics (str): handed to the SUB socket's ``subscribe``; defaults to
            the empty string.

    Returns:
        tuple: ``(pub, sub)`` where ``sub`` is the result of calling
        ``update`` on the SUB connection so that its ``observable`` is piped
        through ``ops.map(Message.decode)``.
    """
    socket_options = {} if options is None else options

    publisher = self.connect_socket(zmq.PUB, socket_options, pub_address)
    subscriber = self.connect_socket(zmq.SUB, socket_options, sub_address)
    subscriber.socket.subscribe(topics)

    decoded_stream = subscriber.observable.pipe(ops.map(Message.decode))
    return publisher, subscriber.update({"observable": decoded_stream})
shutdown(self)
Shutdown procedure, call super().shutdown() if overriding
Source code in agents/powerful_agent.py
def shutdown(self):
    """Shutdown procedure, call super().shutdown() if overriding.

    Stops the ZAP authenticator when ``self._zap`` is truthy, calls
    ``dispose()`` on every entry of ``self._disposables`` in order, and then
    delegates to the parent class ``shutdown``.
    """
    authenticator = self._zap
    if authenticator:
        self.log.info("stopping ZMQ Authenticator ...")
        authenticator.stop()

    for d in self._disposables:
        self.log.info(f"disposing {d} ...")
        d.dispose()

    super().shutdown()
start_authenticator(self, domain='*', whitelist=None, blacklist=None, certificates_path=None)
Starts ZAP Authenticator in thread
configure_curve must be called every time certificates are added or removed, in order to update the Authenticator’s state
Parameters:
Name | Type | Description | Default |
---|---|---|---|
certificates_path |
str |
path to client public keys to allow |
None |
whitelist |
list[str] |
ip addresses to whitelist |
None |
domain |
str |
domain to apply authentication |
'*' |
Source code in agents/powerful_agent.py
def start_authenticator(
    self, domain="*", whitelist=None, blacklist=None, certificates_path=None
):
    """Starts ZAP Authenticator in thread

    configure_curve must be called every time certificates are added or
    removed, in order to update the Authenticator's state

    Args:
        certificates_path (str): path to client public keys to allow; when
            falsy, ``CURVE_ALLOW_ANY`` is used instead
        whitelist (list[str]): ip addresses to whitelist
        blacklist (list[str]): ip addresses to blacklist; only applied when
            ``whitelist`` is None (a warning is logged if both are given)
        domain (str): domain to apply authentication

    Returns:
        the started ``ThreadAuthenticator``, which is also stored on
        ``self._zap``
    """
    certificates_path = certificates_path or CURVE_ALLOW_ANY
    self._zap = ThreadAuthenticator(self.zmq_context, log=self.log)
    self._zap.start()
    if whitelist is not None:
        if blacklist is not None:
            # The whitelist wins; surface the dropped argument instead of
            # ignoring it silently.
            self.log.warning(
                "both whitelist and blacklist given; blacklist is ignored"
            )
        self._zap.allow(*whitelist)
    elif blacklist is not None:
        self._zap.deny(*blacklist)
    else:
        self._zap.allow()
    self._zap.configure_curve(domain=domain, location=certificates_path)
    return self._zap