PYTHON 10
Memcache.py Guest on 24th September 2020 03:02:04 PM
  1. #!/usr/bin/env python
  2.  
  3. """client module for memcached (memory cache daemon)
  4.  
  5. Overview
  6. ========
  7.  
  8. See U{the MemCached homepage<http://www.danga.com/memcached>} for more
  9. about memcached.
  10.  
  11. Usage summary
  12. =============
  13.  
  14. This should give you a feel for how this module operates::
  15.  
  16.    import memcache
  17.    mc = memcache.Client(['127.0.0.1:11211'], debug=0)
  18.  
  19.    mc.set("some_key", "Some value")
  20.    value = mc.get("some_key")
  21.  
  22.    mc.set("another_key", 3)
  23.    mc.delete("another_key")
  24.  
  25.    mc.set("key", "1") # note that the key used for incr/decr must be
  26.                       # a string.
  27.    mc.incr("key")
  28.    mc.decr("key")
  29.  
  30. The standard way to use memcache with a database is like this:
  31.  
  32.    key = derive_key(obj)
  33.    obj = mc.get(key)
  34.    if not obj:
  35.        obj = backend_api.get(...)
  36.        mc.set(key, obj)
  37.  
  38.    # we now have obj, and future passes through this code
  39.    # will use the object from the cache.
  40.  
  41. Detailed Documentation
  42. ======================
  43.  
  44. More detailed documentation is available in the L{Client} class.
  45.  
  46. """
  47.  
  48. from __future__ import print_function
  49.  
  50. import binascii
  51. import os
  52. import re
  53. import socket
  54. import sys
  55. import threading
  56. import time
  57. import zlib
  58.  
  59. import six
  60.  
  61. if six.PY2:
  62.     # With Python 2, the faster C implementation has to be imported explicitly.
  63.     import cPickle as pickle
  64. else:
  65.     import pickle
  66.  
  67.  
  68. def cmemcache_hash(key):
  69.     return (
  70.         (((binascii.crc32(key) & 0xffffffff)
  71.           >> 16) & 0x7fff) or 1)
  72. serverHashFunction = cmemcache_hash
  73.  
  74.  
  75. def useOldServerHashFunction():
  76.     """Use the old python-memcache server hash function."""
  77.     global serverHashFunction
  78.     serverHashFunction = binascii.crc32
  79.  
  80. from io import BytesIO
  81. if six.PY2:
  82.     try:
  83.         unicode
  84.     except NameError:
  85.         _has_unicode = False
  86.     else:
  87.         _has_unicode = True
  88. else:
  89.     _has_unicode = True
  90.  
  91. _str_cls = six.string_types
  92.  
  93. valid_key_chars_re = re.compile(b'[\x21-\x7e\x80-\xff]+$')
  94.  
  95.  
  96. #  Original author: Evan Martin of Danga Interactive
  97. __author__ = "Sean Reifschneider <[email protected]>"
  98. __version__ = "1.58"
  99. __copyright__ = "Copyright (C) 2003 Danga Interactive"
  100. #  http://en.wikipedia.org/wiki/Python_Software_Foundation_License
  101. __license__ = "Python Software Foundation License"
  102.  
  103. SERVER_MAX_KEY_LENGTH = 250
  104. # Storing values larger than 1MB requires starting memcached with -I <size> for
  105. # memcached >= 1.4.2 or recompiling for < 1.4.2. If you do, this value can be
  106. # changed by doing "memcache.SERVER_MAX_VALUE_LENGTH = N" after importing this
  107. # module.
  108. SERVER_MAX_VALUE_LENGTH = 1024 * 1024
  109.  
  110.  
  111. class _Error(Exception):
  112.     pass
  113.  
  114.  
  115. class _ConnectionDeadError(Exception):
  116.     pass
  117.  
  118.  
  119. _DEAD_RETRY = 30  # number of seconds before retrying a dead server.
  120. _SOCKET_TIMEOUT = 3  # number of seconds before sockets timeout.
  121.  
  122.  
  123. class Client(threading.local):
  124.     """Object representing a pool of memcache servers.
  125.  
  126.    See L{memcache} for an overview.
  127.  
  128.    In all cases where a key is used, the key can be either:
  129.        1. A simple hashable type (string, integer, etc.).
  130.        2. A tuple of C{(hashvalue, key)}.  This is useful if you want
  131.        to avoid making this module calculate a hash value.  You may
  132.        prefer, for example, to keep all of a given user's objects on
  133.        the same memcache server, so you could use the user's unique
  134.        id as the hash value.
  135.  
  136.  
  137.    @group Setup: __init__, set_servers, forget_dead_hosts,
  138.    disconnect_all, debuglog
  139.    @group Insertion: set, add, replace, set_multi
  140.    @group Retrieval: get, get_multi
  141.    @group Integers: incr, decr
  142.    @group Removal: delete, delete_multi
  143.    @sort: __init__, set_servers, forget_dead_hosts, disconnect_all,
  144.           debuglog,\ set, set_multi, add, replace, get, get_multi,
  145.           incr, decr, delete, delete_multi
  146.    """
  147.     _FLAG_PICKLE = 1 << 0
  148.     _FLAG_INTEGER = 1 << 1
  149.     _FLAG_LONG = 1 << 2
  150.     _FLAG_COMPRESSED = 1 << 3
  151.  
  152.     _SERVER_RETRIES = 10  # how many times to try finding a free server.
  153.  
  154.     # exceptions for Client
  155.     class MemcachedKeyError(Exception):
  156.         pass
  157.  
  158.     class MemcachedKeyLengthError(MemcachedKeyError):
  159.         pass
  160.  
  161.     class MemcachedKeyCharacterError(MemcachedKeyError):
  162.         pass
  163.  
  164.     class MemcachedKeyNoneError(MemcachedKeyError):
  165.         pass
  166.  
  167.     class MemcachedKeyTypeError(MemcachedKeyError):
  168.         pass
  169.  
  170.     class MemcachedStringEncodingError(Exception):
  171.         pass
  172.  
  173.     def __init__(self, servers, debug=0, pickleProtocol=0,
  174.                  pickler=pickle.Pickler, unpickler=pickle.Unpickler,
  175.                  compressor=zlib.compress, decompressor=zlib.decompress,
  176.                  pload=None, pid=None,
  177.                  server_max_key_length=None, server_max_value_length=None,
  178.                  dead_retry=_DEAD_RETRY, socket_timeout=_SOCKET_TIMEOUT,
  179.                  cache_cas=False, flush_on_reconnect=0, check_keys=True):
  180.         """Create a new Client object with the given list of servers.
  181.  
  182.        @param servers: C{servers} is passed to L{set_servers}.
  183.        @param debug: whether to display error messages when a server
  184.        can't be contacted.
  185.        @param pickleProtocol: number to mandate protocol used by
  186.        (c)Pickle.
  187.        @param pickler: optional override of default Pickler to allow
  188.        subclassing.
  189.        @param unpickler: optional override of default Unpickler to
  190.        allow subclassing.
  191.        @param pload: optional persistent_load function to call on
  192.        pickle loading.  Useful for cPickle since subclassing isn't
  193.        allowed.
  194.        @param pid: optional persistent_id function to call on pickle
  195.        storing.  Useful for cPickle since subclassing isn't allowed.
  196.        @param dead_retry: number of seconds before retrying a
  197.        blacklisted server. Default to 30 s.
  198.        @param socket_timeout: timeout in seconds for all calls to a
  199.        server. Defaults to 3 seconds.
  200.        @param cache_cas: (default False) If true, cas operations will
  201.        be cached.  WARNING: This cache is not expired internally, if
  202.        you have a long-running process you will need to expire it
  203.        manually via client.reset_cas(), or the cache can grow
  204.        unlimited.
  205.        @param server_max_key_length: (default SERVER_MAX_KEY_LENGTH)
  206.        Data that is larger than this will not be sent to the server.
  207.        @param server_max_value_length: (default
  208.        SERVER_MAX_VALUE_LENGTH) Data that is larger than this will
  209.        not be sent to the server.
  210.        @param flush_on_reconnect: optional flag which prevents a
  211.        scenario that can cause stale data to be read: If there's more
  212.        than one memcached server and the connection to one is
  213.        interrupted, keys that mapped to that server will get
  214.        reassigned to another. If the first server comes back, those
  215.        keys will map to it again. If it still has its data, get()s
  216.        can read stale data that was overwritten on another
  217.        server. This flag is off by default for backwards
  218.        compatibility.
  219.        @param check_keys: (default True) If True, the key is checked
  220.        to ensure it is the correct length and composed of the right
  221.        characters.
  222.        """
  223.         super(Client, self).__init__()
  224.         self.debug = debug
  225.         self.dead_retry = dead_retry
  226.         self.socket_timeout = socket_timeout
  227.         self.flush_on_reconnect = flush_on_reconnect
  228.         self.set_servers(servers)
  229.         self.stats = {}
  230.         self.cache_cas = cache_cas
  231.         self.reset_cas()
  232.         self.do_check_key = check_keys
  233.  
  234.         # Allow users to modify pickling/unpickling behavior
  235.         self.pickleProtocol = pickleProtocol
  236.         self.pickler = pickler
  237.         self.unpickler = unpickler
  238.         self.compressor = compressor
  239.         self.decompressor = decompressor
  240.         self.persistent_load = pload
  241.         self.persistent_id = pid
  242.         self.server_max_key_length = server_max_key_length
  243.         if self.server_max_key_length is None:
  244.             self.server_max_key_length = SERVER_MAX_KEY_LENGTH
  245.         self.server_max_value_length = server_max_value_length
  246.         if self.server_max_value_length is None:
  247.             self.server_max_value_length = SERVER_MAX_VALUE_LENGTH
  248.  
  249.         #  figure out the pickler style
  250.         file = BytesIO()
  251.         try:
  252.             pickler = self.pickler(file, protocol=self.pickleProtocol)
  253.             self.picklerIsKeyword = True
  254.         except TypeError:
  255.             self.picklerIsKeyword = False
  256.  
  257.     def _encode_key(self, key):
  258.         if isinstance(key, tuple):
  259.             if isinstance(key[1], six.text_type):
  260.                 return (key[0], key[1].encode('utf8'))
  261.         elif isinstance(key, six.text_type):
  262.             return key.encode('utf8')
  263.         return key
  264.  
  265.     def _encode_cmd(self, cmd, key, headers, noreply, *args):
  266.         cmd_bytes = cmd.encode('utf-8') if six.PY3 else cmd
  267.         fullcmd = [cmd_bytes, b' ', key]
  268.  
  269.         if headers:
  270.             if six.PY3:
  271.                 headers = headers.encode('utf-8')
  272.             fullcmd.append(b' ')
  273.             fullcmd.append(headers)
  274.  
  275.         if noreply:
  276.             fullcmd.append(b' noreply')
  277.  
  278.         if args:
  279.             fullcmd.append(b' ')
  280.             fullcmd.extend(args)
  281.         return b''.join(fullcmd)
  282.  
  283.     def reset_cas(self):
  284.         """Reset the cas cache.
  285.  
  286.        This is only used if the Client() object was created with
  287.        "cache_cas=True".  If used, this cache does not expire
  288.        internally, so it can grow unbounded if you do not clear it
  289.        yourself.
  290.        """
  291.         self.cas_ids = {}
  292.  
  293.     def set_servers(self, servers):
  294.         """Set the pool of servers used by this client.
  295.  
  296.        @param servers: an array of servers.
  297.        Servers can be passed in two forms:
  298.            1. Strings of the form C{"host:port"}, which implies a
  299.            default weight of 1.
  300.            2. Tuples of the form C{("host:port", weight)}, where
  301.            C{weight} is an integer weight value.
  302.  
  303.        """
  304.         self.servers = [_Host(s, self.debug, dead_retry=self.dead_retry,
  305.                               socket_timeout=self.socket_timeout,
  306.                               flush_on_reconnect=self.flush_on_reconnect)
  307.                         for s in servers]
  308.         self._init_buckets()
  309.  
  310.     def get_stats(self, stat_args=None):
  311.         """Get statistics from each of the servers.
  312.  
  313.        @param stat_args: Additional arguments to pass to the memcache
  314.            "stats" command.
  315.  
  316.        @return: A list of tuples ( server_identifier,
  317.            stats_dictionary ).  The dictionary contains a number of
  318.            name/value pairs specifying the name of the status field
  319.            and the string value associated with it.  The values are
  320.            not converted from strings.
  321.        """
  322.         data = []
  323.         for s in self.servers:
  324.             if not s.connect():
  325.                 continue
  326.             if s.family == socket.AF_INET:
  327.                 name = '%s:%s (%s)' % (s.ip, s.port, s.weight)
  328.             elif s.family == socket.AF_INET6:
  329.                 name = '[%s]:%s (%s)' % (s.ip, s.port, s.weight)
  330.             else:
  331.                 name = 'unix:%s (%s)' % (s.address, s.weight)
  332.             if not stat_args:
  333.                 s.send_cmd('stats')
  334.             else:
  335.                 s.send_cmd('stats ' + stat_args)
  336.             serverData = {}
  337.             data.append((name, serverData))
  338.             readline = s.readline
  339.             while 1:
  340.                 line = readline()
  341.                 if not line or line.decode('ascii').strip() == 'END':
  342.                     break
  343.                 stats = line.decode('ascii').split(' ', 2)
  344.                 serverData[stats[1]] = stats[2]
  345.  
  346.         return(data)
  347.  
  348.     def get_slab_stats(self):
  349.         data = []
  350.         for s in self.servers:
  351.             if not s.connect():
  352.                 continue
  353.             if s.family == socket.AF_INET:
  354.                 name = '%s:%s (%s)' % (s.ip, s.port, s.weight)
  355.             elif s.family == socket.AF_INET6:
  356.                 name = '[%s]:%s (%s)' % (s.ip, s.port, s.weight)
  357.             else:
  358.                 name = 'unix:%s (%s)' % (s.address, s.weight)
  359.             serverData = {}
  360.             data.append((name, serverData))
  361.             s.send_cmd('stats slabs')
  362.             readline = s.readline
  363.             while 1:
  364.                 line = readline()
  365.                 if not line or line.strip() == 'END':
  366.                     break
  367.                 item = line.split(' ', 2)
  368.                 if line.startswith('STAT active_slabs') or line.startswith('STAT total_malloced'):
  369.                     serverData[item[1]]=item[2]
  370.                 else:
  371.                     # 0 = STAT, 1 = ITEM, 2 = Value
  372.                     slab = item[1].split(':', 2)
  373.                     # 0 = Slab #, 1 = Name
  374.                     if slab[0] not in serverData:
  375.                         serverData[slab[0]] = {}
  376.                     serverData[slab[0]][slab[1]] = item[2]
  377.         return data
  378.  
  379.     def get_slabs(self):
  380.         data = []
  381.         for s in self.servers:
  382.             if not s.connect():
  383.                 continue
  384.             if s.family == socket.AF_INET:
  385.                 name = '%s:%s (%s)' % (s.ip, s.port, s.weight)
  386.             elif s.family == socket.AF_INET6:
  387.                 name = '[%s]:%s (%s)' % (s.ip, s.port, s.weight)
  388.             else:
  389.                 name = 'unix:%s (%s)' % (s.address, s.weight)
  390.             serverData = {}
  391.             data.append((name, serverData))
  392.             s.send_cmd('stats items')
  393.             readline = s.readline
  394.             while 1:
  395.                 line = readline()
  396.                 if not line or line.strip() == 'END':
  397.                     break
  398.                 item = line.split(' ', 2)
  399.                 # 0 = STAT, 1 = ITEM, 2 = Value
  400.                 slab = item[1].split(':', 2)
  401.                 # 0 = items, 1 = Slab #, 2 = Name
  402.                 if slab[1] not in serverData:
  403.                     serverData[slab[1]] = {}
  404.                 serverData[slab[1]][slab[2]] = item[2]
  405.         return data
  406.  
  407.     def flush_all(self):
  408.         """Expire all data in memcache servers that are reachable."""
  409.         for s in self.servers:
  410.             if not s.connect():
  411.                 continue
  412.             s.flush()
  413.  
  414.     def debuglog(self, str):
  415.         if self.debug:
  416.             sys.stderr.write("MemCached: %s\n" % str)
  417.  
  418.     def _statlog(self, func):
  419.         if func not in self.stats:
  420.             self.stats[func] = 1
  421.         else:
  422.             self.stats[func] += 1
  423.  
  424.     def forget_dead_hosts(self):
  425.         """Reset every host in the pool to an "alive" state."""
  426.         for s in self.servers:
  427.             s.deaduntil = 0
  428.  
  429.     def _init_buckets(self):
  430.         self.buckets = []
  431.         for server in self.servers:
  432.             for i in range(server.weight):
  433.                 self.buckets.append(server)
  434.  
  435.     def _get_server(self, key):
  436.         if isinstance(key, tuple):
  437.             serverhash, key = key
  438.         else:
  439.             serverhash = serverHashFunction(key)
  440.  
  441.         if not self.buckets:
  442.             return None, None
  443.  
  444.         for i in range(Client._SERVER_RETRIES):
  445.             server = self.buckets[serverhash % len(self.buckets)]
  446.             if server.connect():
  447.                 # print("(using server %s)" % server,)
  448.                 return server, key
  449.             serverhash = str(serverhash) + str(i)
  450.             if isinstance(serverhash, six.text_type):
  451.                 serverhash = serverhash.encode('ascii')
  452.             serverhash = serverHashFunction(serverhash)
  453.         return None, None
  454.  
  455.     def disconnect_all(self):
  456.         for s in self.servers:
  457.             s.close_socket()
  458.  
  459.     def delete_multi(self, keys, time=None, key_prefix='', noreply=False):
  460.         """Delete multiple keys in the memcache doing just one query.
  461.  
  462.        >>> notset_keys = mc.set_multi({'a1' : 'val1', 'a2' : 'val2'})
  463.        >>> mc.get_multi(['a1', 'a2']) == {'a1' : 'val1','a2' : 'val2'}
  464.        1
  465.        >>> mc.delete_multi(['key1', 'key2'])
  466.        1
  467.        >>> mc.get_multi(['key1', 'key2']) == {}
  468.        1
  469.  
  470.        This method is recommended over iterated regular L{delete}s as
  471.        it reduces total latency, since your app doesn't have to wait
  472.        for each round-trip of L{delete} before sending the next one.
  473.  
  474.        @param keys: An iterable of keys to clear
  475.        @param time: number of seconds any subsequent set / update
  476.        commands should fail. Defaults to 0 for no delay.
  477.        @param key_prefix: Optional string to prepend to each key when
  478.            sending to memcache.  See docs for L{get_multi} and
  479.            L{set_multi}.
  480.        @param noreply: optional parameter instructs the server to not send the
  481.            reply.
  482.        @return: 1 if no failure in communication with any memcacheds.
  483.        @rtype: int
  484.        """
  485.  
  486.         self._statlog('delete_multi')
  487.  
  488.         server_keys, prefixed_to_orig_key = self._map_and_prefix_keys(
  489.             keys, key_prefix)
  490.  
  491.         # send out all requests on each server before reading anything
  492.         dead_servers = []
  493.  
  494.         rc = 1
  495.         for server in six.iterkeys(server_keys):
  496.             bigcmd = []
  497.             write = bigcmd.append
  498.             if time is not None:
  499.                 headers = str(time)
  500.             else:
  501.                 headers = None
  502.             for key in server_keys[server]:  # These are mangled keys
  503.                 cmd = self._encode_cmd('delete', key, headers, noreply, b'\r\n')
  504.                 write(cmd)
  505.             try:
  506.                 server.send_cmds(b''.join(bigcmd))
  507.             except socket.error as msg:
  508.                 rc = 0
  509.                 if isinstance(msg, tuple):
  510.                     msg = msg[1]
  511.                 server.mark_dead(msg)
  512.                 dead_servers.append(server)
  513.  
  514.         # if noreply, just return
  515.         if noreply:
  516.             return rc
  517.  
  518.         # if any servers died on the way, don't expect them to respond.
  519.         for server in dead_servers:
  520.             del server_keys[server]
  521.  
  522.         for server, keys in six.iteritems(server_keys):
  523.             try:
  524.                 for key in keys:
  525.                     server.expect(b"DELETED")
  526.             except socket.error as msg:
  527.                 if isinstance(msg, tuple):
  528.                     msg = msg[1]
  529.                 server.mark_dead(msg)
  530.                 rc = 0
  531.         return rc
  532.  
  533.     def delete(self, key, time=None, noreply=False):
  534.         '''Deletes a key from the memcache.
  535.  
  536.        @return: Nonzero on success.
  537.        @param time: number of seconds any subsequent set / update commands
  538.        should fail. Defaults to None for no delay.
  539.        @param noreply: optional parameter instructs the server to not send the
  540.            reply.
  541.        @rtype: int
  542.        '''
  543.         return self._deletetouch([b'DELETED', b'NOT_FOUND'], "delete", key,
  544.                                  time, noreply)
  545.  
  546.     def touch(self, key, time=0, noreply=False):
  547.         '''Updates the expiration time of a key in memcache.
  548.  
  549.        @return: Nonzero on success.
  550.        @param time: Tells memcached the time which this value should
  551.            expire, either as a delta number of seconds, or an absolute
  552.            unix time-since-the-epoch value. See the memcached protocol
  553.            docs section "Storage Commands" for more info on <exptime>. We
  554.            default to 0 == cache forever.
  555.        @param noreply: optional parameter instructs the server to not send the
  556.            reply.
  557.        @rtype: int
  558.        '''
  559.         return self._deletetouch([b'TOUCHED'], "touch", key, time, noreply)
  560.  
  561.     def _deletetouch(self, expected, cmd, key, time=0, noreply=False):
  562.         key = self._encode_key(key)
  563.         if self.do_check_key:
  564.             self.check_key(key)
  565.         server, key = self._get_server(key)
  566.         if not server:
  567.             return 0
  568.         self._statlog(cmd)
  569.         if time is not None and time != 0:
  570.             headers = str(time)
  571.         else:
  572.             headers = None
  573.         fullcmd = self._encode_cmd(cmd, key, headers, noreply)
  574.  
  575.         try:
  576.             server.send_cmd(fullcmd)
  577.             if noreply:
  578.                 return 1
  579.             line = server.readline()
  580.             if line and line.strip() in expected:
  581.                 return 1
  582.             self.debuglog('%s expected %s, got: %r'
  583.                           % (cmd, ' or '.join(expected), line))
  584.         except socket.error as msg:
  585.             if isinstance(msg, tuple):
  586.                 msg = msg[1]
  587.             server.mark_dead(msg)
  588.         return 0
  589.  
  590.     def incr(self, key, delta=1, noreply=False):
  591.         """Increment value for C{key} by C{delta}
  592.  
  593.        Sends a command to the server to atomically increment the
  594.        value for C{key} by C{delta}, or by 1 if C{delta} is
  595.        unspecified.  Returns None if C{key} doesn't exist on server,
  596.        otherwise it returns the new value after incrementing.
  597.  
  598.        Note that the value for C{key} must already exist in the
  599.        memcache, and it must be the string representation of an
  600.        integer.
  601.  
  602.        >>> mc.set("counter", "20")  # returns 1, indicating success
  603.        1
  604.        >>> mc.incr("counter")
  605.        21
  606.        >>> mc.incr("counter")
  607.        22
  608.  
  609.        Overflow on server is not checked.  Be aware of values
  610.        approaching 2**32.  See L{decr}.
  611.  
  612.        @param delta: Integer amount to increment by (should be zero
  613.        or greater).
  614.  
  615.        @param noreply: optional parameter instructs the server to not send the
  616.        reply.
  617.  
  618.        @return: New value after incrementing, no None for noreply or error.
  619.        @rtype: int
  620.        """
  621.         return self._incrdecr("incr", key, delta, noreply)
  622.  
  623.     def decr(self, key, delta=1, noreply=False):
  624.         """Decrement value for C{key} by C{delta}
  625.  
  626.        Like L{incr}, but decrements.  Unlike L{incr}, underflow is
  627.        checked and new values are capped at 0.  If server value is 1,
  628.        a decrement of 2 returns 0, not -1.
  629.  
  630.        @param delta: Integer amount to decrement by (should be zero
  631.        or greater).
  632.  
  633.        @param noreply: optional parameter instructs the server to not send the
  634.        reply.
  635.  
  636.        @return: New value after decrementing,  or None for noreply or error.
  637.        @rtype: int
  638.        """
  639.         return self._incrdecr("decr", key, delta, noreply)
  640.  
  641.     def _incrdecr(self, cmd, key, delta, noreply=False):
  642.         key = self._encode_key(key)
  643.         if self.do_check_key:
  644.             self.check_key(key)
  645.         server, key = self._get_server(key)
  646.         if not server:
  647.             return None
  648.         self._statlog(cmd)
  649.         fullcmd = self._encode_cmd(cmd, key, str(delta), noreply)
  650.         try:
  651.             server.send_cmd(fullcmd)
  652.             if noreply:
  653.                 return
  654.             line = server.readline()
  655.             if line is None or line.strip() == b'NOT_FOUND':
  656.                 return None
  657.             return int(line)
  658.         except socket.error as msg:
  659.             if isinstance(msg, tuple):
  660.                 msg = msg[1]
  661.             server.mark_dead(msg)
  662.             return None
  663.  
  664.     def add(self, key, val, time=0, min_compress_len=0, noreply=False):
  665.         '''Add new key with value.
  666.  
  667.        Like L{set}, but only stores in memcache if the key doesn't
  668.        already exist.
  669.  
  670.        @return: Nonzero on success.
  671.        @rtype: int
  672.        '''
  673.         return self._set("add", key, val, time, min_compress_len, noreply)
  674.  
  675.     def append(self, key, val, time=0, min_compress_len=0, noreply=False):
  676.         '''Append the value to the end of the existing key's value.
  677.  
  678.        Only stores in memcache if key already exists.
  679.        Also see L{prepend}.
  680.  
  681.        @return: Nonzero on success.
  682.        @rtype: int
  683.        '''
  684.         return self._set("append", key, val, time, min_compress_len, noreply)
  685.  
  686.     def prepend(self, key, val, time=0, min_compress_len=0, noreply=False):
  687.         '''Prepend the value to the beginning of the existing key's value.
  688.  
  689.        Only stores in memcache if key already exists.
  690.        Also see L{append}.
  691.  
  692.        @return: Nonzero on success.
  693.        @rtype: int
  694.        '''
  695.         return self._set("prepend", key, val, time, min_compress_len, noreply)
  696.  
  697.     def replace(self, key, val, time=0, min_compress_len=0, noreply=False):
  698.         '''Replace existing key with value.
  699.  
  700.        Like L{set}, but only stores in memcache if the key already exists.
  701.        The opposite of L{add}.
  702.  
  703.        @return: Nonzero on success.
  704.        @rtype: int
  705.        '''
  706.         return self._set("replace", key, val, time, min_compress_len, noreply)
  707.  
  708.     def set(self, key, val, time=0, min_compress_len=0, noreply=False):
  709.         '''Unconditionally sets a key to a given value in the memcache.
  710.  
  711.        The C{key} can optionally be an tuple, with the first element
  712.        being the server hash value and the second being the key.  If
  713.        you want to avoid making this module calculate a hash value.
  714.        You may prefer, for example, to keep all of a given user's
  715.        objects on the same memcache server, so you could use the
  716.        user's unique id as the hash value.
  717.  
  718.        @return: Nonzero on success.
  719.        @rtype: int
  720.  
  721.        @param time: Tells memcached the time which this value should
  722.        expire, either as a delta number of seconds, or an absolute
  723.        unix time-since-the-epoch value. See the memcached protocol
  724.        docs section "Storage Commands" for more info on <exptime>. We
  725.        default to 0 == cache forever.
  726.  
  727.        @param min_compress_len: The threshold length to kick in
  728.        auto-compression of the value using the compressor
  729.        routine. If the value being cached is a string, then the
  730.        length of the string is measured, else if the value is an
  731.        object, then the length of the pickle result is measured. If
  732.        the resulting attempt at compression yeilds a larger string
  733.        than the input, then it is discarded. For backwards
  734.        compatability, this parameter defaults to 0, indicating don't
  735.        ever try to compress.
  736.  
  737.        @param noreply: optional parameter instructs the server to not
  738.        send the reply.
  739.        '''
  740.         return self._set("set", key, val, time, min_compress_len, noreply)
  741.  
  742.     def cas(self, key, val, time=0, min_compress_len=0, noreply=False):
  743.         '''Check and set (CAS)
  744.  
  745.        Sets a key to a given value in the memcache if it hasn't been
  746.        altered since last fetched. (See L{gets}).
  747.  
  748.        The C{key} can optionally be an tuple, with the first element
  749.        being the server hash value and the second being the key.  If
  750.        you want to avoid making this module calculate a hash value.
  751.        You may prefer, for example, to keep all of a given user's
  752.        objects on the same memcache server, so you could use the
  753.        user's unique id as the hash value.
  754.  
  755.        @return: Nonzero on success.
  756.        @rtype: int
  757.  
  758.        @param time: Tells memcached the time which this value should
  759.        expire, either as a delta number of seconds, or an absolute
  760.        unix time-since-the-epoch value. See the memcached protocol
  761.        docs section "Storage Commands" for more info on <exptime>. We
  762.        default to 0 == cache forever.
  763.  
  764.        @param min_compress_len: The threshold length to kick in
  765.        auto-compression of the value using the compressor
  766.        routine. If the value being cached is a string, then the
  767.        length of the string is measured, else if the value is an
  768.        object, then the length of the pickle result is measured. If
  769.        the resulting attempt at compression yeilds a larger string
  770.        than the input, then it is discarded. For backwards
  771.        compatability, this parameter defaults to 0, indicating don't
  772.        ever try to compress.
  773.  
  774.        @param noreply: optional parameter instructs the server to not
  775.        send the reply.
  776.        '''
  777.         return self._set("cas", key, val, time, min_compress_len, noreply)
  778.  
  779.     def _map_and_prefix_keys(self, key_iterable, key_prefix):
  780.         """Compute the mapping of server (_Host instance) -> list of keys to
  781.        stuff onto that server, as well as the mapping of prefixed key
  782.        -> original key.
  783.        """
  784.         key_prefix = self._encode_key(key_prefix)
  785.         # Check it just once ...
  786.         key_extra_len = len(key_prefix)
  787.         if key_prefix and self.do_check_key:
  788.             self.check_key(key_prefix)
  789.  
  790.         # server (_Host) -> list of unprefixed server keys in mapping
  791.         server_keys = {}
  792.  
  793.         prefixed_to_orig_key = {}
  794.         # build up a list for each server of all the keys we want.
  795.         for orig_key in key_iterable:
  796.             if isinstance(orig_key, tuple):
  797.                 # Tuple of hashvalue, key ala _get_server(). Caller is
  798.                 # essentially telling us what server to stuff this on.
  799.                 # Ensure call to _get_server gets a Tuple as well.
  800.                 serverhash, key = orig_key
  801.  
  802.                 key = self._encode_key(key)
  803.                 if not isinstance(key, six.binary_type):
  804.                     # set_multi supports int / long keys.
  805.                     key = str(key)
  806.                     if six.PY3:
  807.                         key = key.encode('utf8')
  808.                 bytes_orig_key = key
  809.  
  810.                 # Gotta pre-mangle key before hashing to a
  811.                 # server. Returns the mangled key.
  812.                 server, key = self._get_server(
  813.                     (serverhash, key_prefix + key))
  814.  
  815.                 orig_key = orig_key[1]
  816.             else:
  817.                 key = self._encode_key(orig_key)
  818.                 if not isinstance(key, six.binary_type):
  819.                     # set_multi supports int / long keys.
  820.                     key = str(key)
  821.                     if six.PY3:
  822.                         key = key.encode('utf8')
  823.                 bytes_orig_key = key
  824.                 server, key = self._get_server(key_prefix + key)
  825.  
  826.             #  alert when passed in key is None
  827.             if orig_key is None:
  828.                 self.check_key(orig_key, key_extra_len=key_extra_len)
  829.  
  830.             # Now check to make sure key length is proper ...
  831.             if self.do_check_key:
  832.                 self.check_key(bytes_orig_key, key_extra_len=key_extra_len)
  833.  
  834.             if not server:
  835.                 continue
  836.  
  837.             if server not in server_keys:
  838.                 server_keys[server] = []
  839.             server_keys[server].append(key)
  840.             prefixed_to_orig_key[key] = orig_key
  841.  
  842.         return (server_keys, prefixed_to_orig_key)
  843.  
  844.     def set_multi(self, mapping, time=0, key_prefix='', min_compress_len=0,
  845.                   noreply=False):
  846.         '''Sets multiple keys in the memcache doing just one query.
  847.  
  848.        >>> notset_keys = mc.set_multi({'key1' : 'val1', 'key2' : 'val2'})
  849.        >>> mc.get_multi(['key1', 'key2']) == {'key1' : 'val1',
  850.        ...                                    'key2' : 'val2'}
  851.        1
  852.  
  853.  
  854.        This method is recommended over regular L{set} as it lowers
  855.        the number of total packets flying around your network,
  856.        reducing total latency, since your app doesn't have to wait
  857.        for each round-trip of L{set} before sending the next one.
  858.  
  859.        @param mapping: A dict of key/value pairs to set.
  860.  
  861.        @param time: Tells memcached the time which this value should
  862.            expire, either as a delta number of seconds, or an
  863.            absolute unix time-since-the-epoch value. See the
  864.            memcached protocol docs section "Storage Commands" for
  865.            more info on <exptime>. We default to 0 == cache forever.
  866.  
  867.        @param key_prefix: Optional string to prepend to each key when
  868.            sending to memcache. Allows you to efficiently stuff these
  869.            keys into a pseudo-namespace in memcache:
  870.  
  871.            >> notset_keys = mc.set_multi(
  872.            ...     {'key1' : 'val1', 'key2' : 'val2'},
  873.            ...     key_prefix='subspace_')
  874.            >>> len(notset_keys) == 0
  875.            True
  876.            >>> mc.get_multi(['subspace_key1',
  877.            ...               'subspace_key2']) == {'subspace_key1': 'val1',
  878.            ...                                     'subspace_key2' : 'val2'}
  879.            True
  880.  
  881.            Causes key 'subspace_key1' and 'subspace_key2' to be
  882.            set. Useful in conjunction with a higher-level layer which
  883.            applies namespaces to data in memcache.  In this case, the
  884.            return result would be the list of notset original keys,
  885.            prefix not applied.
  886.  
  887.        @param min_compress_len: The threshold length to kick in
  888.            auto-compression of the value using the compressor
  889.            routine. If the value being cached is a string, then the
  890.            length of the string is measured, else if the value is an
  891.            object, then the length of the pickle result is
  892.            measured. If the resulting attempt at compression yeilds a
  893.            larger string than the input, then it is discarded. For
  894.            backwards compatability, this parameter defaults to 0,
  895.            indicating don't ever try to compress.
  896.  
  897.        @param noreply: optional parameter instructs the server to not
  898.            send the reply.
  899.  
  900.        @return: List of keys which failed to be stored [ memcache out
  901.           of memory, etc. ].
  902.  
  903.        @rtype: list
  904.        '''
  905.         self._statlog('set_multi')
  906.  
  907.         server_keys, prefixed_to_orig_key = self._map_and_prefix_keys(
  908.             six.iterkeys(mapping), key_prefix)
  909.  
  910.         # send out all requests on each server before reading anything
  911.         dead_servers = []
  912.         notstored = []  # original keys.
  913.  
  914.         for server in six.iterkeys(server_keys):
  915.             bigcmd = []
  916.             write = bigcmd.append
  917.             try:
  918.                 for key in server_keys[server]:  # These are mangled keys
  919.                     store_info = self._val_to_store_info(
  920.                         mapping[prefixed_to_orig_key[key]],
  921.                         min_compress_len)
  922.                     if store_info:
  923.                         flags, len_val, val = store_info
  924.                         headers = "%d %d %d" % (flags, time, len_val)
  925.                         fullcmd = self._encode_cmd('set', key, headers,
  926.                                                    noreply,
  927.                                                    b'\r\n', val, b'\r\n')
  928.                         write(fullcmd)
  929.                     else:
  930.                         notstored.append(prefixed_to_orig_key[key])
  931.                 server.send_cmds(b''.join(bigcmd))
  932.             except socket.error as msg:
  933.                 if isinstance(msg, tuple):
  934.                     msg = msg[1]
  935.                 server.mark_dead(msg)
  936.                 dead_servers.append(server)
  937.  
  938.         # if noreply, just return early
  939.         if noreply:
  940.             return notstored
  941.  
  942.         # if any servers died on the way, don't expect them to respond.
  943.         for server in dead_servers:
  944.             del server_keys[server]
  945.  
  946.         #  short-circuit if there are no servers, just return all keys
  947.         if not server_keys:
  948.             return list(mapping.keys())
  949.  
  950.         for server, keys in six.iteritems(server_keys):
  951.             try:
  952.                 for key in keys:
  953.                     if server.readline() == b'STORED':
  954.                         continue
  955.                     else:
  956.                         # un-mangle.
  957.                         notstored.append(prefixed_to_orig_key[key])
  958.             except (_Error, socket.error) as msg:
  959.                 if isinstance(msg, tuple):
  960.                     msg = msg[1]
  961.                 server.mark_dead(msg)
  962.         return notstored
  963.  
  964.     def _val_to_store_info(self, val, min_compress_len):
  965.         """Transform val to a storable representation.
  966.  
  967.        Returns a tuple of the flags, the length of the new value, and
  968.        the new value itself.
  969.        """
  970.         flags = 0
  971.         if isinstance(val, six.binary_type):
  972.             pass
  973.         elif isinstance(val, six.text_type):
  974.             val = val.encode('utf-8')
  975.         elif isinstance(val, int):
  976.             flags |= Client._FLAG_INTEGER
  977.             val = '%d' % val
  978.             if six.PY3:
  979.                 val = val.encode('ascii')
  980.             # force no attempt to compress this silly string.
  981.             min_compress_len = 0
  982.         elif six.PY2 and isinstance(val, long):
  983.             flags |= Client._FLAG_LONG
  984.             val = str(val)
  985.             if six.PY3:
  986.                 val = val.encode('ascii')
  987.             # force no attempt to compress this silly string.
  988.             min_compress_len = 0
  989.         else:
  990.             flags |= Client._FLAG_PICKLE
  991.             file = BytesIO()
  992.             if self.picklerIsKeyword:
  993.                 pickler = self.pickler(file, protocol=self.pickleProtocol)
  994.             else:
  995.                 pickler = self.pickler(file, self.pickleProtocol)
  996.             if self.persistent_id:
  997.                 pickler.persistent_id = self.persistent_id
  998.             pickler.dump(val)
  999.             val = file.getvalue()
  1000.  
  1001.         lv = len(val)
  1002.         # We should try to compress if min_compress_len > 0
  1003.         # and this string is longer than our min threshold.
  1004.         if min_compress_len and lv > min_compress_len:
  1005.             comp_val = self.compressor(val)
  1006.             # Only retain the result if the compression result is smaller
  1007.             # than the original.
  1008.             if len(comp_val) < lv:
  1009.                 flags |= Client._FLAG_COMPRESSED
  1010.                 val = comp_val
  1011.  
  1012.         #  silently do not store if value length exceeds maximum
  1013.         if (self.server_max_value_length != 0 and
  1014.                 len(val) > self.server_max_value_length):
  1015.             return(0)
  1016.  
  1017.         return (flags, len(val), val)
  1018.  
  1019.     def _set(self, cmd, key, val, time, min_compress_len=0, noreply=False):
  1020.         key = self._encode_key(key)
  1021.         if self.do_check_key:
  1022.             self.check_key(key)
  1023.         server, key = self._get_server(key)
  1024.         if not server:
  1025.             return 0
  1026.  
  1027.         def _unsafe_set():
  1028.             self._statlog(cmd)
  1029.  
  1030.             if cmd == 'cas' and key not in self.cas_ids:
  1031.                 return self._set('set', key, val, time, min_compress_len,
  1032.                                  noreply)
  1033.  
  1034.             store_info = self._val_to_store_info(val, min_compress_len)
  1035.             if not store_info:
  1036.                 return(0)
  1037.             flags, len_val, encoded_val = store_info
  1038.  
  1039.             if cmd == 'cas':
  1040.                 headers = ("%d %d %d %d"
  1041.                            % (flags, time, len_val, self.cas_ids[key]))
  1042.             else:
  1043.                 headers = "%d %d %d" % (flags, time, len_val)
  1044.             fullcmd = self._encode_cmd(cmd, key, headers, noreply,
  1045.                                        b'\r\n', encoded_val)
  1046.  
  1047.             try:
  1048.                 server.send_cmd(fullcmd)
  1049.                 if noreply:
  1050.                     return True
  1051.                 return(server.expect(b"STORED", raise_exception=True)
  1052.                        == b"STORED")
  1053.             except socket.error as msg:
  1054.                 if isinstance(msg, tuple):
  1055.                     msg = msg[1]
  1056.                 server.mark_dead(msg)
  1057.             return 0
  1058.  
  1059.         try:
  1060.             return _unsafe_set()
  1061.         except _ConnectionDeadError:
  1062.             # retry once
  1063.             try:
  1064.                 if server._get_socket():
  1065.                     return _unsafe_set()
  1066.             except (_ConnectionDeadError, socket.error) as msg:
  1067.                 server.mark_dead(msg)
  1068.             return 0
  1069.  
  1070.     def _get(self, cmd, key):
  1071.         key = self._encode_key(key)
  1072.         if self.do_check_key:
  1073.             self.check_key(key)
  1074.         server, key = self._get_server(key)
  1075.         if not server:
  1076.             return None
  1077.  
  1078.         def _unsafe_get():
  1079.             self._statlog(cmd)
  1080.  
  1081.             try:
  1082.                 cmd_bytes = cmd.encode('utf-8') if six.PY3 else cmd
  1083.                 fullcmd = b''.join((cmd_bytes, b' ', key))
  1084.                 server.send_cmd(fullcmd)
  1085.                 rkey = flags = rlen = cas_id = None
  1086.  
  1087.                 if cmd == 'gets':
  1088.                     rkey, flags, rlen, cas_id, = self._expect_cas_value(
  1089.                         server, raise_exception=True
  1090.                     )
  1091.                     if rkey and self.cache_cas:
  1092.                         self.cas_ids[rkey] = cas_id
  1093.                 else:
  1094.                     rkey, flags, rlen, = self._expectvalue(
  1095.                         server, raise_exception=True
  1096.                     )
  1097.  
  1098.                 if not rkey:
  1099.                     return None
  1100.                 try:
  1101.                     value = self._recv_value(server, flags, rlen)
  1102.                 finally:
  1103.                     server.expect(b"END", raise_exception=True)
  1104.             except (_Error, socket.error) as msg:
  1105.                 if isinstance(msg, tuple):
  1106.                     msg = msg[1]
  1107.                 server.mark_dead(msg)
  1108.                 return None
  1109.  
  1110.             return value
  1111.  
  1112.         try:
  1113.             return _unsafe_get()
  1114.         except _ConnectionDeadError:
  1115.             # retry once
  1116.             try:
  1117.                 if server.connect():
  1118.                     return _unsafe_get()
  1119.                 return None
  1120.             except (_ConnectionDeadError, socket.error) as msg:
  1121.                 server.mark_dead(msg)
  1122.             return None
  1123.  
  1124.     def get(self, key):
  1125.         '''Retrieves a key from the memcache.
  1126.  
  1127.        @return: The value or None.
  1128.        '''
  1129.         return self._get('get', key)
  1130.  
  1131.     def gets(self, key):
  1132.         '''Retrieves a key from the memcache. Used in conjunction with 'cas'.
  1133.  
  1134.        @return: The value or None.
  1135.        '''
  1136.         return self._get('gets', key)
  1137.  
  1138.     def get_multi(self, keys, key_prefix=''):
  1139.         '''Retrieves multiple keys from the memcache doing just one query.
  1140.  
  1141.        >>> success = mc.set("foo", "bar")
  1142.        >>> success = mc.set("baz", 42)
  1143.        >>> mc.get_multi(["foo", "baz", "foobar"]) == {
  1144.        ...     "foo": "bar", "baz": 42
  1145.        ... }
  1146.        1
  1147.        >>> mc.set_multi({'k1' : 1, 'k2' : 2}, key_prefix='pfx_') == []
  1148.        1
  1149.  
  1150.        This looks up keys 'pfx_k1', 'pfx_k2', ... . Returned dict
  1151.        will just have unprefixed keys 'k1', 'k2'.
  1152.  
  1153.        >>> mc.get_multi(['k1', 'k2', 'nonexist'],
  1154.        ...              key_prefix='pfx_') == {'k1' : 1, 'k2' : 2}
  1155.        1
  1156.  
  1157.        get_mult [ and L{set_multi} ] can take str()-ables like ints /
  1158.        longs as keys too. Such as your db pri key fields.  They're
  1159.        rotored through str() before being passed off to memcache,
  1160.        with or without the use of a key_prefix.  In this mode, the
  1161.        key_prefix could be a table name, and the key itself a db
  1162.        primary key number.
  1163.  
  1164.        >>> mc.set_multi({42: 'douglass adams',
  1165.        ...               46: 'and 2 just ahead of me'},
  1166.        ...              key_prefix='numkeys_') == []
  1167.        1
  1168.        >>> mc.get_multi([46, 42], key_prefix='numkeys_') == {
  1169.        ...     42: 'douglass adams',
  1170.        ...     46: 'and 2 just ahead of me'
  1171.        ... }
  1172.        1
  1173.  
  1174.        This method is recommended over regular L{get} as it lowers
  1175.        the number of total packets flying around your network,
  1176.        reducing total latency, since your app doesn't have to wait
  1177.        for each round-trip of L{get} before sending the next one.
  1178.  
  1179.        See also L{set_multi}.
  1180.  
  1181.        @param keys: An array of keys.
  1182.  
  1183.        @param key_prefix: A string to prefix each key when we
  1184.        communicate with memcache.  Facilitates pseudo-namespaces
  1185.        within memcache. Returned dictionary keys will not have this
  1186.        prefix.
  1187.  
  1188.        @return: A dictionary of key/value pairs that were
  1189.        available. If key_prefix was provided, the keys in the retured
  1190.        dictionary will not have it present.
  1191.        '''
  1192.  
  1193.         self._statlog('get_multi')
  1194.  
  1195.         server_keys, prefixed_to_orig_key = self._map_and_prefix_keys(
  1196.             keys, key_prefix)
  1197.  
  1198.         # send out all requests on each server before reading anything
  1199.         dead_servers = []
  1200.         for server in six.iterkeys(server_keys):
  1201.             try:
  1202.                 fullcmd = b"get " + b" ".join(server_keys[server])
  1203.                 server.send_cmd(fullcmd)
  1204.             except socket.error as msg:
  1205.                 if isinstance(msg, tuple):
  1206.                     msg = msg[1]
  1207.                 server.mark_dead(msg)
  1208.                 dead_servers.append(server)
  1209.  
  1210.         # if any servers died on the way, don't expect them to respond.
  1211.         for server in dead_servers:
  1212.             del server_keys[server]
  1213.  
  1214.         retvals = {}
  1215.         for server in six.iterkeys(server_keys):
  1216.             try:
  1217.                 line = server.readline()
  1218.                 while line and line != b'END':
  1219.                     rkey, flags, rlen = self._expectvalue(server, line)
  1220.                     #  Bo Yang reports that this can sometimes be None
  1221.                     if rkey is not None:
  1222.                         val = self._recv_value(server, flags, rlen)
  1223.                         # un-prefix returned key.
  1224.                         retvals[prefixed_to_orig_key[rkey]] = val
  1225.                     line = server.readline()
  1226.             except (_Error, socket.error) as msg:
  1227.                 if isinstance(msg, tuple):
  1228.                     msg = msg[1]
  1229.                 server.mark_dead(msg)
  1230.         return retvals
  1231.  
  1232.     def _expect_cas_value(self, server, line=None, raise_exception=False):
  1233.         if not line:
  1234.             line = server.readline(raise_exception)
  1235.  
  1236.         if line and line[:5] == b'VALUE':
  1237.             resp, rkey, flags, len, cas_id = line.split()
  1238.             return (rkey, int(flags), int(len), int(cas_id))
  1239.         else:
  1240.             return (None, None, None, None)
  1241.  
  1242.     def _expectvalue(self, server, line=None, raise_exception=False):
  1243.         if not line:
  1244.             line = server.readline(raise_exception)
  1245.  
  1246.         if line and line[:5] == b'VALUE':
  1247.             resp, rkey, flags, len = line.split()
  1248.             flags = int(flags)
  1249.             rlen = int(len)
  1250.             return (rkey, flags, rlen)
  1251.         else:
  1252.             return (None, None, None)
  1253.  
  1254.     def _recv_value(self, server, flags, rlen):
  1255.         rlen += 2  # include \r\n
  1256.         buf = server.recv(rlen)
  1257.         if len(buf) != rlen:
  1258.             raise _Error("received %d bytes when expecting %d"
  1259.                          % (len(buf), rlen))
  1260.  
  1261.         if len(buf) == rlen:
  1262.             buf = buf[:-2]  # strip \r\n
  1263.  
  1264.         if flags & Client._FLAG_COMPRESSED:
  1265.             buf = self.decompressor(buf)
  1266.             flags &= ~Client._FLAG_COMPRESSED
  1267.  
  1268.         if flags == 0:
  1269.             # Bare string
  1270.             if six.PY3:
  1271.                 val = buf.decode('utf8')
  1272.             else:
  1273.                 val = buf
  1274.         elif flags & Client._FLAG_INTEGER:
  1275.             val = int(buf)
  1276.         elif flags & Client._FLAG_LONG:
  1277.             if six.PY3:
  1278.                 val = int(buf)
  1279.             else:
  1280.                 val = long(buf)
  1281.         elif flags & Client._FLAG_PICKLE:
  1282.             try:
  1283.                 file = BytesIO(buf)
  1284.                 unpickler = self.unpickler(file)
  1285.                 if self.persistent_load:
  1286.                     unpickler.persistent_load = self.persistent_load
  1287.                 val = unpickler.load()
  1288.             except Exception as e:
  1289.                 self.debuglog('Pickle error: %s\n' % e)
  1290.                 return None
  1291.         else:
  1292.             self.debuglog("unknown flags on get: %x\n" % flags)
  1293.             raise ValueError('Unknown flags on get: %x' % flags)
  1294.  
  1295.         return val
  1296.  
  1297.     def check_key(self, key, key_extra_len=0):
  1298.         """Checks sanity of key.
  1299.  
  1300.            Fails if:
  1301.  
  1302.            Key length is > SERVER_MAX_KEY_LENGTH (Raises MemcachedKeyLength).
  1303.            Contains control characters  (Raises MemcachedKeyCharacterError).
  1304.            Is not a string (Raises MemcachedStringEncodingError)
  1305.            Is an unicode string (Raises MemcachedStringEncodingError)
  1306.            Is not a string (Raises MemcachedKeyError)
  1307.            Is None (Raises MemcachedKeyError)
  1308.        """
  1309.         if isinstance(key, tuple):
  1310.             key = key[1]
  1311.         if key is None:
  1312.             raise Client.MemcachedKeyNoneError("Key is None")
  1313.         if key is '':
  1314.             if key_extra_len is 0:
  1315.                 raise Client.MemcachedKeyNoneError("Key is empty")
  1316.  
  1317.             #  key is empty but there is some other component to key
  1318.             return
  1319.  
  1320.         if not isinstance(key, six.binary_type):
  1321.             raise Client.MemcachedKeyTypeError("Key must be a binary string")
  1322.  
  1323.         if (self.server_max_key_length != 0 and
  1324.                 len(key) + key_extra_len > self.server_max_key_length):
  1325.             raise Client.MemcachedKeyLengthError(
  1326.                 "Key length is > %s" % self.server_max_key_length
  1327.             )
  1328.         if not valid_key_chars_re.match(key):
  1329.             raise Client.MemcachedKeyCharacterError(
  1330.                 "Control/space characters not allowed (key=%r)" % key)
  1331.  
  1332.  
  1333. class _Host(object):
  1334.  
  1335.     def __init__(self, host, debug=0, dead_retry=_DEAD_RETRY,
  1336.                  socket_timeout=_SOCKET_TIMEOUT, flush_on_reconnect=0):
  1337.         self.dead_retry = dead_retry
  1338.         self.socket_timeout = socket_timeout
  1339.         self.debug = debug
  1340.         self.flush_on_reconnect = flush_on_reconnect
  1341.         if isinstance(host, tuple):
  1342.             host, self.weight = host
  1343.         else:
  1344.             self.weight = 1
  1345.  
  1346.         #  parse the connection string
  1347.         m = re.match(r'^(?P<proto>unix):(?P<path>.*)$', host)
  1348.         if not m:
  1349.             m = re.match(r'^(?P<proto>inet6):'
  1350.                          r'\[(?P<host>[^\[\]]+)\](:(?P<port>[0-9]+))?$', host)
  1351.         if not m:
  1352.             m = re.match(r'^(?P<proto>inet):'
  1353.                          r'(?P<host>[^:]+)(:(?P<port>[0-9]+))?$', host)
  1354.         if not m:
  1355.             m = re.match(r'^(?P<host>[^:]+)(:(?P<port>[0-9]+))?$', host)
  1356.         if not m:
  1357.             raise ValueError('Unable to parse connection string: "%s"' % host)
  1358.  
  1359.         hostData = m.groupdict()
  1360.         if hostData.get('proto') == 'unix':
  1361.             self.family = socket.AF_UNIX
  1362.             self.address = hostData['path']
  1363.         elif hostData.get('proto') == 'inet6':
  1364.             self.family = socket.AF_INET6
  1365.             self.ip = hostData['host']
  1366.             self.port = int(hostData.get('port') or 11211)
  1367.             self.address = (self.ip, self.port)
  1368.         else:
  1369.             self.family = socket.AF_INET
  1370.             self.ip = hostData['host']
  1371.             self.port = int(hostData.get('port') or 11211)
  1372.             self.address = (self.ip, self.port)
  1373.  
  1374.         self.deaduntil = 0
  1375.         self.socket = None
  1376.         self.flush_on_next_connect = 0
  1377.  
  1378.         self.buffer = b''
  1379.  
  1380.     def debuglog(self, str):
  1381.         if self.debug:
  1382.             sys.stderr.write("MemCached: %s\n" % str)
  1383.  
  1384.     def _check_dead(self):
  1385.         if self.deaduntil and self.deaduntil > time.time():
  1386.             return 1
  1387.         self.deaduntil = 0
  1388.         return 0
  1389.  
  1390.     def connect(self):
  1391.         if self._get_socket():
  1392.             return 1
  1393.         return 0
  1394.  
  1395.     def mark_dead(self, reason):
  1396.         self.debuglog("MemCache: %s: %s.  Marking dead." % (self, reason))
  1397.         self.deaduntil = time.time() + self.dead_retry
  1398.         if self.flush_on_reconnect:
  1399.             self.flush_on_next_connect = 1
  1400.         self.close_socket()
  1401.  
  1402.     def _get_socket(self):
  1403.         if self._check_dead():
  1404.             return None
  1405.         if self.socket:
  1406.             return self.socket
  1407.         s = socket.socket(self.family, socket.SOCK_STREAM)
  1408.         if hasattr(s, 'settimeout'):
  1409.             s.settimeout(self.socket_timeout)
  1410.         try:
  1411.             s.connect(self.address)
  1412.         except socket.timeout as msg:
  1413.             self.mark_dead("connect: %s" % msg)
  1414.             return None
  1415.         except socket.error as msg:
  1416.             if isinstance(msg, tuple):
  1417.                 msg = msg[1]
  1418.             self.mark_dead("connect: %s" % msg)
  1419.             return None
  1420.         self.socket = s
  1421.         self.buffer = b''
  1422.         if self.flush_on_next_connect:
  1423.             self.flush()
  1424.             self.flush_on_next_connect = 0
  1425.         return s
  1426.  
  1427.     def close_socket(self):
  1428.         if self.socket:
  1429.             self.socket.close()
  1430.             self.socket = None
  1431.  
  1432.     def send_cmd(self, cmd):
  1433.         if isinstance(cmd, six.text_type):
  1434.             cmd = cmd.encode('utf8')
  1435.         self.socket.sendall(cmd + b'\r\n')
  1436.  
  1437.     def send_cmds(self, cmds):
  1438.         """cmds already has trailing \r\n's applied."""
  1439.         if isinstance(cmds, six.text_type):
  1440.             cmds = cmds.encode('utf8')
  1441.         self.socket.sendall(cmds)
  1442.  
  1443.     def readline(self, raise_exception=False):
  1444.         """Read a line and return it.
  1445.  
  1446.        If "raise_exception" is set, raise _ConnectionDeadError if the
  1447.        read fails, otherwise return an empty string.
  1448.        """
  1449.         buf = self.buffer
  1450.         if self.socket:
  1451.             recv = self.socket.recv
  1452.         else:
  1453.             recv = lambda bufsize: b''
  1454.  
  1455.         while True:
  1456.             index = buf.find(b'\r\n')
  1457.             if index >= 0:
  1458.                 break
  1459.             data = recv(4096)
  1460.             if not data:
  1461.                 # connection close, let's kill it and raise
  1462.                 self.mark_dead('connection closed in readline()')
  1463.                 if raise_exception:
  1464.                     raise _ConnectionDeadError()
  1465.                 else:
  1466.                     return ''
  1467.  
  1468.             buf += data
  1469.         self.buffer = buf[index + 2:]
  1470.         return buf[:index]
  1471.  
  1472.     def expect(self, text, raise_exception=False):
  1473.         line = self.readline(raise_exception)
  1474.         if self.debug and line != text:
  1475.             if six.PY3:
  1476.                 text = text.decode('utf8')
  1477.                 log_line = line.decode('utf8', 'replace')
  1478.             else:
  1479.                 log_line = line
  1480.             self.debuglog("while expecting %r, got unexpected response %r"
  1481.                           % (text, log_line))
  1482.         return line
  1483.  
  1484.     def recv(self, rlen):
  1485.         self_socket_recv = self.socket.recv
  1486.         buf = self.buffer
  1487.         while len(buf) < rlen:
  1488.             foo = self_socket_recv(max(rlen - len(buf), 4096))
  1489.             buf += foo
  1490.             if not foo:
  1491.                 raise _Error('Read %d bytes, expecting %d, '
  1492.                              'read returned 0 length bytes' % (len(buf), rlen))
  1493.         self.buffer = buf[rlen:]
  1494.         return buf[:rlen]
  1495.  
  1496.     def flush(self):
  1497.         self.send_cmd('flush_all')
  1498.         self.expect(b'OK')
  1499.  
  1500.     def __str__(self):
  1501.         d = ''
  1502.         if self.deaduntil:
  1503.             d = " (dead until %d)" % self.deaduntil
  1504.  
  1505.         if self.family == socket.AF_INET:
  1506.             return "inet:%s:%d%s" % (self.address[0], self.address[1], d)
  1507.         elif self.family == socket.AF_INET6:
  1508.             return "inet6:[%s]:%d%s" % (self.address[0], self.address[1], d)
  1509.         else:
  1510.             return "unix:%s%s" % (self.address, d)
  1511.  
  1512.  
  1513. def _doctest():
  1514.     import doctest
  1515.     import memcache
  1516.     servers = ["127.0.0.1:11211"]
  1517.     mc = memcache.Client(servers, debug=1)
  1518.     globs = {"mc": mc}
  1519.     results = doctest.testmod(memcache, globs=globs)
  1520.     mc.disconnect_all()
  1521.     print("Doctests: %s" % (results,))
  1522.     if results.failed:
  1523.         sys.exit(1)
  1524.  
  1525.  
  1526. # vim: ts=4 sw=4 et :

Paste is for source code and general debugging text.

Login or Register to edit, delete and keep track of your pastes and more.

Raw Paste

Login or Register to edit or fork this paste. It's free.