Module proton
[frames] | no frames]

Source Code for Module proton

   1  # 
   2  # Licensed to the Apache Software Foundation (ASF) under one 
   3  # or more contributor license agreements.  See the NOTICE file 
   4  # distributed with this work for additional information 
   5  # regarding copyright ownership.  The ASF licenses this file 
   6  # to you under the Apache License, Version 2.0 (the 
   7  # "License"); you may not use this file except in compliance 
   8  # with the License.  You may obtain a copy of the License at 
   9  # 
  10  #   http://www.apache.org/licenses/LICENSE-2.0 
  11  # 
  12  # Unless required by applicable law or agreed to in writing, 
  13  # software distributed under the License is distributed on an 
  14  # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
  15  # KIND, either express or implied.  See the License for the 
  16  # specific language governing permissions and limitations 
  17  # under the License. 
  18  # 
  19   
  20  """ 
  21  The proton module defines a suite of APIs that implement the AMQP 1.0 
  22  protocol. 
  23   
  24  The proton APIs consist of the following classes: 
  25   
  26   - L{Messenger} -- A messaging endpoint. 
  27   - L{Message}   -- A class for creating and/or accessing AMQP message content. 
  28   - L{Data}      -- A class for creating and/or accessing arbitrary AMQP encoded 
  29                    data. 
  30   
  31  """ 
  32   
  33  from cproton import * 
  34  import weakref 
  35  try: 
  36    import uuid 
  37  except ImportError: 
  38    """ 
  39    No 'native' UUID support.  Provide a very basic UUID type that is a compatible subset of the uuid type provided by more modern python releases. 
  40    """ 
  41    import struct 
# Fallback used when the interpreter has no native ``uuid`` module.  This code
# is the remainder of the ``except ImportError:`` branch that starts above
# (``struct`` is imported there).  It offers a small, compatible subset of the
# standard ``uuid`` API: ``uuid.UUID`` plus a module-level ``uuid4()``.
import os, random, socket, time


class uuid:
  """
  Namespace standing in for the standard C{uuid} module.
  """

  class UUID:
    """
    A 128-bit identifier stored as a 16 byte string in network byte order.
    """

    def __init__(self, hex=None, bytes=None):
      """
      Build a UUID from exactly one of its two representations.

      @type hex: string
      @param hex: canonical form, "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
      @type bytes: string
      @param bytes: the 16 raw bytes of the UUID
      @raise TypeError: if neither or both arguments are supplied
      """
      if [hex, bytes].count(None) != 1:
        raise TypeError("need one of hex or bytes")
      if bytes is not None:
        self.bytes = bytes
      elif hex is not None:
        fields = hex.split("-")
        # the trailing 12 digit group does not fit a single struct field,
        # so split it into a 16 bit and a 32 bit part
        fields[4:5] = [fields[4][:4], fields[4][4:]]
        self.bytes = struct.pack("!LHHHHL", *[int(x, 16) for x in fields])

    def __cmp__(self, other):
      """
      Python 2 ordering hook: order by raw bytes; anything that is not a
      UUID compares as greater than this UUID (-1 is returned).
      """
      if isinstance(other, uuid.UUID):
        # spelled out instead of cmp() so it does not need that builtin
        return (self.bytes > other.bytes) - (self.bytes < other.bytes)
      else:
        return -1

    def __eq__(self, other):
      """
      Two UUIDs are equal iff their raw bytes are equal.  Defined explicitly
      because interpreters that ignore __cmp__ would otherwise fall back to
      identity comparison, which disagrees with __hash__.
      """
      return isinstance(other, uuid.UUID) and self.bytes == other.bytes

    def __ne__(self, other):
      return not self.__eq__(other)

    def __str__(self):
      return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack("!LHHHHL", self.bytes)

    def __repr__(self):
      return "UUID(%r)" % str(self)

    def __hash__(self):
      return hash(self.bytes)


rand = random.Random()
# Seed from a string: tuple seeds are rejected by newer interpreters, while a
# string is accepted everywhere and still mixes pid, time and host name.
rand.seed("%s-%s-%s" % (os.getpid(), time.time(), socket.gethostname()))


def random_uuid():
  """
  Generate the 16 raw bytes of a random (version 4) UUID.

  @return: a 16 byte string laid out as described in RFC 4122 section 4.4
  """
  octets = [rand.randint(0, 255) for i in range(16)]

  # From RFC4122, the version lives in the four most significant bits of
  # time_hi_and_version, i.e. the high nibble of octet 6; it is set to 0100
  octets[6] &= 0x0F
  octets[6] |= 0x40

  # From RFC4122, the two most significant bits of octet 8
  # (clock_seq_hi_and_reserved) are set to 10
  octets[8] &= 0x3F
  octets[8] |= 0x80
  return struct.pack("16B", *octets)


def uuid4():
  """
  Create a random L{uuid.UUID}.
  """
  return uuid.UUID(bytes=random_uuid())
# Interpreters without a ``bytes`` builtin get ``str`` under that name so the
# rest of the module can refer to ``bytes`` unconditionally.
try:
  bytes()
except NameError:
  bytes = str

# Both the API and its implementation are reported as "C".
API_LANGUAGE = IMPLEMENTATION_LANGUAGE = "C"
class Constant(object):
  """
  A named sentinel object.  Its C{repr} is exactly the name it was
  created with.
  """

  def __init__(self, name):
    # kept so __repr__ can echo it back
    self.name = name

  def __repr__(self):
    label = self.name
    return label
102
class ProtonException(Exception):
  """
  Base class of the proton exception hierarchy.  Every proton
  exception class derives from it.
  """
109
class Timeout(ProtonException):
  """
  Raised to signal that a blocking operation has timed out.
  """
116
class Interrupt(ProtonException):
  """
  Raised to signal that a blocking operation was interrupted.
  """
122
class MessengerException(ProtonException):
  """
  Base class of the messenger exception hierarchy.  Every exception
  generated by the messenger class derives from it.
  """
129
class MessageException(ProtonException):
  """
  Base class of the message exception hierarchy.  Every exception
  generated by the Message class derives from it.
  """
# Error codes that have a dedicated exception class.
EXCEPTIONS = {
  PN_TIMEOUT: Timeout,
  PN_INTR: Interrupt
}

# Symbolic delivery states.
PENDING = Constant("PENDING")
ACCEPTED = Constant("ACCEPTED")
REJECTED = Constant("REJECTED")
RELEASED = Constant("RELEASED")
ABORTED = Constant("ABORTED")
SETTLED = Constant("SETTLED")

# Translation from the PN_STATUS_* codes to the constants above; the
# unknown status maps to None.
STATUSES = {
  PN_STATUS_UNKNOWN: None,
  PN_STATUS_PENDING: PENDING,
  PN_STATUS_ACCEPTED: ACCEPTED,
  PN_STATUS_REJECTED: REJECTED,
  PN_STATUS_RELEASED: RELEASED,
  PN_STATUS_ABORTED: ABORTED,
  PN_STATUS_SETTLED: SETTLED
}

AUTOMATIC = Constant("AUTOMATIC")
MANUAL = Constant("MANUAL")
class Messenger(object):
  """
  The L{Messenger} class defines a high level interface for sending
  and receiving L{Messages<Message>}.  Every L{Messenger} contains a
  single logical queue of incoming messages and a single logical queue
  of outgoing messages.  The messages in these queues may be destined
  for, or originate from, a variety of addresses.

  The messenger interface is single-threaded.  All methods except one
  (L{interrupt}) are intended to be used from within the messenger
  thread.


  Address Syntax
  ==============

  An address has the following form::

    [ amqp[s]:// ] [user[:password]@] domain [/[name]]

  Where domain can be one of::

    host | host:port | ip | ip:port | name

  The following are valid examples of addresses:

   - example.org
   - example.org:1234
   - amqp://example.org
   - amqps://example.org
   - example.org/incoming
   - amqps://example.org/outgoing
   - amqps://fred:trustno1@example.org
   - 127.0.0.1:1234
   - amqps://127.0.0.1:1234

  Sending & Receiving Messages
  ============================

  The L{Messenger} class works in conjunction with the L{Message} class.
  The L{Message} class is a mutable holder of message content.

  The L{put} method copies its L{Message} to the outgoing queue, and may
  send queued messages if it can do so without blocking.  The L{send}
  method blocks until it has sent the requested number of messages,
  or until a timeout interrupts the attempt.


    >>> message = Message()
    >>> for i in range(3):
    ...   message.address = "amqp://host/queue"
    ...   message.subject = "Hello World %i" % i
    ...   messenger.put(message)
    >>> messenger.send()

  Similarly, the L{recv} method receives messages into the incoming
  queue, and may block as it attempts to receive the requested number
  of messages, or until timeout is reached.  It may receive fewer
  than the requested number.  The L{get} method pops the
  eldest L{Message} off the incoming queue and copies it into the
  L{Message} object that you supply.  It will not block.


    >>> message = Message()
    >>> messenger.recv(10)
    >>> while messenger.incoming > 0:
    ...   messenger.get(message)
    ...   print message.subject
    Hello World 0
    Hello World 1
    Hello World 2

  The blocking flag allows you to turn off blocking behavior entirely,
  in which case L{send} and L{recv} will do whatever they can without
  blocking, and then return.  You can then look at the number
  of incoming and outgoing messages to see how much outstanding work
  still remains.
  """

  def __init__(self, name=None):
    """
    Construct a new L{Messenger} with the given name.  The name has
    global scope.  If no name (None) is supplied, a UUID based name
    will be chosen.

    @type name: string
    @param name: the name of the messenger or None
    """
    self._mng = pn_messenger(name)
    # Selectable wrappers handed out by selectable(), keyed by file descriptor
    self._selectables = {}

  def __del__(self):
    """
    Destroy the L{Messenger}.  This will close all connections that
    are managed by the L{Messenger}.  Call the L{stop} method before
    destroying the L{Messenger}.
    """
    # _mng is missing if construction failed or it was already freed
    if not hasattr(self, "_mng"):
      return
    pn_messenger_free(self._mng)
    del self._mng

  def _check(self, err):
    """
    Translate a return code into an exception.

    Non-negative codes are returned unchanged, PN_INPROGRESS yields None,
    and any other negative code raises the class registered in EXCEPTIONS
    (L{MessengerException} when none is registered).
    """
    if not (err < 0):
      return err
    if err == PN_INPROGRESS:
      return
    exc = EXCEPTIONS.get(err, MessengerException)
    text = pn_error_text(pn_messenger_error(self._mng))
    raise exc("[%s]: %s" % (err, text))

  @property
  def name(self):
    """
    The name of the L{Messenger}.
    """
    return pn_messenger_name(self._mng)

  def _get_certificate(self):
    return pn_messenger_get_certificate(self._mng)

  def _set_certificate(self, value):
    code = pn_messenger_set_certificate(self._mng, value)
    self._check(code)

  certificate = property(_get_certificate, _set_certificate,
                         doc="""
Path to a certificate file for the L{Messenger}. This certificate is
used when the L{Messenger} accepts or establishes SSL/TLS connections.
This property must be specified for the L{Messenger} to accept
incoming SSL/TLS connections and to establish client authenticated
outgoing SSL/TLS connection. Non client authenticated outgoing SSL/TLS
connections do not require this property.
""")

  def _get_private_key(self):
    return pn_messenger_get_private_key(self._mng)

  def _set_private_key(self, value):
    code = pn_messenger_set_private_key(self._mng, value)
    self._check(code)

  private_key = property(_get_private_key, _set_private_key,
                         doc="""
Path to a private key file for the L{Messenger's<Messenger>}
certificate. This property must be specified for the L{Messenger} to
accept incoming SSL/TLS connections and to establish client
authenticated outgoing SSL/TLS connection. Non client authenticated
SSL/TLS connections do not require this property.
""")

  def _get_password(self):
    return pn_messenger_get_password(self._mng)

  def _set_password(self, value):
    code = pn_messenger_set_password(self._mng, value)
    self._check(code)

  password = property(_get_password, _set_password,
                      doc="""
This property contains the password for the L{Messenger.private_key}
file, or None if the file is not encrypted.
""")

  def _get_trusted_certificates(self):
    return pn_messenger_get_trusted_certificates(self._mng)

  def _set_trusted_certificates(self, value):
    code = pn_messenger_set_trusted_certificates(self._mng, value)
    self._check(code)

  trusted_certificates = property(_get_trusted_certificates,
                                  _set_trusted_certificates,
                                  doc="""
A path to a database of trusted certificates for use in verifying the
peer on an SSL/TLS connection. If this property is None, then the peer
will not be verified.
""")

  def _get_timeout(self):
    # stored in milliseconds with -1 meaning "no timeout"; exposed in seconds
    millis = pn_messenger_get_timeout(self._mng)
    if millis == -1:
      return None
    return float(millis)/1000

  def _set_timeout(self, value):
    # seconds (or None) in, milliseconds (or -1) out
    if value is None:
      millis = -1
    else:
      millis = long(1000*value)
    self._check(pn_messenger_set_timeout(self._mng, millis))

  timeout = property(_get_timeout, _set_timeout,
                     doc="""
The timeout property contains the default timeout for blocking
operations performed by the L{Messenger}.
""")

  def _is_blocking(self):
    return pn_messenger_is_blocking(self._mng)

  def _set_blocking(self, b):
    code = pn_messenger_set_blocking(self._mng, b)
    self._check(code)

  blocking = property(_is_blocking, _set_blocking,
                      doc="""
Enable or disable blocking behavior during L{Message} sending
and receiving.  This affects every blocking call, with the
exception of L{work}.  Currently, the affected calls are
L{send}, L{recv}, and L{stop}.
""")

  def _is_passive(self):
    return pn_messenger_is_passive(self._mng)

  def _set_passive(self, b):
    code = pn_messenger_set_passive(self._mng, b)
    self._check(code)

  passive = property(_is_passive, _set_passive,
                     doc="""
When passive is set to true, Messenger will not attempt to perform I/O
internally. In this mode it is necessary to use the selectables API to
drive any I/O needed to perform requested actions. In this mode
Messenger will never block.
""")

  def _get_incoming_window(self):
    return pn_messenger_get_incoming_window(self._mng)

  def _set_incoming_window(self, window):
    code = pn_messenger_set_incoming_window(self._mng, window)
    self._check(code)

  incoming_window = property(_get_incoming_window, _set_incoming_window,
                             doc="""
The incoming tracking window for the messenger. The messenger will
track the remote status of this many incoming deliveries after they
have been accepted or rejected. Defaults to zero.

L{Messages<Message>} enter this window only when you take them into your application
using L{get}.  If your incoming window size is I{n}, and you get I{n}+1 L{messages<Message>}
without explicitly accepting or rejecting the oldest message, then the
message that passes beyond the edge of the incoming window will be assigned
the default disposition of its link.
""")

  def _get_outgoing_window(self):
    return pn_messenger_get_outgoing_window(self._mng)

  def _set_outgoing_window(self, window):
    code = pn_messenger_set_outgoing_window(self._mng, window)
    self._check(code)

  outgoing_window = property(_get_outgoing_window, _set_outgoing_window,
                             doc="""
The outgoing tracking window for the messenger. The messenger will
track the remote status of this many outgoing deliveries after calling
send. Defaults to zero.

A L{Message} enters this window when you call the put() method with the
message.  If your outgoing window size is I{n}, and you call L{put} I{n}+1
times, status information will no longer be available for the
first message.
""")

  def start(self):
    """
    Currently a no-op placeholder.
    For future compatibility, do not L{send} or L{recv} messages
    before starting the L{Messenger}.
    """
    code = pn_messenger_start(self._mng)
    self._check(code)

  def stop(self):
    """
    Transitions the L{Messenger} to an inactive state.  An inactive
    L{Messenger} will not send or receive messages from its internal
    queues.  A L{Messenger} should be stopped before being discarded to
    ensure a clean shutdown handshake occurs on any internally managed
    connections.
    """
    code = pn_messenger_stop(self._mng)
    self._check(code)

  @property
  def stopped(self):
    """
    Returns true iff a L{Messenger} is in the stopped state.
    This function does not block.
    """
    return pn_messenger_stopped(self._mng)

  def subscribe(self, source):
    """
    Subscribes the L{Messenger} to messages originating from the
    specified source.  The source is an address as specified in the
    L{Messenger} introduction with the following addition.  If the
    domain portion of the address begins with the '~' character, the
    L{Messenger} will interpret the domain as host/port, bind to it,
    and listen for incoming messages.  For example "~0.0.0.0",
    "amqp://~0.0.0.0", and "amqps://~0.0.0.0" will all bind to any
    local interface and listen for incoming messages with the last
    variant only permitting incoming SSL connections.

    @type source: string
    @param source: the source of messages to subscribe to
    """
    handle = pn_messenger_subscribe(self._mng, source)
    if not handle:
      # no subscription came back: surface the messenger's error code
      self._check(pn_error_code(pn_messenger_error(self._mng)))
    return Subscription(handle)

  def put(self, message):
    """
    Places the content contained in the message onto the outgoing
    queue of the L{Messenger}.  This method will never block, however
    it will send any unblocked L{Messages<Message>} in the outgoing
    queue immediately and leave any blocked L{Messages<Message>}
    remaining in the outgoing queue.  The L{send} call may be used to
    block until the outgoing queue is empty.  The L{outgoing} property
    may be used to check the depth of the outgoing queue.

    When the content in a given L{Message} object is copied to the outgoing
    message queue, you may then modify or discard the L{Message} object
    without having any impact on the content in the outgoing queue.

    This method returns an outgoing tracker for the L{Message}.  The tracker
    can be used to determine the delivery status of the L{Message}.

    @type message: Message
    @param message: the message to place in the outgoing queue
    @return: a tracker
    """
    # push the python level sections into the underlying message first
    message._pre_encode()
    code = pn_messenger_put(self._mng, message._msg)
    self._check(code)
    return pn_messenger_outgoing_tracker(self._mng)

  def status(self, tracker):
    """
    Gets the last known remote state of the delivery associated with
    the given tracker.

    @type tracker: tracker
    @param tracker: the tracker whose status is to be retrieved

    @return: the entry of STATUSES for the reported status (one of the
      status constants, or None for an unknown status); a status with
      no entry in STATUSES is returned as is
    """
    disp = pn_messenger_status(self._mng, tracker)
    return STATUSES.get(disp, disp)

  def buffered(self, tracker):
    """
    Checks if the delivery associated with the given tracker is still
    waiting to be sent.

    @type tracker: tracker
    @param tracker: the tracker whose status is to be retrieved

    @return: true if delivery is still buffered
    """
    return pn_messenger_buffered(self._mng, tracker)

  def settle(self, tracker=None):
    """
    Frees a L{Messenger} from tracking the status associated with a given
    tracker.  If you don't supply a tracker, all outgoing
    L{messages<Message>} up to the most recent will be settled.
    """
    flags = 0
    if tracker is None:
      # no tracker: settle cumulatively up to the latest outgoing one
      tracker = pn_messenger_outgoing_tracker(self._mng)
      flags = PN_CUMULATIVE
    self._check(pn_messenger_settle(self._mng, tracker, flags))

  def send(self, n=-1):
    """
    This call will block until the indicated number of L{messages<Message>}
    have been sent, or until the operation times out.  If n is -1 this call
    will block until all outgoing L{messages<Message>} have been sent.  If n
    is 0 then this call will send whatever it can without blocking.
    """
    code = pn_messenger_send(self._mng, n)
    self._check(code)

  def recv(self, n=None):
    """
    Receives up to I{n} L{messages<Message>} into the incoming queue.  If no
    value for I{n} is supplied, this call will receive as many
    L{messages<Message>} as it can buffer internally.  If the L{Messenger}
    is in blocking mode, this call will block until at least one L{Message}
    is available in the incoming queue.
    """
    count = n
    if count is None:
      count = -1
    self._check(pn_messenger_recv(self._mng, count))

  def work(self, timeout=None):
    """
    Sends or receives any outstanding L{messages<Message>} queued for a
    L{Messenger}.  This will block for the indicated timeout.
    This method may also do I/O work other than sending and receiving
    L{messages<Message>}.  For example, closing connections after
    messenger.L{stop}() has been called.

    @return: False if the call timed out, True otherwise
    """
    # seconds (or None) in, milliseconds (or -1) passed down
    if timeout is None:
      millis = -1
    else:
      millis = long(1000*timeout)
    err = pn_messenger_work(self._mng, millis)
    if err == PN_TIMEOUT:
      return False
    self._check(err)
    return True

  @property
  def receiving(self):
    # value reported by pn_messenger_receiving for this messenger
    return pn_messenger_receiving(self._mng)

  def interrupt(self):
    """
    The L{Messenger} interface is single-threaded.
    This is the only L{Messenger} function intended to be called
    from outside of the L{Messenger} thread.
    Call this from a non-messenger thread to interrupt
    a L{Messenger} that is blocking.
    This will cause any in-progress blocking call to throw
    the L{Interrupt} exception.  If there is no currently blocking
    call, then the next blocking call will be affected, even if it
    is within the same thread that interrupt was called from.
    """
    code = pn_messenger_interrupt(self._mng)
    self._check(code)

  def get(self, message=None):
    """
    Moves the message from the head of the incoming message queue into
    the supplied message object.  Any content in the message will be
    overwritten.

    A tracker for the incoming L{Message} is returned.  The tracker can
    later be used to communicate your acceptance or rejection of the
    L{Message}.

    If None is passed in for the L{Message} object, the L{Message}
    popped from the head of the queue is discarded.

    @type message: Message
    @param message: the destination message object
    @return: a tracker
    """
    impl = None
    if message is not None:
      impl = message._msg
    self._check(pn_messenger_get(self._mng, impl))
    if message is not None:
      # refresh the python level sections from the underlying message
      message._post_decode()
    return pn_messenger_incoming_tracker(self._mng)

  def accept(self, tracker=None):
    """
    Signal the sender that you have acted on the L{Message}
    pointed to by the tracker.  If no tracker is supplied,
    then all messages that have been returned by the L{get}
    method are accepted, except those that have already been
    auto-settled by passing beyond your incoming window size.

    @type tracker: tracker
    @param tracker: a tracker as returned by get
    """
    flags = 0
    if tracker is None:
      tracker = pn_messenger_incoming_tracker(self._mng)
      flags = PN_CUMULATIVE
    self._check(pn_messenger_accept(self._mng, tracker, flags))

  def reject(self, tracker=None):
    """
    Rejects the L{Message} indicated by the tracker.  If no tracker
    is supplied, all messages that have been returned by the L{get}
    method are rejected, except those that have already been auto-settled
    by passing beyond your incoming window size.

    @type tracker: tracker
    @param tracker: a tracker as returned by get
    """
    flags = 0
    if tracker is None:
      tracker = pn_messenger_incoming_tracker(self._mng)
      flags = PN_CUMULATIVE
    self._check(pn_messenger_reject(self._mng, tracker, flags))

  @property
  def outgoing(self):
    """
    The outgoing queue depth.
    """
    return pn_messenger_outgoing(self._mng)

  @property
  def incoming(self):
    """
    The incoming queue depth.
    """
    return pn_messenger_incoming(self._mng)

  def route(self, pattern, address):
    """
    Adds a routing rule to a L{Messenger's<Messenger>} internal routing table.

    The route procedure may be used to influence how a L{Messenger} will
    internally treat a given address or class of addresses.  Every call
    to the route procedure will result in L{Messenger} appending a routing
    rule to its internal routing table.

    Whenever a L{Message} is presented to a L{Messenger} for delivery, it
    will match the address of this message against the set of routing
    rules in order.  The first rule to match will be triggered, and
    instead of routing based on the address presented in the message,
    the L{Messenger} will route based on the address supplied in the rule.

    The pattern matching syntax supports two types of matches, a '%'
    will match any character except a '/', and a '*' will match any
    character including a '/'.

    A routing address is specified as a normal AMQP address, however it
    may additionally use substitution variables from the pattern match
    that triggered the rule.

    Any message sent to "foo" will be routed to "amqp://foo.com":

      >>> messenger.route("foo", "amqp://foo.com")

    Any message sent to "foobar" will be routed to
    "amqp://foo.com/bar":

      >>> messenger.route("foobar", "amqp://foo.com/bar")

    Any message sent to bar/<path> will be routed to the corresponding
    path within the amqp://bar.com domain:

      >>> messenger.route("bar/*", "amqp://bar.com/$1")

    Route all L{messages<Message>} over TLS:

      >>> messenger.route("amqp:*", "amqps:$1")

    Supply credentials for foo.com:

      >>> messenger.route("amqp://foo.com/*", "amqp://user:password@foo.com/$1")

    Supply credentials for all domains:

      >>> messenger.route("amqp://*", "amqp://user:password@$1")

    Route all addresses through a single proxy while preserving the
    original destination:

      >>> messenger.route("amqp://%/*", "amqp://user:password@proxy/$1/$2")

    Route any address through a single broker:

      >>> messenger.route("*", "amqp://user:password@broker/$1")
    """
    code = pn_messenger_route(self._mng, pattern, address)
    self._check(code)

  def rewrite(self, pattern, address):
    """
    Similar to route(), except that the destination of
    the L{Message} is determined before the message address is rewritten.

    The outgoing address is only rewritten after routing has been
    finalized.  If a message has an outgoing address of
    "amqp://0.0.0.0:5678", and a rewriting rule that changes its
    outgoing address to "foo", it will still arrive at the peer that
    is listening on "amqp://0.0.0.0:5678", but when it arrives there,
    the receiver will see its outgoing address as "foo".

    The default rewrite rule removes username and password from addresses
    before they are transmitted.
    """
    code = pn_messenger_rewrite(self._mng, pattern, address)
    self._check(code)

  def selectable(self):
    """
    Fetch a selectable from the messenger, wrapped in a Selectable.
    One wrapper is cached per file descriptor in C{_selectables} and
    reused on later calls.

    @return: a Selectable, or None if pn_messenger_selectable returned
      nothing
    """
    impl = pn_messenger_selectable(self._mng)
    if not impl:
      return None
    fd = pn_selectable_fd(impl)
    wrapper = self._selectables.get(fd, None)
    if wrapper is None:
      wrapper = Selectable(self, impl)
      self._selectables[fd] = wrapper
    return wrapper

  @property
  def deadline(self):
    # pn_messenger_deadline reports milliseconds; a false value (0) becomes
    # None, anything else is converted to seconds
    tstamp = pn_messenger_deadline(self._mng)
    if not tstamp:
      return None
    return float(tstamp)/1000
762 -class Message(object):
763 """ 764 The L{Message} class is a mutable holder of message content. 765 766 @ivar instructions: delivery instructions for the message 767 @type instructions: dict 768 @ivar annotations: infrastructure defined message annotations 769 @type annotations: dict 770 @ivar properties: application defined message properties 771 @type properties: dict 772 @ivar body: message body 773 @type body: bytes | unicode | dict | list | int | long | float | UUID 774 """ 775 776 DATA = PN_DATA 777 TEXT = PN_TEXT 778 AMQP = PN_AMQP 779 JSON = PN_JSON 780 781 DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY 782
def __init__(self):
  """
  Allocate the underlying message and start with empty sections.
  """
  msg = pn_message()
  self._msg = msg
  # Data wrappers around the id and correlation id of the message
  self._id = Data(pn_message_id(msg))
  self._correlation_id = Data(pn_message_correlation_id(msg))
  # python level sections; transferred by _pre_encode / _post_decode
  self.instructions = None
  self.annotations = None
  self.properties = None
  self.body = None
791
def __del__(self):
  """
  Free the underlying message, if one is still attached.
  """
  # _msg is missing if construction failed or it was already freed
  if not hasattr(self, "_msg"):
    return
  pn_message_free(self._msg)
  del self._msg
796
def _check(self, err):
  """
  Translate a return code into an exception.

  Non-negative codes are returned unchanged; a negative code raises the
  class registered in EXCEPTIONS (L{MessageException} when none is
  registered).
  """
  if not (err < 0):
    return err
  exc = EXCEPTIONS.get(err, MessageException)
  detail = pn_message_error(self._msg)
  raise exc("[%s]: %s" % (err, detail))
803
def _pre_encode(self):
  """
  Copy the python level sections (instructions, annotations, properties,
  body) into the underlying message.  Each section is cleared first and
  only written when the attribute is not None.
  """
  msg = self._msg
  # obtain all four wrappers up front, in the same order they are filled
  sections = [
    (Data(pn_message_instructions(msg)), "instructions"),
    (Data(pn_message_annotations(msg)), "annotations"),
    (Data(pn_message_properties(msg)), "properties"),
    (Data(pn_message_body(msg)), "body"),
  ]
  for data, attr in sections:
    data.clear()
    value = getattr(self, attr)
    if value is not None:
      data.put_object(value)
822
def _post_decode(self):
  """
  Refresh the python level sections (instructions, annotations,
  properties, body) from the underlying message.  A section whose data
  has no next element is reset to None.
  """
  msg = self._msg
  # obtain all four wrappers up front, in the same order they are read
  sections = [
    (Data(pn_message_instructions(msg)), "instructions"),
    (Data(pn_message_annotations(msg)), "annotations"),
    (Data(pn_message_properties(msg)), "properties"),
    (Data(pn_message_body(msg)), "body"),
  ]
  for data, attr in sections:
    if data.next():
      setattr(self, attr, data.get_object())
    else:
      setattr(self, attr, None)
845
def clear(self):
  """
  Clears the contents of the L{Message}.  All fields will be reset to
  their default values.
  """
  pn_message_clear(self._msg)
  # the python level sections are kept outside the underlying message
  for attr in ("instructions", "annotations", "properties", "body"):
    setattr(self, attr, None)
856
def _is_inferred(self):
  # report what pn_message_is_inferred says about the underlying message
  msg = self._msg
  return pn_message_is_inferred(msg)

def _set_inferred(self, value):
  # the value is coerced to a bool before it is handed down
  flag = bool(value)
  self._check(pn_message_set_inferred(self._msg, flag))

inferred = property(_is_inferred, _set_inferred)
def _is_durable(self):
  # report what pn_message_is_durable says about the underlying message
  msg = self._msg
  return pn_message_is_durable(msg)

def _set_durable(self, value):
  # the value is coerced to a bool before it is handed down
  flag = bool(value)
  self._check(pn_message_set_durable(self._msg, flag))

durable = property(_is_durable, _set_durable,
                   doc="""
The durable property indicates that the message should be held durably
by any intermediaries taking responsibility for the message.
""")
def _get_priority(self):
  # read straight from the underlying message
  msg = self._msg
  return pn_message_get_priority(msg)

def _set_priority(self, value):
  # a negative return code is turned into an exception by _check
  code = pn_message_set_priority(self._msg, value)
  self._check(code)

priority = property(_get_priority, _set_priority,
                    doc="""
The priority of the message.
""")
def _get_ttl(self):
  # read straight from the underlying message
  msg = self._msg
  return pn_message_get_ttl(msg)

def _set_ttl(self, value):
  # a negative return code is turned into an exception by _check
  code = pn_message_set_ttl(self._msg, value)
  self._check(code)

ttl = property(_get_ttl, _set_ttl,
               doc="""
The time to live of the message measured in milliseconds. Expired
messages may be dropped.
""")
def _is_first_acquirer(self):
  # report what pn_message_is_first_acquirer says about the underlying message
  msg = self._msg
  return pn_message_is_first_acquirer(msg)

def _set_first_acquirer(self, value):
  # the value is coerced to a bool before it is handed down
  flag = bool(value)
  self._check(pn_message_set_first_acquirer(self._msg, flag))

first_acquirer = property(_is_first_acquirer, _set_first_acquirer,
                          doc="""
True iff the recipient is the first to acquire the message.
""")
def _get_delivery_count(self):
  # read straight from the underlying message
  msg = self._msg
  return pn_message_get_delivery_count(msg)

def _set_delivery_count(self, value):
  # a negative return code is turned into an exception by _check
  code = pn_message_set_delivery_count(self._msg, value)
  self._check(code)

delivery_count = property(_get_delivery_count, _set_delivery_count,
                          doc="""
The number of delivery attempts made for this message.
""")
def _get_id(self):
  # decode whatever the id Data wrapper currently holds
  holder = self._id
  return holder.get_object()

def _set_id(self, value):
  # exact type match on purpose: only plain int/long values are wrapped
  # as ulong, everything else is stored unchanged
  stored = value
  if type(value) in (int, long):
    stored = ulong(value)
  holder = self._id
  holder.rewind()
  holder.put_object(stored)

id = property(_get_id, _set_id,
              doc="""
The id of the message.
""")
def _get_user_id(self):
  # read straight from the underlying message
  msg = self._msg
  return pn_message_get_user_id(msg)

def _set_user_id(self, value):
  # a negative return code is turned into an exception by _check
  code = pn_message_set_user_id(self._msg, value)
  self._check(code)

user_id = property(_get_user_id, _set_user_id,
                   doc="""
The user id of the message creator.
""")
def _get_address(self):
  # read straight from the underlying message
  msg = self._msg
  return pn_message_get_address(msg)

def _set_address(self, value):
  # a negative return code is turned into an exception by _check
  code = pn_message_set_address(self._msg, value)
  self._check(code)

address = property(_get_address, _set_address,
                   doc="""
The address of the message.
""")
def _get_subject(self):
  # read straight from the underlying message
  msg = self._msg
  return pn_message_get_subject(msg)

def _set_subject(self, value):
  # a negative return code is turned into an exception by _check
  code = pn_message_set_subject(self._msg, value)
  self._check(code)

subject = property(_get_subject, _set_subject,
                   doc="""
The subject of the message.
""")
def _get_reply_to(self):
  # read straight from the underlying message
  msg = self._msg
  return pn_message_get_reply_to(msg)

def _set_reply_to(self, value):
  # a negative return code is turned into an exception by _check
  code = pn_message_set_reply_to(self._msg, value)
  self._check(code)

reply_to = property(_get_reply_to, _set_reply_to,
                    doc="""
The reply-to address for the message.
""")
def _get_correlation_id(self):
  # decode whatever the correlation id Data wrapper currently holds
  holder = self._correlation_id
  return holder.get_object()

def _set_correlation_id(self, value):
  # exact type match on purpose: only plain int/long values are wrapped
  # as ulong, everything else is stored unchanged
  stored = value
  if type(value) in (int, long):
    stored = ulong(value)
  holder = self._correlation_id
  holder.rewind()
  holder.put_object(stored)

correlation_id = property(_get_correlation_id, _set_correlation_id,
                          doc="""
The correlation-id for the message.
""")
992 - def _get_content_type(self):
993 return pn_message_get_content_type(self._msg)
994
995 - def _set_content_type(self, value):
996 self._check(pn_message_set_content_type(self._msg, value))
997 998 content_type = property(_get_content_type, _set_content_type, 999 doc=""" 1000 The content-type of the message. 1001 """) 1002
1003 - def _get_content_encoding(self):
1004 return pn_message_get_content_encoding(self._msg)
1005
1006 - def _set_content_encoding(self, value):
1007 self._check(pn_message_set_content_encoding(self._msg, value))
1008 1009 content_encoding = property(_get_content_encoding, _set_content_encoding, 1010 doc=""" 1011 The content-encoding of the message. 1012 """) 1013
1014 - def _get_expiry_time(self):
1015 return pn_message_get_expiry_time(self._msg)
1016
1017 - def _set_expiry_time(self, value):
1018 self._check(pn_message_set_expiry_time(self._msg, value))
1019 1020 expiry_time = property(_get_expiry_time, _set_expiry_time, 1021 doc=""" 1022 The expiry time of the message. 1023 """) 1024
1025 - def _get_creation_time(self):
1026 return pn_message_get_creation_time(self._msg)
1027
1028 - def _set_creation_time(self, value):
1029 self._check(pn_message_set_creation_time(self._msg, value))
1030 1031 creation_time = property(_get_creation_time, _set_creation_time, 1032 doc=""" 1033 The creation time of the message. 1034 """) 1035
1036 - def _get_group_id(self):
1037 return pn_message_get_group_id(self._msg)
1038
1039 - def _set_group_id(self, value):
1040 self._check(pn_message_set_group_id(self._msg, value))
1041 1042 group_id = property(_get_group_id, _set_group_id, 1043 doc=""" 1044 The group id of the message. 1045 """) 1046
1047 - def _get_group_sequence(self):
1048 return pn_message_get_group_sequence(self._msg)
1049
1050 - def _set_group_sequence(self, value):
1051 self._check(pn_message_set_group_sequence(self._msg, value))
1052 1053 group_sequence = property(_get_group_sequence, _set_group_sequence, 1054 doc=""" 1055 The sequence of the message within its group. 1056 """) 1057
1058 - def _get_reply_to_group_id(self):
1059 return pn_message_get_reply_to_group_id(self._msg)
1060
1061 - def _set_reply_to_group_id(self, value):
1062 self._check(pn_message_set_reply_to_group_id(self._msg, value))
1063 1064 reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id, 1065 doc=""" 1066 The group-id for any replies. 1067 """) 1068 1069 # XXX
1070 - def _get_format(self):
1071 return pn_message_get_format(self._msg)
1072
1073 - def _set_format(self, value):
1074 self._check(pn_message_set_format(self._msg, value))
1075 1076 format = property(_get_format, _set_format, 1077 doc=""" 1078 The format of the message. 1079 """) 1080
1081 - def encode(self):
1082 self._pre_encode() 1083 sz = 16 1084 while True: 1085 err, data = pn_message_encode(self._msg, sz) 1086 if err == PN_OVERFLOW: 1087 sz *= 2 1088 continue 1089 else: 1090 self._check(err) 1091 return data
1092
1093 - def decode(self, data):
1094 self._check(pn_message_decode(self._msg, data, len(data))) 1095 self._post_decode()
1096
1097 - def load(self, data):
1098 self._check(pn_message_load(self._msg, data))
1099
1100 - def save(self):
1101 sz = 16 1102 while True: 1103 err, data = pn_message_save(self._msg, sz) 1104 if err == PN_OVERFLOW: 1105 sz *= 2 1106 continue 1107 else: 1108 self._check(err) 1109 return data
1110
1111 - def __repr2__(self):
1112 props = [] 1113 for attr in ("inferred", "address", "reply_to", "durable", "ttl", 1114 "priority", "first_acquirer", "delivery_count", "id", 1115 "correlation_id", "user_id", "group_id", "group_sequence", 1116 "reply_to_group_id", "instructions", "annotations", 1117 "properties", "body"): 1118 value = getattr(self, attr) 1119 if value: props.append("%s=%r" % (attr, value)) 1120 return "Message(%s)" % ", ".join(props)
1121
1122 - def __repr__(self):
1123 tmp = pn_string(None) 1124 err = pn_inspect(self._msg, tmp) 1125 result = pn_string_get(tmp) 1126 pn_free(tmp) 1127 self._check(err) 1128 return result
1129
class Subscription(object):
  """Thin wrapper around a subscription implementation handle."""

  def __init__(self, impl):
    """
    @param impl: the underlying subscription handle to wrap
    """
    self._impl = impl

  @property
  def address(self):
    """The value pn_subscription_address returns for the wrapped handle."""
    handle = self._impl
    return pn_subscription_address(handle)

1138
1139 -class Selectable(object):
1140
1141 - def __init__(self, messenger, impl):
1142 self.messenger = messenger 1143 self._impl = impl
1144
1145 - def fileno(self):
1146 if not self._impl: raise ValueError("selectable freed") 1147 return pn_selectable_fd(self._impl)
1148 1149 @property
1150 - def capacity(self):
1151 if not self._impl: raise ValueError("selectable freed") 1152 return pn_selectable_capacity(self._impl)
1153 1154 @property
1155 - def pending(self):
1156 if not self._impl: raise ValueError("selectable freed") 1157 return pn_selectable_pending(self._impl)
1158 1159 @property
1160 - def deadline(self):
1161 if not self._impl: raise ValueError("selectable freed") 1162 tstamp = pn_selectable_deadline(self._impl) 1163 if tstamp: 1164 return float(tstamp)/1000 1165 else: 1166 return None
1167
1168 - def readable(self):
1169 if not self._impl: raise ValueError("selectable freed") 1170 pn_selectable_readable(self._impl)
1171
1172 - def writable(self):
1173 if not self._impl: raise ValueError("selectable freed") 1174 pn_selectable_writable(self._impl)
1175
1176 - def expired(self):
1177 if not self._impl: raise ValueError("selectable freed") 1178 pn_selectable_expired(self._impl)
1179
1180 - def _is_registered(self):
1181 if not self._impl: raise ValueError("selectable freed") 1182 return pn_selectable_is_registered(self._impl)
1183
1184 - def _set_registered(self, registered):
1185 if not self._impl: raise ValueError("selectable freed") 1186 pn_selectable_set_registered(self._impl, registered)
1187 1188 registered = property(_is_registered, _set_registered, 1189 doc=""" 1190 The registered property may be get/set by an I/O polling system to 1191 indicate whether the fd has been registered or not. 1192 """) 1193 1194 @property
1195 - def is_terminal(self):
1196 if not self._impl: return True 1197 return pn_selectable_is_terminal(self._impl)
1198
1199 - def free(self):
1200 if self._impl: 1201 del self.messenger._selectables[self.fileno()] 1202 pn_selectable_free(self._impl) 1203 self._impl = None
1204
1205 - def __del__(self):
1206 self.free()
1207
class DataException(ProtonException):
  """
  Root of the Data exception hierarchy: every exception raised by the
  Data class extends this exception.
  """

1214
class UnmappedType:
  """
  Placeholder object carrying a message about a type that has no mapping
  (Data.get_object returns one when it has no getter for a node type).
  """

  def __init__(self, msg):
    """
    @param msg: text identifying the unmapped type
    """
    self.msg = msg

  def __repr__(self):
    text = "UnmappedType(%s)" % self.msg
    return text

1222
class ulong(long):
  """
  A long subclass used to tag a value as an unsigned long (Data.put_object
  routes instances to put_ulong; Data.get_ulong returns them).
  """

  def __repr__(self):
    inner = long.__repr__(self)
    return "ulong(%s)" % inner

1227
class timestamp(long):
  """
  A long subclass used to tag a value as a timestamp (Data.put_object
  routes instances to put_timestamp; Data.get_timestamp returns them).
  """

  def __repr__(self):
    inner = long.__repr__(self)
    return "timestamp(%s)" % inner

1232
class symbol(unicode):
  """
  A unicode subclass used to tag a value as a symbol (Data.put_object
  routes instances to put_symbol; Data.get_symbol returns them).
  """

  def __repr__(self):
    inner = unicode.__repr__(self)
    return "symbol(%s)" % inner

1237
class char(unicode):
  """
  A unicode subclass used to tag a value as a char (Data.put_object
  routes instances to put_char; Data.get_char returns them).
  """

  def __repr__(self):
    inner = unicode.__repr__(self)
    return "char(%s)" % inner

1242
class Described(object):
  """
  A value paired with a descriptor.

  Two instances compare equal when both their descriptors and their values
  compare equal; comparison with any other type is unequal.
  """

  def __init__(self, descriptor, value):
    """
    @param descriptor: the descriptor of the value
    @param value: the described value
    """
    self.descriptor = descriptor
    self.value = value

  def __repr__(self):
    return "Described(%r, %r)" % (self.descriptor, self.value)

  def __eq__(self, o):
    if isinstance(o, Described):
      return self.descriptor == o.descriptor and self.value == o.value
    else:
      return False

  def __ne__(self, o):
    # Python 2 does not derive != from __eq__; without this, two equal
    # instances would still compare as "not equal" via identity.
    return not self.__eq__(o)


# Sentinel descriptor: Data.get_py_array uses it for arrays that carry no
# descriptor, and Data.put_py_array treats any other descriptor as "described".
UNDESCRIBED = Constant("UNDESCRIBED")

class Array(object):
  """
  An array value: a descriptor, an element type and the elements.

  Two instances compare equal when descriptor, type and elements all
  compare equal; comparison with any other type is unequal.
  """

  def __init__(self, descriptor, type, *elements):
    """
    @param descriptor: the array descriptor
    @param type: the type of the array elements
    @param elements: the array elements (stored as a tuple)
    """
    self.descriptor = descriptor
    self.type = type
    self.elements = elements

  def __repr__(self):
    if self.elements:
      els = ", %s" % (", ".join(map(repr, self.elements)))
    else:
      els = ""
    return "Array(%r, %r%s)" % (self.descriptor, self.type, els)

  def __eq__(self, o):
    if isinstance(o, Array):
      return self.descriptor == o.descriptor and \
          self.type == o.type and self.elements == o.elements
    else:
      return False

  def __ne__(self, o):
    # Python 2 does not derive != from __eq__; without this, two equal
    # instances would still compare as "not equal" via identity.
    return not self.__eq__(o)

1280
class Data:
  """
  The L{Data} class provides an interface for decoding, extracting,
  creating, and encoding arbitrary AMQP data. A L{Data} object
  contains a tree of AMQP values. Leaf nodes in this tree correspond
  to scalars in the AMQP type system such as L{ints<INT>} or
  L{strings<STRING>}. Non-leaf nodes in this tree correspond to
  compound values in the AMQP type system such as L{lists<LIST>},
  L{maps<MAP>}, L{arrays<ARRAY>}, or L{described values<DESCRIBED>}.
  The root node of the tree is the L{Data} object itself and can have
  an arbitrary number of children.

  A L{Data} object maintains the notion of the current sibling node
  and a current parent node. Siblings are ordered within their parent.
  Values are accessed and/or added by using the L{next}, L{prev},
  L{enter}, and L{exit} methods to navigate to the desired location in
  the tree and using the supplied variety of put_*/get_* methods to
  access or add a value of the desired type.

  The put_* methods will always add a value I{after} the current node
  in the tree. If the current node has a next sibling the put_* method
  will overwrite the value on this node. If there is no current node
  or the current node has no next sibling then one will be added. The
  put_* methods always set the added/modified node to the current
  node. The get_* methods read the value of the current node and do
  not change which node is current.

  The following types of scalar values are supported:

   - L{NULL}
   - L{BOOL}
   - L{UBYTE}
   - L{BYTE}
   - L{USHORT}
   - L{SHORT}
   - L{UINT}
   - L{INT}
   - L{CHAR}
   - L{ULONG}
   - L{LONG}
   - L{TIMESTAMP}
   - L{FLOAT}
   - L{DOUBLE}
   - L{DECIMAL32}
   - L{DECIMAL64}
   - L{DECIMAL128}
   - L{UUID}
   - L{BINARY}
   - L{STRING}
   - L{SYMBOL}

  The following types of compound values are supported:

   - L{DESCRIBED}
   - L{ARRAY}
   - L{LIST}
   - L{MAP}
  """

  NULL = PN_NULL; "A null value."
  BOOL = PN_BOOL; "A boolean value."
  UBYTE = PN_UBYTE; "An unsigned byte value."
  BYTE = PN_BYTE; "A signed byte value."
  USHORT = PN_USHORT; "An unsigned short value."
  SHORT = PN_SHORT; "A short value."
  UINT = PN_UINT; "An unsigned int value."
  INT = PN_INT; "A signed int value."
  CHAR = PN_CHAR; "A character value."
  ULONG = PN_ULONG; "An unsigned long value."
  LONG = PN_LONG; "A signed long value."
  TIMESTAMP = PN_TIMESTAMP; "A timestamp value."
  FLOAT = PN_FLOAT; "A float value."
  DOUBLE = PN_DOUBLE; "A double value."
  DECIMAL32 = PN_DECIMAL32; "A DECIMAL32 value."
  DECIMAL64 = PN_DECIMAL64; "A DECIMAL64 value."
  DECIMAL128 = PN_DECIMAL128; "A DECIMAL128 value."
  UUID = PN_UUID; "A UUID value."
  BINARY = PN_BINARY; "A binary string."
  STRING = PN_STRING; "A unicode string."
  SYMBOL = PN_SYMBOL; "A symbolic string."
  DESCRIBED = PN_DESCRIBED; "A described value."
  ARRAY = PN_ARRAY; "An array value."
  LIST = PN_LIST; "A list value."
  MAP = PN_MAP; "A map value."

  type_names = {
    NULL: "null",
    BOOL: "bool",
    BYTE: "byte",
    UBYTE: "ubyte",
    SHORT: "short",
    USHORT: "ushort",
    INT: "int",
    UINT: "uint",
    CHAR: "char",
    LONG: "long",
    ULONG: "ulong",
    TIMESTAMP: "timestamp",
    FLOAT: "float",
    DOUBLE: "double",
    DECIMAL32: "decimal32",
    DECIMAL64: "decimal64",
    DECIMAL128: "decimal128",
    UUID: "uuid",
    BINARY: "binary",
    STRING: "string",
    SYMBOL: "symbol",
    DESCRIBED: "described",
    ARRAY: "array",
    LIST: "list",
    MAP: "map"
    }

  @classmethod
  def type_name(cls, type):
    """
    Returns the name of the given type code as listed in L{type_names}.

    @param type: one of the type constants defined on this class
    @raise KeyError: if the type code has no entry in L{type_names}
    """
    # A classmethod always receives the class first; the previous
    # one-argument signature made Data.type_name(Data.INT) a TypeError.
    return cls.type_names[type]

  def __init__(self, capacity=16):
    """
    @param capacity: either an integral initial capacity, in which case a
                     new pn_data object is created and later freed by this
                     instance, or an existing pn_data handle, which is
                     wrapped without taking ownership of it
    """
    if type(capacity) in (int, long):
      self._data = pn_data(capacity)
      self._free = True
    else:
      self._data = capacity
      self._free = False

  def __del__(self):
    """
    Frees the underlying pn_data object if this instance created it.
    """
    # getattr guards against an instance whose __init__ failed before
    # _free was assigned.
    if getattr(self, "_free", False) and hasattr(self, "_data"):
      pn_data_free(self._data)
      del self._data

  def _check(self, err):
    """
    Returns err unchanged if it is non-negative. Otherwise raises the
    exception class registered for the code in EXCEPTIONS (falling back
    to L{DataException}) with the error text of the data object.
    """
    if err < 0:
      exc = EXCEPTIONS.get(err, DataException)
      raise exc("[%s]: %s" % (err, pn_error_text(pn_data_error(self._data))))
    else:
      return err

  def clear(self):
    """
    Clears the data object.
    """
    pn_data_clear(self._data)

  def rewind(self):
    """
    Clears current node and sets the parent to the root node.  Clearing the
    current node sets it _before_ the first node, calling next() will advance to
    the first node.
    """
    pn_data_rewind(self._data)

  def next(self):
    """
    Advances the current node to its next sibling and returns its
    type. If there is no next sibling the current node remains
    unchanged and None is returned.
    """
    found = pn_data_next(self._data)
    if found:
      return self.type()
    else:
      return None

  def prev(self):
    """
    Moves the current node back to its previous sibling and returns its
    type. If there is no previous sibling the current node remains
    unchanged and None is returned.
    """
    found = pn_data_prev(self._data)
    if found:
      return self.type()
    else:
      return None

  def enter(self):
    """
    Sets the parent node to the current node and clears the current node.
    Clearing the current node sets it _before_ the first child,
    calling next() advances to the first child.
    """
    return pn_data_enter(self._data)

  def exit(self):
    """
    Sets the current node to the parent node and the parent node to
    its own parent.
    """
    return pn_data_exit(self._data)

  def lookup(self, name):
    """
    Returns the result of pn_data_lookup for the given name.
    """
    return pn_data_lookup(self._data, name)

  def narrow(self):
    """
    Invokes pn_data_narrow on the underlying data object.
    """
    pn_data_narrow(self._data)

  def widen(self):
    """
    Invokes pn_data_widen on the underlying data object.
    """
    pn_data_widen(self._data)

  def type(self):
    """
    Returns the type of the current node, or None if pn_data_type
    reports -1.
    """
    dtype = pn_data_type(self._data)
    if dtype == -1:
      return None
    else:
      return dtype

  def encode(self):
    """
    Returns a representation of the data encoded in AMQP format.
    """
    size = 1024
    while True:
      cd, enc = pn_data_encode(self._data, size)
      if cd == PN_OVERFLOW:
        # buffer too small: double it and retry
        size *= 2
      elif cd >= 0:
        return enc
      else:
        self._check(cd)

  def decode(self, encoded):
    """
    Decodes the first value from supplied AMQP data and returns the
    number of bytes consumed.

    @type encoded: binary
    @param encoded: AMQP encoded binary data
    """
    return self._check(pn_data_decode(self._data, encoded))

  def put_list(self):
    """
    Puts a list value. Elements may be filled by entering the list
    node and putting element values.

      >>> data = Data()
      >>> data.put_list()
      >>> data.enter()
      >>> data.put_int(1)
      >>> data.put_int(2)
      >>> data.put_int(3)
      >>> data.exit()
    """
    self._check(pn_data_put_list(self._data))

  def put_map(self):
    """
    Puts a map value. Elements may be filled by entering the map node
    and putting alternating key value pairs.

      >>> data = Data()
      >>> data.put_map()
      >>> data.enter()
      >>> data.put_string("key")
      >>> data.put_string("value")
      >>> data.exit()
    """
    self._check(pn_data_put_map(self._data))

  def put_array(self, described, element_type):
    """
    Puts an array value. Elements may be filled by entering the array
    node and putting the element values. The values must all be of the
    specified array element type. If an array is described then the
    first child value of the array is the descriptor and may be of any
    type.

      >>> data = Data()
      >>>
      >>> data.put_array(False, Data.INT)
      >>> data.enter()
      >>> data.put_int(1)
      >>> data.put_int(2)
      >>> data.put_int(3)
      >>> data.exit()
      >>>
      >>> data.put_array(True, Data.DOUBLE)
      >>> data.enter()
      >>> data.put_symbol("array-descriptor")
      >>> data.put_double(1.1)
      >>> data.put_double(1.2)
      >>> data.put_double(1.3)
      >>> data.exit()

    @type described: bool
    @param described: specifies whether the array is described
    @type element_type: int
    @param element_type: the type of the array elements
    """
    self._check(pn_data_put_array(self._data, described, element_type))

  def put_described(self):
    """
    Puts a described value. A described node has two children, the
    descriptor and the value. These are specified by entering the node
    and putting the desired values.

      >>> data = Data()
      >>> data.put_described()
      >>> data.enter()
      >>> data.put_symbol("value-descriptor")
      >>> data.put_string("the value")
      >>> data.exit()
    """
    self._check(pn_data_put_described(self._data))

  def put_null(self):
    """
    Puts a null value.
    """
    self._check(pn_data_put_null(self._data))

  def put_bool(self, b):
    """
    Puts a boolean value.

    @param b: a boolean value
    """
    self._check(pn_data_put_bool(self._data, b))

  def put_ubyte(self, ub):
    """
    Puts an unsigned byte value.

    @param ub: an integral value
    """
    self._check(pn_data_put_ubyte(self._data, ub))

  def put_byte(self, b):
    """
    Puts a signed byte value.

    @param b: an integral value
    """
    self._check(pn_data_put_byte(self._data, b))

  def put_ushort(self, us):
    """
    Puts an unsigned short value.

    @param us: an integral value.
    """
    self._check(pn_data_put_ushort(self._data, us))

  def put_short(self, s):
    """
    Puts a signed short value.

    @param s: an integral value
    """
    self._check(pn_data_put_short(self._data, s))

  def put_uint(self, ui):
    """
    Puts an unsigned int value.

    @param ui: an integral value
    """
    self._check(pn_data_put_uint(self._data, ui))

  def put_int(self, i):
    """
    Puts a signed int value.

    @param i: an integral value
    """
    self._check(pn_data_put_int(self._data, i))

  def put_char(self, c):
    """
    Puts a char value.

    @param c: a single character
    """
    self._check(pn_data_put_char(self._data, ord(c)))

  def put_ulong(self, ul):
    """
    Puts an unsigned long value.

    @param ul: an integral value
    """
    self._check(pn_data_put_ulong(self._data, ul))

  def put_long(self, l):
    """
    Puts a signed long value.

    @param l: an integral value
    """
    self._check(pn_data_put_long(self._data, l))

  def put_timestamp(self, t):
    """
    Puts a timestamp value.

    @param t: an integral value
    """
    self._check(pn_data_put_timestamp(self._data, t))

  def put_float(self, f):
    """
    Puts a float value.

    @param f: a floating point value
    """
    self._check(pn_data_put_float(self._data, f))

  def put_double(self, d):
    """
    Puts a double value.

    @param d: a floating point value.
    """
    self._check(pn_data_put_double(self._data, d))

  def put_decimal32(self, d):
    """
    Puts a decimal32 value.

    @param d: a decimal32 value
    """
    self._check(pn_data_put_decimal32(self._data, d))

  def put_decimal64(self, d):
    """
    Puts a decimal64 value.

    @param d: a decimal64 value
    """
    self._check(pn_data_put_decimal64(self._data, d))

  def put_decimal128(self, d):
    """
    Puts a decimal128 value.

    @param d: a decimal128 value
    """
    self._check(pn_data_put_decimal128(self._data, d))

  def put_uuid(self, u):
    """
    Puts a UUID value.

    @param u: a uuid value
    """
    self._check(pn_data_put_uuid(self._data, u.bytes))

  def put_binary(self, b):
    """
    Puts a binary value.

    @type b: binary
    @param b: a binary value
    """
    self._check(pn_data_put_binary(self._data, b))

  def put_string(self, s):
    """
    Puts a unicode value.

    @type s: unicode
    @param s: a unicode value
    """
    self._check(pn_data_put_string(self._data, s.encode("utf8")))

  def put_symbol(self, s):
    """
    Puts a symbolic value.

    @type s: string
    @param s: the symbol name
    """
    self._check(pn_data_put_symbol(self._data, s))

  def get_list(self):
    """
    If the current node is a list, return the number of elements,
    otherwise return zero. List elements can be accessed by entering
    the list.

      >>> count = data.get_list()
      >>> data.enter()
      >>> for i in range(count):
      ...   type = data.next()
      ...   if type == Data.STRING:
      ...     print data.get_string()
      ...   elif type == ...:
      ...     ...
      >>> data.exit()
    """
    return pn_data_get_list(self._data)

  def get_map(self):
    """
    If the current node is a map, return the number of child elements,
    otherwise return zero. Key value pairs can be accessed by entering
    the map.

      >>> count = data.get_map()
      >>> data.enter()
      >>> for i in range(count/2):
      ...   type = data.next()
      ...   if type == Data.STRING:
      ...     print data.get_string()
      ...   elif type == ...:
      ...     ...
      >>> data.exit()
    """
    return pn_data_get_map(self._data)

  def get_array(self):
    """
    If the current node is an array, return a tuple of the element
    count, a boolean indicating whether the array is described, and
    the type of each element, otherwise return (0, False, None). Array
    data can be accessed by entering the array.

      >>> # read an array of strings with a symbolic descriptor
      >>> count, described, type = data.get_array()
      >>> data.enter()
      >>> data.next()
      >>> print "Descriptor:", data.get_symbol()
      >>> for i in range(count):
      ...    data.next()
      ...    print "Element:", data.get_string()
      >>> data.exit()
    """
    count = pn_data_get_array(self._data)
    described = pn_data_is_array_described(self._data)
    type = pn_data_get_array_type(self._data)
    if type == -1:
      type = None
    return count, described, type

  def is_described(self):
    """
    Checks if the current node is a described value. The descriptor
    and value may be accessed by entering the described value.

      >>> # read a symbolically described string
      >>> assert data.is_described() # will error if the current node is not described
      >>> data.enter()
      >>> print data.get_symbol()
      >>> print data.get_string()
      >>> data.exit()
    """
    return pn_data_is_described(self._data)

  def is_null(self):
    """
    Checks if the current node is a null.
    """
    return pn_data_is_null(self._data)

  def get_bool(self):
    """
    If the current node is a boolean, returns its value, returns False
    otherwise.
    """
    return pn_data_get_bool(self._data)

  def get_ubyte(self):
    """
    If the current node is an unsigned byte, returns its value,
    returns 0 otherwise.
    """
    return pn_data_get_ubyte(self._data)

  def get_byte(self):
    """
    If the current node is a signed byte, returns its value, returns 0
    otherwise.
    """
    return pn_data_get_byte(self._data)

  def get_ushort(self):
    """
    If the current node is an unsigned short, returns its value,
    returns 0 otherwise.
    """
    return pn_data_get_ushort(self._data)

  def get_short(self):
    """
    If the current node is a signed short, returns its value, returns
    0 otherwise.
    """
    return pn_data_get_short(self._data)

  def get_uint(self):
    """
    If the current node is an unsigned int, returns its value, returns
    0 otherwise.
    """
    return pn_data_get_uint(self._data)

  def get_int(self):
    """
    If the current node is a signed int, returns its value, returns 0
    otherwise.
    """
    return pn_data_get_int(self._data)

  def get_char(self):
    """
    If the current node is a char, returns its value, returns 0
    otherwise.
    """
    return char(unichr(pn_data_get_char(self._data)))

  def get_ulong(self):
    """
    If the current node is an unsigned long, returns its value,
    returns 0 otherwise.
    """
    return ulong(pn_data_get_ulong(self._data))

  def get_long(self):
    """
    If the current node is a signed long, returns its value, returns
    0 otherwise.
    """
    return pn_data_get_long(self._data)

  def get_timestamp(self):
    """
    If the current node is a timestamp, returns its value, returns 0
    otherwise.
    """
    return timestamp(pn_data_get_timestamp(self._data))

  def get_float(self):
    """
    If the current node is a float, returns its value, returns 0
    otherwise.
    """
    return pn_data_get_float(self._data)

  def get_double(self):
    """
    If the current node is a double, returns its value, returns 0
    otherwise.
    """
    return pn_data_get_double(self._data)

  # XXX: need to convert
  def get_decimal32(self):
    """
    If the current node is a decimal32, returns its value, returns 0
    otherwise.
    """
    return pn_data_get_decimal32(self._data)

  # XXX: need to convert
  def get_decimal64(self):
    """
    If the current node is a decimal64, returns its value, returns 0
    otherwise.
    """
    return pn_data_get_decimal64(self._data)

  # XXX: need to convert
  def get_decimal128(self):
    """
    If the current node is a decimal128, returns its value, returns 0
    otherwise.
    """
    return pn_data_get_decimal128(self._data)

  def get_uuid(self):
    """
    If the current node is a UUID, returns its value, returns None
    otherwise.
    """
    if pn_data_type(self._data) == Data.UUID:
      return uuid.UUID(bytes=pn_data_get_uuid(self._data))
    else:
      return None

  def get_binary(self):
    """
    If the current node is binary, returns its value, returns ""
    otherwise.
    """
    return pn_data_get_binary(self._data)

  def get_string(self):
    """
    If the current node is a string, returns its value, returns ""
    otherwise.
    """
    return pn_data_get_string(self._data).decode("utf8")

  def get_symbol(self):
    """
    If the current node is a symbol, returns its value, returns ""
    otherwise.
    """
    return symbol(pn_data_get_symbol(self._data))

  def copy(self, src):
    """
    Invokes pn_data_copy with this object's data and the data of src,
    raising if a negative status is reported.

    @param src: another L{Data} object
    """
    self._check(pn_data_copy(self._data, src._data))

  def format(self):
    """
    Returns the result of pn_data_format. The call starts with a size
    of 16 and is retried with a doubled size while PN_OVERFLOW is
    reported.
    """
    sz = 16
    while True:
      err, result = pn_data_format(self._data, sz)
      if err == PN_OVERFLOW:
        sz *= 2
        continue
      else:
        self._check(err)
        return result

  def dump(self):
    """
    Invokes pn_data_dump on the underlying data object.
    """
    pn_data_dump(self._data)

  def put_dict(self, d):
    """
    Puts a map value and fills it with the keys and values of d, each
    written via L{put_object}.

    @param d: a dictionary
    """
    self.put_map()
    self.enter()
    try:
      for k, v in d.items():
        self.put_object(k)
        self.put_object(v)
    finally:
      self.exit()

  def get_dict(self):
    """
    Enters the current node and reads its children as alternating keys
    and values into a dictionary, each read via L{get_object}. A
    trailing key with no value is mapped to None. Returns None if the
    current node cannot be entered.
    """
    if self.enter():
      try:
        result = {}
        while self.next():
          k = self.get_object()
          if self.next():
            v = self.get_object()
          else:
            v = None
          result[k] = v
      finally:
        self.exit()
      return result
    return None

  def put_sequence(self, s):
    """
    Puts a list value and fills it with the items of s, each written
    via L{put_object}.

    @param s: an iterable of values
    """
    self.put_list()
    self.enter()
    try:
      for o in s:
        self.put_object(o)
    finally:
      self.exit()

  def get_sequence(self):
    """
    Enters the current node and returns its children as a python list,
    each read via L{get_object}. Returns None if the current node
    cannot be entered.
    """
    if self.enter():
      try:
        result = []
        while self.next():
          result.append(self.get_object())
      finally:
        self.exit()
      return result
    return None

  def get_py_described(self):
    """
    Enters the current node and returns a L{Described} built from its
    first two children (descriptor, then value). Returns None if the
    current node cannot be entered.
    """
    if self.enter():
      try:
        self.next()
        descriptor = self.get_object()
        self.next()
        value = self.get_object()
      finally:
        self.exit()
      return Described(descriptor, value)
    return None

  def put_py_described(self, d):
    """
    Puts a described value whose descriptor and value are taken from d
    and written via L{put_object}.

    @param d: a L{Described} instance
    """
    self.put_described()
    self.enter()
    try:
      self.put_object(d.descriptor)
      self.put_object(d.value)
    finally:
      self.exit()

  def get_py_array(self):
    """
    If the current node is an array, return an Array object
    representing the array and its contents. Otherwise return None.
    This is a convenience wrapper around get_array, enter, etc.
    """

    count, described, type = self.get_array()
    if type is None: return None
    if self.enter():
      try:
        if described:
          self.next()
          descriptor = self.get_object()
        else:
          descriptor = UNDESCRIBED
        elements = []
        while self.next():
          elements.append(self.get_object())
      finally:
        self.exit()
      return Array(descriptor, type, *elements)
    return None

  def put_py_array(self, a):
    """
    Puts an array value taken from a. The array is treated as described
    unless its descriptor is UNDESCRIBED; the descriptor (if any) and
    the elements are written via L{put_object}.

    @param a: an L{Array} instance
    """
    described = a.descriptor != UNDESCRIBED
    self.put_array(described, a.type)
    self.enter()
    try:
      if described:
        self.put_object(a.descriptor)
      for e in a.elements:
        self.put_object(e)
    finally:
      self.exit()

  # Dispatch table used by put_object: exact python class -> put method.
  put_mappings = {
    None.__class__: lambda s, _: s.put_null(),
    bool: put_bool,
    dict: put_dict,
    list: put_sequence,
    tuple: put_sequence,
    unicode: put_string,
    bytes: put_binary,
    symbol: put_symbol,
    int: put_long,
    char: put_char,
    long: put_long,
    ulong: put_ulong,
    timestamp: put_timestamp,
    float: put_double,
    uuid.UUID: put_uuid,
    Described: put_py_described,
    Array: put_py_array
    }
  # Dispatch table used by get_object: node type code -> get method.
  get_mappings = {
    NULL: lambda s: None,
    BOOL: get_bool,
    BYTE: get_byte,
    UBYTE: get_ubyte,
    SHORT: get_short,
    USHORT: get_ushort,
    INT: get_int,
    UINT: get_uint,
    CHAR: get_char,
    LONG: get_long,
    ULONG: get_ulong,
    TIMESTAMP: get_timestamp,
    FLOAT: get_float,
    DOUBLE: get_double,
    DECIMAL32: get_decimal32,
    DECIMAL64: get_decimal64,
    DECIMAL128: get_decimal128,
    UUID: get_uuid,
    BINARY: get_binary,
    STRING: get_string,
    SYMBOL: get_symbol,
    DESCRIBED: get_py_described,
    ARRAY: get_py_array,
    LIST: get_sequence,
    MAP: get_dict
    }


  def put_object(self, obj):
    """
    Puts a python object using the put method registered for its exact
    class in L{put_mappings}.

    @param obj: the value to put
    @raise KeyError: if the class of obj has no entry in L{put_mappings}
    """
    putter = self.put_mappings[obj.__class__]
    putter(self, obj)

  def get_object(self):
    """
    Returns the current node as a python object using the get method
    registered for its type in L{get_mappings}. Returns None if there
    is no current node type, and an L{UnmappedType} if no get method is
    registered for the type.
    """
    type = self.type()
    if type is None: return None
    getter = self.get_mappings.get(type)
    if getter:
      return getter(self)
    else:
      return UnmappedType(str(type))

2153
class ConnectionException(ProtonException):
  """A ProtonException subclass for connection related errors."""

2156
class Endpoint(object):
  """
  Base class for objects that carry a local condition and expose a remote
  one. Subclasses supply the underlying condition handles through
  _get_cond_impl and _get_remote_cond_impl.
  """

  # Endpoint state flags, re-exported from the underlying library.
  LOCAL_UNINIT = PN_LOCAL_UNINIT
  REMOTE_UNINIT = PN_REMOTE_UNINIT
  LOCAL_ACTIVE = PN_LOCAL_ACTIVE
  REMOTE_ACTIVE = PN_REMOTE_ACTIVE
  LOCAL_CLOSED = PN_LOCAL_CLOSED
  REMOTE_CLOSED = PN_REMOTE_CLOSED

  def __init__(self):
    """Start with no local condition set."""
    self.condition = None

  def _update_cond(self):
    """Write self.condition into the local condition handle via obj2cond."""
    local = self.condition
    target = self._get_cond_impl()
    obj2cond(local, target)

  @property
  def remote_condition(self):
    """The remote condition handle converted with cond2obj."""
    remote = self._get_remote_cond_impl()
    return cond2obj(remote)

  # the following must be provided by subclasses
  def _get_cond_impl(self):
    """Return the local condition handle (subclass responsibility)."""
    assert False, "Subclass must override this!"

  def _get_remote_cond_impl(self):
    """Return the remote condition handle (subclass responsibility)."""
    assert False, "Subclass must override this!"

2182
class Condition:
  """
  A named condition with an optional description and info value.

  Two conditions compare equal when their name, description and info are
  all equal.
  """

  def __init__(self, name, description=None, info=None):
    self.name = name
    self.description = description
    self.info = info

  def __repr__(self):
    # Falsy fields (None, "", empty containers) are omitted from the output.
    fields = (self.name, self.description, self.info)
    return "Condition(%s)" % ", ".join([repr(x) for x in fields if x])

  def __eq__(self, o):
    if not isinstance(o, Condition): return False
    return self.name == o.name and \
        self.description == o.description and \
        self.info == o.info

  def __ne__(self, o):
    # Python 2 does not derive != from __eq__; without this, two equal
    # conditions would also compare "not equal".
    return not self.__eq__(o)
2200
def obj2cond(obj, cond):
  """
  Copy a python condition object into a C condition.

  The C condition is always cleared first.  When C{obj} is truthy its name
  (as a str) and description are set, and its info, if truthy, is encoded
  into the condition's info data.
  """
  pn_condition_clear(cond)
  if not obj:
    return
  pn_condition_set_name(cond, str(obj.name))
  pn_condition_set_description(cond, obj.description)
  info_data = Data(pn_condition_info(cond))
  if obj.info:
    info_data.put_object(obj.info)
2209
def cond2obj(cond):
  """
  Convert a C condition into a L{Condition}, or None when it is not set.
  """
  if not pn_condition_is_set(cond):
    return None
  return Condition(pn_condition_get_name(cond),
                   pn_condition_get_description(cond),
                   dat2obj(pn_condition_info(cond)))
2217
def dat2obj(dimpl):
  """
  Decode the first value held by a C data object into a python object.

  The data is rewound before reading and again afterwards.
  """
  data = Data(dimpl)
  data.rewind()
  data.next()
  result = data.get_object()
  data.rewind()
  return result
2225
def obj2dat(obj, dimpl):
  """
  Encode a python object into a C data object; does nothing for None.
  """
  if obj is None:
    return
  Data(dimpl).put_object(obj)
2230
2231 -class Connection(Endpoint):
2232 2233 @staticmethod
2234 - def _wrap_connection(c_conn):
2235 """Maintain only a single instance of this class for each Connection 2236 object that exists in the the C Engine. This is done by storing a (weak) 2237 reference to the python instance in the context field of the C object. 2238 """ 2239 if not c_conn: return None 2240 py_conn = pn_connection_get_context(c_conn) 2241 if py_conn: return py_conn 2242 wrapper = Connection(_conn=c_conn) 2243 return wrapper
2244
def __init__(self, _conn=None):
  """
  Wrap an existing C connection, or allocate a new one when none is given.

  The new instance is stored in the C connection's context, and the set
  used to track child sessions is created.
  """
  Endpoint.__init__(self)
  self._conn = _conn if _conn else pn_connection()
  pn_connection_set_context(self._conn, self)
  self.offered_capabilities = None
  self.desired_capabilities = None
  self.properties = None
  self._sessions = set()
2256
2257 - def __del__(self):
2258 if hasattr(self, "_conn") and self._conn: 2259 # pn_connection_free will release all child sessions in the C Engine, so 2260 # free all child python Sessions to avoid dangling references 2261 if hasattr(self, "_sessions") and self._sessions: 2262 for s in self._sessions: 2263 s._release() 2264 pn_connection_set_context(self._conn, None) 2265 pn_connection_free(self._conn)
2266
2267 - def _check(self, err):
2268 if err < 0: 2269 exc = EXCEPTIONS.get(err, ConnectionException) 2270 raise exc("[%s]: %s" % (err, pn_connection_error(self._conn))) 2271 else: 2272 return err
2273
def _get_cond_impl(self):
  """Return the C condition object for the local end of this connection."""
  cond = pn_connection_condition(self._conn)
  return cond
2276
def _get_remote_cond_impl(self):
  """Return the C condition object for the remote end of this connection."""
  cond = pn_connection_remote_condition(self._conn)
  return cond
2279
def collect(self, collector):
  """
  Attach a collector to this connection, or detach with None.

  The collector object is also stored on the instance so it stays alive.
  """
  impl = None if collector is None else collector._impl
  pn_connection_collect(self._conn, impl)
  # XXX: we can't let coll go out of scope or the connection will be
  # pointing to garbage
  self._collector = collector
2288
def _get_container(self):
  """Getter for the C{container} property."""
  name = pn_connection_get_container(self._conn)
  return name
def _set_container(self, name):
  """Setter for the C{container} property; returns the C call's result."""
  result = pn_connection_set_container(self._conn, name)
  return result
2293 2294 container = property(_get_container, _set_container) 2295
def _get_hostname(self):
  """Getter for the C{hostname} property."""
  name = pn_connection_get_hostname(self._conn)
  return name
def _set_hostname(self, name):
  """Setter for the C{hostname} property; returns the C call's result."""
  result = pn_connection_set_hostname(self._conn, name)
  return result
2300 2301 hostname = property(_get_hostname, _set_hostname) 2302 2303 @property
def remote_container(self):
  """The value reported by pn_connection_remote_container."""
  name = pn_connection_remote_container(self._conn)
  return name
2306 2307 @property
def remote_hostname(self):
  """The value reported by pn_connection_remote_hostname."""
  name = pn_connection_remote_hostname(self._conn)
  return name
2310 2311 @property
2313 return dat2obj(pn_connection_remote_offered_capabilities(self._conn))
2314 2315 @property
2317 return dat2obj(pn_connection_remote_desired_capabilities(self._conn))
2318 2319 @property
def remote_properties(self):
  """The remote properties data decoded into a python object via dat2obj."""
  c_data = pn_connection_remote_properties(self._conn)
  return dat2obj(c_data)
2322
def open(self):
  """
  Open the connection.

  Any offered capabilities, desired capabilities and properties set on
  this object (non-None) are encoded into the C connection first.
  """
  conn = self._conn
  obj2dat(self.offered_capabilities, pn_connection_offered_capabilities(conn))
  obj2dat(self.desired_capabilities, pn_connection_desired_capabilities(conn))
  obj2dat(self.properties, pn_connection_properties(conn))
  pn_connection_open(conn)
2330
def close(self):
  """Copy the local condition into the C connection, then close it."""
  self._update_cond()
  pn_connection_close(self._conn)
2334 2335 @property
def state(self):
  """The value reported by pn_connection_state for this connection."""
  current = pn_connection_state(self._conn)
  return current
2338
def session(self):
  """Create a new session on this connection and return its python wrapper."""
  c_ssn = pn_session(self._conn)
  return Session._wrap_session(c_ssn)
2341
def session_head(self, mask):
  """Return the wrapper for pn_session_head(conn, mask), or None."""
  c_ssn = pn_session_head(self._conn, mask)
  return Session._wrap_session(c_ssn)
2344 2347 2348 @property
def work_head(self):
  """The wrapper for the delivery returned by pn_work_head, or None."""
  c_dlv = pn_work_head(self._conn)
  return Delivery._wrap_delivery(c_dlv)
2351 2352 @property
def error(self):
  """The error code extracted from the connection's error object."""
  err_obj = pn_connection_error(self._conn)
  return pn_error_code(err_obj)
2355
class SessionException(ProtonException):
  """Exception type for session level failures."""
2358
class Session(Endpoint):
  """Python wrapper around a C Engine session."""

  @staticmethod
  def _wrap_session(c_ssn):
    """
    Return the single python Session wrapper for a C Engine session.

    None is passed through; an instance already stored in the C object's
    context is reused, otherwise a new Session is created.
    """
    if c_ssn is None:
      return None
    existing = pn_session_get_context(c_ssn)
    if existing:
      return existing
    return Session(c_ssn)

  def __init__(self, ssn):
    Endpoint.__init__(self)
    self._ssn = ssn
    pn_session_set_context(self._ssn, self)
    # child links, released together with this session
    self._links = set()
    # register with the parent so Connection.__del__ can release us
    self.connection._sessions.add(self)

  def _release(self):
    """Release the underlying C Engine resource."""
    if not self._ssn:
      return
    # pn_session_free will release all child links in the C Engine, so free
    # all child python Links to avoid dangling references
    for link in self._links:
      link._release()
    pn_session_set_context(self._ssn, None)
    pn_session_free(self._ssn)
    self._ssn = None

  def free(self):
    """Release the Session, freeing its resources.

    Call this when you no longer need the session.  This will allow the
    session's resources to be reclaimed.  Once called, you should no longer
    reference the session.

    """
    parent = self.connection
    parent._sessions.remove(self)
    self._release()

  def _get_cond_impl(self):
    """Return the C condition object for the local end of this session."""
    return pn_session_condition(self._ssn)

  def _get_remote_cond_impl(self):
    """Return the C condition object for the remote end of this session."""
    return pn_session_remote_condition(self._ssn)

  def _get_incoming_capacity(self):
    return pn_session_get_incoming_capacity(self._ssn)

  def _set_incoming_capacity(self, capacity):
    pn_session_set_incoming_capacity(self._ssn, capacity)

  incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity)

  @property
  def outgoing_bytes(self):
    """The value reported by pn_session_outgoing_bytes."""
    return pn_session_outgoing_bytes(self._ssn)

  @property
  def incoming_bytes(self):
    """The value reported by pn_session_incoming_bytes."""
    return pn_session_incoming_bytes(self._ssn)

  def open(self):
    """Open the session."""
    pn_session_open(self._ssn)

  def close(self):
    """Copy the local condition into the C session, then close it."""
    self._update_cond()
    pn_session_close(self._ssn)

  def next(self, mask):
    """Return the wrapper for pn_session_next(ssn, mask), or None."""
    c_next = pn_session_next(self._ssn, mask)
    return Session._wrap_session(c_next)

  @property
  def state(self):
    """The value reported by pn_session_state."""
    return pn_session_state(self._ssn)

  @property
  def connection(self):
    """The python Connection wrapper that owns this session."""
    c_conn = pn_session_connection(self._ssn)
    return Connection._wrap_connection(c_conn)

  def sender(self, name):
    """Create a sending link with the given name on this session."""
    c_link = pn_sender(self._ssn, name)
    return Link._wrap_link(c_link)

  def receiver(self, name):
    """Create a receiving link with the given name on this session."""
    c_link = pn_receiver(self._ssn, name)
    return Link._wrap_link(c_link)
2446
class LinkException(ProtonException):
  """Default exception type raised by Terminus._check for unmapped errors."""
2449 2602
class Terminus(object):
  """Python wrapper around a C terminus object."""

  UNSPECIFIED = PN_UNSPECIFIED
  SOURCE = PN_SOURCE
  TARGET = PN_TARGET
  COORDINATOR = PN_COORDINATOR

  NONDURABLE = PN_NONDURABLE
  CONFIGURATION = PN_CONFIGURATION
  DELIVERIES = PN_DELIVERIES

  DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED
  DIST_MODE_COPY = PN_DIST_MODE_COPY
  DIST_MODE_MOVE = PN_DIST_MODE_MOVE

  def __init__(self, impl):
    self._impl = impl

  def _check(self, err):
    """
    Return C{err} unchanged when non-negative; otherwise raise the exception
    mapped in EXCEPTIONS (default L{LinkException}).
    """
    if err >= 0:
      return err
    exc = EXCEPTIONS.get(err, LinkException)
    raise exc("[%s]" % err)

  # Each property below pairs a pn_terminus_get_*/is_* call with the matching
  # pn_terminus_set_* call; setters route the result code through _check().

  def _get_type(self):
    return pn_terminus_get_type(self._impl)
  def _set_type(self, type):
    rc = pn_terminus_set_type(self._impl, type)
    self._check(rc)
  type = property(_get_type, _set_type)

  def _get_address(self):
    return pn_terminus_get_address(self._impl)
  def _set_address(self, address):
    rc = pn_terminus_set_address(self._impl, address)
    self._check(rc)
  address = property(_get_address, _set_address)

  def _get_durability(self):
    return pn_terminus_get_durability(self._impl)
  def _set_durability(self, seconds):
    # NOTE(review): the parameter is named "seconds" but is passed through
    # as the durability value -- confirm intended naming.
    rc = pn_terminus_set_durability(self._impl, seconds)
    self._check(rc)
  durability = property(_get_durability, _set_durability)

  def _get_expiry_policy(self):
    return pn_terminus_get_expiry_policy(self._impl)
  def _set_expiry_policy(self, seconds):
    # NOTE(review): the parameter is named "seconds" but is passed through
    # as the expiry policy value -- confirm intended naming.
    rc = pn_terminus_set_expiry_policy(self._impl, seconds)
    self._check(rc)
  expiry_policy = property(_get_expiry_policy, _set_expiry_policy)

  def _get_timeout(self):
    return pn_terminus_get_timeout(self._impl)
  def _set_timeout(self, seconds):
    rc = pn_terminus_set_timeout(self._impl, seconds)
    self._check(rc)
  timeout = property(_get_timeout, _set_timeout)

  def _is_dynamic(self):
    return pn_terminus_is_dynamic(self._impl)
  def _set_dynamic(self, dynamic):
    rc = pn_terminus_set_dynamic(self._impl, dynamic)
    self._check(rc)
  dynamic = property(_is_dynamic, _set_dynamic)

  def _get_distribution_mode(self):
    return pn_terminus_get_distribution_mode(self._impl)
  def _set_distribution_mode(self, mode):
    rc = pn_terminus_set_distribution_mode(self._impl, mode)
    self._check(rc)
  distribution_mode = property(_get_distribution_mode, _set_distribution_mode)

  @property
  def properties(self):
    """A Data wrapper over the terminus properties."""
    return Data(pn_terminus_properties(self._impl))

  @property
  def capabilities(self):
    """A Data wrapper over the terminus capabilities."""
    return Data(pn_terminus_capabilities(self._impl))

  @property
  def outcomes(self):
    """A Data wrapper over the terminus outcomes."""
    return Data(pn_terminus_outcomes(self._impl))

  @property
  def filter(self):
    """A Data wrapper over the terminus filter."""
    return Data(pn_terminus_filter(self._impl))

  def copy(self, src):
    """Copy the settings of another Terminus into this one."""
    rc = pn_terminus_copy(self._impl, src._impl)
    self._check(rc)
2688
class Sender(Link):
  """A Link used for sending."""

  def __init__(self, c_link):
    super(Sender, self).__init__(c_link)

  def offered(self, n):
    """Pass C{n} to pn_link_offered for this link."""
    pn_link_offered(self._link, n)

  def send(self, bytes):
    """
    Send bytes on the link.

    Returns the (non-negative) result of pn_link_send; a negative result is
    raised as an exception by _check.
    """
    rc = pn_link_send(self._link, bytes)
    return self._check(rc)
2699
class Receiver(Link):
  """A Link used for receiving."""

  def __init__(self, c_link):
    super(Receiver, self).__init__(c_link)

  def flow(self, n):
    """Pass C{n} to pn_link_flow for this link."""
    pn_link_flow(self._link, n)

  def recv(self, limit):
    """
    Read up to C{limit} bytes from the link.

    Returns None when the C call reports PN_EOS; any other negative code is
    raised by _check; otherwise the bytes read are returned.
    """
    rc, data = pn_link_recv(self._link, limit)
    if rc == PN_EOS:
      return None
    self._check(rc)
    return data

  def drain(self, n):
    """Pass C{n} to pn_link_drain for this link."""
    pn_link_drain(self._link, n)

  def draining(self):
    """Return the result of pn_link_draining for this link."""
    return pn_link_draining(self._link)
2721
class Disposition(object):
  """
  Wrapper around a C disposition.

  For a local disposition, data, annotations and condition are held on the
  python object and may be assigned; for a remote one they are read from
  the C object and are read-only.
  """

  RECEIVED = PN_RECEIVED
  ACCEPTED = PN_ACCEPTED
  REJECTED = PN_REJECTED
  RELEASED = PN_RELEASED
  MODIFIED = PN_MODIFIED

  def __init__(self, impl, local):
    self._impl = impl
    self.local = local
    self._data = None
    self._condition = None
    self._annotations = None

  @property
  def type(self):
    """The value reported by pn_disposition_type."""
    return pn_disposition_type(self._impl)

  def _get_section_number(self):
    return pn_disposition_get_section_number(self._impl)
  def _set_section_number(self, n):
    pn_disposition_set_section_number(self._impl, n)
  section_number = property(_get_section_number, _set_section_number)

  def _get_section_offset(self):
    return pn_disposition_get_section_offset(self._impl)
  def _set_section_offset(self, n):
    pn_disposition_set_section_offset(self._impl, n)
  section_offset = property(_get_section_offset, _set_section_offset)

  def _get_failed(self):
    return pn_disposition_is_failed(self._impl)
  def _set_failed(self, b):
    pn_disposition_set_failed(self._impl, b)
  failed = property(_get_failed, _set_failed)

  def _get_undeliverable(self):
    return pn_disposition_is_undeliverable(self._impl)
  def _set_undeliverable(self, b):
    pn_disposition_set_undeliverable(self._impl, b)
  undeliverable = property(_get_undeliverable, _set_undeliverable)

  def _get_data(self):
    if not self.local:
      return dat2obj(pn_disposition_data(self._impl))
    return self._data
  def _set_data(self, obj):
    if not self.local:
      raise AttributeError("data attribute is read-only")
    self._data = obj
  data = property(_get_data, _set_data)

  def _get_annotations(self):
    if not self.local:
      return dat2obj(pn_disposition_annotations(self._impl))
    return self._annotations
  def _set_annotations(self, obj):
    if not self.local:
      raise AttributeError("annotations attribute is read-only")
    self._annotations = obj
  annotations = property(_get_annotations, _set_annotations)

  def _get_condition(self):
    if not self.local:
      return cond2obj(pn_disposition_condition(self._impl))
    return self._condition
  def _set_condition(self, obj):
    if not self.local:
      raise AttributeError("condition attribute is read-only")
    self._condition = obj
  condition = property(_get_condition, _set_condition)
2800
class Delivery(object):
  """Python wrapper around a C Engine delivery."""

  RECEIVED = Disposition.RECEIVED
  ACCEPTED = Disposition.ACCEPTED
  REJECTED = Disposition.REJECTED
  RELEASED = Disposition.RELEASED
  MODIFIED = Disposition.MODIFIED

  @staticmethod
  def _wrap_delivery(c_dlv):
    """
    Return the single python Delivery wrapper for a C Engine delivery.

    A falsy C{c_dlv} yields None; an instance already stored in the C
    object's context is reused, otherwise a new Delivery is created.
    """
    if not c_dlv:
      return None
    existing = pn_delivery_get_context(c_dlv)
    if existing:
      return existing
    return Delivery(c_dlv)

  def __init__(self, dlv):
    self._dlv = dlv
    pn_delivery_set_context(self._dlv, self)
    self.local = Disposition(pn_delivery_local(self._dlv), True)
    self.remote = Disposition(pn_delivery_remote(self._dlv), False)
    # register with the owning link
    self.link._deliveries.add(self)

  def _release(self):
    """Release the underlying C Engine resource."""
    if not self._dlv:
      return
    pn_delivery_set_context(self._dlv, None)
    pn_delivery_settle(self._dlv)
    self._dlv = None

  @property
  def tag(self):
    """The value reported by pn_delivery_tag."""
    return pn_delivery_tag(self._dlv)

  @property
  def writable(self):
    """The value reported by pn_delivery_writable."""
    return pn_delivery_writable(self._dlv)

  @property
  def readable(self):
    """The value reported by pn_delivery_readable."""
    return pn_delivery_readable(self._dlv)

  @property
  def updated(self):
    """The value reported by pn_delivery_updated."""
    return pn_delivery_updated(self._dlv)

  def update(self, state):
    """
    Push the local disposition's data, annotations and condition into the C
    disposition, then update the delivery with the given state.
    """
    local = self.local
    obj2dat(local._data, pn_disposition_data(local._impl))
    obj2dat(local._annotations, pn_disposition_annotations(local._impl))
    obj2cond(local._condition, pn_disposition_condition(local._impl))
    pn_delivery_update(self._dlv, state)

  @property
  def pending(self):
    """The value reported by pn_delivery_pending."""
    return pn_delivery_pending(self._dlv)

  @property
  def partial(self):
    """The value reported by pn_delivery_partial."""
    return pn_delivery_partial(self._dlv)

  @property
  def local_state(self):
    """The value reported by pn_delivery_local_state."""
    return pn_delivery_local_state(self._dlv)

  @property
  def remote_state(self):
    """The value reported by pn_delivery_remote_state."""
    return pn_delivery_remote_state(self._dlv)

  @property
  def settled(self):
    """The value reported by pn_delivery_settled."""
    return pn_delivery_settled(self._dlv)

  def settle(self):
    """Release the delivery"""
    owner = self.link
    owner._deliveries.remove(self)
    self._release()

  @property
  def work_next(self):
    """The wrapper for the delivery returned by pn_work_next, or None."""
    c_next = pn_work_next(self._dlv)
    return Delivery._wrap_delivery(c_next)
2884 2885 @property
2888
class TransportException(ProtonException):
  """Default exception type raised by Transport._check for unmapped errors."""
2891
class Transport(object):
  """
  Python wrapper around a C transport.

  A Transport created without arguments owns a freshly allocated C
  transport and frees it on deletion.  A Transport created around an
  existing C transport is marked as shared and never frees it.
  """

  TRACE_DRV = PN_TRACE_DRV
  TRACE_FRM = PN_TRACE_FRM
  TRACE_RAW = PN_TRACE_RAW

  def __init__(self, _trans=None):
    if not _trans:
      self._trans = pn_transport()
    else:
      # the presence of this attribute tells __del__ not to free the C object
      self._shared_trans = True
      self._trans = _trans
    self._sasl = None
    self._ssl = None

  def __del__(self):
    if hasattr(self, "_trans"):
      if not hasattr(self, "_shared_trans"):
        pn_transport_free(self._trans)
        if hasattr(self, "_sasl") and self._sasl:
          # pn_transport_free deallocs the C sasl associated with the
          # transport, so erase the reference if a SASL object was used.
          self._sasl._sasl = None
          self._sasl = None
        if hasattr(self, "_ssl") and self._ssl:
          # ditto the owned c SSL object
          self._ssl._ssl = None
          self._ssl = None
      del self._trans

  def _check(self, err):
    """
    Return C{err} unchanged when non-negative; otherwise raise the exception
    mapped in EXCEPTIONS (default L{TransportException}) with the
    transport's error text.
    """
    if err < 0:
      exc = EXCEPTIONS.get(err, TransportException)
      raise exc("[%s]: %s" % (err, pn_error_text(pn_transport_error(self._trans))))
    else:
      return err

  def bind(self, connection):
    """Assign a connection to the transport"""
    self._check(pn_transport_bind(self._trans, connection._conn))
    # keep python connection from being garbage collected:
    self._connection = connection

  def unbind(self):
    """Release the connection"""
    self._check(pn_transport_unbind(self._trans))
    self._connection = None

  def trace(self, n):
    """Pass C{n} to pn_transport_trace (see the TRACE_* constants)."""
    pn_transport_trace(self._trans, n)

  def tick(self, now):
    """Process any timed events (like heartbeat generation).
    now = seconds since epoch (float).
    """
    next = pn_transport_tick(self._trans, long(now * 1000))
    return float(next) / 1000.0

  def capacity(self):
    """
    Return pn_transport_capacity for this transport.

    Values greater than or equal to PN_EOS are returned as-is; anything
    lower is raised by _check.
    """
    c = pn_transport_capacity(self._trans)
    if c >= PN_EOS:
      return c
    else:
      return self._check(c)

  def push(self, bytes):
    """Push bytes into the transport; a negative result raises."""
    self._check(pn_transport_push(self._trans, bytes))

  def close_tail(self):
    """Call pn_transport_close_tail; a negative result raises."""
    self._check(pn_transport_close_tail(self._trans))

  def pending(self):
    """
    Return pn_transport_pending for this transport.

    Values greater than or equal to PN_EOS are returned as-is; anything
    lower is raised by _check.
    """
    p = pn_transport_pending(self._trans)
    if p >= PN_EOS:
      return p
    else:
      return self._check(p)

  def peek(self, size):
    """
    Return up to C{size} pending output bytes without consuming them.

    Returns None when the C call reports PN_EOS; other negative codes raise.
    """
    cd, out = pn_transport_peek(self._trans, size)
    if cd == PN_EOS:
      return None
    else:
      self._check(cd)
      return out

  def pop(self, size):
    """Pass C{size} to pn_transport_pop."""
    pn_transport_pop(self._trans, size)

  def close_head(self):
    """Call pn_transport_close_head; a negative result raises."""
    self._check(pn_transport_close_head(self._trans))

  def output(self, size):
    """
    Take up to C{size} bytes of pending output from the transport.

    Returns None when pending() is negative or when peek() reports end of
    stream; otherwise returns the bytes, which are popped from the
    transport.
    """
    p = self.pending()
    if p < 0:
      return None
    out = self.peek(min(size, p))
    if out is None:
      # peek() returns None on PN_EOS; there is nothing to pop in that case
      # and len(None) would raise TypeError.
      return None
    self.pop(len(out))
    return out

  def input(self, bytes):
    """
    Feed received bytes into the transport.

    Empty input closes the tail and returns None.  Otherwise as many bytes
    as capacity() allows are pushed and the number pushed is returned; None
    is returned when capacity() is negative.
    """
    if not bytes:
      self.close_tail()
      return None
    else:
      c = self.capacity()
      if (c < 0):
        return None
      trimmed = bytes[:c]
      self.push(trimmed)
      return len(trimmed)

  # AMQP 1.0 max-frame-size
  def _get_max_frame_size(self):
    return pn_transport_get_max_frame(self._trans)

  def _set_max_frame_size(self, value):
    pn_transport_set_max_frame(self._trans, value)

  max_frame_size = property(_get_max_frame_size, _set_max_frame_size,
                            doc="""
Sets the maximum size for received frames (in bytes).
""")

  @property
  def remote_max_frame_size(self):
    return pn_transport_get_remote_max_frame(self._trans)

  def _get_channel_max(self):
    return pn_transport_get_channel_max(self._trans)

  def _set_channel_max(self, value):
    pn_transport_set_channel_max(self._trans, value)

  channel_max = property(_get_channel_max, _set_channel_max,
                         doc="""
Sets the maximum channel that may be used on the transport.
""")

  @property
  def remote_channel_max(self):
    return pn_transport_remote_channel_max(self._trans)

  # AMQP 1.0 idle-time-out
  def _get_idle_timeout(self):
    # the C API works in milliseconds; expose seconds as a float
    msec = pn_transport_get_idle_timeout(self._trans)
    return float(msec)/1000.0

  def _set_idle_timeout(self, sec):
    pn_transport_set_idle_timeout(self._trans, long(sec * 1000))

  idle_timeout = property(_get_idle_timeout, _set_idle_timeout,
                          doc="""
The idle timeout of the connection (float, in seconds).
""")

  @property
  def remote_idle_timeout(self):
    msec = pn_transport_get_remote_idle_timeout(self._trans)
    return float(msec)/1000.0

  @property
  def frames_output(self):
    return pn_transport_get_frames_output(self._trans)

  @property
  def frames_input(self):
    return pn_transport_get_frames_input(self._trans)

  def sasl(self):
    # SASL factory (singleton for this transport)
    if not self._sasl:
      self._sasl = SASL(self)
    return self._sasl

  def ssl(self, domain=None, session_details=None):
    # SSL factory (singleton for this transport)
    if not self._ssl:
      self._ssl = SSL(self, domain, session_details)
    return self._ssl
3073
class SASLException(TransportException):
  """Default exception type raised by SASL._check for unmapped errors."""
3076
class SASL(object):
  """Python wrapper around the C sasl object of a Transport."""

  OK = PN_SASL_OK
  AUTH = PN_SASL_AUTH

  def __new__(cls, transport):
    """Enforce a singleton SASL object per Transport"""
    if transport._sasl:
      return transport._sasl
    instance = super(SASL, cls).__new__(cls)
    instance._sasl = pn_sasl(transport._trans)
    transport._sasl = instance
    return transport._sasl

  def _check(self, err):
    """
    Return C{err} unchanged when non-negative; otherwise raise the exception
    mapped in EXCEPTIONS (default L{SASLException}).
    """
    if err >= 0:
      return err
    exc = EXCEPTIONS.get(err, SASLException)
    raise exc("[%s]" % (err))

  def mechanisms(self, mechs):
    """Pass C{mechs} to pn_sasl_mechanisms."""
    pn_sasl_mechanisms(self._sasl, mechs)

  def client(self):
    """Call pn_sasl_client on the underlying object."""
    pn_sasl_client(self._sasl)

  def server(self):
    """Call pn_sasl_server on the underlying object."""
    pn_sasl_server(self._sasl)

  def plain(self, user, password):
    """Pass the user and password to pn_sasl_plain."""
    pn_sasl_plain(self._sasl, user, password)

  def send(self, data):
    """Send data via pn_sasl_send; a negative result raises."""
    rc = pn_sasl_send(self._sasl, data, len(data))
    self._check(rc)

  def recv(self):
    """
    Receive data via pn_sasl_recv.

    The buffer size starts at 16 and doubles each time the C call reports
    PN_OVERFLOW.  Returns None on PN_EOS; other negative codes raise.
    """
    size = 16
    rc, data = pn_sasl_recv(self._sasl, size)
    while rc == PN_OVERFLOW:
      size *= 2
      rc, data = pn_sasl_recv(self._sasl, size)
    if rc == PN_EOS:
      return None
    self._check(rc)
    return data

  @property
  def outcome(self):
    """The SASL outcome, or None when the C call reports PN_SASL_NONE."""
    result = pn_sasl_outcome(self._sasl)
    return None if result == PN_SASL_NONE else result

  def done(self, outcome):
    """Pass C{outcome} to pn_sasl_done."""
    pn_sasl_done(self._sasl, outcome)

  STATE_CONF = PN_SASL_CONF
  STATE_IDLE = PN_SASL_IDLE
  STATE_STEP = PN_SASL_STEP
  STATE_PASS = PN_SASL_PASS
  STATE_FAIL = PN_SASL_FAIL

  @property
  def state(self):
    """The value reported by pn_sasl_state (see the STATE_* constants)."""
    return pn_sasl_state(self._sasl)
3145

class SSLException(TransportException):
  """Default exception type raised by the SSL classes' _check methods."""
3149
class SSLUnavailable(SSLException):
  """Raised when the C library returns no SSL domain or SSL object."""
3152
class SSLDomain(object):
  """Python wrapper around a C SSL domain."""

  MODE_CLIENT = PN_SSL_MODE_CLIENT
  MODE_SERVER = PN_SSL_MODE_SERVER
  VERIFY_PEER = PN_SSL_VERIFY_PEER
  VERIFY_PEER_NAME = PN_SSL_VERIFY_PEER_NAME
  ANONYMOUS_PEER = PN_SSL_ANONYMOUS_PEER

  def __init__(self, mode):
    """
    Allocate a C SSL domain for the given mode.

    @raise SSLUnavailable: when the C call returns None.
    """
    domain = pn_ssl_domain(mode)
    self._domain = domain
    if domain is None:
      raise SSLUnavailable()

  def _check(self, err):
    """
    Return C{err} unchanged when non-negative; otherwise raise the exception
    mapped in EXCEPTIONS (default L{SSLException}).
    """
    if err >= 0:
      return err
    exc = EXCEPTIONS.get(err, SSLException)
    raise exc("SSL failure.")

  def set_credentials(self, cert_file, key_file, password):
    """Set the certificate file, key file and password on the domain."""
    rc = pn_ssl_domain_set_credentials(self._domain, cert_file, key_file,
                                       password)
    return self._check(rc)

  def set_trusted_ca_db(self, certificate_db):
    """Set the trusted CA database on the domain."""
    rc = pn_ssl_domain_set_trusted_ca_db(self._domain, certificate_db)
    return self._check(rc)

  def set_peer_authentication(self, verify_mode, trusted_CAs=None):
    """Set the peer verification mode (see VERIFY_* / ANONYMOUS_PEER)."""
    rc = pn_ssl_domain_set_peer_authentication(self._domain, verify_mode,
                                               trusted_CAs)
    return self._check(rc)

  def allow_unsecured_client(self):
    """Call pn_ssl_domain_allow_unsecured_client on the domain."""
    rc = pn_ssl_domain_allow_unsecured_client(self._domain)
    return self._check(rc)
3187
class SSL(object):
  """Python wrapper around the C ssl object of a Transport."""

  def _check(self, err):
    """
    Return C{err} unchanged when non-negative; otherwise raise the exception
    mapped in EXCEPTIONS (default L{SSLException}).
    """
    if err >= 0:
      return err
    exc = EXCEPTIONS.get(err, SSLException)
    raise exc("SSL failure.")

  def __new__(cls, transport, domain, session_details=None):
    """Enforce a singleton SSL object per Transport"""
    if not transport._ssl:
      instance = super(SSL, cls).__new__(cls)
      instance._domain = domain
      instance._session_details = session_details
      session_id = None
      if session_details:
        session_id = session_details.get_session_id()
      instance._ssl = pn_ssl( transport._trans )
      if instance._ssl is None:
        raise SSLUnavailable()
      pn_ssl_init( instance._ssl, domain._domain, session_id )
      transport._ssl = instance
    else:
      # unfortunately, we've combined the allocation and the configuration in a
      # single step.  So catch any attempt by the application to provide what
      # may be a different configuration than the original (hack)
      current = transport._ssl
      if ((domain and (current._domain is not domain)) or
          (session_details and
           (current._session_details is not session_details))):
        raise SSLException("Cannot re-configure existing SSL object!")
    return transport._ssl

  def cipher_name(self):
    """Return the cipher name, or None when the C call reports failure."""
    rc, name = pn_ssl_get_cipher_name( self._ssl, 128 )
    return name if rc else None

  def protocol_name(self):
    """Return the protocol name, or None when the C call reports failure."""
    rc, name = pn_ssl_get_protocol_name( self._ssl, 128 )
    return name if rc else None

  RESUME_UNKNOWN = PN_SSL_RESUME_UNKNOWN
  RESUME_NEW = PN_SSL_RESUME_NEW
  RESUME_REUSED = PN_SSL_RESUME_REUSED

  def resume_status(self):
    """Return pn_ssl_resume_status (see the RESUME_* constants)."""
    return pn_ssl_resume_status( self._ssl )

  def _set_peer_hostname(self, hostname):
    rc = pn_ssl_set_peer_hostname( self._ssl, hostname )
    self._check(rc)
  def _get_peer_hostname(self):
    rc, name = pn_ssl_get_peer_hostname( self._ssl, 1024 )
    self._check(rc)
    return name
  peer_hostname = property(_get_peer_hostname, _set_peer_hostname,
                           doc="""
Manage the expected name of the remote peer. Used to authenticate the remote.
""")
3250

class SSLSessionDetails(object):
  """ Unique identifier for the SSL session.  Used to resume previous session on a new
  SSL connection.
  """

  def __init__(self, session_id):
    # stored as given; read back through get_session_id()
    self._session_id = session_id

  def get_session_id(self):
    """Return the session id supplied at construction."""
    return self._session_id
3262

class Collector:
  """Python wrapper around a C event collector."""

  def __init__(self):
    self._impl = pn_collector()

  def peek(self):
    """
    Return the event at the head of the collector as an L{Event}, or None
    when the collector has no event.

    The event's transport, if any, is wrapped in a new shared Transport.
    """
    event = pn_collector_peek(self._impl)
    if event is None:
      return None

    tpi = pn_event_transport(event)
    if tpi:
      tp = Transport(tpi)
    else:
      tp = None
    return Event(type=pn_event_type(event),
                 category=pn_event_category(event),
                 connection=Connection._wrap_connection(pn_event_connection(event)),
                 session=Session._wrap_session(pn_event_session(event)),
                 link=Link._wrap_link(pn_event_link(event)),
                 delivery=Delivery._wrap_delivery(pn_event_delivery(event)),
                 transport=tp)

  def pop(self):
    """Discard the event at the head of the collector."""
    pn_collector_pop(self._impl)

  def __del__(self):
    # __del__ also runs when __init__ never completed, in which case _impl
    # does not exist; guard the lookup like the other __del__ methods here.
    impl = getattr(self, "_impl", None)
    if impl is not None:
      pn_collector_free(impl)
      self._impl = None
3292
class Event:
  """A plain record describing one event taken from a Collector."""

  CATEGORY_PROTOCOL = PN_EVENT_CATEGORY_PROTOCOL

  CONNECTION_LOCAL_STATE = PN_CONNECTION_LOCAL_STATE
  CONNECTION_REMOTE_STATE = PN_CONNECTION_REMOTE_STATE
  SESSION_LOCAL_STATE = PN_SESSION_LOCAL_STATE
  SESSION_REMOTE_STATE = PN_SESSION_REMOTE_STATE
  LINK_LOCAL_STATE = PN_LINK_LOCAL_STATE
  LINK_REMOTE_STATE = PN_LINK_REMOTE_STATE
  LINK_FLOW = PN_LINK_FLOW
  DELIVERY = PN_DELIVERY
  TRANSPORT = PN_TRANSPORT

  def __init__(self, type, category,
               connection, session, link, delivery, transport):
    self.type = type
    self.category = category
    self.connection = connection
    self.session = session
    self.link = link
    self.delivery = delivery
    self.transport = transport

  def __repr__(self):
    # Only the related objects that are present (not None) are listed.
    related = (self.connection, self.session, self.link, self.delivery,
               self.transport)
    shown = [str(o) for o in related if o is not None]
    return "%s(%s)" % (pn_event_type_name(self.type), ", ".join(shown))
3322
###
# Driver
###

class DriverException(ProtonException):
  """
  The DriverException class is the root of the driver exception hierarchy.
  """
3332
class Connector(object):
  """Python wrapper around a C Driver connector."""

  @staticmethod
  def _wrap_connector(c_cxtr, py_driver=None):
    """Maintain only a single instance of this class for each Connector object that
    exists in the C Driver.
    """
    if not c_cxtr: return None
    py_cxtr = pn_connector_context(c_cxtr)
    if py_cxtr: return py_cxtr
    wrapper = Connector(_cxtr=c_cxtr, _py_driver=py_driver)
    return wrapper

  def __init__(self, _cxtr, _py_driver):
    self._cxtr = _cxtr
    assert(_py_driver)
    # only a weak reference to the driver is kept; the driver itself holds
    # this connector in its _connectors set
    self._driver = weakref.ref(_py_driver)
    pn_connector_set_context(self._cxtr, self)
    self._connection = None
    self._driver()._connectors.add(self)

  def _release(self):
    """Release the underlying C Engine resource."""
    if self._cxtr:
      pn_connector_set_context(self._cxtr, None)
      pn_connector_free(self._cxtr)
      self._cxtr = None

  def free(self):
    """Release the Connector, freeing its resources.

    Call this when you no longer need the Connector.  This will allow the
    connector's resources to be reclaimed.  Once called, you should no longer
    reference this connector.

    """
    self.connection = None
    d = self._driver()
    if d: d._connectors.remove(self)
    self._release()

  def next(self):
    """Return the wrapper for pn_connector_next, or None."""
    return Connector._wrap_connector(pn_connector_next(self._cxtr))

  def process(self):
    """Call pn_connector_process on the underlying connector."""
    pn_connector_process(self._cxtr)

  def listener(self):
    """Return the wrapper for pn_connector_listener, or None."""
    return Listener._wrap_listener(pn_connector_listener(self._cxtr))

  def sasl(self):
    """Return the SASL object of this connector's transport, or None."""
    ## seems easier just to grab the SASL associated with the transport:
    trans = self.transport
    if trans:
      # Reuse the wrapper fetched above; reading the property again would
      # make another C call and build a second throwaway Transport.
      return SASL(trans)
    return None

  @property
  def transport(self):
    """A new shared Transport around the connector's transport, or None."""
    trans = pn_connector_transport(self._cxtr)
    if trans:
      return Transport(trans)
    return None

  def close(self):
    """Return the result of pn_connector_close."""
    return pn_connector_close(self._cxtr)

  @property
  def closed(self):
    """The value reported by pn_connector_closed."""
    return pn_connector_closed(self._cxtr)

  def _get_connection(self):
    return self._connection

  def _set_connection(self, conn):
    if conn:
      pn_connector_set_connection(self._cxtr, conn._conn)
    else:
      pn_connector_set_connection(self._cxtr, None)
    self._connection = conn


  connection = property(_get_connection, _set_connection,
                        doc="""
Associate a Connection with this Connector.
""")
3419
class Listener(object):
  """Python wrapper around a C Driver listener."""

  @staticmethod
  def _wrap_listener(c_lsnr, py_driver=None):
    """Maintain only a single instance of this class for each Listener object that
    exists in the C Driver.
    """
    if not c_lsnr:
      return None
    existing = pn_listener_context(c_lsnr)
    if existing:
      return existing
    return Listener(_lsnr=c_lsnr, _py_driver=py_driver)

  def __init__(self, _lsnr, _py_driver):
    self._lsnr = _lsnr
    assert(_py_driver)
    # only a weak reference to the driver is kept; the driver itself holds
    # this listener in its _listeners set
    self._driver = weakref.ref(_py_driver)
    pn_listener_set_context(self._lsnr, self)
    self._driver()._listeners.add(self)

  def _release(self):
    """Release the underlying C Engine resource."""
    if not self._lsnr:
      return
    pn_listener_set_context(self._lsnr, None)
    pn_listener_free(self._lsnr)
    self._lsnr = None

  def free(self):
    """Release the Listener, freeing its resources"""
    driver = self._driver()
    if driver:
      driver._listeners.remove(self)
    self._release()

  def next(self):
    """Return the wrapper for pn_listener_next, or None."""
    c_next = pn_listener_next(self._lsnr)
    return Listener._wrap_listener(c_next)

  def accept(self):
    """
    Accept a pending connection and return its Connector.

    Returns None when the owning driver no longer exists.
    """
    driver = self._driver()
    if not driver:
      return None
    c_cxtr = pn_listener_accept(self._lsnr)
    return Connector._wrap_connector(c_cxtr, driver)

  def close(self):
    """Call pn_listener_close on the underlying listener."""
    pn_listener_close(self._lsnr)
3466
class Driver(object):
  """Python wrapper around a driver handle created by pn_driver().

  The driver keeps the Listener and Connector wrappers that were attached
  to it in the _listeners and _connectors sets.
  """

  def __init__(self):
    """Create the underlying driver and the (initially empty) child sets."""
    self._driver = pn_driver()
    self._listeners = set()
    self._connectors = set()

  def __del__(self):
    """Release every tracked child and then free the driver itself."""
    # freeing the driver will release all child objects in the C Engine, so
    # clean up their references in the corresponding Python objects
    #
    # Every attribute is read defensively: __del__ also runs for an
    # instance whose __init__ did not complete, in which case some or all
    # of these attributes do not exist.
    for c in getattr(self, "_connectors", ()):
      c._release()
    for l in getattr(self, "_listeners", ()):
      l._release()
    c_driver = getattr(self, "_driver", None)
    if c_driver:
      pn_driver_free(c_driver)
      del self._driver

  def wait(self, timeout_sec):
    """Call pn_driver_wait() with the timeout converted to milliseconds.

    @param timeout_sec: timeout in seconds; None or a negative value is
      passed on as -1.
    @return: the result of pn_driver_wait().
    """
    if timeout_sec is None or timeout_sec < 0.0:
      t = -1
    else:
      # int() instead of long(): on Python 2 int() already promotes to
      # long when the value requires it, and long does not exist on
      # Python 3.
      t = int(1000*timeout_sec)
    return pn_driver_wait(self._driver, t)

  def wakeup(self):
    """Call pn_driver_wakeup() on this driver and return its result."""
    return pn_driver_wakeup(self._driver)

  def listener(self, host, port):
    """Construct a listener"""
    return Listener._wrap_listener(pn_listener(self._driver, host, port, None),
                                   self)

  def pending_listener(self):
    """Return the wrapper for the result of pn_driver_listener()."""
    return Listener._wrap_listener(pn_driver_listener(self._driver))

  def head_listener(self):
    """Return the wrapper for the result of pn_listener_head()."""
    return Listener._wrap_listener(pn_listener_head(self._driver))

  def connector(self, host, port):
    """Construct a connector for host and port attached to this driver."""
    return Connector._wrap_connector(pn_connector(self._driver, host, port, None),
                                     self)

  def head_connector(self):
    """Return the wrapper for the result of pn_connector_head()."""
    return Connector._wrap_connector(pn_connector_head(self._driver))

  def pending_connector(self):
    """Return the wrapper for the result of pn_driver_connector()."""
    return Connector._wrap_connector(pn_driver_connector(self._driver))
# Names exported by "from proton import *".
__all__ = [
  "API_LANGUAGE", "IMPLEMENTATION_LANGUAGE",
  "ABORTED", "ACCEPTED", "AUTOMATIC", "PENDING", "MANUAL",
  "REJECTED", "RELEASED", "SETTLED", "UNDESCRIBED",
  "Array", "Collector", "Condition", "Connection", "Connector",
  "Data", "Delivery", "Disposition", "Described",
  "Driver", "DriverException",
  "Endpoint", "Event", "Link", "Listener",
  "Message", "MessageException",
  "Messenger", "MessengerException",
  "ProtonException", "Receiver", "SASL", "Sender", "Session",
  "SSL", "SSLDomain", "SSLSessionDetails", "SSLUnavailable", "SSLException",
  "Terminus", "Timeout", "Interrupt",
  "Transport", "TransportException",
  "char", "symbol", "timestamp", "ulong",
]