001/* 002 * (C) Copyright 2011-2018 Nuxeo (http://nuxeo.com/) and others. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 * 016 * Contributors: 017 * Thomas Roger <[email protected]> 018 */ 019package org.nuxeo.ecm.activity; 020 021import java.io.Serializable; 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.Date; 025import java.util.HashMap; 026import java.util.Iterator; 027import java.util.List; 028import java.util.Locale; 029import java.util.Map; 030import java.util.MissingResourceException; 031import java.util.regex.Matcher; 032import java.util.regex.Pattern; 033 034import javax.persistence.EntityManager; 035import javax.persistence.Query; 036 037import org.apache.commons.lang3.StringUtils; 038import org.apache.commons.logging.Log; 039import org.apache.commons.logging.LogFactory; 040import org.nuxeo.common.utils.i18n.I18NUtils; 041import org.nuxeo.ecm.core.api.NuxeoException; 042import org.nuxeo.ecm.core.persistence.PersistenceProvider; 043import org.nuxeo.ecm.core.persistence.PersistenceProviderFactory; 044import org.nuxeo.ecm.core.repository.RepositoryInitializationHandler; 045import org.nuxeo.runtime.api.Framework; 046import org.nuxeo.runtime.model.ComponentContext; 047import org.nuxeo.runtime.model.ComponentInstance; 048import org.nuxeo.runtime.model.DefaultComponent; 049 050/** 051 * Default implementation of {@link ActivityStreamService}. 052 * 053 * @author <a href="mailto:[email protected]">Thomas Roger</a> 054 * @since 5.5 055 */ 056public class ActivityStreamServiceImpl extends DefaultComponent implements ActivityStreamService { 057 058 private static final Log log = LogFactory.getLog(ActivityStreamServiceImpl.class); 059 060 public static final String ACTIVITIES_PROVIDER = "nxactivities"; 061 062 public static final String ACTIVITY_STREAM_FILTER_EP = "activityStreamFilters"; 063 064 public static final String ACTIVITY_STREAMS_EP = "activityStreams"; 065 066 public static final String ACTIVITY_VERBS_EP = "activityVerbs"; 067 068 public static final String ACTIVITY_LINK_BUILDERS_EP = "activityLinkBuilders"; 069 070 public static final String ACTIVITY_UPGRADERS_EP = "activityUpgraders"; 071 072 protected final ThreadLocal<EntityManager> localEntityManager = new ThreadLocal<>(); 073 074 protected final Map<String, ActivityStreamFilter> activityStreamFilters = new HashMap<>(); 075 076 protected ActivityStreamRegistry activityStreamRegistry; 077 078 protected ActivityVerbRegistry activityVerbRegistry; 079 080 protected ActivityLinkBuilderRegistry activityLinkBuilderRegistry; 081 082 protected ActivityUpgraderRegistry activityUpgraderRegistry; 083 084 protected PersistenceProvider persistenceProvider; 085 086 protected RepositoryInitializationHandler initializationHandler; 087 088 public void upgradeActivities() { 089 for (final ActivityUpgrader upgrader : activityUpgraderRegistry.getOrderedActivityUpgraders()) { 090 try { 091 getOrCreatePersistenceProvider().run(false, em -> { 092 upgradeActivities(em, upgrader); 093 }); 094 } catch (NuxeoException e) { 095 log.error(String.format("Error while running '%s' activity upgrader: %s", upgrader.getName(), 096 e.getMessage())); 097 log.debug(e, e); 098 } 099 } 100 } 101 102 protected void upgradeActivities(EntityManager em, ActivityUpgrader upgrader) { 103 try { 104 localEntityManager.set(em); 105 upgrader.doUpgrade(this); 106 } finally { 107 localEntityManager.remove(); 108 } 109 } 110 111 @Override 112 public ActivitiesList query(String filterId, final Map<String, Serializable> parameters) { 113 return query(filterId, parameters, 0, 0); 114 } 115 116 @Override 117 public ActivitiesList query(String filterId, final Map<String, Serializable> parameters, final long offset, 118 final long limit) { 119 if (ALL_ACTIVITIES.equals(filterId)) { 120 return queryAll(offset, limit); 121 } 122 123 final ActivityStreamFilter filter = activityStreamFilters.get(filterId); 124 if (filter == null) { 125 throw new NuxeoException(String.format("Unable to retrieve '%s' ActivityStreamFilter", filterId)); 126 } 127 128 return query(filter, parameters, offset, limit); 129 } 130 131 protected ActivitiesList query(final ActivityStreamFilter filter, final Map<String, Serializable> parameters, 132 final long offset, final long limit) { 133 return getOrCreatePersistenceProvider().run(false, em -> { 134 return query(em, filter, parameters, offset, limit); 135 }); 136 } 137 138 protected ActivitiesList query(EntityManager em, ActivityStreamFilter filter, Map<String, Serializable> parameters, 139 long offset, long limit) { 140 try { 141 localEntityManager.set(em); 142 return filter.query(this, parameters, offset, limit); 143 } finally { 144 localEntityManager.remove(); 145 } 146 147 } 148 149 protected ActivitiesList queryAll(final long offset, final long limit) { 150 return getOrCreatePersistenceProvider().run(false, em -> { 151 return queryAll(em, offset, limit); 152 }); 153 } 154 155 @SuppressWarnings("unchecked") 156 protected ActivitiesList queryAll(EntityManager em, long offset, long limit) { 157 Query query = em.createQuery("select activity from Activity activity order by activity.id asc"); 158 if (limit > 0) { 159 query.setMaxResults((int) limit); 160 } 161 if (offset > 0) { 162 query.setFirstResult((int) offset); 163 } 164 return new ActivitiesListImpl(query.getResultList()); 165 } 166 167 @Override 168 public Activity addActivity(final Activity activity) { 169 if (activity.getPublishedDate() == null) { 170 activity.setPublishedDate(new Date()); 171 } 172 getOrCreatePersistenceProvider().run(true, em -> { 173 addActivity(em, activity); 174 }); 175 return activity; 176 } 177 178 protected void addActivity(EntityManager em, Activity activity) { 179 try { 180 localEntityManager.set(em); 181 em.persist(activity); 182 for (ActivityStreamFilter filter : activityStreamFilters.values()) { 183 if (filter.isInterestedIn(activity)) { 184 filter.handleNewActivity(this, activity); 185 } 186 } 187 } finally { 188 localEntityManager.remove(); 189 } 190 } 191 192 @Override 193 public void removeActivities(final Collection<Activity> activities) { 194 if (activities == null || activities.isEmpty()) { 195 return; 196 } 197 getOrCreatePersistenceProvider().run(true, em -> { 198 removeActivities(em, activities); 199 }); 200 } 201 202 protected void removeActivities(EntityManager em, Collection<Activity> activities) { 203 try { 204 localEntityManager.set(em); 205 206 ActivitiesList l = new ActivitiesListImpl(activities); 207 for (ActivityStreamFilter filter : activityStreamFilters.values()) { 208 filter.handleRemovedActivities(this, l); 209 } 210 211 Query query = em.createQuery("delete from Activity activity where activity.id in (:ids)"); 212 query.setParameter("ids", l.toActivityIds()); 213 query.executeUpdate(); 214 } finally { 215 localEntityManager.remove(); 216 } 217 } 218 219 @Override 220 public ActivityMessage toActivityMessage(final Activity activity, Locale locale) { 221 return toActivityMessage(activity, locale, null); 222 } 223 224 @Override 225 public ActivityMessage toActivityMessage(Activity activity, Locale locale, String activityLinkBuilderName) { 226 ActivityLinkBuilder activityLinkBuilder = getActivityLinkBuilder(activityLinkBuilderName); 227 228 Map<String, String> fields = activity.toMap(); 229 230 String actor = activity.getActor(); 231 String displayActor = activity.getDisplayActor(); 232 String displayActorLink; 233 if (ActivityHelper.isUser(actor)) { 234 displayActorLink = activityLinkBuilder.getUserProfileLink(actor, activity.getDisplayActor()); 235 } else { 236 displayActorLink = activity.getDisplayActor(); 237 } 238 239 List<ActivityReplyMessage> activityReplyMessages = toActivityReplyMessages(activity.getActivityReplies(), 240 locale, activityLinkBuilderName); 241 242 ActivityVerb verb = activityVerbRegistry.get(activity.getVerb()); 243 244 if (verb == null || verb.getLabelKey() == null) { 245 return new ActivityMessage(activity.getId(), actor, displayActor, displayActorLink, activity.getVerb(), 246 activity.toString(), activity.getPublishedDate(), null, activityReplyMessages); 247 } 248 249 String labelKey = verb.getLabelKey(); 250 String messageTemplate; 251 try { 252 messageTemplate = I18NUtils.getMessageString("messages", labelKey, null, locale); 253 } catch (MissingResourceException e) { 254 log.error(e.getMessage()); 255 log.debug(e, e); 256 // just return the labelKey if we have no resource bundle 257 return new ActivityMessage(activity.getId(), actor, displayActor, displayActorLink, activity.getVerb(), 258 labelKey, activity.getPublishedDate(), verb.getIcon(), activityReplyMessages); 259 } 260 261 Pattern pattern = Pattern.compile("\\$\\{(.*?)\\}"); 262 Matcher m = pattern.matcher(messageTemplate); 263 while (m.find()) { 264 String param = m.group().replaceAll("[\\|$\\|{\\}]", ""); 265 if (fields.containsKey(param)) { 266 String value = fields.get(param); 267 final String displayValue = fields.get("display" + StringUtils.capitalize(param)); 268 if (ActivityHelper.isDocument(value)) { 269 value = activityLinkBuilder.getDocumentLink(value, displayValue); 270 } else if (ActivityHelper.isUser(value)) { 271 value = activityLinkBuilder.getUserProfileLink(value, displayValue); 272 } else { 273 // simple text 274 value = ActivityMessageHelper.replaceURLsByLinks(value); 275 } 276 messageTemplate = messageTemplate.replace(m.group(), value); 277 } 278 } 279 280 return new ActivityMessage(activity.getId(), actor, displayActor, displayActorLink, activity.getVerb(), 281 messageTemplate, activity.getPublishedDate(), verb.getIcon(), activityReplyMessages); 282 } 283 284 @Override 285 public ActivityLinkBuilder getActivityLinkBuilder(String name) { 286 ActivityLinkBuilder activityLinkBuilder; 287 if (StringUtils.isBlank(name)) { 288 activityLinkBuilder = activityLinkBuilderRegistry.getDefaultActivityLinkBuilder(); 289 } else { 290 activityLinkBuilder = activityLinkBuilderRegistry.get(name); 291 if (activityLinkBuilder == null) { 292 log.warn("Fallback on default Activity link builder"); 293 activityLinkBuilder = activityLinkBuilderRegistry.getDefaultActivityLinkBuilder(); 294 } 295 } 296 return activityLinkBuilder; 297 } 298 299 @Override 300 public ActivityReplyMessage toActivityReplyMessage(ActivityReply activityReply, Locale locale) { 301 return toActivityReplyMessage(activityReply, locale, null); 302 } 303 304 @Override 305 public ActivityReplyMessage toActivityReplyMessage(ActivityReply activityReply, Locale locale, 306 String activityLinkBuilderName) { 307 ActivityLinkBuilder activityLinkBuilder = getActivityLinkBuilder(activityLinkBuilderName); 308 309 String actor = activityReply.getActor(); 310 String displayActor = activityReply.getDisplayActor(); 311 String displayActorLink = activityLinkBuilder.getUserProfileLink(actor, displayActor); 312 String message = ActivityMessageHelper.replaceURLsByLinks(activityReply.getMessage()); 313 return new ActivityReplyMessage(activityReply.getId(), actor, displayActor, displayActorLink, message, 314 activityReply.getPublishedDate()); 315 316 } 317 318 private List<ActivityReplyMessage> toActivityReplyMessages(List<ActivityReply> replies, Locale locale, 319 String activityLinkBuilderName) { 320 List<ActivityReplyMessage> activityReplyMessages = new ArrayList<>(); 321 for (ActivityReply reply : replies) { 322 activityReplyMessages.add(toActivityReplyMessage(reply, locale, activityLinkBuilderName)); 323 } 324 return activityReplyMessages; 325 } 326 327 @Override 328 public ActivityStream getActivityStream(String name) { 329 return activityStreamRegistry.get(name); 330 } 331 332 @Override 333 public ActivityReply addActivityReply(Serializable activityId, ActivityReply activityReply) { 334 Activity activity = getActivity(activityId); 335 if (activity != null) { 336 List<ActivityReply> replies = activity.getActivityReplies(); 337 String newReplyId = computeNewReplyId(activity); 338 activityReply.setId(newReplyId); 339 replies.add(activityReply); 340 activity.setActivityReplies(replies); 341 updateActivity(activity); 342 } 343 return activityReply; 344 } 345 346 /** 347 * @since 5.6 348 */ 349 protected String computeNewReplyId(Activity activity) { 350 String replyIdPrefix = activity.getId() + "-reply-"; 351 List<ActivityReply> replies = activity.getActivityReplies(); 352 long maxId = 0; 353 for (ActivityReply reply : replies) { 354 String replyId = reply.getId(); 355 long currentId = Long.parseLong(replyId.replace(replyIdPrefix, "")); 356 if (currentId > maxId) { 357 maxId = currentId; 358 } 359 } 360 return replyIdPrefix + (maxId + 1); 361 } 362 363 public Activity getActivity(final Serializable activityId) { 364 return getOrCreatePersistenceProvider().run(false, em -> { 365 return getActivity(em, activityId); 366 }); 367 } 368 369 public ActivitiesList getActivities(final Collection<Serializable> activityIds) { 370 return getOrCreatePersistenceProvider().run(false, em -> { 371 return getActivities(em, activityIds); 372 }); 373 } 374 375 @Override 376 public ActivityReply removeActivityReply(final Serializable activityId, final String activityReplyId) { 377 return getOrCreatePersistenceProvider().run(true, em -> { 378 return removeActivityReply(em, activityId, activityReplyId); 379 }); 380 } 381 382 /** 383 * @since 5.6 384 */ 385 protected ActivityReply removeActivityReply(EntityManager em, Serializable activityId, String activityReplyId) { 386 try { 387 localEntityManager.set(em); 388 389 Activity activity = getActivity(activityId); 390 if (activity != null) { 391 List<ActivityReply> replies = activity.getActivityReplies(); 392 for (Iterator<ActivityReply> it = replies.iterator(); it.hasNext();) { 393 ActivityReply reply = it.next(); 394 if (reply.getId().equals(activityReplyId)) { 395 for (ActivityStreamFilter filter : activityStreamFilters.values()) { 396 filter.handleRemovedActivityReply(this, activity, reply); 397 } 398 it.remove(); 399 activity.setActivityReplies(replies); 400 updateActivity(activity); 401 return reply; 402 } 403 } 404 } 405 return null; 406 } finally { 407 localEntityManager.remove(); 408 } 409 } 410 411 protected Activity getActivity(EntityManager em, Serializable activityId) { 412 Query query = em.createQuery("select activity from Activity activity where activity.id = :activityId"); 413 query.setParameter("activityId", activityId); 414 return (Activity) query.getSingleResult(); 415 } 416 417 @SuppressWarnings("unchecked") 418 protected ActivitiesList getActivities(EntityManager em, Collection<Serializable> activityIds) { 419 Query query = em.createQuery("select activity from Activity activity where activity.id in (:ids)"); 420 query.setParameter("ids", activityIds); 421 return new ActivitiesListImpl(query.getResultList()); 422 } 423 424 protected void updateActivity(final Activity activity) { 425 getOrCreatePersistenceProvider().run(false, em -> { 426 activity.setLastUpdatedDate(new Date()); 427 return em.merge(activity); 428 }); 429 } 430 431 public EntityManager getEntityManager() { 432 return localEntityManager.get(); 433 } 434 435 public PersistenceProvider getOrCreatePersistenceProvider() { 436 if (persistenceProvider == null) { 437 activatePersistenceProvider(); 438 } 439 return persistenceProvider; 440 } 441 442 protected void activatePersistenceProvider() { 443 Thread thread = Thread.currentThread(); 444 ClassLoader last = thread.getContextClassLoader(); 445 try { 446 thread.setContextClassLoader(PersistenceProvider.class.getClassLoader()); 447 PersistenceProviderFactory persistenceProviderFactory = Framework.getService( 448 PersistenceProviderFactory.class); 449 persistenceProvider = persistenceProviderFactory.newProvider(ACTIVITIES_PROVIDER); 450 persistenceProvider.openPersistenceUnit(); 451 } finally { 452 thread.setContextClassLoader(last); 453 } 454 } 455 456 protected void deactivatePersistenceProvider() { 457 if (persistenceProvider != null) { 458 persistenceProvider.closePersistenceUnit(); 459 persistenceProvider = null; 460 } 461 } 462 463 @Override 464 public void activate(ComponentContext context) { 465 super.activate(context); 466 activityStreamRegistry = new ActivityStreamRegistry(); 467 activityVerbRegistry = new ActivityVerbRegistry(); 468 activityLinkBuilderRegistry = new ActivityLinkBuilderRegistry(); 469 activityUpgraderRegistry = new ActivityUpgraderRegistry(); 470 471 initializationHandler = new ActivityRepositoryInitializationHandler(); 472 initializationHandler.install(); 473 } 474 475 @Override 476 public void deactivate(ComponentContext context) { 477 deactivatePersistenceProvider(); 478 479 if (initializationHandler != null) { 480 initializationHandler.uninstall(); 481 } 482 483 super.deactivate(context); 484 } 485 486 @Override 487 public void registerContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 488 if (ACTIVITY_STREAM_FILTER_EP.equals(extensionPoint)) { 489 registerActivityStreamFilter((ActivityStreamFilterDescriptor) contribution); 490 } else if (ACTIVITY_STREAMS_EP.equals(extensionPoint)) { 491 registerActivityStream((ActivityStream) contribution); 492 } else if (ACTIVITY_VERBS_EP.equals(extensionPoint)) { 493 registerActivityVerb((ActivityVerb) contribution); 494 } else if (ACTIVITY_LINK_BUILDERS_EP.equals(extensionPoint)) { 495 registerActivityLinkBuilder((ActivityLinkBuilderDescriptor) contribution); 496 } else if (ACTIVITY_UPGRADERS_EP.equals(extensionPoint)) { 497 registerActivityUpgrader((ActivityUpgraderDescriptor) contribution); 498 } 499 } 500 501 private void registerActivityStreamFilter(ActivityStreamFilterDescriptor descriptor) { 502 ActivityStreamFilter filter = descriptor.getActivityStreamFilter(); 503 504 String filterId = filter.getId(); 505 506 boolean enabled = descriptor.isEnabled(); 507 if (activityStreamFilters.containsKey(filterId)) { 508 log.info("Overriding activity stream filter with id " + filterId); 509 if (!enabled) { 510 activityStreamFilters.remove(filterId); 511 log.info("Disabled activity stream filter with id " + filterId); 512 } 513 } 514 if (enabled) { 515 log.info("Registering activity stream filter with id " + filterId); 516 activityStreamFilters.put(filterId, descriptor.getActivityStreamFilter()); 517 } 518 } 519 520 private void registerActivityStream(ActivityStream activityStream) { 521 log.info(String.format("Registering activity stream '%s'", activityStream.getName())); 522 activityStreamRegistry.addContribution(activityStream); 523 } 524 525 private void registerActivityVerb(ActivityVerb activityVerb) { 526 log.info(String.format("Registering activity verb '%s'", activityVerb.getVerb())); 527 activityVerbRegistry.addContribution(activityVerb); 528 } 529 530 private void registerActivityLinkBuilder(ActivityLinkBuilderDescriptor activityLinkBuilderDescriptor) { 531 log.info(String.format("Registering activity link builder '%s'", activityLinkBuilderDescriptor.getName())); 532 activityLinkBuilderRegistry.addContribution(activityLinkBuilderDescriptor); 533 } 534 535 private void registerActivityUpgrader(ActivityUpgraderDescriptor activityUpgraderDescriptor) { 536 log.info(String.format("Registering activity upgrader '%s'", activityUpgraderDescriptor.getName())); 537 activityUpgraderRegistry.addContribution(activityUpgraderDescriptor); 538 } 539 540 @Override 541 public void unregisterContribution(Object contribution, String extensionPoint, ComponentInstance contributor) { 542 if (ACTIVITY_STREAM_FILTER_EP.equals(extensionPoint)) { 543 unregisterActivityStreamFilter((ActivityStreamFilterDescriptor) contribution); 544 } else if (ACTIVITY_STREAMS_EP.equals(extensionPoint)) { 545 unregisterActivityStream((ActivityStream) contribution); 546 } else if (ACTIVITY_VERBS_EP.equals(extensionPoint)) { 547 unregisterActivityVerb((ActivityVerb) contribution); 548 } else if (ACTIVITY_LINK_BUILDERS_EP.equals(extensionPoint)) { 549 unregisterActivityLinkBuilder((ActivityLinkBuilderDescriptor) contribution); 550 } else if (ACTIVITY_UPGRADERS_EP.equals(extensionPoint)) { 551 unregisterActivityUpgrader((ActivityUpgraderDescriptor) contribution); 552 } 553 } 554 555 private void unregisterActivityStreamFilter(ActivityStreamFilterDescriptor descriptor) { 556 ActivityStreamFilter filter = descriptor.getActivityStreamFilter(); 557 String filterId = filter.getId(); 558 activityStreamFilters.remove(filterId); 559 log.info("Unregistering activity stream filter with id " + filterId); 560 } 561 562 private void unregisterActivityStream(ActivityStream activityStream) { 563 activityStreamRegistry.removeContribution(activityStream); 564 log.info(String.format("Unregistering activity stream '%s'", activityStream.getName())); 565 } 566 567 private void unregisterActivityVerb(ActivityVerb activityVerb) { 568 activityVerbRegistry.removeContribution(activityVerb); 569 log.info(String.format("Unregistering activity verb '%s'", activityVerb.getVerb())); 570 } 571 572 private void unregisterActivityLinkBuilder(ActivityLinkBuilderDescriptor activityLinkBuilderDescriptor) { 573 activityLinkBuilderRegistry.removeContribution(activityLinkBuilderDescriptor); 574 log.info(String.format("Unregistering activity link builder '%s'", activityLinkBuilderDescriptor.getName())); 575 } 576 577 private void unregisterActivityUpgrader(ActivityUpgraderDescriptor activityUpgraderDescriptor) { 578 activityUpgraderRegistry.removeContribution(activityUpgraderDescriptor); 579 log.info(String.format("Unregistering activity upgrader '%s'", activityUpgraderDescriptor.getName())); 580 } 581 582}