001/* 002 * (C) Copyright 2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Kevin Leturc <[email protected]> 018 */ 019package org.nuxeo.ecm.core.bulk.message; 020 021import static java.util.Collections.emptyMap; 022 023import java.io.IOException; 024import java.io.Serializable; 025import java.util.Arrays; 026import java.util.List; 027import java.util.Map; 028 029import org.apache.avro.Schema; 030import org.apache.avro.io.Decoder; 031import org.apache.avro.io.Encoder; 032import org.apache.avro.reflect.CustomEncoding; 033 034import com.fasterxml.jackson.databind.ObjectMapper; 035 036/** 037 * This {@link CustomEncoding} encodes/decodes {@link Map}<{@link String}, {@link Serializable}> to a JSON 038 * {@link String} using Jackson before encoding it in Avro format. 039 * 040 * @since 10.3 041 */ 042public class MapAsJsonAsStringEncoding extends CustomEncoding<Map<String, Serializable>> { 043 044 protected static final int NULL_SCHEMA_INDEX = 0; 045 046 protected static final int STRING_SCHEMA_INDEX = 1; 047 048 protected static final ObjectMapper MAPPER = new ObjectMapper(); 049 050 public MapAsJsonAsStringEncoding() { 051 List<Schema> union = Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)); 052 union.get(1).addProp("CustomEncoding", "MapAsJsonAsStringEncoding"); 053 schema = Schema.createUnion(union); 054 } 055 056 @Override 057 @SuppressWarnings("unchecked") 058 protected void write(Object o, Encoder encoder) throws IOException { 059 if (o == null || ((Map<String, Serializable>) o).isEmpty()) { 060 // treat empty map as null to save some bytes 061 // encode the position of the data in the union 062 encoder.writeIndex(NULL_SCHEMA_INDEX); 063 encoder.writeNull(); 064 } else { 065 // encode the position of the data in the union 066 encoder.writeIndex(STRING_SCHEMA_INDEX); 067 String mapAsJson = MAPPER.writeValueAsString(o); 068 encoder.writeString(mapAsJson); 069 } 070 } 071 072 @Override 073 @SuppressWarnings("unchecked") 074 protected Map<String, Serializable> read(Object o, Decoder decoder) throws IOException { 075 int index = decoder.readIndex(); 076 if (index == NULL_SCHEMA_INDEX) { 077 decoder.readNull(); 078 return emptyMap(); 079 } else if (index == STRING_SCHEMA_INDEX) { 080 String mapAsJson = decoder.readString(); 081 return new ObjectMapper().readValue(mapAsJson, Map.class); 082 } else { 083 throw new IOException("Unable to read Map as Json as String, index=" + index + " is unknown"); 084 } 085 } 086}