Package paramiko :: Module ber
[frames | no frames]

Source Code for Module paramiko.ber

  1  # Copyright (C) 2003-2007  Robey Pointer <robeypointer@gmail.com> 
  2  # 
  3  # This file is part of paramiko. 
  4  # 
  5  # Paramiko is free software; you can redistribute it and/or modify it under the 
  6  # terms of the GNU Lesser General Public License as published by the Free 
  7  # Software Foundation; either version 2.1 of the License, or (at your option) 
  8  # any later version. 
  9  # 
 10  # Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY 
 11  # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 
 12  # A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more 
 13  # details. 
 14  # 
 15  # You should have received a copy of the GNU Lesser General Public License 
 16  # along with Paramiko; if not, write to the Free Software Foundation, Inc., 
 17  # 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA. 
 18  from paramiko.common import max_byte, zero_byte 
 19  from paramiko.py3compat import b, byte_ord, byte_chr, long 
 20   
 21  import paramiko.util as util 
 22   
 23   
class BERException(Exception):
    """
    Error raised when a value cannot be BER-encoded or BER-decoded.
    """
26 27
class BER(object):
    """
    Robey's tiny little attempt at a BER decoder.

    An instance wraps a byte buffer (``content``) plus a read cursor
    (``idx``).  The ``decode*`` methods consume the buffer from the cursor;
    the ``encode*`` methods append tag/length/value records to the buffer.
    """

    def __init__(self, content=bytes()):
        """
        Create a coder over ``content``.

        :param content: initial buffer; it is normalised with ``b()`` and
            the read cursor starts at offset 0.
        """
        self.content = b(content)
        self.idx = 0

    def asbytes(self):
        """
        Return the underlying buffer (everything decoded from / encoded to).
        """
        return self.content

    def __str__(self):
        """
        Return the buffer as a native ``str``.

        Where ``bytes`` is ``str`` (Python 2) the buffer is returned as-is.
        Elsewhere it is decoded as latin-1, which maps every byte to the
        code point of the same value, because ``__str__`` must return a
        ``str`` or ``str()`` raises ``TypeError``.
        """
        raw = self.asbytes()
        if isinstance(raw, str):
            return raw
        return raw.decode('latin-1')

    def __repr__(self):
        return 'BER(\'' + repr(self.content) + '\')'

    def decode(self):
        """
        Decode and return the next item in the buffer (see `decode_next`).
        """
        return self.decode_next()

    def decode_next(self):
        """
        Decode one tag/length/value record starting at the read cursor.

        :return: ``None`` if the buffer is exhausted or the record is
            truncated; a list of decoded items for identifier ``0x30``
            (sequence); the result of ``util.inflate_long`` for identifier
            ``2`` (integer).
        :raises BERException: if the identifier is anything else.
        """
        if self.idx >= len(self.content):
            return None
        ident = byte_ord(self.content[self.idx])
        self.idx += 1
        if (ident & 31) == 31:
            # identifier > 30: the real identifier follows in 7-bit groups,
            # most significant first; a clear high bit marks the last group
            ident = 0
            while self.idx < len(self.content):
                t = byte_ord(self.content[self.idx])
                self.idx += 1
                ident = (ident << 7) | (t & 0x7f)
                if not (t & 0x80):
                    break
        if self.idx >= len(self.content):
            # nothing left to hold the length byte
            return None
        # now fetch length
        size = byte_ord(self.content[self.idx])
        self.idx += 1
        if size & 0x80:
            # more complicated: the low 7 bits say how many of the
            # following bytes hold the actual length
            # FIXME: theoretically should handle indefinite-length (0x80)
            t = size & 0x7f
            if self.idx + t > len(self.content):
                return None
            size = util.inflate_long(self.content[self.idx: self.idx + t], True)
            self.idx += t
        if self.idx + size > len(self.content):
            # can't fit
            return None
        data = self.content[self.idx: self.idx + size]
        self.idx += size
        # now switch on id
        if ident == 0x30:
            # sequence
            return self.decode_sequence(data)
        elif ident == 2:
            # int
            return util.inflate_long(data)
        else:
            # 1: boolean (00 false, otherwise true)
            raise BERException('Unknown ber encoding type %d (robey is lazy)' % ident)

    @staticmethod
    def decode_sequence(data):
        """
        Decode every record found in ``data`` and return them as a list.

        Decoding stops at the first ``None`` returned by `decode_next`
        (end of data or a truncated record).
        """
        out = []
        ber = BER(data)
        while True:
            x = ber.decode_next()
            if x is None:
                break
            out.append(x)
        return out

    def encode_tlv(self, ident, val):
        """
        Append one record with identifier ``ident`` and payload ``val``.

        :param ident: identifier, written as a single byte.
        :param val: payload bytes.
        """
        # no need to support ident > 31 here
        self.content += byte_chr(ident)
        if len(val) > 0x7f:
            # long form: 0x80 + number of length bytes, then the length
            # NOTE(review): if util.deflate_long sign-pads its output, a
            # length whose top bit is set gains a leading zero byte, which
            # is not the minimal (DER) form -- confirm against the helper.
            lenstr = util.deflate_long(len(val))
            self.content += byte_chr(0x80 + len(lenstr)) + lenstr
        else:
            self.content += byte_chr(len(val))
        self.content += val

    def encode(self, x):
        """
        Append the encoding of ``x`` to the buffer.

        Supported types: ``bool`` (identifier 1), ``int``/``long``
        (identifier 2), ``str``/``bytes`` (identifier 4) and ``list``/
        ``tuple`` (identifier 0x30, items encoded recursively).

        :raises BERException: if ``x`` is of any other type.
        """
        if type(x) is bool:
            # true is written as max_byte, false as zero_byte
            self.encode_tlv(1, max_byte if x else zero_byte)
        elif (type(x) is int) or (type(x) is long):
            self.encode_tlv(2, util.deflate_long(x))
        elif (type(x) is str) or (type(x) is bytes):
            # The payload is appended to a bytes buffer, so normalise it
            # with b() the same way __init__ does; a text str would
            # otherwise fail to concatenate where str is not bytes.
            self.encode_tlv(4, b(x))
        elif (type(x) is list) or (type(x) is tuple):
            self.encode_tlv(0x30, self.encode_sequence(x))
        else:
            raise BERException('Unknown type for encoding: %s' % repr(type(x)))

    @staticmethod
    def encode_sequence(data):
        """
        Encode each item of ``data`` in order and return the joined bytes.
        """
        ber = BER()
        for item in data:
            ber.encode(item)
        return ber.asbytes()
134