001/* 002 * (C) Copyright 2015 Nuxeo SA (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Thierry Delprat <[email protected]> 018 */ 019package org.nuxeo.elasticsearch.seqgen; 020 021import java.util.ArrayList; 022import java.util.Collections; 023import java.util.List; 024import java.util.NoSuchElementException; 025 026import com.google.common.collect.Lists; 027import org.elasticsearch.action.index.IndexRequest; 028import org.elasticsearch.action.index.IndexResponse; 029import org.elasticsearch.common.xcontent.XContentType; 030import org.elasticsearch.index.VersionType; 031import org.nuxeo.ecm.core.api.ConcurrentUpdateException; 032import org.nuxeo.ecm.core.api.NuxeoException; 033import org.nuxeo.ecm.core.uidgen.AbstractUIDSequencer; 034import org.nuxeo.ecm.core.uidgen.UIDSequencer; 035import org.nuxeo.elasticsearch.ElasticSearchConstants; 036import org.nuxeo.elasticsearch.api.ESClient; 037import org.nuxeo.elasticsearch.api.ElasticSearchAdmin; 038import org.nuxeo.runtime.api.Framework; 039 040/** 041 * Elasticsearch implementation of {@link UIDSequencer}. 042 * <p> 043 * Since elasticsearch does not seem to support a notion of native sequence, the implementation uses the auto-increment 044 * of the version attribute as described in the <a href= 045 * "http://blogs.perl.org/users/clinton_gormley/2011/10/elasticsearchsequence---a-blazing-fast-ticket-server.html" 046 * >ElasticSearch::Sequence - a blazing fast ticket server</a> blog post. 047 * 048 * @since 7.3 049 */ 050public class ESUIDSequencer extends AbstractUIDSequencer { 051 052 protected static final int MAX_RETRY = 3; 053 054 protected ESClient esClient = null; 055 056 protected String indexName; 057 058 @Override 059 public void init() { 060 if (esClient != null) { 061 return; 062 } 063 ElasticSearchAdmin esa = Framework.getService(ElasticSearchAdmin.class); 064 esClient = esa.getClient(); 065 indexName = esa.getIndexNameForType(ElasticSearchConstants.SEQ_ID_TYPE); 066 try { 067 boolean indexExists = esClient.indexExists(indexName); 068 if (!indexExists) { 069 throw new NuxeoException( 070 String.format("Sequencer %s needs an elasticSearchIndex contribution with type %s", getName(), 071 ElasticSearchConstants.SEQ_ID_TYPE)); 072 } 073 } catch (NoSuchElementException | NuxeoException e) { 074 dispose(); 075 throw e; 076 } 077 } 078 079 @Override 080 public void dispose() { 081 if (esClient == null) { 082 return; 083 } 084 esClient = null; 085 indexName = null; 086 } 087 088 @Override 089 public void initSequence(String key, long id) { 090 String source = "{ \"ts\" : " + System.currentTimeMillis() + "}"; 091 IndexResponse res = esClient.index( 092 new IndexRequest(indexName, ElasticSearchConstants.SEQ_ID_TYPE, key).versionType(VersionType.EXTERNAL) 093 .version(id) 094 .source(source, XContentType.JSON)); 095 } 096 097 @Override 098 public long getNextLong(String sequenceName) { 099 String source = "{ \"ts\" : " + System.currentTimeMillis() + "}"; 100 IndexResponse res = esClient.index( 101 new IndexRequest(indexName, ElasticSearchConstants.SEQ_ID_TYPE, sequenceName).source(source, 102 XContentType.JSON)); 103 return res.getVersion(); 104 } 105 106 @Override 107 public List<Long> getNextBlock(String key, int blockSize) { 108 if (blockSize == 1) { 109 return Collections.singletonList(getNextLong(key)); 110 } 111 List<Long> ret = new ArrayList<>(blockSize); 112 long first = getNextBlockWithRetry(key, blockSize); 113 for (long i = 0; i < blockSize; i++) { 114 ret.add(first + i); 115 } 116 return ret; 117 } 118 119 protected long getNextBlockWithRetry(String key, int blockSize) { 120 long ret; 121 for (int i = 0; i < MAX_RETRY; i++) { 122 ret = getNextLong(key); 123 try { 124 initSequence(key, ret + blockSize - 1); 125 return ret; 126 } catch (ConcurrentUpdateException e) { 127 if (i == MAX_RETRY - 1) { 128 throw e; 129 } 130 } 131 } 132 throw new NuxeoException("Unable to get a block of sequence"); 133 } 134}