Broker connection and pools.
A connection to the broker.
Parameters: | URL – Broker URL, or a list of URLs, e.g. |
---|
Connection('amqp://guest:guest@localhost:5672//')
Connection('amqp://foo;amqp://bar', failover_strategy='round-robin')
Connection('redis://', transport_options={
'visibility_timeout': 3000,
})
import ssl
Connection('amqp://', login_method='EXTERNAL', ssl={
'ca_certs': '/etc/pki/tls/certs/something.crt',
'keyfile': '/etc/something/system.key',
'certfile': '/etc/something/system.cert',
'cert_reqs': ssl.CERT_REQUIRED,
})
SSL compatibility
SSL currently only works with the py-amqp & amqplib transports. For other transports you can use stunnel.
Parameters: |
|
---|
Note
The connection is established lazily when needed. If you need the connection to be established, then force it by calling connect():
>>> conn = Connection('amqp://')
>>> conn.connect()
and always remember to close the connection:
>>> conn.release()
Pool of channels.
See ChannelPool.
Parameters: |
|
---|
Example usage:
>>> connection = Connection('amqp://')
>>> pool = connection.ChannelPool(2)
>>> c1 = pool.acquire()
>>> c2 = pool.acquire()
>>> c3 = pool.acquire()
>>> c1.release()
>>> c3 = pool.acquire()
Create new kombu.Consumer instance using this connection.
Pool of connections.
See ConnectionPool.
Parameters: |
|
---|
Example usage:
>>> connection = Connection('amqp://')
>>> pool = connection.Pool(2)
>>> c1 = pool.acquire()
>>> c2 = pool.acquire()
>>> c3 = pool.acquire()
>>> c1.release()
>>> c3 = pool.acquire()
Create new kombu.Producer instance using this connection.
Create new SimpleQueue using a channel from this connection.
Same as SimpleQueue(), but configured with buffering semantics. The resulting queue and exchange will not be durable, also auto delete is enabled. Messages will be transient (not persistent), and acknowledgements are disabled (no_ack).
Create new SimpleQueue, using a channel from this connection.
If name is a string, a queue and exchange will be automatically created using that name as the name of the queue and exchange, also it will be used as the default routing key.
Parameters: |
|
---|
Convert connection parameters to URL form.
Decorator for functions supporting a channel keyword argument.
The resulting callable will retry calling the function if it raises connection or channel related errors. The return value will be a tuple of (retval, last_created_channel).
If a channel is not provided, then one will be automatically acquired (remember to close it afterwards).
See ensure() for the full list of supported keyword arguments.
Example usage:
channel = connection.channel()
try:
ret, channel = connection.autoretry(publish_messages, channel)
finally:
channel.close()
Create and return a new channel.
List of exceptions that may be raised by the channel.
Create a copy of the connection with the same connection settings.
Close the connection (if open).
Return true if the cycle is complete after number of retries.
Establish connection to server immediately.
Return true if the connection has been established.
The underlying connection object.
Warning
This instance is transport specific, so do not depend on the interface of this object.
List of exceptions that may be raised by the connection.
Iterator returning the next broker URL to try in the event of connection failure (initialized by failover_strategy).
The cache of declared entities is per connection, in case the server loses data.
Default channel, created upon access and closed when the connection is closed.
Can be used for automatic channel handling when you only need one channel, and also it is the channel implicitly used if a connection is passed instead of a channel, to functions that require a channel.
Wait for a single event from the server.
Parameters: | timeout – Timeout in seconds before we give up. |
---|
:raises socket.timeout: if the timeout is exceeded.
Ensure operation completes, regardless of any channel/connection errors occurring.
Will retry by establishing the connection, and reapplying the function.
Parameters: |
|
---|
Example
This is an example ensuring a publish operation:
>>> from kombu import Connection, Producer
>>> conn = Connection('amqp://')
>>> producer = Producer(conn)
>>> def errback(exc, interval):
... logger.error('Error: %r', exc, exc_info=1)
... logger.info('Retry in %s seconds.', interval)
>>> publish = conn.ensure(producer, producer.publish,
... errback=errback, max_retries=3)
>>> publish({'hello': 'world'}, routing_key='dest')
Ensure we have a connection to the server.
If not retry establishing the connection with the settings specified.
Parameters: |
|
---|
Strategy used to select new hosts when reconnecting after connection failure. One of “round-robin”, “shuffle” or any custom iterator constantly yielding new URLs to try.
Get the currently used transport class.
Heartbeat value, currently only supported by the py-amqp transport.
Verify that heartbeats are sent and received.
If the current transport does not support heartbeats then this is a noop operation.
Parameters: | rate – Rate is how often the tick is called compared to the actual heartbeat value. E.g. if the heartbeat is set to 3 seconds, and the tick is called every 3 / 2 seconds, then the rate is 2. |
---|
The host as a host name/port pair separated by colon.
Get connection info.
Experimental manager that can be used to manage/monitor the broker instance. Not available for all transports.
Close given channel, but ignore connection and channel errors.
Switch to next URL given by the current failover strategy (if any).
List of channel related exceptions that can be automatically recovered from without re-establishing the connection.
List of connection related exceptions that can be recovered from, but where the connection must be closed and re-established first.
Close the connection (if open).
Revive connection after connection re-established.
Switch connection parameters to use a new URL (does not reconnect)
Additional transport specific options, passed on to the transport instance.
See also
The shortcut methods Connection.Pool() and Connection.ChannelPool() is the recommended way to instantiate these classes.
Acquire resource.
Parameters: |
|
---|---|
Raises LimitExceeded: | |
if block is false and the limit has been exceeded. |
Close and remove all resources in the pool (also those in use).
Can be used to close resources from parent processes after fork (e.g. sockets/connections).
Acquire resource.
Parameters: |
|
---|---|
Raises LimitExceeded: | |
if block is false and the limit has been exceeded. |
Close and remove all resources in the pool (also those in use).
Can be used to close resources from parent processes after fork (e.g. sockets/connections).