001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.group;
018
019import org.apache.activemq.command.ConsumerId;
020
021/**
022 * Uses hash-code buckets to associate consumers with sets of message group IDs.
023 * 
024 * 
025 */
026public class MessageGroupHashBucket implements MessageGroupMap {
027
028    private final int bucketCount;
029    private final ConsumerId[] consumers;
030
031    public MessageGroupHashBucket(int bucketCount) {
032        this.bucketCount = bucketCount;
033        this.consumers = new ConsumerId[bucketCount];
034    }
035
036    public void put(String groupId, ConsumerId consumerId) {
037        int bucket = getBucketNumber(groupId);
038        consumers[bucket] = consumerId;
039    }
040
041    public ConsumerId get(String groupId) {
042        int bucket = getBucketNumber(groupId);
043        return consumers[bucket];
044    }
045
046    public ConsumerId removeGroup(String groupId) {
047        int bucket = getBucketNumber(groupId);
048        ConsumerId answer = consumers[bucket];
049        consumers[bucket] = null;
050        return answer;
051    }
052
053    public MessageGroupSet removeConsumer(ConsumerId consumerId) {
054        MessageGroupSet answer = null;
055        for (int i = 0; i < consumers.length; i++) {
056            ConsumerId owner = consumers[i];
057            if (owner != null && owner.equals(consumerId)) {
058                answer = createMessageGroupSet(i, answer);
059                consumers[i] = null;
060            }
061        }
062        if (answer == null) {
063            // make an empty set
064            answer = EmptyMessageGroupSet.INSTANCE;
065        }
066        return answer;
067    }
068
069    public String toString() {
070        int count = 0;
071        for (int i = 0; i < consumers.length; i++) {
072            if (consumers[i] != null) {
073                count++;
074            }
075        }
076        return "active message group buckets: " + count;
077    }
078
079    protected MessageGroupSet createMessageGroupSet(int bucketNumber, final MessageGroupSet parent) {
080        final MessageGroupSet answer = createMessageGroupSet(bucketNumber);
081        if (parent == null) {
082            return answer;
083        } else {
084            // union the two sets together
085            return new MessageGroupSet() {
086                public boolean contains(String groupID) {
087                    return parent.contains(groupID) || answer.contains(groupID);
088                }
089            };
090        }
091    }
092
093    protected MessageGroupSet createMessageGroupSet(final int bucketNumber) {
094        return new MessageGroupSet() {
095            public boolean contains(String groupID) {
096                int bucket = getBucketNumber(groupID);
097                return bucket == bucketNumber;
098            }
099        };
100    }
101
102    protected int getBucketNumber(String groupId) {
103        int bucket = groupId.hashCode() % bucketCount;
104        // bucket could be negative
105        if (bucket < 0) {
106            bucket *= -1;
107        }
108        return bucket;
109    }
110}