001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb.plist;
018
019import java.io.DataInput;
020import java.io.DataOutput;
021import java.io.IOException;
022import java.util.Iterator;
023import java.util.Map;
024import java.util.NoSuchElementException;
025import java.util.Set;
026import java.util.concurrent.atomic.AtomicBoolean;
027import java.util.concurrent.atomic.AtomicReference;
028import org.apache.kahadb.index.ListIndex;
029import org.apache.kahadb.journal.Location;
030import org.apache.kahadb.page.Transaction;
031import org.apache.kahadb.util.ByteSequence;
032import org.apache.kahadb.util.LocationMarshaller;
033import org.apache.kahadb.util.StringMarshaller;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037public class PList extends ListIndex<String, Location> {
038    static final Logger LOG = LoggerFactory.getLogger(PList.class);
039    final PListStore store;
040    private String name;
041    Object indexLock;
042
043    PList(PListStore store) {
044        this.store = store;
045        this.indexLock = store.getIndexLock();
046        setPageFile(store.getPageFile());
047        setKeyMarshaller(StringMarshaller.INSTANCE);
048        setValueMarshaller(LocationMarshaller.INSTANCE);
049    }
050
051    public void setName(String name) {
052        this.name = name;
053    }
054
055    public String getName() {
056        return this.name;
057    }
058
059    void read(DataInput in) throws IOException {
060        setHeadPageId(in.readLong());
061    }
062
063    public void write(DataOutput out) throws IOException {
064        out.writeLong(getHeadPageId());
065    }
066
067    public synchronized void destroy() throws IOException {
068        synchronized (indexLock) {
069            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
070                public void execute(Transaction tx) throws IOException {
071                    clear(tx);
072                    unload(tx);
073                }
074            });
075        }
076    }
077
078    public void addLast(final String id, final ByteSequence bs) throws IOException {
079        final Location location = this.store.write(bs, false);
080        synchronized (indexLock) {
081            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
082                public void execute(Transaction tx) throws IOException {
083                    add(tx, id, location);
084                }
085            });
086        }
087    }
088
089    public void addFirst(final String id, final ByteSequence bs) throws IOException {
090        final Location location = this.store.write(bs, false);
091        synchronized (indexLock) {
092            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
093                public void execute(Transaction tx) throws IOException {
094                    addFirst(tx, id, location);
095                }
096            });
097        }
098    }
099
100    public boolean remove(final String id) throws IOException {
101        final AtomicBoolean result = new AtomicBoolean();
102        synchronized (indexLock) {
103            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
104                public void execute(Transaction tx) throws IOException {
105                    result.set(remove(tx, id) != null);
106                }
107            });
108        }
109        return result.get();
110    }
111
112    public boolean remove(final long position) throws IOException {
113        final AtomicBoolean result = new AtomicBoolean();
114        synchronized (indexLock) {
115            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
116                public void execute(Transaction tx) throws IOException {
117                    Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
118                    if (iterator.hasNext()) {
119                        iterator.next();
120                        iterator.remove();
121                        result.set(true);
122                    } else {
123                        result.set(false);
124                    }
125                }
126            });
127        }
128        return result.get();
129    }
130
131    public PListEntry get(final long position) throws IOException {
132        PListEntry result = null;
133        final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
134        synchronized (indexLock) {
135            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
136                public void execute(Transaction tx) throws IOException {
137                    Iterator<Map.Entry<String, Location>> iterator = iterator(tx, position);
138                    ref.set(iterator.next());
139                }
140            });
141        }
142        if (ref.get() != null) {
143            ByteSequence bs = this.store.getPayload(ref.get().getValue());
144            result = new PListEntry(ref.get().getKey(), bs);
145        }
146        return result;
147    }
148
149    public PListEntry getFirst() throws IOException {
150        PListEntry result = null;
151        final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
152        synchronized (indexLock) {
153            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
154                public void execute(Transaction tx) throws IOException {
155                    ref.set(getFirst(tx));
156                }
157            });
158        }
159        if (ref.get() != null) {
160            ByteSequence bs = this.store.getPayload(ref.get().getValue());
161            result = new PListEntry(ref.get().getKey(), bs);
162        }
163        return result;
164    }
165
166    public PListEntry getLast() throws IOException {
167        PListEntry result = null;
168        final AtomicReference<Map.Entry<String, Location>> ref = new AtomicReference<Map.Entry<String, Location>>();
169        synchronized (indexLock) {
170            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
171                public void execute(Transaction tx) throws IOException {
172                    ref.set(getLast(tx));
173                }
174            });
175        }
176        if (ref.get() != null) {
177            ByteSequence bs = this.store.getPayload(ref.get().getValue());
178            result = new PListEntry(ref.get().getKey(), bs);
179        }
180        return result;
181    }
182
183    public boolean isEmpty() {
184        return size() == 0;
185    }
186
187    public PListIterator iterator() throws IOException {
188        return new PListIterator();
189    }
190
191    public final class PListIterator implements Iterator<PListEntry> {
192        final Iterator<Map.Entry<String, Location>> iterator;
193        final Transaction tx;
194
195        PListIterator() throws IOException {
196            tx = store.pageFile.tx();
197            synchronized (indexLock) {
198                this.iterator = iterator(tx);
199            }
200        }
201
202        @Override
203        public boolean hasNext() {
204            return iterator.hasNext();
205        }
206
207        @Override
208        public PListEntry next() {
209            Map.Entry<String, Location> entry = iterator.next();
210            ByteSequence bs = null;
211            try {
212                bs = store.getPayload(entry.getValue());
213            } catch (IOException unexpected) {
214                NoSuchElementException e = new NoSuchElementException(unexpected.getLocalizedMessage());
215                e.initCause(unexpected);
216                throw e;
217            }
218            return new PListEntry(entry.getKey(), bs);
219        }
220
221        @Override
222        public void remove() {
223            try {
224                synchronized (indexLock) {
225                    tx.execute(new Transaction.Closure<IOException>() {
226                        @Override
227                        public void execute(Transaction tx) throws IOException {
228                            iterator.remove();
229                        }
230                    });
231                }
232            } catch (IOException unexpected) {
233                IllegalStateException e = new IllegalStateException(unexpected);
234                e.initCause(unexpected);
235                throw e;
236            }
237        }
238
239        public void release() {
240            try {
241                tx.rollback();
242            } catch (IOException unexpected) {
243                IllegalStateException e = new IllegalStateException(unexpected);
244                e.initCause(unexpected);
245                throw e;
246            }
247        }
248    }
249
250    public void claimFileLocations(final Set<Integer> candidates) throws IOException {
251        synchronized (indexLock) {
252            if (loaded.get()) {
253                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
254                    public void execute(Transaction tx) throws IOException {
255                        Iterator<Map.Entry<String,Location>> iterator = iterator(tx);
256                        while (iterator.hasNext()) {
257                            Location location = iterator.next().getValue();
258                            candidates.remove(location.getDataFileId());
259                        }
260                    }
261                });
262            }
263        }
264    }
265
266    @Override
267    public String toString() {
268        return name + "[headPageId=" + getHeadPageId()  + ",tailPageId=" + getTailPageId() + ", size=" + size() + "]";
269    }
270}