diff --git a/CHANGES.txt b/CHANGES.txt index 26636a9..47bc2a2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Revision 0.4.2, released 09-08-2017 - Bumped pysnmp dependency to 4.3.9 - Fixed crash on string value rendering due to improper low-level pyasn1 .prettyOut() call +- Source code PEP8'ed - Author's e-mail changed, copyright extended towards 2017 Revision 0.4.1, released 12-02-2016 diff --git a/pysnmp_apps/cli/base.py b/pysnmp_apps/cli/base.py index 847602c..e74af7e 100644 --- a/pysnmp_apps/cli/base.py +++ b/pysnmp_apps/cli/base.py @@ -9,31 +9,50 @@ # AST + class ConfigToken: # Abstract grammar token - def __init__(self, type, attr=None): - self.type = type + def __init__(self, typ, attr=None): + self.type = typ self.attr = attr - def __eq__(self, other): return self.type == other - def __ne__(self, other): return self.type != other - def __lt__(self, other): return self.type < other - def __le__(self, other): return self.type <= other - def __gt__(self, other): return self.type > other - def __ge__(self, other): return self.type >= other - def __repr__(self): return self.attr or self.type + + def __eq__(self, other): + return self.type == other + + def __ne__(self, other): + return self.type != other + + def __lt__(self, other): + return self.type < other + + def __le__(self, other): + return self.type <= other + + def __gt__(self, other): + return self.type > other + + def __ge__(self, other): + return self.type >= other + + def __repr__(self): + return self.attr or self.type + def __str__(self): if self.attr is None: return '%s' % self.type else: return '%s(%s)' % (self.type, self.attr) + class ConfigNode: # AST node class -- N-ary tree - def __init__(self, type, attr=None): - self.type, self.attr = type, attr + def __init__(self, typ, attr=None): + self.type, self.attr = typ, attr self._kids = [] + def __getitem__(self, i): return self._kids[i] + def __len__(self): return len(self._kids) if sys.version_info[0] < 3: @@ -42,31 +61,47 @@ def __setslice__(self, low, high, seq): else: def __setitem__(self, idx, seq): self._kids[idx] = seq - def __eq__(self, other): return self.type == other - def __ne__(self, other): return self.type != other - def __lt__(self, other): return self.type < other - def __le__(self, other): return self.type <= other - def __gt__(self, other): return self.type > other - def __ge__(self, other): return self.type >= other + + def __eq__(self, other): + return self.type == other + + def __ne__(self, other): + return self.type != other + + def __lt__(self, other): + return self.type < other + + def __le__(self, other): + return self.type <= other + + def __gt__(self, other): + return self.type > other + + def __ge__(self, other): + return self.type >= other + def __str__(self): if self.attr is None: return self.type else: return '%s(%s)' % (self.type, self.attr) + # Scanner class __ScannerTemplate(spark.GenericScanner): - def tokenize(self, input): + def tokenize(self, data): self.rv = [] - spark.GenericScanner.tokenize(self, input) + spark.GenericScanner.tokenize(self, data) return self.rv + class __FirstLevelScanner(__ScannerTemplate): def t_string(self, s): r' [!#\$%&\'\(\)\*\+,\.//0-9<=>\?@A-Z\\\^_`a-z\{\|\}~][!#\$%&\'\(\)\*\+,\-\.//0-9<=>\?@A-Z\\\^_`a-z\{\|\}~]* ' self.rv.append(ConfigToken('string', s)) + class __SecondLevelScanner(__FirstLevelScanner): def t_semicolon(self, s): r' : ' @@ -90,10 +125,12 @@ def t_whitespace(self, s): ScannerTemplate = __SecondLevelScanner + # Parser class ParserTemplate(spark.GenericASTBuilder): initialSymbol = None + def __init__(self, startSymbol=None): if startSymbol is None: startSymbol = self.initialSymbol @@ -103,10 +140,12 @@ def terminal(self, token): # Reduce to homogeneous AST. return ConfigNode(token.type, token.attr) + # Generator class GeneratorTemplate(spark.GenericASTTraversal): - def __init__(self): pass # Skip superclass constructor + def __init__(self): # Skip superclass constructor + pass def typestring(self, node): return node.type @@ -132,4 +171,5 @@ def preorder(self, client, node): return client - def default(self, client, node): pass + def default(self, client, node): + pass diff --git a/pysnmp_apps/cli/main.py b/pysnmp_apps/cli/main.py index 82600c6..e152d00 100644 --- a/pysnmp_apps/cli/main.py +++ b/pysnmp_apps/cli/main.py @@ -32,6 +32,7 @@ # Usage + def getUsage(): return "\ Command-line SNMP tools version %s, written by Ilya Etingof \n\ @@ -42,14 +43,15 @@ def getUsage(): -V software release information\n\ -d dump raw packets\n\ -D category enable debugging [%s]\n\ -" % ( pysnmpAppsVersion, - pysmiVersion, - pysnmpVersion, - pysnmpMibsVersion, - pyasn1Version, - sys.version.replace('\n', ''), - debug and ','.join(debug.flagMap.keys()) or "") +" % (pysnmpAppsVersion, + pysmiVersion, + pysnmpVersion, + pysnmpMibsVersion, + pyasn1Version, + sys.version.replace('\n', ''), + debug and ','.join(debug.flagMap.keys()) or "") + # Scanner class MainScannerMixIn: @@ -71,13 +73,14 @@ def t_debug(self, s): # Parser + class MainParserMixIn: initialSymbol = 'Cmdline' def error(self, token): raise error.PySnmpError( 'Command-line parser error at token %s\n' % token - ) + ) def p_cmdline(self, args): ''' @@ -104,9 +107,10 @@ def p_cmdlineExt(self, args): Debug ::= debug string Debug ::= debug whitespace string ''' - + # Generator + class __MainGenerator(base.GeneratorTemplate): # SNMPv1/v2 def n_VersionInfo(self, cbCtx, node): @@ -127,6 +131,7 @@ def n_Debug(self, cbCtx, node): f = node[1].attr debug.setLogger(debug.Debug(*f.split(','))) + def generator(cbCtx, ast): snmpEngine, ctx = cbCtx ctx['mibViewController'] = view.MibViewController( diff --git a/pysnmp_apps/cli/mibview.py b/pysnmp_apps/cli/mibview.py index 2c7d79d..70e3213 100644 --- a/pysnmp_apps/cli/mibview.py +++ b/pysnmp_apps/cli/mibview.py @@ -18,43 +18,45 @@ # Usage + def getUsage(): - return "\ -MIB options:\n\ - -m MIB[:...] load given list of MIBs (ALL loads all compiled MIBs)\n\ - -M DIR[:...] look in given list of directories for MIBs\n\ - -P MIBOPTS Toggle various defaults controlling MIB parsing:\n\ - XS: search for ASN.1 MIBs in remote directories specified\n\ - in URL form. The @mib@ token in the URL is substituted\n\ - with actual MIB to be downloaded. Default repository\n\ - address is %s\n\ - XB: search for pysnmp MIBs in remote directories specified\n\ - in URL form. The @mib@ token in the URL is substituted\n\ - with actual MIB to be downloaded. Default repository\n\ - address is %s\n\ - -O OUTOPTS Toggle various defaults controlling output display:\n\ - q: removes the equal sign and type information\n\ - Q: removes the type information\n\ - f: print full OIDs on output\n\ - s: print only last symbolic element of OID\n\ - S: print MIB module-id plus last element\n\ - u: print OIDs using UCD-style prefix suppression\n\ - n: print OIDs numerically\n\ - e: print enums numerically\n\ - b: do not break OID indexes down\n\ - E: include a \" to escape the quotes in indices\n\ - X: place square brackets around each index\n\ - T: print value in hex\n\ - v: print values only (not OID = value)\n\ - U: don't print units\n\ - t: output timeticks values as raw numbers\n\ - -I INOPTS Toggle various defaults controlling input parsing:\n\ - h: don't apply DISPLAY-HINTs\n\ - u: top-level OIDs must have '.' prefix (UCD-style)\n\ -" % (defaultMibSourceUrl, defaultMibBorrowerUrl) + return """\ +MIB options: + -m MIB[:...] load given list of MIBs (ALL loads all compiled MIBs) + -M DIR[:...] look in given list of directories for MIBs + -P MIBOPTS Toggle various defaults controlling MIB parsing: + XS: search for ASN.1 MIBs in remote directories specified + in URL form. The @mib@ token in the URL is substituted + with actual MIB to be downloaded. Default repository + address is %s + XB: search for pysnmp MIBs in remote directories specified + in URL form. The @mib@ token in the URL is substituted + with actual MIB to be downloaded. Default repository + address is %s + -O OUTOPTS Toggle various defaults controlling output display: + q: removes the equal sign and type information + Q: removes the type information + f: print full OIDs on output + s: print only last symbolic element of OID + S: print MIB module-id plus last element + u: print OIDs using UCD-style prefix suppression + n: print OIDs numerically + e: print enums numerically + b: do not break OID indexes down + E: include a " to escape the quotes in indices + X: place square brackets around each index + T: print value in hex + v: print values only (not OID = value) + U: don't print units + t: output timeticks values as raw numbers + -I INOPTS Toggle various defaults controlling input parsing: + h: don't apply DISPLAY-HINTs + u: top-level OIDs must have '.' prefix (UCD-style) +""" % (defaultMibSourceUrl, defaultMibBorrowerUrl) # Scanner + class MibViewScannerMixIn: def t_mibfiles(self, s): r' -m ' @@ -78,6 +80,7 @@ def t_inputopts(self, s): # Parser + class MibViewParserMixIn: def p_mibView(self, args): ''' @@ -114,6 +117,7 @@ def p_mibView(self, args): # Generator + class __MibViewGenerator(base.GeneratorTemplate): # Load MIB modules def n_MibFile(self, cbCtx, node): @@ -132,7 +136,7 @@ def n_MibDir(self, cbCtx, node): def n_Url(self, cbCtx, node): snmpEngine, ctx = cbCtx - ctx['Url'] = node[0].attr+':'+node[2].attr + ctx['Url'] = node[0].attr + ':' + node[2].attr def n_ParserOption_exit(self, cbCtx, node): snmpEngine, ctx = cbCtx @@ -202,7 +206,7 @@ def n_OutputOption(self, cbCtx, node): else: raise error.PySnmpError( 'Unknown output option %s at %s' % (c, self) - ) + ) def n_InputOption(self, cbCtx, node): snmpEngine, ctx = cbCtx @@ -216,7 +220,7 @@ def n_InputOption(self, cbCtx, node): elif c == 'u': mibViewProxy.defaultOidPrefix = ( 'iso', 'org', 'dod', 'internet', 'mgmt', 'mib-2' - ) + ) elif c == 'r': pass elif c == 'h': @@ -224,7 +228,8 @@ def n_InputOption(self, cbCtx, node): else: raise error.PySnmpError( 'Unknown input option %s at %s' % (c, self) - ) + ) + def generator(cbCtx, ast): snmpEngine, ctx = cbCtx @@ -236,27 +241,29 @@ def generator(cbCtx, ast): snmpEngine, ctx = __MibViewGenerator().preorder((snmpEngine, ctx), ast) if 'MibDir' not in ctx: - ctx['MibDir'] = [ defaultMibSourceUrl ] + ctx['MibDir'] = [defaultMibSourceUrl] if 'MibBorrowers' not in ctx: - ctx['MibBorrowers'] = [ defaultMibBorrowerUrl ] + ctx['MibBorrowers'] = [defaultMibBorrowerUrl] compiler.addMibCompiler(snmpEngine.getMibBuilder(), sources=ctx['MibDir'], borrowers=ctx['MibBorrowers']) return snmpEngine, ctx + class UnknownSyntax: def prettyOut(self, val): return str(val) unknownSyntax = UnknownSyntax() - + # Proxy MIB view + class MibViewProxy: # Defaults defaultOidPrefix = ( 'iso', 'org', 'dod', 'internet', 'mgmt', 'mib-2', 'system' - ) + ) defaultMibs = ('SNMPv2-MIB',) defaultMibDirs = () @@ -297,10 +304,9 @@ def __init__(self, mibViewController): self.defaultMibDirs = os.environ['PYSNMPMIBDIRS'].split(':') if self.defaultMibDirs: mibViewController.mibBuilder.setMibSources( - *mibViewController.mibBuilder.getMibSources() + tuple( - [ builder.ZipMibSource(m).init() for m in self.defaultMibDirs ] - ) - ) + *(mibViewController.mibBuilder.getMibSources() + + tuple([builder.ZipMibSource(m).init() for m in self.defaultMibDirs])) + ) if self.defaultMibs: mibViewController.mibBuilder.loadModules(*self.defaultMibs) self.__oidValue = univ.ObjectIdentifier() @@ -324,7 +330,7 @@ def getPrettyOidVal(self, mibViewController, oid, val): name = label if not self.buildAbsoluteName: name = name[len(self.defaultOidPrefix):] - out = out + '.'.join([ str(x) for x in name ]) + out = out + '.'.join([str(x) for x in name]) if suffix: if suffix == (0,): @@ -333,9 +339,9 @@ def getPrettyOidVal(self, mibViewController, oid, val): m, n, s = mibViewController.getNodeLocation(prefix[:-1]) rowNode, = mibViewController.mibBuilder.importSymbols( m, n - ) + ) if self.buildNumericIndices: - out = out + '.' + '.'.join([ str(x) for x in suffix ]) + out = out + '.' + '.'.join([str(x) for x in suffix]) else: try: for i in rowNode.getIndicesFromInstId(suffix): @@ -367,25 +373,25 @@ def getPrettyOidVal(self, mibViewController, oid, val): syntax = mibNode.syntax else: syntax = val - if syntax is None: # lame Agent may return a non-instance OID + if syntax is None: # lame Agent may return a non-instance OID syntax = unknownSyntax if self.buildTypeInfo: out = out + '%s: ' % syntax.__class__.__name__ if self.buildRawVals: out = out + str(val) - elif self.buildHexVals: # XXX make it always in hex? + elif self.buildHexVals: # XXX make it always in hex? if self.__intValue.isSuperTypeOf(val): out = out + '%x' % int(val) elif self.__timeValue.isSameTypeWith(val): out = out + '%x' % int(val) elif self.__oidValue.isSuperTypeOf(val): - out = out + ' '.join([ '%x' % x for x in tuple(val) ]) + out = out + ' '.join(['%x' % x for x in tuple(val)]) else: - out = out + ' '.join([ '%.2x' % x for x in val.asNumbers()]) + out = out + ' '.join(['%.2x' % x for x in val.asNumbers()]) elif self.__timeValue.isSameTypeWith(val): if self.buildRawTimeTicks: out = out + str(int(val)) - else: # TimeTicks is not a TC + else: # TimeTicks is not a TC val = int(val) d, m = divmod(val, 8640000) out = out + '%d days ' % d diff --git a/pysnmp_apps/cli/msgmod.py b/pysnmp_apps/cli/msgmod.py index 54dd9fe..22582a8 100644 --- a/pysnmp_apps/cli/msgmod.py +++ b/pysnmp_apps/cli/msgmod.py @@ -9,14 +9,16 @@ # Usage + def getUsage(): - return "\ -SNMP message processing options:\n\ - -v VERSION SNMP version: \"1\"|\"2c\"|\"3\"\n\ -" + return """\ +SNMP message processing options: + -v VERSION SNMP version: "1"|"2c"|"3" +""" # Scanner + class MPScannerMixIn: def t_version(self, s): r' -v ' @@ -24,6 +26,7 @@ def t_version(self, s): # Parser + class MPParserMixIn: def p_mpSpec(self, args): ''' @@ -34,13 +37,15 @@ def p_mpSpec(self, args): # Generator + class __MPGenerator(base.GeneratorTemplate): _versionIdMap = { '1': 0, '2': 1, '2c': 1, '3': 3 - } + } + def n_SnmpVersionId(self, cbCtx, node): snmpEngine, ctx = cbCtx if len(node) > 2: @@ -52,6 +57,7 @@ def n_SnmpVersionId(self, cbCtx, node): else: raise error.PySnmpError('Bad version value %s' % versionId) + def generator(cbCtx, ast): snmpEngine, ctx = cbCtx __MPGenerator().preorder((snmpEngine, ctx), ast) diff --git a/pysnmp_apps/cli/pdu.py b/pysnmp_apps/cli/pdu.py index 50ae697..9c1fef8 100644 --- a/pysnmp_apps/cli/pdu.py +++ b/pysnmp_apps/cli/pdu.py @@ -15,6 +15,7 @@ # Usage + def getReadUsage(): return "\ Management parameters:\n\ @@ -25,10 +26,13 @@ def getReadUsage(): # Scanner -class ReadPduScannerMixIn: pass + +class ReadPduScannerMixIn: + pass # Parser + class ReadPduParserMixIn: def p_varBindSpec(self, args): ''' @@ -61,6 +65,7 @@ def p_pduSpec(self, args): # Generator + class __ReadPduGenerator(base.GeneratorTemplate): def n_ModName(self, cbCtx, node): snmpEngine, ctx = cbCtx @@ -104,33 +109,33 @@ def n_VarName_exit(self, cbCtx, node): intTypes = (int, long) else: intTypes = (int,) - if [ x for x in suffix if not isinstance(x, intTypes) ]: + if [x for x in suffix if not isinstance(x, intTypes)]: raise error.PySnmpError( 'Cant resolve object at: %s' % (suffix,) - ) + ) modName, nodeDesc, _suffix = mibViewCtl.getNodeLocation(oid) mibNode, = mibViewCtl.mibBuilder.importSymbols(modName, nodeDesc) if not hasattr(self, '_MibTableColumn'): self._MibTableColumn, = mibViewCtl.mibBuilder.importSymbols( 'SNMPv2-SMI', 'MibTableColumn' - ) + ) if isinstance(mibNode, self._MibTableColumn): # Table column if 'objectIndices' in ctx: modName, nodeDesc, _suffix = mibViewCtl.getNodeLocation( mibNode.name[:-1] - ) + ) mibNode, = mibViewCtl.mibBuilder.importSymbols( modName, nodeDesc - ) + ) suffix = suffix + mibNode.getInstIdFromIndices( *ctx['objectIndices'] - ) + ) else: if 'objectIndices' in ctx: raise error.PySnmpError( 'Cant resolve indices: %s' % (ctx['objectIndices'],) - ) + ) ctx['varName'] = oid + suffix if 'objectName' in ctx: del ctx['objectName'] @@ -140,7 +145,7 @@ def n_VarName_exit(self, cbCtx, node): def n_VarBind_exit(self, cbCtx, node): snmpEngine, ctx = cbCtx if 'varBinds' not in ctx: - ctx['varBinds'] = [ (ctx['varName'], None) ] + ctx['varBinds'] = [(ctx['varName'], None)] else: ctx['varBinds'].append((ctx['varName'], None)) del ctx['varName'] @@ -148,30 +153,32 @@ def n_VarBind_exit(self, cbCtx, node): def n_VarBinds_exit(self, cbCtx, node): snmpEngine, ctx = cbCtx if 'varBinds' not in ctx or not ctx['varBinds']: - ctx['varBinds'] = [ ((1, 3, 6), None) ] + ctx['varBinds'] = [((1, 3, 6), None)] + def readPduGenerator(cbCtx, ast): __ReadPduGenerator().preorder(cbCtx, ast) # Write class + def getWriteUsage(): - return "\ -Management parameters:\n\ - <[\"mib-module\"::]\"object-name\"|\"oid\" \"type\"|\"=\" value> ...\n\ - mib-module: MIB name (such as SNMPv2-MIB)\n\ - object-name: MIB symbol (sysDescr.0) or OID\n\ - type: MIB value type\n\ - i integer\n\ - u unsigned integer\n\ - s string\n\ - n NULL\n\ - o ObjectIdentifier\n\ - t TimeTicks\n\ - a IP address\n\ - =: use MIB for value type lookup\n\ - value: value to write\n\ -" + return """\ +Management parameters: + <["mib-module"::]"object-name"|"oid" "type"|"=" value> ... + mib-module: MIB name (such as SNMPv2-MIB) + object-name: MIB symbol (sysDescr.0) or OID + type: MIB value type + i integer + u unsigned integer + s string + n NULL + o ObjectIdentifier + t TimeTicks + a IP address + =: use MIB for value type lookup + value: value to write +""" # Scanner @@ -179,6 +186,7 @@ def getWriteUsage(): # Parser + class WritePduParserMixIn(ReadPduParserMixIn): def p_varBindSpec(self, args): ''' @@ -189,6 +197,7 @@ def p_varBindSpec(self, args): # Generator + class __WritePduGenerator(__ReadPduGenerator): _typeMap = { 'i': rfc1902.Integer(), @@ -198,7 +207,7 @@ class __WritePduGenerator(__ReadPduGenerator): 'o': univ.ObjectIdentifier(), 't': rfc1902.TimeTicks(), 'a': rfc1902.IpAddress() - } + } def n_VarType(self, cbCtx, node): snmpEngine, ctx = cbCtx @@ -222,11 +231,11 @@ def n_VarBind_exit(self, cbCtx, node): raise error.PySnmpError( 'Found MIB scalar %s but non-scalar given %s' % (mibNode.name + (0,), ctx['varName']) - ) + ) else: raise error.PySnmpError( 'Variable %s has no syntax' % (ctx['varName'],) - ) + ) else: try: val = self._typeMap[ctx['varType']] @@ -239,10 +248,11 @@ def n_VarBind_exit(self, cbCtx, node): raise error.PySnmpError(sys.exc_info()[1]) if 'varBinds' not in ctx: - ctx['varBinds'] = [ (ctx['varName'], val) ] + ctx['varBinds'] = [(ctx['varName'], val)] else: ctx['varBinds'].append((ctx['varName'], val)) + def writePduGenerator(cbCtx, ast): snmpEngine, ctx = cbCtx __WritePduGenerator().preorder((snmpEngine, ctx), ast) diff --git a/pysnmp_apps/cli/secmod.py b/pysnmp_apps/cli/secmod.py index 942da47..d767d0b 100644 --- a/pysnmp_apps/cli/secmod.py +++ b/pysnmp_apps/cli/secmod.py @@ -10,25 +10,27 @@ # Usage + def getUsage(): - return "\ -SNMPv1/v2c security options:\n\ - -c COMMUNITY community name\n\ -SNMPv3 security options:\n\ - -u SECURITY-NAME USM user security name\n\ - -l SECURITY-LEVEL \"noAuthNoPriv\"|\"authNoPriv\"|\"authPriv\"\n\ - -a AUTH-PROTOCOL \"MD5\"|\"SHA\"\n\ - -A AUTH-KEY user authentication key\n\ - -x PRIV-PROTOCOL \"DES\"|\"AES\"\n\ - -X PRIV-KEY user privacy key\n\ - -E CONTEXT-ENGINE-ID authoritative context engine ID\n\ - -e ENGINE-ID authoritative SNMP engine ID (will discover)\n\ - -n CONTEXT-NAME authoritative context name\n\ - -Z BOOTS,TIME destination SNMP engine boots/uptime\n\ -" + return """\ +SNMPv1/v2c security options: + -c COMMUNITY community name +SNMPv3 security options: + -u SECURITY-NAME USM user security name + -l SECURITY-LEVEL "noAuthNoPriv"|"authNoPriv"|"authPriv" + -a AUTH-PROTOCOL "MD5"|"SHA" + -A AUTH-KEY user authentication key + -x PRIV-PROTOCOL "DES"|"AES" + -X PRIV-KEY user privacy key + -E CONTEXT-ENGINE-ID authoritative context engine ID + -e ENGINE-ID authoritative SNMP engine ID (will discover) + -n CONTEXT-NAME authoritative context name + -Z BOOTS,TIME destination SNMP engine boots/uptime +""" # Scanner + class SMScannerMixIn: # SNMPv1/v2 @@ -80,6 +82,7 @@ def t_engineBoots(self, s): # Parser + class SMParserMixIn: def p_smSpec(self, args): ''' @@ -124,6 +127,7 @@ def p_smSpec(self, args): ''' # Generator + class __SMGenerator(base.GeneratorTemplate): # SNMPv1/v2 def n_Community(self, cbCtx, node): @@ -209,7 +213,7 @@ def n_ContextName(self, cbCtx, node): else: ctx['contextName'] = node[1].attr - def n_EngineBoots(self, cbCtx, node): # XXX + def n_EngineBoots(self, cbCtx, node): # XXX snmpEngine, ctx = cbCtx if len(node) > 2: ctx['engineBoots'] = node[2].attr @@ -220,6 +224,7 @@ def n_EngineBoots(self, cbCtx, node): # XXX else: ctx['engineTime'] = 0 + def generator(cbCtx, ast): snmpEngine, ctx = cbCtx __SMGenerator().preorder(cbCtx, ast) @@ -230,10 +235,13 @@ def generator(cbCtx, ast): if 'securityLevel' not in ctx: raise error.PySnmpError('Security level not specified') if ctx['securityLevel'] == 'noAuthNoPriv': - if 'authKey' in ctx: del ctx['authKey'] - if 'privKey' in ctx: del ctx['privKey'] + if 'authKey' in ctx: + del ctx['authKey'] + if 'privKey' in ctx: + del ctx['privKey'] elif ctx['securityLevel'] == 'authNoPriv': - if 'privKey' in ctx: del ctx['privKey'] + if 'privKey' in ctx: + del ctx['privKey'] if 'authKey' in ctx: if 'authProtocol' not in ctx: ctx['authProtocol'] = config.usmHMACMD5AuthProtocol @@ -253,7 +261,7 @@ def generator(cbCtx, ast): ctx['authKey'], ctx['privProtocol'], ctx['privKey'] - ) + ) # edit SNMP engine boots/uptime if 'engineBoots' in ctx: snmpEngineBoots, = snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.importSymbols('__SNMP-FRAMEWORK-MIB', 'snmpEngineBoots') @@ -265,7 +273,7 @@ def generator(cbCtx, ast): snmpEngineTime.setSyntax( snmpEngineTime.getSyntax().clone(ctx['engineTime']) ) - else: # SNMPv1/v2c + else: # SNMPv1/v2c if 'communityName' not in ctx: raise error.PySnmpError('Community name not specified') ctx['securityName'] = 'my-agent' @@ -278,6 +286,6 @@ def generator(cbCtx, ast): ctx['paramsName'] = '%s-params' % ctx['securityName'] config.addTargetParams( - snmpEngine, ctx['paramsName'],ctx['securityName'], + snmpEngine, ctx['paramsName'], ctx['securityName'], ctx['securityLevel'], ctx['versionId'] - ) + ) diff --git a/pysnmp_apps/cli/target.py b/pysnmp_apps/cli/target.py index 804037f..84da5d3 100644 --- a/pysnmp_apps/cli/target.py +++ b/pysnmp_apps/cli/target.py @@ -13,19 +13,21 @@ # Usage + def getUsage(): - return "\ -Communication options\n\ - -r RETRIES number of retries when sending request\n\ - -t TIMEOUT request timeout (in seconds)\n\ -Agent address:\n\ - [:]\n\ - transport-domain: \"udp\"|\"udp6\"\n\ - transport-endpoint: \"IP\"|\"IPv6\"|\"FQDN\"[:\"port\"]\n\ -" + return """\ +Communication options + -r RETRIES number of retries when sending request + -t TIMEOUT request timeout (in seconds) +Agent address: + [:] + transport-domain: "udp"|"udp6" + transport-endpoint: "IP"|"IPv6"|"FQDN"[:"port"] +""" # Scanner + class TargetScannerMixIn: def t_retries(self, s): r' -r ' @@ -41,6 +43,7 @@ def t_transport(self, s): # Parser + class TargetParserMixIn: def p_targetSpec(self, args): ''' @@ -69,33 +72,36 @@ def p_targetSpec(self, args): ''' # Generator -if hasattr(socket, 'has_ipv6') and socket.has_ipv6 and \ - hasattr(socket, 'getaddrinfo'): +if (hasattr(socket, 'has_ipv6') and socket.has_ipv6 and + hasattr(socket, 'getaddrinfo')): _getaddrinfo = socket.getaddrinfo else: def _getaddrinfo(a, b, c, d): raise SnmpApplicationError('IPv6 not supported by the system') + class __TargetGeneratorPassOne(base.GeneratorTemplate): defPort = '161' _snmpDomainMap = { - 'udp': (udp.snmpUDPDomain, udp.UdpSocketTransport, lambda h,p: (socket.gethostbyname(h), int(p))), - 'udp6': (udp6.snmpUDP6Domain, udp6.Udp6SocketTransport, lambda h,p: (_getaddrinfo(h,p,socket.AF_INET6, socket.SOCK_DGRAM)[0][4])) - } + 'udp': (udp.snmpUDPDomain, udp.UdpSocketTransport, lambda h, p: (socket.gethostbyname(h), int(p))), + 'udp6': (udp6.snmpUDP6Domain, udp6.Udp6SocketTransport, lambda h, p: (_getaddrinfo(h, p, socket.AF_INET6, socket.SOCK_DGRAM)[0][4])) + } _snmpDomainNameMap = { 2: 'udp', 10: 'udp6' - } + } + def n_Transport(self, cbCtx, node): snmpEngine, ctx = cbCtx if node[0].attr in self._snmpDomainMap: - ( ctx['transportDomain'], - ctx['transportModule'], - ctx['addrRewriteFun'] ) = self._snmpDomainMap[node[0].attr] + (ctx['transportDomain'], + ctx['transportModule'], + ctx['addrRewriteFun']) = self._snmpDomainMap[node[0].attr] else: raise error.PySnmpError( 'Unsupported transport domain %s' % node[0].attr - ) + ) + def n_Endpoint(self, cbCtx, node): snmpEngine, ctx = cbCtx ctx['transportAddress'] = node[0].attr @@ -104,9 +110,9 @@ def n_IPv6(self, cbCtx, node): snmpEngine, ctx = cbCtx if not len(node): if 'transportDomain' not in ctx: - ( ctx['transportDomain'], - ctx['transportModule'], - ctx['addrRewriteFun'] ) = self._snmpDomainMap['udp6'] + (ctx['transportDomain'], + ctx['transportModule'], + ctx['addrRewriteFun']) = self._snmpDomainMap['udp6'] return if ctx.get('transportAddress') is None: ctx['transportAddress'] = '' @@ -126,22 +132,24 @@ def n_Agent_exit(self, cbCtx, node): f = _getaddrinfo(ctx['transportAddress'], 0)[0][0] except: f = -1 - ( ctx['transportDomain'], - ctx['transportModule'], - ctx['addrRewriteFun'] ) = self._snmpDomainMap[ + (ctx['transportDomain'], + ctx['transportModule'], + ctx['addrRewriteFun']) = self._snmpDomainMap[ self._snmpDomainNameMap.get(f, 'udp') - ] + ] if 'transportFormat' in ctx: ctx['transportAddress'] = ( ctx['transportAddress'], ctx['transportFormat'] - ) + ) del ctx['transportFormat'] else: - ctx['transportAddress'] = ( ctx['transportAddress'], self.defPort) + ctx['transportAddress'] = (ctx['transportAddress'], self.defPort) + class __TargetGeneratorTrapPassOne(__TargetGeneratorPassOne): defPort = '162' + class __TargetGeneratorPassTwo(base.GeneratorTemplate): def n_Retries(self, cbCtx, node): snmpEngine, ctx = cbCtx @@ -157,9 +165,9 @@ def n_Timeout(self, cbCtx, node): snmpEngine, ctx = cbCtx try: if len(node) > 2: - ctx['timeout'] = int(node[2].attr)*100 + ctx['timeout'] = int(node[2].attr) * 100 else: - ctx['timeout'] = int(node[1].attr)*100 + ctx['timeout'] = int(node[1].attr) * 100 except: raise error.PySnmpError('Bad timeout value') @@ -176,23 +184,25 @@ def n_Agent_exit(self, cbCtx, node): ctx.get('timeout', 100), ctx.get('retryCount', 5), tagList=ctx.get('transportTag', '') - ) + ) config.addSocketTransport( snmpEngine, ctx['transportDomain'], ctx['transportModule']().openClientMode() - ) + ) __TargetGeneratorTrapPassTwo = __TargetGeneratorPassTwo + def generator(cbCtx, ast): snmpEngine, ctx = cbCtx __TargetGeneratorPassTwo().preorder( __TargetGeneratorPassOne().preorder((snmpEngine, ctx), ast), ast - ) + ) + def generatorTrap(cbCtx, ast): snmpEngine, ctx = cbCtx __TargetGeneratorTrapPassTwo().preorder( __TargetGeneratorTrapPassOne().preorder((snmpEngine, ctx), ast), ast - ) + ) diff --git a/pysnmp_apps/error.py b/pysnmp_apps/error.py index b6681c1..60c2e95 100644 --- a/pysnmp_apps/error.py +++ b/pysnmp_apps/error.py @@ -6,4 +6,6 @@ # from pysnmp import error -class SnmpApplicationError(error.PySnmpError): pass + +class SnmpApplicationError(error.PySnmpError): + pass diff --git a/scripts/snmpbulkwalk.py b/scripts/snmpbulkwalk.py index 7bd1a81..1b6bf87 100755 --- a/scripts/snmpbulkwalk.py +++ b/scripts/snmpbulkwalk.py @@ -6,13 +6,16 @@ # # GETBULK command generator # -import sys, time, traceback +import sys +import time +import traceback from pysnmp_apps.cli import main, msgmod, secmod, target, pdu, mibview, base from pysnmp.entity import engine from pysnmp.entity.rfc3413 import cmdgen from pysnmp.proto import rfc1902 from pysnmp import error + def getUsage(): return "Usage: %s [OPTIONS] \n\ %s%s%s%s\ @@ -33,6 +36,7 @@ def getUsage(): # Construct c/l interpreter for this app + class Scanner(msgmod.MPScannerMixIn, secmod.SMScannerMixIn, mibview.MibViewScannerMixIn, @@ -44,6 +48,7 @@ def t_appopts(self, s): r' -C ' self.rv.append(base.ConfigToken('appopts')) + class Parser(msgmod.MPParserMixIn, secmod.SMParserMixIn, mibview.MibViewParserMixIn, @@ -59,6 +64,7 @@ def p_appOptions(self, args): ApplicationOption ::= appopts string ''' + class __Generator(base.GeneratorTemplate): def n_ApplicationOption(self, cbCtx, node): snmpEngine, ctx = cbCtx @@ -81,7 +87,7 @@ def n_ApplicationOption(self, cbCtx, node): elif c == 'p': ctx['reportFoundVars'] = 1 p = None - elif p is not None and c >= '0' and c <= '9': + elif p is not None and '0' <= c <= '9': p.append(c) else: raise error.PySnmpError('bad -C option - "%s"' % c) @@ -90,51 +96,56 @@ def n_ApplicationOption(self, cbCtx, node): if r is not None: ctx['maxRepetitions'] = int(''.join(r)) + def generator(cbCtx, ast): snmpEngine, ctx = cbCtx return __Generator().preorder((snmpEngine, ctx), ast) + def cbFun(snmpEngine, sendRequestHandle, errorIndication, errorStatus, errorIndex, varBindTable, cbCtx): if errorIndication: - if errorIndication != 'oidNotIncreasing' or \ - not ctx.get('ignoreNonIncreasingOids'): + if (errorIndication != 'oidNotIncreasing' or + not ctx.get('ignoreNonIncreasingOids')): sys.stderr.write('Error: %s\n' % errorIndication) return if errorStatus: sys.stderr.write( '%s at %s\n' % - ( errorStatus.prettyPrint(), - errorIndex and varBindTable[0][int(errorIndex)-1] or '?' ) - ) + (errorStatus.prettyPrint(), + errorIndex and varBindTable[0][int(errorIndex) - 1] or '?') + ) return for varBindRow in varBindTable: - colIdx = -1; inTableFlag = 0 + colIdx = -1 + inTableFlag = 0 for oid, val in varBindRow: colIdx += 1 if cbCtx['myHeadVars'][colIdx].isPrefixOf(oid): - sys.stdout.write('%s\n' % cbCtx['mibViewProxy'].getPrettyOidVal( - cbCtx['mibViewController'], oid, val + sys.stdout.write( + '%s\n' % cbCtx['mibViewProxy'].getPrettyOidVal( + cbCtx['mibViewController'], oid, val ) ) inTableFlag += 1 if cbCtx.get('reportFoundVars'): cbCtx['reportFoundVars'] += inTableFlag if not inTableFlag: - return # stop on end-of-table - return 1 # continue walking + return # stop on end-of-table + return 1 # continue walking # Run SNMP engine snmpEngine = engine.SnmpEngine() - + +ctx = {} + try: # Parse c/l into AST ast = Parser().parse( Scanner().tokenize(' '.join(sys.argv[1:])) ) - ctx = {} # Apply configuration to SNMP entity main.generator((snmpEngine, ctx), ast) @@ -145,7 +156,7 @@ def cbFun(snmpEngine, sendRequestHandle, errorIndication, pdu.readPduGenerator((snmpEngine, ctx), ast) generator((snmpEngine, ctx), ast) - ctx['myHeadVars'] = [ rfc1902.ObjectName(x[0]) for x in ctx['varBinds'] ] + ctx['myHeadVars'] = [rfc1902.ObjectName(x[0]) for x in ctx['varBinds']] cmdgen.BulkCommandGenerator().sendVarBinds( snmpEngine, diff --git a/scripts/snmpget.py b/scripts/snmpget.py index e023967..d0a7fb7 100755 --- a/scripts/snmpget.py +++ b/scripts/snmpget.py @@ -7,12 +7,14 @@ # # GET command generator # -import sys,traceback +import sys +import traceback from pysnmp_apps.cli import main, msgmod, secmod, target, pdu, mibview, base from pysnmp.entity import engine from pysnmp.entity.rfc3413 import cmdgen from pysnmp import error + def getUsage(): return "Usage: %s [OPTIONS] \n\ %s%s%s%s%s%s" % (sys.argv[0], @@ -25,13 +27,16 @@ def getUsage(): # Construct c/l interpreter for this app + class Scanner(msgmod.MPScannerMixIn, secmod.SMScannerMixIn, mibview.MibViewScannerMixIn, target.TargetScannerMixIn, pdu.ReadPduScannerMixIn, main.MainScannerMixIn, - base.ScannerTemplate): pass + base.ScannerTemplate): + pass + class Parser(msgmod.MPParserMixIn, secmod.SMParserMixIn, @@ -39,7 +44,10 @@ class Parser(msgmod.MPParserMixIn, target.TargetParserMixIn, pdu.ReadPduParserMixIn, main.MainParserMixIn, - base.ParserTemplate): pass + base.ParserTemplate): + pass + + def cbFun(snmpEngine, sendRequestHandle, errorIndication, errorStatus, errorIndex, varBinds, cbCtx): @@ -48,12 +56,13 @@ def cbFun(snmpEngine, sendRequestHandle, errorIndication, elif errorStatus: sys.stderr.write( '%s at %s\n' % - ( errorStatus.prettyPrint(), - errorIndex and varBinds[int(errorIndex)-1] or '?' ) - ) + (errorStatus.prettyPrint(), + errorIndex and varBinds[int(errorIndex) - 1] or '?') + ) else: for oid, val in varBinds: - sys.stdout.write('%s\n' % cbCtx['mibViewProxy'].getPrettyOidVal( + sys.stdout.write( + '%s\n' % cbCtx['mibViewProxy'].getPrettyOidVal( cbCtx['mibViewController'], oid, val ) ) diff --git a/scripts/snmpset.py b/scripts/snmpset.py index c8b2279..a55d79c 100755 --- a/scripts/snmpset.py +++ b/scripts/snmpset.py @@ -7,12 +7,14 @@ # # SET command generator # -import sys, traceback +import sys +import traceback from pysnmp_apps.cli import main, msgmod, secmod, target, pdu, mibview, base from pysnmp.entity import engine from pysnmp.entity.rfc3413 import cmdgen from pysnmp import error + def getUsage(): return "Usage: %s [OPTIONS] \n\ %s%s%s%s%s%s" % (sys.argv[0], @@ -25,13 +27,16 @@ def getUsage(): # Construct c/l interpreter for this app + class Scanner(msgmod.MPScannerMixIn, secmod.SMScannerMixIn, mibview.MibViewScannerMixIn, target.TargetScannerMixIn, pdu.WritePduScannerMixIn, main.MainScannerMixIn, - base.ScannerTemplate): pass + base.ScannerTemplate): + pass + class Parser(msgmod.MPParserMixIn, secmod.SMParserMixIn, @@ -39,10 +44,13 @@ class Parser(msgmod.MPParserMixIn, target.TargetParserMixIn, pdu.WritePduParserMixIn, main.MainParserMixIn, - base.ParserTemplate): pass + base.ParserTemplate): + pass # Run SNMP engine + + def cbFun(snmpEngine, sendRequestHandle, errorIndication, errorStatus, errorIndex, varBinds, cbCtx): if errorIndication: @@ -50,12 +58,13 @@ def cbFun(snmpEngine, sendRequestHandle, errorIndication, elif errorStatus: sys.stderr.write( '%s at %s\n' % - ( errorStatus.prettyPrint(), - errorIndex and varBinds[int(errorIndex)-1] or '?' ) + (errorStatus.prettyPrint(), + errorIndex and varBinds[int(errorIndex) - 1] or '?') ) else: for oid, val in varBinds: - sys.stdout.write('%s\n' % cbCtx['mibViewProxy'].getPrettyOidVal( + sys.stdout.write( + '%s\n' % cbCtx['mibViewProxy'].getPrettyOidVal( cbCtx['mibViewController'], oid, val ) ) diff --git a/scripts/snmptranslate.py b/scripts/snmptranslate.py index a896f06..7e4c46b 100755 --- a/scripts/snmptranslate.py +++ b/scripts/snmptranslate.py @@ -7,16 +7,15 @@ # # Command-line MIB browser # -import sys, traceback -from pyasn1.type import univ +import sys +import traceback from pysnmp.entity import engine -from pysnmp.proto import rfc3412 -from pysnmp.smi import builder from pysnmp_apps.cli import main, pdu, mibview, base from pysnmp.smi.error import NoSuchObjectError from pysnmp import error from pyasn1.type import univ + def getUsage(): return "Usage: %s [OPTIONS] \n\ %s%s\ @@ -34,6 +33,7 @@ def getUsage(): # Construct c/l interpreter for this app + class Scanner(mibview.MibViewScannerMixIn, pdu.ReadPduScannerMixIn, main.MainScannerMixIn, @@ -42,6 +42,7 @@ def t_transopts(self, s): r' -T ' self.rv.append(base.ConfigToken('transopts')) + class Parser(mibview.MibViewParserMixIn, pdu.ReadPduParserMixIn, main.MainParserMixIn, @@ -58,6 +59,7 @@ def p_transOptions(self, args): ''' + class __Generator(base.GeneratorTemplate): def n_TranslateOption(self, cbCtx, node): snmpEngine, ctx = cbCtx @@ -82,10 +84,12 @@ def n_TranslateOption(self, cbCtx, node): else: raise error.PySnmpError('unsupported sub-option \"%s\"' % c) + def generator(cbCtx, ast): - snmpEngine, ctx= cbCtx + snmpEngine, ctx = cbCtx return __Generator().preorder((snmpEngine, ctx), ast) + class MibViewProxy(mibview.MibViewProxy): # MIB translate options translateFullDetails = 0 @@ -112,38 +116,36 @@ def getPrettyOidVal(self, mibViewController, oid, val): if self.translateFullDetails: if suffix: out = '%s::%s' % (modName, nodeDesc) - out = out + ' [ %s ]' % '.'.join([ str(x) for x in suffix ]) + out = out + ' [ %s ]' % '.'.join([str(x) for x in suffix]) out = out + '\n' else: out = out + '%s::%s\n%s ::= { %s }' % ( modName, nodeDesc, mibNode.asn1Print(), - ' '.join( - map(lambda x,y: '%s(%s)' % (y, x), prefix, label) - ) - ) + ' '.join(map(lambda x, y: '%s(%s)' % (y, x), prefix, label)) + ) elif self.translateTrivial: out = '%s ::= { %s %s' % ( len(label) > 1 and label[-2] or ".", label[-1], prefix[-1] ) if suffix: - out = out + ' [ %s ]' % '.'.join([ str(x) for x in suffix ]) + out = out + ' [ %s ]' % '.'.join([str(x) for x in suffix]) out = out + ' }' elif self.translateLabeledOid: out = '.' + '.'.join( - map(lambda x,y: '%s(%s)' % (y, x), prefix, label) + map(lambda x, y: '%s(%s)' % (y, x), prefix, label) ) if suffix: - out = out + ' [ %s ]' % '.'.join([ str(x) for x in suffix ]) + out = out + ' [ %s ]' % '.'.join([str(x) for x in suffix]) elif self.translateNumericOid: - out = '.' + '.'.join([ str(x) for x in prefix ]) + out = '.' + '.'.join([str(x) for x in prefix]) if suffix: - out = out + ' [ %s ]' % '.'.join([ str(x) for x in suffix ]) + out = out + ' [ %s ]' % '.'.join([str(x) for x in suffix]) elif self.translateSymbolicOid: out = '.' + '.'.join(label) if suffix: - out = out + ' [ %s ]' % '.'.join([ str(x) for x in suffix ]) + out = out + ' [ %s ]' % '.'.join([str(x) for x in suffix]) if not out: out = mibview.MibViewProxy.getPrettyOidVal( self, mibViewController, oid, self._null @@ -156,14 +158,14 @@ def getPrettyOidVal(self, mibViewController, oid, val): mibBuilder = snmpEngine.getMibBuilder() mibBuilder.loadTexts = True +ctx = {} + try: # Parse c/l into AST ast = Parser().parse( Scanner().tokenize(' '.join(sys.argv[1:])) ) - ctx = {} - # Apply configuration to SNMP entity main.generator((snmpEngine, ctx), ast) ctx['mibViewProxy'] = MibViewProxy(ctx['mibViewController']) @@ -190,7 +192,8 @@ def getPrettyOidVal(self, mibViewController, oid, val): while 1: if val is None: val = univ.Null() - sys.stdout.write('%s\n' % ctx['mibViewProxy'].getPrettyOidVal( + sys.stdout.write( + '%s\n' % ctx['mibViewProxy'].getPrettyOidVal( ctx['mibViewController'], oid, val ) ) diff --git a/scripts/snmptrap.py b/scripts/snmptrap.py index 68105ed..082e540 100755 --- a/scripts/snmptrap.py +++ b/scripts/snmptrap.py @@ -7,7 +7,9 @@ # # Notificaton Originator # -import sys, socket, time, traceback +import sys +import socket +import traceback from pysnmp_apps.cli import main, msgmod, secmod, target, pdu, mibview, base from pysnmp.entity import engine, config from pysnmp.entity.rfc3413 import ntforg @@ -15,29 +17,32 @@ from pysnmp.proto.api import v1, v2c from pysnmp import error + def getUsage(): - return "Usage: %s [OPTIONS] \n\ -%s%s%s%s\ -TRAP options:\n\ - -C: set various application specific behaviours:\n\ - i: send INFORM-PDU, expect a response\n\ -%s\ -SNMPv1 TRAP management parameters:\n\ - enterprise-oid agent generic-trap specific-trap uptime \n\ - where:\n\ + return """\ +Usage: %s [OPTIONS] +%s%s%s%s +TRAP options: + -C: set various application specific behaviours: + i: send INFORM-PDU, expect a response +%s +SNMPv1 TRAP management parameters: + enterprise-oid agent generic-trap specific-trap uptime + where: generic-trap: coldStart|warmStart|linkDown|linkUp|authenticationFailure|egpNeighborLoss|enterpriseSpecific\n\ -SNMPv2/SNMPv3 management parameters:\n\ - uptime trap-oid \n\ -%s" % (sys.argv[0], - main.getUsage(), - msgmod.getUsage(), - secmod.getUsage(), - mibview.getUsage(), - target.getUsage(), - pdu.getWriteUsage()) +SNMPv2/SNMPv3 management parameters: + uptime trap-oid +%s""" % (sys.argv[0], + main.getUsage(), + msgmod.getUsage(), + secmod.getUsage(), + mibview.getUsage(), + target.getUsage(), + pdu.getWriteUsage()) # Construct c/l interpreter for this app + class Scanner(msgmod.MPScannerMixIn, secmod.SMScannerMixIn, mibview.MibViewScannerMixIn, @@ -53,6 +58,7 @@ def t_genericTrap(self, s): r' coldStart|warmStart|linkDown|linkUp|authenticationFailure|egpNeighborLoss|enterpriseSpecific ' self.rv.append(base.ConfigToken('genericTrap', s)) + class Parser(msgmod.MPParserMixIn, secmod.SMParserMixIn, mibview.MibViewParserMixIn, @@ -87,6 +93,7 @@ def p_appOptions(self, args): ApplicationOption ::= appopts string ''' + class __Generator(base.GeneratorTemplate): def n_ApplicationOption(self, cbCtx, node): snmpEngine, ctx = cbCtx @@ -101,7 +108,7 @@ def n_ApplicationOption(self, cbCtx, node): raise error.PySnmpError('bad -C option - "%s"' % c) def n_EnterpriseOid(self, cbCtx, node): - snmpEngine, ctx= cbCtx + snmpEngine, ctx = cbCtx ctx['EnterpriseOid'] = node[0].attr def n_AgentName(self, cbCtx, node): @@ -111,7 +118,7 @@ def n_AgentName(self, cbCtx, node): except socket.error: raise error.PySnmpError( 'Bad agent name %s: %s' % (node[0].attr, sys.exc_info()[1]) - ) + ) def n_GenericTrap(self, cbCtx, node): snmpEngine, ctx = cbCtx @@ -156,34 +163,39 @@ def n_TrapV2cParams_exit(self, cbCtx, node): v2c.apiTrapPDU.setDefaults(pdu) v2c.apiPDU.setVarBinds( pdu, - [ ( v2c.ObjectIdentifier('1.3.6.1.2.1.1.3.0'), v2c.TimeTicks(ctx['Uptime'])), - ( v2c.ObjectIdentifier('1.3.6.1.6.3.1.1.4.1.0'), v2c.ObjectIdentifier(ctx['TrapOid']) ) ] + [(v2c.ObjectIdentifier('1.3.6.1.2.1.1.3.0'), v2c.TimeTicks(ctx['Uptime'])), + (v2c.ObjectIdentifier('1.3.6.1.6.3.1.1.4.1.0'), v2c.ObjectIdentifier(ctx['TrapOid']))] ) ctx['pdu'] = pdu + def generator(cbCtx, ast): snmpEngine, ctx = cbCtx return __Generator().preorder((snmpEngine, ctx), ast) # Run SNMP engine + def cbFun(snmpEngine, notificationHandle, errorIndication, pdu, cbCtx): if errorIndication: sys.stderr.write('%s\n' % errorIndication) return errorStatus = v2c.apiPDU.getErrorStatus(pdu) + varBinds = v2c.apiPDU.getVarBinds(pdu) + if errorStatus: errorIndex = v2c.apiPDU.getErrorIndex(pdu) sys.stderr.write( '%s at %s\n' % - ( errorStatus.prettyPrint(), - errorIndex and varBinds[int(errorIndex)-1] or '?' ) + (errorStatus.prettyPrint(), + errorIndex and varBinds[int(errorIndex) - 1] or '?') ) return - for oid, val in v2c.apiPDU.getVarBinds(pdu): - sys.stdout.write('%s\n' % cbCtx['mibViewProxy'].getPrettyOidVal( + for oid, val in varBinds: + sys.stdout.write( + '%s\n' % cbCtx['mibViewProxy'].getPrettyOidVal( cbCtx['mibViewController'], oid, val ) ) diff --git a/scripts/snmpwalk.py b/scripts/snmpwalk.py index 1bdb8ec..ef5a372 100755 --- a/scripts/snmpwalk.py +++ b/scripts/snmpwalk.py @@ -7,31 +7,35 @@ # # GETNEXT command generator # -import sys, time, traceback +import sys +import time +import traceback from pysnmp_apps.cli import main, msgmod, secmod, target, pdu, mibview, base from pysnmp.entity import engine from pysnmp.entity.rfc3413 import cmdgen from pysnmp.proto import rfc1902 from pysnmp import error + def getUsage(): - return "Usage: %s [OPTIONS] \n\ -%s%s%s%s%s%s\ -GETNEXT options:\n\ - -C set various application specific behaviours:\n\ - c: do not check returned OIDs are increasing\n\ - t: display wall-clock time to complete the request\n\ - p: print the number of variables found\n\ -" % (sys.argv[0], - main.getUsage(), - msgmod.getUsage(), - secmod.getUsage(), - mibview.getUsage(), - target.getUsage(), - pdu.getReadUsage()) + return """Usage: %s [OPTIONS] +%s%s%s%s%s%s +GETNEXT options: + -C set various application specific behaviours: + c: do not check returned OIDs are increasing + t: display wall-clock time to complete the request + p: print the number of variables found +""" % (sys.argv[0], + main.getUsage(), + msgmod.getUsage(), + secmod.getUsage(), + mibview.getUsage(), + target.getUsage(), + pdu.getReadUsage()) # Construct c/l interpreter for this app + class Scanner(msgmod.MPScannerMixIn, secmod.SMScannerMixIn, mibview.MibViewScannerMixIn, @@ -43,6 +47,7 @@ def t_appopts(self, s): r' -C ' self.rv.append(base.ConfigToken('appopts')) + class Parser(msgmod.MPParserMixIn, secmod.SMParserMixIn, mibview.MibViewParserMixIn, @@ -58,6 +63,7 @@ def p_appOptions(self, args): ApplicationOption ::= appopts string ''' + class __Generator(base.GeneratorTemplate): def n_ApplicationOption(self, cbCtx, node): snmpEngine, ctx = cbCtx @@ -75,32 +81,36 @@ def n_ApplicationOption(self, cbCtx, node): else: raise error.PySnmpError('bad -C option - "%s"' % c) + def generator(cbCtx, ast): snmpEngine, ctx = cbCtx return __Generator().preorder((snmpEngine, ctx), ast) # Run SNMP engine + def cbFun(snmpEngine, sendRequestHandle, errorIndication, errorStatus, errorIndex, varBindTable, cbCtx): if errorIndication: - if errorIndication != 'oidNotIncreasing' or \ - not ctx.get('ignoreNonIncreasingOids'): + if (errorIndication != 'oidNotIncreasing' or + not ctx.get('ignoreNonIncreasingOids')): sys.stderr.write('Error: %s\n' % errorIndication) return if errorStatus: sys.stderr.write( '%s at %s\n' % - ( errorStatus.prettyPrint(), - errorIndex and varBindTable[0][int(errorIndex)-1] or '?' ) + (errorStatus.prettyPrint(), + errorIndex and varBindTable[0][int(errorIndex) - 1] or '?') ) return for varBindRow in varBindTable: - colIdx = -1; inTableFlag = 0 + colIdx = -1 + inTableFlag = 0 for oid, val in varBindRow: colIdx += 1 if cbCtx['myHeadVars'][colIdx].isPrefixOf(oid): - sys.stdout.write('%s\n' % cbCtx['mibViewProxy'].getPrettyOidVal( + sys.stdout.write( + '%s\n' % cbCtx['mibViewProxy'].getPrettyOidVal( cbCtx['mibViewController'], oid, val ) ) @@ -108,19 +118,19 @@ def cbFun(snmpEngine, sendRequestHandle, errorIndication, if cbCtx.get('reportFoundVars'): cbCtx['reportFoundVars'] += inTableFlag if not inTableFlag: - return # stop on end-of-table - return 1 # continue walking + return # stop on end-of-table + return 1 # continue walking snmpEngine = engine.SnmpEngine() +ctx = {} + try: # Parse c/l into AST ast = Parser().parse( Scanner().tokenize(' '.join(sys.argv[1:])) ) - ctx = {} - # Apply configuration to SNMP entity main.generator((snmpEngine, ctx), ast) msgmod.generator((snmpEngine, ctx), ast) @@ -130,7 +140,7 @@ def cbFun(snmpEngine, sendRequestHandle, errorIndication, pdu.readPduGenerator((snmpEngine, ctx), ast) generator((snmpEngine, ctx), ast) - ctx['myHeadVars'] = [ rfc1902.ObjectName(x[0]) for x in ctx['varBinds'] ] + ctx['myHeadVars'] = [rfc1902.ObjectName(x[0]) for x in ctx['varBinds']] cmdgen.NextCommandGenerator().sendVarBinds( snmpEngine, diff --git a/setup.py b/setup.py index 4c1e226..5741512 100644 --- a/setup.py +++ b/setup.py @@ -88,7 +88,7 @@ def howto_install_setuptools(): 'author': 'Ilya Etingof', 'author_email': 'etingof@gmail.com', 'url': 'https://github.com/etingof/pysnmp-apps', - 'classifiers': [ x for x in classifiers.split('\n') if x ], + 'classifiers': [x for x in classifiers.split('\n') if x], 'platforms': ['any'], 'license': 'BSD', 'packages': ['pysnmp_apps', 'pysnmp_apps.cli'], @@ -98,7 +98,7 @@ def howto_install_setuptools(): 'scripts/snmpbulkwalk.py', 'scripts/snmptrap.py', 'scripts/snmptranslate.py'] - } + } ) if "py2exe" in sys.argv: