001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.store.amq;
019
020import java.io.DataInput;
021import java.io.DataOutput;
022import java.io.IOException;
023import org.apache.activemq.broker.ConnectionContext;
024import org.apache.activemq.command.ActiveMQDestination;
025import org.apache.activemq.command.JournalTopicAck;
026import org.apache.activemq.command.Message;
027import org.apache.activemq.command.MessageAck;
028import org.apache.activemq.kaha.impl.async.Location;
029import org.apache.activemq.util.ByteSequence;
030import org.apache.activemq.wireformat.WireFormat;
031
032/**
033 */
034public class AMQTxOperation {
035
036    public static final byte ADD_OPERATION_TYPE = 0;
037    public static final byte REMOVE_OPERATION_TYPE = 1;
038    public static final byte ACK_OPERATION_TYPE = 3;
039    private byte operationType;
040    private ActiveMQDestination destination;
041    private Object data;
042    private Location location;
043
044    public AMQTxOperation() {
045    }
046
047    public AMQTxOperation(byte operationType, ActiveMQDestination destination, Object data, Location location) {
048        this.operationType = operationType;
049        this.destination = destination;
050        this.data = data;
051        this.location = location;
052
053    }
054
055    /**
056     * @return the data
057     */
058    public Object getData() {
059        return this.data;
060    }
061
062    /**
063     * @param data the data to set
064     */
065    public void setData(Object data) {
066        this.data = data;
067    }
068
069    /**
070     * @return the location
071     */
072    public Location getLocation() {
073        return this.location;
074    }
075
076    /**
077     * @param location the location to set
078     */
079    public void setLocation(Location location) {
080        this.location = location;
081    }
082
083    /**
084     * @return the operationType
085     */
086    public byte getOperationType() {
087        return this.operationType;
088    }
089
090    /**
091     * @param operationType the operationType to set
092     */
093    public void setOperationType(byte operationType) {
094        this.operationType = operationType;
095    }
096
097    public boolean replay(AMQPersistenceAdapter adapter, ConnectionContext context) throws IOException {
098        boolean result = false;
099        AMQMessageStore store = (AMQMessageStore)adapter.createMessageStore(destination);
100        if (operationType == ADD_OPERATION_TYPE) {
101            result = store.replayAddMessage(context, (Message)data, location);
102        } else if (operationType == REMOVE_OPERATION_TYPE) {
103            result = store.replayRemoveMessage(context, (MessageAck)data);
104        } else {
105            JournalTopicAck ack = (JournalTopicAck)data;
106            result = ((AMQTopicMessageStore)store).replayAcknowledge(context, ack.getClientId(), ack
107                .getSubscritionName(), ack.getMessageId());
108        }
109        return result;
110    }
111
112    public void writeExternal(WireFormat wireFormat, DataOutput dos) throws IOException {
113        location.writeExternal(dos);
114        ByteSequence packet = wireFormat.marshal(getData());
115        dos.writeInt(packet.length);
116        dos.write(packet.data, packet.offset, packet.length);
117        packet = wireFormat.marshal(destination);
118        dos.writeInt(packet.length);
119        dos.write(packet.data, packet.offset, packet.length);
120    }
121
122    public void readExternal(WireFormat wireFormat, DataInput dis) throws IOException {
123        this.location = new Location();
124        this.location.readExternal(dis);
125        int size = dis.readInt();
126        byte[] data = new byte[size];
127        dis.readFully(data);
128        setData(wireFormat.unmarshal(new ByteSequence(data)));
129        size = dis.readInt();
130        data = new byte[size];
131        dis.readFully(data);
132        this.destination = (ActiveMQDestination)wireFormat.unmarshal(new ByteSequence(data));
133    }
134}