001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.kaha.impl.container; 018 019import java.io.File; 020import java.io.IOException; 021import java.util.Collection; 022import java.util.Iterator; 023import java.util.Map; 024import java.util.Set; 025 026import org.apache.activemq.kaha.ContainerId; 027import org.apache.activemq.kaha.IndexMBean; 028import org.apache.activemq.kaha.MapContainer; 029import org.apache.activemq.kaha.Marshaller; 030import org.apache.activemq.kaha.RuntimeStoreException; 031import org.apache.activemq.kaha.Store; 032import org.apache.activemq.kaha.StoreEntry; 033import org.apache.activemq.kaha.StoreLocation; 034import org.apache.activemq.kaha.impl.DataManager; 035import org.apache.activemq.kaha.impl.data.Item; 036import org.apache.activemq.kaha.impl.index.Index; 037import org.apache.activemq.kaha.impl.index.IndexItem; 038import org.apache.activemq.kaha.impl.index.IndexLinkedList; 039import org.apache.activemq.kaha.impl.index.IndexManager; 040import org.apache.activemq.kaha.impl.index.VMIndex; 041import org.apache.activemq.kaha.impl.index.hash.HashIndex; 042import org.apache.activemq.util.IOHelper; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046/** 047 * Implementation of a MapContainer 048 * 049 * 050 */ 051public final class MapContainerImpl extends BaseContainerImpl implements MapContainer { 052 053 private static final Logger LOG = LoggerFactory.getLogger(MapContainerImpl.class); 054 protected Index index; 055 protected Marshaller keyMarshaller = Store.OBJECT_MARSHALLER; 056 protected Marshaller valueMarshaller = Store.OBJECT_MARSHALLER; 057 protected File directory; 058 private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE; 059 private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE; 060 private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE; 061 private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY; 062 private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR; 063 064 public MapContainerImpl(File directory, ContainerId id, IndexItem root, IndexManager indexManager, 065 DataManager dataManager, boolean persistentIndex) { 066 super(id, root, indexManager, dataManager, persistentIndex); 067 this.directory = directory; 068 } 069 070 public synchronized void init() { 071 super.init(); 072 if (index == null) { 073 if (persistentIndex) { 074 String name = containerId.getDataContainerName() + "_" + containerId.getKey(); 075 try { 076 HashIndex hashIndex = new HashIndex(directory, name, indexManager); 077 hashIndex.setNumberOfBins(getIndexBinSize()); 078 hashIndex.setKeySize(getIndexKeySize()); 079 hashIndex.setPageSize(getIndexPageSize()); 080 hashIndex.setMaximumCapacity(getIndexMaxBinSize()); 081 hashIndex.setLoadFactor(getIndexLoadFactor()); 082 this.index = hashIndex; 083 } catch (IOException e) { 084 LOG.error("Failed to create HashIndex", e); 085 throw new RuntimeException(e); 086 } 087 } else { 088 this.index = new VMIndex(indexManager); 089 } 090 } 091 index.setKeyMarshaller(keyMarshaller); 092 } 093 094 /* 095 * (non-Javadoc) 096 * 097 * @see org.apache.activemq.kaha.MapContainer#load() 098 */ 099 public synchronized void load() { 100 checkClosed(); 101 if (!loaded) { 102 if (!loaded) { 103 loaded = true; 104 try { 105 init(); 106 index.load(); 107 long nextItem = root.getNextItem(); 108 while (nextItem != Item.POSITION_NOT_SET) { 109 IndexItem item = indexManager.getIndex(nextItem); 110 StoreLocation data = item.getKeyDataItem(); 111 Object key = dataManager.readItem(keyMarshaller, data); 112 if (index.isTransient()) { 113 index.store(key, item); 114 } 115 indexList.add(item); 116 nextItem = item.getNextItem(); 117 } 118 } catch (IOException e) { 119 LOG.error("Failed to load container " + getId(), e); 120 throw new RuntimeStoreException(e); 121 } 122 } 123 } 124 } 125 126 /* 127 * (non-Javadoc) 128 * 129 * @see org.apache.activemq.kaha.MapContainer#unload() 130 */ 131 public synchronized void unload() { 132 checkClosed(); 133 if (loaded) { 134 loaded = false; 135 try { 136 index.unload(); 137 } catch (IOException e) { 138 LOG.warn("Failed to unload the index", e); 139 } 140 indexList.clear(); 141 } 142 } 143 144 public synchronized void delete() { 145 unload(); 146 try { 147 index.delete(); 148 } catch (IOException e) { 149 LOG.warn("Failed to unload the index", e); 150 } 151 } 152 153 154 public synchronized void setKeyMarshaller(Marshaller keyMarshaller) { 155 checkClosed(); 156 this.keyMarshaller = keyMarshaller; 157 if (index != null) { 158 index.setKeyMarshaller(keyMarshaller); 159 } 160 } 161 162 public synchronized void setValueMarshaller(Marshaller valueMarshaller) { 163 checkClosed(); 164 this.valueMarshaller = valueMarshaller; 165 } 166 167 /* 168 * (non-Javadoc) 169 * 170 * @see org.apache.activemq.kaha.MapContainer#size() 171 */ 172 public synchronized int size() { 173 load(); 174 return indexList.size(); 175 } 176 177 /* 178 * (non-Javadoc) 179 * 180 * @see org.apache.activemq.kaha.MapContainer#isEmpty() 181 */ 182 public synchronized boolean isEmpty() { 183 load(); 184 return indexList.isEmpty(); 185 } 186 187 /* 188 * (non-Javadoc) 189 * 190 * @see org.apache.activemq.kaha.MapContainer#containsKey(java.lang.Object) 191 */ 192 public synchronized boolean containsKey(Object key) { 193 load(); 194 try { 195 return index.containsKey(key); 196 } catch (IOException e) { 197 LOG.error("Failed trying to find key: " + key, e); 198 throw new RuntimeException(e); 199 } 200 } 201 202 /* 203 * (non-Javadoc) 204 * 205 * @see org.apache.activemq.kaha.MapContainer#get(java.lang.Object) 206 */ 207 public synchronized Object get(Object key) { 208 load(); 209 Object result = null; 210 StoreEntry item = null; 211 try { 212 item = index.get(key); 213 } catch (IOException e) { 214 LOG.error("Failed trying to get key: " + key, e); 215 throw new RuntimeException(e); 216 } 217 if (item != null) { 218 result = getValue(item); 219 } 220 return result; 221 } 222 223 /** 224 * Get the StoreEntry associated with the key 225 * 226 * @param key 227 * @return the StoreEntry 228 */ 229 public synchronized StoreEntry getEntry(Object key) { 230 load(); 231 StoreEntry item = null; 232 try { 233 item = index.get(key); 234 } catch (IOException e) { 235 LOG.error("Failed trying to get key: " + key, e); 236 throw new RuntimeException(e); 237 } 238 return item; 239 } 240 241 /* 242 * (non-Javadoc) 243 * 244 * @see org.apache.activemq.kaha.MapContainer#containsValue(java.lang.Object) 245 */ 246 public synchronized boolean containsValue(Object o) { 247 load(); 248 boolean result = false; 249 if (o != null) { 250 IndexItem item = indexList.getFirst(); 251 while (item != null) { 252 Object value = getValue(item); 253 if (value != null && value.equals(o)) { 254 result = true; 255 break; 256 } 257 item = indexList.getNextEntry(item); 258 } 259 } 260 return result; 261 } 262 263 /* 264 * (non-Javadoc) 265 * 266 * @see org.apache.activemq.kaha.MapContainer#putAll(java.util.Map) 267 */ 268 public synchronized void putAll(Map t) { 269 load(); 270 if (t != null) { 271 for (Iterator i = t.entrySet().iterator(); i.hasNext();) { 272 Map.Entry entry = (Map.Entry)i.next(); 273 put(entry.getKey(), entry.getValue()); 274 } 275 } 276 } 277 278 /* 279 * (non-Javadoc) 280 * 281 * @see org.apache.activemq.kaha.MapContainer#keySet() 282 */ 283 public synchronized Set keySet() { 284 load(); 285 return new ContainerKeySet(this); 286 } 287 288 /* 289 * (non-Javadoc) 290 * 291 * @see org.apache.activemq.kaha.MapContainer#values() 292 */ 293 public synchronized Collection values() { 294 load(); 295 return new ContainerValueCollection(this); 296 } 297 298 /* 299 * (non-Javadoc) 300 * 301 * @see org.apache.activemq.kaha.MapContainer#entrySet() 302 */ 303 public synchronized Set entrySet() { 304 load(); 305 return new ContainerEntrySet(this); 306 } 307 308 /* 309 * (non-Javadoc) 310 * 311 * @see org.apache.activemq.kaha.MapContainer#put(java.lang.Object, 312 * java.lang.Object) 313 */ 314 public synchronized Object put(Object key, Object value) { 315 load(); 316 Object result = remove(key); 317 IndexItem item = write(key, value); 318 try { 319 index.store(key, item); 320 } catch (IOException e) { 321 LOG.error("Failed trying to insert key: " + key, e); 322 throw new RuntimeException(e); 323 } 324 indexList.add(item); 325 return result; 326 } 327 328 /* 329 * (non-Javadoc) 330 * 331 * @see org.apache.activemq.kaha.MapContainer#remove(java.lang.Object) 332 */ 333 public synchronized Object remove(Object key) { 334 load(); 335 try { 336 Object result = null; 337 IndexItem item = (IndexItem)index.remove(key); 338 if (item != null) { 339 // refresh the index 340 item = (IndexItem)indexList.refreshEntry(item); 341 result = getValue(item); 342 IndexItem prev = indexList.getPrevEntry(item); 343 IndexItem next = indexList.getNextEntry(item); 344 indexList.remove(item); 345 delete(item, prev, next); 346 } 347 return result; 348 } catch (IOException e) { 349 LOG.error("Failed trying to remove key: " + key, e); 350 throw new RuntimeException(e); 351 } 352 } 353 354 public synchronized boolean removeValue(Object o) { 355 load(); 356 boolean result = false; 357 if (o != null) { 358 IndexItem item = indexList.getFirst(); 359 while (item != null) { 360 Object value = getValue(item); 361 if (value != null && value.equals(o)) { 362 result = true; 363 // find the key 364 Object key = getKey(item); 365 if (key != null) { 366 remove(key); 367 } 368 break; 369 } 370 item = indexList.getNextEntry(item); 371 } 372 } 373 return result; 374 } 375 376 protected synchronized void remove(IndexItem item) { 377 Object key = getKey(item); 378 if (key != null) { 379 remove(key); 380 } 381 } 382 383 /* 384 * (non-Javadoc) 385 * 386 * @see org.apache.activemq.kaha.MapContainer#clear() 387 */ 388 public synchronized void clear() { 389 checkClosed(); 390 loaded = true; 391 init(); 392 if (index != null) { 393 try { 394 index.clear(); 395 } catch (IOException e) { 396 LOG.error("Failed trying clear index", e); 397 throw new RuntimeException(e); 398 } 399 } 400 super.clear(); 401 doClear(); 402 } 403 404 /** 405 * Add an entry to the Store Map 406 * 407 * @param key 408 * @param value 409 * @return the StoreEntry associated with the entry 410 */ 411 public synchronized StoreEntry place(Object key, Object value) { 412 load(); 413 try { 414 remove(key); 415 IndexItem item = write(key, value); 416 index.store(key, item); 417 indexList.add(item); 418 return item; 419 } catch (IOException e) { 420 LOG.error("Failed trying to place key: " + key, e); 421 throw new RuntimeException(e); 422 } 423 } 424 425 /** 426 * Remove an Entry from ther Map 427 * 428 * @param entry 429 * @throws IOException 430 */ 431 public synchronized void remove(StoreEntry entry) { 432 load(); 433 IndexItem item = (IndexItem)entry; 434 if (item != null) { 435 Object key = getKey(item); 436 try { 437 index.remove(key); 438 } catch (IOException e) { 439 LOG.error("Failed trying to remove entry: " + entry, e); 440 throw new RuntimeException(e); 441 } 442 IndexItem prev = indexList.getPrevEntry(item); 443 IndexItem next = indexList.getNextEntry(item); 444 indexList.remove(item); 445 delete(item, prev, next); 446 } 447 } 448 449 public synchronized StoreEntry getFirst() { 450 load(); 451 return indexList.getFirst(); 452 } 453 454 public synchronized StoreEntry getLast() { 455 load(); 456 return indexList.getLast(); 457 } 458 459 public synchronized StoreEntry getNext(StoreEntry entry) { 460 load(); 461 IndexItem item = (IndexItem)entry; 462 return indexList.getNextEntry(item); 463 } 464 465 public synchronized StoreEntry getPrevious(StoreEntry entry) { 466 load(); 467 IndexItem item = (IndexItem)entry; 468 return indexList.getPrevEntry(item); 469 } 470 471 public synchronized StoreEntry refresh(StoreEntry entry) { 472 load(); 473 return indexList.getEntry(entry); 474 } 475 476 /** 477 * Get the value from it's location 478 * 479 * @param item 480 * @return the value associated with the store entry 481 */ 482 public synchronized Object getValue(StoreEntry item) { 483 load(); 484 Object result = null; 485 if (item != null) { 486 try { 487 // ensure this value is up to date 488 // item=indexList.getEntry(item); 489 StoreLocation data = item.getValueDataItem(); 490 result = dataManager.readItem(valueMarshaller, data); 491 } catch (IOException e) { 492 LOG.error("Failed to get value for " + item, e); 493 throw new RuntimeStoreException(e); 494 } 495 } 496 return result; 497 } 498 499 /** 500 * Get the Key object from it's location 501 * 502 * @param item 503 * @return the Key Object associated with the StoreEntry 504 */ 505 public synchronized Object getKey(StoreEntry item) { 506 load(); 507 Object result = null; 508 if (item != null) { 509 try { 510 StoreLocation data = item.getKeyDataItem(); 511 result = dataManager.readItem(keyMarshaller, data); 512 } catch (IOException e) { 513 LOG.error("Failed to get key for " + item, e); 514 throw new RuntimeStoreException(e); 515 } 516 } 517 return result; 518 } 519 520 protected IndexLinkedList getItemList() { 521 return indexList; 522 } 523 524 protected synchronized IndexItem write(Object key, Object value) { 525 IndexItem index = null; 526 try { 527 index = indexManager.createNewIndex(); 528 StoreLocation data = dataManager.storeDataItem(keyMarshaller, key); 529 index.setKeyData(data); 530 531 if (value != null) { 532 data = dataManager.storeDataItem(valueMarshaller, value); 533 index.setValueData(data); 534 } 535 IndexItem prev = indexList.getLast(); 536 prev = prev != null ? prev : indexList.getRoot(); 537 IndexItem next = indexList.getNextEntry(prev); 538 prev.setNextItem(index.getOffset()); 539 index.setPreviousItem(prev.getOffset()); 540 updateIndexes(prev); 541 if (next != null) { 542 next.setPreviousItem(index.getOffset()); 543 index.setNextItem(next.getOffset()); 544 updateIndexes(next); 545 } 546 storeIndex(index); 547 } catch (IOException e) { 548 LOG.error("Failed to write " + key + " , " + value, e); 549 throw new RuntimeStoreException(e); 550 } 551 return index; 552 } 553 554 public int getIndexBinSize() { 555 return indexBinSize; 556 } 557 558 public void setIndexBinSize(int indexBinSize) { 559 this.indexBinSize = indexBinSize; 560 } 561 562 public int getIndexKeySize() { 563 return indexKeySize; 564 } 565 566 public void setIndexKeySize(int indexKeySize) { 567 this.indexKeySize = indexKeySize; 568 } 569 570 public int getIndexPageSize() { 571 return indexPageSize; 572 } 573 574 public void setIndexPageSize(int indexPageSize) { 575 this.indexPageSize = indexPageSize; 576 } 577 578 public int getIndexLoadFactor() { 579 return indexLoadFactor; 580 } 581 582 public void setIndexLoadFactor(int loadFactor) { 583 this.indexLoadFactor = loadFactor; 584 } 585 586 587 public IndexMBean getIndexMBean() { 588 return (IndexMBean) index; 589 } 590 public int getIndexMaxBinSize() { 591 return indexMaxBinSize; 592 } 593 594 public void setIndexMaxBinSize(int maxBinSize) { 595 this.indexMaxBinSize = maxBinSize; 596 } 597 598 599 600 public String toString() { 601 load(); 602 StringBuffer buf = new StringBuffer(); 603 buf.append("{"); 604 Iterator i = entrySet().iterator(); 605 boolean hasNext = i.hasNext(); 606 while (hasNext) { 607 Map.Entry e = (Entry) i.next(); 608 Object key = e.getKey(); 609 Object value = e.getValue(); 610 buf.append(key); 611 buf.append("="); 612 613 buf.append(value); 614 hasNext = i.hasNext(); 615 if (hasNext) 616 buf.append(", "); 617 } 618 buf.append("}"); 619 return buf.toString(); 620 } 621}