Skip to content

Commit

Permalink
Merge pull request #821 from dilini-muthumala/master
Browse files Browse the repository at this point in the history
Merging incremental-checkpoint-feature branch to master
  • Loading branch information
tishan89 authored May 31, 2018
2 parents 3151e7a + 889e5cb commit 0771568
Show file tree
Hide file tree
Showing 73 changed files with 5,980 additions and 345 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
WSO2 Siddhi
===========
Siddhi
======

Siddhi is a java library that listens to events from data streams, detects complex conditions described via a **Streaming
SQL language**, and triggers actions. It performs both **_Stream Processing_** and **_Complex Event Processing_**.
Expand Down
1,935 changes: 1,935 additions & 0 deletions docs/api/4.0.9-SNAPSHOT.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/api/latest.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# API Docs - v4.1.26
# API Docs - v4.1.27-SNAPSHOT

## Core

Expand Down
2 changes: 1 addition & 1 deletion docs/documentation/siddhi-architecture.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Siddhi Architecture

WSO2 Siddhi is a software library that can be utilized in any of the following ways:
Siddhi is a software library that can be utilized in any of the following ways:

- Run as a server on its own
- Run within WSO2 SP as a service
Expand Down
4 changes: 2 additions & 2 deletions docs/index.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
WSO2 Siddhi
===========
Siddhi
======

Siddhi is a java library that listens to events from data streams, detects complex conditions described via a **Streaming
SQL language**, and triggers actions. It performs both **_Stream Processing_** and **_Complex Event Processing_**.
Expand Down
13 changes: 13 additions & 0 deletions findbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,15 @@
<Class name="org.wso2.siddhi.core.table.holder.IndexEventHolder"/>
<Bug pattern="EI_EXPOSE_REP"/>
</Match>
<Match>
<Class name="org.wso2.siddhi.core.table.holder.SnapshotableIndexEventHolder"/>
<Bug pattern="EI_EXPOSE_REP"/>
</Match>
<Match>
<Class name="org.wso2.siddhi.core.table.holder.ListEventHolder"/>
<Bug pattern="EQ_DOESNT_OVERRIDE_EQUALS"/>
</Match>


<Match>
<Package name="~org\.wso2\.siddhi\.core\.query\.output\.ratelimit\.time.*"/>
Expand All @@ -181,6 +190,10 @@
<Package name="~org\.wso2\.siddhi\.core\.query\.selector\.attribute\.aggregator\.incremental.*"/>
<Bug pattern="EI_EXPOSE_REP"/>
</Match>
<Match>
<Package name="~org\.wso2\.siddhi\.core\.util\.snapshot\.*"/>
<Bug pattern="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION"/>
</Match>

<!--other-->
<Match>
Expand Down
8 changes: 4 additions & 4 deletions mkdocs.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
site_name: WSO2 Siddhi
site_name: Siddhi
site_description: Stream Processing and Complex Event Processing Engine
site_author: WSO2
site_url: https://wso2.github.io/siddhi
extra_css:
- stylesheets/extra.css
repo_name: WSO2 Siddhi
repo_name: Siddhi
repo_url: https://github.com/wso2/siddhi
copyright: Copyright &copy; 2011 - 2017 WSO2
copyright: Copyright &copy; 2011 - 2018 WSO2
extra:
logo: images/siddhi-logo-w.svg
palette:
Expand All @@ -26,7 +26,7 @@ markdown_extensions:
- toc(permalink=true)
- codehilite(guess_lang=false)
pages:
- Welcome to WSO2 Siddhi: index.md
- Welcome to Siddhi: index.md
- Features: features.md
- Quick Start Guide: documentation/siddhi-quckstart-4.0.md
- Documentation:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import org.wso2.siddhi.core.util.extension.holder.EternalReferencedHolder;
import org.wso2.siddhi.core.util.parser.StoreQueryParser;
import org.wso2.siddhi.core.util.parser.helper.QueryParserHelper;
import org.wso2.siddhi.core.util.snapshot.AsyncSnapshotPersistor;
import org.wso2.siddhi.core.util.persistence.util.PersistenceHelper;
import org.wso2.siddhi.core.util.snapshot.PersistenceReference;
import org.wso2.siddhi.core.util.statistics.BufferedEventsTracker;
import org.wso2.siddhi.core.util.statistics.LatencyTracker;
Expand Down Expand Up @@ -120,6 +120,7 @@ public class SiddhiAppRuntime {
private LatencyTracker storeQueryLatencyTracker;
private SiddhiDebugger siddhiDebugger;
private boolean running = false;
private Future futureIncrementalPersistor;

public SiddhiAppRuntime(Map<String, AbstractDefinition> streamDefinitionMap,
Map<String, AbstractDefinition> tableDefinitionMap,
Expand Down Expand Up @@ -531,13 +532,13 @@ public PersistenceReference persist() {
// first, pause all the event sources
sourceMap.values().forEach(list -> list.forEach(Source::pause));
// take snapshots of execution units
byte[] snapshots = siddhiAppContext.getSnapshotService().snapshot();
// start the snapshot persisting task asynchronously
AsyncSnapshotPersistor asyncSnapshotPersistor = new AsyncSnapshotPersistor(snapshots,
siddhiAppContext.getSiddhiContext().getPersistenceStore(), siddhiAppContext.getName());
String revision = asyncSnapshotPersistor.getRevision();
Future future = siddhiAppContext.getExecutorService().submit(asyncSnapshotPersistor);
return new PersistenceReference(future, revision);
if (siddhiAppContext.getSiddhiContext().getPersistenceStore() != null) {
return PersistenceHelper.persist(siddhiAppContext.getSnapshotService().fullSnapshot(),
siddhiAppContext);
} else {
return PersistenceHelper.persist(siddhiAppContext.getSnapshotService().incrementalSnapshot(),
siddhiAppContext);
}
} finally {
// at the end, resume the event sources
sourceMap.values().forEach(list -> list.forEach(Source::resume));
Expand All @@ -549,7 +550,7 @@ public byte[] snapshot() {
// first, pause all the event sources
sourceMap.values().forEach(list -> list.forEach(Source::pause));
// take snapshots of execution units
return siddhiAppContext.getSnapshotService().snapshot();
return siddhiAppContext.getSnapshotService().fullSnapshot();
} finally {
// at the end, resume the event sources
sourceMap.values().forEach(list -> list.forEach(Source::resume));
Expand All @@ -561,7 +562,7 @@ public void restore(byte[] snapshot) throws CannotRestoreSiddhiAppStateException
// first, pause all the event sources
sourceMap.values().forEach(list -> list.forEach(Source::pause));
// start the restoring process
siddhiAppContext.getPersistenceService().restore(snapshot);
siddhiAppContext.getSnapshotService().restore(snapshot);
} finally {
// at the end, resume the event sources
sourceMap.values().forEach(list -> list.forEach(Source::resume));
Expand All @@ -573,7 +574,7 @@ public void restoreRevision(String revision) throws CannotRestoreSiddhiAppStateE
// first, pause all the event sources
sourceMap.values().forEach(list -> list.forEach(Source::pause));
// start the restoring process
siddhiAppContext.getPersistenceService().restoreRevision(revision);
siddhiAppContext.getSnapshotService().restoreRevision(revision);
} finally {
// at the end, resume the event sources
sourceMap.values().forEach(list -> list.forEach(Source::resume));
Expand All @@ -586,7 +587,7 @@ public String restoreLastRevision() throws CannotRestoreSiddhiAppStateException
// first, pause all the event sources
sourceMap.values().forEach(list -> list.forEach(Source::pause));
// start the restoring process
revision = siddhiAppContext.getPersistenceService().restoreLastRevision();
revision = siddhiAppContext.getSnapshotService().restoreLastRevision();
} finally {
// at the end, resume the event sources
sourceMap.values().forEach(list -> list.forEach(Source::resume));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.wso2.siddhi.core.util.SiddhiAppRuntimeBuilder;
import org.wso2.siddhi.core.util.config.ConfigManager;
import org.wso2.siddhi.core.util.parser.SiddhiAppParser;
import org.wso2.siddhi.core.util.persistence.IncrementalPersistenceStore;
import org.wso2.siddhi.core.util.persistence.PersistenceStore;
import org.wso2.siddhi.query.api.SiddhiApp;
import org.wso2.siddhi.query.compiler.SiddhiCompiler;
Expand Down Expand Up @@ -235,4 +236,8 @@ public void restoreLastState() {
}
}
}

public void setIncrementalPersistenceStore(IncrementalPersistenceStore incrementalPersistenceStore) {
this.siddhiContext.setIncrementalPersistenceStore(incrementalPersistenceStore);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.wso2.siddhi.core.util.ElementIdGenerator;
import org.wso2.siddhi.core.util.ThreadBarrier;
import org.wso2.siddhi.core.util.extension.holder.EternalReferencedHolder;
import org.wso2.siddhi.core.util.persistence.PersistenceService;
import org.wso2.siddhi.core.util.snapshot.SnapshotService;
import org.wso2.siddhi.core.util.statistics.StatisticsManager;
import org.wso2.siddhi.core.util.timestamp.TimestampGenerator;
Expand Down Expand Up @@ -56,7 +55,6 @@ public class SiddhiAppContext {

private ThreadBarrier threadBarrier = null;
private TimestampGenerator timestampGenerator = null;
private PersistenceService persistenceService;
private ElementIdGenerator elementIdGenerator;
private Map<String, Script> scriptFunctionMap;
private ExceptionHandler<Object> disruptorExceptionHandler;
Expand Down Expand Up @@ -166,14 +164,6 @@ public void setSnapshotService(SnapshotService snapshotService) {
this.snapshotService = snapshotService;
}

public PersistenceService getPersistenceService() {
return persistenceService;
}

public void setPersistenceService(PersistenceService persistenceService) {
this.persistenceService = persistenceService;
}

public ElementIdGenerator getElementIdGenerator() {
return elementIdGenerator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@

import com.lmax.disruptor.ExceptionHandler;
import org.apache.log4j.Logger;
import org.wso2.siddhi.core.exception.PersistenceStoreException;
import org.wso2.siddhi.core.stream.input.source.SourceHandlerManager;
import org.wso2.siddhi.core.stream.output.sink.SinkHandlerManager;
import org.wso2.siddhi.core.table.record.RecordTableHandlerManager;
import org.wso2.siddhi.core.util.SiddhiExtensionLoader;
import org.wso2.siddhi.core.util.config.ConfigManager;
import org.wso2.siddhi.core.util.config.InMemoryConfigManager;
import org.wso2.siddhi.core.util.extension.holder.AbstractExtensionHolder;
import org.wso2.siddhi.core.util.persistence.IncrementalPersistenceStore;
import org.wso2.siddhi.core.util.persistence.PersistenceStore;
import org.wso2.siddhi.core.util.statistics.metrics.SiddhiMetricsFactory;

Expand All @@ -45,6 +47,7 @@ public class SiddhiContext {
private ExceptionHandler<Object> defaultDisrupterExceptionHandler;
private Map<String, Class> siddhiExtensions = new HashMap<>();
private PersistenceStore persistenceStore = null;
private IncrementalPersistenceStore incrementalPersistenceStore = null;
private ConcurrentHashMap<String, DataSource> siddhiDataSources;
private StatisticsConfiguration statisticsConfiguration;
private ConcurrentHashMap<Class, AbstractExtensionHolder> extensionHolderMap;
Expand Down Expand Up @@ -82,14 +85,31 @@ public Map<String, Class> getSiddhiExtensions() {
return siddhiExtensions;
}

public PersistenceStore getPersistenceStore() {
public synchronized PersistenceStore getPersistenceStore() {
return persistenceStore;
}

public void setPersistenceStore(PersistenceStore persistenceStore) {
public synchronized void setPersistenceStore(PersistenceStore persistenceStore) {
if (incrementalPersistenceStore != null) {
throw new PersistenceStoreException("Only one type of persistence store can exist. " +
"Incremental persistence store '" + incrementalPersistenceStore.getClass().getName() +
"' already registered!");
}
this.persistenceStore = persistenceStore;
}

public synchronized IncrementalPersistenceStore getIncrementalPersistenceStore() {
return incrementalPersistenceStore;
}

public synchronized void setIncrementalPersistenceStore(IncrementalPersistenceStore incrementalPersistenceStore) {
if (persistenceStore != null) {
throw new PersistenceStoreException("Only one type of persistence store can exist." +
" Persistence store '" + persistenceStore.getClass().getName() + "' already registered!");
}
this.incrementalPersistenceStore = incrementalPersistenceStore;
}

public void setConfigManager(ConfigManager configManager) {
this.configManager = configManager;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2017, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.siddhi.core.event.stream;

import java.io.Serializable;


/**
* The class which resembles an instance of operation performed on SnapshotableStreamEventQueue.
*/
public class Operation implements Serializable {

/**
* Possible Operator actions
*/
public enum Operator {
ADD ,
REMOVE,
CLEAR,
OVERWRITE,
DELETE_BY_OPERATOR,
DELETE_BY_INDEX
}

public Operator operation;
public Object parameters;

public Operation(Operator operation, Object parameters) {
this.operation = operation;
this.parameters = parameters;
}

public Operation(Operator operation) {
this.operation = operation;
}
}
Loading

0 comments on commit 0771568

Please sign in to comment.