001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.kahadb.util;
018    
019    import java.io.DataInput;
020    import java.io.DataOutput;
021    import java.io.IOException;
022    import java.util.ArrayList;
023    import java.util.Iterator;
024    import java.util.List;
025    import java.util.NoSuchElementException;
026    
027    /**
 * Keeps track of added long values. Collapses ranges of numbers using a
 * Sequence representation. Use it to keep track of received message ids to
 * find out if a message is a duplicate or if there are any missing messages.
031     *
032     * @author chirino
033     */
034    public class SequenceSet extends LinkedNodeList<Sequence> implements Iterable<Long> {
035    
036        public static class Marshaller implements org.apache.kahadb.util.Marshaller<SequenceSet> {
037    
038            public static final Marshaller INSTANCE = new Marshaller();
039    
040            public SequenceSet readPayload(DataInput in) throws IOException {
041                SequenceSet value = new SequenceSet();
042                int count = in.readInt();
043                for (int i = 0; i < count; i++) {
044                    if( in.readBoolean() ) {
045                        Sequence sequence = new Sequence(in.readLong(), in.readLong());
046                        value.addLast(sequence);
047                    } else {
048                        Sequence sequence = new Sequence(in.readLong());
049                        value.addLast(sequence);
050                    }
051                }
052                return value;
053            }
054    
055            public void writePayload(SequenceSet value, DataOutput out) throws IOException {
056                out.writeInt(value.size());
057                Sequence sequence = value.getHead();
058                while (sequence != null ) {
059                    if( sequence.range() > 1 ) {
060                        out.writeBoolean(true);
061                        out.writeLong(sequence.first);
062                        out.writeLong(sequence.last);
063                    } else {
064                        out.writeBoolean(false);
065                        out.writeLong(sequence.first);
066                    }
067                    sequence = sequence.getNext();
068                }
069            }
070    
071            public int getFixedSize() {
072                return -1;
073            }
074    
075            public SequenceSet deepCopy(SequenceSet value) {
076                SequenceSet rc = new SequenceSet();
077                Sequence sequence = value.getHead();
078                while (sequence != null ) {
079                    rc.add(new Sequence(sequence.first, sequence.last));
080                    sequence = sequence.getNext();
081                }
082                return rc;
083            }
084    
085            public boolean isDeepCopySupported() {
086                return true;
087            }
088        }
089    
090        public void add(Sequence value) {
091            // TODO we can probably optimize this a bit
092            for(long i=value.first; i<value.last+1; i++) {
093                add(i);
094            }
095        }
096    
097        /**
098         *
099         * @param value
100         *            the value to add to the list
101         * @return false if the value was a duplicate.
102         */
103        public boolean add(long value) {
104    
105            if (isEmpty()) {
106                addFirst(new Sequence(value));
107                return true;
108            }
109    
110            Sequence sequence = getHead();
111            while (sequence != null) {
112    
113                if (sequence.isAdjacentToLast(value)) {
114                    // grow the sequence...
115                    sequence.last = value;
116                    // it might connect us to the next sequence..
117                    if (sequence.getNext() != null) {
118                        Sequence next = sequence.getNext();
119                        if (next.isAdjacentToFirst(value)) {
120                            // Yep the sequence connected.. so join them.
121                            sequence.last = next.last;
122                            next.unlink();
123                        }
124                    }
125                    return true;
126                }
127    
128                if (sequence.isAdjacentToFirst(value)) {
129                    // grow the sequence...
130                    sequence.first = value;
131    
132                    // it might connect us to the previous
133                    if (sequence.getPrevious() != null) {
134                        Sequence prev = sequence.getPrevious();
135                        if (prev.isAdjacentToLast(value)) {
136                            // Yep the sequence connected.. so join them.
137                            sequence.first = prev.first;
138                            prev.unlink();
139                        }
140                    }
141                    return true;
142                }
143    
144                // Did that value land before this sequence?
145                if (value < sequence.first) {
146                    // Then insert a new entry before this sequence item.
147                    sequence.linkBefore(new Sequence(value));
148                    return true;
149                }
150    
151                // Did that value land within the sequence? The it's a duplicate.
152                if (sequence.contains(value)) {
153                    return false;
154                }
155    
156                sequence = sequence.getNext();
157            }
158    
159            // Then the value is getting appended to the tail of the sequence.
160            addLast(new Sequence(value));
161            return true;
162        }
163    
164        /**
165         * Removes the given value from the Sequence set, splitting a
166         * contained sequence if necessary.
167         *
168         * @param value
169         *          The value that should be removed from the SequenceSet.
170         *
171         * @return true if the value was removed from the set, false if there
172         *         was no sequence in the set that contained the given value.
173         */
174        public boolean remove(long value) {
175            Sequence sequence = getHead();
176            while (sequence != null ) {
177                if(sequence.contains(value)) {
178                    if (sequence.range() == 1) {
179                        sequence.unlink();
180                        return true;
181                    } else if (sequence.getFirst() == value) {
182                        sequence.setFirst(value+1);
183                        return true;
184                    } else if (sequence.getLast() == value) {
185                        sequence.setLast(value-1);
186                        return true;
187                    } else {
188                        sequence.linkBefore(new Sequence(sequence.first, value-1));
189                        sequence.linkAfter(new Sequence(value+1, sequence.last));
190                        sequence.unlink();
191                        return true;
192                    }
193                }
194    
195                sequence = sequence.getNext();
196            }
197    
198            return false;
199        }
200    
201        /**
202         * Removes and returns the first element from this list.
203         *
204         * @return the first element from this list.
205         * @throws NoSuchElementException if this list is empty.
206         */
207        public long removeFirst() {
208            if (isEmpty()) {
209                throw new NoSuchElementException();
210            }
211    
212            Sequence rc = removeFirstSequence(1);
213            return rc.first;
214        }
215    
216        /**
217         * Removes and returns the last sequence from this list.
218         *
219         * @return the last sequence from this list or null if the list is empty.
220         */
221        public Sequence removeLastSequence() {
222            if (isEmpty()) {
223                return null;
224            }
225    
226            Sequence rc = getTail();
227            rc.unlink();
228            return rc;
229        }
230    
231        /**
232         * Removes and returns the first sequence that is count range large.
233         *
234         * @return a sequence that is count range large, or null if no sequence is that large in the list.
235         */
236        public Sequence removeFirstSequence(long count) {
237            if (isEmpty()) {
238                return null;
239            }
240    
241            Sequence sequence = getHead();
242            while (sequence != null ) {
243                if (sequence.range() == count ) {
244                    sequence.unlink();
245                    return sequence;
246                }
247                if (sequence.range() > count ) {
248                    Sequence rc = new Sequence(sequence.first, sequence.first+count-1);
249                    sequence.first+=count;
250                    return rc;
251                }
252                sequence = sequence.getNext();
253            }
254            return null;
255        }
256    
257        /**
258         * @return all the id Sequences that are missing from this set that are not
259         *         in between the range provided.
260         */
261        public List<Sequence> getMissing(long first, long last) {
262            ArrayList<Sequence> rc = new ArrayList<Sequence>();
263            if (first > last) {
264                throw new IllegalArgumentException("First cannot be more than last");
265            }
266            if (isEmpty()) {
267                // We are missing all the messages.
268                rc.add(new Sequence(first, last));
269                return rc;
270            }
271    
272            Sequence sequence = getHead();
273            while (sequence != null && first <= last) {
274                if (sequence.contains(first)) {
275                    first = sequence.last + 1;
276                } else {
277                    if (first < sequence.first) {
278                        if (last < sequence.first) {
279                            rc.add(new Sequence(first, last));
280                            return rc;
281                        } else {
282                            rc.add(new Sequence(first, sequence.first - 1));
283                            first = sequence.last + 1;
284                        }
285                    }
286                }
287                sequence = sequence.getNext();
288            }
289    
290            if (first <= last) {
291                rc.add(new Sequence(first, last));
292            }
293            return rc;
294        }
295    
296        /**
297         * @return all the Sequence that are in this list
298         */
299        public List<Sequence> getReceived() {
300            ArrayList<Sequence> rc = new ArrayList<Sequence>(size());
301            Sequence sequence = getHead();
302            while (sequence != null) {
303                rc.add(new Sequence(sequence.first, sequence.last));
304                sequence = sequence.getNext();
305            }
306            return rc;
307        }
308    
309        /**
310         * Returns true if the value given is contained within one of the
311         * sequences held in this set.
312         *
313         * @param value
314         *      The value to search for in the set.
315         *
316         * @return true if the value is contained in the set.
317         */
318        public boolean contains(long value) {
319            if (isEmpty()) {
320                return false;
321            }
322    
323            Sequence sequence = getHead();
324            while (sequence != null) {
325                if (sequence.contains(value)) {
326                    return true;
327                }
328                sequence = sequence.getNext();
329            }
330    
331            return false;
332        }
333    
334        public boolean contains(int first, int last) {
335            if (isEmpty()) {
336                return false;
337            }
338            Sequence sequence = getHead();
339            while (sequence != null) {
340                if (sequence.first <= first && first <= sequence.last ) {
341                    return last <= sequence.last;
342                }
343                sequence = sequence.getNext();
344            }
345            return false;
346        }
347    
348        /**
349         * Computes the size of this Sequence by summing the values of all
350         * the contained sequences.
351         *
352         * @return the total number of values contained in this set if it
353         *         were to be iterated over like an array.
354         */
355        public long rangeSize() {
356            long result = 0;
357            Sequence sequence = getHead();
358            while (sequence != null) {
359                result += sequence.range();
360                sequence = sequence.getNext();
361            }
362            return result;
363        }
364    
365        public Iterator<Long> iterator() {
366            return new SequenceIterator();
367        }
368    
369        private class SequenceIterator implements Iterator<Long> {
370    
371            private Sequence currentEntry;
372            private long lastReturned = -1;
373    
374            public SequenceIterator() {
375                currentEntry = getHead();
376                if (currentEntry != null) {
377                    lastReturned = currentEntry.first - 1;
378                }
379            }
380    
381            public boolean hasNext() {
382                return currentEntry != null;
383            }
384    
385            public Long next() {
386                if (currentEntry == null) {
387                    throw new NoSuchElementException();
388                }
389    
390                if(lastReturned < currentEntry.first) {
391                    lastReturned = currentEntry.first;
392                    if (currentEntry.range() == 1) {
393                        currentEntry = currentEntry.getNext();
394                    }
395                } else {
396                    lastReturned++;
397                    if (lastReturned == currentEntry.last) {
398                        currentEntry = currentEntry.getNext();
399                    }
400                }
401    
402                return lastReturned;
403            }
404    
405            public void remove() {
406                throw new UnsupportedOperationException();
407            }
408    
409        }
410    
411    }