001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker;
018
019import org.apache.activemq.broker.region.Destination;
020import org.apache.activemq.broker.region.Region;
021import org.apache.activemq.command.Message;
022import org.apache.activemq.command.MessageId;
023import org.apache.activemq.state.ProducerState;
024import org.slf4j.Logger;
025import org.slf4j.LoggerFactory;
026
027import java.io.IOException;
028import java.util.concurrent.atomic.AtomicLong;
029
030/**
031 * Holds internal state in the broker for a MessageProducer
032 * 
033 * 
034 */
035public class ProducerBrokerExchange {
036
037    private static final Logger LOG = LoggerFactory.getLogger(ProducerBrokerExchange.class);
038    private ConnectionContext connectionContext;
039    private Destination regionDestination;
040    private Region region;
041    private ProducerState producerState;
042    private boolean mutable = true;
043    private AtomicLong lastSendSequenceNumber = new AtomicLong(-1);
044    private boolean auditProducerSequenceIds;
045    private boolean isNetworkProducer;
046    private BrokerService brokerService;
047
048    public ProducerBrokerExchange() {
049    }
050
051    public ProducerBrokerExchange copy() {
052        ProducerBrokerExchange rc = new ProducerBrokerExchange();
053        rc.connectionContext = connectionContext.copy();
054        rc.regionDestination = regionDestination;
055        rc.region = region;
056        rc.producerState = producerState;
057        rc.mutable = mutable;
058        return rc;
059    }
060
061    
062    /**
063     * @return the connectionContext
064     */
065    public ConnectionContext getConnectionContext() {
066        return this.connectionContext;
067    }
068
069    /**
070     * @param connectionContext the connectionContext to set
071     */
072    public void setConnectionContext(ConnectionContext connectionContext) {
073        this.connectionContext = connectionContext;
074    }
075
076    /**
077     * @return the mutable
078     */
079    public boolean isMutable() {
080        return this.mutable;
081    }
082
083    /**
084     * @param mutable the mutable to set
085     */
086    public void setMutable(boolean mutable) {
087        this.mutable = mutable;
088    }
089
090    /**
091     * @return the regionDestination
092     */
093    public Destination getRegionDestination() {
094        return this.regionDestination;
095    }
096
097    /**
098     * @param regionDestination the regionDestination to set
099     */
100    public void setRegionDestination(Destination regionDestination) {
101        this.regionDestination = regionDestination;
102    }
103
104    /**
105     * @return the region
106     */
107    public Region getRegion() {
108        return this.region;
109    }
110
111    /**
112     * @param region the region to set
113     */
114    public void setRegion(Region region) {
115        this.region = region;
116    }
117
118    /**
119     * @return the producerState
120     */
121    public ProducerState getProducerState() {
122        return this.producerState;
123    }
124
125    /**
126     * @param producerState the producerState to set
127     */
128    public void setProducerState(ProducerState producerState) {
129        this.producerState = producerState;
130    }
131
132    /**
133     * Enforce duplicate suppression using info from persistence adapter
134     * @param messageSend
135     * @return false if message should be ignored as a duplicate
136     */
137    public boolean canDispatch(Message messageSend) {
138        boolean canDispatch = true;
139        if (auditProducerSequenceIds && messageSend.isPersistent()) {
140            final long producerSequenceId = messageSend.getMessageId().getProducerSequenceId();
141            if (isNetworkProducer) {
142                //  messages are multiplexed on this producer so we need to query the persistenceAdapter
143                long lastStoredForMessageProducer = getStoredSequenceIdForMessage(messageSend.getMessageId());
144                if (producerSequenceId <= lastStoredForMessageProducer) {
145                    canDispatch = false;
146                    if (LOG.isDebugEnabled()) {
147                        LOG.debug("suppressing duplicate message send  [" + (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()) + "] from network producer with producerSequenceId ["
148                                + producerSequenceId + "] less than last stored: "  + lastStoredForMessageProducer);
149                    }
150                }
151            } else if (producerSequenceId <= lastSendSequenceNumber.get()) {
152                canDispatch = false;
153                if (LOG.isDebugEnabled()) {
154                    LOG.debug("suppressing duplicate message send [" + (LOG.isTraceEnabled() ? messageSend : messageSend.getMessageId()) + "] with producerSequenceId ["
155                            + producerSequenceId + "] less than last stored: "  + lastSendSequenceNumber);
156                }
157            } else {
158                // track current so we can suppress duplicates later in the stream
159                lastSendSequenceNumber.set(producerSequenceId);
160            }
161        }
162        return canDispatch;
163    }
164
165    private long getStoredSequenceIdForMessage(MessageId messageId) {
166        try {
167            return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId());
168       } catch (IOException ignored) {
169            LOG.debug("Failed to determine last producer sequence id for: " +messageId, ignored);
170        }
171        return -1;
172    }
173
174    public void setLastStoredSequenceId(long l) {
175        auditProducerSequenceIds = true;
176        if (connectionContext.isNetworkConnection()) {
177            brokerService = connectionContext.getBroker().getBrokerService();
178            isNetworkProducer = true;
179        }
180        lastSendSequenceNumber.set(l);
181        LOG.debug("last stored sequence id set: " + l);
182    }
183}