Source code for tooz.drivers.mysql

# -*- coding: utf-8 -*-
#
# Copyright © 2014 eNovance
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import pymysql

import tooz
from tooz import coordination
from tooz.drivers import _retry
from tooz import locking
from tooz import utils


class MySQLLock(locking.Lock):
    """A MySQL based lock."""

    def __init__(self, name, parsed_url, options):
        super(MySQLLock, self).__init__(name)
        self._conn = MySQLDriver.get_connection(parsed_url, options)

    def acquire(self, blocking=True):
        def _acquire(retry=False):
            try:
                with self._conn as cur:
                    cur.execute("SELECT GET_LOCK(%s, 0);", self.name)
                    # Can return NULL on error
                    if cur.fetchone()[0] is 1:
                        return True
            except pymysql.MySQLError as e:
                raise coordination.ToozError(utils.exception_message(e))
            if retry:
                raise _retry.Retry
            else:
                return False

        if blocking is False:
            return _acquire()
        else:
            kwargs = _retry.RETRYING_KWARGS.copy()
            if blocking is not True:
                kwargs['stop_max_delay'] = blocking
            return _retry.Retrying(**kwargs).call(_acquire, retry=True)

    def release(self):
        try:
            with self._conn as cur:
                cur.execute("SELECT RELEASE_LOCK(%s);", self.name)
                return cur.fetchone()[0]
        except pymysql.MySQLError as e:
            raise coordination.ToozError(utils.exception_message(e))


[docs]class MySQLDriver(coordination.CoordinationDriver): """A mysql based driver."""
[docs] def __init__(self, member_id, parsed_url, options): """Initialize the MySQL driver.""" super(MySQLDriver, self).__init__() self._parsed_url = parsed_url self._options = options
def _start(self): self._conn = MySQLDriver.get_connection(self._parsed_url, self._options) def _stop(self): self._conn.close() def get_lock(self, name): return locking.WeakLockHelper( self._parsed_url.geturl(), MySQLLock, name, self._parsed_url, self._options) @staticmethod def watch_join_group(group_id, callback): raise tooz.NotImplemented @staticmethod def unwatch_join_group(group_id, callback): raise tooz.NotImplemented @staticmethod def watch_leave_group(group_id, callback): raise tooz.NotImplemented @staticmethod def unwatch_leave_group(group_id, callback): raise tooz.NotImplemented @staticmethod def watch_elected_as_leader(group_id, callback): raise tooz.NotImplemented @staticmethod def unwatch_elected_as_leader(group_id, callback): raise tooz.NotImplemented @staticmethod def get_connection(parsed_url, options): host = parsed_url.netloc port = parsed_url.port dbname = parsed_url.path[1:] username = parsed_url.username password = parsed_url.password unix_socket = options.get("unix_socket", [None])[-1] try: if unix_socket: return pymysql.Connect(unix_socket=unix_socket, port=port, user=username, passwd=password, database=dbname) else: return pymysql.Connect(host=host, port=port, user=username, passwd=password, database=dbname) except pymysql.err.OperationalError as e: raise coordination.ToozConnectionError(utils.exception_message(e))