NotificationService.java
package com.reallifedeveloper.common.application.notification;
import static com.reallifedeveloper.common.domain.LogUtil.removeCRLF;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.annotation.Transactional;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import com.reallifedeveloper.common.application.eventstore.EventStore;
import com.reallifedeveloper.common.application.eventstore.StoredEvent;
import com.reallifedeveloper.common.domain.ErrorHandling;
/**
* An application service to work with {@link NotificationLog NotificationLogs}.
*
* @author RealLifeDeveloper
*/
public class NotificationService {
private static final Logger LOG = LoggerFactory.getLogger(NotificationService.class);
private final EventStore eventStore;
private final PublishedMessageTrackerRepository messageTrackerRepository;
private final NotificationPublisher notificationPublisher;
/**
* Creates a new {@code NotificationService} that uses the given components.
*
* @param eventStore an event store for finding stored domain events
* @param messageTrackerRepository a repository for keeping track of the last notification published
* @param notificationPublisher a publisher of notifications to external systems
*
* @throws IllegalArgumentException if any argument is {@code null}
*/
@SuppressFBWarnings("EI_EXPOSE_REP2")
public NotificationService(EventStore eventStore, PublishedMessageTrackerRepository messageTrackerRepository,
NotificationPublisher notificationPublisher) {
ErrorHandling.checkNull("Arguments must not be null: eventStore=%s, messageTrackerRepository=%s, notificationPublisher=%s",
eventStore, messageTrackerRepository, notificationPublisher);
this.eventStore = eventStore;
this.messageTrackerRepository = messageTrackerRepository;
this.notificationPublisher = notificationPublisher;
}
/**
* Gives a {@link NotificationLog} containing the most recent {@link Notification Notifications}.
*
* @param batchSize the maximum number of notifications in the notification log
*
* @return a notification log with the most recent notifications
*/
@Transactional(readOnly = true)
public NotificationLog currentNotificationLog(int batchSize) {
LOG.trace("currentNotificationLog: batchSize={}", batchSize);
NotificationLogId notificationLogId = calculateCurrentNotificationLogId(batchSize);
NotificationLog notificationLog = findNotificationLog(notificationLogId);
LOG.trace("currentNotificationLog: {}", notificationLog);
return notificationLog;
}
/**
* Gives an archived {@link NotificationLog}.
*
* @param notificationLogId represents the first and last {@link Notification} in the log
*
* @return an archived {@code NotificationLog}
*/
@Transactional(readOnly = true)
@SuppressFBWarnings(value = "CRLF_INJECTION_LOGS", justification = "Logging only of objects, not user data")
public NotificationLog notificationLog(NotificationLogId notificationLogId) {
LOG.trace("notificationLog: notificationLogId={}", notificationLogId);
ErrorHandling.checkNull("notificationLogId must not be null", notificationLogId);
NotificationLog notificationLog = findNotificationLog(notificationLogId);
LOG.trace("notificationLog: {}", notificationLog);
return notificationLog;
}
private NotificationLog findNotificationLog(NotificationLogId notificationLogId) {
List<StoredEvent> storedEvents = eventStore.allEventsBetween(notificationLogId.low(), notificationLogId.high());
long lastStoredEventId = eventStore.lastStoredEventId();
boolean archivedIndicator = notificationLogId.high() <= lastStoredEventId;
NotificationLogId next = notificationLogId.high() < lastStoredEventId ? notificationLogId.next() : null;
NotificationLogId previous = notificationLogId.low() > 1 ? notificationLogId.previous() : null;
return new NotificationLog(notificationLogId, next, previous, notificationsFrom(storedEvents), archivedIndicator);
}
private NotificationLogId calculateCurrentNotificationLogId(int batchSize) {
long count = eventStore.lastStoredEventId();
long remainder = count % batchSize;
if (remainder == 0) {
remainder = batchSize;
}
long low = count - remainder + 1;
if (low < 1) {
low = 1;
}
long high = low + batchSize - 1;
return new NotificationLogId(low, high);
}
private List<Notification> notificationsFrom(List<StoredEvent> storedEvents) {
List<Notification> notifications = new ArrayList<>();
NotificationFactory notificationFactory = NotificationFactory.instance(eventStore);
for (StoredEvent storedEvent : storedEvents) {
notifications.add(notificationFactory.fromStoredEvent(storedEvent));
}
return notifications;
}
/**
* Publishes notifications about all events that have occurred since the last publication to the given publication channel.
*
* @param publicationChannel the name of the publication channel to publish notifications on
*
* @throws IOException if publishing failed
*/
@Transactional
public void publishNotifications(String publicationChannel) throws IOException {
LOG.trace("publishNotifications: publicationChannel={}", removeCRLF(publicationChannel));
PublishedMessageTracker messageTracker = messageTracker(publicationChannel);
List<Notification> notifications = unpublishedNotifications(messageTracker.lastPublishedMessageId());
notificationPublisher.publish(notifications, publicationChannel);
trackLastPublishedMessage(messageTracker, notifications);
LOG.trace("publishNotifications: done");
}
private PublishedMessageTracker messageTracker(String publicationChannel) {
return messageTrackerRepository.findByPublicationChannel(publicationChannel)
.orElseGet(() -> new PublishedMessageTracker(0, publicationChannel));
}
private List<Notification> unpublishedNotifications(long lastPublishedMessageId) {
List<StoredEvent> storedEvents = eventStore.allEventsSince(lastPublishedMessageId);
return notificationsFrom(storedEvents);
}
private void trackLastPublishedMessage(PublishedMessageTracker messageTracker, List<Notification> notifications) {
if (!notifications.isEmpty()) {
Notification lastNotification = notifications.get(notifications.size() - 1);
messageTracker.setLastPublishedMessageid(lastNotification.storedEventId());
messageTrackerRepository.save(messageTracker);
}
}
/**
* Make finalize method final to avoid "Finalizer attacks" and corresponding SpotBugs warning (CT_CONSTRUCTOR_THROW).
*
* @see <a href="https://wiki.sei.cmu.edu/confluence/display/java/OBJ11-J.+Be+wary+of+letting+constructors+throw+exceptions">
* Explanation of finalizer attack</a>
*/
@Override
@Deprecated
@SuppressWarnings({ "checkstyle:NoFinalizer", "PMD.EmptyFinalizer" })
protected final void finalize() throws Throwable {
// Do nothing
}
}